4namespace datahandlinglibs {
6template<
class ReadoutType>
13 m_sourceid.subsystem = ReadoutType::subsystem;
15 for (
size_t i = 0; i < m_post_process_functions.size(); ++i) {
16 m_items_to_postprocess_queues.push_back(
17 std::make_unique<folly::ProducerConsumerQueue<const ReadoutType*>>(m_postprocess_queue_sizes));
18 m_post_process_threads[i]->set_name(cfg->get_thread_names_prefix() + std::to_string(i), m_sourceid.id);
22template<
class ReadoutType>
26 m_items_to_postprocess_queues.clear();
27 m_post_process_threads.clear();
28 m_post_process_functions.clear();
29 m_preprocess_functions.clear();
32template<
class ReadoutType>
38 m_run_marker.store(
true);
39 for (
size_t i = 0; i < m_post_process_threads.size(); ++i) {
42 std::ref(m_post_process_functions[i]),
43 std::ref(*m_items_to_postprocess_queues[i]));
47template<
class ReadoutType>
51 m_run_marker.store(
false);
52 for (
auto&
thread : m_post_process_threads) {
53 while (!
thread->get_readiness()) {
54 std::this_thread::sleep_for(std::chrono::milliseconds(50));
59template<
class ReadoutType>
63 for (
size_t i = 0; i < m_items_to_postprocess_queues.size(); ++i) {
64 if (!m_items_to_postprocess_queues[i]->write(item)) {
70template<
class ReadoutType>
71template<
typename Task>
75 m_preprocess_functions.push_back(std::forward<Task>(task));
78template<
class ReadoutType>
79template<
typename Task>
83 m_post_process_threads.emplace_back(std::make_unique<utilities::ReusableThread>(0));
84 m_post_process_functions.push_back(std::forward<Task>(task));
87template<
class ReadoutType>
91 for (
auto&& task : m_preprocess_functions) {
96template<
class ReadoutType>
100 for (
auto&& task : m_preprocess_functions) {
101 auto fut = std::async(std::launch::async, task, item);
105template<
class ReadoutType>
108 std::function<
void(
const ReadoutType*)>& function,
109 folly::ProducerConsumerQueue<const ReadoutType*>& queue)
111 while (m_run_marker.load() || queue.sizeGuess() > 0) {
112 const ReadoutType* element;
113 if (queue.read(element)) {
116 std::this_thread::sleep_for(std::chrono::microseconds(50));
121template<
class ReadoutType>
125 for (
size_t i = 0; i < m_items_to_postprocess_queues.size(); ++i) {
127 info.set_elements_queued(m_items_to_postprocess_queues[i]->sizeGuess());
128 this->publish(std::move(info), {{
"post_processor_id", std::to_string(i)}});
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
uint32_t get_queue_sizes() const
Get "queue_sizes" attribute value.
void scrap(const nlohmann::json &) override
Unconfigure.
void start(const nlohmann::json &) override
Start operation.
void postprocess_item(const ReadoutType *item) override
Postprocess one element.
void add_preprocess_task(Task &&task)
void add_postprocess_task(Task &&task)
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode, if active, timestamps of processed packets are overwritten with new ones.
void launch_all_preprocess_functions(ReadoutType *item)
void run_post_processing_thread(std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)
void stop(const nlohmann::json &) override
Stop operation.
virtual void generate_opmon_data() override
void invoke_all_preprocess_functions(ReadoutType *item)
void warning(const Issue &issue)