DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SourceModel.hpp
Go to the documentation of this file.
1
8#ifndef DPDKLIBS_SRC_SOURCEMODEL_HPP_
9#define DPDKLIBS_SRC_SOURCEMODEL_HPP_
10
11#include "SourceConcept.hpp"
12
13#include "dpdklibs/Issues.hpp"
14
16#include "iomanager/Sender.hpp"
17#include "logging/Logging.hpp"
18
20
22
23// #include <folly/ProducerConsumerQueue.h>
24// #include <nlohmann/json.hpp>
25
26#include <atomic>
27#include <memory>
28#include <mutex>
29#include <string>
30
31
32namespace dunedaq::dpdklibs {
33
34template<class TargetPayloadType>
36{
37public:
40 using data_t = nlohmann::json;
41
48 {}
50
51 void acquire_callback() override
52 {
54 TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
55 } else {
56 // Getting DataMoveCBRegistry
58 m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
60 }
61 }
62
63 // Process an incoming raw byte buffer and extract complete frames of type TargetPayloadType.
64 void handle_daq_frame(char* buffer, std::size_t size)
65 {
66 // Calculate how many full frames fit in the incoming message buffer.
67 std::size_t full_frames = size / m_expected_frame_size;
68
69 // Check for leftover bytes that don't form a complete frame.
70 if (size % m_expected_frame_size > 0) [[unlikely]] {
72 }
73
74 // Process each full frame.
75 for (std::size_t i = 0; i < full_frames; ++i) {
76 // Calculate the pointer to the i-th frame chunk inside the message buffer.
77 const char* src = buffer + i * m_expected_frame_size;
78
79 // Materialize a real TargetPayloadType object by copying bytes from the buffer.
80 // This is defined behavior, alignment-safe, and fast, without pointer voodoo.
81 // Previously, a reinterpret_cast to TargetPayloadType* introduced alignment traps and
82 // “pretend there’s a constructed object there” UB. Scatter won't work like that.
83 TargetPayloadType frame;
84 std::memcpy(&frame, src, m_expected_frame_size);
85
86 // Hand the local copy to the sink callback as an rvalue (moved); it holds no
86 // references into 'buffer', so there is no use-after-free (UAF).
87 (*m_sink_callback)(std::move(frame));
88 }
89 }
90
91 void generate_opmon_data() override {
92
95 }
96
98 info.set_failed_to_send_daq_payloads( m_failed_to_send_daq_payloads.exchange(0) );
99 info.set_leftover_bytes_encountered( m_leftover_bytes_encountered.exchange(0) );
100
101 publish( std::move(info) );
102 }
103
104private:
105 // Constants
106 const std::size_t m_expected_frame_size = sizeof(TargetPayloadType);
107
108 // Callback internals
110 using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
112
113 // Stats
114 std::atomic<uint64_t> m_leftover_bytes_encountered{0};
115 std::atomic<uint64_t> m_failed_to_send_daq_payloads{0};
116
117};
118
119} // namespace dunedaq::dpdklibs
120
121#endif // DPDKLIBS_SRC_SOURCEMODEL_HPP_
#define ERS_HERE
static std::shared_ptr< DataMoveCallbackRegistry > get()
const appmodel::DataMoveCallbackConf * m_sink_conf
std::atomic< uint64_t > m_failed_to_send_daq_payloads
SourceModel()
SourceModel Constructor.
std::atomic< uint64_t > m_leftover_bytes_encountered
void generate_opmon_data() override
const std::size_t m_expected_frame_size
std::shared_ptr< std::function< void(TargetPayloadType &&)> > sink_cb_t
void handle_daq_frame(char *buffer, std::size_t size)
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115