Line data Source code
1 : /**
2 : * @file SourceModel.hpp FELIX CR's ELink concept wrapper
3 : *
4 : * This is part of the DUNE DAQ , copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 : #ifndef ASIOLIBS_SRC_SOURCEMODEL_HPP_
9 : #define ASIOLIBS_SRC_SOURCEMODEL_HPP_
10 :
11 : #include "SourceConcept.hpp"
12 :
13 :
14 : #include "iomanager/IOManager.hpp"
15 : #include "iomanager/Sender.hpp"
16 : #include "logging/Logging.hpp"
17 :
18 : #include "asiolibs/opmon/SourceModel.pb.h"
19 :
20 : // #include "datahandlinglibs/utils/ReusableThread.hpp"
21 : #include "datahandlinglibs/DataMoveCallbackRegistry.hpp"
22 : #include "datahandlinglibs/utils/BufferCopy.hpp"
23 :
24 : // #include <folly/ProducerConsumerQueue.h>
25 : // #include <nlohmann/json.hpp>
26 :
27 : #include <atomic>
28 : #include <memory>
29 : #include <mutex>
30 : #include <string>
31 :
32 :
33 : namespace dunedaq::asiolibs {
34 :
35 : template<class TargetPayloadType>
36 : class SourceModel : public SourceConcept
37 : {
38 : public:
39 : using sink_t = iomanager::SenderConcept<TargetPayloadType>;
40 : using inherited = SourceConcept;
41 : using data_t = nlohmann::json;
42 :
43 : /**
44 : * @brief SourceModel Constructor
45 : * @param name Instance name for this SourceModel instance
46 : */
47 0 : SourceModel()
48 0 : : SourceConcept()
49 0 : {}
50 0 : ~SourceModel() {}
51 :
52 0 : void acquire_callback() override
53 : {
54 0 : if (m_callback_is_acquired) {
55 0 : TLOG_DEBUG(5) << "SourceModel callback is already acquired!";
56 : } else {
57 : // Getting DataMoveCBRegistry
58 0 : auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
59 0 : m_sink_callback = dmcbr->get_callback<TargetPayloadType>(inherited::m_sink_conf);
60 0 : m_callback_is_acquired = true;
61 0 : }
62 0 : }
63 :
64 0 : bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned)
65 : {
66 0 : bool push_out = true;
67 : if (push_out) {
68 :
69 0 : TargetPayloadType& target_payload = *reinterpret_cast<TargetPayloadType*>(message);
70 :
71 0 : (*m_sink_callback)(std::move(target_payload));
72 :
73 : } else {
74 : TargetPayloadType target_payload;
75 : uint32_t bytes_copied = 0;
76 : datahandlinglibs::buffer_copy(message, size, static_cast<void*>(&target_payload), bytes_copied, sizeof(target_payload));
77 : }
78 :
79 0 : return true;
80 : }
81 :
82 0 : std::size_t get_target_payload_size() const override {
83 : TargetPayloadType target_payload;
84 0 : return target_payload.get_frame_size(); // TODO (DTE): Could be a static function?
85 : }
86 :
87 0 : void generate_opmon_data() override {
88 :
89 0 : opmon::SourceInfo info;
90 0 : info.set_dropped_frames( m_dropped_packets.load() );
91 :
92 0 : publish( std::move(info) );
93 0 : }
94 :
95 : private:
96 : // Callback internals
97 : bool m_callback_is_acquired{ false };
98 : using sink_cb_t = std::shared_ptr<std::function<void(TargetPayloadType&&)>>;
99 : sink_cb_t m_sink_callback;
100 :
101 : std::atomic<uint64_t> m_dropped_packets{0};
102 :
103 : };
104 :
105 : } // namespace dunedaq::asiolibs
106 :
107 : #endif // ASIOLIBS_SRC_SOURCEMODEL_HPP_
|