Source code for pyacq.core.stream.ringbuffer

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

import numpy as np

from .sharedarray import SharedMem, SharedArray
from .arraytools import make_dtype


class RingBuffer:
    """Class that collects data as it arrives from an InputStream and writes it
    into a single- or double-ring buffer.

    This allows the user to request the concatenated history of data
    received by the stream, up to a predefined length.

    Double ring buffers allow faster, copyless reads at the expense of
    doubled write time and memory footprint.

    Parameters
    ----------
    shape : tuple
        Shape of the buffer as seen by the user. ``shape[0]`` is the number
        of samples that the ring can hold.
    dtype :
        Data type of the buffer; passed through ``make_dtype``.
    double : bool
        If True, the first axis is allocated twice as long and every chunk
        is written twice so that any readable segment is contiguous.
    shmem : None | True | shm_id
        If None, the buffer is a regular in-process array. If True, a new
        ``SharedMem`` segment is created. Any other value is passed to
        ``SharedMem`` as ``shm_id``; in that case the read/write indexes
        already stored in the segment are left untouched.
    fill : scalar | None
        Value used to initialise a non-shared buffer and to fill in skipped
        samples. Defaults to 0 for integer dtypes and nan otherwise.
    axisorder : sequence of int | None
        Order of axes as written in memory. Defaults to the natural order.
    """

    def __init__(self, shape, dtype, double=True, shmem=None, fill=None, axisorder=None):
        self.double = double
        # Normalise to a tuple so that the ``shape[1:]`` concatenations below
        # (and in get_data) also work when a list is given.
        shape = tuple(shape)
        self.shape = shape
        dtype = make_dtype(dtype)  # fix dtype serialization

        # order of axes as written in memory. This does not affect the shape of the
        # buffer as seen by the user, but can be used to make sure a specific axis
        # is contiguous in memory.
        if axisorder is None:
            axisorder = np.arange(len(shape))
        self.axisorder = np.array(axisorder)

        shape = (shape[0] * (2 if double else 1),) + shape[1:]
        nativeshape = np.array(shape)[self.axisorder]

        # initialize int buffers with 0 and float buffers with nan
        if fill is None:
            fill = 0 if make_dtype(dtype).kind in 'ui' else np.nan
        self._filler = fill

        if shmem is None:
            self.buffer = np.empty(nativeshape, dtype=dtype).transpose(np.argsort(axisorder))
            self.buffer[:] = self._filler
            self._indexes = np.zeros((2,), dtype='int64')
            self._shmem = None
            self.shm_id = None
        else:
            # The first 16 bytes of the segment hold the two int64 indexes;
            # the sample data follows. np.prod replaces np.product, which was
            # removed in NumPy 2.0.
            size = int(np.prod(shape)) * make_dtype(dtype).itemsize + 16
            if shmem is True:
                # create new shared memory buffer
                self._shmem = SharedMem(nbytes=size)
            else:
                self._shmem = SharedMem(nbytes=size, shm_id=shmem)
            buf = self._shmem.to_numpy(offset=16, dtype=dtype, shape=nativeshape)
            self.buffer = buf.transpose(np.argsort(axisorder))
            self._indexes = self._shmem.to_numpy(offset=0, dtype='int64', shape=(2,))
            self.shm_id = self._shmem.shm_id

        self.dtype = self.buffer.dtype

        # Identity checks (not ``in``): an integer shm_id equal to 1 compares
        # equal to True and must not be mistaken for "create a new buffer".
        if shmem is None or shmem is True:
            # Index of last writable sample + 1. This value is used to determine which
            # buffer indices map to which data indices (where buffer indices wrap
            # around to 0, but data indices always increase as data arrives).
            self._set_write_index(0)
            # Index of last written sample + 1. This is used to determine how much of
            # the buffer is *valid* for reading.
            self._set_read_index(0)

        # Note: read_index and write_index are defined independently to avoid
        # race conditions with processes reading and writing from the same
        # shared memory simultaneously. When new data arrives:
        #   1. write_index is increased to indicate that the buffer has advanced
        #      and some old data is no longer valid
        #   2. new data is written over the old buffer data
        #   3. read_index is increased to indicate that the new data is now
        #      readable
        #
        #   write_index-bsize      break_index    read_index      write_index
        #           |                     |               |               |
        # ..........[.....................|...............][...............]
        #                                 |
        #           [          readable area              ][ writable area ]
        #                                 |
        #                                 | [........]   read without copy
        #                      [........] |              read without copy
        #                            [....|......]       read with copy
        #

    def index(self):
        """Return the read index (index of the last readable sample + 1)."""
        return self._read_index

    def first_index(self):
        """Return the read index minus the ring length ``shape[0]``."""
        return self._read_index - self.shape[0]

    @property
    def _write_index(self):
        return self._indexes[1]

    @property
    def _read_index(self):
        return self._indexes[0]

    def _set_write_index(self, i):
        # what kind of protection do we need here?
        self._indexes[1] = i

    def _set_read_index(self, i):
        # what kind of protection do we need here?
        self._indexes[0] = i

    def reset_index(self):
        """Set both the write index and the read index back to 0."""
        self._set_write_index(0)
        self._set_read_index(0)

    def new_chunk(self, data, index=None):
        """Write a new chunk of samples into the ring.

        Parameters
        ----------
        data : array
            Chunk to append; ``data.shape[0]`` samples along the first axis.
            Its dtype must equal the buffer dtype.
        index : int | None
            Data index of the last sample in the chunk + 1. If None, the
            write index simply advances by the size of the chunk. If the
            index advances by more than the chunk size, the skipped samples
            are filled with the fill value.

        Raises
        ------
        ValueError
            If the chunk is longer than the ring.
        TypeError
            If the chunk dtype differs from the buffer dtype.
        """
        dsize = data.shape[0]
        bsize = self.shape[0]
        if dsize > bsize:
            raise ValueError("Data chunk size %d is too large for ring "
                             "buffer of size %d." % (dsize, bsize))
        if data.dtype != self.dtype:
            raise TypeError("Data has incorrect dtype %s (buffer requires %s)" %
                            (data.dtype, self.dtype))

        # by default, index advances by the size of the chunk
        if index is None:
            index = self._write_index + dsize

        assert dsize <= index - self._write_index, (
            "Data size is %d, but index "
            "only advanced by %d. (index=%d, self._write_index=%d)" %
            (dsize, index - self._write_index, index, self._write_index))

        revert_inds = [self._read_index, self._write_index]
        try:
            # advance write index. This immediately prevents other processes from
            # accessing memory that is about to be overwritten.
            self._set_write_index(index)

            # decide if any skipped data needs to be filled in
            fill_start = max(self._read_index, self._write_index - bsize)
            fill_stop = self._write_index - dsize

            if fill_stop > fill_start:
                # data was skipped; fill in missing regions with 0 or nan.
                self._write(fill_start, fill_stop, self._filler)
                revert_inds[1] = fill_stop

            self._write(self._write_index - dsize, self._write_index, data)

            self._set_read_index(index)
        except BaseException:
            # If there is a failure writing data, revert read/write pointers.
            # Deliberately catches everything (including KeyboardInterrupt)
            # so the indexes are restored; the exception is always re-raised.
            self._set_read_index(revert_inds[0])
            self._set_write_index(revert_inds[1])
            raise

    def _write(self, start, stop, value):
        """Write *value* (array or scalar) at data indices [start, stop)."""
        # get starting index
        bsize = self.shape[0]
        dsize = stop - start
        i = start % bsize

        if self.double:
            # First copy goes into the lower half; the second copy (below)
            # starts one ring length further and may wrap around.
            self.buffer[i:i+dsize] = value
            i += bsize

        if i + dsize <= self.buffer.shape[0]:
            self.buffer[i:i+dsize] = value
        else:
            n = self.buffer.shape[0] - i
            if hasattr(value, '__len__'):
                # case array
                self.buffer[i:] = value[:n]
                self.buffer[:dsize-n] = value[n:]
            else:
                # case value is a scalar (when self._filler)
                self.buffer[i:] = value
                self.buffer[:dsize-n] = value

    def __getitem__(self, item):
        """Index the ring by data index along the first axis.

        Accepts an integer, a slice, or a tuple whose first element is an
        integer or slice; remaining tuple elements index the other axes.
        """
        if isinstance(item, tuple):
            first = item[0]
            rest = (slice(None),) + item[1:]
        else:
            first = item
            rest = None

        if isinstance(first, (int, np.integer)):
            start = self._interpret_index(first)
            stop = start + 1
            data = self.get_data(start, stop)[0]
            if rest is not None:
                data = data[rest[1:]]
        elif isinstance(first, slice):
            start, stop, step = self._interpret_index(first)
            data = self.get_data(start, stop)[::step]
            if rest is not None:
                data = data[rest]
        else:
            raise TypeError("Invalid index type %s" % type(first))

        return data

    def get_data(self, start, stop, copy=False, join=True):
        """Return a segment of the ring buffer.

        Parameters
        ----------
        start : int
            The starting index of the segment to return.
        stop : int
            The stop index of the segment to return (the sample at this index
            will not be included in the returned data)
        copy : bool
            If True, then a copy of the data is returned to ensure that modifying
            the data will not affect the ring buffer. If False, then a reference to
            the buffer will be returned if possible. Default is False.
        join : bool
            If True, then a single contiguous array is returned for the entire
            requested segment. If False, then two separate arrays are returned
            for the beginning and end of the requested segment. This can be used to
            avoid an unnecessary copy when the buffer has double=False and the
            caller does not require a contiguous array.

        Returns
        -------
        array or (array, array)
            A single array if *join* is True, otherwise a pair of arrays
            (the second one is empty when the segment did not wrap).

        Raises
        ------
        IndexError
            If ``start`` is before ``first_index()`` or ``stop`` is after
            ``index()``.
        """
        first, last = self.first_index(), self.index()
        if start < first or stop > last:
            raise IndexError("Requested segment (%d, %d) is out of bounds for ring buffer. "
                             "Current bounds are (%d, %d)." % (start, stop, first, last))

        bsize = self.shape[0]
        copied = False

        if self.double:
            # Anchor on ``stop`` inside the upper half and count backwards.
            # Unlike ``start % bsize`` this also works when start < 0, which
            # happens before the ring has been filled once.
            stop_ind = stop % bsize + bsize
            start_ind = stop_ind - (stop - start)
            data = self.buffer[start_ind:stop_ind]
        else:
            break_index = self._write_index - (self._write_index % bsize)
            if start < break_index < stop:
                # Segment straddles the wrap point:
                # need to reconstruct from two pieces
                newshape = np.array((stop - start,) + self.shape[1:])[self.axisorder]
                a = self.buffer[start % bsize:]
                b = self.buffer[:stop % bsize]
                if join is False:
                    if copy is True:
                        return (a.copy(), b.copy())
                    else:
                        return (a, b)
                else:
                    data = np.empty(newshape, self.buffer.dtype).transpose(
                        np.argsort(self.axisorder))
                    # Split on len(a) rather than break_index - start.
                    data[:a.shape[0]] = a
                    data[a.shape[0]:] = b
                    copied = True
            else:
                # Segment lies entirely on one side of the wrap point. This
                # includes empty requests located exactly at the wrap point,
                # which must not be treated as a two-piece read. The length
                # is clamped so a degenerate request yields an empty array.
                start_ind = start % bsize
                stop_ind = start_ind + max(stop - start, 0)
                data = self.buffer[start_ind:stop_ind]

        if copy and not copied:
            data = data.copy()

        if join:
            return data
        else:
            empty = np.empty((0,) + data.shape[1:], dtype=data.dtype)
            return data, empty

    def _interpret_index(self, index):
        """Return normalized index, accounting for negative and None values.
        Also check that the index is readable.

        Slices are returned such that start,stop are swapped and shifted -1 if
        the step is negative. This makes it possible to collect the result in
        the forward direction and handle the step later.
        """
        start_index = self._write_index - self.shape[0]
        if isinstance(index, (int, np.integer)):
            if index < 0:
                index += self._read_index
            if index >= self._read_index or index < start_index:
                raise IndexError("Index %d is out of bounds for ring buffer [%d:%d]" %
                                 (index, start_index, self._read_index))
            return index
        elif isinstance(index, slice):
            start, stop, step = index.start, index.stop, index.step

            # Handle None and negative steps
            if step is None:
                step = 1
            if step < 0:
                start, stop = stop, start

            # Interpret None and negative indices
            if start is None:
                start = start_index
            else:
                if start < 0:
                    start += self._read_index
                if step < 0:
                    start += 1

            if stop is None:
                stop = self._read_index
            else:
                if stop < 0:
                    stop += self._read_index
                if step < 0:
                    stop += 1

            # Bounds check.
            # Perhaps we could clip the returned data like lists/arrays do,
            # but in this case the feedback is likely to be useful to the user.
            if stop > self._read_index or stop < start_index:
                raise IndexError("Stop index %d is out of bounds for ring buffer [%d, %d]" %
                                 (stop, start_index, self._read_index))
            if start > self._read_index or start < start_index:
                raise IndexError("Start index %d is out of bounds for ring buffer [%d, %d]" %
                                 (start, start_index, self._read_index))
            return start, stop, step
        else:
            raise TypeError("Invalid index %s" % index)