Line data Source code
1 : /**
2 : * @file DataWriterModule.hpp
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #ifndef DFMODULES_PLUGINS_DATAWRITER_HPP_
10 : #define DFMODULES_PLUGINS_DATAWRITER_HPP_
11 :
12 : #include "dfmodules/DataStore.hpp"
13 :
14 : #include "appfwk/DAQModule.hpp"
15 : #include "appmodel/DataWriterConf.hpp"
16 : #include "daqdataformats/TriggerRecord.hpp"
17 : #include "dfmessages/TriggerDecisionToken.hpp"
18 : #include "iomanager/Receiver.hpp"
19 : #include "iomanager/Sender.hpp"
20 : #include "utilities/WorkerThread.hpp"
21 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
22 :
23 : #include <chrono>
24 : #include <map>
25 : #include <memory>
26 : #include <string>
27 : #include <vector>
28 :
29 : namespace dunedaq {
30 : namespace dfmodules {
31 :
32 : /**
33 : * @brief DataWriterModule is a shell for what we might write for the MiniDAQApp.
34 : */
35 : class DataWriterModule : public dunedaq::appfwk::DAQModule
36 : {
37 : public:
38 : /**
39 : * @brief DataWriterModule Constructor
40 : * @param name Instance name for this DataWriterModule instance
41 : */
42 : explicit DataWriterModule(const std::string& name);
43 :
44 : DataWriterModule(const DataWriterModule&) = delete; ///< DataWriterModule is not copy-constructible
45 : DataWriterModule& operator=(const DataWriterModule&) = delete; ///< DataWriterModule is not copy-assignable
46 : DataWriterModule(DataWriterModule&&) = delete; ///< DataWriterModule is not move-constructible
47 : DataWriterModule& operator=(DataWriterModule&&) = delete; ///< DataWriterModule is not move-assignable
48 :
49 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
50 : void generate_opmon_data() override;
51 :
52 : private:
53 : // Commands
54 : void do_conf(const CommandData_t&);
55 : void do_start(const CommandData_t&);
56 : void do_stop(const CommandData_t&);
57 : void do_scrap(const CommandData_t&);
58 :
59 : // Callback
60 : void receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord>&);
61 : std::atomic<bool> m_running = false;
62 :
63 : // Configuration
64 : std::shared_ptr<appfwk::ConfigurationManager> m_module_configuration;
65 : const appmodel::DataWriterConf* m_data_writer_conf;
66 : std::string m_writer_identifier;
67 :
68 : // size_t m_sleep_msec_while_running;
69 : std::chrono::milliseconds m_queue_timeout;
70 : bool m_data_storage_is_enabled;
71 : int m_data_storage_prescale;
72 : daqdataformats::run_number_t m_run_number;
73 : size_t m_min_write_retry_time_usec;
74 : size_t m_max_write_retry_time_usec;
75 : int m_write_retry_time_increase_factor;
76 :
77 : // Connections
78 : std::string m_trigger_record_connection;
79 : using tr_receiver_ct = iomanager::ReceiverConcept<std::unique_ptr<daqdataformats::TriggerRecord>>;
80 : std::shared_ptr<tr_receiver_ct> m_tr_receiver;
81 :
82 : using token_sender_t = iomanager::SenderConcept<dfmessages::TriggerDecisionToken>;
83 : std::shared_ptr<token_sender_t> m_token_output;
84 : std::string m_trigger_decision_connection;
85 :
86 : // Worker(s)
87 : dunedaq::utilities::WorkerThread m_thread;
88 : void do_work(std::atomic<bool>&);
89 :
90 : std::shared_ptr<DataStore> m_data_writer;
91 :
92 : // Metrics
93 : std::atomic<uint64_t> m_records_received = { 0 }; // NOLINT(build/unsigned)
94 : std::atomic<uint64_t> m_records_received_tot = { 0 }; // NOLINT(build/unsigned)
95 : std::atomic<uint64_t> m_records_written = { 0 }; // NOLINT(build/unsigned)
96 : std::atomic<uint64_t> m_records_written_tot = { 0 }; // NOLINT(build/unsigned)
97 : std::atomic<uint64_t> m_bytes_output = { 0 }; // NOLINT(build/unsigned)
98 : std::atomic<uint64_t> m_bytes_output_tot = { 0 }; // NOLINT(build/unsigned)
99 : std::atomic<uint64_t> m_writing_us = { 0 }; // NOLINT(build/unsigned)
100 :
101 :
102 : // Other
103 : std::map<daqdataformats::trigger_number_t, size_t> m_seqno_counts;
104 :
105 : inline double elapsed_seconds(std::chrono::steady_clock::time_point then,
106 : std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const
107 : {
108 : return std::chrono::duration_cast<std::chrono::seconds>(now - then).count();
109 : }
110 : };
111 : } // namespace dfmodules
112 :
113 0 : ERS_DECLARE_ISSUE_BASE(dfmodules,
114 : InvalidDataWriterModule,
115 : appfwk::GeneralDAQModuleIssue,
116 : "A valid dataWriter instance is not available so it will not be possible to write data. A "
117 : "likely cause for this is a skipped or missed Configure transition.",
118 : ((std::string)name),
119 : ERS_EMPTY)
120 :
121 0 : ERS_DECLARE_ISSUE_BASE(dfmodules,
122 : DataWritingProblem,
123 : appfwk::GeneralDAQModuleIssue,
124 : "A problem was encountered when writing TriggerRecord number " << trnum << "." << seqnum
125 : << " in run " << runnum,
126 : ((std::string)name),
127 : ((size_t)trnum)((size_t)seqnum)((size_t)runnum))
128 :
129 0 : ERS_DECLARE_ISSUE_BASE(dfmodules,
130 : InvalidRunNumber,
131 : appfwk::GeneralDAQModuleIssue,
132 : "An invalid run number was received in a "
133 : << msg_type << " message, "
134 : << "received=" << received << ", expected=" << expected << ", trig/seq_number=" << trnum << "."
135 : << seqnum,
136 : ((std::string)name),
137 : ((std::string)msg_type)((size_t)received)((size_t)expected)((size_t)trnum)((size_t)seqnum))
138 :
139 : } // namespace dunedaq
140 :
141 : #endif // DFMODULES_PLUGINS_DATAWRITER_HPP_
|