shell

drunc.unified_shell.shell

Classes

Functions

unified_shell(ctx, process_manager, configuration_file, configuration_id, session_name, log_level, override_logs, log_path, safe_mode)

The unified shell is a command-line interface for interacting with the process manager and the controller. It can start a process manager if a configuration file is provided, or connect to an existing one if a gRPC address is provided. It also provides a controller interface for interacting with the controller of the DAQ system.
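
The first positional argument, process_manager, is overloaded: it is either a process manager configuration (which the shell spawns itself) or a grpc://host:port address of an already-running process manager to connect to. Below is a minimal sketch of that dispatch, mirroring the urlparse check in the source further down; the example values are illustrative, not guaranteed configuration names.

# Sketch only: decide whether to spawn a process manager or connect to one.
from urllib.parse import urlparse

def is_external_process_manager(process_manager: str) -> bool:
    # A grpc:// address means "connect to an existing process manager";
    # anything else is treated as a configuration to spawn one with.
    return urlparse(process_manager).scheme == "grpc"

print(is_external_process_manager("grpc://some-host:10054"))  # True  -> connect
print(is_external_process_manager("some-pm-configuration"))   # False -> spawn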

Parameters:

Name                Type     Description                                                      Default
ctx                 Context  Click context object.                                            required
process_manager     str      The process manager configuration file or gRPC address.          required
configuration_file  str      The configuration file to use.                                    required
configuration_id    str      Configuration ID to use, as defined in the configuration file.   required
session_name        str      The session name to use.                                          required
log_level           str      The log level to use.                                             required
override_logs       bool     Whether to override logs or not.                                  required
log_path            str      The log path to use.                                              required
safe_mode           bool     Whether to enable safe mode or not.                               required

Returns:

Type  Description
None  None

Raises:

Type                 Description
DruncSetupException  If the process manager does not start in time.
ServerUnreachable    If the process manager is not reachable.
SystemExit           If the process manager configuration is invalid or if the connection to the process manager fails.
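
The DruncSetupException above comes from the readiness handshake used when the shell spawns its own process manager: the child process signals a multiprocessing Event once it is serving, and the shell polls that event for roughly ten seconds while also checking that the child is still alive. The following is a standalone sketch of that pattern, with a stand-in worker in place of drunc's run_pm.

# Sketch of the readiness handshake; "worker" is a stand-in for run_pm.
import multiprocessing as mp
import sys
from time import sleep

def worker(ready_event, port):
    # The real process manager would bind its gRPC server here,
    # publish the chosen port, then keep serving.
    port.value = 12345
    ready_event.set()
    sleep(60)

if __name__ == "__main__":
    ready_event = mp.Event()
    port = mp.Value("i", 0)
    child = mp.Process(target=worker, args=(ready_event, port))
    child.start()

    started = False
    for _ in range(100):          # ~10 s timeout, as in the source
        if ready_event.is_set():
            started = True
            break
        if not child.is_alive():  # died before becoming ready
            sys.exit(child.exitcode or 1)
        sleep(0.1)

    if not started:
        raise RuntimeError("process manager timed out starting")
    print(f"ready on localhost:{port.value}")
    child.terminate()
    child.join()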

Source code in drunc/unified_shell/shell.py
@click_shell.shell(
    prompt="drunc-unified-shell > ",
    chain=True,
    hist_file=os.path.expanduser("~") + "/.drunc-unified-shell.history",
)
@click.option(
    "-l",
    "--log-level",
    type=click.Choice(logging_log_levels.keys(), case_sensitive=False),
    default="INFO",
    help="Set the log level",
)
@click.argument("process-manager", type=str, nargs=1)
@click.argument("configuration-file", type=str, nargs=1)
@click.argument("configuration-id", type=str, nargs=1)
@click.argument("session-name", type=str, nargs=1)
@click.option(
    "-o/-no",
    "--override-logs/--no-override-logs",
    type=bool,
    default=True,
    help="Override logs, if --no-override-logs filenames have the timestamp of the run.",
)  # For production, change default to false/remove it
@click.option(
    "-lp",
    "--log-path",
    type=str,
    default=None,
    help="Log path of process_manager logs.",
)
@click.option(
    "-s",
    "--safe-mode",
    is_flag=True,
    default=False,
    help=(
        "Enable safe mode. This will force attempting to return the state to initial "
        "before running terminate. This is not particularly useful for testing but "
        "will be useful for hardware operations."
    ),
)  # For production, change default to true/remove it
@click.pass_context
def unified_shell(
    ctx: click.core.Context,
    process_manager: str,
    configuration_file: str,
    configuration_id: str,
    session_name: str,
    log_level: str,
    override_logs: bool,
    log_path: str,
    safe_mode: bool,
) -> None:
    """
    The unified shell is a command line interface to interact with the process manager
    and the controller. It can start a process manager if a configuration file is
    provided, or connect to an existing one if a gRPC address is provided. It also
    starts a controller interface to interact with the controller of the DAQ system.

    Args:
        ctx: Click context object.
        process_manager: The process manager configuration file or gRPC address.
        configuration_file: The configuration file to use.
        configuration_id: Configuration ID to use, as defined in the configuration file.
        session_name: The session name to use.
        log_level: The log level to use.
        override_logs: Whether to override logs or not.
        log_path: The log path to use.
        safe_mode: Whether to enable safe mode or not.

    Returns:
        None

    Raises:
        DruncSetupException: If the process manager does not start in time.
        ServerUnreachable: If the process manager is not reachable.
        SystemExit: If the process manager configuration is invalid or if the
            connection to the process manager fails.
    """
    # Set up the drunc and unified_shell loggers
    get_root_logger(log_level)
    unified_shell_log = get_logger("unified_shell", rich_handler=True)
    unified_shell_log.debug("Setting up the [green]unified_shell[/green] logger")

    # Parse the process manager argument to determine if it's a config or an address
    process_manager_url: ParseResult = urlparse(process_manager)
    internal_pm: bool = True
    if process_manager_url.scheme == "grpc":  # i.e. if it's an address
        internal_pm = False

    # If using a k8s process manager, validate the session name before proceeding
    if get_pm_type_from_name(
        process_manager
    ) == ProcessManagerTypes.K8s and not validate_k8s_session_name(session_name):
        unified_shell_log.error(
            f"[red]Invalid session/namespace name [bold]({session_name})[/bold][/red]. "
            "Must match RFC1123 label: lowercase alphanumeric or '-', start/end with "
            "alphanumeric, max 63 chars."
        )
        sys.exit(1)

    # Setup configuration related context variables
    ctx.obj.configuration_file = f"oksconflibs:{configuration_file}"
    ctx.obj.configuration_id = configuration_id
    ctx.obj.session_name = session_name

    # Get the session DAL
    db = conffwk.Configuration(ctx.obj.configuration_file)
    session_dal = db.get_dal(class_name="Session", uid=ctx.obj.configuration_id)
    app_log_path = session_dal.log_path

    unified_shell_log.info(
        f"[green]Setting up to use the process manager[/green] with configuration "
        f"[green]{process_manager}[/green] and configuration id [green]"
        f'"{configuration_id}"[/green] from [green]{ctx.obj.configuration_file}[/green]'
    )

    # Establish communication with the process manager, spawning it if needed
    if internal_pm:  # Spawn the Process Manager
        unified_shell_log.debug(
            f"Spawning process_manager with configuration {process_manager}"
        )
        # Check if process_manager is a packaged config
        process_manager_conf_file = get_process_manager_configuration(process_manager)

        # Validate the process manager configuration before starting it
        if not validate_pm_config(process_manager_conf_file):
            unified_shell_log.error(
                "Process manager configuration validation failed. Exiting."
            )
            sys.exit(1)

        # Start the process manager as a separate process
        unified_shell_log.info("Starting process manager")
        ready_event = mp.Event()
        port = mp.Value("i", 0)

        unified_shell_log.debug("[green]Process manager[/green] starting")
        ctx.obj.pm_process = mp.Process(
            target=run_pm,
            kwargs={
                "pm_conf": process_manager_conf_file,
                "pm_address": "localhost:0",
                "override_logs": override_logs,
                "log_level": log_level,
                "log_path": app_log_path,
                "ready_event": ready_event,
                "signal_handler": ignore_sigint_sighandler,
                # SIGINT gets sent to the PM, so we need to ignore it; otherwise every time the user hits Ctrl-C in the shell, the PM goes down
                "generated_port": port,
            },
        )
        ctx.obj.pm_process.start()
        unified_shell_log.debug("[green]Process manager[/green] started")

        # Check if the process manager started correctly
        process_started = False
        for _ in range(100):  # 10s timeout
            if ready_event.is_set():
                process_started = True
                break

            if not ctx.obj.pm_process.is_alive():
                exit_code = ctx.obj.pm_process.exitcode
                unified_shell_log.error(
                    f"[red]Process manager process died unexpectedly with exit code {exit_code}."
                )
                unified_shell_log.error(
                    "[red]This is likely a configuration error (e.g., bad kube-config)."
                )
                unified_shell_log.error(
                    "[red]Please check the full traceback in the terminal above this message.[/red]"
                )
                sys.exit(exit_code if exit_code else 1)
            sleep(0.1)

        if not process_started:
            # This message will only show if the process is *alive* but never sent the "ready" signal
            raise DruncSetupException(
                "[red]Process manager timed out starting. Check logs for details.[/red]"
            )

        # Setup the process manager address
        process_manager_address = resolve_localhost_and_127_ip_to_network_ip(
            f"localhost:{port.value}"
        )

    else:  # Connect to an existing process manager at the provided address
        process_manager_address = process_manager.replace(
            "grpc://", ""
        )  # remove the grpc scheme
        unified_shell_log.info(
            f"[green]unified_shell[/green] connected to the [green]process_manager"
            f"[/green] at address [green]{process_manager_address}[/green]"
        )

    unified_shell_log.debug(
        f"[green]process_manager[/green] started, communicating through address [green]"
        f"{process_manager_address}[/green]"
    )
    ctx.obj.reset(address_pm=process_manager_address)

    # Run a simple command (describe) to check the connection with the process manager
    desc: Description | None = None
    try:
        desc = ctx.obj.get_driver().describe()
    except Exception as e:
        unified_shell_log.error(
            f"[red]Could not connect to the process manager at the address: [/red]"
            f"[green]{process_manager_address}[/green]"
        )
        unified_shell_log.debug(f"Reason: {e}")

        if type(e) == ServerUnreachable:
            unified_shell_log.error(
                "[red]This can happen if you have the webproxy enabled at CERN. Ensure "
                "http_proxy, https_proxy, no_proxy, and equivalent aren't set. [/red]"
            )

        if internal_pm and not ctx.obj.pm_process.is_alive():
            unified_shell_log.error(
                f"[red]The process_manager is dead[/red], exit code "
                f"{ctx.obj.pm_process.exitcode}"
            )

        if internal_pm and ctx.obj.pm_process.is_alive():
            ctx.obj.pm_process.terminate()
            ctx.obj.pm_process.join()

        sys.exit(1)

    # Broadcasting configuration if requested
    if desc.HasField("broadcast"):
        unified_shell_log.debug("Broadcasting")
        ctx.obj.start_listening_pm(
            broadcaster_conf=desc.broadcast,
        )

    # Add the unified shell Click commands to the CLI
    unified_shell_log.debug("Adding [green]unified_shell[/green] commands")
    ctx.command.add_command(boot, "boot")
    ctx.obj.dynamic_commands.add("boot")

    # Add the process manager Click commands to the CLI
    unified_shell_log.debug("Adding [green]process_manager[/green] commands")
    process_manager_commands: list[click.Command] = [
        kill,
        terminate,
        flush,
        logs,
        restart,
        ps,
    ]
    for cmd in process_manager_commands:
        ctx.command.add_command(cmd, format_name_for_cli(cmd.name))
        ctx.obj.dynamic_commands.add(format_name_for_cli(cmd.name))

    # Get all the controller commands by instantiating the stateful node defined in the
    # configuration and getting the FSM transitions from it.
    unified_shell_log.debug("Defining the pseudo controller to get its FSM commands")
    controller_name = session_dal.segment.controller.id
    controller_configuration = ControllerConfHandler(
        type=ConfTypes.OKSFileName,
        data=ctx.obj.configuration_file,
        oks_key=OKSKey(
            schema_file="schema/confmodel/dunedaq.schema.xml",
            class_name="RCApplication",
            obj_uid=controller_name,
            session=ctx.obj.configuration_id,
        ),
        session_name=ctx.obj.session_name,
    )
    # Avoid setting up the ELISA logbook for the unified shell
    os.environ["DUNEDAQ_ELISA_LOGBOOK_APPARATUS"] = "unified_shell"

    unified_shell_log.debug("Initializing the [green]FSM[/green]")
    fsmch = FSMConfHandler(data=controller_configuration.data.controller.fsm)

    unified_shell_log.debug("Initializing the [green]StatefulNode[/green]")
    stateful_node = StatefulNode(fsm_configuration=fsmch, top_segment_controller=False)

    unified_shell_log.debug("Retrieving the [green]StatefulNode[/green] transitions")
    transitions = convert_fsm_transition(stateful_node.get_all_fsm_transitions())

    # Add the FSM transitions and sequences as Click commands to the CLI
    unified_shell_log.debug(
        "Adding [green]controller[/green] commands to the click context"
    )
    for transition in transitions.commands:
        ctx.command.add_command(
            *generate_fsm_command(ctx.obj, transition, controller_name)
        )
        ctx.obj.dynamic_commands.add(format_name_for_cli(transition.name))
    for sequence in session_dal.segment.controller.fsm.command_sequences:
        ctx.command.add_command(
            *generate_fsm_sequence_command(ctx, sequence, controller_name)
        )
        ctx.obj.dynamic_commands.add(format_name_for_cli(sequence.id))

    # Add the controller Click commands to the CLI
    controller_commands: list[click.Command] = [
        status,
        recompute_status,
        connect,
        disconnect,
        take_control,
        surrender_control,
        who_am_i,
        who_is_in_charge,
        include,
        exclude,
        wait,
        expert_command,
        to_error,
    ]
    for cmd in controller_commands:
        ctx.command.add_command(cmd, format_name_for_cli(cmd.name))
        ctx.obj.dynamic_commands.add(format_name_for_cli(cmd.name))

    # If any of the commands is in the click commands, set batch mode
    if any([arg in ctx.obj.dynamic_commands for arg in sys.argv]):
        ctx.obj.running_mode = UnifiedShellMode.BATCH
        ctx.command.add_command(start_shell, "start-shell")
        ctx.obj.dynamic_commands.add("start-shell")

    # If start-shell is in the arguments, set semibatch mode
    if "start-shell" in sys.argv:
        ctx.obj.running_mode = UnifiedShellMode.SEMIBATCH

    def cleanup():
        """
        Cleanup function to be called on exit.

        This function handles the termination of processes, retraction from the
        connectivity service, and logging shutdown.
        """
        unified_shell_log.info("[green]Shutting down the unified_shell[/green]")

        # Attempt a stateful shutdown of the controller if possible, returning to
        # initial state before terminating
        if ctx.obj.get_driver("controller", quiet_fail=True):
            try:
                if ctx.obj.get_driver("controller").status().status.in_error:
                    unified_shell_log.warning(
                        "Controller is in error, cannot gracefully shutdown"
                    )
                else:
                    # Safe mode crucial for use with hardware
                    if safe_mode:
                        try:
                            unified_shell_log.info(
                                "Attempting graceful shutdown of the controller"
                            )
                            stop_run_cmd = ctx.command.commands.get("stop-run")
                            scrap_cmd = ctx.command.commands.get("scrap")
                            if stop_run_cmd is not None:
                                ctx.invoke(stop_run_cmd)
                            else:
                                unified_shell_log.warning(
                                    "Command 'stop-run' not found; skipping graceful "
                                    "shutdown step."
                                )
                            if scrap_cmd is not None:
                                ctx.invoke(scrap_cmd)
                            else:
                                unified_shell_log.warning(
                                    "Command 'scrap' not found; skipping graceful "
                                    "shutdown step."
                                )
                            unified_shell_log.info("Controller shutdown gracefully")
                        except Exception as e:
                            unified_shell_log.error(
                                f"Could not shutdown the controller gracefully, reason: {e}"
                            )
            except Exception as e:
                unified_shell_log.error(
                    f"Could not retrieve the controller status, reason: {e}"
                )
            ctx.obj.delete_driver("controller")

        # Terminate any residual processes
        if ctx.obj.get_driver("process_manager"):
            ctx.obj.get_driver("process_manager").terminate()

        # Check if any processes are still running
        if (
            len(
                ctx.obj.get_driver("process_manager")
                .ps(ProcessQuery(user=getpass.getuser(), session=ctx.obj.session_name))
                .values
            )
            > 0
        ):
            unified_shell_log.error(
                "Some processes are still running, you might want to check them"
            )

        # Retract the session from the connectivity service if using a standalone
        # (microservice) connectivity service instead of a local process.
        # Needed as daq_applications continue to publish during terminate. With a large
        # number of daq_applications, sending terminate will take some time to get to
        # all the processes. During this time the non-terminated processes will continue
        # to publish to the connectivity service, and this stale information needs to be
        # cleaned up by retracting the session (again).
        if session_dal.connectivity_service.host != "localhost":
            connectivity_service_address: str = (
                f"{session_dal.connectivity_service.host}:"
                f"{session_dal.connectivity_service.service.port}"
            )
            csc = ConnectivityServiceClient(
                ctx.obj.session_name, connectivity_service_address
            )
            try:
                csc.retract_partition(fail_quickly=True, fail_quietly=True)
                unified_shell_log.debug(
                    "Session retracted from the connectivity service"
                )
            except Exception as e:
                unified_shell_log.error(
                    f"Could not retract the session from the connectivity service: {e}"
                )

        # Remove the connection to the process manager
        ctx.obj.get_driver("process_manager").close()
        ctx.obj.delete_driver("process_manager")

        # Remove the process manager
        if internal_pm:
            ctx.obj.pm_process.terminate()  # Send a SIGTERM to the pm_process
            ctx.obj.pm_process.join(timeout=2)  # Block continuing execution for 2s
            if ctx.obj.pm_process.is_alive():
                unified_shell_log.warning(
                    "Process manager did not exit in time, terminating forcefully."
                )
                ctx.obj.pm_process.kill()  # Send a SIGKILL
                ctx.obj.pm_process.join()  # Block until the process is dead
            unified_shell_log.debug("Process manager terminated")

        unified_shell_log.info("[green]unified_shell exited successfully[/green]")
        logging.shutdown()  # Shutdown logging
        ctx.obj.terminate()  # Terminate the broadcasters in the context
        ctx.exit()  # Close the click context

    ctx.call_on_close(cleanup)

    # Handle SIGTERM to gracefully shutdown the unified_shell
    def signal_sigterm_handler(signum: int, frame: types.FrameType) -> None:
        """
        Handle the SIGTERM signal to gracefully shut down the unified_shell.

        Args:
            signum: The signal number.
            frame: The current stack frame (not used).
        """
        unified_shell_log.info(
            "[red]SIGTERM received, shutting down unified_shell...[/red]"
        )
        ctx.exit()
        return

    # Register the SIGTERM handler to gracefully shut down the server
    signal.signal(signal.SIGTERM, signal_sigterm_handler)

    unified_shell_log.info(
        "[green]unified_shell[/green] ready with [green]process_manager[/green] and [green]controller[/green] commands"
    )
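
When the Kubernetes process manager is selected, the session name doubles as a namespace and must be a valid RFC 1123 label, as the error message in the source above spells out. drunc's validate_k8s_session_name is imported from elsewhere; the following is only an illustrative approximation of that check.

# Illustrative approximation of the RFC 1123 label check; the real
# validate_k8s_session_name may differ in detail.
import re

RFC1123_LABEL = re.compile(r"^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$")

def looks_like_valid_k8s_session_name(session_name: str) -> bool:
    # Lowercase alphanumerics or '-', starts and ends with an alphanumeric,
    # at most 63 characters.
    return bool(RFC1123_LABEL.match(session_name))

print(looks_like_valid_k8s_session_name("np04-session"))  # True
print(looks_like_valid_k8s_session_name("NP04_Session"))  # False: uppercase and '_'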