Line data Source code
1 : /**
2 : * @file TokenManager.cpp
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "trigger/TokenManager.hpp"
10 : #include "trigger/LivetimeCounter.hpp"
11 :
12 : #include "iomanager/IOManager.hpp"
13 :
14 : #include <memory>
15 : #include <string>
16 :
17 : namespace dunedaq::trigger {
18 :
19 0 : TokenManager::TokenManager(const std::string& connection_name,
20 : int initial_tokens,
21 : daqdataformats::run_number_t run_number,
22 0 : std::shared_ptr<LivetimeCounter> livetime_counter)
23 0 : : m_connection_name(connection_name)
24 0 : , m_n_tokens(initial_tokens)
25 0 : , m_run_number(run_number)
26 0 : , m_livetime_counter(livetime_counter)
27 0 : , m_token_receiver(nullptr)
28 : {
29 0 : m_open_trigger_time = std::chrono::steady_clock::now();
30 :
31 0 : m_token_receiver = get_iom_receiver<dfmessages::TriggerDecisionToken>(m_connection_name);
32 0 : m_token_receiver->add_callback(std::bind(&TokenManager::receive_token, this, std::placeholders::_1));
33 0 : }
34 :
35 0 : TokenManager::~TokenManager()
36 : {
37 0 : m_token_receiver->remove_callback();
38 :
39 0 : if (!m_open_trigger_decisions.empty()) {
40 :
41 0 : auto now = std::chrono::steady_clock::now();
42 0 : if (std::chrono::duration_cast<std::chrono::milliseconds>(now - m_open_trigger_time) >
43 0 : std::chrono::milliseconds(3000)) {
44 0 : std::ostringstream o;
45 0 : o << "Open Trigger Decisions: [";
46 0 : { // Scope for lock_guard
47 0 : bool first = true;
48 0 : std::lock_guard<std::mutex> lk(m_open_trigger_decisions_mutex);
49 0 : for (auto& td : m_open_trigger_decisions) {
50 0 : if (!first)
51 0 : o << ", ";
52 0 : o << td;
53 0 : first = false;
54 : }
55 0 : o << "]";
56 0 : }
57 0 : TLOG_DEBUG(0) << o.str();
58 0 : }
59 : }
60 0 : }
61 :
62 : int
63 0 : TokenManager::get_n_tokens() const
64 : {
65 0 : return m_n_tokens.load();
66 : }
67 :
68 : void
69 0 : TokenManager::trigger_sent(dfmessages::trigger_number_t trigger_number)
70 : {
71 0 : std::lock_guard<std::mutex> lk(m_open_trigger_decisions_mutex);
72 0 : m_open_trigger_decisions.insert(trigger_number);
73 0 : m_n_tokens--;
74 0 : if (m_n_tokens.load() == 0) {
75 0 : m_livetime_counter->set_state(LivetimeCounter::State::kDead);
76 : }
77 0 : }
78 :
79 : void
80 0 : TokenManager::receive_token(dfmessages::TriggerDecisionToken& token)
81 : {
82 0 : TLOG_DEBUG(1) << "Received token with run number " << token.run_number << ", current run number " << m_run_number;
83 0 : if (token.run_number == m_run_number) {
84 0 : if (m_n_tokens.load() == 0) {
85 0 : m_livetime_counter->set_state(LivetimeCounter::State::kLive);
86 : }
87 0 : m_n_tokens++;
88 0 : TLOG_DEBUG(1) << "There are now " << m_n_tokens.load() << " tokens available";
89 :
90 0 : if (token.trigger_number != dfmessages::TypeDefaults::s_invalid_trigger_number) {
91 0 : if (m_open_trigger_decisions.count(token.trigger_number)) {
92 0 : std::lock_guard<std::mutex> lk(m_open_trigger_decisions_mutex);
93 0 : m_open_trigger_decisions.erase(token.trigger_number);
94 0 : TLOG_DEBUG(1) << "Token indicates that trigger decision " << token.trigger_number
95 0 : << " has been completed. There are now " << m_open_trigger_decisions.size()
96 0 : << " triggers in flight";
97 0 : } else {
98 : // ERS warning: received token for trigger number I don't recognize
99 : }
100 : }
101 : }
102 0 : }
103 :
104 : } // namespace dunedaq::trigger
|