DUNE-DAQ
DUNE Trigger and Data Acquisition software
Loading...
Searching...
No Matches
uhallibs::Flx Class Reference

Transport protocol to transfer an IPbus buffer via device file, using mmap. More...

#include <ProtocolFlx.hpp>

Inheritance diagram for uhallibs::Flx:
[legend]
Collaboration diagram for uhallibs::Flx:
[legend]

Classes

class  Card
 
struct  HexTo
 

Public Member Functions

 Flx (const std::string &aId, const uhal::URI &aUri)
 
virtual ~Flx ()
 Destructor.
 

Private Types

typedef ipc::RobustMutex IPCMutex_t
 
typedef std::unique_lock< IPCMutex_t > IPCScopedLock_t
 
typedef IPbus< 2, 0 > InnerProtocol
 
typedef std::chrono::steady_clock SteadyClock_t
 

Private Member Functions

 Flx (const Flx &aFlx)
 
Flx & operator= (const Flx &aFlx)
 
void implementDispatch (std::shared_ptr< uhal::Buffers > aBuffers)
 
virtual void Flush ()
 Concrete implementation of the synchronization function to block until all buffers have been sent, all replies received and all data validated.
 
virtual void dispatchExceptionHandler ()
 Function which tidies up this protocol layer in the event of an exception.
 
uint32_t getMaxSendSize ()
 
uint32_t getMaxReplySize ()
 
void connect ()
 Set up the connection to the device.
 
void connect (IPCScopedLock_t &)
 Set up the connection to the device.
 
void disconnect ()
 Close the connection to the device.
 
void write (const std::shared_ptr< uhal::Buffers > &aBuffers)
 Write request packet to next page in host-to-FPGA device file.
 
void read ()
 Read next pending reply packet from appropriate page of FPGA-to-host device file, and validate contents.
 

Static Private Member Functions

static std::string getSharedMemName (const std::string &aPath)
 

Private Attributes

bool mConnected
 
Card mDeviceFile
 
ipc::SharedMemObject< IPCMutex_t > mIPCMutex
 
bool mIPCExternalSessionActive
 
uint64_t mIPCSessionCount
 
std::chrono::microseconds mSleepDuration
 
uint32_t mNumberOfPages
 
uint32_t mPageSize
 
uint32_t mIndexNextPage
 
uint32_t mPublishedReplyPageCount
 
uint32_t mReadReplyPageCount
 
std::deque< std::shared_ptr< uhal::Buffers > > mReplyQueue
 The list of buffers still awaiting a reply.
 
uhal::exception::exception * mAsynchronousException
 

Detailed Description

Transport protocol to transfer an IPbus buffer via device file, using mmap.

Definition at line 107 of file ProtocolFlx.hpp.

Member Typedef Documentation

◆ InnerProtocol

IPbus< 2 , 0 > uhallibs::Flx::InnerProtocol
private

Definition at line 197 of file ProtocolFlx.hpp.

◆ IPCMutex_t

Definition at line 178 of file ProtocolFlx.hpp.

◆ IPCScopedLock_t

std::unique_lock<IPCMutex_t> uhallibs::Flx::IPCScopedLock_t
private

Definition at line 179 of file ProtocolFlx.hpp.

◆ SteadyClock_t

std::chrono::steady_clock uhallibs::Flx::SteadyClock_t
private

Definition at line 199 of file ProtocolFlx.hpp.

Constructor & Destructor Documentation

◆ Flx() [1/2]

uhallibs::Flx::Flx ( const Flx & aFlx)
private

◆ Flx() [2/2]

uhallibs::Flx::Flx ( const std::string & aId,
const uhal::URI & aUri )

Constructor

Parameters
aId: the unique identifier that the client will be given.
aUri: a struct containing the full URI of the target.

Definition at line 289 of file ProtocolFlx.cpp.

