DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
CallbackAdapter.cpp
Go to the documentation of this file.
1
10#include "CallbackAdapter.hpp"
11
12#include "logging/Logging.hpp"
13
14#include <memory>
15#include <string>
16#include <utility>
17
18namespace dunedaq::ipm {
19
// Teardown body: clears the stored callback, stops the worker thread via
// shutdown(), then drops the receiver pointer.
// NOTE(review): the declarator (listing line 20) is missing from this page
// extraction. The symbol index at the end of the page lists
// `virtual ~CallbackAdapter() noexcept`, so this is presumably the destructor
// body -- confirm against the repository.
21{
22 {
// Scoped so m_callback_mutex is released before shutdown() is called.
23 std::lock_guard<std::mutex> lk(m_callback_mutex);
24 m_callback = nullptr;
25 }
// The callback is cleared before shutdown() because the do/while at listing
// lines 91-106 only exits once m_callback or m_receiver_ptr is null.
26 shutdown();
27 m_receiver_ptr = nullptr;
28}
29
// Stores receiver_ptr in m_receiver_ptr and, once both a receiver and a
// callback are set, calls startup().
// NOTE(review): the declarator (listing line 31) is missing from this page
// extraction. The symbol index lists `void set_receiver(Receiver *receiver_ptr)`,
// which matches the body -- confirm against the repository.
30void
32{
33 {
// m_receiver_ptr is written while holding m_callback_mutex.
34 std::lock_guard<std::mutex> lk(m_callback_mutex);
35 m_receiver_ptr = receiver_ptr;
36 }
37
// NOTE(review): both members are read here after the lock has been released,
// and the loop condition at listing line 106 also reads them without the
// mutex. Looks like a data race if another thread modifies them -- confirm.
// Passing nullptr does not call shutdown(); the loop at listing lines 91-106
// would only notice on its next condition check.
38 if (m_receiver_ptr != nullptr && m_callback != nullptr) {
39 startup();
40 }
41}
42
// Stores callback in m_callback and, once both a receiver and a callback are
// set, calls startup().
// NOTE(review): the declarator (listing line 44) is missing from this page
// extraction. The symbol index lists
// `void set_callback(std::function< void(Receiver::Response &)> callback)`,
// which matches the body -- confirm against the repository.
43void
45{
46 {
// m_callback is replaced while holding m_callback_mutex, the same mutex that
// is held around the callback invocation at listing lines 97-100.
47 std::lock_guard<std::mutex> lk(m_callback_mutex);
48 m_callback = callback;
49 }
50
// NOTE(review): these reads happen after the lock has been released.
// NOTE(review): if a thread is already running, startup() first calls
// shutdown(), which joins it; with both members still non-null the loop
// condition at listing line 106 stays true, so that join may never return.
// Confirm whether replacing an active callback is a supported use.
51 if (m_receiver_ptr != nullptr && m_callback != nullptr) {
52 startup();
53 }
54}
55
// Clears the stored callback and then calls shutdown(); unlike the body at
// listing lines 21-28 it leaves m_receiver_ptr untouched.
// NOTE(review): the declarator (listing line 57) is missing from this page
// extraction and the function name is not recoverable from what is visible
// here -- confirm against the repository.
56void
58{
59 {
// Scoped so m_callback_mutex is released before shutdown() is called.
60 std::lock_guard<std::mutex> lk(m_callback_mutex);
61 m_callback = nullptr;
62 }
// With m_callback now null, the loop condition at listing line 106 becomes
// false on its next evaluation.
63 shutdown();
64}
65
// Joins the worker thread if one exists and is joinable, then resets
// m_is_listening to false and releases the std::thread object.
// NOTE(review): the declarator (listing line 67) is missing from this page
// extraction; this is presumably shutdown(), which the other functions in this
// file call -- confirm against the repository.
// NOTE(review): nothing here signals the thread to stop. join() returns only
// after the thread function finishes, so the caller presumably has to clear
// m_callback or m_receiver_ptr first, as listing lines 24 and 61 do.
66void
68{
69 if (m_thread && m_thread->joinable())
70 m_thread->join();
71
72 m_is_listening = false;
73 m_thread.reset(nullptr);
74}
75
// Stops any previous worker via shutdown(), launches a new std::thread running
// thread_loop(), and blocks until m_is_listening becomes true.
// NOTE(review): the declarator (listing line 77) is missing from this page
// extraction; this is presumably startup(), which is called at listing lines
// 39 and 52 -- confirm against the repository.
76void
78{
// NOTE(review): both call sites reach this point with m_callback and
// m_receiver_ptr non-null. If a worker is already running, its loop condition
// (listing line 106) stays true and the join inside shutdown() may block
// indefinitely -- confirm.
79 shutdown();
80 m_is_listening = false;
// The lambda captures by reference, so the thread calls thread_loop() on this
// object; the object must stay alive until the thread has been joined.
81 m_thread = std::make_unique<std::thread>([&] { thread_loop(); });
82
// Poll in 1000-microsecond steps until the flag is set to true, which the
// visible code does only at listing line 105.
// NOTE(review): usleep is declared in <unistd.h>, which is not among the
// includes visible in this file; presumably it arrives transitively.
83 while (!m_is_listening.load()) {
84 usleep(1000);
85 }
86}
87
// Worker loop: on each pass it logs the size of a received response and hands
// the response to m_callback, repeating while both m_callback and
// m_receiver_ptr are non-null.
// NOTE(review): the declarator (listing line 89) is missing from this page
// extraction; this is presumably thread_loop(), which the lambda at listing
// line 81 calls -- confirm against the repository.
88void
90{
91 do {
92 try {
// NOTE(review): listing line 93, which must declare `response`, is missing
// from this page extraction. The symbol index lists Receiver::receive(...) and
// Receiver::s_no_block, so it presumably calls receive() through
// m_receiver_ptr -- confirm against the repository.
94
95 TLOG_DEBUG(45) << "Received " << response.data.size() << " bytes. Dispatching to callback.";
96 {
// The callback is invoked while m_callback_mutex is held. The functions that
// assign m_callback take the same std::mutex, so they block until the callback
// returns, and a callback that calls one of them on this thread would
// presumably deadlock.
97 std::lock_guard<std::mutex> lk(m_callback_mutex);
98 if (m_callback != nullptr) {
99 m_callback(response);
100 }
101 }
102 } catch (ipm::ReceiveTimeoutExpired const& tmo) {
// On a receive timeout, sleep for 10000 microseconds before the next pass.
103 usleep(10000);
104 }
// Set at the end of every pass, including the first; this is the flag polled
// at listing lines 83-85.
105 m_is_listening = true;
// NOTE(review): both members are read here without holding m_callback_mutex.
106 } while (m_callback != nullptr && m_receiver_ptr != nullptr);
107}
108
109} // namespace dunedaq::ipm
std::function< void(Receiver::Response &)> m_callback
void set_callback(std::function< void(Receiver::Response &)> callback)
virtual ~CallbackAdapter() noexcept
std::unique_ptr< std::thread > m_thread
std::atomic< bool > m_is_listening
void set_receiver(Receiver *receiver_ptr)
static constexpr duration_t s_no_block
Definition Receiver.hpp:80
Response receive(const duration_t &timeout, message_size_t num_bytes=s_any_size, bool no_tmoexcept_mode=false)
Definition Receiver.cpp:15
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
An ERS Error indicating that an exception was thrown from ZMQ while performing an operation.