Skip to content

ssh_process_lifetime_manager_shell

drunc.processes.ssh_process_lifetime_manager_shell

Provides SSH connection and lifetime management using sh library.

This implementation uses the sh library to execute SSH commands, replicating the behaviour of the original SSHProcessManager.

Classes

ProcessWatcherThread(uuid, process, on_exit, logger)

Bases: Thread

Thread that monitors a background SSH process and invokes callback on exit.

Initialise process watcher thread.

Parameters:

Name Type Description Default
uuid str

Process UUID to monitor

required
process RunningCommand

sh.RunningCommand instance to monitor

required
on_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Callback function invoked on process exit

required
logger Logger

Logger instance for output

required
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    uuid: str,
    process: sh.RunningCommand,
    on_exit: Optional[Callable[[str, Optional[int], Optional[Exception]], None]],
    logger: logging.Logger,
):
    """
    Set up a daemon thread that watches a single shell process.

    The thread is named ``ShellWatcher-<uuid>``. Constructing the object
    does not start the thread.

    Args:
        uuid: Identifier of the process being watched
        process: The sh.RunningCommand handle to watch
        on_exit: Callable taking (str, Optional[int], Optional[Exception]),
            or None when no notification is wanted
        logger: Logger used for watcher diagnostics
    """
    watcher_name = f"ShellWatcher-{uuid}"
    super().__init__(name=watcher_name, daemon=True)

    # Keep everything the watcher needs on the instance.
    self.uuid, self.process = uuid, process
    self.on_exit, self.logger = on_exit, logger
Functions
run()

Monitor process and invoke callback on exit.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def run(self):
    """
    Block until the watched process ends, then report the outcome.

    The exit code and any exception raised while waiting are handed to the
    ``on_exit`` callback, if one was supplied. Errors raised by the callback
    itself are logged and not propagated.
    """
    code, failure = None, None

    try:
        # Blocks here until the process has finished
        self.process.wait()
        code = self.process.exit_code
        self.logger.debug(f"Shell process {self.uuid} exited with code {code}")
    except sh.ErrorReturnCode as err:
        # Non-zero exit: keep both the code and the exception for the callback
        code, failure = err.exit_code, err
        self.logger.debug(f"Shell process {self.uuid} error: {err}")
    except Exception as err:
        # Something else went wrong while waiting; the exit code stays None
        failure = err
        self.logger.error(f"Shell process {self.uuid} watcher error: {err}")

    notify = self.on_exit
    if not notify:
        return

    # A failing callback must not take the watcher thread down with it
    try:
        notify(self.uuid, code, failure)
    except Exception as callback_error:
        self.logger.error(
            f"Error in process exit callback for {self.uuid}: {callback_error}"
        )

SSHProcessLifetimeManagerShell(disable_host_key_check=False, disable_localhost_host_key_check=False, logger=None, on_process_exit=None)

Bases: ProcessLifetimeManager

Manages process lifecycle using sh library for SSH connections.

This implementation uses the sh library's SSH command wrapper to start and manage remote processes, matching the behaviour of the original SSHProcessManager implementation.

Initialise SSH process lifetime manager using sh library.

Parameters:

Name Type Description Default
disable_host_key_check bool

Disable SSH host key verification for all hosts

False
disable_localhost_host_key_check bool

Disable SSH host key verification for localhost

False
logger Optional[Logger]

Logger instance for real-time output logging

None
on_process_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Optional callback function(uuid, exit_code, exception) invoked when process exits

None
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    disable_host_key_check: bool = False,
    disable_localhost_host_key_check: bool = False,
    logger: Optional[logging.Logger] = None,
    on_process_exit: Optional[
        Callable[[str, Optional[int], Optional[Exception]], None]
    ] = None,
):
    """
    Create a lifetime manager that drives /usr/bin/ssh through the sh library.

    Args:
        disable_host_key_check: When True, host key verification is bypassed
            for every host
        disable_localhost_host_key_check: When True, host key verification is
            bypassed for localhost targets
        logger: Logger to use; a module logger is obtained via get_logger
            when none is given
        on_process_exit: Optional callable taking (uuid, exit_code, exception),
            stored for use when a process exits
    """
    # Host key policy flags
    self.disable_localhost_host_key_check = disable_localhost_host_key_check
    self.disable_host_key_check = disable_host_key_check

    # Logging and exit notification
    self.log = logger or get_logger(__name__)
    self.on_process_exit = on_process_exit

    # Wrapper around the system ssh binary
    self.ssh = sh.Command("/usr/bin/ssh")

    # Bookkeeping: running commands keyed by UUID, their watcher threads,
    # and the lock guarding modifications of the process store
    self.process_store: Dict[str, sh.RunningCommand] = {}
    self.watchers: List[threading.Thread] = []
    self.lock = threading.Lock()
