Line data Source code
1 : /**
2 : * @file TPBundleHandler.cpp TPBundleHandler Class Implementation
3 : *
4 : * The TPBundleHandler class takes care of assembling and repacking TriggerPrimitives
5 : * for storage on disk as part of a TP stream.
6 : *
7 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
8 : * Licensing/copyright details are in the COPYING file that you should have
9 : * received with this code.
10 : */
11 :
12 : #include "dfmodules/TPBundleHandler.hpp"
13 :
14 : #include "detdataformats/DetID.hpp"
15 : #include "logging/Logging.hpp"
16 :
17 : #include <memory>
18 : #include <string>
19 : #include <utility>
20 : #include <vector>
21 :
22 : namespace dunedaq {
23 : namespace dfmodules {
24 :
25 : void
26 0 : TimeSliceAccumulator::add_tpset(trigger::TPSet&& tpset)
27 : {
28 : // if this TPSet is near one of the edges of our window, handle it specially
29 0 : if (tpset.start_time <= m_begin_time || tpset.end_time >= m_end_time) {
30 0 : trigger::TPSet working_tpset;
31 0 : daqdataformats::timestamp_t first_time = daqdataformats::TypeDefaults::s_invalid_timestamp;
32 0 : daqdataformats::timestamp_t last_time = daqdataformats::TypeDefaults::s_invalid_timestamp;
33 0 : bool first = true;
34 0 : for (auto& trigprim : tpset.objects) {
35 0 : if (trigprim.time_start >= m_begin_time && trigprim.time_start < m_end_time) {
36 0 : working_tpset.objects.push_back(trigprim);
37 0 : if (first) {
38 0 : first_time = trigprim.time_start;
39 0 : first = false;
40 : }
41 0 : last_time = trigprim.time_start;
42 : }
43 : }
44 0 : if (working_tpset.objects.size() == 0) {
45 0 : if (tpset.end_time == m_begin_time) {
46 : // the end of the TPSet just missed the start of our window, so not a big deal
47 0 : TLOG_DEBUG(22) << "Note: no TPs were used from a TPSet with start_time=" << tpset.start_time
48 0 : << ", end_time=" << tpset.end_time << ", TSAccumulator begin and end times:" << m_begin_time
49 0 : << ", " << m_end_time;
50 : } else {
51 : // woah, something unexpected happened
52 0 : ers::warning(NoTPsInWindow(ERS_HERE, tpset.start_time, tpset.end_time, m_begin_time, m_end_time));
53 : }
54 0 : return;
55 : }
56 0 : working_tpset.type = tpset.type;
57 0 : working_tpset.seqno = tpset.seqno;
58 0 : working_tpset.origin = tpset.origin;
59 0 : working_tpset.start_time = first_time;
60 0 : working_tpset.end_time = last_time;
61 0 : tpset = std::move(working_tpset);
62 0 : }
63 :
64 : // create an entry in the top-level map for the sourceid in this TPSet, if needed
65 0 : auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
66 0 : if (m_tpbundles_by_sourceid_and_start_time.count(tpset.origin) == 0) {
67 0 : tpbundles_by_start_time_t empty_bundle_map;
68 0 : m_tpbundles_by_sourceid_and_start_time[tpset.origin] = empty_bundle_map;
69 0 : }
70 :
71 : // store the TPSet in the map
72 0 : if (m_tpbundles_by_sourceid_and_start_time[tpset.origin].count(tpset.start_time)) {
73 0 : ers::warning(DuplicateTPWindow(ERS_HERE, tpset.origin.id, tpset.start_time));
74 : }
75 0 : m_tpbundles_by_sourceid_and_start_time[tpset.origin].emplace(tpset.start_time, std::move(tpset));
76 0 : m_update_time = std::chrono::steady_clock::now();
77 0 : }
78 :
79 : std::unique_ptr<daqdataformats::TimeSlice>
80 0 : TimeSliceAccumulator::get_timeslice()
81 : {
82 0 : auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
83 0 : std::vector<std::unique_ptr<daqdataformats::Fragment>> list_of_fragments;
84 :
85 : // loop over all SourceID present in this accumulator
86 0 : for (auto& [sourceid, bundle_map] : m_tpbundles_by_sourceid_and_start_time) {
87 :
88 : // build up the list of pieces that we will use to contruct the Fragment
89 0 : std::vector<std::pair<void*, size_t>> list_of_pieces;
90 0 : for (auto& [start_time, tpset] : bundle_map) {
91 0 : list_of_pieces.push_back(std::make_pair<void*, size_t>(
92 0 : &tpset.objects[0], tpset.objects.size() * sizeof(trgdataformats::TriggerPrimitive)));
93 : }
94 0 : std::unique_ptr<daqdataformats::Fragment> frag(new daqdataformats::Fragment(list_of_pieces));
95 :
96 0 : frag->set_run_number(m_run_number);
97 0 : frag->set_trigger_number(m_slice_number);
98 0 : frag->set_window_begin(m_begin_time);
99 0 : frag->set_window_end(m_end_time);
100 0 : frag->set_element_id(sourceid);
101 0 : frag->set_detector_id(static_cast<uint16_t>(detdataformats::DetID::Subdetector::kDAQ));
102 0 : frag->set_type(daqdataformats::FragmentType::kTriggerPrimitive);
103 :
104 0 : size_t frag_payload_size = frag->get_size() - sizeof(dunedaq::daqdataformats::FragmentHeader);
105 0 : TLOG_DEBUG(21) << "In get_timeslice, Source ID is " << sourceid << ", number of pieces is " << list_of_pieces.size()
106 0 : << ", size of Fragment payload is " << frag_payload_size << ", size of TP is "
107 0 : << sizeof(trgdataformats::TriggerPrimitive);
108 :
109 0 : list_of_fragments.push_back(std::move(frag));
110 0 : }
111 :
112 0 : std::unique_ptr<daqdataformats::TimeSlice> time_slice(new daqdataformats::TimeSlice(m_slice_number, m_run_number));
113 0 : time_slice->set_fragments(std::move(list_of_fragments));
114 0 : return time_slice;
115 0 : }
116 :
117 : void
118 0 : TPBundleHandler::add_tpset(trigger::TPSet&& tpset)
119 : {
120 : // if the tpset seems to span multiple timeslices, then we add it to all
121 : // of the relevant accumulators. This runs the risk of adding a tpset
122 : // to an accumululator that won't find any TPs within the
123 : // accumulator window (because of edge effects), but we want to be
124 : // cautious here (and we'll protect against the absence of tpsets later).
125 : // Of course, adding the tpset to multiple accumulators requires copies...
126 0 : size_t tsidx_from_begin_time = tpset.start_time / m_slice_interval;
127 0 : size_t tsidx_from_end_time = tpset.end_time / m_slice_interval;
128 0 : if (m_slice_index_offset == 0) {
129 0 : m_slice_index_offset = tsidx_from_begin_time - 1;
130 : }
131 :
132 : // 24-Mar-2024, KAB: added check for TimeSlice indexes that are earlier
133 : // than the one that we started with. We try to gracefully handle them
134 : // by adjusting the slice_ids of existing slices, but if we can't do that,
135 : // we discard them so that we don't get
136 : // TimeSlices with large timeslice_ids (e.g. -1 converted to a uint64_t).
137 0 : if (tsidx_from_begin_time <= m_slice_index_offset) {
138 0 : auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
139 0 : int64_t diff = static_cast<int64_t>(tsidx_from_begin_time) - static_cast<int64_t>(m_slice_index_offset);
140 0 : if (! m_one_or_more_time_slices_have_aged_out) {
141 0 : TLOG() << "Updating the slice numbers of existing accumulators by " << (1-diff);
142 0 : for (auto& [local_tsidx, local_accum] : m_timeslice_accumulators) {
143 0 : local_accum.update_slice_number(1 - diff);
144 : }
145 0 : m_slice_index_offset -= (1 - diff);
146 : }
147 : else {
148 0 : ers::warning(TardyTPSetReceived(ERS_HERE, tpset.origin.id, tpset.start_time, diff));
149 0 : return;
150 : }
151 0 : }
152 :
153 : // add the TPSet to any 'extra' accumulators
154 0 : for (size_t tsidx = (tsidx_from_begin_time + 1); tsidx <= tsidx_from_end_time; ++tsidx) {
155 0 : {
156 0 : auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
157 0 : if (m_timeslice_accumulators.count(tsidx) == 0) {
158 0 : TimeSliceAccumulator accum(tsidx * m_slice_interval,
159 0 : (tsidx + 1) * m_slice_interval,
160 0 : tsidx - m_slice_index_offset,
161 0 : m_run_number);
162 0 : m_timeslice_accumulators[tsidx] = accum;
163 0 : }
164 0 : }
165 0 : trigger::TPSet tpset_copy = tpset;
166 0 : m_timeslice_accumulators[tsidx].add_tpset(std::move(tpset_copy));
167 0 : }
168 :
169 : // add the TPSet to the accumulator associated with the begin time
170 0 : {
171 0 : auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
172 0 : if (m_timeslice_accumulators.count(tsidx_from_begin_time) == 0) {
173 0 : TimeSliceAccumulator accum(tsidx_from_begin_time * m_slice_interval,
174 0 : (tsidx_from_begin_time + 1) * m_slice_interval,
175 0 : tsidx_from_begin_time - m_slice_index_offset,
176 0 : m_run_number);
177 0 : m_timeslice_accumulators[tsidx_from_begin_time] = accum;
178 0 : }
179 0 : }
180 0 : m_timeslice_accumulators[tsidx_from_begin_time].add_tpset(std::move(tpset));
181 : }
182 :
183 : std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
184 0 : TPBundleHandler::get_properly_aged_timeslices()
185 : {
186 0 : std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
187 0 : std::vector<daqdataformats::timestamp_t> elements_to_be_removed;
188 :
189 0 : auto now = std::chrono::steady_clock::now();
190 0 : for (auto& [tsidx, accum] : m_timeslice_accumulators) {
191 0 : if ((now - accum.get_update_time()) >= m_cooling_off_time) {
192 0 : list_of_timeslices.push_back(accum.get_timeslice());
193 0 : elements_to_be_removed.push_back(tsidx);
194 : }
195 : }
196 :
197 0 : auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
198 0 : for (auto& tsidx : elements_to_be_removed) {
199 0 : m_timeslice_accumulators.erase(tsidx);
200 : }
201 :
202 0 : if (list_of_timeslices.size() > 0) {m_one_or_more_time_slices_have_aged_out = true;}
203 0 : return list_of_timeslices;
204 0 : }
205 :
206 : std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
207 0 : TPBundleHandler::get_all_remaining_timeslices()
208 : {
209 0 : std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
210 :
211 0 : for (auto& [tsidx, accum] : m_timeslice_accumulators) {
212 0 : list_of_timeslices.push_back(accum.get_timeslice());
213 : }
214 :
215 0 : {
216 0 : auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
217 0 : m_timeslice_accumulators.clear();
218 0 : }
219 :
220 0 : return list_of_timeslices;
221 0 : }
222 :
223 : } // namespace dfmodules
224 : } // namespace dunedaq
|