DUNE-DAQ
DUNE Trigger and Data Acquisition software
dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm Class Reference

#include <DataHandlingModel.hpp>

Public Member Functions

 PostprocessScheduleAlgorithm (LatencyBufferType &latency_buffer_impl, RawDataProcessorType &raw_processor_impl, uint64_t processing_delay_ticks, uint64_t post_processing_delay_min_wait, uint64_t post_processing_delay_max_wait)
 
int run (bool timeout)
 
int do_run (bool timeout)
 

Private Attributes

LatencyBufferType & m_latency_buffer_impl
 
RawDataProcessorType & m_raw_processor_impl
 
const uint64_t m_processing_delay_ticks
 
const uint64_t m_post_processing_delay_min_wait
 
const uint64_t m_post_processing_delay_max_wait
 
bool m_first_cycle
 
RDT m_processed_up_to
 
int m_consecutive_timeouts
 
const timestamp_t m_max_wait_in_ticks
 
std::chrono::time_point< std::chrono::system_clock > m_last_post_proc_time
 

Detailed Description

template<class ReadoutType, class RequestHandlerType, class LatencyBufferType, class RawDataProcessorType, class InputDataType = ReadoutType>
class dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm
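
PostprocessScheduleAlgorithm drives deferred post-processing of the latency buffer. It tracks the timestamp it has post-processed up to (m_processed_up_to) and, on each run() call, walks the buffered elements from that point to a window end that trails the newest buffered timestamp by m_processing_delay_ticks, handing each element to the raw processor's postprocess_item(). Regular passes are throttled so that at least m_post_processing_delay_min_wait milliseconds elapse between them; timeout passes may extend the window by an allowance bounded by m_post_processing_delay_max_wait.

A minimal sketch of how a caller might drive the scheduler; the worker loop, stop_requested() and wait_for_input() below are illustrative assumptions, not part of DataHandlingModel:

  // Hypothetical worker loop (illustrative only; the real driver lives elsewhere in DataHandlingModel).
  while (!stop_requested()) {
    // A receive that times out is turned into a timeout pass so post-processing still advances.
    bool timed_out = !wait_for_input(std::chrono::milliseconds(100));
    int processed = post_proc_scheduler.run(timed_out);
    // 'processed' is the number of elements handed to postprocess_item() on this pass.
  }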

Definition at line 136 of file DataHandlingModel.hpp.

Constructor & Destructor Documentation

◆ PostprocessScheduleAlgorithm()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::PostprocessScheduleAlgorithm ( LatencyBufferType & latency_buffer_impl,
RawDataProcessorType & raw_processor_impl,
uint64_t processing_delay_ticks,
uint64_t post_processing_delay_min_wait,
uint64_t post_processing_delay_max_wait )
inline

Definition at line 139 of file DataHandlingModel.hpp.

144 : m_latency_buffer_impl{ latency_buffer_impl }
145 , m_raw_processor_impl{ raw_processor_impl }
146 , m_processing_delay_ticks{ processing_delay_ticks }
147 , m_post_processing_delay_min_wait{ post_processing_delay_min_wait }
148 , m_post_processing_delay_max_wait{ post_processing_delay_max_wait }
149 , m_first_cycle{ true }
151 , m_last_post_proc_time{ std::chrono::system_clock::now() }
153 , m_max_wait_in_ticks{ post_processing_delay_max_wait * 62500 } // FIXME: hardcoded clock frequency
154 {
155 }
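
The 62500 multiplier in the m_max_wait_in_ticks initializer converts the maximum wait into timestamp ticks, assuming the wait is given in milliseconds (as the comparison against milliseconds.count() in do_run() suggests) and a 62.5 MHz clock, i.e. 62 500 ticks per millisecond; the FIXME marks the frequency as hardcoded. A small illustrative calculation:

  // Illustrative only: a max wait of 100 ms at a 62.5 MHz clock.
  #include <cstdint>
  constexpr uint64_t ticks_per_ms = 62500;                           // 62.5e6 ticks/s / 1000 ms/s
  constexpr uint64_t max_wait_ms = 100;                              // example configuration value
  constexpr uint64_t max_wait_in_ticks = max_wait_ms * ticks_per_ms; // = 6 250 000 ticks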

Member Function Documentation

◆ do_run()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
int dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::do_run ( bool timeout)
inline

Definition at line 173 of file DataHandlingModel.hpp.

174 {
175 if (m_latency_buffer_impl.occupancy() == 0) {
176 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (empty buffer)";
177 return 0;
178 }
179
180 if (m_first_cycle) {
181 auto head = m_latency_buffer_impl.front();
182 m_processed_up_to.set_timestamp(head->get_timestamp());
183 m_first_cycle = false;
184 TLOG() << "***** First pass post processing *****";
185 }
186
187 // Get the LB boundaries
188 auto tail = m_latency_buffer_impl.back();
189 auto newest_ts = tail->get_timestamp();
190
191 timestamp_t end_win_ts = 0;
192 std::chrono::time_point<std::chrono::system_clock> now{ std::chrono::system_clock::now() };
193
194 if (timeout) {
195 // Return if the last processed timestamp is greater than the newest timestamp
196 // This condition occurs after a timeout
197 if (m_processed_up_to.get_timestamp() >= newest_ts + 1) {
198 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (at or past cap)";
199 return 0;
200 }
201
204
205 end_win_ts = newest_ts - m_processing_delay_ticks + timeout_accumulated;
206 end_win_ts = std::min(end_win_ts, newest_ts + 1); // Cap to prevent end_win_ts from becoming unnecessarily large
207 } else {
209
210 if (m_processed_up_to.get_timestamp() >= newest_ts + 1) {
211 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (data arrived too late, will be ignored)";
212 return 0;
213 }
214
215 auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now - m_last_post_proc_time);
216
217 if (milliseconds.count() > m_post_processing_delay_min_wait) {
218 if (newest_ts - m_processed_up_to.get_timestamp() > m_processing_delay_ticks) {
219 end_win_ts = newest_ts - m_processing_delay_ticks;
220 } else {
221 TLOG_DEBUG(TLVL_WORK_STEPS) << "Not ready to postprocess (m_processing_delay_ticks is greater)";
222 return 0;
223 }
224 } else {
225 TLOG_DEBUG(TLVL_WORK_STEPS) << "Not ready to postprocess (too fast)";
226 return 0;
227 }
228 }
229
230 auto start_iter = m_latency_buffer_impl.lower_bound(m_processed_up_to, false);
231 m_processed_up_to.set_timestamp(end_win_ts);
232 auto end_iter = m_latency_buffer_impl.lower_bound(m_processed_up_to, false);
233
234 // This likely happens when RDT uses a composite key
235 // The current algorithm does not support composite keys
236 // Our search item `m_processed_up_to` will have its other keys set to their defaults
237 // E.g., for TriggerPrimitive, channel = INVALID_TP_CHANNEL
238 // Even if an entry with the same ts exists in the buffer, its channel will be a valid (smaller) value,
239 // so `lower_bound` will not be able to find it
240 // We should verify that this is the only scenario in which we end up here
241 if (!start_iter.good()) {
242 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (!start_iter.good())";
243 return 0;
244 }
245
246 if (start_iter == end_iter) {
247 TLOG_DEBUG(TLVL_WORK_STEPS) << "Nothing to postprocess (start_iter == end_iter)";
248 return 0;
249 }
250
251 int processed = 0;
252 for (auto it = start_iter; it != end_iter; ++it) {
253 // Just to be completely safe
254 // We should understand why we end up here
255 if (!it.good()) {
256 TLOG_DEBUG(TLVL_WORK_STEPS) << "Invalid iterator in postprocessing loop";
257 break;
258 }
259 m_raw_processor_impl.postprocess_item(&(*it));
260 ++processed;
261 }
262
264
265 return processed;
266 }
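
Stripped of buffer access and logging, the selection of the post-processing window end in do_run() reduces to the sketch below. timeout_accumulated is the allowance that appears in the listing above (the line computing it is not reproduced); the other names mirror the members, minus the m_ prefix.

  // Sketch of the window-end logic only; not a drop-in replacement for do_run().
  #include <algorithm>
  #include <cstdint>

  uint64_t end_of_window(uint64_t newest_ts, uint64_t processed_up_to,
                         uint64_t processing_delay_ticks,
                         bool timeout, uint64_t timeout_accumulated)
  {
    if (timeout) {
      // Timeout pass: push the window forward by the allowance, capped just past the newest data.
      uint64_t end = newest_ts - processing_delay_ticks + timeout_accumulated;
      return std::min<uint64_t>(end, newest_ts + 1);
    }
    // Regular pass: only advance once the buffer is ahead of the last processed point
    // by more than the processing delay.
    if (newest_ts - processed_up_to > processing_delay_ticks) {
      return newest_ts - processing_delay_ticks;
    }
    return processed_up_to; // empty window: nothing ready to post-process yet
  }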

◆ run()

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
int dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::run ( bool timeout)
inline

Definition at line 159 of file DataHandlingModel.hpp.

159 {
160 int processed = this->do_run(timeout);
161
162 if (timeout) {
164 m_raw_processor_impl.invoke_postprocess_schedule_timeout_policy(timeout_accumulated);
165 }
166
167 return processed;
168 }
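
run() delegates to do_run() and, on a timeout pass, reports the accumulated allowance back to the raw processor through invoke_postprocess_schedule_timeout_policy() (the line computing timeout_accumulated is not reproduced in this listing). From the caller's side the only difference between the two kinds of pass is the bool argument:

  // Illustrative only.
  int n_regular = post_proc_scheduler.run(false); // regular pass, throttled by the min-wait
  int n_timeout = post_proc_scheduler.run(true);  // timeout pass, window extended by the allowance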

Member Data Documentation

◆ m_consecutive_timeouts

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
int dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_consecutive_timeouts
private

Definition at line 276 of file DataHandlingModel.hpp.

◆ m_first_cycle

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
bool dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_first_cycle
private

Definition at line 274 of file DataHandlingModel.hpp.

◆ m_last_post_proc_time

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
std::chrono::time_point<std::chrono::system_clock> dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_last_post_proc_time
private

Definition at line 278 of file DataHandlingModel.hpp.

◆ m_latency_buffer_impl

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
LatencyBufferType& dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_latency_buffer_impl
private

Definition at line 269 of file DataHandlingModel.hpp.

◆ m_max_wait_in_ticks

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
const timestamp_t dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_max_wait_in_ticks
private

Definition at line 277 of file DataHandlingModel.hpp.

◆ m_post_processing_delay_max_wait

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
const uint64_t dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_post_processing_delay_max_wait
private

Definition at line 273 of file DataHandlingModel.hpp.

◆ m_post_processing_delay_min_wait

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
const uint64_t dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_post_processing_delay_min_wait
private

Definition at line 272 of file DataHandlingModel.hpp.

◆ m_processed_up_to

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
RDT dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_processed_up_to
private

Definition at line 275 of file DataHandlingModel.hpp.

◆ m_processing_delay_ticks

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
const uint64_t dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_processing_delay_ticks
private

Definition at line 271 of file DataHandlingModel.hpp.

◆ m_raw_processor_impl

template<class ReadoutType , class RequestHandlerType , class LatencyBufferType , class RawDataProcessorType , class InputDataType = ReadoutType>
RawDataProcessorType& dunedaq::datahandlinglibs::DataHandlingModel< ReadoutType, RequestHandlerType, LatencyBufferType, RawDataProcessorType, InputDataType >::PostprocessScheduleAlgorithm::m_raw_processor_impl
private

Definition at line 270 of file DataHandlingModel.hpp.


The documentation for this class was generated from the following file:
DataHandlingModel.hpp