DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
TaskRawDataProcessorModel.hxx
Go to the documentation of this file.
1// Declarations for TaskRawDataProcessorModel
2
3namespace dunedaq {
4namespace datahandlinglibs {
5
// NOTE(review): listing line 8 (the signature line between "void" and "{") is absent from this
// extract. The body dereferences a `conf` pointer and the symbol index at the end of this page
// lists `void conf(const appmodel::DataHandlerModule *conf) override`, so this is presumably
// TaskRawDataProcessorModel<ReadoutType>::conf -- confirm against the original file.
//
// Reads the data-processor configuration, stores the post-processing queue size and the source
// identifier, then creates one queue per registered post-processing function and names the
// worker thread that shares the same index.
6template<class ReadoutType>
7void
9{
10 auto cfg = conf->get_module_configuration()->get_data_processor();
11 m_postprocess_queue_sizes = cfg->get_queue_sizes();
12 m_sourceid.id = conf->get_source_id();
13 m_sourceid.subsystem = ReadoutType::subsystem;
14
// One queue per post-processing function, every queue built with the same configured size.
// m_post_process_threads is indexed with the same i, so this loop relies on there being at least
// as many threads as functions.
// NOTE(review): queues are appended with push_back and nothing in this body clears the vector
// first, so running this body again without clearing in between would append a second set.
15 for (size_t i = 0; i < m_post_process_functions.size(); ++i) {
16 m_items_to_postprocess_queues.push_back(
17 std::make_unique<folly::ProducerConsumerQueue<const ReadoutType*>>(m_postprocess_queue_sizes));
// Thread name is the configured prefix followed by the index; m_sourceid.id is passed as the
// second argument of set_name.
18 m_post_process_threads[i]->set_name(cfg->get_thread_names_prefix() + std::to_string(i), m_sourceid.id);
19 }
20}
21
// NOTE(review): listing line 24 (the signature line) is absent from this extract. The symbol
// index at the end of this page lists `void scrap(const nlohmann::json &) override` with the
// brief "Unconfigure.", so this is presumably that function -- confirm against the original file.
//
// Empties all four member containers: the post-processing queues, the post-processing threads,
// the post-processing functions and the pre-processing functions.
22template<class ReadoutType>
23void
25{
26 m_items_to_postprocess_queues.clear();
27 m_post_process_threads.clear();
28 m_post_process_functions.clear();
29 m_preprocess_functions.clear();
30}
31
// Start operation: sets the run marker to true, then iterates over the post-processing threads
// and passes each one `this`, the post-processing function with the same index and the queue
// with the same index.
32template<class ReadoutType>
33void
34TaskRawDataProcessorModel<ReadoutType>::start(const nlohmann::json& /*args*/)
35{
36 // m_last_processed_daq_ts =
37 // std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
38 m_run_marker.store(true);
39 for (size_t i = 0; i < m_post_process_threads.size(); ++i) {
// NOTE(review): listing line 40, which opens the call these three arguments belong to, is
// missing from this extract. What is visible passes `this` plus the i-th function and the i-th
// queue, both wrapped in std::ref. Presumably the call hands run_post_processing_thread (whose
// parameters match these two references) to thread i -- confirm against the original file.
41 this,
42 std::ref(m_post_process_functions[i]),
43 std::ref(*m_items_to_postprocess_queues[i]));
44 }
45}
46
// Stop operation: clears the run marker, then blocks the caller until every post-processing
// thread reports readiness.
47template<class ReadoutType>
48void
49TaskRawDataProcessorModel<ReadoutType>::stop(const nlohmann::json& /*args*/)
50{
51 m_run_marker.store(false);
// Threads are waited on one after another; each is polled every 50 ms until get_readiness()
// returns true. There is no timeout, so this does not return while any thread stays not-ready.
// NOTE(review): the loop at listing line 111 keeps running while its queue still holds elements;
// if that loop is what these threads execute, this wait also covers draining the queues -- confirm.
52 for (auto& thread : m_post_process_threads) {
53 while (!thread->get_readiness()) {
54 std::this_thread::sleep_for(std::chrono::milliseconds(50));
55 }
56 }
57}
58
// NOTE(review): listing line 61 (the signature line) is absent from this extract. The symbol
// index at the end of this page lists `void postprocess_item(const ReadoutType *item) override`
// with the brief "Postprocess one element.", so this is presumably that function -- confirm.
//
// Offers `item` to every post-processing queue. When a queue's write() returns false, a
// PostprocessingNotKeepingUp warning carrying the source id and the queue index is reported and
// the loop moves on; the write to that queue is not retried here.
59template<class ReadoutType>
60void
62{
63 for (size_t i = 0; i < m_items_to_postprocess_queues.size(); ++i) {
64 if (!m_items_to_postprocess_queues[i]->write(item)) {
65 ers::warning(PostprocessingNotKeepingUp(ERS_HERE, m_sourceid, i));
66 }
67 }
68}
// NOTE(review): listing line 73 (the signature line) is absent from this extract, so the
// function name and parameter list are not visible here; the body uses a parameter named `task`
// of the deduced type Task.
//
// Appends `task`, forwarded with std::forward<Task>, to m_preprocess_functions.
70template<class ReadoutType>
71template<typename Task>
72void
74{
75 m_preprocess_functions.push_back(std::forward<Task>(task));
76}
77
// NOTE(review): listing line 81 (the signature line) is absent from this extract, so the
// function name and parameter list are not visible here; the body uses a parameter named `task`
// of the deduced type Task.
//
// Registers one post-processing task: creates a new utilities::ReusableThread (constructed with
// the argument 0) and appends the forwarded task to m_post_process_functions. Both vectors grow
// by one element per call, so a thread and a function added together share the same index.
78template<class ReadoutType>
79template<typename Task>
80void
82{
83 m_post_process_threads.emplace_back(std::make_unique<utilities::ReusableThread>(0));
84 m_post_process_functions.push_back(std::forward<Task>(task));
// NOTE(review): the closing brace of this function (listing line 85) is also missing from this extract.
86
// NOTE(review): listing line 89 (the signature line) is absent from this extract, so the
// function name and parameter list are not visible here; the body uses a parameter named `item`.
//
// Calls every entry of m_preprocess_functions with `item`, one after another on the calling
// thread, in the order the entries are stored.
87template<class ReadoutType>
88void
90{
91 for (auto&& task : m_preprocess_functions) {
92 task(item);
93 }
94}
// NOTE(review): listing lines 98-99 (the signature line and the opening brace) are absent from
// this extract, so the function name and parameter list are not visible here; the body uses a
// parameter named `item`.
//
// Starts every entry of m_preprocess_functions through std::async with std::launch::async,
// passing `item` as the argument.
96template<class ReadoutType>
97void
// NOTE(review): `fut` is destroyed at the end of each iteration. A future obtained from
// std::async blocks in its destructor until the task has finished, so each task completes
// before the next one is launched and the function returns only after all of them are done.
// If overlapping execution was intended, the futures would have to outlive the loop -- confirm intent.
100 for (auto&& task : m_preprocess_functions) {
101 auto fut = std::async(std::launch::async, task, item);
102 }
103}
104
// NOTE(review): listing line 107 (the line holding the function name and opening parenthesis) is
// absent from this extract. The two parameters below match the index entry
// `run_post_processing_thread(std::function<void(const ReadoutType*)>&, folly::ProducerConsumerQueue<const ReadoutType*>&)`
// at the end of this page, so this is presumably that function -- confirm.
//
// Consumer loop for one post-processing queue.
//   function  callable invoked with each element read from the queue
//   queue     queue the elements are read from
105template<class ReadoutType>
106void
108 std::function<void(const ReadoutType*)>& function,
109 folly::ProducerConsumerQueue<const ReadoutType*>& queue)
110{
// Keeps going while the run marker is set, and after it is cleared for as long as sizeGuess()
// still reports queued elements, so elements already queued are still processed.
111 while (m_run_marker.load() || queue.sizeGuess() > 0) {
// `element` is left uninitialized here; it is only passed on after read() returned true.
112 const ReadoutType* element;
113 if (queue.read(element)) {
114 function(element);
115 } else {
// Nothing could be read: back off for 50 microseconds before polling again.
116 std::this_thread::sleep_for(std::chrono::microseconds(50));
117 }
118 }
119}
120
// NOTE(review): listing line 123 (the signature line) is absent from this extract, so the
// function name and parameter list are not visible here.
//
// For every post-processing queue, records the queue's current sizeGuess() in an `info` object
// and publishes it, labelled with the queue index under the key "post_processor_id".
121template<class ReadoutType>
122void
124{
125 for (size_t i = 0; i < m_items_to_postprocess_queues.size(); ++i) {
// NOTE(review): listing line 126, which presumably declares `info`, is missing from this
// extract; its type is not visible here.
127 info.set_elements_queued(m_items_to_postprocess_queues[i]->sizeGuess());
128 this->publish(std::move(info), {{"post_processor_id", std::to_string(i)}});
129 }
130}
131
132} // namespace datahandlinglibs
133} // namespace dunedaq
#define ERS_HERE
const dunedaq::appmodel::DataProcessor * get_data_processor() const
Get "data_processor" relationship value.
const dunedaq::appmodel::DataHandlerConf * get_module_configuration() const
Get "module_configuration" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
uint32_t get_queue_sizes() const
Get "queue_sizes" attribute value.
void scrap(const nlohmann::json &) override
Unconfigure.
void start(const nlohmann::json &) override
Start operation.
void postprocess_item(const ReadoutType *item) override
Postprocess one element.
void conf(const appmodel::DataHandlerModule *conf) override
Set the emulator mode; if active, timestamps of processed packets are overwritten with new ones.
void run_post_processing_thread(std::function< void(const ReadoutType *)> &function, folly::ProducerConsumerQueue< const ReadoutType * > &queue)
void stop(const nlohmann::json &) override
Stop operation.
Including Qt Headers.
void warning(const Issue &issue)
Definition ers.hpp:115