Skip to content

ssh_process_lifetime_manager_shell

drunc.processes.ssh_process_lifetime_manager_shell

Provides SSH connection and lifetime management using the sh library to invoke shell commands over SSH.

Classes

ProcessWatcherThread(uuid, process, manager, hostname, user, metadata_file, on_exit, logger)

Bases: Thread

Thread that monitors a background SSH process and invokes callback on exit.

Initialise process watcher thread.

Parameters:

Name Type Description Default
uuid str

Process UUID to monitor

required
process RunningCommand

sh.RunningCommand instance to monitor

required
manager SSHProcessLifetimeManagerShell

Parent manager instance for metadata updates

required
hostname str

Remote hostname for metadata retrieval

required
user str

Remote user for metadata retrieval

required
metadata_file str

Path to metadata file on remote host

required
on_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Callback function invoked on process exit

required
logger Logger

Logger instance for output

required
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    uuid: str,
    process: sh.RunningCommand,
    manager: "SSHProcessLifetimeManagerShell",
    hostname: str,
    user: str,
    metadata_file: str,
    on_exit: Optional[Callable[[str, Optional[int], Optional[Exception]], None]],
    logger: logging.Logger,
):
    """
    Set up a daemon thread that watches a single SSH-launched process.

    The thread is named "ShellWatcher-<uuid>" and is created as a daemon.
    All arguments are stored on the instance unchanged.

    Args:
        uuid: UUID of the process this watcher is responsible for
        process: sh.RunningCommand handle of the process being watched
        manager: Owning manager, used for metadata reads and updates
        hostname: Host on which the remote process runs
        user: Remote user used when retrieving metadata
        metadata_file: Location of the metadata file on the remote host
        on_exit: Callback to invoke when the process exits (may be None)
        logger: Logger used for this watcher's messages
    """
    thread_name = f"ShellWatcher-{uuid}"
    super().__init__(name=thread_name, daemon=True)

    # The watcher starts out in "SSH client" mode; nothing in this
    # constructor switches it to remote monitoring.
    self.__is_monitoring_remotely = False

    # Where the remote process lives and where its metadata is written
    self.hostname, self.user = hostname, user
    self.metadata_file = metadata_file

    # What is being watched, and who gets told about it
    self.uuid, self.process = uuid, process
    self.manager = manager
    self.on_exit = on_exit
    self.logger = logger
Functions
is_monitoring_remotely()

Check if the watcher is monitoring the remote process directly.

Returns:

Type Description
bool

True if monitoring remote process, False if monitoring SSH client

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def is_monitoring_remotely(self) -> bool:
    """
    Report which side of the connection this watcher is observing.

    Returns:
        True if monitoring remote process, False if monitoring SSH client
    """
    watching_remote_side = self.__is_monitoring_remotely
    return watching_remote_side
run()

Monitor process, read metadata asynchronously, and invoke callback on exit.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def run(self):
    """
    Monitor process, read metadata asynchronously, and invoke callback on exit.

    The metadata for this watcher's process is read through the parent
    manager. If metadata is returned, it is stored in the manager's
    ``metadata`` mapping (under the manager's lock) and the remote process
    is monitored by its PID. If the read returns nothing or raises, the
    watcher falls back to monitoring the SSH client instead.
    """
    # Only the metadata read is guarded. A failure inside the fallback
    # monitor must not be reported as a metadata error, and must not start
    # the fallback monitor a second time.
    try:
        metadata = self.manager.read_process_metadata(
            self.uuid,
            self.metadata_file,
            self.hostname,
            self.user,
        )
    except Exception as e:
        self.logger.warning(
            f"Exception reading metadata for process {self.uuid}: {e}. "
            f"Falling back to SSH client monitoring."
        )
        self._monitor_ssh_client()
        return

    if not metadata:
        # If metadata could not be read, fall back to monitoring SSH client
        self.logger.warning(
            f"Failed to retrieve metadata for process {self.uuid}. "
            f"Falling back to SSH client monitoring."
        )
        self._monitor_ssh_client()
        return

    # Publish the metadata so the manager can use the remote PID
    with self.manager.lock:
        self.manager.metadata[self.uuid] = metadata
    self.logger.debug(f"Metadata retrieved for process {self.uuid}")

    # Monitor the remote process directly
    self._monitor_remote_process(metadata.pid)

