Skip to content

ssh_process_lifetime_manager_paramiko

drunc.processes.ssh_process_lifetime_manager_paramiko

Provides SSH connection and lifetime management

Classes

SSHProcessLifetimeManagerParamiko(disable_host_key_check=False, disable_localhost_host_key_check=False, logger=None, on_process_exit=None)

Bases: ProcessLifetimeManager

Supports process lifecycle management of processes started via SSH, output capture, and exit code tracking.

Initialise SSH connection manager.

Parameters:

Name Type Description Default
disable_host_key_check bool

Disable SSH host key verification for all hosts

False
disable_localhost_host_key_check bool

Disable SSH host key verification for localhost

False
logger Optional[Logger]

Logger instance for real-time output logging

None
on_process_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Optional callback function(uuid, exit_code, exception) invoked when process exits

None
Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def __init__(
    self,
    disable_host_key_check: bool = False,
    disable_localhost_host_key_check: bool = False,
    logger: Optional[logging.Logger] = None,
    on_process_exit: Optional[
        Callable[[str, Optional[int], Optional[Exception]], None]
    ] = None,
):
    """
    Set up the manager with empty per-process registries.

    This constructor only stores its arguments and creates empty tracking
    containers; no SSH connection is opened here.

    Args:
        disable_host_key_check: Disable SSH host key verification for all hosts
        disable_localhost_host_key_check: Disable SSH host key verification for localhost
        logger: Logger used for output; when omitted (or falsy) one is obtained
            from get_logger(__name__)
        on_process_exit: Optional callback function(uuid, exit_code, exception)
            meant to be invoked when a process exits (stored here only)
    """
    # Synchronisation primitives: one global lock plus a per-UUID lock table
    self.global_lock = threading.Lock()
    self.locks: Dict[str, threading.Lock] = {}

    # Per-UUID SSH client and channel
    self.channels: Dict[str, paramiko.Channel] = {}
    self.connections: Dict[str, paramiko.SSHClient] = {}

    # Per-UUID exit code (None is a valid stored value)
    self.exit_codes: Dict[str, Optional[int]] = {}

    # Per-UUID captured output, kept as lists of text chunks
    self.stderr_buffers: Dict[str, List[str]] = {}
    self.stdout_buffers: Dict[str, List[str]] = {}

    # Monitoring threads, joined in cleanup_all
    self.watchers: List[threading.Thread] = []

    # Caller-supplied configuration
    self.on_process_exit = on_process_exit
    self.log = logger or get_logger(__name__)
    self.disable_localhost_host_key_check = disable_localhost_host_key_check
    self.disable_host_key_check = disable_host_key_check
Functions
cleanup_all()

Clean up all processes and resources.

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def cleanup_all(self) -> None:
    """
    Clean up all processes and resources.

    Every UUID currently registered in ``self.connections`` is handed to
    ``cleanup_process``. Afterwards each watcher thread is joined for at most
    two seconds and the watcher list is emptied. Joining is best effort: a
    failing join is logged at debug level and does not abort the cleanup.
    """
    # Snapshot the UUIDs while holding the lock, then release it before
    # cleaning up: cleanup_process takes the same non-reentrant lock itself.
    with self.global_lock:
        uuids = list(self.connections)

    for uuid in uuids:
        self.cleanup_process(uuid)

    # Iterate over a copy so a concurrent append cannot disturb the loop.
    for watcher in list(self.watchers):
        try:
            watcher.join(timeout=2.0)
        except Exception as e:
            # e.g. RuntimeError when joining the current or an unstarted thread
            self.log.debug(f"Error joining watcher thread: {e}")

    self.watchers.clear()
cleanup_process(uuid)

Clean up process resources.

Terminates the process (if still running) and releases all associated resources.

Parameters:

Name Type Description Default
uuid str

Process UUID to clean up

