DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType > Class Template Reference

#include <TaskRawDataProcessorModel.hpp>

Inheritance diagram for dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >:
[legend]
Collaboration diagram for dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >:
[legend]

Public Member Functions

 TaskRawDataProcessorModel (std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
 
 ~TaskRawDataProcessorModel ()
 
void conf (const appmodel::DataHandlerModule *conf) override
 Set the emulator mode; if active, the timestamps of processed packets are overwritten with new ones.
 
void scrap (const nlohmann::json &) override
 Unconfigure.
 
void start (const nlohmann::json &) override
 Start operation.
 
void stop (const nlohmann::json &) override
 Stop operation.
 
void reset_last_daq_time ()
 
std::uint64_t get_last_daq_time () override
 Get newest timestamp of last seen packet.
 
void preprocess_item (ReadoutType *item) override
 Preprocess one element.
 
void postprocess_item (const ReadoutType *item) override
 Postprocess one element.
 
template<typename Task >
void add_preprocess_task (Task &&task)
 
template<typename Task >
void add_postprocess_task (Task &&task)
 
void invoke_all_preprocess_functions (ReadoutType *item)
 
void launch_all_preprocess_functions (ReadoutType *item)
 
- Public Member Functions inherited from dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >
 RawDataProcessorConcept ()
 
virtual ~RawDataProcessorConcept ()
 
 RawDataProcessorConcept (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-constructible.
 
RawDataProcessorConcept & operator= (const RawDataProcessorConcept &)=delete
 RawDataProcessorConcept is not copy-assignable.
 
 RawDataProcessorConcept (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-constructible.
 
RawDataProcessorConcept & operator= (RawDataProcessorConcept &&)=delete
 RawDataProcessorConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObject & operator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObject & operator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

virtual void generate_opmon_data () override
 
void run_post_processing_thread (std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Protected Attributes

std::vector< std::function< void(ReadoutType *)> > m_preprocess_functions
 
std::unique_ptr< FrameErrorRegistry > & m_error_registry
 
bool m_post_processing_enabled
 
std::atomic< bool > m_run_marker { false }
 
std::vector< std::function< void(const ReadoutType *)> > m_post_process_functions
 
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const ReadoutType * > > > m_items_to_postprocess_queues
 
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
 
size_t m_postprocess_queue_sizes
 
daqdataformats::SourceID m_sourceid
 
std::atomic< uint64_t > m_last_processed_daq_ts { 0 }
 

Additional Inherited Members

- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 
- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 

Detailed Description

template<class ReadoutType>
class dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >

Definition at line 46 of file TaskRawDataProcessorModel.hpp.

Constructor & Destructor Documentation

◆ TaskRawDataProcessorModel()

template<class ReadoutType >
dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::TaskRawDataProcessorModel ( std::unique_ptr< FrameErrorRegistry > & error_registry,
bool post_processing_enabled )
inline explicit

◆ ~TaskRawDataProcessorModel()

template<class ReadoutType >
dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::~TaskRawDataProcessorModel ( )
inline

Definition at line 57 of file TaskRawDataProcessorModel.hpp.

57{}

Member Function Documentation

◆ add_postprocess_task()

template<class ReadoutType >
template<typename Task >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::add_postprocess_task ( Task && task)

Definition at line 81 of file TaskRawDataProcessorModel.hxx.

82{
83 m_post_process_threads.emplace_back(std::make_unique<utilities::ReusableThread>(0));
84 m_post_process_functions.push_back(std::forward<Task>(task));
85}
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
std::vector< std::function< void(const ReadoutType *)> > m_post_process_functions

◆ add_preprocess_task()

template<class ReadoutType >
template<typename Task >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::add_preprocess_task ( Task && task)

Definition at line 73 of file TaskRawDataProcessorModel.hxx.

74{
75 m_preprocess_functions.push_back(std::forward<Task>(task));
76}
std::vector< std::function< void(ReadoutType *)> > m_preprocess_functions

◆ conf()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::conf ( const appmodel::DataHandlerModule * conf)
override virtual

Set the emulator mode; if active, the timestamps of processed packets are overwritten with new ones.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >.

Reimplemented in dunedaq::fdreadoutlibs::TDEEthFrameProcessor, dunedaq::fdreadoutlibs::TDEFrameProcessor, dunedaq::fdreadoutlibs::WIBEthFrameProcessor, dunedaq::trigger::TCProcessor, and dunedaq::trigger::TPProcessor.

Definition at line 8 of file TaskRawDataProcessorModel.hxx.

9{
10 auto cfg = conf->get_module_configuration()->get_data_processor();
11 m_postprocess_queue_sizes = cfg->get_queue_sizes();
12 m_sourceid.id = conf->get_source_id();
13 m_sourceid.subsystem = ReadoutType::subsystem;
14
15 for (size_t i = 0; i < m_post_process_functions.size(); ++i) {
17 std::make_unique<folly::ProducerConsumerQueue<const ReadoutType*>>(m_postprocess_queue_sizes));
18 m_post_process_threads[i]->set_name(cfg->get_thread_names_prefix() + std::to_string(i), m_sourceid.id);
19 }
20}
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, the timestamps of processed packets are overwritten with new ones.
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const ReadoutType * > > > m_items_to_postprocess_queues
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74

◆ generate_opmon_data()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::generate_opmon_data ( )
override protected virtual

Hook for customisable publication. The function can throw; any exception will be caught by the monitoring thread.

Reimplemented from dunedaq::opmonlib::MonitorableObject.

Reimplemented in dunedaq::fdreadoutlibs::TDEEthFrameProcessor, dunedaq::fdreadoutlibs::WIBEthFrameProcessor, dunedaq::trigger::TAProcessor, dunedaq::trigger::TCProcessor, and dunedaq::trigger::TPProcessor.

Definition at line 123 of file TaskRawDataProcessorModel.hxx.

124{
125 for (size_t i = 0; i < m_items_to_postprocess_queues.size(); ++i) {
126 opmon::DataProcessorInfo info;
127 info.set_elements_queued(m_items_to_postprocess_queues[i]->sizeGuess());
128 this->publish(std::move(info), {{"post_processor_id", std::to_string(i)}});
129 }
130}
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept

◆ get_last_daq_time()

template<class ReadoutType >
std::uint64_t dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::get_last_daq_time ( )
inline override virtual

Get newest timestamp of last seen packet.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >.

Definition at line 75 of file TaskRawDataProcessorModel.hpp.

75{ return m_last_processed_daq_ts.load(); } // NOLINT(build/unsigned)

◆ invoke_all_preprocess_functions()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::invoke_all_preprocess_functions ( ReadoutType * item)

Definition at line 89 of file TaskRawDataProcessorModel.hxx.

90{
91 for (auto&& task : m_preprocess_functions) {
92 task(item);
93 }
94}

◆ launch_all_preprocess_functions()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::launch_all_preprocess_functions ( ReadoutType * item)

Definition at line 98 of file TaskRawDataProcessorModel.hxx.

99{
100 for (auto&& task : m_preprocess_functions) {
101 auto fut = std::async(std::launch::async, task, item);
102 }
103}

◆ postprocess_item()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::postprocess_item ( const ReadoutType * item)
override virtual

Postprocess one element.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >.

Definition at line 61 of file TaskRawDataProcessorModel.hxx.

62{
63 for (size_t i = 0; i < m_items_to_postprocess_queues.size(); ++i) {
64 if (!m_items_to_postprocess_queues[i]->write(item)) {
65 ers::warning(PostprocessingNotKeepingUp(ERS_HERE, m_sourceid, i));
66 }
67 }
68}
#define ERS_HERE
void write(gtool::t_graph const &, std::string const &)
Definition gtool.cpp:221
void warning(const Issue &issue)
Definition ers.hpp:115

◆ preprocess_item()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::preprocess_item ( ReadoutType * item)
inline override virtual

◆ reset_last_daq_time()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::reset_last_daq_time ( )
inline

Definition at line 72 of file TaskRawDataProcessorModel.hpp.

72{ m_last_processed_daq_ts.store(0); }

◆ run_post_processing_thread()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::run_post_processing_thread ( std::function< void(const ReadoutType *)> & function,
folly::ProducerConsumerQueue< const ReadoutType * > & queue )
protected

Definition at line 107 of file TaskRawDataProcessorModel.hxx.

110{
111 while (m_run_marker.load() || queue.sizeGuess() > 0) {
112 const ReadoutType* element;
113 if (queue.read(element)) {
114 function(element);
115 } else {
116 std::this_thread::sleep_for(std::chrono::microseconds(50));
117 }
118 }
119}
ReadoutType
Which type of readout to use for TriggerDecision and DataRequest.
Definition Types.hpp:51

◆ scrap()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::scrap ( const nlohmann::json & args)
override virtual

◆ start()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::start ( const nlohmann::json & args)
override virtual

Start operation.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >.

Reimplemented in dunedaq::fdreadoutlibs::TDEEthFrameProcessor, dunedaq::fdreadoutlibs::WIBEthFrameProcessor, dunedaq::trigger::TCProcessor, and dunedaq::trigger::TPProcessor.

Definition at line 34 of file TaskRawDataProcessorModel.hxx.

35{
36 // m_last_processed_daq_ts =
37 // std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
38 m_run_marker.store(true);
39 for (size_t i = 0; i < m_post_process_threads.size(); ++i) {
41 this,
42 std::ref(m_post_process_functions[i]),
43 std::ref(*m_items_to_postprocess_queues[i]));
44 }
45}
void run_post_processing_thread(std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)

◆ stop()

template<class ReadoutType >
void dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::stop ( const nlohmann::json & args)
override virtual

Stop operation.

Implements dunedaq::datahandlinglibs::RawDataProcessorConcept< ReadoutType >.

Reimplemented in dunedaq::fdreadoutlibs::TDEEthFrameProcessor, dunedaq::fdreadoutlibs::WIBEthFrameProcessor, dunedaq::trigger::TCProcessor, and dunedaq::trigger::TPProcessor.

Definition at line 49 of file TaskRawDataProcessorModel.hxx.

50{
51 m_run_marker.store(false);
52 for (auto& thread : m_post_process_threads) {
53 while (!thread->get_readiness()) {
54 std::this_thread::sleep_for(std::chrono::milliseconds(50));
55 }
56 }
57}

Member Data Documentation

◆ m_error_registry

template<class ReadoutType >
std::unique_ptr<FrameErrorRegistry>& dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_error_registry
protected

Definition at line 103 of file TaskRawDataProcessorModel.hpp.

◆ m_items_to_postprocess_queues

template<class ReadoutType >
std::vector<std::unique_ptr<folly::ProducerConsumerQueue<const ReadoutType*> > > dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_items_to_postprocess_queues
protected

Definition at line 115 of file TaskRawDataProcessorModel.hpp.

◆ m_last_processed_daq_ts

template<class ReadoutType >
std::atomic<uint64_t> dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_last_processed_daq_ts { 0 }
protected

Definition at line 123 of file TaskRawDataProcessorModel.hpp.

123{ 0 }; // NOLINT(build/unsigned)

◆ m_post_process_functions

template<class ReadoutType >
std::vector<std::function<void(const ReadoutType*)> > dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_post_process_functions
protected

Definition at line 114 of file TaskRawDataProcessorModel.hpp.

◆ m_post_process_threads

template<class ReadoutType >
std::vector<std::unique_ptr<utilities::ReusableThread> > dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_post_process_threads
protected

Definition at line 116 of file TaskRawDataProcessorModel.hpp.

◆ m_post_processing_enabled

template<class ReadoutType >
bool dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_post_processing_enabled
protected

Definition at line 108 of file TaskRawDataProcessorModel.hpp.

◆ m_postprocess_queue_sizes

template<class ReadoutType >
size_t dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_postprocess_queue_sizes
protected

Definition at line 119 of file TaskRawDataProcessorModel.hpp.

◆ m_preprocess_functions

template<class ReadoutType >
std::vector<std::function<void(ReadoutType*)> > dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_preprocess_functions
protected

Definition at line 102 of file TaskRawDataProcessorModel.hpp.

◆ m_run_marker

template<class ReadoutType >
std::atomic<bool> dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_run_marker { false }
protected

Definition at line 111 of file TaskRawDataProcessorModel.hpp.

111{ false };

◆ m_sourceid

template<class ReadoutType >
daqdataformats::SourceID dunedaq::datahandlinglibs::TaskRawDataProcessorModel< ReadoutType >::m_sourceid
protected

Definition at line 121 of file TaskRawDataProcessorModel.hpp.


The documentation for this class was generated from the following files: