DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SourceEmulatorModel.hxx
Go to the documentation of this file.
1// Member function definitions for SourceEmulatorModel
2
5
6namespace dunedaq {
7namespace datahandlinglibs {
8
// Fills m_channel with `size` pseudo-random channel numbers, each in the range [0, 63].
// The C library generator is seeded with source_id*12345 before the numbers are drawn.
// NOTE(review): the signature line is missing from this listing; `source_id` and `size`
//               are presumably the function parameters — confirm against the header.
// NOTE(review): m_channel is not cleared here, so calling this more than once appends
//               to whatever is already stored.
// NOTE(review): std::srand/std::rand use process-wide state, so reseeding here also
//               affects any other code that calls rand().
9void
11{
12 //TLOG() << "Generate random ADC patterns" ;
13 std::srand(source_id*12345);
14 m_channel.reserve(size);
15 for (int i = 0; i < size; i++) {
// Modulo 64 keeps the generated channel number within 0..63.
16 int random_ch = std::rand()%64;
17 m_channel.push_back(random_ch);
18 }
19}
20
// Fetches the data-move callback for ReadoutType from DataMoveCallbackRegistry, using
// `conf`, and stores it in m_raw_data_callback. The lookup is performed only while
// m_sender_is_set is false; the flag is raised afterwards.
// NOTE(review): the signature line is missing from this listing.
// NOTE(review): a repeated call is silently ignored because the error report in the
//               else branch is commented out.
21template<class ReadoutType>
22void
24{
25 if (!m_sender_is_set) {
26 m_raw_data_callback = datahandlinglibs::DataMoveCallbackRegistry::get()->get_callback<ReadoutType>(conf);
27 m_sender_is_set = true;
28 } else {
29 // ers::error();
30 }
31}
32
// One-time configuration of the emulator from `link_conf` and `emu_params`.
// When not yet configured it: fills m_sourceid and the crate/slot/link ids, loads the
// input data file into m_file_source, builds the dropout table, sets up the error bit
// generator and, if requested, the periodic ADC pattern. The producer thread name is
// set on every call, including when the emulator was already configured.
// Throws ConfigurationError if reading the data file raises an ers::Issue.
// NOTE(review): the signature line is missing from this listing.
33template<class ReadoutType>
34void
36{
37 if (m_is_configured) {
38 TLOG_DEBUG(TLVL_WORK_STEPS) << "This emulator is already configured!";
39 } else {
40 //m_conf = args.get<module_conf_t>();
41 //m_link_conf = link_conf.get<link_conf_t>();
42
// NOTE(review): the Mersenne Twister is seeded from the process-wide rand() state.
43 std::mt19937 mt(rand()); // NOLINT(runtime/threadsafe_fn)
44 std::uniform_real_distribution<double> dis(0.0, 1.0);
45
46 m_sourceid.id = link_conf->get_source_id();
47 m_sourceid.subsystem = ReadoutType::subsystem;
48
49 m_crateid = link_conf->get_geo_id()->get_crate_id();
50 m_slotid = link_conf->get_geo_id()->get_slot_id();
51 m_linkid = link_conf->get_geo_id()->get_stream_id();
52
53 m_t0_now = emu_params->get_set_t0();
54 m_file_source = std::make_unique<FileSourceBuffer>(emu_params->get_input_file_size_limit(), sizeof(ReadoutType));
55 try {
56 m_file_source->read(emu_params->get_data_file_name());
57 } catch (const ers::Issue& ex) {
// The issue is reported as fatal and then rethrown wrapped in a ConfigurationError.
58 ers::fatal(ex);
59 throw ConfigurationError(ERS_HERE, m_sourceid, "", ex);
60 }
61 m_dropouts_length = emu_params->get_random_population_size();
// NOTE(review): m_dropout_rate is read here but is not assigned anywhere in this
//               function — verify where it is set.
// With a zero rate a single-entry table is enough; otherwise the table has
// m_dropouts_length entries.
62 if (m_dropout_rate == 0.0) {
63 m_dropouts = std::vector<bool>(1);
64 } else {
65 m_dropouts = std::vector<bool>(m_dropouts_length);
66 }
// An entry is true when the random draw is at or above the dropout rate.
67 for (size_t i = 0; i < m_dropouts.size(); ++i) {
68 m_dropouts[i] = dis(mt) >= m_dropout_rate;
69 }
70
71 m_frame_errors_length = emu_params->get_random_population_size();
72 m_frame_error_rate = emu_params->get_frame_error_rate_hz();
73 m_error_bit_generator = ErrorBitGenerator(m_frame_error_rate);
74 m_error_bit_generator.generate();
75
76 // Generate random ADC pattern
77 m_generate_periodic_adc_pattern = emu_params->get_generate_periodic_adc_pattern();
78 auto vec_size = emu_params->get_random_population_size();
79 if (m_generate_periodic_adc_pattern) {
80 TLOG() << "Generated pattern.";
81 m_pattern_generator.generate(m_sourceid.id, vec_size);
82
83 if (emu_params->get_TP_rate_per_channel() != 0) {
84 TLOG() << "TP rate per channel multiplier (base of 100 Hz/ch): " << emu_params->get_TP_rate_per_channel();
85 // Define time to wait when adding an ADC above threshold
86 // Adding a hit every 9768 gives a total Sent TP rate of approx 100 Hz/wire with WIBEth
// The existing m_time_to_wait is divided by the multiplier, so a larger
// multiplier shortens the wait between injected hits.
87 m_time_to_wait = m_time_to_wait / emu_params->get_TP_rate_per_channel();
88 }
89 }
90
91 m_is_configured = true;
92 }
93 // Configure thread:
// This runs outside the if/else above, i.e. also when already configured.
94 m_producer_thread.set_name("fakeprod", m_sourceid.id);
95}
96
97template<class ReadoutType>
98void
99SourceEmulatorModel<ReadoutType>::start(const appfwk::DAQModule::CommandData_t& /*args*/)
100{
101 m_packet_count_tot = 0;
102 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
103 // FIXME: don't know where to take the slowdown from... m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz / m_link_conf.slowdown);
104 m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz);
105 // m_stats_thread.set_work(&SourceEmulatorModel<ReadoutType>::run_stats, this);
106 m_producer_thread.set_work(&SourceEmulatorModel<ReadoutType>::run_produce, this);
107}
108
109template<class ReadoutType>
110void
111SourceEmulatorModel<ReadoutType>::stop(const appfwk::DAQModule::CommandData_t& /*args*/)
112{
113 while (!m_producer_thread.get_readiness()) {
114 std::this_thread::sleep_for(std::chrono::milliseconds(100));
115 }
116}
117
// Publishes the packet counters: the running total (m_packet_count_tot) and the number
// of packets counted since the previous call. exchange(0) reads the interval counter
// and resets it in one step.
// NOTE(review): the signature line and the declaration of `info` are missing from this
//               listing.
118template<class ReadoutType>
119void
121{
123 info.set_sum_packets(m_packet_count_tot.load());
124 info.set_num_packets(m_packet_count.exchange(0));
125
126 this->publish(std::move(info));
127}
128
// Producer loop: while m_run_marker is set, replays ReadoutType-sized elements from the
// file buffer, patches timestamp, geo id, frame errors and (optionally) an ADC pattern
// into each copy, and passes it to m_raw_data_callback. One pass of the outer loop
// handles m_frames_per_tick elements and then calls the rate limiter.
// NOTE(review): the signature line is missing from this listing.
129template<class ReadoutType>
130void
132{
133 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_this_link_number << " started";
134
135 // pthread_setname_np(pthread_self(), get_name().c_str());
136
137 uint offset = 0; // NOLINT(build/unsigned)
138 auto& source = m_file_source->get();
139
140 uint num_elem = m_file_source->num_elements();
// NOTE(review): only one retry is made. If the buffer is still empty afterwards,
//               execution continues and rptr below is dereferenced anyway.
141 if (num_elem == 0) {
142 TLOG_DEBUG(TLVL_WORK_STEPS) << "No elements to read from buffer! Sleeping...";
143 std::this_thread::sleep_for(std::chrono::milliseconds(100));
144 num_elem = m_file_source->num_elements();
145 }
146
// rptr views the first element of the file buffer; it is used for the initial
// timestamp and for get_num_frames() below.
147 auto rptr = reinterpret_cast<ReadoutType*>(source.data()); // NOLINT
148
149 // set the initial timestamp to a configured value, otherwise just use the timestamp from the header
150 uint64_t ts_0 = rptr->get_timestamp(); // NOLINT(build/unsigned)
151 if (m_t0_now) {
152 auto time_now = std::chrono::system_clock::now().time_since_epoch();
153 uint64_t current_time = // NOLINT (build/unsigned)
154 std::chrono::duration_cast<std::chrono::microseconds>(time_now).count();
155 // FIXME: where do I get the clockspeed from?
156 // ts_0 = (m_conf.clock_speed_hz / 100000) * current_time;
// Microseconds since epoch times 62.5.
// NOTE(review): presumably this hard-codes a 62.5 MHz clock — confirm.
157 ts_0 = 625 * current_time / 10;
158
159 }
160 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Using first timestamp: " << ts_0;
161 uint64_t timestamp = ts_0; // NOLINT(build/unsigned)
162 int dropout_index = 0;
163
164 while (m_run_marker.load()) {
165 // TLOG() << "Generating " << m_frames_per_tick << " for TS " << timestamp;
166 for (uint16_t i = 0; i < m_frames_per_tick; i++) {
167 // Which element to push to the buffer
// Wrap to the start when the element count is reached or the next element would
// run past the end of the buffer.
168 if (offset == num_elem || (offset + 1) * sizeof(ReadoutType) > source.size()) {
169 offset = 0;
170 }
171
// The dropout table is walked cyclically; a true entry means the frame is produced.
172 bool create_frame = m_dropouts[dropout_index]; // NOLINT(runtime/threadsafe_fn)
173 dropout_index = (dropout_index + 1) % m_dropouts.size();
174 if (create_frame) {
175 ReadoutType payload;
176 // Memcpy from file buffer to flat char array
177 ::memcpy(static_cast<void*>(&payload),
178 static_cast<void*>(source.data() + offset * sizeof(ReadoutType)),
179 sizeof(ReadoutType));
180
181 // Fake timestamp
182 payload.fake_timestamps(timestamp, m_time_tick_diff);
183
184 // Fake geoid
185 payload.fake_geoid(m_crateid, m_slotid, m_linkid);
186
187 // Introducing frame errors
// NOTE(review): the loop index below shadows the outer `i`, and the frame count
//               is taken from the first buffer element (rptr), not from payload.
188 std::vector<uint16_t> frame_errs; // NOLINT(build/unsigned)
189 for (size_t i = 0; i < rptr->get_num_frames(); ++i) {
190 frame_errs.push_back(m_error_bit_generator.next());
191 }
192 payload.fake_frame_errors(&frame_errs);
193
194 if (m_generate_periodic_adc_pattern) {
// A pattern hit is injected only when more than m_time_to_wait has elapsed
// (in timestamp units) since the previous injection.
195 if (timestamp - m_pattern_generator_previous_ts > m_time_to_wait) {
196
197 /* Reset the pattern from the beginning if it reaches the maximum
198 m_pattern_index++;
199 if (m_pattern_index == m_pattern_generator.get_total_size()) {
200 m_pattern_index = 0;
201 }
202 */
203 // Set the ADC to the uint16 maximum value
204 try {
205 payload.fake_adc_pattern(m_pattern_generator.get_channel_number());
206 }
// NOTE(review): any std::exception is swallowed here without being reported.
207 catch (std::exception & ex) {
208 //FIXME: should not happen
209 }
210
211 //TLOG() << "Lift channel " << channel;
212
213 // Update the previous timestamp of the pattern generator
214 m_pattern_generator_previous_ts = timestamp;
215
216 } // timestamp difference
217 }
218
219 // send it
220 try {
221 (*m_raw_data_callback)(std::move(payload));
222 } catch (ers::Issue& excpt) {
// A failed hand-off is reported as a warning; production continues.
223 ers::warning(CannotWriteToQueue(ERS_HERE, m_sourceid, "raw data input queue", excpt));
224 // std::runtime_error("Queue timed out...");
225 }
226
227 // Count packet and limit rate if needed.
// These increments sit inside the create_frame branch, so a dropped frame
// advances neither the buffer offset nor the packet counters.
228 ++offset;
229 ++m_packet_count;
230 ++m_packet_count_tot;
231
232
233 }
234 }
// The timestamp advances once per outer iteration, so every frame generated within
// the same tick carries the same base timestamp.
235 timestamp += m_time_tick_diff * rptr->get_num_frames();
236
237
238
239 m_rate_limiter->limit();
240 }
241 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " finished";
242}
243
244} // namespace datahandlinglibs
245} // namespace dunedaq
#define ERS_HERE
uint32_t get_random_population_size() const
Get "random_population_size" attribute value.
bool get_generate_periodic_adc_pattern() const
Get "generate_periodic_adc_pattern" attribute value.
bool get_set_t0() const
Get "set_t0" attribute value. Set first timestamp to now.
uint32_t get_input_file_size_limit() const
Get "input_file_size_limit" attribute value.
float get_frame_error_rate_hz() const
Get "frame_error_rate_hz" attribute value.
const std::string & get_data_file_name() const
Get "data_file_name" attribute value.
float get_TP_rate_per_channel() const
Get "TP_rate_per_channel" attribute value. TP rate per channel in units of 100 Hz.
const dunedaq::confmodel::GeoId * get_geo_id() const
Get "geo_id" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
uint32_t get_stream_id() const
Get "stream_id" attribute value.
Definition GeoId.hpp:196
uint32_t get_slot_id() const
Get "slot_id" attribute value.
Definition GeoId.hpp:165
uint32_t get_crate_id() const
Get "crate_id" attribute value.
Definition GeoId.hpp:134
static std::shared_ptr< DataMoveCallbackRegistry > get()
void stop(const appfwk::DAQModule::CommandData_t &)
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::StreamEmulationParameters *emu_conf)
void start(const appfwk::DAQModule::CommandData_t &)
void set_sender(const appmodel::DataMoveCallbackConf *conf)
Base class for any user define issue.
Definition Issue.hpp:69
double offset
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
Including Qt Headers.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115
void fatal(const Issue &issue)
Definition ers.hpp:88