Skip to content

process_manager

drunc.grpc_testing_tools.process_manager

Manager Service Implementation

Provides the gRPC servicer implementation for the Manager service, which acts as the top-level coordinator in the system hierarchy. Supports booting remote servers via SSH and managing their lifecycle.

Classes

ManagerServiceImpl(lifetime_manager_type=ProcessManagerTypes.SSH_SHELL)

Bases: ManagerServiceServicer

Implementation of Manager gRPC service compatible with druncschema components.

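The Manager service acts as the top-level coordinator and runs processes over SSH through an SSH process lifetime manager: SSHProcessLifetimeManagerShell by default (ProcessManagerTypes.SSH_SHELL), or SSHProcessLifetimeManagerParamiko when ProcessManagerTypes.SSH_PARAMIKO is requested.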

Initialise the Manager service implementation.

Source code in drunc/grpc_testing_tools/process_manager.py
def __init__(self, lifetime_manager_type=ProcessManagerTypes.SSH_SHELL):
    """
    Set up the SSH backend and the bookkeeping for booted servers.

    Args:
        lifetime_manager_type: Which SSH process lifetime manager to use;
            SSH_PARAMIKO and SSH_SHELL are the supported values.

    Raises:
        ValueError: If lifetime_manager_type is neither supported value.
    """
    # Both backends take the same constructor arguments, so select the class
    # first and instantiate it once.
    if lifetime_manager_type == ProcessManagerTypes.SSH_PARAMIKO:
        manager_cls = SSHProcessLifetimeManagerParamiko
    elif lifetime_manager_type == ProcessManagerTypes.SSH_SHELL:
        manager_cls = SSHProcessLifetimeManagerShell
    else:
        raise ValueError(f"Unknown lifetime_manager_type: {lifetime_manager_type}")

    self.ssh_manager = manager_cls(
        disable_host_key_check=True,
        disable_localhost_host_key_check=True,
        logger=logging.getLogger(__name__),
    )

    # Booted processes keyed by process UUID; boot_lock guards this dict.
    self.booted_servers: Dict[str, Dict] = {}
    self.boot_lock = threading.Lock()
Methods:
Kill(request, context)

Handle graceful shutdown requests for the Manager service.

First sends Kill gRPC requests to all booted child servers, waits for their termination, then shuts down the Manager itself.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | KillRequest | KillRequest containing shutdown parameters | required |
| context | | gRPC context object | required |

Returns:

| Type | Description |
| --- | --- |
| KillResponse | KillResponse indicating shutdown status |

Source code in drunc/grpc_testing_tools/process_manager.py
def Kill(self, request: KillRequest, context) -> KillResponse:
    """
    Handle graceful shutdown requests for the Manager service.

    For every booted child server: send it a Kill gRPC request, wait up to the
    grace period for its process to exit, and fall back to killing it through
    the SSH manager if it is still alive. If every child stopped, the Manager
    schedules a SIGTERM to itself on a background thread and reports success;
    otherwise it stays up and reports which children could not be stopped.

    Args:
        request: KillRequest containing shutdown parameters
            (grace_period_seconds, reason)
        context: gRPC context object (unused)

    Returns:
        KillResponse with shutdown_initiated=True when all children were
        stopped and the Manager shutdown was scheduled, or
        shutdown_initiated=False with the list of failures otherwise.
    """
    # Non-positive grace periods fall back to 2s; positive ones are at least 1s.
    grace_period = (
        max(request.grace_period_seconds, 1)
        if request.grace_period_seconds > 0
        else 2
    )

    reason = request.reason or "No reason provided"

    def wait_for_exit(process_uuid):
        """Poll until the process is gone or the grace period has elapsed."""
        # Monotonic clock so wall-clock adjustments cannot stretch or cut the wait.
        deadline = time.monotonic() + grace_period
        while time.monotonic() < deadline and self.ssh_manager.is_process_alive(
            process_uuid
        ):
            time.sleep(0.1)

    kill_failures = []
    # Counted while stopping servers: the dict is cleared before the response
    # is built, so its length cannot be used for reporting afterwards.
    stopped_count = 0

    with self.boot_lock:
        if self.booted_servers:
            print(
                f"Manager killing {len(self.booted_servers)} booted servers before shutdown..."
            )

            for process_uuid, server_info in self.booted_servers.items():
                # Stays None when the server exits on its own after the gRPC
                # Kill request; only the SSH fallback yields an exit code.
                exit_code = None
                try:
                    boot_request = server_info["request"]
                    process_desc = boot_request.process_description

                    print(f"Stopping booted server: {process_uuid}")

                    # Extract server details from process description
                    hostname = process_desc.metadata.hostname
                    # Extract port from process arguments
                    port = self._extract_port_from_args(process_desc)
                    server_type = (
                        process_desc.metadata.name
                    )  # Assuming name indicates type

                    # First, send Kill gRPC request to the booted server.
                    # Its success flag is not acted on: liveness is checked
                    # through the SSH manager below regardless.
                    _, kill_msg = self._kill_booted_server_via_grpc(
                        process_uuid,
                        hostname,
                        port,
                        server_type,
                        grace_period,
                    )
                    print(f"Kill gRPC request to {process_uuid}: {kill_msg}")

                    # Wait for the server process to terminate
                    wait_for_exit(process_uuid)

                    # If still alive, stop via SSH termination
                    if self.ssh_manager.is_process_alive(process_uuid):
                        print(
                            f"Server {process_uuid} still alive, terminating SSH connection"
                        )
                        exit_code = self.ssh_manager.kill_process(process_uuid)

                        # Wait again for process termination after SSH kill
                        wait_for_exit(process_uuid)

                    # Check final status
                    if self.ssh_manager.is_process_alive(process_uuid):
                        kill_failures.append(
                            f"{process_uuid} (process still alive after timeout)"
                        )
                    else:
                        stopped_count += 1
                        print(
                            f"Successfully stopped booted server: {process_uuid}. Exit code: {exit_code}"
                        )

                except Exception as e:
                    error_msg = f"{process_uuid} ({str(e)})"
                    kill_failures.append(error_msg)
                    print(f"Error stopping booted server {process_uuid}: {e}")

            # Clear the booted servers dict.
            # NOTE(review): this also forgets servers that failed to stop, so a
            # later Kill cannot retry them - confirm this is intended.
            self.booted_servers.clear()

    # If any children failed to terminate, return error
    if kill_failures:
        failure_details = "; ".join(kill_failures)
        response_message = (
            f"Manager Kill incomplete - failed to terminate {len(kill_failures)} "
            f"booted server(s): {failure_details}"
        )
        return KillResponse(shutdown_initiated=False, message=response_message)

    # All children terminated successfully, now shutdown Manager
    response_details = [
        "Manager Kill method executed successfully",
        f"Terminated {stopped_count} booted server(s)"
        if stopped_count > 0
        else "No booted servers to terminate",
        f"Reason: {reason}",
        f"Grace period: {grace_period}s",
        f"PID: {os.getpid()}",
        "Shutdown thread starting...",
    ]

    def delayed_shutdown():
        """Send SIGTERM to this process after a brief delay."""
        time.sleep(0.5)  # Allow response to be sent
        os.kill(os.getpid(), signal.SIGTERM)

    # Start shutdown in separate thread to avoid blocking the response
    shutdown_thread = threading.Thread(target=delayed_shutdown)
    shutdown_thread.daemon = True
    shutdown_thread.start()

    return KillResponse(
        shutdown_initiated=True, message=" | ".join(response_details)
    )
MakeRequest(request, context)

Handle incoming connectivity test requests.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | | DummyRequest containing message and timestamp | required |
| context | | gRPC context object | required |

Returns:

| Type | Description |
| --- | --- |
| | DummyResponse with echoed message confirming Manager is responsive |

Source code in drunc/grpc_testing_tools/process_manager.py
def MakeRequest(self, request, context):
    """
    Answer a connectivity check by echoing the caller's message.

    Args:
        request: DummyRequest; only its ``message`` field is read here
        context: gRPC context object (unused)

    Returns:
        DummyResponse whose reply is the request message prefixed with
        "Manager server response: "
    """
    echoed = f"Manager server response: {request.message}"
    return DummyResponse(reply=echoed)
boot(request, context)

Boot a new gRPC server process via SSH using BootRequest.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | BootRequest | BootRequest containing ProcessDescription and restrictions | required |
| context | | gRPC context object | required |

Returns:

| Type | Description |
| --- | --- |
| ProcessInstanceList | ProcessInstanceList indicating success/failure and providing server details |

Source code in drunc/grpc_testing_tools/process_manager.py
def boot(self, request: BootRequest, context) -> ProcessInstanceList:
    """
    Boot a new gRPC server process via SSH using BootRequest.

    The process UUID from the request metadata must not already be tracked.
    On success the request and the derived server command are stored in
    ``self.booted_servers`` under that UUID.

    Args:
        request: BootRequest containing ProcessDescription and restrictions
        context: gRPC context object (unused)

    Returns:
        ProcessInstanceList indicating success/failure and providing server
        details. Failures are reported through ``flag.success=False`` with an
        empty ``values`` list rather than by raising.
    """
    description = request.process_description
    metadata = description.metadata
    process_uuid = metadata.uuid.uuid
    process_name = metadata.name

    def failure(name, message):
        """Build the error-shaped response shared by all failure paths."""
        return ProcessInstanceList(
            name=name,
            token=request.token,
            values=[],
            flag=ResponseFlag(success=False, message=message),
        )

    with self.boot_lock:
        # Validate process UUID is unique
        if process_uuid in self.booted_servers:
            return failure(
                "boot_error", f"Process UUID '{process_uuid}' already exists"
            )

        try:
            # Extract connection details from process metadata
            hostname = metadata.hostname

            # Build the command BEFORE starting anything: if this raises, no
            # remote process has been launched yet. Doing it after
            # start_process could leave a running process that is never
            # recorded in booted_servers and therefore never stopped by Kill.
            command = self._build_server_command_from_description(description)

            # Start process via SSH using start_process method
            self.ssh_manager.start_process(
                uuid=process_uuid,
                boot_request=request,
            )

            # Store server info
            self.booted_servers[process_uuid] = {
                "request": request,
                "command": command,
            }

            # Create successful process instance
            process_instance = ProcessInstance(
                process_description=description,
                process_restriction=request.process_restriction,
                status_code=ProcessInstance.StatusCode.RUNNING,
                return_code=0,
                uuid=metadata.uuid,
            )

            return ProcessInstanceList(
                name=process_name,
                token=request.token,
                values=[process_instance],
                flag=ResponseFlag(
                    success=True,
                    message=f"Successfully booted {process_name} on {hostname}",
                ),
            )

        except Exception as e:
            # Boundary of the RPC handler: report the failure in the response
            # instead of letting it propagate to the gRPC runtime.
            return failure(process_name, f"Boot failed: {str(e)}")