Line data Source code
1 : /**
2 : * @file InfoGatherer.hpp
3 : *
4 : * InfoGatherer is a DAQModule implementation that
5 : * provides a mechanism for collecting and filling monitoring data in a dedicated thread.
6 : *
7 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
8 : * Licensing/copyright details are in the COPYING file that you should have
9 : * received with this code.
10 : */
11 :
12 : #ifndef TIMINGLIBS_SRC_INFOGATHERER_HPP_
13 : #define TIMINGLIBS_SRC_INFOGATHERER_HPP_
14 :
15 : #include "timing/timingfirmwareinfo/Nljs.hpp"
16 : #include "timing/timingfirmwareinfo/Structs.hpp"
17 :
18 : #include "ers/Issue.hpp"
19 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
20 :
21 : #include "iomanager/Sender.hpp"
22 : #include "iomanager/IOManager.hpp"
23 :
24 : #include "nlohmann/json.hpp"
25 :
26 : #include <functional>
27 : #include <future>
28 : #include <list>
29 : #include <memory>
30 : #include <shared_mutex>
31 : #include <string>
32 :
33 : namespace dunedaq {
34 :
35 : /**
36 : * @brief An ERS Issue raised when a threading state error occurs
37 : */
38 1 : ERS_DECLARE_ISSUE(timinglibs, // Namespace
39 : GatherThreadingIssue, // Issue Class Name
40 : "Gather Threading Issue detected: " << err, // Message
41 : ((std::string)err)) // Message parameters
42 :
43 1 : ERS_DECLARE_ISSUE(timinglibs,
44 : DeviceInfoSendFailed,
45 : " Failed to send send " << device << " device info to " << destination << ".",
46 : ((std::string)device)((std::string)destination))
47 : namespace timinglibs {
48 :
49 : /**
50 : * @brief InfoGatherer helper class for DAQ module monitor
51 : * data gathering.
52 : */
53 : class InfoGatherer
54 : {
55 : public:
56 : /**
57 : * @brief InfoGatherer Constructor
58 : * @param gather_data function for data gathering
59 : * @param gather_interval interval for data gathering in us
60 : */
61 0 : explicit InfoGatherer(std::function<void(InfoGatherer&)> gather_data,
62 : uint gather_interval,
63 : const std::string& device_name,
64 : int op_mon_level)
65 0 : : m_run_gathering(false)
66 0 : , m_gathering_thread(nullptr)
67 0 : , m_gather_interval(gather_interval)
68 0 : , m_device_name(device_name)
69 0 : , m_last_gathered_time(0)
70 0 : , m_op_mon_level(op_mon_level)
71 0 : , m_gather_data(gather_data)
72 0 : , m_device_info_connection_id(device_name+"_info")
73 0 : , m_hw_info_sender(nullptr)
74 0 : , m_sent_counter(0)
75 0 : , m_failed_to_send_counter(0)
76 0 : , m_queue_timeout(1)
77 : {
78 : // m_info_collector = std::make_unique<opmonlib::InfoCollector>();
79 0 : m_hw_info_sender = iomanager::IOManager::get()->get_sender<nlohmann::json>(m_device_info_connection_id);
80 0 : }
81 :
82 0 : virtual ~InfoGatherer()
83 0 : {
84 0 : if (run_gathering()) stop_gathering_thread();
85 0 : }
86 :
87 : InfoGatherer(const InfoGatherer&) = delete; ///< InfoGatherer is not copy-constructible
88 : InfoGatherer& operator=(const InfoGatherer&) = delete; ///< InfoGatherer is not copy-assignable
89 : InfoGatherer(InfoGatherer&&) = delete; ///< InfoGatherer is not move-constructible
90 : InfoGatherer& operator=(InfoGatherer&&) = delete; ///< InfoGatherer is not move-assignable
91 :
92 : /**
93 : * @brief Start the monitoring thread (which executes the m_gather_data() function)
94 : * @throws MonitorThreadingIssue if the thread is already running
95 : */
96 0 : void start_gathering_thread(const std::string& name = "noname")
97 : {
98 0 : if (run_gathering()) {
99 0 : ers::warning(GatherThreadingIssue(ERS_HERE,
100 : "Attempted to start gathering thread "
101 0 : "when it is already supposed to be running!"));
102 0 : return;
103 : }
104 0 : m_run_gathering = true;
105 0 : m_gathering_thread.reset(new std::thread([&] { m_gather_data(*this); }));
106 0 : auto handle = m_gathering_thread->native_handle();
107 0 : auto rc = pthread_setname_np(handle, name.c_str());
108 0 : if (rc != 0) {
109 0 : std::ostringstream s;
110 0 : s << "The name " << name << " provided for the thread is too long.";
111 0 : ers::warning(GatherThreadingIssue(ERS_HERE, s.str()));
112 0 : }
113 : }
114 :
115 : /**
116 : * @brief Stop the gathering thread
117 : * @throws GatherThreadingIssue If the thread has not yet been started
118 : * @throws GatherThreadingIssue If the thread is not in the joinable state
119 : * @throws GatherThreadingIssue If an exception occurs during thread join
120 : */
121 0 : void stop_gathering_thread()
122 : {
123 0 : if (!run_gathering()) {
124 0 : ers::warning(GatherThreadingIssue(ERS_HERE,
125 : "Attempted to stop gathering thread "
126 0 : "when it is not supposed to be running!"));
127 0 : return;
128 : }
129 0 : m_run_gathering = false;
130 0 : if (m_gathering_thread->joinable()) {
131 0 : try {
132 0 : m_gathering_thread->join();
133 0 : } catch (std::system_error const& e) {
134 0 : throw GatherThreadingIssue(ERS_HERE, std::string("Error while joining gathering thread, ") + e.what());
135 0 : }
136 : } else {
137 0 : throw GatherThreadingIssue(ERS_HERE, "Thread not in joinable state during working thread stop!");
138 : }
139 : }
140 :
141 : /**
142 : * @brief Determine if the thread is currently running
143 : * @return Whether the thread is currently running
144 : */
145 0 : bool run_gathering() const { return m_run_gathering.load(); }
146 :
147 : void update_gather_interval(uint new_gather_interval) { m_gather_interval.store(new_gather_interval); }
148 0 : uint get_gather_interval() const { return m_gather_interval.load(); }
149 :
150 0 : void update_last_gathered_time(int64_t last_time) { m_last_gathered_time.store(last_time); }
151 : time_t get_last_gathered_time() const { return m_last_gathered_time.load(); }
152 :
153 0 : std::string get_device_name() const { return m_device_name; }
154 :
155 : int get_op_mon_level() const { return m_op_mon_level; }
156 :
157 : template<class DSGN>
158 0 : void collect_info_from_device(const DSGN& device)
159 : {
160 0 : std::unique_lock info_collector_lock(m_info_collector_mutex);
161 0 : m_device_info.reset( new timing::timingfirmwareinfo::TimingDeviceInfo() );
162 0 : device.get_info(*m_device_info);
163 0 : update_last_gathered_time(std::time(nullptr));
164 0 : send_device_info();
165 0 : }
166 :
167 : // void add_info_to_collector(std::string label, opmonlib::InfoCollector& ic)
168 : // {
169 : // std::unique_lock info_collector_lock(m_info_collector_mutex);
170 : // if (m_info_collector->is_empty()) {
171 : // TLOG_DEBUG(3) << "skipping add info for gatherer: " << get_device_name()
172 : // << " with gathered time: " << get_last_gathered_time() << " and level " << get_op_mon_level();
173 : // } else {
174 : // ic.add(label, *m_info_collector);
175 : // }
176 : // m_info_collector = std::make_unique<opmonlib::InfoCollector>();
177 : // update_last_gathered_time(0);
178 : // }
179 :
180 : private:
181 0 : void send_device_info()
182 : {
183 : //if (m_info_collector->is_empty())
184 : //{
185 : // TLOG_DEBUG(3) << "skipping sending info for gatherer: " << get_device_name() << ", collector empty.";
186 : // return;
187 : //}
188 :
189 0 : if (!m_hw_info_sender)
190 : {
191 0 : TLOG_DEBUG(3) << "skipping sending info for gatherer: " << get_device_name();
192 0 : return;
193 : }
194 :
195 0 : nlohmann::json info;
196 0 : to_json(info, *m_device_info);
197 : bool was_successfully_sent = false;
198 0 : while (!was_successfully_sent)
199 : {
200 0 : try
201 : {
202 0 : m_hw_info_sender->send(std::move(info), m_queue_timeout);
203 0 : TLOG_DEBUG(4) << "sent " << get_device_name() << " info";
204 0 : ++m_sent_counter;
205 0 : was_successfully_sent = true;
206 : }
207 0 : catch (const dunedaq::iomanager::TimeoutExpired& excpt)
208 : {
209 0 : ers::error(DeviceInfoSendFailed(ERS_HERE, m_device_name, m_device_info_connection_id));
210 0 : ++m_failed_to_send_counter;
211 0 : }
212 : }
213 0 : }
214 :
215 : protected:
216 : std::atomic<bool> m_run_gathering;
217 : std::unique_ptr<std::thread> m_gathering_thread;
218 : std::atomic<uint> m_gather_interval;
219 : mutable std::shared_mutex m_mon_data_mutex;
220 : std::string m_device_name;
221 : std::atomic<time_t> m_last_gathered_time;
222 : int m_op_mon_level;
223 : // std::unique_ptr<opmonlib::InfoCollector> m_info_collector;
224 : std::unique_ptr<timing::timingfirmwareinfo::TimingDeviceInfo> m_device_info;
225 : mutable std::mutex m_info_collector_mutex;
226 : std::function<void(InfoGatherer&)> m_gather_data;
227 : std::string m_device_info_connection_id;
228 : using sink_t = dunedaq::iomanager::SenderConcept<nlohmann::json>;
229 : std::shared_ptr<sink_t> m_hw_info_sender;
230 : std::atomic<uint> m_sent_counter;
231 : std::atomic<uint> m_failed_to_send_counter;
232 : std::chrono::milliseconds m_queue_timeout;
233 : };
234 :
235 : } // namespace timinglibs
236 : } // namespace dunedaq
237 :
238 : #endif // TIMINGLIBS_SRC_INFOGATHERER_HPP_
239 :
240 : // Local Variables:
241 : // c-basic-offset: 2
242 : // End:
|