Skip to content

ssh_process_lifetime_manager_paramiko

drunc.processes.ssh_process_lifetime_manager_paramiko

Provides SSH connection and lifetime management

Classes

SSHProcessLifetimeManagerParamiko(disable_host_key_check=False, disable_localhost_host_key_check=False, logger=None, on_process_exit=None)

Bases: ProcessLifetimeManager

Manages the lifecycle of processes started via SSH, including output capture and exit code tracking.

Initialise SSH connection manager.

Parameters:

Name Type Description Default
disable_host_key_check bool

Disable SSH host key verification for all hosts

False
disable_localhost_host_key_check bool

Disable SSH host key verification for localhost

False
logger Optional[Logger]

Logger instance for real-time output logging

None
on_process_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Optional callback function(uuid, exit_code, exception) invoked when process exits

None
Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def __init__(
    self,
    disable_host_key_check: bool = False,
    disable_localhost_host_key_check: bool = False,
    logger: Optional[logging.Logger] = None,
    on_process_exit: Optional[
        Callable[[str, Optional[int], Optional[Exception]], None]
    ] = None,
):
    """
    Initialise SSH connection manager.

    Sets up the empty per-process tracking tables and the locks that guard
    them, and logs a one-off warning that this paramiko-based implementation
    is not actively maintained.

    Args:
        disable_host_key_check: Disable SSH host key verification for all hosts
        disable_localhost_host_key_check: Disable SSH host key verification for localhost
        logger: Logger instance for real-time output logging. When None, a
            module logger is obtained via ``get_logger(__name__)``.
        on_process_exit: Optional callback function(uuid, exit_code, exception) invoked when process exits
    """
    self.disable_host_key_check = disable_host_key_check
    self.disable_localhost_host_key_check = disable_localhost_host_key_check
    self.log = logger if logger is not None else get_logger(__name__)
    self.log.warning(
        "The paramiko-based SSH process manager is NOT actively maintained. Consider using the shell-based SSH process manager instead."
    )
    self.on_process_exit = on_process_exit

    # Connection and channel tracking (one per UUID)
    self.connections: Dict[str, paramiko.SSHClient] = {}
    self.channels: Dict[str, paramiko.Channel] = {}
    self.metadata: Dict[str, ProcessMetadata] = {}

    # Thread tracking for monitoring; joined and cleared by kill_all_processes
    self.watchers: List[threading.Thread] = []

    # Output capture for process monitoring, keyed by process UUID
    self.stdout_buffers: Dict[str, List[str]] = {}
    self.stderr_buffers: Dict[str, List[str]] = {}

    # Exit code tracking; a None value means no exit code has been recorded yet
    self.exit_codes: Dict[str, Optional[int]] = {}

    # Thread-safe locks: one per process UUID plus one guarding the shared tables
    self.locks: Dict[str, threading.Lock] = {}
    self.global_lock = threading.Lock()
    # NOTE(review): presumably set elsewhere so the GSS-API warning is only
    # emitted once — confirm against its users.
    self.issued_GSS_API_warning = False
Functions
get_active_process_keys()

Get list of active process UUIDs.

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_active_process_keys(self) -> List[str]:
    """
    Return the UUIDs of all processes that currently hold an SSH connection.

    The list is a copy taken while holding the global lock, so the caller can
    iterate it without racing concurrent additions or removals.
    """
    with self.global_lock:
        active_uuids = list(self.connections)
    return active_uuids
get_metadata_file_path(uuid) staticmethod

Generate metadata file path for a given process UUID.

Uses XDG_RUNTIME_DIR if available, otherwise falls back to /tmp. The path will be expanded on the remote host when the command executes.

Parameters:

Name Type Description Default
uuid str

Process UUID to generate metadata file path for

required

Returns:

Type Description
str

Shell-expandable path string containing environment variable reference

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
@staticmethod
def get_metadata_file_path(uuid: str) -> str:
    """
    Build the remote metadata file path for a process.

    The returned string is not expanded locally. It embeds the shell
    expression ``${XDG_RUNTIME_DIR:-/tmp}``, which the remote shell resolves
    to ``$XDG_RUNTIME_DIR`` when that variable is set and non-empty, or to
    ``/tmp`` otherwise.

    Args:
        uuid: Process UUID the metadata file belongs to

    Returns:
        Path of the form ``<runtime dir>/drunc/metadata_<uuid>.json`` with the
        runtime directory left as a shell expression.
    """
    path = f"${{XDG_RUNTIME_DIR:-/tmp}}/drunc/metadata_{uuid}.json"
    return path
