Source code for geophires_x_client
import atexit
import os
import sys
import threading
from multiprocessing import Manager
from multiprocessing import current_process
from pathlib import Path
# noinspection PyPep8Naming
from geophires_x import GEOPHIRESv3 as geophires
from .common import _get_logger
from .geophires_input_parameters import GeophiresInputParameters
from .geophires_input_parameters import ImmutableGeophiresInputParameters
from .geophires_x_result import GeophiresXResult
[docs]
class GeophiresXClient:
"""
A thread-safe and process-safe client for running GEOPHIRES simulations.
It automatically manages a background process via atexit and provides an
explicit shutdown() method for advanced use cases like testing.
"""
# --- Class-level shared resources ---
_manager = None
_cache = None
_lock = None
_init_lock = threading.Lock()
"""A standard threading lock to make the one-time initialization thread-safe."""
def __init__(self, enable_caching=False, logger_name=None):
if logger_name is None:
logger_name = __name__
self._logger = _get_logger(logger_name=logger_name)
self._enable_caching = enable_caching
if enable_caching and GeophiresXClient._manager is None:
# Lazy-initialize shared resources if they haven't been already.
self._initialize_shared_resources()
@classmethod
def _initialize_shared_resources(cls):
"""
Initializes the multiprocessing Manager and shared resources in a
thread-safe and now process-safe manner. It also registers the
shutdown hook to ensure automatic cleanup on application exit.
"""
# Ensure that only the top-level user process can create the manager.
# A spawned child process, which re-imports this script, will have a different name
# (e.g., 'Spawn-1') and will skip this entire block, preventing a recursive crash.
if current_process().name == 'MainProcess':
with cls._init_lock:
if cls._manager is None:
cls._logger = _get_logger(__name__) # Add a logger for this class method
cls._logger.debug('MainProcess is creating the shared multiprocessing manager...')
cls._manager = Manager()
cls._cache = cls._manager.dict()
cls._lock = cls._manager.RLock()
# Register the shutdown method to be called automatically on exit.
atexit.register(cls.shutdown)
[docs]
@classmethod
def shutdown(cls):
"""
Explicitly shuts down the background manager process and de-registers
the atexit hook to prevent errors if called multiple times.
This is useful for test suites or applications that need to precisely
control the resource lifecycle.
"""
with cls._init_lock:
if cls._manager is not None:
cls._logger = _get_logger(__name__)
cls._logger.debug('Shutting down the shared multiprocessing manager...')
cls._manager.shutdown()
# De-register the hook to avoid trying to shut down twice.
try:
atexit.unregister(cls.shutdown)
except Exception as e:
# Fails in some environments (e.g. pytest), but is not critical
cls._logger.debug(
f'Encountered exception shutting down the shared multiprocessing manager (OK): ' f'{e!s}'
)
cls._manager = None
cls._cache = None
cls._lock = None
[docs]
def get_geophires_result(self, input_params: GeophiresInputParameters) -> GeophiresXResult:
"""
Calculates a GEOPHIRES result, using a cross-process cache to avoid
re-computing results for the same inputs. Caching is only effective
when providing an instance of ImmutableGeophiresInputParameters.
"""
is_immutable = isinstance(input_params, ImmutableGeophiresInputParameters)
if not (self._enable_caching and is_immutable and GeophiresXClient._manager is not None):
return self._run_simulation(input_params)
cache_key = hash(input_params)
with GeophiresXClient._lock:
if cache_key in GeophiresXClient._cache:
# self._logger.debug(f'Cache hit for inputs: {input_params}')
return GeophiresXClient._cache[cache_key]
# Cache miss
result = self._run_simulation(input_params)
GeophiresXClient._cache[cache_key] = result
return result
def _run_simulation(self, input_params: GeophiresInputParameters) -> GeophiresXResult:
"""Helper method to encapsulate the actual GEOPHIRES run."""
stash_cwd = Path.cwd()
stash_sys_argv = sys.argv
sys.argv = ['', input_params.as_file_path(), input_params.get_output_file_path()]
try:
geophires.main(enable_geophires_logging_config=False)
except Exception as e:
raise RuntimeError(f'GEOPHIRES encountered an exception: {e!s}') from e
except SystemExit:
raise RuntimeError('GEOPHIRES exited without giving a reason') from None
finally:
sys.argv = stash_sys_argv
os.chdir(stash_cwd)
self._logger.info(f'GEOPHIRES-X output file: {input_params.get_output_file_path()}')
result = GeophiresXResult(input_params.get_output_file_path())
return result