LCOV - code coverage report
Current view: top level - flxlibs/src - ElinkModel.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 88 0
Test Date: 2026-03-29 15:29:34 Functions: 0.0 % 57 0

            Line data    Source code
       1              : /**
       2              :  * @file ElinkModel.hpp FELIX CR's ELink concept wrapper
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #ifndef FLXLIBS_SRC_ELINKMODEL_HPP_
       9              : #define FLXLIBS_SRC_ELINKMODEL_HPP_
      10              : 
      11              : #include "ElinkConcept.hpp"
      12              : 
      13              : #include "flxlibs/opmon/ElinkModel.pb.h"
      14              : 
      15              : #include "packetformat/block_format.hpp"
      16              : 
      17              : //#include "appfwk/DAQModuleHelper.hpp"
      18              : #include "iomanager/IOManager.hpp"
      19              : #include "iomanager/Sender.hpp"
      20              : #include "logging/Logging.hpp"
      21              : #include "utilities/ReusableThread.hpp"
      22              : 
      23              : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
      24              : 
      25              : #include <folly/ProducerConsumerQueue.h>
      26              : #include <nlohmann/json.hpp>
      27              : 
      28              : #include <atomic>
      29              : #include <memory>
      30              : #include <mutex>
      31              : #include <string>
      32              : 
      33              : namespace dunedaq::flxlibs {
      34              : 
      35              : template<class TargetPayloadType>
      36              : class ElinkModel : public ElinkConcept
      37              : {
      38              : public:
      39              :   using err_sink_t = iomanager::SenderConcept<felix::packetformat::chunk>;
      40              :   using inherited = ElinkConcept;
      41              :   using data_t = nlohmann::json;
      42              : 
      43              :   /**
      44              :    * @brief ElinkModel Constructor
      45              :    * @param name Instance name for this ElinkModel instance
      46              :    */
      47            0 :   ElinkModel()
      48              :     : ElinkConcept()
      49            0 :     , m_run_marker{ false }
      50            0 :     , m_parser_thread(0)
      51            0 :   {}
      52            0 :   ~ElinkModel() {}
      53              : 
      54              :   std::shared_ptr<err_sink_t>& get_error_sink() { return m_error_sink_queue; }
      55              : 
      56            0 :   void init(const size_t block_queue_capacity)
      57              :   {
      58            0 :     m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
      59            0 :   }
      60              : 
      61            0 :   void conf(size_t block_size, bool is_32b_trailers)
      62              :   {
      63            0 :     if (m_configured) {
      64            0 :       TLOG_DEBUG(5) << "ElinkModel is already configured!";
      65              :     } else {
      66            0 :       m_parser_thread.set_name(inherited::m_elink_source_tid, inherited::m_link_tag);
      67              :       // if (inconsistency)
      68              :       // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
      69              : 
      70            0 :       m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
      71            0 :       m_configured = true;
      72              :     }
      73            0 :   }
      74              : 
      75            0 :   void start()
      76              :   {
      77            0 :     m_t0 = std::chrono::high_resolution_clock::now();
      78            0 :     if (!m_run_marker.load()) {
      79            0 :       set_running(true);
      80            0 :       m_parser_thread.set_work(&ElinkModel::process_elink, this);
      81            0 :       TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
      82              :     } else {
      83            0 :       TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
      84              :     }
      85            0 :   }
      86              : 
      87            0 :   void stop()
      88              :   {
      89            0 :     if (m_run_marker.load()) {
      90            0 :       set_running(false);
      91            0 :       while (!m_parser_thread.get_readiness()) {
      92            0 :         std::this_thread::sleep_for(std::chrono::milliseconds(10));
      93              :       }
      94            0 :       TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
      95              :     } else {
      96            0 :       TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
      97              :     }
      98            0 :   }
      99              : 
     100            0 :   void set_running(bool should_run)
     101              :   {
     102            0 :     bool was_running = m_run_marker.exchange(should_run);
     103            0 :     TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
     104            0 :   }
     105              : 
     106            0 :   bool queue_in_block_address(uint64_t block_addr) // NOLINT(build/unsigned)
     107              :   {
     108            0 :     if (m_block_addr_queue->write(block_addr)) { // ok write
     109              :       return true;
     110              :     } else { // failed write
     111              :       return false;
     112              :     }
     113              :   }
     114              : 
     115            0 :   void acquire_callback() override
     116              :   {
     117            0 :     if (m_callback_is_acquired) {
     118            0 :       TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
     119              :     } else {
     120              :       // Getting DataMoveCBRegistry
     121            0 :       auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
     122            0 :       m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
     123            0 :       m_callback_is_acquired = true;
     124            0 :     }
     125            0 :   }
     126              : 
     127              :   // Callbacks
     128              :   bool m_callback_is_acquired{ false };
     129              :   using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
     130              :   sink_cb_t m_sink_callback;
     131              : 
     132              : protected:
     133            0 :   void generate_opmon_data() override {
     134              : 
     135            0 :     opmon::CardReaderInfo info;
     136            0 :     auto now = std::chrono::high_resolution_clock::now();
     137            0 :     auto& stats = m_parser_impl.get_stats();
     138              : 
     139            0 :     double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
     140              : 
     141            0 :     info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
     142            0 :     info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
     143            0 :     info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
     144            0 :     info.set_num_blocks_processed(stats.block_ctr.exchange(0));
     145              : 
     146            0 :     info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
     147            0 :     info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
     148              : 
     149            0 :     info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
     150            0 :     info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
     151            0 :     info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
     152            0 :     info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
     153            0 :     info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
     154            0 :     info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
     155            0 :     info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
     156              : 
     157              : 
     158            0 :     TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
     159            0 :                   << " Parser stats ->"
     160            0 :                   << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
     161            0 :                   << " [kHz]"
     162            0 :                   << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
     163            0 :                   << " [kHz]"
     164            0 :                   << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
     165            0 :                   << " Error Chunks: " << info.num_chunks_processed_with_error()
     166            0 :                   << " Error Shorts: " << info.num_short_chunks_processed_with_error()
     167            0 :                   << " Error Subchunks: " << info.num_subchunks_processed_with_error()
     168            0 :                   << " Error Block: " << info.num_blocks_processed_with_error();
     169              : 
     170            0 :     m_t0 = now;
     171              : 
     172            0 :     publish( std::move(info),
     173            0 :              { { "card", std::to_string(m_card_id) },
     174            0 :                { "logical_unit", std::to_string(m_logical_unit) },
     175            0 :                { "link", std::to_string(m_link_id) },
     176            0 :                { "tag", std::to_string(m_link_tag) } } );
     177              :              
     178            0 :   }
     179              : 
     180              : 
     181              : private:
     182              :   // Types
     183              :   using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>; // NOLINT(build/unsigned)
     184              : 
     185              :   // Internals
     186              :   std::atomic<bool> m_run_marker;
     187              :   bool m_configured{ false };
     188              : 
     189              :   // Sink
     190              :   bool m_sink_is_set{ false };
     191              :   std::shared_ptr<err_sink_t> m_error_sink_queue;
     192              : 
     193              :   // blocks to process
     194              :   UniqueBlockAddrQueue m_block_addr_queue;
     195              : 
     196              :   // Processor
     197              :   inline static const std::string m_parser_thread_name = "elinkp";
     198              :   utilities::ReusableThread m_parser_thread;
     199            0 :   void process_elink()
     200              :   {
     201            0 :     while (m_run_marker.load()) {
     202              :       uint64_t block_addr;                        // NOLINT
     203            0 :       if (m_block_addr_queue->read(block_addr)) { // read success
     204              :         const auto* block = const_cast<felix::packetformat::block*>(
     205            0 :           felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
     206              :         );
     207            0 :         m_parser->process(block);
     208              :       } else { // couldn't read from queue
     209            0 :         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     210              :       }
     211              :     }
     212            0 :   }
     213              : };
     214              : 
     215              : } // namespace dunedaq::flxlibs
     216              : 
     217              : #endif // FLXLIBS_SRC_ELINKMODEL_HPP_
        

Generated by: LCOV version 2.0-1