required
Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def cleanup_process(self, uuid: str) -> None:
    """
    Clean up process resources.

    Terminates the process (if its channel does not yet report an exit
    status), closes its connection and drops every tracking entry held for
    the UUID. Calling it for an unknown or already cleaned-up UUID is a no-op.

    Args:
        uuid: Process UUID to clean up
    """
    # Terminate if still running
    channel = self.channels.get(uuid)
    if channel is not None and not channel.exit_status_ready():
        self.terminate_process(uuid)

    # Close the connection; a failing close must not prevent the bookkeeping
    # below from being released.
    connection = self.connections.get(uuid)
    if connection is not None:
        try:
            connection.close()
        except Exception as e:
            self.log.debug(f"Error closing connection for {uuid}: {e}")

    # Drop all tracking entries in a single locked section. pop(..., None)
    # tolerates entries that another thread removed in the meantime, which a
    # membership check followed by ``del`` does not.
    with self.global_lock:
        for registry in (
            self.connections,
            self.channels,
            self.exit_codes,
            self.stdout_buffers,
            self.stderr_buffers,
            self.locks,
        ):
            registry.pop(uuid, None)
get_active_process_keys()

Get list of active process UUIDs.

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_active_process_keys(self) -> List[str]:
    """Return the UUIDs that currently have a registered connection."""
    # Copy the keys while holding the lock so the caller gets a stable list.
    with self.global_lock:
        active = [*self.connections]
    return active
get_exit_code(uuid)

Get process exit code.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[int]

Exit code if process has terminated, None if still running or not found

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_exit_code(self, uuid: str) -> Optional[int]:
    """
    Look up the recorded exit code of a process.

    Args:
        uuid: Process UUID

    Returns:
        The stored exit code, or None when no entry exists for the UUID
        (a stored value of None is returned unchanged)
    """
    # One lookup covers both the "unknown UUID" and the "known UUID" case.
    return self.exit_codes.get(uuid)
get_process_stderr(uuid)

Get stderr from process.

Note: With output redirection to log files, stderr capture is minimal.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stderr content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_process_stderr(self, uuid: str) -> Optional[str]:
    """
    Get stderr from process.

    Note: With output redirection to log files, stderr capture is minimal.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stderr content as string (chunks joined with newlines),
        None if no buffer exists for the UUID
    """
    # Fetch the buffer with a single lookup: a separate membership check
    # followed by indexing could raise KeyError if the entry is removed by a
    # concurrent cleanup in between.
    buffer = self.stderr_buffers.get(uuid)
    if buffer is None:
        return None

    # Fall back to a private lock only when none is registered for the UUID.
    lock = self.locks.get(uuid) or threading.Lock()
    with lock:
        return "\n".join(buffer)
get_process_stdout(uuid)

Get stdout from process.

Note: With output redirection to log files, stdout capture is minimal. This primarily captures the initial "SSHPM: Starting process..." message.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stdout content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def get_process_stdout(self, uuid: str) -> Optional[str]:
    """
    Get stdout from process.

    Note: With output redirection to log files, stdout capture is minimal.
    This primarily captures the initial "SSHPM: Starting process..." message.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stdout content as string (chunks joined with newlines),
        None if no buffer exists for the UUID
    """
    # Fetch the buffer with a single lookup: a separate membership check
    # followed by indexing could raise KeyError if the entry is removed by a
    # concurrent cleanup in between.
    buffer = self.stdout_buffers.get(uuid)
    if buffer is None:
        return None

    # Fall back to a private lock only when none is registered for the UUID.
    lock = self.locks.get(uuid) or threading.Lock()
    with lock:
        return "\n".join(buffer)
is_process_alive(uuid)

Check if SSH process is alive.

Parameters:

Name Type Description Default
uuid str

Process UUID to check

required

Returns:

Type Description
bool

True if process is alive, False otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def is_process_alive(self, uuid: str) -> bool:
    """
    Report whether the process behind a UUID is still running.

    Args:
        uuid: Process UUID to check

    Returns:
        False when no channel is registered for the UUID or the channel
        already reports an exit status; True otherwise
    """
    try:
        channel = self.channels[uuid]
    except KeyError:
        # Unknown UUID: nothing is running under that name
        return False

    # Alive means the channel has not reported an exit status yet
    return not channel.exit_status_ready()
read_log_file(hostname, user, log_file, num_lines=100)

