DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TaskRawDataProcessorModel.hpp
Go to the documentation of this file.
1
9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_TASKRAWDATAPROCESSORMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_TASKRAWDATAPROCESSORMODEL_HPP_
11
13#include "logging/Logging.hpp"
18
20
26
28
29#include <folly/ProducerConsumerQueue.h>
30
31#include <atomic>
32#include <chrono>
33#include <functional>
34#include <future>
35#include <memory>
36#include <string>
37#include <utility>
38#include <vector>
39
41
42namespace dunedaq {
43namespace datahandlinglibs {
44
// TaskRawDataProcessorModel<ReadoutType>: raw-data processor that holds a list
// of pre-processing functions (m_preprocess_functions) and a list of
// post-processing functions together with vectors of
// folly::ProducerConsumerQueue and utilities::ReusableThread objects
// (presumably one queue and one thread per post-processing function — confirm
// against the implementation, which is not part of this listing).
// NOTE(review): this listing skips some original source lines (the embedded
// line numbers jump 45->47, 56->58, 71->73, 107->109, 118->120->122), so the
// class-head, the destructor, the timestamp-reset method and some members
// (e.g. m_post_processing_enabled) are not visible here; consult the real header.
45template<class ReadoutType>
47{
48public:
49 // Explicit constructor taking the error registry and the post-processing flag.
 // Default-constructs the RawDataProcessorConcept<ReadoutType> base, binds
 // m_error_registry to the caller's unique_ptr (no ownership is transferred)
 // and stores post_processing_enabled.
 // NOTE(review): m_error_registry is a reference member, so the referenced
 // unique_ptr must outlive this object.
50 explicit TaskRawDataProcessorModel(std::unique_ptr<FrameErrorRegistry>& error_registry, bool post_processing_enabled)
51 : RawDataProcessorConcept<ReadoutType>()
52 , m_error_registry(error_registry)
53 , m_post_processing_enabled(post_processing_enabled)
54 {}
55
56 // Destructor (its declaration is not visible in this listing)
58
59 // Configures the element pointer queue for the post-processors, and the SourceID
60 void conf(const appmodel::DataHandlerModule* conf) override;
61
62 // Clears elements to process, pre-proc pipeline, and post-proc functions
63 void scrap(const nlohmann::json& /*cfg*/) override;
64
65 // Starts the pre-processor pipeline and the parallel post-processor threads
66 void start(const nlohmann::json& /*args*/) override;
67
68 // Stops the pre-processor pipeline and the parallel post-processor threads
69 void stop(const nlohmann::json& /*args*/) override;
70
71 // Resets last known/processed DAQ timestamp (its declaration is not visible in this listing)
73
74 // Returns the last processed ReadoutType element's DAQ timestamp, i.e. an
 // atomic load of m_last_processed_daq_ts.
75 std::uint64_t get_last_daq_time() override { return m_last_processed_daq_ts.load(); } // NOLINT(build/unsigned)
76
77 // Pre-processes one ReadoutType item by forwarding the pointer to
 // invoke_all_preprocess_functions().
78 void preprocess_item(ReadoutType* item) override { invoke_all_preprocess_functions(item); }
79
80 // Registers ReadoutType item pointer to the post-processing queue
81 void postprocess_item(const ReadoutType* item) override;
82
83 // Registers a pre-processing task to the pre-processor pipeline
84 template<typename Task>
85 void add_preprocess_task(Task&& task);
86
87 // Registers a post-processing task to the parallel post-processors
88 template<typename Task>
89 void add_postprocess_task(Task&& task);
90
91 // Invokes all preprocessor functions as pipeline
92 void invoke_all_preprocess_functions(ReadoutType* item);
93
94 // Launches all preprocessor functions as async
95 void launch_all_preprocess_functions(ReadoutType* item);
96
97protected:
98 // Operational monitoring
99 virtual void generate_opmon_data() override;
100
101 // Pre-processing pipeline functions
102 std::vector<std::function<void(ReadoutType*)>> m_preprocess_functions;
 // Reference to the error-registry unique_ptr passed to the constructor; not owned here.
103 std::unique_ptr<FrameErrorRegistry>& m_error_registry;
104
105 // Post-processing thread runner: receives a post-processing function and a
 // queue of const ReadoutType pointers (presumably the queue it consumes
 // from — the definition is not part of this listing).
106 void run_post_processing_thread(std::function<void(const ReadoutType*)>& function,
107 folly::ProducerConsumerQueue<const ReadoutType*>& queue);
109
110 // Run marker; initialised to false
111 std::atomic<bool> m_run_marker{ false };
112
113 // Post-processing functions and their corresponding queues and threads
114 std::vector<std::function<void(const ReadoutType*)>> m_post_process_functions;
115 std::vector<std::unique_ptr<folly::ProducerConsumerQueue<const ReadoutType*>>> m_items_to_postprocess_queues;
116 std::vector<std::unique_ptr<utilities::ReusableThread>> m_post_process_threads;
117
118 // Internals
120 //uint32_t m_this_link_number; // NOLINT(build/unsigned)
122 //bool m_emulator_mode{ false };
 // DAQ timestamp returned by get_last_daq_time(); initialised to 0
123 std::atomic<uint64_t> m_last_processed_daq_ts{ 0 }; // NOLINT(build/unsigned)
124
125};
126
127} // namespace datahandlinglibs
128} // namespace dunedaq
129
130// Declarations
132
133#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_TASKRAWDATAPROCESSORMODEL_HPP_
void scrap(const nlohmann::json &) override
Unconfigure.
void start(const nlohmann::json &) override
Start operation.
void postprocess_item(const ReadoutType *item) override
Postprocess one element.
std::uint64_t get_last_daq_time() override
Get newest timestamp of last seen packet.
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
std::vector< std::function< void(ReadoutType *)> > m_preprocess_functions
void run_post_processing_thread(std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)
void stop(const nlohmann::json &) override
Stop operation.
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
std::vector< std::function< void(const ReadoutType *)> > m_post_process_functions
TaskRawDataProcessorModel(std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const ReadoutType * > > > m_items_to_postprocess_queues
void preprocess_item(ReadoutType *item) override
Preprocess one element.
Including Qt Headers.
SourceID is a generalized representation of the source of a piece of data in the DAQ....
Definition SourceID.hpp:32