29 if (tpset.start_time <= m_begin_time || tpset.end_time >= m_end_time) {
34 for (
auto& trigprim : tpset.objects) {
35 if (trigprim.time_start >= m_begin_time && trigprim.time_start < m_end_time) {
36 working_tpset.objects.push_back(trigprim);
38 first_time = trigprim.time_start;
41 last_time = trigprim.time_start;
44 if (working_tpset.objects.size() == 0) {
45 if (tpset.end_time == m_begin_time) {
47 TLOG_DEBUG(22) <<
"Note: no TPs were used from a TPSet with start_time=" << tpset.start_time
48 <<
", end_time=" << tpset.end_time <<
", TSAccumulator begin and end times:" << m_begin_time
49 <<
", " << m_end_time;
52 ers::warning(NoTPsInWindow(
ERS_HERE, tpset.start_time, tpset.end_time, m_begin_time, m_end_time));
56 working_tpset.type = tpset.type;
57 working_tpset.seqno = tpset.seqno;
58 working_tpset.origin = tpset.origin;
59 working_tpset.start_time = first_time;
60 working_tpset.end_time = last_time;
61 tpset = std::move(working_tpset);
65 auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
66 if (m_tpbundles_by_sourceid_and_start_time.count(tpset.origin) == 0) {
67 tpbundles_by_start_time_t empty_bundle_map;
68 m_tpbundles_by_sourceid_and_start_time[tpset.origin] = empty_bundle_map;
72 if (m_tpbundles_by_sourceid_and_start_time[tpset.origin].count(tpset.start_time)) {
75 m_tpbundles_by_sourceid_and_start_time[tpset.origin].emplace(tpset.start_time, std::move(tpset));
76 m_update_time = std::chrono::steady_clock::now();
79std::unique_ptr<daqdataformats::TimeSlice>
80TimeSliceAccumulator::get_timeslice()
82 auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
83 std::vector<std::unique_ptr<daqdataformats::Fragment>> list_of_fragments;
86 for (
auto& [
sourceid, bundle_map] : m_tpbundles_by_sourceid_and_start_time) {
89 std::vector<std::pair<void*, size_t>> list_of_pieces;
91 list_of_pieces.push_back(std::make_pair<void*, size_t>(
92 &tpset.objects[0], tpset.objects.size() *
sizeof(trgdataformats::TriggerPrimitive)));
94 std::unique_ptr<daqdataformats::Fragment> frag(
new daqdataformats::Fragment(list_of_pieces));
96 frag->set_run_number(m_run_number);
97 frag->set_trigger_number(m_slice_number);
98 frag->set_window_begin(m_begin_time);
99 frag->set_window_end(m_end_time);
105 TLOG_DEBUG(21) <<
"In get_timeslice, Source ID is " <<
sourceid <<
", number of pieces is " << list_of_pieces.size()
106 <<
", size of Fragment payload is " << frag_payload_size <<
", size of TP is "
107 <<
sizeof(trgdataformats::TriggerPrimitive);
109 list_of_fragments.push_back(std::move(frag));
112 std::unique_ptr<daqdataformats::TimeSlice> time_slice(
new daqdataformats::TimeSlice(m_slice_number, m_run_number));
113 time_slice->set_fragments(std::move(list_of_fragments));
126 size_t tsidx_from_begin_time = tpset.start_time / m_slice_interval;
127 size_t tsidx_from_end_time = tpset.end_time / m_slice_interval;
128 if (m_slice_index_offset == 0) {
129 m_slice_index_offset = tsidx_from_begin_time - 1;
137 if (tsidx_from_begin_time <= m_slice_index_offset) {
138 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
139 int64_t diff =
static_cast<int64_t
>(tsidx_from_begin_time) -
static_cast<int64_t
>(m_slice_index_offset);
140 if (! m_one_or_more_time_slices_have_aged_out) {
141 TLOG() <<
"Updating the slice numbers of existing accumulators by " << (1-diff);
142 for (
auto& [local_tsidx, local_accum] : m_timeslice_accumulators) {
143 local_accum.update_slice_number(1 - diff);
145 m_slice_index_offset -= (1 - diff);
154 for (
size_t tsidx = (tsidx_from_begin_time + 1); tsidx <= tsidx_from_end_time; ++tsidx) {
156 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
157 if (m_timeslice_accumulators.count(tsidx) == 0) {
158 TimeSliceAccumulator accum(tsidx * m_slice_interval,
159 (tsidx + 1) * m_slice_interval,
160 tsidx - m_slice_index_offset,
162 m_timeslice_accumulators[tsidx] = accum;
166 m_timeslice_accumulators[tsidx].add_tpset(std::move(tpset_copy));
171 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
172 if (m_timeslice_accumulators.count(tsidx_from_begin_time) == 0) {
173 TimeSliceAccumulator accum(tsidx_from_begin_time * m_slice_interval,
174 (tsidx_from_begin_time + 1) * m_slice_interval,
175 tsidx_from_begin_time - m_slice_index_offset,
177 m_timeslice_accumulators[tsidx_from_begin_time] = accum;
180 m_timeslice_accumulators[tsidx_from_begin_time].add_tpset(std::move(tpset));
183std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
184TPBundleHandler::get_properly_aged_timeslices()
186 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
187 std::vector<daqdataformats::timestamp_t> elements_to_be_removed;
189 auto now = std::chrono::steady_clock::now();
190 for (
auto& [tsidx, accum] : m_timeslice_accumulators) {
191 if ((
now - accum.get_update_time()) >= m_cooling_off_time) {
192 list_of_timeslices.push_back(accum.get_timeslice());
193 elements_to_be_removed.push_back(tsidx);
197 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
198 for (
auto& tsidx : elements_to_be_removed) {
199 m_timeslice_accumulators.erase(tsidx);
202 if (list_of_timeslices.size() > 0) {m_one_or_more_time_slices_have_aged_out =
true;}
203 return list_of_timeslices;
206std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
207TPBundleHandler::get_all_remaining_timeslices()
209 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
211 for (
auto& [tsidx, accum] : m_timeslice_accumulators) {
212 list_of_timeslices.push_back(accum.get_timeslice());
216 auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
217 m_timeslice_accumulators.clear();
220 return list_of_timeslices;
#define TLOG_DEBUG(lvl,...)
Set< trgdataformats::TriggerPrimitive > TPSet
Cannot add TPSet with start_time
Cannot add TPSet with sourceid
Cannot add TPSet with because another TPSet with these values already size_t TardyTPSetReceived
void warning(const Issue &issue)