Line data Source code
1 : /**
2 : * @file test_ratelimiter_app.cxx Test application for
3 : * ratelimiter implementation
4 : *
5 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
6 : * Licensing/copyright details are in the COPYING file that you should have
7 : * received with this code.
8 : */
9 : #
10 :
11 : #include <thread>
12 : #include <atomic>
13 : #include <chrono>
14 : #include <iostream>
15 : #include <folly/coro/Baton.h>
16 : #include <folly/coro/Task.h>
17 : #include <folly/futures/Future.h>
18 : #include <folly/coro/CurrentExecutor.h>
19 : #include <folly/coro/Timeout.h>
20 : #include <folly/futures/ThreadWheelTimekeeper.h>
21 : #include <folly/coro/BlockingWait.h>
22 :
23 :
24 : // using namespace dunedaq::datahandlinglibs;
25 : using namespace std::chrono_literals;
26 :
27 : folly::coro::Baton baton{0};
28 : uint32_t max_wait = 500;
29 :
30 : folly::coro::Task<void>
31 0 : postprocess_schedule() {
32 :
33 : folly::ThreadWheelTimekeeper tk;
34 :
35 0 : const auto wait_data = [&baton]() -> folly::coro::Task<void> {
36 : // folly::coro::timeout cancels the task on timeout.
37 : // Baton is not cancellable, so we attach a callback to resume the coroutine.
38 : auto token = co_await folly::coro::co_current_cancellation_token;
39 0 : folly::CancellationCallback cb(token, [&baton] { baton.post(); });
40 : co_await baton; // Wait data
41 0 : };
42 :
43 :
44 : uint64_t n_timeouts = 0;
45 : uint64_t n_process = 0;
46 :
47 : while(true) {
48 : try {
49 : co_await folly::coro::timeout(
50 : wait_data(),
51 : std::chrono::milliseconds{ max_wait },
52 : &tk);
53 : ++n_process;
54 : } catch (const folly::FutureTimeout&) {
55 : // timeout = true;
56 : std::cout << "Timeout " << ++n_timeouts << std::endl;
57 : }
58 : baton.reset();
59 : }
60 :
61 :
62 : co_return;
63 0 : }
64 :
65 : int
66 0 : main(int /*argc*/, char** /*argv[]*/)
67 : {
68 0 : std::atomic<bool> run_marker;
69 :
70 :
71 :
72 :
73 : // A sleepy worker thread
74 0 : std::jthread sleepy_worker(
75 0 : [&baton](std::stop_token stoken)
76 : {
77 :
78 0 : while(!stoken.stop_requested()) {
79 0 : baton.post();
80 : }
81 : // for (int i = 10; i; --i)
82 : // {
83 : // std::this_thread::sleep_for(300ms);
84 : // if (stoken.stop_requested())
85 : // {
86 : // print("Sleepy worker is requested to stop\n");
87 : // return;
88 : // }
89 : // print("Sleepy worker goes back to sleep\n");
90 : // }
91 0 : });
92 :
93 : // std::cout << "Sleeping for 3s" << std::endl;
94 :
95 : // std::this_thread::sleep_for(3s);
96 :
97 0 : std::cout << "Starting the coroutine" << std::endl;
98 0 : folly::coro::blockingWait(postprocess_schedule());
99 :
100 0 : std::cout << "Requesting stop" << std::endl;
101 : // sleepy_worker.request_stop();
102 0 : std::cout << "Thread stopped" << std::endl;
103 :
104 :
105 0 : return 0;
106 0 : } // NOLINT(readability/fn_size)
|