Skip to content

grpc_child

drunc.controller.children_interface.grpc_child

Classes

gRPCChildNode(name, configuration, uri, connectivity_service, init_token=None)

Bases: ChildNode

Source code in drunc/controller/children_interface/grpc_child.py
def __init__(
    self,
    name: str,
    configuration: gRCPChildConfHandler,
    uri: str,
    connectivity_service,
    init_token: Token | None = None,
):
    """Create a gRPC-controlled child node and set up its connection.

    Args:
        name: Name of the child application.
        configuration: Configuration handler, stored on the node.
        uri: Control endpoint in ``host:port`` form. The port is whatever
            follows the last colon, so a host that itself contains colons
            (for example a bracketed IPv6 literal) is accepted.
        connectivity_service: Stored on the node. Presumably used to look
            up a fresh endpoint when reconnecting; confirm against
            ``_attempt_reconnection``.
        init_token: Optional token, stored on the node.

    Raises:
        ValueError: If ``uri`` contains no ``:`` separator or the port is
            not an integer.
        DruncSetupException: If the port is 0.
    """
    super().__init__(name, ControlType.gRPC)

    self.configuration = configuration
    self.connectivity_service = connectivity_service
    self._lock = threading.Lock()
    self.init_token = init_token

    # Split on the LAST colon only: a plain split(":") fails to unpack as
    # soon as the host part contains a colon of its own.
    host, port_text = uri.rsplit(":", 1)
    port = int(port_text)

    if port == 0:
        raise DruncSetupException(
            f"Application {name} does not expose a control service in the configuration, or has not advertised itself to the application registry service, or the application registry service is not reachable."
        )

    # Rebuilt from the parsed parts so the stored port is in canonical
    # integer form.
    self.uri = f"{host}:{port}"
    self._setup_connection()
Methods:
check_connection()

Probe child connectivity and retry once after reconnecting if needed.

Use the describe endpoint to check if the child is reachable. If not, the node attempts to resolve a fresh endpoint from the connectivity service, rebuild the gRPC channel, and retry the probe once.

Returns:

Type Description
bool

True if the child is reachable either immediately or after a successful reconnection attempt, otherwise False.

Source code in drunc/controller/children_interface/grpc_child.py
def check_connection(self) -> bool:
    """Check whether the child answers a describe call, reconnecting once if not.

    A describe request is sent to the child. When that fails because the
    server is unreachable, the node tries to reconnect and sends the same
    request again through ``_attempt_reconnection``.

    Returns:
        True if the child is reachable either immediately or after a
            successful reconnection attempt, otherwise False.
    """
    probe = DescribeRequest(
        token=None,
        target="",
        execute_along_path=False,
        execute_on_all_subsequent_children_in_path=False,
    )

    def send_probe():
        # self.stub is looked up at call time, so a retry after reconnection
        # uses whatever stub is current by then.
        return self.stub.describe(probe)

    try:
        send_probe()
    except grpc.RpcError as rpc_failure:
        try:
            # Declared NoReturn: it is expected to raise, and the exception
            # type tells us whether a reconnection is worth trying.
            self.handle_child_grpc_error(rpc_failure)
        except ServerUnreachable:
            self.log.info(
                f"Connection to {self.name} at {self.uri} failed during connectivity check, attempting to reconnect..."
            )
        except Exception as unexpected_failure:
            self.log.warning(
                f"Connection check failed for {self.name}: {unexpected_failure}"
            )
            return False
        else:
            # Defensive fall-through in case the handler ever returns.
            return False

        # Only reached when the server was reported unreachable.
        try:
            self._attempt_reconnection(send_probe)
        except Exception as reconnect_failure:
            self.log.warning(
                f"Connection check failed for {self.name}: {reconnect_failure}"
            )
            return False
        return True

    return True
handle_child_grpc_error(error)

Handle gRPC errors from sending commands to the child controller.

Parameters:

Name Type Description Default
error RpcError

The gRPC error to handle.

required
Source code in drunc/controller/children_interface/grpc_child.py
def handle_child_grpc_error(self, error: grpc.RpcError) -> NoReturn:
    """Log the details of a failed child command, then re-raise the error.

    The error is first passed to ``rethrow_if_unreachable_server``, and
    anything that helper raises propagates unchanged (presumably
    ``ServerUnreachable``; confirm against the helper). Otherwise the rich
    status attached to the call, if any, is logged and ``error`` is raised.

    Args:
        error: The gRPC error to handle.

    Raises:
        grpc.RpcError: Always, unless ``rethrow_if_unreachable_server``
            raised first. This is the ``error`` object that was passed in.
    """
    rethrow_if_unreachable_server(error)

    self.log.error(f"Error sending command to child node {self.name} at {self.uri}")

    # RpcError is also a subclass of Call, and can be used in from_call.
    # The type stubs in types-grpcio do not reflect this, so we must cast.
    # See https://github.com/grpc/grpc/issues/10885.
    try:
        status = rpc_status.from_call(cast(grpc.Call, error))
    except ValueError as decode_error:
        # from_call raises ValueError when the status carried in the trailing
        # metadata is inconsistent with the call's own code/details. That must
        # not replace the original error this method exists to re-raise.
        self.log.error(
            f"Could not decode the status details sent by {self.name}: {decode_error}"
        )
        status = None

    # from_call returns None when the call carries no rich status.
    if status is not None:
        self.log.error(status.message)

        for detail in status.details:
            if detail.Is(Stacktrace.DESCRIPTOR):
                stack = unpack_any(detail, Stacktrace)
                remote_trace = "".join(f"{line}\n" for line in stack.text)
                self.log.error("Stacktrace on remote server!\n" + remote_trace)
            elif detail.Is(PlainText.DESCRIPTOR):
                # NOTE(review): this logs the unpacked PlainText message object
                # itself; confirm whether one of its fields was intended.
                self.log.error(unpack_any(detail, PlainText))

    raise error

Functions: