DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
StreamManager.cpp
Go to the documentation of this file.
1/*
2 * StreamManager.cxx
3 * ERS
4 *
5 * Created by Matthias Wiesmann on 21.01.05.
6 * Modified by Serguei Kolos on 12.09.06.
7 * Copyright 2005 CERN. All rights reserved.
8 *
9 */
10
11#include <assert.h>
12#include <iostream>
13
14#include <ers/Issue.hpp>
15#include <ers/InputStream.hpp>
16#include <ers/OutputStream.hpp>
17#include <ers/StreamManager.hpp>
18#include <ers/StreamFactory.hpp>
19#include <ers/Severity.hpp>
20#include <ers/Configuration.hpp>
21#include <ers/ers.hpp>
23#include <ers/internal/Util.hpp>
27
29 BadConfiguration,
30 "The stream configuration string \"" << config << "\" has syntax errors.",
31 ((std::string)config) )
32
33namespace
34{
38 const char SEPARATOR = ',';
39
40 const char * const DefaultOutputStreams[] =
41 {
42 "lstdout", // Debug
43 "lstdout", // Log
44 "throttle,lstdout", // Information
45 "throttle,lstderr", // Warning
46 "throttle,lstderr", // Error
47 "lstderr" // Fatal
48 };
49
50 const char *
51 get_stream_description( ers::severity severity )
52 {
53 assert( ers::Debug <= severity && severity <= ers::Fatal );
54
55 std::string env_name( "DUNEDAQ_ERS_" );
56 env_name += ers::to_string( severity );
57 const char * env = ::getenv( env_name.c_str() );
58 return env ? env : DefaultOutputStreams[severity];
59 }
60
61 void
62 parse_stream_definition( const std::string & text,
63 std::vector<std::string> & result )
64 {
65 std::string::size_type start_p = 0, end_p = 0;
66 short brackets_open = 0;
67 while ( end_p < text.length() )
68 {
69 switch ( text[end_p] )
70 {
71 case '(':
72 ++brackets_open;
73 break;
74 case ')':
75 --brackets_open;
76 break;
77 case SEPARATOR:
78 if ( !brackets_open )
79 {
80 result.push_back( text.substr( start_p, end_p - start_p ) );
81 start_p = end_p + 1;
82 }
83 break;
84 default:
85 break;
86 }
87 end_p++;
88 }
89 if ( brackets_open )
90 {
91 throw ers::BadConfiguration( ERS_HERE, text );
92 }
93 if ( start_p != end_p )
94 {
95 result.push_back( text.substr( start_p, end_p - start_p ) );
96 }
97 }
98}
99
100namespace ers
101{
102 // Performs lazy srteam initialization. Stream instances are created
103 // at the first attempt of writing to the stream
105 {
106 public:
108 : m_manager( manager ),
109 m_in_progress( false )
110 { ; }
111
112 void write( const Issue & issue )
113 {
114 ers::severity s = issue.severity();
115 std::scoped_lock lock( m_mutex );
116
117 if ( !m_in_progress ) {
118 m_in_progress = true;
119 }
120 else {
121 // The issue is coming from the stream constructor
122 // We can't use ERS streams, so print it to std
123 if ( s < ers::Warning )
124 std::cout << issue << std::endl;
125 else
126 std::cerr << issue << std::endl;
127 return ;
128 }
129
130 if ( m_manager.m_out_streams[s].get() == this ) {
132 std::shared_ptr<OutputStream>( m_manager.setup_stream( s ) );
133 }
134 m_manager.report_issue( s, issue );
135 m_in_progress = false;
136 }
137
138 private:
139 std::recursive_mutex m_mutex;
142 };
143
144}
145
159
164{
165 for( short ss = ers::Debug; ss <= ers::Fatal; ++ss )
166 {
167 m_init_streams[ss] = std::make_shared<StreamInitializer>( *this );
169 }
170}
171
176
177void
179{
180 std::shared_ptr<OutputStream> head = m_out_streams[severity];
181 if ( head && !head->isNull() )
182 {
183 OutputStream * parent = head.get();
184 for ( OutputStream * stream = parent; !stream->isNull(); parent = stream,
185 stream = &parent->chained() )
186 ;
187
188 parent->chained( new_stream );
189 }
190 else
191 {
192 m_out_streams[severity] = std::shared_ptr<OutputStream>( new_stream );
193 }
194}
195
196void
197ers::StreamManager::add_receiver( const std::string & stream,
198 const std::string & filter,
199 ers::IssueReceiver * receiver )
200{
201 InputStream * in = ers::StreamFactory::instance().create_in_stream( stream, filter );
202 in->set_receiver( receiver );
203
204 std::scoped_lock lock( m_mutex );
205 m_in_streams.push_back( std::shared_ptr<InputStream>( in ) );
206}
207
208void
209ers::StreamManager::add_receiver( const std::string & stream,
210 const std::initializer_list<std::string> & params,
211 ers::IssueReceiver * receiver )
212{
213 InputStream * in = ers::StreamFactory::instance().create_in_stream( stream, params );
214 in->set_receiver( receiver );
215
216 std::scoped_lock lock( m_mutex );
217 m_in_streams.push_back( std::shared_ptr<InputStream>( in ) );
218}
219
220void
222{
223 std::scoped_lock lock( m_mutex );
224 for( std::list<std::shared_ptr<InputStream> >::iterator it = m_in_streams.begin();
225 it != m_in_streams.end(); )
226 {
227 if ( (*it) -> m_receiver == receiver )
228 m_in_streams.erase( it++ );
229 else
230 ++it;
231 }
232}
233
236{
237 std::string config = get_stream_description( severity );
238 std::vector<std::string> streams;
239 try
240 {
241 parse_stream_definition( config, streams );
242 }
243 catch ( ers::BadConfiguration & ex )
244 {
245 ERS_INTERNAL_ERROR( "Configuration for the \"" << severity << "\" stream is invalid. "
246 "Default configuration will be used." );
247 }
248
249 ers::OutputStream * main = setup_stream( streams );
250
251 if ( !main )
252 {
253 std::vector<std::string> default_streams;
254 try
255 {
256 parse_stream_definition( DefaultOutputStreams[severity], default_streams );
257 main = setup_stream( default_streams );
258 }
259 catch ( ers::BadConfiguration & ex )
260 {
261 ERS_INTERNAL_ERROR( "Can not configure the \"" << severity
262 << "\" stream because of the following issue {" << ex << "}" );
263 }
264 }
265 return ( main ? main : new ers::NullStream() );
266}
267
269ers::StreamManager::setup_stream( const std::vector<std::string> & streams )
270{
271 size_t cnt = 0;
273 for ( ; cnt < streams.size(); ++cnt )
274 {
275 main = ers::StreamFactory::instance().create_out_stream( streams[cnt] );
276 if ( main )
277 break;
278 }
279
280 if ( !main )
281 {
282 return 0;
283 }
284
285 ers::OutputStream * head = main;
286 for ( ++cnt; cnt < streams.size(); ++cnt )
287 {
288 ers::OutputStream * chained = ers::StreamFactory::instance().create_out_stream( streams[cnt] );
289
290 if ( chained )
291 {
292 head->chained( chained );
293 head = chained;
294 }
295 }
296
297 return main;
298}
299
304void
306{
307 ers::severity old_severity = issue.set_severity( type );
308 m_out_streams[type]->write( issue );
309 issue.set_severity( old_severity );
310} // error
311
315void
317{
318 report_issue( ers::Error, issue );
319} // error
320
325void
326ers::StreamManager::debug( const Issue & issue, int level )
327{
328 if ( Configuration::instance().debug_level() >= level )
329 {
330 ers::severity old_severity = issue.set_severity( ers::Severity( ers::Debug, level ) );
331 m_out_streams[ers::Debug]->write( issue );
332 issue.set_severity( old_severity );
333 }
334}
335
339void
341{
342 report_issue( ers::Fatal, issue );
343}
344
348void
350{
351 report_issue( ers::Warning, issue );
352}
353
357void
359{
361}
362
366void
368{
369 report_issue( ers::Log, issue );
370}
371
372std::ostream &
373ers::operator<<( std::ostream & out, const ers::StreamManager & )
374{
375 for( short ss = ers::Debug; ss <= ers::Fatal; ++ss )
376 {
377 out << (ers::severity)ss << "\t\""
378 << get_stream_description( (ers::severity)ss ) << "\"" << std::endl;
379 }
380 return out;
381}
#define ERS_DECLARE_ISSUE(namespace_name, class_name, message, attributes)
#define ERS_HERE
static Configuration & instance()
return the singleton
ERS Issue input stream interface.
void set_receiver(IssueReceiver *receiver)
ERS Issue receiver interface.
Base class for any user define issue.
Definition Issue.hpp:69
ers::Severity severity() const
severity of the issue
Definition Issue.hpp:112
ers::Severity set_severity(ers::Severity severity) const
Definition Issue.cpp:180
ERS abstract output stream interface.
virtual bool isNull() const
OutputStream & chained()
void write(const Issue &issue)
std::recursive_mutex m_mutex
StreamManager & m_manager
StreamInitializer(StreamManager &manager)
This class manages and provides access to ERS streams.
void add_output_stream(ers::severity severity, ers::OutputStream *new_stream)
void error(const Issue &issue)
sends an issue to the error stream
std::shared_ptr< OutputStream > m_init_streams[ers::Fatal+1]
array of pointers to streams per severity
OutputStream * setup_stream(ers::severity severity)
void warning(const Issue &issue)
sends an issue to the warning stream
static StreamManager & instance()
return the singleton
std::list< std::shared_ptr< InputStream > > m_in_streams
void report_issue(ers::severity type, const Issue &issue)
void log(const Issue &issue)
sends an issue to the log stream
void fatal(const Issue &issue)
sends an issue to the fatal stream
void add_receiver(const std::string &stream, const std::string &filter, ers::IssueReceiver *receiver)
void debug(const Issue &issue, int level)
sends an Issue to the debug stream
void information(const Issue &issue)
sends an issue to the information stream
std::shared_ptr< OutputStream > m_out_streams[ers::Fatal+1]
array of pointers to streams per severity
void remove_receiver(ers::IssueReceiver *receiver)
#define ERS_INTERNAL_ERROR(message)
Definition macro.hpp:52
int main(int argc, char **argv)
int debug_level()
Definition ers.hpp:66
std::string to_string(severity s)
std::ostream & operator<<(std::ostream &, const ers::Configuration &)
severity
Definition Severity.hpp:26
@ Debug
Definition Severity.hpp:26
@ Error
Definition Severity.hpp:26
@ Fatal
Definition Severity.hpp:26
@ Log
Definition Severity.hpp:26
@ Warning
Definition Severity.hpp:26
@ Information
Definition Severity.hpp:26
Null stream.