289 :
290 IPbus< 2 , 0 > ( aId , aUri ),
291 mConnected(false),
292 mDeviceFile(aUri.mHostname, LOCK_NONE),
293 mIPCMutex(getSharedMemName(aUri.mHostname)),
295 mPageSize(0),
300{
301 mSleepDuration = std::chrono::microseconds(50);
302
303 for (uhal::NameValuePairVectorType::const_iterator lIt = aUri.mArguments.begin(); lIt != aUri.mArguments.end(); lIt++) {
304 if (lIt->first == "sleep") {
305 mSleepDuration = std::chrono::microseconds(std::stoul(lIt->second));
306 log (uhal::Notice() , "flx client with URI ", uhal::Quote (uri()), " : Inter-poll-/-interrupt sleep duration set to ", std::stoul(lIt->second), " us by URI 'sleep' attribute");
307 }
308 // else if (lIt->first == "offset") {
309 // const bool lIsHex = (lIt->second.find("0x") == 0) or (lIt->second.find("0X") == 0);
310 // const size_t lOffset = (lIsHex ? std::lexical_cast<HexTo<size_t> >(lIt->second) : std::stoul(lIt->second));
311 // mDeviceFile.setOffset(lOffset);
312 // log (uhal::Notice(), "flx client with URI ", uhal::Quote (uri()), " : Address offset set to ", uhal::Integer(lOffset, IntFmt<hex>()));
313 // }
314 else {
315 log (uhal::Warning() , "Unknown attribute ", uhal::Quote (lIt->first), " used in URI ", uhal::Quote(uri()));
316 }
317 }
318}
static std::string getSharedMemName(const std::string &aPath)
uint32_t mPublishedReplyPageCount
uint32_t mNumberOfPages
std::chrono::microseconds mSleepDuration
uint32_t mPageSize
ipc::SharedMemObject< IPCMutex_t > mIPCMutex
uint32_t mReadReplyPageCount
uhal::exception::exception * mAsynchronousException
uint32_t mIndexNextPage
Unsupported std::string uri Execution of command std::string error Failed to create CommandFacility uri
Definition Issues.hpp:77

◆ ~Flx()

uhallibs::Flx::~Flx ( )
virtual

Destructor.

Definition at line 321 of file ProtocolFlx.cpp.

322{
323 disconnect();
324}
void disconnect()
Close the connection to the device.

Member Function Documentation

◆ connect() [1/2]

void uhallibs::Flx::connect ( )
private

Set up the connection to the device.

Definition at line 387 of file ProtocolFlx.cpp.

388{
389 IPCScopedLock_t lLockGuard(*mIPCMutex);
390 connect(lLockGuard);
391}
void connect()
Set up the connection to the device.
std::unique_lock< IPCMutex_t > IPCScopedLock_t

◆ connect() [2/2]

void uhallibs::Flx::connect ( IPCScopedLock_t & aGuard)
private

Set up the connection to the device.

Definition at line 394 of file ProtocolFlx.cpp.

