Skip to content

grpc_child

drunc.controller.children_interface.grpc_child

Classes

gRPCChildNode(name, configuration, uri, connectivity_service, init_token=None)

Bases: ChildNode

Source code in drunc/controller/children_interface/grpc_child.py
def __init__(
    self,
    name: str,
    configuration: gRCPChildConfHandler,
    uri: str,
    connectivity_service,
    init_token: Token | None = None,
):
    super().__init__(name, ControlType.gRPC)

    self.configuration = configuration
    self.connectivity_service = connectivity_service
    self._lock = threading.Lock()
    self.init_token = init_token

    host, port = uri.split(":")
    port = int(port)

    if port == 0:
        raise DruncSetupException(
            f"Application {name} does not expose a control service in the configuration, or has not advertised itself to the application registry service, or the application registry service is not reachable."
        )

    self.uri = f"{host}:{port}"
    self._setup_connection()
Functions
handle_child_grpc_error(error)

Handle gRPC errors from sending commands to the child controller.

Parameters:

Name Type Description Default
error RpcError

The gRPC error to handle.

required
Source code in drunc/controller/children_interface/grpc_child.py
def handle_child_grpc_error(self, error: grpc.RpcError) -> NoReturn:
    """Handle gRPC errors from sending commands to the child controller.

    Args:
        error: The gRPC error to handle.
    """
    rethrow_if_unreachable_server(error)

    # RpcError is also a subclass of Call, and can be used in from_call.
    # The type stubs in types-grpcio do not reflect this, so we must cast.
    # See https://github.com/grpc/grpc/issues/10885.
    status = rpc_status.from_call(cast(grpc.Call, error))

    self.log.error(f"Error sending command to child node {self.name} at {self.uri}")

    if hasattr(status, "message"):
        self.log.error(status.message)

    if hasattr(status, "details"):
        for detail in status.details:
            if detail.Is(Stacktrace.DESCRIPTOR):
                text = "Stacktrace on remote server!\n"
                stack = unpack_any(detail, Stacktrace)
                for l in stack.text:
                    text += l + "\n"
                self.log.error(text)
            elif detail.Is(PlainText.DESCRIPTOR):
                text = unpack_any(detail, PlainText)
                self.log.error(text)

    raise error

Functions