Skip to content

controller_driver

drunc.controller.controller_driver

Classes

ControllerDriver(address, token)

Source code in drunc/controller/controller_driver.py
def __init__(self, address: str, token: Token):
    """Set up the gRPC channel, stub and token used to talk to a controller.

    Args:
        address: Address of the controller. It is passed through
            ``self._resolve_address_to_ipv4`` and the result is used as the
            channel target with an ``ipv4:`` prefix.
        token: Token to associate with this driver. Its content is copied
            into a new ``Token`` stored on the instance.

    Raises:
        ValueError: Re-raised, after being logged, when
            ``self._resolve_address_to_ipv4`` rejects ``address``.
    """
    # The logger and address are stored first because the error path below
    # uses both of them.
    self.log = get_logger("controller.ControllerDriver")
    self.address = address

    try:
        ipv4_endpoint = self._resolve_address_to_ipv4(address)
    except ValueError as e:
        self.log.error(
            f"Failed to initialize ControllerDriver. Invalid address '{self.address}': {e}"
        )
        raise e

    # Keepalive: ping the server every 60 seconds (value is in milliseconds).
    channel_options = [("grpc.keepalive_time_ms", 60000)]

    # Unencrypted channel, explicitly targeting the resolved IPv4 endpoint.
    self.channel = grpc.insecure_channel(
        f"ipv4:{ipv4_endpoint}",
        options=channel_options,
    )
    self.stub = ControllerStub(self.channel)

    # Store a copy of the token rather than the caller's object.
    self.token = Token()
    self.token.CopyFrom(token)

Functions