Buffer

Buffers are needed between a data producer and a data consumer working asynchronously, in order to absorb fluctuations in the capacity of the consumer to keep up with the producer throughput.

One key point in the Lima design is to receive data from the detector at its maximum speed. Lima also aims to process the data at the same rate, but this cannot always be guaranteed, either for the full processing pipeline or for part of it. In such cases there is an interest in filling the backend computer with buffers before the slowest task, in order to sustain long image acquisition sequences.

The main motivations for buffer management in Lima are:

  • Prepare in advance the memory where the hardware (HW) and the processing tasks will write the data. Memory allocation by the OS takes time, and the memory has to be available (with a deterministic latency) when needed.

  • Keep control of the total memory used by the Lima framework so it does not exceed the allocated resources.

As the CPU-to-memory system bandwidth is limited and DAQ is an I/O-bound operation, memory copies between two buffer stages should be avoided to the maximum extent in Lima. This implies extending the lifetime of the original buffer as much as possible.

Buffer management can be broken down into several orthogonal concepts:

  • Container (a holder object that stores a collection of other objects, here pixels)

  • Allocator (handles all requests for allocation and deallocation of memory for a given container)

  • Ownership (sole or shared ownership)

Direct hardware writable vs standard memory buffers

The first data generator in the Lima chain is the hardware interface associated with the detector. It can be a frame grabber, using either proprietary or standard protocols, or an adapter of a standard technology like Ethernet. The next producers in the Lima chain are mainly software algorithms running on the CPU, although GPU algorithms are also envisaged.

Modern HW can intrinsically perform Direct Memory Access (DMA), that is, write into the backend computer memory without the help of the CPU. In the best configuration the first processing tasks can read directly from DMA buffers. However, depending on the HW design and software interface, the following constraints apply:

  1. General: DMA must be performed on pinned memory

  2. Hardware: DMA can be performed on a limited amount of physical memory, either contiguous or fragmented

  3. Software: the kernel driver interface might or might not export the DMA buffers to user space

    3.1. Espia: the driver mmaps DMA buffers to user space

    3.2. Ethernet/standard: the kernel network stack prevents direct access to DMA buffers from user space

    3.3. Ethernet/DPDK: this project allows controlling the network adapter from user space

The solution to (1.) is to use pinned memory, by means of mechanisms like mlock. If there is a solution for (2.) and (3.), Lima can use the buffers provided/mapped by the kernel/SDK/plugin, referred to as hardware-allocated buffers, in order to feed the processing tasks. However, a solution for (2.) and/or (3.) is not always possible. In such cases the Lima process needs to use standard memory for its front-end buffers, which will be filled by a memory copy from the DMA buffers.
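For instance, a minimal sketch of allocating and pinning a front-end buffer with mlock (error handling is simplified; buf_size is an assumed parameter):

#include <cstddef>
#include <cstdlib>
#include <sys/mman.h>   // mlock / munlock

// Sketch: allocate a buffer and lock its pages into RAM so DMA targets
// (or copy sources) are never paged out
void *alloc_pinned(std::size_t buf_size)
{
    void *buf = std::malloc(buf_size);
    if (buf && (mlock(buf, buf_size) != 0)) {
        std::free(buf);   // pinning failed (e.g. RLIMIT_MEMLOCK exceeded)
        return nullptr;
    }
    return buf;
}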

Container

Both std::vector and gil::image are well suited to store a contiguous array of pixels. The gil::image implementation, and more specifically gil::image_view, is used because it provides the 2D abstraction needed by Lima.
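As an illustration, a raw pixel buffer can be wrapped into a 2D view without any copy. This is a minimal sketch assuming a 16-bit grayscale detector; the geometry values are arbitrary:

#include <cstddef>
#include <vector>
#include <boost/gil.hpp>
namespace gil = boost::gil;

std::size_t width = 2048, height = 2048;   // assumed detector geometry
std::vector<unsigned char> buffer(width * height * sizeof(gil::gray16_pixel_t));

// 2D abstraction over the contiguous pixel array, no copy involved
auto view = gil::interleaved_view(
    width, height,
    reinterpret_cast<gil::gray16_pixel_t *>(buffer.data()),
    width * sizeof(gil::gray16_pixel_t));   // row size in bytes
// view(x, y) now addresses individual pixels of the buffer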

Allocators

Allocators and allocator wrappers

Several standard-compatible allocators are provided with Lima2:

  • numa_allocator allocates memory on a specific NUMA node;

  • track_allocator is an allocator adaptor that keeps track of the memory allocations.

See their complete description in the C++ API ref section.
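For illustration, here is a minimal sketch of what such a standard-compatible allocator can look like, binding memory to a NUMA node through libnuma (the actual numa_allocator implementation may differ; see the API reference):

#include <cstddef>
#include <vector>
#include <numa.h>   // libnuma, link with -lnuma

// Minimal standard-compatible allocator binding memory to a NUMA node
template <class T>
struct numa_node_allocator
{
    using value_type = T;
    int node;   // target NUMA node

    explicit numa_node_allocator(int n = 0) : node(n) {}
    template <class U>
    numa_node_allocator(const numa_node_allocator<U>& o) : node(o.node) {}

    T *allocate(std::size_t n)
    { return static_cast<T *>(numa_alloc_onnode(n * sizeof(T), node)); }

    void deallocate(T *p, std::size_t n)
    { numa_free(p, n * sizeof(T)); }
};

template <class T, class U>
bool operator==(const numa_node_allocator<T>& a, const numa_node_allocator<U>& b)
{ return a.node == b.node; }
template <class T, class U>
bool operator!=(const numa_node_allocator<T>& a, const numa_node_allocator<U>& b)
{ return !(a == b); }

// Usage sketch: a pixel buffer bound to NUMA node 1
// std::vector<unsigned char, numa_node_allocator<unsigned char>>
//     buf(16 << 20, numa_node_allocator<unsigned char>(1));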

The Boost library also provides aligned_allocator and pool_allocator.
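For example, aligned_allocator can provide cache-line-aligned pixel buffers; the 64-byte alignment and buffer size below are illustrative assumptions:

#include <vector>
#include <boost/align/aligned_allocator.hpp>

// Pixel buffer aligned to a 64-byte cache line, e.g. for SIMD processing
using aligned_buffer =
    std::vector<unsigned char,
                boost::alignment::aligned_allocator<unsigned char, 64>>;

aligned_buffer buffer(2048 * 2048 * 2);   // 2048x2048, 16-bit pixels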

HPC Memory Allocators

Here is a list of thread-friendly allocators (i.e. allocators that do not require a global memory lock):

  • ptmalloc2

  • tcmalloc

  • hoard

  • jemalloc

Special Linux kernel Allocators

Large chunks of contiguous physical memory can be allocated with dedicated Linux kernel mechanisms.

SDK allocators

The following SDKs provide specific memory allocators:

  • Espia: buffers are allocated and mmap’ed by the Espia driver

  • Rashpa (?)

Ownership

If the processing pipeline is linear, the single-writer-single-user model fits the buffer usage, so ownership can be implemented by a single in-use-by-reader flag. As soon as more than one task can read from a buffer, which is the general approach that Lima will follow, a reader counter is necessary. std::shared_ptr can be used to implement this functionality.
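A minimal sketch of this idea, assuming the BufferPool API described later in this section: the std::shared_ptr holds the reader counter, and a custom deleter returns the buffer to the pool once the last reader has finished.

#include <memory>

// Shared ownership of a pooled buffer: every processing task holds a copy
// of the shared_ptr; the deleter runs when the last copy is destroyed
template <class BufferPool>
std::shared_ptr<const typename BufferPool::buffer_type>
make_shared_buffer(BufferPool& pool)
{
    using buffer_type = typename BufferPool::buffer_type;
    const buffer_type& b = pool.get_buffer();
    auto put_back = [&pool](const buffer_type *p) { pool.put_buffer(*p); };
    return std::shared_ptr<const buffer_type>(&b, put_back);
}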

Buffer Lifetime / Cycle

HW buffer tracking is required to avoid copies that are not meaningful, e.g. when processing can be executed in-place. The generic life cycle of a buffer is:

graph LR
    Available --> Filling
    Filling --> Ready
    Ready --> Used
    Used --> Available

Some of these states can be missing (or intrinsically skipped by atomic operations) in several plugin implementations. For instance, a plugin like the non-threaded Simulator, which fills the buffers (synchronously) inside the get_image() call from Lima, does not need to implement the Filling state. The Espia-based plugins will skip the Available state because the buffers intrinsically enter the Filling state as soon as they are no longer Used.

DAQ buffer management implementation

The concept ImageBufferMgr offers high-level buffer management, used by both Lima and the camera plugin. Following the user request, Lima first asks ImageBufferMgr (through the camera plugin) for the allocation of pixel buffers. During the acquisition loop Lima calls get_image() in the camera plugin, which asks ImageBufferMgr for the next free image buffer to fill, and returns it together with the frame metadata (timestamp, etc.). Lima then passes it to the processing chain, and once consumed it is returned to the ImageBufferMgr.

Two orthogonal concepts in the HW implementation are analyzed:

  • Buffer sequence order HW capability: BufferPool

  • Synchronisation of the HW buffer filling process

The ImageBufferMgr class implements the image abstraction (pixel type, width and height) on top of memory buffers. It builds on top of the BufferPool abstraction, which in turn uses the Allocator abstraction. Ideally the gil::image concept could be used as the pixel container. However, its Allocator policy must be specified as a template parameter, complicating a flexible implementation. Instead, the image abstraction is given by the any_image_view concept, which can be constructed on top of raw void *s provided by any allocator. any_image_view_t provides at the same time the pixel type, the image width and height, and the pointer to the buffer. In particular, the image dimensions alone can be described by an any_image_view_t with a nullptr buffer reference, as sketched below.
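For instance, assuming any_image_view_t aliases a Boost.GIL any_image_view over the supported pixel types (a hypothetical stand-in for the Lima2 alias), a dimensions-only descriptor could be built with a null buffer reference:

#include <boost/gil.hpp>
namespace gil = boost::gil;

// Assumption: Lima2's any_image_view_t is a GIL variant over supported views
using any_image_view_t = gil::any_image_view<gil::gray8_view_t, gil::gray16_view_t>;

// A 2048x2048, 16-bit image geometry with a nullptr buffer reference:
// it carries the pixel type and dimensions, but no pixel storage
any_image_view_t size = gil::interleaved_view(
    2048, 2048, static_cast<gil::gray16_pixel_t *>(nullptr),
    2048 * sizeof(gil::gray16_pixel_t));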

ImageBufferMgr API

Synopsis

concept ImageBufferMgr
{
    using image_buffer_type = /* class providing image view, returned by view(); */;
    
    // Allocate nb_buffers image buffers matching the given image geometry
    void allocate(any_image_view_t size, std::size_t nb_buffers);
    
    // Release previously allocated image buffers
    void release();
    
    // Returns the next image buffer to fill
    image_buffer_type get();
};

Description

The ImageBufferMgr class stores the any_image_view_t size provided to the allocate() method and calls the underlying BufferPool::allocate() method. Further calls to get() return copyable image_buffer_type objects representing the ownership of the buffers. When all the equivalent image_buffer_type instances are destroyed, the buffer ownership is returned to the BufferPool.

The ImageBufferMgr::image_buffer_type could implement the RAII (Resource Acquisition Is Initialisation) paradigm. A possible implementation, where the BufferPool policy is provided as a template parameter, is:

template <class BufferPool>
class ImageBuffer
{
public:
    ImageBuffer(any_image_view_t size, BufferPool& pool) : m_data(new Data(size, pool)) {}
    ImageBuffer(const ImageBuffer& o) = default;
    ImageBuffer(ImageBuffer&& o) = default;

    any_image_view_t& view()
    { return m_data->view; }

private:
    struct Data {
        // implement the RAII paradigm: get the buffer on construction,
        // return it to the pool on destruction
        struct PoolBuffer {
            using buffer_type = typename BufferPool::buffer_type;

            BufferPool& pool;
            const buffer_type& buffer;

            PoolBuffer(BufferPool& p) : pool(p), buffer(pool.get_buffer()) {}
            ~PoolBuffer() { pool.put_buffer(buffer); }
        } pool_buffer;

        any_image_view_t view;

        Data(any_image_view_t size, BufferPool& p)
            : pool_buffer(p),
              view(gil::make_view_from_ptr(size, pool_buffer.buffer.ptr()))
        {}
    };

    std::shared_ptr<Data> m_data;
};
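A possible usage, with a hypothetical ConcreteBufferPool and an already-constructed size descriptor:

// Hypothetical usage sketch: pool and size are assumed to exist
ImageBuffer<ConcreteBufferPool> buffer(size, pool);  // acquires a pool buffer
auto other = buffer;         // copies share the same Data via shared_ptr
process(other.view());       // all copies see the same pixels, no memory copy
// when the last copy is destroyed, ~PoolBuffer() returns the buffer to the pool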

Buffer sequence order HW capability: BufferPool

Two alternatives are foreseen:

  • Ordered BufferPool: Strict, predefined order in the buffer filling sequence

  • Unordered BufferPool: Arbitrarily ordered buffer filling sequence

BufferPool API

The BufferPool implementation must provide the following API:

concept BufferPool
{
    using buffer_type = /* class providing access to buffer, returned by ptr() */;

    // Allocate `nb_buffers` buffers of `size` bytes each
    void allocate(std::size_t size, std::size_t nb_buffers);

    // Free the previously allocated buffers
    void release();

    // Get the next buffer to fill
    const buffer_type& get_buffer();

    // Return a no-longer used buffer to the pool
    void put_buffer(const buffer_type& buffer);
};

BufferPool & SyncPolicy

Since the acquisition and processing tasks are asynchronous and the buffers can be released at any time after they are filled, synchronisation is needed between the get and put methods in order to safely detect overrun conditions arising from independent producer and consumer rates. In addition, multiple synchronisation policies are desirable, like blocking and non-blocking.

The BufferPool builds the synchronisation semantics on top of a SyncContainer and SyncPolicy, which allows different synchronisation strategies. SyncContainer inherits from a Container that holds buffer elements:

classDiagram
    Container <|-- SyncContainer
    SyncContainer <|-- BufferPool
    SyncPolicy <|-- BufferPool
    Container : struct element_type {...}
    Container : const element_type& get_element()
    Container : void put_element(const element_type& e)
    SyncContainer : struct element_type : public Container::element_type {...}
    SyncContainer : bool is_get_ok()
    SyncContainer : bool is_put_ok(const element_type& e)
    SyncContainer : bool is_put_making_get_ok(const element_type& e)
    SyncContainer : const element_type& get_element()
    SyncContainer : void put_element(const element_type& e)
    SyncPolicy : struct buffer_type : public SyncContainer::element_type {...}
    SyncPolicy : void pre_get()
    SyncPolicy : void post_get(const buffer_type& e)
    SyncPolicy : void pre_put(const buffer_type& e)
    SyncPolicy : void post_put()
    BufferPool : const buffer_type& get_buffer()
    BufferPool : void put_buffer(const buffer_type& b)

The following generic code for BufferPool completely relies on the SyncPolicy:

template <class SyncPolicy>
class BufferPool : public SyncPolicy::SyncContainer, public SyncPolicy
{
public:
    using SyncContainer = typename SyncPolicy::SyncContainer;
    using buffer_type = typename SyncPolicy::buffer_type;

    const buffer_type& get_buffer()
    {
        SyncPolicy::pre_get();
        const buffer_type& b =
            static_cast<const buffer_type&>(SyncContainer::get_element());
        SyncPolicy::post_get(b);
        return b;
    }

    void put_buffer(const buffer_type& b)
    {
        SyncPolicy::pre_put(b);
        SyncContainer::put_element(b);
        SyncPolicy::post_put();
    }
};

Two basic SyncPolicies are foreseen:

  • NonBlockingSync: An overrun_error exception is thrown by pre_get() if !is_get_ok(). If a reference to the last frame written by the HW is available with a latency smaller than the main plugin loop, the is_put_ok() method verifies that the just-freed buffer was not overwritten during its processing. If the check fails, post_put() throws an overrun_error.

  • BlockingSync: Getting a new buffer will block until one is available, with an optional timeout. A std::condition_variable and its corresponding std::mutex are added. pre_get() will wait until is_get_ok(). pre_put() will check !is_get_ok() && is_put_making_get_ok() and, if that is the case, post_put() will wake up the waiting task.

template <class Container>
class NonBlockingSync
{
public:
    using SyncContainer = Container;
    struct buffer_type : public SyncContainer::element_type {};

    void pre_get()
    { if (!SyncContainer::is_get_ok()) throw overrun_error(); }

    void post_get(const buffer_type& e)
    {}

    void pre_put(const buffer_type& e)
    { m_overwritten = !SyncContainer::is_put_ok(e); }

    void post_put()
    { if (m_overwritten) throw overrun_error(); }

private:
    bool m_overwritten = false;
};

template <class Container>
class BlockingSync
{
public:
    using SyncContainer = Container;
    struct buffer_type : public SyncContainer::element_type {};

    void pre_get()
    {
        std::unique_lock<std::mutex> l(m_mutex);
        m_cond.wait(l, [&] () { return SyncContainer::is_get_ok(); });
    }

    void post_get(const buffer_type& e)
    {}

    void pre_put(const buffer_type& e)
    {
        std::lock_guard<std::mutex> l(m_mutex);
        m_available = (!SyncContainer::is_get_ok() &&
                       SyncContainer::is_put_making_get_ok(e));
    }

    void post_put()
    {
        std::lock_guard<std::mutex> l(m_mutex);
        if (m_available)
            m_cond.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_available = false;
};

Note

The SyncPolicy::buffer_type hook allows inserting into each buffer element the variables needed by that policy.

Ordered BufferPool

The order in which buffers are filled is predefined and must be respected. The sequence can typically be repeated in a virtually endless loop. The buffers are normally allocated by the SDK; they can also be allocated by the user and transferred to the SDK before the acquisition.

Camera: Espia, RASHPA (?)

Possible implementation: a std::vector providing a basic, read-pointer-only ring buffer, like this:

template <class BufferPool, class Allocator = std::allocator<unsigned char>>
class OrderedContainer
{
public:
    struct element_type {
        void *p;
        void *ptr() const { return p; }
    };
    using buffer_type = typename BufferPool::buffer_type;

    // Allocate buffers through Allocator
    void allocate(std::size_t size, std::size_t nb_elements);

    // Release all allocated elements
    void release();

    // Returns the next element in the ring and updates the get index
    const element_type& get_element()
    {
        element_type& e = get_next_element();
        m_next_get = (m_next_get + 1) % m_element_list.size();
        return e;
    }

    // Nothing to be done
    void put_element(const element_type& e)
    {}

    std::size_t get_element_index(const element_type& e)
    {
        const buffer_type& b = static_cast<const buffer_type&>(e);
        return &b - &m_element_list[0];
    }

protected:
    // Returns the next element in the ring
    element_type& get_next_element()
    {
        return m_element_list[m_next_get];
    }

private:
    Allocator m_allocator;
    std::vector<buffer_type> m_element_list;
    std::size_t m_next_get = 0;
};


template <class BufferPool,
          class Allocator = std::allocator<unsigned char>>
class OrderedSyncContainer : public OrderedContainer<BufferPool, Allocator>
{
public:
    using Container = OrderedContainer<BufferPool, Allocator>;
    struct element_type : public Container::element_type {
        std::atomic<bool> in_use{false};
    };

    bool is_get_ok()
    {
        auto& next = static_cast<element_type&>(Container::get_next_element());
        return !next.in_use;
    }

    // to be overridden by the HW plugin if it can detect that data was overwritten
    bool is_put_ok(const element_type& e)
    {
        return true;
    }

    bool is_put_making_get_ok(const element_type& e)
    {
        auto& next = static_cast<element_type&>(Container::get_next_element());
        return &e == &next;
    }

    const element_type& get_element()
    {
        auto& e = const_cast<element_type&>(
            static_cast<const element_type&>(Container::get_element()));
        e.in_use = true;
        return e;
    }

    void put_element(const element_type& e)
    {
        auto& el = const_cast<element_type&>(e);
        el.in_use = false;
        Container::put_element(el);
    }
};

Unordered BufferPool

The sequence of buffers to write on can be arbitrarily ordered. The plugin asks for a free buffer to fill.

Camera: Simulator, PSI/SlsDetector

Possible implementation: the Boost Pool library (see the sketch below).

Overrun: the pool throws an exception or returns NULL
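A minimal sketch along these lines, using Boost.Pool; buffer_size is an assumed parameter, and overrun_error is the exception type used by the SyncPolicy code above:

#include <boost/pool/pool.hpp>

// Unordered pool sketch: chunks are handed out in whatever order they were
// returned; a null pointer from the pool signals that no free buffer is left
void *get_free_buffer(boost::pool<>& pool)
{
    void *buf = pool.malloc();   // any free chunk, arbitrary order
    if (!buf)
        throw overrun_error();   // alternatively, return NULL to the caller
    return buf;
}

// A pool of image-sized chunks; consumers return buffers in arbitrary order:
// boost::pool<> pool(buffer_size);  ...  pool.free(buf);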

Synchronisation of the HW buffer filling process

Two scenarios are envisaged in the synchronisation between buffer filling and Lima get_image() call in the DAQ loop:

  • Synchronous buffer filling

  • Asynchronous buffer filling

Lima strongly favors synchronous buffer filling in order to improve performance and reduce complexity. The goal is to make the Lima DAQ loop as fast as possible in order to ensure data integrity in synchronous mode.

Synchronous buffer filling

The HW plugin fills the buffer synchronously inside the Lima get_image() call. Again, this is the preferred mechanism in Lima.

Camera: non-threaded Simulator, Espia, Rashpa (?), PSI/SlsDetector (passive receiver)

Implementation: ask the buffer API for the next buffer to write on, fill it, and return it to Lima, as in the sketch below.
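A sketch of the synchronous path, assuming a concrete ImageBufferMgr implementation; read_frame() and FrameInfo are hypothetical placeholders for the HW read-out:

// Synchronous filling inside get_image(): the DAQ loop blocks while the
// next frame is read into the buffer
template <class ImageBufferMgr>
typename ImageBufferMgr::image_buffer_type
get_image(ImageBufferMgr& mgr, FrameInfo& info)
{
    auto buffer = mgr.get();            // next free buffer from the manager
    read_frame(buffer.view(), info);    // fill it synchronously from the HW
    return buffer;                      // hand it over to the processing chain
}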

Asynchronous buffer filling

Buffers are filled asynchronously with respect to the Lima main DAQ loop, and a dedicated thread/event loop (either in the plugin or in the SDK) manages the Available -> Filling -> Ready state transitions.

Camera: threaded Simulator, PSI/SlsDetector (native)

Possible implementation: a lock-free queue, as sketched below.
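A sketch of the asynchronous path built on boost::lockfree::queue; the Buffer element type and the callback wiring are hypothetical:

#include <boost/lockfree/queue.hpp>

struct Buffer;   // hypothetical buffer element

// Ready buffers travel from the HW thread to the Lima DAQ loop without locks
boost::lockfree::queue<Buffer *> ready_queue(128);

// HW thread / SDK callback: a buffer finished filling (Filling -> Ready)
void on_frame_complete(Buffer *b)
{ ready_queue.push(b); }

// Lima DAQ loop, inside get_image(): take the next Ready buffer (-> Used)
Buffer *next_ready_buffer()
{
    Buffer *b = nullptr;
    while (!ready_queue.pop(b))
        ;   // a real implementation would wait with a timeout instead
    return b;
}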

Interprocess buffer sharing

MPI offers fast, zero-extra-copy mechanisms to share data between processes on the same node, implemented by the Vader shared-memory Byte Transfer Layer (BTL).
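As an illustration, an image buffer can be passed between two Lima processes on the same node with plain point-to-point calls; with Open MPI the transfer then goes through the Vader shared-memory BTL (message tags and sizes below are assumptions):

#include <mpi.h>

// Same-node transfer: Open MPI routes these messages through the Vader
// shared memory BTL, avoiding extra user-level copies
void send_image(const void *buf, int nbytes, int dst)
{ MPI_Send(buf, nbytes, MPI_BYTE, dst, /*tag=*/0, MPI_COMM_WORLD); }

void recv_image(void *buf, int nbytes, int src)
{ MPI_Recv(buf, nbytes, MPI_BYTE, src, /*tag=*/0, MPI_COMM_WORLD,
           MPI_STATUS_IGNORE); }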