Skip to content

ssh_process_lifetime_manager_shell

drunc.processes.ssh_process_lifetime_manager_shell

Provides SSH connection and lifetime management using sh library to invoke shell commands over SSH.

Classes

ProcessWatcherThread(uuid, running_process, manager, hostname, user, metadata_file, on_exit, logger)

Bases: Thread

Thread that monitors a background SSH process and invokes callback on exit.

Initialise process watcher thread.

Parameters:

Name Type Description Default
uuid str

Process UUID to monitor

required
running_process RunningSSHProcess

Runtime model for the process being monitored

required
manager SSHProcessLifetimeManagerShell

Parent manager instance for metadata updates

required
hostname str

Remote hostname for metadata retrieval

required
user str

Remote user for metadata retrieval

required
metadata_file str

Path to metadata file on remote host

required
on_exit Optional[Callable[[str, Optional[ExitStatus], Optional[Exception]], None]]

Callback function invoked on process exit

required
logger Logger

Logger instance for output

required
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    uuid: str,
    running_process: RunningSSHProcess,
    manager: "SSHProcessLifetimeManagerShell",
    hostname: str,
    user: str,
    metadata_file: str,
    on_exit: Optional[
        Callable[[str, Optional[ExitStatus], Optional[Exception]], None]
    ],
    logger: logging.Logger,
):
    """
    Initialise process watcher thread.

    Args:
        uuid: Process UUID to monitor
        running_process: Runtime model for the process being monitored
        manager: Parent manager instance for metadata updates
        hostname: Remote hostname for metadata retrieval
        user: Remote user for metadata retrieval
        metadata_file: Path to metadata file on remote host
        on_exit: Callback function invoked on process exit
        logger: Logger instance for output
    """
    super().__init__(name=f"ShellWatcher-{uuid}", daemon=True)
    self.uuid = uuid
    self.running_process = running_process
    self.manager = manager
    self.hostname = hostname
    self.user = user
    self.metadata_file = metadata_file
    self.on_exit = on_exit
    self.logger = logger
    self.__is_monitoring_remotely = False
Methods:
is_monitoring_remotely()

Check if the watcher is monitoring the remote process directly.

Returns:

Type Description
bool

True if monitoring remote process, False if monitoring SSH client

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def is_monitoring_remotely(self) -> bool:
    """
    Check if the watcher is monitoring the remote process directly.

    Returns:
        True if monitoring remote process, False if monitoring SSH client
    """
    return self.__is_monitoring_remotely
run()

Monitor process, read metadata asynchronously, and invoke callback on exit.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def run(self):
    """
    Monitor process, read metadata asynchronously, and invoke callback on exit.
    """
    try:
        metadata = self.manager.read_process_metadata(
            self.uuid,
            self.metadata_file,
            self.hostname,
            self.user,
        )
        if metadata:
            with self.manager.lock:
                self.manager.metadata[self.uuid] = metadata
            self.logger.debug(f"Metadata retrieved for process {self.uuid}")

            # Log the terminal commands used to manually SIGKILL this process
            # from outside the process manager which can be useful for debugging
            # unexpected process deaths
            if metadata.pid is not None:
                self.logger.debug(
                    f"To manually kill remote process '{metadata.name}' (UUID: {self.uuid}), run: "
                    f"ssh {self.user}@{self.hostname} kill -9 {metadata.pid}"
                )
            self.logger.debug(
                f"To manually kill the local SSH client for '{metadata.name}' (UUID: {self.uuid}), run: "
                f"kill -9 {self.running_process.process.pid}"
            )
        else:
            self.logger.warning(
                f"Failed to retrieve metadata for process {self.uuid}. "
                f"Remote process monitoring will not be started."
            )
            return
    except Exception as e:
        self.logger.warning(
            f"Exception reading metadata for process {self.uuid}: {e}. "
            f"Remote process monitoring will not be started."
        )
        return

    if metadata.pid is None:
        self.logger.warning(
            f"Metadata for process {self.uuid} did not contain a PID. "
            f"Remote process monitoring will not be started."
        )
        return

    # Monitor the remote process directly
    self.manager._register_remote_process_watcher(self.uuid, self)
    self._monitor_remote_process(metadata.pid)

SSHClientWatcherThread(uuid, running_process, manager, hostname, user, metadata_file, logger)

Bases: Thread

Thread that monitors the local SSH client and classifies its exit.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    uuid: str,
    running_process: RunningSSHProcess,
    manager: "SSHProcessLifetimeManagerShell",
    hostname: str,
    user: str,
    metadata_file: str,
    logger: logging.Logger,
):
    super().__init__(name=f"ShellClientWatcher-{uuid}", daemon=True)
    self.uuid = uuid
    self.running_process = running_process
    self.manager = manager
    self.hostname = hostname
    self.user = user
    self.metadata_file = metadata_file
    self.logger = logger

