LCOV - code coverage report
Current view: top level - listrev/plugins - ReversedListValidator.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 99.4 % 161 160
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 33 33

            Line data    Source code
       1              : /**
       2              :  * @file ReversedListValidator.cpp ReversedListValidator class
       3              :  * implementation
       4              :  *
       5              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "ReversedListValidator.hpp"
      11              : #include "CommonIssues.hpp"
      12              : 
      13              : #include "listrev/dal/RandomDataListGenerator.hpp"
      14              : #include "listrev/dal/RandomListGeneratorSet.hpp"
      15              : #include "listrev/dal/ReversedListValidator.hpp"
      16              : #include "listrev/opmon/list_rev_info.pb.h"
      17              : 
      18              : #include "appfwk/ConfigurationManager.hpp"
      19              : #include "confmodel/Connection.hpp"
      20              : #include "iomanager/IOManager.hpp"
      21              : #include "logging/Logging.hpp"
      22              : 
      23              : #include <chrono>
      24              : #include <functional>
      25              : #include <list>
      26              : #include <memory>
      27              : #include <string>
      28              : #include <thread>
      29              : #include <utility>
      30              : #include <vector>
      31              : 
      32              : /**
      33              :  * @brief Name used by TRACE TLOG calls from this source file
      34              :  */
      35              : #define TRACE_NAME "ReversedListValidator" // NOLINT
      36              : #define TLVL_ENTER_EXIT_METHODS 10         // NOLINT
      37              : #define TLVL_LIST_VALIDATION 15            // NOLINT
      38              : #define TLVL_REQUEST_SENDING 16            // NOLINT
      39              : #define TLVL_PROCESS_LIST 17               // NOLINT
      40              : 
      41              : namespace dunedaq::listrev {
      42              : 
      43            4 : ReversedListValidator::ReversedListValidator(const std::string& name)
      44              :   : DAQModule(name)
      45            4 :   , m_work_thread(std::bind(&ReversedListValidator::do_work, this, std::placeholders::_1))
      46              : {
      47            4 :   register_command("start", &ReversedListValidator::do_start);
      48            4 :   register_command("stop", &ReversedListValidator::do_stop);
      49            4 : }
      50              : 
      51              : void
      52            4 : ReversedListValidator::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      53              : {
      54            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      55              : 
      56            4 :   auto mdal = mcfg->get_dal<dal::ReversedListValidator>(get_name());
      57            4 :   for (auto con : mdal->get_inputs()) {
      58            4 :     if (con->get_data_type() == datatype_to_string<ReversedList>()) {
      59            4 :       m_list_connection = con->UID();
      60              :       break;
      61              :     }
      62              :   }
      63           12 :   for (auto con : mdal->get_outputs()) {
      64            8 :     if (con->get_data_type() == datatype_to_string<CreateList>()) {
      65            4 :       m_create_connection = con->UID();
      66              :     }
      67            8 :     if (con->get_data_type() == datatype_to_string<RequestList>()) {
      68            4 :       m_num_reversers++;
      69            4 :       m_reveserIds.push_back(con->UID());
      70              :     }
      71              :   }
      72              : 
      73            8 :   for (auto gen : mdal->get_generatorSet()->get_generators()) {
      74            4 :     m_generatorIds.push_back(gen->get_generator_id());
      75              :   }
      76            4 :   m_num_generators = m_generatorIds.size();
      77              : 
      78              :   // these are just tests to check if the connections are ok
      79            4 :   auto iom = iomanager::IOManager::get();
      80            4 :   iom->get_receiver<ReversedList>(m_list_connection);
      81            4 :   iom->get_sender<CreateList>(m_create_connection);
      82              : 
      83            4 :   m_send_timeout = std::chrono::milliseconds(mdal->get_send_timeout_ms());
      84            4 :   m_request_timeout = std::chrono::milliseconds(mdal->get_request_timeout_ms());
      85            4 :   m_max_outstanding_requests = mdal->get_max_outstanding_requests();
      86              : 
      87            4 :   m_list_creator =
      88            4 :     ListCreator(m_create_connection, m_send_timeout, mdal->get_min_list_size(), mdal->get_max_list_size());
      89              : 
      90            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
      91            4 : }
      92              : 
      93              : void
      94            4 : ReversedListValidator::generate_opmon_data()
      95              : {
      96            4 :   opmon::ReversedListValidatorInfo fcr;
      97              : 
      98            8 :   fcr.set_total_requests(m_requests_total.load());
      99            4 :   fcr.set_new_requests(m_new_requests.exchange(0));
     100            8 :   fcr.set_total_lists(m_total_lists.load());
     101            4 :   fcr.set_new_lists(m_new_lists.exchange(0));
     102            8 :   fcr.set_total_valid_pairs(m_total_valid_pairs.load());
     103            4 :   fcr.set_valid_list_pairs(m_valid_list_pairs.exchange(0));
     104            8 :   fcr.set_total_invalid_pairs(m_total_invalid_pairs.load());
     105            4 :   fcr.set_invalid_list_pairs(m_invalid_list_pairs.exchange(0));
     106              : 
     107            4 :   publish(std::move(fcr));
     108            4 : }
     109              : 
     110              : void
     111            4 : ReversedListValidator::do_start(const CommandData_t& /*args*/)
     112              : {
     113            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     114            4 :   m_next_id = 0;
     115            4 :   m_work_thread.start_working_thread();
     116            8 :   get_iomanager()->add_callback<ReversedList>(
     117            8 :     m_list_connection, std::bind(&ReversedListValidator::process_list, this, std::placeholders::_1));
     118            8 :   TLOG() << get_name() << " successfully started";
     119            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     120            4 : }
     121              : 
     122              : void
     123            4 : ReversedListValidator::do_stop(const CommandData_t& /*args*/)
     124              : {
     125            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     126            4 :   m_work_thread.stop_working_thread();
     127            4 :   std::this_thread::sleep_for(std::chrono::milliseconds(100));
     128              : 
     129            4 :   std::chrono::milliseconds stop_timeout(5000);
     130            4 :   auto stop_wait = std::chrono::steady_clock::now();
     131            4 :   size_t outstanding_wait = 1;
     132         1988 :   while (outstanding_wait > 0 && std::chrono::duration_cast<std::chrono::milliseconds>(
     133         1988 :                                    std::chrono::steady_clock::now() - stop_wait) < stop_timeout) {
     134         1984 :     std::this_thread::sleep_for(std::chrono::milliseconds(10));
     135         1984 :     std::lock_guard<std::mutex> lk(m_outstanding_id_mutex);
     136         1984 :     outstanding_wait = m_outstanding_ids.size();
     137         1984 :   }
     138              : 
     139            8 :   TLOG() << get_name() << " Removing callback, there are " << outstanding_wait << " requests left outstanding.";
     140              : 
     141            4 :   get_iomanager()->remove_callback<ReversedList>(m_list_connection);
     142            8 :   TLOG() << get_name() << " successfully stopped";
     143              : 
     144            4 :   std::ostringstream oss_summ;
     145            8 :   oss_summ << ": Exiting do_stop() method, received " << m_total_lists.load() << " reversed list messages, "
     146           12 :            << "compared " << m_total_valid_pairs.load() + m_total_invalid_pairs.load()
     147            8 :            << " reversed lists to their original data, and found " << m_total_invalid_pairs.load() << " mismatches. ";
     148            4 :   ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));
     149              : 
     150            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     151            4 : }
     152              : 
     153              : /**
     154              :  * @brief Format a std::vector<int> to a stream
     155              :  * @param t ostream Instance
     156              :  * @param ints Vector to format
     157              :  * @return ostream Instance
     158              :  */
     159              : std::ostream&
     160            8 : operator<<(std::ostream& t, std::vector<int> ints)
     161              : {
     162            8 :   t << "{";
     163            8 :   bool first = true;
     164           90 :   for (auto& i : ints) {
     165           82 :     if (!first)
     166           74 :       t << ", ";
     167           82 :     first = false;
     168           82 :     t << i;
     169              :   }
     170            8 :   return t << "}";
     171              : }
     172              : 
     173              : void
     174            4 : ReversedListValidator::do_work(std::atomic<bool>& running_flag)
     175              : {
     176            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_work() method";
     177            4 :   m_request_start = std::chrono::steady_clock::now();
     178              : 
     179      1736500 :   while (running_flag.load()) {
     180      1736492 :     TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Locking out id list";
     181      1736492 :     std::lock_guard<std::mutex> lk(m_outstanding_id_mutex);
     182              : 
     183      1736492 :     TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Sending new requests";
     184      3473021 :     auto next_req_time = [&]() {
     185      1736529 :       auto ms = 1000.0 / m_request_rate_hz;
     186      1736529 :       auto off = ms * m_next_id;
     187      1736529 :       return m_request_start + std::chrono::milliseconds(static_cast<int>(off));
     188      1736492 :     };
     189              : 
     190      1736529 :     while (m_outstanding_ids.size() < m_max_outstanding_requests &&
     191      1736529 :            std::chrono::steady_clock::now() > next_req_time()) {
     192           37 :       auto size = m_list_creator.send_create(++m_next_id); //  NOLINT(runtime/increment_decrement)
     193           37 :       m_outstanding_ids[m_next_id] = OutstandingList(size);
     194           37 :       send_request(m_next_id);
     195           37 :       ++m_requests_total;
     196           37 :       ++m_new_requests;
     197              :     }
     198              : 
     199      1736492 :     TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": End of do_work loop";
     200      1736492 :   }
     201              : 
     202            4 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_work() method";
     203            4 : }
     204              : 
     205              : void
     206            3 : ReversedListValidator::process_list(const ReversedList& list)
     207              : {
     208            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method";
     209              : 
     210            3 :   ++m_total_lists;
     211            3 :   ++m_new_lists;
     212              : 
     213            3 :   size_t requested_size = 0;
     214            3 :   if (m_outstanding_ids.count(list.list_id)) {
     215            3 :     requested_size = m_outstanding_ids.at(list.list_id).size;
     216              :   }
     217              : 
     218            3 :   std::ostringstream oss_prog;
     219            3 :   oss_prog << "Validating list set #" << list.list_id << " with requested size " << requested_size << " from reverser " << list.reverser_id
     220            3 :            << ". ";
     221            3 :   TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
     222              : 
     223            3 :   if (list.lists.size() != m_num_generators) {
     224            0 :     ers::error(MissingListError(ERS_HERE, get_name(), list.list_id, m_num_generators, list.lists.size()));
     225              :   }
     226              : 
     227            6 :   for (auto& list_data : list.lists) {
     228              : 
     229            3 :     std::ostringstream oss_prog;
     230            3 :     oss_prog << "Validating list #" << list.list_id << " from generator " << list_data.original.generator_id
     231            3 :              << ", original contents " << list_data.original.list << " and reversed contents "
     232            3 :              << list_data.reversed.list << ". ";
     233            3 :     TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
     234              : 
     235            3 :     if (list_data.original.list.size() != requested_size || list_data.reversed.list.size() != requested_size) {
     236            1 :       ers::error(ListSizeError(ERS_HERE, get_name(), list.list_id, requested_size, list_data.original.list.size(), list_data.reversed.list.size()));
     237            1 :       ++m_invalid_list_pairs;
     238            1 :       ++m_total_invalid_pairs;
     239            1 :       continue;
     240              :     }
     241              : 
     242            2 :     TLOG_DEBUG(TLVL_LIST_VALIDATION)
     243            2 :       << get_name() << ": Re-reversing the reversed list so that it can be compared to the original list";
     244            2 :     auto reversed = list_data.reversed.list;
     245            2 :     std::reverse(reversed.begin(), reversed.end());
     246              : 
     247            2 :     TLOG_DEBUG(TLVL_LIST_VALIDATION) << get_name() << ": Comparing the doubly-reversed list with the original list";
     248            2 :     if (reversed != list_data.original.list) {
     249            1 :       std::ostringstream oss_rev;
     250            1 :       oss_rev << reversed;
     251            1 :       std::ostringstream oss_orig;
     252            1 :       oss_orig << list_data.original.list;
     253            1 :       ers::error(DataMismatchError(ERS_HERE, get_name(), list.list_id, oss_rev.str(), oss_orig.str()));
     254            1 :       ++m_invalid_list_pairs;
     255            1 :       ++m_total_invalid_pairs;
     256            1 :     } else {
     257            1 :       ++m_valid_list_pairs;
     258            1 :       ++m_total_valid_pairs;
     259              :     }
     260            3 :   }
     261              : 
     262            3 :   std::lock_guard<std::mutex> lk(m_outstanding_id_mutex);
     263            3 :   m_outstanding_ids.erase(list.list_id);
     264              : 
     265            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method";
     266            3 : }
     267              : 
     268              : void
     269           37 : ReversedListValidator::send_request(int id)
     270              : {
     271           37 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering send_request() method";
     272              : 
     273           37 :   auto reverser_id = id % m_num_reversers;
     274              : 
     275           37 :   RequestList req;
     276           37 :   req.list_id = id;
     277           37 :   req.destination = m_list_connection;
     278              : 
     279           37 :   get_iomanager()->get_sender<RequestList>(m_reveserIds[reverser_id])->send(std::move(req), m_send_timeout);
     280              : 
     281           37 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting send_request() method";
     282           37 : }
     283              : 
     284              : } // namespace dunedaq::listrev
     285              : 
     286            4 : DEFINE_DUNE_DAQ_MODULE(dunedaq::listrev::ReversedListValidator)
     287              : 
     288              : // Local Variables:
     289              : // c-basic-offset: 2
     290              : // End:
        

Generated by: LCOV version 2.0-1