Skip to content

process_manager_driver

drunc.process_manager.process_manager_driver

Classes

ProcessManagerDriver(address, token)

Source code in drunc/process_manager/process_manager_driver.py
def __init__(self, address: str, token: Token):
    """
    Set up the logger, the gRPC channel and the stub used to reach the process manager.

    Args:
        address - address handed to grpc.insecure_channel
        token - token passed through copy_token; the result is stored on the driver
    """
    self.log = get_logger("process_manager_driver", rich_handler=True)
    self.address = address

    # Channel arguments: ask gRPC to send a keepalive ping every 60000 ms.
    keepalive_options = [("grpc.keepalive_time_ms", 60000)]
    channel = grpc.insecure_channel(self.address, options=keepalive_options)

    self.channel = channel
    self.stub = ProcessManagerStub(self.channel)
    self.token = copy_token(token)
Functions
check_port_conflicts(db, session_dal)

Check that the ports allocated in the configuration file are available. If the file is editable, make the changes in the file itself. Otherwise, make the changes in the session_dal, logging the difference.

Note - this logic will go into the run control service, where the mapping from OKS files will be run.

Returns: db - configuration database object, potentially updated if there were port conflicts; session_dal - DAL object, potentially updated if there were port conflicts.

Source code in drunc/process_manager/process_manager_driver.py
def check_port_conflicts(
    self, db: conffwk.Configuration, session_dal: "conffwk.dal.Session"
) -> tuple[conffwk.Configuration, "conffwk.dal.Session"]:
    """
    Check that the ports allocated in the configuration file are available. If the
    file is editable, make the changes in the file itself. Otherwise, make the
    changes in the session_dal, logging the difference.

    Note - this logic will go into the run control service, where the mapping from
    OKS files will be run.

    Args:
        db - configuration database object to get the port numbers from
        session_dal - DAL object to get the port numbers from

    Returns
        db - configuration database object, potentially updated if there were port conflicts
        session_dal - DAL object, potentially updated if there were port conflicts

    Raises:
        DruncSetupException - if the root controller exposes no service whose id
            contains "_control", or if the configuration file is read-only and the
            connectivity service port is occupied
    """

    # Firstly, check if the file is read only. If so, we will only update the DAL
    configuration_file = db.active_database
    config_is_read_only: bool = file_is_read_only(
        get_full_db_path(configuration_file)
    )

    # Get the configuration ID to use in logging and potential DAL re-instantiation
    configuration_id = session_dal.id

    # Keep track of whether we made any changes, to avoid unnecessary DAL re-instantiation
    config_updated = False

    # Check that the address of the root controller is available, otherwise change
    # it to one that is available
    root_controller_host: str = session_dal.segment.controller.runs_on.runs_on.id
    # The control service is the first exposed service whose id contains "_control".
    root_controller_service = next(
        (
            service
            for service in session_dal.segment.controller.exposes_service
            if "_control" in service.id
        ),
        None,
    )
    if root_controller_service is None:
        # Fail with an explicit setup error rather than an opaque IndexError.
        raise DruncSetupException(
            f"The root controller of session '{configuration_id}' does not expose "
            "a service with '_control' in its id, so its port cannot be checked."
        )
    root_controller_port = root_controller_service.port

    if not is_port_available(root_controller_host, root_controller_port):
        config_updated = True
        if config_is_read_only:
            # The file cannot be edited, so only the in-memory DAL object is changed.
            new_port = find_free_port(30000, 32767)
            root_controller_service.port = new_port
            self.log.info(
                f"Configuration file is read-only, updated root controller port in DAL to {new_port} to resolve conflict with occupied port {root_controller_port}"
            )
        else:
            new_port = set_rc_controller_port(configuration_file, configuration_id)
            strip_non_drunc_loggers()
            self.log.info(
                f"The root controller port at {root_controller_port} is occupied, updating it to {new_port}"
            )

    # If a local connectivity service is being used, perform the same checks
    # Temporarily removed to allow integration tests to pass without restructuring
    # NOTE(review): the check below is active, so the line above may be stale - confirm.
    # Note - if infrastructure applications outside of the connectivity service are spawned, this will need to be adjusted.
    if session_dal.infrastructure_applications:  # Check if the own application needs to be spawned, or if an externally managed one is in use (e.g. if using ehn1 connectivity service or integration tests.)
        connectivity_service_host: str = session_dal.connectivity_service.host
        connectivity_service_port = session_dal.connectivity_service.service.port
        if not is_port_available(
            connectivity_service_host, connectivity_service_port
        ):
            config_updated = True
            if config_is_read_only:
                # Unlike the root controller port, this conflict is not patched in
                # the DAL: the user is asked to work from a writable copy instead.
                err_str = (
                    "Configuration is read only, and [red]the connectivity service "
                    f"address ({connectivity_service_host}:"
                    f"{connectivity_service_port}) is currently occupied[/red]. "
                    "[yellow]To fix this, clone the configuration file locally and "
                    "rerun[/yellow]."
                )
                raise DruncSetupException(err_str)
            else:
                new_port = set_connectivity_service_port(
                    configuration_file, configuration_id
                )
                strip_non_drunc_loggers()
                self.log.info(
                    f"The local connectivity service port at {connectivity_service_port} is occupied, updating it to {new_port}"
                )

    if not config_updated:
        self.log.info("Configuration did not require modifications.")
        return db, session_dal

    if config_is_read_only:
        # If the configuration file is read-only, we updated the DAL directly, so we can just
        # return it without re-instantiating
        self.log.info(
            "Configuration required updates but file is read-only, returning updated DAL without changing the original file."
        )
        return db, session_dal
    else:
        # If the configuration file has been modified, instantiate a new DAL
        updated_db = conffwk.Configuration("oksconflibs:" + configuration_file)
        updated_session_dal = updated_db.get_dal(
            class_name="Session", uid=configuration_id
        )
        self.log.info(
            "Configuration required updates and file is writable, re-instantiating DAL to reflect changes in the file."
        )
        return (
            updated_db,
            updated_session_dal,
        )
