LCOV - code coverage report
Current view: top level - listrev/plugins - ListReverser.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 86.9 % 153 133
Test Date: 2025-12-21 13:07:08 Functions: 100.0 % 26 26

            Line data    Source code
       1              : /**
       2              :  * @file ListReverser.cpp ListReverser class
       3              :  * implementation
       4              :  *
       5              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       6              :  * Licensing/copyright details are in the COPYING file that you should have
       7              :  * received with this code.
       8              :  */
       9              : 
      10              : #include "listrev/dal/ListReverser.hpp"
      11              : #include "listrev/dal/RandomDataListGenerator.hpp"
      12              : #include "listrev/dal/RandomListGeneratorSet.hpp"
      13              : 
      14              : #include "listrev/opmon/list_rev_info.pb.h"
      15              : 
      16              : #include "CommonIssues.hpp"
      17              : #include "ListReverser.hpp"
      18              : 
      19              : #include "appfwk/ConfigurationManager.hpp"
      20              : #include "confmodel/Connection.hpp"
      21              : 
      22              : #include "iomanager/IOManager.hpp"
      23              : #include "logging/Logging.hpp"
      24              : 
      25              : #include <chrono>
      26              : #include <memory>
      27              : #include <string>
      28              : #include <thread>
      29              : #include <utility>
      30              : #include <vector>
      31              : 
      32              : /**
      33              :  * @brief Name used by TRACE TLOG calls from this source file
      34              :  */
      35              : #define TRACE_NAME "ListReverser"  // NOLINT
      36              : #define TLVL_ENTER_EXIT_METHODS 10 // NOLINT
      37              : #define TLVL_LIST_REVERSAL 15      // NOLINT
      38              : #define TLVL_REQUEST_SENDING 16    // NOLINT
      39              : #define TLVL_CONFIGURE 17          // NOLINT
      40              : 
      41              : namespace dunedaq::listrev {
      42              : 
      43            3 : ListReverser::ListReverser(const std::string& name)
      44            3 :   : DAQModule(name)
      45              : {
      46            3 :   register_command("start", &ListReverser::do_start);
      47            3 :   register_command("stop", &ListReverser::do_stop);
      48            3 : }
      49              : 
      50              : void
      51            3 : ListReverser::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
      52              : {
      53            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
      54            3 :   auto mdal = mcfg->get_dal<dal::ListReverser>(get_name());
      55            9 :   for (auto con : mdal->get_inputs()) {
      56            6 :     if (con->get_data_type() == datatype_to_string<IntList>()) {
      57            3 :       m_list_connection = con->UID();
      58              :     }
      59            6 :     if (con->get_data_type() == datatype_to_string<RequestList>()) {
      60            3 :       m_requests = con->UID();
      61              :     }
      62              :   }
      63              : 
      64            3 :   try {
      65            3 :     get_iom_receiver<IntList>(m_list_connection);
      66            0 :   } catch (const ers::Issue& excpt) {
      67            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "input", excpt);
      68            0 :   }
      69            3 :   try {
      70            3 :     get_iom_receiver<RequestList>(m_requests);
      71            0 :   } catch (const ers::Issue& excpt) {
      72            0 :     throw InvalidQueueFatalError(ERS_HERE, get_name(), "output", excpt);
      73            0 :   }
      74              : 
      75            9 :   for (auto con : mdal->get_outputs()) {
      76            6 :     if (con->get_data_type() == datatype_to_string<RequestList>()) {
      77            3 :       m_generator_connections.push_back(con->UID());
      78              :     }
      79              :   }
      80              : 
      81            3 :   m_send_timeout = std::chrono::milliseconds(mdal->get_send_timeout_ms());
      82            3 :   m_request_timeout = std::chrono::milliseconds(mdal->get_request_timeout_ms());
      83            3 :   m_reverser_id = mdal->get_reverser_id();
      84              : 
      85            3 :   TLOG_DEBUG(TLVL_CONFIGURE) << "ListReverser " << m_reverser_id << " configured with " << "send timeout "
      86            0 :                              << mdal->get_send_timeout_ms() << " ms," << " request timeout "
      87            0 :                              << mdal->get_request_timeout_ms() << "ms, " << " and " << m_generator_connections.size()
      88            3 :                              << " generators.";
      89              : 
      90            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
      91            3 : }
      92              : 
      93              : void
      94            3 : ListReverser::generate_opmon_data()
      95              : {
      96            3 :   opmon::ListReverserInfo fcr;
      97              : 
      98            3 :   fcr.set_requests_received(m_requests_received.exchange(0));
      99            3 :   fcr.set_requests_sent(m_requests_sent.exchange(0));
     100            3 :   fcr.set_lists_received(m_lists_received.exchange(0));
     101            3 :   fcr.set_lists_sent(m_lists_sent.exchange(0));
     102            6 :   fcr.set_total_requests_received(m_total_requests_received.load());
     103            6 :   fcr.set_total_requests_sent(m_total_requests_sent.load());
     104            6 :   fcr.set_total_lists_received(m_total_lists_received.load());
     105            6 :   fcr.set_total_lists_sent(m_total_lists_sent.load());
     106              : 
     107            3 :   publish(std::move(fcr));
     108            3 : }
     109              : 
     110              : void
     111            3 : ListReverser::do_start(const CommandData_t& /*startobj*/)
     112              : {
     113            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
     114            6 :   get_iomanager()->add_callback<IntList>(m_list_connection,
     115            6 :                                          std::bind(&ListReverser::process_list, this, std::placeholders::_1));
     116            6 :   get_iomanager()->add_callback<RequestList>(
     117            6 :     m_requests, std::bind(&ListReverser::process_list_request, this, std::placeholders::_1));
     118              : 
     119            6 :   TLOG() << get_name() << " successfully started";
     120            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
     121            3 : }
     122              : 
     123              : void
     124            3 : ListReverser::do_stop(const CommandData_t& /*stopobj*/)
     125              : {
     126            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
     127              : 
     128            3 :   std::this_thread::sleep_for(std::chrono::milliseconds(100));
     129            3 :   std::chrono::milliseconds stop_timeout(5000);
     130            3 :   auto stop_wait = std::chrono::steady_clock::now();
     131            6 :   size_t outstanding_wait = (m_total_requests_received.load() - m_total_lists_sent.load()) + // Requests from validator
     132            6 :                             (m_total_requests_sent.load() - m_total_lists_received.load());  // Requests to generator
     133          499 :   while (outstanding_wait > 0 && std::chrono::duration_cast<std::chrono::milliseconds>(
     134          497 :                                    std::chrono::steady_clock::now() - stop_wait) < stop_timeout) {
     135          496 :     std::this_thread::sleep_for(std::chrono::milliseconds(10));
     136          992 :     outstanding_wait = (m_total_requests_received.load() - m_total_lists_sent.load()) + // Requests from validator
     137         1488 :                        (m_total_requests_sent.load() - m_total_lists_received.load());  // Requests to generator
     138              :   }
     139              : 
     140            3 :   get_iomanager()->remove_callback<RequestList>(m_requests);
     141            3 :   get_iomanager()->remove_callback<IntList>(m_list_connection);
     142            6 :   TLOG() << get_name() << " successfully stopped";
     143              : 
     144            3 :   std::ostringstream oss_summ;
     145            6 :   oss_summ << ": Exiting do_stop() method, received " << m_total_requests_received.load() << " request messages, "
     146            9 :            << "sent " << m_total_requests_sent.load() << ", received " << m_total_lists_received.load()
     147            6 :            << " lists, and sent " << m_total_lists_sent.load() << " reversed list messages";
     148            3 :   ers::info(ProgressUpdate(ERS_HERE, get_name(), oss_summ.str()));
     149              : 
     150            3 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
     151            3 : }
     152              : 
     153              : void
     154            2 : ListReverser::process_list_request(const RequestList& request)
     155              : {
     156            2 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list_request() method";
     157            2 :   {
     158            2 :     std::lock_guard<std::mutex> lk(m_map_mutex);
     159            2 :     if (!m_pending_lists.count(request.list_id)) {
     160            2 :       m_pending_lists[request.list_id] = PendingList(request.destination, request.list_id, m_reverser_id);
     161            2 :       ++m_requests_received;
     162            2 :       ++m_total_requests_received;
     163              :     }
     164            2 :   }
     165              : 
     166            4 :   for (auto const& gen_conn : m_generator_connections) {
     167            2 :     TLOG_DEBUG(TLVL_REQUEST_SENDING) << "Sending request for " << request.list_id << " with destination "
     168            2 :                                      << m_list_connection << " to " << gen_conn;
     169            2 :     RequestList req(request.list_id, m_list_connection);
     170            2 :     get_iomanager()->get_sender<RequestList>(gen_conn)->send(std::move(req), m_send_timeout);
     171            2 :     ++m_requests_sent;
     172            2 :     ++m_total_requests_sent;
     173            2 :   }
     174            2 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list_request() method";
     175            2 : }
     176              : 
     177              : /**
     178              :  * @brief Format a std::vector<int> to a stream
     179              :  * @param t ostream Instance
     180              :  * @param ints Vector to format
     181              :  * @return ostream Instance
     182              :  */
     183              : std::ostream&
     184            1 : operator<<(std::ostream& t, std::vector<int> ints)
     185              : {
     186            1 :   t << "{";
     187            1 :   bool first = true;
     188            5 :   for (auto& i : ints) {
     189            4 :     if (!first)
     190            3 :       t << ", ";
     191            4 :     first = false;
     192            4 :     t << i;
     193              :   }
     194            1 :   return t << "}";
     195              : }
     196              : 
     197              : void
     198            1 : ListReverser::process_list(const IntList& list)
     199              : {
     200            1 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering process_list() method";
     201              : 
     202            1 :   std::lock_guard<std::mutex> lk(m_map_mutex);
     203            1 :   ++m_lists_received;
     204            1 :   ++m_total_lists_received;
     205            1 :   TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Received list #" << list.list_id << " from " << list.generator_id
     206            1 :                                  << ". It has size " << list.list.size() << ". Reversing its contents";
     207              : 
     208            1 :   if (m_pending_lists.count(list.list_id) == 0) {
     209              : 
     210            0 :     std::ostringstream oss_warn;
     211            0 :     oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor
     212            0 :              << "\" (late list receive)";
     213            0 :     ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
     214            0 :     return;
     215            0 :   }
     216              : 
     217            1 :   auto workingVector = list.list;
     218            1 :   std::reverse(workingVector.begin(), workingVector.end());
     219            1 :   IntList wrapped(list.list_id, m_reverser_id, workingVector);
     220              : 
     221            1 :   ReversedList::Data this_data;
     222            1 :   this_data.original = list;
     223            1 :   this_data.reversed = wrapped;
     224              : 
     225            1 :   m_pending_lists[list.list_id].list.lists.push_back(this_data);
     226              : 
     227            1 :   std::ostringstream oss_prog;
     228            2 :   oss_prog << "Reversed list #" << list.list_id << " from " << list.generator_id << ", new contents " << workingVector
     229            1 :            << " and size " << workingVector.size() << ". ";
     230            1 :   TLOG_DEBUG() << ProgressUpdate(ERS_HERE, get_name(), oss_prog.str());
     231              : 
     232            1 :   if (m_pending_lists[list.list_id].list.lists.size() >= m_generator_connections.size() ||
     233            1 :       std::chrono::duration_cast<std::chrono::milliseconds>(
     234            1 :         std::chrono::steady_clock::now() - m_pending_lists[list.list_id].start_time) > m_request_timeout) {
     235              : 
     236              :     bool successfullyWasSent = false;
     237              :     int failCount = 0;
     238            2 :     while (!successfullyWasSent && failCount < 100) {
     239            1 :       TLOG_DEBUG(TLVL_LIST_REVERSAL) << get_name() << ": Sending the reversed lists " << list.list_id;
     240            1 :       try {
     241            2 :         get_iomanager()
     242            2 :           ->get_sender<ReversedList>(m_pending_lists[list.list_id].requestor)
     243            1 :           ->send(std::move(m_pending_lists[list.list_id].list), m_send_timeout);
     244            1 :         successfullyWasSent = true;
     245            1 :         ++m_lists_sent;
     246            1 :         ++m_total_lists_sent;
     247            1 :         m_pending_lists.erase(list.list_id);
     248            0 :       } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
     249            0 :         std::ostringstream oss_warn;
     250            0 :         oss_warn << "send " << list.list_id << " to \"" << m_pending_lists[list.list_id].requestor << "\"";
     251            0 :         ers::warning(dunedaq::iomanager::TimeoutExpired(ERS_HERE, get_name(), oss_warn.str(), m_send_timeout.count()));
     252            0 :         ++failCount;
     253            0 :       }
     254              :     }
     255              :   }
     256              : 
     257            1 :   TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting process_list() method";
     258            1 : }
     259              : 
     260              : } // namespace dunedaq::listrev
     261              : 
     262            3 : DEFINE_DUNE_DAQ_MODULE(dunedaq::listrev::ListReverser)
     263              : 
     264              : // Local Variables:
     265              : // c-basic-offset: 2
     266              : // End:
        

Generated by: LCOV version 2.0-1