LCOV - code coverage report
Current view: top level - dfmodules/src - TPBundleHandler.cpp (source / functions) Coverage Total Hit
Test: code.result Lines: 0.0 % 128 0
Test Date: 2025-12-21 13:07:08 Functions: 0.0 % 8 0

            Line data    Source code
       1              : /**
       2              :  * @file TPBundleHandler.cpp TPBundleHandler Class Implementation
       3              :  *
       4              :  * The TPBundleHandler class takes care of assembling and repacking TriggerPrimitives
       5              :  * for storage on disk as part of a TP stream.
       6              :  *
       7              :  * This is part of the DUNE DAQ Software Suite, copyright 2020.
       8              :  * Licensing/copyright details are in the COPYING file that you should have
       9              :  * received with this code.
      10              :  */
      11              : 
      12              : #include "dfmodules/TPBundleHandler.hpp"
      13              : 
      14              : #include "detdataformats/DetID.hpp"
      15              : #include "logging/Logging.hpp"
      16              : 
      17              : #include <memory>
      18              : #include <string>
      19              : #include <utility>
      20              : #include <vector>
      21              : 
      22              : namespace dunedaq {
      23              : namespace dfmodules {
      24              : 
      25              : void
      26            0 : TimeSliceAccumulator::add_tpset(trigger::TPSet&& tpset)
      27              : {
      28              :   // if this TPSet is near one of the edges of our window, handle it specially
      29            0 :   if (tpset.start_time <= m_begin_time || tpset.end_time >= m_end_time) {
      30            0 :     trigger::TPSet working_tpset;
      31            0 :     daqdataformats::timestamp_t first_time = daqdataformats::TypeDefaults::s_invalid_timestamp;
      32            0 :     daqdataformats::timestamp_t last_time = daqdataformats::TypeDefaults::s_invalid_timestamp;
      33            0 :     bool first = true;
      34            0 :     for (auto& trigprim : tpset.objects) {
      35            0 :       if (trigprim.time_start >= m_begin_time && trigprim.time_start < m_end_time) {
      36            0 :         working_tpset.objects.push_back(trigprim);
      37            0 :         if (first) {
      38            0 :           first_time = trigprim.time_start;
      39            0 :           first = false;
      40              :         }
      41            0 :         last_time = trigprim.time_start;
      42              :       }
      43              :     }
      44            0 :     if (working_tpset.objects.size() == 0) {
      45            0 :       if (tpset.end_time == m_begin_time) {
      46              :         // the end of the TPSet just missed the start of our window, so not a big deal
      47            0 :         TLOG_DEBUG(22) << "Note: no TPs were used from a TPSet with start_time=" << tpset.start_time
      48            0 :                        << ", end_time=" << tpset.end_time << ", TSAccumulator begin and end times:" << m_begin_time
      49            0 :                        << ", " << m_end_time;
      50              :       } else {
      51              :         // woah, something unexpected happened
      52            0 :         ers::warning(NoTPsInWindow(ERS_HERE, tpset.start_time, tpset.end_time, m_begin_time, m_end_time));
      53              :       }
      54            0 :       return;
      55              :     }
      56            0 :     working_tpset.type = tpset.type;
      57            0 :     working_tpset.seqno = tpset.seqno;
      58            0 :     working_tpset.origin = tpset.origin;
      59            0 :     working_tpset.start_time = first_time;
      60            0 :     working_tpset.end_time = last_time;
      61            0 :     tpset = std::move(working_tpset);
      62            0 :   }
      63              : 
      64              :   // create an entry in the top-level map for the sourceid in this TPSet, if needed
      65            0 :   auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
      66            0 :   if (m_tpbundles_by_sourceid_and_start_time.count(tpset.origin) == 0) {
      67            0 :     tpbundles_by_start_time_t empty_bundle_map;
      68            0 :     m_tpbundles_by_sourceid_and_start_time[tpset.origin] = empty_bundle_map;
      69            0 :   }
      70              : 
      71              :   // store the TPSet in the map
      72            0 :   if (m_tpbundles_by_sourceid_and_start_time[tpset.origin].count(tpset.start_time)) {
      73            0 :     ers::warning(DuplicateTPWindow(ERS_HERE, tpset.origin.id, tpset.start_time));
      74              :   }
      75            0 :   m_tpbundles_by_sourceid_and_start_time[tpset.origin].emplace(tpset.start_time, std::move(tpset));
      76            0 :   m_update_time = std::chrono::steady_clock::now();
      77            0 : }
      78              : 
      79              : std::unique_ptr<daqdataformats::TimeSlice>
      80            0 : TimeSliceAccumulator::get_timeslice()
      81              : {
      82            0 :   auto lk = std::lock_guard<std::mutex>(m_bundle_map_mutex);
      83            0 :   std::vector<std::unique_ptr<daqdataformats::Fragment>> list_of_fragments;
      84              : 
      85              :   // loop over all SourceID present in this accumulator
      86            0 :   for (auto& [sourceid, bundle_map] : m_tpbundles_by_sourceid_and_start_time) {
      87              : 
      88              :     // build up the list of pieces that we will use to contruct the Fragment
      89            0 :     std::vector<std::pair<void*, size_t>> list_of_pieces;
      90            0 :     for (auto& [start_time, tpset] : bundle_map) {
      91            0 :       list_of_pieces.push_back(std::make_pair<void*, size_t>(
      92            0 :         &tpset.objects[0], tpset.objects.size() * sizeof(trgdataformats::TriggerPrimitive)));
      93              :     }
      94            0 :     std::unique_ptr<daqdataformats::Fragment> frag(new daqdataformats::Fragment(list_of_pieces));
      95              : 
      96            0 :     frag->set_run_number(m_run_number);
      97            0 :     frag->set_trigger_number(m_slice_number);
      98            0 :     frag->set_window_begin(m_begin_time);
      99            0 :     frag->set_window_end(m_end_time);
     100            0 :     frag->set_element_id(sourceid);
     101            0 :     frag->set_detector_id(static_cast<uint16_t>(detdataformats::DetID::Subdetector::kDAQ));
     102            0 :     frag->set_type(daqdataformats::FragmentType::kTriggerPrimitive);
     103              : 
     104            0 :     size_t frag_payload_size = frag->get_size() - sizeof(dunedaq::daqdataformats::FragmentHeader);
     105            0 :     TLOG_DEBUG(21) << "In get_timeslice, Source ID is " << sourceid << ", number of pieces is " << list_of_pieces.size()
     106            0 :                    << ", size of Fragment payload is " << frag_payload_size << ", size of TP is "
     107            0 :                    << sizeof(trgdataformats::TriggerPrimitive);
     108              : 
     109            0 :     list_of_fragments.push_back(std::move(frag));
     110            0 :   }
     111              : 
     112            0 :   std::unique_ptr<daqdataformats::TimeSlice> time_slice(new daqdataformats::TimeSlice(m_slice_number, m_run_number));
     113            0 :   time_slice->set_fragments(std::move(list_of_fragments));
     114            0 :   return time_slice;
     115            0 : }
     116              : 
     117              : void
     118            0 : TPBundleHandler::add_tpset(trigger::TPSet&& tpset)
     119              : {
     120              :   // if the tpset seems to span multiple timeslices, then we add it to all
     121              :   // of the relevant accumulators. This runs the risk of adding a tpset
     122              :   // to an accumululator that won't find any TPs within the
     123              :   // accumulator window (because of edge effects), but we want to be
     124              :   // cautious here (and we'll protect against the absence of tpsets later).
     125              :   // Of course, adding the tpset to multiple accumulators requires copies...
     126            0 :   size_t tsidx_from_begin_time = tpset.start_time / m_slice_interval;
     127            0 :   size_t tsidx_from_end_time = tpset.end_time / m_slice_interval;
     128            0 :   if (m_slice_index_offset == 0) {
     129            0 :     m_slice_index_offset = tsidx_from_begin_time - 1;
     130              :   }
     131              : 
     132              :   // 24-Mar-2024, KAB: added check for TimeSlice indexes that are earlier
     133              :   // than the one that we started with. We try to gracefully handle them
     134              :   // by adjusting the slice_ids of existing slices, but if we can't do that,
     135              :   // we discard them so that we don't get
     136              :   // TimeSlices with large timeslice_ids (e.g. -1 converted to a uint64_t).
     137            0 :   if (tsidx_from_begin_time <= m_slice_index_offset) {
     138            0 :     auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
     139            0 :     int64_t diff = static_cast<int64_t>(tsidx_from_begin_time) - static_cast<int64_t>(m_slice_index_offset);
     140            0 :     if (! m_one_or_more_time_slices_have_aged_out) {
     141            0 :       TLOG() << "Updating the slice numbers of existing accumulators by " << (1-diff);
     142            0 :       for (auto& [local_tsidx, local_accum] : m_timeslice_accumulators) {
     143            0 :         local_accum.update_slice_number(1 - diff);
     144              :       }
     145            0 :       m_slice_index_offset -= (1 - diff);
     146              :     }
     147              :     else {
     148            0 :       ers::warning(TardyTPSetReceived(ERS_HERE, tpset.origin.id, tpset.start_time, diff));
     149            0 :       return;
     150              :     }
     151            0 :   }
     152              : 
     153              :   // add the TPSet to any 'extra' accumulators
     154            0 :   for (size_t tsidx = (tsidx_from_begin_time + 1); tsidx <= tsidx_from_end_time; ++tsidx) {
     155            0 :     {
     156            0 :       auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
     157            0 :       if (m_timeslice_accumulators.count(tsidx) == 0) {
     158            0 :         TimeSliceAccumulator accum(tsidx * m_slice_interval,
     159            0 :                                    (tsidx + 1) * m_slice_interval,
     160            0 :                                    tsidx - m_slice_index_offset,
     161            0 :                                    m_run_number);
     162            0 :         m_timeslice_accumulators[tsidx] = accum;
     163            0 :       }
     164            0 :     }
     165            0 :     trigger::TPSet tpset_copy = tpset;
     166            0 :     m_timeslice_accumulators[tsidx].add_tpset(std::move(tpset_copy));
     167            0 :   }
     168              : 
     169              :   // add the TPSet to the accumulator associated with the begin time
     170            0 :   {
     171            0 :     auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
     172            0 :     if (m_timeslice_accumulators.count(tsidx_from_begin_time) == 0) {
     173            0 :       TimeSliceAccumulator accum(tsidx_from_begin_time * m_slice_interval,
     174            0 :                                  (tsidx_from_begin_time + 1) * m_slice_interval,
     175            0 :                                  tsidx_from_begin_time - m_slice_index_offset,
     176            0 :                                  m_run_number);
     177            0 :       m_timeslice_accumulators[tsidx_from_begin_time] = accum;
     178            0 :     }
     179            0 :   }
     180            0 :   m_timeslice_accumulators[tsidx_from_begin_time].add_tpset(std::move(tpset));
     181              : }
     182              : 
     183              : std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
     184            0 : TPBundleHandler::get_properly_aged_timeslices()
     185              : {
     186            0 :   std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
     187            0 :   std::vector<daqdataformats::timestamp_t> elements_to_be_removed;
     188              : 
     189            0 :   auto now = std::chrono::steady_clock::now();
     190            0 :   for (auto& [tsidx, accum] : m_timeslice_accumulators) {
     191            0 :     if ((now - accum.get_update_time()) >= m_cooling_off_time) {
     192            0 :       list_of_timeslices.push_back(accum.get_timeslice());
     193            0 :       elements_to_be_removed.push_back(tsidx);
     194              :     }
     195              :   }
     196              : 
     197            0 :   auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
     198            0 :   for (auto& tsidx : elements_to_be_removed) {
     199            0 :     m_timeslice_accumulators.erase(tsidx);
     200              :   }
     201              : 
     202            0 :   if (list_of_timeslices.size() > 0) {m_one_or_more_time_slices_have_aged_out = true;}
     203            0 :   return list_of_timeslices;
     204            0 : }
     205              : 
     206              : std::vector<std::unique_ptr<daqdataformats::TimeSlice>>
     207            0 : TPBundleHandler::get_all_remaining_timeslices()
     208              : {
     209            0 :   std::vector<std::unique_ptr<daqdataformats::TimeSlice>> list_of_timeslices;
     210              : 
     211            0 :   for (auto& [tsidx, accum] : m_timeslice_accumulators) {
     212            0 :     list_of_timeslices.push_back(accum.get_timeslice());
     213              :   }
     214              : 
     215            0 :   {
     216            0 :     auto lk = std::lock_guard<std::mutex>(m_accumulator_map_mutex);
     217            0 :     m_timeslice_accumulators.clear();
     218            0 :   }
     219              : 
     220            0 :   return list_of_timeslices;
     221            0 : }
     222              : 
     223              : } // namespace dfmodules
     224              : } // namespace dunedaq
        

Generated by: LCOV version 2.0-1