close()

Close the gRPC channel.

Returns:

Type Description
None

None

Source code in drunc/process_manager/process_manager_driver.py
def close(self) -> None:
    """
    Shut down the gRPC channel held by this driver.

    Any exception raised while logging or closing is caught and reported
    through the driver's logger instead of being propagated.

    Args:
        None

    Returns:
        None

    Raises:
        None
    """
    try:
        self.log.debug("Closing gRPC channel to Process Manager")
        self.channel.close()
    except Exception as e:
        # Best-effort shutdown: record the failure (with traceback) and carry on.
        failure_message = f"Error closing gRPC channel: {e}"
        self.log.error(failure_message, exc_info=True)
    return None
update_connectivity_port_dal(env_variables, new_port)

Walk a list of dal::Variable / dal::VariableSet objects, recursing into each VariableSet, and set the value of every Variable named CONNECTION_PORT to new_port.

Source code in drunc/process_manager/process_manager_driver.py
def update_connectivity_port_dal(
    self,
    env_variables: list["conffwk.dal.Variable | conffwk.dal.VariableSet"],
    new_port: int,
) -> None:
    """
    Set the value of every CONNECTION_PORT variable found in env_variables.

    Args:
        env_variables - objects to walk; each VariableSet is descended into
            through its `contains` attribute
        new_port - value assigned to each Variable whose name is CONNECTION_PORT

    Returns:
        None - the matching objects are modified in place
    """
    for entry in env_variables:
        if entry.className() == "VariableSet":
            # Nested set: apply the same update to everything it contains.
            self.update_connectivity_port_dal(entry.contains, new_port)
            continue
        # Objects that are neither a VariableSet nor a Variable are left untouched.
        if entry.className() == "Variable" and entry.name == "CONNECTION_PORT":
            entry.value = new_port

Functions