Read remote log file via SSH.

Creates a temporary SSH connection to read the log file and returns the last N lines.

Parameters:

Name Type Description Default
hostname str

Target hostname

required
user str

SSH username

required
log_file str

Remote log file path

required
num_lines int

Number of lines to read from end of file

100

Returns:

Type Description
List[str]

List of log lines

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def read_log_file(
    self, hostname: str, user: str, log_file: str, num_lines: int = 100
) -> List[str]:
    """
    Read remote log file via SSH.

    Creates a temporary SSH connection, runs ``tail`` on the remote host and
    returns the last N lines. The connection is always closed before
    returning.

    Args:
        hostname: Target hostname
        user: SSH username
        log_file: Remote log file path; it is shell-quoted, so it is passed to
            ``tail`` literally (no remote-side expansion of ``~``, ``$VAR`` or
            globs)
        num_lines: Number of lines to read from end of file

    Returns:
        List of log lines. An empty list when no SSH client could be created;
        a single-element list with an explanatory message when the remote
        command wrote to stderr or an exception occurred.
    """
    import shlex  # local import: only needed to quote the remote path

    # Create temporary SSH client for log retrieval
    # Disable agent/key lookup for simpler temporary connection
    client = self._create_ssh_client(hostname, user, enable_agent=False)
    if client is None:
        self.log.debug(
            f"Returning empty log lines for {hostname}, was unable to connect."
        )
        return []

    try:
        # "-n N" is the portable spelling of the obsolete "-N" form, and the
        # quoted path keeps spaces or shell metacharacters from being
        # interpreted by the remote shell.
        remote_cmd = f"tail -n {int(num_lines)} {shlex.quote(log_file)}"
        _stdin, stdout, stderr = client.exec_command(remote_cmd, timeout=10.0)

        # Read output lines
        lines = stdout.readlines()

        # Anything on stderr is treated as a failure of the tail command
        error_output = stderr.read().decode("utf-8", errors="replace")
        if error_output:
            self.log.warning(f"Error reading log file: {error_output}")
            return [f"Could not retrieve logs: {error_output}"]

        return lines

    except Exception as e:
        self.log.error(f"Failed to read remote log file: {e}")
        return [f"Could not retrieve logs: {e!s}"]

    finally:
        # Always close the temporary connection; a failing close is only
        # logged so it cannot mask the result computed above.
        try:
            client.close()
        except Exception as e:
            self.log.debug(f"Error closing log connection to {hostname}: {e}")
start_process(uuid, boot_request)

Start a remote process via SSH using the boot request configuration.

Extracts all necessary parameters from the boot request and delegates to _execute_ssh_command for SSH connection and process execution.

Parameters:

Name Type Description Default
uuid str

Unique identifier for this process

required
boot_request BootRequest

BootRequest containing process configuration, metadata, environment variables, and execution parameters

required

Raises:

Type Description
RuntimeError

If SSH connection or process execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def start_process(self, uuid: str, boot_request: BootRequest) -> None:
    """
    Launch the process described by a boot request on its target host.

    Reads host, user, log path, environment and the executable list from
    ``boot_request.process_description`` and passes them on to
    ``_execute_ssh_command``, which performs the actual SSH work.

    Args:
        uuid: Unique identifier for this process
        boot_request: BootRequest containing process configuration, metadata,
                    environment variables, and execution parameters

    Raises:
        RuntimeError: If SSH connection or process execution fails
    """
    description = boot_request.process_description
    metadata = description.metadata

    # Connection parameters
    hostname = metadata.hostname
    user = metadata.user
    log_file = description.process_logs_path

    # Environment: copy into a plain dict, or use an empty one when unset
    env_vars = {}
    if description.env:
        env_vars = dict(description.env)

    # One "exec arg1 arg2 ..." fragment per entry, chained with ';'.
    # Arguments are inserted verbatim (no shell quoting).
    command = ";".join(
        step.exec + "".join(f" {arg}" for arg in step.args)
        for step in description.executable_and_arguments
    )

    # Fall back to the local user name when the request does not name a user
    self._execute_ssh_command(
        uuid=uuid,
        boot_request=boot_request,
        hostname=hostname,
        user=user or getpass.getuser(),
        command=command,
        log_file=log_file,
        env_vars=env_vars,
    )
