7using dunedaq::datahandlinglibs::CannotWriteToQueue;
8using dunedaq::datahandlinglibs::ConfigurationError;
16template<
class ReadoutType>
23template<
class ReadoutType>
52template<
class ReadoutType>
65 m_file_reader = std::make_unique<datahandlinglibs::BufferedFileReader<ReadoutType>>(
74template<
class ReadoutType>
87template<
class ReadoutType>
92 std::this_thread::sleep_for(std::chrono::milliseconds(100));
96template<
class ReadoutType>
104 this->
publish(std::move(info));
107template<
class ReadoutType>
135 std::this_thread::sleep_for(std::chrono::milliseconds(100));
141 (*m_raw_data_callback)(std::move(elem));
const std::string & get_file_compression_algorithm() const
Get "file_compression_algorithm" attribute value. Compression algorithm used when writing data.
const std::vector< std::string > & get_data_files() const
Get "data_files" attribute value.
uint32_t get_input_buffer_size() const
Get "input_buffer_size" attribute value.
uint32_t get_source_id() const
Get "source_id" attribute value.
static std::shared_ptr< DataMoveCallbackRegistry > get()
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
daqdataformats::SourceID m_sourceid
void stop(const appfwk::DAQModule::CommandData_t &)
std::unique_ptr< dunedaq::datahandlinglibs::RateLimiter > m_rate_limiter
std::chrono::milliseconds m_raw_sender_timeout_ms
std::atomic< int > m_packet_count
virtual void generate_opmon_data() override
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::SNBFileSourceParameters *file_params)
std::atomic< int > m_packet_count_tot
std::unique_ptr< dunedaq::datahandlinglibs::BufferedFileReader< ReadoutType > > m_file_reader
utilities::ReusableThread m_producer_thread
std::atomic< bool > & m_run_marker
uint32_t m_input_buffer_size
void set_sender(const appmodel::DataMoveCallbackConf *sink)
std::vector< std::string > m_file_names
std::shared_ptr< std::function< void(ReadoutType &&)> > m_raw_data_callback
std::vector< std::string >::const_iterator m_file_iterator
const appmodel::DataMoveCallbackConf * m_raw_sender_conf
std::string m_compression_algorithm
void start(const appfwk::DAQModule::CommandData_t &)
Base class for any user define issue.
#define TLOG_DEBUG(lvl,...)
void fatal(const Issue &issue)