SSHProcessLifetimeManagerShell(disable_host_key_check=False, disable_localhost_host_key_check=False, logger=None, on_process_exit=None)

Bases: ProcessLifetimeManager

Manages process lifecycle using the sh library for SSH connections. Uses the sh library's SSH command wrapper to start and manage remote processes.

Initialise SSH process lifetime manager using sh library.

Parameters:

Name Type Description Default
disable_host_key_check bool

Disable SSH host key verification for all hosts

False
disable_localhost_host_key_check bool

Disable SSH host key verification for localhost

False
logger Optional[Logger]

Logger instance for real-time output logging

None
on_process_exit Optional[Callable[[str, Optional[int], Optional[Exception]], None]]

Optional callback function(uuid, exit_code, exception) invoked when process exits

None
Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def __init__(
    self,
    disable_host_key_check: bool = False,
    disable_localhost_host_key_check: bool = False,
    logger: Optional[logging.Logger] = None,
    on_process_exit: Optional[
        Callable[[str, Optional[int], Optional[Exception]], None]
    ] = None,
):
    """
    Initialise SSH process lifetime manager using sh library.

    Args:
        disable_host_key_check: Disable SSH host key verification for all hosts
        disable_localhost_host_key_check: Disable SSH host key verification for localhost
        logger: Logger instance for real-time output logging; when omitted,
            a logger is obtained via get_logger(__name__)
        on_process_exit: Optional callback function(uuid, exit_code, exception) invoked when process exits
    """
    self.disable_host_key_check = disable_host_key_check
    self.disable_localhost_host_key_check = disable_localhost_host_key_check
    self.log = logger if logger else get_logger(__name__)
    self.on_process_exit = on_process_exit

    # Create SSH command wrapper
    self.ssh = sh.Command("/usr/bin/ssh")

    # Process tracking (one record per UUID). Each record is a dict, not a
    # bare RunningCommand: the methods of this class read its "process",
    # "hostname" and "user" keys.
    self.process_store: Dict[str, dict] = {}

    # Thread tracking for monitoring
    self.watchers: Dict[str, threading.Thread] = {}

    # Thread-safe lock for process store modifications
    self.lock = threading.Lock()

    # metadata for each process
    self.metadata: Dict[str, ProcessMetadata] = {}
Functions
get_active_process_keys()

Get list of active process UUIDs.

Returns:

Type Description
List[str]

List of active process UUID strings

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_active_process_keys(self) -> List[str]:
    """
    Snapshot the UUIDs currently present in the process store.

    The snapshot is taken while holding the manager lock.

    Returns:
        List of active process UUID strings
    """
    with self.lock:
        uuid_snapshot = [*self.process_store]
    return uuid_snapshot
get_metadata_file_path(uuid) staticmethod

Generate metadata file path for a given process UUID.

Uses XDG_RUNTIME_DIR if available, otherwise falls back to /tmp. The path will be expanded on the remote host when the command executes.

Parameters:

Name Type Description Default
uuid str

Process UUID to generate metadata file path for

required

Returns:

Type Description
str

Shell-expandable path string containing environment variable reference

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
@staticmethod
def get_metadata_file_path(uuid: str) -> str:
    """
    Generate metadata file path for a given process UUID.

    Uses XDG_RUNTIME_DIR if available, otherwise falls back to /tmp.
    The path will be expanded on the remote host when the command executes.

    Args:
        uuid: Process UUID to generate metadata file path for

    Returns:
        Shell-expandable path string containing environment variable reference
    """
    return f"${{XDG_RUNTIME_DIR:-/tmp}}/drunc/metadata_{uuid}.json"
get_process_stderr(uuid)

