DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
SourceModel.hpp
Go to the documentation of this file.
1
8#ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_
9#define CRTMODULES_SRC_SOURCEMODEL_HPP_
10
11#include "SourceConcept.hpp"
12
13
15#include "iomanager/Sender.hpp"
16#include "logging/Logging.hpp"
17
19
20// #include "datahandlinglibs/utils/ReusableThread.hpp"
23
24// #include <folly/ProducerConsumerQueue.h>
25// #include <nlohmann/json.hpp>
26
27#include <atomic>
28#include <memory>
29#include <mutex>
30#include <string>
31
32
33namespace dunedaq::crtmodules {
34
35template<class TargetPayloadType>
37{
38public:
41 using data_t = nlohmann::json;
42
46 static constexpr auto buffer_size = sizeof(TargetPayloadType);
47
54 {}
56
57 void acquire_callback() override
58 {
60 TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
61 } else {
62 // Getting DataMoveCBRegistry
64 m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
66 }
67 }
68
69 bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
70 {
71 bool push_out = true;
72 if (push_out) {
73
74 TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
75 (*m_sink_callback)(std::move(target_payload));
76
77 } else {
78 TargetPayloadType target_payload;
79 uint32_t bytes_copied = 0;
80 datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
81 }
82
83 return true;
84 }
85
86 std::size_t get_frame_size() const override {
87 TargetPayloadType target_payload;
88 return target_payload.get_frame_size(); // TODO (DTE): Could be a static function?
89 }
90
91 void generate_opmon_data() override {
92
94 info.set_dropped_frames( m_dropped_packets.load() );
95
96 publish( std::move(info) );
97 }
98
99private:
100 // Callback internals
102 using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
104
105 std::atomic<uint64_t> m_dropped_packets{0};
106
107};
108
109} // namespace dunedaq::crtmodules
110
111#endif // CRTMODULES_SRC_SOURCEMODEL_HPP_
const appmodel::DataMoveCallbackConf * m_sink_conf
static constexpr auto buffer_size
Buffer size based on TargetPayloadType.
bool handle_payload(char *message, std::size_t size)
std::atomic< uint64_t > m_dropped_packets
SourceModel()
SourceModel Constructor.
std::shared_ptr< std::function< void(TargetPayloadType &&)> > sink_cb_t
std::size_t get_frame_size() const override
static std::shared_ptr< DataMoveCallbackRegistry > get()
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
void buffer_copy(const char *data, std::size_t size, void *buffer, std::uint32_t buffer_pos, const std::size_t &buffer_size)
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size