SSHProcessLifetimeManagerShell(disable_host_key_check=False, disable_localhost_host_key_check=False, logger=None, on_process_exit=None)

Bases: ProcessLifetimeManager

Manages process lifecycle using sh library for SSH connections. Uses the sh library's SSH command wrapper to start and manage remote processes.

Initialise SSH process lifetime manager using sh library.

Parameters:

Name Type Description Default
disable_host_key_check bool

Disable SSH host key verification for all hosts

False
disable_localhost_host_key_check bool

Disable SSH host key verification for localhost

False
logger Optional[Logger]

Logger instance for real-time output logging

None
on_process_exit Optional[Callable[[str, Optional[ExitStatus], Optional[Exception]], None]]

Optional callback function(uuid, exit_status, exception) invoked when process exits

None
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    disable_host_key_check: bool = False,
    disable_localhost_host_key_check: bool = False,
    logger: Optional[logging.Logger] = None,
    on_process_exit: Optional[
        Callable[[str, Optional[ExitStatus], Optional[Exception]], None]
    ] = None,
):
    """
    Initialise SSH process lifetime manager using sh library.

    Args:
        disable_host_key_check: Disable SSH host key verification for all hosts
        disable_localhost_host_key_check: Disable SSH host key verification for localhost
        logger: Logger instance for real-time output logging
        on_process_exit: Optional callback function(uuid, exit_status, exception) invoked when process exits
    """
    self.disable_host_key_check = disable_host_key_check
    self.disable_localhost_host_key_check = disable_localhost_host_key_check
    self.log = (
        logger
        if logger
        else get_logger("ssh_process_lifetime_manager_shell", rich_handler=True)
    )
    self._on_process_exit = on_process_exit

    # Create SSH command wrapper
    self.ssh = sh.Command("/usr/bin/ssh")

    # Process tracking (one per UUID)
    self.process_store: Dict[str, RunningSSHProcess] = {}

    # Thread tracking for monitoring
    self.client_watchers: Dict[str, SSHClientWatcherThread] = {}
    self.remote_process_watchers: Dict[str, ProcessWatcherThread] = {}

    # Thread-safe lock for process store modifications
    self.lock = threading.Lock()

    # metadata for each process
    self.metadata: Dict[str, Optional[ProcessMetadata]] = {}
Methods:
crash_process(uuid, signal='KILL')

Simulate an unexpected process crash by sending by ending the remote process without cleanup. This leaves the process manager in the same state as if the process had crashed unexpectedly, allowing crash-recovery logic to be exercised in tests.

Parameters:

Name Type Description Default
uuid str

Process UUID to crash

required
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def crash_process(self, uuid: str, signal: str = "KILL") -> None:
    """
    Simulate an unexpected process crash by sending by ending the remote process without cleanup.
    This leaves the process manager in the same state
    as if the process had crashed unexpectedly, allowing crash-recovery logic
    to be exercised in tests.

    Args:
        uuid: Process UUID to crash
    """
    if uuid not in self.process_store:
        self.log.warning(f"crash_process called for unknown UUID {uuid}")
        return

    running_process = self.process_store[uuid]
    hostname = running_process.hostname
    user = running_process.user

    metadata = self.metadata.get(uuid, None)
    if metadata is None or metadata.pid is None:
        self.log.warning(
            f"No remote PID for {uuid}, cannot send {signal} to simulate crash."
        )
        return

    remote_pid = metadata.pid
    self.log.debug(
        f"Simulating crash of process {uuid} (PID {remote_pid}): "
        f"sending {signal} without cleanup."
    )
    self._send_remote_signal(hostname, user, remote_pid, signal)
get_active_process_keys()

Get list of active process UUIDs.

Returns:

Type Description
List[str]

List of active process UUID strings

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_active_process_keys(self) -> List[str]:
    """
    Get list of active process UUIDs.

    Returns:
        List of active process UUID strings
    """
    with self.lock:
        return list(self.process_store.keys())
get_metadata_file_path(uuid) staticmethod

Generate metadata file path for a given process UUID.

Uses XDG_RUNTIME_DIR if available, otherwise falls back to /tmp. The path will be expanded on the remote host when the command executes.

Parameters:

Name Type Description Default
uuid str

Process UUID to generate metadata file path for

required

Returns:

Type Description
str