Get stderr from process.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stderr content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_process_stderr(self, uuid: str) -> Optional[str]:
    """
    Get stderr from process.

    Byte output is decoded as UTF-8 (undecodable bytes are replaced) so
    callers receive text rather than the ``b'...'`` repr of a bytes object.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stderr content as string, None if the process is
        unknown, has no stderr content, or reading it failed
    """
    # Single lookup instead of "in" followed by indexing, so an entry removed
    # by another thread in between cannot raise KeyError.
    process_info = self.process_store.get(uuid)
    if process_info is None:
        return None

    try:
        # NOTE(review): sh's RunningCommand.stderr appears to wait for the
        # command to finish before returning - confirm this is acceptable
        # for long-running processes.
        stderr_data = getattr(process_info["process"], "stderr", None)
        if not stderr_data:
            return None
        if isinstance(stderr_data, (bytes, bytearray)):
            return bytes(stderr_data).decode("utf-8", errors="replace")
        return str(stderr_data)
    except Exception as e:
        self.log.debug(f"Error getting stderr for {uuid}: {e}")

    return None
get_process_stdout(uuid)

Get stdout from process.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[str]

Accumulated stdout content as string, None if not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def get_process_stdout(self, uuid: str) -> Optional[str]:
    """
    Get stdout from process.

    Byte output is decoded as UTF-8 (undecodable bytes are replaced) so
    callers receive text rather than the ``b'...'`` repr of a bytes object.

    Args:
        uuid: Process UUID

    Returns:
        Accumulated stdout content as string, None if the process is
        unknown, has no stdout content, or reading it failed
    """
    # Single lookup instead of "in" followed by indexing, so an entry removed
    # by another thread in between cannot raise KeyError.
    process_info = self.process_store.get(uuid)
    if process_info is None:
        return None

    try:
        # NOTE(review): sh's RunningCommand.stdout appears to wait for the
        # command to finish before returning - confirm this is acceptable
        # for long-running processes.
        stdout_data = getattr(process_info["process"], "stdout", None)
        if not stdout_data:
            return None
        if isinstance(stdout_data, (bytes, bytearray)):
            return bytes(stdout_data).decode("utf-8", errors="replace")
        return str(stdout_data)
    except Exception as e:
        self.log.debug(f"Error getting stdout for {uuid}: {e}")

    return None
is_process_alive(uuid)

Check if process is alive.

Parameters:

Name Type Description Default
uuid str

Process UUID to check

required

Returns:

Type Description
bool

True if process is alive, False otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def is_process_alive(self, uuid: str) -> bool:
    """
    Tell whether the process registered under a UUID is still running.

    Without cached metadata (or without a PID in it) only the local SSH
    client is consulted. Otherwise the remote process is probed as well and
    both must be alive.

    Args:
        uuid: Process UUID to check

    Returns:
        True if process is alive, False otherwise
    """
    if uuid not in self.process_store:
        return False

    record = self.process_store[uuid]
    ssh_client = record["process"]
    cached_metadata = self.metadata.get(uuid)

    has_remote_pid = cached_metadata is not None and cached_metadata.pid is not None
    if not has_remote_pid:
        self.log.debug(
            f"No metadata or PID found for {uuid}, relying on SSH client process status"
        )
        return ssh_client.is_alive()

    # The remote probe always runs, before the SSH client is consulted.
    remote_alive = self._is_remote_process_alive(
        record["hostname"], record["user"], cached_metadata.pid, uuid
    )
    return ssh_client.is_alive() and remote_alive
kill_all_processes(process_timeouts=None)

Kill all active processes in role-based shutdown order.

Retrieves all active process UUIDs and delegates to kill_processes() for role-based termination. Waits for all monitoring threads to complete after termination.

Parameters:

Name Type Description Default
process_timeouts Optional[Dict[str, float]]

Dictionary mapping process UUIDs to timeout values in seconds. Uses default timeout for unmapped UUIDs.

None

Returns:

Type Description
Dict[str, Optional[int]]

