Line data Source code
1 : /**
2 : * @file Subscriber_test.cxx Subscriber class Unit Tests
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
#include "CallbackAdapter.hpp"
#include "ipm/Subscriber.hpp"

#define BOOST_TEST_MODULE Subscriber_test // NOLINT

#include "boost/test/unit_test.hpp"

#include <atomic>
#include <chrono>
#include <functional>
#include <set>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
19 :
20 : using namespace dunedaq::ipm;
21 :
22 : BOOST_AUTO_TEST_SUITE(Subscriber_test)
23 :
24 : namespace {
25 :
26 : class SubscriberImpl : public Subscriber
27 : {
28 :
29 : public:
30 : static const message_size_t s_bytes_on_each_receive = 10;
31 :
32 2 : SubscriberImpl()
33 4 : : m_can_receive(false)
34 2 : , m_subscriptions()
35 : {
36 2 : }
37 :
38 2 : std::string connect_for_receives(const ConnectionInfo& /* connection_info */) override
39 : {
40 2 : m_can_receive = true;
41 2 : m_callback_adapter.set_receiver(this);
42 2 : return "";
43 : }
44 35044 : bool can_receive() const noexcept override { return m_can_receive; }
45 1 : void sabotage_my_receiving_ability()
46 : {
47 1 : unregister_callback();
48 1 : m_can_receive = false;
49 1 : }
50 :
51 1 : void register_callback(std::function<void(Response&)> callback) override
52 : {
53 1 : m_callback_adapter.set_callback(callback);
54 1 : }
55 2 : void unregister_callback() override { m_callback_adapter.clear_callback(); }
56 :
57 1 : void subscribe(std::string const& topic) override { m_subscriptions.insert(topic); }
58 1 : void unsubscribe(std::string const& topic) override { m_subscriptions.erase(topic); }
59 :
60 2 : std::set<std::string> get_subscriptions() const { return m_subscriptions; }
61 :
62 0 : bool data_pending() override { return true; }
63 :
64 : protected:
65 35039 : Receiver::Response receive_(const duration_t& /* timeout */, bool /*no_tmoexcept_mode*/) override
66 : {
67 35039 : Receiver::Response output;
68 35039 : output.data = std::vector<char>(s_bytes_on_each_receive, 'A');
69 35039 : output.metadata = "TEST";
70 :
71 35039 : return output;
72 0 : }
73 :
74 : private:
75 : bool m_can_receive;
76 : std::set<std::string> m_subscriptions;
77 : CallbackAdapter m_callback_adapter;
78 : };
79 :
80 : } // namespace ""
81 :
82 2 : BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics)
83 : {
84 1 : BOOST_REQUIRE(!std::is_copy_constructible_v<SubscriberImpl>);
85 1 : BOOST_REQUIRE(!std::is_copy_assignable_v<SubscriberImpl>);
86 1 : BOOST_REQUIRE(!std::is_move_constructible_v<SubscriberImpl>);
87 1 : BOOST_REQUIRE(!std::is_move_assignable_v<SubscriberImpl>);
88 1 : }
89 :
90 2 : BOOST_AUTO_TEST_CASE(StatusChecks)
91 : {
92 1 : SubscriberImpl the_subscriber;
93 :
94 1 : BOOST_REQUIRE(!the_subscriber.can_receive());
95 :
96 1 : Receiver::ConnectionInfo ci;
97 1 : the_subscriber.connect_for_receives(ci);
98 1 : BOOST_REQUIRE(the_subscriber.can_receive());
99 :
100 1 : the_subscriber.subscribe("TEST");
101 1 : auto subs = the_subscriber.get_subscriptions();
102 1 : BOOST_REQUIRE_EQUAL(subs.size(), 1);
103 1 : BOOST_REQUIRE_EQUAL(subs.count("TEST"), 1);
104 :
105 1 : the_subscriber.unsubscribe("TEST");
106 1 : subs = the_subscriber.get_subscriptions();
107 1 : BOOST_REQUIRE_EQUAL(subs.size(), 0);
108 :
109 1 : BOOST_REQUIRE_NO_THROW(the_subscriber.receive(Subscriber::s_no_block));
110 1 : BOOST_REQUIRE_NO_THROW(the_subscriber.receive(Subscriber::s_no_block, SubscriberImpl::s_bytes_on_each_receive));
111 :
112 1 : BOOST_REQUIRE_EXCEPTION(the_subscriber.receive(Subscriber::s_no_block, SubscriberImpl::s_bytes_on_each_receive - 1),
113 : dunedaq::ipm::UnexpectedNumberOfBytes,
114 : [&](dunedaq::ipm::UnexpectedNumberOfBytes) { return true; });
115 :
116 1 : the_subscriber.sabotage_my_receiving_ability();
117 1 : BOOST_REQUIRE(!the_subscriber.can_receive());
118 :
119 1 : BOOST_REQUIRE_EXCEPTION(the_subscriber.receive(Subscriber::s_no_block),
120 : dunedaq::ipm::KnownStateForbidsReceive,
121 : [&](dunedaq::ipm::KnownStateForbidsReceive) { return true; });
122 1 : }
123 :
124 2 : BOOST_AUTO_TEST_CASE(Callback)
125 : {
126 1 : SubscriberImpl the_subscriber;
127 :
128 1 : Receiver::ConnectionInfo ci;
129 1 : the_subscriber.connect_for_receives(ci);
130 1 : BOOST_REQUIRE(the_subscriber.can_receive());
131 :
132 1 : std::atomic<size_t> callback_call_count = 0;
133 35036 : auto callback_fun = [&](Receiver::Response&) { callback_call_count++; }; // NOLINT
134 :
135 1 : the_subscriber.register_callback(callback_fun);
136 1 : usleep(10000);
137 1 : the_subscriber.unregister_callback();
138 :
139 1 : BOOST_REQUIRE_GT(callback_call_count, 0);
140 1 : }
141 :
142 : BOOST_AUTO_TEST_SUITE_END()
|