DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
DataSubscriberModel.hpp
Go to the documentation of this file.
1
8#ifndef DATAHANDLINGLIBS_SRC_SOURCEMODEL_HPP_
9#define DATAHANDLINGLIBS_SRC_SOURCEMODEL_HPP_
10
13
15#include "iomanager/Sender.hpp"
17#include "logging/Logging.hpp"
18//#include "utilities/ReusableThread.hpp"
19
21
22//#include <folly/ProducerConsumerQueue.h>
23//#include <nlohmann/json.hpp>
24
25//#include <atomic>
26//#include <memory>
27//#include <mutex>
28#include <functional>
29
31
32template<class PayloadType>
34{
35public:
37
44
/**
 * @brief Validates the connection layout of the module configuration.
 *
 * Requires exactly one output connection and exactly one input connection
 * on @p cfg.
 *
 * @param cfg Module configuration whose outputs and inputs are inspected.
 * @throws datahandlinglibs::InitializationError if the number of outputs or
 *         the number of inputs is not exactly one.
 */
// NOTE(review): nothing in the visible lines assigns m_data_sender or
// m_data_receiver, yet start()/stop()/handle_payload() dereference them. The
// listing's embedded numbering skips a line after each check, so presumably
// the endpoints are acquired there -- confirm against the original header.
45 void init(const confmodel::DaqModule* cfg) override {
// The output side must be a single connection.
46 if (cfg->get_outputs().size() != 1) {
47 throw datahandlinglibs::InitializationError(ERS_HERE, "Only 1 output supported for subscribers");
48 }
50
// The input side must be a single connection as well.
51 if (cfg->get_inputs().size() != 1) {
52 throw datahandlinglibs::InitializationError(ERS_HERE, "Only 1 input supported for subscribers");
53 }
55 }
56
57 void start() {
58 m_packets = 0;
59 m_sum_packets = 0;
61 m_data_receiver->add_callback(std::bind(&DataSubscriberModel::handle_payload, this, std::placeholders::_1));
62 }
63
64 void stop() {
65 m_data_receiver->remove_callback();
66 }
67
68 bool handle_payload(PayloadType& message) // NOLINT(build/unsigned)
69 {
70 ++m_packets;
72 if (!m_data_sender->try_send(std::move(message), iomanager::Sender::s_no_block)) {
74 }
75 return true;
76 }
77protected:
/**
 * @brief Fills a monitoring message with the packet counters and publishes it.
 *
 * m_packets and m_dropped_packets are read with exchange(0): each call
 * reports what was accumulated since the counter was last zeroed and restarts
 * it. m_sum_packets is reported without being reset.
 */
// NOTE(review): `info` is not declared in the visible lines; the listing's
// embedded numbering skips a line right after the signature, so presumably
// the message object is created there -- confirm its type against the
// original header.
78 virtual void generate_opmon_data() override {
// Read-and-clear: the value is reported once and counting restarts from zero.
80 info.set_num_packets(m_packets.exchange(0));
// Plain read: this counter keeps its value after being reported.
81 info.set_sum_packets(m_sum_packets) ;
// Read-and-clear, same as num_packets.
82 info.set_num_dropped_packets(m_dropped_packets.exchange(0));
83
// Hand the filled message over to publish().
84 this->publish(std::move(info));
85 }
86
87private:
// Input endpoint: start() registers handle_payload() on it as a callback and
// stop() removes that callback again.
89 std::shared_ptr<source_t> m_data_receiver;
90
// Output endpoint: handle_payload() passes every payload to its try_send().
92 std::shared_ptr<sink_t> m_data_sender;
93
// Incremented for each handle_payload() call; zeroed by start() and by every
// generate_opmon_data() call.
94 std::atomic<uint64_t> m_packets{0};
// Zeroed by start(); reported by generate_opmon_data() without being reset.
95 std::atomic<uint64_t> m_sum_packets{0};
// Reported and zeroed by every generate_opmon_data() call.
96 std::atomic<uint64_t> m_dropped_packets{0};
97};
98
99} // namespace dunedaq::datahandlinglibs
100
101#endif // DATAHANDLINGLIBS_SRC_SOURCEMODEL_HPP_
#define ERS_HERE
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
const std::vector< const dunedaq::confmodel::Connection * > & get_outputs() const
Get "outputs" relationship value. Output connections from this module.
void init(const confmodel::DaqModule *cfg) override
static constexpr timeout_t s_no_block
Definition Sender.hpp:26
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
static std::shared_ptr< iomanager::SenderConcept< Datatype > > get_iom_sender(iomanager::ConnectionId const &id)
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)