7using dunedaq::datahandlinglibs::CannotWriteToQueue;
8using dunedaq::datahandlinglibs::ConfigurationError;
16template<
class ReadoutType>
20 m_raw_sender_conf = sink;
23template<
class ReadoutType>
28 if (m_is_configured) {
29 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"This emulator is already configured!";
33 m_raw_sender_timeout_ms = std::chrono::milliseconds(1000);
36 m_sourceid.subsystem = ReadoutType::subsystem;
39 m_file_iterator = m_file_names.begin();
46 m_is_configured =
true;
49 m_producer_thread.set_name(
"fileread", m_sourceid.id);
52template<
class ReadoutType>
56 if (m_file_iterator == m_file_names.end()) {
58 m_file_reader->close();
60 m_file_reader.reset();
61 m_is_configured =
false;
65 m_file_reader = std::make_unique<datahandlinglibs::BufferedFileReader<ReadoutType>>(
66 *m_file_iterator, m_input_buffer_size, m_compression_algorithm);
70 throw ConfigurationError(
ERS_HERE, m_sourceid,
"", ex);
74template<
class ReadoutType>
78 m_packet_count_tot = 0;
79 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Starting threads...";
82 m_rate_limiter = std::make_unique<datahandlinglibs::RateLimiter>(m_rate_khz);
87template<
class ReadoutType>
91 while (!m_producer_thread.get_readiness()) {
92 std::this_thread::sleep_for(std::chrono::milliseconds(100));
96template<
class ReadoutType>
101 info.set_sum_packets(m_packet_count_tot.load());
102 info.set_num_packets(m_packet_count.exchange(0));
104 this->publish(std::move(info));
107template<
class ReadoutType>
111 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Data generation thread " << m_sourceid.to_string() <<
" started";
116 while (m_run_marker.load()) {
117 auto read_sts = m_file_reader->read(elem);
120 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Failed to read, moving to next file";
122 if (!m_is_configured) {
128 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Read element with timestamp " << elem.get_timestamp() <<
" from file";
131 while (m_raw_data_callback ==
nullptr && m_run_marker.load()) {
132 m_raw_data_callback =
134 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Sender not set yet, waiting...";
135 std::this_thread::sleep_for(std::chrono::milliseconds(100));
138 if (!m_run_marker.load()) {
141 (*m_raw_data_callback)(std::move(elem));
145 ++m_packet_count_tot;
147 m_rate_limiter->limit();
149 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Data generation thread " << m_sourceid.to_string() <<
" finished";
const std::string & get_file_compression_algorithm() const
Get "file_compression_algorithm" attribute value. Compression algorithm used when writing data.
const std::vector< std::string > & get_data_files() const
Get "data_files" attribute value.
uint32_t get_input_buffer_size() const
Get "input_buffer_size" attribute value.
uint32_t get_source_id() const
Get "source_id" attribute value.
static std::shared_ptr< DataMoveCallbackRegistry > get()
void stop(const appfwk::DAQModule::CommandData_t &)
virtual void generate_opmon_data() override
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::SNBFileSourceParameters *file_params)
void set_sender(const appmodel::DataMoveCallbackConf *sink)
void start(const appfwk::DAQModule::CommandData_t &)
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
void fatal(const Issue &issue)