Skip to content

ssh_process_manager

drunc.process_manager.ssh_process_manager

Classes

SSHProcessManager(configuration, LifetimeManagerClass, **kwargs)

Bases: ProcessManager

Source code in drunc/process_manager/ssh_process_manager.py
def __init__(
    self, configuration, LifetimeManagerClass: ProcessLifetimeManager, **kwargs
):
    self.ssh_lifetime_manager = None
    self.session = getpass.getuser()  # unfortunate

    super().__init__(configuration=configuration, session=self.session, **kwargs)

    self.disable_localhost_host_key_check = False
    self.disable_host_key_check = False

    if self.configuration.data.settings:
        self.disable_localhost_host_key_check = (
            self.configuration.data.settings.get(
                "disable_localhost_host_key_check", False
            )
        )
        self.disable_host_key_check = self.configuration.data.settings.get(
            "disable_host_key_check", False
        )

    # self.children_logs_depth = 1000
    # self.children_logs = {}

    self.ssh_lifetime_manager = LifetimeManagerClass(
        disable_host_key_check=self.disable_host_key_check,
        disable_localhost_host_key_check=self.disable_localhost_host_key_check,
        logger=self.log,
        on_process_exit=self._on_ssh_process_exit,
    )
Functions
__boot(boot_request, uuid)

Boot a remote process via SSH on an available host.

Attempts to start the process on each allowed host in sequence until successful. Updates boot request with the actual hostname used and returns process status information.

Parameters:

Name Type Description Default
boot_request BootRequest

BootRequest containing process configuration and restrictions

required
uuid str

Unique identifier for this process

required

Returns:

Type Description
ProcessInstance

ProcessInstance containing process status and metadata

Raises:

Type Description
DruncCommandException

If no allowed hosts provided or process already exists

Source code in drunc/process_manager/ssh_process_manager.py
def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
    """
    Boot a remote process via SSH on an available host.

    Attempts to start the process on each allowed host in sequence until
    successful. Updates boot request with the actual hostname used and
    returns process status information.

    Args:
        boot_request: BootRequest containing process configuration and restrictions
        uuid: Unique identifier for this process

    Returns:
        ProcessInstance containing process status and metadata

    Raises:
        DruncCommandException: If no allowed hosts provided or process already exists
    """
    self.log.debug(
        f"{self.name} booting '{boot_request.process_description.metadata.name}' "
        f"from session '{boot_request.process_description.metadata.session}'"
    )

    # Validate boot request
    if len(boot_request.process_restriction.allowed_hosts) < 1:
        raise DruncCommandException("No allowed host provided! bailing")

    if uuid in self.boot_request:
        raise DruncCommandException(f"Process {uuid} already exists!")

    # Store boot request for lifecycle management
    self.boot_request[uuid] = BootRequest()
    self.boot_request[uuid].CopyFrom(boot_request)

    hostname = ""
    errors = ""

    # Attempt to start process on each allowed host
    for host in boot_request.process_restriction.allowed_hosts:
        try:
            # Update hostname in boot request for this attempt
            self.boot_request[uuid].process_description.metadata.hostname = host

            # Start the process via SSH manager
            self.ssh_lifetime_manager.start_process(
                uuid=uuid, boot_request=self.boot_request[uuid]
            )

            # Success - record the hostname used
            hostname = host
            break

        except Exception as e:
            errors += str(e)
            self.log.warning(f"Couldn't start on host {host}, reason:\n{e!s}")
            continue

    # Store the successful hostname in boot request metadata
    self.boot_request[uuid].process_description.metadata.hostname = hostname

    self.log.info(
        f"Booted '{boot_request.process_description.metadata.name}' "
        f"from session '{boot_request.process_description.metadata.session}' "
        f"with UUID {uuid}"
    )

    # Build process instance response
    pd = ProcessDescription()
    pd.CopyFrom(self.boot_request[uuid].process_description)
    pr = ProcessRestriction()
    pr.CopyFrom(self.boot_request[uuid].process_restriction)
    pu = ProcessUUID(uuid=uuid)

    # Query current process status
    alive = self.ssh_lifetime_manager.is_process_alive(uuid)
    return_code = self.ssh_lifetime_manager.get_exit_code(uuid)
    status_code = (
        ProcessInstance.StatusCode.RUNNING
        if alive
        else ProcessInstance.StatusCode.DEAD
    )

    pi = ProcessInstance(
        process_description=pd,
        process_restriction=pr,
        status_code=status_code,
        return_code=return_code,
        uuid=pu,
    )

    return pi
kill_processes(uuids)

Terminate processes by their UUIDs.

Iterates through the provided UUID list and terminates each process via the SSH connection manager. Collects process status information for each terminated process.

Parameters:

Name Type Description Default
uuids list

List of process UUIDs to terminate

required

Returns:

Type Description
ProcessInstanceList

ProcessInstanceList containing status of terminated processes

Source code in drunc/process_manager/ssh_process_manager.py
def kill_processes(self, uuids: list) -> ProcessInstanceList:
    """
    Terminate processes by their UUIDs.

    Iterates through the provided UUID list and terminates each process
    via the SSH connection manager. Collects process status information
    for each terminated process.

    Args:
        uuids: List of process UUIDs to terminate

    Returns:
        ProcessInstanceList containing status of terminated processes
    """
    ret = []

    for proc_uuid in uuids:
        app_name = self.boot_request[proc_uuid].process_description.metadata.name

        # Terminate process if still alive
        if self.ssh_lifetime_manager.is_process_alive(proc_uuid):
            self.log.debug(f"Killing '{app_name}' with UUID {proc_uuid}")
            self.ssh_lifetime_manager.terminate_process(
                proc_uuid, timeout=self.configuration.data.kill_timeout
            )
            self.log.info(f"Killed '{app_name}' with UUID {proc_uuid}")

        # Build process instance for response
        pd = ProcessDescription()
        pd.CopyFrom(self.boot_request[proc_uuid].process_description)
        pr = ProcessRestriction()
        pr.CopyFrom(self.boot_request[proc_uuid].process_restriction)
        pu = ProcessUUID(uuid=proc_uuid)

        # Get final exit code
        return_code = self.ssh_lifetime_manager.get_exit_code(proc_uuid)

        ret += [
            ProcessInstance(
                process_description=pd,
                process_restriction=pr,
                status_code=ProcessInstance.StatusCode.DEAD,
                return_code=return_code,
                uuid=pu,
            )
        ]

        # Clean up SSH resources
        self.ssh_lifetime_manager.cleanup_process(proc_uuid)

    return ProcessInstanceList(
        name=self.name,
        token=None,
        values=ret,
        flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
    )