Line data Source code
1 : /**
2 : * @file Subscriber_test.cxx Subscriber class Unit Tests
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "CallbackAdapter.hpp"
10 : #include "ipm/Subscriber.hpp"
11 :
12 : #define BOOST_TEST_MODULE Subscriber_test // NOLINT
13 :
14 : #include "boost/test/unit_test.hpp"
15 :
16 : #include <set>
17 : #include <string>
18 : #include <vector>
19 :
20 : using namespace dunedaq::ipm;
21 :
22 : BOOST_AUTO_TEST_SUITE(Subscriber_test)
23 :
24 : namespace {
25 :
26 : class SubscriberImpl : public Subscriber // Test double: a concrete Subscriber whose receive_() fabricates a fixed payload.
27 : {
28 :
29 : public:
30 : static const message_size_t s_bytes_on_each_receive = 10; // Number of bytes placed in the payload by every receive_() call.
31 : // Default state: cannot receive and has no subscriptions.
32 2 : SubscriberImpl()
33 4 : : m_can_receive(false)
34 2 : , m_subscriptions()
35 : {
36 2 : }
37 : // Ignores the (unnamed) connection info, enables receiving, and always returns an empty string.
38 2 : std::string connect_for_receives(const nlohmann::json& /* connection_info */) override
39 : {
40 2 : m_can_receive = true;
41 2 : m_callback_adapter.set_receiver(this); // Hand this object to the callback adapter as its receiver.
42 2 : return "";
43 : }
44 28883 : bool can_receive() const noexcept override { return m_can_receive; } // Reports the flag set by connect_for_receives() / cleared by sabotage_my_receiving_ability().
45 1 : void sabotage_my_receiving_ability() // Test hook: clears any registered callback, then marks the object as unable to receive.
46 : {
47 1 : unregister_callback();
48 1 : m_can_receive = false;
49 1 : }
50 : // Callback registration and removal are forwarded to the CallbackAdapter member.
51 1 : void register_callback(std::function<void(Response&)> callback) override
52 : {
53 1 : m_callback_adapter.set_callback(callback);
54 1 : }
55 2 : void unregister_callback() override { m_callback_adapter.clear_callback(); }
56 : // Subscriptions are only recorded in a local set of topic strings.
57 1 : void subscribe(std::string const& topic) override { m_subscriptions.insert(topic); }
58 1 : void unsubscribe(std::string const& topic) override { m_subscriptions.erase(topic); }
59 :
60 2 : std::set<std::string> get_subscriptions() const { return m_subscriptions; } // Returns a copy of the current topic set (returned by value).
61 :
62 : protected:
63 28878 : Receiver::Response receive_(const duration_t& /* timeout */, bool /*no_tmoexcept_mode*/) override // Both parameters are unnamed and unused.
64 : {
65 28878 : Receiver::Response output;
66 28878 : output.data = std::vector<char>(s_bytes_on_each_receive, 'A'); // Payload: s_bytes_on_each_receive copies of 'A'.
67 28878 : output.metadata = "TEST";
68 :
69 28878 : return output;
70 0 : }
71 :
72 : private:
73 : bool m_can_receive; // Value reported by can_receive().
74 : std::set<std::string> m_subscriptions; // Topics added by subscribe() and removed by unsubscribe().
75 : CallbackAdapter m_callback_adapter; // Holds the receiver pointer and the registered callback; declared in CallbackAdapter.hpp.
76 : };
77 :
78 : } // namespace ""
79 :
80 2 : BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics) // Requires that SubscriberImpl is neither copyable nor movable, as reported by the standard type traits.
81 : { // NOTE(review): the std::is_*_v traits come from <type_traits>, which is not among the includes visible in this file -- presumably included transitively; confirm.
82 1 : BOOST_REQUIRE(!std::is_copy_constructible_v<SubscriberImpl>);
83 1 : BOOST_REQUIRE(!std::is_copy_assignable_v<SubscriberImpl>);
84 1 : BOOST_REQUIRE(!std::is_move_constructible_v<SubscriberImpl>);
85 1 : BOOST_REQUIRE(!std::is_move_assignable_v<SubscriberImpl>);
86 1 : }
87 :
88 2 : BOOST_AUTO_TEST_CASE(StatusChecks) // Walks one SubscriberImpl through connect, subscribe/unsubscribe, receive, and a forced "cannot receive" state.
89 : {
90 1 : SubscriberImpl the_subscriber;
91 : // A freshly constructed subscriber must report that it cannot receive.
92 1 : BOOST_REQUIRE(!the_subscriber.can_receive());
93 : // Connecting with a default-constructed json object must enable receiving.
94 1 : nlohmann::json j;
95 1 : the_subscriber.connect_for_receives(j);
96 1 : BOOST_REQUIRE(the_subscriber.can_receive());
97 : // Subscribing to one topic must leave exactly that topic in the subscription set.
98 1 : the_subscriber.subscribe("TEST");
99 1 : auto subs = the_subscriber.get_subscriptions();
100 1 : BOOST_REQUIRE_EQUAL(subs.size(), 1);
101 1 : BOOST_REQUIRE_EQUAL(subs.count("TEST"), 1);
102 : // Unsubscribing from it must leave the set empty.
103 1 : the_subscriber.unsubscribe("TEST");
104 1 : subs = the_subscriber.get_subscriptions();
105 1 : BOOST_REQUIRE_EQUAL(subs.size(), 0);
106 : // receive() must not throw when no size is given, or when the requested size equals what receive_() produces.
107 1 : BOOST_REQUIRE_NO_THROW(the_subscriber.receive(Subscriber::s_no_block));
108 1 : BOOST_REQUIRE_NO_THROW(the_subscriber.receive(Subscriber::s_no_block, SubscriberImpl::s_bytes_on_each_receive));
109 : // Requesting one byte fewer than receive_() produces must raise UnexpectedNumberOfBytes.
110 3 : BOOST_REQUIRE_EXCEPTION(the_subscriber.receive(Subscriber::s_no_block, SubscriberImpl::s_bytes_on_each_receive - 1),
111 : dunedaq::ipm::UnexpectedNumberOfBytes,
112 : [&](dunedaq::ipm::UnexpectedNumberOfBytes) { return true; });
113 : // Once receiving is disabled again, can_receive() must be false and receive() must raise KnownStateForbidsReceive.
114 1 : the_subscriber.sabotage_my_receiving_ability();
115 1 : BOOST_REQUIRE(!the_subscriber.can_receive());
116 :
117 3 : BOOST_REQUIRE_EXCEPTION(the_subscriber.receive(Subscriber::s_no_block),
118 : dunedaq::ipm::KnownStateForbidsReceive,
119 : [&](dunedaq::ipm::KnownStateForbidsReceive) { return true; });
120 1 : }
121 :
122 2 : BOOST_AUTO_TEST_CASE(Callback) // Registers a counting callback and requires that it is invoked at least once.
123 : {
124 1 : SubscriberImpl the_subscriber;
125 :
126 1 : nlohmann::json j;
127 1 : the_subscriber.connect_for_receives(j);
128 1 : BOOST_REQUIRE(the_subscriber.can_receive());
129 : // The counter is atomic; presumably the adapter invokes the callback from another thread -- TODO confirm against CallbackAdapter. NOTE(review): <atomic> is not among the includes visible in this file.
130 1 : std::atomic<size_t> callback_call_count = 0;
131 28875 : auto callback_fun = [&](Receiver::Response&) { callback_call_count++; }; // NOLINT
132 : // Keep the callback registered for 10000 microseconds, then unregister it before the by-reference-captured counter leaves scope.
133 1 : the_subscriber.register_callback(callback_fun);
134 1 : usleep(10000);
135 1 : the_subscriber.unregister_callback();
136 : // Timing-dependent check: passes only if at least one callback ran while it was registered.
137 1 : BOOST_REQUIRE_GT(callback_call_count, 0);
138 1 : }
139 :
140 : BOOST_AUTO_TEST_SUITE_END()
|