get_process_stderr(uuid)

Get stderr from process.

Note: With output redirection to log files, stderr capture is minimal.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stderr content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_process_stderr(self, uuid: str) -> Optional[str]:
    """
    Get stderr from process.

    Note: With output redirection to log files, stderr capture is minimal.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stderr content joined with newlines, None if no buffer
        exists for ``uuid``.
    """
    # Look the buffer up once. A membership test followed by an index would
    # raise KeyError if the process's resources were removed in between.
    buffer = self.stderr_buffers.get(uuid)
    if buffer is None:
        return None

    # When no per-process lock is registered, a fresh (unshared) lock is used.
    # That keeps the call working but provides no mutual exclusion.
    with self.locks.get(uuid, threading.Lock()):
        return "\n".join(buffer)
get_process_stdout(uuid)

Get stdout from process.

Note: With output redirection to log files, stdout capture is minimal. This primarily captures the initial "SSHPM: Starting process..." message.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stdout content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_process_stdout(self, uuid: str) -> Optional[str]:
    """
    Get stdout from process.

    Note: With output redirection to log files, stdout capture is minimal.
    This primarily captures the initial "SSHPM: Starting process..." message.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stdout content joined with newlines, None if no buffer
        exists for ``uuid``.
    """
    # Look the buffer up once. A membership test followed by an index would
    # raise KeyError if the process's resources were removed in between.
    buffer = self.stdout_buffers.get(uuid)
    if buffer is None:
        return None

    # When no per-process lock is registered, a fresh (unshared) lock is used.
    # That keeps the call working but provides no mutual exclusion.
    with self.locks.get(uuid, threading.Lock()):
        return "\n".join(buffer)
is_process_alive(uuid)

Check if SSH process is alive.

Parameters:

Name Type Description Default
uuid str

Process UUID to check

required

Returns:

Type Description
bool

True if process is alive, False otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def is_process_alive(self, uuid: str) -> bool:
    """
    Check if SSH process is alive.

    A process is considered alive only when its SSH channel has not yet
    reported an exit status and, if metadata is known, the remote process is
    also reported alive by ``_is_remote_process_alive``.

    Args:
        uuid: Process UUID to check

    Returns:
        True if process is alive, False otherwise
    """
    # Single lookup: avoids a KeyError if the channel is removed between a
    # membership test and the index.
    channel = self.channels.get(uuid)
    if channel is None:
        return False

    # Once the channel has an exit status the answer is False regardless of
    # the remote check, so skip the extra SSH round-trip.
    if channel.exit_status_ready():
        return False

    metadata: ProcessMetadata = self.metadata.get(uuid, None)
    if metadata is None:
        self.log.debug(f"No metadata found for UUID {uuid} whilst checking alive.")
        return True

    return self._is_remote_process_alive(
        metadata.hostname, metadata.user, metadata.pid
    )
kill_all_processes()

Clean up all processes and resources.

Terminates all managed processes and releases all associated resources. Safe to call multiple times.

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def kill_all_processes(self) -> dict[str, Optional[int]]:
    """
    Terminate every tracked process and release its resources.

    The set of UUIDs is snapshotted under the global lock. Each one is then
    killed through ``kill_process``, which cleans up after a successful
    termination. A failure for one process is logged and does not stop the
    others. Afterwards each watcher thread is joined with a 2 second timeout
    and the watcher list is emptied. Safe to call multiple times.

    Returns:
        Mapping of process UUID to the value returned by ``kill_process``.
        UUIDs whose termination raised an exception are absent.
    """
    with self.global_lock:
        targets = [*self.connections.keys()]

    results: dict[str, Optional[int]] = {}

    for target in targets:
        try:
            results[target] = self.kill_process(
                target,
                timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
            )
        except Exception as err:
            self.log.error(f"Error during cleanup of process {target}: {err}")

    for thread in self.watchers:
        # Best effort: a watcher that cannot be joined must not abort cleanup.
        try:
            thread.join(timeout=2.0)
        except Exception:
            pass

    self.watchers.clear()
    return results
