Skip to content

shell_utils

drunc.controller.interface.shell_utils

Functions

generate_fsm_command(ctx, transition, controller_name)

Generate a click command for a given FSM transition.

Parameters:

Name Type Description Default
ctx

UnifiedShellContext

required
transition FSMCommandDescription

FSMCommandDescription of the transition to generate the command for

required
controller_name str

Name of the controller to run the command on

required

Returns:

Type Description

A tuple of the click command that can be added to the CLI and the CLI-formatted name of that command

Raises:

Type Description
Exception

If the argument type is unhandled

Source code in drunc/controller/interface/shell_utils.py
# Strings accepted (case-insensitively, surrounding whitespace ignored) when a
# boolean default is supplied through an environment variable.
_ENV_TRUE_STRINGS = frozenset({"1", "true", "t", "yes", "y", "on"})
_ENV_FALSE_STRINGS = frozenset({"", "0", "false", "f", "no", "n", "off"})


def _parse_env_default(env_var: str, raw: str, target_type: type):
    """
    Convert the string read from an environment variable into ``target_type``.

    ``bool(raw)`` is True for every non-empty string (including "false" and "0"),
    so booleans are parsed explicitly instead of being passed through ``bool``.

    Args:
        env_var: Name of the environment variable, used only in error messages
        raw: The string value read from the environment
        target_type: One of str, int, float or bool

    Returns:
        The converted value

    Raises:
        ValueError: If ``raw`` cannot be interpreted as ``target_type``
    """
    if target_type is bool:
        token = raw.strip().lower()
        if token in _ENV_TRUE_STRINGS:
            return True
        if token in _ENV_FALSE_STRINGS:
            return False
        raise ValueError(f"Cannot interpret {env_var}={raw!r} as a boolean")

    try:
        return target_type(raw)
    except ValueError as e:
        raise ValueError(
            f"Cannot interpret {env_var}={raw!r} as {target_type.__name__}"
        ) from e


def generate_fsm_command(ctx, transition: FSMCommandDescription, controller_name: str):
    """
    Generate a click command for a given FSM transition.

    Every argument of the transition becomes a ``--option`` of the command. The
    default of each option is taken, in order of precedence, from the environment
    variable ``DRUNC_<ARGUMENT_NAME>_DEFAULT`` and then from the default value
    carried by the argument description.

    Args:
        ctx: UnifiedShellContext (not read by this function)
        transition: FSMCommandDescription of the transition to generate the command for
        controller_name: Name of the controller to run the command on

    Returns:
        A tuple ``(command, command_name)``: the click command that can be added to
        the CLI, and the CLI-formatted name it was registered under

    Raises:
        Exception: If the argument type is unhandled
        ValueError: If an environment-variable default cannot be converted to the
            type of its argument
    """

    # Bind the controller and transition so that click only has to supply the
    # context object, the target and the transition arguments.
    cmd = functools.partial(
        run_one_fsm_command,
        controller_name=controller_name,
        transition_name=transition.name,
    )
    cmd = click.pass_obj(cmd)
    cmd = click.option(
        "--target",
        type=str,
        help="The target to address",
        default="",
    )(cmd)

    # gRPC argument type -> Python type handed to click
    type_map: dict[int, type] = {
        Argument.Type.STRING: str,
        Argument.Type.INT: int,
        Argument.Type.FLOAT: float,
        Argument.Type.BOOL: bool,
    }

    # Python type -> protobuf wrapper message used to unpack the default value
    msg_map: dict[type, type] = {
        str: string_msg,
        int: int_msg,
        float: float_msg,
        bool: bool_msg,
    }

    # Add one click option per argument of the transition
    for argument in transition.arguments:  # type: Argument
        atype = type_map.get(argument.type)
        if atype is None:
            raise Exception(f"Unhandled argument type '{argument.type}'")

        # Default carried by the argument description, if any
        default_value = None
        if argument.HasField("default_value"):
            unpacked = unpack_any(argument.default_value, msg_map[atype])
            default_value = atype(unpacked.value)

        # A default defined in the environment takes precedence
        argument_name_cli: str = argument.name.lower().replace("_", "-")
        env_var: str = f"DRUNC_{argument.name.upper()}_DEFAULT"
        env_val: str | None = os.getenv(env_var)
        if env_val is not None:
            log.info(f"Env override for {argument_name_cli}: {env_val}")
            default_value = _parse_env_default(env_var, env_val, atype)

        # The option is only required when it is mandatory and nothing supplies a
        # default for it
        cmd = click.option(
            f"--{argument_name_cli}",
            type=atype,
            default=default_value,
            show_default=True,
            required=(
                (argument.presence == Argument.Presence.MANDATORY)
                and (default_value is None)
            ),
            help=argument.help,
        )(cmd)

    # Construct the click command
    cmd_name: str = format_name_for_cli(transition.name)
    cmd = click.command(
        name=cmd_name,
        help=f"Execute the transition {transition.name} on the controller {controller_name}",
    )(cmd)

    return cmd, cmd_name

get_hostname_smart(ip_or_host, timeout_seconds=0.2) cached

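Resolves an IP address to a hostname, with the following documented optimizations:

1. Caches all results.
2. Immediately skips private/internal IPs (like K8s). Note: the source listing below contains no such check; every valid IP literal goes through the reverse lookup.
3. Uses a short timeout for public IPs.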

Source code in drunc/controller/interface/shell_utils.py
@functools.lru_cache(maxsize=4096)
def get_hostname_smart(ip_or_host: str, timeout_seconds: float = 0.2) -> str:
    """
    Reverse-resolve an IP address literal to a hostname, memoising the answer.

    - An empty input yields an empty string.
    - An input that is not a valid IP literal is returned unchanged.
    - Any valid IP literal is passed to ``socket.gethostbyaddr``; if the lookup
      fails the input is returned unchanged.
    - Results are cached per ``(ip_or_host, timeout_seconds)`` pair.

    The process-wide default socket timeout is set to ``timeout_seconds`` for the
    duration of the lookup and restored afterwards.

    NOTE(review): there is no private/internal address short-circuit in this body.
    NOTE(review): confirm that the default socket timeout actually bounds
    ``gethostbyaddr``; it is documented as applying to new socket objects.
    """
    if not ip_or_host:
        return ""

    try:
        parsed = ipaddress.ip_address(ip_or_host)
    except ValueError:
        # Not an IP literal, hand it back as it came in.
        return ip_or_host

    previous_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout_seconds)
    try:
        return socket.gethostbyaddr(str(parsed))[0]
    except OSError:
        # herror, gaierror and timeout are all OSError subclasses.
        return ip_or_host
    finally:
        socket.setdefaulttimeout(previous_timeout)

run_one_fsm_command(obj, controller_name, transition_name, target, **kwargs)

Run one FSM command on the controller

Parameters:

Name Type Description Default
controller_name str

Name of the controller

required
transition_name str

Name of the transition to run

required
obj UnifiedShellContext

Unified shell context

required
target str

Target to run the command on

required
**kwargs

Arguments to the command

{}

Returns:

Type Description
None

None

Raises:

Type Description
ArgumentException

If there is an issue with the arguments

ServerTimeout

If the server times out

Source code in drunc/controller/interface/shell_utils.py
def run_one_fsm_command(
    obj: UnifiedShellContext,
    controller_name: str,
    transition_name: str,
    target: str,
    **kwargs,
) -> None:
    """
    Send one FSM transition to the controller and print the outcome.

    The command is submitted on a worker thread while a status table is refreshed
    in the foreground. Once it returns, an execution report and the current status
    table are printed.

    Args:
        obj (UnifiedShellContext): Unified shell context
        controller_name (str): Name of the controller
        transition_name (str): Name of the transition to run
        target (str): Target to run the command on; an empty string or the
            controller name means the command description is looked up on the
            controller itself
        **kwargs: Arguments to the command

    Returns:
        None

    Notes:
        ArgumentException and ServerTimeout raised while preparing or executing the
        command are logged and swallowed; the function then returns early.
        In batch or semibatch mode the process exits with status 1 if the
        controller reports an error state before the command is sent.
    """
    log.info(
        f"Running transition '{transition_name}' on controller '{controller_name}', targeting: '{target if target else controller_name}'"
    )

    # Batch-like modes must not carry on once the controller is in error.
    batch_like = obj.running_mode in [
        UnifiedShellMode.BATCH,
        UnifiedShellMode.SEMIBATCH,
    ]
    if batch_like and obj.get_driver("controller").status().status.in_error:
        # NOTE(review): the result of this second status() call is discarded.
        obj.get_driver("controller").status()
        log.error(
            "Running in batch mode, and because error state is detected, exiting."
        )
        sys.exit(1)

    along_path = False
    on_all_subsequent_children = True

    if target in ("", controller_name):
        # Addressing the controller itself: fetch its FSM description so the
        # arguments can be validated against the transition's declaration.
        controller_driver = obj.get_driver("controller")
        fsm_reply = controller_driver.describe_fsm(
            target=controller_name,
            execute_along_path=True,
            execute_on_all_subsequent_children_in_path=False,
        )
        command_desc = search_fsm_command(
            transition_name, fsm_reply.description.commands
        )
        if command_desc is None:
            log.error(
                f'Command "{transition_name}" does not exist, or is not accessible right now'
            )
            return
    else:
        # Any other target: no description is fetched, so validation runs against
        # an empty argument list.
        class ArgumentlessCommand:
            arguments = []

        command_desc = ArgumentlessCommand()

    try:
        formatted_args = validate_and_format_fsm_arguments(
            kwargs, command_desc.arguments
        )
        fsm_command = FSMCommand(
            command_name=transition_name,
            arguments=formatted_args,
        )

        timeout = 60
        started_at = time.time()
        result = None

        with ThreadPoolExecutor() as executor:
            future = executor.submit(
                obj.get_driver("controller").execute_fsm_command,
                command=fsm_command,
                target=target,
                execute_along_path=along_path,
                execute_on_all_subsequent_children_in_path=on_all_subsequent_children,
                timeout=timeout,
            )

            with StatusTableUpdater(obj) as updater:
                task = updater.add_task(
                    f"Waiting for [yellow]{transition_name}[/yellow] to complete...",
                    total=timeout,
                )
                # Refresh progress and the table twice a second until the command
                # finishes or the local timeout elapses.
                while time.time() - started_at < timeout and not future.done():
                    updater.update(task, completed=time.time() - started_at)
                    updater.update_table()
                    time.sleep(0.5)

                updater.update_table()

            result = future.result(timeout=1)

    except ArgumentException as ae:
        log.exception(
            str(ae)
        )  # TODO: Manually raise exception, see if the str declaration is needed with rich handling
        return
    except ServerTimeout as e:
        log.error(e)
        log.error(
            "The command timed out, unfortunately this means the server is in undefined state, and [red]your best option at this stage is to [bold]terminate[/bold] and [bold]boot[/bold][/]."
        )
        log.error(
            "Alternatively, if you are patient, you can try to wait a bit longer and send [yellow]'status'[/yellow] to check if the command ends up being executed (you may want to check the logs of the controller and application with the [yellow]'logs'[/yellow] command)."
        )
        return

    if not result:
        return

    report = Table(title=f"{transition_name} execution report")
    for heading in ("Name", "Command execution", "FSM transition"):
        report.add_column(heading)

    def render_flag(flag_value, enum_type):
        """Return the flag's title-cased name, green on success and red otherwise."""
        label = enum_type.Name(flag_value).replace("_", " ").title()
        succeeded = (
            enum_type == FSMResponseFlag
            and flag_value == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY
        ) or (
            enum_type == ResponseFlag
            and flag_value == ResponseFlag.EXECUTED_SUCCESSFULLY
        )
        return f"[dark_green]{label}[/]" if succeeded else f"[red]{label}[/]"

    def append_rows(table, response, indent=""):
        """Add a row for ``response``, then recurse into its children sorted by name."""
        # The FSM flag is only shown when the command itself was executed.
        if response.flag == ResponseFlag.EXECUTED_SUCCESSFULLY:
            fsm_cell = render_flag(response.fsm_flag, enum_type=FSMResponseFlag)
        else:
            fsm_cell = "[red]NA[/]"

        table.add_row(
            indent + response.name,
            render_flag(response.flag, enum_type=ResponseFlag),
            fsm_cell,
        )
        for child in sorted(response.children, key=lambda c: c.name):
            append_rows(table, child, "  " + indent)

    append_rows(report, result)
    obj.print(report)  # rich tables require console printing

    statuses = obj.get_driver("controller").status()
    descriptions = obj.get_driver("controller").describe()
    obj.print(get_status_table(statuses, descriptions))
    obj.print_status_summary()

validate_and_format_fsm_arguments(arguments, command_arguments)

Validates and formats the arguments passed to an FSM command based on the command's argument descriptions.

Parameters:

Name Type Description Default
arguments dict

A dictionary of argument names and their values passed to the command.

required
command_arguments list

A list of Argument descriptions for the command.

required

Returns:

Name Type Description
dict dict[str, int | bool | str | float | None]

A dictionary of argument names and their formatted values, ready to be sent to the controller.

Raises:

Type Description
ArgumentException

If there is an issue with the arguments (missing, duplicate, invalid type, or unhandled type)

Source code in drunc/controller/interface/shell_utils.py
def validate_and_format_fsm_arguments(
    arguments: dict[str, int | bool | str | float | None] | None,
    command_arguments: list[Argument],
) -> dict[str, int | bool | str | float | None]:
    """
    Validates and formats the arguments passed to an FSM command based on the command's
    argument descriptions.

    Args:
        arguments (dict): A dictionary of argument names and their values passed to the command.
        command_arguments (list): A list of Argument descriptions for the command.

    Returns:
        dict: A dictionary of argument names and their formatted values, ready to be sent to the controller.

    Raises:
        ArgumentException: If there is an issue with the arguments (missing, duplicate, invalid type, or unhandled type)
    """
    # If the argument dict is empty, don't bother trying to read it
    if not arguments:
        return {}

    # Define the output dict that will be sent to the controller, with argument names
    # and their formatted values
    out_dict: dict[str, any_pb2] = {}

    # Strip out any arguments that are None, as they are considered not passed, and will
    # be set to default values if they exist, or raise an error if they are mandatory
    # without default value
    arguments: dict[str, int | bool | str | float] = {
        k: v for k, v in arguments.items() if v is not None
    }

    # Iterate over the command's argument descriptions, validate the passed arguments,
    # and format them to be sent to the controller
    for argument_desc in command_arguments:  #  type: Argument
        aname: str = argument_desc.name
        atype: str = Argument.Type.Name(argument_desc.type)
        adefa: str | int | float | bool | None = argument_desc.default_value

        # Check for duplicate arguments
        if aname in out_dict:
            raise DuplicateArgument(aname)

        # Check for missing mandatory arguments
        if (
            argument_desc.presence == Argument.Presence.MANDATORY
            and aname not in arguments
        ):
            raise MissingArgument(aname, atype)

        # If the argument is not passed, and it has a default value, use the default value
        value: str | int | float | bool | None = arguments.get(aname)
        if value is None:
            out_dict[aname] = adefa
            continue

        # Convert the argument value to the appropriate type based on the argument
        # description, and format it to be sent to the controller
        match argument_desc.type:
            case Argument.Type.INT:
                try:
                    value = int(value)
                except Exception as e:
                    raise InvalidArgumentType(aname, value, atype) from e
                value = int_msg(value=value)
            case Argument.Type.FLOAT:
                try:
                    value = float(value)
                except Exception as e:
                    raise InvalidArgumentType(aname, value, atype) from e
                value = float_msg(value=value)
            case Argument.Type.STRING:
                value = string_msg(value=value)
            case Argument.Type.BOOL:
                bvalue = value  # .lower() in ['true', '1', 't', 'y', 'yes', 'yeah', 'yup', 'certainly']
                try:
                    value = bool_msg(value=bvalue)
                except Exception as e:
                    raise InvalidArgumentType(aname, value, atype) from e
            case _:
                try:
                    pretty_type = Argument.Type.Name(argument_desc.type)
                except:
                    pretty_type = argument_desc.type
                raise UnhandledArgumentType(argument_desc.name, pretty_type)
        out_dict[aname] = pack_to_any(value)

    return out_dict