Line data Source code
1 : /**
2 : * @file TPStreamWriterModule.cpp TPStreamWriterModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "TPStreamWriterModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 : #include "dfmodules/TPBundleHandler.hpp"
12 : #include "dfmodules/opmon/TPStreamWriter.pb.h"
13 :
14 : #include "appmodel/DataStoreConf.hpp"
15 : #include "appmodel/TPStreamWriterModule.hpp"
16 : #include "confmodel/Connection.hpp"
17 : #include "confmodel/Session.hpp"
18 : #include "iomanager/IOManager.hpp"
19 : #include "daqdataformats/Fragment.hpp"
20 : #include "daqdataformats/Types.hpp"
21 : #include "logging/Logging.hpp"
22 : #include "rcif/cmd/Nljs.hpp"
23 :
24 : #include "boost/date_time/posix_time/posix_time.hpp"
25 :
26 : #include <chrono>
27 : #include <memory>
28 : #include <sstream>
29 : #include <string>
30 : #include <utility>
31 : #include <vector>
32 :
// TLOG_DEBUG verbosity levels used by this module.
enum
{
  TLVL_CONFIG = 7,             // configuration-related messages (not referenced in this file)
  TLVL_ENTER_EXIT_METHODS = 5, // "Entering/Exiting ..." messages of the command handlers and do_work()
};
38 :
39 : namespace dunedaq {
40 : namespace dfmodules {
41 :
42 0 : TPStreamWriterModule::TPStreamWriterModule(const std::string& name)
43 : : dunedaq::appfwk::DAQModule(name)
44 0 : , m_thread(std::bind(&TPStreamWriterModule::do_work, this, std::placeholders::_1))
45 0 : , m_queue_timeout(100)
46 : {
47 0 : register_command("conf", &TPStreamWriterModule::do_conf);
48 0 : register_command("start", &TPStreamWriterModule::do_start);
49 0 : register_command("stop", &TPStreamWriterModule::do_stop);
50 0 : register_command("scrap", &TPStreamWriterModule::do_scrap);
51 0 : }
52 :
53 : void
54 0 : TPStreamWriterModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
55 : {
56 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
57 0 : auto mdal = mcfg->get_dal<appmodel::TPStreamWriterModule>(get_name());
58 0 : if (!mdal) {
59 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
60 : }
61 0 : assert(mdal->get_inputs().size() == 1);
62 0 : m_module_configuration = mcfg;
63 0 : m_tpset_source = iomanager::IOManager::get()->get_receiver<trigger::TPSet>(mdal->get_inputs()[0]->UID());
64 0 : m_writer_identifier = mdal->get_writer_identifier();
65 0 : m_tp_writer_conf = mdal->get_configuration();
66 0 : m_source_id = mdal->get_source_id();
67 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
68 0 : }
69 :
70 : void
71 0 : TPStreamWriterModule::generate_opmon_data() {
72 0 : opmon::TPStreamWriterInfo info;
73 :
74 0 : info.set_heartbeat_tpsets_received(m_heartbeat_tpsets.exchange(0));
75 0 : info.set_tpsets_with_tps_received(m_tpsets_with_tps.exchange(0));
76 0 : info.set_tps_received(m_tps_received.exchange(0));
77 0 : info.set_tps_written(m_tps_written.exchange(0));
78 0 : info.set_tps_discarded(m_tps_discarded.exchange(0));
79 0 : info.set_total_tps_received(m_total_tps_received.load());
80 0 : info.set_total_tps_written(m_total_tps_written.load());
81 0 : info.set_total_tps_discarded(m_total_tps_discarded.load());
82 0 : info.set_tardy_timeslice_max_seconds(m_tardy_timeslice_max_seconds.exchange(0.0));
83 0 : info.set_timeslices_written(m_timeslices_written.exchange(0));
84 0 : info.set_bytes_output(m_bytes_output.exchange(0));
85 :
86 0 : publish(std::move(info));
87 0 : }
88 :
89 : void
90 0 : TPStreamWriterModule::do_conf(const CommandData_t&)
91 : {
92 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
93 0 : m_accumulation_interval_ticks = m_tp_writer_conf->get_tp_accumulation_interval();
94 0 : m_accumulation_inactivity_time_before_write =
95 0 : std::chrono::milliseconds(static_cast<int>(1000*m_tp_writer_conf->get_tp_accumulation_inactivity_time_before_write_sec()));
96 0 : m_warn_user_when_tardy_tps_are_discarded = m_tp_writer_conf->get_warn_user_when_tardy_tps_are_discarded();
97 0 : m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;
98 :
99 : // create the DataStore instance here
100 0 : try {
101 0 : m_data_writer = make_data_store(m_tp_writer_conf->get_data_store_params()->get_type(),
102 0 : m_tp_writer_conf->get_data_store_params()->UID(),
103 0 : m_module_configuration, m_writer_identifier);
104 0 : register_node("data_writer", m_data_writer);
105 0 : } catch (const ers::Issue& excpt) {
106 0 : throw UnableToConfigure(ERS_HERE, get_name(), excpt);
107 0 : }
108 :
109 : // ensure that we have a valid dataWriter instance
110 0 : if (m_data_writer.get() == nullptr) {
111 0 : throw InvalidDataWriterModule(ERS_HERE, get_name());
112 : }
113 :
114 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
115 0 : }
116 :
117 : void
118 0 : TPStreamWriterModule::do_start(const CommandData_t& payload)
119 : {
120 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
121 0 : rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
122 0 : m_run_number = start_params.run;
123 0 : m_total_tps_received.store(0);
124 0 : m_total_tps_written.store(0);
125 0 : m_total_tps_discarded.store(0);
126 :
127 : // 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
128 : // I've put this call fairly early in this method because it could throw an
129 : // exception and abort the run start. And, it seems sensible to avoid starting
130 : // threads, etc. if we throw an exception.
131 0 : try {
132 0 : m_data_writer->prepare_for_run(m_run_number, (start_params.production_vs_test == "TEST"));
133 0 : } catch (const ers::Issue& excpt) {
134 0 : throw UnableToStart(ERS_HERE, get_name(), m_run_number, excpt);
135 0 : }
136 :
137 0 : m_thread.start_working_thread(get_name());
138 :
139 0 : TLOG() << get_name() << " successfully started for run number " << m_run_number;
140 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
141 0 : }
142 :
143 : void
144 0 : TPStreamWriterModule::do_stop(const CommandData_t& /*payload*/)
145 : {
146 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
147 0 : m_thread.stop_working_thread();
148 :
149 : // 06-Mar-2022, KAB: added this call to allow DataStore to finish up with this run.
150 : // I've put this call fairly late in this method so that any draining of queues
151 : // (or whatever) can take place before we finalize things in the DataStore.
152 0 : try {
153 0 : m_data_writer->finish_with_run(m_run_number);
154 0 : } catch (const std::exception& excpt) {
155 0 : ers::error(ProblemDuringStop(ERS_HERE, get_name(), m_run_number, excpt));
156 0 : }
157 :
158 0 : TLOG() << get_name() << " successfully stopped for run number " << m_run_number;
159 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
160 0 : }
161 :
162 : void
163 0 : TPStreamWriterModule::do_scrap(const CommandData_t& /*payload*/)
164 : {
165 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method";
166 :
167 : // clear/reset the DataStore instance here
168 0 : m_data_writer.reset();
169 :
170 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method";
171 0 : }
172 :
173 : void
174 0 : TPStreamWriterModule::do_work(std::atomic<bool>& running_flag)
175 : {
176 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
177 :
178 0 : using namespace std::chrono;
179 0 : size_t n_tpset_received = 0;
180 0 : auto start_time = steady_clock::now();
181 0 : daqdataformats::timestamp_t first_timestamp = 0;
182 0 : daqdataformats::timestamp_t last_timestamp = 0;
183 :
184 0 : TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);
185 :
186 0 : bool possible_pending_data = true;
187 0 : size_t largest_timeslice_number = 0;
188 0 : while (running_flag.load() || possible_pending_data) {
189 0 : trigger::TPSet tpset;
190 0 : try {
191 0 : tpset = m_tpset_source->receive(m_queue_timeout);
192 0 : ++n_tpset_received;
193 :
194 0 : if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
195 0 : ++m_heartbeat_tpsets;
196 0 : continue;
197 : }
198 :
199 0 : TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is " << tpset.origin
200 0 : << ", seqno is " << tpset.seqno << ", start timestamp is " << tpset.start_time << ", run number is "
201 0 : << tpset.run_number << ", slice id is " << (tpset.start_time / m_accumulation_interval_ticks);
202 :
203 : // 30-Mar-2022, KAB: added test for matching run number. This is to avoid getting
204 : // confused by TPSets that happen to be leftover in transit from one run to the
205 : // next (which we have observed in v2.10.x systems).
206 0 : if (tpset.run_number != m_run_number) {
207 0 : TLOG_DEBUG(22) << "Discarding TPSet with invalid run number " << tpset.run_number << " (current is "
208 0 : << m_run_number << "), Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
209 0 : continue;
210 0 : }
211 0 : ++m_tpsets_with_tps;
212 :
213 0 : size_t num_tps_in_tpset = tpset.objects.size();
214 0 : tp_bundle_handler.add_tpset(std::move(tpset));
215 0 : m_tps_received += num_tps_in_tpset;
216 0 : m_total_tps_received += num_tps_in_tpset;
217 0 : possible_pending_data = true;
218 0 : } catch (iomanager::ConnectionInstanceNotFound&) {
219 : // sleep for a little bit; and indicate no pending data, in case we never get a connection
220 : // and the run ends - we don't want to believe that there is pending data in that case.
221 0 : usleep(1000 * m_queue_timeout.count());
222 0 : possible_pending_data = false;
223 0 : } catch (iomanager::TimeoutExpired&) {
224 : // nothing special to do here, we'll simply let the rest of the code in this
225 : // while loop do its job
226 0 : }
227 :
228 0 : std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
229 0 : if (running_flag.load()) {
230 0 : list_of_timeslices = tp_bundle_handler.get_properly_aged_timeslices();
231 : } else {
232 0 : list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
233 0 : possible_pending_data = false;
234 : }
235 :
236 : // keep track of the largest timeslice number (for reporting on tardy ones)
237 0 : for (auto& timeslice_ptr : list_of_timeslices) {
238 0 : largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
239 : }
240 :
241 : // attempt to write out each TimeSlice
242 0 : for (auto& timeslice_ptr : list_of_timeslices) {
243 0 : daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
244 0 : timeslice_ptr->set_element_id(sid);
245 :
246 : // write the TSH and the fragments as a set of data blocks
247 0 : bool should_retry = true;
248 0 : size_t retry_wait_usec = 1000;
249 0 : do {
250 0 : should_retry = false;
251 0 : size_t number_of_tps = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
252 0 : try {
253 0 : m_data_writer->write(*timeslice_ptr);
254 0 : ++m_timeslices_written;
255 0 : m_bytes_output += timeslice_ptr->get_total_size_bytes();
256 0 : m_tps_written += number_of_tps;
257 0 : m_total_tps_written += number_of_tps;
258 0 : } catch (const RetryableDataStoreProblem& excpt) {
259 0 : should_retry = true;
260 0 : ers::error(DataWritingProblem(ERS_HERE,
261 0 : get_name(),
262 0 : timeslice_ptr->get_header().timeslice_number,
263 0 : timeslice_ptr->get_header().run_number,
264 : excpt));
265 0 : usleep(retry_wait_usec);
266 0 : retry_wait_usec = std::min(retry_wait_usec * 2, 1000000UL);
267 0 : } catch (const IgnorableDataStoreProblem& excpt) {
268 0 : int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
269 0 : double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
270 0 : m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
271 0 : m_tps_discarded += number_of_tps;
272 0 : m_total_tps_discarded += number_of_tps;
273 0 : if (m_warn_user_when_tardy_tps_are_discarded) {
274 0 : std::ostringstream sid_list;
275 0 : bool first_frag = true;
276 0 : for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
277 0 : if (first_frag) {first_frag = false;}
278 0 : else {sid_list << ",";}
279 0 : sid_list << frag_ptr->get_element_id().to_string();
280 : }
281 0 : ers::warning(TardyTPsDiscarded(ERS_HERE,
282 0 : get_name(),
283 0 : sid_list.str(),
284 0 : timeslice_ptr->get_header().timeslice_number,
285 : seconds_too_late));
286 0 : }
287 0 : } catch (const std::exception& excpt) {
288 0 : m_tps_discarded += number_of_tps;
289 0 : m_total_tps_discarded += number_of_tps;
290 0 : ers::error(DataWritingProblem(ERS_HERE,
291 0 : get_name(),
292 0 : timeslice_ptr->get_header().timeslice_number,
293 0 : timeslice_ptr->get_header().run_number,
294 : excpt));
295 0 : }
296 0 : } while (should_retry && running_flag.load());
297 : }
298 :
299 0 : if (first_timestamp == 0) {
300 0 : first_timestamp = tpset.start_time;
301 : }
302 0 : last_timestamp = tpset.start_time;
303 0 : } // while(running)
304 :
305 0 : auto end_time = steady_clock::now();
306 0 : auto time_ms = duration_cast<milliseconds>(end_time - start_time).count();
307 0 : float rate_hz = 1e3 * static_cast<float>(n_tpset_received) / time_ms;
308 0 : float inferred_clock_frequency = 1e3 * (last_timestamp - first_timestamp) / time_ms;
309 :
310 0 : TLOG() << "Received " << n_tpset_received << " TPSets in " << time_ms << "ms. " << rate_hz
311 0 : << " TPSet/s. Inferred clock frequency " << inferred_clock_frequency << "Hz";
312 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
313 0 : } // NOLINT Function length
314 :
315 : } // namespace dfmodules
316 : } // namespace dunedaq
317 :
// Registers TPStreamWriterModule with the appfwk DAQModule plugin machinery
// (presumably so it can be instantiated by class name at runtime — confirm against appfwk docs).
DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::TPStreamWriterModule)
|