Line data Source code
1 : /**
2 : * @file ProtoBufStream.cpp ProtoBufStream class implementation
3 : *
4 : * This is part of the DUNE DAQ software, copyright 2020.
5 : * Licensing/copyright details are in the COPYING file that you should have
6 : * received with this code.
7 : */
8 :
9 : #include "ProtoBufStream.hpp"
10 : #include <ers/StreamFactory.hpp>
11 : #include <string>
12 : #include <iostream>
13 : #include <chrono>
14 : #include <boost/crc.hpp>
15 : #include <vector>
16 :
17 0 : ERS_REGISTER_OUTPUT_STREAM(erskafka::ProtoBufStream, "protobufstream", param)
18 :
19 : /** Constructor that creates a new instance of the ersstream stream with the given configuration.
20 : * \param format elastic search connection string.
21 : */
22 : namespace erskafka
23 : {
24 0 : erskafka::ProtoBufStream::ProtoBufStream(const std::string ¶m)
25 0 : : m_session("Uknown")
26 0 : , m_application("Uknown") {
27 :
28 0 : nlohmann::json conf;
29 0 : conf["bootstrap"] = param;
30 :
31 0 : m_publisher = std::make_unique<dunedaq::erskafka::ERSPublisher>(conf);
32 :
33 0 : if(auto env_p = std::getenv("DUNEDAQ_SESSION"))
34 0 : m_session = env_p;
35 :
36 0 : if (auto app_p = std::getenv("DUNEDAQ_APPLICATION_NAME"))
37 0 : m_application = app_p;
38 0 : }
39 :
40 :
41 : /** Write method
42 : * \param issue issue to be sent.
43 : */
44 0 : void erskafka::ProtoBufStream::write(const ers::Issue &issue)
45 : {
46 0 : try {
47 :
48 0 : auto schema = to_schema_chain(issue);
49 0 : schema.set_session(m_session);
50 0 : schema.set_application(m_application);
51 0 : m_publisher -> publish(std::move(schema)) ;
52 :
53 0 : }
54 0 : catch(const std::exception& e)
55 : {
56 0 : std::cerr << "Producer error : " << e.what() << '\n';
57 0 : }
58 0 : }
59 : } // namespace erskafka
|