Line data Source code
1 : /**
2 : * @file DataWriterModule.cpp DataWriterModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "DataWriterModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 : #include "dfmodules/opmon/DataWriter.pb.h"
12 :
13 : #include "appmodel/DataStoreConf.hpp"
14 : #include "appmodel/DataWriterModule.hpp"
15 : #include "appmodel/TRBModule.hpp"
16 : #include "confmodel/Application.hpp"
17 : #include "confmodel/Connection.hpp"
18 : #include "confmodel/Session.hpp"
19 : #include "daqdataformats/Fragment.hpp"
20 : #include "dfmessages/TriggerDecision.hpp"
21 : #include "dfmessages/TriggerRecord_serialization.hpp"
22 : #include "iomanager/IOManager.hpp"
23 : #include "logging/Logging.hpp"
24 : #include "rcif/cmd/Nljs.hpp"
25 :
26 : #include <algorithm>
27 : #include <cstdlib>
28 : #include <memory>
29 : #include <string>
30 : #include <thread>
31 : #include <utility>
32 : #include <vector>
33 :
34 : /**
35 : * @brief Name used by TRACE TLOG calls from this source file
36 : */
37 : // #define TRACE_NAME "DataWriterModule" // NOLINT This is the default
/**
 * @brief Debug verbosity levels passed to TLOG_DEBUG() in this source file
 */
enum
{
  /// Messages logged on entry to and exit from the module's methods
  TLVL_ENTER_EXIT_METHODS = 5,

  /// Configuration values reported while handling the "conf" command
  TLVL_CONFIG = 7,

  /// Individual processing steps (records received, tokens sent, ...)
  TLVL_WORK_STEPS = 10,

  /// Contents of the per-trigger sequence-number bookkeeping map
  TLVL_SEQNO_MAP_CONTENTS = 13,