Shell-expandable path string containing environment variable reference

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
@staticmethod
def get_metadata_file_path(uuid: str) -> str:
    """
    Generate metadata file path for a given process UUID.

    Uses XDG_RUNTIME_DIR if available, otherwise falls back to /tmp.
    The path will be expanded on the remote host when the command executes.

    Args:
        uuid: Process UUID to generate metadata file path for

    Returns:
        Shell-expandable path string containing environment variable reference
    """
    return f"${{XDG_RUNTIME_DIR:-/tmp}}/drunc/metadata_{uuid}.json"
get_process_stderr(uuid)

Get stderr from process.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stderr content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_process_stderr(self, uuid: str) -> Optional[str]:
    """
    Get stderr from process.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stderr content as string, None if not found
    """
    if uuid not in self.process_store:
        return None

    try:
        process = self.process_store[uuid].process
        if hasattr(process, "stderr"):
            stderr_data = process.stderr
            if stderr_data:
                return str(stderr_data)
    except Exception as e:
        self.log.debug(f"Error getting stderr for {uuid}: {e}")

    return None
get_process_stdout(uuid)

Get stdout from process.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stdout content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_process_stdout(self, uuid: str) -> Optional[str]:
    """
    Get stdout from process.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stdout content as string, None if not found
    """
    if uuid not in self.process_store:
        return None

    try:
        process = self.process_store[uuid].process
        if hasattr(process, "stdout"):
            stdout_data = process.stdout
            if stdout_data:
                return str(stdout_data)
    except Exception as e:
        self.log.debug(f"Error getting stdout for {uuid}: {e}")

    return None
get_remote_pid(uuid)

Return the remote PID for the process identified by uuid.

Parameters:

Name Type Description Default
uuid str

Process UUID to query.

required

Returns:

Type Description
RemotePidResult

RemotePidResult with pid set on success, or reason

RemotePidResult

set to "no metadata" when the metadata file has not yet

RemotePidResult

been written or could not be read.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_remote_pid(self, uuid: str) -> RemotePidResult:
    """
    Return the remote PID for the process identified by *uuid*.

    Args:
        uuid: Process UUID to query.

    Returns:
        RemotePidResult with ``pid`` set on success, or ``reason``
        set to ``"no metadata"`` when the metadata file has not yet
        been written or could not be read.
    """
    with self.lock:
        running_process = self.process_store.get(uuid)
        if running_process is not None and running_process.remote_pid is not None:
            return RemotePidResult(pid=running_process.remote_pid)
        metadata = self.metadata.get(uuid)
    if metadata is None or metadata.pid is None:
        return RemotePidResult(reason="no metadata")
    return RemotePidResult(pid=metadata.pid)
get_runtime_pids(uuid)

Return best-effort runtime PID snapshot for a managed process.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_runtime_pids(self, uuid: str) -> Dict[str, Optional[int]]:
    """Return best-effort runtime PID snapshot for a managed process."""
    with self.lock:
        running_process = self.process_store.get(uuid)
        if running_process is None:
            return {
                "ssh_client_pid": None,
                "remote_pid": None,
                "remote_monitoring_pid": None,
                "client_monitoring_pid": None,
            }

        return {
            "ssh_client_pid": running_process.ssh_client_pid,
            "remote_pid": running_process.remote_pid,
            "remote_monitoring_pid": running_process.remote_monitoring_pid,
            "client_monitoring_pid": running_process.client_monitoring_pid,
        }
is_process_alive(uuid)

Check if process is alive.

Parameters:

Name Type Description Default
uuid str

Process UUID to check

required

Returns:

Type Description
bool

True if process is alive, False otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def is_process_alive(self, uuid: str) -> bool:
    """
    Check if process is alive.

    Args:
        uuid: Process UUID to check

    Returns:
        True if process is alive, False otherwise
    """
    if uuid not in self.process_store:
        return False

    running_process = self.process_store[uuid]
    process = running_process.process
    metadata: Optional[ProcessMetadata] = self.metadata.get(uuid, None)
    if metadata is None or metadata.pid is None:
        self.log.debug(
            f"No metadata or PID found for {uuid}, relying on SSH client process status"
        )
        return process.is_alive()

    remote_process_alive = self._is_remote_process_alive(
        running_process.hostname,
        running_process.user,
        metadata.pid,
        uuid,
    )
    return process.is_alive() and remote_process_alive
kill_all_processes(process_timeouts=None)

Kill all active processes in role-based shutdown order.

Retrieves all active process UUIDs and delegates to kill_processes() for role-based termination. Waits for all monitoring threads to complete after termination.

Parameters:

Name Type Description Default
process_timeouts Optional[Dict[str, float]]

Dictionary mapping process UUIDs to timeout values in seconds. Uses default timeout for unmapped UUIDs.

None

Returns:

