Line data Source code
1 : /**
2 : * @file DFOModule.hpp
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #ifndef DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_
10 : #define DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_
11 :
12 : #include "dfmodules/TriggerRecordBuilderData.hpp"
13 :
14 : #include "appmodel/DFOConf.hpp"
15 :
16 : #include "daqdataformats/TriggerRecord.hpp"
17 : #include "dfmessages/DataRequest.hpp"
18 : #include "dfmessages/TriggerDecision.hpp"
19 : #include "dfmessages/TriggerDecisionToken.hpp"
20 : #include "dfmessages/TriggerInhibit.hpp"
21 : #include "trgdataformats/TriggerCandidateData.hpp"
22 :
23 : #include "iomanager/Sender.hpp"
24 :
25 : #include "appfwk/DAQModule.hpp"
26 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
27 :
28 : #include <map>
29 : #include <memory>
30 : #include <string>
31 : #include <utility>
32 : #include <vector>
33 : #include <mutex>
34 :
35 : namespace dunedaq {
36 :
// Disable coverage checking LCOV_EXCL_START

// ERS issue types used by the DFO. Each ERS_DECLARE_ISSUE invocation below lists, in order:
// the namespace, the issue name, the message expression, and the typed attributes that the
// message expression streams.

// Free-text update about a TRBModule application; the message is prefixed with the
// connection name it refers to.
ERS_DECLARE_ISSUE(dfmodules,
                  TRBModuleAppUpdate,
                  "TRBModule app " << connection_name << ": " << message,
                  ((std::string)connection_name)((std::string)message))

// A token was received whose source connection is not known; carries that connection name.
ERS_DECLARE_ISSUE(dfmodules,
                  UnknownTokenSource,
                  "Token from unknown source: " << connection_name,
                  ((std::string)connection_name))

// A received run number differs from the expected one. Carries both run numbers, the
// originating application and the trigger number involved.
ERS_DECLARE_ISSUE(dfmodules,
                  DFOModuleRunNumberMismatch,
                  "DFOModule encountered run number mismatch: recvd ("
                    << received_run_number << ") != " << run_number << " from " << src_app << " for trigger_number "
                    << trig_num,
                  ((uint32_t)received_run_number)((uint32_t)run_number)((std::string)src_app)(
                    (uint32_t)trig_num)) // NOLINT(build/unsigned)

// A TriggerDecision did not complete within the timeout; carries its trigger and run numbers.
ERS_DECLARE_ISSUE(dfmodules,
                  IncompleteTriggerDecision,
                  "TriggerDecision " << trigger_number << " didn't complete within timeout in run " << run_number,
                  ((uint32_t)trigger_number)((uint32_t)run_number)) // NOLINT(build/unsigned)

// A TriggerDecision could not be assigned; carries its trigger number.
ERS_DECLARE_ISSUE(dfmodules,
                  UnableToAssign,
                  "TriggerDecision " << trigger_number << " could not be assigned",
                  ((uint32_t)trigger_number)) // NOLINT(build/unsigned)

// A TriggerDecision was assigned to a DF application that was already busy; carries the
// trigger number, the application name and the number of TDs it was busy with.
ERS_DECLARE_ISSUE(dfmodules,
                  AssignedToBusyApp,
                  "TriggerDecision " << trigger_number << " was assigned to DF app " << app << " that was busy with "
                                     << used_slots << " TDs",
                  ((uint32_t)trigger_number)((std::string)app)((size_t)used_slots)) // NOLINT(build/unsigned)
// Re-enable coverage checking LCOV_EXCL_STOP
67 :
68 : namespace dfmodules {
69 :
70 : /**
71 : * @brief DFOModule distributes triggers according to the
72 : * availability of the DF apps in the system
73 : */
74 : class DFOModule : public dunedaq::appfwk::DAQModule
75 : {
76 : public:
77 : /**
78 : * @brief DFOModule Constructor
79 : * @param name Instance name for this DFOModule instance
80 : */
81 : explicit DFOModule(const std::string& name);
82 :
83 : DFOModule(const DFOModule&) = delete; ///< DFOModule is not copy-constructible
84 : DFOModule& operator=(const DFOModule&) =
85 : delete; ///< DFOModule is not copy-assignable
86 : DFOModule(DFOModule&&) = delete; ///< DFOModule is not move-constructible
87 : DFOModule& operator=(DFOModule&&) = delete; ///< DFOModule is not move-assignable
88 :
89 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
90 :
91 : protected:
92 : virtual std::shared_ptr<AssignedTriggerDecision> find_slot(const dfmessages::TriggerDecision& decision);
93 : // find_slot operates on a round-robin logic
94 :
95 : using trbd_ptr_t = std::shared_ptr<TriggerRecordBuilderData>;
96 : using data_structure_t = std::map<std::string, trbd_ptr_t>;
97 : data_structure_t m_dataflow_availability;
98 : data_structure_t::iterator m_last_assignement_it;
99 : std::function<void(nlohmann::json&)> m_metadata_function;
100 :
101 : private:
102 : // Commands
103 : void do_conf(const CommandData_t&);
104 : void do_start(const CommandData_t&);
105 : void do_stop(const CommandData_t&);
106 : void do_scrap(const CommandData_t&);
107 :
108 : void generate_opmon_data() override;
109 :
110 : virtual void receive_trigger_complete_token(const dfmessages::TriggerDecisionToken&);
111 : void receive_trigger_decision(const dfmessages::TriggerDecision&);
112 : virtual bool is_busy() const;
113 : bool is_empty() const;
114 : size_t used_slots() const;
115 : void notify_trigger_if_needed() const;
116 : bool dispatch(const std::shared_ptr<AssignedTriggerDecision>& assignment);
117 : virtual void assign_trigger_decision(const std::shared_ptr<AssignedTriggerDecision>& assignment);
118 :
119 : // Configuration
120 : const appmodel::DFOConf* m_dfo_conf;
121 : std::chrono::milliseconds m_queue_timeout;
122 : std::chrono::microseconds m_stop_timeout;
123 : dunedaq::daqdataformats::run_number_t m_run_number;
124 :
125 : // Connections
126 : std::shared_ptr<iomanager::SenderConcept<dfmessages::TriggerInhibit>> m_busy_sender;
127 : std::string m_token_connection;
128 : std::string m_td_connection;
129 : size_t m_td_send_retries;
130 : size_t m_busy_threshold;
131 : size_t m_free_threshold;
132 : std::vector<std::string> m_trb_conn_ids;
133 :
134 : // Coordination
135 : std::atomic<bool> m_running_status{ false };
136 : mutable std::atomic<bool> m_last_notified_busy{ false };
137 : std::chrono::steady_clock::time_point m_last_token_received;
138 : std::chrono::steady_clock::time_point m_last_td_received;
139 : mutable std::mutex m_notify_trigger_mutex;
140 :
141 : // Struct for statistic
142 : struct TriggerData {
143 : std::atomic<uint64_t> received{0};
144 : std::atomic<uint64_t> completed{0};
145 : };
146 : static std::set<trgdataformats::TriggerCandidateData::Type>
147 7 : unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) {
148 7 : std::set<trgdataformats::TriggerCandidateData::Type> results;
149 7 : if (t == dfmessages::TypeDefaults::s_invalid_trigger_type)
150 : return results;
151 7 : const std::bitset<64> bits(t);
152 455 : for( size_t i = 0; i < bits.size(); ++i ) {
153 448 : if ( bits[i] ) results.insert((trgdataformats::TriggerCandidateData::Type)i);
154 : }
155 : return results;
156 0 : }
157 :
158 : // Statistics
159 : std::atomic<uint64_t> m_received_tokens{ 0 }; // NOLINT (build/unsigned)
160 : std::atomic<uint64_t> m_sent_decisions{ 0 }; // NOLINT (build/unsigned)
161 : std::atomic<uint64_t> m_received_decisions{ 0 }; // NOLINT (build/unsigned)
162 : std::atomic<uint64_t> m_waiting_for_decision{ 0 }; // NOLINT (build/unsigned)
163 : std::atomic<uint64_t> m_deciding_destination{ 0 }; // NOLINT (build/unsigned)
164 : std::atomic<uint64_t> m_forwarding_decision{ 0 }; // NOLINT (build/unsigned)
165 : std::atomic<uint64_t> m_waiting_for_token{ 0 }; // NOLINT (build/unsigned)
166 : std::atomic<uint64_t> m_processing_token{ 0 }; // NOLINT (build/unsigned)
167 : std::map<dunedaq::trgdataformats::TriggerCandidateData::Type, TriggerData> m_trigger_counters;
168 : std::mutex m_trigger_counters_mutex; // used to safely handle the map above
169 7 : TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) {
170 7 : auto it = m_trigger_counters.find(type);
171 7 : if (it != m_trigger_counters.end()) return it->second;
172 :
173 2 : std::lock_guard<std::mutex> guard(m_trigger_counters_mutex);
174 2 : return m_trigger_counters[type];
175 2 : }
176 :
177 : };
178 : } // namespace dfmodules
179 : } // namespace dunedaq
180 :
181 : #endif // DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_
|