DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
AvailableParserOperations.hpp
Go to the documentation of this file.
1
9#ifndef FLXLIBS_INCLUDE_FLXLIBS_AVAILABLEPARSEROPERATIONS_HPP_
10#define FLXLIBS_INCLUDE_FLXLIBS_AVAILABLEPARSEROPERATIONS_HPP_
11
12#include "FelixIssues.hpp"
13
14#include "iomanager/Sender.hpp"
15
17
18#include "packetformat/block_format.hpp"
19
20#include <algorithm>
21#include <cstdlib>
22#include <memory>
23#include <sstream>
24#include <utility>
25
26namespace dunedaq {
27namespace flxlibs {
28namespace parsers {
29
30inline void
31print_bytes(std::ostream& ostr, const char* title, const unsigned char* data, std::size_t length, bool format = true)
32{
33 ostr << title << std::endl;
34 ostr << std::setfill('0');
35 for (size_t i = 0; i < length; ++i) {
36 ostr << std::hex << std::setw(2) << static_cast<int>(data[i]);
37 if (format) {
38 ostr << (((i + 1) % 16 == 0) ? "\n" : " ");
39 }
40 }
41 ostr << std::endl;
42}
43
44inline void
45dump_to_buffer(const char* data,
46 std::size_t size,
47 void* buffer,
48 uint32_t buffer_pos, // NOLINT
49 const std::size_t& buffer_size)
50{
51 auto bytes_to_copy = size; // NOLINT
52 while (bytes_to_copy > 0) {
53 auto n = std::min(bytes_to_copy, buffer_size - buffer_pos); // NOLINT
54 std::memcpy(static_cast<char*>(buffer) + buffer_pos, data, n);
55 buffer_pos += n;
56 bytes_to_copy -= n;
57 if (buffer_pos == buffer_size) {
58 buffer_pos = 0;
59 }
60 }
61}
62
63template<class TargetStruct>
64inline std::function<void(const felix::packetformat::chunk& chunk)>
66 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
67{
68 return [&](const felix::packetformat::chunk& chunk) {
69 // Chunk info
70 auto subchunk_data = chunk.subchunks();
71 auto subchunk_sizes = chunk.subchunk_lengths();
72 auto n_subchunks = chunk.subchunk_number();
73 std::size_t target_size = sizeof(TargetStruct);
74
75 // Only dump to buffer if possible
76 if (chunk.length() != target_size) {
77 ers::error(UnexpectedChunk(ERS_HERE, chunk.length(), target_size));
78 } else {
79 TargetStruct payload;
80 uint32_t bytes_copied_chunk = 0; // NOLINT
81 for (unsigned i = 0; i < n_subchunks; i++) {
83 subchunk_data[i], subchunk_sizes[i], static_cast<void*>(&payload.data), bytes_copied_chunk, target_size);
84 bytes_copied_chunk += subchunk_sizes[i];
85 }
86 try {
87 // finally, push to sink
88 sink->send(std::move(payload), timeout);
89 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
90 // ers::error(ParserOperationQueuePushFailure(ERS_HERE, " "));
91 }
92 }
93 };
94}
95
96template<class TargetStruct>
97inline std::function<void(const felix::packetformat::shortchunk& shortchunk)>
99 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
100{
101 return [&](const felix::packetformat::shortchunk& shortchunk) {
102 // Only dump to buffer if possible
103 std::size_t target_size = sizeof(TargetStruct);
104 if (shortchunk.length != target_size) {
105 // report? Add custom way of handling unexpected user payloads.
106 // In this case -> not fixed size shortchunk -> shortchunk-to-userbuff not possible
107 // Can't throw, and can't print as it may flood output
108 } else {
109 TargetStruct payload;
110 std::memcpy(static_cast<char*>(payload), shortchunk.data, target_size);
111 try {
112 // finally, push to sink
113 sink->send(std::move(payload), timeout);
114 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
115 // ers::error(ParserOperationQueuePushFailure(ERS_HERE, " "));
116 }
117 }
118 };
119}
120
121template<class TargetStruct>
122inline std::function<void(const felix::packetformat::chunk& chunk)>
124 // std::shared_ptr<iomanager::SenderConcept<std::unique_ptr<TargetStruct>>>& sink,
125 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
126{
127 return [&](const felix::packetformat::chunk& chunk) {
128 // Chunk info
129 auto subchunk_data = chunk.subchunks();
130 auto subchunk_sizes = chunk.subchunk_lengths();
131 auto n_subchunks = chunk.subchunk_number();
132 auto target_size = sizeof(TargetStruct);
133
134 // Only dump to buffer if possible
135 if (chunk.length() != target_size) {
136 // report? Add custom way of handling unexpected user payloads.
137 // In this case -> not fixed size chunk -> chunk-to-userbuff not possible
138 } else {
139 TargetStruct* payload = new TargetStruct[sizeof(TargetStruct)];
140 // std::unique_ptr<TargetStruct> payload = std::make_unique<TargetStruct>();
141 uint_fast32_t bytes_copied_chunk = 0; // NOLINT
142 for (unsigned i = 0; i < n_subchunks; i++) {
143 dump_to_buffer(subchunk_data[i],
144 subchunk_sizes[i],
145 static_cast<void*>(payload), // static_cast<void*>(&payload_ptr->data),
146 bytes_copied_chunk,
147 target_size);
148 bytes_copied_chunk += subchunk_sizes[i];
149 }
150 try {
151 // finally, push to sink
152 sink->send(std::move(payload), timeout); // std::move(std::make_unique<TargetStruct>(payload)), timeout);
153 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
154 // ers::error(ParserOperationQueuePushFailure(ERS_HERE, " "));
155 }
156 }
157 };
158}
159
160template<class TargetWithDatafield>
161inline std::function<void(const felix::packetformat::chunk&)>
163 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
164{
165 return [&](const felix::packetformat::chunk& chunk) {
166 auto subchunk_data = chunk.subchunks();
167 auto subchunk_sizes = chunk.subchunk_lengths();
168 auto n_subchunks = chunk.subchunk_number();
169 TargetWithDatafield twd;
170 twd.get_data().reserve(chunk.length());
171 uint32_t bytes_copied_chunk = 0;
172 for (unsigned i = 0; i< n_subchunks; ++i) {
173 dump_to_buffer(subchunk_data[i],
174 subchunk_sizes[i],
175 static_cast<void*>(twd.get_data().data()),
176 bytes_copied_chunk,
177 chunk.length());
178 bytes_copied_chunk += subchunk_sizes[i];
179 }
180 twd.set_data_size(bytes_copied_chunk);
181 try {
182 sink->send(std::move(twd), timeout);
183 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
184 // ers::error
185 }
186 };
187}
188
189template<class TargetWithDatafield>
190inline std::function<void(const felix::packetformat::shortchunk&)>
192 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
193{
194 return [&](const felix::packetformat::shortchunk& shortchunk) {
195 TargetWithDatafield twd;
196 twd.get_data().reserve(shortchunk.length);
197 std::memcpy(static_cast<void*>(twd.get_data().data()), shortchunk.data, shortchunk.length);
198 twd.set_data_size(shortchunk.length);
199 try {
200 sink->send(std::move(twd), timeout);
201 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
202 // ers::error
203 }
204 };
205}
206
207inline std::function<void(const felix::packetformat::chunk& chunk)>
209 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
210{
211 return [&](const felix::packetformat::chunk& chunk) {
212 auto subchunk_data = chunk.subchunks();
213 auto subchunk_sizes = chunk.subchunk_lengths();
214 auto n_subchunks = chunk.subchunk_number();
215 auto chunk_length = chunk.length();
216
217 char* payload = static_cast<char*>(malloc(chunk_length * sizeof(char)));
218 uint32_t bytes_copied_chunk = 0; // NOLINT(build/unsigned)
219 for (unsigned i = 0; i < n_subchunks; ++i) {
221 subchunk_data[i], subchunk_sizes[i], static_cast<void*>(payload), bytes_copied_chunk, chunk_length);
222 bytes_copied_chunk += subchunk_sizes[i];
223 }
224 fdreadoutlibs::types::VariableSizePayloadTypeAdapter payload_wrapper(chunk_length, payload);
225 try {
226 sink->send(std::move(payload_wrapper), timeout);
227 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
228 // ers
229 }
230 };
231}
232
233inline std::function<void(const felix::packetformat::shortchunk& shortchunk)>
235 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
236{
237 return [&](const felix::packetformat::shortchunk& shortchunk) {
238 auto shortchunk_length = shortchunk.length;
239 char* payload = static_cast<char*>(malloc(shortchunk_length * sizeof(char)));
240 std::memcpy(payload, shortchunk.data, shortchunk_length);
241 fdreadoutlibs::types::VariableSizePayloadTypeAdapter payload_wrapper(shortchunk_length, payload);
242 try {
243 sink->send(std::move(payload_wrapper), timeout);
244 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
245 // ers
246 }
247 };
248}
249
250
251inline std::function<void(const felix::packetformat::chunk& chunk)>
253 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
254{
255 return [&](const felix::packetformat::chunk& chunk) {
256 try {
257 auto payload = chunk;
258 sink->send(std::move(payload), timeout);
259 } catch (const dunedaq::iomanager::TimeoutExpired& excpt) {
260 // ers
261 }
262 };
263}
264
265
267
268} // namespace parsers
269} // namespace flxlibs
270} // namespace dunedaq
271
272#endif // FLXLIBS_INCLUDE_FLXLIBS_AVAILABLEPARSEROPERATIONS_HPP_
#define ERS_HERE
std::function< void(const felix::packetformat::chunk &)> varsizedChunkIntoWithDatafield(std::shared_ptr< iomanager::SenderConcept< TargetWithDatafield > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
void print_bytes(std::ostream &ostr, const char *title, const unsigned char *data, std::size_t length, bool format=true)
std::function< void(const felix::packetformat::chunk &chunk)> fixsizedChunkViaHeap(std::shared_ptr< iomanager::SenderConcept< TargetStruct * > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
void dump_to_buffer(const char *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
std::function< void(const felix::packetformat::chunk &chunk)> errorChunkIntoSink(std::shared_ptr< iomanager::SenderConcept< felix::packetformat::chunk > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::chunk &chunk)> varsizedChunkIntoWrapper(std::shared_ptr< iomanager::SenderConcept< fdreadoutlibs::types::VariableSizePayloadTypeAdapter > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::chunk &chunk)> fixsizedChunkInto(std::shared_ptr< iomanager::SenderConcept< TargetStruct > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::shortchunk &shortchunk)> fixsizedShortchunkInto(std::shared_ptr< iomanager::SenderConcept< TargetStruct > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::shortchunk &shortchunk)> varsizedShortchunkIntoWrapper(std::shared_ptr< iomanager::SenderConcept< fdreadoutlibs::types::VariableSizePayloadTypeAdapter > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::shortchunk &)> varsizedShortchunkIntoWithDatafield(std::shared_ptr< iomanager::SenderConcept< TargetWithDatafield > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
Including Qt Headers.
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk int ParserOps couldn t push to queue !Failed chunk
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
FELIX Initialization std::string initerror FELIX queue timed std::string queuename UnexpectedChunk
void error(const Issue &issue)
Definition ers.hpp:81
Convencience wrapper to take ownership over char pointers with corresponding allocated memory size.