Type Description
Dict[str, Optional[ExitStatus]]

Dictionary mapping all process UUIDs to their exit statuses

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_all_processes(
    self, process_timeouts: Optional[Dict[str, float]] = None
) -> Dict[str, Optional[ExitStatus]]:
    """
    Kill all active processes in role-based shutdown order.

    Retrieves all active process UUIDs and delegates to kill_processes()
    for role-based termination. Waits for all monitoring threads to
    complete after termination.

    Args:
        process_timeouts: Dictionary mapping process UUIDs to timeout values
                        in seconds. Uses default timeout for unmapped UUIDs.

    Returns:
        Dictionary mapping all process UUIDs to their exit statuses
    """
    # Retrieve all active process UUIDs
    with self.lock:
        active_uuids = list(self.process_store.keys())

    if not active_uuids:
        self.log.info("No processes to terminate")
        return {}

    self.log.info(f"Terminating all {len(active_uuids)} active process(es)")

    # Delegate to kill_processes for role-based shutdown ordering
    all_exit_statuses = self.kill_processes(active_uuids, process_timeouts)

    # Wait for all watcher threads to complete
    with self.lock:
        watchers_to_join = list(self.client_watchers.values()) + list(
            self.remote_process_watchers.values()
        )

    for watcher in watchers_to_join:
        try:
            watcher.join(timeout=2.0)
        except Exception:
            pass

    with self.lock:
        self.client_watchers.clear()
        self.remote_process_watchers.clear()

    return all_exit_statuses
kill_process(uuid, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS)

Kill a remote process and clean up all associated resources.

Sends termination signals to the remote process, waits for it to die, cleans up remote metadata files, terminates the SSH client, and removes the process from internal tracking. Safe to call multiple times.

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate

required
timeout float

Timeout for graceful termination in seconds

DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

Returns:

Type Description
ExitStatus | None

ExitStatus of the terminated process, or None if not found or still running

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_process(
    self,
    uuid: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
) -> ExitStatus | None:
    """
    Kill a remote process and clean up all associated resources.

    Sends termination signals to the remote process, waits for it to die,
    cleans up remote metadata files, terminates the SSH client, and removes
    the process from internal tracking. Safe to call multiple times.

    Args:
        uuid: Process UUID to terminate
        timeout: Timeout for graceful termination in seconds

    Returns:
        ExitStatus of the terminated process, or None if not found or still running
    """
    if uuid not in self.process_store:
        return None

    # Read metadata to get remote PID
    metadata = self.metadata.get(uuid, None)
    if metadata is None or metadata.pid is None:
        self.log.warning(
            f"No remote PID for {uuid}, terminating SSH client. Cannot guarantee remote process termination."
        )
        return self.kill_process_without_metadata(
            uuid,
            as_manual_pm_kill=True,
            timeout=timeout,
        )

    running_process = self.process_store[uuid]

    hostname = running_process.hostname
    user = running_process.user
    metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid)

    remote_pid = metadata.pid
    process_dead = False

    try:
        if not self.is_process_alive(uuid):
            self.log.info(
                f"Skipping killing remote process {uuid} (PID {remote_pid}). It is already dead."
            )
            exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout)
            self._cleanup_remote_file(hostname, user, metadata_file)
            self._cleanup_process_resources(uuid)
            if exit_code is None:
                return None
            return ExitStatus(ExitStatusSource.REMOTE_MONITORING, exit_code)

        if not process_dead:
            with self.lock:
                running_process.pending_exit_status_source = (
                    ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID
                )
            self.log.debug(f"Sending SIGQUIT to remote PID {remote_pid}")
            self._send_remote_signal(hostname, user, remote_pid, "QUIT")
            process_dead = self.wait_for_process_to_die(
                uuid, timeout=timeout, logger=self.log
            )
            if process_dead:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) terminated gracefully following SIGQUIT signal."
                )
            else:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) did not terminate within timeout of {timeout} seconds after SIGQUIT signal."
                )

        if not process_dead:
            self.log.debug(f"Sending SIGKILL to remote PID {remote_pid}")
            self._send_remote_signal(hostname, user, remote_pid, "KILL")
            process_dead = self.wait_for_process_to_die(
                uuid, timeout=timeout, logger=self.log
            )

            if process_dead:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) terminated forcibly following SIGKILL signal."
                )
            else:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) did not terminate within timeout of {timeout} seconds after SIGKILL signal."
                )

        if not process_dead:
            self.log.error(
                f"Remote process {uuid} (PID {remote_pid}) still did not terminate after SIGKILL signal."
            )
            with self.lock:
                running_process.pending_exit_status_source = None
        else:
            exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout)
            self._cleanup_remote_file(hostname, user, metadata_file)
            self._cleanup_process_resources(uuid)
            if exit_code is None:
                return None
            return ExitStatus(
                ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID, exit_code
            )

    except Exception as e:
        with self.lock:
            if uuid in self.process_store:
                self.process_store[uuid].pending_exit_status_source = None
        self.log.error(f"Error terminating remote process {uuid}: {e}")
        return None

    return None