Dictionary mapping all process UUIDs to their exit codes

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_all_processes(
    self, process_timeouts: Optional[Dict[str, float]] = None
) -> Dict[str, Optional[int]]:
    """
    Terminate every tracked process, then wind down the watcher threads.

    A snapshot of the tracked UUIDs is handed to kill_processes(), which
    performs the role-based shutdown. Afterwards each registered watcher
    thread is joined for up to two seconds and the watcher registry is
    emptied.

    Args:
        process_timeouts: Dictionary mapping process UUIDs to timeout values
                        in seconds. Uses default timeout for unmapped UUIDs.

    Returns:
        Dictionary mapping all process UUIDs to their exit codes
    """
    with self.lock:
        uuid_snapshot = list(self.process_store)

    process_count = len(uuid_snapshot)
    if process_count == 0:
        self.log.info("No processes to terminate")
        return {}

    self.log.info(f"Terminating all {process_count} active process(es)")

    # kill_processes applies the role-based shutdown ordering
    exit_codes = self.kill_processes(uuid_snapshot, process_timeouts)

    # Copy the watcher list under the lock, but join outside of it
    with self.lock:
        pending_watchers = [*self.watchers.values()]

    for thread in pending_watchers:
        try:
            thread.join(timeout=2.0)
        except Exception:
            # Best effort: a watcher that cannot be joined is skipped
            continue

    with self.lock:
        self.watchers.clear()

    return exit_codes
kill_process(uuid, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS)

Kill a remote process and clean up all associated resources.

Sends termination signals to the remote process, waits for it to die, cleans up remote metadata files, terminates the SSH client, and removes the process from internal tracking. Safe to call multiple times.

Parameters:

Name Type Description Default
uuid str

Process UUID to terminate

required
timeout float

Timeout for graceful termination in seconds

DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

Returns:

Type Description
Optional[int]

Exit code of the terminated process, or None if not found or still running

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_process(
    self,
    uuid: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS,
) -> Optional[int]:
    """
    Kill a remote process and clean up all associated resources.

    Sends termination signals to the remote process, waits for it to die,
    cleans up remote metadata files, terminates the SSH client, and removes
    the process from internal tracking. Safe to call multiple times.

    The escalation is: skip signalling if the process is already dead,
    otherwise send SIGQUIT and wait up to ``timeout``; if it is still alive,
    send SIGKILL and wait up to ``timeout`` again. When no remote PID is
    cached for the UUID, only the local SSH client is terminated.

    Args:
        uuid: Process UUID to terminate
        timeout: Timeout for graceful termination in seconds; the same value
            is used for each wait (after SIGQUIT, after SIGKILL, and when
            collecting the exit code)

    Returns:
        Exit code of the terminated process, or None if not found or still running.
        None is also returned when an exception occurs during termination
        (the exception is logged, not raised).
    """
    # Unknown UUID: nothing to terminate.
    if uuid not in self.process_store:
        return None

    process_info = self.process_store[uuid]

    hostname = process_info["hostname"]
    user = process_info["user"]
    metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid)

    # Look up the cached metadata for this UUID to obtain the remote PID
    metadata = self.metadata.get(uuid, None)
    if metadata is None or metadata.pid is None:
        # Without a PID the remote process cannot be signalled; the only
        # option is to terminate the local SSH client and clean up.
        self.log.warning(
            f"No remote PID for {uuid}, terminating SSH client. Cannot guarantee remote process termination."
        )
        self._kill_client_process(process_info)
        exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout)
        self._cleanup_process_resources(uuid)
        return exit_code

    remote_pid = metadata.pid
    process_dead = False

    try:
        # Stage 0: do not signal a process that has already exited.
        if not self.is_process_alive(uuid):
            self.log.info(
                f"Skipping killing remote process {uuid} (PID {remote_pid}). It is already dead."
            )
            process_dead = True

        # Stage 1: ask the process to quit and give it `timeout` seconds.
        if not process_dead:
            self.log.debug(f"Sending SIGQUIT to remote PID {remote_pid}")
            self._send_remote_signal(hostname, user, remote_pid, "QUIT")
            process_dead = self.wait_for_process_to_die(
                uuid, timeout=timeout, logger=self.log
            )
            if process_dead:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) terminated gracefully following SIGQUIT signal."
                )
            else:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGQUIT signal."
                )

        # Stage 2: SIGQUIT was not enough, escalate to SIGKILL.
        if not process_dead:
            self.log.debug(f"Sending SIGKILL to remote PID {remote_pid}")
            self._send_remote_signal(hostname, user, remote_pid, "KILL")
            process_dead = self.wait_for_process_to_die(
                uuid, timeout=timeout, logger=self.log
            )

            if process_dead:
                self.log.info(
                    f"Remote process {uuid} (PID {remote_pid}) terminated forcibly following SIGKILL signal."
                )
            else:
                self.log.debug(
                    f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGKILL signal."
                )

        if not process_dead:
            # Still alive after SIGKILL: log the failure and fall through.
            # No cleanup is performed and the function returns None
            # implicitly, so the process stays in the internal tracking.
            self.log.error(
                f"Remote process {uuid} (PID {remote_pid}) still did not terminate after SIGKILL signal."
            )
        else:
            # Process is gone: collect the exit code, then remove the remote
            # metadata file and the local tracking entry.
            exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout)
            self._cleanup_remote_file(hostname, user, metadata_file)
            self._cleanup_process_resources(uuid)
            return exit_code

    except Exception as e:
        # Any failure while signalling, waiting or cleaning up is logged and
        # reported to the caller as "no exit code".
        self.log.error(f"Error terminating remote process {uuid}: {e}")
        return None
