DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
LocalStream.cpp
Go to the documentation of this file.
1/*
2 * LocalStream.cxx
3 * ERS
4 *
5 * Created by Serguei Kolos on 21.01.05.
6 * Copyright 2005 CERN. All rights reserved.
7 *
8 */
9#include <ers/LocalStream.hpp>
10#include <ers/StreamManager.hpp>
12
17ers::LocalStream &
18ers::LocalStream::instance()
19{
22 static ers::LocalStream * instance = ers::SingletonCreator<ers::LocalStream>::create();
23
24 return *instance;
25}
26
30ers::LocalStream::LocalStream( )
31 : m_terminated( false )
32{ }
33
34ers::LocalStream::~LocalStream( )
35{
36 remove_issue_catcher();
37}
38
39void
40ers::LocalStream::remove_issue_catcher( )
41{
42 std::unique_ptr<std::thread> catcher;
43 {
44 std::unique_lock lock( m_mutex );
45 if ( !m_issue_catcher_thread.get() )
46 {
47 return ;
48 }
49 m_terminated = true;
50 m_condition.notify_one();
51 catcher.swap(m_issue_catcher_thread);
52 }
53
54 catcher -> join();
55}
56
57void
58ers::LocalStream::thread_wrapper()
59{
60 std::unique_lock lock( m_mutex );
61 m_catcher_thread_id = std::this_thread::get_id();
62 while( !m_terminated )
63 {
64 m_condition.wait( lock, [this](){return !m_issues.empty() || m_terminated;} );
65
66 while( !m_terminated && !m_issues.empty() )
67 {
68 ers::Issue * issue = m_issues.front();
69 m_issues.pop();
70
71 lock.unlock();
72 m_issue_catcher( *issue );
73 delete issue;
74 lock.lock();
75 }
76 }
77 m_catcher_thread_id = {};
78 m_terminated = false;
79}
80
82ers::LocalStream::set_issue_catcher( const std::function<void ( const ers::Issue & )> & catcher )
83{
84 std::unique_lock lock( m_mutex );
85 if ( m_issue_catcher_thread.get() )
86 {
87 throw ers::IssueCatcherAlreadySet( ERS_HERE );
88 }
89 m_issue_catcher = catcher;
90 m_issue_catcher_thread.reset( new std::thread( std::bind( &ers::LocalStream::thread_wrapper, this ) ) );
91
92 return new ers::IssueCatcherHandler;
93}
94
95void
96ers::LocalStream::report_issue( ers::severity type, const ers::Issue & issue )
97{
98 if ( m_issue_catcher_thread.get() && m_catcher_thread_id != std::this_thread::get_id() )
99 {
100 ers::Issue * clone = issue.clone();
101 clone->set_severity( type );
102 std::unique_lock lock( m_mutex );
103 m_issues.push( clone );
104 m_condition.notify_one();
105 }
106 else
107 {
108 StreamManager::instance().report_issue( type, issue );
109 }
110}
111
112void
113ers::LocalStream::error( const ers::Issue & issue )
114{
115 report_issue( ers::Error, issue );
116}
117
118void
119ers::LocalStream::fatal( const ers::Issue & issue )
120{
121 report_issue( ers::Fatal, issue );
122}
123
124void
125ers::LocalStream::warning( const ers::Issue & issue )
126{
127 report_issue( ers::Warning, issue );
128}
#define ERS_HERE
Implements issue catcher lifetime management.
Base class for any user define issue.
Definition Issue.hpp:69
virtual Issue * clone() const =0
ers::Severity set_severity(ers::Severity severity) const
Definition Issue.cpp:180
severity
Definition Severity.hpp:26
@ Error
Definition Severity.hpp:26
@ Fatal
Definition Severity.hpp:26
@ Warning
Definition Severity.hpp:26