DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
CallbackAdapter.cpp
Go to the documentation of this file.
1
10#include "CallbackAdapter.hpp"
11
12#include "logging/Logging.hpp"
13
14#include <string>
15#include <utility>
16
17namespace dunedaq::ipm {
18
// Teardown: clears m_callback while holding m_callback_mutex, calls shutdown(),
// and finally nulls m_receiver_ptr.
// NOTE(review): the declarator line for this body is absent from this listing.
// The cross-reference entries after the code list
// `virtual ~CallbackAdapter() noexcept`, so this is presumably the destructor
// -- confirm against the real file.
20{
21 {
// The inner braces end the lock_guard's scope, so the mutex is released
// before shutdown() runs.
22 std::lock_guard<std::mutex> lk(m_callback_mutex);
23 m_callback = nullptr;
24 }
25 shutdown();
26 m_receiver_ptr = nullptr;
27}
28
// Stores receiver_ptr into m_receiver_ptr while holding m_callback_mutex, then
// calls startup() if both m_receiver_ptr and m_callback are non-null.
// NOTE(review): the declarator line is absent from this listing. The
// cross-reference entries after the code list
// `void set_receiver(Receiver *receiver_ptr)`, which matches the use of
// `receiver_ptr` below -- confirm against the real file.
29void
31{
32 {
33 std::lock_guard<std::mutex> lk(m_callback_mutex);
34 m_receiver_ptr = receiver_ptr;
35 }
36
// NOTE(review): m_receiver_ptr and m_callback are read here after the lock has
// been released -- confirm this is safe against concurrent callers that modify
// them.
37 if (m_receiver_ptr != nullptr && m_callback != nullptr) {
38 startup();
39 }
40}
41
// Stores callback into m_callback while holding m_callback_mutex, then calls
// startup() if both m_receiver_ptr and m_callback are non-null.
// NOTE(review): the declarator line is absent from this listing. The
// cross-reference entries after the code list
// `void set_callback(std::function< void(Receiver::Response &)> callback)`,
// which matches the use of `callback` below -- confirm against the real file.
42void
44{
45 {
46 std::lock_guard<std::mutex> lk(m_callback_mutex);
47 m_callback = callback;
48 }
49
// NOTE(review): m_receiver_ptr and m_callback are read here after the lock has
// been released -- confirm this is safe against concurrent callers that modify
// them.
50 if (m_receiver_ptr != nullptr && m_callback != nullptr) {
51 startup();
52 }
53}
54
// Resets m_callback to nullptr while holding m_callback_mutex, then calls
// shutdown().
// NOTE(review): the declarator line is absent from this listing and the
// function's name is not visible anywhere in it -- confirm against the real
// file.
55void
57{
58 {
// The inner braces end the lock_guard's scope, so the mutex is released
// before shutdown() runs.
59 std::lock_guard<std::mutex> lk(m_callback_mutex);
60 m_callback = nullptr;
61 }
62 shutdown();
63}
64
// Joins m_thread if it exists and is joinable, then sets m_is_listening to
// false and releases the thread object.
// NOTE(review): the declarator line is absent from this listing; this is
// presumably the shutdown() that other functions in the listing call --
// confirm against the real file.
// NOTE(review): join() blocks until the thread function returns. The loop at
// the end of this listing keeps running while m_callback and m_receiver_ptr
// are both non-null, so verify that every caller makes that condition false
// before reaching this join.
65void
67{
68 if (m_thread && m_thread->joinable())
69 m_thread->join();
70
71 m_is_listening = false;
72 m_thread.reset(nullptr);
73}
74
// Calls shutdown(), clears m_is_listening, creates a new std::thread that runs
// thread_loop(), and then waits until m_is_listening becomes true.
// NOTE(review): the declarator line is absent from this listing; this is
// presumably the startup() that other functions in the listing call --
// confirm against the real file.
75void
77{
78 shutdown();
79 m_is_listening = false;
// The lambda captures by reference and calls thread_loop() on this object.
80 m_thread.reset(new std::thread([&] { thread_loop(); }));
81
// Polls the atomic flag, sleeping 1000 microseconds between checks.
// NOTE(review): no include that declares usleep is visible in this listing
// -- confirm it is provided by one of the project headers.
82 while (!m_is_listening.load()) {
83 usleep(1000);
84 }
85}
86
// Worker loop: on each pass it logs the size of `response.data`, invokes
// m_callback with `response` while holding m_callback_mutex (if the callback
// is non-null), and sets m_is_listening to true. It repeats while both
// m_callback and m_receiver_ptr are non-null.
// NOTE(review): the declarator line is absent from this listing; this is
// presumably the thread_loop() launched by the thread-creating function above
// -- confirm against the real file.
87void
89{
90 do {
91 try {
// NOTE(review): the line that declares `response` is absent from this
// listing. The cross-reference entries after the code mention
// Receiver::receive and Receiver::s_no_block, so it is presumably obtained
// from m_receiver_ptr with a non-blocking receive -- confirm.
93
94 TLOG_DEBUG(25) << "Received " << response.data.size() << " bytes. Dispatching to callback.";
95 {
// The callback runs with m_callback_mutex held, so any other function that
// locks this mutex waits until the callback returns.
96 std::lock_guard<std::mutex> lk(m_callback_mutex);
97 if (m_callback != nullptr) {
98 m_callback(response);
99 }
100 }
101 } catch (ipm::ReceiveTimeoutExpired const& tmo) {
// On a receive timeout, sleep 10000 microseconds before the next pass.
102 usleep(10000);
103 }
// Set after every pass, whether or not a message was received; the
// thread-creating function above polls this flag.
104 m_is_listening = true;
// NOTE(review): m_callback and m_receiver_ptr are read here without holding
// m_callback_mutex -- confirm this is safe against concurrent writers.
105 } while (m_callback != nullptr && m_receiver_ptr != nullptr);
106}
107
108} // namespace dunedaq::ipm
std::function< void(Receiver::Response &)> m_callback
void set_callback(std::function< void(Receiver::Response &)> callback)
virtual ~CallbackAdapter() noexcept
std::unique_ptr< std::thread > m_thread
std::atomic< bool > m_is_listening
void set_receiver(Receiver *receiver_ptr)
static constexpr duration_t s_no_block
Definition Receiver.hpp:77
Response receive(const duration_t &timeout, message_size_t num_bytes=s_any_size, bool no_tmoexcept_mode=false)
Definition Receiver.cpp:13
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
An ERS Error indicating that an exception was thrown from ZMQ while performing an operation.