Functions
cleanup_all()

Clean up all processes and resources.

Terminates all managed processes and releases all associated resources.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def cleanup_all(self) -> None:
    """
    Clean up all processes and resources.

    Calls cleanup_process() for every UUID currently in the process store,
    then waits up to two seconds for each watcher thread before dropping
    all watcher references.
    """
    # Snapshot the UUIDs under the lock. cleanup_process() acquires the same
    # lock itself, so it must be called only after the lock is released.
    with self.lock:
        uuids = list(self.process_store)

    # Terminate and forget every tracked process
    for uuid in uuids:
        self.cleanup_process(uuid)

    # Wait briefly for watcher threads; iterate over a copy so the list can
    # be cleared afterwards regardless of what happened during the joins
    for watcher in list(self.watchers):
        try:
            watcher.join(timeout=2.0)
        except Exception as e:
            # Best effort: a failed join (e.g. joining the current thread)
            # must not abort the cleanup, but it should leave a trace
            self.log.debug(f"Could not join watcher thread {watcher!r}: {e}")

    self.watchers.clear()
cleanup_process(uuid)

Clean up process resources.

Terminates the process (if still running) and releases all associated resources.

Parameters:

Name Type Description Default
uuid str

Process UUID to clean up

required
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def cleanup_process(self, uuid: str) -> None:
    """
    Clean up process resources.

    Terminates the process (if still running) and removes it from the
    process store. Unknown UUIDs are ignored.

    Args:
        uuid: Process UUID to clean up
    """
    # Single lookup instead of "in" followed by indexing: the entry may be
    # removed by another thread between the two steps
    process = self.process_store.get(uuid)
    if process is not None and process.is_alive():
        self.terminate_process(uuid)

    # Remove from process store; pop() tolerates an entry that is already gone
    with self.lock:
        self.process_store.pop(uuid, None)
get_active_process_keys()

Get list of active process UUIDs.

Returns:

Type Description
List[str]

List of active process UUID strings

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_active_process_keys(self) -> List[str]:
    """
    Return the UUIDs currently held in the process store.

    The snapshot is taken while holding the store lock. No liveness check
    is performed on the individual processes.

    Returns:
        List of tracked process UUID strings
    """
    with self.lock:
        tracked = [*self.process_store]
    return tracked
get_exit_code(uuid)

Get process exit code.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[int]

Exit code if process has terminated, None if still running or not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_exit_code(self, uuid: str) -> Optional[int]:
    """
    Get process exit code.

    Args:
        uuid: Process UUID

    Returns:
        Exit code if process has terminated, None if still running, not
        found, or if the exit code could not be read
    """
    # Single lookup instead of "in" followed by indexing: the entry may be
    # removed by another thread between the two steps
    process = self.process_store.get(uuid)
    if process is None or process.is_alive():
        return None

    try:
        return process.exit_code
    except Exception as e:
        # Keep the best-effort contract, but leave a trace of the failure
        self.log.debug(f"Error getting exit code for {uuid}: {e}")
        return None
get_process_stderr(uuid)

Get stderr from process.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stderr content as a string; None if the process is not found, produced no stderr output, or the output could not be read

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_process_stderr(self, uuid: str) -> Optional[str]:
    """
    Get stderr from process.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stderr content as string; None if the process is not
        found, there is no stderr content, or reading it failed
    """
    process = self.process_store.get(uuid)
    if process is None:
        return None

    try:
        # NOTE(review): with the sh library, reading .stderr appears to wait
        # for the command to finish — confirm this is acceptable for
        # long-running processes
        stderr_data = getattr(process, "stderr", None)
        if not stderr_data:
            return None

        # Captured output may arrive as bytes; str() on bytes would return
        # the "b'...'" repr rather than the text, so decode explicitly
        if isinstance(stderr_data, (bytes, bytearray)):
            return bytes(stderr_data).decode("utf-8", errors="replace")
        return str(stderr_data)
    except Exception as e:
        self.log.debug(f"Error getting stderr for {uuid}: {e}")

    return None
get_process_stdout(uuid)

Get stdout from process.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stdout content as a string; None if the process is not found, produced no stdout output, or the output could not be read

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_process_stdout(self, uuid: str) -> Optional[str]:
    """
    Get stdout from process.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stdout content as string; None if the process is not
        found, there is no stdout content, or reading it failed
    """
    process = self.process_store.get(uuid)
    if process is None:
        return None

    try:
        # NOTE(review): with the sh library, reading .stdout appears to wait
        # for the command to finish — confirm this is acceptable for
        # long-running processes
        stdout_data = getattr(process, "stdout", None)
        if not stdout_data:
            return None

        # Captured output may arrive as bytes; str() on bytes would return
        # the "b'...'" repr rather than the text, so decode explicitly
        if isinstance(stdout_data, (bytes, bytearray)):
            return bytes(stdout_data).decode("utf-8", errors="replace")
        return str(stdout_data)
    except Exception as e:
        self.log.debug(f"Error getting stdout for {uuid}: {e}")

    return None
