Skip to content

ssh_process_manager

drunc.process_manager.ssh_process_manager

Classes

SSHProcessManager(configuration, LifetimeManagerClass, **kwargs)

Bases: ProcessManager

Source code in drunc/process_manager/ssh_process_manager.py
def __init__(
    self, configuration, LifetimeManagerClass: ProcessLifetimeManager, **kwargs
):
    self.ssh_lifetime_manager: Optional[ProcessLifetimeManager] = None
    self.session = getpass.getuser()  # unfortunate

    super().__init__(configuration=configuration, session=self.session, **kwargs)

    self.disable_localhost_host_key_check = False
    self.disable_host_key_check = False

    if self.configuration.data.settings:
        self.disable_localhost_host_key_check = (
            self.configuration.data.settings.get(
                "disable_localhost_host_key_check", False
            )
        )
        self.disable_host_key_check = self.configuration.data.settings.get(
            "disable_host_key_check", False
        )

    # self.children_logs_depth = 1000
    # self.children_logs = {}

    self.ssh_lifetime_manager = LifetimeManagerClass(
        disable_host_key_check=self.disable_host_key_check,
        disable_localhost_host_key_check=self.disable_localhost_host_key_check,
        logger=self.log,
        on_process_exit=self._on_ssh_process_exit,
    )
    # stores the exit codes for all dead processes by uuid
    self.archived_exit_codes: dict[str, int] = {}
Functions
__boot(boot_request, uuid)

Boot a remote process via SSH on an available host.

Attempts to start the process on each allowed host in sequence until successful. Updates boot request with the actual hostname used and returns process status information.

Parameters:

Name Type Description Default
boot_request BootRequest

BootRequest containing process configuration and restrictions

required
uuid str

Unique identifier for this process

required

Returns:

Type Description
ProcessInstance

ProcessInstance containing process status and metadata

Raises:

Type Description
DruncCommandException

If no allowed hosts provided or process already exists

Source code in drunc/process_manager/ssh_process_manager.py
def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
    """
    Boot a remote process via SSH on an available host.

    Attempts to start the process on each allowed host in sequence until
    successful. Updates boot request with the actual hostname used and
    returns process status information.

    Args:
        boot_request: BootRequest containing process configuration and restrictions
        uuid: Unique identifier for this process

    Returns:
        ProcessInstance containing process status and metadata

    Raises:
        DruncCommandException: If no allowed hosts provided or process already exists
    """
    self.log.debug(
        f"{self.name} booting '{boot_request.process_description.metadata.name}' "
        f"from session '{boot_request.process_description.metadata.session}'"
    )

    # Validate boot request
    if len(boot_request.process_restriction.allowed_hosts) < 1:
        raise DruncCommandException("No allowed host provided! bailing")

    if uuid in self.boot_request:
        raise DruncCommandException(f"Process {uuid} already exists!")

    # Store boot request for lifecycle management
    self.boot_request[uuid] = BootRequest()
    self.boot_request[uuid].CopyFrom(boot_request)

    hostname = ""
    errors = ""

    # Attempt to start process on each allowed host
    for host in boot_request.process_restriction.allowed_hosts:
        try:
            # Update hostname in boot request for this attempt
            self.boot_request[uuid].process_description.metadata.hostname = host

            # Start the process via SSH manager
            self.ssh_lifetime_manager.start_process(
                uuid=uuid, boot_request=self.boot_request[uuid]
            )

            # Success - record the hostname used
            hostname = host
            break

        except Exception as e:
            errors += str(e)
            self.log.warning(f"Couldn't start on host {host}, reason:\n{e!s}")
            continue

    # Store the successful hostname in boot request metadata
    self.boot_request[uuid].process_description.metadata.hostname = hostname

    self.log.info(
        f"Booted '{boot_request.process_description.metadata.name}' "
        f"from session '{boot_request.process_description.metadata.session}' "
        f"with UUID {uuid}"
    )

    # Query current process status
    alive = self.ssh_lifetime_manager.is_process_alive(uuid)
    return_code = self.ssh_lifetime_manager.pop_early_exit_code(uuid)

    # Archive exit code if process exited early
    if return_code is not None:
        self.log.debug(f"Process {uuid} exited early with exit code: {return_code}")
        self.archived_exit_codes[uuid] = return_code

    # Determine status code based on liveness
    status_code = (
        ProcessInstance.StatusCode.RUNNING
        if alive
        else ProcessInstance.StatusCode.DEAD
    )

    # Build ProcessInstance response
    pi = self._build_process_instance(
        uuid=uuid,
        status_code=status_code,
        return_code=return_code,
    )

    return pi
kill_processes(uuids)

Kill processes by their UUIDs.

Delegates batch termination to SSH lifetime manager. Constructs ProcessInstance objects from termination results.

Parameters:

Name Type Description Default
uuids list

List of process UUIDs to terminate

required

Returns:

Type Description
ProcessInstanceList

ProcessInstanceList containing status of terminated processes

Source code in drunc/process_manager/ssh_process_manager.py
def kill_processes(self, uuids: list) -> ProcessInstanceList:
    """
    Kill processes by their UUIDs.

    Delegates batch termination to SSH lifetime manager. Constructs
    ProcessInstance objects from termination results.

    Args:
        uuids: List of process UUIDs to terminate

    Returns:
        ProcessInstanceList containing status of terminated processes
    """
    # Delegate shutdown to lifetime manager and retrieve exit codes
    exit_codes = self.ssh_lifetime_manager.kill_processes(
        uuids, self._get_process_timeouts(uuids)
    )

    # Archive exit codes for future reference
    self.archived_exit_codes.update(exit_codes)

    # Build ProcessInstance objects from termination results
    ret = [
        self._build_process_instance(
            uuid=uuid,
            status_code=ProcessInstance.StatusCode.DEAD,
            return_code=exit_codes.get(uuid),
        )
        for uuid in uuids
    ]

    return ProcessInstanceList(
        name=self.name,
        token=None,
        values=ret,
        flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
    )