Line data Source code
1 : /**
2 : * @file MonitorableObject.cpp
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include <NullOpMonFacility.hpp>
10 : #include <opmonlib/MonitorableObject.hpp>
11 : #include <opmonlib/Utils.hpp>
12 : #include "logging/Logging.hpp"
13 :
14 : #include <google/protobuf/util/time_util.h>
15 :
16 : #include <chrono>
17 :
18 :
19 : /**
20 : * @brief Name used by TRACE TLOG calls from this source file
21 : */
22 : #define TRACE_NAME "MonitorableObject" // NOLINT
// Verbosity levels handed to TLOG_DEBUG by the functions in this file.
23 : enum {
24 : TLVL_MONITORING_STEPS = 10, // used by collect() when it starts gathering data
25 : TLVL_LEVEL_SUPPRESSION = 20, // used by publish() when a metric is dropped because of its level
26 : };
27 :
// Everything below refers to opmonlib names without qualification.
28 : using namespace dunedaq::opmonlib;
29 :
// Definition of the static member s_default_facility, initialised with a
// newly created NullOpMonFacility.
30 : std::shared_ptr<OpMonFacility> MonitorableObject::s_default_facility = std::make_shared<NullOpMonFacility>();
31 :
/**
 * @brief Registers the node p as a child of this object under the key name.
 *
 * m_node_mutex is held for the whole call. If name is already present in m_nodes:
 *  - an entry that is still alive makes the call throw NonUniqueNodeName;
 *  - an expired entry only produces an ers warning and is then overwritten.
 * After the entry is stored, the child's m_opmon_name is set to name and the child
 * copies this object's properties through inherit_parent_properties().
 *
 * @param name key under which the node is stored in m_nodes
 * @param p pointer to the child node
 * @throws NonUniqueNodeName when a live node is already registered under name
 *
 * NOTE(review): p is dereferenced without a null check - confirm callers never pass
 * an empty pointer.
 */
32 307 : void MonitorableObject::register_node( ElementId name, NewNodePtr p ) {
33 :
34 307 : std::lock_guard<std::mutex> lock(m_node_mutex);
35 :
36 : // check if the name is already present to ensure uniqueness
37 307 : auto it = m_nodes.find(name) ;
38 307 : if ( it != m_nodes.end() ) {
39 : // This is not desired because names are supposed to be unique.
40 : // But if the stored pointer is expired, there is no harm in overriding it
41 2 : if ( it -> second.expired() ) {
42 0 : ers::warning(NonUniqueNodeName(ERS_HERE, name, to_string(get_opmon_id())));
43 : }
44 : else {
45 2 : throw NonUniqueNodeName(ERS_HERE, name, to_string(get_opmon_id()));
46 : }
47 : }
48 :
// Inserts a new entry, or replaces the expired one found above.
49 305 : m_nodes[name] = p;
50 :
// The child learns its own name and then takes the facility, parent id and level of this object.
51 305 : p -> m_opmon_name = name;
52 305 : p -> inherit_parent_properties( *this );
53 :
54 610 : TLOG() << "Node " << name << " registered to " << to_string(get_opmon_id()) ;
55 307 : }
56 :
57 :
/**
 * @brief Publishes one metric message through the facility stored in m_facility.
 *
 * Steps, in order:
 *  - the metric is dropped, and m_ignored_counter incremented, when
 *    publishable_metric() rejects level l against get_opmon_level();
 *  - the message and the custom origin are converted with to_entry();
 *  - an entry without data triggers an EntryWithNoData warning and the call returns;
 *  - the entry receives get_opmon_id() as origin and the timestamp taken at function entry;
 *  - the entry is handed to the facility: success increments m_published_counter, an
 *    OpMonPublishFailure is reported with ers::error and increments m_error_counter.
 *
 * The elapsed time in microseconds is added to m_cpu_us_counter only when the function
 * runs to its end; the two early returns leave that counter untouched.
 *
 * @param m message to publish
 * @param co custom origin forwarded to to_entry()
 * @param l level of this metric, compared with the level of the object
 *
 * NOTE(review): the function is noexcept and only OpMonPublishFailure is caught. Any
 * other exception leaving to_entry() or the facility would end in std::terminate -
 * confirm that neither can throw anything else.
 */