kill_process_without_metadata(uuid, signal_name='KILL', as_manual_pm_kill=True, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS)

Terminate process by signalling the local SSH client without using remote metadata. Prefer kill_process(..) to this method, this is mainly intended to help with testing

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate

required
signal_name str

Signal to send to SSH client process group (QUIT/KILL)

'KILL'
as_manual_pm_kill bool

If True, classify as process-manager initiated kill. If False, classify as external kill i.e. outside of process manager control

True
timeout float

Maximum time to wait for process termination in seconds

DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

Returns:

Type Description
Optional[ExitStatus]

ExitStatus if termination state can be determined, None otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_process_without_metadata(
    self,
    uuid: str,
    signal_name: str = "KILL",
    as_manual_pm_kill: bool = True,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
) -> Optional[ExitStatus]:
    """
    Terminate process by signalling the local SSH client without using remote metadata.
    Prefer kill_process(..) to this method, this is mainly intended to help with testing

    Args:
        uuid: Process UUID to terminate
        signal_name: Signal to send to SSH client process group (QUIT/KILL)
        as_manual_pm_kill: If True, classify as process-manager initiated kill.
                           If False, classify as external kill i.e. outside of process manager control
        timeout: Maximum time to wait for process termination in seconds

    Returns:
        ExitStatus if termination state can be determined, None otherwise
    """
    with self.lock:
        running_process = self.process_store.get(uuid)
        metadata = self.metadata.get(uuid)

    if running_process is None:
        self.log.warning(
            f"kill_process_without_metadata called for unknown UUID {uuid}"
        )
        return None

    signal_name = signal_name.upper()

    source_for_return = (
        ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT
        if as_manual_pm_kill
        else ExitStatusSource.CLIENT_MONITORING
    )
    # Ensure watcher callbacks classify this termination path as requested.
    with self.lock:
        running_process.pending_exit_status_source = source_for_return

    try:
        running_process.kill_client(signal_name=signal_name)
    except Exception as e:
        self.log.debug(
            f"Exception was raised when terminating SSH client process: {e}"
        )

    exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout)

    if exit_code is None and signal_name == "QUIT" and not as_manual_pm_kill:
        remote_pid = metadata.pid if metadata is not None else None
        self._handle_external_client_sigquit(
            uuid,
            running_process.hostname,
            running_process.user,
            remote_pid,
            SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid),
            timeout=timeout,
        )
        exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout)

    if exit_code is None:
        with self.lock:
            running_process.pending_exit_status_source = None
        return None

    self._cleanup_process_resources(uuid)

    return ExitStatus(source_for_return, exit_code)
kill_processes(uuids, process_timeouts=None)

Kill multiple processes by their UUIDs in role-based shutdown order.

Executes a staged shutdown by role. Processes within each role are terminated asynchronously. After all roles complete, any remaining processes are killed asynchronously as a fallback.

Parameters:

Name Type Description Default
uuids List[str]

List of process UUIDs to terminate

required
process_timeouts Optional[Dict[str, float]]

Dictionary mapping process UUIDs to timeout values in seconds. Uses default timeout for unmapped UUIDs.

None

Returns:

Type Description
Dict[str, Optional[ExitStatus]]

