Line data Source code
1 : /**
2 : * @file SourceModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef DPDKLIBS_SRC_SOURCEMODEL_HPP_
9 : #define DPDKLIBS_SRC_SOURCEMODEL_HPP_
10 :
11 : #include "SourceConcept.hpp"
12 :
13 : #include "dpdklibs/Issues.hpp"
14 :
15 : #include "iomanager/IOManager.hpp"
16 : #include "iomanager/Sender.hpp"
17 : #include "logging/Logging.hpp"
18 :
19 : #include "dpdklibs/opmon/SourceModel.pb.h"
20 :
21 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
22 :
23 : // #include <folly/ProducerConsumerQueue.h>
24 : // #include <nlohmann/json.hpp>
25 :
26 : #include <atomic>
27 : #include <memory>
28 : #include <mutex>
29 : #include <string>
30 :
31 :
32 : namespace dunedaq::dpdklibs {
33 :
34 : template<class TargetPayloadType>
35 : class SourceModel : public SourceConcept
36 : {
37 : public:
38 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
39 : using inherited = SourceConcept;
40 : using data_t = nlohmann::json;
41 :
42 : /**
43 : * @brief SourceModel Constructor
44 : * @param name Instance name for this SourceModel instance
45 : */
46 0 : SourceModel()
47 0 : : SourceConcept()
48 0 : {}
49 0 : ~SourceModel() {}
50 :
51 0 : void set_sink(const std::string& sink_name, bool callback_mode) override
52 : {
53 0 : m_callback_mode = callback_mode;
54 0 : if (callback_mode) {
55 0 : TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
56 : } else {
57 0 : if (m_sink_is_set) {
58 0 : TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
59 : } else {
60 0 : m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
61 0 : m_sink_is_set = true;
62 : }
63 : }
64 0 : }
65 :
66 0 : void acquire_callback() override
67 : {
68 0 : if (m_callback_mode) {
69 0 : if (m_callback_is_acquired) {
70 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
71 : } else {
72 : // Getting DataMoveCBRegistry
73 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
74 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
75 0 : m_callback_is_acquired = true;
76 0 : }
77 : } else {
78 0 : TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
79 : }
80 0 : }
81 :
82 : // Exposes sink via returning a pointer to it.
83 : std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
84 :
85 : // Process an incoming raw byte buffer and extract complete frames of type TargetPayloadType.
86 0 : void handle_daq_frame(char* buffer, std::size_t size)
87 : {
88 : // Calculate how many full frames fit in the incoming message buffer.
89 0 : std::size_t full_frames = size / m_expected_frame_size;
90 :
91 : // Calculate leftover bytes that don't form a complete frame.
92 0 : if (size % m_expected_frame_size > 0) [[unlikely]] {
93 0 : ++m_leftover_bytes_encountered;
94 : }
95 :
96 : // Process each full frames
97 0 : for (std::size_t i = 0; i < full_frames; ++i) {
98 : // Calculate pointer to the i-th frame chunk inside the message buffer.
99 0 : const char* src = buffer + i * m_expected_frame_size;
100 :
101 : // Materialize a real TargetPayloadType object by copying bytes from the buffer.
102 : // This is defined behavior, alignment-safe, and fast, without pointer vodoo
103 : // Previously reinterpret_cast to TargetPayloadType* introduced alignment traps
104 : // “pretend there’s a constructed object there” UB. Scatter won't work like that.
105 : TargetPayloadType frame;
106 0 : std::memcpy(&frame, src, m_expected_frame_size);
107 :
108 0 : if (m_callback_mode) {
109 : // Pass by value (moved); no references into 'buffer', so no UAF.
110 0 : (*m_sink_callback)(std::move(frame));
111 : } else {
112 : // Queue mode: attempt to enqueue the frame in a non-blocking way.
113 0 : if (!m_sink_queue->try_send(std::move(frame), iomanager::Sender::s_no_block)) {
114 0 : ++m_failed_to_send_daq_payloads;
115 : }
116 : }
117 : }
118 0 : }
119 :
120 0 : void generate_opmon_data() override {
121 :
122 0 : if(m_failed_to_send_daq_payloads != 0) {
123 0 : ers::warning(FailedToSendData(ERS_HERE, m_sink_id, m_failed_to_send_daq_payloads));
124 : }
125 :
126 0 : opmon::SourceInfo info;
127 0 : info.set_failed_to_send_daq_payloads( m_failed_to_send_daq_payloads.exchange(0) );
128 0 : info.set_leftover_bytes_encountered( m_leftover_bytes_encountered.exchange(0) );
129 :
130 0 : publish( std::move(info) );
131 0 : }
132 :
133 : private:
134 : // Constants
135 : const std::size_t m_expected_frame_size = sizeof(TargetPayloadType);
136 :
137 : // Sink internals
138 : std::string m_sink_id;
139 : bool m_sink_is_set{ false };
140 : std::shared_ptr<sink_t> m_sink_queue;
141 :
142 : // Callback internals
143 : bool m_callback_mode;
144 : bool m_callback_is_acquired{ false };
145 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
146 : sink_cb_t m_sink_callback;
147 :
148 : // Stats
149 : std::atomic<uint64_t> m_leftover_bytes_encountered{0};
150 : std::atomic<uint64_t> m_failed_to_send_daq_payloads{0};
151 :
152 : };
153 :
154 : } // namespace dunedaq::dpdklibs
155 :
156 : #endif // DPDKLIBS_SRC_SOURCEMODEL_HPP_
|