DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
dunedaq::snbmodules::FileSourceModel< ReadoutType > Class Template Reference

#include <FileSourceModel.hpp>

Inheritance diagram for dunedaq::snbmodules::FileSourceModel< ReadoutType >:
[legend]
Collaboration diagram for dunedaq::snbmodules::FileSourceModel< ReadoutType >:
[legend]

Public Member Functions

 FileSourceModel (std::string name, std::atomic< bool > &run_marker, double rate_khz)
 
void set_sender (const appmodel::DataMoveCallbackConf *sink)
 
void conf (const confmodel::DetectorStream *stream_conf, const appmodel::SNBFileSourceParameters *file_params)
 
void scrap (const appfwk::DAQModule::CommandData_t &)
 
bool is_configured () override
 
void start (const appfwk::DAQModule::CommandData_t &)
 
void stop (const appfwk::DAQModule::CommandData_t &)
 
- Public Member Functions inherited from dunedaq::snbmodules::FileSourceConcept
 FileSourceConcept ()
 
virtual ~FileSourceConcept ()
 
 FileSourceConcept (const FileSourceConcept &)=delete
 FileSourceConcept is not copy-constructible.
 
FileSourceConceptoperator= (const FileSourceConcept &)=delete
 FileSourceConcept is not copy-assginable.
 
 FileSourceConcept (FileSourceConcept &&)=delete
 FileSourceConcept is not move-constructible.
 
FileSourceConceptoperator= (FileSourceConcept &&)=delete
 FileSourceConcept is not move-assignable.
 
- Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject (const MonitorableObject &)=delete
 
MonitorableObjectoperator= (const MonitorableObject &)=delete
 
 MonitorableObject (MonitorableObject &&)=delete
 
MonitorableObjectoperator= (MonitorableObject &&)=delete
 
virtual ~MonitorableObject ()=default
 
auto get_opmon_id () const noexcept
 
auto get_opmon_level () const noexcept
 

Protected Member Functions

void run_produce ()
 
virtual void generate_opmon_data () override
 
void open_next_file ()
 
- Protected Member Functions inherited from dunedaq::opmonlib::MonitorableObject
 MonitorableObject ()=default
 
void register_node (ElementId name, NewNodePtr)
 
