LCOV - code coverage report
Current view: top level - timinglibs/src - InfoGatherer.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 2.6 % 76 2
Test Date: 2025-12-21 13:07:08 Functions: 6.9 % 29 2

            Line data    Source code
       1              : /**
       2              :  * @file InfoGatherer.hpp
       3              :  *
       4              :  * InfoGatherer is a DAQModule implementation that
       5              :  * provides a mechanism for collecting and filling monitoring data in a dedicated thread.
       6              :  *
       7              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       8              :  * Licensing/copyright details are in the COPYING file that you should have
       9              :  * received with this code.
      10              :  */
      11              : 
      12              : #ifndef TIMINGLIBS_SRC_INFOGATHERER_HPP_
      13              : #define TIMINGLIBS_SRC_INFOGATHERER_HPP_
      14              : 
      15              : #include "timing/timingfirmwareinfo/Nljs.hpp"
      16              : #include "timing/timingfirmwareinfo/Structs.hpp"
      17              : 
      18              : #include "ers/Issue.hpp"
      19              : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
      20              : 
      21              : #include "iomanager/Sender.hpp"
      22              : #include "iomanager/IOManager.hpp"
      23              : 
      24              : #include "nlohmann/json.hpp"
      25              : 
      26              : #include <functional>
      27              : #include <future>
      28              : #include <list>
      29              : #include <memory>
      30              : #include <shared_mutex>
      31              : #include <string>
      32              : 
      33              : namespace dunedaq {
      34              : 
      35              : /**
      36              :  * @brief An ERS Issue raised when a threading state error occurs
      37              :  */
      38            1 : ERS_DECLARE_ISSUE(timinglibs,                                 // Namespace
      39              :                   GatherThreadingIssue,                       // Issue Class Name
      40              :                   "Gather Threading Issue detected: " << err, // Message
      41              :                   ((std::string)err))                         // Message parameters
      42              : 
      43            1 : ERS_DECLARE_ISSUE(timinglibs,
      44              :                   DeviceInfoSendFailed,
      45              :                   " Failed to send send " << device << " device info to " << destination << ".",
      46              :                   ((std::string)device)((std::string)destination))
      47              : namespace timinglibs {
      48              : 
      49              : /**
      50              :  * @brief InfoGatherer helper class for DAQ module monitor
      51              :  * data gathering.
      52              :  */
      53              : class InfoGatherer
      54              : {
      55              : public:
      56              :   /**
      57              :    * @brief InfoGatherer Constructor
      58              :    * @param gather_data function for data gathering
      59              :    * @param gather_interval interval for data gathering in us
      60              :    */
      61            0 :   explicit InfoGatherer(std::function<void(InfoGatherer&)> gather_data,
      62              :                         uint gather_interval,
      63              :                         const std::string& device_name,
      64              :                         int op_mon_level)
      65            0 :     : m_run_gathering(false)
      66            0 :     , m_gathering_thread(nullptr)
      67            0 :     , m_gather_interval(gather_interval)
      68            0 :     , m_device_name(device_name)
      69            0 :     , m_last_gathered_time(0)
      70            0 :     , m_op_mon_level(op_mon_level)
      71            0 :     , m_gather_data(gather_data)
      72            0 :     , m_device_info_connection_id(device_name+"_info")
      73            0 :     , m_hw_info_sender(nullptr)
      74            0 :     , m_sent_counter(0)
      75            0 :     , m_failed_to_send_counter(0)
      76            0 :     , m_queue_timeout(1)
      77              :   {
      78              :     //    m_info_collector = std::make_unique<opmonlib::InfoCollector>();
      79            0 :     m_hw_info_sender = iomanager::IOManager::get()->get_sender<nlohmann::json>(m_device_info_connection_id);
      80            0 :   }
      81              : 
      82            0 :   virtual ~InfoGatherer()
      83            0 :   {
      84            0 :     if (run_gathering()) stop_gathering_thread();
      85            0 :   }
      86              : 
      87              :   InfoGatherer(const InfoGatherer&) = delete;            ///< InfoGatherer is not copy-constructible
      88              :   InfoGatherer& operator=(const InfoGatherer&) = delete; ///< InfoGatherer is not copy-assignable
      89              :   InfoGatherer(InfoGatherer&&) = delete;                 ///< InfoGatherer is not move-constructible
      90              :   InfoGatherer& operator=(InfoGatherer&&) = delete;      ///< InfoGatherer is not move-assignable
      91              : 
      92              :   /**
      93              :    * @brief Start the monitoring thread (which executes the m_gather_data() function)
      94              :    * @throws MonitorThreadingIssue if the thread is already running
      95              :    */
      96            0 :   void start_gathering_thread(const std::string& name = "noname")
      97              :   {
      98            0 :     if (run_gathering()) {
      99            0 :       ers::warning(GatherThreadingIssue(ERS_HERE,
     100              :                                  "Attempted to start gathering thread "
     101            0 :                                  "when it is already supposed to be running!"));
     102            0 :       return;
     103              :     }
     104            0 :     m_run_gathering = true;
     105            0 :     m_gathering_thread.reset(new std::thread([&] { m_gather_data(*this); }));
     106            0 :     auto handle = m_gathering_thread->native_handle();
     107            0 :     auto rc = pthread_setname_np(handle, name.c_str());
     108            0 :     if (rc != 0) {
     109            0 :       std::ostringstream s;
     110            0 :       s << "The name " << name << " provided for the thread is too long.";
     111            0 :       ers::warning(GatherThreadingIssue(ERS_HERE, s.str()));
     112            0 :     }
     113              :   }
     114              : 
     115              :   /**
     116              :    * @brief Stop the gathering thread
     117              :    * @throws GatherThreadingIssue If the thread has not yet been started
     118              :    * @throws GatherThreadingIssue If the thread is not in the joinable state
     119              :    * @throws GatherThreadingIssue If an exception occurs during thread join
     120              :    */
     121            0 :   void stop_gathering_thread()
     122              :   {
     123            0 :     if (!run_gathering()) {
     124            0 :       ers::warning(GatherThreadingIssue(ERS_HERE,
     125              :                                  "Attempted to stop gathering thread "
     126            0 :                                  "when it is not supposed to be running!"));
     127            0 :       return;
     128              :     }
     129            0 :     m_run_gathering = false;
     130            0 :     if (m_gathering_thread->joinable()) {
     131            0 :       try {
     132            0 :         m_gathering_thread->join();
     133            0 :       } catch (std::system_error const& e) {
     134            0 :         throw GatherThreadingIssue(ERS_HERE, std::string("Error while joining gathering thread, ") + e.what());
     135            0 :       }
     136              :     } else {
     137            0 :       throw GatherThreadingIssue(ERS_HERE, "Thread not in joinable state during working thread stop!");
     138              :     }
     139              :   }
     140              : 
     141              :   /**
     142              :    * @brief Determine if the thread is currently running
     143              :    * @return Whether the thread is currently running
     144              :    */
     145            0 :   bool run_gathering() const { return m_run_gathering.load(); }
     146              : 
     147              :   void update_gather_interval(uint new_gather_interval) { m_gather_interval.store(new_gather_interval); }
     148            0 :   uint get_gather_interval() const { return m_gather_interval.load(); }
     149              : 
     150            0 :   void update_last_gathered_time(int64_t last_time) { m_last_gathered_time.store(last_time); }
     151              :   time_t get_last_gathered_time() const { return m_last_gathered_time.load(); }
     152              : 
     153            0 :   std::string get_device_name() const { return m_device_name; }
     154              : 
     155              :   int get_op_mon_level() const { return m_op_mon_level; }
     156              : 
     157              :   template<class DSGN>
     158            0 :   void collect_info_from_device(const DSGN& device)
     159              :   {
     160            0 :     std::unique_lock info_collector_lock(m_info_collector_mutex);
     161            0 :     m_device_info.reset( new timing::timingfirmwareinfo::TimingDeviceInfo() );
     162            0 :     device.get_info(*m_device_info);
     163            0 :     update_last_gathered_time(std::time(nullptr));
     164            0 :     send_device_info();
     165            0 :   }
     166              : 
     167              :   // void add_info_to_collector(std::string label, opmonlib::InfoCollector& ic)
     168              :   // {
     169              :   //   std::unique_lock info_collector_lock(m_info_collector_mutex);
     170              :   //   if (m_info_collector->is_empty()) {
     171              :   //     TLOG_DEBUG(3) << "skipping add info for gatherer: " << get_device_name()
     172              :   //            << " with gathered time: " << get_last_gathered_time() << " and level " << get_op_mon_level();
     173              :   //   } else {
     174              :   //     ic.add(label, *m_info_collector);
     175              :   //   }
     176              :   //   m_info_collector = std::make_unique<opmonlib::InfoCollector>();
     177              :   //   update_last_gathered_time(0);
     178              :   // }
     179              : 
     180              : private:
     181            0 :   void send_device_info()
     182              :   {
     183              :     //if (m_info_collector->is_empty())
     184              :     //{
     185              :     //  TLOG_DEBUG(3) << "skipping sending info for gatherer: " << get_device_name() << ", collector empty.";
     186              :     //  return;
     187              :     //}
     188              : 
     189            0 :     if (!m_hw_info_sender)
     190              :     {
     191            0 :       TLOG_DEBUG(3) << "skipping sending info for gatherer: " << get_device_name();
     192            0 :       return;
     193              :     }
     194              :     
     195            0 :     nlohmann::json info;
     196            0 :     to_json(info, *m_device_info);
     197              :     bool was_successfully_sent = false;
     198            0 :     while (!was_successfully_sent)
     199              :     {
     200            0 :       try
     201              :       {
     202            0 :         m_hw_info_sender->send(std::move(info), m_queue_timeout);
     203            0 :         TLOG_DEBUG(4) << "sent " << get_device_name() <<  " info";
     204            0 :         ++m_sent_counter;
     205            0 :         was_successfully_sent = true;
     206              :       }
     207            0 :       catch (const dunedaq::iomanager::TimeoutExpired& excpt)
     208              :       {
     209            0 :         ers::error(DeviceInfoSendFailed(ERS_HERE, m_device_name, m_device_info_connection_id));
     210            0 :         ++m_failed_to_send_counter;
     211            0 :       }
     212              :     }
     213            0 :   }
     214              : 
     215              : protected:
     216              :   std::atomic<bool> m_run_gathering;
     217              :   std::unique_ptr<std::thread> m_gathering_thread;
     218              :   std::atomic<uint> m_gather_interval;
     219              :   mutable std::shared_mutex m_mon_data_mutex;
     220              :   std::string m_device_name;
     221              :   std::atomic<time_t> m_last_gathered_time;
     222              :   int m_op_mon_level;
     223              :   //  std::unique_ptr<opmonlib::InfoCollector> m_info_collector;
     224              :   std::unique_ptr<timing::timingfirmwareinfo::TimingDeviceInfo> m_device_info;
     225              :   mutable std::mutex m_info_collector_mutex;
     226              :   std::function<void(InfoGatherer&)> m_gather_data;
     227              :   std::string m_device_info_connection_id;
     228              :   using sink_t = dunedaq::iomanager::SenderConcept<nlohmann::json>;
     229              :   std::shared_ptr<sink_t> m_hw_info_sender;
     230              :   std::atomic<uint> m_sent_counter;
     231              :   std::atomic<uint> m_failed_to_send_counter;
     232              :   std::chrono::milliseconds m_queue_timeout;
     233              : };
     234              : 
     235              : } // namespace timinglibs
     236              : } // namespace dunedaq
     237              : 
     238              : #endif // TIMINGLIBS_SRC_INFOGATHERER_HPP_
     239              : 
     240              : // Local Variables:
     241              : // c-basic-offset: 2
     242              : // End:
        

Generated by: LCOV version 2.0-1