Line data Source code
1 : /**
2 : * @file SourceModel.hpp Typed frame source that forwards fixed-size payloads to a registered data-move callback
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef DPDKLIBS_SRC_SOURCEMODEL_HPP_
9 : #define DPDKLIBS_SRC_SOURCEMODEL_HPP_
10 :
#include "SourceConcept.hpp"

#include "dpdklibs/Issues.hpp"

#include "iomanager/IOManager.hpp"
#include "iomanager/Sender.hpp"
#include "logging/Logging.hpp"

#include "dpdklibs/opmon/SourceModel.pb.h"

#include "datahandlinglibs/DataMoveCallbackRegistry.hpp"

// #include <folly/ProducerConsumerQueue.h>
// #include <nlohmann/json.hpp>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>
30 :
31 :
32 : namespace dunedaq::dpdklibs {
33 :
34 : template<class TargetPayloadType>
35 : class SourceModel : public SourceConcept
36 : {
37 : public:
38 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
39 : using inherited = SourceConcept;
40 : using data_t = nlohmann::json;
41 :
42 : /**
43 : * @brief SourceModel Constructor
44 : * @param name Instance name for this SourceModel instance
45 : */
46 0 : SourceModel()
47 0 : : SourceConcept()
48 0 : {}
49 0 : ~SourceModel() {}
50 :
51 0 : void acquire_callback() override
52 : {
53 0 : if (m_callback_is_acquired) {
54 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
55 : } else {
56 : // Getting DataMoveCBRegistry
57 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
58 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
59 0 : m_callback_is_acquired = true;
60 0 : }
61 0 : }
62 :
63 : // Process an incoming raw byte buffer and extract complete frames of type TargetPayloadType.
64 0 : void handle_daq_frame(char* buffer, std::size_t size)
65 : {
66 : // Calculate how many full frames fit in the incoming message buffer.
67 0 : std::size_t full_frames = size / m_expected_frame_size;
68 :
69 : // Calculate leftover bytes that don't form a complete frame.
70 0 : if (size % m_expected_frame_size > 0) [[unlikely]] {
71 0 : ++m_leftover_bytes_encountered;
72 : }
73 :
74 : // Process each full frames
75 0 : for (std::size_t i = 0; i < full_frames; ++i) {
76 : // Calculate pointer to the i-th frame chunk inside the message buffer.
77 0 : const char* src = buffer + i * m_expected_frame_size;
78 :
79 : // Materialize a real TargetPayloadType object by copying bytes from the buffer.
80 : // This is defined behavior, alignment-safe, and fast, without pointer vodoo
81 : // Previously reinterpret_cast to TargetPayloadType* introduced alignment traps
82 : // “pretend there’s a constructed object there” UB. Scatter won't work like that.
83 : TargetPayloadType frame;
84 0 : std::memcpy(&frame, src, m_expected_frame_size);
85 :
86 : // Pass by value (moved); no references into 'buffer', so no UAF.
87 0 : (*m_sink_callback)(std::move(frame));
88 : }
89 0 : }
90 :
91 0 : void generate_opmon_data() override {
92 :
93 0 : if(m_failed_to_send_daq_payloads != 0) {
94 0 : ers::warning(FailedToSendData(ERS_HERE, inherited::m_sink_conf->UID(), m_failed_to_send_daq_payloads));
95 : }
96 :
97 0 : opmon::SourceInfo info;
98 0 : info.set_failed_to_send_daq_payloads( m_failed_to_send_daq_payloads.exchange(0) );
99 0 : info.set_leftover_bytes_encountered( m_leftover_bytes_encountered.exchange(0) );
100 :
101 0 : publish( std::move(info) );
102 0 : }
103 :
104 : private:
105 : // Constants
106 : const std::size_t m_expected_frame_size = sizeof(TargetPayloadType);
107 :
108 : // Callback internals
109 : bool m_callback_is_acquired{ false };
110 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
111 : sink_cb_t m_sink_callback;
112 :
113 : // Stats
114 : std::atomic<uint64_t> m_leftover_bytes_encountered{0};
115 : std::atomic<uint64_t> m_failed_to_send_daq_payloads{0};
116 :
117 : };
118 :
119 : } // namespace dunedaq::dpdklibs
120 :
121 : #endif // DPDKLIBS_SRC_SOURCEMODEL_HPP_
|