Line data Source code
1 : /**
2 : * @file SourceModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_
9 : #define CRTMODULES_SRC_SOURCEMODEL_HPP_
10 :
11 : #include "SourceConcept.hpp"
12 :
13 :
14 : #include "iomanager/IOManager.hpp"
15 : #include "iomanager/Sender.hpp"
16 : #include "logging/Logging.hpp"
17 :
18 : #include "crtmodules/opmon/SourceModel.pb.h"
19 :
20 : // #include "datahandlinglibs/utils/ReusableThread.hpp"
21 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
22 : #include "datahandlinglibs/utils/BufferCopy.hpp"
23 :
24 : // #include <folly/ProducerConsumerQueue.h>
25 : // #include <nlohmann/json.hpp>
26 :
27 : #include <atomic>
28 : #include <memory>
29 : #include <mutex>
30 : #include <string>
31 :
32 :
33 : namespace dunedaq::crtmodules {
34 :
35 : template<class TargetPayloadType>
36 : class SourceModel : public SourceConcept
37 : {
38 : public:
39 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
40 : using inherited = SourceConcept;
41 : using data_t = nlohmann::json;
42 :
43 : /**
44 : * @brief Buffer size based on TargetPayloadType
45 : */
46 : static constexpr auto buffer_size = sizeof(TargetPayloadType);
47 :
48 : /**
49 : * @brief SourceModel Constructor
50 : * @param name Instance name for this SourceModel instance
51 : */
52 0 : SourceModel()
53 0 : : SourceConcept()
54 0 : {}
55 0 : ~SourceModel() {}
56 :
57 0 : void acquire_callback() override
58 : {
59 0 : if (m_callback_is_acquired) {
60 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
61 : } else {
62 : // Getting DataMoveCBRegistry
63 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
64 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
65 0 : m_callback_is_acquired = true;
66 0 : }
67 0 : }
68 :
69 0 : bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
70 : {
71 0 : bool push_out = true;
72 : if (push_out) {
73 :
74 0 : TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
75 0 : (*m_sink_callback)(std::move(target_payload));
76 :
77 : } else {
78 : TargetPayloadType target_payload;
79 : uint32_t bytes_copied = 0;
80 : datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
81 : }
82 :
83 0 : return true;
84 : }
85 :
86 0 : std::size_t get_frame_size() const override {
87 : TargetPayloadType target_payload;
88 0 : return target_payload.get_frame_size(); // TODO (DTE): Could be a static function?
89 : }
90 :
91 0 : void generate_opmon_data() override {
92 :
93 0 : opmon::SourceInfo info;
94 0 : info.set_dropped_frames( m_dropped_packets.load() );
95 :
96 0 : publish( std::move(info) );
97 0 : }
98 :
99 : private:
100 : // Callback internals
101 : bool m_callback_is_acquired{ false };
102 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
103 : sink_cb_t m_sink_callback;
104 :
105 : std::atomic<uint64_t> m_dropped_packets{0};
106 :
107 : };
108 :
109 : } // namespace dunedaq::crtmodules
110 :
111 : #endif // CRTMODULES_SRC_SOURCEMODEL_HPP_
|