
ssh_process_lifetime_manager_from_forked_process

drunc.processes.ssh_process_lifetime_manager_from_forked_process

Provides a subprocess-isolated wrapper around SSHProcessLifetimeManagerShell.

All method calls are forwarded to an SSHProcessLifetimeManagerShell instance running in a dedicated child process via multiprocessing queues, keeping SSH activity and threading state fully isolated from the parent process.

Classes

SSHProcessLifetimeManagerShellOnForkedProcess(disable_host_key_check=False, disable_localhost_host_key_check=False, logger=None, on_process_exit=None)

Bases: ProcessLifetimeManager

A ProcessLifetimeManager that delegates process management to an SSHProcessLifetimeManagerShell instance running in a dedicated child process.

All public methods forward their call to the child process via multiprocessing queues and block until a response is received. This isolates SSH connections, threads, and file-descriptor state from the parent process, which is useful when the parent uses fork-unsafe libraries or needs clean process boundaries.

_call() is safe to invoke from multiple threads simultaneously. Pending requests are matched to their responses by unique request IDs, and a dedicated response-dispatcher thread routes each response to the correct waiting caller without any cross-caller interference.
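The request-ID matching described above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: `RequestRouter`, `echo_worker`, and the `timeout` argument are hypothetical names, and in-process `queue.Queue` objects and a thread stand in for the multiprocessing queues and the child process.

```python
import queue
import threading
import uuid


class RequestRouter:
    """Matches responses on a shared queue to the thread that sent the request."""

    def __init__(self, request_queue, response_queue):
        self._requests = request_queue
        self._responses = response_queue
        self._pending = {}  # request_id -> {"event": Event, "result": Any}
        self._lock = threading.Lock()
        threading.Thread(target=self._dispatch, daemon=True).start()

    def call(self, method, *args, timeout=5.0):
        """Send one request and block until its own response arrives."""
        request_id = uuid.uuid4().hex
        slot = {"event": threading.Event(), "result": None}
        with self._lock:
            self._pending[request_id] = slot
        self._requests.put((request_id, method, args))
        if not slot["event"].wait(timeout):
            with self._lock:
                self._pending.pop(request_id, None)
            raise TimeoutError(f"no response for {method!r}")
        return slot["result"]

    def _dispatch(self):
        """Route each response to the caller waiting on that request ID."""
        while True:
            message = self._responses.get()
            if message is None:  # shutdown sentinel
                break
            request_id, result = message
            with self._lock:
                slot = self._pending.pop(request_id, None)
            if slot is not None:
                slot["result"] = result
                slot["event"].set()


def echo_worker(requests, responses):
    """Stand-in for the child process: echoes the method name and arguments."""
    while True:
        message = requests.get()
        if message is None:
            break
        request_id, method, args = message
        responses.put((request_id, (method, args)))


requests, responses = queue.Queue(), queue.Queue()
threading.Thread(target=echo_worker, args=(requests, responses), daemon=True).start()
router = RequestRouter(requests, responses)

results = {}


def ask(index):
    results[index] = router.call("ping", index)


callers = [threading.Thread(target=ask, args=(i,)) for i in range(8)]
for caller in callers:
    caller.start()
for caller in callers:
    caller.join()
```

Each caller waits on its own `threading.Event`, so a slow response for one request does not delay or corrupt the result delivered to another.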

Initialise the forked-process manager and start the child process immediately.

Parameters:

Name Type Description Default
disable_host_key_check bool

Disable SSH host key verification for all hosts.

False
disable_localhost_host_key_check bool

Disable SSH host key verification for localhost connections only.

False
logger Optional[Logger]

Logger instance used by the parent process. The child creates its own independent logger.

None
on_process_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Optional callback invoked in the parent process when a managed process exits. Signature: (uuid, exit_code, exception). The exception is reconstructed as a RuntimeError from the serialised message forwarded by the child process.

None
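A callback matching the documented `(uuid, exit_code, exception)` signature might look like the sketch below. The names `exit_events` and `record_process_exit` are hypothetical. Because the callback is invoked from a background thread in the parent, the sketch keeps the work short and limits itself to appending to a list.

```python
from typing import List, Optional, Tuple

# Collected (uuid, exit_code, error message) tuples, newest last.
exit_events: List[Tuple[str, Optional[int], Optional[str]]] = []


def record_process_exit(
    uuid: str,
    exit_code: Optional[int],
    exception: Optional[Exception],
) -> None:
    """Record a process exit; the exception, if any, is stored as its message."""
    message = str(exception) if exception is not None else None
    exit_events.append((uuid, exit_code, message))
```

Such a function would be passed as `on_process_exit=record_process_exit` when constructing the manager.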
Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def __init__(
    self,
    disable_host_key_check: bool = False,
    disable_localhost_host_key_check: bool = False,
    logger: Optional[logging.Logger] = None,
    on_process_exit: Optional[
        Callable[[str, Optional[int], Optional[Exception]], None]
    ] = None,
) -> None:
    """
    Initialise the forked-process manager and start the child process immediately.

    Args:
        disable_host_key_check:
            Disable SSH host key verification for all hosts.
        disable_localhost_host_key_check:
            Disable SSH host key verification for localhost connections only.
        logger:
            Logger instance used by the parent process. The child creates
            its own independent logger.
        on_process_exit:
            Optional callback invoked in the *parent* process when a managed
            process exits. Signature: (uuid, exit_code, exception).
            The exception is reconstructed as a RuntimeError from the
            serialised message forwarded by the child process.
    """
    self.log = logger if logger is not None else get_logger(__name__)
    self._on_process_exit = on_process_exit

    # Queues for IPC between parent and child.
    self._request_queue: multiprocessing.Queue = multiprocessing.Queue()
    self._response_queue: multiprocessing.Queue = multiprocessing.Queue()
    self._callback_queue: multiprocessing.Queue = multiprocessing.Queue()

    # Dedicated queue for log records forwarded from the child process.
    # A QueueListener in the parent drains this queue and dispatches
    # records to drunc hierarchy handlers, with root fallback.
    self._log_queue: multiprocessing.Queue = multiprocessing.Queue()
    self._log_listener = logging.handlers.QueueListener(
        self._log_queue,
        *_resolve_parent_log_handlers(),
        respect_handler_level=True,
    )

    # Maps request_id -> dict with event + result storage, used to match
    # asynchronous queue responses back to their blocking callers.
    self._pending: Dict[str, Dict[str, Any]] = {}
    self._pending_lock = threading.Lock()

    # Start the child process.
    self._worker = multiprocessing.Process(
        target=_worker_process_main,
        args=(
            self._request_queue,
            self._response_queue,
            self._callback_queue,
            self._log_queue,
            disable_host_key_check,
            disable_localhost_host_key_check,
        ),
        daemon=True,
        name="SSHLifetimeManagerWorker",
    )
    self._worker.start()
    self.log.debug(
        f"SSHProcessLifetimeManagerShell worker process started (PID {self._worker.pid})"
    )

    # Start the log listener thread only after the worker has been forked,
    # so the fork does not happen while the listener thread is running.
    self._log_listener.start()

    # Background thread that routes response messages to their waiting callers.
    self._response_dispatcher = threading.Thread(
        target=self._run_response_dispatcher,
        name="SSHLifetimeManagerResponseDispatcher",
        daemon=True,
    )
    self._response_dispatcher.start()

    # Background thread that invokes on_process_exit for events forwarded
    # by the child via the callback queue.
    self._callback_dispatcher = threading.Thread(
        target=self._run_callback_dispatcher,
        name="SSHLifetimeManagerCallbackDispatcher",
        daemon=True,
    )
    self._callback_dispatcher.start()
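The log-forwarding arrangement set up in the constructor follows the standard library's queue-based logging pattern. The sketch below shows that pattern in isolation, under two assumptions: the sending side attaches a `logging.handlers.QueueHandler` (the child-side code is not shown on this page), and a plain `queue.Queue` stands in for the multiprocessing queue. `ListHandler` and the logger name are hypothetical.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Collects formatted messages so the result can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log_queue = queue.Queue()  # stand-in for multiprocessing.Queue
parent_handler = ListHandler()

# Receiving side: drain the queue and hand records to the real handlers.
listener = logging.handlers.QueueListener(
    log_queue, parent_handler, respect_handler_level=True
)
listener.start()

# Sending side: the logger only enqueues records, it does no I/O itself.
child_logger = logging.getLogger("sketch.child")
child_logger.setLevel(logging.DEBUG)
child_logger.propagate = False
child_logger.addHandler(logging.handlers.QueueHandler(log_queue))
child_logger.info("worker ready")

# stop() processes everything already enqueued before the thread exits.
listener.stop()
```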
Functions
crash_process(uuid)

Simulate a process crash by sending SIGKILL without performing any cleanup.

Delegates to the underlying SSHProcessLifetimeManagerShell running in the forked worker process. Sends SIGKILL to the remote process without cleaning up any associated resources, simulating an unexpected crash.

Parameters:

Name Type Description Default
uuid str

Process UUID to crash

required
Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def crash_process(self, uuid: str) -> None:
    """
    Simulate a process crash by sending SIGKILL without performing any cleanup.

    Delegates to the underlying SSHProcessLifetimeManagerShell running in
    the forked worker process. Sends SIGKILL to the remote process without
    cleaning up any associated resources, simulating an unexpected crash.

    Args:
        uuid: Process UUID to crash
    """
    self._call("crash_process", uuid)
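To illustrate what a SIGKILL-based crash looks like from the caller's side, the sketch below kills a local child process rather than a remote one. It is a POSIX-only illustration, and `crash_local_process` is a hypothetical helper, not part of drunc. The killed process gets no chance to run cleanup handlers, and `subprocess` reports the exit as the negative signal number.

```python
import signal
import subprocess
import sys


def crash_local_process(proc: subprocess.Popen) -> int:
    """Send SIGKILL and return the resulting exit status."""
    proc.send_signal(signal.SIGKILL)
    return proc.wait(timeout=10)


# A child that would otherwise sleep for a minute.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = crash_local_process(proc)
```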
get_active_process_keys()

Get list of active process UUIDs from the child process.

Returns:

Type Description
List[str]

List of active process UUID strings.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def get_active_process_keys(self) -> List[str]:
    """
    Get list of active process UUIDs from the child process.

    Returns:
        List of active process UUID strings.
    """
    return self._call("get_active_process_keys")
get_process_stderr(uuid)

Get accumulated stderr from a managed process.

Parameters:

Name Type Description Default
uuid str

Process UUID.

required

Returns:

Type Description
Optional[str]

Stderr content as a string, or None if not available.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def get_process_stderr(self, uuid: str) -> Optional[str]:
    """
    Get accumulated stderr from a managed process.

    Args:
        uuid: Process UUID.

    Returns:
        Stderr content as a string, or None if not available.
    """
    return self._call("get_process_stderr", uuid)
get_process_stdout(uuid)

Get accumulated stdout from a managed process.

Parameters:

Name Type Description Default
uuid str

Process UUID.

required

Returns:

Type Description
Optional[str]

Stdout content as a string, or None if not available.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def get_process_stdout(self, uuid: str) -> Optional[str]:
    """
    Get accumulated stdout from a managed process.

    Args:
        uuid: Process UUID.

    Returns:
        Stdout content as a string, or None if not available.
    """
    return self._call("get_process_stdout", uuid)
get_remote_pid(uuid)

Retrieve the remote PID for a managed process via the child process.

Delegates to the underlying SSHProcessLifetimeManagerShell running in the forked worker process. The returned RemotePidResult dataclass is picklable and transmitted across the process boundary without special handling.

Parameters:

Name Type Description Default
uuid str

Process UUID to query.

required

Returns:

Type Description
RemotePidResult

RemotePidResult with pid set on success, or reason explaining why the PID is unavailable (e.g. metadata not yet written).

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def get_remote_pid(self, uuid: str) -> RemotePidResult:
    """
    Retrieve the remote PID for a managed process via the child process.

    Delegates to the underlying SSHProcessLifetimeManagerShell running in the
    forked worker process. The returned RemotePidResult dataclass is picklable
    and transmitted across the process boundary without special handling.

    Args:
        uuid: Process UUID to query.

    Returns:
        RemotePidResult with ``pid`` set on success, or ``reason`` explaining
        why the PID is unavailable (e.g. metadata not yet written).
    """
    return self._call("get_remote_pid", uuid)
is_process_alive(uuid)

Check if a managed process is alive.

Parameters:

Name Type Description Default
uuid str

Process UUID to check.

required

Returns:

Type Description
bool

True if the process is alive, False otherwise.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def is_process_alive(self, uuid: str) -> bool:
    """
    Check if a managed process is alive.

    Args:
        uuid: Process UUID to check.

    Returns:
        True if the process is alive, False otherwise.
    """
    return self._call("is_process_alive", uuid)
kill_all_processes(process_timeouts=None)

Kill all managed processes.

Parameters:

Name Type Description Default
process_timeouts Optional[Dict[str, float]]

Optional per-UUID timeout overrides in seconds.

None

Returns:

Type Description
Dict[str, Optional[int]]

Dictionary mapping all process UUIDs to their exit codes.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def kill_all_processes(
    self,
    process_timeouts: Optional[Dict[str, float]] = None,
) -> Dict[str, Optional[int]]:
    """
    Kill all managed processes.

    Args:
        process_timeouts: Optional per-UUID timeout overrides in seconds.

    Returns:
        Dictionary mapping all process UUIDs to their exit codes.
    """
    return self._call("kill_all_processes", process_timeouts)
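One plausible reading of "per-UUID timeout overrides" is a lookup that falls back to a default when a UUID has no entry. The sketch below assumes exactly that. `resolve_timeout` and the value of `DEFAULT_TIMEOUT` are hypothetical, and the real default is defined on `ProcessLifetimeManager`.

```python
from typing import Dict, Optional

DEFAULT_TIMEOUT = 10.0  # hypothetical value, for illustration only


def resolve_timeout(
    uuid: str,
    process_timeouts: Optional[Dict[str, float]] = None,
    default: float = DEFAULT_TIMEOUT,
) -> float:
    """Return the override for this UUID, or the default if none is given."""
    if process_timeouts is None:
        return default
    return process_timeouts.get(uuid, default)
```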
kill_process(uuid, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS)

Kill a remote process and clean up its resources.

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate.

required
timeout float

Graceful termination timeout in seconds.

DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

Returns:

Type Description
Optional[int]

Exit code of the terminated process, or None if undetermined.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def kill_process(
    self,
    uuid: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
) -> Optional[int]:
    """
    Kill a remote process and clean up its resources.

    Args:
        uuid:    Process UUID to terminate.
        timeout: Graceful termination timeout in seconds.

    Returns:
        Exit code of the terminated process, or None if undetermined.
    """
    return self._call("kill_process", uuid, timeout)
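A graceful termination timeout is commonly implemented as "ask politely, wait, then force". The sketch below shows that pattern on a local child process, assuming SIGTERM followed by SIGKILL. The page does not state which signals the remote implementation uses, and `stop_local_process` is a hypothetical helper.

```python
import subprocess
import sys


def stop_local_process(proc: subprocess.Popen, timeout: float) -> int:
    """Terminate gracefully, escalating to a kill if the timeout expires."""
    proc.terminate()  # SIGTERM on POSIX
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX
        return proc.wait()


proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_local_process(proc, timeout=5.0)
```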
kill_processes(uuids, process_timeouts=None)

Kill multiple processes in role-based shutdown order.

Parameters:

Name Type Description Default
uuids List[str]

List of process UUIDs to terminate.

required
process_timeouts Optional[Dict[str, float]]

Optional per-UUID timeout overrides in seconds.

None

Returns:

Type Description
Dict[str, Optional[int]]

Dictionary mapping process UUIDs to their exit codes.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def kill_processes(
    self,
    uuids: List[str],
    process_timeouts: Optional[Dict[str, float]] = None,
) -> Dict[str, Optional[int]]:
    """
    Kill multiple processes in role-based shutdown order.

    Args:
        uuids:            List of process UUIDs to terminate.
        process_timeouts: Optional per-UUID timeout overrides in seconds.

    Returns:
        Dictionary mapping process UUIDs to their exit codes.
    """
    return self._call("kill_processes", uuids, process_timeouts)
kill_processes_by_role(role, candidate_uuids, process_timeouts=None)

Kill all processes with the specified role from the candidate UUID list.

Parameters:

Name Type Description Default
role str

Process role to match (e.g. "controller", "application").

required
candidate_uuids List[str]

List of process UUIDs to filter by role.

required
process_timeouts Optional[Dict[str, float]]

Optional per-UUID timeout overrides in seconds.

None

Returns:

Type Description
Dict[str, Optional[int]]

Dictionary mapping terminated process UUIDs to their exit codes.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def kill_processes_by_role(
    self,
    role: str,
    candidate_uuids: List[str],
    process_timeouts: Optional[Dict[str, float]] = None,
) -> Dict[str, Optional[int]]:
    """
    Kill all processes with the specified role from the candidate UUID list.

    Args:
        role:             Process role to match (e.g. "controller", "application").
        candidate_uuids:  List of process UUIDs to filter by role.
        process_timeouts: Optional per-UUID timeout overrides in seconds.

    Returns:
        Dictionary mapping terminated process UUIDs to their exit codes.
    """
    return self._call(
        "kill_processes_by_role", role, candidate_uuids, process_timeouts
    )
pop_early_exit_code(uuid)

Retrieve and remove the exit code of a process that exited unexpectedly.

Parameters:

Name Type Description Default
uuid str

Process UUID.

required

Returns:

Type Description
Optional[int]

Exit code if the process terminated early without being explicitly killed, None if still running or not found.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def pop_early_exit_code(self, uuid: str) -> Optional[int]:
    """
    Retrieve and remove the exit code of a process that exited unexpectedly.

    Args:
        uuid: Process UUID.

    Returns:
        Exit code if the process terminated early without being explicitly killed,
        None if still running or not found.
    """
    return self._call("pop_early_exit_code", uuid)
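The "retrieve and remove" semantics mean a recorded exit code can be read only once. A minimal sketch of such a store is shown below. `EarlyExitRegistry` is a hypothetical class used only to illustrate the behaviour, not the structure used inside the child process.

```python
import threading
from typing import Dict, Optional


class EarlyExitRegistry:
    """Holds exit codes of processes that exited without being killed."""

    def __init__(self) -> None:
        self._codes: Dict[str, int] = {}
        self._lock = threading.Lock()

    def record(self, uuid: str, exit_code: int) -> None:
        with self._lock:
            self._codes[uuid] = exit_code

    def pop(self, uuid: str) -> Optional[int]:
        """Return and forget the exit code, or None if nothing is recorded."""
        with self._lock:
            return self._codes.pop(uuid, None)
```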
read_log_file(hostname, user, log_file, num_lines=100)

Read the last N lines of a remote log file via the child process.

Parameters:

Name Type Description Default
hostname str

Target hostname.

required
user str

SSH username.

required
log_file str

Remote log file path.

required
num_lines int

Number of lines to retrieve from the end of the file.

100

Returns:

Type Description
List[str]

List of log lines.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def read_log_file(
    self,
    hostname: str,
    user: str,
    log_file: str,
    num_lines: int = 100,
) -> List[str]:
    """
    Read the last N lines of a remote log file via the child process.

    Args:
        hostname:  Target hostname.
        user:      SSH username.
        log_file:  Remote log file path.
        num_lines: Number of lines to retrieve from the end of the file.

    Returns:
        List of log lines.
    """
    return self._call("read_log_file", hostname, user, log_file, num_lines)
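Reading the last N lines of a file can be done in a single pass with a bounded deque. The sketch below applies this to a local file. The real method reads a remote file over SSH, and both `tail_lines` and the choice to strip trailing newlines are assumptions made for illustration.

```python
from collections import deque
from typing import List


def tail_lines(path: str, num_lines: int = 100) -> List[str]:
    """Return up to the last num_lines lines of a local text file."""
    with open(path, "r", encoding="utf-8", errors="replace") as handle:
        # The deque discards older lines as newer ones are appended.
        return [line.rstrip("\n") for line in deque(handle, maxlen=num_lines)]
```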
shutdown()

Gracefully shut down the child process and all background dispatcher threads.

Sends shutdown sentinels to each queue so the child's event loop and both dispatcher threads exit cleanly. Joins the child process with a short timeout and forcibly terminates it if it does not exit in time.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def shutdown(self) -> None:
    """
    Gracefully shut down the child process and all background dispatcher threads.

    Sends shutdown sentinels to each queue so the child's event loop and both
    dispatcher threads exit cleanly. Joins the child process with a short
    timeout and forcibly terminates it if it does not exit in time.
    """
    self.log.debug("Shutting down SSHProcessLifetimeManager worker process...")

    # Signal the child's event loop to exit.
    try:
        self._request_queue.put(None)
    except Exception:
        pass

    # Signal the dispatcher threads to exit.
    try:
        self._response_queue.put(None)
    except Exception:
        pass

    try:
        self._callback_queue.put(None)
    except Exception:
        pass

    self._worker.join(timeout=5.0)

    if self._worker.is_alive():
        self.log.warning(
            "Worker process did not exit within the timeout; terminating forcibly."
        )
        self._worker.terminate()
        self._worker.join(timeout=2.0)

    # Stop listener last so final child log records are drained.
    try:
        self._log_listener.stop()
    except Exception:
        pass

    self.log.debug("SSHProcessLifetimeManager worker process shut down.")
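The sentinel technique used by `shutdown()` can be shown in isolation. In the sketch below a consumer thread processes queued items until it receives `None`, then exits, so the caller can join it with a timeout. The `consume` function and the item names are hypothetical, and a `queue.Queue` with a thread stands in for the multiprocessing queues and the worker.

```python
import queue
import threading


def consume(work_queue: queue.Queue, handled: list) -> None:
    """Process items until the None sentinel arrives."""
    while True:
        item = work_queue.get()
        if item is None:  # shutdown sentinel
            break
        handled.append(item)


work_queue = queue.Queue()
handled = []
consumer = threading.Thread(target=consume, args=(work_queue, handled), daemon=True)
consumer.start()

work_queue.put("job-1")
work_queue.put("job-2")
work_queue.put(None)  # everything queued before the sentinel is still handled
consumer.join(timeout=5.0)
```

Because the sentinel travels through the same queue as regular items, work enqueued before it is not lost.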
start_process(uuid, boot_request)

Start a remote process in the child process using the boot request configuration.

The BootRequest protobuf message is serialised to bytes before being sent across the process boundary and deserialised inside the child, avoiding any reliance on protobuf pickling support.

Parameters:

Name Type Description Default
uuid str

Unique identifier for this process.

required
boot_request BootRequest

BootRequest containing process configuration.

required

Raises:

Type Description
RuntimeError

If the child process raises during process startup.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def start_process(self, uuid: str, boot_request: BootRequest) -> None:
    """
    Start a remote process in the child process using the boot request configuration.

    The BootRequest protobuf message is serialised to bytes before being sent
    across the process boundary and deserialised inside the child, avoiding any
    reliance on protobuf pickling support.

    Args:
        uuid:         Unique identifier for this process.
        boot_request: BootRequest containing process configuration.

    Raises:
        RuntimeError: If the child process raises during process startup.
    """
    boot_request_bytes = boot_request.SerializeToString()
    self._call("_start_process_from_bytes", uuid, boot_request_bytes)
validate_host_connection(host, auth_method, user)

Validate an SSH connection to the specified host via the child process.

Parameters:

Name Type Description Default
host str

Target hostname.

required
auth_method str

Authentication method (passed through to inner manager).

required
user str

SSH username.

required

Raises:

Type Description
RuntimeError

If the SSH connection validation fails.

Source code in drunc/processes/ssh_process_lifetime_manager_from_forked_process.py
def validate_host_connection(
    self,
    host: str,
    auth_method: str,
    user: str,
) -> None:
    """
    Validate an SSH connection to the specified host via the child process.

    Args:
        host:        Target hostname.
        auth_method: Authentication method (passed through to inner manager).
        user:        SSH username.

    Raises:
        RuntimeError: If the SSH connection validation fails.
    """
    self._call("validate_host_connection", host, auth_method, user)
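Errors raised in the child surface in the parent as `RuntimeError`. One way to achieve this, sketched below, is to send the error across the boundary as a plain string and rebuild the exception on the receiving side. The helper names, the `("ok", value)` / `("error", message)` envelope, and the message format are assumptions for illustration, not the module's wire format.

```python
from typing import Any, Callable, Tuple


def run_in_child(func: Callable[..., Any], *args: Any) -> Tuple[str, Any]:
    """Child side: never raise, return a picklable status envelope instead."""
    try:
        return ("ok", func(*args))
    except Exception as exc:
        return ("error", f"{type(exc).__name__}: {exc}")


def unwrap_in_parent(response: Tuple[str, Any]) -> Any:
    """Parent side: return the value or re-raise the error as RuntimeError."""
    status, payload = response
    if status == "error":
        raise RuntimeError(payload)
    return payload


def failing_validation(host: str) -> None:
    raise ConnectionError(f"cannot reach {host}")
```

Sending only a string avoids depending on the original exception type being picklable or importable in the parent, at the cost of losing the original type and traceback.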

Functions