DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SourceModel.hpp
Go to the documentation of this file.
1
8#ifndef DPDKLIBS_SRC_SOURCEMODEL_HPP_
9#define DPDKLIBS_SRC_SOURCEMODEL_HPP_
10
11#include "SourceConcept.hpp"
12
13#include "dpdklibs/Issues.hpp"
14
16#include "iomanager/Sender.hpp"
17#include "logging/Logging.hpp"
18
20
22
23// #include <folly/ProducerConsumerQueue.h>
24// #include <nlohmann/json.hpp>
25
26#include <atomic>
27#include <memory>
28#include <mutex>
29#include <string>
30
31
32namespace dunedaq::dpdklibs {
33
34template<class TargetPayloadType>
36{
37public:
// Alias for nlohmann::json. No code visible in this listing uses data_t.
// NOTE(review): the <nlohmann/json.hpp> include near the top of this file is commented out,
// so this alias presumably relies on another included header declaring nlohmann::json — confirm.
40 using data_t = nlohmann::json;
41
48 {}
50
// Records the requested delivery mode and, when not in callback mode, marks the sink as set.
//
// sink_name:     name of the sink connection. No line visible in this listing reads it.
// callback_mode: stored in m_callback_mode; when true this method only logs and does no
//                further sink setup.
//
// NOTE(review): this is an extracted listing and source line 60 is absent (the numbering jumps
// from 59 to 61). Presumably that line obtains the IOM sender for sink_name and stores it in
// m_sink_queue — confirm against the original header.
51 void set_sink(const std::string& sink_name, bool callback_mode) override
52 {
53 m_callback_mode = callback_mode;
54 if (callback_mode) {
55 TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
56 } else {
// A repeated call with the sink already set only emits a debug message.
57 if (m_sink_is_set) {
58 TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
59 } else {
// NOTE(review): source line 60 is missing from this listing at this point.
61 m_sink_is_set = true;
62 }
63 }
64 }
65
// Looks up the sink callback for TargetPayloadType by inherited::m_sink_name and stores it in
// m_sink_callback. When m_callback_mode is false, it only logs that no callback is acquired.
//
// NOTE(review): this extracted listing omits source lines 69, 73 and 75, so the braces below
// do not balance as shown. Presumably line 69 tests an "already acquired" flag, line 73 defines
// dmcbr (the registry the callback is fetched from) and line 75 sets that flag — confirm
// against the original header.
66 void acquire_callback() override
67 {
68 if (m_callback_mode) {
// NOTE(review): source line 69 (the condition guarding this message) is missing here.
70 TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
71 } else {
72 // Getting DataMoveCBRegistry
// NOTE(review): source line 73 (declaration of dmcbr) is missing here.
74 m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
// NOTE(review): source line 75 is missing here.
76 }
77 } else {
78 TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
79 }
80 }
81
// Returns a non-const reference to m_sink_queue, the shared_ptr holding the sink sender.
// NOTE(review): sink_t is declared on a line omitted from this listing — presumably the
// iomanager sender type for TargetPayloadType; confirm against the original header.
82 std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
83
// Hands one received message to the configured sink.
//
// message: raw buffer; it is reinterpreted in place as a TargetPayloadType and passed on as an
//          rvalue, either to the registered callback or to the sink queue.
// size:    byte count of message; only read by the else branch below, which is never taken.
// Returns: always true, including when the non-blocking send fails.
//
// NOTE(review): nothing visible here checks that size covers sizeof(TargetPayloadType), nor
// that m_sink_callback / m_sink_queue are non-null before they are dereferenced — confirm that
// callers and the set_sink / acquire_callback sequence guarantee both.
84 bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
85 {
// push_out is a constant true, so the copy-based else branch further down is dead code.
86 bool push_out = true;
87 if (push_out) {
88
89 TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
90
91 if (m_callback_mode) {
92 (*m_sink_callback)(std::move(target_payload));
93 } else {
// Send attempt using the s_no_block timeout constant; a false result enters the branch below.
// NOTE(review): source line 95 (the body of this branch) is missing from this listing —
// presumably it counts the drop in m_dropped_packets; confirm against the original header.
94 if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
96 }
97 }
98
99 } else {
// Unreachable while push_out is hard-coded: copies the message into a local payload that is
// discarded when the branch ends.
100 TargetPayloadType target_payload;
101 uint32_t bytes_copied = 0;
102 datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
103 }
104
105 return true;
106 }
107
// Copies the current value of m_dropped_packets into the dropped-frames field of an
// operational-monitoring record and publishes that record.
//
// NOTE(review): this extracted listing omits source lines 111 and 114, i.e. the body of the
// non-zero check and the declaration of info. Presumably line 114 declares the opmon message
// object — confirm against the original header.
108 void generate_opmon_data() override {
109
// NOTE(review): the body of this check (source line 111) is missing from this listing.
110 if(m_dropped_packets != 0) {
112 }
113
// NOTE(review): source line 114 (declaration of info) is missing from this listing.
115 info.set_dropped_frames( m_dropped_packets.load() );
116
117 publish( std::move(info) );
118 }
119
120private:
121 // Sink internals
// m_sink_id is not referenced by any code visible in this listing.
122 std::string m_sink_id;
// Set to true by set_sink() when it runs outside callback mode for the first time.
123 bool m_sink_is_set{ false };
// Sender that handle_payload() calls try_send() on when not in callback mode.
124 std::shared_ptr<sink_t> m_sink_queue;
125
126 // Callback internals
// NOTE(review): source lines 127, 128 and 130 are missing from this listing — presumably they
// declare the callback-related members used by the methods above (e.g. m_sink_callback);
// confirm against the original header.
// Shared pointer to a callable that takes a TargetPayloadType rvalue.
129 using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
131
// Atomic counter read by generate_opmon_data() and reported through set_dropped_frames().
132 std::atomic<uint64_t> m_dropped_packets{0};
133
134};
135
136} // namespace dunedaq::dpdklibs
137
138#endif // DPDKLIBS_SRC_SOURCEMODEL_HPP_
#define ERS_HERE
static std::shared_ptr< DataMoveCallbackRegistry > get()
std::shared_ptr< sink_t > & get_sink()
SourceModel()
SourceModel Constructor.
bool handle_payload(char *message, std::size_t size)
std::atomic< uint64_t > m_dropped_packets
std::shared_ptr< std::function< void(TargetPayloadType &&)> > sink_cb_t
void set_sink(const std::string &sink_name, bool callback_mode) override
std::shared_ptr< sink_t > m_sink_queue
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
void buffer_copy(const char *data, std::size_t size, void *buffer, std::uint32_t buffer_pos, const std::size_t &buffer_size)
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
Unknown serialization Cannot deserialize message
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
void warning(const Issue &issue)
Definition ers.hpp:115