DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SourceModel.hpp
Go to the documentation of this file.
1
8#ifndef DPDKLIBS_SRC_SOURCEMODEL_HPP_
9#define DPDKLIBS_SRC_SOURCEMODEL_HPP_
10
11#include "SourceConcept.hpp"
12
13#include "dpdklibs/Issues.hpp"
14
16#include "iomanager/Sender.hpp"
17#include "logging/Logging.hpp"
18
20
22
23// #include <folly/ProducerConsumerQueue.h>
24// #include <nlohmann/json.hpp>
25
26#include <atomic>
27#include <memory>
28#include <mutex>
29#include <string>
30
31
32namespace dunedaq::dpdklibs {
33
34template<class TargetPayloadType>
36{
37public:
40 using data_t = nlohmann::json;
41
48 {}
50
51 void set_sink(const std::string& sink_name, bool callback_mode) override
52 {
53 m_callback_mode = callback_mode;
54 if (callback_mode) {
55 TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
56 } else {
57 if (m_sink_is_set) {
58 TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
59 } else {
61 m_sink_is_set = true;
62 }
63 }
64 }
65
66 void acquire_callback() override
67 {
68 if (m_callback_mode) {
70 TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
71 } else {
72 // Getting DataMoveCBRegistry
74 m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
76 }
77 } else {
78 TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
79 }
80 }
81
82 // Exposes the sink by returning a reference to the shared_ptr that holds it.
83 std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
84
85 // Process an incoming raw byte buffer and extract complete payloads of type TargetPayloadType.
86 void handle_payload(char* message, std::size_t size)
87 {
88 // Calculate how many full payloads fit in the incoming message buffer.
89 std::size_t full_payloads = size / m_expected_payload_size;
90
91 // Check for leftover bytes that don't form a complete payload.
92 if (size % m_expected_payload_size > 0) [[unlikely]] {
94 }
95
96 // Process each full payload
97 for (std::size_t i = 0; i < full_payloads; ++i) {
98 // Calculate pointer to the i-th payload chunk inside the message buffer.
99 const char* src = message + i * m_expected_payload_size;
100
101 // Materialize a real TargetPayloadType object by copying bytes from the buffer.
102 // This is defined behavior, alignment-safe, and fast, without pointer voodoo.
103 // Previously, reinterpret_cast to TargetPayloadType* introduced alignment traps and
104 // “pretend there’s a constructed object there” UB. Scatter won't work like that.
105 TargetPayloadType payload;
106 std::memcpy(&payload, src, m_expected_payload_size);
107
108 if (m_callback_mode) {
109 // Pass by value (moved); no references into 'message', so no UAF.
110 (*m_sink_callback)(std::move(payload));
111 } else {
112 // Queue mode: attempt to enqueue the payload in a non-blocking way.
113 if (!m_sink_queue->try_send(std::move(payload), iomanager::Sender::s_no_block)) {
115 }
116 }
117 }
118 }
119
120 void generate_opmon_data() override {
121
124 }
125
127 // RS FIXME: These are NOT dropped frames!!! Rename on opmon is needed
128 info.set_dropped_frames( m_failed_to_send_daq_payloads.exchange(0) );
129
130 publish( std::move(info) );
131 }
132
133private:
134 // Constants
135 const std::size_t m_expected_payload_size = sizeof(TargetPayloadType);
136
137
138 // Sink internals
139 std::string m_sink_id;
140 bool m_sink_is_set{ false };
141 std::shared_ptr<sink_t> m_sink_queue;
142
143 // Callback internals
146 using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
148
149 // Stats
150 std::atomic<uint64_t> m_leftover_bytes_encountered{0};
151 std::atomic<uint64_t> m_failed_to_send_daq_payloads{0};
152
153};
154
155} // namespace dunedaq::dpdklibs
156
157#endif // DPDKLIBS_SRC_SOURCEMODEL_HPP_
#define ERS_HERE
static std::shared_ptr< DataMoveCallbackRegistry > get()
std::atomic< uint64_t > m_failed_to_send_daq_payloads
std::shared_ptr< sink_t > & get_sink()
SourceModel()
SourceModel Constructor.
std::atomic< uint64_t > m_leftover_bytes_encountered
std::shared_ptr< std::function< void(TargetPayloadType &&)> > sink_cb_t
const std::size_t m_expected_payload_size
void set_sink(const std::string &sink_name, bool callback_mode) override
void handle_payload(char *message, std::size_t size)
std::shared_ptr< sink_t > m_sink_queue
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115