395{
396 // Read current value of session counter when reading status info from FPGA
397 // (So that can check whether this info is up-to-date later on, when sending next request packet)
399 mIPCSessionCount = mIPCMutex->getCounter();
400
401 log ( uhal::Debug() , "flx client is opening device file " , uhal::Quote ( mDeviceFile.getPath() ) );
402 std::vector<uint32_t> lValues;
403 mDeviceFile.read(0x0, 4, lValues);
404 log (uhal::Debug(), "Read status info from addr 0 (", uhal::Integer(lValues.at(0)), ", ", uhal::Integer(lValues.at(1)), ", ", uhal::Integer(lValues.at(2)), ", ", uhal::Integer(lValues.at(3)), "): ", PacketFmt((const uint8_t*)lValues.data(), 4 * lValues.size()));
405 aGuard.unlock();
406
407 mNumberOfPages = lValues.at(0);
408 // mPageSize = std::min(uint32_t(4096), lValues.at(1));
409 mPageSize = lValues.at(1);
410 mIndexNextPage = lValues.at(2);
411 mPublishedReplyPageCount = lValues.at(3);
413
414 if (lValues.at(1) > 0xFFFF) {
415 exception::FlxInitialisationError lExc;
416 log (lExc, "Invalid page size, ", uhal::Integer(lValues.at(1)), ", reported in device file ", uhal::Quote(mDeviceFile.getPath()));
417 throw lExc;
418 }
419
421 exception::FlxInitialisationError lExc;
422 log (lExc, "Next page index, ", uhal::Integer(mIndexNextPage), ", reported in device file ", uhal::Quote(mDeviceFile.getPath()), " is inconsistent with number of pages, ", uhal::Integer(mNumberOfPages));
423 throw lExc;
424 }
425
426 mConnected = true;
427 log ( uhal::Info() , "flx client connected to device at ", uhal::Quote(mDeviceFile.getPath()), "; FPGA has ", uhal::Integer(mNumberOfPages), " pages, each of size ", uhal::Integer(mPageSize), " words, index ", uhal::Integer(mIndexNextPage), " should be filled next" );
428}
const std::string & getPath() const
bool haveLock() const
void read(const uint32_t aAddr, const uint32_t aNrWords, std::vector< uint32_t > &aValues)
bool mIPCExternalSessionActive
uint64_t mIPCSessionCount

◆ disconnect()

void uhallibs::Flx::disconnect ( )
private

Close the connection to the device.

Definition at line 431 of file ProtocolFlx.cpp.

432{
433 log ( uhal::Debug() , "flx client is closing device file " , uhal::Quote ( mDeviceFile.getPath() ) );
435 mConnected = false;
436}

◆ dispatchExceptionHandler()

void uhallibs::Flx::dispatchExceptionHandler ( )
private virtual

Function which tidies up this protocol layer in the event of an exception.

Definition at line 354 of file ProtocolFlx.cpp.

355{
356 // FIXME: Adapt to PCIe implementation
357 // log(uhal::Notice(), "flx client ", uhal::Quote(id()), " (URI: ", uhal::Quote(uri()), ") : closing device files since exception detected");
358
359 // ClientInterface::returnBufferToPool ( mReplyQueue );
360
362
363 disconnect();
364
365 InnerProtocol::dispatchExceptionHandler();
366}

◆ Flush()

void uhallibs::Flx::Flush ( )
private virtual

Concrete implementation of the synchronization function to block until all buffers have been sent, all replies received and all data validated.

Definition at line 340 of file ProtocolFlx.cpp.

341{
342 log(uhal::Debug(), "flx client (URI: ", uhal::Quote(uri()), ") : Flush method called");
343 while ( !mReplyQueue.empty() )
344 read();
345
347
348 IPCScopedLock_t lLockGuard(*mIPCMutex);
349 mIPCMutex->endSession();
350
351}
std::deque< std::shared_ptr< uhal::Buffers > > mReplyQueue
The list of buffers still awaiting a reply.
void read()
Read next pending reply packet from appropriate page of FPGA-to-host device file, and validate contents.

◆ getMaxReplySize()

uint32_t uhallibs::Flx::getMaxReplySize ( )
private

Return the maximum size of reply packet based on the buffer size in the target

Returns
the maximum size of reply packet

Definition at line 378 of file ProtocolFlx.cpp.

379{
380 if ( ! mConnected )
381 connect();
382
383 return (mPageSize - 1) * 4;
384}

◆ getMaxSendSize()

uint32_t uhallibs::Flx::getMaxSendSize ( )
private

Return the maximum size to be sent based on the buffer size in the target

Returns
the maximum size to be sent

Definition at line 369 of file ProtocolFlx.cpp.

370{
371 if ( ! mConnected )
372 connect();
373
374 return (mPageSize - 1) * 4;
375}

◆ getSharedMemName()

std::string uhallibs::Flx::getSharedMemName ( const std::string & aPath)
static private

Definition at line 280 of file ProtocolFlx.cpp.