58 115 : void MonitorableObject::publish( google::protobuf::Message && m,
59 : CustomOrigin && co,
60 : OpMonLevel l ) const noexcept {
61 :
// Taken before any other work, so the entry is stamped with the time of the call.
62 115 : auto timestamp = google::protobuf::util::TimeUtil::GetCurrentTime();
63 :
64 115 : auto start_time = std::chrono::high_resolution_clock::now();
65 :
66 115 : if ( ! MonitorableObject::publishable_metric( l, get_opmon_level() ) ) {
67 1 : TLOG_DEBUG(TLVL_LEVEL_SUPPRESSION) << "Metric " << m.GetTypeName() << " ignored because of the level";
68 1 : ++m_ignored_counter;
69 1 : return;
70 : }
71 :
72 114 : auto e = to_entry( m, co );
73 :
// An entry with no data is reported and dropped; no counter is changed on this path.
74 114 : if ( e.data().empty() ) {
75 0 : ers::warning( EntryWithNoData(ERS_HERE, e.measurement() ) );
76 0 : return ;
77 : }
78 :
79 114 : *e.mutable_origin() = get_opmon_id() ;
80 :
81 114 : *e.mutable_time() = timestamp;
82 :
83 : // this pointer is always guaranteed to be filled, even if with a null Facility.
84 : // But the facility can fail
85 114 : try {
86 114 : m_facility.load()->publish(std::move(e));
87 114 : ++m_published_counter;
// NOTE(review): the catch parameter e hides the entry e declared above.
88 0 : } catch ( const OpMonPublishFailure & e ) {
89 0 : ers::error(e);
90 0 : ++m_error_counter;
91 0 : }
92 :
93 114 : auto stop_time = std::chrono::high_resolution_clock::now();
94 :
95 114 : auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time - start_time );
96 114 : m_cpu_us_counter += duration.count();
97 :
98 115 : }
99 :
100 :
/**
 * @brief Gathers monitoring statistics for this object and, recursively, for its live children.
 *
 * generate_opmon_data() is called first. Any exception it throws is counted in
 * m_error_counter and reported as ErrorWhileCollecting; for an ers::Issue one extra
 * error is counted for every issue found in its cause chain.
 * The published, ignored, error and cpu counters are then read and reset to zero with
 * exchange(0). With m_node_mutex held, every child that is still alive is collected and
 * its numbers are added to the result; expired children are erased from m_nodes and
 * counted as invalid links.
 *
 * @return totals for the subtree rooted at this object. clockwall_elapsed_time_us is the
 *         duration of this call, which contains the collect() calls made on the children.
 */