kill_process(uuid, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS)

Kill a remote process and clean up associated resources upon successful termination.

Sends termination signals to the remote process and waits for it to die. If the process terminates successfully, cleans up all associated resources including remote metadata files and internal tracking. If termination fails, forcibly closes the SSH channel to send a SIGHUP to the remote process.

Safe to call multiple times - subsequent calls will have no effect if resources have already been cleaned up.

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate

required
timeout float

Timeout for graceful termination in seconds

DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

Returns:

Type Description
Optional[int]

Exit code of the terminated process, or None if not found or still running

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def kill_process(
    self,
    uuid: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
) -> Optional[int]:
    """
    Kill a remote process and clean up associated resources upon successful termination.

    Sends termination signals to the remote process and waits for it to die.
    The escalation is SIGQUIT first, then SIGKILL, each followed by a wait of
    up to ``timeout`` seconds. The signals are sent over a separate, short-lived
    SSH connection.

    If the process terminates successfully, cleans up all associated resources
    including remote metadata files and internal tracking. If termination fails,
    forcibly closes the SSH channel to send a SIGHUP to the remote process.
    In that case local resources are deliberately kept to aid debugging.

    Safe to call multiple times - subsequent calls will have no effect if
    resources have already been cleaned up.

    Args:
        uuid: Process UUID to terminate
        timeout: Timeout for graceful termination in seconds. It is also used
            for each wait on the exit code.

    Returns:
        Exit code of the terminated process, or None if not found or still running.
        None is also returned when an exception occurs during termination.
    """
    # Unknown (or already cleaned-up) process: nothing to do.
    if uuid not in self.connections:
        return None

    # Channel already finished: return its exit status directly.
    # NOTE(review): unlike the other return paths, this one does not call
    # _cleanup_process_resources — confirm this is intended.
    channel = self.channels.get(uuid)
    if channel and channel.exit_status_ready():
        return channel.recv_exit_status()

    # Without a remote PID no signal can be sent; fall back to closing the
    # SSH channel and cleaning up local state.
    metadata = self.metadata.get(uuid, None)
    if metadata is None or metadata.pid is None:
        self.log.warning(
            f"No metadata or PID for {uuid}, closing SSH connection. Cannot guarantee process termination."
        )
        self._kill_process_channel(uuid, channel)
        exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout)
        self._cleanup_process_resources(uuid)
        return exit_code

    hostname = metadata.hostname
    user = metadata.user
    remote_pid = metadata.pid
    metadata_file = SSHProcessLifetimeManagerParamiko.get_metadata_file_path(uuid)
    process_dead = False

    try:
        # Separate connection used only for sending signals and removing the
        # remote metadata file; closed in the inner finally below.
        signal_client = self._create_ssh_client(hostname, user, enable_agent=True)
        if signal_client is None:
            self.log.warning(
                f"Could not create signal client for {uuid}, closing connection"
            )
            self._kill_process_channel(uuid, channel)
            exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout)
            self._cleanup_process_resources(uuid)
            return exit_code

        try:
            if not self.is_process_alive(uuid):
                self.log.info(
                    f"Skipping killing remote process {uuid} (PID {remote_pid}). It is already dead."
                )
                process_dead = True

            # Step 1: ask the process to quit.
            if not process_dead:
                self.log.debug(f"Sending SIGQUIT to remote PID {remote_pid}")
                self._send_remote_signal(signal_client, remote_pid, "QUIT")
                process_dead = self.wait_for_process_to_die(
                    uuid, timeout=timeout, logger=self.log
                )
                if process_dead:
                    self.log.info(
                        f"Remote process {uuid} (PID {remote_pid}) terminated gracefully following SIGQUIT signal."
                    )
                else:
                    self.log.info(
                        f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGQUIT signal."
                    )

            # Step 2: escalate to SIGKILL if SIGQUIT was not enough.
            if not process_dead:
                self.log.debug(f"Sending SIGKILL to remote PID {remote_pid}")
                self._send_remote_signal(signal_client, remote_pid, "KILL")
                process_dead = self.wait_for_process_to_die(
                    uuid, timeout=timeout, logger=self.log
                )

                if process_dead:
                    self.log.info(
                        f"Remote process {uuid} (PID {remote_pid}) terminated forcibly following SIGKILL signal."
                    )
                else:
                    self.log.debug(
                        f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGKILL signal."
                    )

            # Step 3: outcome handling.
            if not process_dead:
                self.log.error(
                    f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGKILL signal."
                )
                # Forcibly close channel since graceful termination failed
                self._kill_process_channel(uuid, channel)
                exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout)
                # Don't clean up resources to aid debugging
                return exit_code
            else:
                # Process died successfully - wait for watcher thread to capture exit code
                exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout)
                # Clean up remote metadata file on successful termination
                self._cleanup_remote_file_paramiko(signal_client, metadata_file)
                # Clean up local resources
                self._cleanup_process_resources(uuid)
                return exit_code

        finally:
            signal_client.close()

    except Exception as e:
        self.log.error(f"Error terminating remote process {uuid}: {e}")
        # Exception during termination - forcibly close channel.
        # Local resources are not cleaned up on this path.
        self._kill_process_channel(uuid, channel)
        return None
