Line data Source code
1 : /**
2 : * @file MLTModule.hpp
3 : *
4 : * MLTModule is a DAQModule that generates trigger decisions
5 : * for standalone tests. It receives information on the current time and the
6 : * availability of the DF to absorb data and forms decisions at a configurable
7 : * rate and with configurable size.
8 : *
9 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
10 : * Licensing/copyright details are in the COPYING file that you should have
11 : * received with this code.
12 : */
13 :
14 : #ifndef TRIGGER_PLUGINS_MODULELEVELTRIGGER_HPP_
15 : #define TRIGGER_PLUGINS_MODULELEVELTRIGGER_HPP_
16 :
17 : #include "trigger/Issues.hpp"
18 : #include "trigger/Latency.hpp"
19 : #include "trigger/LivetimeCounter.hpp"
20 : #include "trigger/TokenManager.hpp"
21 : #include "trigger/opmon/latency_info.pb.h"
22 : #include "trigger/opmon/moduleleveltrigger_info.pb.h"
23 :
24 : #include "appfwk/DAQModule.hpp"
25 :
26 : #include "appmodel/MLTConf.hpp"
27 : #include "appmodel/MLTModule.hpp"
28 : #include "appmodel/ROIGroupConf.hpp"
29 : #include "appmodel/SourceIDConf.hpp"
30 : #include "appmodel/SubdetectorReadoutWindowMap.hpp"
31 : #include "appmodel/TCReadoutMap.hpp"
32 :
33 : #include "confmodel/Connection.hpp"
34 : #include "confmodel/GeoId.hpp"
35 :
36 : #include "daqdataformats/SourceID.hpp"
37 : #include "dfmessages/TriggerDecision.hpp"
38 : #include "dfmessages/TriggerDecisionToken.hpp"
39 : #include "dfmessages/TriggerInhibit.hpp"
40 : #include "dfmessages/Types.hpp"
41 : #include "hdf5libs/HDF5RawDataFile.hpp"
42 : #include "iomanager/Receiver.hpp"
43 : #include "trgdataformats/TriggerCandidateData.hpp"
44 : #include "trgdataformats/Types.hpp"
45 : #include "triggeralgs/TriggerCandidate.hpp"
46 :
47 : #include <map>
48 : #include <memory>
49 : #include <set>
50 : #include <string>
51 : #include <vector>
52 :
53 : namespace dunedaq {
54 :
55 : namespace trigger {
56 :
57 : /**
58 : * @brief MLTModule is the last level of the data selection
59 : * system, which reads in trigger candidates and sends trigger
60 : * decisions, subject to availability of TriggerDecisionTokens
61 : */
62 : class MLTModule : public dunedaq::appfwk::DAQModule
63 : {
64 : public:
65 : typedef dunedaq::detdataformats::DetID::Subdetector SubdetectorID;
66 : /**
67 : * @brief MLTModule Constructor
68 : * @param name Instance name for this MLTModule instance
69 : */
70 : explicit MLTModule(const std::string& name);
71 :
72 : MLTModule(const MLTModule&) = delete; ///< MLTModule is not copy-constructible
73 : MLTModule& operator=(const MLTModule&) = delete; ///< MLTModule is not copy-assignable
74 : MLTModule(MLTModule&&) = delete; ///< MLTModule is not move-constructible
75 : MLTModule& operator=(MLTModule&&) = delete; ///< MLTModule is not move-assignable
76 :
77 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
78 : void generate_opmon_data() override;
79 :
80 : private:
81 : // Commands
82 : void do_start(const CommandData_t& obj);
83 : void do_stop(const CommandData_t& obj);
84 : void do_pause(const CommandData_t& obj);
85 : void do_resume(const CommandData_t& obj);
86 : void do_configure(const CommandData_t& /*obj*/);
87 : void do_scrap(const CommandData_t& /*obj*/);
88 :
89 : void trigger_decisions_callback(dfmessages::TriggerDecision& decision);
90 : void dfo_busy_callback(dfmessages::TriggerInhibit& inhibit);
91 :
92 : std::map<std::string, int> decode_geoid(uint64_t _geoid_int);
93 :
94 : const dunedaq::appmodel::MLTModule* m_mtrg;
95 : const dunedaq::confmodel::Session* m_session;
96 :
97 : // Queue sources and sinks
98 : std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TriggerDecision>> m_decision_input;
99 : std::shared_ptr<iomanager::SenderConcept<dfmessages::TriggerDecision>> m_decision_output;
100 : std::shared_ptr<iomanager::ReceiverConcept<dfmessages::TriggerInhibit>> m_inhibit_input;
101 :
102 : /* TD requests
103 : std::vector<daqdataformats::SourceID> m_mandatory_links;
104 : std::map<int, std::vector<daqdataformats::SourceID>> m_group_links;
105 : nlohmann::json m_group_links_data;
106 : int m_total_group_links;
107 : void parse_group_links(const nlohmann::json& data);
108 : void print_group_links();
109 : dfmessages::ComponentRequest create_request_for_link(daqdataformats::SourceID link,
110 : triggeralgs::timestamp_t start,
111 : triggeralgs::timestamp_t end);
112 : std::vector<dfmessages::ComponentRequest> create_all_decision_requests(std::vector<daqdataformats::SourceID> links,
113 : triggeralgs::timestamp_t start,
114 : triggeralgs::timestamp_t end);
115 : void add_requests_to_decision(dfmessages::TriggerDecision& decision,
116 : std::vector<dfmessages::ComponentRequest> requests);
117 : */
118 : /* ROI
119 : bool m_use_roi_readout;
120 : struct roi_group
121 : {
122 : int n_links;
123 : float prob;
124 : triggeralgs::timestamp_t time_window;
125 : std::string mode;
126 : };
127 : std::map<int, roi_group> m_roi_conf;
128 : std::vector<const appmodel::ROIGroupConf*> m_roi_conf_data;
129 : void parse_roi_conf(const std::vector<const appmodel::ROIGroupConf*>& data);
130 : void print_roi_conf(std::map<int, roi_group> roi_conf);
131 : std::vector<int> m_roi_conf_ids;
132 : std::vector<float> m_roi_conf_probs;
133 : std::vector<float> m_roi_conf_probs_c;
134 : float get_random_num_float(float limit);
135 : int get_random_num_int();
136 : int pick_roi_group_conf();
137 : void roi_readout_make_requests(dfmessages::TriggerDecision& decision);
138 :
139 : int m_repeat_trigger_count{ 1 };
140 : */
141 : // paused state, in which we don't send triggers
142 : std::atomic<bool> m_paused;
143 : std::atomic<bool> m_dfo_is_busy;
144 : // std::atomic<bool> m_hsi_passthrough;
145 : // std::atomic<bool> m_tc_merging;
146 :
147 : dfmessages::trigger_number_t m_last_trigger_number;
148 :
149 : dfmessages::run_number_t m_run_number;
150 :
151 : // Are we in the RUNNING state?
152 : std::atomic<bool> m_running_flag{ false };
153 : // Are we in a configured state, i.e. after conf and before scrap?
154 : std::atomic<bool> m_configured_flag{ false };
155 :
156 : // LivetimeCounter
157 : std::shared_ptr<LivetimeCounter> m_livetime_counter;
158 : LivetimeCounter::state_time_t m_lc_kLive_count;
159 : LivetimeCounter::state_time_t m_lc_kPaused_count;
160 : LivetimeCounter::state_time_t m_lc_kDead_count;
161 : LivetimeCounter::state_time_t m_lc_deadtime;
162 :
163 : /* New buffering
164 : struct PendingTD
165 : {
166 : std::vector<triggeralgs::TriggerCandidate> contributing_tcs;
167 : triggeralgs::timestamp_t readout_start;
168 : triggeralgs::timestamp_t readout_end;
169 : int64_t walltime_expiration;
170 : };
171 : std::vector<PendingTD> m_pending_tds;
172 : std::vector<PendingTD> m_sent_tds;
173 : std::mutex m_td_vector_mutex;
174 :
175 : void add_tc(const triggeralgs::TriggerCandidate& tc);
176 : void add_td(const PendingTD& pending_td);
177 : void add_tc_ignored(const triggeralgs::TriggerCandidate& tc);
178 : void call_tc_decision(const PendingTD& pending_td);
179 : bool check_overlap(const triggeralgs::TriggerCandidate& tc, const PendingTD& pending_td);
180 : bool check_overlap_td(const PendingTD& pending_td);
181 : bool check_td_readout_length(const PendingTD&);
182 : void clear_td_vectors();
183 : void flush_td_vectors();
184 : std::vector<PendingTD> get_ready_tds(std::vector<PendingTD>& pending_tds);
185 : int64_t m_buffer_timeout;
186 : int64_t m_td_readout_limit;
187 : std::atomic<bool> m_send_timed_out_tds;
188 : int m_earliest_tc_index;
189 : int get_earliest_tc_index(const PendingTD& pending_td);
190 : */
191 : /* Bitwords logic
192 : bool m_use_bitwords;
193 : nlohmann::json m_trigger_bitwords_json;
194 : bool m_bitword_check;
195 : std::bitset<16> m_TD_bitword;
196 : std::vector<std::bitset<16>> m_trigger_bitwords;
197 : std::bitset<16> get_TD_bitword(const PendingTD& ready_td);
198 : void print_trigger_bitwords(std::vector<std::bitset<16>> trigger_bitwords);
199 : bool check_trigger_bitwords();
200 : void print_bitword_flags(nlohmann::json m_trigger_bitwords_json);
201 : void set_trigger_bitwords();
202 : void set_trigger_bitwords(const std::vector<std::string>& _bitwords);
203 : */
204 : /* Readout map config
205 : bool m_use_readout_map;
206 : std::vector<const appmodel::TCReadoutMap*> m_readout_window_map_data;
207 : std::map<trgdataformats::TriggerCandidateData::Type, std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>>
208 : m_readout_window_map;
209 : void parse_readout_map(const std::vector<const appmodel::TCReadoutMap*>& data);
210 : void print_readout_map(std::map<trgdataformats::TriggerCandidateData::Type,
211 : std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>> map);
212 : */
213 : /* Create the next trigger decision
214 : dfmessages::TriggerDecision create_decision(const PendingTD& pending_td);
215 : dfmessages::trigger_type_t m_trigger_type_shifted;
216 : */
217 : /* Optional list of TC types to ignore
218 : std::vector<unsigned int> m_ignored_tc_types;
219 : bool m_ignoring_tc_types;
220 : bool check_trigger_type_ignore(unsigned int tc_type);
221 : */
222 :
223 : /// @brief SourceID -- SubdetectorID map
224 : std::map<dfmessages::SourceID, SubdetectorID> m_srcid_detid_map;
225 :
226 : /// @brief Subdetector--readout-window map config
227 : std::map<SubdetectorID, std::pair<triggeralgs::timestamp_t, triggeralgs::timestamp_t>>
228 : m_subdetector_readout_window_map;
229 :
230 : // Opmon variables
231 : using metric_counter_type = uint64_t; // decltype(moduleleveltriggerinfo::Info::tc_received_count);
232 : std::atomic<metric_counter_type> m_td_msg_received_count{ 0 };
233 : std::atomic<metric_counter_type> m_td_sent_count{ 0 };
234 : std::atomic<metric_counter_type> m_td_total_count{ 0 };
235 : // DFO state related
236 : std::atomic<metric_counter_type> m_td_inhibited_count{ 0 };
237 : std::atomic<metric_counter_type> m_td_paused_count{ 0 };
238 : std::atomic<metric_counter_type> m_td_queue_timeout_expired_err_count{ 0 };
239 : // livetime related
240 : std::atomic<metric_counter_type> m_lc_kLive{ 0 };
241 : std::atomic<metric_counter_type> m_lc_kPaused{ 0 };
242 : std::atomic<metric_counter_type> m_lc_kDead{ 0 };
243 : bool m_lc_started = false;
244 :
245 : // Struct for per TC stats
246 : struct TDData
247 : {
248 : std::atomic<metric_counter_type> received{ 0 };
249 : std::atomic<metric_counter_type> sent{ 0 };
250 : std::atomic<metric_counter_type> failed_send{ 0 };
251 : std::atomic<metric_counter_type> paused{ 0 };
252 : std::atomic<metric_counter_type> inhibited{ 0 };
253 : };
254 0 : static std::set<trgdataformats::TriggerCandidateData::Type> unpack_types(const dfmessages::trigger_type_t& t)
255 : {
256 0 : std::set<trgdataformats::TriggerCandidateData::Type> results;
257 0 : if (t == dfmessages::TypeDefaults::s_invalid_trigger_type)
258 : return results;
259 0 : const std::bitset<64> bits(t);
260 0 : for (size_t i = 0; i < bits.size(); ++i) {
261 0 : if (bits[i])
262 0 : results.insert((trgdataformats::TriggerCandidateData::Type)i);
263 : }
264 : return results;
265 0 : }
266 :
267 : std::map<dunedaq::trgdataformats::TriggerCandidateData::Type, TDData> m_trigger_counters;
268 :
269 : std::mutex m_trigger_mutex;
270 0 : TDData& get_trigger_counter(trgdataformats::TriggerCandidateData::Type type)
271 : {
272 0 : auto it = m_trigger_counters.find(type);
273 0 : if (it != m_trigger_counters.end())
274 0 : return it->second;
275 :
276 0 : std::lock_guard<std::mutex> guard(m_trigger_mutex);
277 0 : return m_trigger_counters[type];
278 0 : }
279 :
280 : // Create an instance of the Latency class
281 : std::atomic<bool> m_latency_monitoring{ false };
282 : dunedaq::trigger::Latency m_latency_instance;
283 : dunedaq::trigger::Latency m_latency_requests_instance;
284 : std::atomic<metric_counter_type> m_latency_in{ 0 };
285 : std::atomic<metric_counter_type> m_latency_out{ 0 };
286 : std::atomic<metric_counter_type> m_latency_window_start{ 0 };
287 : std::atomic<metric_counter_type> m_latency_window_end{ 0 };
288 :
289 : void print_opmon_stats();
290 : };
291 : } // namespace trigger
292 : } // namespace dunedaq
293 :
294 : #endif // TRIGGER_PLUGINS_MODULELEVELTRIGGER_HPP_
295 :
296 : // Local Variables:
297 : // c-basic-offset: 2
298 : // End:
|