  /// Not referenced in the code visible here
  TLVL_FRAGMENT_HEADER_DUMP = 17
};
46 :
47 : namespace dunedaq {
48 : namespace dfmodules {
49 :
50 0 : DataWriterModule::DataWriterModule(const std::string& name)
51 : : dunedaq::appfwk::DAQModule(name)
52 0 : , m_queue_timeout(100)
53 0 : , m_data_storage_is_enabled(true)
54 0 : , m_thread(std::bind(&DataWriterModule::do_work, this, std::placeholders::_1))
55 : {
56 0 : register_command("conf", &DataWriterModule::do_conf);
57 0 : register_command("start", &DataWriterModule::do_start);
58 0 : register_command("stop", &DataWriterModule::do_stop);
59 0 : register_command("scrap", &DataWriterModule::do_scrap);
60 0 : }
61 :
62 : void
63 0 : DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
64 : {
65 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
66 0 : auto mdal = mcfg->get_dal<appmodel::DataWriterModule>(get_name());
67 0 : if (!mdal) {
68 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
69 : }
70 0 : auto iom = iomanager::IOManager::get();
71 :
72 0 : auto inputs = mdal->get_inputs();
73 0 : auto outputs = mdal->get_outputs();
74 :
75 0 : if (inputs.size() != 1) {
76 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Expected 1 input, got " + std::to_string(inputs.size()));
77 : }
78 0 : if (outputs.size() != 1) {
79 0 : throw appfwk::CommandFailed(
80 0 : ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size()));
81 : }
82 :
83 0 : m_module_configuration = mcfg;
84 0 : m_data_writer_conf = mdal->get_configuration();
85 0 : m_writer_identifier = mdal->get_writer_identifier();
86 :
87 0 : if (inputs[0]->get_data_type() != datatype_to_string<std::unique_ptr<daqdataformats::TriggerRecord>>()) {
88 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue");
89 : }
90 0 : if (outputs[0]->get_data_type() != datatype_to_string<dfmessages::TriggerDecisionToken>()) {
91 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue");
92 : }
93 :
94 0 : m_trigger_record_connection = inputs[0]->UID();
95 :
96 0 : auto modules = mcfg->get_modules();
97 0 : std::string trb_uid = "";
98 0 : bool is_trmon = false;
99 0 : for (auto& mod : modules) {
100 0 : if (mod->class_name() == "TRBModule") {
101 0 : trb_uid = mod->UID();
102 : break;
103 : }
104 0 : if (mod->class_name() == "TRMonRequestorModule") {
105 : is_trmon = true;
106 : break;
107 : }
108 : }
109 :
110 0 : if (!is_trmon) {
111 0 : auto trbdal = mcfg->get_dal<appmodel::TRBModule>(trb_uid);
112 0 : if (!trbdal) {
113 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve TRB configuration object");
114 : }
115 0 : for (auto con : trbdal->get_inputs()) {
116 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
117 0 : m_trigger_decision_connection = con->UID();
118 : }
119 : }
120 : }
121 :
122 : // try to create the receiver to see test the connection anyway
123 0 : m_tr_receiver = iom->get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
124 :
125 0 : m_token_output = iom->get_sender<dfmessages::TriggerDecisionToken>(outputs[0]->UID());
126 :
127 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
128 0 : }
129 :
130 : void
131 0 : DataWriterModule::generate_opmon_data()
132 : {
133 :
134 0 : opmon::DataWriterInfo dwi;
135 :
136 0 : dwi.set_records_received(m_records_received_tot.load());
137 : // dwi.new_records_received = m_records_received.exchange(0);
138 0 : dwi.set_records_written(m_records_written_tot.load());
139 0 : dwi.set_new_records_written(m_records_written.exchange(0));
140 : // dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
141 : // dwi.new_bytes_output = m_bytes_output.exchange(0);
142 0 : dwi.set_writing_time_us(m_writing_us.exchange(0));
143 :
144 0 : publish(std::move(dwi));
145 0 : }
146 :
147 : void
148 0 : DataWriterModule::do_conf(const CommandData_t&)
149 : {
150 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
151 :
152 0 : m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale();
153 0 : TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_storage_prescale is " << m_data_storage_prescale;
154 0 : TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are "
155 0 : << m_data_writer_conf->get_data_store_params();
156 0 : m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms() * 1000;
157 0 : if (m_min_write_retry_time_usec < 1) {
158 0 : m_min_write_retry_time_usec = 1;
159 : }
160 0 : m_max_write_retry_time_usec = m_data_writer_conf->get_max_write_retry_time_ms() * 1000;
161 0 : m_write_retry_time_increase_factor = m_data_writer_conf->get_write_retry_time_increase_factor();
162 :
163 : // create the DataStore instance here
164 0 : try {
165 0 : m_data_writer = make_data_store(m_data_writer_conf->get_data_store_params()->get_type(),
166 0 : m_data_writer_conf->get_data_store_params()->UID(),
167 0 : m_module_configuration,
168 0 : m_writer_identifier);
169 0 : register_node("data_writer", m_data_writer);
170 0 : } catch (const ers::Issue& excpt) {
171 0 : throw UnableToConfigure(ERS_HERE, get_name(), excpt);
172 0 : }
173 :
174 : // ensure that we have a valid dataWriter instance
175 0 : if (m_data_writer.get() == nullptr) {
176 0 : throw InvalidDataWriterModule(ERS_HERE, get_name());
177 : }
178 :
179 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
180 0 : }
181 :
182 : void
183 0 : DataWriterModule::do_start(const CommandData_t& payload)
184 : {
185 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
186 :
187 0 : rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
188 0 : m_data_storage_is_enabled = (!start_params.disable_data_storage);
189 0 : m_run_number = start_params.run;
190 :
191 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Sending initial TriggerDecisionToken to DFO to announce my presence";
192 0 : dfmessages::TriggerDecisionToken token;
193 0 : token.run_number = 0;
194 0 : token.trigger_number = 0;
195 0 : token.decision_destination = m_trigger_decision_connection;
196 :
197 : int wasSentSuccessfully = 5;
198 0 : do {
199 0 : try {
200 0 : m_token_output->send(std::move(token), m_queue_timeout);
201 : wasSentSuccessfully = 0;
202 0 : } catch (const ers::Issue& excpt) {
203 0 : std::ostringstream oss_warn;
204 0 : oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
205 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
206 0 : wasSentSuccessfully--;
207 0 : std::this_thread::sleep_for(std::chrono::microseconds(5000));
208 0 : }
209 0 : } while (wasSentSuccessfully);
210 :
211 : // 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run.
212 : // I've put this call fairly early in this method because it could throw an
213 : // exception and abort the run start. And, it seems sensible to avoid starting
214 : // threads, etc. if we throw an exception.
215 0 : if (m_data_storage_is_enabled) {
216 :
217 : // ensure that we have a valid dataWriter instance
218 0 : if (m_data_writer.get() == nullptr) {
219 : // this check is done essentially to notify the user
220 : // in case the "start" has been called before the "conf"
221 0 : ers::fatal(InvalidDataWriterModule(ERS_HERE, get_name()));
222 : }
223 :
224 0 : try {
225 0 : m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
226 0 : } catch (const ers::Issue& excpt) {
227 0 : throw UnableToStart(ERS_HERE, get_name(), m_run_number, excpt);
228 0 : }
229 : }
230 :
231 0 : m_seqno_counts.clear();
232 :
233 0 : m_records_received = 0;
234 0 : m_records_received_tot = 0;
235 0 : m_records_written = 0;
236 0 : m_records_written_tot = 0;
237 0 : m_bytes_output = 0;
238 0 : m_bytes_output_tot = 0;
239 :
240 0 : m_running.store(true);
241 :
242 0 : m_thread.start_working_thread(get_name());
243 : // iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
244 : // m_trigger_record_connection, bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
245 :
246 0 : TLOG() << get_name() << " successfully started for run number " << m_run_number;
247 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
248 0 : }
249 :
250 : void
251 0 : DataWriterModule::do_stop(const CommandData_t& /*args*/)
252 : {
253 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
254 :
255 0 : m_running.store(false);
256 0 : m_thread.stop_working_thread();
257 : // iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>(
258 : // m_trigger_record_connection );
259 :
260 : // 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run.
261 : // I've put this call fairly late in this method so that any draining of queues
262 : // (or whatever) can take place before we finalize things in the DataStore.
263 0 : if (m_data_storage_is_enabled) {
264 0 : try {
265 0 : m_data_writer->finish_with_run(m_run_number);
266 0 : } catch (const std::exception& excpt) {
267 0 : ers::error(ProblemDuringStop(ERS_HERE, get_name(), m_run_number, excpt));
268 0 : }
269 : }
270 :
271 0 : TLOG() << get_name() << " successfully stopped for run number " << m_run_number;
272 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
273 0 : }
274 :
275 : void
276 0 : DataWriterModule::do_scrap(const CommandData_t& /*payload*/)
277 : {
278 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
279 :
280 : // clear/reset the DataStore instance here
281 0 : m_data_writer.reset();
282 :
283 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
284 0 : }
285 :
286 : void
287 0 : DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord>& trigger_record_ptr)
288 : {
289 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": receiving a new TR ptr";
290 :
291 0 : ++m_records_received;
292 0 : ++m_records_received_tot;
293 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Obtained the TriggerRecord for trigger number "
294 0 : << trigger_record_ptr->get_header_ref().get_trigger_number() << "."
295 0 : << trigger_record_ptr->get_header_ref().get_sequence_number() << ", run number "
296 0 : << trigger_record_ptr->get_header_ref().get_run_number() << " off the input connection";
297 :
298 0 : if (trigger_record_ptr->get_header_ref().get_run_number() != m_run_number) {
299 0 : ers::error(InvalidRunNumber(ERS_HERE,
300 0 : get_name(),
301 : "TriggerRecord",
302 : trigger_record_ptr->get_header_ref().get_run_number(),
303 : m_run_number,
304 : trigger_record_ptr->get_header_ref().get_trigger_number(),
305 0 : trigger_record_ptr->get_header_ref().get_sequence_number()));
306 0 : return;
307 : }
308 :
309 : // 03-Feb-2021, KAB: adding support for a data-storage prescale.
310 : // In this "if" statement, I deliberately compare the result of (N mod prescale) to 1
311 : // instead of zero, since I think that it would be nice to always get the first event
312 : // written out.
313 0 : if (m_data_storage_prescale <= 1 || ((m_records_received_tot.load() % m_data_storage_prescale) == 1)) {
314 :
315 0 : if (m_data_storage_is_enabled) {
316 :
317 0 : std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
318 :
319 0 : bool should_retry = true;
320 0 : size_t retry_wait_usec = m_min_write_retry_time_usec;
321 0 : do {
322 0 : should_retry = false;
323 0 : try {
324 0 : m_data_writer->write(*trigger_record_ptr);
325 0 : ++m_records_written;
326 0 : ++m_records_written_tot;
327 0 : m_bytes_output += trigger_record_ptr->get_total_size_bytes();
328 0 : m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes();
329 0 : } catch (const RetryableDataStoreProblem& excpt) {
330 0 : should_retry = true;
331 0 : ers::error(DataWritingProblem(ERS_HERE,
332 0 : get_name(),
333 : trigger_record_ptr->get_header_ref().get_trigger_number(),
334 : trigger_record_ptr->get_header_ref().get_sequence_number(),
335 0 : trigger_record_ptr->get_header_ref().get_run_number(),
336 0 : excpt));
337 0 : if (retry_wait_usec > m_max_write_retry_time_usec) {
338 : retry_wait_usec = m_max_write_retry_time_usec;
339 : }
340 0 : usleep(retry_wait_usec);
341 0 : retry_wait_usec *= m_write_retry_time_increase_factor;
342 0 : } catch (const std::exception& excpt) {
343 0 : ers::error(DataWritingProblem(ERS_HERE,
344 0 : get_name(),
345 : trigger_record_ptr->get_header_ref().get_trigger_number(),
346 : trigger_record_ptr->get_header_ref().get_sequence_number(),
347 0 : trigger_record_ptr->get_header_ref().get_run_number(),
348 0 : excpt));
349 0 : }
350 0 : } while (should_retry && m_running.load());
351 :
352 0 : std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
353 0 : auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
354 0 : m_writing_us += writing_time.count();
355 : } // if m_data_storage_is_enabled
356 : }
357 :
358 0 : bool send_trigger_complete_message = m_running.load();
359 0 : if (trigger_record_ptr->get_header_ref().get_max_sequence_number() > 0) {
360 0 : daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref().get_trigger_number();
361 0 : if (m_seqno_counts.count(trigno) > 0) {
362 0 : ++m_seqno_counts[trigno];
363 : } else {
364 0 : m_seqno_counts[trigno] = 1;
365 : }
366 : // in the following comparison GT (>) is used since the counts are one-based and the
367 : // max sequence number is zero-based.
368 0 : if (m_seqno_counts[trigno] > trigger_record_ptr->get_header_ref().get_max_sequence_number()) {
369 0 : m_seqno_counts.erase(trigno);
370 : } else {
371 : // Using const .count and .at to avoid reintroducing element to map
372 0 : TLOG_DEBUG(TLVL_SEQNO_MAP_CONTENTS)
373 0 : << get_name() << ": the sequence number count for trigger number " << trigno << " is "
374 0 : << (m_seqno_counts.count(trigno) ? m_seqno_counts.at(trigno) : 0) << " (number of entries "
375 0 : << "in the seqno map is " << m_seqno_counts.size() << ").";
376 0 : send_trigger_complete_message = false;
377 : }
378 : }
379 0 : if (send_trigger_complete_message) {
380 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Pushing the TriggerDecisionToken for trigger number "
381 0 : << trigger_record_ptr->get_header_ref().get_trigger_number()
382 0 : << " onto the relevant output queue";
383 0 : dfmessages::TriggerDecisionToken token;
384 0 : token.run_number = m_run_number;
385 0 : token.trigger_number = trigger_record_ptr->get_header_ref().get_trigger_number();
386 0 : token.decision_destination = m_trigger_decision_connection;
387 :
388 : bool wasSentSuccessfully = false;
389 0 : do {
390 0 : try {
391 0 : m_token_output->send(std::move(token), m_queue_timeout);
392 : wasSentSuccessfully = true;
393 0 : } catch (const ers::Issue& excpt) {
394 0 : std::ostringstream oss_warn;
395 0 : oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
396 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
397 0 : }
398 0 : } while (!wasSentSuccessfully && m_running.load());
399 0 : }
400 :
401 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": operations completed for TR";
402 : } // NOLINT(readability/fn_size)
403 :
404 : void
405 0 : DataWriterModule::do_work(std::atomic<bool>& running_flag)
406 : {
407 : // 27-Jan-2026, KAB: we want this code to drain all pending TriggerRecords from
408 : // the input queue at end-run time. So, we check if there is data in the queue
409 : // and continue the while loop if there is any.
410 0 : while (running_flag.load() || m_tr_receiver->data_pending()) {
411 0 : try {
412 0 : std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver->receive(std::chrono::milliseconds(10));
413 0 : receive_trigger_record(tr);
414 0 : } catch (const iomanager::TimeoutExpired& excpt) {
415 0 : } catch (const ers::Issue& excpt) {
416 0 : ers::warning(excpt);
417 0 : }
418 : }
419 0 : }
420 :
421 : } // namespace dfmodules
422 : } // namespace dunedaq
423 :
// NOTE(review): presumably this macro registers DataWriterModule with the application framework's module/plugin mechanism so that it can be created by class name - confirm against the macro definition.
424 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DataWriterModule)
|