Line data Source code
1 : /**
2 : * @file SourceModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_
9 : #define CRTMODULES_SRC_SOURCEMODEL_HPP_
10 :
11 : #include "SourceConcept.hpp"
12 :
13 :
14 : #include "iomanager/IOManager.hpp"
15 : #include "iomanager/Sender.hpp"
16 : #include "logging/Logging.hpp"
17 :
18 : #include "crtmodules/opmon/SourceModel.pb.h"
19 :
20 : // #include "datahandlinglibs/utils/ReusableThread.hpp"
21 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
22 : #include "datahandlinglibs/utils/BufferCopy.hpp"
23 :
24 : // #include <folly/ProducerConsumerQueue.h>
25 : // #include <nlohmann/json.hpp>
26 :
27 : #include <atomic>
28 : #include <memory>
29 : #include <mutex>
30 : #include <string>
31 :
32 :
33 : namespace dunedaq::crtmodules {
34 :
35 : template<class TargetPayloadType>
36 : class SourceModel : public SourceConcept
37 : {
38 : public:
39 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
40 : using inherited = SourceConcept;
41 : using data_t = nlohmann::json;
42 :
43 : /**
44 : * @brief Buffer size based on TargetPayloadType
45 : */
46 : static constexpr auto buffer_size = sizeof(TargetPayloadType);
47 :
48 : /**
49 : * @brief SourceModel Constructor
50 : * @param name Instance name for this SourceModel instance
51 : */
52 0 : SourceModel()
53 0 : : SourceConcept()
54 0 : {}
55 0 : ~SourceModel() {}
56 :
57 0 : void set_sink(const std::string& sink_name, bool callback_mode) override
58 : {
59 0 : m_callback_mode = callback_mode;
60 0 : if (callback_mode) {
61 0 : TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
62 : } else {
63 0 : if (m_sink_is_set) {
64 0 : TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
65 : } else {
66 0 : m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
67 0 : m_sink_is_set = true;
68 : }
69 : }
70 0 : }
71 :
72 0 : void acquire_callback() override
73 : {
74 0 : if (m_callback_mode) {
75 0 : if (m_callback_is_acquired) {
76 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
77 : } else {
78 : // Getting DataMoveCBRegistry
79 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
80 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
81 0 : m_callback_is_acquired = true;
82 0 : }
83 : } else {
84 0 : TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
85 : }
86 0 : }
87 :
88 : std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
89 :
90 0 : bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
91 : {
92 0 : bool push_out = true;
93 : if (push_out) {
94 :
95 0 : TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
96 :
97 0 : if (m_callback_mode) {
98 0 : (*m_sink_callback)(std::move(target_payload));
99 : } else {
100 0 : if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
101 : //if(m_dropped_packets == 0 || m_dropped_packets%10000) {
102 : // TLOG() << "Dropped data " << m_dropped_packets;
103 : //}
104 0 : ++m_dropped_packets;
105 : }
106 : }
107 :
108 : } else {
109 : TargetPayloadType target_payload;
110 : uint32_t bytes_copied = 0;
111 : datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
112 : }
113 :
114 0 : return true;
115 : }
116 :
117 0 : std::size_t get_frame_size() const override {
118 : TargetPayloadType target_payload;
119 0 : return target_payload.get_frame_size(); // TODO (DTE): Could be a static function?
120 : }
121 :
122 0 : void generate_opmon_data() override {
123 :
124 0 : opmon::SourceInfo info;
125 0 : info.set_dropped_frames( m_dropped_packets.load() );
126 :
127 0 : publish( std::move(info) );
128 0 : }
129 :
130 : private:
131 : // Sink internals
132 : std::string m_sink_id;
133 : bool m_sink_is_set{ false };
134 : std::shared_ptr<sink_t> m_sink_queue;
135 :
136 : // Callback internals
137 : bool m_callback_mode;
138 : bool m_callback_is_acquired{ false };
139 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
140 : sink_cb_t m_sink_callback;
141 :
142 : std::atomic<uint64_t> m_dropped_packets{0};
143 :
144 : };
145 :
146 : } // namespace dunedaq::crtmodules
147 :
148 : #endif // CRTMODULES_SRC_SOURCEMODEL_HPP_
|