6using dunedaq::datahandlinglibs::CannotWriteToQueue;
7using dunedaq::datahandlinglibs::ConfigurationError;
15template<
class ReadoutType>
19 if (!m_sender_is_set) {
21 m_sender_is_set =
true;
27template<
class ReadoutType>
32 if (m_is_configured) {
33 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"This emulator is already configured!";
37 m_raw_sender_timeout_ms = std::chrono::milliseconds(1000);
40 m_sourceid.subsystem = ReadoutType::subsystem;
43 m_file_iterator = m_file_names.begin();
50 m_is_configured =
true;
53 m_producer_thread.set_name(
"fileread", m_sourceid.id);
56template<
class ReadoutType>
60 if (m_file_iterator == m_file_names.end()) {
62 m_file_reader->close();
64 m_file_reader.reset();
65 m_is_configured =
false;
69 m_file_reader = std::make_unique<datahandlinglibs::BufferedFileReader<ReadoutType>>(
70 *m_file_iterator, m_input_buffer_size, m_compression_algorithm);
74 throw ConfigurationError(
ERS_HERE, m_sourceid,
"", ex);
78template<
class ReadoutType>
82 m_packet_count_tot = 0;
83 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Starting threads...";
86 m_rate_limiter = std::make_unique<datahandlinglibs::RateLimiter>(m_rate_khz);
91template<
class ReadoutType>
95 while (!m_producer_thread.get_readiness()) {
96 std::this_thread::sleep_for(std::chrono::milliseconds(100));
100template<
class ReadoutType>
105 info.set_sum_packets(m_packet_count_tot.load());
106 info.set_num_packets(m_packet_count.exchange(0));
108 this->publish(std::move(info));
111template<
class ReadoutType>
115 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Data generation thread " << m_sourceid.to_string() <<
" started";
120 while (m_run_marker.load()) {
121 auto read_sts = m_file_reader->read(elem);
124 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Failed to read, moving to next file";
126 if (!m_is_configured) {
132 TLOG_DEBUG(TLVL_BOOKKEEPING) <<
"Read element with timestamp " << elem.get_timestamp() <<
" from file";
135 bool send_successful =
false;
136 while (!send_successful && m_run_marker.load()) {
137 ReadoutType elem_copy(elem);
138 send_successful = m_raw_data_sender->try_send(std::move(elem_copy), m_raw_sender_timeout_ms);
143 ++m_packet_count_tot;
145 m_rate_limiter->limit();
147 TLOG_DEBUG(TLVL_WORK_STEPS) <<
"Data generation thread " << m_sourceid.to_string() <<
" finished";
const std::string & get_file_compression_algorithm() const
Get "file_compression_algorithm" attribute value. Compression algorithm used when writing data.
const std::vector< std::string > & get_data_files() const
Get "data_files" attribute value.
uint32_t get_input_buffer_size() const
Get "input_buffer_size" attribute value.
uint32_t get_source_id() const
Get "source_id" attribute value.
void stop(const appfwk::DAQModule::CommandData_t &)
void set_sender(const std::string &conn_name)
virtual void generate_opmon_data() override
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::SNBFileSourceParameters *file_params)
void start(const appfwk::DAQModule::CommandData_t &)
Base class for any user-defined issue.
#define TLOG_DEBUG(lvl,...)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
void fatal(const Issue &issue)