kill_processes(uuids, process_timeouts=None)

Kill multiple processes by their UUIDs in role-based shutdown order.

Executes a staged shutdown by role. Processes within each role are terminated asynchronously. After all roles complete, any remaining processes are killed asynchronously as a fallback.

Parameters:

Name Type Description Default
uuids List[str]

List of process UUIDs to terminate

required
process_timeouts Optional[Dict[str, float]]

Dictionary mapping process UUIDs to timeout values in seconds. Uses default timeout for unmapped UUIDs.

None

Returns:

Type Description
Dict[str, Optional[int]]

Dictionary mapping process UUIDs to their exit codes

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_processes(
    self, uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None
) -> Dict[str, Optional[int]]:
    """
    Kill multiple processes by their UUIDs in role-based shutdown order.

    Executes a staged shutdown by role. Processes within each role are terminated
    asynchronously. After all roles complete, any remaining processes
    are killed asynchronously as a fallback.

    Args:
        uuids: List of process UUIDs to terminate
        process_timeouts: Dictionary mapping process UUIDs to timeout values
                        in seconds. Uses default timeout for unmapped UUIDs.

    Returns:
        Dictionary mapping process UUIDs to their exit codes
    """
    if not uuids:
        self.log.debug("No processes to kill")
        return {}

    if process_timeouts is None:
        process_timeouts = {}

    # Ensure all UUIDs have timeout values
    for uuid in uuids:
        if uuid not in process_timeouts:
            process_timeouts[uuid] = self.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

    all_exit_codes: Dict[str, Optional[int]] = {}
    killed_uuids = set()

    # Execute role-based shutdown in stages
    for role in PROCESS_SHUTDOWN_ORDERING:
        self.log.info(
            f"--- Shutdown stage: Terminating role '{role}' from provided UUIDs ---"
        )
        role_exit_codes = self.kill_processes_by_role(
            role, uuids, process_timeouts=process_timeouts
        )
        all_exit_codes.update(role_exit_codes)
        killed_uuids.update(role_exit_codes.keys())

        if role_exit_codes:
            self.log.info(f"--- Shutdown stage: Role '{role}' complete ---")

    # Identify processes not killed during role-based shutdown
    remaining_uuids = [uuid for uuid in uuids if uuid not in killed_uuids]

    if remaining_uuids:
        self.log.info(
            f"Fallback: Killing {len(remaining_uuids)} process(es) without "
            f"role metadata asynchronously"
        )

        # Kill remaining processes asynchronously without role ordering
        fallback_exit_codes: Dict[str, Optional[int]] = {}

        with ThreadPoolExecutor(max_workers=len(remaining_uuids)) as executor:
            # Submit kill tasks for all remaining processes
            future_to_uuid = {
                executor.submit(
                    self.kill_process, uuid, process_timeouts[uuid]
                ): uuid
                for uuid in remaining_uuids
            }

            # Collect results as they complete
            for future in as_completed(future_to_uuid):
                uuid = future_to_uuid[future]
                try:
                    exit_code = future.result()
                    fallback_exit_codes[uuid] = exit_code
                except Exception as e:
                    self.log.error(
                        f"Error during fallback termination of process {uuid}: {e}"
                    )
                    fallback_exit_codes[uuid] = None

        all_exit_codes.update(fallback_exit_codes)

    return all_exit_codes