281{
282 std::string lSanitizedPath(aPath);
283 std::replace(lSanitizedPath.begin(), lSanitizedPath.end(), '/', ':');
284
285 return "/uhal::ipbusflx-2.0::" + lSanitizedPath;
286}

◆ implementDispatch()

void uhallibs::Flx::implementDispatch ( std::shared_ptr< uhal::Buffers > aBuffers)
private

Send the IPbus buffer to the target, read back the response and call the packing-protocol's validate function

Parameters
aBuffers: the buffer object wrapping the send and receive buffers that are to be transported. If multithreaded, adds buffer to the dispatch queue and returns. If single-threaded, calls the dispatch-worker dispatch function directly and blocks until the response is validated.

Definition at line 327 of file ProtocolFlx.cpp.

328{
329 log(uhal::Debug(), "flx client (URI: ", uhal::Quote(uri()), ") : implementDispatch method called");
330
331 if ( ! mConnected )
332 connect();
333
334 if ( mReplyQueue.size() == mNumberOfPages )
335 read();
336 write(aBuffers);
337}
void write(const std::shared_ptr< uhal::Buffers > &aBuffers)
Write request packet to next page in host-to-FPGA device file.

◆ operator=()

Flx & uhallibs::Flx::operator= ( const Flx & aFlx)
private

◆ read()

void uhallibs::Flx::read ( )
private

Read next pending reply packet from appropriate page of FPGA-to-host device file, and validate contents.

Definition at line 469 of file ProtocolFlx.cpp.

