Line data Source code
1 : /**
2 : * @file FakeDataProdModule.cpp FakeDataProdModule class implementation
3 : *
4 : * This is part of the DUNE DAQ Software Suite, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "FakeDataProdModule.hpp"
10 : #include "dfmodules/CommonIssues.hpp"
11 :
12 : #include "appmodel/FakeDataProdModule.hpp"
13 : #include "confmodel/Connection.hpp"
14 : #include "dfmessages/Fragment_serialization.hpp"
15 : #include "dfmessages/TimeSync.hpp"
16 : #include "iomanager/IOManager.hpp"
17 : #include "logging/Logging.hpp"
18 :
19 : #include <chrono>
20 : #include <cstdlib>
21 : #include <memory>
22 : #include <string>
23 : #include <thread>
24 : #include <utility>
25 : #include <vector>
26 :
27 : /**
28 : * @brief Name used by TRACE TLOG calls from this source file
29 : */
30 : #define TRACE_NAME "FakeDataProdModule" // NOLINT
31 : enum
32 : {
33 : TLVL_ENTER_EXIT_METHODS = 5,
34 : TLVL_CONFIG = 7,
35 : TLVL_WORK_STEPS = 10,
36 : TLVL_TIME_SYNCS = 12
37 : };
38 :
39 : namespace dunedaq {
40 : namespace dfmodules {
41 :
42 0 : FakeDataProdModule::FakeDataProdModule(const std::string& name)
43 : : dunedaq::appfwk::DAQModule(name)
44 0 : , m_timesync_thread(std::bind(&FakeDataProdModule::do_timesync, this, std::placeholders::_1))
45 0 : , m_queue_timeout(100)
46 0 : , m_run_number(0)
47 : {
48 0 : register_command("conf", &FakeDataProdModule::do_conf);
49 0 : register_command("start", &FakeDataProdModule::do_start);
50 0 : register_command("stop", &FakeDataProdModule::do_stop);
51 0 : }
52 :
53 : void
54 0 : FakeDataProdModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcfg)
55 : {
56 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method";
57 0 : auto mdal = mcfg->get_dal<appmodel::FakeDataProdModule>(get_name());
58 0 : if (!mdal) {
59 0 : throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object");
60 : }
61 :
62 0 : auto inputs = mdal->get_inputs();
63 0 : auto outputs = mdal->get_outputs();
64 :
65 0 : if (inputs[0]->get_data_type() != datatype_to_string<dfmessages::DataRequest>()) {
66 0 : throw InvalidQueueFatalError(ERS_HERE, get_name(), "DataRequest Input queue");
67 : }
68 0 : m_data_request_id = inputs[0]->UID();
69 :
70 0 : for (auto con : outputs) {
71 0 : if (con->get_data_type() == datatype_to_string<dfmessages::TimeSync>()) {
72 0 : m_timesync_id = con->UID();
73 : }
74 : }
75 0 : m_fake_data_prod_conf = mdal->get_configuration();
76 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method";
77 0 : }
78 :
79 : void
80 0 : FakeDataProdModule::do_conf(const CommandData_t&)
81 : {
82 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method";
83 :
84 0 : m_sourceid.subsystem = daqdataformats::SourceID::string_to_subsystem(m_fake_data_prod_conf->get_system_type());
85 0 : m_sourceid.id = m_fake_data_prod_conf->get_source_id();
86 0 : m_time_tick_diff = m_fake_data_prod_conf->get_time_tick_diff();
87 0 : m_frame_size = m_fake_data_prod_conf->get_frame_size();
88 0 : m_response_delay = m_fake_data_prod_conf->get_response_delay();
89 0 : m_fragment_type = daqdataformats::string_to_fragment_type(m_fake_data_prod_conf->get_fragment_type());
90 :
91 0 : TLOG_DEBUG(TLVL_CONFIG) << get_name() << ": configured for link number " << m_sourceid.id;
92 :
93 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method";
94 0 : }
95 :
96 : void
97 0 : FakeDataProdModule::do_start(const CommandData_t& payload)
98 : {
99 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
100 0 : m_sent_fragments = 0;
101 0 : m_received_requests = 0;
102 0 : m_run_number = payload.value<dunedaq::daqdataformats::run_number_t>("run", 0);
103 :
104 0 : m_timesync_thread.start_working_thread();
105 :
106 0 : auto iom = iomanager::IOManager::get();
107 0 : iom->add_callback<dfmessages::DataRequest>(
108 0 : m_data_request_id, std::bind(&FakeDataProdModule::process_data_request, this, std::placeholders::_1));
109 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method";
110 0 : }
111 :
112 : void
113 0 : FakeDataProdModule::do_stop(const CommandData_t& /*args*/)
114 : {
115 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method";
116 0 : m_timesync_thread.stop_working_thread();
117 :
118 0 : auto iom = iomanager::IOManager::get();
119 0 : iom->remove_callback<dfmessages::DataRequest>(m_data_request_id);
120 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method";
121 0 : }
122 :
123 : // void
124 : // FakeDataProdModule::get_info(opmonlib::InfoCollector& ci, int /*level*/)
125 : // {
126 : // fakedataprodinfo::Info info;
127 : // info.requests_received = m_received_requests;
128 : // info.fragments_sent = m_sent_fragments;
129 : // ci.add(info);
130 : // }
131 :
132 : void
133 0 : FakeDataProdModule::do_timesync(std::atomic<bool>& running_flag)
134 : {
135 :
136 0 : auto iom = iomanager::IOManager::get();
137 0 : auto sender_ptr = iom->get_sender<dfmessages::TimeSync>(m_timesync_id);
138 :
139 0 : int sent_count = 0;
140 0 : uint64_t msg_seqno = 0; // NOLINT (build/unsigned)
141 0 : while (running_flag.load()) {
142 0 : auto time_now = std::chrono::system_clock::now().time_since_epoch();
143 0 : uint64_t current_timestamp = // NOLINT (build/unsigned)
144 0 : std::chrono::duration_cast<std::chrono::nanoseconds>(time_now).count();
145 0 : auto timesyncmsg = dfmessages::TimeSync(current_timestamp);
146 0 : ++msg_seqno;
147 0 : timesyncmsg.run_number = m_run_number;
148 0 : timesyncmsg.sequence_number = msg_seqno;
149 0 : timesyncmsg.source_id = m_sourceid.id;
150 0 : TLOG_DEBUG(TLVL_TIME_SYNCS) << "New timesync: daq=" << timesyncmsg.daq_time << " wall=" << timesyncmsg.system_time
151 0 : << " run=" << timesyncmsg.run_number << " seqno=" << timesyncmsg.sequence_number
152 0 : << " source_id=" << timesyncmsg.source_id;
153 0 : try {
154 0 : sender_ptr->send(std::move(timesyncmsg), std::chrono::milliseconds(500));
155 0 : ++sent_count;
156 0 : } catch (ers::Issue& excpt) {
157 0 : ers::warning(TimeSyncTransmissionFailed(ERS_HERE, get_name(), m_timesync_id, excpt));
158 0 : }
159 0 : std::this_thread::sleep_for(std::chrono::milliseconds(100));
160 : }
161 0 : TLOG() << get_name() << ": sent " << sent_count << " TimeSync messages.";
162 0 : }
163 :
164 : void
165 0 : FakeDataProdModule::process_data_request(dfmessages::DataRequest& data_request)
166 : {
167 :
168 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": processsing request " << data_request.request_number;
169 :
170 0 : m_received_requests++;
171 :
172 : // num_frames_to_send = ⌈window_size / tick_diff⌉
173 0 : size_t num_frames_to_send = (data_request.request_information.window_end -
174 0 : data_request.request_information.window_begin + m_time_tick_diff - 1) /
175 : m_time_tick_diff;
176 0 : size_t num_bytes_to_send = num_frames_to_send * m_frame_size;
177 :
178 : // We don't care about the content of the data, but the size should be correct
179 :
180 0 : std::vector<uint8_t> fake_data; // NOLINT (build/unsigned)
181 0 : try {
182 0 : fake_data.resize(num_bytes_to_send);
183 0 : } catch (const std::bad_alloc&) {
184 0 : throw dunedaq::dfmodules::MemoryAllocationFailed(ERS_HERE, get_name(), num_bytes_to_send);
185 0 : }
186 :
187 0 : auto data_fragment_ptr = std::make_unique<daqdataformats::Fragment>(fake_data.data(), num_bytes_to_send);
188 :
189 0 : data_fragment_ptr->set_trigger_number(data_request.trigger_number);
190 0 : data_fragment_ptr->set_run_number(m_run_number);
191 0 : data_fragment_ptr->set_element_id(m_sourceid);
192 0 : data_fragment_ptr->set_error_bits(0);
193 0 : data_fragment_ptr->set_type(m_fragment_type);
194 0 : data_fragment_ptr->set_trigger_timestamp(data_request.trigger_timestamp);
195 0 : data_fragment_ptr->set_window_begin(data_request.request_information.window_begin);
196 0 : data_fragment_ptr->set_window_end(data_request.request_information.window_end);
197 0 : data_fragment_ptr->set_sequence_number(data_request.sequence_number);
198 :
199 0 : if (m_response_delay > 0) {
200 0 : std::this_thread::sleep_for(std::chrono::nanoseconds(m_response_delay));
201 : }
202 :
203 0 : try {
204 0 : auto iom = iomanager::IOManager::get();
205 0 : iom->get_sender<std::unique_ptr<daqdataformats::Fragment>>(data_request.data_destination)
206 0 : ->send(std::move(data_fragment_ptr), std::chrono::milliseconds(1000));
207 0 : } catch (ers::Issue& e) {
208 0 : ers::warning(FragmentTransmissionFailed(ERS_HERE, get_name(), data_request.trigger_number, e));
209 0 : }
210 :
211 0 : TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": finishing processing request " << data_request.request_number;
212 0 : }
213 :
214 : } // namespace dfmodules
215 : } // namespace dunedaq
216 :
217 0 : DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::FakeDataProdModule)
|