Pyacq Documentation¶
Contents:
Introduction¶
What is Pyacq?¶
Pyacq is an open-source system for distributed data acquisition and stream processing. Its functionality is organized into nodes that individually handle acquisition, filtering, visualization, and recording. Nodes are created on demand and connected to form a graph of data streams. Nodes may be created and connected within a single thread, or distributed across multiple threads, processes, and hosts.
An example setup for a neurophysiology experiment.
The figure above shows an example use case: two data acquisition devices (a camera and an electrode array recorded through an ADC) stream data simultaneously to storage. The electrode data is passed through an online spike-sorting analyzer whose output is also stored. Finally, the camera’s video feed and the online analysis are streamed to a graphical interface that displays data as it is acquired. The nodes in the graph may run together on a single machine or be distributed across a network.
The simplified example below gives a brief overview of the code needed to create part of the graph shown above:
import pyacq
# Connect to a remote host and create a new process there
manager = pyacq.create_manager('rpc')
worker_host = manager.add_host('tcp://10.0.0.103:5678')
worker = worker_host.create_nodegroup()
# Create nodes for data acquisition, analysis, storage, and display
device = manager.create_node('NiDAQmx')
analyzer = manager.create_node('Spikesorter', host=worker)
recorder = manager.create_node('HDF5Recorder', host=worker)
viewer = manager.create_node('QOscilloscope')
# Connect all nodes together
analyzer.input.connect(device.output)
recorder.input.connect(analyzer.output)
viewer.input.connect(analyzer.output)
# Begin acquiring and streaming data
manager.start_all()
License¶
Pyacq is supported by the French National Center for Scientific Research (CNRS) and the Lyon Neuroscience Research Center (CRNL). It is distributed under the BSD 3-clause license.
Architecture¶
Pyacq consists of 1) a collection of nodes with various capabilities for acquisition, processing, and visualization, and 2) a set of core tools that facilitate distributed control and data streaming.
Pyacq is built on several open-source tools including Python 3, numpy for data handling, ZeroMQ for interprocess and network communication, and PyQt for graphical user interfaces.
Overview of node types¶
Acquisition | Processing | Visualization
PyAudio | Triggering | Oscilloscope
Webcam (libav, imageio) | Filtering | Wavelet spectrogram
BrainAmp | | Video display
Emotiv | |
OpenBCI | |
Installation¶
Pyacq requires Python 3; support for Python 2 is not planned.
Several packages are required, but most can be installed with pip:
$ pip install pyzmq pytest numpy scipy pyqtgraph vispy colorama msgpack-python pyaudio blosc
One final dependency, PyQt4, cannot be installed with pip. Linux distributions typically provide this package. OSX users can get PyQt4 (and most other dependencies) using the Anaconda Python distribution. Windows users can also use Anaconda or download PyQt4 directly from the Riverbank Computing website.
To install Pyacq, use the standard distutils approach:
$ python setup.py install
Interacting with nodes¶
Pyacq delegates its data handling tasks to objects called Nodes. Each type of Node implements a different part of the pipeline such as acquiring data from a device, filtering or online analysis, visualization, or data storage. Nodes are connected by Streams to form a graph in which data flows from device to screen or to disk.
Creating nodes¶
In the simplest case, nodes may be created directly by instantiating their classes:
audio = pyacq.devices.PyAudio()
viewer = pyacq.viewers.QOscilloscope()
For cases that require multiple processes or that are distributed across machines, Pyacq provides mechanisms for creating and managing Nodes remotely:
manager = pyacq.create_manager()
nodegroup = manager.default_nodegroup
audio = nodegroup.create_node('PyAudio')
viewer = nodegroup.create_node('QOscilloscope')
It is also possible to use both locally- and remotely-instantiated Nodes in the same application. See Managing distributed nodes for more information about managing remote Nodes and their processes.
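For instance, a minimal sketch mixing the two approaches (remote acquisition with a local display, using only calls shown above):
import pyacq

manager = pyacq.create_manager()
nodegroup = manager.default_nodegroup

# Remotely-instantiated node: acquire audio in the worker process
audio = nodegroup.create_node('PyAudio')

# Locally-instantiated node: display in this process
viewer = pyacq.viewers.QOscilloscope()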
Configuring and connecting nodes¶
Nodes are configured and connected in a few steps that must be executed in order:
- Call node.configure(...) to set global parameters for the node such as sample rate, channel selections, etc. Each Node class defines and documents the parameters accepted by its configure method.
- Configure the node’s output streams (if any) by calling node.outputs['output_name'].configure(...). This determines the method of communication that the stream will use (plain TCP data stream, shared memory, etc.) and any associated options such as compression and chunk size.
- Connect inputs to their sources (if any) by calling node.inputs['input_name'].connect(other_node.outputs['output_name']). The input will be automatically configured to match its associated output.
- Call node.initialize(), which will verify input/output settings, allocate memory, prepare devices, etc.
The following code example, taken from examples/pyaudio_oscope.py, demonstrates the creation and configuration of two nodes: the first uses PyAudio to stream an audio signal from a microphone, and the second displays the streamed data.
from pyacq.devices import PyAudio
from pyacq.viewers import QOscilloscope

# Create and configure the audio input node
# (default_input is the index of the audio device to record from)
dev = PyAudio()
dev.configure(nb_channel=1, sample_rate=44100., input_device_index=default_input,
              format='int16', chunksize=1024)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()

# Create an oscilloscope to display data
viewer = QOscilloscope()
viewer.configure(with_user_dialog=True)

# Connect the audio stream to the oscilloscope
viewer.input.connect(dev.output)
viewer.initialize()
Starting and stopping¶
After a node is created, configured, and initialized, it is ready to begin
acquiring or processing data. Calling node.start()
instructs the node to
immediately begin reading data from its inputs and/or sending data through its
outputs. Calling node.stop()
will stop all processing until start()
is
called again:
dev.start()
viewer.start()
...
dev.stop()
viewer.stop()
To permanently deallocate any memory and device resources used by a node, call
node.close()
. Nodes may be started and stopped multiple times, but may
not be reinitialized once they are closed.
Interacting with remote nodes¶
It is often useful or necessary to have nodes distributed across multiple threads, processes, or machines (see Managing distributed nodes). Pyacq uses a remote procedure call (RPC) system with object proxies to allow remotely-hosted nodes to be treated almost exactly like local nodes:
# local:
local_node = MyNodeType()
local_node.configure(...)
# remote:
remote_node = nodegroup.create_node('MyNodeType')
remote_node.configure(...)
remote_node.output.configure(...)
local_node.input.connect(remote_node.output)
local_node.initialize()
remote_node.initialize()
local_node.start()
remote_node.start()
In this example, calling any method on remote_node
causes a message to be
sent to the process that owns the node, asking it to invoke the method on our
behalf. The calling process blocks until the return value is sent back. Similarly,
any attributes accessed from remote_node
(such as remote_node.output
)
are automatically returned as proxies to the remote process.
One major difference between local and proxied objects is that remote methods may be invoked asynchronously. This is done by adding the special keyword argument _sync='async' to the method call, which causes the call to immediately return a Future object (see concurrent.futures.Future in the Python library reference) that may be used to check the status of the request:
future = remote_node.configure(..., _sync='async')
while not future.done():
    pass  # do something else while we wait for the response

# get the result of calling configure()
result = future.result()
More information about the RPC system can be found in the API reference.
Managing distributed nodes¶
In Pyacq it is often useful to have nodes distributed across multiple threads, processes, or machines. Although it is straightforward to manually create and communicate with other processes, it can become cumbersome as the number of distributed resources increases. Pyacq provides high-level tools for managing processes and the nodes hosted within them:
- Manager is a central point of control for connecting to remote hosts; starting, stopping, and monitoring distributed processes; and collecting node connectivity information.
- Host is a server that runs on remote machines to allow Pyacq to connect and spawn new worker processes.
- NodeGroup is a simple object that manages multiple nodes within a single process.
The general procedure for running a set of distributed nodes looks like:
- Run Host servers on each remote machine (these can be left running indefinitely).
- Create a Manager from the main process.
- Ask the Manager to connect to each Host server.
- Create NodeGroups as needed. Each NodeGroup appears inside a newly spawned process on any of the available hosts.
- Create Nodes as needed within each NodeGroup.
- Configure, start, and stop Nodes.
- Close the Manager. This will shut down all NodeGroups across all hosts.
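Put together, a minimal sketch of this procedure might look like the following (the host address and node type are placeholders, and it assumes a host server is already running at that address):
import pyacq

# Create a Manager from the main process
manager = pyacq.create_manager()

# Connect to a remote machine that is running a host server
host = manager.add_host('tcp://10.0.0.53:8000')

# Spawn a new process (containing a NodeGroup) on that host
nodegroup = manager.create_nodegroup(host=host)

# Create, configure, and run a Node inside the NodeGroup
node = nodegroup.create_node('PyAudio')
node.configure(nb_channel=1, sample_rate=44100.)
node.output.configure(protocol='tcp', transfermode='plaindata')
node.initialize()
node.start()
node.stop()

# Shut down all NodeGroups across all hosts
manager.close()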
Creating a manager¶
Each application should start at most one Manager, by calling the create_manager() function:
import pyacq
manager = pyacq.create_manager()
By default, the Manager is created in a new process and a proxy
to the Manager
is returned. This allows the Manager to listen and respond in the background
to requests made by the Hosts, NodeGroups, and Nodes associated with the
application.
Calling create_manager()
also starts a log server
to which all error messages will be sent. Any spawned processes that are
associated with this application will forward their log messages, uncaught
exceptions, and stdout/stderr output back to the log server.
The log server runs inside a new thread of the main process. By default, it prints each received log record along with information about the source host, process, and thread that generated the record. All log records are sorted by their timestamps before being displayed, so it is important that the system clocks are precisely synchronized.
Connecting to remote hosts¶
In order to connect to another machine on the network, the remote machine must be running a server that allows the manager to start and stop new processes. This can be done by running the host server script provided with Pyacq:
$ python tools/host_server.py tcp://10.0.0.53:8000
The IP address and port on which the server should run must be provided as shown above. For each machine that runs a host server, we ask the Manager to make contact with the Host:
host = manager.add_host('tcp://10.0.0.53:8000')
Making this connection ensures that 1) the Manager is aware that it needs to monitor its resources on the host, 2) the Host will inform the Manager if any of its processes dies unexpectedly, and 3) the Host will forward all log records, exceptions, and stdout/stderr output back to the Manager’s log server.
Creating remote Nodes¶
Although there are few differences between interacting with remote versus local
Nodes, a little more effort is required to create a Node on a remote host. We
will start by creating a new process on the remote host using
Manager.create_nodegroup()
, then create a new Node using
NodeGroup.create_node()
:
# Create a new process with a NodeGroup on the remote host
nodegroup = manager.create_nodegroup(host)
# Next, request the NodeGroup to create a new Node
node = nodegroup.create_node('PyAudio', **kwargs)
We now have a proxy
to a Node
that has been created in the remote process.
We can use this proxy to configure, initialize, start, and stop the Node,
exactly as we would with a locally instantiated Node:
node.configure(...)
node.initialize(...)
node.start()
node.stop()
Optionally, we can also request the NodeGroup to remove the Node (if we omit this step, then the Manager will take care of it when it exits):
nodegroup.remove_node(node)
Registering new Node classes¶
Whereas local Nodes are instantiated directly from their classes, remote Nodes
are instantiated using their class names. Consequently, custom Node classes
must be registered through the remote NodeGroup using
register_node_type_from_module()
:
nodegroup.register_node_type_from_module('my.module.name', 'MyClassName')
This requests the remote NodeGroup to import the named module and to register the named Node subclass found there. Following this call, it is possible to create new instances of your custom Node class within the remote NodeGroup:
my_node = nodegroup.create_node('MyClassName', ...)
Data streams¶
As data is acquired in Pyacq, it is transmitted from Node to Node within the graph using Stream classes. Each Node has one or more input and/or output streams that may be connected together, and each stream can be configured to transmit different types and shapes of data:
device.output.configure(protocol='tcp', interface='127.0.0.1',
transfermode='plaindata')
viewer.input.connect(device.output)
recorder.input.connect(device.output)
For the most part, Nodes will automatically decide the configuration options for their input/output streams based on the data they receive or generate. Some options, however, must be configured manually. In the sections below we describe the basic operating theory for streams and the associated configuration options.
Streamable Data Types¶
Pyacq’s streams can, in principle, carry any type of time-varying signal that can be represented by a numpy array. In practice, this is expressed in a few simple cases:
- One or more analog signals such as an audio stream or multichannel EEG data. If multiple signals are transmitted in a single stream, they must be time-locked such that, for each time point represented in the data, every channel must have exactly one value recorded (in other words, it must be possible to represent the data as a 2D array of floating-point values).
- One or more time-locked digital signals. These are typically recorded TTL signals such as a lever-press indicator or the frame exposure signal from a camera.
- A video feed from a camera. Although it would be possible to carry multiple time-locked video signals in a single stream, this might be more naturally implemented by creating a single stream per video feed.
- A stream of events, where each event is a (time, value) pair that indicates the time that the event occurred and an integer descriptor of the event. This can be used in a similar way to digital signals (for recording button presses, beam crossings, etc.), but where the events are sparsely coded and the complete sample-by-sample recording of the digital signal is either unnecessary or unavailable.
Streams can be used to transmit multidimensional arrays, and for the most part, the shape of these arrays is left to the user to decide. The only requirement is that the first array axis should represent time. Conceptually, stream data represents an array where axis 0 can have an arbitrary length that grows over time as data is collected. In practice, this data is represented in chunks as numpy arrays with a fixed size for axis 0.
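As a minimal sketch of this convention (the values here are illustrative), an 8-channel analog signal chunked at 1024 samples per send could be configured and streamed like this:
import numpy as np
import pyacq

out = pyacq.OutputStream()
out.configure(protocol='tcp', transfermode='plaindata',
              streamtype='analogsignal', dtype='float32',
              shape=(-1, 8), sample_rate=20000.)

# Axis 0 is time; each call to send() appends 1024 samples to the stream.
chunk = np.zeros((1024, 8), dtype='float32')
out.send(chunk)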
Data Transmission¶
Data transmission through a stream occurs in several stages:
- Pre-filtering: As data is passed to an output stream, it is passed through a user-defined sequence of filtering functions. These are used, for example, to scale, cast, or reshape data as needed to meet the stream requirements.
- Chunking: The output stream collects data until a minimum chunk size is reached. The chunk size is determined by the output stream configuration and may depend on the data type. For example, a 100 kHz analog signal might be transmitted over a stream in 1000-sample chunks, whereas a video feed might be transmitted one frame at a time.
- Transmission: The chunk is transmitted to all input streams that are connected to the output. The mechanism used to transmit data depends on the protocol and transfermode arguments used during output stream configuration:
  - Plain data stream over TCP: data is sent by TCP using a ZeroMQ socket.
  - Plain data stream within a process: data is sent using a ZeroMQ “inproc” socket, which avoids unnecessary memory copies.
  - Shared memory: data is written into shared memory, and the new memory pointer is sent using a TCP or inproc ZeroMQ socket.
- Reassembly: Each connected input stream independently receives data chunks and reassembles the stream.
- Post-filtering: The reconstructed stream data is passed through another user-defined sequence of filtering functions before being made available to the stream user.
When transmitting plain data streams, Pyacq tries to maximize throughput by avoiding any unnecessary data copies. In most cases, a copy is required only if the input array does not occupy a contiguous block of memory.
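For example, a tiny numpy illustration of when a copy would be needed (this is generic numpy behavior, not a Pyacq API):
import numpy as np

data = np.zeros((1024, 8), dtype='float32')
print(data.flags['C_CONTIGUOUS'])        # True: occupies one contiguous block

view = data[:, ::2]                      # strided view of every other channel
print(view.flags['C_CONTIGUOUS'])        # False: a copy would be required
contiguous = np.ascontiguousarray(view)  # the kind of copy that must be made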
A Simple Example¶
In this example, we pass an array from one thread to another:
import numpy as np
import pyacq
import threading
data = np.array([[1,2], [3,4], [5,6]])
# Create and configure the output stream (sender)
out = pyacq.OutputStream()
out.configure(dtype=data.dtype)
# Create the input stream (receiver) and connect it to
# the output stream
inp = pyacq.InputStream()
inp.connect(out)
# Start a background thread that just receives and prints
# data from the input stream
def receiver():
global inp
while True:
d = inp.recv()
print("Received: %s" % repr(d))
thread = threading.Thread(target=receiver, daemon=True)
thread.start()
# Send data through the stream
out.send(data)
If we run this code from an interactive shell, the last few lines might look like:
>>> out.send(data)
>>> Received: (6, array([[1, 2],
[3, 4],
[5, 6]]))
At this point, we may continue calling out.send()
indefinitely.
Notes:
- In this example, data is sent over the stream using the default method: each chunk is serialized and transmitted over a tcp socket. This default works well when sending data between processes; for threads, however, we can achieve much better performance with other methods (see OutputStream.configure() and the sketch below).
- InputStream and OutputStream are not thread-safe. Once the input thread is started, we should not attempt to access the InputStream’s attributes or methods from the main thread. Likewise, we should not attempt to call inp.connect(out) from within the input thread.
- In this example we have not provided any way to ask the stream thread to exit. Setting daemon=True when creating the thread ensures that, once the main thread exits, the stream thread will not prevent the process from exiting as well.
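For example, a hedged variation of the configure call above that keeps the data in-process (the rest of the example is unchanged):
# For threads in the same process, a ZeroMQ "inproc" socket avoids
# pushing the data through the TCP stack.
out = pyacq.OutputStream()
out.configure(protocol='inproc', transfermode='plaindata', dtype=data.dtype)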
Streaming between processes¶
In the example above, we used inp.connect(out)
to establish the connection
between the ends of the stream. How does this work when we have the input and
output in different processes, or on different machines? We use pyacq’s RPC
system to allow the streams to negotiate a connection, exactly as if they
had been created in the same process:
import pyacq
# Start a local RPC server so that a remote InputStream will be able
# to make configuration requests from a local OutputStream:
s = pyacq.RPCServer()
s.run_lazy()
# Create the output stream in the local process
o = pyacq.OutputStream()
o.configure(dtype=float)
# Spawn a new process and create an InputStream there
p = pyacq.ProcessSpawner()
rpyacq = p.client._import('pyacq')
i = rpyacq.InputStream()
# Connect the streams exactly as if they were local
i.connect(o)
Although this example is somewhat contrived, it demonstrates the general approach: assuming both processes are running an RPC server, one will be able to initiate a stream connection as long as it has an RPC proxy to the stream from the other process.
Using Streams in Custom Node Types¶
Node
classes are responsible for handling most of the configuration for their
input/output streams as well as for sending, receiving, and reconstructing data
through the streams. This functionality is mostly hidden from Node users;
however, if you plan to write custom Node classes, then it is
necessary to understand this process in more detail.
Node subclasses may declare any required properties for their input and output
streams through the _input_specs
and _output_specs
class attributes.
Each attribute is a dict whose keys are the names of the streams and whose
values provide the default configuration arguments for the stream. For example:
class MyFilterNode(Node):
    _input_specs = {
        'analog_in': dict(streamtype='analogsignal', dtype='int16', shape=(-1, 2),
                          compression='', timeaxis=0, sample_rate=50000.),
        'trigger_in': dict(streamtype='digitalsignal', dtype='uint32', shape=(-1,),
                           compression='', timeaxis=0, sample_rate=200000.),
    }
    _output_specs = {
        'filtered_out': dict(streamtype='analogsignal', dtype='float32', shape=(-1, 2),
                             compression='', timeaxis=0, sample_rate=50000.),
    }
This Node
subclass declares two input streams and one output stream:
an analog input called “analog_in”, a digital input called “trigger_in”, and
an analog output called “filtered_out”. The configuration parameters specified
for each stream are passed to the spec
argument of
InputStream.__init__()
or OutputStream.__init__()
.
When the user calls Node.configure()
,
the Node will have its last opportunity to create extra streams (if any) and apply
all configuration options to its streams.
Nodes call OutputStream.send()
to send new data via their output streams,
and InputStream.recv()
to receive data from their input streams. If the
stream is a plaindata type, then calling recv()
will return the next data chunk. In contrast, sharedmem streams only return the
position within the shared memory array of the next data chunk. In this case,
it may be more useful to call InputStream.get_array_slice()
to return
part of the shared memory buffer.
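To make the send/recv pattern concrete, here is a minimal, hedged sketch of a custom node's processing loop (the _start/_stop hooks and the polling thread are simplified; real nodes typically use helpers such as ThreadPollInput, described below):
import threading
from pyacq.core import Node

class PassthroughNode(Node):
    """Receives chunks on one input and resends them unchanged (sketch)."""
    _input_specs = {'in': dict(streamtype='analogsignal')}
    _output_specs = {'out': dict(streamtype='analogsignal')}

    def _start(self):
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _stop(self):
        self._running = False

    def _loop(self):
        while self._running:
            # For plaindata streams, recv() returns (sample index, data chunk)
            pos, data = self.inputs['in'].recv()
            self.outputs['out'].send(data)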
See also
- The Noise generator example demonstrates a simple node with an output stream.
- The Stream monitor example demonstrates a simple node with an input stream.
Using streams in GUI nodes¶
User interface nodes pose a unique challenge because they must somehow work
with the Qt event loop. Using a QTimer
to poll an input node is a valid
option, but this requires a tradeoff between latency and CPU usage–a node that
responds quickly to stream input would have to poll with a short timer interval,
which can be computationally expensive.
A better alternative is to have a background thread block while receiving data
on the input stream, and then send a signal to the GUI event loop whenever it
receives a packet. This is the purpose of pyacq.core.ThreadPollInput
.
For example:
from pyacq.core import InputStream, ThreadPollInput

# outstream is an existing, configured OutputStream
instream = InputStream()
instream.connect(outstream)

poller = ThreadPollInput(input_stream=instream, return_data=True)

def callback(position, data):
    print("Received new data packet at position %d" % position)

poller.new_data.connect(callback)
In this example, we assume a Qt event loop is already running.
The pyacq.core.ThreadPollInput
instance starts a
background thread to receive data from instream
. When data is received,
a signal is emitted and the callback is invoked by the Qt event loop. Because
this stream is being accessed by another thread, it must not be accessed
from the main GUI thread until poller.stop()
and poller.wait()
have
been called.
The default behavior for pyacq.core.ThreadPollInput is to emit a signal whenever it receives a data packet. However, this behavior can be customized by overriding the pyacq.core.ThreadPollInput.processData() method in a subclass.
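For illustration, a minimal hedged sketch of such a subclass (the exact hook name and signature should be checked against the API reference; here we assume it receives the packet position and data, like the callback above):
from pyacq.core import ThreadPollInput

class DecimatingPoller(ThreadPollInput):
    """Hypothetical poller that re-emits only every 10th packet."""
    def __init__(self, *args, **kwargs):
        ThreadPollInput.__init__(self, *args, **kwargs)
        self._count = 0

    def processData(self, pos, data):
        # Called in the poller thread for each packet received from the stream
        self._count += 1
        if self._count % 10 == 0:
            self.new_data.emit(pos, data)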
Stream management tools¶
Pyacq provides two simple tools for managing data as it moves between streams:
pyacq.core.StreamConverter
receives data from an output stream and
immediately sends it through another output stream, which could have a
different configuration. As an example, one could receive data from a sharedmem
stream and then use a pyacq.core.StreamConverter
to forward the data
over a tcp socket:
conv = StreamConverter()
conv.configure()
# data arrives via outstream
conv.input.connect(outstream)
# ..and is forwarded via conv.output
conv.output.configure(protocol='tcp')
conv.initialize()
# now we may connect another InputStream to conv.output
pyacq.core.ChannelSplitter
takes a multi-channel stream as input and
forwards data from individual channels (or groups of channels) via multiple
outputs. This is used primarily when streaming multichannel data to a cluster
of nodes that will perform a parallel computation. Although it would be possible
to simply send all channel data to all nodes, this could incur a performance
penalty depending on the stream protocol. By splitting the stream before sending
it to the compute nodes, we can avoid this extra overhead.
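A hedged sketch of how this might be wired, following the same configure/connect/initialize pattern as StreamConverter above (the output_channels argument and group names are assumptions; consult the ChannelSplitter API for the actual signature):
from pyacq.core import ChannelSplitter

splitter = ChannelSplitter()
# Assumed: map each output name to a group of input channel indexes
splitter.configure(output_channels={'group0': [0, 1], 'group1': [2, 3]})
splitter.input.connect(device.output)   # multichannel source stream
for name, output in splitter.outputs.items():
    output.configure(protocol='tcp', transfermode='plaindata')
splitter.initialize()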
Pyacq API Reference¶
Contents:
Pyacq Core¶
Process Management Classes¶
-
pyacq.core.
create_manager
(mode='rpc', auto_close_at_exit=True)[source]¶ Create a new Manager either in this process or in a new process.
This function also starts a log server to which all log records will be forwarded from the manager and all processes started by the manager. See LogServer for more information.
Parameters: - mode : str
Must be ‘local’ to create the Manager in the current process, or ‘rpc’ to create the Manager in a new process (in which case a proxy to the remote manager will be returned).
- auto_close_at_exit : bool
If True, then call Manager.close() automatically when the calling process exits (only used when
mode=='rpc'
).
-
class
pyacq.core.
Manager
[source]¶ Manager is a central point of control for connecting to hosts and creating Nodegroups.
Manager instances should be created using create_manager().
-
close
()[source]¶ Close the Manager.
If a default host was created by this Manager, then it will be closed as well.
-
create_nodegroup
(name=None, host=None, qt=True, **kwds)[source]¶ Create a new NodeGroup process and return a proxy to the NodeGroup.
A NodeGroup is a process that manages one or more Nodes for device interaction, computation, or GUI.
Parameters: - name : str
A name for the new NodeGroup’s process. This name is used to uniquely identify log messages originating from this nodegroup.
- host : None | str | ObjectProxy<Host>
Optional address of the Host that should be used to spawn the new NodeGroup, or a proxy to the Host itself. If omitted, then the NodeGroup is spawned from the Manager’s default host.
- qt : bool
Whether to start a QApplication in the new process. Default is True.
- All extra keyword arguments are passed to `Host.create_nodegroup()`.
-
-
class
pyacq.core.
NodeGroup
(host, manager)[source]¶ NodeGroup is responsible for managing a collection of Nodes within a single process.
NodeGroups themselves are created and destroyed by Hosts, which manage all NodeGroups on a particular machine.
-
create_node
(node_class, *args, **kwds)[source]¶ Create a new Node and add it to this NodeGroup.
Return the new Node.
-
register_node_type_from_module
(modname, classname)[source]¶ Register a Node subclass with this NodeGroup.
This allows custom Node subclasses to be instantiated in this NodeGroup using
NodeGroup.create_node()
.
-
-
class
pyacq.core.host.
Host
(name, poll_procs=False)[source]¶ Host serves as a pre-existing contact point for spawning new processes on a remote machine.
One Host instance must be running on each machine that will be connected to by a Manager. The Host is only responsible for creating and destroying NodeGroups.
-
check_spawners
()[source]¶ Check for any processes that have exited and report them to their manager.
This method is called by a timer if the host was created with poll_procs=True.
-
create_nodegroup
(name, manager=None, qt=True, **kwds)[source]¶ Create a new NodeGroup in a new process and return a proxy to it.
Parameters: - name : str
The name of the new NodeGroup. This will also be used as the name of the process in log records sent to the Manager.
- manager : Manager | ObjectProxy<Manager> | None
The Manager to which this NodeGroup belongs.
- qt : bool
Whether to start a QApplication in the new process. Default is True.
- All extra keyword arguments are passed to `ProcessSpawner()`.
-
-
class
pyacq.core.
Node
(name='', parent=None)[source]¶ A Node is the basic element for generating and processing data streams in pyacq.
Nodes may be used to interact with devices, generate data, store data, perform computations, or display user interfaces. Each node may have multiple input and output streams that connect to other nodes. For example:
[ data acquisition node ] -> [ processing node ] -> [ display node ] -> [ recording node ]
An application may directly create and connect the Nodes it needs, or it may use a Manager to create a network of nodes distributed across multiple processes or machines.
The order of operations when creating and operating a node is very important:
- Instantiate the node directly or remotely using NodeGroup.create_node.
- Call Node.configure(…) to set global parameters such as sample rate, channel selections, etc.
- Connect inputs to their sources (if applicable): Node.inputs[‘input_name’].connect(other_node.outputs[‘output_name’])
- Configure outputs: Node.outputs[‘output_name’].configure(…)
- Call Node.initialize(), which will verify input/output settings, allocate memory, prepare devices, etc.
- Call Node.start() and Node.stop() to begin/end reading from input streams and writing to output streams. These may be called multiple times.
- Close the node with Node.close(). If the node was created remotely, this is handled by the NodeGroup to which it belongs.
Notes
For convenience, if a Node has only 1 input or 1 output:
- Node.inputs[‘input_name’] can be written Node.input
- Node.outputs[‘output_name’] can be written Node.output
When there are several outputs or inputs, this shorthand is not permitted.
The state of a Node can be requested using thread-safe methods:
- Node.running()
- Node.configured()
- Node.initialized()
-
after_input_connect
(inputname)[source]¶ This method is called when one of the Node’s inputs has been connected.
It may be reimplemented by subclasses.
-
after_output_configure
(outputname)[source]¶ This method is called when one of the Node’s outputs has been configured.
It may be reimplemented by subclasses.
-
check_input_specs
()[source]¶ This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that inputs are correctly configured before the node is started.
In case of misconfiguration, this method must raise an exception.
-
check_output_specs
()[source]¶ This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that outputs are correctly configured before the node is started.
In case of misconfiguration, this method must raise an exception.
-
close
()[source]¶ Close the Node.
This causes all input/output connections to be closed. Nodes must be stopped before they can be closed.
-
configure
(**kargs)[source]¶ Configure the Node.
This method is used to set global parameters such as sample rate, channel selections, etc. Each Node subclass determines the allowed arguments to this method by reimplementing Node._configure().
-
configured
()[source]¶ Return True if the Node has already been configured.
This method is thread-safe.
-
initialize
()[source]¶ Initialize the Node.
This method prepares the node for operation by allocating memory, preparing devices, checking input and output specifications, etc. Node subclasses determine the behavior of this method by reimplementing Node._initialize().
-
initialized
()[source]¶ Return True if the Node has already been initialized.
This method is thread-safe.
-
input
¶ Return the single input for this Node.
If the node does not have exactly one input, then raise AssertionError.
-
output
¶ Return the single output for this Node.
If the node does not have exactly one output, then raise AssertionError.
Stream Classes¶
-
class
pyacq.core.
InputStream
(spec=None, node=None, name=None)[source]¶ Class for streaming data from an OutputStream.
Streams allow data to be sent between objects that may exist on different threads, processes, or machines. They offer a variety of transfer methods including TCP for remote connections and IPC for local connections.
Typical InputStream usage:
- Use InputStream.connect() to connect to an OutputStream defined elsewhere. Usually, the argument will actually be a proxy to a remote OutputStream.
- Poll for incoming data packets with InputStream.poll().
- Receive the next packet with InputStream.recv().
Optionally, use InputStream.set_buffer() to attach a RingBuffer for easier data handling.
-
close
()[source]¶ Close the stream.
This closes the socket. No data can be received after this point.
-
connect
(output)[source]¶ Connect an output to this input.
Any data sent over the stream using output.send() can be retrieved using input.recv().
Parameters: - output : OutputStream (or proxy to a remote OutputStream)
The OutputStream to connect.
-
empty_queue
()[source]¶ Empty the zmq queue by receiving all pending messages with timeout=0 and discarding them. This is useful when a Node was already connected but did not start at the same time as the other nodes: in that case, zmq's watermark mechanism queues up messages, and when the Node finally starts consuming the stream it would otherwise receive stale data.
-
get_data
(*args, **kargs)[source]¶ Return a segment of the RingBuffer attached to this InputStream.
If no RingBuffer is attached, raise an exception.
For parameters, see
RingBuffer.get_data()
.See also:
InputStream.set_buffer()
.
-
poll
(timeout=None)[source]¶ Poll the socket of input stream.
Return True if a new packet is available.
-
recv
(**kargs)[source]¶ Receive a chunk of data.
Returns: - index: int
The absolute sample index. This is the index of the last sample + 1.
- data: np.ndarray or bytes
The received chunk of data. If the stream uses
transfermode='sharedarray'
, then the data is returned as None and you must useinput_stream[start:stop]
to read from the shared array orinput_stream.recv(with_data=True)
to return the received data chunk.
- Use
-
class
pyacq.core.
OutputStream
(spec=None, node=None, name=None)[source]¶ Class for streaming data to an InputStream.
Streams allow data to be sent between objects that may exist on different threads, processes, or machines. They offer a variety of transfer methods including TCP for remote connections and IPC for local connections.
Parameters: - spec : dict
Required parameters for this stream. These may not be overridden when calling
configure()
later on.- node : Node or None
- name : str or None
-
configure
(**kargs)[source]¶ Configure the output stream.
Parameters: - protocol : ‘tcp’, ‘udp’, ‘inproc’ or ‘ipc’ (linux only)
The type of protocol used for the zmq.PUB socket
- interface : str
The bind address for the zmq.PUB socket
- port : str
The port for the zmq.PUB socket
- transfermode: str
The method used for data transfer:
- ‘plaindata’: data are sent over a plain socket in two parts: (frame index, data).
- ‘sharedmem’: data are stored in shared memory in a ring buffer and the current frame index is sent over the socket.
- ‘shared_cuda_buffer’: (planned) data are stored in shared Cuda buffer and the current frame index is sent over the socket.
- ‘share_opencl_buffer’: (planned) data are stored in shared OpenCL buffer and the current frame index is sent over the socket.
All registered transfer modes can be found in pyacq.core.stream.all_transfermodes.
- streamtype: ‘analogsignal’, ‘digitalsignal’, ‘event’ or ‘image/video’
The nature of data to be transferred.
- dtype: str (‘float32’, ‘float64’, [(‘r’, ‘uint16’), (‘g’, ‘uint16’), (‘b’, ‘uint16’)], …)
The numpy.dtype of the data buffer. It can be a composed dtype for event or images.
- shape: list
The shape of each data frame. If the stream will send chunks of variable length, then use -1 for the first (time) dimension.
- For streamtype=image, the shape should be (-1, H, W) or (n_frames, H, W).
- For streamtype=analogsignal, the shape should be (n_samples, n_channels) or (-1, n_channels).
- compression: ‘’, ‘blosclz’, ‘blosc-lz4’
The compression for the data stream. The default uses no compression.
- scale: float
An optional scale factor + offset to apply to the data before it is sent over the stream.
output = offset + scale * input
- offset:
See scale.
- units: str
Units of the stream data. Mainly used for ‘analogsignal’.
- sample_rate: float or None
Sample rate of the stream in Hz.
- kwargs :
All extra keyword arguments are passed to the DataSender constructor for the chosen transfermode (for example, see
SharedMemSender
).
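As an illustration, a hedged sketch of a configure call combining several of these parameters (values are arbitrary; buffer_size is an extra keyword passed through to the sharedmem sender, as described below):
import pyacq

out = pyacq.OutputStream()
out.configure(protocol='tcp', interface='127.0.0.1',
              transfermode='sharedmem',
              streamtype='analogsignal', dtype='float32',
              shape=(-1, 16), sample_rate=20000.,
              buffer_size=10000)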
-
class
pyacq.core.stream.plaindatastream.
PlainDataSender
(socket, params)[source]¶ Helper class to send data serialized over socket.
Note: this class is usually not instantiated directly; use
OutputStream.configure(transfermode='plaindata')
.To avoid unnecessary copies (and thus optimize transmission speed), data is sent exactly as it appears in memory including array strides.
This class supports compression.
class SharedMemSender(socket, params)¶ Stream sender that uses shared memory for efficient interprocess communication. Only the data pointer is sent over the socket.
Note: this class is usually not instantiated directly; use
OutputStream.configure(transfermode='sharedmem')
.Extra parameters accepted when configuring the output stream:
- buffer_size (int) the size of the shared memory buffer in frames. The total shape of the allocated buffer is (buffer_size,) + shape.
- double (bool) if True, then the buffer size is doubled and all frames are written to the buffer twice. This makes it possible to guarantee zero-copy reads by any connected InputStream.
- axisorder (tuple) The order that buffer axes should be arranged in memory. This makes it possible to optimize for specific algorithms that expect either row-major or column-major alignment. The default is row-major; the time axis comes first in the axis order.
- fill (float) Value used to fill the buffer where no data is available.
Data Handling Classes¶
-
class
pyacq.core.
RingBuffer
(shape, dtype, double=True, shmem=None, fill=None, axisorder=None)[source]¶ Class that collects data as it arrives from an InputStream and writes it into a single- or double-ring buffer.
This allows the user to request the concatenated history of data received by the stream, up to a predefined length. Double ring buffers allow faster, copyless reads at the expense of doubled write time and memory footprint.
-
get_data
(start, stop, copy=False, join=True)[source]¶ Return a segment of the ring buffer.
Parameters: - start : int
The starting index of the segment to return.
- stop : int
The stop index of the segment to return (the sample at this index will not be included in the returned data)
- copy : bool
If True, then a copy of the data is returned to ensure that modifying the data will not affect the ring buffer. If False, then a reference to the buffer will be returned if possible. Default is False.
- join : bool
If True, then a single contiguous array is returned for the entire requested segment. If False, then two separate arrays are returned for the beginning and end of the requested segment. This can be used to avoid an unnecessary copy when the buffer has double=False and the caller does not require a contiguous array.
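For illustration, a hedged sketch of typical use through an InputStream (the set_buffer() size argument is an assumption; see InputStream.set_buffer() above):
inp.set_buffer(size=10000)   # attach a RingBuffer holding the last 10000 samples

index, data = inp.recv()     # received chunks are written into the buffer
recent = inp.get_data(index - 1000, index)   # concatenated history: last 1000 samples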
-
class SharedMem(size, shm_id=None)¶ Class to create a shared memory buffer.
This class uses mmap so that unrelated processes (not forked) can share it.
It is usually not necessary to instantiate this class directly; use
OutputStream.configure(transfermode='sharedmem')
.Parameters: - size : int
Buffer size in bytes.
- shm_id : str or None
The id of an existing SharedMem to open. If None, then a new shared memory file is created. On linux this is the filename, on Windows this is the tagname.
Close this buffer.
Return a dict that can be serialized and sent to other processes to access this buffer.
Return a numpy array pointing to part (or all) of this buffer.
Remote Process Control API¶
Contents:
Overview: Remote Process Control¶
Pyacq implements a system for spawning and controlling remote processes through object proxies. This allows remote objects to be treated almost exactly as if they were local objects, with the exception that methods of object proxies may be called asynchronously.
The remote process control system consists of several components:
- RPCServer uses ZeroMQ to listen for serialized requests to control the process by invoking methods, returning objects, etc. RPCServer instances are automatically created in subprocesses when using ProcessSpawner.
- RPCClient sends messages and receives responses from an RPCServer in another thread, process, or host. Each RPCClient instance connects to only one RPCServer. RPCClient instances are created automatically when using ProcessSpawner, or can be created manually using RPCClient.get_client.
- ObjectProxy is the class used to represent any type of remote object. Invoking methods on an ObjectProxy causes a request to be sent to the remote process, and the result is eventually returned via the ObjectProxy.
- ProcessSpawner is used to spawn new processes on the same machine as the caller. New processes will automatically start an RPCServer, and an RPCClient will be created in the parent process.
- Serializers (currently msgpack and json are supported) are used to encode basic python types for transfer over ZeroMQ sockets. Clients are free to pick whichever serializer they prefer.
- Logging tools that allow log records, uncaught exceptions, and stdout/stderr data to be sent to a remote log server. These are essential for debugging multiprocess applications.
The following simple example makes use of most of these components, although only ProcessSpawner and ObjectProxy are directly visible to the user:
from pyacq.core.rpc import ProcessSpawner
# Start a new process with an RPCServer running inside
proc = ProcessSpawner()
# Ask the remote process to import a module and return a proxy to it
remote_col = proc.client._import('collections')
# Create a new object (an ordered dict) in the remote process
remote_dict = remote_col.OrderedDict()
# Interact with the new object exactly as if it were local:
remote_dict['x'] = 1
assert 'x' in remote_dict.keys()
assert remote_dict['x'] == 1
Using object proxies allows remote objects to be accessed using the same syntax as if they were local. However, there are two major differences to consider when using remote objects:
First, function arguments and return values in Python are passed by reference. This means that both the caller and the callee operate on the same Python object. Since it is not possible to share python objects between processes, we are restricted to sending them either by copy or by proxy. By default, arguments and return values for remote functions are serialized if possible, or passed by proxy otherwise.
Second, remote functions can be called asynchronously. By default, calling
a remote function will block until the return value has arrived. However, any
remote function call can be made asynchronous by adding a special argument:
_sync='async'
. In this case, the function call will immediately return a
Future
object that can be used to
access the return value when it arrives.
ObjectProxy and Future¶
After initial setup, these classes are the main API through which a remote process is controlled.
-
class
pyacq.core.rpc.
ObjectProxy
(rpc_addr, obj_id, ref_id, type_str='', attributes=(), **kwds)[source]¶ Proxy to an object stored by a remote
RPCServer
.A proxy behaves in most ways like the object that it wraps–you can request the same attributes, call methods, etc. There are a few important differences:
- A proxy can be on a different thread, process, or machine than its object, so long as the object’s thread has an RPCServer and the proxy’s thread has an associated RPCClient.
- Attribute lookups and method calls can be slower because the request and response must traverse a socket. These can also be performed asynchronously to avoid blocking the client thread.
- Function argument and return types must be serializable or proxyable. Most basic types can be serialized, including numpy arrays. All other objects are automatically proxied, but there are some cases when this will not work well.
__repr__()
and__str__()
are overridden on proxies to allow safe debugging.__hash__()
is overridden to ensure that remote hash values are not used locally.
For the most part, object proxies can be used exactly as if they are local objects:
client = RPCClient(address)   # connect to RPCServer
rsys = client._import('sys')  # returns proxy to sys module on remote process
rsys.stdout.write             # proxy to remote sys.stdout.write
rsys.stdout.write('hello')    # calls sys.stdout.write('hello') on remote machine
                              # and returns the result (None)
When calling a proxy to a remote function, the call can be made synchronous (caller is blocked until result can be returned), asynchronous (Future is returned immediately and result can be accessed later), or return can be disabled entirely:
ros = proc._import('os')

# synchronous call; caller blocks until result is returned
pid = ros.getpid()

# asynchronous call
request = ros.getpid(_sync='async')
while not request.hasResult():
    time.sleep(0.01)
pid = request.result()

# disable return when we know it isn't needed.
rsys.stdout.write('hello', _sync='off')
Additionally, values returned from a remote function call are automatically returned either by value (must be picklable) or by proxy. This behavior can be forced:
rnp = proc._import('numpy')
arrProxy = rnp.array([1,2,3,4], _return_type='proxy')
arrValue = rnp.array([1,2,3,4], _return_type='value')
The default sync and return_type behaviors (as well as others) can be set for each proxy individually using ObjectProxy._set_proxy_options() or globally using proc.set_proxy_options().
It is also possible to send arguments by proxy if an RPCServer is running in the caller’s thread (this can be used, for example, to connect Qt signals across network connections):
def callback():
    print("called back.")

# Remote process can invoke our callback function as long as there is
# a server running here to process the request.
remote_object.set_callback(proxy(callback))
-
__call__
(*args, **kwargs)[source]¶ Call the proxied object from the remote process.
All positional and keyword arguments (except those listed below) are sent to the remote procedure call.
In synchronous mode (see parameters below), this method blocks until the remote return value has been received, and then returns that value. In asynchronous mode, this method returns a
Future
instance immediately, which may be used to retrieve the return value later. If return is disabled, then the method immediately returns None.If the remote call raises an exception on the remote process, then this method will raise RemoteCallException if in synchronous mode, or calling
Future.result()
will raise RemoteCallException if in asynchronous mode. If return is disabled, then remote exceptions will be ignored.Parameters: - _sync: ‘off’, ‘sync’, or ‘async’
Set the sync mode for this call. The default value is determined by the ‘sync’ argument to
_set_proxy_options()
.- _return_type: ‘value’, ‘proxy’, or ‘auto’
Set the return type for this call. The default value is determined by the ‘return_type’ argument to
_set_proxy_options()
.- _timeout: float
Set the timeout for this call. The default value is determined by the ‘timeout’ argument to
_set_proxy_options()
.
See also
-
__getattr__
(attr)[source]¶ Calls __getattr__ on the remote object and returns the attribute by value or by proxy depending on the options set (see ObjectProxy._set_proxy_options and RemoteEventHandler.set_proxy_options)
If the option ‘defer_getattr’ is True for this proxy, then a new proxy object is returned _without_ asking the remote object whether the named attribute exists. This can save time when making multiple chained attribute requests, but may also defer a possible AttributeError until later, making them more difficult to debug.
-
_delete
(sync='sync', **kwds)[source]¶ Ask the RPC server to release the reference held by this proxy.
Note: this does not guarantee the remote object will be deleted; only that its reference count will be reduced. Any copies of this proxy will no longer be usable.
-
_get_value
()[source]¶ Return the value of the proxied object.
If the object is not serializable, then raise an exception.
-
_set_proxy_options
(**kwds)[source]¶ Change the behavior of this proxy. For all options, a value of None will cause the proxy to instead use the default behavior defined by its parent Process.
Parameters: - sync : ‘sync’, ‘async’, ‘off’, or None
If ‘async’, then calling methods will return a
Future
object that can be used to inquire later about the result of the method call. If ‘sync’, then calling a method will block until the remote process has returned its result or the timeout has elapsed (in this case, a Request object is returned instead). If ‘off’, then the remote process is instructed not to reply and the method call will return None immediately. This option can be overridden by supplying a_sync
keyword argument when calling the method (see__call__()
).- return_type : ‘auto’, ‘proxy’, ‘value’, or None
If ‘proxy’, then the value returned when calling a method will be a proxy to the object on the remote process. If ‘value’, then attempt to pickle the returned object and send it back. If ‘auto’, then the decision is made by consulting the ‘no_proxy_types’ option. This option can be overridden by supplying a
_return_type
keyword argument when calling the method (see__call__()
).- auto_proxy : bool or None
If True, arguments to __call__ are automatically converted to proxy unless their type is listed in no_proxy_types (see below). If False, arguments are left untouched. Use proxy(obj) to manually convert arguments before sending.
- timeout : float or None
Length of time to wait during synchronous requests before returning a Request object instead. This option can be overridden by supplying a
_timeout
keyword argument when calling a method (see__call__()
).- defer_getattr : True, False, or None
If False, all attribute requests will be sent to the remote process immediately and will block until a response is received (or timeout has elapsed). If True, requesting an attribute from the proxy returns a new proxy immediately. The remote process is not contacted to make this request. This is faster, but it is possible to request an attribute that does not exist on the proxied object. In this case, AttributeError will not be raised until an attempt is made to look up the attribute on the remote process.
- no_proxy_types : list
List of object types that should not be proxied when sent to the remote process.
- auto_delete : bool
If True, then the proxy will automatically call self._delete() when it is collected by Python.
-
class
pyacq.core.rpc.
Future
(client, call_id)[source]¶ Represents a return value from a remote procedure call that has not yet arrived.
Instances of Future are returned from
ObjectProxy.__call__()
when used with_sync='async'
. This is the mechanism through which remote functions may be called asynchronously.Use
done()
to determine whether the return value (or an error message) has arrived, andresult()
to get the return value. If the remote call raised an exception, then callingresult()
will raise RemoteCallException with a transcript of the original exception.See concurrent.futures.Future in the Python documentation for more information.
ProcessSpawner¶
-
class
pyacq.core.rpc.
ProcessSpawner
(name=None, address='tcp://127.0.0.1:*', qt=False, log_addr=None, log_level=None, executable=None)[source]¶ Utility for spawning and bootstrapping a new process with an
RPCServer
.Automatically creates an
RPCClient
that is connected to the remote process (spawner.client
).Parameters: - name : str | None
Optional process name that will be assigned to all remote log records.
- address : str
ZMQ socket address that the new process’s RPCServer will bind to. Default is
'tcp://127.0.0.1:*'
.Note: binding RPCServer to a public IP address is a potential security hazard (see
RPCServer
).- qt : bool
If True, then start a Qt application in the remote process, and use a
QtRPCServer
.- log_addr : str
Optional log server address to which the new process will send its log records. This will also cause the new process’s stdout and stderr to be captured and forwarded as log records.
- log_level : int
Optional initial log level to assign to the root logger in the new process.
- executable : str | None
Optional python executable to invoke. The default value is sys.executable.
Examples
# start a new process
proc = ProcessSpawner()

# ask the child process to do some work
mod = proc._import('my.module')
mod.do_work()

# close the child process
proc.close()
proc.wait()
-
client
= None¶ An RPCClient instance that is connected to the RPCServer in the remote process
RPCClient and RPCServer¶
These classes implement the low-level communication between a server and client. They are rarely used directly by the user except occasionally for initial setup and closing.
-
class
pyacq.core.rpc.
RPCClient
(address, reentrant=True, serializer='msgpack')[source]¶ Connection to an
RPCServer
.Each RPCClient connects to only one server, and may be used from only one thread. RPCClient instances are created automatically either through
ProcessSpawner
or by requesting attributes from an ObjectProxy
. In general, it is not necessary for the user to interact directly with RPCClient.Parameters: - address : URL
Address of RPC server to connect to.
- reentrant : bool
If True, then this client will allow the server running in the same thread (if any) to process requests whenever the client is waiting for a response. This is necessary to avoid deadlocks in case of reentrant RPC requests (eg, server A calls server B, which then calls server A again). Default is True.
-
call_obj
(obj, args=None, kwargs=None, **kwds)[source]¶ Invoke a remote callable object.
Parameters: - obj :
ObjectProxy
A proxy that references an object owned by the connected RPCServer.
- args : tuple
Arguments to pass to the remote call.
- kwargs : dict
Keyword arguments to pass to the remote call.
- kwds :
All extra keyword arguments are passed to
send()
.
- obj :
-
close_server
(sync='sync', timeout=1.0, **kwds)[source]¶ Ask the server to close.
The server returns True if it has closed. All clients known to the server will be informed that the server has disconnected.
If the server has already disconnected from this client, then the method returns True without error.
-
delete
(obj, **kwds)[source]¶ Delete an object proxy.
This informs the remote process that an
ObjectProxy
is no longer needed. The remote process will decrement a reference counter and delete the referenced object if it is no longer held by any proxies.Parameters: - obj :
ObjectProxy
A proxy that references an object owned by the connected RPCServer.
- kwds :
All extra keyword arguments are passed to
send()
.
Notes
After a proxy is deleted, it cannot be used to access the remote object even if the server has not released the remote object yet. This also applies to proxies that are sent to a third process. For example, consider three processes A, B, C: first A acquires a proxy to an object owned by B. A sends the proxy to C, and then deletes the proxy. If C attempts to access this proxy, an exception will be raised because B has already released the reference held by this proxy. However, if C independently acquires a proxy to the same object owned by B, then that proxy will continue to function even after A deletes its proxy.
- obj :
-
static
get_client
(address)[source]¶ Return the RPC client for this thread and a given server address.
If no client exists already, then a new one will be created. If the server is running in the current thread, then return None.
See also
-
get_obj
(obj, **kwds)[source]¶ Return a copy of a remote object.
Parameters: - obj :
ObjectProxy
A proxy that references an object owned by the connected RPCServer. The object will be serialized and returned if possible, otherwise a new proxy is returned.
- kwds :
All extra keyword arguments are passed to
send()
.
- obj :
-
ping
(sync='sync', **kwds)[source]¶ Ping the server.
This can be used to test connectivity to the server.
-
process_msg
(msg)[source]¶ Handle one message received from the remote process.
This takes care of assigning return values or exceptions to existing Future instances.
-
process_until_future
(future, timeout=None)[source]¶ Process all incoming messages until receiving a result for future.
If the result for the future does not arrive before the timeout, then raise TimeoutError.
While waiting, the RPCServer for this thread (if any) is also allowed to process requests.
Parameters: - future : concurrent.Future instance
The Future to wait for. When the response for this Future arrives from the server, the method returns.
- timeout : float
Maximum time (seconds) to wait for a response.
send(action, opts=None, return_type='auto', sync='sync', timeout=10.0)[source]¶
Send a request to the remote process.

It is not necessary to call this method directly; instead use call_obj(), get_obj(), __getitem__(), __setitem__(), transfer(), delete(), _import(), or ping(). The request is given a unique ID that is included in the response from the server (if any).

Parameters:
- action : str
  The action to invoke on the remote process. See the list of actions below.
- opts : None or dict
  Extra options to be sent with the request. Each action requires a different set of options. See the list of actions below.
- return_type : 'auto' | 'proxy'
  If 'proxy', then the return value is sent by proxy. If 'auto', then the server decides, based on the return type, whether to send a proxy.
- sync : str
  If 'sync', then block and return the result when it becomes available. If 'async', then return a Future instance immediately. If 'off', then ask the remote server NOT to send a response and return None immediately.
- timeout : float
  The amount of time to wait for a response when in synchronous operation (sync='sync'). If the timeout elapses before a response is received, then raise TimeoutError.

Notes

The following table lists the actions that are recognized by RPCServer. The action argument to send() may be any string from the Action column, and the opts argument must be a dict with the keys listed in the Options column.

Action | Description | Options
call_obj | Invoke a callable | obj: a proxy to the callable object; args: a tuple of positional arguments; kwargs: a dict of keyword arguments
get_obj | Return the object referenced by a proxy | obj: a proxy to the object to return
get_item | Return a named object | name: string name of the object to return
set_item | Set a named object | name: string name to set; value: object to assign to name
delete | Delete a proxy reference | obj_id: proxy object ID; ref_id: proxy reference ID
import | Import and return a proxy to a module | module: name of module to import
ping | Return 'pong' | (none)
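To make the table concrete, the sketch below drives a few of these actions through send() directly. This is a sketch only: the server address and the published name 'manager' are assumptions, and in practice the wrapper methods listed above are preferred.

# Sketch: low-level use of RPCClient.send() (assumed address and object name).
client = RPCClient('tcp://127.0.0.1:5678')

# Synchronous ping: blocks until 'pong' arrives or TimeoutError is raised.
client.send('ping', sync='sync', timeout=3.0)

# Fetch a published object by name, forcing the result back as a proxy.
proxy = client.send('get_item', opts={'name': 'manager'}, return_type='proxy')

# Asynchronous request: returns a Future immediately; block for the result later.
fut = client.send('ping', sync='async')
print(fut.result())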
class pyacq.core.rpc.RPCServer(address='tcp://127.0.0.1:*')[source]¶
Remote procedure call server for invoking requests on proxied objects.

RPCServer instances are created automatically when using ProcessSpawner. It is rarely necessary for the user to interact directly with RPCServer.

There may be at most one RPCServer per thread. RPCServers can be run in a few different modes:

- Exclusive event loop: call run_forever() to cause the server to listen indefinitely for incoming request messages.
- Lazy event loop: call run_lazy() to register the server with the current thread. The server's socket will be polled whenever an RPCClient is waiting for a response (this allows reentrant function calls). You can also manually listen for requests with _read_and_process_one() in this mode.
- Qt event loop: use QtRPCServer. In this mode, messages are polled in a separate thread, then sent to the Qt event loop by signal and processed there. The server is registered as running in the Qt thread.

Parameters:
- name : str
  Name used to identify this server.
- address : URL
  Address for the RPC server to bind to. Default is 'tcp://127.0.0.1:*'. Note: binding RPCServer to a public IP address is a potential security hazard.

Notes

RPCServer is not a secure server. It is intended to be used only on trusted networks; anyone with TCP access to the server can execute arbitrary code on the server.

RPCServer is not a thread-safe class. Only use RPCClient to communicate with RPCServer from other threads.

Examples
# In host/process/thread 1:
server = RPCServer()
rpc_addr = server.address
# Publish an object for others to access easily
server['object_name'] = MyClass()

# In host/process/thread 2: (you must communicate rpc_addr manually)
client = RPCClient(rpc_addr)

# Get a proxy to the published object; use this (almost) exactly as you
# would a local object:
remote_obj = client['object_name']
remote_obj.method(...)

# Or, you can remotely import and operate a module:
remote_module = client._import("my.module.name")
remote_obj = remote_module.MyClass()
remote_obj.method(...)

# See ObjectProxy for more information on interacting with remote
# objects, including (a)synchronous communication.
address = None¶
The zmq address where this server is listening (e.g. 'tcp://127.0.0.1:5678').
get_proxy(obj, **kwds)[source]¶
Return an ObjectProxy referring to a local object.

This proxy can be sent via RPC to any other node.
static get_server()[source]¶
Return the server running in this thread, or None if there is no server.
static local_client()[source]¶
Return the RPCClient used for accessing the server running in the current thread.
process_action(action, opts, return_type, caller)[source]¶
Invoke a single action and return the result.
static register_server(srv)[source]¶
Register a server as the (only) server running in this thread.

This static method fails if another server is already registered for this thread.
run_lazy()[source]¶
Register this server as being active for the current thread, but do not actually begin processing requests.

RPCClients in the same thread will allow the server to process requests while they are waiting for responses. This can prevent deadlocks that occur with reentrant RPC requests (e.g., server A calls server B, which then calls server A again). It can also be used to allow the user to manually process requests.
start_timer(callback, interval, **kwds)[source]¶
Start a timer that invokes callback at regular intervals.

Parameters:
- callback : callable
  Callable object to invoke. This must either be an ObjectProxy or an object that is safe to call from the server's thread.
- interval : float
  Minimum time to wait between callback invocations (start to start).
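For example, a minimal sketch (the callback proxy and interval are assumptions):

# Invoke a proxied callback roughly once per second in the server's thread.
server.start_timer(callback_proxy, interval=1.0)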
class pyacq.core.rpc.QtRPCServer(address='tcp://127.0.0.1:*', quit_on_close=True)[source]¶
RPCServer that lives in a Qt GUI thread.

This server may be used to create and manage QObjects, QWidgets, etc. It uses a separate thread to poll for RPC requests, which are then sent to the Qt event loop by signal. This allows the RPC actions to be executed in a Qt GUI thread without using a timer to poll the RPC socket. Responses are sent back to the poller thread by a secondary socket.

QtRPCServer may be started in newly spawned processes using ProcessSpawner.

Parameters:
- address : str
  ZMQ address to listen on. Default is 'tcp://127.0.0.1:*'. Note: binding RPCServer to a public IP address is a potential security hazard. See RPCServer.
- quit_on_close : bool
  If True, then call QApplication.quit() when the server is closed.

Examples

Spawning in a new process:
# Create new process.
proc = ProcessSpawner(qt=True)

# Display a widget from the new process.
qtgui = proc._import('PyQt4.QtGui')
w = qtgui.QWidget()
w.show()
Starting in an existing Qt application:
# Create server.
server = QtRPCServer()

# Start listening for requests in a background thread (this call
# returns immediately).
server.run_forever()
Serializers¶
Serializers provide a mechanism for some data types to be copied from one process to another by converting Python objects into byte strings and vice-versa. Currently, two serializer classes are supported:
- msgpack provides efficient serialization for all supported types, including large binary data.
- json is somewhat less efficient in encoding large binary data, but is more universally supported across platforms where msgpack may be unavailable.
The basic types supported by both serializers are int, float, str, dict, and list. Further data types are serialized by first converting to a dict containing the key ___type_name___ in order to distinguish it from normal dicts (see Serializer.encode() and Serializer.decode()):
datetime = {
'___type_name___': 'datetime',
'data': obj.strftime('%Y-%m-%dT%H:%M:%S.%f')
}
date = {
'___type_name___': 'date',
'data': obj.strftime('%Y-%m-%d')
}
nonetype = {
'___type_name___': 'none'
}
objectproxy = {
'___type_name___': 'proxy',
'rpc_addr': obj._rpc_addr,
'obj_id': obj._obj_id,
'ref_id': obj._ref_id,
'type_str': obj._type_str,
'attributes': obj._attributes,
}
Types containing byte strings are handled differently between msgpack and json. In msgpack, byte strings are natively supported:
np.ndarray = {
'___type_name___': 'ndarray',
'data': array.tostring(),
'dtype': str(array.dtype),
'shape': array.shape
}
# no need to convert; msgpack already handles this type
bytes = bytes_obj
However, json does not support byte strings, so in this case the strings must be base-64 encoded before being serialized:
ndarray = {
'___type_name___': 'ndarray',
'data': base64.b64encode(array.data).decode(),
'dtype': str(array.dtype),
'shape': array.shape
}
bytes = {
'___type_name___': 'bytes',
'data': base64.b64encode(bytes_obj).decode()
}
Note that both serializers convert tuples into lists automatically. This is undesirable, but is currently not configurable in a consistent way across both serializers.
It is possible to add support for new serializers by creating a subclass of Serializer and modifying pyacq.core.rpc.serializer.all_serializers.
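For illustration, the sketch below registers a pickle-based serializer. The `type` attribute and the dumps()/loads() hook names are assumptions about the Serializer interface; check pyacq/core/rpc/serializer.py for the exact contract.

import pickle
from pyacq.core.rpc.serializer import Serializer, all_serializers

class PickleSerializer(Serializer):
    # Assumed: subclasses declare a `type` string used as the registration key.
    type = 'pickle'

    def dumps(self, obj):
        # encode() (from the base class) converts special types to tagged dicts.
        return pickle.dumps(self.encode(obj))

    def loads(self, msg):
        # decode() restores tagged dicts back into their original types.
        return self.decode(pickle.loads(msg))

# Assumed: all_serializers maps type names to serializer classes.
all_serializers['pickle'] = PickleSerializer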
class pyacq.core.rpc.serializer.Serializer(server=None, client=None)[source]¶
Base serializer class on which the msgpack and json serializers (and potentially others) are built.

Subclasses must be registered by adding to the all_serializers global.

Supports ndarray, date, datetime, and bytes for transfer in addition to the standard types supported by json and msgpack. All other types are converted to an object proxy that can be used to access methods / attributes of the object remotely (this requires that the object be owned by an RPC server).

Note that tuples are converted to lists in transit. See: https://github.com/msgpack/msgpack-python/issues/98
Logging tools¶
These tools allow log records and unhandled exceptions to be forwarded to a central log server. Basic usage consists of:

- Start a log server in any process using start_log_server().
- Attach a handler to the root logger (see the Python logging documentation). If the log server is running in a process that can output to a terminal, then RPCLogHandler can be used to display log records color-coded by source.
- Set the log level of the root logger. Using the INFO or DEBUG levels will reveal details about RPC communications between processes.
- In the remote process, set the log level and call set_host_name(), set_process_name(), set_thread_name(), and set_logger_address() (note that ProcessSpawner handles this step automatically); see the sketch after this list.
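A minimal sketch of both sides, assuming get_logger_address() lives in the same module and that each set_*_name() function takes a single name argument:

import logging
from pyacq.core.rpc.log import (start_log_server, get_logger_address,
                                RPCLogHandler, set_logger_address,
                                set_host_name, set_process_name, set_thread_name)

# --- In the central (collecting) process ---
logger = logging.getLogger()          # root logger
logger.addHandler(RPCLogHandler())    # color-coded output on stderr
logger.setLevel(logging.INFO)         # INFO/DEBUG reveal RPC traffic
start_log_server(logger)
addr = get_logger_address()           # communicate this to remote processes

# --- In each remote process (ProcessSpawner does this automatically) ---
logging.getLogger().setLevel(logging.INFO)
set_host_name('lab-pc')               # arbitrary example names
set_process_name('worker-1')
set_thread_name('main')
set_logger_address(addr)              # forward records to the central server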
pyacq.core.rpc.log.start_log_server(logger)[source]¶
Create a global log server and attach it to a logger.

Use get_logger_address() to return the socket address for the server after it has started. In a remote process, call set_logger_address() to connect it to the server. Then all messages logged remotely will be forwarded to the server and handled by the logging system there.
class pyacq.core.rpc.log.LogServer(logger, address='tcp://127.0.0.1:*', sort=True)[source]¶
Thread for receiving log records via a zmq socket.

Messages are immediately passed to a Python logger for local handling.

Parameters:
- logger : Logger
  The Python logger that should handle incoming messages.
run()[source]¶
Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
class pyacq.core.rpc.log.LogSender(address=None, logger=None)[source]¶
Handler for forwarding log messages to a remote LogServer via a zmq socket.

Instances of this class can be attached to any Python logger using logger.addHandler(log_sender). This can be used with LogServer to collect log messages from many remote processes to a central logger.

Note: we do not use RPC for this because we have to avoid generating extra log messages.

Parameters:
- address : str | None
  The socket address of a log server. If None, then the sender is not connected to a server and connect() must be called later.
- logger : str | None
  The name of the Python logger to which this handler should be attached. If None, then the handler is not attached (use '' for the root logger).
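For example (the server address is an assumption):

# Attach at construction time; '' names the root logger.
sender = LogSender(address='tcp://127.0.0.1:6789', logger='')

# Or attach manually to any logger:
import logging
logging.getLogger('myapp').addHandler(LogSender('tcp://127.0.0.1:6789'))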
close()[source]¶
Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.
class pyacq.core.rpc.log.RPCLogHandler(stream=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>)[source]¶
StreamHandler that sorts incoming log records by their creation time and writes them to stderr. Messages are also colored by their log level and by the host/process/thread that created the record.

Credit: https://gist.github.com/kergoth/813057

Parameters:
- stream : file-like
  The stream to which messages should be sent. The default is sys.stderr.
pyacq.core.rpc.log.log_exceptions()[source]¶
Install a hook that creates log messages from unhandled exceptions.
pyacq.core.rpc.log.set_thread_name(name, tid=None)[source]¶
Set the name of a thread used for logging.

If no thread ID is given, then the current thread's ID is used.
pyacq.core.rpc.log.get_thread_name(tid=None)[source]¶
Return the name of a thread used for logging.

If no thread ID is given, then the current thread's ID is used.
Device Nodes¶
Audio¶
class pyacq.devices.PyAudio(**kargs)[source]¶
Simple wrapper around PyAudio for input and output to audio devices.
check_input_specs()[source]¶
This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that inputs are correctly configured before the node is started.

In case of misconfiguration, this method must raise an exception.
check_output_specs()[source]¶
This method is called during Node.initialize() and may be reimplemented by subclasses to ensure that outputs are correctly configured before the node is started.

In case of misconfiguration, this method must raise an exception.
configure(*args, **kwargs)[source]¶

Parameters:
- nb_channel : int
  Number of audio channels.
- sample_rate : float
  Sample rate. This value is rounded to an integer.
- input_device_index : int or None
  Input device index (see list_device_specs() and the pyaudio documentation). If None, then no recording will be requested from the device, and the node will have no output.
- output_device_index : int or None
  Output device index (see list_device_specs() and the pyaudio documentation). If None, then no playback will be requested from the device, and the node will have no input.
- format : str, one of 'int16', 'int32', or 'float32'
  Internal data format for pyaudio.
- chunksize : int (1024 by default)
  Size of each chunk. Smaller chunks result in lower overall latency, but may also cause buffering issues (cracks/pops in the sound).
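A minimal input-only configuration might look like the sketch below; complete, runnable scripts appear in the Examples section later in this document.

from pyacq.devices.audio_pyaudio import PyAudio

dev = PyAudio()
# Record one channel from the default input device; no playback.
dev.configure(nb_channel=1, sample_rate=44100., format='int16', chunksize=1024,
              input_device_index=dev.default_input_device(),
              output_device_index=None)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()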
Cameras¶
EEG¶
class pyacq.devices.brainampsocket.BrainAmpSocket(**kargs)[source]¶
BrainAmp EEG amplifier from Brain Products (http://www.brainproducts.com/).

This class is a bridge between pyacq and the socket-based data streaming provided by the Vision Recorder acquisition software.
class pyacq.devices.Emotiv(**kargs)[source]¶
Simple Emotiv EEG device node for accessing EEG, impedance, and gyro data.

Reverse engineering and the original crack code were written by Cody Brocious (http://github.com/daeken) and Kyle Machulis (http://github.com/qdot). Many thanks for their contribution.

The Emotiv USB device emits 32-byte reports at a rate of 128 Hz, encrypted via AES. See https://github.com/qdot/emokit/blob/master/doc/emotiv_protocol.asciidoc for more details.
class pyacq.devices.OpenBCI(**kargs)[source]¶
This class is a bridge between Pyacq and the 32-bit OpenBCI amplifier board from the open source project http://openbci.com. Only the Daisy board version is supported for now.

#TODO: this is very basic code to grab data from the 8-channel Daisy OpenBCI board. The next version will improve dialog with the board and auto-initialization.
Testing¶
class pyacq.devices.NumpyDeviceBuffer(**kargs)[source]¶
A fake analog-signal device.

This node streams data from a predefined buffer in an endless loop.
after_output_configure(outputname)[source]¶
This method is called when one of the Node's outputs has been configured.

It may be reimplemented by subclasses.
configure(*args, **kwargs)[source]¶

Parameters:
- nb_channel : int
  Number of output channels.
- sample_interval : float
  Time duration of a single data sample. This determines the rate at which data is sent.
- chunksize : int
  Length of chunks to send.
- buffer : array
  Data to send. Must have buffer.shape[1] == nb_channel.
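A brief sketch (the buffer contents and sizes are arbitrary; the Stream monitor example later in this document uses the same pattern):

import numpy as np
from pyacq.devices import NumpyDeviceBuffer

# A 16-channel fake device emitting 100-sample chunks at 10 kHz.
data = np.random.randn(20000, 16).astype('float32')   # (samples, channels)
dev = NumpyDeviceBuffer()
dev.configure(nb_channel=16, sample_interval=1e-4, chunksize=100, buffer=data)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()
dev.start()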
Visualization Nodes¶
Analog signal visualizers¶
Spectral visualizers¶
class pyacq.viewers.QTimeFreq(**kargs)[source]¶
Class for visualizing a frequency spectrogram computed with a Morlet continuous wavelet transform.

This allows better visualization than the standard FFT spectrogram because it provides better temporal resolution for high-frequency signals without sacrificing frequency resolution for low-frequency signals. See https://en.wikipedia.org/wiki/Morlet_wavelet

This class internally uses one TimeFreqWorker per channel, which allows multiple signals to be transformed in parallel.

The node operates in one of two modes:

- Each TimeFreqWorker lives in the same QApplication as the QTimeFreq node (nodegroup_friends=None).
- Each TimeFreqWorker is spawned in another NodeGroup to distribute the load (nodegroup_friends=[some_list_of_nodegroups]).

This viewer needs manual tuning for performance: a small refresh_interval, a high number of freqs, a high f_stop, and a high xsize can all lead to heavy CPU load.
This node requires its input stream to use:

- transfermode == 'sharedarray'
- axisorder == [1, 0]

If the input stream does not meet these requirements, then a StreamConverter will be created to proxy the input.

QTimeFreq can be configured on the fly by changing QTimeFreq.params and QTimeFreq.by_channel_params. By default, double-clicking on the viewer will open a GUI dialog for these parameters.

Usage:
viewer = QTimeFreq()
viewer.configure(with_user_dialog=True, nodegroup_friends=None)
viewer.input.connect(somedevice.output)
viewer.initialize()
viewer.show()
viewer.start()

viewer.params['nb_column'] = 4
viewer.params['refresh_interval'] = 1000
Signal Processing Nodes¶
Triggering Nodes¶
class pyacq.dsp.AnalogTrigger(parent=None, **kargs)[source]¶
Not especially efficient, but a robust trigger on an analog signal.

This acts like a standard trigger with a threshold and a front (rising or falling edge). The channel can be selected among all input channels.

All params can be set online via AnalogTrigger.params['XXX'] = ... (see the sketch after this list).

The main feature is the debounce mode, combined with debounce_time:

- 'no-debounce': every threshold crossing is a trigger.
- 'after-stable': when the interval between a series of triggers is too short, the last one is taken into account.
- 'before-stable': when the interval between a series of triggers is too short, the first one is taken into account.
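The values below are arbitrary; the parameter names are the same ones used by the PyAudio triggered oscilloscope example later in this document.

# Sketch: configure the trigger online; changes take effect immediately.
trigger.params['threshold'] = 0.5
trigger.params['front'] = '+'                     # rising edge
trigger.params['debounce_mode'] = 'after-stable'
trigger.params['debounce_time'] = 0.1             # seconds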
class pyacq.dsp.TriggerAccumulator(parent=None, **kargs)[source]¶
Node that accumulates chunks of a multi-channel signal into a ring buffer on trigger events.

This Node has no output because the stack size of the signal chunks is configurable online; shared memory is difficult here because the shape can change.

The internal self.stack has 3 dimensions:

- 0: trigger
- 1: channel
- 2: time

self.total_trig indicates the number of triggers since the last reset_stack().

TriggerAccumulator.params['stack_size'] controls the number of events in the stack. Note that the stack behaves as a ring buffer along axis 0, so if self.total_trig > stack_size you need to use modulo arithmetic to access the latest event, as in the sketch below.

On each new chunk, the new_chunk signal is emitted. Note that this does not occur on a new trigger, but a bit afterwards, when right_sweep is reached on the signal stream.
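For example, the latest accumulated event can be retrieved with modulo indexing (a sketch, where acc is a running TriggerAccumulator):

# The stack wraps along axis 0 once total_trig exceeds stack_size.
stack_size = acc.params['stack_size']
latest = acc.stack[(acc.total_trig - 1) % stack_size]   # shape (nb_channel, times)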
Filtering nodes¶
class pyacq.dsp.OverlapFiltfilt(parent=None, **kargs)[source]¶
Node for filtering with the forward-backward method (filtfilt), using second-order section (sos) coefficients and a sliding, overlapping window.

Because the signal is filtered piecewise, the result will differ slightly from the ideal case, in which the entire signal would be filtered over all time at once. To ensure accurate results, the chunksize and overlapsize parameters must be chosen carefully: a small chunksize will affect low frequencies, and a small overlapsize may result in transients at the border between chunks. We recommend comparing the output of this node to an ideal offline filter to ensure that the residuals are acceptably small.
The chunksize needs to be fixed. For overlapsize there are 2 cases:

- overlapsize < chunksize/2: the natural case; each chunk partially overlaps. The overlapping regions are at the ends of each chunk, whereas the central part of the chunk has no overlap.
- overlapsize > chunksize/2: chunks are fully overlapping; there is no central part.
In both cases, for each arrival of a new chunk at [-chunksize:], the computed chunk at [-(chunksize+overlapsize):-overlapsize] is released.

The coefficients.shape must be (nb_section, 6).

If pyopencl is available, you can use SosFilter.configure(engine='opencl'). In that case coefficients.shape can also be (nb_channel, nb_section, 6), which helps for having different filters on each channel.

The opencl engine internally requires data to be in (channel, sample) order. If the input data does not have this order, then it must be copied and performance will be affected.
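A configuration sketch, under the assumption that chunksize, overlapsize, and coefficients are all passed to configure(); the sos design mirrors the SosFilter example below:

import scipy.signal
from pyacq.dsp import OverlapFiltfilt

sample_rate = 1000.
f1, f2 = 40., 60.
coefficients = scipy.signal.iirfilter(7, [f1/sample_rate*2, f2/sample_rate*2],
                                      btype='bandpass', ftype='butter', output='sos')
filt = OverlapFiltfilt()
filt.configure(chunksize=1024, overlapsize=512, coefficients=coefficients)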
class pyacq.dsp.SosFilter(parent=None, **kargs)[source]¶
Node for filtering multi-channel signals. This uses second-order sections: a cascade of IIR filters of order 2.

Example:
dev = NumpyDeviceBuffer()
dev.configure(...)
dev.output.configure(...)
dev.initialize(...)

f1, f2 = 40., 60.
coefficients = scipy.signal.iirfilter(7, [f1/sample_rate*2, f2/sample_rate*2],
                                      btype='bandpass', ftype='butter', output='sos')

filter = SosFilter()
filter.configure(coefficients=coefficients)
filter.input.connect(dev.output)
filter.output.configure(...)
filter.initialize()
The coefficients.shape must be (nb_section, 6).

If pyopencl is available, you can use SosFilter.configure(engine='opencl'). In that case coefficients.shape can also be (nb_channel, nb_section, 6), which helps for having different filters on each channel.

The opencl engine internally requires data to be in (channel, sample) order. If the input data does not have this order, then it must be copied and performance will be affected.
Examples¶
Contents:
Local and remote Nodes¶
"""
Local and remote Nodes
This example demonstrates the use of Node instances both in the local process
and in a remote process. In either case, the way we interact with the Node is
essentially the same.
"""
from pyacq import create_manager, ImageViewer, WebCamAV
from pyqtgraph.Qt import QtCore, QtGui
import time
import pyqtgraph as pg
def dev_remote_viewer_local():
man = create_manager()
    # this creates the device in a separate process (NodeGroup)
nodegroup = man.create_nodegroup()
dev = nodegroup.create_node('WebCamAV', name = 'cam0')
dev.configure(camera_num = 0)
dev.output.configure(protocol = 'tcp', interface = '127.0.0.1', transfermode = 'plaindata')
dev.initialize()
    # the viewer is a Node in the local QApp
app = pg.mkQApp()
viewer = ImageViewer()
viewer.configure()
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
dev.start()
viewer.start()
app.exec_()
def dev_local_viewer_local():
# no manager
# device + view is a Node in local QApp
    # Nodes are controlled directly
app = pg.mkQApp()
dev = WebCamAV()
dev.configure(camera_num = 0)
dev.output.configure(protocol = 'tcp', interface = '127.0.0.1', transfermode = 'plaindata')
dev.initialize()
viewer = ImageViewer()
viewer.configure()
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
dev.start()
viewer.start()
app.exec_()
def dev_remote_viewer_remote():
    # no QApp; all Nodes are remote, even the viewer.
# note that dev and viewer are in the same NodeGroup
# so they are in the same process
man = create_manager()
nodegroup = man.create_nodegroup()
dev = nodegroup.create_node('WebCamAV', name = 'cam0')
dev.configure(camera_num = 0)
dev.output.configure(protocol = 'tcp', interface = '127.0.0.1', transfermode = 'plaindata')
dev.initialize()
viewer = nodegroup.create_node('ImageViewer', name = 'viewer0')
viewer.configure()
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
dev.start()
viewer.start()
time.sleep(10.)
# uncomment one of these 3 lines and compare the number of processes
dev_remote_viewer_local()
#dev_local_viewer_local()
#dev_remote_viewer_remote()
Noise generator node¶
# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.
"""
Noise generator node
Simple example of a custom Node class that generates a stream of random
values.
"""
import numpy as np
from pyacq.core import Node, register_node_type
from pyqtgraph.Qt import QtCore, QtGui
class NoiseGenerator(Node):
"""A simple example node that generates gaussian noise.
"""
_output_specs = {'signals': dict(streamtype='analogsignal', dtype='float32',
shape=(-1, 1), compression='')}
def __init__(self, **kargs):
Node.__init__(self, **kargs)
self.timer = QtCore.QTimer(singleShot=False)
self.timer.timeout.connect(self.send_data)
def _configure(self, chunksize=100, sample_rate=1000.):
self.chunksize = chunksize
self.sample_rate = sample_rate
self.output.spec['shape'] = (-1, 1)
self.output.spec['sample_rate'] = sample_rate
self.output.spec['buffer_size'] = 1000
def _initialize(self):
self.head = 0
def _start(self):
self.timer.start(int(1000 * self.chunksize / self.sample_rate))
def _stop(self):
self.timer.stop()
def _close(self):
pass
def send_data(self):
self.head += self.chunksize
self.output.send(np.random.normal(size=(self.chunksize, 1)).astype('float32'), index=self.head)
# Not necessary for this example, but registering the node class would make it
# easier for us to instantiate this type of node in a remote process via
# Manager.create_node()
register_node_type(NoiseGenerator)
if __name__ == '__main__':
from pyacq.viewers import QOscilloscope
app = QtGui.QApplication([])
# Create a noise generator node
ng = NoiseGenerator()
ng.configure()
ng.output.configure(protocol='inproc', transfermode='sharedmem')
ng.initialize()
# Create an oscilloscope node to view the noise stream
osc = QOscilloscope()
osc.configure(with_user_dialog=True)
osc.input.connect(ng.output)
osc.initialize()
osc.show()
# start both nodes
osc.start()
ng.start()
Simple webcam viewer¶
"""
Simple webcam viewer
Streams video frames from a WebCamAV Node to an ImageViewer Node.
"""
from pyacq import create_manager, ImageViewer
from pyqtgraph.Qt import QtCore, QtGui
man = create_manager()
# this creates the device in a separate process (NodeGroup)
nodegroup = man.create_nodegroup()
dev = nodegroup.create_node('WebCamAV', name = 'cam0')
dev.configure(camera_num = 0)
dev.output.configure(protocol = 'tcp', interface = '127.0.0.1', transfermode = 'plaindata')
dev.initialize()
# the viewer is a Node in the local QApp
app = QtGui.QApplication([])
viewer = ImageViewer()
viewer.configure()
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
dev.start()
viewer.start()
app.exec_()
PyAudio oscilloscope (remote)¶
"""
PyAudio oscilloscope (remote)
Simple demonstration of streaming data from a PyAudio device to a QOscilloscope
viewer.
The device node is created in a remote process via ProcessSpawner; the viewer runs locally.
"""
import pyqtgraph as pg
from pyacq.viewers import QOscilloscope
import pyacq.core.rpc as rpc
app = pg.mkQApp()
# Create PyAudio device node in remote process
dev_proc = rpc.ProcessSpawner()
dev = dev_proc.client._import('pyacq.devices.audio_pyaudio').PyAudio()
# Print a list of available input devices (but ultimately we will just use the
# default device).
default_input = dev.default_input_device()
print("\nAvaliable devices:")
for device in dev.list_device_specs():
index = device['index']
star = "*" if index == default_input else " "
print(" %s %d: %s" % (star, index, device['name']))
# Configure PyAudio device with a single (default) input channel.
dev.configure(nb_channel=1, sample_rate=44100., input_device_index=default_input,
format='int16', chunksize=1024)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()
# Create an oscilloscope to display data.
viewer = QOscilloscope()
viewer.configure(with_user_dialog = True)
# Connect audio stream to oscilloscope
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
viewer.params['decimation_method'] = 'min_max'
viewer.by_channel_params['Signal0', 'gain'] = 0.001
# Start both nodes
dev.start()
viewer.start()
app.exec_()
PyAudio wavelet spectrogram¶
"""
PyAudio wavelet spectrogram
Streams audio data to a QTimeFreq Node, which displays a frequency spectrogram
from a Morlet continuous wavelet transform.
"""
from pyacq.devices.audio_pyaudio import PyAudio
from pyacq.viewers import QTimeFreq
from pyacq.core import create_manager
import pyqtgraph as pg
# Start Qt application
app = pg.mkQApp()
# Create a manager to spawn worker process to record and process audio
man = create_manager()
ng = man.create_nodegroup()
# Create PyAudio device node in remote process
dev = ng.create_node('PyAudio')
# Configure PyAudio device with a single (default) input channel.
default_input = dev.default_input_device()
dev.configure(nb_channel=1, sample_rate=44100., input_device_index=default_input,
format='int16', chunksize=1024)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()
# We are only recording a single audio channel, so we create one extra
# nodegroup for processing TFR. For multi-channel signals, create more
# nodegroups.
workers = [man.create_nodegroup()]
# Create a viewer in the local application, using the remote process for
# frequency analysis
viewer = QTimeFreq()
viewer.configure(with_user_dialog=True, nodegroup_friends=workers)
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
viewer.params['refresh_interval'] = 100
viewer.params['timefreq', 'f_start'] = 50
viewer.params['timefreq', 'f_stop'] = 5000
viewer.params['timefreq', 'deltafreq'] = 500
viewer.by_channel_params['Signal0', 'clim'] = 2500
# Start both nodes
dev.start()
viewer.start()
if __name__ == '__main__':
import sys
if sys.flags.interactive == 0:
app.exec_()
PyAudio triggered oscilloscope¶
"""
PyAudio triggered oscilloscope
Streams audio data to a QTriggeredOscilloscope Node, which displays the
incoming waveform time-aligned to the rising phase of the sound wave.
"""
from pyacq.devices.audio_pyaudio import PyAudio
from pyacq.viewers import QTriggeredOscilloscope
import pyqtgraph as pg
# Start Qt application
app = pg.mkQApp()
# Create PyAudio device node
dev = PyAudio()
# Print a list of available input devices (but ultimately we will just use the
# default device).
default_input = dev.default_input_device()
print("\nAvaliable devices:")
for device in dev.list_device_specs():
index = device['index']
star = "*" if index == default_input else " "
print(" %s %d: %s" % (star, index, device['name']))
# Configure PyAudio device with a single (default) input channel.
dev.configure(nb_channel=1, sample_rate=44100., input_device_index=default_input,
format='int16', chunksize=1024)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()
# Create a triggered oscilloscope to display data.
viewer = QTriggeredOscilloscope()
viewer.configure(with_user_dialog = True)
# Connect audio stream to oscilloscope
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
#viewer.params['decimation_method'] = 'min_max'
#viewer.by_channel_params['Signal0', 'gain'] = 0.001
viewer.trigger.params['threshold'] = 1.
viewer.trigger.params['debounce_mode'] = 'after-stable'
viewer.trigger.params['front'] = '+'
viewer.trigger.params['debounce_time'] = 0.1
viewer.triggeraccumulator.params['stack_size'] = 3
viewer.triggeraccumulator.params['left_sweep'] = -.2
viewer.triggeraccumulator.params['right_sweep'] = .5
# Start both nodes
dev.start()
viewer.start()
if __name__ == '__main__':
import sys
if sys.flags.interactive == 0:
app.exec_()
PyAudio oscilloscope (local)¶
"""
PyAudio oscilloscope (local)
Simple demonstration of streaming data from a PyAudio device to a QOscilloscope
viewer.
Both device and viewer nodes are created locally without a manager.
"""
from pyacq.devices.audio_pyaudio import PyAudio
from pyacq.viewers import QOscilloscope
import pyqtgraph as pg
# Start Qt application
app = pg.mkQApp()
# Create PyAudio device node
dev = PyAudio()
# Print a list of available input devices (but ultimately we will just use the
# default device).
default_input = dev.default_input_device()
print("\nAvaliable devices:")
for device in dev.list_device_specs():
index = device['index']
star = "*" if index == default_input else " "
print(" %s %d: %s" % (star, index, device['name']))
# Configure PyAudio device with a single (default) input channel.
dev.configure(nb_channel=1, sample_rate=44100., input_device_index=default_input,
format='int16', chunksize=1024)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()
# Create an oscilloscope to display data.
viewer = QOscilloscope()
viewer.configure(with_user_dialog = True)
# Connect audio stream to oscilloscope
viewer.input.connect(dev.output)
viewer.initialize()
viewer.show()
viewer.params['decimation_method'] = 'min_max'
viewer.by_channel_params['Signal0', 'gain'] = 0.001
# Start both nodes
dev.start()
viewer.start()
if __name__ == '__main__':
import sys
if sys.flags.interactive == 0:
app.exec_()
Stream monitor¶
"""
Stream monitor
A simple node that monitors activity on an input stream and prints details about packets
received.
"""
import numpy as np
from pyqtgraph.Qt import QtCore, QtGui
from pyacq.core import Node, register_node_type
from pyacq.core.tools import ThreadPollInput
class StreamMonitor(Node):
"""
Monitors activity on an input stream and prints details about packets
received.
"""
_input_specs = {'signals': {}}
def __init__(self, **kargs):
Node.__init__(self, **kargs)
def _configure(self):
pass
def _initialize(self):
# There are many ways to poll for data from the input stream. In this
# case, we will use a background thread to monitor the stream and emit
# a Qt signal whenever data is available.
self.poller = ThreadPollInput(self.input, return_data=True)
self.poller.new_data.connect(self.data_received)
def _start(self):
self.poller.start()
def data_received(self, ptr, data):
print("Data received: %d %s %s" % (ptr, data.shape, data.dtype))
# Not necessary for this example, but registering the node class would make it
# easier for us to instantiate this type of node in a remote process via
# Manager.create_node()
register_node_type(StreamMonitor)
if __name__ == '__main__':
from pyacq.devices import NumpyDeviceBuffer
app = QtGui.QApplication([])
# Create a data source. This will continuously emit chunks from a numpy
# array.
data = np.random.randn(2500, 7).astype('float64')
dev = NumpyDeviceBuffer()
# Configure the source such that it emits 50-element chunks twice per second.
dev.configure(nb_channel=7, sample_interval=0.01, chunksize=50, buffer=data)
dev.output.configure(protocol='tcp', interface='127.0.0.1', transfermode='plaindata')
dev.initialize()
# Create a monitor node
mon = StreamMonitor()
mon.configure()
mon.input.connect(dev.output)
mon.initialize()
# start both nodes
mon.start()
dev.start()
Custom RPC client¶
"""
Custom RPC client
Demonstrate the most simple use of zmq and json to create a client that
connects to an RPCServer. This provides a basic template for connecting
to pyacq from non-Python platforms.
One important note before we start: pyacq's remote API is not actually different
from its internal Python API. Any function you can call from within Python
can also be invoked remotely by RPC calls. The example below deals entirely
with pyacq's RPC protocol--how to translate between the Python API and the raw
packets handled by zeroMQ.
"""
# First we will start a manager in a subprocess to test our client against
from pyacq.core import create_manager
manager = create_manager('rpc')
address = manager._rpc_addr
# --- From here on, we don't use any pyacq code ---
import json, zmq
# Here's how we connect to a new server (we will likely want to connect to
# multiple servers)
def create_socket(address, name):
"""Return a ZeroMQ socket connected to an RPC server.
Parameters
----------
address : str
The zmq interface where the server is listening (e.g.
'tcp://127.0.0.1:5678')
name : str
A unique name identifying the client.
"""
if isinstance(name, str):
name = name.encode()
socket = zmq.Context.instance().socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, name)
socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 sec timeout
# Connect the socket to the server
if isinstance(address, str):
address = address.encode()
socket.connect(address)
# Ping the server until it responds to make sure we are connected.
ping(socket)
print("\nConnected to server @ %s" % address)
return socket
# Here's how we have to format all request messages that we send to RPC servers
next_req_id = 0
def send(socket, action, opts=None, request_response=True, return_type='auto'):
"""Send a request to an RPC server.
Parameters
----------
socket : zmq socket
The ZeroMQ socket that is connected to the server.
action : str
Name of action server should perform. See :func:`RPCClient.send()` for
a list of actions and their associated options.
opts : dict or None
An optional dict of options specifying the behavior of the action.
request_response : bool
If True, then the server is asked to send a response.
return_type : str
'proxy' to force the server to send return values by proxy, or 'auto'
to allow the server to decide whether to return by proxy or by value.
"""
global next_req_id
# If we want the server to send a response, then we must supply a unique ID
# for the request. Otherwise, send -1 as the request ID to indicate that
# the server should not send a reply.
if request_response:
req_id = next_req_id
next_req_id += 1
else:
req_id = -1
# Serialize opts if it was specified, otherwise send an empty string.
if opts is None:
opts_str = b''
else:
opts_str = json.dumps(opts).encode()
# Tell the server which serializer we are using
ser_type = b'json'
# Send the request as a multipart message
msg = [str(req_id).encode(), action.encode(), return_type.encode(), ser_type, opts_str]
socket.send_multipart(msg)
# Print so we can see what the final json-encoded message looks like
msg = '\n'.join([' ' + m.decode() for m in msg])
print("\n>>> send to %s:\n%s" % (socket.last_endpoint.decode(), msg))
# Return the request ID we can use to listen for a response later.
return req_id
# ..And here is how we receive responses from the server.
def recv(socket):
# Wait for a response or a timeout.
try:
msg = socket.recv().decode()
except zmq.error.Again:
raise TimeoutError('Timed out while waiting for server response.')
# Print so we can see what the json-encoded message looks like
print("\n<<< recv from %s:\n %s" % (socket.last_endpoint.decode(), msg))
# Unserialize the response
msg = json.loads(msg)
# Check for error
if msg.get('error', None) is not None:
traceback = ''.join(msg['error'][1])
raise RuntimeError("Exception in remote process:\n%s" % traceback)
# NOTE: msg also contains the key 'req_id', which should be used to verify
# that the message received really does correspond to a particular request.
# We're skipping that here for simplicity.
return msg['rval']
def get_attr(socket, obj, attr_name):
"""Return an attribute of an object owned by a remote server.
Parameters
----------
socket : zmq socket
A socket that is connected to the remote server.
obj : dict
A dict that identifies the object owned by the server.
attr_name : str
The name of the attribute to return.
"""
attr = obj.copy()
attr['attributes'] = (attr_name,)
send(socket, action='get_obj', opts={'obj': attr})
return recv(socket)
def call_method(socket, obj, method_name, *args, **kwds):
"""Request that a remote server call a method on an object.
Parameters
----------
socket : zmq socket
A socket that is connected to the remote server.
obj : dict
A dict that identifies the object owned by the server. This should have
been returned by a previous request to the server.
method_name : str
The name of the method to call.
args,kwargs :
All further arguments are passed to the remote method call.
"""
# modify object reference to point to its method instead.
# (this is faster than using get_attr as defined above)
func = obj.copy()
func['attributes'] = (method_name,)
send(socket, action='call_obj', opts={'obj': func, 'args': args, 'kwargs': kwds})
return recv(socket)
def ping(socket):
"""Ping a server until it responds.
This can be called to check that a functional connection to a server exists
before making any other requests.
"""
for i in range(3):
req_id = send(socket, action='ping')
try:
resp = recv(socket)
assert resp == 'pong'
break
except TimeoutError:
pass
if i == 2:
raise RuntimeError("Did not receive any response from server at %s!"
% socket.last_endpoint)
# Create a zmq socket with a unique name
socket = create_socket(address, 'my_custom_client')
# Request a reference to the manager
send(socket, action='get_item', opts={'name': 'manager'})
manager = recv(socket)
# Ask the manager to create a nodegroup
nodegroup = call_method(socket, manager, 'create_nodegroup', name='my_nodegroup')
# Request from the manager a list of all available nodegroups
ng_list = call_method(socket, manager, 'list_nodegroups')
assert ng_list[0] == nodegroup
# Connect to the newly spawned nodegroup and ask it a question
ng_socket = create_socket(nodegroup['rpc_addr'], 'my_nodegroup_socket')
node_types = call_method(ng_socket, nodegroup, 'list_node_types')
print("\nAvailable node types: %s" % node_types)