12#ifndef DFMODULES_SRC_DFMODULES_TPBUNDLEHANDLER_HPP_
13#define DFMODULES_SRC_DFMODULES_TPBUNDLEHANDLER_HPP_
33 "No TriggerPrimitives were used from a TPSet with start_time="
34 << tpset_start_time <<
", end_time=" << tpset_end_time
35 <<
", TSAccumulator begin and end times:" << window_begin_time <<
", " << window_end_time,
40 "Cannot add TPSet with sourceid="
42 <<
" to bundle, because another TPSet with these values already exists",
46 "Received a TPSet with a timestamp that is too early compared to ones that have already "
47 <<
"been processed, sourceid=" <<
tpset_source_id <<
", start_time=" << tpset_start_time
48 <<
", the calculated timeslice_id is " << tsid,
54class TimeSliceAccumulator
57 TimeSliceAccumulator() {}
63 : m_begin_time(begin_time)
64 , m_end_time(end_time)
65 , m_slice_number(slice_number)
66 , m_run_number(run_number)
67 , m_update_time(std::chrono::steady_clock::now())
71 TimeSliceAccumulator& operator=(
const TimeSliceAccumulator& other)
74 std::lock(m_bundle_map_mutex, other.m_bundle_map_mutex);
75 std::lock_guard<std::mutex> lhs_lk(m_bundle_map_mutex, std::adopt_lock);
76 std::lock_guard<std::mutex> rhs_lk(other.m_bundle_map_mutex, std::adopt_lock);
77 m_begin_time = other.m_begin_time;
78 m_end_time = other.m_end_time;
79 m_slice_number = other.m_slice_number;
80 m_run_number = other.m_run_number;
81 m_update_time = other.m_update_time;
82 m_tpbundles_by_sourceid_and_start_time = other.m_tpbundles_by_sourceid_and_start_time;
89 std::unique_ptr<daqdataformats::TimeSlice> get_timeslice();
91 std::chrono::steady_clock::time_point get_update_time()
const
93 auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
97 void update_slice_number(
int delta)
99 m_slice_number += delta;
107 std::chrono::steady_clock::time_point m_update_time;
108 typedef std::map<daqdataformats::timestamp_t, trigger::TPSet> tpbundles_by_start_time_t;
109 typedef std::map<daqdataformats::SourceID, tpbundles_by_start_time_t> bundles_by_sourceid_t;
110 bundles_by_sourceid_t m_tpbundles_by_sourceid_and_start_time;
111 mutable std::mutex m_bundle_map_mutex;
119 std::chrono::steady_clock::duration cooling_off_time)
120 : m_slice_interval(slice_interval)
121 , m_run_number(run_number)
122 , m_cooling_off_time(cooling_off_time)
123 , m_slice_index_offset(0)
124 , m_one_or_more_time_slices_have_aged_out(
false)
128 TPBundleHandler(TPBundleHandler
const&) =
delete;
129 TPBundleHandler(TPBundleHandler&&) =
delete;
130 TPBundleHandler& operator=(TPBundleHandler
const&) =
delete;
131 TPBundleHandler& operator=(TPBundleHandler&&) =
delete;
135 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> get_properly_aged_timeslices();
137 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> get_all_remaining_timeslices();
142 std::chrono::steady_clock::duration m_cooling_off_time;
143 size_t m_slice_index_offset;
144 std::map<daqdataformats::timestamp_t, TimeSliceAccumulator> m_timeslice_accumulators;
145 mutable std::mutex m_accumulator_map_mutex;
146 bool m_one_or_more_time_slices_have_aged_out;
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
A set of TPs or TAs in a given time window, defined by its start and end times.
Cannot add TPSet with because another TPSet with these values already size_t tpset_source_id((daqdataformats::timestamp_t) tpset_start_time)) ERS_DECLARE_ISSUE(dfmodules
Cannot add TPSet with because another TPSet with these values already size_t TardyTPSetReceived