Line data Source code
1 : /**
2 : * @file TPStreamWriterModule.hpp
3 : *
4 : * TPStreamWriterModule is a DAQModule that provides sample code for writing TPSets to disk.
5 : *
6 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
7 : * Licensing/copyright details are in the COPYING file that you should have
8 : * received with this code.
9 : */
10 :
11 : #ifndef DFMODULES_PLUGINS_TPSTREAMWRITER_HPP_
12 : #define DFMODULES_PLUGINS_TPSTREAMWRITER_HPP_
13 :
14 : #include "dfmodules/DataStore.hpp"
15 :
16 : #include "appfwk/DAQModule.hpp"
17 : #include "appmodel/TPStreamWriterConf.hpp"
18 : #include "iomanager/Receiver.hpp"
19 : #include "daqdataformats/TimeSlice.hpp"
20 : #include "trigger/TPSet.hpp"
21 : #include "utilities/WorkerThread.hpp"
22 : #include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
23 :
24 : #include <memory>
25 : #include <string>
26 :
27 : namespace dunedaq {
28 : namespace dfmodules {
29 :
30 : /**
31 : * @brief TPStreamWriterModule receives TPSets from a queue and prints them out
32 : */
33 : class TPStreamWriterModule : public dunedaq::appfwk::DAQModule
34 : {
35 : public:
36 : /**
37 : * @brief TPStreamWriterModule Constructor
38 : * @param name Instance name for this TPStreamWriterModule instance
39 : */
40 : explicit TPStreamWriterModule(const std::string& name);
41 :
42 : TPStreamWriterModule(const TPStreamWriterModule&) = delete; ///< TPStreamWriterModule is not copy-constructible
43 : TPStreamWriterModule& operator=(const TPStreamWriterModule&) = delete; ///< TPStreamWriterModule is not copy-assignable
44 : TPStreamWriterModule(TPStreamWriterModule&&) = delete; ///< TPStreamWriterModule is not move-constructible
45 : TPStreamWriterModule& operator=(TPStreamWriterModule&&) = delete; ///< TPStreamWriterModule is not move-assignable
46 :
47 : void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
48 : void generate_opmon_data() override;
49 :
50 : private:
51 : // Commands
52 : void do_conf(const CommandData_t&);
53 : void do_start(const CommandData_t&);
54 : void do_stop(const CommandData_t&);
55 : void do_scrap(const CommandData_t&);
56 :
57 : // Threading
58 : dunedaq::utilities::WorkerThread m_thread;
59 : void do_work(std::atomic<bool>&);
60 :
61 : // Configuration
62 :
63 : std::shared_ptr<appfwk::ConfigurationManager> m_module_configuration;
64 : const appmodel::TPStreamWriterConf* m_tp_writer_conf;
65 : std::chrono::milliseconds m_queue_timeout;
66 : size_t m_accumulation_interval_ticks;
67 : std::chrono::steady_clock::duration m_accumulation_inactivity_time_before_write;
68 : daqdataformats::run_number_t m_run_number;
69 : uint32_t m_source_id; // NOLINT(build/unsigned)
70 : bool m_warn_user_when_tardy_tps_are_discarded;
71 : double m_accumulation_interval_seconds;
72 : std::string m_writer_identifier;
73 :
74 : // Queue sources and sinks
75 : using source_t = iomanager::ReceiverConcept<trigger::TPSet>;
76 : std::shared_ptr<source_t> m_tpset_source;
77 :
78 : // Worker(s)
79 : std::shared_ptr<DataStore> m_data_writer;
80 :
81 : // Metrics
82 : std::atomic<uint64_t> m_heartbeat_tpsets = { 0 }; // NOLINT(build/unsigned)
83 : std::atomic<uint64_t> m_tpsets_with_tps = { 0 }; // NOLINT(build/unsigned)
84 : std::atomic<uint64_t> m_tps_received = { 0 }; // NOLINT(build/unsigned)
85 : std::atomic<uint64_t> m_tps_written = { 0 }; // NOLINT(build/unsigned)
86 : std::atomic<uint64_t> m_tps_discarded = { 0 }; // NOLINT(build/unsigned)
87 : std::atomic<uint64_t> m_timeslices_written = { 0 }; // NOLINT(build/unsigned)
88 : std::atomic<uint64_t> m_bytes_output = { 0 }; // NOLINT(build/unsigned)
89 : std::atomic<double> m_tardy_timeslice_max_seconds = { 0.0 }; // NOLINT(build/unsigned)
90 : std::atomic<uint64_t> m_total_tps_received = { 0 }; // NOLINT(build/unsigned)
91 : std::atomic<uint64_t> m_total_tps_written = { 0 }; // NOLINT(build/unsigned)
92 : std::atomic<uint64_t> m_total_tps_discarded = { 0 };// NOLINT(build/unsigned)
93 : };
94 : } // namespace dfmodules
95 :
96 0 : ERS_DECLARE_ISSUE_BASE(dfmodules,
97 : InvalidDataWriterModule,
98 : appfwk::GeneralDAQModuleIssue,
99 : "A valid dataWriter instance is not available so it will not be possible to write data. A "
100 : "likely cause for this is a skipped or missed Configure transition.",
101 : ((std::string)name),
102 : ERS_EMPTY)
103 :
104 0 : ERS_DECLARE_ISSUE_BASE(dfmodules,
105 : DataWritingProblem,
106 : appfwk::GeneralDAQModuleIssue,
107 : "A problem was encountered when writing TimeSlice number " << trnum << " in run " << runnum,
108 : ((std::string)name),
109 : ((size_t)trnum)((size_t)runnum))
110 :
111 0 : ERS_DECLARE_ISSUE_BASE(dfmodules,
112 : TardyTPsDiscarded,
113 : appfwk::GeneralDAQModuleIssue,
114 : "Tardy TPs from SourceIDs [" << sid_list << "] were discarded from TimeSlice number "
115 : << trnum << " (~" << sec_too_late << " sec too late)",
116 : ((std::string)name),
117 : ((std::string)sid_list)((size_t)trnum)((float)sec_too_late))
118 :
119 : } // namespace dunedaq
120 :
121 : #endif // DFMODULES_PLUGINS_TPSTREAMWRITER_HPP_
|