Skip to content

k8s_process_manager

drunc.process_manager.k8s_process_manager

Classes

K8sPodWatcherThread(pm)

Bases: Thread

Initialize the pod watcher thread that monitors and notifies on pod events.

Parameters:

Name Type Description Default
pm

The K8sProcessManager instance to watch.

required
Source code in drunc/process_manager/k8s_process_manager.py
def __init__(self, pm) -> None:
    """
    Set up a daemon thread that watches Kubernetes pod events for *pm*.

    Args:
        pm: The K8sProcessManager instance whose pods should be monitored.
    """
    threading.Thread.__init__(self)
    # Daemon so the watcher never blocks interpreter shutdown.
    self.daemon = True
    # Back-reference to the owning process manager.
    self.pm = pm
    # UUIDs whose termination was already reported; prevents duplicate
    # notifications for the same pod.
    self.processed_uuids = set()
Functions
run()

Run the pod watcher loop.

Continuously watches for Kubernetes pod events across all namespaces managed by the process manager. Detects terminal pod states (Succeeded, Failed, or Deleted) and notifies the process manager of terminations. Automatically restarts the watch stream on API errors or disconnections.

Source code in drunc/process_manager/k8s_process_manager.py
def run(self) -> None:
    """
    Run the pod watcher loop.

    Continuously watches for Kubernetes pod events across all namespaces
    managed by the process manager. Detects terminal pod states (Succeeded,
    Failed, or Deleted) and notifies the process manager of terminations.
    Automatically restarts the watch stream on API errors or disconnections.
    """
    self.pm.log.info("K8sPodWatcherThread started")
    while True:
        try:
            w = watch.Watch()
            stream = w.stream(
                self.pm._core_v1_api.list_pod_for_all_namespaces,
                label_selector=self.pm._get_creator_label_selector(),
            )
            for event in stream:
                pod = event["object"]
                metadata = pod.metadata
                status = pod.status
                phase = status.phase
                proc_uuid = metadata.labels.get(f"uuid.{self.pm.drunc_label}")
                session = metadata.namespace

                # Skip pods that drunc did not create (no uuid label) and
                # pods whose termination has already been reported.
                if not proc_uuid:
                    continue
                if proc_uuid in self.processed_uuids:
                    continue

                self.pm.log.debug(
                    f"Watcher saw event: type={event['type']}, phase={phase}, uuid={proc_uuid}"
                )

                is_terminal_phase = phase in ["Succeeded", "Failed"]
                is_deleted_event = event["type"] == "DELETED"

                if is_terminal_phase or is_deleted_event:
                    self.pm.log.debug(
                        f"Pod {proc_uuid} terminated: phase={phase}, is_terminal={is_terminal_phase}, is_deleted={is_deleted_event}"
                    )

                    # Determine exit data, in priority order:
                    # 1. a Succeeded phase is always a graceful shutdown;
                    # 2. a recorded container termination carries the real
                    #    exit code and reason;
                    # 3. a DELETED event with no exit state means the pod
                    #    was removed out from under us;
                    # 4. otherwise (e.g. Failed with no container status)
                    #    the cause is unknown.
                    if phase == "Succeeded":
                        exit_code = 0
                        reason = "GracefulShutdown"
                    elif (
                        status.container_statuses
                        and status.container_statuses[0].state.terminated
                    ):
                        terminated_state = status.container_statuses[
                            0
                        ].state.terminated
                        exit_code = terminated_state.exit_code
                        # The API may leave reason unset; never report None.
                        reason = terminated_state.reason or "Unknown"
                    elif is_deleted_event:
                        exit_code = -1
                        reason = "PodDeleted"
                    else:
                        exit_code = -1
                        reason = "Unknown"

                    self.pm.log.debug(
                        f"Final result for pod {proc_uuid}: exit_code={exit_code}, reason={reason}"
                    )

                    self.processed_uuids.add(proc_uuid)
                    self.pm.notify_termination(
                        proc_uuid, exit_code, reason, session
                    )

        except ApiException as e:
            # HTTP 410 Gone just means the watch resourceVersion expired;
            # restart the stream silently. Anything else is logged first.
            if e.status != 410:
                self.pm.log.error(
                    f"K8s API error in watcher: {e}. Restarting watch."
                )
            sleep(self.pm.watcher_retry_sleep)

        except Exception as e:
            self.pm.log.error(
                "K8s watcher thread encountered an error, stacktrace present in the debug logs. Restarting watch."
            )
            self.pm.log.debug(f"K8s watcher thread error: {e}.")
            sleep(self.pm.watcher_retry_sleep)