kill_processes(uuids, process_timeouts=None)

Not implemented in the paramiko manager: logs a warning and returns an empty dict.

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def kill_processes(
    self, uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None
) -> Dict[str, Optional[int]]:
    """
    Batch termination placeholder; not implemented in the paramiko manager.

    Both arguments are ignored. A warning is logged and an empty mapping is
    returned, so no process is killed.
    """
    self.log.warning("kill_processes is not implemented in Paramiko manager")
    nothing_killed: Dict[str, Optional[int]] = {}
    return nothing_killed
kill_processes_by_role(role, candidate_uuids, process_timeouts=None)

Not implemented in the paramiko manager: logs a warning and returns an empty dict.

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def kill_processes_by_role(
    self,
    role: str,
    candidate_uuids: List[str],
    process_timeouts: Optional[Dict[str, float]] = None,
) -> Dict[str, Optional[int]]:
    """
    Role-based termination placeholder; not implemented in the paramiko manager.

    All arguments are ignored. A warning is logged and an empty mapping is
    returned, so no process is killed.
    """
    message = "kill_processes_by_role is not implemented in Paramiko manager"
    self.log.warning(message)
    return dict()
pop_early_exit_code(uuid)

Get the process exit code, cleaning up all process resources if an exit code is found. This returns a non-None value only if the process died without kill_process being called.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[int]

Exit code if process has terminated, None if still running or not found

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def pop_early_exit_code(self, uuid: str) -> Optional[int]:
    """
    Return the recorded exit code of a process that ended on its own.

    When an exit code has been recorded for ``uuid``, a warning is logged and
    all of that process's resources are cleaned up before the code is
    returned. Because ``kill_process`` cleans up after itself, a non-None
    result means the process died without ``kill_process`` being called.

    Args:
        uuid: Process UUID

    Returns:
        The exit code if one has been recorded, otherwise None (unknown UUID,
        or no exit code recorded yet).
    """
    if uuid not in self.exit_codes:
        self.log.debug(f"Process {uuid} not found in store for exit code retrieval")
        return None

    with self.global_lock:
        code = self.exit_codes.get(uuid)

    if code is None:
        # No exit code recorded yet: leave the process's resources in place.
        return None

    self.log.warning(
        f"Process {uuid} exited early without being killed. Exit code {code}"
    )
    self.log.debug(
        f"Cleaning up resources for process {uuid} with exit code {code}"
    )
    self._cleanup_process_resources(uuid)
    return code
read_log_file(hostname, user, log_file, num_lines=100)

Read remote log file via SSH.

Creates a temporary SSH connection to read the log file and returns the last N lines.

Parameters:

Name Type Description Default
hostname str

Target hostname

required
user str

SSH username

required
log_file str

Remote log file path

required
num_lines int

Number of lines to read from end of file

100

Returns:

Type Description
List[str]