470{
471 const size_t lPageIndexToRead = (mIndexNextPage - mReplyQueue.size() + mNumberOfPages) % mNumberOfPages;
472 SteadyClock_t::time_point lStartTime = SteadyClock_t::now();
473
475 {
476 uint32_t lHwPublishedPageCount = 0x0;
477
478 std::vector<uint32_t> lValues;
479 while ( true ) {
480 IPCScopedLock_t lGuard(*mIPCMutex);
481 // FIXME : Improve by simply adding dmaWrite method that takes uint32_t ref as argument (or returns uint32_t)
482 mDeviceFile.read(0, 4, lValues);
483 lHwPublishedPageCount = lValues.at(3);
484 // log (uhal::Info(), "Read status info from addr 0 (", uhal::Integer(lValues.at(0)), ", ", uhal::Integer(lValues.at(1)), ", ", uhal::Integer(lValues.at(2)), ", ", uhal::Integer(lValues.at(3)), "): ", PacketFmt((const uint8_t*)lValues.data(), 4 * lValues.size()));
485 log (uhal::Debug(), "Read status info from addr 0 (", uhal::Integer(lValues.at(0)), ", ", uhal::Integer(lValues.at(1)), ", ", uhal::Integer(lValues.at(2)), ", ", uhal::Integer(lValues.at(3)), "): ", PacketFmt((const uint8_t*)lValues.data(), 4 * lValues.size()));
486
487 if (lHwPublishedPageCount != mPublishedReplyPageCount) {
488 mPublishedReplyPageCount = lHwPublishedPageCount;
489 break;
490 }
491 // FIXME: Throw if published page count is invalid number
492
493 if (SteadyClock_t::now() - lStartTime > std::chrono::microseconds(getBoostTimeoutPeriod().total_microseconds())) {
494 exception::FlxTimeout lExc;
495 log(lExc, "Next page (index ", uhal::Integer(lPageIndexToRead), " count ", uhal::Integer(mPublishedReplyPageCount+1), ") of flx device '" + mDeviceFile.getPath() + "' is not ready after timeout period");
496 throw lExc;
497 }
498
499 log(uhal::Debug(), "flx client ", uhal::Quote(id()), " (URI: ", uhal::Quote(uri()), ") : Trying to read page index ", uhal::Integer(lPageIndexToRead), " = count ", uhal::Integer(mReadReplyPageCount+1), "; published page count is ", uhal::Integer(lHwPublishedPageCount), "; sleeping for ", mSleepDuration.count(), "us");
500 if (mSleepDuration > std::chrono::microseconds(0))
501 std::this_thread::sleep_for( mSleepDuration );
502 lValues.clear();
503 }
504
505 log(uhal::Info(), "flx client ", uhal::Quote(id()), " (URI: ", uhal::Quote(uri()), ") : Reading page ", uhal::Integer(lPageIndexToRead), " (published count ", uhal::Integer(lHwPublishedPageCount), ", surpasses required, ", uhal::Integer(mReadReplyPageCount + 1), ")");
506 }
508
509 // PART 1 : Read the page
510 std::shared_ptr<uhal::Buffers> lBuffers = mReplyQueue.front();
511 mReplyQueue.pop_front();
512
513 uint32_t lNrWordsToRead(lBuffers->replyCounter() >> 2);
514 lNrWordsToRead += 1;
515
516 std::vector<uint32_t> lPageContents;
517 IPCScopedLock_t lGuard(*mIPCMutex);
518 mDeviceFile.read(4 + lPageIndexToRead * mPageSize, lNrWordsToRead , lPageContents);
519 lGuard.unlock();
520 log (uhal::Debug(), "Read " , uhal::Integer(lNrWordsToRead), " 32-bit words from address " , uhal::Integer(4 + lPageIndexToRead * 4 * mPageSize), " ... ", PacketFmt((const uint8_t*)lPageContents.data(), 4 * lPageContents.size()));
521
522 // PART 2 : Transfer to reply buffer
523 const std::deque< std::pair< uint8_t* , uint32_t > >& lReplyBuffers ( lBuffers->getReplyBuffer() );
524 size_t lNrWordsInPacket = (lPageContents.at(0) >> 16) + (lPageContents.at(0) & 0xFFFF);
525 if (lNrWordsInPacket != (lBuffers->replyCounter() >> 2))
526 log (uhal::Warning(), "Expected reply packet to contain ", uhal::Integer(lBuffers->replyCounter() >> 2), " words, but it actually contains ", uhal::Integer(lNrWordsInPacket), " words");
527
528 size_t lNrBytesCopied = 0;
529 for ( std::deque< std::pair< uint8_t* , uint32_t > >::const_iterator lIt = lReplyBuffers.begin() ; lIt != lReplyBuffers.end() ; ++lIt )
530 {
531 // Don't copy more of page than was written to, for cases when less data received than expected
532 if ( lNrBytesCopied >= 4*lNrWordsInPacket)
533 break;
534
535 size_t lNrBytesToCopy = std::min( lIt->second , uint32_t(4*lNrWordsInPacket - lNrBytesCopied) );
536 memcpy ( lIt->first, &lPageContents.at(1 + (lNrBytesCopied / 4)), lNrBytesToCopy );
537 lNrBytesCopied += lNrBytesToCopy;
538 }
539
540
541 // PART 3 : Validate the packet contents
542 try
543 {
544 if ( uhal::exception::exception* lExc = ClientInterface::validate ( lBuffers ) ) //Control of the pointer has been passed back to the client interface
545 {
547 }
548 }
549 catch ( uhal::exception::exception& aExc )
550 {
551 mAsynchronousException = new uhal::exception::ValidationError ();
552 log ( *mAsynchronousException , "Exception caught during reply validation for flx device with URI " , uhal::Quote ( this->uri() ) , "; what returned: " , uhal::Quote ( aExc.what() ) );
553 }
554
556 {
557 mAsynchronousException->throwAsDerivedType();
558 }
559}

◆ write()

void uhallibs::Flx::write ( const std::shared_ptr< uhal::Buffers > & aBuffers)
private

Write request packet to next page in host-to-FPGA device file.

Definition at line 439 of file ProtocolFlx.cpp.

