
k8s_process_manager

drunc.process_manager.k8s_process_manager

Classes

K8sProcessManager(configuration, **kwargs)

Bases: ProcessManager

Manages processes as Kubernetes Pods. This ProcessManager interfaces with the Kubernetes API to start, stop, and monitor applications running in Pods. It includes special handling for a local connectivity service, which involves:

1. Using a NodePort service for the orchestrator for external access.
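The NodePort service itself is created elsewhere in the class and is not shown in this excerpt. As a rough sketch of the idea only, an orchestrator pod could be exposed outside the cluster with the Kubernetes Python client as below; the service name, selector labels, and port numbers are purely illustrative and not taken from drunc.

from kubernetes import client, config

config.load_kube_config()
core_v1 = client.CoreV1Api()

# Illustrative only: expose a hypothetical orchestrator pod via a NodePort service.
service = client.V1Service(
    metadata=client.V1ObjectMeta(name="root-controller-nodeport"),
    spec=client.V1ServiceSpec(
        type="NodePort",
        selector={"app": "root-controller"},  # must match the pod's labels
        ports=[client.V1ServicePort(port=3333, target_port=3333, node_port=31333)],
    ),
)
core_v1.create_namespaced_service(namespace="my-session", body=service)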

Source code in drunc/process_manager/k8s_process_manager.py
def __init__(self, configuration, **kwargs) -> None:
    """
    Manages processes as Kubernetes Pods.
    This ProcessManager interfaces with the Kubernetes API to start, stop, and monitor
    applications running in Pods. It includes special handling for a local connectivity
    service, which involves:
    1.  Using a NodePort service for the orchestrator for external access.
    """
    self.session = getpass.getuser()
    super().__init__(configuration=configuration, session=self.session, **kwargs)
    self.log = get_logger("process_manager.k8s-process-manager")

    try:
        config.load_kube_config()
    except ConfigException as e:
        self.log.critical("--- 🚨 KUBERNETES CONFIGURATION ERROR ---")
        self.log.critical(f"Failed to load kube-config: {e}")
        self.log.critical(
            "Please ensure 'kubectl' is configured correctly or the KUBECONFIG environment variable is set."
        )
        self.log.critical("----------------------------------------------")
        raise

    self._k8s_client = client
    self._core_v1_api = client.CoreV1Api()
    self._meta_v1_api = client.V1ObjectMeta
    self._pod_spec_v1_api = client.V1PodSpec
    self._api_error_v1_api = client.rest.ApiException

    self.managed_sessions = set()
    self.watchers = []
    self._start_watcher()
    self.sessions_pending_deletion = set()
    self.uuids_pending_deletion = set()
    self.termination_complete_event = threading.Event()
    self.final_exit_codes = {}
    self.local_connection_server_is_booted = False

    # Host verification cache: {hostname: (is_valid, timestamp)}
    self._host_cache = {}
    self._host_cache_lock = threading.Lock()

    # Get settings from configuration
    settings = getattr(self.configuration.data, "settings", {})

    # Labels
    labels = settings.get("labels", {})
    self.drunc_label = labels.get("drunc_label", "drunc.daq")

    # Connection server
    connection_server = settings.get("connection_server", {})
    self.connection_server_name = connection_server.get(
        "name", "local-connection-server"
    )
    self.connection_server_port = None
    self.connection_server_node_port = None

    # Pod management
    pod_management = settings.get("pod_management", {})
    self.kill_timeout = pod_management.get("kill_timeout", 30)
    self.pod_ready_timeout = pod_management.get("pod_ready_timeout", 60)
    self.total_shutdown_timeout = pod_management.get("total_shutdown_timeout", 60)

    # Volume mounts
    self.volume_configs = settings.get("volumes", [])

    # Cleanup
    cleanup = settings.get("cleanup", {})
    self.restart_cleanup_time = cleanup.get("restart_cleanup_time", 10.0)
    self.restart_cleanup_polling = cleanup.get("restart_cleanup_polling", 0.5)

    # Checking
    checking = settings.get("checking", {})
    self.watcher_retry_sleep = checking.get("watcher_retry_sleep", 5)
    self.pod_status_check_sleep = checking.get("pod_status_check_sleep", 1)
    self._host_cache_expiry = checking.get("host_cache_expiry", 300)
    self.grpc_startup_timeout = checking.get("grpc_startup_timeout", 30)
    self.socket_retry_timeout = checking.get("socket_retry_timeout", 1.0)

    self.log.debug(f"Using kill_timeout of {self.kill_timeout} seconds.")

    namespaces = self._core_v1_api.list_namespace(
        label_selector=f"creator.{self.drunc_label}={self.__class__.__name__}"
    )
    namespace_names = [ns.metadata.name for ns in namespaces.items]
    namespace_list_str = "\n - ".join(namespace_names)

    if namespace_list_str:
        self.log.info(
            f"Active namespaces created by drunc:\n - {namespace_list_str}"
        )
    else:
        self.log.info("No active namespace created by drunc")

    # Set up signal handlers for cleanup when parent process dies
    self._setup_signal_handlers()
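For orientation, the constructor above only reads the keys listed below from the configuration's settings block. A minimal sketch of that structure, shown as a Python mapping with the default values used in the constructor (how the settings are actually expressed in a drunc configuration file is not shown on this page):

settings = {
    "labels": {"drunc_label": "drunc.daq"},
    "connection_server": {"name": "local-connection-server"},
    "pod_management": {
        "kill_timeout": 30,
        "pod_ready_timeout": 60,
        "total_shutdown_timeout": 60,
    },
    "volumes": [],  # list of volume mount configurations
    "cleanup": {
        "restart_cleanup_time": 10.0,
        "restart_cleanup_polling": 0.5,
    },
    "checking": {
        "watcher_retry_sleep": 5,
        "pod_status_check_sleep": 1,
        "host_cache_expiry": 300,
        "grpc_startup_timeout": 30,
        "socket_retry_timeout": 1.0,
    },
}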
Functions
__boot(boot_request, uuid)

Internal boot method. Handles pre-checks, pod creation, and blocking wait for critical services.

Source code in drunc/process_manager/k8s_process_manager.py
def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
    """
    Internal boot method. Handles pre-checks, pod creation, and blocking wait for critical services.
    """
    session = boot_request.process_description.metadata.session
    podname = boot_request.process_description.metadata.name

    # Pre-checks (Session validation, NodePort collision)
    self._run_pre_boot_checks(session, podname, boot_request)

    # Resource Creation (Namespace, Pod, Labels)
    self._create_namespace(session)
    self.boot_request[uuid] = BootRequest()
    self.boot_request[uuid].CopyFrom(boot_request)

    self._create_pod(podname, session, boot_request)
    self._add_label(podname, "pod", "uuid", uuid, session=session)
    self.log.info(f'"{session}.{podname}":{uuid} boot request sent.')

    # Special handling and blocking wait for critical processes
    if podname == self.connection_server_name:
        self._wait_for_lcs_readiness(podname, session)
    elif "root-controller" in podname:
        self._wait_for_controller_readiness(podname, session, boot_request)

    # Post-Process
    pd, pr, pu = (
        ProcessDescription(),
        ProcessRestriction(),
        ProcessUUID(uuid=uuid),
    )
    pd.CopyFrom(boot_request.process_description)
    pr.CopyFrom(boot_request.process_restriction)

    return ProcessInstance(
        process_description=pd,
        process_restriction=pr,
        status_code=ProcessInstance.StatusCode.RUNNING,
        uuid=pu,
    )
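The boot request consumed by __boot is a protobuf message, and only a handful of its fields are touched in this excerpt. A hypothetical sketch of populating those fields follows; the druncschema import path and all field values are assumptions, not taken from this page.

# Hypothetical sketch: import path and values are assumptions.
from druncschema.process_manager_pb2 import BootRequest

br = BootRequest()
br.process_description.metadata.session = "np04-daq"      # used as the Kubernetes namespace
br.process_description.metadata.name = "root-controller"  # used as the pod name
br.process_description.metadata.user = "daquser"
# br.process_restriction is copied verbatim into the returned ProcessInstance.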
is_alive(podname, session)

Checks if a pod is currently in the 'Running' phase.

Source code in drunc/process_manager/k8s_process_manager.py
def is_alive(self, podname, session) -> bool:
    """Checks if a pod is currently in the 'Running' phase."""
    try:
        pod_status = self._core_v1_api.read_namespaced_pod_status(podname, session)
        return pod_status.status.phase == "Running"
    except self._api_error_v1_api as e:
        if e.status == 404:
            return False
        self.log.error(f"Error checking status for pod {session}.{podname}: {e}")
        return False
notify_termination(proc_uuid, exit_code, reason, session)

Callback for when a pod terminates.

Source code in drunc/process_manager/k8s_process_manager.py
def notify_termination(self, proc_uuid, exit_code, reason, session) -> None:
    """Callback for when a pod terminates."""
    self.log.debug(
        f"notify_termination called for '{proc_uuid}'. Pending={self.uuids_pending_deletion}"
    )

    if proc_uuid in self.boot_request:
        self.final_exit_codes[proc_uuid] = exit_code

        meta = self.boot_request[proc_uuid].process_description.metadata
        end_str = f"Pod '{meta.name}' (session: '{session}', user: '{meta.user}', uuid: {proc_uuid}) terminated with exit code {exit_code}. Reason: {reason}"
        self.log.info(end_str)
        self.broadcast(end_str, BroadcastType.SUBPROCESS_STATUS_UPDATE)

    if proc_uuid in self.uuids_pending_deletion:
        self.uuids_pending_deletion.remove(proc_uuid)
        self.log.debug(
            f"Watcher confirmed termination of {proc_uuid}. {len(self.uuids_pending_deletion)} pods remaining."
        )
        if not self.uuids_pending_deletion:
            self.log.debug("All pending pods terminated, setting event.")
            self.termination_complete_event.set()