List of log lines

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def read_log_file(
    self, hostname: str, user: str, log_file: str, num_lines: int = 100
) -> List[str]:
    """
    Fetch the tail of a log file on a remote host.

    A short-lived SSH connection (agent disabled) is opened and ``tail`` is run
    remotely with a 10 second command timeout. The connection is always closed
    before returning; errors while closing are ignored.

    Args:
        hostname: Target hostname
        user: SSH username
        log_file: Remote log file path
        num_lines: Number of lines to read from end of file

    Returns:
        The lines read from the command's stdout when nothing was written to
        stderr. An empty list if no connection could be made. A single-element
        list holding a "Could not retrieve logs: ..." message when the command
        wrote to stderr or an exception was raised.
    """
    client = self._create_ssh_client(hostname, user, enable_agent=False)
    if client is None:
        self.log.debug(
            f"Returning empty log lines for {hostname}, was unable to connect."
        )
        return []

    try:
        _, out_stream, err_stream = client.exec_command(
            f"tail -{num_lines} {log_file}", timeout=10.0
        )
        tail_lines = out_stream.readlines()
        problem = err_stream.read().decode("utf-8", errors="replace")
        if not problem:
            return tail_lines
        self.log.warning(f"Error reading log file: {problem}")
        return [f"Could not retrieve logs: {problem}"]
    except Exception as exc:
        self.log.error(f"Failed to read remote log file: {exc}")
        return [f"Could not retrieve logs: {exc!s}"]
    finally:
        # The client is known to be non-None here; closing is best effort.
        try:
            client.close()
        except Exception:
            pass
read_process_metadata(uuid, metadata_file, hostname, user, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_READING_METADATA)

Read process metadata from remote JSON file with retry logic.

Parameters:

Name Type Description Default
uuid str

Process UUID

required
metadata_file str

Remote path to metadata file (may contain shell variables)

required
hostname str

Target hostname

required
user str

SSH username

required
timeout float

Maximum time to wait for metadata file in seconds

DEFAULT_TIMEOUT_FOR_READING_METADATA

Returns:

Type Description
Optional[ProcessMetadata]

ProcessMetadata instance if file exists and is valid, None otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def read_process_metadata(
    self,
    uuid: str,
    metadata_file: str,
    hostname: str,
    user: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_READING_METADATA,
) -> Optional[ProcessMetadata]:
    """
    Read process metadata from remote JSON file with retry logic.

    Each attempt opens a fresh SSH connection and asks the remote shell to
    expand ``metadata_file``, because SFTP performs no shell expansion. It then
    reads the file over SFTP and parses it with ``ProcessMetadata.from_json``.
    Attempts are repeated every 0.5 s via ``wait_for`` until one yields a
    non-None result or ``timeout`` elapses.

    Args:
        uuid: Process UUID
        metadata_file: Remote path to metadata file (may contain shell variables)
        hostname: Target hostname
        user: SSH username
        timeout: Maximum time to wait for metadata file in seconds

    Returns:
        ProcessMetadata instance if file exists and is valid, None otherwise
        (NOTE(review): the timeout result is whatever ``wait_for`` returns —
        confirm it is None).
    """

    def attempt_read():
        """Make a single read attempt; return None on any failure so it is retried."""
        try:
            # Create temporary SSH connection for metadata read
            client = self._create_ssh_client(hostname, user, enable_agent=True)
            if client is None:
                self.log.debug(f"Failed to connect for metadata read: {uuid}")
                return None

            try:
                # Expand shell variables in path since SFTP doesn't do this
                _, stdout, _ = client.exec_command(
                    f"echo {metadata_file}", timeout=5.0
                )
                expanded_path = stdout.read().decode("utf-8").strip()
                if not expanded_path:
                    self.log.debug(
                        f"Empty metadata path after expansion for {uuid}"
                    )
                    return None

                # The context managers close the remote file and the SFTP
                # session even if opening or reading raises.
                with client.open_sftp() as sftp:
                    with sftp.file(expanded_path, "r") as remote_file:
                        json_content = remote_file.read().decode("utf-8")

                # Parse JSON and create metadata object
                metadata = ProcessMetadata.from_json(json_content)

                self.log.debug(f"Read metadata for {uuid} from {expanded_path}")
                return metadata

            finally:
                client.close()

        except Exception as e:
            # Deliberately swallowed: the file may not exist yet, so wait_for
            # keeps retrying until the timeout.
            self.log.debug(f"Failed to read metadata for {uuid}: {e}")
            return None

    # Use wait_for to retry reading metadata until timeout
    return wait_for(
        attempt_read,
        expected_value=lambda x: x is not None,
        timeout=timeout,
        poll_interval=0.5,
    )