K8sProcessManager(configuration, **kwargs)

Bases: ProcessManager

Manages processes as Kubernetes Pods. This ProcessManager interfaces with the Kubernetes API to start, stop, and monitor applications running in Pods.

Parameters:

Name Type Description Default
configuration ProcessManagerConfHandler

The process manager configuration object containing image, settings (labels, service, pod_management, volumes, cleanup, checking), and other runtime parameters.

required
**kwargs

Additional keyword arguments passed to the parent ProcessManager.

{}

Raises:

Type Description
ConfigException

If the Kubernetes configuration cannot be loaded.

Source code in drunc/process_manager/k8s_process_manager.py
def __init__(self, configuration: ProcessManagerConfHandler, **kwargs) -> None:
    """
    Manages processes as Kubernetes Pods.
    This ProcessManager interfaces with the Kubernetes API to start, stop, and monitor
    applications running in Pods.

    Args:
        configuration: The process manager configuration object containing image,
            settings (labels, service, pod_management, volumes, cleanup, checking),
            and other runtime parameters.
        **kwargs: Additional keyword arguments passed to the parent ProcessManager.

    Raises:
        ConfigException: If the Kubernetes configuration cannot be loaded.
    """

    # Get the username for the session. This is needed as k8s does not pass the
    # username through to the pod
    self.session = getpass.getuser()
    super().__init__(configuration=configuration, session=self.session, **kwargs)

    # Set up the logger
    self.log = get_logger("process_manager.k8s-process-manager")

    # Validate that the host this process manager is running on is part of a
    # kubernetes cluster
    try:
        config.load_kube_config()
    except ConfigException as e:
        self.log.critical("--- 🚨 KUBERNETES CONFIGURATION ERROR ---")
        self.log.critical(f"Failed to load kube-config: {e}")
        self.log.critical(
            "Please ensure 'kubectl' is configured correctly or the KUBECONFIG environment variable is set."
        )
        self.log.critical("----------------------------------------------")
        sys.exit(1)

    # Set up the hooks to the k8s API, makes later setup easier
    self._k8s_client = client
    self._core_v1_api = client.CoreV1Api()
    self._meta_v1_api = client.V1ObjectMeta
    self._pod_spec_v1_api = client.V1PodSpec
    self._api_error_v1_api = client.rest.ApiException

    # Storage for process orchestrator parameters
    self.managed_sessions = set()
    self.watchers = []
    self.sessions_pending_deletion = set()
    self.uuids_pending_deletion = set()
    self.termination_complete_event = threading.Event()
    self.final_exit_codes = {}
    self.local_connection_server_is_booted = False

    # Host verification cache: {hostname: (is_valid, timestamp)}
    self._host_cache = {}
    self._host_cache_lock = threading.Lock()

    # Get settings from configuration JSON file
    # Any comments following this one will relate to the parameters retrieved from
    # the configuration file if the comment starts as "CONFIGURATION -"
    settings = getattr(self.configuration.data, "settings", {})

    # CONFIGURATION - label defaults
    labels = settings.get("labels", {})
    self.drunc_label = labels.get("drunc_label", "drunc.daq")

    # Readout app selector
    self.perf_selector = settings.get("readout_app_selector", "runp").lower()

    # CONFIGURATION - connection server connection port numbers
    self.connection_server_port = None
    self.connection_server_node_port = None

    # CONFIGURATION - per-pod service port number
    service = settings.get("service", {})
    self.headless_discovery_port = service.get("headless_discovery_port", 80)

    # CONFIGURATION - pod startup management parameters
    pod_management = settings.get("pod_management", {})
    self.kill_timeout = pod_management.get("kill_timeout", 30)
    self.pod_ready_timeout = pod_management.get("pod_ready_timeout", 60)

    # CONFIGURATION - restart cleanup parameters
    cleanup = settings.get("cleanup", {})
    self.restart_cleanup_time = cleanup.get("restart_cleanup_time", 10.0)
    self.restart_cleanup_polling = cleanup.get("restart_cleanup_polling", 0.5)

    # CONFIGURATION - volume mounts
    self.volume_configs = settings.get("volumes", [])

    # CONFIGURATION - home path definition
    self.home_path_base = settings.get("home_path_base", None)

    # CONFIGURATION - timeouts and check parameters
    checking = settings.get("checking", {})
    self.watcher_retry_sleep = checking.get("watcher_retry_sleep", 5)
    self.pod_status_check_sleep = checking.get("pod_status_check_sleep", 1)
    self._host_cache_expiry = checking.get("host_cache_expiry", 300)
    self.grpc_startup_timeout = checking.get("grpc_startup_timeout", 30)
    self.socket_retry_timeout = checking.get("socket_retry_timeout", 1.0)

    # Start the pod watcher only after all the state it reads (drunc_label,
    # uuids_pending_deletion, termination_complete_event, final_exit_codes,
    # watcher_retry_sleep, ...) has been initialised; starting it earlier
    # races the thread against the rest of this constructor.
    self._start_watcher()

    # Get and print the list of active namespaces managed by drunc
    namespaces = self._core_v1_api.list_namespace(
        label_selector=f"creator.{self.drunc_label}={self.__class__.__name__}"
    )
    namespace_names = [ns.metadata.name for ns in namespaces.items]
    namespace_list_str = "\n - ".join(namespace_names)
    if namespace_list_str:
        self.log.info(
            f"Active namespaces created by drunc:\n - {namespace_list_str}"
        )
    else:
        self.log.info("No active namespace created by drunc")

    # Set up signal handlers for cleanup when parent process dies
    self._setup_signal_handlers()
Functions
__boot(boot_request, uuid)

Internal boot method for creating a pod and waiting for critical services.

Orchestrates the full boot sequence: determines tree labels and roles, runs pre-boot validation, prepares the namespace, stores the boot request, creates the pod and its service, adds the UUID label, and performs blocking readiness waits for the LCS or root controller if applicable.

Returns:

Type Description
ProcessInstance

process_instance - a ProcessInstance protobuf with RUNNING status

Source code in drunc/process_manager/k8s_process_manager.py
def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
    """
    Internal boot method for creating a pod and waiting for critical services.

    Orchestrates the full boot sequence: determines tree labels and roles,
    runs pre-boot validation, prepares the namespace, stores the boot request,
    creates the pod and its service, adds the UUID label, and performs
    blocking readiness waits for the LCS or root controller if applicable.

    Args:
        boot_request: The BootRequest protobuf defining the process to start.
        uuid: The UUID string to assign to this process.

    Returns:
        A ProcessInstance protobuf with RUNNING status.
    """
    meta = boot_request.process_description.metadata
    session = meta.session
    podname = meta.name
    tree_labels = self._get_tree_labels(meta.tree_id, podname, boot_request)

    # Pre-checks (Session validation, NodePort collision)
    self._run_pre_boot_checks(session, podname, boot_request)

    # Resource creation (namespace, pod, labels). Keep a private copy of the
    # request so later lookups are independent of the caller's object.
    self._prepare_namespace(session)
    stored_request = BootRequest()
    stored_request.CopyFrom(boot_request)
    self.boot_request[uuid] = stored_request

    self._create_pod(podname, session, boot_request, tree_labels)
    self._add_label(podname, "pod", "uuid", uuid, session=session)
    self.log.info(f'"{session}.{podname}":{uuid} boot request sent.')

    # Special handling and blocking wait for critical processes
    if self._is_local_connection_server(tree_labels, podname):
        self._wait_for_lcs_readiness(podname, session)
    elif self._is_root_controller(tree_labels):
        self._wait_for_controller_readiness(podname, session, boot_request)

    # Post-process: build the response from copies of the request's
    # description and restriction.
    description = ProcessDescription()
    description.CopyFrom(boot_request.process_description)
    restriction = ProcessRestriction()
    restriction.CopyFrom(boot_request.process_restriction)

    return ProcessInstance(
        process_description=description,
        process_restriction=restriction,
        status_code=ProcessInstance.StatusCode.RUNNING,
        uuid=ProcessUUID(uuid=uuid),
    )
is_alive(podname, session)

Checks if a pod is currently in the 'Running' phase.

Parameters:

Name Type Description Default
podname str

The name of the pod to check.

required
session str

The Kubernetes namespace (session) containing the pod.

required

Returns:

Type Description
bool

True if the pod exists and its phase is 'Running', False otherwise.

Source code in drunc/process_manager/k8s_process_manager.py
def is_alive(self, podname: str, session: str) -> bool:
    """
    Checks if a pod is currently in the 'Running' phase.

    Args:
        podname: The name of the pod to check.
        session: The Kubernetes namespace (session) containing the pod.

    Returns:
        True if the pod exists and its phase is 'Running', False otherwise.
    """

    try:
        # If the status read succeeds, the pod exists; aliveness is then
        # simply whether its phase is 'Running'.
        pod_status = self._core_v1_api.read_namespaced_pod_status(podname, session)
    except self._api_error_v1_api as e:
        # A 404 just means the pod does not exist, i.e. it is not alive.
        # Any other API error is unexpected: log it, then report not-alive.
        if e.status != 404:
            self.log.error(f"Error checking status for pod {session}.{podname}: {e}")
        return False
    return pod_status.status.phase == "Running"
notify_termination(proc_uuid, exit_code, reason, session)

Callback for when a pod terminates.

Updates the final exit code, broadcasts a status update, and signals the termination_complete_event when all pending deletions are confirmed.

Parameters:

Name Type Description Default
proc_uuid str

The UUID string of the terminated process.

required
exit_code int

The integer exit code of the terminated pod.

required
reason str

A string describing the termination reason (e.g. 'GracefulShutdown', 'PodDeleted').

required
session str

The Kubernetes namespace (session) the pod belonged to.

required
Source code in drunc/process_manager/k8s_process_manager.py
def notify_termination(
    self, proc_uuid: str, exit_code: int, reason: str, session: str
) -> None:
    """
    Callback for when a pod terminates.

    Updates the final exit code, broadcasts a status update, and signals
    the termination_complete_event once every pending deletion is confirmed.

    Args:
        proc_uuid: The UUID string of the terminated process.
        exit_code: The integer exit code of the terminated pod.
        reason: A string describing the termination reason (e.g. 'GracefulShutdown', 'PodDeleted').
        session: The Kubernetes namespace (session) the pod belonged to.
    """
    self.log.debug(
        f"notify_termination called for '{proc_uuid}'. Pending={self.uuids_pending_deletion}"
    )

    # Record and publish the termination for processes this manager booted
    if proc_uuid in self.boot_request:
        self.final_exit_codes[proc_uuid] = exit_code
        meta = self.boot_request[proc_uuid].process_description.metadata
        end_str = f"Pod '{meta.name}' (session: '{session}', user: '{meta.user}', uuid: {proc_uuid}) terminated with exit code {exit_code}. Reason: {reason}"
        self.log.info(end_str)
        self.broadcast(end_str, BroadcastType.SUBPROCESS_STATUS_UPDATE)

    # Nothing further to do unless this pod's deletion was being awaited
    if proc_uuid not in self.uuids_pending_deletion:
        return

    self.uuids_pending_deletion.remove(proc_uuid)
    self.log.debug(
        f"Watcher confirmed termination of {proc_uuid}. {len(self.uuids_pending_deletion)} pods remaining."
    )
    # Signal waiters once the last pending deletion has been confirmed
    if not self.uuids_pending_deletion:
        self.log.debug("All pending pods terminated, setting event.")
        self.termination_complete_event.set()

Functions