DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
Receiver.hpp
Go to the documentation of this file.
1
23#ifndef IPM_INCLUDE_IPM_RECEIVER_HPP_
24#define IPM_INCLUDE_IPM_RECEIVER_HPP_
25
26#include "cetlib/BasicPluginFactory.h"
27#include "cetlib/compiler_macros.h"
28#include "ers/Issue.hpp"
29#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
31
32#include <atomic>
33#include <memory>
34#include <string>
35#include <vector>
36
37namespace dunedaq {
38// Disable coverage collection LCOV_EXCL_START
39ERS_DECLARE_ISSUE(ipm, KnownStateForbidsReceive, "Receiver not in a state to receive data", )
41 UnexpectedNumberOfBytes,
42 connection_name << ": Expected " << bytes1 << " bytes in message but received " << bytes2,
43 ((std::string)connection_name)((int)bytes1)((int)bytes2)) // NOLINT
46 connection_name << ": Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
47 ((std::string)connection_name)((int)timeout)) // NOLINT
48// Reenable coverage collection LCOV_EXCL_STOP
49} // namespace dunedaq
50
51#ifndef EXTERN_C_FUNC_DECLARE_START
52// NOLINTNEXTLINE(build/define_used)
53#define EXTERN_C_FUNC_DECLARE_START \
54 extern "C" \
55 {
56#endif
57
62// NOLINTNEXTLINE
63#define DEFINE_DUNE_IPM_RECEIVER(klass) \
64 EXTERN_C_FUNC_DECLARE_START \
65 std::shared_ptr<dunedaq::ipm::Receiver> make() \
66 { \
67 return std::shared_ptr<dunedaq::ipm::Receiver>(new klass()); \
68 } \
69 }
70
71namespace dunedaq::ipm {
72
74{
75
76public:
78 {
79 std::string connection_name{ "" };
80 std::string connection_string{ "" };
81 std::vector<std::string> connection_strings{};
82 };
83 using duration_t = std::chrono::milliseconds;
84 static constexpr duration_t s_block = duration_t::max();
85 static constexpr duration_t s_no_block = duration_t::zero();
86
87 using message_size_t = int;
88 static constexpr message_size_t s_any_size =
89 0; // Since "I want 0 bytes" is pointless, "0" denotes "I don't care about the size"
90
91 Receiver() = default;
92 virtual ~Receiver() = default;
93
94 virtual std::string connect_for_receives(const ConnectionInfo& connection_info) = 0;
95
96 virtual bool can_receive() const noexcept = 0;
97
98 // receive() will perform some universally-desirable checks before calling user-implemented receive_:
99 // -Throws KnownStateForbidsReceive if can_receive() == false
100 // -Throws UnexpectedNumberOfBytes if the "nbytes" argument isn't anysize, and the
101 // received bytes inside the function aren't the same number as nbytes
102
103 struct Response
104 {
105 std::string metadata{ "" };
106 std::vector<char> data{};
107 };
108
109 Response receive(const duration_t& timeout, message_size_t num_bytes = s_any_size, bool no_tmoexcept_mode = false);
110 virtual bool data_pending() = 0;
111
112 virtual void register_callback(std::function<void(Response&)>) = 0;
113 virtual void unregister_callback() = 0;
114
115 Receiver(const Receiver&) = delete;
116 Receiver& operator=(const Receiver&) = delete;
117
118 Receiver(Receiver&&) = delete;
120
121protected:
123 void generate_opmon_data() override;
124
125 virtual Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) = 0;
126
127private:
128 mutable std::atomic<size_t> m_bytes = { 0 };
129 mutable std::atomic<size_t> m_messages = { 0 };
130};
131
132inline std::shared_ptr<Receiver>
133make_ipm_receiver(std::string const& plugin_name)
134{
135 static cet::BasicPluginFactory bpf("duneIPM", "make");
136 return bpf.makePlugin<std::shared_ptr<Receiver>>(plugin_name);
137}
138
139} // namespace dunedaq::ipm
140
141#endif // IPM_INCLUDE_IPM_RECEIVER_HPP_
ConnectionInfo m_connection_info
Definition Receiver.hpp:122
Receiver(const Receiver &)=delete
virtual void unregister_callback()=0
virtual ~Receiver()=default
std::chrono::milliseconds duration_t
Definition Receiver.hpp:83
Receiver(Receiver &&)=delete
virtual bool data_pending()=0
Receiver & operator=(const Receiver &)=delete
Receiver & operator=(Receiver &&)=delete
virtual Response receive_(const duration_t &timeout, bool no_tmoexcept_mode)=0
virtual bool can_receive() const noexcept=0
virtual void register_callback(std::function< void(Response &)>)=0
virtual std::string connect_for_receives(const ConnectionInfo &connection_info)=0
#define ERS_DECLARE_ISSUE( namespace_name, class_name, message_, attributes)
Definition macro.hpp:65
An ERS Error indicating that an exception was thrown from ZMQ while performing an operation.
std::shared_ptr< Receiver > make_ipm_receiver(std::string const &plugin_name)
Definition Receiver.hpp:133
Including Qt Headers.
ReceiveTimeoutExpired
Definition Receiver.hpp:45
Both frame_count_limit and tp_count_limit were set to(disabled) in the TPCRawDataProcessor config. TPs will not send."