DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
FileSourceModel.hxx
Go to the documentation of this file.
// Definitions of the FileSourceModel member function templates
2
5
6using dunedaq::datahandlinglibs::CannotWriteToQueue;
7using dunedaq::datahandlinglibs::ConfigurationError;
11
12namespace dunedaq {
13namespace snbmodules {
14
15template<class ReadoutType>
16void
17FileSourceModel<ReadoutType>::set_sender(const std::string& conn_name)
18{
19 if (!m_sender_is_set) {
20 m_raw_data_sender = get_iom_sender<ReadoutType>(conn_name);
21 m_sender_is_set = true;
22 } else {
23 // ers::error();
24 }
25}
26
27template<class ReadoutType>
28void
30 const appmodel::SNBFileSourceParameters* file_params)
31{
32 if (m_is_configured) {
33 TLOG_DEBUG(TLVL_WORK_STEPS) << "This emulator is already configured!";
34 } else {
35 // m_conf = args.get<module_conf_t>();
36 // m_link_conf = link_conf.get<link_conf_t>();
37 m_raw_sender_timeout_ms = std::chrono::milliseconds(1000);
38
39 m_sourceid.id = link_conf->get_source_id();
40 m_sourceid.subsystem = ReadoutType::subsystem;
41
42 m_file_names = file_params->get_data_files();
43 m_file_iterator = m_file_names.begin();
44
45 m_input_buffer_size = file_params->get_input_buffer_size();
46 m_compression_algorithm = file_params->get_file_compression_algorithm();
47
48 open_next_file();
49
50 m_is_configured = true;
51 }
52 // Configure thread:
53 m_producer_thread.set_name("fileread", m_sourceid.id);
54}
55
56template<class ReadoutType>
57void
59{
60 if (m_file_iterator == m_file_names.end()) {
61 if (m_file_reader) {
62 m_file_reader->close();
63 }
64 m_file_reader.reset();
65 m_is_configured = false;
66 return;
67 }
68 try {
69 m_file_reader = std::make_unique<datahandlinglibs::BufferedFileReader<ReadoutType>>(
70 *m_file_iterator, m_input_buffer_size, m_compression_algorithm);
71 ++m_file_iterator;
72 } catch (const ers::Issue& ex) {
73 ers::fatal(ex);
74 throw ConfigurationError(ERS_HERE, m_sourceid, "", ex);
75 }
76}
77
78template<class ReadoutType>
79void
80FileSourceModel<ReadoutType>::start(const appfwk::DAQModule::CommandData_t& /*args*/)
81{
82 m_packet_count_tot = 0;
83 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
84 // FIXME: don't know where to take the slowdown from... m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz /
85 // m_link_conf.slowdown);
86 m_rate_limiter = std::make_unique<datahandlinglibs::RateLimiter>(m_rate_khz);
87 // m_stats_thread.set_work(&FileSourceModel<ReadoutType>::run_stats, this);
88 m_producer_thread.set_work(&FileSourceModel<ReadoutType>::run_produce, this);
89}
90
91template<class ReadoutType>
92void
93FileSourceModel<ReadoutType>::stop(const appfwk::DAQModule::CommandData_t& /*args*/)
94{
95 while (!m_producer_thread.get_readiness()) {
96 std::this_thread::sleep_for(std::chrono::milliseconds(100));
97 }
98}
99
// Fills a monitoring record with the packet counters and publishes it.
// NOTE(review): the signature line and the declaration of `info` are not part of this
// listing; presumably this is FileSourceModel<ReadoutType>::generate_opmon_data() — confirm
// against the real file before relying on it.
100template<class ReadoutType>
101void
103{
// Total packet count: read only, the counter itself is left unchanged.
105 info.set_sum_packets(m_packet_count_tot.load());
// exchange(0) returns the current count and resets it in one atomic step, so this
// reports the packets counted since the counter was last reset here.
106 info.set_num_packets(m_packet_count.exchange(0));
107
// Hand the filled record over; `info` is moved from and must not be used afterwards.
108 this->publish(std::move(info));
109}
110
111template<class ReadoutType>
112void
114{
115 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " started";
116
117 // pthread_setname_np(pthread_self(), get_name().c_str());
118
119 ReadoutType elem;
120 while (m_run_marker.load()) {
121 auto read_sts = m_file_reader->read(elem);
122
123 if (!read_sts) {
124 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Failed to read, moving to next file";
125 open_next_file();
126 if (!m_is_configured) {
127 break;
128 }
129 continue;
130 }
131
132 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Read element with timestamp " << elem.get_timestamp() << " from file";
133
134 // send it
135 bool send_successful = false;
136 while (!send_successful && m_run_marker.load()) {
137 ReadoutType elem_copy(elem);
138 send_successful = m_raw_data_sender->try_send(std::move(elem_copy), m_raw_sender_timeout_ms);
139 }
140
141 // Count packet and limit rate if needed.
142 ++m_packet_count;
143 ++m_packet_count_tot;
144
145 m_rate_limiter->limit();
146 }
147 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " finished";
148}
149
150} // namespace snbmodules
151} // namespace dunedaq
#define ERS_HERE
const std::string & get_file_compression_algorithm() const
Get "file_compression_algorithm" attribute value. Compression algorithm used when writing data.
const std::vector< std::string > & get_data_files() const
Get "data_files" attribute value.
uint32_t get_input_buffer_size() const
Get "input_buffer_size" attribute value.
uint32_t get_source_id() const
Get "source_id" attribute value.
void stop(const appfwk::DAQModule::CommandData_t &)
void set_sender(const std::string &conn_name)
virtual void generate_opmon_data() override
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::SNBFileSourceParameters *file_params)
void start(const appfwk::DAQModule::CommandData_t &)
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
The DUNE-DAQ namespace.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void fatal(const Issue &issue)
Definition ers.hpp:88