DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
BufferedFileWriter.hpp
Go to the documentation of this file.
1
11#ifndef DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_UTILS_BUFFEREDFILEWRITER_HPP_
12#define DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_UTILS_BUFFEREDFILEWRITER_HPP_
13
16
17#include "logging/Logging.hpp"
18
19#include <boost/align/aligned_allocator.hpp>
20#include <boost/iostreams/device/file_descriptor.hpp>
21#include <boost/iostreams/filter/lzma.hpp>
22#include <boost/iostreams/filter/zlib.hpp>
23#include <boost/iostreams/filter/zstd.hpp>
24#include <boost/iostreams/filtering_stream.hpp>
25#include <boost/iostreams/stream.hpp>
26#include <boost/iostreams/stream_buffer.hpp>
27
28#include <fcntl.h>
29#include <fstream>
30#include <iostream>
31#include <limits>
32#include <string>
33#include <unistd.h>
34
36
37namespace dunedaq {
38namespace datahandlinglibs {
46template<size_t Alignment = 4096>
48{
49 using io_sink_t = boost::iostreams::file_descriptor_sink;
50 using aligned_allocator_t = boost::alignment::aligned_allocator<io_sink_t::char_type, Alignment>;
52 boost::iostreams::filtering_stream<boost::iostreams::output, char, std::char_traits<char>, aligned_allocator_t>;
53
54public:
66 BufferedFileWriter(std::string filename, size_t buffer_size, std::string compression_algorithm = "None", bool use_o_direct = true)
67 {
68 open(filename, buffer_size, compression_algorithm, use_o_direct);
69 }
70
75
80 {
81 if (m_is_open)
82 close();
83 }
84
89
101 void open(std::string filename,
102 size_t buffer_size,
103 std::string compression_algorithm = "None",
104 bool use_o_direct = true)
105 {
106 m_use_o_direct = use_o_direct;
107 if (m_is_open) {
108 close();
109 }
110
112 m_buffer_size = buffer_size;
113 m_compression_algorithm = compression_algorithm;
114 auto oflag = O_CREAT | O_WRONLY;
115 if (m_use_o_direct) {
116 oflag = oflag | O_DIRECT;
117 }
118
119 m_fd = ::open(m_filename.c_str(), oflag, 0644);
120 if (m_fd == -1) {
121 throw BufferedReaderWriterCannotOpenFile(ERS_HERE, m_filename);
122 }
123
124 m_sink = io_sink_t(m_fd, boost::iostreams::file_descriptor_flags::close_handle);
125 if (m_compression_algorithm == "zstd") {
126 TLOG_DEBUG(TLVL_WORK_STEPS) << "Using zstd compression" << std::endl;
127 m_output_stream.push(boost::iostreams::zstd_compressor(boost::iostreams::zstd::best_speed));
128 } else if (m_compression_algorithm == "lzma") {
129 TLOG_DEBUG(TLVL_WORK_STEPS) << "Using lzma compression" << std::endl;
130 m_output_stream.push(boost::iostreams::lzma_compressor(boost::iostreams::lzma::best_speed));
131 } else if (m_compression_algorithm == "zlib") {
132 TLOG_DEBUG(TLVL_WORK_STEPS) << "Using zlib compression" << std::endl;
133 m_output_stream.push(boost::iostreams::zlib_compressor(boost::iostreams::zlib::best_speed));
134 } else if (m_compression_algorithm == "None") {
135 TLOG_DEBUG(TLVL_WORK_STEPS) << "Running without compression" << std::endl;
136 } else {
138 "Non-recognized compression algorithm: " + m_compression_algorithm);
139 }
140
142 m_is_open = true;
143 }
144
149 bool is_open() const { return m_is_open; }
150
156 bool write(const char* memory, const size_t size)
157 {
158 if (!m_is_open)
159 return false;
160 m_output_stream.write(memory, size); // NOLINT
161 return !m_output_stream.bad();
162 }
163
167 void close()
168 {
169 // Set the file descriptor to not use O_DIRECT. This is necessary because the write size has to be aligned for
170 // O_DIRECT to succeed. This is not guaranteed for the data that remains in the buffer.
171 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
172 m_output_stream.reset();
173 m_is_open = false;
174 }
175
180 void flush()
181 {
182 // Set the file descriptor to not use O_DIRECT. This is necessary because the write size has to be aligned for
183 // O_DIRECT to succeed. This is not guaranteed for the data that remains in the buffer.
184 fcntl(m_fd, F_SETFL, O_CREAT | O_WRONLY);
185 // This does not flush the compressor as it is not flushable
186 m_output_stream.flush();
187 // Activate O_DIRECT again
188 auto oflag = O_CREAT | O_WRONLY;
189 if (m_use_o_direct) {
190 oflag = oflag | O_DIRECT;
191 }
192 fcntl(m_fd, F_SETFL, oflag);
193 }
194
195private:
196 // Config parameters
197 std::string m_filename;
200
201 // Internals
202 int m_fd;
205 bool m_is_open = false;
206 bool m_use_o_direct = true;
207};
208
209} // namespace datahandlinglibs
210} // namespace dunedaq
211
212#endif // DATAHANDLINGLIBS_INCLUDE_DATAHANDLINGLIBS_UTILS_BUFFEREDFILEWRITER_HPP_
#define ERS_HERE
BufferedFileWriter(const BufferedFileWriter &)=delete
BufferedFileWriter is not copy-constructible.
boost::iostreams::filtering_stream< boost::iostreams::output, char, std::char_traits< char >, aligned_allocator_t > filtering_ostream_t
boost::alignment::aligned_allocator< io_sink_t::char_type, Alignment > aligned_allocator_t
void open(std::string filename, size_t buffer_size, std::string compression_algorithm="None", bool use_o_direct=true)
BufferedFileWriter & operator=(BufferedFileWriter &&)=delete
BufferedFileWriter is not move-assignable.
bool write(const char *memory, const size_t size)
boost::iostreams::file_descriptor_sink io_sink_t
BufferedFileWriter(std::string filename, size_t buffer_size, std::string compression_algorithm="None", bool use_o_direct=true)
BufferedFileWriter(BufferedFileWriter &&)=delete
BufferedFileWriter is not move-constructible.
BufferedFileWriter & operator=(const BufferedFileWriter &)=delete
BufferedFileWriter is not copy-assginable.
#define TLOG_DEBUG(lvl,...)
Definition Logging.hpp:112
Including Qt Headers.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
SourceID[" << sourceid << "] Command daqdataformats::SourceID Readout Initialization std::string initerror BufferedReaderWriterConfigurationError