Line data Source code
1 : /*
2 : * StreamManager.cxx
3 : * ERS
4 : *
5 : * Created by Matthias Wiesmann on 21.01.05.
6 : * Modified by Serguei Kolos on 12.09.06.
7 : * Copyright 2005 CERN. All rights reserved.
8 : *
9 : */
10 :
11 : #include <assert.h>
12 : #include <iostream>
13 :
14 : #include <ers/Issue.hpp>
15 : #include <ers/InputStream.hpp>
16 : #include <ers/OutputStream.hpp>
17 : #include <ers/StreamManager.hpp>
18 : #include <ers/StreamFactory.hpp>
19 : #include <ers/Severity.hpp>
20 : #include <ers/Configuration.hpp>
21 : #include <ers/ers.hpp>
22 : #include <ers/internal/macro.hpp>
23 : #include <ers/internal/Util.hpp>
24 : #include <ers/internal/PluginManager.hpp>
25 : #include <ers/internal/NullStream.hpp>
26 : #include <ers/internal/SingletonCreator.hpp>
27 :
/** Declares the ers::BadConfiguration issue type.
 *  The issue carries one std::string attribute, \c config, which is inserted
 *  into the message text. Within this file it is thrown by
 *  parse_stream_definition() and caught by StreamManager::setup_stream().
 */
