Pyacq Core¶
Process Management Classes¶
- pyacq.core.create_manager(mode='rpc', auto_close_at_exit=True)[source]¶ Create a new Manager either in this process or in a new process.
This function also starts a log server to which all log records will be forwarded from the manager and all processes started by the manager. See LogServer for more information.
Parameters: - mode : str
Must be ‘local’ to create the Manager in the current process, or ‘rpc’ to create the Manager in a new process (in which case a proxy to the remote manager will be returned).
- auto_close_at_exit : bool
If True, then call Manager.close() automatically when the calling process exits (only used when mode=='rpc').
- class pyacq.core.Manager[source]¶ Manager is a central point of control for connecting to hosts and creating NodeGroups.
Manager instances should be created using create_manager().
- close()[source]¶ Close the Manager.
If a default host was created by this Manager, then it will be closed as well.
- create_nodegroup(name=None, host=None, qt=True, **kwds)[source]¶ Create a new NodeGroup process and return a proxy to the NodeGroup.
A NodeGroup is a process that manages one or more Nodes for device interaction, computation, or GUI.
Parameters: - name : str
A name for the new NodeGroup’s process. This name is used to uniquely identify log messages originating from this nodegroup.
- host : None | str | ObjectProxy<Host>
Optional address of the Host that should be used to spawn the new NodeGroup, or a proxy to the Host itself. If omitted, then the NodeGroup is spawned from the Manager’s default host.
- qt : bool
Whether to start a QApplication in the new process. Default is True.
- All extra keyword arguments are passed to `Host.create_nodegroup()`.
- class pyacq.core.NodeGroup(host, manager)[source]¶ NodeGroup is responsible for managing a collection of Nodes within a single process.
NodeGroups themselves are created and destroyed by Hosts, which manage all NodeGroups on a particular machine.
- create_node(node_class, *args, **kwds)[source]¶ Create a new Node and add it to this NodeGroup.
Return the new Node.
- register_node_type_from_module(modname, classname)[source]¶ Register a Node subclass with this NodeGroup.
This allows custom Node subclasses to be instantiated in this NodeGroup using NodeGroup.create_node().
- class pyacq.core.host.Host(name, poll_procs=False)[source]¶ Host serves as a pre-existing contact point for spawning new processes on a remote machine.
One Host instance must be running on each machine that will be connected to by a Manager. The Host is only responsible for creating and destroying NodeGroups.
- check_spawners()[source]¶ Check for any processes that have exited and report them to their manager.
This method is called by a timer if the host is created with poll_procs=True.
- create_nodegroup(name, manager=None, qt=True, **kwds)[source]¶ Create a new NodeGroup in a new process and return a proxy to it.
Parameters: - name : str
The name of the new NodeGroup. This will also be used as the name of the process in log records sent to the Manager.
- manager : Manager | ObjectProxy<Manager> | None
The Manager to which this NodeGroup belongs.
- qt : bool
Whether to start a QApplication in the new process. Default is True.
- All extra keyword arguments are passed to `ProcessSpawner()`.
- class pyacq.core.Node(name='', parent=None)[source]¶ A Node is the basic element for generating and processing data streams in pyacq.
Nodes may be used to interact with devices, generate data, store data, perform computations, or display user interfaces. Each node may have multiple input and output streams that connect to other nodes. For example:
[ data acquisition node ] -> [ processing node ] -> [ display node ] -> [ recording node ]
An application may directly create and connect the Nodes it needs, or it may use a Manager to create a network of nodes distributed across multiple processes or machines.
The order of operations when creating and operating a node is very important:
- Instantiate the node directly or remotely using NodeGroup.create_node.
- Call Node.configure(…) to set global parameters such as sample rate, channel selections, etc.
- Connect inputs to their sources (if applicable): Node.inputs['input_name'].connect(other_node.outputs['output_name'])
- Configure outputs: Node.outputs[‘output_name’].configure(…)
- Call Node.initialize(), which will verify input/output settings, allocate memory, prepare devices, etc.
- Call Node.start() and Node.stop() to begin/end reading from input streams and writing to output streams. These may be called multiple times.
- Close the node with Node.close(). If the node was created remotely, this is handled by the NodeGroup to which it belongs.
Notes
For convenience, if a Node has only 1 input or 1 output:
- Node.inputs[‘input_name’] can be written Node.input
- Node.outputs[‘output_name’] can be written Node.output
When there are several outputs or inputs, this shorthand is not permitted.
The state of a Node can be requested using thread-safe methods:
- Node.running()
- Node.configured()
- Node.initialized()
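A minimal stand-in class (not the real pyacq Node) sketches why this ordering matters; each step asserts the state left by the previous one:

```python
# ToyNode mimics the configure -> initialize -> start/stop -> close
# lifecycle and the thread-safe state queries described above.
class ToyNode:
    def __init__(self):
        self._configured = self._initialized = self._running = False

    def configure(self, **kargs):
        self._configured = True

    def initialize(self):
        assert self._configured, "configure() must be called first"
        self._initialized = True

    def start(self):
        assert self._initialized, "initialize() must be called first"
        self._running = True

    def stop(self):
        self._running = False

    def close(self):
        assert not self._running, "stop() must be called before close()"

    # state queries (thread-safe methods in the real API)
    def configured(self): return self._configured
    def initialized(self): return self._initialized
    def running(self): return self._running

node = ToyNode()
node.configure(sample_rate=1000.)
node.initialize()
node.start()
assert node.running()
node.stop()
node.close()
```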
- after_input_connect(inputname)[source]¶ This method is called when one of the Node's inputs has been connected.
It may be reimplemented by subclasses.
- after_output_configure(outputname)[source]¶ This method is called when one of the Node's outputs has been configured.
It may be reimplemented by subclasses.
- check_input_specs()[source]¶ This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that inputs are correctly configured before the node is started.
In case of misconfiguration, this method must raise an exception.
- check_output_specs()[source]¶ This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that outputs are correctly configured before the node is started.
In case of misconfiguration, this method must raise an exception.
- close()[source]¶ Close the Node.
This causes all input/output connections to be closed. Nodes must be stopped before they can be closed.
- configure(**kargs)[source]¶ Configure the Node.
This method is used to set global parameters such as sample rate, channel selections, etc. Each Node subclass determines the allowed arguments to this method by reimplementing Node._configure().
- configured()[source]¶ Return True if the Node has already been configured.
This method is thread-safe.
- initialize()[source]¶ Initialize the Node.
This method prepares the node for operation by allocating memory, preparing devices, checking input and output specifications, etc. Node subclasses determine the behavior of this method by reimplementing Node._initialize().
- initialized()[source]¶ Return True if the Node has already been initialized.
This method is thread-safe.
- input¶ Return the single input for this Node.
If the node does not have exactly one input, then raise AssertionError.
- output¶ Return the single output for this Node.
If the node does not have exactly one output, then raise AssertionError.
Stream Classes¶
- class pyacq.core.InputStream(spec=None, node=None, name=None)[source]¶ Class for streaming data from an OutputStream.
Streams allow data to be sent between objects that may exist on different threads, processes, or machines. They offer a variety of transfer methods including TCP for remote connections and IPC for local connections.
Typical InputStream usage:
- Use InputStream.connect() to connect to an OutputStream defined elsewhere. Usually, the argument will actually be a proxy to a remote OutputStream.
- Poll for incoming data packets with InputStream.poll().
- Receive the next packet with InputStream.recv().
Optionally, use InputStream.set_buffer() to attach a RingBuffer for easier data handling.
- close()[source]¶ Close the stream.
This closes the socket. No data can be received after this point.
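The poll/receive pattern from the typical usage above can be sketched with a hypothetical stand-in stream (FakeStream is not part of pyacq):

```python
from collections import deque

# Stand-in object with poll()/recv() semantics like an InputStream:
# poll() reports whether a packet is waiting, recv() returns (index, data).
class FakeStream:
    def __init__(self, packets):
        self.queue = deque(packets)

    def poll(self, timeout=None):
        return len(self.queue) > 0

    def recv(self):
        return self.queue.popleft()

stream = FakeStream([(10, b'abc'), (20, b'def')])
received = []
while stream.poll(timeout=0):        # poll, then receive, until drained
    index, data = stream.recv()
    received.append((index, data))

assert received == [(10, b'abc'), (20, b'def')]
```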
- connect(output)[source]¶ Connect an output to this input.
Any data sent over the stream using output.send() can be retrieved using input.recv().
Parameters: - output : OutputStream (or proxy to a remote OutputStream)
The OutputStream to connect.
- empty_queue()[source]¶ Receive all pending messages from the zmq queue without processing them.
This is useful when a Node does not start at the same time as the other nodes but was already connected. In that case, the zmq high-water-mark mechanism queues messages, so when consumption starts, stale messages are delivered first. This can be annoying. This method calls recv() with timeout=0 until the queue is empty.
- get_data(*args, **kargs)[source]¶ Return a segment of the RingBuffer attached to this InputStream.
If no RingBuffer is attached, raise an exception.
For parameters, see RingBuffer.get_data().
See also: InputStream.set_buffer().
- poll(timeout=None)[source]¶ Poll the socket of this input stream.
Return True if a new packet is available.
- recv(**kargs)[source]¶ Receive a chunk of data.
Returns: - index: int
The absolute sample index. This is the index of the last sample + 1.
- data: np.ndarray or bytes
The received chunk of data. If the stream uses transfermode='sharedarray', then the data is returned as None and you must use input_stream[start:stop] to read from the shared array or input_stream.recv(with_data=True) to return the received data chunk.
- class pyacq.core.OutputStream(spec=None, node=None, name=None)[source]¶ Class for streaming data to an InputStream.
Streams allow data to be sent between objects that may exist on different threads, processes, or machines. They offer a variety of transfer methods including TCP for remote connections and IPC for local connections.
Parameters: - spec : dict
Required parameters for this stream. These may not be overridden when calling configure() later on.
- node : Node or None
- name : str or None
- configure(**kargs)[source]¶ Configure the output stream.
Parameters: - protocol : 'tcp', 'udp', 'inproc' or 'ipc' (linux only)
The type of protocol used for the zmq.PUB socket
- interface : str
The bind address for the zmq.PUB socket
- port : str
The port for the zmq.PUB socket
- transfermode: str
The method used for data transfer:
- ‘plaindata’: data are sent over a plain socket in two parts: (frame index, data).
- ‘sharedmem’: data are stored in shared memory in a ring buffer and the current frame index is sent over the socket.
- ‘shared_cuda_buffer’: (planned) data are stored in shared Cuda buffer and the current frame index is sent over the socket.
- ‘share_opencl_buffer’: (planned) data are stored in shared OpenCL buffer and the current frame index is sent over the socket.
All registered transfer modes can be found in pyacq.core.stream.all_transfermodes.
- streamtype: ‘analogsignal’, ‘digitalsignal’, ‘event’ or ‘image/video’
The nature of data to be transferred.
- dtype: str ('float32', 'float64', [('r', 'uint16'), ('g', 'uint16'), ('b', 'uint16')], …)
The numpy.dtype of the data buffer. It can be a compound dtype for events or images.
- shape: list
The shape of each data frame. If the stream will send chunks of variable length, then use -1 for the first (time) dimension.
- For streamtype=image, the shape should be (-1, H, W) or (n_frames, H, W).
- For streamtype=analogsignal, the shape should be (n_samples, n_channels) or (-1, n_channels).
- compression: ‘’, ‘blosclz’, ‘blosc-lz4’
The compression for the data stream. The default uses no compression.
- scale: float
An optional scale factor + offset to apply to the data before it is sent over the stream.
output = offset + scale * input
- offset:
See scale.
- units: str
Units of the stream data. Mainly used for ‘analogsignal’.
- sample_rate: float or None
Sample rate of the stream in Hz.
- kwargs :
All extra keyword arguments are passed to the DataSender constructor for the chosen transfermode (for example, see SharedMemSender).
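The compound dtype and the scale/offset transform described above can be checked with plain numpy. The names in this sketch are illustrative, not part of the pyacq API:

```python
import numpy as np

# Compound dtype like the RGB example above: one named field per color.
rgb = np.dtype([('r', 'uint16'), ('g', 'uint16'), ('b', 'uint16')])
frame = np.zeros((4, 4), dtype=rgb)
assert frame['r'].shape == (4, 4)       # each field is its own view

# scale/offset: the linear transform applied before data is sent,
# output = offset + scale * input
scale, offset = 0.25, -1.0
raw = np.array([0.0, 4.0, 8.0])
sent = offset + scale * raw
assert np.allclose(sent, [-1.0, 0.0, 1.0])
```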
- class pyacq.core.stream.plaindatastream.PlainDataSender(socket, params)[source]¶ Helper class to send data serialized over a socket.
Note: this class is usually not instantiated directly; use OutputStream.configure(transfermode='plaindata').
To avoid unnecessary copies (and thus optimize transmission speed), data is sent exactly as it appears in memory, including array strides.
This class supports compression.
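The idea of sending data exactly as it sits in memory can be illustrated with plain numpy: ship the raw buffer plus shape/strides metadata and rebuild the view on the other side. This is only a sketch of the strides concept, not pyacq's actual wire format:

```python
import numpy as np

# A non-contiguous view: every other column of a 4x6 array.
a = np.arange(24, dtype='int64').reshape(4, 6)[:, ::2]

# "Send": serialize the base buffer untouched, plus the view's metadata.
base = a.base                      # the original owning array
payload = base.tobytes()
offset = a.__array_interface__['data'][0] - base.__array_interface__['data'][0]
meta = {'dtype': str(a.dtype), 'shape': a.shape,
        'strides': a.strides, 'offset': offset}

# "Receive": rebuild the identical strided view from bytes + metadata.
flat = np.frombuffer(payload, dtype=meta['dtype'])
start = meta['offset'] // flat.itemsize
restored = np.lib.stride_tricks.as_strided(
    flat[start:], shape=meta['shape'], strides=meta['strides'])

assert np.array_equal(a, restored)
```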
- class pyacq.core.stream.sharedmemstream.SharedMemSender(socket, params)[source]¶ Stream sender that uses shared memory for efficient interprocess communication. Only the data pointer is sent over the socket.
Note: this class is usually not instantiated directly; use OutputStream.configure(transfermode='sharedmem').
Extra parameters accepted when configuring the output stream:
- buffer_size (int): the size of the shared memory buffer in frames. The total shape of the allocated buffer is (buffer_size,) + shape.
- double (bool): if True, then the buffer size is doubled and all frames are written to the buffer twice. This makes it possible to guarantee zero-copy reads by any connected InputStream.
- axisorder (tuple): the order that buffer axes should be arranged in memory. This makes it possible to optimize for specific algorithms that expect either row-major or column-major alignment. The default is row-major; the time axis comes first in the axis order.
- fill (float): value used to fill the buffer where no data is available.
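The double-buffer trick can be sketched in a few lines of numpy. This is an illustration of the idea, not pyacq's implementation:

```python
import numpy as np

# double=True: allocate twice buffer_size and write every frame at two
# positions, i and i + buffer_size. Any window of length <= buffer_size
# is then a contiguous slice of the second half: reads never wrap or copy.
buffer_size, nchan = 8, 2
buf = np.zeros((2 * buffer_size, nchan))
head = 0  # absolute index of the next frame to write

def write(data):
    global head
    i = head % buffer_size
    buf[i] = data
    buf[i + buffer_size] = data
    head += 1

for t in range(20):                 # stream 20 frames into the ring
    write(np.full(nchan, float(t)))

def last(n):
    # last n frames, always contiguous in the second half
    i = head % buffer_size
    return buf[i + buffer_size - n : i + buffer_size]

assert np.array_equal(last(5)[:, 0], np.arange(15.0, 20.0))
```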
Data Handling Classes¶
- class pyacq.core.RingBuffer(shape, dtype, double=True, shmem=None, fill=None, axisorder=None)[source]¶ Class that collects data as it arrives from an InputStream and writes it into a single- or double-ring buffer.
This allows the user to request the concatenated history of data received by the stream, up to a predefined length. Double ring buffers allow faster, copyless reads at the expense of doubled write time and memory footprint.
- get_data(start, stop, copy=False, join=True)[source]¶ Return a segment of the ring buffer.
Parameters: - start : int
The starting index of the segment to return.
- stop : int
The stop index of the segment to return (the sample at this index will not be included in the returned data).
- copy : bool
If True, then a copy of the data is returned to ensure that modifying the data will not affect the ring buffer. If False, then a reference to the buffer will be returned if possible. Default is False.
- join : bool
If True, then a single contiguous array is returned for the entire requested segment. If False, then two separate arrays are returned for the beginning and end of the requested segment. This can be used to avoid an unnecessary copy when the buffer has double=False and the caller does not require a contiguous array.
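A toy single-ring buffer (not the real RingBuffer) shows what join controls: when the requested segment wraps around the end of the buffer, join=False hands back the two raw pieces instead of concatenating (and copying) them:

```python
import numpy as np

size = 8
ring = np.empty(size, dtype='int64')
head = 0  # absolute index of the next sample to write

def new_data(samples):
    global head
    for x in samples:            # write one sample at a time, wrapping
        ring[head % size] = x
        head += 1

def get_data(start, stop, join=True):
    # start/stop are absolute sample indices; stop is excluded
    i = start % size
    j = (stop - 1) % size + 1
    if i < j:                    # segment does not cross the wrap point
        a, b = ring[i:j], ring[j:j]
    else:                        # wrapped: tail of the buffer + head
        a, b = ring[i:], ring[:j]
    return np.concatenate([a, b]) if join else (a, b)

new_data(range(12))              # absolute samples 0..11; 4..11 still held
assert np.array_equal(get_data(6, 10), np.arange(6, 10))
two_a, two_b = get_data(6, 10, join=False)
assert len(two_a) == 2 and len(two_b) == 2
```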
- class SharedMem(size, shm_id=None)[source]¶ Class to create a shared memory buffer.
This class uses mmap so that unrelated processes (not forked) can share it.
It is usually not necessary to instantiate this class directly; use OutputStream.configure(transfermode='sharedmem').
Parameters: - size : int
Buffer size in bytes.
- shm_id : str or None
The id of an existing SharedMem to open. If None, then a new shared memory file is created. On Linux this is the filename; on Windows this is the tagname.
- close()[source]¶ Close this buffer.
- to_dict()[source]¶ Return a dict that can be serialized and sent to other processes to access this buffer.
- to_numpy()[source]¶ Return a numpy array pointing to part (or all) of this buffer.