start_process(uuid, boot_request)

Start a remote process via SSH using the boot request configuration.

Extracts all necessary parameters from the boot request and delegates to _execute_ssh_command for SSH connection and process execution.

Parameters:

Name Type Description Default
uuid str

Unique identifier for this process

required
boot_request BootRequest

BootRequest containing process configuration, metadata, environment variables, and execution parameters

required

Raises:

Type Description
RuntimeError

If SSH connection or process execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def start_process(self, uuid: str, boot_request: BootRequest) -> None:
    """
    Launch a process on a remote host as described by a boot request.

    The hostname, user, log path and environment are read from the boot
    request's process description. Every executable is rendered as
    ``exec arg1 arg2 ...`` and the results are chained with ``;`` into a
    single shell command. SSH connection and execution are delegated to
    ``_execute_ssh_command``. When the description has no user, the local
    user (``getpass.getuser()``) is used.

    Args:
        uuid: Unique identifier for this process
        boot_request: BootRequest containing process configuration, metadata,
                    environment variables, and execution parameters

    Raises:
        RuntimeError: If SSH connection or process execution fails
    """
    description = boot_request.process_description
    hostname = description.metadata.hostname
    user = description.metadata.user
    log_file = description.process_logs_path

    env_vars = dict(description.env) if description.env else {}

    # "exec arg1 arg2" per executable, joined with ";" (no trailing separator).
    command_parts = [
        exe_arg.exec + "".join(f" {arg}" for arg in exe_arg.args)
        for exe_arg in description.executable_and_arguments
    ]
    cmd = ";".join(command_parts)

    self._execute_ssh_command(
        uuid=uuid,
        boot_request=boot_request,
        hostname=hostname,
        user=user if user else getpass.getuser(),
        command=cmd,
        log_file=log_file,
        env_vars=env_vars,
    )
validate_host_connection(host, auth_method, user=getpass.getuser())

Validate the SSH connection to a single host.

This method attempts to establish an SSH connection to the specified host and execute a simple command to verify connectivity. Used to validate access.

Parameters:

Name Type Description Default
host str

Target hostname

required
auth_method str

Authentication method to use ('publickey' or 'gssapi-with-mic')

required
user str

SSH username (default: current user)

getuser()

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If SSH connection or command execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def validate_host_connection(
    self,
    host: str,
    auth_method: str,
    user: Optional[str] = None,
) -> None:
    """
    Validate that an SSH connection to a single host works.

    Opens a connection restricted to ``auth_method``, runs a trivial ``echo``
    command, and waits for it to finish. Used to validate access.

    Args:
        host: Target hostname
        auth_method: Authentication method to use ('publickey' or 'gssapi-with-mic')
        user: SSH username. Defaults to the current user, resolved at call
            time via ``getpass.getuser()`` rather than once at import time.

    Returns:
        None

    Raises:
        RuntimeError: If the SSH connection cannot be established or the
            remote command exits with a non-zero status.
    """
    if user is None:
        user = getpass.getuser()

    client = self._create_ssh_client(
        hostname=host, user=user, enable_agent=True, auth_methods=[auth_method]
    )
    if client is None:
        raise RuntimeError("SSH connection failed")
    try:
        remote_cmd = f'echo "{user} established SSH successfully";'
        _, stdout, _ = client.exec_command(remote_cmd, timeout=10.0)

        # recv_exit_status() blocks until the remote command has finished and
        # returns the exit code
        exit_status = stdout.channel.recv_exit_status()

        self.log.debug(f"SSH doctor command exit status: {exit_status}")

        # The docstring promises a RuntimeError on command failure; previously
        # a non-zero status was only logged.
        if exit_status != 0:
            raise RuntimeError(
                f"SSH command on {host} as {user} exited with status {exit_status}"
            )

    finally:
        client.close()

Functions