# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.
import numpy as np
import sys, random, string, tempfile, mmap
# TODO
# On POSIX system it can optionally the shm_open way to avoid mmap.
[docs]class SharedMem:
"""Class to create a shared memory buffer.
This class uses mmap so that unrelated processes (not forked) can share it.
It is usually not necessary to instantiate this class directly; use
:func:`OutputStream.configure(transfermode='sharedmem') <OutputStream.configure>`.
Parameters
----------
size : int
Buffer size in bytes.
shm_id : str or None
The id of an existing SharedMem to open. If None, then a new shared
memory file is created.
On linux this is the filename, on Windows this is the tagname.
"""
def __init__(self, nbytes, shm_id=None):
self.nbytes = nbytes
self.mmap_size = (self.nbytes // mmap.PAGESIZE + 1) * mmap.PAGESIZE
self.shm_id = shm_id
if sys.platform.startswith('win'):
if shm_id is None:
self.shm_id = u'pyacq_SharedMem_'+''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(128))
self.mmap = mmap.mmap(-1, self.nbytes, self.shm_id, access=mmap.ACCESS_WRITE)
else:
self.mmap = mmap.mmap(-1, self.nbytes, self.shm_id, access=mmap.ACCESS_READ)
else:
if shm_id is None:
self._tmpFile = tempfile.NamedTemporaryFile(prefix=u'pyacq_SharedMem_')
self._tmpFile.write(b'\x00' * self.nbytes)
self._tmpFile.flush() # I do not anderstand but this is needed....
self.shm_id = self._tmpFile.name
self.mmap = mmap.mmap(self._tmpFile.fileno(), self.nbytes, mmap.MAP_SHARED, mmap.PROT_WRITE)
else:
self._tmpFile = open(self.shm_id, 'rb')
self.mmap = mmap.mmap(self._tmpFile.fileno(), self.nbytes, mmap.MAP_SHARED, mmap.PROT_READ)
[docs] def close(self):
"""Close this buffer.
"""
self.mmap.close()
if not sys.platform.startswith('win') and hasattr(self, '_tmpFile'):
self._tmpFile.close()
[docs] def to_dict(self):
"""Return a dict that can be serialized and sent to other processes to
access this buffer.
"""
return {'nbytes': self.nbytes, 'shm_id': self.shm_id}
[docs] def to_numpy(self, offset, dtype, shape, strides=None):
"""Return a numpy array pointing to part (or all) of this buffer.
"""
return np.ndarray(buffer=self.mmap, shape=shape,
strides=strides, offset=offset, dtype=dtype)
class SharedArray:
"""Class to create shared memory that can be viewed as a `numpy.ndarray`.
This class uses mmap so that unrelated processes (not forked) can share it.
The parameters of the array may be serialized and passed to other processes
using `to_dict()`::
orig_array = SharedArray(shape, dtype)
spec = pickle.dumps(orig_array.to_dict())
shared_array = SharedArray(**pickle.loads(spec))
Parameters
----------
shape : tuple
The shape of the array.
dtype : str or list
The dtype of the array (as understood by `numpy.dtype()`).
shm_id : str or None
The id of an existing SharedMem to open. If None, then a new shared
memory file is created.
On linux this is the filename, on Windows this is the tagname.
"""
def __init__(self, shape=(1,), dtype='float64', shm_id=None):
self.shape = shape
self.dtype = np.dtype(dtype)
nbytes = np.prod(shape)*self.dtype.itemsize
self.shmem = SharedMem(nbytes, shm_id)
def to_dict(self):
return {'shape': self.shape, 'dtype': self.dtype, 'shm_id': self.shmem.shm_id}
def to_numpy(self):
return np.frombuffer(self.shmem.mmap, dtype=self.dtype).reshape(self.shape)