Skip to content

process_manager_driver

drunc.process_manager.process_manager_driver

Classes

ProcessManagerDriver(address, token)

Source code in drunc/process_manager/process_manager_driver.py
def __init__(self, address: str, token: Token):
    self.log = get_logger("process_manager_driver", rich_handler=True)
    self.address = address
    options = [
        ("grpc.keepalive_time_ms", 60000)  # pings the server every 60 seconds
    ]
    self.channel = grpc.insecure_channel(self.address, options=options)
    self.stub = ProcessManagerStub(self.channel)
    self.token = copy_token(token)
Functions
close()

Close the gRPC channel.

Returns:

Type Description
None

None

Source code in drunc/process_manager/process_manager_driver.py
def close(self) -> None:
    """
    Close the gRPC channel.

    Args:
        None

    Returns:
        None

    Raises:
        None
    """
    try:
        self.log.debug("Closing gRPC channel to Process Manager")
        self.channel.close()
    except Exception as e:
        self.log.error(f"Error closing gRPC channel: {e}", exc_info=True)

Functions