4namespace datahandlinglibs {
6template<
class ReadoutType>
14 throw ResourceQueueError(
ERS_HERE,
"raw_recording",
"RecorderModel");
23template<
class ReadoutType>
28 info.set_recording_status(
"Y");
29 info.set_packets_recorded(m_packets_processed.exchange(0));
30 info.set_bytes_recorded(m_bytes_processed.exchange(0));
33template<
class ReadoutType>
38 if (remove(m_output_file.c_str()) == 0) {
39 TLOG(TLVL_WORK_STEPS) <<
"Removed existing output file from previous run" << std::endl;
42 m_buffered_writer.open(
43 m_output_file, m_stream_buffer_size, m_compression_algorithm, m_use_o_direct);
44 m_work_thread.set_name(m_name, 0);
47template<
class ReadoutType>
51 m_packets_processed = 0;
52 m_bytes_processed = 0;
54 m_run_marker.store(
true);
58template<
class ReadoutType>
62 m_run_marker.store(
false);
63 while (!m_work_thread.get_readiness()) {
64 std::this_thread::sleep_for(std::chrono::milliseconds(100));
68template<
class ReadoutType>
72 m_time_point_last_info = std::chrono::steady_clock::now();
75 while (m_run_marker) {
77 element = m_data_receiver->receive(std::chrono::milliseconds(100));
78 if (!m_buffered_writer.write(
reinterpret_cast<char*
>(&element),
sizeof(element))) {
82 m_packets_processed++;
83 m_bytes_processed+=
sizeof(element);
84 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
88 m_buffered_writer.flush();
const std::string & get_output_file() const
Get "output_file" attribute value.
bool get_use_o_direct() const
Get "use_o_direct" attribute value. Indicates whether the O_DIRECT flag is used when opening files.
uint32_t get_streaming_buffer_size() const
Get "streaming_buffer_size" attribute value.
const std::string & get_compression_algorithm() const
Get "compression_algorithm" attribute value.
const dunedaq::appmodel::DataRecorderConf * get_configuration() const
Get "configuration" relationship value.
const std::vector< const dunedaq::confmodel::Connection * > & get_inputs() const
Get "inputs" relationship value. List of connections to/from this module.
void init(const appmodel::DataRecorderModule *conf) override
void do_start(const nlohmann::json &) override
virtual void generate_opmon_data() override
void do_conf(const nlohmann::json &) override
void do_stop(const nlohmann::json &) override
Base class for any user-defined issue.
static std::shared_ptr< iomanager::ReceiverConcept< Datatype > > get_iom_receiver(iomanager::ConnectionId const &id)
void warning(const Issue &issue)