6template<
class ReadoutType>
10 auto cfg =
conf->get_module_configuration()->get_data_processor();
22template<
class ReadoutType>
32template<
class ReadoutType>
47template<
class ReadoutType>
53 while (!
thread->get_readiness()) {
54 std::this_thread::sleep_for(std::chrono::milliseconds(50));
59template<
class ReadoutType>
70template<
class ReadoutType>
71template<
typename Task>
78template<
class ReadoutType>
79template<
typename Task>
87template<
class ReadoutType>
96template<
class ReadoutType>
101 auto fut = std::async(std::launch::async, task, item);
105template<
class ReadoutType>
108 std::function<
void(
const ReadoutType*)>& function,
109 folly::ProducerConsumerQueue<const ReadoutType*>& queue)
112 const ReadoutType* element;
113 if (queue.read(element)) {
116 std::this_thread::sleep_for(std::chrono::microseconds(50));
121template<
class ReadoutType>
128 this->
publish(std::move(info), {{
"post_processor_id", std::to_string(i)}});
void start(const appfwk::DAQModule::CommandData_t &) override
Start operation.
void postprocess_item(const ReadoutType *item) override
Postprocess one element.
void add_preprocess_task(Task &&task)
size_t m_postprocess_queue_sizes
void add_postprocess_task(Task &&task)
daqdataformats::SourceID m_sourceid
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
std::vector< std::function< void(ReadoutType *)> > m_preprocess_functions
void stop(const appfwk::DAQModule::CommandData_t &) override
Stop operation.
void launch_all_preprocess_functions(ReadoutType *item)
std::atomic< bool > m_run_marker
void run_post_processing_thread(std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)
std::vector< std::unique_ptr< utilities::ReusableThread > > m_post_process_threads
virtual void generate_opmon_data() override
std::vector< std::function< void(const ReadoutType *)> > m_post_process_functions
std::vector< std::unique_ptr< folly::ProducerConsumerQueue< const ReadoutType * > > > m_items_to_postprocess_queues
void scrap(const appfwk::DAQModule::CommandData_t &) override
Unconfigure.
void invoke_all_preprocess_functions(ReadoutType *item)
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
void warning(const Issue &issue)