Source code for rtm_wrapper.execution

"""
Simulation sweep execution.

Executor objects are responsible for repeatedly running simulations with a specific
RTM engine according to a sweep specification.
"""

from __future__ import annotations

import abc
import base64
import concurrent.futures
import contextlib
import datetime
import gzip
import itertools
import logging
import math
import multiprocessing
import operator
import os
import pathlib
import pickle
import tempfile
from abc import ABC
from collections.abc import Sequence
from typing import Any, Callable, Literal

import numpy as np
import xarray as xr
from typing_extensions import Never

import rtm_wrapper.util as rtm_util
from rtm_wrapper.engines.base import EngineOutputs, RTMEngine
from rtm_wrapper.simulation import SweepSimulation


class SweepExecutor(abc.ABC):
    """
    Abstract interface shared by all simulation sweep executors.

    Concrete executors decide how the individual simulations of a sweep are
    dispatched and where the results are kept until they are requested via
    :meth:`collect_results`.
    """

    @abc.abstractmethod
    def run(
        self,
        sweep: SweepSimulation,
        engine: RTMEngine,
        **kwargs: Any,
    ) -> None:
        """
        Run every simulation described by ``sweep`` using ``engine``.

        Additional keyword arguments are specific to the concrete executor.
        """

    @abc.abstractmethod
    def collect_results(self) -> xr.Dataset:
        """Return the results produced by the most recent call to :meth:`run`."""
class LocalMemoryExecutor(SweepExecutor, ABC):
    """
    Base class for executors that store the entire simulation results in a local,
    in-memory ``xarray.Dataset``.

    Subclasses implement :meth:`_run`, which fills the result arrays held in
    ``self._results``. :meth:`run` allocates those arrays beforehand and attaches
    metadata attributes describing the run afterwards.
    """

    # Results of the most recent run, or ``None`` if no sweep has been run yet.
    _results: xr.Dataset | None

    def __init__(self) -> None:
        self._results = None

    def steps_for(self, sweep: SweepSimulation) -> int:
        """
        Return a forecast for the number of times ``step_callback`` will be called
        if the given simulation is passed to ``run``.
        """
        return sweep.sweep_size

    def run(
        self,
        sweep: SweepSimulation,
        engine: RTMEngine,
        *,
        step_callback: Callable[..., None] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Execute ``sweep`` with ``engine`` and store the results in memory.

        :param sweep: the sweep simulation to execute.
        :param engine: engine used to run each simulation.
        :param step_callback: optional progress callback forwarded to
            :meth:`_run`; its call signature depends on the concrete executor.
        :param kwargs: executor-specific options forwarded to :meth:`_run`.
        """
        self._allocate_results_like(sweep.sweep_spec, engine)
        assert self._results is not None  # for type checker

        sim_start = datetime.datetime.now().astimezone().isoformat()
        self._run(sweep, engine, step_callback=step_callback, **kwargs)
        sim_end = datetime.datetime.now().astimezone().isoformat()

        engine_type = type(engine)

        # Populate metadata attributes
        self._results = self._results.assign_attrs(
            {
                "version": rtm_util.build_version(),
                "platform": rtm_util.platform_summary(),
                "engine": f"{engine_type.__module__}.{engine_type.__qualname__}",
                "base_repr": repr(sweep.base),
                # Gzip-compressed pickle of the base inputs, base64 encoded so it
                # can be stored as a string attribute.
                "base_pzb64": base64.b64encode(
                    gzip.compress(pickle.dumps(sweep.base))
                ).decode(),
                "sim_start": sim_start,
                "sim_end": sim_end,
            }
        )

    @abc.abstractmethod
    def _run(
        self,
        sweep: SweepSimulation,
        engine: RTMEngine,
        *,
        step_callback: Callable[..., None] | None = None,
        **kwargs: Any,
    ) -> None:
        """Run the simulations of ``sweep``, writing outputs into ``self._results``."""
        ...

    def collect_results(self) -> xr.Dataset:
        """
        Return the results of the last run.

        :raises ValueError: if no simulations have been run yet.
        """
        if self._results is None:
            raise ValueError("no simulations have been run yet")
        return self._results

    def _allocate_results_like(
        self,
        sweep_spec: xr.Dataset,
        engine: RTMEngine,
    ) -> None:
        """
        Allocate ``self._results`` with one variable per requested engine output.

        Each variable spans all dimensions of ``sweep_spec`` and reuses its
        coordinates. Inexact (float/complex) outputs are pre-filled with NaN and
        all other outputs with zeros. As a result, entries for simulations that
        never complete do not contain arbitrary uninitialized memory.

        :raises RuntimeError: if a requested output has object dtype, or is a
            virtual output with no registered dtype.
        """
        data_vars = {}
        sweep_dims = sweep_spec.indexes.dims
        dim_names = tuple(sweep_dims.keys())
        shape = tuple(sweep_dims.values())

        for output_name in engine.requested_outputs:
            output_type = engine.outputs._dtypes[output_name]
            output_metadata = engine.outputs._metadata[output_name]

            # Check for virtual outputs first. ``Never`` is a typing construct,
            # not a dtype, so it must not be compared against a numpy dtype.
            # TODO: tidy add mechanism to report dtypes for virtual outputs?
            if output_type is Never:
                raise RuntimeError(
                    f"sweeping with virtual output ('{output_name}') not yet supported. "
                    f"As a quick-fix, register an extracted output that mirrors it."
                )

            if output_type == np.dtype(object):
                raise RuntimeError(
                    f"cannot sweep with output '{output_name}' of object type"
                )

            dtype = np.dtype(output_type)
            if np.issubdtype(dtype, np.inexact):
                data = np.full(shape, np.nan, dtype=dtype)
            else:
                data = np.zeros(shape, dtype=dtype)

            data_vars[output_name] = (
                # All output variables have the same shape as the input grid.
                dim_names,
                data,
                # Add output metadata as attributes.
                output_metadata,
            )

        self._results = xr.Dataset(data_vars, coords=sweep_spec.coords)
class SerialExecutor(LocalMemoryExecutor):
    """Executor that runs the simulations of a sweep one after another."""

    def _run(
        self,
        sweep: SweepSimulation,
        engine: RTMEngine,
        *,
        step_callback: Callable[[tuple[int, ...]], None] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Run each simulation of ``sweep`` in turn, following ``np.ndindex`` order.

        :param step_callback: if given, called with the grid index of each
            simulation after its outputs have been stored.
        :raises ValueError: if any unsupported keyword arguments are given.
        """
        assert self._results is not None  # for type checker
        if kwargs:
            raise ValueError(f"unknown kwargs {kwargs}")

        for grid_index in np.ndindex(sweep.sweep_shape):
            outputs = engine.run_simulation(sweep[grid_index])  # type: ignore

            for name in self._results.keys():
                target = self._results.data_vars[name]
                target[grid_index] = outputs[name]  # type: ignore

            if step_callback is not None:
                step_callback(grid_index)
class ConcurrentExecutor(LocalMemoryExecutor):
    """
    Executor that launches simulations in concurrent worker threads.

    This executor is designed to take advantage of engines that release the GIL
    while running.

    .. warning::

        This executor assumes that the provided engine's ``run_simulation`` method is
        thread-safe. All worker threads operate on the same engine instance. Make
        sure that the provided engine *does not* mutate itself or any global state
        without appropriate locking.
    """

    # Maximum number of worker threads used by each run.
    _max_workers: int

    def __init__(
        self,
        max_workers: int | None = None,
    ) -> None:
        super().__init__()
        if max_workers is None:
            # Override ThreadPoolExecutor's default.
            # Engines tend to be CPU bound (not I/O bound), so the default of more
            # threads than cores is detrimental.
            max_workers = multiprocessing.cpu_count()
        self._max_workers = max_workers

    def _run(
        self,
        sweep: SweepSimulation,
        engine: RTMEngine,
        *,
        step_callback: Callable[[tuple[int, ...]], None] | None = None,
        on_error: Literal["ignore", "abort"] = "abort",
        **kwargs: Any,
    ) -> None:
        """
        Run all simulations of ``sweep`` in a thread pool.

        :param step_callback: if given, called with the grid index of each
            simulation as it completes, including ones that failed when
            ``on_error="ignore"``.
        :param on_error: ``"abort"`` re-raises the first simulation error after
            cancelling simulations that have not started yet. ``"ignore"`` logs
            the error and continues; the failed entry keeps whatever value it was
            allocated with.
        :raises ValueError: on unsupported keyword arguments or an invalid
            ``on_error`` value.
        """
        assert self._results is not None  # for type checker
        if kwargs:
            raise ValueError(f"unknown kwargs {kwargs}")
        if on_error not in ("ignore", "abort"):
            raise ValueError(
                f"invalid on_error {on_error!r}, expected 'ignore' or 'abort'"
            )

        logger = logging.getLogger(__name__)

        def target(idx: tuple[int, ...]) -> EngineOutputs:
            return engine.run_simulation(sweep[idx])  # type: ignore

        # Execute simulations in worker threads.
        # This is fast so long as the engine releases the GIL while running.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self._max_workers
        ) as executor:
            futures_to_index = {
                executor.submit(target, idx): idx
                for idx in np.ndindex(sweep.sweep_shape)
            }

            for future in concurrent.futures.as_completed(futures_to_index):
                idx = futures_to_index[future]
                try:
                    out = future.result()
                    for output_name in self._results.keys():
                        self._results.variables[output_name][idx] = out[output_name]  # type: ignore
                except Exception as ex:
                    error_input = sweep[idx]
                    logger.error(
                        "exception occurred when running simulation with input=%r, idx=%r",
                        error_input,
                        idx,
                        exc_info=ex,
                    )
                    if on_error == "abort":
                        # Cancel simulations that have not started yet. Otherwise
                        # leaving the ``with`` block (which waits on shutdown) would
                        # run the entire remaining sweep before the error propagates.
                        # cancel() is a no-op for running or finished futures.
                        for pending in futures_to_index:
                            pending.cancel()
                        raise

                if step_callback is not None:
                    step_callback(idx)
