DUNE-DAQ
DUNE Trigger and Data Acquisition software
ipm

Inter-Process Messaging

The IPM library provides the low-level mechanism for sending messages between DUNE DAQ processes. IPM deals with messages consisting of arrays of bytes; higher-level concepts such as object serialization/deserialization are left to other libraries and processes that build on IPM.

IPM provides two communication patterns:

  1. Sender/Receiver, a pattern in which one sender talks to one receiver.
  2. Publisher/Subscriber, a pattern in which one sender talks to zero or more receivers. Each message goes to all subscribers.

Users should interact with IPM via the interfaces dunedaq::ipm::Sender, dunedaq::ipm::Receiver and dunedaq::ipm::Subscriber. Instances are created with the factory functions dunedaq::ipm::makeIPM(Sender|Receiver|Subscriber), each of which takes a string argument giving the implementation type. The currently available implementation types all use ZeroMQ; those used in the examples below are ZmqSender, ZmqReceiver, ZmqPublisher and ZmqSubscriber.

Additionally, the CallbackAdapter class implements callback functionality for the ZmqReceiver and ZmqSubscriber classes, since ZeroMQ has no native callback mechanism. It does this by managing a thread that calls receive in a loop and invokes the given function whenever data is returned.
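The thread-plus-loop idea described above can be sketched without any ZeroMQ dependency. The class below is a minimal illustration, not the real CallbackAdapter: the names PollingCallbackAdapter, ReceiveFn and Callback are hypothetical, and the receive function is assumed to return std::nullopt when its timeout expires rather than throwing.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <optional>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a blocking receive call with a timeout:
// returns a message, or std::nullopt if nothing arrived in time.
using ReceiveFn = std::function<std::optional<std::string>(std::chrono::milliseconds)>;
using Callback = std::function<void(const std::string&)>;

// Illustrative only; this is not dunedaq::ipm::CallbackAdapter.
class PollingCallbackAdapter
{
public:
  PollingCallbackAdapter(ReceiveFn receive,
                         Callback callback,
                         std::chrono::milliseconds timeout = std::chrono::milliseconds(10))
    : m_receive(std::move(receive))
    , m_callback(std::move(callback))
    , m_timeout(timeout)
  {
    assert(m_receive && m_callback);
    // Start the worker only after every other member is initialized.
    m_thread = std::thread([this] { run(); });
  }

  ~PollingCallbackAdapter()
  {
    // Ask the loop to stop, then wait for the current receive call to return.
    m_running = false;
    m_thread.join();
  }

  PollingCallbackAdapter(const PollingCallbackAdapter&) = delete;
  PollingCallbackAdapter& operator=(const PollingCallbackAdapter&) = delete;

private:
  void run()
  {
    while (m_running) {
      // Poll with a timeout so the stop flag is re-checked regularly.
      if (auto message = m_receive(m_timeout)) {
        m_callback(*message);
      }
    }
  }

  ReceiveFn m_receive;
  Callback m_callback;
  std::chrono::milliseconds m_timeout;
  std::atomic<bool> m_running{ true };
  std::thread m_thread;
};
```

In this sketch the callback runs on the adapter's own thread, so any state it shares with the rest of the application needs its own synchronization. The receive timeout also bounds how long the destructor can block while the loop notices the stop request.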

Basic example of the sender/receiver pattern:

```cpp
// Sender side
std::shared_ptr<dunedaq::ipm::Sender> sender = dunedaq::ipm::makeIPMSender("ZmqSender");
sender->connect_for_sends({ {"connection_string", "tcp://127.0.0.1:12345"} });
void* message= ... ;
// Last arg is send timeout
sender->send(message, message_size, std::chrono::milliseconds(10));

// Receiver side
std::shared_ptr<dunedaq::ipm::Receiver> receiver = dunedaq::ipm::makeIPMReceiver("ZmqReceiver");
receiver->connect_for_receives({ {"connection_string", "tcp://127.0.0.1:12345"} });
// Arg is receive timeout
dunedaq::ipm::Receiver::Response response = receiver->receive(std::chrono::milliseconds(10));
// ... do something with response.data or response.metadata
```

Basic example of the publisher/subscriber pattern:

```cpp
// Publisher side
std::shared_ptr<dunedaq::ipm::Sender> publisher = dunedaq::ipm::makeIPMSender("ZmqPublisher");
publisher->connect_for_sends({ {"connection_string", "tcp://127.0.0.1:12345"} });
void* message= ... ;
// Third arg is send timeout; last arg is topic for subscribers to subscribe to
publisher->send(message, message_size, std::chrono::milliseconds(10), "topic");

// Subscriber side (created with the Subscriber factory so that subscribe() is available)
std::shared_ptr<dunedaq::ipm::Subscriber> subscriber = dunedaq::ipm::makeIPMSubscriber("ZmqSubscriber");
subscriber->connect_for_receives({ {"connection_string", "tcp://127.0.0.1:12345"} });
subscriber->subscribe("topic");
// Arg is receive timeout
dunedaq::ipm::Receiver::Response response = subscriber->receive(std::chrono::milliseconds(10));
// ... do something with response.data or response.metadata
```

More complete examples can be found in the test/plugins directory.

There is an asymmetry between send and receive: send takes a void*, while receive returns its data as a std::vector<char>. This is because IPM does not own the memory passed to send, but it must hand ownership of the received bytes to the caller.
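To make the asymmetry concrete, the following sketch passes a small struct through a caller-owned pointer on the way out and recovers it from an owned byte vector on the way back. Nothing here is part of IPM: Heartbeat, loopback and from_bytes are hypothetical names, loopback merely imitates a send followed by a receive, and the approach assumes a trivially copyable payload exchanged between processes with the same layout and endianness.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>

// Hypothetical payload type, used only for illustration.
struct Heartbeat
{
  std::uint32_t run_number;
  std::uint64_t timestamp;
};

// Stand-in for a send/receive round trip: copies size bytes out of a
// caller-owned buffer into a vector that the receiving side then owns.
inline std::vector<char> loopback(const void* message, std::size_t size)
{
  assert(message != nullptr || size == 0);
  const char* begin = static_cast<const char*>(message);
  return std::vector<char>(begin, begin + size);
}

// Rebuild a trivially copyable object from a received byte buffer.
template<typename T>
T from_bytes(const std::vector<char>& bytes)
{
  static_assert(std::is_trivially_copyable<T>::value, "T must be trivially copyable");
  if (bytes.size() != sizeof(T)) {
    throw std::runtime_error("unexpected message size");
  }
  T value;
  std::memcpy(&value, bytes.data(), sizeof(T));
  return value;
}

// Example round trip; returns true if the value survives unchanged.
inline bool round_trip_ok()
{
  Heartbeat sent{ 42, 1234567890ULL };
  // The sender keeps ownership of `sent`; only a pointer and a size are passed.
  std::vector<char> received = loopback(&sent, sizeof(sent));
  // The receiver owns `received` and decodes it at its leisure.
  Heartbeat got = from_bytes<Heartbeat>(received);
  return got.run_number == sent.run_number && got.timestamp == sent.timestamp;
}
```

With the real interfaces, the pointer and size would presumably go to send, and the vector would come from response.data. Anything beyond plain structs is better served by a serialization layer built on top of IPM.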

API Diagram

Class Diagrams

ZeroMQ Configuration Variables

ZmqContext.hpp currently reads two environment variables that configure ZeroMQ within each application:

  • IPM_ZMQ_IO_THREADS: Sets the number of I/O threads in the ZeroMQ context. ipm does not specify a default; the ZeroMQ default is 1.
  • IPM_ZMQ_MAX_SOCKETS: Sets the maximum number of sockets allowed on the context. ipm uses a minimum value of 16636.
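As a rough illustration of how two such variables might be interpreted, the sketch below reads them with std::getenv. It is not the code in ZmqContext.hpp: env_int, ContextSettings and read_context_settings are hypothetical names, and treating 16636 as a floor that a smaller requested value is raised to is an assumption about what "minimum value" means here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <exception>
#include <optional>
#include <string>

// Read an integer from the environment; std::nullopt if the variable is
// unset or does not start with a parseable number.
inline std::optional<int> env_int(const char* name)
{
  assert(name != nullptr);
  const char* raw = std::getenv(name);
  if (raw == nullptr) {
    return std::nullopt;
  }
  try {
    return std::stoi(raw);
  } catch (const std::exception&) {
    return std::nullopt;
  }
}

// Hypothetical container for the two settings.
struct ContextSettings
{
  std::optional<int> io_threads; // unset means: keep the ZeroMQ default
  int max_sockets;
};

constexpr int minimum_sockets = 16636; // value quoted in the list above

inline ContextSettings read_context_settings()
{
  ContextSettings settings;
  settings.io_threads = env_int("IPM_ZMQ_IO_THREADS");
  // Assumption: requests below the minimum are raised to it.
  settings.max_sockets =
    std::max(minimum_sockets, env_int("IPM_ZMQ_MAX_SOCKETS").value_or(minimum_sockets));
  return settings;
}
```

In a real application the resulting values would typically be applied to the ZeroMQ context before any sockets are created, for example through libzmq's ZMQ_IO_THREADS and ZMQ_MAX_SOCKETS context options.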