DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
FileSourceModel.hxx
Go to the documentation of this file.
1// Declarations for FileSourceModel
2
6
7using dunedaq::datahandlinglibs::CannotWriteToQueue;
8using dunedaq::datahandlinglibs::ConfigurationError;
12
13namespace dunedaq {
14namespace snbmodules {
15
16template<class ReadoutType>
17void
19{
20 m_raw_sender_conf = sink;
21}
22
23template<class ReadoutType>
24void
26 const appmodel::SNBFileSourceParameters* file_params)
27{
28 if (m_is_configured) {
29 TLOG_DEBUG(TLVL_WORK_STEPS) << "This emulator is already configured!";
30 } else {
31 // m_conf = args.get<module_conf_t>();
32 // m_link_conf = link_conf.get<link_conf_t>();
33 m_raw_sender_timeout_ms = std::chrono::milliseconds(1000);
34
35 m_sourceid.id = link_conf->get_source_id();
36 m_sourceid.subsystem = ReadoutType::subsystem;
37
38 m_file_names = file_params->get_data_files();
39 m_file_iterator = m_file_names.begin();
40
41 m_input_buffer_size = file_params->get_input_buffer_size();
42 m_compression_algorithm = file_params->get_file_compression_algorithm();
43
44 open_next_file();
45
46 m_is_configured = true;
47 }
48 // Configure thread:
49 m_producer_thread.set_name("fileread", m_sourceid.id);
50}
51
52template<class ReadoutType>
53void
55{
56 if (m_file_iterator == m_file_names.end()) {
57 if (m_file_reader) {
58 m_file_reader->close();
59 }
60 m_file_reader.reset();
61 m_is_configured = false;
62 return;
63 }
64 try {
65 m_file_reader = std::make_unique<datahandlinglibs::BufferedFileReader<ReadoutType>>(
66 *m_file_iterator, m_input_buffer_size, m_compression_algorithm);
67 ++m_file_iterator;
68 } catch (const ers::Issue& ex) {
69 ers::fatal(ex);
70 throw ConfigurationError(ERS_HERE, m_sourceid, "", ex);
71 }
72}
73
74template<class ReadoutType>
75void
76FileSourceModel<ReadoutType>::start(const appfwk::DAQModule::CommandData_t& /*args*/)
77{
78 m_packet_count_tot = 0;
79 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
80 // FIXME: don't know where to take the slowdown from... m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz /
81 // m_link_conf.slowdown);
82 m_rate_limiter = std::make_unique<datahandlinglibs::RateLimiter>(m_rate_khz);
83 // m_stats_thread.set_work(&FileSourceModel<ReadoutType>::run_stats, this);
84 m_producer_thread.set_work(&FileSourceModel<ReadoutType>::run_produce, this);
85}
86
87template<class ReadoutType>
88void
89FileSourceModel<ReadoutType>::stop(const appfwk::DAQModule::CommandData_t& /*args*/)
90{
91 while (!m_producer_thread.get_readiness()) {
92 std::this_thread::sleep_for(std::chrono::milliseconds(100));
93 }
94}
95
96template<class ReadoutType>
97void
99{
101 info.set_sum_packets(m_packet_count_tot.load());
102 info.set_num_packets(m_packet_count.exchange(0));
103
104 this->publish(std::move(info));
105}
106
107template<class ReadoutType>
108void
110{
111 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " started";
112
113 // pthread_setname_np(pthread_self(), get_name().c_str());
114
115 ReadoutType elem;
116 while (m_run_marker.load()) {
117 auto read_sts = m_file_reader->read(elem);
118
119 if (!read_sts) {
120 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Failed to read, moving to next file";
121 open_next_file();
122 if (!m_is_configured) {
123 break;
124 }
125 continue;
126 }
127
128 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Read element with timestamp " << elem.get_timestamp() << " from file";
129
130 // send it
131 while (m_raw_data_callback == nullptr && m_run_marker.load()) {
132 m_raw_data_callback =
133 datahandlinglibs::DataMoveCallbackRegistry::get()->get_callback<ReadoutType>(m_raw_sender_conf);
134 TLOG_DEBUG(TLVL_WORK_STEPS) << "Sender not set yet, waiting...";
135 std::this_thread::sleep_for(std::chrono::milliseconds(100));
136 }
137
138 if (!m_run_marker.load()) {
139 break;
140 }
141 (*m_raw_data_callback)(std::move(elem));
142
143 // Count packet and limit rate if needed.
144 ++m_packet_count;
145 ++m_packet_count_tot;
146
147 m_rate_limiter->limit();
148 }
149 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " finished";
150}
151
152} // namespace snbmodules
153} // namespace dunedaq
#define ERS_HERE
const std::string & get_file_compression_algorithm() const
Get "file_compression_algorithm" attribute value. Compression algorithm used when writing data.
const std::vector< std::string > & get_data_files() const
Get "data_files" attribute value.
uint32_t get_input_buffer_size() const
Get "input_buffer_size" attribute value.
uint32_t get_source_id() const
Get "source_id" attribute value.
static std::shared_ptr< DataMoveCallbackRegistry > get()
void stop(const appfwk::DAQModule::CommandData_t &)
virtual void generate_opmon_data() override
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::SNBFileSourceParameters *file_params)
void set_sender(const appmodel::DataMoveCallbackConf *sink)
void start(const appfwk::DAQModule::CommandData_t &)
Base class for any user define issue.
Definition Issue.hpp:69
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
Including Qt Headers.
void fatal(const Issue &issue)
Definition ers.hpp:88