Line data Source code
1 : /**
2 : *
3 : * @file CallbackAdapter.cpp ipm CallbackAdapter class
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 :
10 : #include "CallbackAdapter.hpp"
11 :
12 : #include "logging/Logging.hpp"
13 :
14 : #include <memory>
15 : #include <string>
16 : #include <utility>
17 :
18 : namespace dunedaq::ipm {
19 :
20 43 : CallbackAdapter::~CallbackAdapter() noexcept
21 : {
22 43 : {
23 43 : std::lock_guard<std::mutex> lk(m_callback_mutex);
24 43 : m_callback = nullptr;
25 43 : }
26 43 : shutdown();
27 43 : m_receiver_ptr = nullptr;
28 43 : }
29 :
30 : void
31 40 : CallbackAdapter::set_receiver(Receiver* receiver_ptr)
32 : {
33 40 : {
34 40 : std::lock_guard<std::mutex> lk(m_callback_mutex);
35 40 : m_receiver_ptr = receiver_ptr;
36 40 : }
37 :
38 40 : if (m_receiver_ptr != nullptr && m_callback != nullptr) {
39 0 : startup();
40 : }
41 40 : }
42 :
43 : void
44 4 : CallbackAdapter::set_callback(std::function<void(Receiver::Response&)> callback)
45 : {
46 4 : {
47 4 : std::lock_guard<std::mutex> lk(m_callback_mutex);
48 4 : m_callback = callback;
49 4 : }
50 :
51 4 : if (m_receiver_ptr != nullptr && m_callback != nullptr) {
52 4 : startup();
53 : }
54 4 : }
55 :
56 : void
57 45 : CallbackAdapter::clear_callback()
58 : {
59 45 : {
60 45 : std::lock_guard<std::mutex> lk(m_callback_mutex);
61 45 : m_callback = nullptr;
62 45 : }
63 45 : shutdown();
64 45 : }
65 :
66 : void
67 92 : CallbackAdapter::shutdown()
68 : {
69 92 : if (m_thread && m_thread->joinable())
70 4 : m_thread->join();
71 :
72 92 : m_is_listening = false;
73 92 : m_thread.reset(nullptr);
74 92 : }
75 :
76 : void
77 4 : CallbackAdapter::startup()
78 : {
79 4 : shutdown();
80 4 : m_is_listening = false;
81 8 : m_thread = std::make_unique<std::thread>([&] { thread_loop(); });
82 :
83 28 : while (!m_is_listening.load()) {
84 24 : usleep(1000);
85 : }
86 4 : }
87 :
88 : void
89 4 : CallbackAdapter::thread_loop()
90 : {
91 58681 : do {
92 58681 : try {
93 58681 : auto response = m_receiver_ptr->receive(Receiver::s_no_block);
94 :
95 58656 : TLOG_DEBUG(45) << "Received " << response.data.size() << " bytes. Dispatching to callback.";
96 58656 : {
97 58656 : std::lock_guard<std::mutex> lk(m_callback_mutex);
98 58656 : if (m_callback != nullptr) {
99 58654 : m_callback(response);
100 : }
101 58656 : }
102 58681 : } catch (ipm::ReceiveTimeoutExpired const& tmo) {
103 25 : usleep(10000);
104 25 : }
105 58681 : m_is_listening = true;
106 58685 : } while (m_callback != nullptr && m_receiver_ptr != nullptr);
107 4 : }
108 :
109 : } // namespace dunedaq::ipm
|