Dictionary mapping process UUIDs to their exit statuses

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_processes(
    self, uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None
) -> Dict[str, Optional[ExitStatus]]:
    """
    Kill multiple processes by their UUIDs in role-based shutdown order.

    Executes a staged shutdown by role. Processes within each role are terminated
    asynchronously. After all roles complete, any remaining processes
    are killed asynchronously as a fallback.

    Args:
        uuids: List of process UUIDs to terminate
        process_timeouts: Dictionary mapping process UUIDs to timeout values
                        in seconds. Uses default timeout for unmapped UUIDs.

    Returns:
        Dictionary mapping process UUIDs to their exit statuses
    """
    if not uuids:
        self.log.debug("No processes to kill")
        return {}

    if process_timeouts is None:
        process_timeouts = {}

    # Ensure all UUIDs have timeout values
    for uuid in uuids:
        if uuid not in process_timeouts:
            process_timeouts[uuid] = self.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

    all_exit_statuses: Dict[str, Optional[ExitStatus]] = {}
    killed_uuids = set()

    # Execute role-based shutdown in stages
    for role in PROCESS_SHUTDOWN_ORDERING:
        with self.lock:
            uuids_in_role = [
                uuid
                for uuid in uuids
                if (metadata := self.metadata.get(uuid)) is not None
                and metadata.role == role
            ]

        # Match k8s PM behavior: if role is absent, do not log/start/end a stage.
        if not uuids_in_role:
            continue

        self.log.info(
            f"--- Termination of role '{role}' ({len(uuids_in_role)} process(es)) ---"
        )
        role_exit_statuses = self.kill_processes_by_role(
            role, uuids, process_timeouts=process_timeouts
        )
        all_exit_statuses.update(role_exit_statuses)
        killed_uuids.update(role_exit_statuses.keys())

        if role_exit_statuses:
            self.log.info(f"--- Shutdown stage: Role '{role}' complete ---")

    # Identify processes not killed during role-based shutdown
    remaining_uuids = [uuid for uuid in uuids if uuid not in killed_uuids]

    if remaining_uuids:
        self.log.info(
            f"Fallback: Killing {len(remaining_uuids)} process(es) without "
            f"role metadata asynchronously"
        )

        # Kill remaining processes asynchronously without role ordering
        fallback_exit_statuses: Dict[str, Optional[ExitStatus]] = {}

        with ThreadPoolExecutor(max_workers=len(remaining_uuids)) as executor:
            # Submit kill tasks for all remaining processes
            future_to_uuid = {
                executor.submit(
                    self.kill_process, uuid, process_timeouts[uuid]
                ): uuid
                for uuid in remaining_uuids
            }

            # Collect results as they complete
            for future in as_completed(future_to_uuid):
                uuid = future_to_uuid[future]
                try:
                    fallback_exit_statuses[uuid] = future.result()
                except Exception as e:
                    self.log.error(
                        f"Error during fallback termination of process {uuid}: {e}"
                    )
                    fallback_exit_statuses[uuid] = None

        all_exit_statuses.update(fallback_exit_statuses)

    return all_exit_statuses
kill_processes_by_role(role, candidate_uuids, process_timeouts=None)

Kill all processes with the specified role from candidate UUID list.

Filters candidate UUIDs by matching metadata roles, then terminates matching processes asynchronously using a thread pool.

Parameters:

Name Type Description Default
role str

Process role to match

required
candidate_uuids List[str]

List of process UUIDs to filter by role

required
process_timeouts Optional[Dict[str, float]]

Dictionary mapping process UUIDs to timeout values in seconds. Uses default timeout for unmapped UUIDs.

None

Returns:

Type Description
Dict[str, Optional[ExitStatus]]

Dictionary mapping terminated process UUIDs to their exit statuses

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_processes_by_role(
    self,
    role: str,
    candidate_uuids: List[str],
    process_timeouts: Optional[Dict[str, float]] = None,
) -> Dict[str, Optional[ExitStatus]]:
    """
    Kill all processes with the specified role from candidate UUID list.

    Filters candidate UUIDs by matching metadata roles, then terminates
    matching processes asynchronously using a thread pool.

    Args:
        role: Process role to match
        candidate_uuids: List of process UUIDs to filter by role
        process_timeouts: Dictionary mapping process UUIDs to timeout values
                        in seconds. Uses default timeout for unmapped UUIDs.

    Returns:
        Dictionary mapping terminated process UUIDs to their exit statuses
    """
    self.log.debug(f"process_timeouts: {process_timeouts}")
    if process_timeouts is None:
        process_timeouts = {}

    # Filter candidate UUIDs by role using process metadata
    uuids_to_kill = []
    with self.lock:
        for uuid in candidate_uuids:
            metadata = self.metadata.get(uuid)
            if metadata is None or metadata.role != role:
                continue

            uuids_to_kill.append(uuid)
            process_timeouts.setdefault(
                uuid,
                self.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
            )

    if not uuids_to_kill:
        return {}

    self.log.info(f"Killing {len(uuids_to_kill)} process(es) with role '{role}'")

    exit_statuses: Dict[str, Optional[ExitStatus]] = {}

    with ThreadPoolExecutor(max_workers=len(uuids_to_kill)) as executor:
        future_to_uuid = {
            executor.submit(self.kill_process, uuid, process_timeouts[uuid]): uuid
            for uuid in uuids_to_kill
        }

        for future in as_completed(future_to_uuid):
            uuid = future_to_uuid[future]
            try:
                exit_statuses[uuid] = future.result()
            except Exception as e:
                self.log.error(
                    f"Error during termination of process {uuid} with role '{role}': {e}"
                )
                exit_statuses[uuid] = None

    return exit_statuses
