LCOV - code coverage report
Current view: top level - flxlibs/src - ElinkModel.hpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 84 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 60 0

            Line data    Source code
       1              : /**
       2              :  * @file ElinkModel.hpp FELIX CR's ELink concept wrapper
       3              :  *
       4              :  * This is part of the DUNE DAQ , copyright 2020.
       5              :  * Licensing/copyright details are in the COPYING file that you should have
       6              :  * received with this code.
       7              :  */
       8              : #ifndef FLXLIBS_SRC_ELINKMODEL_HPP_
       9              : #define FLXLIBS_SRC_ELINKMODEL_HPP_
      10              : 
      11              : #include "ElinkConcept.hpp"
      12              : 
      13              : #include "flxlibs/opmon/ElinkModel.pb.h"
      14              : 
      15              : #include "packetformat/block_format.hpp"
      16              : 
      17              : //#include "appfwk/DAQModuleHelper.hpp"
      18              : #include "iomanager/IOManager.hpp"
      19              : #include "iomanager/Sender.hpp"
      20              : #include "logging/Logging.hpp"
      21              : #include "utilities/ReusableThread.hpp"
      22              : 
      23              : #include <folly/ProducerConsumerQueue.h>
      24              : #include <nlohmann/json.hpp>
      25              : 
      26              : #include <atomic>
      27              : #include <memory>
      28              : #include <mutex>
      29              : #include <string>
      30              : 
      31              : namespace dunedaq::flxlibs {
      32              : 
      33              : template<class TargetPayloadType>
      34              : class ElinkModel : public ElinkConcept
      35              : {
      36              : public:
      37              :   using sink_t = iomanager::SenderConcept<TargetPayloadType>;
      38              :   using err_sink_t = iomanager::SenderConcept<felix::packetformat::chunk>;
      39              :   using inherited = ElinkConcept;
      40              :   using data_t = nlohmann::json;
      41              : 
      42              :   /**
      43              :    * @brief ElinkModel Constructor
      44              :    * @param name Instance name for this ElinkModel instance
      45              :    */
      46            0 :   ElinkModel()
      47              :     : ElinkConcept()
      48            0 :     , m_run_marker{ false }
      49            0 :     , m_parser_thread(0)
      50            0 :   {}
      51            0 :   ~ElinkModel() {}
      52              : 
      53            0 :   void set_sink(const std::string& sink_name) override
      54              :   {
      55            0 :     if (m_sink_is_set) {
      56            0 :       TLOG_DEBUG(5) << "ElinkModel sink is already set in initialized!";
      57              :     } else {
      58            0 :       m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
      59            0 :       m_sink_is_set = true;
      60              :     }
      61            0 :   }
      62              : 
      63            0 :   std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
      64              : 
      65              :   std::shared_ptr<err_sink_t>& get_error_sink() { return m_error_sink_queue; }
      66              : 
      67            0 :   void init(const size_t block_queue_capacity)
      68              :   {
      69            0 :     m_block_addr_queue = std::make_unique<folly::ProducerConsumerQueue<uint64_t>>(block_queue_capacity); // NOLINT
      70            0 :   }
      71              : 
      72            0 :   void conf(size_t block_size, bool is_32b_trailers)
      73              :   {
      74            0 :     if (m_configured) {
      75            0 :       TLOG_DEBUG(5) << "ElinkModel is already configured!";
      76              :     } else {
      77            0 :       m_parser_thread.set_name(inherited::m_elink_source_tid, inherited::m_link_tag);
      78              :       // if (inconsistency)
      79              :       // ers::fatal(ElinkConfigurationInconsistency(ERS_HERE, m_num_links));
      80              : 
      81            0 :       m_parser->configure(block_size, is_32b_trailers); // unsigned bsize, bool trailer_is_32bit
      82            0 :       m_configured = true;
      83              :     }
      84            0 :   }
      85              : 
      86            0 :   void start()
      87              :   {
      88            0 :     m_t0 = std::chrono::high_resolution_clock::now();
      89            0 :     if (!m_run_marker.load()) {
      90            0 :       set_running(true);
      91            0 :       m_parser_thread.set_work(&ElinkModel::process_elink, this);
      92            0 :       TLOG() << "Started ElinkModel of link " << inherited::m_link_id << "...";
      93              :     } else {
      94            0 :       TLOG_DEBUG(5) << "ElinkModel of link " << inherited::m_link_id << " is already running!";
      95              :     }
      96            0 :   }
      97              : 
      98            0 :   void stop()
      99              :   {
     100            0 :     if (m_run_marker.load()) {
     101            0 :       set_running(false);
     102            0 :       while (!m_parser_thread.get_readiness()) {
     103            0 :         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     104              :       }
     105            0 :       TLOG_DEBUG(5) << "Stopped ElinkModel of link " << m_link_id << "!";
     106              :     } else {
     107            0 :       TLOG_DEBUG(5) << "ElinkModel of link " << m_link_id << " is already stopped!";
     108              :     }
     109            0 :   }
     110              : 
     111            0 :   void set_running(bool should_run)
     112              :   {
     113            0 :     bool was_running = m_run_marker.exchange(should_run);
     114            0 :     TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run;
     115            0 :   }
     116              : 
     117            0 :   bool queue_in_block_address(uint64_t block_addr) // NOLINT(build/unsigned)
     118              :   {
     119            0 :     if (m_block_addr_queue->write(block_addr)) { // ok write
     120              :       return true;
     121              :     } else { // failed write
     122            0 :       return false;
     123              :     }
     124              :   }
     125              : 
     126              : 
     127              : protected:
     128            0 :   void generate_opmon_data() override {
     129              : 
     130            0 :     opmon::CardReaderInfo info;
     131            0 :     auto now = std::chrono::high_resolution_clock::now();
     132            0 :     auto& stats = m_parser_impl.get_stats();
     133              : 
     134            0 :     double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
     135              : 
     136            0 :     info.set_num_short_chunks_processed( stats.short_ctr.exchange(0) );
     137            0 :     info.set_num_chunks_processed( stats.chunk_ctr.exchange(0) );
     138            0 :     info.set_num_subchunks_processed(stats.subchunk_ctr.exchange(0));
     139            0 :     info.set_num_blocks_processed(stats.block_ctr.exchange(0));
     140              : 
     141            0 :     info.set_rate_blocks_processed(info.num_blocks_processed() / seconds / 1000. );
     142            0 :     info.set_rate_chunks_processed(info.num_chunks_processed() / seconds / 1000. );
     143              : 
     144            0 :     info.set_num_short_chunks_processed_with_error(stats.error_short_ctr.exchange(0));
     145            0 :     info.set_num_chunks_processed_with_error(stats.error_chunk_ctr.exchange(0));
     146            0 :     info.set_num_subchunks_processed_with_error(stats.error_subchunk_ctr.exchange(0));
     147            0 :     info.set_num_blocks_processed_with_error(stats.error_block_ctr.exchange(0));
     148            0 :     info.set_num_subchunk_crc_errors(stats.subchunk_crc_error_ctr.exchange(0));
     149            0 :     info.set_num_subchunk_trunc_errors(stats.subchunk_trunc_error_ctr.exchange(0));
     150            0 :     info.set_num_subchunk_errors(stats.subchunk_error_ctr.exchange(0));
     151              : 
     152              : 
     153            0 :     TLOG_DEBUG(2) << inherited::m_elink_str // Move to TLVL_TAKE_NOTE from readout
     154            0 :                   << " Parser stats ->"
     155            0 :                   << " Blocks: " << info.num_blocks_processed() << " Block rate: " << info.rate_blocks_processed()
     156            0 :                   << " [kHz]"
     157            0 :                   << " Chunks: " << info.num_chunks_processed() << " Chunk rate: " << info.rate_chunks_processed()
     158            0 :                   << " [kHz]"
     159            0 :                   << " Shorts: " << info.num_short_chunks_processed() << " Subchunks:" << info.num_subchunks_processed()
     160            0 :                   << " Error Chunks: " << info.num_chunks_processed_with_error()
     161            0 :                   << " Error Shorts: " << info.num_short_chunks_processed_with_error()
     162            0 :                   << " Error Subchunks: " << info.num_subchunks_processed_with_error()
     163            0 :                   << " Error Block: " << info.num_blocks_processed_with_error();
     164              : 
     165            0 :     m_t0 = now;
     166              : 
     167            0 :     publish( std::move(info),
     168              :              { { "card", std::to_string(m_card_id) },
     169              :                { "logical_unit", std::to_string(m_logical_unit) },
     170              :                { "link", std::to_string(m_link_id) },
     171              :                { "tag", std::to_string(m_link_tag) } } );
     172              :              
     173            0 :   }
     174              : 
     175              : private:
     176              :   // Types
     177              :   using UniqueBlockAddrQueue = std::unique_ptr<folly::ProducerConsumerQueue<uint64_t>>; // NOLINT(build/unsigned)
     178              : 
     179              :   // Internals
     180              :   std::atomic<bool> m_run_marker;
     181              :   bool m_configured{ false };
     182              : 
     183              :   // Sink
     184              :   bool m_sink_is_set{ false };
     185              :   std::shared_ptr<sink_t> m_sink_queue;
     186              :   std::shared_ptr<err_sink_t> m_error_sink_queue;
     187              : 
     188              :   // blocks to process
     189              :   UniqueBlockAddrQueue m_block_addr_queue;
     190              : 
     191              :   // Processor
     192              :   inline static const std::string m_parser_thread_name = "elinkp";
     193              :   utilities::ReusableThread m_parser_thread;
     194            0 :   void process_elink()
     195              :   {
     196            0 :     while (m_run_marker.load()) {
     197              :       uint64_t block_addr;                        // NOLINT
     198            0 :       if (m_block_addr_queue->read(block_addr)) { // read success
     199              :         const auto* block = const_cast<felix::packetformat::block*>(
     200            0 :           felix::packetformat::block_from_bytes(reinterpret_cast<const char*>(block_addr)) // NOLINT
     201              :         );
     202            0 :         m_parser->process(block);
     203              :       } else { // couldn't read from queue
     204            0 :         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     205              :       }
     206              :     }
     207            0 :   }
     208              : };
     209              : 
     210              : } // namespace dunedaq::flxlibs
     211              : 
     212              : #endif // FLXLIBS_SRC_ELINKMODEL_HPP_
        

Generated by: LCOV version 2.0-1