9#ifndef FLXLIBS_INCLUDE_FLXLIBS_AVAILABLEPARSEROPERATIONS_HPP_
10#define FLXLIBS_INCLUDE_FLXLIBS_AVAILABLEPARSEROPERATIONS_HPP_
18#include "packetformat/block_format.hpp"
31print_bytes(std::ostream& ostr,
const char* title,
const unsigned char* data, std::size_t length,
bool format =
true)
33 ostr << title << std::endl;
34 ostr << std::setfill(
'0');
35 for (
size_t i = 0; i < length; ++i) {
36 ostr << std::hex << std::setw(2) << static_cast<int>(data[i]);
38 ostr << (((i + 1) % 16 == 0) ?
"\n" :
" ");
49 const std::size_t& buffer_size)
51 auto bytes_to_copy =
size;
52 while (bytes_to_copy > 0) {
53 auto n = std::min(bytes_to_copy, buffer_size - buffer_pos);
54 std::memcpy(
static_cast<char*
>(buffer) + buffer_pos, data, n);
57 if (buffer_pos == buffer_size) {
63template<
class TargetStruct>
64inline std::function<void(
const felix::packetformat::chunk&
chunk)>
66 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
68 return [&](
const felix::packetformat::chunk&
chunk) {
70 auto subchunk_data =
chunk.subchunks();
71 auto subchunk_sizes =
chunk.subchunk_lengths();
72 auto n_subchunks =
chunk.subchunk_number();
73 std::size_t target_size =
sizeof(TargetStruct);
76 if (
chunk.length() != target_size) {
80 uint32_t bytes_copied_chunk = 0;
81 for (
unsigned i = 0; i < n_subchunks; i++) {
83 subchunk_data[i], subchunk_sizes[i],
static_cast<void*
>(&payload.data), bytes_copied_chunk, target_size);
84 bytes_copied_chunk += subchunk_sizes[i];
88 sink->send(std::move(payload), timeout);
89 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
96template<
class TargetStruct>
97inline std::function<void(
const felix::packetformat::shortchunk& shortchunk)>
99 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
101 return [&](
const felix::packetformat::shortchunk& shortchunk) {
103 std::size_t target_size =
sizeof(TargetStruct);
104 if (shortchunk.length != target_size) {
109 TargetStruct payload;
110 std::memcpy(
static_cast<char*
>(payload), shortchunk.data, target_size);
113 sink->send(std::move(payload), timeout);
114 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
121template<
class TargetStruct>
122inline std::function<void(
const felix::packetformat::chunk&
chunk)>
125 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
127 return [&](
const felix::packetformat::chunk&
chunk) {
129 auto subchunk_data =
chunk.subchunks();
130 auto subchunk_sizes =
chunk.subchunk_lengths();
131 auto n_subchunks =
chunk.subchunk_number();
132 auto target_size =
sizeof(TargetStruct);
135 if (
chunk.length() != target_size) {
139 TargetStruct* payload =
new TargetStruct[
sizeof(TargetStruct)];
141 uint_fast32_t bytes_copied_chunk = 0;
142 for (
unsigned i = 0; i < n_subchunks; i++) {
145 static_cast<void*
>(payload),
148 bytes_copied_chunk += subchunk_sizes[i];
152 sink->send(std::move(payload), timeout);
153 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
160template<
class TargetWithDatafield>
161inline std::function<void(
const felix::packetformat::chunk&)>
163 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
165 return [&](
const felix::packetformat::chunk&
chunk) {
166 auto subchunk_data =
chunk.subchunks();
167 auto subchunk_sizes =
chunk.subchunk_lengths();
168 auto n_subchunks =
chunk.subchunk_number();
169 TargetWithDatafield twd;
170 twd.get_data().reserve(
chunk.length());
171 uint32_t bytes_copied_chunk = 0;
172 for (
unsigned i = 0; i< n_subchunks; ++i) {
175 static_cast<void*
>(twd.get_data().data()),
178 bytes_copied_chunk += subchunk_sizes[i];
180 twd.set_data_size(bytes_copied_chunk);
182 sink->send(std::move(twd), timeout);
183 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
189template<
class TargetWithDatafield>
190inline std::function<void(
const felix::packetformat::shortchunk&)>
192 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
194 return [&](
const felix::packetformat::shortchunk& shortchunk) {
195 TargetWithDatafield twd;
196 twd.get_data().reserve(shortchunk.length);
197 std::memcpy(
static_cast<void*
>(twd.get_data().data()), shortchunk.data, shortchunk.length);
198 twd.set_data_size(shortchunk.length);
200 sink->send(std::move(twd), timeout);
201 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
207inline std::function<void(
const felix::packetformat::chunk&
chunk)>
209 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
211 return [&](
const felix::packetformat::chunk&
chunk) {
212 auto subchunk_data =
chunk.subchunks();
213 auto subchunk_sizes =
chunk.subchunk_lengths();
214 auto n_subchunks =
chunk.subchunk_number();
215 auto chunk_length =
chunk.length();
217 char* payload =
static_cast<char*
>(malloc(chunk_length *
sizeof(
char)));
218 uint32_t bytes_copied_chunk = 0;
219 for (
unsigned i = 0; i < n_subchunks; ++i) {
221 subchunk_data[i], subchunk_sizes[i],
static_cast<void*
>(payload), bytes_copied_chunk, chunk_length);
222 bytes_copied_chunk += subchunk_sizes[i];
226 sink->send(std::move(payload_wrapper), timeout);
227 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
233inline std::function<void(
const felix::packetformat::shortchunk& shortchunk)>
235 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
237 return [&](
const felix::packetformat::shortchunk& shortchunk) {
238 auto shortchunk_length = shortchunk.length;
239 char* payload =
static_cast<char*
>(malloc(shortchunk_length *
sizeof(
char)));
240 std::memcpy(payload, shortchunk.data, shortchunk_length);
243 sink->send(std::move(payload_wrapper), timeout);
244 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
251inline std::function<void(
const felix::packetformat::chunk&
chunk)>
253 std::chrono::milliseconds timeout = std::chrono::milliseconds(100))
255 return [&](
const felix::packetformat::chunk&
chunk) {
257 auto payload =
chunk;
258 sink->send(std::move(payload), timeout);
259 }
catch (
const dunedaq::iomanager::TimeoutExpired& excpt) {
std::function< void(const felix::packetformat::chunk &)> varsizedChunkIntoWithDatafield(std::shared_ptr< iomanager::SenderConcept< TargetWithDatafield > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
void print_bytes(std::ostream &ostr, const char *title, const unsigned char *data, std::size_t length, bool format=true)
std::function< void(const felix::packetformat::chunk &chunk)> fixsizedChunkViaHeap(std::shared_ptr< iomanager::SenderConcept< TargetStruct * > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
void dump_to_buffer(const char *data, std::size_t size, void *buffer, uint32_t buffer_pos, const std::size_t &buffer_size)
std::function< void(const felix::packetformat::chunk &chunk)> errorChunkIntoSink(std::shared_ptr< iomanager::SenderConcept< felix::packetformat::chunk > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::chunk &chunk)> varsizedChunkIntoWrapper(std::shared_ptr< iomanager::SenderConcept< fdreadoutlibs::types::VariableSizePayloadTypeAdapter > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::chunk &chunk)> fixsizedChunkInto(std::shared_ptr< iomanager::SenderConcept< TargetStruct > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::shortchunk &shortchunk)> fixsizedShortchunkInto(std::shared_ptr< iomanager::SenderConcept< TargetStruct > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::shortchunk &shortchunk)> varsizedShortchunkIntoWrapper(std::shared_ptr< iomanager::SenderConcept< fdreadoutlibs::types::VariableSizePayloadTypeAdapter > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
std::function< void(const felix::packetformat::shortchunk &)> varsizedShortchunkIntoWithDatafield(std::shared_ptr< iomanager::SenderConcept< TargetWithDatafield > > &sink, std::chrono::milliseconds timeout=std::chrono::milliseconds(100))
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk int ParserOps couldn t push to queue !Failed chunk
FELIX Initialization std::string initerror FELIX queue timed std::string queuename Unexpected chunk size
FELIX Initialization std::string initerror FELIX queue timed std::string queuename UnexpectedChunk
void error(const Issue &issue)
Convencience wrapper to take ownership over char pointers with corresponding allocated memory size.