kill_processes_by_role(role, candidate_uuids, process_timeouts=None)

Kill all processes with the specified role from candidate UUID list.

Filters candidate UUIDs by matching metadata roles, then terminates matching processes asynchronously using a thread pool.

Parameters:

Name Type Description Default
role str

Process role to match

required
candidate_uuids List[str]

List of process UUIDs to filter by role

required
process_timeouts Optional[Dict[str, float]]

Dictionary mapping process UUIDs to timeout values in seconds. Uses default timeout for unmapped UUIDs.

None

Returns:

Type Description
Dict[str, Optional[int]]

Dictionary mapping terminated process UUIDs to their exit codes

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def kill_processes_by_role(
    self,
    role: str,
    candidate_uuids: List[str],
    process_timeouts: Optional[Dict[str, float]] = None,
) -> Dict[str, Optional[int]]:
    """
    Kill all processes with the specified role from candidate UUID list.

    Filters candidate UUIDs by matching metadata roles, then terminates
    matching processes asynchronously using a thread pool. Candidates
    without cached metadata never match a role.

    Args:
        role: Process role to match
        candidate_uuids: List of process UUIDs to filter by role
        process_timeouts: Dictionary mapping process UUIDs to timeout values
                        in seconds. Uses default timeout for unmapped UUIDs.
                        The dictionary is not modified.

    Returns:
        Dictionary mapping terminated process UUIDs to their exit codes;
        None is recorded for a process whose termination raised
    """
    self.log.debug(f"process_timeouts: {process_timeouts}")

    # Read-only view of the caller's timeouts; defaults are resolved at
    # submit time rather than written back into the caller's dictionary.
    timeouts: Dict[str, float] = process_timeouts if process_timeouts else {}
    default_timeout = self.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS

    # Filter candidate UUIDs by role using process metadata
    uuids_to_kill = []
    with self.lock:
        for uuid in candidate_uuids:
            metadata = self.metadata.get(uuid)
            if metadata and metadata.role == role:
                uuids_to_kill.append(uuid)

    if not uuids_to_kill:
        self.log.debug(f"No processes found with role '{role}' in candidate list")
        return {}

    self.log.info(
        f"Killing {len(uuids_to_kill)} process(es) with role '{role}' "
        f"from {len(candidate_uuids)} candidates"
    )

    exit_codes: Dict[str, Optional[int]] = {}

    # Terminate processes asynchronously using thread pool
    with ThreadPoolExecutor(max_workers=len(uuids_to_kill)) as executor:
        # Submit kill tasks for all matching processes
        future_to_uuid = {
            executor.submit(
                self.kill_process, uuid, timeouts.get(uuid, default_timeout)
            ): uuid
            for uuid in uuids_to_kill
        }

        # Collect results as they complete
        for future in as_completed(future_to_uuid):
            uuid = future_to_uuid[future]
            try:
                exit_codes[uuid] = future.result()
            except Exception as e:
                self.log.error(
                    f"Error during termination of process {uuid} with role '{role}': {e}"
                )
                exit_codes[uuid] = None

    return exit_codes
pop_early_exit_code(uuid)

Get process exit code if process exited early without being killed.

This method checks if a process has terminated unexpectedly (without kill_process being called). If an exit code is found, the process resources are cleaned up automatically.

Parameters:

Name Type Description Default
uuid str

Process UUID

required

Returns:

Type Description
Optional[int]