class ParallelConcurrentExecutor(LocalMemoryExecutor):
    """
    Executor that runs multiple :class:`.ConcurrentExecutor`\\s in spawned
    subprocesses.

    This can improve performance over ``ConcurrentExecutor`` when simulation sweeps
    are Python-bound, which can happen when individual simulator runs are fast or
    when many simulation workers are used.

    The sweep is split along one dimension into sections. Each section is run by a
    ``ConcurrentExecutor`` in a separate process and saved as a netCDF file in a
    work directory. The files are then loaded and concatenated along the split
    dimension.
    """

    # Dimension to split the sweep along; ``None`` picks it automatically.
    _split_dim: str | None = None
    # Sections argument passed to ``SweepSimulation.split``; ``None`` uses the
    # CPU count.
    _split_sections: int | Sequence[int] | None = None
    # Maximum number of subprocesses (``None`` uses ProcessPoolExecutor's default).
    _max_managers: int | None
    # ``max_workers`` for the ``ConcurrentExecutor`` in each subprocess.
    _max_workers: int | None

    def __init__(
        self,
        split_dim: str | None = None,
        split_sections: int | Sequence[int] | None = None,
        max_managers: int | None = None,
        max_workers: int | None = None,
    ) -> None:
        self._split_dim = split_dim
        self._split_sections = split_sections
        self._max_managers = max_managers
        self._max_workers = max_workers
        super().__init__()

    @staticmethod
    def _default_sections() -> int:
        """Return the default number of sections: the CPU count, or 1 if unknown."""
        # os.cpu_count() may return None when the count cannot be determined.
        return os.cpu_count() or 1

    def steps_for(self, sweep: SweepSimulation) -> int:
        """
        Return a forecast for the number of times ``step_callback`` will be called
        (once per completed section) if the given simulation is passed to ``run``.
        """
        if self._split_sections is not None:
            if isinstance(self._split_sections, int):
                return self._split_sections
            # NOTE(review): if ``SweepSimulation.split`` follows
            # ``np.array_split`` semantics, a sequence of split indices yields
            # len + 1 sections — confirm.
            return len(self._split_sections)
        return self._default_sections()

    def _allocate_results_like(self, sweep_spec: xr.Dataset, engine: RTMEngine) -> None:
        # Delay results allocation to the end of _run.
        self._results = xr.Dataset()

    def _run(
        self,
        sweep: SweepSimulation,
        engine: RTMEngine,
        *,
        work_directory: pathlib.Path | str | None = None,
        step_callback: Callable[[pathlib.Path], None] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Split ``sweep`` into sections and run them in spawned subprocesses.

        :param work_directory: directory for the intermediate per-section netCDF
            files. If ``None``, a temporary directory is used and removed
            afterwards.
        :param step_callback: if given, called with the path of each section's
            results file as that section completes.
        :raises ValueError: if any unsupported keyword arguments are given.
        """
        if kwargs:
            raise ValueError(f"unknown kwargs {kwargs}")

        if self._split_dim is None:
            # Pick the dimension with the largest value in ``sweep.dims``
            # (presumably its size).
            dim = max(sweep.dims.items(), key=operator.itemgetter(1))[0]
        else:
            dim = self._split_dim

        sections: int | Sequence[int]
        if self._split_sections is None:
            sections = self._default_sections()
        else:
            sections = self._split_sections

        # TODO consider using lazy splitting
        split_sweeps = list(sweep.split(sections, dim))

        if work_directory is not None:
            dir_ctx = contextlib.nullcontext(work_directory)
        else:
            dir_ctx = tempfile.TemporaryDirectory()  # type: ignore

        with dir_ctx as work_dir:
            self._run_sims(
                split_sweeps, engine, dim, pathlib.Path(work_dir), step_callback
            )

    def _run_sims(
        self,
        sweeps: list[SweepSimulation],
        engine: RTMEngine,
        concat_dim: str,
        work_directory: pathlib.Path,
        step_callback: Callable[[pathlib.Path], None] | None = None,
    ) -> None:
        """
        Run each sweep section in a spawned subprocess, then combine the results.

        Each section's results are written to its own netCDF file in
        ``work_directory``. The files are then concatenated along ``concat_dim``
        into ``self._results``.

        :raises Exception: re-raises the first error from any section, after
            cancelling sections that have not started yet.
        """
        # Compact timestamp without ':' characters, which are illegal in Windows
        # file names (unlike the separators in ISO 8601 ``isoformat()`` output).
        file_prefix = (
            datetime.datetime.now().astimezone().strftime("%Y%m%dT%H%M%S%f%z")
        )
        # Zero-pad section indices so file names sort in section order.
        # Computed from the decimal string, so an empty list cannot hit log10(0).
        ndigits = len(str(len(sweeps)))
        file_gen = (
            work_directory.joinpath(f"{file_prefix}_{i:0{ndigits}d}.nc")
            for i in itertools.count()
        )
        work_directory.mkdir(parents=True, exist_ok=True)

        # "spawn" starts fresh interpreters instead of forking this process.
        context = multiprocessing.get_context("spawn")
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=self._max_managers,
            mp_context=context,
        ) as executor:
            futures_to_file: dict[concurrent.futures.Future[None], pathlib.Path] = {
                executor.submit(
                    _parallel_sim_target,
                    swp,
                    engine,
                    save_file,
                    self._max_workers,
                ): save_file
                for swp, save_file in zip(sweeps, file_gen)
            }

            for future in concurrent.futures.as_completed(futures_to_file):
                save_file = futures_to_file[future]
                try:
                    future.result()
                except Exception:
                    # Don't start sections that are still queued; leaving the
                    # ``with`` block would otherwise wait for all of them.
                    for pending in futures_to_file:
                        pending.cancel()
                    raise
                if step_callback is not None:
                    step_callback(save_file)

        # The dict preserves submission order, i.e. the order of ``sweeps``.
        datasets = [xr.load_dataset(file) for file in futures_to_file.values()]
        self._results = xr.combine_nested(
            datasets,
            concat_dim=concat_dim,
            compat="equals",
            join="exact",
            combine_attrs="drop",
        )
def _parallel_sim_target(
    sweep: SweepSimulation,
    engine: RTMEngine,
    save_file: pathlib.Path,
    max_workers: int | None,
) -> None:
    """
    Run one sweep section and save its results (subprocess entry point).

    A thread-based ``ConcurrentExecutor`` with ``max_workers`` threads runs
    ``sweep`` using ``engine``. The collected results are written to
    ``save_file`` as netCDF. This function is defined at module level because
    it is submitted to a spawn-context process pool, which must pickle it.
    """
    section_executor = ConcurrentExecutor(max_workers=max_workers)
    section_executor.run(sweep, engine)
    section_executor.collect_results().to_netcdf(save_file)