DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
RecorderModel.hxx
Go to the documentation of this file.
1// Declarations for RecorderModel
2
3namespace dunedaq {
4namespace datahandlinglibs {
5
6template<class ReadoutType>
7void
9{
10 for (auto input : conf->get_inputs()) {
11 try {
12 m_data_receiver = get_iom_receiver<ReadoutType>(input->UID());
13 } catch (const ers::Issue& excpt) {
14 throw ResourceQueueError(ERS_HERE, "raw_recording", "RecorderModel");
15 }
16 }
17 m_output_file = conf->get_configuration()->get_output_file();
18 m_stream_buffer_size = conf->get_configuration()->get_streaming_buffer_size();
19 m_compression_algorithm = conf->get_configuration()->get_compression_algorithm();
20 m_use_o_direct = conf->get_configuration()->get_use_o_direct();
21}
22
23template<class ReadoutType>
24void
26{
28 info.set_recording_status("Y");
29 info.set_packets_recorded(m_packets_processed.exchange(0));
30 info.set_bytes_recorded(m_bytes_processed.exchange(0));
31}
32
33template<class ReadoutType>
34void
35RecorderModel<ReadoutType>::do_conf(const nlohmann::json& /* args */)
36{
37
38 if (remove(m_output_file.c_str()) == 0) {
39 TLOG(TLVL_WORK_STEPS) << "Removed existing output file from previous run" << std::endl;
40 }
41
42 m_buffered_writer.open(
43 m_output_file, m_stream_buffer_size, m_compression_algorithm, m_use_o_direct);
44 m_work_thread.set_name(m_name, 0);
45}
46
47template<class ReadoutType>
48void
49RecorderModel<ReadoutType>::do_start(const nlohmann::json& /* args */)
50{
51 m_packets_processed = 0;
52 m_bytes_processed = 0;
53
54 m_run_marker.store(true);
55 m_work_thread.set_work(&RecorderModel<ReadoutType>::do_work, this);
56}
57
58template<class ReadoutType>
59void
60RecorderModel<ReadoutType>::do_stop(const nlohmann::json& /* args */)
61{
62 m_run_marker.store(false);
63 while (!m_work_thread.get_readiness()) {
64 std::this_thread::sleep_for(std::chrono::milliseconds(100));
65 }
66}
67
68template<class ReadoutType>
69void
71{
72 m_time_point_last_info = std::chrono::steady_clock::now();
73
74 ReadoutType element;
75 while (m_run_marker) {
76 try {
77 element = m_data_receiver->receive(std::chrono::milliseconds(100)); // RS -> Use confed timeout?
78 if (!m_buffered_writer.write(reinterpret_cast<char*>(&element), sizeof(element))) { // NOLINT
79 ers::warning(CannotWriteToFile(ERS_HERE, m_output_file));
80 break;
81 }
82 m_packets_processed++;
83 m_bytes_processed+=sizeof(element);
84 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
85 continue;
86 }
87 }
88 m_buffered_writer.flush();
89}
90
91} // namespace datahandlinglibs
92} // namespace dunedaq
#define ERS_HERE
const std::string & get_output_file() const
Get "output_file" attribute value.
bool get_use_o_direct() const
Get "use_o_direct" attribute value. Whether to use O_DIRECT flag when opening files.
uint32_t get_streaming_buffer_size() const
Get "streaming_buffer_size" attribute value.
const std::string & get_compression_algorithm() const
Get "compression_algorithm" attribute value.
const dunedaq::appmodel::DataRecorderConf * get_configuration() const
Get "configuration" relationship value.
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
void init(const appmodel::DataRecorderModule *conf) override
void do_start(const nlohmann::json &) override
virtual void generate_opmon_data() override
void do_conf(const nlohmann::json &) override
void do_stop(const nlohmann::json &) override
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG(...)
Definition macro.hpp:22
Including Qt Headers.
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)
Definition ers.hpp:115