is_process_alive(uuid)

Check if process is alive.

Parameters:

Name Type Description Default
uuid str

Process UUID to check

required

Returns:

Type Description
bool

True if process is alive, False otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def is_process_alive(self, uuid: str) -> bool:
    """
    Check if process is alive.

    Args:
        uuid: Process UUID to check

    Returns:
        True if process is alive, False otherwise (including unknown UUIDs)
    """
    # Single lookup instead of "in" followed by indexing: the entry may be
    # removed by another thread between the two steps
    process = self.process_store.get(uuid)
    return process is not None and process.is_alive()
read_log_file(hostname, user, log_file, num_lines=100)

Read remote log file via SSH.

Creates a temporary SSH connection to read the log file and returns the last N lines using the tail command.

Parameters:

Name Type Description Default
hostname str

Target hostname

required
user str

SSH username

required
log_file str

Remote log file path

required
num_lines int

Number of lines to read from end of file

100

Returns:

Type Description
List[str]

List of log lines

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def read_log_file(
    self, hostname: str, user: str, log_file: str, num_lines: int = 100
) -> List[str]:
    """
    Read remote log file via SSH.

    Runs ``tail`` on the remote host through a one-off SSH invocation whose
    output (stdout and stderr merged) is written to a local temporary file,
    then returns that output line by line.

    Args:
        hostname: Target hostname
        user: SSH username
        log_file: Remote log file path
        num_lines: Number of lines to read from end of file

    Returns:
        List of log lines (line endings kept). On any failure a single-item
        list containing an error message is returned instead of raising.
    """
    # Only the path is needed: the SSH wrapper writes the output there itself
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        output_path = temp_file.name

    try:
        # Build user@host string
        user_host = f"{user}@{hostname}"

        # Determine host key checking policy
        disable_host_key_check = self.disable_host_key_check or (
            self.disable_localhost_host_key_check
            and hostname in ("localhost", "127.0.0.1", "::1")
        )

        # Build SSH arguments
        arguments = [user_host, "-tt", "-o", "StrictHostKeyChecking=no"]

        # Add host key check bypass options if configured
        if disable_host_key_check:
            arguments.extend(
                [
                    "-o",
                    "LogLevel=error",
                    "-o",
                    "GlobalKnownHostsFile=/dev/null",
                    "-o",
                    "UserKnownHostsFile=/dev/null",
                ]
            )

        # Add tail command
        # NOTE(review): log_file is handed to the remote side unquoted;
        # presumably paths never contain spaces or shell metacharacters —
        # confirm against callers
        arguments.extend(["tail", f"-{num_lines}", log_file])

        # Execute SSH command with output redirection
        self.ssh(
            *arguments,
            _out=output_path,
            _err_to_out=True,
        )

        # Log files may contain bytes that are invalid in the local encoding;
        # substitute them rather than failing the whole read
        with open(output_path, errors="replace") as f:
            lines = f.readlines()

        # Remove SSH connection closure message if present
        if lines and "Connection to " in lines[-1] and " closed." in lines[-1]:
            lines = lines[:-1]

        return lines

    except Exception as e:
        self.log.error(f"Failed to read remote log file: {e}")
        return [f"Could not retrieve logs: {e!s}"]

    finally:
        # Clean up temporary file; failure here must not mask the result
        try:
            os.remove(output_path)
        except OSError as e:
            self.log.debug(f"Could not remove temporary file {output_path}: {e}")
start_process(uuid, boot_request)

Start a remote process via SSH using the boot request configuration.

Extracts all necessary parameters from the boot request and executes the process on the remote host using sh library's SSH wrapper.

Parameters:

Name Type Description Default
uuid str

Unique identifier for this process

required
boot_request BootRequest

BootRequest containing process configuration, metadata, environment variables, and execution parameters

required

Raises:

Type Description
RuntimeError

