Line data Source code
1 : /**
2 : * @file FakeDataProdModule.cpp FakeDataProdModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "FakeDataProdModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 :
12 : #include "appmodel/FakeDataProdModule.hpp"
13 : #include "confmodel/Connection.hpp"
14 : #include "confmodel/DetectorConfig.hpp"
15 : #include "dfmessages/Fragment_serialization.hpp"
16 : #include "dfmessages/TimeSync.hpp"
17 : #include "iomanager/IOManager.hpp"
18 : #include "logging/Logging.hpp"
19 :
20 : #include <chrono>
21 : #include <cstdlib>
22 : #include <memory>
23 : #include <string>
24 : #include <thread>
25 : #include <utility>
26 : #include <vector>
27 :
28 : /**
29 : * @brief Name used by TRACE TLOG calls from this source file
30 : */
31 : #define TRACE_NAME "FakeDataProdModule" // NOLINT
32 : enum
33 : {
34 : TLVL_ENTER_EXIT_METHODS = 5,
35 : TLVL_CONFIG = 7,
36 : TLVL_WORK_STEPS = 10,
37 : TLVL_TIME_SYNCS = 12
38 : };
39 :
40 : namespace dunedaq {
41 : namespace dfmodules {
42 :
43 0 : FakeDataProdModule::FakeDataProdModule(const std::string& name)
44 : : dunedaq::appfwk::DAQModule(name)
45 0 : , m_timesync_thread(std::bind(&FakeDataProdModule::do_timesync, this, std::placeholders::_1))
46 0 : , m_queue_timeout(100)
47 0 : , m_run_number(0)
48 : {
49 0 : register_command("conf", &FakeDataProdModule::do_conf);
50 0 : register_command("start", &FakeDataProdModule::do_start);
51 0 : register_command("stop", &FakeDataProdModule::do_stop);
52 0 : }
53 :
54 : void
55 0 : FakeDataProdModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
56 : {
57 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
58 :
59 0 : auto clock_speed_hz = mcfg->get_session()->get_detector_configuration()->get_clock_speed_hz();
60 0 : m_timestamp_estimator = std::make_unique<utilities::TimestampEstimatorSystem>(clock_speed_hz);
61 :
62 0 : auto mdal = mcfg->get_dal<appmodel::FakeDataProdModule>(get_name());
63 0 : if (!mdal) {
64 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
65 : }
66 :
67 0 : auto inputs = mdal->get_inputs();
68 0 : auto outputs = mdal->get_outputs();
69 :
70 0 : if (inputs[0]->get_data_type() != datatype_to_string<dfmessages::DataRequest>()) {
71 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "DataRequest Input queue");
72 : }
73 0 : m_data_request_id = inputs[0]->UID();
74 :
75 0 : for (auto con : outputs) {
76 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TimeSync>()) {
77 0 : m_timesync_id = con->UID();
78 : }
79 : }
80 0 : m_fake_data_prod_conf = mdal->get_configuration();
81 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
82 0 : }
83 :
84 : void
85 0 : FakeDataProdModule::do_conf(const CommandData_t&)
86 : {
87 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
88 :
89 0 : m_sourceid.subsystem = daqdataformats::SourceID::string_to_subsystem(m_fake_data_prod_conf->get_system_type());
90 0 : m_sourceid.id = m_fake_data_prod_conf->get_source_id();
91 0 : m_time_tick_diff = m_fake_data_prod_conf->get_time_tick_diff();
92 0 : m_frame_size = m_fake_data_prod_conf->get_frame_size();
93 0 : m_response_delay = m_fake_data_prod_conf->get_response_delay();
94 0 : m_fragment_type = daqdataformats::string_to_fragment_type(m_fake_data_prod_conf->get_fragment_type());
95 :
96 0 : TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": configured for link number " << m_sourceid.id;
97 :
98 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
99 0 : }
100 :
101 : void
102 0 : FakeDataProdModule::do_start(const CommandData_t& payload)
103 : {
104 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
105 0 : m_sent_fragments = 0;
106 0 : m_received_requests = 0;
107 0 : m_run_number = payload.value<dunedaq::daqdataformats::run_number_t>("run", 0);
108 :
109 0 : m_timesync_thread.start_working_thread();
110 :
111 0 : auto iom = iomanager::IOManager::get();
112 0 : iom->add_callback<dfmessages::DataRequest>(
113 0 : m_data_request_id, std::bind(&FakeDataProdModule::process_data_request, this, std::placeholders::_1));
114 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
115 0 : }
116 :
117 : void
118 0 : FakeDataProdModule::do_stop(const CommandData_t& /*args*/)
119 : {
120 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
121 0 : m_timesync_thread.stop_working_thread();
122 :
123 0 : auto iom = iomanager::IOManager::get();
124 0 : iom->remove_callback<dfmessages::DataRequest>(m_data_request_id);
125 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
126 0 : }
127 :
128 : // void
129 : // FakeDataProdModule::get_info(opmonlib::InfoCollector& ci, int /*level*/)
130 : // {
131 : // fakedataprodinfo::Info info;
132 : // info.requests_received = m_received_requests;
133 : // info.fragments_sent = m_sent_fragments;
134 : // ci.add(info);
135 : // }
136 :
137 : void
138 0 : FakeDataProdModule::do_timesync(std::atomic<bool>& running_flag)
139 : {
140 0 : while (m_timestamp_estimator == nullptr) {
141 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
142 : }
143 0 : auto iom = iomanager::IOManager::get();
144 0 : auto sender_ptr = iom->get_sender<dfmessages::TimeSync>(m_timesync_id);
145 :
146 0 : int sent_count = 0;
147 0 : uint64_t msg_seqno = 0; // NOLINT (build/unsigned)
148 0 : while (running_flag.load()) {
149 0 : auto timesyncmsg = dfmessages::TimeSync(m_timestamp_estimator->get_timestamp_estimate());
150 0 : ++msg_seqno;
151 0 : timesyncmsg.run_number = m_run_number;
152 0 : timesyncmsg.sequence_number = msg_seqno;
153 0 : timesyncmsg.source_id = m_sourceid.id;
154 0 : TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time << " wall=" << timesyncmsg.system_time
155 0 : << " run=" << timesyncmsg.run_number << " seqno=" << timesyncmsg.sequence_number
156 0 : << " source_id=" << timesyncmsg.source_id;
157 0 : try {
158 0 : sender_ptr->send(std::move(timesyncmsg), std::chrono::milliseconds(500));
159 0 : ++sent_count;
160 0 : } catch (ers::Issue& excpt) {
161 0 : ers::warning(TimeSyncTransmissionFailed(ERS_HERE, get_name(), m_timesync_id, excpt));
162 0 : }
163 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
164 : }
165 0 : TLOG() << get_name() << ": sent " << sent_count << " TimeSync messages.";
166 0 : }
167 :
168 : void
169 0 : FakeDataProdModule::process_data_request(dfmessages::DataRequest& data_request)
170 : {
171 :
172 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": processsing request " << data_request.request_number;
173 :
174 0 : m_received_requests++;
175 :
176 : // num_frames_to_send = ⌈window_size / tick_diff⌉
177 0 : size_t num_frames_to_send = (data_request.request_information.window_end -
178 0 : data_request.request_information.window_begin + m_time_tick_diff - 1) /
179 : m_time_tick_diff;
180 0 : size_t num_bytes_to_send = num_frames_to_send * m_frame_size;
181 :
182 : // We don't care about the content of the data, but the size should be correct
183 :
184 0 : std::vector<uint8_t> fake_data; // NOLINT (build/unsigned)
185 0 : try {
186 0 : fake_data.resize(num_bytes_to_send);
187 0 : } catch (const std::bad_alloc&) {
188 0 : throw dunedaq::dfmodules::MemoryAllocationFailed(ERS_HERE, get_name(), num_bytes_to_send);
189 0 : }
190 :
191 0 : auto data_fragment_ptr = std::make_unique<daqdataformats::Fragment>(fake_data.data(), num_bytes_to_send);
192 :
193 0 : data_fragment_ptr->set_trigger_number(data_request.trigger_number);
194 0 : data_fragment_ptr->set_run_number(m_run_number);
195 0 : data_fragment_ptr->set_element_id(m_sourceid);
196 0 : data_fragment_ptr->set_error_bits(0);
197 0 : data_fragment_ptr->set_type(m_fragment_type);
198 0 : data_fragment_ptr->set_trigger_timestamp(data_request.trigger_timestamp);
199 0 : data_fragment_ptr->set_window_begin(data_request.request_information.window_begin);
200 0 : data_fragment_ptr->set_window_end(data_request.request_information.window_end);
201 0 : data_fragment_ptr->set_sequence_number(data_request.sequence_number);
202 :
203 0 : if (m_response_delay > 0) {
204 0 : std::this_thread::sleep_for(std::chrono::nanoseconds(m_response_delay));
205 : }
206 :
207 0 : try {
208 0 : auto iom = iomanager::IOManager::get();
209 0 : iom->get_sender<std::unique_ptr<daqdataformats::Fragment>>(data_request.data_destination)
210 0 : ->send(std::move(data_fragment_ptr), std::chrono::milliseconds(1000));
211 0 : } catch (ers::Issue& e) {
212 0 : ers::warning(FragmentTransmissionFailed(ERS_HERE, get_name(), data_request.trigger_number, e));
213 0 : }
214 :
215 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": finishing processing request " << data_request.request_number;
216 0 : }
217 :
218 : } // namespace dfmodules
219 : } // namespace dunedaq
220 :
221 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::FakeDataProdModule)
|