Skip to content

process_manager

drunc.grpc_testing_tools.process_manager

Manager Service Implementation

Provides the gRPC servicer implementation for the Manager service, which acts as the top-level coordinator in the system hierarchy. Supports booting remote servers via SSH and managing their lifecycle.

Classes

ManagerServiceImpl(lifetime_manager_type=ProcessManagerTypes.SSH_PARAMIKO)

Bases: ManagerServiceServicer

Implementation of Manager gRPC service compatible with druncschema components.

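The Manager service acts as the top-level coordinator. Depending on `lifetime_manager_type`, it uses either SSHProcessLifetimeManagerParamiko or SSHProcessLifetimeManagerShell for SSH-based process execution; any other value raises a ValueError.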

Initialise the Manager service implementation.

Source code in drunc/grpc_testing_tools/process_manager.py
def __init__(self, lifetime_manager_type=ProcessManagerTypes.SSH_PARAMIKO):
    """
    Initialise the Manager service implementation.

    Args:
        lifetime_manager_type: Which SSH lifetime-manager backend to use
            (SSH_PARAMIKO or SSH_SHELL).

    Raises:
        ValueError: If lifetime_manager_type is neither of the supported values.
    """
    # Pick the backend class first; both backends take the same arguments.
    if lifetime_manager_type == ProcessManagerTypes.SSH_PARAMIKO:
        manager_cls = SSHProcessLifetimeManagerParamiko
    elif lifetime_manager_type == ProcessManagerTypes.SSH_SHELL:
        manager_cls = SSHProcessLifetimeManagerShell
    else:
        raise ValueError(f"Unknown lifetime_manager_type: {lifetime_manager_type}")

    self.ssh_manager = manager_cls(
        disable_host_key_check=True,
        disable_localhost_host_key_check=True,
        logger=logging.getLogger(__name__),
    )

    # Booted processes keyed by process UUID; boot_lock guards all access.
    self.booted_servers: Dict[str, Dict] = {}
    self.boot_lock = threading.Lock()
Functions
Kill(request, context)

Handle graceful shutdown requests for the Manager service.

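First sends Kill gRPC requests to all booted child servers and waits for their termination, falling back to SSH termination for any that are still alive after the grace period. If every child stops, the Manager then shuts itself down; otherwise it stays up and reports which children failed to terminate.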

Parameters:

Name Type Description Default
request KillRequest

KillRequest containing shutdown parameters

required
context

gRPC context object

required

Returns:

Type Description
KillResponse

KillResponse indicating shutdown status

Source code in drunc/grpc_testing_tools/process_manager.py
def Kill(self, request: KillRequest, context) -> KillResponse:
    """
    Handle graceful shutdown requests for the Manager service.

    First sends Kill gRPC requests to all booted child servers, waits for their
    termination (falling back to SSH termination for any still alive), then
    shuts down the Manager itself. If any child could not be stopped, the
    Manager is NOT shut down and the failures are reported instead.

    Args:
        request: KillRequest containing shutdown parameters
        context: gRPC context object

    Returns:
        KillResponse indicating shutdown status. ``shutdown_initiated`` is
        False when at least one booted server failed to terminate.
    """
    # Positive grace periods are clamped to at least 1s; unset or
    # non-positive values fall back to 2s.
    grace_period = (
        max(request.grace_period_seconds, 1)
        if request.grace_period_seconds > 0
        else 2
    )

    reason = request.reason or "No reason provided"

    def wait_for_exit(process_uuid):
        """Poll until the process exits or the grace period elapses."""
        start_time = time.time()
        while (
            time.time() - start_time
        ) < grace_period and self.ssh_manager.is_process_alive(process_uuid):
            time.sleep(0.1)

    # Kill all booted servers first
    kill_failures = []
    # Captured before the registry is cleared so the response can report it.
    booted_count = 0
    # The lock is held for the whole teardown, so concurrent boot() calls
    # (which take the same lock) wait until it has finished.
    with self.boot_lock:
        if self.booted_servers:
            booted_count = len(self.booted_servers)
            print(
                f"Manager killing {len(self.booted_servers)} booted servers before shutdown..."
            )

            for process_uuid, server_info in list(self.booted_servers.items()):
                try:
                    boot_request = server_info["request"]
                    process_desc = boot_request.process_description

                    print(f"Stopping booted server: {process_uuid}")

                    # Extract server details from process description
                    hostname = process_desc.metadata.hostname
                    # Extract port from process arguments
                    port = self._extract_port_from_args(process_desc)
                    server_type = (
                        process_desc.metadata.name
                    )  # Assuming name indicates type

                    # First, send Kill gRPC request to the booted server.
                    # The success flag is not used: the liveness checks below
                    # decide whether the child actually stopped.
                    _kill_success, kill_msg = self._kill_booted_server_via_grpc(
                        process_uuid,
                        hostname,
                        port,
                        server_type,
                        grace_period,
                    )
                    print(f"Kill gRPC request to {process_uuid}: {kill_msg}")

                    # Wait for the server process to terminate
                    wait_for_exit(process_uuid)

                    # If still alive, stop via SSH termination
                    if self.ssh_manager.is_process_alive(process_uuid):
                        print(
                            f"Server {process_uuid} still alive, terminating SSH connection"
                        )
                        self.ssh_manager.terminate_process(
                            process_uuid, timeout=grace_period
                        )

                        # Wait again for process termination after SSH kill
                        wait_for_exit(process_uuid)

                    # Check final status
                    if self.ssh_manager.is_process_alive(process_uuid):
                        kill_failures.append(
                            f"{process_uuid} (process still alive after timeout)"
                        )
                    else:
                        print(f"Successfully stopped booted server: {process_uuid}")

                    # Cleanup process
                    self.ssh_manager.cleanup_process(process_uuid)

                except Exception as e:
                    # Record the failure and carry on with the remaining
                    # servers rather than aborting the whole teardown.
                    error_msg = f"{process_uuid} ({str(e)})"
                    kill_failures.append(error_msg)
                    print(f"Error stopping booted server {process_uuid}: {e}")

            # Clear the booted servers dict (done even if some servers
            # failed to stop, matching the existing behaviour).
            self.booted_servers.clear()

    # If any children failed to terminate, return error
    if kill_failures:
        failure_details = "; ".join(kill_failures)
        response_message = (
            f"Manager Kill incomplete - failed to terminate {len(kill_failures)} "
            f"booted server(s): {failure_details}"
        )
        return KillResponse(shutdown_initiated=False, message=response_message)

    # All children terminated successfully, now shutdown Manager
    response_details = [
        "Manager Kill method executed successfully",
        f"Terminated {booted_count} booted server(s)"
        if booted_count > 0
        else "No booted servers to terminate",
        f"Reason: {reason}",
        f"Grace period: {grace_period}s",
        f"PID: {os.getpid()}",
        "Shutdown thread starting...",
    ]

    def delayed_shutdown():
        """Send SIGTERM to this process after a brief delay."""
        time.sleep(0.5)  # Allow response to be sent
        os.kill(os.getpid(), signal.SIGTERM)

    # Start shutdown in separate thread to avoid blocking the response
    shutdown_thread = threading.Thread(target=delayed_shutdown)
    shutdown_thread.daemon = True
    shutdown_thread.start()

    return KillResponse(
        shutdown_initiated=True, message=" | ".join(response_details)
    )
MakeRequest(request, context)

Handle incoming connectivity test requests.

Parameters:

Name Type Description Default
request

DummyRequest containing message and timestamp

required
context

gRPC context object

required

Returns:

Type Description

DummyResponse with echoed message confirming Manager is responsive

Source code in drunc/grpc_testing_tools/process_manager.py
def MakeRequest(self, request, context):
    """
    Answer a connectivity test by echoing the caller's message.

    Args:
        request: DummyRequest containing message and timestamp
        context: gRPC context object (unused)

    Returns:
        DummyResponse whose reply is the request message prefixed with
        "Manager server response: ".
    """
    echoed_reply = f"Manager server response: {request.message}"
    return DummyResponse(reply=echoed_reply)
boot(request, context)

Boot a new gRPC server process via SSH using BootRequest.

Parameters:

Name Type Description Default
request BootRequest

BootRequest containing ProcessDescription and restrictions

required
context

gRPC context object

required

Returns:

Type Description
ProcessInstanceList

ProcessInstanceList indicating success/failure and providing server details

Source code in drunc/grpc_testing_tools/process_manager.py
def boot(self, request: BootRequest, context) -> ProcessInstanceList:
    """
    Boot a new gRPC server process via SSH using BootRequest.

    The process is registered in ``self.booted_servers`` under its UUID so
    that it can be looked up again later. Failures are reported through the
    returned flag rather than raised.

    Args:
        request: BootRequest containing ProcessDescription and restrictions
        context: gRPC context object

    Returns:
        ProcessInstanceList indicating success/failure and providing server details
    """

    def failure(name, message):
        """Build an empty ProcessInstanceList carrying a failure flag."""
        return ProcessInstanceList(
            name=name,
            token=request.token,
            values=[],
            flag=ResponseFlag(success=False, message=message),
        )

    with self.boot_lock:
        process_uuid = request.process_description.metadata.uuid.uuid
        process_name = request.process_description.metadata.name

        # Validate process UUID is unique
        if process_uuid in self.booted_servers:
            return failure(
                "boot_error", f"Process UUID '{process_uuid}' already exists"
            )

        try:
            # Extract connection details from process metadata
            hostname = request.process_description.metadata.hostname

            # Build the command BEFORE starting anything remotely: if this
            # raised after start_process, the process would be running but
            # never recorded in booted_servers.
            # NOTE(review): assumes the helper depends only on the process
            # description it is given - confirm.
            command = self._build_server_command_from_description(
                request.process_description
            )

            # Start process via SSH using start_process method
            self.ssh_manager.start_process(
                uuid=process_uuid,
                boot_request=request,
            )

            # Store server info
            self.booted_servers[process_uuid] = {
                "request": request,
                "command": command,
            }

            # Create successful process instance
            process_instance = ProcessInstance(
                process_description=request.process_description,
                process_restriction=request.process_restriction,
                status_code=ProcessInstance.StatusCode.RUNNING,
                return_code=0,
                uuid=request.process_description.metadata.uuid,
            )

            return ProcessInstanceList(
                name=process_name,
                token=request.token,
                values=[process_instance],
                flag=ResponseFlag(
                    success=True,
                    message=f"Successfully booted {process_name} on {hostname}",
                ),
            )

        except Exception as e:
            # Any error is converted into a failure response for the caller
            # instead of propagating out of the RPC handler.
            return failure(process_name, f"Boot failed: {str(e)}")