Line data Source code
1 : /**
2 : * @file TriggerInhibitAgent.cpp TriggerInhibitAgent class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "dfmodules/TriggerInhibitAgent.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 :
12 : #include "logging/Logging.hpp"
13 :
14 : #include <memory>
15 : #include <string>
16 : #include <utility>
17 :
/**
 * @brief Name used by TRACE TLOG calls from this source file
 */
#define TRACE_NAME "TriggerInhibitAgent" // NOLINT

/**
 * @brief Debug levels passed to the TLOG_DEBUG calls in this source file
 */
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, ///< used for the "Entering"/"Exiting" method messages
  TLVL_WORK_STEPS = 10         ///< used for the per-message steps inside the work loop
};
27 :
28 : namespace dunedaq {
29 : namespace dfmodules {
30 :
31 0 : TriggerInhibitAgent::TriggerInhibitAgent(const std::string& parent_name,
32 : std::shared_ptr<trigdecreceiver_t> our_input,
33 0 : std::shared_ptr<triginhsender_t> our_output)
34 0 : : NamedObject(parent_name + "::TriggerInhibitAgent")
35 0 : , m_thread(std::bind(&TriggerInhibitAgent::do_work, this, std::placeholders::_1))
36 0 : , m_queue_timeout(100)
37 0 : , m_threshold_for_inhibit(1)
38 0 : , m_trigger_decision_receiver(our_input)
39 0 : , m_trigger_inhibit_sender(our_output)
40 0 : , m_trigger_number_at_start_of_processing_chain(0)
41 0 : , m_trigger_number_at_end_of_processing_chain(0)
42 0 : {}
43 :
44 : void
45 0 : TriggerInhibitAgent::start_checking()
46 : {
47 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering start_checking() method";
48 0 : m_thread.start_working_thread();
49 0 : TLOG() << get_name() << " successfully started";
50 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting start_checking() method";
51 0 : }
52 :
53 : void
54 0 : TriggerInhibitAgent::stop_checking()
55 : {
56 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering stop_checking() method";
57 0 : m_thread.stop_working_thread();
58 0 : TLOG() << get_name() << " successfully stopped";
59 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting stop_checking() method";
60 0 : }
61 :
62 : void
63 0 : TriggerInhibitAgent::do_work(std::atomic<bool>& running_flag)
64 : {
65 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
66 :
67 : // configuration (hard-coded, for now; will be input from calling code later)
68 0 : int fake_busy_interval_sec = 0;
69 0 : std::chrono::seconds chrono_fake_busy_interval(fake_busy_interval_sec);
70 0 : int fake_busy_duration_sec = 0;
71 0 : std::chrono::seconds chrono_fake_busy_duration(fake_busy_duration_sec);
72 0 : int min_interval_between_inhibit_messages_msec = 0;
73 0 : std::chrono::milliseconds chrono_min_interval_between_inhibit_messages(min_interval_between_inhibit_messages_msec);
74 :
75 : // initialization
76 0 : enum LocalState
77 : {
78 : no_update,
79 : free_state,
80 : busy_state
81 : };
82 0 : std::chrono::steady_clock::time_point current_time = std::chrono::steady_clock::now();
83 : // std::chrono::steady_clock::time_point start_time_of_latest_fake_busy = current_time - chrono_fake_busy_duration;
84 0 : std::chrono::steady_clock::time_point last_sent_time = current_time;
85 0 : LocalState requested_state = no_update;
86 0 : LocalState current_state = free_state;
87 0 : int32_t received_message_count = 0;
88 0 : int32_t sent_message_count = 0;
89 :
90 : // work loop
91 0 : while (running_flag.load()) {
92 :
93 : // check if a TriggerDecision message has arrived, and save the trigger
94 : // number contained within it, if one has arrived
95 0 : try {
96 0 : dfmessages::TriggerDecision trig_dec = m_trigger_decision_receiver->receive(m_queue_timeout);
97 0 : ++received_message_count;
98 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Popped the TriggerDecision for trigger number "
99 0 : << trig_dec.trigger_number << " off the input queue";
100 0 : m_trigger_number_at_start_of_processing_chain.store(trig_dec.trigger_number);
101 0 : } catch (const iomanager::TimeoutExpired& excpt) {
102 : // it is perfectly reasonable that there will be no data in the queue some
103 : // fraction of the times that we check, so we just continue on and try again later
104 0 : }
105 :
106 : // to-do: add some logic to fake inhibits
107 :
108 : // check if A) we are supposed to be checking the trigger_number difference, and
109 : // B) if so, whether an Inhibit should be asserted or cleared
110 0 : uint32_t threshold = m_threshold_for_inhibit.load(); // NOLINT
111 0 : if (threshold > 0) {
112 0 : daqdataformats::trigger_number_t temp_trig_num_at_start = m_trigger_number_at_start_of_processing_chain.load();
113 0 : daqdataformats::trigger_number_t temp_trig_num_at_end = m_trigger_number_at_end_of_processing_chain.load();
114 0 : if (temp_trig_num_at_start >= temp_trig_num_at_end &&
115 0 : (temp_trig_num_at_start - temp_trig_num_at_end) >= threshold) {
116 0 : if (current_state == free_state) {
117 0 : requested_state = busy_state;
118 : }
119 : } else {
120 0 : if (current_state == busy_state) {
121 0 : requested_state = free_state;
122 : }
123 : }
124 : }
125 :
126 : // to-do: add some logic to periodically send a message even if nothing has changed
127 :
128 : // send an Inhibit messages, if needed (either Busy or Free state)
129 0 : if (requested_state != no_update && requested_state != current_state) {
130 0 : if ((std::chrono::steady_clock::now() - last_sent_time) >= chrono_min_interval_between_inhibit_messages) {
131 0 : dfmessages::TriggerInhibit inhibit_message;
132 0 : if (requested_state == busy_state) {
133 0 : inhibit_message.busy = true;
134 : } else {
135 : inhibit_message.busy = false;
136 : }
137 :
138 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Pushing a TriggerInhibit message with busy state set to "
139 0 : << inhibit_message.busy << " onto the output queue";
140 0 : try {
141 0 : m_trigger_inhibit_sender->send(std::move(inhibit_message), m_queue_timeout);
142 0 : ++sent_message_count;
143 : #if 0
144 : // temporary logging
145 : std::ostringstream oss_sent;
146 : oss_sent << ": Successfully pushed a TriggerInhibit message with busy state set to " << inhibit_message.busy
147 : << " onto the output queue";
148 : TLOG() << ProgressUpdate(ERS_HERE, get_name(), oss_sent.str());
149 : #endif
150 : // if we successfully pushed the message to the Sink, then we assume that the
151 : // receiver will get it, and we update our internal state accordingly
152 0 : current_state = requested_state;
153 0 : requested_state = no_update;
154 0 : last_sent_time = std::chrono::steady_clock::now();
155 0 : } catch (const iomanager::TimeoutExpired& excpt) {
156 : // It is not ideal if we fail to send the inhibit message out, but rather than
157 : // retrying some unknown number of times, we simply output a TRACE message and
158 : // go on. This has the benefit of being responsive with pulling TriggerDecision
159 : // messages off the input queue, and maybe our Busy/Free state will have changed
160 : // by the time that the receiver is ready to receive more messages.
161 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name()
162 0 : << ": TIMEOUT pushing a TriggerInhibit message onto the output queue";
163 0 : }
164 : }
165 : }
166 : }
167 :
168 0 : std::ostringstream oss_summ;
169 0 : oss_summ << ": Exiting the do_work() method, received " << received_message_count
170 0 : << " TriggerDecision messages and sent " << sent_message_count
171 0 : << " TriggerInhibit messages of all types (both Busy and Free).";
172 0 : TLOG() << ProgressUpdate(ERS_HERE, get_name(), oss_summ.str());
173 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
174 0 : }
175 :
176 : } // namespace dfmodules
177 : } // namespace dunedaq
|