Exit code if process has terminated early, None if still running or not found

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def pop_early_exit_code(self, uuid: str) -> Optional[int]:
    """
    Fetch the exit code of a process that ended without kill_process().

    When the tracked SSH process is no longer alive and reports an exit
    code, a warning is logged and the process resources are cleaned up
    before the code is returned.

    Args:
        uuid: Process UUID

    Returns:
        Exit code if process has terminated early, None if still running or not found
    """
    if uuid not in self.process_store:
        self.log.debug(f"Process {uuid} not found in store for exit code retrieval")
        return None

    ssh_client = self.process_store[uuid]["process"]
    if ssh_client.is_alive():
        return None

    try:
        code = ssh_client.exit_code
    except Exception as e:
        self.log.debug(f"Exception thrown getting exit code for {uuid}: {e}")
        return None

    # Dead but no exit code available: leave the resources untouched
    if code is None:
        return None

    self.log.warning(
        f"Process {uuid} exited early without being killed. Exit code {code}"
    )
    self.log.debug(
        f"Cleaning up resources for process {uuid} with exit code {code}"
    )
    self._cleanup_process_resources(uuid)
    return code
read_log_file(hostname, user, log_file, num_lines=100)

Read remote log file via SSH.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def read_log_file(
    self, hostname: str, user: str, log_file: str, num_lines: int = 100
) -> List[str]:
    """
    Fetch the tail of a remote log file over SSH.

    Runs ``tail -<num_lines> <log_file>`` on the remote host with stdout and
    stderr redirected into a local temporary file, then returns that file's
    lines. A trailing "Connection to ... closed." line is dropped.

    Args:
        hostname: Host holding the log file
        user: Remote user for the SSH connection
        log_file: Path of the log file on the remote host
        num_lines: Number of trailing lines to request

    Returns:
        The retrieved lines (line endings kept), or a single-element list
        with an error message if anything fails
    """
    # Reserve a scratch file that the SSH command writes its output into
    with tempfile.NamedTemporaryFile(delete=False) as scratch:
        scratch_path = scratch.name

    try:
        ssh_args = self._build_ssh_arguments(hostname, f"{user}@{hostname}")
        ssh_args += ["tail", f"-{num_lines}", log_file]

        # stderr is folded into stdout so both end up in the scratch file
        self.ssh(*ssh_args, _out=scratch_path, _err_to_out=True)

        with open(scratch_path) as captured:
            output_lines = captured.readlines()

        # Remove SSH connection closure message if present
        if output_lines:
            last_line = output_lines[-1]
            if "Connection to " in last_line and " closed." in last_line:
                output_lines = output_lines[:-1]

        return output_lines

    except Exception as e:
        self.log.error(f"Failed to read remote log file: {e}")
        return [f"Could not retrieve logs: {e!s}"]

    finally:
        # Best-effort removal of the scratch file
        try:
            os.remove(scratch_path)
        except Exception:
            pass
read_process_metadata(uuid, metadata_file, hostname, user, timeout=ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_READING_METADATA)

Read process metadata from remote JSON file with a single SSH call.

Uses a remote-side wait loop to avoid multiple SSH round-trips. The remote command polls for file existence and reads it once available, all within a single SSH session.

Parameters:

Name Type Description Default
uuid str

Process UUID for identification in logs

required
metadata_file str

Absolute path to metadata file on remote host

required
hostname str

Target hostname for SSH connection

required
user str

SSH username for authentication

required
timeout float

Maximum time in seconds to wait for metadata file availability

DEFAULT_TIMEOUT_FOR_READING_METADATA

Returns:

Type Description
Optional[ProcessMetadata]

