LCOV - code coverage report
Current view: top level - crtmodules/src - SourceModel.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 38 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 22 0

            Line data    Source code
       1              : /**
       2              :  * @file SourceModel.hpp FELIX CR's ELink concept wrapper
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_
       9              : #define CRTMODULES_SRC_SOURCEMODEL_HPP_
      10              : 
      11              : #include "SourceConcept.hpp"
      12              : 
      13              : 
      14              : #include "iomanager/IOManager.hpp"
      15              : #include "iomanager/Sender.hpp"
      16              : #include "logging/Logging.hpp"
      17              : 
      18              : #include "crtmodules/opmon/SourceModel.pb.h"
      19              : 
      20              : // #include "datahandlinglibs/utils/ReusableThread.hpp"
      21              : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
      22              : #include "datahandlinglibs/utils/BufferCopy.hpp" 
      23              : 
      24              : // #include <folly/ProducerConsumerQueue.h>
      25              : // #include <nlohmann/json.hpp>
      26              : 
      27              : #include <atomic>
      28              : #include <memory>
      29              : #include <mutex>
      30              : #include <string>
      31              : 
      32              : 
      33              : namespace dunedaq::crtmodules {
      34              : 
      35              : template<class TargetPayloadType>
      36              : class SourceModel : public SourceConcept
      37              : {
      38              : public:
      39              :   using sink_t = iomanager::SenderConcept<TargetPayloadType>;
      40              :   using inherited = SourceConcept;
      41              :   using data_t = nlohmann::json;
      42              : 
      43              :   /**
      44              :    * @brief Buffer size based on TargetPayloadType
      45              :    */
      46              :   static constexpr auto buffer_size = sizeof(TargetPayloadType);
      47              : 
      48              :   /**
      49              :    * @brief SourceModel Constructor
      50              :    * @param name Instance name for this SourceModel instance
      51              :    */
      52            0 :   SourceModel()
      53            0 :     : SourceConcept()
      54            0 :   {}
      55            0 :   ~SourceModel() {}
      56              : 
      57            0 :   void set_sink(const std::string& sink_name, bool callback_mode) override
      58              :   {
      59            0 :     m_callback_mode = callback_mode;
      60            0 :     if (callback_mode) {
      61            0 :       TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
      62              :     } else {
      63            0 :       if (m_sink_is_set) {
      64            0 :         TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
      65              :       } else {
      66            0 :         m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
      67            0 :         m_sink_is_set = true;
      68              :       }
      69              :     }
      70            0 :   }
      71              : 
      72            0 :   void acquire_callback() override
      73              :   {
      74            0 :     if (m_callback_mode) {
      75            0 :       if (m_callback_is_acquired) {
      76            0 :         TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
      77              :       } else {
      78              :         // Getting DataMoveCBRegistry
      79            0 :         auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
      80            0 :         m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
      81            0 :         m_callback_is_acquired = true;
      82            0 :       }
      83              :     } else {
      84            0 :       TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
      85              :     }
      86            0 :   }
      87              : 
      88              :   std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
      89              : 
      90            0 :   bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
      91              :   {
      92            0 :     bool push_out = true;
      93              :     if (push_out) {
      94              : 
      95            0 :       TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
      96              :   
      97            0 :       if (m_callback_mode) {
      98            0 :         (*m_sink_callback)(std::move(target_payload));
      99              :       } else {
     100            0 :         if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
     101              :           //if(m_dropped_packets == 0 || m_dropped_packets%10000) {
     102              :           //  TLOG() << "Dropped data " << m_dropped_packets;
     103              :           //}
     104            0 :           ++m_dropped_packets;
     105              :         }
     106              :       }
     107              : 
     108              :     } else {
     109              :       TargetPayloadType target_payload;
     110              :       uint32_t bytes_copied = 0;
     111              :       datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
     112              :     }
     113              : 
     114            0 :     return true;
     115              :   }
     116              : 
     117            0 :   std::size_t get_frame_size() const override {
     118              :     TargetPayloadType target_payload; 
     119            0 :     return target_payload.get_frame_size(); // TODO (DTE): Could be a static function?
     120              :   }
     121              :     
     122            0 :   void generate_opmon_data() override {
     123              : 
     124            0 :     opmon::SourceInfo info;
     125            0 :     info.set_dropped_frames( m_dropped_packets.load() ); 
     126              : 
     127            0 :     publish( std::move(info) );
     128            0 :   }
     129              :   
     130              : private:
     131              :   // Sink internals
     132              :   std::string m_sink_id;
     133              :   bool m_sink_is_set{ false };
     134              :   std::shared_ptr<sink_t> m_sink_queue;
     135              : 
     136              :   // Callback internals
     137              :   bool m_callback_mode;
     138              :   bool m_callback_is_acquired{ false };
     139              :   using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
     140              :   sink_cb_t m_sink_callback;
     141              : 
     142              :   std::atomic<uint64_t> m_dropped_packets{0};
     143              : 
     144              : };
     145              : 
     146              : } // namespace dunedaq::crtmodules
     147              : 
     148              : #endif // CRTMODULES_SRC_SOURCEMODEL_HPP_
        

Generated by: LCOV version 2.0-1