DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TPBundleHandler.cpp
Go to the documentation of this file.
1
13
15#include "logging/Logging.hpp"
16
17#include <memory>
18#include <string>
19#include <utility>
20#include <vector>
21
22namespace dunedaq {
23namespace dfmodules {
24
25void
26TimeSliceAccumulator::add_tpset(trigger::TPSet&& tpset)
27{
28 // if this TPSet is near one of the edges of our window, handle it specially
29 if (tpset.start_time <= m_begin_time || tpset.end_time >= m_end_time) {
30 trigger::TPSet working_tpset;
33 bool first = true;
34 for (auto& trigprim : tpset.objects) {
35 if (trigprim.time_start >= m_begin_time && trigprim.time_start < m_end_time) {
36 working_tpset.objects.push_back(trigprim);
37 if (first) {
38 first_time = trigprim.time_start;
39 first = false;
40 }
41 last_time = trigprim.time_start;
42 }
43 }
44 if (working_tpset.objects.size() == 0) {
45 if (tpset.end_time == m_begin_time) {
46 // the end of the TPSet just missed the start of our window, so not a big deal
47 TLOG_DEBUG(22) << "Note: no TPs were used from a TPSet with start_time=" << tpset.start_time
48 << ", end_time=" << tpset.end_time << ", TSAccumulator begin and end times:" << m_begin_time
49 << ", " << m_end_time;
50 } else {
51 // woah, something unexpected happened
52 ers::warning(NoTPsInWindow(ERS_HERE, tpset.start_time, tpset.end_time, m_begin_time, m_end_time));
53 }
54 return;
55 }
56 working_tpset.type = tpset.type;
57 working_tpset.seqno = tpset.seqno;
58 working_tpset.origin = tpset.origin;
59 working_tpset.start_time = first_time;
60 working_tpset.end_time = last_time;
61 tpset = std::move(working_tpset);
62 }
63
64 // create an entry in the top-level map for the sourceid in this TPSet, if needed
65 auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
66 if (m_tpbundles_by_sourceid_and_start_time.count(tpset.origin) == 0) {
67 tpbundles_by_start_time_t empty_bundle_map;
68 m_tpbundles_by_sourceid_and_start_time[tpset.origin] = empty_bundle_map;
69 }
70
71 // store the TPSet in the map
72 if (m_tpbundles_by_sourceid_and_start_time[tpset.origin].count(tpset.start_time)) {
73 ers::warning(DuplicateTPWindow(ERS_HERE, tpset.origin.id, tpset.start_time));
74 }
75 m_tpbundles_by_sourceid_and_start_time[tpset.origin].emplace(tpset.start_time, std::move(tpset));
76 m_update_time = std::chrono::steady_clock::now();
77}
78
79std::unique_ptr<daqdataformats::TimeSlice>
80TimeSliceAccumulator::get_timeslice()
81{
82 auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
83 std::vector<std::unique_ptr<daqdataformats::Fragment>> list_of_fragments;
84
85 // loop over all SourceID present in this accumulator
86 for (auto& [sourceid, bundle_map] : m_tpbundles_by_sourceid_and_start_time) {
87
88 // build up the list of pieces that we will use to contruct the Fragment
89 std::vector<std::pair<void*, size_t>> list_of_pieces;
90 for (auto& [start_time, tpset] : bundle_map) {
91 list_of_pieces.push_back(std::make_pair<void*, size_t>(
92 &tpset.objects[0], tpset.objects.size() * sizeof(trgdataformats::TriggerPrimitive)));
93 }
94 std::unique_ptr<daqdataformats::Fragment> frag(new daqdataformats::Fragment(list_of_pieces));
95
96 frag->set_run_number(m_run_number);
97 frag->set_trigger_number(m_slice_number);
98 frag->set_window_begin(m_begin_time);
99 frag->set_window_end(m_end_time);
100 frag->set_element_id(sourceid);
101 frag->set_detector_id(static_cast<uint16_t>(detdataformats::DetID::Subdetector::kDAQ));
103
104 size_t frag_payload_size = frag->get_size() - sizeof(dunedaq::daqdataformats::FragmentHeader);
105 TLOG_DEBUG(21) << "In get_timeslice, Source ID is " << sourceid << ", number of pieces is " << list_of_pieces.size()
106 << ", size of Fragment payload is " << frag_payload_size << ", size of TP is "
107 << sizeof(trgdataformats::TriggerPrimitive);
108
109 list_of_fragments.push_back(std::move(frag));
110 }
111
112 std::unique_ptr<daqdataformats::TimeSlice> time_slice(new daqdataformats::TimeSlice(m_slice_number, m_run_number));
113 time_slice->set_fragments(std::move(list_of_fragments));
114 return time_slice;
115}
116
117void
118TPBundleHandler::add_tpset(trigger::TPSet&& tpset)
119{
120 // if the tpset seems to span multiple timeslices, then we add it to all
121 // of the relevant accumulators. This runs the risk of adding a tpset
122 // to an accumululator that won't find any TPs within the
123 // accumulator window (because of edge effects), but we want to be
124 // cautious here (and we'll protect against the absence of tpsets later).
125 // Of course, adding the tpset to multiple accumulators requires copies...
126 size_t tsidx_from_begin_time = tpset.start_time / m_slice_interval;
127 size_t tsidx_from_end_time = tpset.end_time / m_slice_interval;
128 if (m_slice_index_offset == 0) {
129 m_slice_index_offset = tsidx_from_begin_time - 1;
130 }
131
132 // 24-Mar-2024, KAB: added check for TimeSlice indexes that are earlier
133 // than the one that we started with. We try to gracefully handle them
134 // by adjusting the slice_ids of existing slices, but if we can't do that,
135 // we discard them so that we don't get
136 // TimeSlices with large timeslice_ids (e.g. -1 converted to a uint64_t).
137 if (tsidx_from_begin_time <= m_slice_index_offset) {
138 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
139 int64_t diff = static_cast<int64_t>(tsidx_from_begin_time) - static_cast<int64_t>(m_slice_index_offset);
140 if (! m_one_or_more_time_slices_have_aged_out) {
141 TLOG() << "Updating the slice numbers of existing accumulators by " << (1-diff);
142 for (auto& [local_tsidx, local_accum] : m_timeslice_accumulators) {
143 local_accum.update_slice_number(1 - diff);
144 }
145 m_slice_index_offset -= (1 - diff);
146 }
147 else {
148 ers::warning(TardyTPSetReceived(ERS_HERE, tpset.origin.id, tpset.start_time, diff));
149 return;
150 }
151 }
152
153 // add the TPSet to any 'extra' accumulators
154 for (size_t tsidx = (tsidx_from_begin_time + 1); tsidx <= tsidx_from_end_time; ++tsidx) {
155 {
156 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
157 if (m_timeslice_accumulators.count(tsidx) == 0) {
158 TimeSliceAccumulator accum(tsidx * m_slice_interval,
159 (tsidx + 1) * m_slice_interval,
160 tsidx - m_slice_index_offset,
161 m_run_number);
162 m_timeslice_accumulators[tsidx] = accum;
163 }
164 }
165 trigger::TPSet tpset_copy = tpset;
166 m_timeslice_accumulators[tsidx].add_tpset(std::move(tpset_copy));
167 }
168
169 // add the TPSet to the accumulator associated with the begin time
170 {
171 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
172 if (m_timeslice_accumulators.count(tsidx_from_begin_time) == 0) {
173 TimeSliceAccumulator accum(tsidx_from_begin_time * m_slice_interval,
174 (tsidx_from_begin_time + 1) * m_slice_interval,
175 tsidx_from_begin_time - m_slice_index_offset,
176 m_run_number);
177 m_timeslice_accumulators[tsidx_from_begin_time] = accum;
178 }
179 }
180 m_timeslice_accumulators[tsidx_from_begin_time].add_tpset(std::move(tpset));
181}
182
183std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
184TPBundleHandler::get_properly_aged_timeslices()
185{
186 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
187 std::vector<daqdataformats::timestamp_t> elements_to_be_removed;
188
189 auto now = std::chrono::steady_clock::now();
190 for (auto& [tsidx, accum] : m_timeslice_accumulators) {
191 if ((now - accum.get_update_time()) >= m_cooling_off_time) {
192 list_of_timeslices.push_back(accum.get_timeslice());
193 elements_to_be_removed.push_back(tsidx);
194 }
195 }
196
197 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
198 for (auto& tsidx : elements_to_be_removed) {
199 m_timeslice_accumulators.erase(tsidx);
200 }
201
202 if (list_of_timeslices.size() > 0) {m_one_or_more_time_slices_have_aged_out = true;}
203 return list_of_timeslices;
204}
205
206std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
207TPBundleHandler::get_all_remaining_timeslices()
208{
209 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
210
211 for (auto& [tsidx, accum] : m_timeslice_accumulators) {
212 list_of_timeslices.push_back(accum.get_timeslice());
213 }
214
215 {
216 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
217 m_timeslice_accumulators.clear();
218 }
219
220 return list_of_timeslices;
221}
222
223} // namespace dfmodules
224} // namespace dunedaq
#define ERS_HERE
static constexpr timestamp_t s_invalid_timestamp
An invalid timestamp.
Definition Types.hpp:65
static int64_t now()
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
@ kTriggerPrimitive
Trigger format TPs produced by trigger code.
uint64_t timestamp_t
Type used to represent DUNE timing system timestamps.
Definition Types.hpp:36
Set< trgdataformats::TriggerPrimitive > TPSet
Definition TPSet.hpp:20
Including Qt Headers.
Cannot add TPSet with start_time
Cannot add TPSet with sourceid
Cannot add TPSet with because another TPSet with these values already size_t TardyTPSetReceived
void warning(const Issue &issue)
Definition ers.hpp:115
The header for a DUNE Fragment.