DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TPBundleHandler.hpp
Go to the documentation of this file.
1
12#ifndef DFMODULES_SRC_DFMODULES_TPBUNDLEHANDLER_HPP_
13#define DFMODULES_SRC_DFMODULES_TPBUNDLEHANDLER_HPP_
14
18#include "ers/Issue.hpp"
19#include "trigger/TPSet.hpp"
20#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
21
22#include <chrono>
23#include <map>
24#include <memory>
25#include <mutex>
26#include <vector>
27
28namespace dunedaq {
29
30// Disable coverage checking LCOV_EXCL_START
32 NoTPsInWindow,
33 "No TriggerPrimitives were used from a TPSet with start_time="
34 << tpset_start_time << ", end_time=" << tpset_end_time
35 << ", TSAccumulator begin and end times:" << window_begin_time << ", " << window_end_time,
36 ((daqdataformats::timestamp_t)tpset_start_time)((daqdataformats::timestamp_t)tpset_end_time)(
37 (daqdataformats::timestamp_t)window_begin_time)((daqdataformats::timestamp_t)window_end_time))
38ERS_DECLARE_ISSUE(dfmodules,
40 "Cannot add TPSet with sourceid="
41 << tpset_source_id << ", start_time=" << tpset_start_time
42 << " to bundle, because another TPSet with these values already exists",
43 ((size_t)tpset_source_id)((daqdataformats::timestamp_t)tpset_start_time))
44ERS_DECLARE_ISSUE(dfmodules,
46 "Received a TPSet with a timestamp that is too early compared to ones that have already "
47 << "been processed, sourceid=" << tpset_source_id << ", start_time=" << tpset_start_time
48 << ", the calculated timeslice_id is " << tsid,
49 ((size_t)tpset_source_id)((daqdataformats::timestamp_t)tpset_start_time)((int64_t)tsid))
50// Re-enable coverage checking LCOV_EXCL_STOP
51
52namespace dfmodules {
53
54class TimeSliceAccumulator
55{
56public:
57 TimeSliceAccumulator() {}
58
59 TimeSliceAccumulator(daqdataformats::timestamp_t begin_time,
63 : m_begin_time(begin_time)
64 , m_end_time(end_time)
65 , m_slice_number(slice_number)
66 , m_run_number(run_number)
67 , m_update_time(std::chrono::steady_clock::now())
68 {
69 }
70
71 TimeSliceAccumulator& operator=(const TimeSliceAccumulator& other)
72 {
73 if (this != &other) {
74 std::lock(m_bundle_map_mutex, other.m_bundle_map_mutex);
75 std::lock_guard<std::mutex> lhs_lk(m_bundle_map_mutex, std::adopt_lock);
76 std::lock_guard<std::mutex> rhs_lk(other.m_bundle_map_mutex, std::adopt_lock);
77 m_begin_time = other.m_begin_time;
78 m_end_time = other.m_end_time;
79 m_slice_number = other.m_slice_number;
80 m_run_number = other.m_run_number;
81 m_update_time = other.m_update_time;
82 m_tpbundles_by_sourceid_and_start_time = other.m_tpbundles_by_sourceid_and_start_time;
83 }
84 return *this;
85 }
86
87 void add_tpset(trigger::TPSet&& tpset);
88
89 std::unique_ptr<daqdataformats::TimeSlice> get_timeslice();
90
91 std::chrono::steady_clock::time_point get_update_time() const
92 {
93 auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
94 return m_update_time;
95 }
96
97 void update_slice_number(int delta)
98 {
99 m_slice_number += delta;
100 }
101
102private:
103 daqdataformats::timestamp_t m_begin_time;
106 daqdataformats::run_number_t m_run_number;
107 std::chrono::steady_clock::time_point m_update_time;
108 typedef std::map<daqdataformats::timestamp_t, trigger::TPSet> tpbundles_by_start_time_t;
109 typedef std::map<daqdataformats::SourceID, tpbundles_by_start_time_t> bundles_by_sourceid_t;
110 bundles_by_sourceid_t m_tpbundles_by_sourceid_and_start_time;
111 mutable std::mutex m_bundle_map_mutex;
112};
113
114class TPBundleHandler
115{
116public:
117 TPBundleHandler(daqdataformats::timestamp_t slice_interval,
119 std::chrono::steady_clock::duration cooling_off_time)
120 : m_slice_interval(slice_interval)
121 , m_run_number(run_number)
122 , m_cooling_off_time(cooling_off_time)
123 , m_slice_index_offset(0)
124 , m_one_or_more_time_slices_have_aged_out(false)
125 {
126 }
127
128 TPBundleHandler(TPBundleHandler const&) = delete;
129 TPBundleHandler(TPBundleHandler&&) = delete;
130 TPBundleHandler& operator=(TPBundleHandler const&) = delete;
131 TPBundleHandler& operator=(TPBundleHandler&&) = delete;
132
133 void add_tpset(trigger::TPSet&& tpset);
134
135 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> get_properly_aged_timeslices();
136
137 std::vector<std::unique_ptr<daqdataformats::TimeSlice>> get_all_remaining_timeslices();
138
139private:
140 daqdataformats::timestamp_t m_slice_interval;
141 daqdataformats::run_number_t m_run_number;
142 std::chrono::steady_clock::duration m_cooling_off_time;
143 size_t m_slice_index_offset;
144 std::map<daqdataformats::timestamp_t, TimeSliceAccumulator> m_timeslice_accumulators;
145 mutable std::mutex m_accumulator_map_mutex;
146 bool m_one_or_more_time_slices_have_aged_out;
147};
148} // namespace dfmodules
149} // namespace dunedaq
150
151#endif // DFMODULES_SRC_DFMODULES_TPBUNDLEHANDLER_HPP_
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
A set of TPs or TAs in a given time window, defined by its start and end times.
Definition Set.hpp:26
uint32_t run_number_t
Type used to represent run number.
Definition Types.hpp:20
uint64_t timestamp_t
Type used to represent DUNE timing system timestamps.
Definition Types.hpp:36
uint64_t timeslice_number_t
Type used to represent timeslice number.
Definition Types.hpp:49
Including Qt Headers.
Cannot add TPSet with because another TPSet with these values already size_t tpset_source_id((daqdataformats::timestamp_t) tpset_start_time)) ERS_DECLARE_ISSUE(dfmodules
Cannot add TPSet with because another TPSet with these values already size_t TardyTPSetReceived