101 180 : opmon::MonitoringTreeInfo MonitorableObject::collect() noexcept {
102 :
103 180 : auto start_time = std::chrono::high_resolution_clock::now();
104 :
105 180 : TLOG_DEBUG(TLVL_MONITORING_STEPS) << "Collecting data from " << to_string(get_opmon_id());
106 :
107 180 : opmon::MonitoringTreeInfo info;
108 :
109 180 : info.set_n_invalid_links(0);
110 :
// All three handlers swallow the exception after counting and reporting it, which is
// what lets this function stay noexcept around generate_opmon_data().
111 180 : try {
112 180 : generate_opmon_data();
113 0 : } catch ( const ers::Issue & i ) {
114 0 : ++m_error_counter;
// Walk the cause chain and count each nested issue as one more error.
115 0 : auto cause_ptr = i.cause();
116 0 : while ( cause_ptr ) {
117 0 : ++m_error_counter;
118 0 : cause_ptr = cause_ptr->cause();
119 : }
120 0 : ers::error( ErrorWhileCollecting(ERS_HERE, to_string(get_opmon_id()), i) );
121 0 : } catch ( const std::exception & e ) {
122 0 : ++m_error_counter;
123 0 : ers::error( ErrorWhileCollecting(ERS_HERE, to_string(get_opmon_id()), e) );
124 0 : } catch (...) {
125 0 : ++m_error_counter;
126 0 : ers::error( ErrorWhileCollecting(ERS_HERE, to_string(get_opmon_id())) );
127 0 : }
128 :
// exchange(0) reads each counter and resets it in one step, so every collect() reports
// what accumulated since the previous one.
129 180 : info.set_n_published_measurements( m_published_counter.exchange(0) );
130 180 : info.set_n_ignored_measurements( m_ignored_counter.exchange(0) );
131 180 : info.set_n_errors( m_error_counter.exchange(0) );
// This object counts as a publishing node only if it published something itself.
132 180 : if (info.n_published_measurements() > 0) {
133 98 : info.set_n_publishing_nodes(1);
134 : }
135 180 : info.set_cpu_elapsed_time_us( m_cpu_us_counter.exchange(0) );
136 :
137 :
// Held until the function returns, i.e. also while the children are being collected.
138 180 : std::lock_guard<std::mutex> lock(m_node_mutex);
139 :
// Taken before the pruning below, so entries removed in this call are still included.
140 180 : info.set_n_registered_nodes( m_nodes.size() );
141 :
142 180 : unsigned int n_invalid_links = 0;
143 :
// The iterator is advanced inside the body because erase() returns the next position.
144 339 : for ( auto it = m_nodes.begin(); it != m_nodes.end(); ) {
145 :
146 159 : auto ptr = it->second.lock();
147 :
148 159 : if( ptr ) {
149 159 : auto child_info = ptr->collect(); // MR: can we make this an async? There is no point to wait all done here
150 159 : info.set_n_registered_nodes( info.n_registered_nodes() + child_info.n_registered_nodes() );
151 159 : info.set_n_publishing_nodes( info.n_publishing_nodes() + child_info.n_publishing_nodes() );
152 159 : info.set_n_invalid_links( info.n_invalid_links() + child_info.n_invalid_links() );
153 159 : info.set_n_published_measurements( info.n_published_measurements() + child_info.n_published_measurements() );
154 159 : info.set_n_ignored_measurements( info.n_ignored_measurements() + child_info.n_ignored_measurements() );
155 159 : info.set_n_errors( info.n_errors() + child_info.n_errors() );
156 159 : info.set_cpu_elapsed_time_us( info.cpu_elapsed_time_us() + child_info.cpu_elapsed_time_us() );
157 159 : }
158 :
159 : // prune the dead links
// NOTE(review): assuming m_nodes stores std::weak_ptr, a non-null ptr keeps the child
// alive here, so this test can only succeed for entries whose lock() returned null.
160 159 : if ( it->second.expired() ) {
161 0 : it = m_nodes.erase(it);
162 0 : ++n_invalid_links;
163 : } else {
164 159 : ++it;
165 : }
166 159 : }
167 :
// Links pruned at this level are added on top of those reported by the children.
168 180 : info.set_n_invalid_links( info.n_invalid_links() + n_invalid_links );
169 :
170 :
171 180 : auto stop_time = std::chrono::high_resolution_clock::now();
172 :
173 180 : auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time - start_time );
174 180 : info.set_clockwall_elapsed_time_us( duration.count() );
175 :
176 180 : return info;
177 180 : }
178 :
179 :
/**
 * @brief Sets the monitoring level of this object and of every child that is still alive.
 *
 * m_opmon_level is assigned first; m_node_mutex is then taken and set_opmon_level(l) is
 * called on each child whose pointer can still be locked, so the level propagates
 * recursively. Expired children are skipped and left in m_nodes.
 *
 * @param l level to store and to forward to the children
 */
180 14 : void MonitorableObject::set_opmon_level( OpMonLevel l ) noexcept {
181 :
182 14 : m_opmon_level = l;
183 :
184 14 : std::lock_guard<std::mutex> lock(m_node_mutex);
185 17 : for ( const auto & [key,wp] : m_nodes ) {
186 3 : auto p = wp.lock();
187 3 : if (p) {
188 3 : p->set_opmon_level(l);
189 : }
190 3 : }
191 14 : }
192 :
/**
 * @brief Copies facility, parent id and level from parent, then pushes them down to the children.
 *
 * m_facility receives parent.m_facility, m_parent_id receives parent.get_opmon_id() and
 * m_opmon_level receives parent.get_opmon_level(). With this object's m_node_mutex held,
 * every child that is still alive then calls inherit_parent_properties(*this), so the
 * update reaches the whole subtree.
 *
 * register_node() calls this on the child while holding the m_node_mutex of the parent;
 * the mutex locked below is the one belonging to the child.
 *
 * @param parent object whose properties are copied
 */
193 305 : void MonitorableObject::inherit_parent_properties( const MonitorableObject & parent ) {
194 :
195 305 : m_facility.store(parent.m_facility);
196 305 : m_parent_id = parent.get_opmon_id();
197 305 : m_opmon_level = parent.get_opmon_level();
198 :
199 305 : std::lock_guard<std::mutex> lock(m_node_mutex);
200 :
201 305 : for ( const auto & [key,wp] : m_nodes ) {
202 :
203 0 : auto p = wp.lock();
204 0 : if ( p ) {
205 0 : p->inherit_parent_properties(*this);
206 : }
207 :
208 0 : }
209 :
210 305 : }
211 :
|