Pyacq Core

Process Management Classes

pyacq.core.create_manager(mode='rpc', auto_close_at_exit=True)[source]

Create a new Manager either in this process or in a new process.

This function also starts a log server to which all log records will be forwarded from the manager and all processes started by the manager. See LogServer for more information.

Parameters:
mode : str

Must be ‘local’ to create the Manager in the current process, or ‘rpc’ to create the Manager in a new process (in which case a proxy to the remote manager will be returned).

auto_close_at_exit : bool

If True, then call Manager.close() automatically when the calling process exits (only used when mode=='rpc').

class pyacq.core.Manager[source]

Manager is a central point of control for connecting to hosts and creating NodeGroups.

Manager instances should be created using create_manager().

close()[source]

Close the Manager.

If a default host was created by this Manager, then it will be closed as well.

create_nodegroup(name=None, host=None, qt=True, **kwds)[source]

Create a new NodeGroup process and return a proxy to the NodeGroup.

A NodeGroup is a process that manages one or more Nodes for device interaction, computation, or GUI.

Parameters:
name : str

A name for the new NodeGroup’s process. This name is used to uniquely identify log messages originating from this nodegroup.

host : None | str | ObjectProxy<Host>

Optional address of the Host that should be used to spawn the new NodeGroup, or a proxy to the Host itself. If omitted, then the NodeGroup is spawned from the Manager’s default host.

qt : bool

Whether to start a QApplication in the new process. Default is True.

All extra keyword arguments are passed to `Host.create_nodegroup()`.

get_host(addr)[source]

Connect the manager to a Host’s RPC server and return a proxy to the host.

Hosts are used as a stable service on remote machines from which new NodeGroups can be spawned or closed.

get_logger_info()[source]

Return the address of the log server and the level of the root logger.

list_hosts()[source]

Return a list of the Hosts that the Manager is connected to.

class pyacq.core.NodeGroup(host, manager)[source]

NodeGroup is responsible for managing a collection of Nodes within a single process.

NodeGroups themselves are created and destroyed by Hosts, which manage all NodeGroups on a particular machine.

add_node(node)[source]

Add a Node to this NodeGroup.

any_node_running()[source]

Return True if any of the Nodes in this group are running.

create_node(node_class, *args, **kwds)[source]

Create a new Node and add it to this NodeGroup.

Return the new Node.

list_node_types()[source]

Return a list of the class names for all registered node types.

register_node_type_from_module(modname, classname)[source]

Register a Node subclass with this NodeGroup.

This allows custom Node subclasses to be instantiated in this NodeGroup using NodeGroup.create_node().

remove_node(node)[source]

Remove a Node from this NodeGroup.

start_all_nodes()[source]

Call Node.start() for all Nodes in this group.

stop_all_nodes()[source]

Call Node.stop() for all Nodes in this group.

class pyacq.core.host.Host(name, poll_procs=False)[source]

Host serves as a pre-existing contact point for spawning new processes on a remote machine.

Each machine that a Manager connects to must run one Host instance. The Host is only responsible for creating and destroying NodeGroups.

check_spawners()[source]

Check for any processes that have exited and report them to their manager.

This method is called by a timer if the host is created with poll_procs=True.

close_all_nodegroups(force=False)[source]

Close all NodeGroups belonging to this host.

create_nodegroup(name, manager=None, qt=True, **kwds)[source]

Create a new NodeGroup in a new process and return a proxy to it.

Parameters:
name : str

The name of the new NodeGroup. This will also be used as the name of the process in log records sent to the Manager.

manager : Manager | ObjectProxy<Manager> | None

The Manager to which this NodeGroup belongs.

qt : bool

Whether to start a QApplication in the new process. Default is True.

All extra keyword arguments are passed to `ProcessSpawner()`.

class pyacq.core.Node(name='', parent=None)[source]

A Node is the basic element for generating and processing data streams in pyacq.

Nodes may be used to interact with devices, generate data, store data, perform computations, or display user interfaces. Each node may have multiple input and output streams that connect to other nodes. For example:

[ data acquisition node ] -> [ processing node ] -> [ display node ]
                                                 -> [ recording node ]

An application may directly create and connect the Nodes it needs, or it may use a Manager to create a network of nodes distributed across multiple processes or machines.

The order of operations when creating and operating a node is very important:

  1. Instantiate the node directly or remotely using NodeGroup.create_node.
  2. Call Node.configure(…) to set global parameters such as sample rate, channel selections, etc.
  3. Connect inputs to their sources (if applicable): Node.inputs[‘input_name’].connect(other_node.outputs[‘output_name’])
  4. Configure outputs: Node.outputs[‘output_name’].configure(…)
  5. Call Node.initialize(), which will verify input/output settings, allocate memory, prepare devices, etc.
  6. Call Node.start() and Node.stop() to begin/end reading from input streams and writing to output streams. These may be called multiple times.
  7. Close the node with Node.close(). If the node was created remotely, this is handled by the NodeGroup to which it belongs.
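
The ordering above can be modelled as a small state machine. The sketch below is a toy stand-in, not pyacq's Node implementation: the class name LifecycleSketch and its internal flags are hypothetical, and only the method names mirror the documented API.

```python
class LifecycleSketch:
    """Toy model of the documented call order (not pyacq's Node class)."""

    def __init__(self):
        self.params = {}
        self._configured = False
        self._initialized = False
        self._running = False
        self._closed = False

    def configure(self, **params):
        # Step 2: store global parameters such as the sample rate.
        self.params = dict(params)
        self._configured = True

    def initialize(self):
        # Step 5: only valid once the node has been configured.
        if not self._configured:
            raise RuntimeError("configure() must be called before initialize()")
        self._initialized = True

    def start(self):
        # Step 6: only valid once the node has been initialized.
        if not self._initialized:
            raise RuntimeError("initialize() must be called before start()")
        self._running = True

    def stop(self):
        self._running = False

    def close(self):
        # Step 7: a node must be stopped before it can be closed.
        if self._running:
            raise RuntimeError("stop() must be called before close()")
        self._closed = True

    def running(self):
        return self._running


node = LifecycleSketch()
node.configure(sample_rate=1000.0)
node.initialize()
node.start()
node.stop()
node.start()  # start/stop may be repeated
node.stop()
node.close()
```

Calling start() on a fresh LifecycleSketch raises RuntimeError, which is the kind of ordering mistake the numbered steps are meant to prevent.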

Notes

For convenience, if a Node has only 1 input or 1 output:

  • Node.inputs[‘input_name’] can be written Node.input
  • Node.outputs[‘output_name’] can be written Node.output

When there are several outputs or inputs, this shorthand is not permitted.
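
The shorthand rule can be illustrated with a toy property. This is a sketch only: ShorthandSketch is a hypothetical class, not pyacq's Node, and the stream values are placeholder strings.

```python
class ShorthandSketch:
    """Toy holder for named inputs (not pyacq's Node class)."""

    def __init__(self, inputs):
        self.inputs = dict(inputs)

    @property
    def input(self):
        # The shorthand is only defined when there is exactly one input.
        assert len(self.inputs) == 1, "shorthand requires exactly one input"
        return next(iter(self.inputs.values()))


single = ShorthandSketch({"signals": "stream-a"})
double = ShorthandSketch({"left": "stream-a", "right": "stream-b"})
```

Here single.input returns the only stream, while accessing double.input raises AssertionError.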

The state of a Node can be requested using thread-safe methods:

  • Node.running()
  • Node.configured()
  • Node.initialized()

after_input_connect(inputname)[source]

This method is called when one of the Node’s inputs has been connected.

It may be reimplemented by subclasses.

after_output_configure(outputname)[source]

This method is called when one of the Node’s outputs has been configured.

It may be reimplemented by subclasses.

check_input_specs()[source]

This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that inputs are correctly configured before the node is started.

In case of misconfiguration, this method must raise an exception.

check_output_specs()[source]

This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that outputs are correctly configured before the node is started.

In case of misconfiguration, this method must raise an exception.

close()[source]

Close the Node.

This causes all input/output connections to be closed. Nodes must be stopped before they can be closed.

closed()[source]

Return True if the Node has already been closed.

This method is thread-safe.

configure(**kargs)[source]

Configure the Node.

This method is used to set global parameters such as sample rate, channel selections, etc. Each Node subclass determines the allowed arguments to this method by reimplementing Node._configure().

configured()[source]

Return True if the Node has already been configured.

This method is thread-safe.

initialize()[source]

Initialize the Node.

This method prepares the node for operation by allocating memory, preparing devices, checking input and output specifications, etc. Node subclasses determine the behavior of this method by reimplementing Node._initialize().

initialized()[source]

Return True if the Node has already been initialized.

This method is thread-safe.

input

Return the single input for this Node.

If the node does not have exactly one input, then raise AssertionError.

output

Return the single output for this Node.

If the node does not have exactly one output, then raise AssertionError.

running()[source]

Return True if the Node is running.

This method is thread-safe.

start()[source]

Start the Node.

When the node is running it will read from its input streams and write to its output streams (if any). Nodes must be configured and initialized before they are started, and can be stopped and restarted any number of times.

stop()[source]

Stop the Node (see start()).

Stream Classes

class pyacq.core.InputStream(spec=None, node=None, name=None)[source]

Class for streaming data from an OutputStream.

Streams allow data to be sent between objects that may exist on different threads, processes, or machines. They offer a variety of transfer methods including TCP for remote connections and IPC for local connections.

Typical InputStream usage:

  1. Use InputStream.connect() to connect to an OutputStream defined elsewhere. Usually, the argument will actually be a proxy to a remote OutputStream.
  2. Poll for incoming data packets with InputStream.poll().
  3. Receive the next packet with InputStream.recv().

Optionally, use InputStream.set_buffer() to attach a RingBuffer for easier data handling.

close()[source]

Close the stream.

This closes the socket. No data can be received after this point.

connect(output)[source]

Connect an output to this input.

Any data sent over the stream using output.send() can be retrieved using input.recv().

Parameters:
output : OutputStream (or proxy to a remote OutputStream)

The OutputStream to connect.

empty_queue()[source]

Receive and discard all pending messages in the zmq queue. This is useful when a Node was already connected but does not start at the same time as the other nodes. In that case zmq's high-water-mark mechanism holds messages in a queue, so the Node receives stale messages when it starts consuming. This method calls recv with timeout=0 until the queue is empty.
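
The draining behaviour can be illustrated with the standard library's queue.Queue standing in for the zmq socket. This is a sketch of the idea only; the helper drain() is hypothetical and not part of pyacq.

```python
import queue


def drain(q):
    """Discard every pending item without blocking; return how many were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()  # similar in spirit to a receive with timeout=0
        except queue.Empty:
            return dropped
        dropped += 1


# Three stale frame indices queued before the consumer started.
pending = queue.Queue()
for stale_index in (100, 200, 300):
    pending.put(stale_index)

n_dropped = drain(pending)
```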

get_data(*args, **kargs)[source]

Return a segment of the RingBuffer attached to this InputStream.

If no RingBuffer is attached, raise an exception.

For parameters, see RingBuffer.get_data().

See also: InputStream.set_buffer().

poll(timeout=None)[source]

Poll the socket of the input stream.

Return True if a new packet is available.

recv(**kargs)[source]

Receive a chunk of data.

Returns:
index: int

The absolute sample index. This is the index of the last sample + 1.

data: np.ndarray or bytes

The received chunk of data. If the stream uses transfermode='sharedarray', then the data is returned as None and you must use input_stream[start:stop] to read from the shared array or input_stream.recv(with_data=True) to return the received data chunk.
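
Because the index is that of the last sample plus one, the sample range covered by a chunk follows from its length. A minimal sketch in plain Python; chunk_range() is a hypothetical helper, not a pyacq function.

```python
def chunk_range(index, n_samples):
    """Return (start, stop) absolute sample indices for a received chunk.

    `index` is the value returned alongside the data: the absolute index of
    the chunk's last sample, plus one. `stop` is therefore exclusive.
    """
    return index - n_samples, index


# Three consecutive chunks of 256 samples each.
ranges = [chunk_range(index, 256) for index in (256, 512, 768)]
```

With this convention, the stop of one chunk equals the start of the next, so consecutive chunks tile the sample axis without gaps.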

reset_buffer_index()[source]

Reset the buffer index. This is useful for resetting the index when a Node is started and stopped multiple times.

set_buffer(size=None, double=True, axisorder=None, shmem=None, fill=None)[source]

Ensure that this InputStream has a RingBuffer at least as large as size and with the specified double-mode and axis order.

If necessary, this will attach a new RingBuffer to the stream and remove any existing buffer.

class pyacq.core.OutputStream(spec=None, node=None, name=None)[source]

Class for streaming data to an InputStream.

Streams allow data to be sent between objects that may exist on different threads, processes, or machines. They offer a variety of transfer methods including TCP for remote connections and IPC for local connections.

Parameters:
spec : dict

Required parameters for this stream. These may not be overridden when calling configure() later on.

node : Node or None

name : str or None

close()[source]

Close the output.

This closes the socket and releases shared memory, if necessary.

configure(**kargs)[source]

Configure the output stream.

Parameters:
protocol : ‘tcp’, ‘udp’, ‘inproc’ or ‘ipc’ (linux only)

The type of protocol used for the zmq.PUB socket

interface : str

The bind address for the zmq.PUB socket

port : str

The port for the zmq.PUB socket

transfermode: str

The method used for data transfer:

  • ‘plaindata’: data are sent over a plain socket in two parts: (frame index, data).
  • ‘sharedmem’: data are stored in shared memory in a ring buffer and the current frame index is sent over the socket.
  • ‘shared_cuda_buffer’: (planned) data are stored in shared Cuda buffer and the current frame index is sent over the socket.
  • ‘share_opencl_buffer’: (planned) data are stored in shared OpenCL buffer and the current frame index is sent over the socket.

All registered transfer modes can be found in pyacq.core.stream.all_transfermodes.

streamtype: ‘analogsignal’, ‘digitalsignal’, ‘event’ or ‘image/video’

The nature of data to be transferred.

dtype: str (‘float32’, ‘float64’, [(‘r’, ‘uint16’), (‘g’, ‘uint16’), (‘b’, ‘uint16’)], …)

The numpy.dtype of the data buffer. It can be a compound (structured) dtype for events or images.

shape: list

The shape of each data frame. If the stream will send chunks of variable length, then use -1 for the first (time) dimension.

  • For streamtype=image, the shape should be (-1, H, W) or (n_frames, H, W).
  • For streamtype=analogsignal the shape should be (n_samples, n_channels) or (-1, n_channels).
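
One way to read the -1 convention is as a wildcard for the time axis. The check below is a sketch with a hypothetical helper name, shape_accepts(); pyacq's own validation may differ.

```python
def shape_accepts(spec_shape, chunk_shape):
    """Return True if chunk_shape is compatible with spec_shape.

    A -1 in the first (time) position of spec_shape matches any chunk length;
    every other dimension must match exactly.
    """
    if len(spec_shape) != len(chunk_shape):
        return False
    for position, (expected, actual) in enumerate(zip(spec_shape, chunk_shape)):
        if position == 0 and expected == -1:
            continue
        if expected != actual:
            return False
    return True


variable_ok = shape_accepts((-1, 16), (250, 16))    # variable-length chunk
fixed_ok = shape_accepts((1024, 16), (1024, 16))    # fixed-length chunk
wrong_channels = shape_accepts((-1, 16), (250, 8))  # channel count differs
```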

compression: ‘’, ‘blosclz’, ‘blosc-lz4’

The compression for the data stream. The default uses no compression.

scale: float

An optional scale factor + offset to apply to the data before it is sent over the stream. output = offset + scale * input

offset:

See scale.
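
A worked instance of the formula output = offset + scale * input, in plain Python. The unsigned 16-bit ADC and its -5 V to +5 V range are made-up illustration values, and to_physical() is a hypothetical helper.

```python
def to_physical(raw_samples, scale, offset):
    """Apply output = offset + scale * input to each sample."""
    return [offset + scale * value for value in raw_samples]


# Hypothetical unsigned 16-bit ADC spanning -5 V to +5 V.
scale = 10.0 / 65536
offset = -5.0
volts = to_physical([0, 32768, 49152], scale, offset)
```

The raw values 0, 32768 and 49152 map to -5.0 V, 0.0 V and 2.5 V respectively.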

units: str

Units of the stream data. Mainly used for ‘analogsignal’.

sample_rate: float or None

Sample rate of the stream in Hz.

kwargs :

All extra keyword arguments are passed to the DataSender constructor for the chosen transfermode (for example, see SharedMemSender).

reset_buffer_index()[source]

Reset the buffer index. This is useful for resetting the index when a Node is started and stopped multiple times.

send(data, index=None, **kargs)[source]

Send a data chunk and its frame index.

Parameters:
index: int

The absolute sample index. This is the index of the last sample + 1.

data: np.ndarray or bytes

The chunk of data to send.

class pyacq.core.stream.plaindatastream.PlainDataSender(socket, params)[source]

Helper class to send serialized data over a socket.

Note: this class is usually not instantiated directly; use OutputStream.configure(transfermode='plaindata').

To avoid unnecessary copies (and thus optimize transmission speed), data is sent exactly as it appears in memory including array strides.

This class supports compression.

class pyacq.core.stream.sharedmemstream.SharedMemSender(socket, params)[source]

Stream sender that uses shared memory for efficient interprocess communication. Only the data pointer is sent over the socket.

Note: this class is usually not instantiated directly; use OutputStream.configure(transfermode='sharedmem').

Extra parameters accepted when configuring the output stream:

  • buffer_size (int) the size of the shared memory buffer in frames. The total shape of the allocated buffer is (buffer_size,) + shape.
  • double (bool) if True, then the buffer size is doubled and all frames are written to the buffer twice. This makes it possible to guarantee zero-copy reads by any connected InputStream.
  • axisorder (tuple) The order that buffer axes should be arranged in memory. This makes it possible to optimize for specific algorithms that expect either row-major or column-major alignment. The default is row-major; the time axis comes first in the axis order.
  • fill (float) Value used to fill the buffer where no data is available.

Data Handling Classes

class pyacq.core.RingBuffer(shape, dtype, double=True, shmem=None, fill=None, axisorder=None)[source]

Class that collects data as it arrives from an InputStream and writes it into a single- or double-ring buffer.

This allows the user to request the concatenated history of data received by the stream, up to a predefined length. Double ring buffers allow faster, copyless reads at the expense of doubled write time and memory footprint.
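
The trade-off can be seen in a toy double ring buffer built on the standard library's array module. This is a sketch of the general technique, not pyacq's RingBuffer: the class DoubleRingSketch is hypothetical and handles only one-dimensional float data.

```python
from array import array


class DoubleRingSketch:
    """Toy double ring buffer: every sample is stored at two positions."""

    def __init__(self, size):
        self.size = size
        self.buf = array("d", [0.0] * (2 * size))
        self.index = 0  # absolute index of the next sample to write

    def write(self, samples):
        for sample in samples:
            pos = self.index % self.size
            # Two writes per sample: the cost of the double layout.
            self.buf[pos] = sample
            self.buf[pos + self.size] = sample
            self.index += 1

    def get_data(self, start, stop):
        oldest = max(0, self.index - self.size)
        if not oldest <= start <= stop <= self.index:
            raise IndexError("requested segment is not in the buffer")
        pos = start % self.size
        # Thanks to the mirrored half, the segment never wraps, so a
        # contiguous view can be returned without copying.
        return memoryview(self.buf)[pos:pos + (stop - start)]


ring = DoubleRingSketch(size=4)
ring.write([0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
view = ring.get_data(2, 6)
latest = view.tolist()
```

The last four samples come back as one contiguous view even though they straddle the wrap point of the underlying ring.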

get_data(start, stop, copy=False, join=True)[source]

Return a segment of the ring buffer.

Parameters:
start : int

The starting index of the segment to return.

stop : int

The stop index of the segment to return (the sample at this index will not be included in the returned data).

copy : bool

If True, then a copy of the data is returned to ensure that modifying the data will not affect the ring buffer. If False, then a reference to the buffer will be returned if possible. Default is False.

join : bool

If True, then a single contiguous array is returned for the entire requested segment. If False, then two separate arrays are returned for the beginning and end of the requested segment. This can be used to avoid an unnecessary copy when the buffer has double=False and the caller does not require a contiguous array.
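
With a single (non-doubled) ring, a requested segment may wrap past the end of the storage, which is why join=False yields two pieces. The helper below sketches that index arithmetic in plain Python; split_segment() is a hypothetical name and the function is not taken from pyacq.

```python
def split_segment(size, start, stop):
    """Map absolute indices [start, stop) onto a single ring of length `size`.

    Returns two (lo, hi) storage ranges; the second is empty when the
    segment does not wrap around the end of the ring.
    """
    length = stop - start
    if not 0 <= length <= size:
        raise ValueError("segment must fit inside the ring")
    lo = start % size
    first_len = min(length, size - lo)
    first = (lo, lo + first_len)
    second = (0, length - first_len)
    return first, second


storage = [8, 9, 2, 3, 4, 5, 6, 7]  # ring of 8 holding samples 2..9
first, second = split_segment(len(storage), 6, 10)
pieces = storage[first[0]:first[1]], storage[second[0]:second[1]]
joined = pieces[0] + pieces[1]  # joining the two pieces requires a copy
```

A caller that can process the two pieces separately avoids the concatenation on the last line.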

class pyacq.core.stream.sharedarray.SharedMem(nbytes, shm_id=None)[source]

Class to create a shared memory buffer.

This class uses mmap so that unrelated processes (not forked) can share it.

It is usually not necessary to instantiate this class directly; use OutputStream.configure(transfermode='sharedmem').

Parameters:
nbytes : int

Buffer size in bytes.

shm_id : str or None

The id of an existing SharedMem to open. If None, then a new shared memory file is created. On Linux this is the filename; on Windows this is the tagname.
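
The underlying idea can be tried with the standard library's mmap module directly. The sketch below maps one temporary file twice inside a single process to stand in for two unrelated processes. It does not use SharedMem itself, and the file handling shown is an assumption about how one might do this, not a description of pyacq's implementation.

```python
import mmap
import os
import tempfile

nbytes = 16

# Create a backing file of the required size.
fd, path = tempfile.mkstemp()
try:
    os.write(fd, b"\x00" * nbytes)

    # First mapping: plays the role of the process that owns the buffer.
    writer = mmap.mmap(fd, nbytes)

    # Second mapping, opened by file name: plays the role of another process.
    with open(path, "r+b") as handle:
        reader = mmap.mmap(handle.fileno(), nbytes)

    # Bytes written through one mapping are visible through the other.
    writer[0:5] = b"hello"
    seen = bytes(reader[0:5])

    reader.close()
    writer.close()
finally:
    os.close(fd)
    os.remove(path)
```

Only the file name has to be communicated to the second party, which is the role the shm_id parameter plays above.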

close()[source]

Close this buffer.

to_dict()[source]

Return a dict that can be serialized and sent to other processes to access this buffer.

to_numpy(offset, dtype, shape, strides=None)[source]

Return a numpy array pointing to part (or all) of this buffer.