28 73 : ERS_DECLARE_ISSUE( ers,
29 : BadConfiguration,
30 : "The stream configuration string \"" << config << "\" has syntax errors.",
31 : ((std::string)config) )
32 :
33 : namespace
34 : {
/** Character that separates individual stream definitions
 *  inside one stream configuration string.
 */
constexpr char SEPARATOR = ',';

/** Built-in stream configuration strings, indexed by severity value.
 *  An entry is used when the corresponding environment variable is not set
 *  (see get_stream_description()) and as the fallback when the configured
 *  streams can not be created.
 */
constexpr const char * DefaultOutputStreams[] =
{
    "lstdout",              // Debug
    "lstdout",              // Log
    "throttle,lstdout",     // Information
    "throttle,lstderr",     // Warning
    "throttle,lstderr",     // Error
    "lstderr"               // Fatal
};
49 :
50 : const char *
51 55 : get_stream_description( ers::severity severity )
52 : {
53 55 : assert( ers::Debug <= severity && severity <= ers::Fatal );
54 :
55 55 : std::string env_name( "DUNEDAQ_ERS_" );
56 55 : env_name += ers::to_string( severity );
57 55 : const char * env = ::getenv( env_name.c_str() );
58 110 : return env ? env : DefaultOutputStreams[severity];
59 55 : }
60 :
61 : void
62 55 : parse_stream_definition( const std::string & text,
63 : std::vector<std::string> & result )
64 : {
65 55 : std::string::size_type start_p = 0, end_p = 0;
66 55 : short brackets_open = 0;
67 629 : while ( end_p < text.length() )
68 : {
69 574 : switch ( text[end_p] )
70 : {
71 0 : case '(':
72 0 : ++brackets_open;
73 0 : break;
74 0 : case ')':
75 0 : --brackets_open;
76 0 : break;
77 21 : case SEPARATOR:
78 21 : if ( !brackets_open )
79 : {
80 21 : result.push_back( text.substr( start_p, end_p - start_p ) );
81 21 : start_p = end_p + 1;
82 : }
83 : break;
84 : default:
85 : break;
86 : }
87 574 : end_p++;
88 : }
89 55 : if ( brackets_open )
90 : {
91 0 : throw ers::BadConfiguration( ERS_HERE, text );
92 : }
93 55 : if ( start_p != end_p )
94 : {
95 55 : result.push_back( text.substr( start_p, end_p - start_p ) );
96 : }
97 55 : }
98 : }
99 :
100 : namespace ers
101 : {
102 : // Performs lazy srteam initialization. Stream instances are created
103 : // at the first attempt of writing to the stream
104 : class StreamInitializer : public ers::OutputStream
105 : {
106 : public:
107 246 : StreamInitializer( StreamManager & manager )
108 492 : : m_manager( manager ),
109 246 : m_in_progress( false )
110 246 : { ; }
111 :
112 55 : void write( const Issue & issue )
113 : {
114 55 : ers::severity s = issue.severity();
115 55 : std::scoped_lock lock( m_mutex );
116 :
117 55 : if ( !m_in_progress ) {
118 55 : m_in_progress = true;
119 : }
120 : else {
121 : // The issue is coming from the stream constructor
122 : // We can't use ERS streams, so print it to std
123 0 : if ( s < ers::Warning )
124 0 : std::cout << issue << std::endl;
125 : else
126 0 : std::cerr << issue << std::endl;
127 0 : return ;
128 : }
129 :
130 55 : if ( m_manager.m_out_streams[s].get() == this ) {
131 110 : m_manager.m_out_streams[s] =
132 55 : std::shared_ptr<OutputStream>( m_manager.setup_stream( s ) );
133 : }
134 55 : m_manager.report_issue( s, issue );
135 55 : m_in_progress = false;
136 55 : }
137 :
138 : private:
139 : std::recursive_mutex m_mutex;
140 : StreamManager & m_manager;
141 : bool m_in_progress;
142 : };
143 :
144 : }
145 :
146 : /** This method returns the singleton instance.
147 : * It should be used for every operation on the factory.
148 : * \return a reference to the singleton instance
149 : */
150 : ers::StreamManager &
151 1020 : ers::StreamManager::instance()
152 : {
153 : /**Singleton instance
154 : */
155 1020 : static ers::StreamManager * instance = ers::SingletonCreator<ers::StreamManager>::create();
156 :
157 1020 : return *instance;
158 : } // instance
159 :
160 : /** Private constructor - can not be called by user code, use the \c instance() method instead
161 : * \see instance()
162 : */
163 41 : ers::StreamManager::StreamManager()
164 : {
165 287 : for( short ss = ers::Debug; ss <= ers::Fatal; ++ss )
166 : {
167 246 : m_init_streams[ss] = std::make_shared<StreamInitializer>( *this );
168 246 : m_out_streams[ss] = m_init_streams[ss];
169 : }
170 41 : }
171 :
172 : /** Destructor - basic cleanup
173 : */
174 0 : ers::StreamManager::~StreamManager()
175 0 : { ; }
176 :
177 : void
178 0 : ers::StreamManager::add_output_stream( ers::severity severity, ers::OutputStream * new_stream )
179 : {
180 0 : std::shared_ptr<OutputStream> head = m_out_streams[severity];
181 0 : if ( head && !head->isNull() )
182 : {
183 0 : OutputStream * parent = head.get();
184 0 : for ( OutputStream * stream = parent; !stream->isNull(); parent = stream,
185 0 : stream = &parent->chained() )
186 : ;
187 :
188 0 : parent->chained( new_stream );
189 : }
190 : else
191 : {
192 0 : m_out_streams[severity] = std::shared_ptr<OutputStream>( new_stream );
193 : }
194 0 : }
195 :
196 : void
197 0 : ers::StreamManager::add_receiver( const std::string & stream,
198 : const std::string & filter,
199 : ers::IssueReceiver * receiver )
200 : {
201 0 : InputStream * in = ers::StreamFactory::instance().create_in_stream( stream, filter );
202 0 : in->set_receiver( receiver );
203 :
204 0 : std::scoped_lock lock( m_mutex );
205 0 : m_in_streams.push_back( std::shared_ptr<InputStream>( in ) );
206 0 : }
207 :
208 : void
209 0 : ers::StreamManager::add_receiver( const std::string & stream,
210 : const std::initializer_list<std::string> & params,
211 : ers::IssueReceiver * receiver )
212 : {
213 0 : InputStream * in = ers::StreamFactory::instance().create_in_stream( stream, params );
214 0 : in->set_receiver( receiver );
215 :
216 0 : std::scoped_lock lock( m_mutex );
217 0 : m_in_streams.push_back( std::shared_ptr<InputStream>( in ) );
218 0 : }
219 :
220 : void
221 0 : ers::StreamManager::remove_receiver( ers::IssueReceiver * receiver )
222 : {
223 0 : std::scoped_lock lock( m_mutex );
224 0 : for( std::list<std::shared_ptr<InputStream> >::iterator it = m_in_streams.begin();
225 0 : it != m_in_streams.end(); )
226 : {
227 0 : if ( (*it) -> m_receiver == receiver )
228 0 : m_in_streams.erase( it++ );
229 : else
230 0 : ++it;
231 : }
232 0 : }
233 :
234 : ers::OutputStream *
235 55 : ers::StreamManager::setup_stream( ers::severity severity )
236 : {
237 55 : std::string config = get_stream_description( severity );
238 55 : std::vector<std::string> streams;
239 55 : try
240 : {
241 55 : parse_stream_definition( config, streams );
242 : }
243 0 : catch ( ers::BadConfiguration & ex )
244 : {
245 0 : ERS_INTERNAL_ERROR( "Configuration for the \"" << severity << "\" stream is invalid. "
246 0 : "Default configuration will be used." );
247 0 : }
248 :
249 55 : ers::OutputStream * main = setup_stream( streams );
250 :
251 55 : if ( !main )
252 : {
253 0 : std::vector<std::string> default_streams;
254 0 : try
255 : {
256 0 : parse_stream_definition( DefaultOutputStreams[severity], default_streams );
257 0 : main = setup_stream( default_streams );
258 : }
259 0 : catch ( ers::BadConfiguration & ex )
260 : {
261 0 : ERS_INTERNAL_ERROR( "Can not configure the \"" << severity
262 0 : << "\" stream because of the following issue {" << ex << "}" );
263 0 : }
264 0 : }
265 55 : return ( main ? main : new ers::NullStream() );
266 55 : }
267 :
268 : ers::OutputStream *
269 55 : ers::StreamManager::setup_stream( const std::vector<std::string> & streams )
270 : {
271 55 : size_t cnt = 0;
272 55 : ers::OutputStream * main = 0;
273 55 : for ( ; cnt < streams.size(); ++cnt )
274 : {
275 55 : main = ers::StreamFactory::instance().create_out_stream( streams[cnt] );
276 55 : if ( main )
277 : break;
278 : }
279 :
280 55 : if ( !main )
281 : {
282 : return 0;
283 : }
284 :
285 55 : ers::OutputStream * head = main;
286 76 : for ( ++cnt; cnt < streams.size(); ++cnt )
287 : {
288 21 : ers::OutputStream * chained = ers::StreamFactory::instance().create_out_stream( streams[cnt] );
289 :
290 21 : if ( chained )
291 : {
292 21 : head->chained( chained );
293 21 : head = chained;
294 : }
295 : }
296 :
297 : return main;
298 : }
299 :
300 : /** Sends an Issue to an appropriate stream
301 : * \param type
302 : * \param issue
303 : */
304 : void
305 1074 : ers::StreamManager::report_issue( ers::severity type, const Issue & issue )
306 : {
307 1074 : ers::severity old_severity = issue.set_severity( type );
308 1074 : m_out_streams[type]->write( issue );
309 1074 : issue.set_severity( old_severity );
310 1074 : } // error
311 :
312 : /** Sends an Issue to the error stream
313 : * \param issue
314 : */
315 : void
316 0 : ers::StreamManager::error( const Issue & issue )
317 : {
318 0 : report_issue( ers::Error, issue );
319 0 : } // error
320 :
321 : /** Sends an issue to the debug stream
322 : * \param issue the Issue to send
323 : * \param level the debug level.
324 : */
325 : void
326 1 : ers::StreamManager::debug( const Issue & issue, int level )
327 : {
328 1 : if ( Configuration::instance().debug_level() >= level )
329 : {
330 0 : ers::severity old_severity = issue.set_severity( ers::Severity( ers::Debug, level ) );
331 0 : m_out_streams[ers::Debug]->write( issue );
332 0 : issue.set_severity( old_severity );
333 : }
334 1 : }
335 :
336 : /** Sends an Issue to the fatal error stream
337 : * \param issue
338 : */
339 : void
340 0 : ers::StreamManager::fatal( const Issue & issue )
341 : {
342 0 : report_issue( ers::Fatal, issue );
343 0 : }
344 :
345 : /** Sends an Issue to the warning stream
346 : * \param issue the issue to send
347 : */
348 : void
349 0 : ers::StreamManager::warning( const Issue & issue )
350 : {
351 0 : report_issue( ers::Warning, issue );
352 0 : }
353 :
354 : /** Sends an issue to the info stream
355 : * \param issue the Issue to send
356 : */
357 : void
358 19 : ers::StreamManager::information( const Issue & issue )
359 : {
360 19 : report_issue( ers::Information, issue );
361 19 : }
362 :
363 : /** Sends an issue to the log stream
364 : * \param issue the Issue to send
365 : */
366 : void
367 961 : ers::StreamManager::log( const Issue & issue )
368 : {
369 961 : report_issue( ers::Log, issue );
370 961 : }
371 :
372 : std::ostream &
373 0 : ers::operator<<( std::ostream & out, const ers::StreamManager & )
374 : {
375 0 : for( short ss = ers::Debug; ss <= ers::Fatal; ++ss )
376 : {
377 0 : out << (ers::severity)ss << "\t\""
378 0 : << get_stream_description( (ers::severity)ss ) << "\"" << std::endl;
379 : }
380 0 : return out;
381 : }
|