Line data Source code
1 : #include "trigger/TPRequestHandler.hpp"
2 : #include "appmodel/DataHandlerConf.hpp"
3 : #include "appmodel/RequestHandler.hpp"
4 :
5 : #include "rcif/cmd/Nljs.hpp"
6 :
7 : namespace dunedaq {
8 : namespace trigger {
9 :
10 : void
11 0 : TPRequestHandler::conf(const appmodel::DataHandlerModule* conf) {
12 :
13 0 : for (auto output : conf->get_outputs()) {
14 0 : if (output->get_data_type() == "TPSet") {
15 0 : try {
16 0 : m_tpset_sink = iomanager::IOManager::get()->get_sender<dunedaq::trigger::TPSet>(output->UID());
17 0 : } catch (const ers::Issue& excpt) {
18 0 : throw datahandlinglibs::ResourceQueueError(ERS_HERE, "tp queue", "DefaultRequestHandlerModel", excpt);
19 0 : }
20 : }
21 : }
22 0 : inherited2::conf(conf);
23 0 : }
24 :
25 : void
26 0 : TPRequestHandler::scrap(const appfwk::DAQModule::CommandData_t& args)
27 : {
28 0 : m_tpset_sink.reset();
29 0 : inherited2::scrap(args);
30 0 : }
31 :
32 : void
33 0 : TPRequestHandler::start(const appfwk::DAQModule::CommandData_t& args)
34 : {
35 :
36 0 : m_oldest_ts=0;
37 0 : m_newest_ts=0;
38 0 : m_start_win_ts=0;
39 0 : m_end_win_ts=0;
40 0 : m_first_cycle = true;
41 :
42 0 : inherited2::start(args);
43 0 : rcif::cmd::StartParams start_params = args.get<rcif::cmd::StartParams>();
44 0 : m_run_number = start_params.run;
45 :
46 0 : }
47 :
48 : void
49 0 : TPRequestHandler::periodic_data_transmission() {
50 :
51 0 : if (m_tpset_sink == nullptr) return;
52 :
53 0 : dunedaq::dfmessages::DataRequest dr;
54 :
55 0 : {
56 0 : std::unique_lock<std::mutex> lock(m_cv_mutex);
57 0 : m_cv.wait(lock, [&] { return !m_cleanup_requested; });
58 0 : m_requests_running++;
59 0 : }
60 0 : m_cv.notify_all();
61 0 : if(m_latency_buffer->occupancy() != 0) {
62 : // Prepare response
63 0 : RequestResult rres(ResultCode::kUnknown, dr);
64 0 : std::vector<std::pair<void*, size_t>> frag_pieces;
65 :
66 : // Get the newest TP
67 0 : SkipListAcc acc(inherited2::m_latency_buffer->get_skip_list());
68 0 : auto tail = acc.last();
69 0 : auto head = acc.first();
70 0 : m_newest_ts = (*tail).get_timestamp();
71 0 : m_oldest_ts = (*head).get_timestamp();
72 :
73 0 : if (m_first_cycle) {
74 0 : m_start_win_ts = m_oldest_ts;
75 0 : m_first_cycle = false;
76 : }
77 0 : if (m_newest_ts - m_start_win_ts > m_ts_set_sender_offset_ticks) {
78 0 : m_end_win_ts = m_newest_ts - m_ts_set_sender_offset_ticks;
79 0 : frag_pieces = get_fragment_pieces(m_start_win_ts, m_end_win_ts, rres);
80 0 : auto num_tps = frag_pieces.size();
81 0 : trigger::TPSet tpset;
82 0 : tpset.run_number = m_run_number;
83 0 : tpset.type = num_tps>0 ? trigger::TPSet::Type::kPayload : trigger::TPSet::Type::kHeartbeat;
84 0 : tpset.origin = m_sourceid;
85 0 : tpset.start_time = m_start_win_ts; // provisory timestamp, will be filled with first TP
86 0 : tpset.end_time = m_end_win_ts; // provisory timestamp, will be filled with last TP
87 0 : tpset.seqno = m_next_tpset_seqno++; // NOLINT(runtime/increment_decrement)
88 : // reserve the space for efficiency
89 0 : if (num_tps > 0) {
90 0 : tpset.objects.reserve(frag_pieces.size());
91 0 : bool first_tp = true;
92 0 : for( auto f : frag_pieces) {
93 0 : trgdataformats::TriggerPrimitive tp = *(static_cast<trgdataformats::TriggerPrimitive*>(f.first));
94 :
95 0 : if(first_tp) {
96 0 : tpset.start_time = tp.time_start;
97 0 : first_tp = false;
98 : }
99 0 : tpset.end_time = tp.time_start;
100 0 : tpset.objects.emplace_back(std::move(tp));
101 : }
102 : }
103 0 : if(!m_tpset_sink->try_send(std::move(tpset), iomanager::Sender::s_no_block)) {
104 0 : ers::warning(DroppedTPSet(ERS_HERE, m_start_win_ts, m_end_win_ts));
105 0 : m_num_periodic_send_failed++;
106 : }
107 0 : m_num_periodic_sent++;
108 :
109 : //remember what we sent for the next loop
110 0 : m_start_win_ts = m_end_win_ts;
111 0 : }
112 0 : }
113 0 : {
114 0 : std::lock_guard<std::mutex> lock(m_cv_mutex);
115 0 : m_requests_running--;
116 0 : }
117 0 : m_cv.notify_all();
118 0 : return;
119 0 : }
120 :
121 : } // namespace trigger
122 : } // namespace dunedaq
|