pop_early_exit_status(uuid)

Get process exit code if process exited early without being killed.

This method checks if a process has terminated unexpectedly (without kill_process being called). If an exit code is found, the process resources are cleaned up automatically.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[ExitStatus]

ExitStatus if process has terminated early, None if still running or not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def pop_early_exit_status(self, uuid: str) -> Optional[ExitStatus]:
    """
    Get process exit code if process exited early without being killed.

    This method checks if a process has terminated unexpectedly (without
    kill_process being called). If an exit code is found, the process
    resources are cleaned up automatically.

    Args:
        uuid: Process UUID

    Returns:
        ExitStatus if process has terminated early, None if still running or not found
    """
    if uuid not in self.process_store:
        self.log.debug(f"Process {uuid} not found in store for exit code retrieval")
        return None

    process = self.process_store[uuid].process
    if process.is_alive():
        return None

    try:
        process.wait()
        early_exit_code = process.exit_code
    except sh.ErrorReturnCode as e:
        early_exit_code = e.exit_code
    except Exception as e:
        self.log.debug(f"Exception thrown getting exit code for {uuid}: {e}")
        return None

    if early_exit_code is not None:
        exit_status = ExitStatus(
            ExitStatusSource.CLIENT_MONITORING,
            early_exit_code,
        )
        self.log.warning(
            f"Process {uuid} exited early without being killed. Exit status {exit_status!r}"
        )
        self.log.debug(
            f"Cleaning up resources for process {uuid} with exit status {exit_status!r}"
        )
        self._cleanup_process_resources(uuid)
        return exit_status

    return None
read_log_file(hostname, user, log_file, num_lines=100)

Read remote log file via SSH.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def read_log_file(
    self, hostname: str, user: str, log_file: str, num_lines: int = 100
) -> List[str]:
    """Read remote log file via SSH."""
    # Create temporary file for output
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()

    try:
        # Build user@host string
        user_host = f"{user}@{hostname}"

        # Build SSH arguments using helper method
        arguments = self._build_ssh_arguments(hostname, user_host)
        arguments.extend(["tail", f"-{num_lines}", log_file])

        # Execute SSH command with output redirection
        self.ssh(
            *arguments,
            _out=temp_file.name,
            _err_to_out=True,
        )

        # Read output lines from temporary file
        with open(temp_file.name) as f:
            lines = f.readlines()

            # Remove SSH connection closure message if present
            if lines and "Connection to " in lines[-1] and " closed." in lines[-1]:
                lines = lines[:-1]

            return lines

    except Exception as e:
        self.log.error(f"Failed to read remote log file: {e}")
        return [f"Could not retrieve logs: {e!s}"]

    finally:
        # Clean up temporary file
        try:
            os.remove(temp_file.name)
        except Exception:
            pass
read_process_metadata(uuid, metadata_file, hostname, user, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_READING_METADATA)

Read process metadata from remote JSON file with a single SSH call.

Uses a remote-side wait loop to avoid multiple SSH round-trips. The remote command polls for file existence and reads it once available, all within a single SSH session.

Parameters:

Name Type Description Default
uuid str

Process UUID for identification in logs

required
metadata_file str

Absolute path to metadata file on remote host

required
hostname str

Target hostname for SSH connection

required
user str

SSH username for authentication

required
timeout float

Maximum time in seconds to wait for metadata file availability

DEFAULT_TIMEOUT_FOR_READING_METADATA

Returns:

Type Description
Optional[ProcessMetadata]

ProcessMetadata instance if file exists and is valid, None otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def read_process_metadata(
    self,
    uuid: str,
    metadata_file: str,
    hostname: str,
    user: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_READING_METADATA,
) -> Optional[ProcessMetadata]:
    """
    Read process metadata from remote JSON file with a single SSH call.

    Uses a remote-side wait loop to avoid multiple SSH round-trips. The remote
    command polls for file existence and reads it once available, all within
    a single SSH session.

    Args:
        uuid: Process UUID for identification in logs
        metadata_file: Absolute path to metadata file on remote host
        hostname: Target hostname for SSH connection
        user: SSH username for authentication
        timeout: Maximum time in seconds to wait for metadata file availability

    Returns:
        ProcessMetadata instance if file exists and is valid, None otherwise
    """
    try:
        user_host = f"{user}@{hostname}"

        # Metadata read is non-interactive and machine-readable.
        arguments = self._build_ssh_arguments(
            hostname,
            user_host,
            use_tty=False,
        )

        remote_command = (
            f"timeout {timeout} sh -c '"
            f'metadata_file="{metadata_file}"; '
            f'while [ ! -s "$metadata_file" ]; do sleep 0.05; done; '
            f'cat "$metadata_file"'
            f"'"
        )
        arguments.append(remote_command)

        result = self.ssh(*arguments)
        json_content = str(result).strip()

        self.log.debug(f"Metadata content for {uuid}: {json_content!r}")

        metadata = ProcessMetadata.from_json(json_content)

        with self.lock:
            running_process = self.process_store.get(uuid)
            if running_process is not None:
                running_process.populate_from_metadata(metadata)

        return metadata

    except Exception as e:
        self.log.warning(f"Failed to read metadata for {uuid}: {e}")
        return None