ProcessMetadata instance if file exists and is valid, None otherwise

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def read_process_metadata(
    self,
    uuid: str,
    metadata_file: str,
    hostname: str,
    user: str,
    timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_READING_METADATA,
) -> Optional[ProcessMetadata]:
    """
    Fetch and parse a process's metadata JSON file in one SSH session.

    The waiting happens on the remote side: a bash loop checks every 50ms
    whether the file exists and prints it once it does, all wrapped in
    ``timeout`` so the command gives up after the requested duration.

    Args:
        uuid: Process UUID, used only in the log message
        metadata_file: Path of the metadata file on the remote host; it is
            inserted into the remote bash command as-is
        hostname: Target hostname for SSH connection
        user: SSH username for authentication
        timeout: Maximum time in seconds to wait for metadata file availability

    Returns:
        ProcessMetadata instance if file exists and is valid, None otherwise
        (every failure is logged at debug level and reported as None)
    """
    try:
        ssh_args = self._build_ssh_arguments(hostname, f"{user}@{hostname}")

        # Remote side: poll until the file exists, then dump it
        poll_and_read = (
            f"while [ ! -f {metadata_file} ]; do sleep 0.05; done; "
            f"cat {metadata_file}"
        )
        ssh_args.append(f"timeout {timeout} bash -c '{poll_and_read}'")

        # Single round-trip: the command output is the JSON document
        raw_output = self.ssh(*ssh_args)
        return ProcessMetadata.from_json(str(raw_output).strip())

    except Exception as e:
        self.log.debug(f"Failed to read metadata for {uuid}: {e}")
        return None
start_process(uuid, boot_request)

Start a remote process via SSH using the boot request configuration.

Extracts all necessary parameters from the boot request and executes the process on the remote host using sh library's SSH wrapper.

Parameters:

Name Type Description Default
uuid str

Unique identifier for this process

required
boot_request BootRequest

BootRequest containing process configuration, metadata, environment variables, and execution parameters

required

Raises:

Type Description
RuntimeError

If SSH connection or process execution fails

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def start_process(self, uuid: str, boot_request: BootRequest) -> None:
    """
    Launch the process described by a boot request on its remote host.

    Connection details, environment variables and the command line are
    taken from ``boot_request.process_description`` and handed to
    ``_execute_bootrequest_via_ssh``. When the boot request names no user,
    the current local user is used.

    Args:
        uuid: Unique identifier for this process
        boot_request: BootRequest containing process configuration, metadata,
                    environment variables, and execution parameters

    Raises:
        RuntimeError: If SSH connection or process execution fails
    """
    description = boot_request.process_description
    target = description.metadata

    # Each executable with its arguments becomes one ';'-separated segment
    command_line = ";".join(
        exe_arg.exec + "".join(f" {arg}" for arg in exe_arg.args)
        for exe_arg in description.executable_and_arguments
    )

    environment = dict(description.env) if description.env else {}

    self._execute_bootrequest_via_ssh(
        uuid=uuid,
        boot_request=boot_request,
        hostname=target.hostname,
        user=target.user or getpass.getuser(),
        command=command_line,
        log_file=description.process_logs_path,
        env_vars=environment,
    )
validate_host_connection(host, auth_method, user=getpass.getuser())

Validate SSH connection to the specified host.

Source code in drunc/processes/ssh_process_lifetime_manager_shell.py
def validate_host_connection(
    self,
    host: str,
    auth_method: str,
    user: str = getpass.getuser(),
) -> None:
    """
    Validate SSH connection to the specified host.

    Runs a trivial ``echo`` command on the host over SSH; success of that
    command is taken as proof that the connection works.

    Args:
        host: Hostname to connect to
        auth_method: Not used by this implementation
        user: Remote user name. The default is the local user, evaluated
            once when the function is defined.

    Raises:
        RuntimeError: If the SSH command fails; the original exception is
            attached as the cause
    """
    try:
        # Build user@host string
        user_host = f"{user}@{host}"

        # Build remote command
        remote_cmd = f'echo "{user} established SSH successfully";'

        # Build SSH arguments using helper method
        arguments = self._build_ssh_arguments(host, user_host)
        arguments.append(remote_cmd)

        # Execute SSH command and wait for completion
        self.ssh(*arguments)

        self.log.debug(f"SSH validation successful for {user}@{host}")

    except Exception as e:
        self.log.error(f"SSH validation failed for {user}@{host}: {e}")
        # Chain explicitly so the underlying SSH failure is kept as the cause
        raise RuntimeError(f"SSH connection validation failed: {e}") from e

Functions