Line data Source code
1 : /**
2 : * @file fileOpMonFacility.cpp
3 : *
4 : * This is part of the DUNE DAQ Application Framework, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "fileOpMonFacility.hpp"
10 : #include "opmonlib/Utils.hpp"
11 : #include <iostream>
12 : #include <thread>
13 :
14 : #include "logging/Logging.hpp"
15 :
16 :
17 : namespace dunedaq::opmonlib {
18 :
19 12 : fileOpMonFacility::fileOpMonFacility(std::string uri, OptionalOrigin o) :
20 12 : JSonOpMonFacility(uri) {
21 :
22 12 : std::string hook = "://";
23 12 : auto sep = uri.find(hook);
24 12 : std::string fname;
25 12 : if (sep == std::string::npos) { // assume filename
26 0 : fname = uri;
27 : } else {
28 12 : fname = uri.substr(sep + hook.size());
29 : }
30 :
31 12 : if (o) {
32 11 : auto slash_pos = fname.find_last_of('/');
33 11 : auto dot_pos = slash_pos == std::string::npos ? fname.find_first_of('.') : fname.find_first_of('.', slash_pos);
34 :
35 11 : auto origin = to_string( o.value() );
36 11 : if (dot_pos == std::string::npos) {
37 1 : fname += '.' + origin + ".json";
38 : } else {
39 10 : fname.insert(dot_pos, '.' + origin );
40 : }
41 :
42 11 : }
43 :
44 12 : m_ofs.open(fname, std::ios::out | std::ios::app);
45 12 : if (!m_ofs.is_open()) {
46 1 : throw BadFile(ERS_HERE, fname);
47 : }
48 16 : }
49 :
50 22 : fileOpMonFacility::~fileOpMonFacility() {
51 :
52 22 : std::chrono::milliseconds max_delay( m_writing_counter.load());
53 :
54 11 : m_stop_request.store(true);
55 :
56 11 : std::unique_lock<std::mutex> lock(m_mutex);
57 11 : auto ret = m_writing_variable.wait_for(lock,
58 : max_delay,
59 22 : [&](){ return m_writing_counter.load() == 0; } ) ;
60 11 : if ( ! ret ) {
61 0 : ers::error(FileClosedBeforeWritingComplete(ERS_HERE,
62 0 : max_delay.count(), m_writing_counter.load()) );
63 : }
64 22 : }
65 :
66 1507 : void fileOpMonFacility::publish(opmon::OpMonEntry && e) const {
67 :
68 1507 : if ( m_stop_request.load() ) {
69 0 : throw OpMonPublishFailure( ERS_HERE,
70 0 : get_URI(), e.measurement(),
71 0 : to_string(e.origin()),
72 0 : FacilityStopRequested(ERS_HERE) );
73 :
74 : }
75 :
76 1507 : ++m_writing_counter;
77 1508 : std::thread (& fileOpMonFacility::write,
78 1508 : this, std::move(e)).detach();
79 :
80 1508 : }
81 :
82 1508 : void fileOpMonFacility::write(opmon::OpMonEntry && e) const noexcept {
83 1508 : std::string json;
84 1508 : google::protobuf::util::MessageToJsonString( e, & json,
85 : get_json_options() );
86 :
87 1508 : std::unique_lock<std::mutex> lock(m_mutex);
88 4524 : m_writing_variable.wait(lock, [&](){ return m_writing_counter.load() > 0;} );
89 :
90 1508 : try {
91 1508 : m_ofs << json << std::endl << std::flush;
92 0 : } catch ( const std::ios_base::failure & except ) {
93 0 : ers::error( OpMonPublishFailure( ERS_HERE,
94 0 : get_URI(), e.measurement(),
95 0 : to_string(e.origin()),
96 0 : WritingFailed(ERS_HERE, json, except) ) );
97 0 : }
98 :
99 1508 : --m_writing_counter;
100 1508 : m_writing_variable.notify_one();
101 1508 : }
102 :
103 :
104 :
105 : } // namespace dunedaq::opmonlib
106 :
// NOTE(review): presumably registers fileOpMonFacility with the OpMon facility
// plugin factory; the macro is defined outside this file, so confirm there.
107 14 : DEFINE_DUNE_OPMON_FACILITY(dunedaq::opmonlib::fileOpMonFacility)
108 :
|