12#ifndef TIMINGLIBS_SRC_INFOGATHERER_HPP_
13#define TIMINGLIBS_SRC_INFOGATHERER_HPP_
24#include "nlohmann/json.hpp"
30#include <shared_mutex>
40 "Gather Threading Issue detected: " << err,
45 " Failed to send send " << device <<
" device info to " << destination <<
".",
46 ((std::string)device)((std::string)destination))
61 explicit InfoGatherer(std::function<
void(InfoGatherer&)> gather_data,
63 const std::string& device_name,
65 : m_run_gathering(
false)
66 , m_gathering_thread(
nullptr)
67 , m_gather_interval(gather_interval)
68 , m_device_name(device_name)
69 , m_last_gathered_time(0)
70 , m_op_mon_level(op_mon_level)
71 , m_gather_data(gather_data)
72 , m_device_info_connection_id(device_name+
"_info")
73 , m_hw_info_sender(
nullptr)
75 , m_failed_to_send_counter(0)
79 m_hw_info_sender = iomanager::IOManager::get()->get_sender<nlohmann::json>(m_device_info_connection_id);
82 virtual ~InfoGatherer()
84 if (run_gathering()) stop_gathering_thread();
87 InfoGatherer(
const InfoGatherer&) =
delete;
88 InfoGatherer& operator=(
const InfoGatherer&) =
delete;
89 InfoGatherer(InfoGatherer&&) =
delete;
90 InfoGatherer& operator=(InfoGatherer&&) =
delete;
96 void start_gathering_thread(
const std::string& name =
"noname")
98 if (run_gathering()) {
100 "Attempted to start gathering thread "
101 "when it is already supposed to be running!"));
104 m_run_gathering =
true;
105 m_gathering_thread.reset(
new std::thread([&] { m_gather_data(*
this); }));
106 auto handle = m_gathering_thread->native_handle();
107 auto rc = pthread_setname_np(handle, name.c_str());
109 std::ostringstream s;
110 s <<
"The name " << name <<
" provided for the thread is too long.";
121 void stop_gathering_thread()
123 if (!run_gathering()) {
125 "Attempted to stop gathering thread "
126 "when it is not supposed to be running!"));
129 m_run_gathering =
false;
130 if (m_gathering_thread->joinable()) {
132 m_gathering_thread->join();
133 }
catch (std::system_error
const& e) {
134 throw GatherThreadingIssue(
ERS_HERE, std::string(
"Error while joining gathering thread, ") + e.what());
137 throw GatherThreadingIssue(
ERS_HERE,
"Thread not in joinable state during working thread stop!");
145 bool run_gathering()
const {
return m_run_gathering.load(); }
147 void update_gather_interval(uint new_gather_interval) { m_gather_interval.store(new_gather_interval); }
148 uint get_gather_interval()
const {
return m_gather_interval.load(); }
150 void update_last_gathered_time(int64_t last_time) { m_last_gathered_time.store(last_time); }
151 time_t get_last_gathered_time()
const {
return m_last_gathered_time.load(); }
153 std::string get_device_name()
const {
return m_device_name; }
155 int get_op_mon_level()
const {
return m_op_mon_level; }
158 void collect_info_from_device(
const DSGN& device)
160 std::unique_lock info_collector_lock(m_info_collector_mutex);
162 device.get_info(*m_device_info);
163 update_last_gathered_time(std::time(
nullptr));
181 void send_device_info()
189 if (!m_hw_info_sender)
191 TLOG_DEBUG(3) <<
"skipping sending info for gatherer: " << get_device_name();
196 to_json(info, *m_device_info);
197 bool was_successfully_sent =
false;
198 while (!was_successfully_sent)
202 m_hw_info_sender->send(std::move(info), m_queue_timeout);
203 TLOG_DEBUG(4) <<
"sent " << get_device_name() <<
" info";
205 was_successfully_sent =
true;
207 catch (
const dunedaq::iomanager::TimeoutExpired& excpt)
210 ++m_failed_to_send_counter;
216 std::atomic<bool> m_run_gathering;
217 std::unique_ptr<std::thread> m_gathering_thread;
218 std::atomic<uint> m_gather_interval;
219 mutable std::shared_mutex m_mon_data_mutex;
220 std::string m_device_name;
221 std::atomic<time_t> m_last_gathered_time;
224 std::unique_ptr<timing::timingfirmwareinfo::TimingDeviceInfo> m_device_info;
225 mutable std::mutex m_info_collector_mutex;
226 std::function<void(InfoGatherer&)> m_gather_data;
227 std::string m_device_info_connection_id;
229 std::shared_ptr<sink_t> m_hw_info_sender;
230 std::atomic<uint> m_sent_counter;
231 std::atomic<uint> m_failed_to_send_counter;
232 std::chrono::milliseconds m_queue_timeout;
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
#define TLOG_DEBUG(lvl,...)
void warning(const Issue &issue)
void error(const Issue &issue)