Line data Source code
1 : /**
2 : * @file TRMonRequestorModule.cpp TRMonRequestorModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "TRMonRequestorModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 : #include "dfmodules/opmon/TRMonRequestorModule.pb.h"
12 :
13 : #include "appmodel/TRMonRequestorModule.hpp"
14 : #include "confmodel/Connection.hpp"
15 : #include "logging/Logging.hpp"
16 :
17 : #include "iomanager/IOManager.hpp"
18 :
19 : #include <chrono>
20 : #include <memory>
21 : #include <string>
22 : #include <thread>
23 : #include <utility>
24 : #include <vector>
25 :
26 : /**
27 : * @brief TRACE debug levels used in this source file
28 : */
enum
{
  TLVL_ENTER_EXIT_METHODS = 5, ///< "Entering"/"Exiting" messages of the command handlers
  TLVL_INIT = 8,               ///< not referenced in the visible part of this file
  TLVL_WORK_STEPS = 10,        ///< not referenced in the visible part of this file
  TLVL_BOOKKEEPING = 15,       ///< not referenced in the visible part of this file
  TLVL_DISPATCH_DATAREQ = 21,  ///< not referenced in the visible part of this file
  TLVL_FRAGMENT_RECEIVE = 22   ///< not referenced in the visible part of this file
};
38 :
39 : namespace dunedaq::dfmodules {
40 :
41 0 : TRMonRequestorModule::TRMonRequestorModule(const std::string& name)
42 : : dunedaq::appfwk::DAQModule(name)
43 0 : , m_working_thread(std::bind(&TRMonRequestorModule::do_work, this, std::placeholders::_1))
44 : {
45 :
46 0 : register_command("conf", &TRMonRequestorModule::do_conf);
47 0 : register_command("scrap", &TRMonRequestorModule::do_scrap);
48 0 : register_command("start", &TRMonRequestorModule::do_start);
49 0 : register_command("stop", &TRMonRequestorModule::do_stop);
50 0 : }
51 :
52 : void
53 0 : TRMonRequestorModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
54 : {
55 :
56 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
57 :
58 : //--------------------------------
59 : // Get single queues
60 : //---------------------------------
61 :
62 0 : auto mdal = mcfg->get_dal<appmodel::TRMonRequestorModule>(get_name());
63 0 : if (!mdal) {
64 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
65 : }
66 :
67 0 : auto iom = iomanager::IOManager::get();
68 0 : for (auto con : mdal->get_inputs()) {
69 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecisionToken>()) {
70 0 : m_token_receiver = iom->get_receiver<dfmessages::TriggerDecisionToken>(con->UID());
71 : }
72 : }
73 :
74 0 : if (m_token_receiver == nullptr) {
75 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Input queue");
76 : }
77 :
78 0 : for (auto con : mdal->get_outputs()) {
79 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TRMonRequest>()) {
80 0 : m_trmon_senders[con->UID()] = iom->get_sender<dfmessages::TRMonRequest>(con->UID());
81 : }
82 : }
83 :
84 0 : m_reply_connection = mdal->get_trigger_record_destination()->UID();
85 0 : m_requestor_conf = mdal->get_configuration();
86 :
87 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
88 0 : }
89 :
90 : void
91 0 : TRMonRequestorModule::generate_opmon_data()
92 : {
93 :
94 0 : opmon::TRMonRequestorInfo i;
95 :
96 : // operation metrics
97 0 : i.set_trigger_records_requested(m_trigger_records_requested.exchange(0));
98 :
99 0 : publish(std::move(i));
100 0 : }
101 :
102 : void
103 0 : TRMonRequestorModule::do_conf(const CommandData_t&)
104 : {
105 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
106 :
107 0 : m_request_interval = std::chrono::milliseconds(m_requestor_conf->get_minimum_request_interval_ms());
108 0 : m_trigger_type_mask = m_requestor_conf->get_trigger_type_mask();
109 0 : if (m_trigger_type_mask == 0) {
110 0 : m_trigger_type_mask = dfmessages::TRMonTriggerTypes::s_any_trigger_type;
111 : }
112 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
113 0 : }
114 :
115 : void
116 0 : TRMonRequestorModule::do_scrap(const CommandData_t& /*args*/)
117 : {
118 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
119 :
120 0 : TLOG() << get_name() << " successfully scrapped";
121 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
122 0 : }
123 :
124 : void
125 0 : TRMonRequestorModule::do_start(const CommandData_t& args)
126 : {
127 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
128 :
129 : // clean books from possible previous memory
130 0 : m_trigger_records_requested.store(0);
131 0 : m_current_request_number = 0;
132 0 : m_token_count = m_requestor_conf->get_maximum_outstanding_requests();
133 :
134 : // 19-Dec-2024, KAB: check that DataRequest senders are ready to send. This is done so
135 : // that the IOManager infrastructure fetches the necessary connection details from
136 : // the ConnectivityService at 'start' time, instead of the first time that the sender
137 : // is used to send a message. This avoids delays in the sending of the first request in
138 : // the first data-taking run in a DAQ session. Such delays can lead to undesirable
139 : // system behavior like trigger inhibits.
140 0 : {
141 0 : for (auto& sender_pair : m_trmon_senders) {
142 0 : bool is_ready = sender_pair.second->is_ready_for_sending(std::chrono::milliseconds(100));
143 0 : TLOG_DEBUG(0) << "The TRMonRequest sender for " << sender_pair.first << " " << (is_ready ? "is" : "is not")
144 0 : << " ready.";
145 : }
146 0 : m_trmon_sender_iter = m_trmon_senders.begin();
147 : }
148 :
149 0 : m_run_number.reset(new const daqdataformats::run_number_t(args.at("run").get<daqdataformats::run_number_t>()));
150 :
151 0 : m_token_receiver->add_callback(std::bind(&TRMonRequestorModule::token_callback, this, std::placeholders::_1));
152 :
153 0 : m_working_thread.start_working_thread(get_name());
154 :
155 0 : TLOG() << get_name() << " successfully started";
156 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
157 0 : }
158 :
159 : void
160 0 : TRMonRequestorModule::do_stop(const CommandData_t& /*args*/)
161 : {
162 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
163 :
164 0 : m_working_thread.stop_working_thread();
165 :
166 0 : m_token_receiver->remove_callback();
167 0 : m_token_count = 0;
168 :
169 0 : TLOG() << get_name() << " successfully stopped";
170 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
171 0 : }
172 :
173 : void
174 0 : TRMonRequestorModule::token_callback(const dfmessages::TriggerDecisionToken& token)
175 : {
176 0 : if (token.run_number == *m_run_number) {
177 0 : m_token_count++;
178 : }
179 0 : }
180 :
181 : void
182 0 : TRMonRequestorModule::do_work(std::atomic<bool>& run_flag)
183 : {
184 0 : auto last_trmon_send = std::chrono::steady_clock::now();
185 0 : auto short_sleep = m_request_interval / 100;
186 0 : auto long_sleep = m_request_interval / 10;
187 0 : while (run_flag) {
188 0 : if (m_token_count.load() > 0) {
189 0 : if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - last_trmon_send) >
190 : m_request_interval) {
191 0 : send_trmon_request();
192 0 : last_trmon_send = std::chrono::steady_clock::now();
193 0 : m_token_count--;
194 : } else {
195 0 : std::this_thread::sleep_for(long_sleep);
196 : }
197 : } else {
198 0 : std::this_thread::sleep_for(short_sleep);
199 : }
200 : }
201 0 : }
202 :
203 : void
204 0 : TRMonRequestorModule::send_trmon_request()
205 : {
206 0 : if (m_trmon_senders.size() == 0) {
207 0 : return;
208 : }
209 :
210 0 : dfmessages::TRMonRequest req;
211 0 : req.request_number = ++m_current_request_number;
212 0 : req.trigger_type_mask = m_trigger_type_mask;
213 0 : req.run_number = *m_run_number;
214 0 : req.data_destination = m_reply_connection;
215 :
216 0 : m_trmon_sender_iter->second->send(std::move(req), std::chrono::milliseconds(100));
217 0 : ++m_trmon_sender_iter;
218 0 : if (m_trmon_sender_iter == m_trmon_senders.end()) {
219 0 : m_trmon_sender_iter = m_trmon_senders.begin();
220 : }
221 0 : m_trigger_records_requested++;
222 0 : }
223 :
224 : } // namespace dunedaq::dfmodules
225 :
// NOTE(review): presumably makes this class loadable as a DAQModule plugin by
// the application framework -- confirm against the macro's definition.
DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::TRMonRequestorModule)
|