If SSH connection or process execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def start_process(self, uuid: str, boot_request: BootRequest) -> None:
    """
    Launch the process described by a boot request on its target host.

    Connection details, the log path, environment variables and the command
    line are all read from ``boot_request.process_description`` and passed
    on to ``_execute_ssh_command``.

    Args:
        uuid: Unique identifier for this process
        boot_request: BootRequest carrying the process description
            (metadata, environment, executables and their arguments)

    Raises:
        RuntimeError: If SSH connection or process execution fails
    """
    description = boot_request.process_description
    metadata = description.metadata

    # Environment for the remote process; an empty/missing env becomes {}
    environment = dict(description.env) if description.env else {}

    # One "exec arg1 arg2 ..." segment per entry, chained with ';'
    segments = [
        entry.exec + "".join(f" {arg}" for arg in entry.args)
        for entry in description.executable_and_arguments
    ]
    command_line = ";".join(segments)

    # Fall back to the local user when the request does not name one
    ssh_user = metadata.user or getpass.getuser()

    self._execute_ssh_command(
        uuid=uuid,
        boot_request=boot_request,
        hostname=metadata.hostname,
        user=ssh_user,
        command=command_line,
        log_file=description.process_logs_path,
        env_vars=environment,
    )
terminate_process(uuid, timeout=10.0)

Terminate process by sending signals.

Sends SIGQUIT followed by SIGKILL if necessary, with the configured timeout between signals.

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate

required
timeout float

Timeout between signals in seconds

10.0
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def terminate_process(self, uuid: str, timeout: float = 10.0) -> None:
    """
    Terminate process by sending signals.

    Sends SIGQUIT to the process group and, if the process is still alive
    once the timeout has elapsed, SIGKILL. After each signal the process is
    polled so the call returns as soon as it has exited rather than always
    waiting for the full timeout.

    Args:
        uuid: Process UUID to terminate
        timeout: Maximum time in seconds to wait after each signal
    """
    from time import monotonic  # local import: only needed for the deadline

    process = self.process_store.get(uuid)
    if process is None or not process.is_alive():
        return  # Unknown or already terminated

    poll_interval = 0.1  # seconds between liveness checks

    try:
        # Signal sequence: SIGQUIT (graceful) then SIGKILL (forceful)
        for sig in (signal.SIGQUIT, signal.SIGKILL):
            # .name yields e.g. 'SIGQUIT' regardless of how str() renders
            # the enum on the running Python version
            self.log.debug(f"Sending signal '{sig.name}' to process {uuid}")
            process.signal_group(sig)

            # Poll until the process exits or the deadline passes
            deadline = monotonic() + timeout
            while process.is_alive():
                remaining = deadline - monotonic()
                if remaining <= 0:
                    break  # escalate to the next signal
                sleep(min(poll_interval, remaining))
            else:
                # Loop ended because the process is no longer alive
                self.log.info(f"Process {uuid} terminated")
                return

        self.log.warning(f"Process {uuid} still alive after SIGKILL")

    except Exception as e:
        self.log.warning(f"Error terminating process {uuid}: {e}")
validate_host_connection(host, auth_method, user=getpass.getuser())

Validate SSH connection to the specified host.

Attempts to establish an SSH connection to the host and execute a simple echo command to verify connectivity.

Parameters:

Name Type Description Default
host str

Target hostname

required
auth_method str

Authentication method (not used in sh implementation)

required
user str

SSH username (default: current user)

getuser()

Raises:

Type Description
RuntimeError

If SSH connection or command execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def validate_host_connection(
    self,
    host: str,
    auth_method: str,
    user: str = getpass.getuser(),
) -> None:
    """
    Validate SSH connection to the specified host.

    Runs a simple echo command on the host over SSH and waits for it to
    complete; any failure is reported as a RuntimeError.

    Args:
        host: Target hostname
        auth_method: Authentication method (not used in sh implementation)
        user: SSH username. The default is the current user as determined
            once, when the function is defined.

    Raises:
        RuntimeError: If SSH connection or command execution fails; the
            underlying exception is attached as its cause
    """
    # Build user@host string
    user_host = f"{user}@{host}"

    # Determine host key checking policy
    disable_host_key_check = self.disable_host_key_check or (
        self.disable_localhost_host_key_check
        and host in ("localhost", "127.0.0.1", "::1")
    )

    # Build remote command
    remote_cmd = f'echo "{user} established SSH successfully";'

    # Build SSH arguments
    arguments = [user_host, "-tt", "-o", "StrictHostKeyChecking=no"]

    # Add host key check bypass options if configured
    if disable_host_key_check:
        arguments.extend(
            [
                "-o",
                "LogLevel=error",
                "-o",
                "GlobalKnownHostsFile=/dev/null",
                "-o",
                "UserKnownHostsFile=/dev/null",
            ]
        )

    # Add the remote command
    arguments.append(remote_cmd)

    try:
        # Execute SSH command and wait for completion
        self.ssh(*arguments)
    except Exception as e:
        self.log.error(f"SSH validation failed for {user}@{host}: {e}")
        # Chain explicitly so the original failure stays attached as the cause
        raise RuntimeError(f"SSH connection validation failed: {e}") from e
    else:
        self.log.debug(f"SSH validation successful for {user}@{host}")

Functions