DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
InfoGatherer.hpp
Go to the documentation of this file.
1
12#ifndef TIMINGLIBS_SRC_INFOGATHERER_HPP_
13#define TIMINGLIBS_SRC_INFOGATHERER_HPP_
14
17
18#include "ers/Issue.hpp"
19#include "logging/Logging.hpp" // NOTE: if issues are declared before including logging/Logging.hpp, TLOG_DEBUG << issue won't work.
20
21#include "iomanager/Sender.hpp"
23
24#include "nlohmann/json.hpp"
25
26#include <functional>
27#include <future>
28#include <list>
29#include <memory>
30#include <shared_mutex>
31#include <string>
32
33namespace dunedaq {
34
39 GatherThreadingIssue, // Issue Class Name
40 "Gather Threading Issue detected: " << err, // Message
41 ((std::string)err)) // Message parameters
42
45 " Failed to send send " << device << " device info to " << destination << ".",
46 ((std::string)device)((std::string)destination))
47namespace timinglibs {
48
53class InfoGatherer
54{
55public:
61 explicit InfoGatherer(std::function<void(InfoGatherer&)> gather_data,
62 uint gather_interval,
63 const std::string& device_name,
64 int op_mon_level)
65 : m_run_gathering(false)
66 , m_gathering_thread(nullptr)
67 , m_gather_interval(gather_interval)
68 , m_device_name(device_name)
69 , m_last_gathered_time(0)
70 , m_op_mon_level(op_mon_level)
71 , m_gather_data(gather_data)
72 , m_device_info_connection_id(device_name+"_info")
73 , m_hw_info_sender(nullptr)
74 , m_sent_counter(0)
75 , m_failed_to_send_counter(0)
76 , m_queue_timeout(1)
77 {
78 // m_info_collector = std::make_unique<opmonlib::InfoCollector>();
79 m_hw_info_sender = iomanager::IOManager::get()->get_sender<nlohmann::json>(m_device_info_connection_id);
80 }
81
82 virtual ~InfoGatherer()
83 {
84 if (run_gathering()) stop_gathering_thread();
85 }
86
87 InfoGatherer(const InfoGatherer&) = delete;
88 InfoGatherer& operator=(const InfoGatherer&) = delete;
89 InfoGatherer(InfoGatherer&&) = delete;
90 InfoGatherer& operator=(InfoGatherer&&) = delete;
91
96 void start_gathering_thread(const std::string& name = "noname")
97 {
98 if (run_gathering()) {
99 ers::warning(GatherThreadingIssue(ERS_HERE,
100 "Attempted to start gathering thread "
101 "when it is already supposed to be running!"));
102 return;
103 }
104 m_run_gathering = true;
105 m_gathering_thread.reset(new std::thread([&] { m_gather_data(*this); }));
106 auto handle = m_gathering_thread->native_handle();
107 auto rc = pthread_setname_np(handle, name.c_str());
108 if (rc != 0) {
109 std::ostringstream s;
110 s << "The name " << name << " provided for the thread is too long.";
111 ers::warning(GatherThreadingIssue(ERS_HERE, s.str()));
112 }
113 }
114
121 void stop_gathering_thread()
122 {
123 if (!run_gathering()) {
124 ers::warning(GatherThreadingIssue(ERS_HERE,
125 "Attempted to stop gathering thread "
126 "when it is not supposed to be running!"));
127 return;
128 }
129 m_run_gathering = false;
130 if (m_gathering_thread->joinable()) {
131 try {
132 m_gathering_thread->join();
133 } catch (std::system_error const& e) {
134 throw GatherThreadingIssue(ERS_HERE, std::string("Error while joining gathering thread, ") + e.what());
135 }
136 } else {
137 throw GatherThreadingIssue(ERS_HERE, "Thread not in joinable state during working thread stop!");
138 }
139 }
140
145 bool run_gathering() const { return m_run_gathering.load(); }
146
147 void update_gather_interval(uint new_gather_interval) { m_gather_interval.store(new_gather_interval); }
148 uint get_gather_interval() const { return m_gather_interval.load(); }
149
150 void update_last_gathered_time(int64_t last_time) { m_last_gathered_time.store(last_time); }
151 time_t get_last_gathered_time() const { return m_last_gathered_time.load(); }
152
153 std::string get_device_name() const { return m_device_name; }
154
155 int get_op_mon_level() const { return m_op_mon_level; }
156
157 template<class DSGN>
158 void collect_info_from_device(const DSGN& device)
159 {
160 std::unique_lock info_collector_lock(m_info_collector_mutex);
161 m_device_info.reset( new timing::timingfirmwareinfo::TimingDeviceInfo() );
162 device.get_info(*m_device_info);
163 update_last_gathered_time(std::time(nullptr));
164 send_device_info();
165 }
166
167 // void add_info_to_collector(std::string label, opmonlib::InfoCollector& ic)
168 // {
169 // std::unique_lock info_collector_lock(m_info_collector_mutex);
170 // if (m_info_collector->is_empty()) {
171 // TLOG_DEBUG(3) << "skipping add info for gatherer: " << get_device_name()
172 // << " with gathered time: " << get_last_gathered_time() << " and level " << get_op_mon_level();
173 // } else {
174 // ic.add(label, *m_info_collector);
175 // }
176 // m_info_collector = std::make_unique<opmonlib::InfoCollector>();
177 // update_last_gathered_time(0);
178 // }
179
180private:
181 void send_device_info()
182 {
183 //if (m_info_collector->is_empty())
184 //{
185 // TLOG_DEBUG(3) << "skipping sending info for gatherer: " << get_device_name() << ", collector empty.";
186 // return;
187 //}
188
189 if (!m_hw_info_sender)
190 {
191 TLOG_DEBUG(3) << "skipping sending info for gatherer: " << get_device_name();
192 return;
193 }
194
195 nlohmann::json info;
196 to_json(info, *m_device_info);
197 bool was_successfully_sent = false;
198 while (!was_successfully_sent)
199 {
200 try
201 {
202 m_hw_info_sender->send(std::move(info), m_queue_timeout);
203 TLOG_DEBUG(4) << "sent " << get_device_name() << " info";
204 ++m_sent_counter;
205 was_successfully_sent = true;
206 }
207 catch (const dunedaq::iomanager::TimeoutExpired& excpt)
208 {
209 ers::error(DeviceInfoSendFailed(ERS_HERE, m_device_name, m_device_info_connection_id));
210 ++m_failed_to_send_counter;
211 }
212 }
213 }
214
215protected:
216 std::atomic<bool> m_run_gathering;
217 std::unique_ptr<std::thread> m_gathering_thread;
218 std::atomic<uint> m_gather_interval;
219 mutable std::shared_mutex m_mon_data_mutex;
220 std::string m_device_name;
221 std::atomic<time_t> m_last_gathered_time;
222 int m_op_mon_level;
223 // std::unique_ptr<opmonlib::InfoCollector> m_info_collector;
224 std::unique_ptr<timing::timingfirmwareinfo::TimingDeviceInfo> m_device_info;
225 mutable std::mutex m_info_collector_mutex;
226 std::function<void(InfoGatherer&)> m_gather_data;
227 std::string m_device_info_connection_id;
229 std::shared_ptr<sink_t> m_hw_info_sender;
230 std::atomic<uint> m_sent_counter;
231 std::atomic<uint> m_failed_to_send_counter;
232 std::chrono::milliseconds m_queue_timeout;
233};
234
235} // namespace timinglibs
236} // namespace dunedaq
237
238#endif // TIMINGLIBS_SRC_INFOGATHERER_HPP_
239
240// Local Variables:
241// c-basic-offset: 2
242// End:
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
#define ERS_HERE
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
Including Qt Headers.
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81