LCOV - code coverage report
Current view: top level - dpdklibs/src - SourceModel.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 43 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 40 0

            Line data    Source code
       1              : /**
       2              :  * @file SourceModel.hpp FELIX CR's ELink concept wrapper
       3              :  *
       4              :  * This is part of the DUNE DAQ, copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #ifndef DPDKLIBS_SRC_SOURCEMODEL_HPP_
       9              : #define DPDKLIBS_SRC_SOURCEMODEL_HPP_
      10              : 
      11              : #include "SourceConcept.hpp"
      12              : 
      13              : #include "dpdklibs/Issues.hpp"
      14              : 
      15              : #include "iomanager/IOManager.hpp"
      16              : #include "iomanager/Sender.hpp"
      17              : #include "logging/Logging.hpp"
      18              : 
      19              : #include "dpdklibs/opmon/SourceModel.pb.h"
      20              : 
      21              : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
      22              : 
      23              : // #include <folly/ProducerConsumerQueue.h>
      24              : // #include <nlohmann/json.hpp>
      25              : 
      26              : #include <atomic>
      27              : #include <memory>
      28              : #include <mutex>
      29              : #include <string>
      30              : 
      31              : 
      32              : namespace dunedaq::dpdklibs {
      33              : 
      34              : template<class TargetPayloadType>
      35              : class SourceModel : public SourceConcept // Splits raw buffers into TargetPayloadType frames and forwards each to a sender or a callback.
      36              : {
      37              : public:
      38              :   using sink_t = iomanager::SenderConcept<TargetPayloadType>;
      39              :   using inherited = SourceConcept;
      40              :   using data_t = nlohmann::json; // NOTE(review): not used in this class, and the nlohmann include above is commented out; confirm it arrives transitively
      41              : 
      42              :   /**
      43              :    * @brief SourceModel Constructor
      44              :    * Takes no arguments; only default-constructs the SourceConcept base.
      45              :    */
      46            0 :   SourceModel()
      47            0 :     : SourceConcept()
      48            0 :   {}
      49            0 :   ~SourceModel() {}
      50              :   // Records the requested mode; in queue mode, acquires the iomanager sender for sink_name on the first call only.
      51            0 :   void set_sink(const std::string& sink_name, bool callback_mode) override
      52              :   {
      53            0 :     m_callback_mode = callback_mode;
      54            0 :     if (callback_mode) {
      55            0 :       TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
      56              :     } else {
      57            0 :       if (m_sink_is_set) {
      58            0 :         TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
      59              :       } else {
      60            0 :         m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
      61            0 :         m_sink_is_set = true;
      62              :       }
      63              :     }
      64            0 :   }
      65              :   // In callback mode, looks up the callback for inherited::m_sink_name in the registry (first call only); otherwise only logs.
      66            0 :   void acquire_callback() override
      67              :   {
      68            0 :     if (m_callback_mode) {
      69            0 :       if (m_callback_is_acquired) {
      70            0 :         TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
      71              :       } else {
      72              :         // Getting DataMoveCBRegistry
      73            0 :         auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
      74            0 :         m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
      75            0 :         m_callback_is_acquired = true;
      76            0 :       }
      77              :     } else {
      78            0 :       TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
      79              :     }
      80            0 :   }
      81              : 
      82              :   // Exposes the sink by returning a reference to the owning shared_ptr (empty until set_sink acquires a sender).
      83              :   std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
      84              : 
      85              :   // Process an incoming raw byte buffer and extract complete frames of type TargetPayloadType.
      86            0 :   void handle_daq_frame(char* buffer, std::size_t size)
      87              :   {
      88              :     // Calculate how many full frames fit in the incoming message buffer.
      89            0 :     std::size_t full_frames = size / m_expected_frame_size;
      90              :     
      91              :     // Count buffers whose size is not a whole number of frames; the trailing bytes are not forwarded.
      92            0 :     if (size % m_expected_frame_size > 0) [[unlikely]] {
      93            0 :       ++m_leftover_bytes_encountered;
      94              :     }
      95              :     
      96              :     // Process each full frame
      97            0 :     for (std::size_t i = 0; i < full_frames; ++i) {
      98              :       // Calculate pointer to the i-th frame chunk inside the message buffer.
      99            0 :       const char* src = buffer + i * m_expected_frame_size;
     100              :     
     101              :       // Materialize a real TargetPayloadType object by copying bytes from the buffer.
     102              :       // This is defined behavior, alignment-safe, and fast, without pointer voodoo.
     103              :       // Previously, reinterpret_cast to TargetPayloadType* introduced alignment traps and
     104              :       // “pretend there’s a constructed object there” UB. Scatter won't work like that.
     105              :       TargetPayloadType frame;
     106            0 :       std::memcpy(&frame, src, m_expected_frame_size); // NOTE(review): <cstring> is not included directly by this header
     107              : 
     108            0 :       if (m_callback_mode) {
     109              :         // Hand the local copy to the callback as an rvalue; nothing refers into 'buffer', so no UAF.
     110            0 :         (*m_sink_callback)(std::move(frame)); // NOTE(review): no null check; relies on acquire_callback() having run first
     111              :       } else {
     112              :         // Queue mode: non-blocking enqueue; a failed send is only counted. NOTE(review): m_sink_queue is not null-checked.
     113            0 :         if (!m_sink_queue->try_send(std::move(frame), iomanager::Sender::s_no_block)) {
     114            0 :            ++m_failed_to_send_daq_payloads;
     115              :         }
     116              :       }
     117              :     }
     118            0 :   }
     119              :   // Warns if any sends failed, then publishes both counters and resets them to zero.
     120            0 :   void generate_opmon_data() override {
     121              :       
     122            0 :     if(m_failed_to_send_daq_payloads != 0) {
     123            0 :         ers::warning(FailedToSendData(ERS_HERE, m_sink_id, m_failed_to_send_daq_payloads));
     124              :     }
     125              :     // exchange(0) reads and clears each counter in a single atomic step.
     126            0 :     opmon::SourceInfo info;
     127            0 :     info.set_failed_to_send_daq_payloads( m_failed_to_send_daq_payloads.exchange(0) );
     128            0 :     info.set_leftover_bytes_encountered( m_leftover_bytes_encountered.exchange(0) );
     129              : 
     130            0 :     publish( std::move(info) );
     131            0 :   }
     132              :   
     133              : private:
     134              :   // Constants
     135              :   const std::size_t m_expected_frame_size = sizeof(TargetPayloadType);
     136              : 
     137              :   // Sink internals
     138              :   std::string m_sink_id; // NOTE(review): never assigned in this class, yet reported by generate_opmon_data()
     139              :   bool m_sink_is_set{ false };
     140              :   std::shared_ptr<sink_t> m_sink_queue;
     141              : 
     142              :   // Callback internals
     143              :   bool m_callback_mode; // NOTE(review): no default initializer; written only by set_sink()
     144              :   bool m_callback_is_acquired{ false };
     145              :   using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>; // NOTE(review): <functional> is not included directly by this header
     146              :   sink_cb_t m_sink_callback;
     147              : 
     148              :   // Stats
     149              :   std::atomic<uint64_t> m_leftover_bytes_encountered{0};
     150              :   std::atomic<uint64_t> m_failed_to_send_daq_payloads{0};
     151              : 
     152              : };
     153              : 
     154              : } // namespace dunedaq::dpdklibs
     155              : 
     156              : #endif // DPDKLIBS_SRC_SOURCEMODEL_HPP_
        

Generated by: LCOV version 2.0-1