void publish (google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
 

Private Attributes

std::atomic< bool > & m_run_marker
 
std::string m_name
 
bool m_is_configured = false
 
double m_rate_khz
 
std::vector< std::string > m_file_names
 
std::vector< std::string >::const_iterator m_file_iterator
 
uint32_t m_input_buffer_size
 
std::string m_compression_algorithm
 
daqdataformats::SourceID m_sourceid
 
std::atomic< int > m_packet_count { 0 }
 
std::atomic< int > m_packet_count_tot { 0 }
 
std::chrono::milliseconds m_raw_sender_timeout_ms
 
const appmodel::DataMoveCallbackConfm_raw_sender_conf { nullptr }
 
std::shared_ptr< std::function< void(ReadoutType &&)> > m_raw_data_callback { nullptr }
 
std::unique_ptr< dunedaq::datahandlinglibs::RateLimiterm_rate_limiter
 
std::unique_ptr< dunedaq::datahandlinglibs::BufferedFileReader< ReadoutType > > m_file_reader
 
utilities::ReusableThread m_producer_thread
 

Additional Inherited Members

- Public Types inherited from dunedaq::opmonlib::MonitorableObject
using NodePtr = std::weak_ptr<MonitorableObject>
 
using NewNodePtr = std::shared_ptr<MonitorableObject>
 
using ElementId = std::string
 
- Static Public Member Functions inherited from dunedaq::opmonlib::MonitorableObject
static bool publishable_metric (OpMonLevel entry, OpMonLevel system) noexcept
 

Detailed Description

template<class ReadoutType>
class dunedaq::snbmodules::FileSourceModel< ReadoutType >

Definition at line 44 of file FileSourceModel.hpp.

Constructor & Destructor Documentation

◆ FileSourceModel()

template<class ReadoutType >
dunedaq::snbmodules::FileSourceModel< ReadoutType >::FileSourceModel ( std::string name,
std::atomic< bool > & run_marker,
double rate_khz )
inlineexplicit

Definition at line 47 of file FileSourceModel.hpp.

49 , m_name(name)
50 , m_rate_khz(rate_khz)
51 , m_packet_count{ 0 }
54 {
55 }
std::chrono::milliseconds m_raw_sender_timeout_ms
utilities::ReusableThread m_producer_thread
std::atomic< bool > run_marker
Global atomic for process lifetime.

Member Function Documentation

◆ conf()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::conf ( const confmodel::DetectorStream * stream_conf,
const appmodel::SNBFileSourceParameters * file_params )
virtual

Implements dunedaq::snbmodules::FileSourceConcept.

Definition at line 25 of file FileSourceModel.hxx.

27{
28 if (m_is_configured) {
29 TLOG_DEBUG(TLVL_WORK_STEPS) << "This emulator is already configured!";
30 } else {
31 // m_conf = args.get<module_conf_t>();
32 // m_link_conf = link_conf.get<link_conf_t>();
33 m_raw_sender_timeout_ms = std::chrono::milliseconds(1000);
34
35 m_sourceid.id = link_conf->get_source_id();
36 m_sourceid.subsystem = ReadoutType::subsystem;
37
38 m_file_names = file_params->get_data_files();
40
41 m_input_buffer_size = file_params->get_input_buffer_size();
42 m_compression_algorithm = file_params->get_file_compression_algorithm();
43
45
46 m_is_configured = true;
47 }
48 // Configure thread:
50}
std::vector< std::string > m_file_names
std::vector< std::string >::const_iterator m_file_iterator
void set_name(const std::string &name, int tid)
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
Subsystem subsystem
The general subsystem of the source of the data.
Definition SourceID.hpp:69
ID_t id
Unique identifier of the source of the data.
Definition SourceID.hpp:74

◆ generate_opmon_data()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::generate_opmon_data ( )
overrideprotectedvirtual

Hook for customisable pubblication. The function can throw, exception will be caught by the monitoring thread

Reimplemented from dunedaq::opmonlib::MonitorableObject.

Definition at line 98 of file FileSourceModel.hxx.

99{
100 datahandlinglibs::opmon::DataSourceInfo info;
101 info.set_sum_packets(m_packet_count_tot.load());
102 info.set_num_packets(m_packet_count.exchange(0));
103
104 this->publish(std::move(info));
105}
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept

◆ is_configured()

template<class ReadoutType >
bool dunedaq::snbmodules::FileSourceModel< ReadoutType >::is_configured ( )
inlineoverridevirtual

Implements dunedaq::snbmodules::FileSourceConcept.

Definition at line 69 of file FileSourceModel.hpp.

69{ return m_is_configured; }

◆ open_next_file()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::open_next_file ( )
protected

Definition at line 54 of file FileSourceModel.hxx.

55{
56 if (m_file_iterator == m_file_names.end()) {
57 if (m_file_reader) {
58 m_file_reader->close();
59 }
60 m_file_reader.reset();
61 m_is_configured = false;
62 return;
63 }
64 try {
65 m_file_reader = std::make_unique<datahandlinglibs::BufferedFileReader<ReadoutType>>(
68 } catch (const ers::Issue& ex) {
69 ers::fatal(ex);
70 throw ConfigurationError(ERS_HERE, m_sourceid, "", ex);
71 }
72}
#define ERS_HERE
std::unique_ptr< dunedaq::datahandlinglibs::BufferedFileReader< ReadoutType > > m_file_reader
Base class for any user define issue.
Definition Issue.hpp:69
void fatal(const Issue &issue)
Definition ers.hpp:88

◆ run_produce()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::run_produce ( )
protected

Definition at line 109 of file FileSourceModel.hxx.

110{
111 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " started";
112
113 // pthread_setname_np(pthread_self(), get_name().c_str());
114
115 ReadoutType elem;
116 while (m_run_marker.load()) {
117 auto read_sts = m_file_reader->read(elem);
118
119 if (!read_sts) {
120 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Failed to read, moving to next file";
122 if (!m_is_configured) {
123 break;
124 }
125 continue;
126 }
127
128 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Read element with timestamp " << elem.get_timestamp() << " from file";
129
130 // send it
131 while (m_raw_data_callback == nullptr && m_run_marker.load()) {
134 TLOG_DEBUG(TLVL_WORK_STEPS) << "Sender not set yet, waiting...";
135 std::this_thread::sleep_for(std::chrono::milliseconds(100));
136 }
137
138 if (!m_run_marker.load()) {
139 break;
140 }
141 (*m_raw_data_callback)(std::move(elem));
142
143 // Count packet and limit rate if needed.
146
147 m_rate_limiter->limit();
148 }
149 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " finished";
150}
static std::shared_ptr< DataMoveCallbackRegistry > get()
std::unique_ptr< dunedaq::datahandlinglibs::RateLimiter > m_rate_limiter
std::shared_ptr< std::function< void(ReadoutType &&)> > m_raw_data_callback
const appmodel::DataMoveCallbackConf * m_raw_sender_conf
ReadoutType
Which type of readout to use for TriggerDecision and DataRequest.
Definition Types.hpp:57
std::string to_string() const
Definition SourceID.hpp:83

◆ scrap()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::scrap ( const appfwk::DAQModule::CommandData_t & )
inlinevirtual

Implements dunedaq::snbmodules::FileSourceConcept.

Definition at line 61 of file FileSourceModel.hpp.

62 {
63 if (m_file_reader != nullptr) {
64 m_file_reader->close();
65 }
66 m_file_reader.reset();
67 m_is_configured = false;
68 }

◆ set_sender()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::set_sender ( const appmodel::DataMoveCallbackConf * sink)
virtual

Implements dunedaq::snbmodules::FileSourceConcept.

Definition at line 18 of file FileSourceModel.hxx.

19{
20 m_raw_sender_conf = sink;
21}

◆ start()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::start ( const appfwk::DAQModule::CommandData_t & )
virtual

Implements dunedaq::snbmodules::FileSourceConcept.

Definition at line 76 of file FileSourceModel.hxx.

77{
79 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
80 // FIXME: don't know where to take the slowdown from... m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz /
81 // m_link_conf.slowdown);
82 m_rate_limiter = std::make_unique<datahandlinglibs::RateLimiter>(m_rate_khz);
83 // m_stats_thread.set_work(&FileSourceModel<ReadoutType>::run_stats, this);
85}
bool set_work(Function &&f, Args &&... args)

◆ stop()

template<class ReadoutType >
void dunedaq::snbmodules::FileSourceModel< ReadoutType >::stop ( const appfwk::DAQModule::CommandData_t & )
virtual

Implements dunedaq::snbmodules::FileSourceConcept.

Definition at line 89 of file FileSourceModel.hxx.

90{
92 std::this_thread::sleep_for(std::chrono::milliseconds(100));
93 }
94}

Member Data Documentation

◆ m_compression_algorithm

template<class ReadoutType >
std::string dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_compression_algorithm
private

Definition at line 94 of file FileSourceModel.hpp.

◆ m_file_iterator

template<class ReadoutType >
std::vector<std::string>::const_iterator dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_file_iterator
private

Definition at line 92 of file FileSourceModel.hpp.

◆ m_file_names

template<class ReadoutType >
std::vector<std::string> dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_file_names
private

Definition at line 91 of file FileSourceModel.hpp.

◆ m_file_reader

template<class ReadoutType >
std::unique_ptr<dunedaq::datahandlinglibs::BufferedFileReader<ReadoutType> > dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_file_reader
private

Definition at line 114 of file FileSourceModel.hpp.

◆ m_input_buffer_size

template<class ReadoutType >
uint32_t dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_input_buffer_size
private

Definition at line 93 of file FileSourceModel.hpp.

◆ m_is_configured

template<class ReadoutType >
bool dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_is_configured = false
private

Definition at line 88 of file FileSourceModel.hpp.

◆ m_name

template<class ReadoutType >
std::string dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_name
private

Definition at line 87 of file FileSourceModel.hpp.

◆ m_packet_count

template<class ReadoutType >
std::atomic<int> dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_packet_count { 0 }
private

Definition at line 99 of file FileSourceModel.hpp.

99{ 0 };

◆ m_packet_count_tot

template<class ReadoutType >
std::atomic<int> dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_packet_count_tot { 0 }
private

Definition at line 100 of file FileSourceModel.hpp.

100{ 0 };

◆ m_producer_thread

template<class ReadoutType >
utilities::ReusableThread dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_producer_thread
private

Definition at line 116 of file FileSourceModel.hpp.

◆ m_rate_khz

template<class ReadoutType >
double dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_rate_khz
private

Definition at line 89 of file FileSourceModel.hpp.

◆ m_rate_limiter

template<class ReadoutType >
std::unique_ptr<dunedaq::datahandlinglibs::RateLimiter> dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_rate_limiter
private

Definition at line 113 of file FileSourceModel.hpp.

◆ m_raw_data_callback

template<class ReadoutType >
std::shared_ptr<std::function<void(ReadoutType&&)> > dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_raw_data_callback { nullptr }
private

Definition at line 107 of file FileSourceModel.hpp.

107{ nullptr };

◆ m_raw_sender_conf

template<class ReadoutType >
const appmodel::DataMoveCallbackConf* dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_raw_sender_conf { nullptr }
private

Definition at line 106 of file FileSourceModel.hpp.

106{ nullptr };

◆ m_raw_sender_timeout_ms

template<class ReadoutType >
std::chrono::milliseconds dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_raw_sender_timeout_ms
private

Definition at line 105 of file FileSourceModel.hpp.

◆ m_run_marker

template<class ReadoutType >
std::atomic<bool>& dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_run_marker
private

Definition at line 84 of file FileSourceModel.hpp.

◆ m_sourceid

template<class ReadoutType >
daqdataformats::SourceID dunedaq::snbmodules::FileSourceModel< ReadoutType >::m_sourceid
private

Definition at line 96 of file FileSourceModel.hpp.


The documentation for this class was generated from the following files: