Skip to content

shell_utils

drunc.controller.interface.shell_utils

Functions

get_hostname_smart(ip_address, timeout_seconds=0.2) cached

Resolves an IP to a hostname, with optimizations: 1. Caches all results. 2. Immediately skips private/internal IPs (like K8s). 3. Uses a short timeout for public IPs.

Source code in drunc/controller/interface/shell_utils.py
@lru_cache(maxsize=4096)
def get_hostname_smart(ip_address: str, timeout_seconds: float = 0.2) -> str:
    """
    Resolves an IP to a hostname, with optimizations:
    1. Caches all results.
    2. Immediately skips private/internal IPs (like K8s).
    3. Uses a short timeout for public IPs.
    """

    # If private IP (k8s), don't try to resolve it
    if is_private_ip(ip_address):
        return ip_address

    # If public IP, try to resolve it.
    original_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(timeout_seconds)

        hostname, _, _ = socket.gethostbyaddr(ip_address)
        return hostname

    except (socket.herror, socket.gaierror, socket.timeout):
        return ip_address

    finally:
        socket.setdefaulttimeout(original_timeout)

is_private_ip(ip_str) cached

Checks if an IP address is private (RFC 1918), loopback, or link-local. These IPs will almost never have a public reverse DNS record.

Source code in drunc/controller/interface/shell_utils.py
@lru_cache(maxsize=1024)
def is_private_ip(ip_str: str) -> bool:
    """
    Checks if an IP address is private (RFC 1918), loopback, or link-local.
    These IPs will almost never have a public reverse DNS record.
    """
    if not ip_str:
        return True
    try:
        ip_obj = ipaddress.ip_address(ip_str)
        # .is_private = 10.x, 172.16-31.x, 192.168.x
        # .is_loopback = 127.x.x.x
        # .is_link_local = 169.254.x.x
        return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local
    except ValueError:
        # Not 'valid' IP address -> treat as private
        return True

run_one_fsm_command(controller_name, transition_name, obj, target, **kwargs)

Run one FSM command on the controller

Parameters:

Name Type Description Default
controller_name str

Name of the controller

required
transition_name str

Name of the transition to run

required
obj UnifiedShellContext

Unified shell context

required
target str

Target to run the command on

required
**kwargs

Arguments to the command

{}

Returns:

Type Description
None

None

Raises:

Type Description
ArgumentException

If there is an issue with the arguments

ServerTimeout

If the server times out

Source code in drunc/controller/interface/shell_utils.py
def run_one_fsm_command(
    controller_name: str,
    transition_name: str,
    obj: UnifiedShellContext,
    target: str,
    **kwargs,
) -> None:
    """
    Run one FSM command on the controller

    Args:
        controller_name (str): Name of the controller
        transition_name (str): Name of the transition to run
        obj (UnifiedShellContext): Unified shell context
        target (str): Target to run the command on
        **kwargs: Arguments to the command

    Returns:
        None

    Raises:
        ArgumentException: If there is an issue with the arguments
        ServerTimeout: If the server times out
    """
    log.info(
        f"Running transition '{transition_name}' on controller '{controller_name}', targeting: '{target if target else controller_name}'"
    )

    # If running in batch or semibatch mode, and error is detected, exit
    if (
        obj.running_mode in [UnifiedShellMode.BATCH, UnifiedShellMode.SEMIBATCH]
        and obj.get_driver("controller").status().status.in_error
    ):
        obj.get_driver("controller").status()
        log.error(
            "Running in batch mode, and because error state is detected, exiting."
        )
        sys.exit(1)

    execute_along_path = False
    execute_on_all_subsequent_children_in_path = True

    execute_on_root_controller = False
    if target == "":
        execute_on_root_controller = True
    elif target == controller_name:
        execute_on_root_controller = True

    if execute_on_root_controller:
        fsm_description = (
            obj.get_driver("controller")
            .describe_fsm(
                target=controller_name,
                execute_along_path=True,
                execute_on_all_subsequent_children_in_path=False,
            )
            .description
        )

        command_desc = search_fsm_command(transition_name, fsm_description.commands)

        if command_desc is None:
            log.error(
                f'Command "{transition_name}" does not exist, or is not accessible right now'
            )
            return
    else:

        class DummyCommand:
            pass

        command_desc = DummyCommand()
        command_desc.arguments = []

    try:
        formated_args = validate_and_format_fsm_arguments(
            kwargs, command_desc.arguments
        )
        data = FSMCommand(
            command_name=transition_name,
            arguments=formated_args,
        )

        timeout = 60
        time_start = time.time()
        result = None

        with ThreadPoolExecutor() as executor:
            future = executor.submit(
                obj.get_driver("controller").execute_fsm_command,
                command=data,
                target=target,
                execute_along_path=execute_along_path,
                execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path,
                timeout=timeout,
            )

            with StatusTableUpdater(obj) as updater:
                task = updater.add_task(
                    f"Waiting for [yellow]{transition_name}[/yellow] to complete...",
                    total=timeout,
                )
                while time.time() - time_start < timeout and not future.done():
                    updater.update(task, completed=time.time() - time_start)
                    updater.update_table()
                    time.sleep(0.5)

                updater.update_table()

            result = future.result(timeout=1)

    except ArgumentException as ae:
        log.exception(
            str(ae)
        )  # TODO: Manually raise exception, see if the str declaration is needed with rich handling
        return
    except ServerTimeout as e:
        log.error(e)
        log.error(
            "The command timed out, unfortunately this means the server is in undefined state, and [red]your best option at this stage is to [bold]terminate[/bold] and [bold]boot[/bold][/]."
        )
        log.error(
            "Alternatively, if you are patient, you can try to wait a bit longer and send [yellow]'status'[/yellow] to check if the command ends up being executed (you may want to check the logs of the controller and application with the [yellow]'logs'[/yellow] command)."
        )
        return

    if not result:
        return

    t = Table(title=f"{transition_name} execution report")
    t.add_column("Name")
    t.add_column("Command execution")
    t.add_column("FSM transition")

    def bool_to_success(flag_message, message_type):
        flag = message_type.Name(flag_message).replace("_", " ").title()
        success = False

        if (
            message_type == FSMResponseFlag
            and flag_message == FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY
        ):
            success = True
        if (
            message_type == ResponseFlag
            and flag_message == ResponseFlag.EXECUTED_SUCCESSFULLY
        ):
            success = True

        return f"[dark_green]{flag}[/]" if success else f"[red]{flag}[/]"

    def add_to_table(table, response, prefix=""):
        executed_command = response.flag == ResponseFlag.EXECUTED_SUCCESSFULLY

        table.add_row(
            prefix + response.name,
            bool_to_success(response.flag, message_type=ResponseFlag),
            (
                bool_to_success(response.fsm_flag, message_type=FSMResponseFlag)
                if executed_command
                else "[red]NA[/]"
            ),
        )
        for child_response in response.children:
            add_to_table(table, child_response, "  " + prefix)

    add_to_table(t, result)
    obj.print(t)  # rich tables require console printing

    statuses = obj.get_driver("controller").status()
    descriptions = obj.get_driver("controller").describe()
    t = get_status_table(statuses, descriptions)
    obj.print(t)
    obj.print_status_summary()