Line data Source code
1 : /**
2 : * @file SourceModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef ASIOLIBS_SRC_SOURCEMODEL_HPP_
9 : #define ASIOLIBS_SRC_SOURCEMODEL_HPP_
10 :
11 : #include "SourceConcept.hpp"
12 :
13 :
14 : #include "iomanager/IOManager.hpp"
15 : #include "iomanager/Sender.hpp"
16 : #include "logging/Logging.hpp"
17 :
18 : #include "asiolibs/opmon/SourceModel.pb.h"
19 :
20 : // #include "datahandlinglibs/utils/ReusableThread.hpp"
21 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
22 : #include "datahandlinglibs/utils/BufferCopy.hpp"
23 :
24 : // #include <folly/ProducerConsumerQueue.h>
25 : // #include <nlohmann/json.hpp>
26 :
27 : #include <atomic>
28 : #include <memory>
29 : #include <mutex>
30 : #include <string>
31 :
32 :
33 : namespace dunedaq::asiolibs {
34 :
35 : template<class TargetPayloadType>
36 : class SourceModel : public SourceConcept
37 : {
38 : public:
39 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
40 : using inherited = SourceConcept;
41 : using data_t = nlohmann::json;
42 :
43 : /**
44 : * @brief SourceModel Constructor
45 : * @param name Instance name for this SourceModel instance
46 : */
47 0 : SourceModel()
48 0 : : SourceConcept()
49 0 : {}
50 0 : ~SourceModel() {}
51 :
52 0 : void set_sink(const std::string& sink_name, bool callback_mode) override
53 : {
54 0 : m_callback_mode = callback_mode;
55 0 : if (callback_mode) {
56 0 : TLOG_DEBUG(5) << "Callback mode requested. Won't acquire iom sender!";
57 : } else {
58 0 : if (m_sink_is_set) {
59 0 : TLOG_DEBUG(5) << "SourceModel sink is already set in initialized!";
60 : } else {
61 0 : m_sink_queue = get_iom_sender<TargetPayloadType>(sink_name);
62 0 : m_sink_is_set = true;
63 : }
64 : }
65 0 : }
66 :
67 0 : void acquire_callback() override
68 : {
69 0 : if (m_callback_mode) {
70 0 : if (m_callback_is_acquired) {
71 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
72 : } else {
73 : // Getting DataMoveCBRegistry
74 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
75 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_name);
76 0 : m_callback_is_acquired = true;
77 0 : }
78 : } else {
79 0 : TLOG_DEBUG(5) << "Won't acquire callback, as IOM sink is set!";
80 : }
81 0 : }
82 :
83 : std::shared_ptr<sink_t>& get_sink() { return m_sink_queue; }
84 :
85 0 : bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
86 : {
87 0 : bool push_out = true;
88 : if (push_out) {
89 :
90 0 : TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
91 :
92 0 : if (m_callback_mode) {
93 0 : (*m_sink_callback)(std::move(target_payload));
94 : } else {
95 0 : if (!m_sink_queue->try_send(std::move(target_payload), iomanager::Sender::s_no_block)) {
96 : //if(m_dropped_packets == 0 || m_dropped_packets%10000) {
97 : // TLOG() << "Dropped data " << m_dropped_packets;
98 : //}
99 0 : ++m_dropped_packets;
100 : }
101 : }
102 :
103 : } else {
104 : TargetPayloadType target_payload;
105 : uint32_t bytes_copied = 0;
106 : datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
107 : }
108 :
109 0 : return true;
110 : }
111 :
112 0 : std::size_t get_target_payload_size() const override {
113 : TargetPayloadType target_payload;
114 0 : return target_payload.get_frame_size(); // TODO (DTE): Could be a static function?
115 : }
116 :
117 0 : void generate_opmon_data() override {
118 :
119 0 : opmon::SourceInfo info;
120 0 : info.set_dropped_frames( m_dropped_packets.load() );
121 :
122 0 : publish( std::move(info) );
123 0 : }
124 :
125 : private:
126 : // Sink internals
127 : std::string m_sink_id;
128 : bool m_sink_is_set{ false };
129 : std::shared_ptr<sink_t> m_sink_queue;
130 :
131 : // Callback internals
132 : bool m_callback_mode;
133 : bool m_callback_is_acquired{ false };
134 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
135 : sink_cb_t m_sink_callback;
136 :
137 : std::atomic<uint64_t> m_dropped_packets{0};
138 :
139 : };
140 :
141 : } // namespace dunedaq::asiolibs
142 :
143 : #endif // ASIOLIBS_SRC_SOURCEMODEL_HPP_
|