Skip to content

controller_interface

interfaces.controller_interface

Module providing functions to interact with the drunc controller.

Attributes

MSG_TYPE = {Argument.Type.INT: int_msg, Argument.Type.FLOAT: float_msg, Argument.Type.STRING: string_msg, Argument.Type.BOOL: bool_msg} module-attribute

Mapping of argument types to their protobuf message types.

Functions

get_arguments(event)

Get the arguments required to run an event.

Parameters:

Name Type Description Default
event str

The event to get the arguments for.

required

Returns:

Type Description
list[Argument]

The arguments for the event.

Source code in interfaces/controller_interface.py
def get_arguments(event: str) -> list[Argument]:
    """Get the arguments required to run an event.

    The FSM description is fetched from the controller and its commands are
    searched for one whose name equals ``event``; the first match wins.

    Args:
        event: The event to get the arguments for.

    Returns:
        The arguments for the event.

    Raises:
        ValueError: If no FSM command is named ``event``. The message lists
            the names of the commands that are available.
    """
    commands = get_controller_driver().describe_fsm().data.commands
    for command in commands:
        if command.name == event:
            return command.arguments

    # Raised outside of any ``except`` block on purpose: the caller gets a
    # clean ValueError without an unrelated StopIteration chained to it.
    raise ValueError(
        f"Event '{event}' not found in FSM. Valid events are: "
        f"{', '.join(c.name for c in commands)}"
    )

get_controller_driver()

Get a ControllerDriver instance.

Source code in interfaces/controller_interface.py
def get_controller_driver() -> ControllerDriver:
    """Build a ControllerDriver for the root controller.

    The driver is created from the URI returned by ``get_controller_uri`` and
    a token produced by ``create_dummy_token_from_uname``.
    """
    return ControllerDriver(
        get_controller_uri(),
        token=create_dummy_token_from_uname(),
    )

get_controller_status()

Get the controller status.

Source code in interfaces/controller_interface.py
def get_controller_status() -> Status:
    """Return the status reported by a freshly created controller driver."""
    driver = get_controller_driver()
    return driver.status()

get_controller_uri() cached

Find where the root controller is running via the connectivity service.

Returns:

Name Type Description
str str

The URI of the root controller.

Source code in interfaces/controller_interface.py
@functools.cache
def get_controller_uri() -> str:
    """Look up the root controller's address via the connectivity service.

    The result is memoised by ``functools.cache``, so the connectivity
    service is only queried on the first successful call.

    Returns:
        The URI of the root controller with any leading ``grpc://`` removed.

    Raises:
        ValueError: If the connectivity service does not resolve exactly one
            URI for the root controller.
    """
    client = ConnectivityServiceClient(settings.CSC_SESSION, settings.CSC_URL)
    uris = client.resolve("root-controller_control", "RunControlMessage")

    if len(uris) == 1:
        (entry,) = uris
        return entry["uri"].removeprefix("grpc://")

    raise ValueError(f"Expected 1 URI for root-controller, found {len(uris)}: {uris}")

get_detectors(description=None)

Get the detectors available in the controller for each application.

Returns:

Type Description
dict[str, str]

The detectors available in the controller.

Source code in interfaces/controller_interface.py
def get_detectors(description: Description | None = None) -> dict[str, str]:
    """Collect detector information for every application under a description.

    The description tree is walked recursively, parent first and then each
    child in order. Every node whose ``data`` has an ``info`` attribute
    contributes one ``data.name -> data.info`` entry.

    Args:
        description: The node to start from. When omitted, the description is
            requested from the controller.

    Returns:
        The detectors available in the controller.
    """
    if description is None:
        description = get_controller_driver().describe()

    node = description.data  # type: ignore [union-attr]
    found = {}
    if hasattr(node, "info"):
        found[node.name] = node.info

    for child in description.children:  # type: ignore [union-attr]
        if child is None:
            continue
        found.update(get_detectors(child))

    return found

get_fsm_state()

Get the finite state machine state.

Returns:

Name Type Description
str str

The state the FSM is in.

Source code in interfaces/controller_interface.py
def get_fsm_state() -> str:
    """Report which state the finite state machine is currently in.

    Returns:
        The ``state`` field of the controller status data.
    """
    status = get_controller_status()
    return status.data.state  # type: ignore [attr-defined]

process_arguments(event, arguments)

Process the arguments for an event.

Parameters:

Name Type Description Default
event str

The event to process.

required
arguments dict[str, Any]

The arguments to process.

required

Returns:

Name Type Description
dict dict[str, Any]

The processed arguments in a form compatible with the protobuf definition.

Source code in interfaces/controller_interface.py
def process_arguments(  # type: ignore[explicit-any]
    event: str,
    arguments: dict[str, Any],
) -> dict[str, Any]:
    """Convert user-supplied argument values into packed protobuf messages.

    Only arguments that the event declares (see ``get_arguments``) are
    considered. Declared arguments that are absent from ``arguments`` or set
    to ``None`` are left out of the result; keys in ``arguments`` that the
    event does not declare are ignored.

    Args:
        event: The event to process.
        arguments: The arguments to process.

    Returns:
        dict: The processed arguments in a form compatible with the protobuf
            definition, keyed by argument name.
    """
    return {
        spec.name: pack_to_any(MSG_TYPE[spec.type](value=supplied))
        for spec in get_arguments(event)
        # ``.get`` covers both "key missing" and "value is None" in one test.
        if (supplied := arguments.get(spec.name)) is not None
    }

send_event(event, arguments)

Send an event to the controller.

Parameters:

Name Type Description Default
event str

The event to send.

required
arguments dict[str, Any]

The arguments for the event.

required

Raises:

Type Description
RuntimeError

If the event fails. The error message reports the response flag and the response data.

Source code in interfaces/controller_interface.py
def send_event(  # type: ignore[explicit-any]
    event: str,
    arguments: dict[str, Any],
) -> None:
    """Take control of the controller and run an FSM command on it.

    Args:
        event: The event to send.
        arguments: The arguments for the event.

    Raises:
        RuntimeError: If the response flag is anything other than
            ``FSM_EXECUTED_SUCCESSFULLY``; the message includes the flag and
            the response data.
    """
    driver = get_controller_driver()
    driver.take_control()

    packed = process_arguments(event, arguments)
    fsm_command = FSMCommand(command_name=event, arguments=packed)
    result = driver.execute_fsm_command(arguments=fsm_command)

    if result.flag == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY:
        return

    # NOTE(review): if FSMResponseFlag is a plain protobuf enum wrapper it may
    # not be callable, in which case `FSMResponseFlag.Name(result.flag)` would
    # be needed to build this message — confirm against the generated schema.
    raise RuntimeError(
        f"Event '{event}' failed with flag "
        f"{FSMResponseFlag(result.flag)} "  # type: ignore [call-arg]
        f"and message '{result.data}'"
    )