terminate_process(uuid, timeout=10.0)

Terminate SSH process by closing the connection.

The PTY allocation ensures the remote process receives SIGHUP when the SSH connection closes, causing graceful termination.

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate

required
timeout float

Maximum time in seconds to wait for the channel to report an exit status after the connection has been closed

10.0
Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def terminate_process(self, uuid: str, timeout: float = 10.0) -> None:
    """
    Terminate SSH process by closing the connection.

    The channel (if any) and the connection registered for the UUID are
    closed. Closing is intended to deliver SIGHUP to the remote process,
    which relies on a PTY having been allocated when the process was started
    (that happens outside this method). Nothing is done when the UUID has no
    connection or its channel already reports an exit status. Errors are
    logged as warnings and never raised.

    Args:
        uuid: Process UUID to terminate
        timeout: Maximum time in seconds to wait for the channel to report an
            exit status after closing
    """
    # Single lookup instead of a membership check followed by indexing
    connection = self.connections.get(uuid)
    if connection is None:
        return

    channel = self.channels.get(uuid)
    if channel is not None and channel.exit_status_ready():
        return  # Already terminated

    try:
        # Close the channel first, then the connection that carries it
        if channel is not None:
            channel.close()

        connection.close()

        # Poll for the exit status. The deadline uses the monotonic clock so
        # that wall-clock adjustments cannot stretch or shorten the wait.
        if channel is not None:
            deadline = time.monotonic() + timeout
            while not channel.exit_status_ready() and time.monotonic() < deadline:
                time.sleep(0.1)

    except Exception as e:
        self.log.warning(f"Error terminating process {uuid}: {e}")
validate_host_connection(host, auth_method, user=getpass.getuser())

Validate the SSH connection to a single host.

This method attempts to establish an SSH connection to the specified host and execute a simple command to verify connectivity. Used to validate access.

Parameters:

Name Type Description Default
host str

Target hostname

required
auth_method str

Authentication method to use ('publickey' or 'gssapi-with-mic')

required
user str

SSH username (default: current user)

getuser()

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If SSH connection or command execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_paramiko.py
def validate_host_connection(
    self,
    host: str,
    auth_method: str,
    user: Optional[str] = None,
) -> None:
    """
    Validate the SSH connection to a single host.

    Opens an SSH connection to ``host`` with the requested authentication
    method, runs a trivial ``echo`` command and checks its exit status. The
    connection is closed again in every case.

    Args:
        host: Target hostname
        auth_method: Authentication method to use ('publickey' or 'gssapi-with-mic')
        user: SSH username; defaults to the current local user, resolved when
            the method is called rather than when the module is imported

    Returns:
        None

    Raises:
        RuntimeError: If no SSH client could be created, or the remote command
            finished with a non-zero exit status. Exceptions raised while
            executing the command are propagated unchanged.
    """
    # Resolve the default lazily: a getpass.getuser() default argument would be
    # evaluated once at definition time and could fail during import.
    if user is None:
        user = getpass.getuser()

    # Create and connect SSH client
    client = self._create_ssh_client(
        hostname=host, user=user, enable_agent=True, auth_methods=[auth_method]
    )
    if client is None:
        raise RuntimeError("SSH connection failed")

    try:
        # Attempt command execution over the established connection
        remote_cmd = f'echo "{user} established SSH successfully";'
        _stdin, stdout, _stderr = client.exec_command(remote_cmd, timeout=10.0)

        # recv_exit_status() blocks until the remote command has finished and
        # returns the exit code
        exit_status = stdout.channel.recv_exit_status()

        self.log.debug(f"SSH doctor command exit status: {exit_status}")

        # A non-zero status means the command did not run successfully, so
        # the host cannot be considered validated.
        if exit_status != 0:
            raise RuntimeError(
                f"SSH validation command on {host} exited with status {exit_status}"
            )

    finally:
        client.close()

Functions