Line data Source code
1 : /**
2 : * @file DataWriterModule.cpp DataWriterModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "DataWriterModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 : #include "dfmodules/opmon/DataWriter.pb.h"
12 :
13 : #include "confmodel/Application.hpp"
14 : #include "confmodel/Session.hpp"
15 : #include "appmodel/DataWriterModule.hpp"
16 : #include "appmodel/TRBModule.hpp"
17 : #include "appmodel/DataStoreConf.hpp"
18 : #include "confmodel/Connection.hpp"
19 : #include "daqdataformats/Fragment.hpp"
20 : #include "dfmessages/TriggerDecision.hpp"
21 : #include "dfmessages/TriggerRecord_serialization.hpp"
22 : #include "logging/Logging.hpp"
23 : #include "iomanager/IOManager.hpp"
24 : #include "rcif/cmd/Nljs.hpp"
25 :
26 : #include <algorithm>
27 : #include <cstdlib>
28 : #include <memory>
29 : #include <string>
30 : #include <thread>
31 : #include <utility>
32 : #include <vector>
33 :
34 : /**
35 : * @brief Name used by TRACE TLOG calls from this source file
36 : */
37 : //#define TRACE_NAME "DataWriterModule" // NOLINT This is the default
// Debug verbosity levels handed to TLOG_DEBUG() by the code in this source file.
enum
{
  TLVL_ENTER_EXIT_METHODS = 5,   // messages logged on entry to / exit from the module's methods
  TLVL_CONFIG = 7,               // configuration values logged by do_conf()
  TLVL_WORK_STEPS = 10,          // individual steps taken while handling tokens and records
  TLVL_SEQNO_MAP_CONTENTS = 13,  // state of the per-trigger sequence-number count map
  TLVL_FRAGMENT_HEADER_DUMP = 17 // not referenced by the code visible in this file
};
46 :
47 : namespace dunedaq {
48 : namespace dfmodules {
49 :
50 0 : DataWriterModule::DataWriterModule(const std::string& name)
51 : : dunedaq::appfwk::DAQModule(name)
52 0 : , m_queue_timeout(100)
53 0 : , m_data_storage_is_enabled(true)
54 0 : , m_thread(std::bind(&DataWriterModule::do_work, this, std::placeholders::_1))
55 : {
56 0 : register_command("conf", &DataWriterModule::do_conf);
57 0 : register_command("start", &DataWriterModule::do_start);
58 0 : register_command("stop", &DataWriterModule::do_stop);
59 0 : register_command("scrap", &DataWriterModule::do_scrap);
60 0 : }
61 :
62 : void
63 0 : DataWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
64 : {
65 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
66 0 : auto mdal = mcfg->get_dal<appmodel::DataWriterModule>(get_name());
67 0 : if (!mdal) {
68 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
69 : }
70 0 : auto iom = iomanager::IOManager::get();
71 :
72 0 : auto inputs = mdal->get_inputs();
73 0 : auto outputs = mdal->get_outputs();
74 :
75 0 : if (inputs.size() != 1) {
76 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Expected 1 input, got " + std::to_string(inputs.size()));
77 : }
78 0 : if (outputs.size() != 1) {
79 0 : throw appfwk::CommandFailed(
80 0 : ERS_HERE, "init", get_name(), "Expected 1 output, got " + std::to_string(outputs.size()));
81 : }
82 :
83 0 : m_module_configuration = mcfg;
84 0 : m_data_writer_conf = mdal->get_configuration();
85 0 : m_writer_identifier = mdal->get_writer_identifier();
86 :
87 0 : if (inputs[0]->get_data_type() != datatype_to_string<std::unique_ptr<daqdataformats::TriggerRecord>>()) {
88 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerRecord Input queue");
89 : }
90 0 : if (outputs[0]->get_data_type() != datatype_to_string<dfmessages::TriggerDecisionToken>()) {
91 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "TriggerDecisionToken Output queue");
92 : }
93 :
94 0 : m_trigger_record_connection = inputs[0]->UID();
95 :
96 0 : auto modules = mcfg->get_modules();
97 0 : std::string trb_uid = "";
98 0 : bool is_trmon = false;
99 0 : for (auto& mod : modules) {
100 0 : if (mod->class_name() == "TRBModule") {
101 0 : trb_uid = mod->UID();
102 : break;
103 : }
104 0 : if (mod->class_name() == "TRMonRequestorModule") {
105 : is_trmon = true;
106 : break;
107 : }
108 : }
109 :
110 0 : if (!is_trmon) {
111 0 : auto trbdal = mcfg->get_dal<appmodel::TRBModule>(trb_uid);
112 0 : if (!trbdal) {
113 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve TRB configuration object");
114 : }
115 0 : for (auto con : trbdal->get_inputs()) {
116 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TriggerDecision>()) {
117 0 : m_trigger_decision_connection = con->UID();
118 : }
119 : }
120 : }
121 :
122 : // try to create the receiver to see test the connection anyway
123 0 : m_tr_receiver = iom -> get_receiver<std::unique_ptr<daqdataformats::TriggerRecord>>(m_trigger_record_connection);
124 :
125 0 : m_token_output = iom->get_sender<dfmessages::TriggerDecisionToken>(outputs[0]->UID());
126 :
127 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
128 0 : }
129 :
130 : void
131 0 : DataWriterModule::generate_opmon_data() {
132 :
133 0 : opmon::DataWriterInfo dwi;
134 :
135 0 : dwi.set_records_received(m_records_received_tot.load());
136 : // dwi.new_records_received = m_records_received.exchange(0);
137 0 : dwi.set_records_written(m_records_written_tot.load());
138 0 : dwi.set_new_records_written(m_records_written.exchange(0));
139 : // dwi.bytes_output = m_bytes_output_tot.load(); MR: byte writing to be delegated to DataStorage
140 : // dwi.new_bytes_output = m_bytes_output.exchange(0);
141 0 : dwi.set_writing_time_us(m_writing_us.exchange(0));
142 :
143 0 : publish(std::move(dwi));
144 0 : }
145 :
146 : void
147 0 : DataWriterModule::do_conf(const CommandData_t&)
148 : {
149 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
150 :
151 0 : m_data_storage_prescale = m_data_writer_conf->get_data_storage_prescale();
152 0 : TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_storage_prescale is " << m_data_storage_prescale;
153 0 : TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": data_store_parameters are " << m_data_writer_conf->get_data_store_params();
154 0 : m_min_write_retry_time_usec = m_data_writer_conf->get_min_write_retry_time_ms() * 1000;
155 0 : if (m_min_write_retry_time_usec < 1) {
156 0 : m_min_write_retry_time_usec = 1;
157 : }
158 0 : m_max_write_retry_time_usec = m_data_writer_conf->get_max_write_retry_time_ms() * 1000;
159 0 : m_write_retry_time_increase_factor = m_data_writer_conf->get_write_retry_time_increase_factor();
160 :
161 : // create the DataStore instance here
162 0 : try {
163 0 : m_data_writer = make_data_store(m_data_writer_conf->get_data_store_params()->get_type(),
164 0 : m_data_writer_conf->get_data_store_params()->UID(),
165 0 : m_module_configuration, m_writer_identifier);
166 0 : register_node("data_writer", m_data_writer);
167 0 : } catch (const ers::Issue& excpt) {
168 0 : throw UnableToConfigure(ERS_HERE, get_name(), excpt);
169 0 : }
170 :
171 : // ensure that we have a valid dataWriter instance
172 0 : if (m_data_writer.get() == nullptr) {
173 0 : throw InvalidDataWriterModule(ERS_HERE, get_name());
174 : }
175 :
176 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
177 0 : }
178 :
179 : void
180 0 : DataWriterModule::do_start(const CommandData_t& payload)
181 : {
182 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
183 :
184 0 : rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
185 0 : m_data_storage_is_enabled = (!start_params.disable_data_storage);
186 0 : m_run_number = start_params.run;
187 :
188 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Sending initial TriggerDecisionToken to DFO to announce my presence";
189 0 : dfmessages::TriggerDecisionToken token;
190 0 : token.run_number = 0;
191 0 : token.trigger_number = 0;
192 0 : token.decision_destination = m_trigger_decision_connection;
193 :
194 : int wasSentSuccessfully = 5;
195 0 : do {
196 0 : try {
197 0 : m_token_output->send(std::move(token), m_queue_timeout);
198 : wasSentSuccessfully = 0;
199 0 : } catch (const ers::Issue& excpt) {
200 0 : std::ostringstream oss_warn;
201 0 : oss_warn << "Send with sender \"" << m_token_output->get_name() << "\" failed";
202 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
203 0 : wasSentSuccessfully--;
204 0 : std::this_thread::sleep_for(std::chrono::microseconds(5000));
205 0 : }
206 0 : } while (wasSentSuccessfully);
207 :
208 : // 04-Feb-2021, KAB: added this call to allow DataStore to prepare for the run.
209 : // I've put this call fairly early in this method because it could throw an
210 : // exception and abort the run start. And, it seems sensible to avoid starting
211 : // threads, etc. if we throw an exception.
212 0 : if (m_data_storage_is_enabled) {
213 :
214 : // ensure that we have a valid dataWriter instance
215 0 : if (m_data_writer.get() == nullptr) {
216 : // this check is done essentially to notify the user
217 : // in case the "start" has been called before the "conf"
218 0 : ers::fatal(InvalidDataWriterModule(ERS_HERE, get_name()));
219 : }
220 :
221 0 : try {
222 0 : m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
223 0 : } catch (const ers::Issue& excpt) {
224 0 : throw UnableToStart(ERS_HERE, get_name(), m_run_number, excpt);
225 0 : }
226 : }
227 :
228 0 : m_seqno_counts.clear();
229 :
230 0 : m_records_received = 0;
231 0 : m_records_received_tot = 0;
232 0 : m_records_written = 0;
233 0 : m_records_written_tot = 0;
234 0 : m_bytes_output = 0;
235 0 : m_bytes_output_tot = 0;
236 :
237 0 : m_running.store(true);
238 :
239 0 : m_thread.start_working_thread(get_name());
240 : //iomanager::IOManager::get()->add_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection,
241 : // bind( &DataWriterModule::receive_trigger_record, this, std::placeholders::_1) );
242 :
243 0 : TLOG() << get_name() << " successfully started for run number " << m_run_number;
244 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
245 0 : }
246 :
247 : void
248 0 : DataWriterModule::do_stop(const CommandData_t& /*args*/)
249 : {
250 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
251 :
252 0 : m_running.store(false);
253 0 : m_thread.stop_working_thread();
254 : //iomanager::IOManager::get()->remove_callback<std::unique_ptr<daqdataformats::TriggerRecord>>( m_trigger_record_connection );
255 :
256 : // 04-Feb-2021, KAB: added this call to allow DataStore to finish up with this run.
257 : // I've put this call fairly late in this method so that any draining of queues
258 : // (or whatever) can take place before we finalize things in the DataStore.
259 0 : if (m_data_storage_is_enabled) {
260 0 : try {
261 0 : m_data_writer->finish_with_run(m_run_number);
262 0 : } catch (const std::exception& excpt) {
263 0 : ers::error(ProblemDuringStop(ERS_HERE, get_name(), m_run_number, excpt));
264 0 : }
265 : }
266 :
267 0 : TLOG() << get_name() << " successfully stopped for run number " << m_run_number;
268 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
269 0 : }
270 :
271 : void
272 0 : DataWriterModule::do_scrap(const CommandData_t& /*payload*/)
273 : {
274 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
275 :
276 : // clear/reset the DataStore instance here
277 0 : m_data_writer.reset();
278 :
279 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
280 0 : }
281 :
282 : void
283 0 : DataWriterModule::receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord> & trigger_record_ptr)
284 : {
285 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": receiving a new TR ptr";
286 :
287 0 : ++m_records_received;
288 0 : ++m_records_received_tot;
289 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Obtained the TriggerRecord for trigger number "
290 0 : << trigger_record_ptr->get_header_ref().get_trigger_number() << "."
291 0 : << trigger_record_ptr->get_header_ref().get_sequence_number()
292 0 : << ", run number " << trigger_record_ptr->get_header_ref().get_run_number()
293 0 : << " off the input connection";
294 :
295 0 : if (trigger_record_ptr->get_header_ref().get_run_number() != m_run_number) {
296 0 : ers::error(InvalidRunNumber(ERS_HERE, get_name(), "TriggerRecord", trigger_record_ptr->get_header_ref().get_run_number(),
297 : m_run_number, trigger_record_ptr->get_header_ref().get_trigger_number(),
298 0 : trigger_record_ptr->get_header_ref().get_sequence_number()));
299 0 : return;
300 : }
301 :
302 : // 03-Feb-2021, KAB: adding support for a data-storage prescale.
303 : // In this "if" statement, I deliberately compare the result of (N mod prescale) to 1
304 : // instead of zero, since I think that it would be nice to always get the first event
305 : // written out.
306 0 : if (m_data_storage_prescale <= 1 || ((m_records_received_tot.load() % m_data_storage_prescale) == 1)) {
307 :
308 0 : if (m_data_storage_is_enabled) {
309 :
310 0 : std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
311 :
312 0 : bool should_retry = true;
313 0 : size_t retry_wait_usec = m_min_write_retry_time_usec;
314 0 : do {
315 0 : should_retry = false;
316 0 : try {
317 0 : m_data_writer->write(*trigger_record_ptr);
318 0 : ++m_records_written;
319 0 : ++m_records_written_tot;
320 0 : m_bytes_output += trigger_record_ptr->get_total_size_bytes();
321 0 : m_bytes_output_tot += trigger_record_ptr->get_total_size_bytes();
322 0 : } catch (const RetryableDataStoreProblem& excpt) {
323 0 : should_retry = true;
324 0 : ers::error(DataWritingProblem(ERS_HERE,
325 0 : get_name(),
326 : trigger_record_ptr->get_header_ref().get_trigger_number(),
327 : trigger_record_ptr->get_header_ref().get_sequence_number(),
328 0 : trigger_record_ptr->get_header_ref().get_run_number(),
329 : excpt));
330 0 : if (retry_wait_usec > m_max_write_retry_time_usec) {
331 : retry_wait_usec = m_max_write_retry_time_usec;
332 : }
333 0 : usleep(retry_wait_usec);
334 0 : retry_wait_usec *= m_write_retry_time_increase_factor;
335 0 : } catch (const std::exception& excpt) {
336 0 : ers::error(DataWritingProblem(ERS_HERE,
337 0 : get_name(),
338 : trigger_record_ptr->get_header_ref().get_trigger_number(),
339 : trigger_record_ptr->get_header_ref().get_sequence_number(),
340 0 : trigger_record_ptr->get_header_ref().get_run_number(),
341 : excpt));
342 0 : }
343 0 : } while (should_retry && m_running.load());
344 :
345 0 : std::chrono::steady_clock::time_point end_time = std::chrono::steady_clock::now();
346 0 : auto writing_time = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
347 0 : m_writing_us += writing_time.count();
348 : } // if m_data_storage_is_enabled
349 : }
350 :
351 0 : bool send_trigger_complete_message = m_running.load();
352 0 : if (trigger_record_ptr->get_header_ref().get_max_sequence_number() > 0) {
353 0 : daqdataformats::trigger_number_t trigno = trigger_record_ptr->get_header_ref().get_trigger_number();
354 0 : if (m_seqno_counts.count(trigno) > 0) {
355 0 : ++m_seqno_counts[trigno];
356 : } else {
357 0 : m_seqno_counts[trigno] = 1;
358 : }
359 : // in the following comparison GT (>) is used since the counts are one-based and the
360 : // max sequence number is zero-based.
361 0 : if (m_seqno_counts[trigno] > trigger_record_ptr->get_header_ref().get_max_sequence_number()) {
362 0 : m_seqno_counts.erase(trigno);
363 : } else {
364 : // Using const .count and .at to avoid reintroducing element to map
365 0 : TLOG_DEBUG(TLVL_SEQNO_MAP_CONTENTS) << get_name() << ": the sequence number count for trigger number " << trigno
366 0 : << " is " << (m_seqno_counts.count(trigno) ? m_seqno_counts.at(trigno) : 0) << " (number of entries "
367 0 : << "in the seqno map is " << m_seqno_counts.size() << ").";
368 0 : send_trigger_complete_message = false;
369 : }
370 : }
371 0 : if (send_trigger_complete_message) {
372 0 : TLOG_DEBUG(TLVL_WORK_STEPS) << get_name() << ": Pushing the TriggerDecisionToken for trigger number "
373 0 : << trigger_record_ptr->get_header_ref().get_trigger_number()
374 0 : << " onto the relevant output queue";
375 0 : dfmessages::TriggerDecisionToken token;
376 0 : token.run_number = m_run_number;
377 0 : token.trigger_number = trigger_record_ptr->get_header_ref().get_trigger_number();
378 0 : token.decision_destination = m_trigger_decision_connection;
379 :
380 : bool wasSentSuccessfully = false;
381 0 : do {
382 0 : try {
383 0 : m_token_output -> send( std::move(token), m_queue_timeout );
384 : wasSentSuccessfully = true;
385 0 : } catch (const ers::Issue& excpt) {
386 0 : std::ostringstream oss_warn;
387 0 : oss_warn << "Send with sender \"" << m_token_output -> get_name() << "\" failed";
388 0 : ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt));
389 0 : }
390 0 : } while (!wasSentSuccessfully && m_running.load());
391 :
392 0 : }
393 :
394 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": operations completed for TR";
395 : } // NOLINT(readability/fn_size)
396 :
397 : void
398 0 : DataWriterModule::do_work(std::atomic<bool>& running_flag) {
399 0 : while (running_flag.load()) {
400 0 : try {
401 0 : std::unique_ptr<daqdataformats::TriggerRecord> tr = m_tr_receiver-> receive(std::chrono::milliseconds(10));
402 0 : receive_trigger_record(tr);
403 0 : }
404 0 : catch(const iomanager::TimeoutExpired& excpt) {
405 0 : }
406 0 : catch(const ers::Issue & excpt) {
407 0 : ers::warning(excpt);
408 0 : }
409 : }
410 0 : }
411 :
412 : } // namespace dfmodules
413 : } // namespace dunedaq
414 :
// Invokes the framework's module-definition macro for DataWriterModule.
// NOTE(review): presumably this is what makes the class loadable as a DAQ module
// plugin - confirm against the macro's definition in the application framework.
415 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DataWriterModule)
|