DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
MonitorableObject.cpp
Go to the documentation of this file.
1
11#include <opmonlib/Utils.hpp>
12#include "logging/Logging.hpp"
13
14#include <google/protobuf/util/time_util.h>
15
16#include <chrono>
17
18
// TRACE_NAME presumably names the TRACE/TLOG messages emitted from this file
// (TRACE library convention) — NOTE(review): confirm against the logging docs.
22#define TRACE_NAME "MonitorableObject" // NOLINT
// Local debug levels for TLOG_DEBUG; the enumerators used further down in this
// file are TLVL_MONITORING_STEPS and TLVL_LEVEL_SUPPRESSION.
23enum {
26};
27
28using namespace dunedaq::opmonlib;
29
// Definition of the static default facility: initialised to a NullOpMonFacility.
30std::shared_ptr<OpMonFacility> MonitorableObject::s_default_facility = std::make_shared<NullOpMonFacility>();
31
// register_node(name, p): registers child node `p` under `name` in m_nodes,
// names it `name` and makes it inherit this object's properties.
// Throws NonUniqueNodeName if `name` is already bound to a live node; if the
// existing entry has expired, a NonUniqueNodeName warning is issued instead and
// the entry is replaced.
// NOTE(review): `p` is dereferenced below without a null check.
33
34 std::lock_guard<std::mutex> lock(m_node_mutex);
35
36 // check if the name is already present to ensure uniqueness
37 auto it = m_nodes.find(name) ;
38 if ( it != m_nodes.end() ) {
39 // Duplicates are not desired because names are supposed to be unique,
40 // but if the existing pointer has expired there is no harm in overriding it.
41 if ( it -> second.expired() ) {
42 ers::warning(NonUniqueNodeName(ERS_HERE, name, to_string(get_opmon_id())));
43 }
44 else {
45 throw NonUniqueNodeName(ERS_HERE, name, to_string(get_opmon_id()));
46 }
47 }
48
// operator[] inserts a new entry or overwrites the expired one found above.
49 m_nodes[name] = p;
50
51 p -> m_opmon_name = name;
// Propagate this node's properties (facility, parent id, ...) to the new child.
52 p -> inherit_parent_properties( *this );
53
54 TLOG() << "Node " << name << " registered to " << to_string(get_opmon_id()) ;
55}
56
57
58void MonitorableObject::publish( google::protobuf::Message && m,
59 CustomOrigin && co,
60 OpMonLevel l ) const noexcept {
61
62 auto timestamp = google::protobuf::util::TimeUtil::GetCurrentTime();
63
64 auto start_time = std::chrono::high_resolution_clock::now();
65
66 if ( ! MonitorableObject::publishable_metric( l, get_opmon_level() ) ) {
67 TLOG_DEBUG(TLVL_LEVEL_SUPPRESSION) << "Metric " << m.GetTypeName() << " ignored because of the level";
68 ++m_ignored_counter;
69 return;
70 }
71
72 auto e = to_entry( m, co );
73
74 if ( e.data().empty() ) {
75 ers::warning( EntryWithNoData(ERS_HERE, e.measurement() ) );
76 return ;
77 }
78
79 *e.mutable_origin() = get_opmon_id() ;
80
81 *e.mutable_time() = timestamp;
82
83 // this pointer is always garanteed to be filled, even if with a null Facility.
84 // But the facility can fail
85 try {
86 m_facility.load()->publish(std::move(e));
87 ++m_published_counter;
88 } catch ( const OpMonPublishFailure & e ) {
89 ers::error(e);
90 ++m_error_counter;
91 }
92
93 auto stop_time = std::chrono::high_resolution_clock::now();
94
95 auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time - start_time );
96 m_cpu_us_counter += duration.count();
97
98}
99
100
// collect() (noexcept): builds an opmon::MonitoringTreeInfo for this node and,
// recursively, for all of its live children.
// This node's counters are read and reset to zero (exchange(0)). Child results
// are summed into the totals. Expired children are pruned from m_nodes and
// counted as invalid links. The wall-clock duration of the whole call,
// children included, is stored in clockwall_elapsed_time_us.
102
103 auto start_time = std::chrono::high_resolution_clock::now();
104
105 TLOG_DEBUG(TLVL_MONITORING_STEPS) << "Collecting data from " << to_string(get_opmon_id());
106
108
109 info.set_n_invalid_links(0);
110
// Exceptions raised in the try block are reported as ErrorWhileCollecting
// instead of being propagated (collect() is noexcept).
111 try {
113 } catch ( const ers::Issue & i ) {
// Walk the chain of ERS causes attached to `i`.
115 auto cause_ptr = i.cause();
116 while ( cause_ptr ) {
118 cause_ptr = cause_ptr->cause();
119 }
120 ers::error( ErrorWhileCollecting(ERS_HERE, to_string(get_opmon_id()), i) );
121 } catch ( const std::exception & e ) {
123 ers::error( ErrorWhileCollecting(ERS_HERE, to_string(get_opmon_id()), e) );
124 } catch (...) {
126 ers::error( ErrorWhileCollecting(ERS_HERE, to_string(get_opmon_id())) );
127 }
128
// Read and reset this node's own counters.
129 info.set_n_published_measurements( m_published_counter.exchange(0) );
130 info.set_n_ignored_measurements( m_ignored_counter.exchange(0) );
131 info.set_n_errors( m_error_counter.exchange(0) );
// This node counts as publishing only if it published something since the last collect.
132 if (info.n_published_measurements() > 0) {
133 info.set_n_publishing_nodes(1);
134 }
135 info.set_cpu_elapsed_time_us( m_cpu_us_counter.exchange(0) );
136
137
// The node map stays locked for the rest of the call, including the
// recursive collect() on each child.
138 std::lock_guard<std::mutex> lock(m_node_mutex);
139
// Counts every entry, including expired ones that are pruned below.
140 info.set_n_registered_nodes( m_nodes.size() );
141
142 unsigned int n_invalid_links = 0;
143
144 for ( auto it = m_nodes.begin(); it != m_nodes.end(); ) {
145
146 auto ptr = it->second.lock();
147
148 if( ptr ) {
149 auto child_info = ptr->collect(); // MR: can we make this an async? There is no point to wait all done here
150 info.set_n_registered_nodes( info.n_registered_nodes() + child_info.n_registered_nodes() );
151 info.set_n_publishing_nodes( info.n_publishing_nodes() + child_info.n_publishing_nodes() );
152 info.set_n_invalid_links( info.n_invalid_links() + child_info.n_invalid_links() );
153 info.set_n_published_measurements( info.n_published_measurements() + child_info.n_published_measurements() );
154 info.set_n_ignored_measurements( info.n_ignored_measurements() + child_info.n_ignored_measurements() );
155 info.set_n_errors( info.n_errors() + child_info.n_errors() );
156 info.set_cpu_elapsed_time_us( info.cpu_elapsed_time_us() + child_info.cpu_elapsed_time_us() );
157 }
158
159 // prune the dead links
// While `ptr` holds a live child, expired() is false; only dead entries are erased.
160 if ( it->second.expired() ) {
161 it = m_nodes.erase(it);
162 ++n_invalid_links;
163 } else {
164 ++it;
165 }
166 }
167
168 info.set_n_invalid_links( info.n_invalid_links() + n_invalid_links );
169
170
171 auto stop_time = std::chrono::high_resolution_clock::now();
172
173 auto duration = std::chrono::duration_cast<std::chrono::microseconds>( stop_time - start_time );
174 info.set_clockwall_elapsed_time_us( duration.count() );
175
176 return info;
177}
178
179
// set_opmon_level(l) (noexcept): sets this node's level to `l` and propagates
// it recursively to every live child. Expired children are skipped (not pruned).
181
// NOTE(review): m_opmon_level is written outside m_node_mutex; confirm from its
// declaration that concurrent access is safe (e.g. that it is atomic).
182 m_opmon_level = l;
183
184 std::lock_guard<std::mutex> lock(m_node_mutex);
185 for ( const auto & [key,wp] : m_nodes ) {
186 auto p = wp.lock();
187 if (p) {
188 p->set_opmon_level(l);
189 }
190 }
191}
192
// inherit_parent_properties(parent): takes over the parent's facility and
// records the parent's OpMon id as m_parent_id, then makes every live child
// inherit from this node in turn.
194
// parent.m_facility is an std::atomic; passing it to store() goes through the
// atomic's implicit conversion, i.e. a load of the parent's facility pointer.
195 m_facility.store(parent.m_facility);
196 m_parent_id = parent.get_opmon_id();
198
199 std::lock_guard<std::mutex> lock(m_node_mutex);
200
201 for ( const auto & [key,wp] : m_nodes ) {
202
203 auto p = wp.lock();
204 if ( p ) {
205 p->inherit_parent_properties(*this);
206 }
207
208 }
209
210}
211
#define ERS_HERE
@ TLVL_MONITORING_STEPS
@ TLVL_LEVEL_SUPPRESSION
std::shared_ptr< MonitorableObject > NewNodePtr
void inherit_parent_properties(const MonitorableObject &parent)
std::atomic< metric_counter_t > m_published_counter
std::atomic< metric_counter_t > m_ignored_counter
opmon::MonitoringTreeInfo collect() noexcept
std::atomic< metric_counter_t > m_error_counter
void set_opmon_level(OpMonLevel) noexcept
std::atomic< facility_ptr_t > m_facility
void register_node(ElementId name, NewNodePtr)
void publish(google::protobuf::Message &&, CustomOrigin &&co={}, OpMonLevel l=to_level(EntryOpMonLevel::kDefault)) const noexcept
static bool publishable_metric(OpMonLevel entry, OpMonLevel system) noexcept
std::atomic< time_counter_t > m_cpu_us_counter
std::map< ElementId, NodePtr > m_nodes
Base class for any user-defined issue.
Definition Issue.hpp:69
const Issue * cause() const
Returns the cause Issue of this Issue.
Definition Issue.hpp:97
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
#define TLOG(...)
Definition macro.hpp:22
dunedaq::opmon::OpMonEntry to_entry(const google::protobuf::Message &m, const CustomOrigin &co)
Definition Utils.cpp:20
std::invoke_result< decltype(&dunedaq::confmodel::OpMonConf::get_level), dunedaq::confmodel::OpMonConf >::type OpMonLevel
std::string to_string(const dunedaq::opmon::OpMonId &)
Definition Utils.cpp:167
std::map< std::string, std::string > CustomOrigin
Definition Utils.hpp:53
Cannot add TPSet with start_time
void warning(const Issue &issue)
Definition ers.hpp:115
void error(const Issue &issue)
Definition ers.hpp:81