start_process(uuid, boot_request)

Start a remote process via SSH using the boot request configuration.

Extracts all necessary parameters from the boot request and executes the process on the remote host using sh library's SSH wrapper.

Parameters:

Name Type Description Default
uuid str

Unique identifier for this process

required
boot_request BootRequest

BootRequest containing process configuration, metadata, environment variables, and execution parameters

required

Raises:

Type Description
RuntimeError

If SSH connection or process execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def start_process(self, uuid: str, boot_request: BootRequest) -> None:
    """
    Start a remote process via SSH using the boot request configuration.

    Extracts all necessary parameters from the boot request and executes
    the process on the remote host using sh library's SSH wrapper.

    Args:
        uuid: Unique identifier for this process
        boot_request: BootRequest containing process configuration, metadata,
                    environment variables, and execution parameters

    Raises:
        RuntimeError: If SSH connection or process execution fails
    """
    # Extract connection parameters from boot request metadata
    hostname = boot_request.process_description.metadata.hostname
    user = boot_request.process_description.metadata.user
    log_file = boot_request.process_description.process_logs_path

    # Extract environment variables from boot request
    env_vars = (
        dict(boot_request.process_description.env)
        if boot_request.process_description.env
        else {}
    )

    # Build command string from executable and arguments
    cmd = ""
    for exe_arg in boot_request.process_description.executable_and_arguments:
        cmd += exe_arg.exec
        for arg in exe_arg.args:
            cmd += f" {arg}"
        cmd += ";"

    # Remove trailing semicolon if present
    if cmd.endswith(";"):
        cmd = cmd[:-1]

    # Execute the command via SSH
    self._execute_bootrequest_via_ssh(
        uuid=uuid,
        boot_request=boot_request,
        hostname=hostname,
        user=user if user else getpass.getuser(),
        command=cmd,
        log_file=log_file,
        env_vars=env_vars,
    )
validate_host_connection(host, auth_method, user=getpass.getuser())

Validate SSH connection to the specified host.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def validate_host_connection(
    self,
    host: str,
    auth_method: str,
    user: str = getpass.getuser(),
) -> None:
    """Validate SSH connection to the specified host."""
    try:
        # Build user@host string
        user_host = f"{user}@{host}"

        # Build remote command
        remote_cmd = f'echo "{user} established SSH successfully";'

        # Build SSH arguments using helper method
        arguments = self._build_ssh_arguments(host, user_host)
        arguments.append(remote_cmd)

        # Execute SSH command and wait for completion
        self.ssh(*arguments)

        self.log.debug(f"SSH validation successful for {user}@{host}")

    except Exception as e:
        self.log.error(f"SSH validation failed for {user}@{host}: {e}")
        raise RuntimeError(f"SSH connection validation failed: {e}")
wait_for_process_exit_code(uuid, timeout)

Wait for specified timeout to see if a process exit code is available.

Parameters:

Name Type Description Default
uuid str

Process UUID to wait for

required
timeout float

Maximum time to wait in seconds

required

Returns:

Type Description
Optional[int]

Exit code if process has terminated, None if still running or not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def wait_for_process_exit_code(self, uuid: str, timeout: float) -> Optional[int]:
    """
    Wait for specified timeout to see if a process exit code is available.

    Args:
        uuid: Process UUID to wait for
        timeout: Maximum time to wait in seconds

    Returns:
        Exit code if process has terminated, None if still running or not found
    """
    # Get process reference under lock to avoid race condition
    with self.lock:
        if uuid not in self.process_store:
            return None
        process = self.process_store[uuid].process

    def check_exit_status():
        return not process.is_alive()

    # Wait for process to exit
    got_exit = wait_for(check_exit_status, expected_value=True, timeout=timeout)

    if got_exit:
        try:
            process.wait()
            return process.exit_code
        except sh.ErrorReturnCode as e:
            return e.exit_code
        except Exception as e:
            self.log.debug(f"Exception getting exit code for {uuid}: {e}")
            return None
    else:
        self.log.warning(f"Timeout waiting for exit code of process {uuid}")
        return None

Functions: