Skip to content

utils

drunc.process_manager.utils

Classes

Functions

get_pm_type_from_name(pm_name)

Get the ProcessManagerTypes enum value from a string name.

Parameters:

Name Type Description Default
pm_name str

The name of the process manager type (e.g., "SSH", "K8s").

required

Returns:

Name Type Description
ProcessManagerTypes ProcessManagerTypes

The corresponding enum value.

Source code in drunc/process_manager/utils.py
def get_pm_type_from_name(pm_name: str) -> ProcessManagerTypes:
    """
    Resolve a process manager name to its ProcessManagerTypes enum value.

    The name is mapped to a configuration URL, the URL is parsed into a path
    and a configuration type, and the configuration is loaded through a
    ProcessManagerConfHandler so that its declared type can be read back.

    Args:
        pm_name (str): The name of the process manager type (e.g., "SSH", "K8s").

    Returns:
        ProcessManagerTypes: The corresponding enum value.
    """
    conf_url = get_process_manager_configuration(pm_name)
    resolved_path, resolved_type = parse_conf_url(conf_url)

    # Only the second ':'-separated field of the resolved path is handed to
    # the handler (presumably dropping a scheme-like prefix — TODO confirm).
    conf_data = resolved_path.split(":")[1]

    handler = ProcessManagerConfHandler(
        log_path="./",
        type=resolved_type,
        data=conf_data,
    )
    return handler.data.type

on_parent_exit(signum)

Return a function to be run in a child process which asks the kernel to send the signal signum to that child when its parent process dies

Source code in drunc/process_manager/utils.py
def on_parent_exit(signum):
    """Return a function to be run in a child process which will trigger
    the signal ``signum`` to be sent when the parent process dies.

    Args:
        signum: Signal number handed to ``prctl(PR_SET_PDEATHSIG, signum)``.

    Returns:
        A zero-argument callable. Nothing happens until it is called; when
        called it issues the ``prctl`` request in the calling process.

    Raises:
        PrCtlError: Raised by the returned callable (not by this factory)
            when ``prctl`` returns a non-zero result.
    """

    def set_parent_exit_signal():
        import ctypes
        import os

        # http://linux.die.net/man/2/prctl
        # prctl() signals failure by returning -1 and storing the cause in
        # errno. Loading libc with use_errno=True lets ctypes capture errno
        # for this call so the real cause can be reported.
        libc = ctypes.CDLL("libc.so.6", use_errno=True)
        result = libc.prctl(PR_SET_PDEATHSIG, signum)
        if result != 0:
            err = ctypes.get_errno()
            raise PrCtlError(
                f"prctl failed with error code {result} "
                f"(errno {err}: {os.strerror(err)})"
            )

    return set_parent_exit_signal

order_process_by_name(processes)

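Given a list of processes, order them by session, then by position in the process tree (parents before their children), with entries at the same level sorted by name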

Source code in drunc/process_manager/utils.py
def order_process_by_name(processes: list[ProcessInstance]):
    """Return the processes in a tiered, name-based order.

    Processes are grouped by session (sessions visited in sorted order).
    Within a session they are grouped by ``tree_id`` (a missing or empty id
    is treated as ``""``) and emitted as a pre-order walk of the dotted
    ``tree_id`` hierarchy: a group is a child of the group whose id is its
    own id minus the last ``.``-separated component, if such a group exists;
    otherwise it is a root. Roots and siblings are ordered by the name of the
    group's first process, then by ``tree_id``. Processes sharing a
    ``tree_id`` are ordered by name, then by uuid.
    """

    def _meta(proc):
        return proc.process_description.metadata

    # Tier 1: bucket the input by session.
    sessions = {}
    for proc in processes:
        sessions.setdefault(_meta(proc).session, []).append(proc)

    result = []
    for session_name in sorted(sessions):
        # Tier 2: bucket this session's processes by tree id.
        groups = {}
        for proc in sessions[session_name]:
            groups.setdefault(_meta(proc).tree_id or "", []).append(proc)

        # Deterministic order for processes that share a tree id.
        groups = {
            tid: sorted(members, key=lambda p: (_meta(p).name, p.uuid.uuid))
            for tid, members in groups.items()
        }

        # Link each group to its parent; groups without a known parent are roots.
        top_level = []
        kids = {}
        for tid in groups:
            parent = tid.rsplit(".", 1)[0] if "." in tid else None
            if parent and parent in groups:
                kids.setdefault(parent, []).append(tid)
            else:
                top_level.append(tid)

        def rank(tid):
            return (_meta(groups[tid][0]).name, tid)

        # Pre-order traversal with an explicit stack. Entries are pushed in
        # descending rank so they pop in ascending rank; ranks are unique
        # because every tree id is unique within the session.
        pending = sorted(top_level, key=rank, reverse=True)
        while pending:
            tid = pending.pop()
            result.extend(groups[tid])
            pending.extend(sorted(kids.get(tid, []), key=rank, reverse=True))

    return result

validate_k8s_session_name(session)

Validate that the session/namespace name conforms to the RFC 1123 label standard.

Parameters:

Name Type Description Default
session str

The session/namespace name to validate.

required

Returns:

Name Type Description
bool bool

True if the session name is valid, False otherwise.

Source code in drunc/process_manager/utils.py
def validate_k8s_session_name(session: str) -> bool:
    """
    Validate that the session/namespace name conforms to the RFC 1123 label standard.

    A valid name is 1 to 63 characters long, consists only of lowercase
    ASCII letters, digits and '-', and starts and ends with a letter or digit.

    Args:
        session (str): The session/namespace name to validate.

    Returns:
        bool: True if the session name is valid, False otherwise.
    """
    # fullmatch() is used instead of match() with a trailing '$': '$' also
    # matches just before a final newline, which would let "name\n" through.
    return (
        re.fullmatch(r"[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?", session) is not None
    )