LCOV - code coverage report
Current view: top level - ipm/src - CallbackAdapter.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 98.3 % 60 59
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 9 9

            Line data    Source code
       1              : /**
       2              :  *
       3              :  * @file CallbackAdapter.cpp ipm CallbackAdapter class
       4              :  *
       5              :  * This is part of the DUNE DAQ Application Framework, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "CallbackAdapter.hpp"
      11              : 
      12              : #include "logging/Logging.hpp"
      13              : 
      14              : #include <memory>
      15              : #include <string>
      16              : #include <utility>
      17              : 
      18              : namespace dunedaq::ipm {
      19              : 
      20           43 : CallbackAdapter::~CallbackAdapter() noexcept
      21              : {
      22           43 :   {
      23           43 :     std::lock_guard<std::mutex> lk(m_callback_mutex);
      24           43 :     m_callback = nullptr;
      25           43 :   }
      26           43 :   shutdown();
      27           43 :   m_receiver_ptr = nullptr;
      28           43 : }
      29              : 
      30              : void
      31           40 : CallbackAdapter::set_receiver(Receiver* receiver_ptr)
      32              : {
      33           40 :   {
      34           40 :     std::lock_guard<std::mutex> lk(m_callback_mutex);
      35           40 :     m_receiver_ptr = receiver_ptr;
      36           40 :   }
      37              : 
      38           40 :   if (m_receiver_ptr != nullptr && m_callback != nullptr) {
      39            0 :     startup();
      40              :   }
      41           40 : }
      42              : 
      43              : void
      44            4 : CallbackAdapter::set_callback(std::function<void(Receiver::Response&)> callback)
      45              : {
      46            4 :   {
      47            4 :     std::lock_guard<std::mutex> lk(m_callback_mutex);
      48            4 :     m_callback = callback;
      49            4 :   }
      50              : 
      51            4 :   if (m_receiver_ptr != nullptr && m_callback != nullptr) {
      52            4 :     startup();
      53              :   }
      54            4 : }
      55              : 
      56              : void
      57           45 : CallbackAdapter::clear_callback()
      58              : {
      59           45 :   {
      60           45 :     std::lock_guard<std::mutex> lk(m_callback_mutex);
      61           45 :     m_callback = nullptr;
      62           45 :   }
      63           45 :   shutdown();
      64           45 : }
      65              : 
      66              : void
      67           92 : CallbackAdapter::shutdown()
      68              : {
      69           92 :   if (m_thread && m_thread->joinable())
      70            4 :     m_thread->join();
      71              : 
      72           92 :   m_is_listening = false;
      73           92 :   m_thread.reset(nullptr);
      74           92 : }
      75              : 
      76              : void
      77            4 : CallbackAdapter::startup()
      78              : {
      79            4 :   shutdown();
      80            4 :   m_is_listening = false;
      81            8 :   m_thread = std::make_unique<std::thread>([&] { thread_loop(); });
      82              : 
      83           28 :   while (!m_is_listening.load()) {
      84           24 :     usleep(1000);
      85              :   }
      86            4 : }
      87              : 
      88              : void
      89            4 : CallbackAdapter::thread_loop()
      90              : {
      91        58681 :   do {
      92        58681 :     try {
      93        58681 :       auto response = m_receiver_ptr->receive(Receiver::s_no_block);
      94              : 
      95        58656 :       TLOG_DEBUG(45) << "Received " << response.data.size() << " bytes. Dispatching to callback.";
      96        58656 :       {
      97        58656 :         std::lock_guard<std::mutex> lk(m_callback_mutex);
      98        58656 :         if (m_callback != nullptr) {
      99        58654 :           m_callback(response);
     100              :         }
     101        58656 :       }
     102        58681 :     } catch (ipm::ReceiveTimeoutExpired const& tmo) {
     103           25 :       usleep(10000);
     104           25 :     }
     105        58681 :     m_is_listening = true;
     106        58685 :   } while (m_callback != nullptr && m_receiver_ptr != nullptr);
     107            4 : }
     108              : 
     109              : } // namespace dunedaq::ipm
        

Generated by: LCOV version 2.0-1