440{
441 if (not mDeviceFile.haveLock()) {
443
444 IPCScopedLock_t lGuard(*mIPCMutex);
445 mIPCMutex->startSession();
447
448 if (mIPCExternalSessionActive or (mIPCMutex->getCounter() != mIPCSessionCount)) {
449 connect(lGuard);
450 }
451 }
452
453 log (uhal::Info(), "flx client ", uhal::Quote(id()), " (URI: ", uhal::Quote(uri()), ") : writing ", uhal::Integer(aBuffers->sendCounter() / 4), "-word packet to page ", uhal::Integer(mIndexNextPage), " in ", uhal::Quote(mDeviceFile.getPath()));
454
455 const uint32_t lHeaderWord = (0x10000 | (((aBuffers->sendCounter() / 4) - 1) & 0xFFFF));
456 std::vector<std::pair<const uint8_t*, size_t> > lDataToWrite;
457 lDataToWrite.push_back( std::make_pair(reinterpret_cast<const uint8_t*>(&lHeaderWord), sizeof lHeaderWord) );
458 lDataToWrite.push_back( std::make_pair(aBuffers->getSendBuffer(), aBuffers->sendCounter()) );
459
460 IPCScopedLock_t lGuard(*mIPCMutex);
461 mDeviceFile.write(mIndexNextPage * mPageSize, lDataToWrite);
462 log (uhal::Debug(), "Wrote " , uhal::Integer((aBuffers->sendCounter() / 4) + 1), " 32-bit words at address " , uhal::Integer(mIndexNextPage * 4 * mPageSize), " ... ", PacketFmt(lDataToWrite));
463
465 mReplyQueue.push_back(aBuffers);
466}
void write(const uint32_t aAddr, const std::vector< std::pair< const uint8_t *, size_t > > &aData)

Member Data Documentation

◆ mAsynchronousException

uhal::exception::exception* uhallibs::Flx::mAsynchronousException
private

A pointer to an exception object for passing exceptions from the worker thread to the main thread. Exceptions must always be created on the heap (i.e. using new) and deletion will be handled in the main thread

Definition at line 248 of file ProtocolFlx.hpp.

◆ mConnected

bool uhallibs::Flx::mConnected
private

Definition at line 228 of file ProtocolFlx.hpp.

◆ mDeviceFile

Card uhallibs::Flx::mDeviceFile
private

Definition at line 230 of file ProtocolFlx.hpp.

◆ mIndexNextPage

uint32_t uhallibs::Flx::mIndexNextPage
private

Definition at line 239 of file ProtocolFlx.hpp.

◆ mIPCExternalSessionActive

bool uhallibs::Flx::mIPCExternalSessionActive
private

Definition at line 233 of file ProtocolFlx.hpp.

◆ mIPCMutex

ipc::SharedMemObject<IPCMutex_t> uhallibs::Flx::mIPCMutex
private

Definition at line 232 of file ProtocolFlx.hpp.

◆ mIPCSessionCount

uint64_t uhallibs::Flx::mIPCSessionCount
private

Definition at line 234 of file ProtocolFlx.hpp.

◆ mNumberOfPages

uint32_t uhallibs::Flx::mNumberOfPages
private

Definition at line 239 of file ProtocolFlx.hpp.

◆ mPageSize

uint32_t uhallibs::Flx::mPageSize
private

Definition at line 239 of file ProtocolFlx.hpp.

◆ mPublishedReplyPageCount

uint32_t uhallibs::Flx::mPublishedReplyPageCount
private

Definition at line 239 of file ProtocolFlx.hpp.

◆ mReadReplyPageCount

uint32_t uhallibs::Flx::mReadReplyPageCount
private

Definition at line 239 of file ProtocolFlx.hpp.

◆ mReplyQueue

std::deque< std::shared_ptr< uhal::Buffers > > uhallibs::Flx::mReplyQueue
private

The list of buffers still awaiting a reply.

Definition at line 242 of file ProtocolFlx.hpp.

◆ mSleepDuration

std::chrono::microseconds uhallibs::Flx::mSleepDuration
private

Definition at line 237 of file ProtocolFlx.hpp.


The documentation for this class was generated from the following files: