9#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_TASKRAWDATAPROCESSORMODEL_HPP_
10#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_MODELS_TASKRAWDATAPROCESSORMODEL_HPP_
29#include <folly/ProducerConsumerQueue.h>
43namespace datahandlinglibs {
45template<
class ReadoutType>
63 void scrap(
const nlohmann::json& )
override;
66 void start(
const nlohmann::json& )
override;
69 void stop(
const nlohmann::json& )
override;
84 template<
typename Task>
88 template<
typename Task>
107 folly::ProducerConsumerQueue<const ReadoutType*>& queue);
void scrap(const nlohmann::json &) override
Unconfigure.
void start(const nlohmann::json &) override
Start operation.
void postprocess_item(const ReadoutType *item) override
Postprocess one element.
std::uint64_t get_last_daq_time() override
Get newest timestamp of last seen packet.
~TaskRawDataProcessorModel()
void add_preprocess_task(Task &&task)
size_t m_postprocess_queue_sizes
void add_postprocess_task(Task &&task)
daqdataformats::SourceID m_sourceid
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
bool m_post_processing_enabled
std::vector< std::function< void(ReadoutType *)> > m_preprocess_functions
void launch_all_preprocess_functions(ReadoutType *item)
std::atomic< bool > m_run_marker
void reset_last_daq_time()
void run_post_processing_thread(std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)
void stop(const nlohmann::json &) override
Stop operation.
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
virtual void generate_opmon_data() override
std::vector< std::function< void(const ReadoutType *)> > m_post_process_functions
TaskRawDataProcessorModel(std::unique_ptr< FrameErrorRegistry > &error_registry, bool post_processing_enabled)
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const ReadoutType * > > > m_items_to_postprocess_queues
std::atomic< uint64_t > m_last_processed_daq_ts
void preprocess_item(ReadoutType *item) override
Preprocess one element.
std::unique_ptr< FrameErrorRegistry > & m_error_registry
void invoke_all_preprocess_functions(ReadoutType *item)