DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SourceEmulatorModel.hxx
Go to the documentation of this file.
1// Declarations for SourceEmulatorModel
2
5
6namespace dunedaq {
7namespace datahandlinglibs {
8
9void
11{
12 //TLOG() << "Generate random ADC patterns" ;
13 std::srand(source_id*12345);
14 m_channel.reserve(size);
15 for (int i = 0; i < size; i++) {
16 int random_ch = std::rand()%64;
17 m_channel.push_back(random_ch);
18 }
19}
20
21template<class ReadoutType>
22void
24{
25 if (!m_sender_is_set) {
26 m_raw_data_sender = get_iom_sender<ReadoutType>(conn_name);
27 m_sender_is_set = true;
28 } else {
29 // ers::error();
30 }
31}
32
33template<class ReadoutType>
34void
36{
37 if (m_is_configured) {
38 TLOG_DEBUG(TLVL_WORK_STEPS) << "This emulator is already configured!";
39 } else {
40 //m_conf = args.get<module_conf_t>();
41 //m_link_conf = link_conf.get<link_conf_t>();
42 m_raw_sender_timeout_ms = std::chrono::milliseconds(1);
43
44 std::mt19937 mt(rand()); // NOLINT(runtime/threadsafe_fn)
45 std::uniform_real_distribution<double> dis(0.0, 1.0);
46
47 m_sourceid.id = link_conf->get_source_id();
48 m_sourceid.subsystem = ReadoutType::subsystem;
49
50 m_crateid = link_conf->get_geo_id()->get_crate_id();
51 m_slotid = link_conf->get_geo_id()->get_slot_id();
52 m_linkid = link_conf->get_geo_id()->get_stream_id();
53
54 m_t0_now = emu_params->get_set_t0();
55 m_file_source = std::make_unique<FileSourceBuffer>(emu_params->get_input_file_size_limit(), sizeof(ReadoutType));
56 try {
57 m_file_source->read(emu_params->get_data_file_name());
58 } catch (const ers::Issue& ex) {
59 ers::fatal(ex);
60 throw ConfigurationError(ERS_HERE, m_sourceid, "", ex);
61 }
62 m_dropouts_length = emu_params->get_random_population_size();
63 if (m_dropout_rate == 0.0) {
64 m_dropouts = std::vector<bool>(1);
65 } else {
66 m_dropouts = std::vector<bool>(m_dropouts_length);
67 }
68 for (size_t i = 0; i < m_dropouts.size(); ++i) {
69 m_dropouts[i] = dis(mt) >= m_dropout_rate;
70 }
71
72 m_frame_errors_length = emu_params->get_random_population_size();
73 m_frame_error_rate = emu_params->get_frame_error_rate_hz();
74 m_error_bit_generator = ErrorBitGenerator(m_frame_error_rate);
75 m_error_bit_generator.generate();
76
77 // Generate random ADC pattern
78 m_generate_periodic_adc_pattern = emu_params->get_generate_periodic_adc_pattern();
79 auto vec_size = emu_params->get_random_population_size();
80 if (m_generate_periodic_adc_pattern) {
81 TLOG() << "Generated pattern.";
82 m_pattern_generator.generate(m_sourceid.id, vec_size);
83
84 if (emu_params->get_TP_rate_per_channel() != 0) {
85 TLOG() << "TP rate per channel multiplier (base of 100 Hz/ch): " << emu_params->get_TP_rate_per_channel();
86 // Define time to wait when adding an ADC above threshold
87 // Adding a hit every 9768 gives a total Sent TP rate of approx 100 Hz/wire with WIBEth
88 m_time_to_wait = m_time_to_wait / emu_params->get_TP_rate_per_channel();
89 }
90 }
91
92 m_is_configured = true;
93 }
94 // Configure thread:
95 m_producer_thread.set_name("fakeprod", m_sourceid.id);
96}
97
98template<class ReadoutType>
99void
100SourceEmulatorModel<ReadoutType>::start(const nlohmann::json& /*args*/)
101{
102 m_packet_count_tot = 0;
103 TLOG_DEBUG(TLVL_WORK_STEPS) << "Starting threads...";
104 // FIXME: don't know where to take the slowdown from... m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz / m_link_conf.slowdown);
105 m_rate_limiter = std::make_unique<RateLimiter>(m_rate_khz);
106 // m_stats_thread.set_work(&SourceEmulatorModel<ReadoutType>::run_stats, this);
107 m_producer_thread.set_work(&SourceEmulatorModel<ReadoutType>::run_produce, this);
108}
109
110template<class ReadoutType>
111void
112SourceEmulatorModel<ReadoutType>::stop(const nlohmann::json& /*args*/)
113{
114 while (!m_producer_thread.get_readiness()) {
115 std::this_thread::sleep_for(std::chrono::milliseconds(100));
116 }
117}
118
119template<class ReadoutType>
120void
122{
124 info.set_sum_packets(m_packet_count_tot.load());
125 info.set_num_packets(m_packet_count.exchange(0));
126
127 this->publish(std::move(info));
128}
129
130template<class ReadoutType>
131void
133{
134 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_this_link_number << " started";
135
136 // pthread_setname_np(pthread_self(), get_name().c_str());
137
138 uint offset = 0; // NOLINT(build/unsigned)
139 auto& source = m_file_source->get();
140
141 uint num_elem = m_file_source->num_elements();
142 if (num_elem == 0) {
143 TLOG_DEBUG(TLVL_WORK_STEPS) << "No elements to read from buffer! Sleeping...";
144 std::this_thread::sleep_for(std::chrono::milliseconds(100));
145 num_elem = m_file_source->num_elements();
146 }
147
148 auto rptr = reinterpret_cast<ReadoutType*>(source.data()); // NOLINT
149
150 // set the initial timestamp to a configured value, otherwise just use the timestamp from the header
151 uint64_t ts_0 = rptr->get_timestamp(); // NOLINT(build/unsigned)
152 if (m_t0_now) {
153 auto time_now = std::chrono::system_clock::now().time_since_epoch();
154 uint64_t current_time = // NOLINT (build/unsigned)
155 std::chrono::duration_cast<std::chrono::microseconds>(time_now).count();
156 // FIXME: where do I get the clockspeed from?
157 // ts_0 = (m_conf.clock_speed_hz / 100000) * current_time;
158 ts_0 = 625 * current_time / 10;
159
160 }
161 TLOG_DEBUG(TLVL_BOOKKEEPING) << "Using first timestamp: " << ts_0;
162 uint64_t timestamp = ts_0; // NOLINT(build/unsigned)
163 int dropout_index = 0;
164
165 while (m_run_marker.load()) {
166 // TLOG() << "Generating " << m_frames_per_tick << " for TS " << timestamp;
167 for (uint16_t i = 0; i < m_frames_per_tick; i++) {
168 // Which element to push to the buffer
169 if (offset == num_elem || (offset + 1) * sizeof(ReadoutType) > source.size()) {
170 offset = 0;
171 }
172
173 bool create_frame = m_dropouts[dropout_index]; // NOLINT(runtime/threadsafe_fn)
174 dropout_index = (dropout_index + 1) % m_dropouts.size();
175 if (create_frame) {
176 ReadoutType payload;
177 // Memcpy from file buffer to flat char array
178 ::memcpy(static_cast<void*>(&payload),
179 static_cast<void*>(source.data() + offset * sizeof(ReadoutType)),
180 sizeof(ReadoutType));
181
182 // Fake timestamp
183 payload.fake_timestamps(timestamp, m_time_tick_diff);
184
185 // Fake geoid
186 payload.fake_geoid(m_crateid, m_slotid, m_linkid);
187
188 // Introducing frame errors
189 std::vector<uint16_t> frame_errs; // NOLINT(build/unsigned)
190 for (size_t i = 0; i < rptr->get_num_frames(); ++i) {
191 frame_errs.push_back(m_error_bit_generator.next());
192 }
193 payload.fake_frame_errors(&frame_errs);
194
195 if (m_generate_periodic_adc_pattern) {
196 if (timestamp - m_pattern_generator_previous_ts > m_time_to_wait) {
197
198 /* Reset the pattern from the beginning if it reaches the maximum
199 m_pattern_index++;
200 if (m_pattern_index == m_pattern_generator.get_total_size()) {
201 m_pattern_index = 0;
202 }
203 */
204 // Set the ADC to the uint16 maximum value
205 try {
206 payload.fake_adc_pattern(m_pattern_generator.get_channel_number());
207 }
208 catch (std::exception & ex) {
209 //FIXME: should not happen
210 }
211
212 //TLOG() << "Lift channel " << channel;
213
214 // Update the previous timestamp of the pattern generator
215 m_pattern_generator_previous_ts = timestamp;
216
217 } // timestamp difference
218 }
219
220 // send it
221 try {
222 m_raw_data_sender->send(std::move(payload), m_raw_sender_timeout_ms);
223 } catch (ers::Issue& excpt) {
224 ers::warning(CannotWriteToQueue(ERS_HERE, m_sourceid, "raw data input queue", excpt));
225 // std::runtime_error("Queue timed out...");
226 }
227
228 // Count packet and limit rate if needed.
229 ++offset;
230 ++m_packet_count;
231 ++m_packet_count_tot;
232
233
234 }
235 }
236 timestamp += m_time_tick_diff * rptr->get_num_frames();
237
238
239
240 m_rate_limiter->limit();
241 }
242 TLOG_DEBUG(TLVL_WORK_STEPS) << "Data generation thread " << m_sourceid.to_string() << " finished";
243}
244
245} // namespace datahandlinglibs
246} // namespace dunedaq
#define ERS_HERE
uint32_t get_random_population_size() const
Get "random_population_size" attribute value.
bool get_generate_periodic_adc_pattern() const
Get "generate_periodic_adc_pattern" attribute value.
bool get_set_t0() const
Get "set_t0" attribute value. Set first timestamp to now.
uint32_t get_input_file_size_limit() const
Get "input_file_size_limit" attribute value.
float get_frame_error_rate_hz() const
Get "frame_error_rate_hz" attribute value.
const std::string & get_data_file_name() const
Get "data_file_name" attribute value.
float get_TP_rate_per_channel() const
Get "TP_rate_per_channel" attribute value. TP rate per channel in units of 100 Hz.
const dunedaq::confmodel::GeoId * get_geo_id() const
Get "geo_id" relationship value.
uint32_t get_source_id() const
Get "source_id" attribute value.
uint32_t get_stream_id() const
Get "stream_id" attribute value.
Definition GeoId.hpp:196
uint32_t get_slot_id() const
Get "slot_id" attribute value.
Definition GeoId.hpp:165
uint32_t get_crate_id() const
Get "crate_id" attribute value.
Definition GeoId.hpp:134
void conf(const confmodel::DetectorStream *stream_conf, const appmodel::StreamEmulationParameters *emu_conf)
void set_sender(const std::string &conn_name)
Base class for any user define issue.
Definition Issue.hpp:69
double offset
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
Including Qt Headers.
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
ConfigurationError
Definition util.hpp:27
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115
void fatal(const Issue &issue)
Definition ers.hpp:88