RPCClient and RPCServer

These classes implement the low-level communication between a server and client. They are rarely used directly by the user except occasionally for initial setup and closing.

class pyacq.core.rpc.RPCClient(address, reentrant=True, serializer='msgpack')[source]

Connection to an RPCServer.

Each RPCClient connects to only one server, and may be used from only one thread. RPCClient instances are created automatically either through ProcessSpawner or by requesting attributes form an ObjectProxy. In general, it is not necessary for the user to interact directly with RPCClient.

Parameters:
address : URL

Address of RPC server to connect to.

reentrant : bool

If True, then this client will allow the server running in the same thread (if any) to process requests whenever the client is waiting for a response. This is necessary to avoid deadlocks in case of reentrant RPC requests (eg, server A calls server B, which then calls server A again). Default is True.

call_obj(obj, args=None, kwargs=None, **kwds)[source]

Invoke a remote callable object.

Parameters:
obj : ObjectProxy

A proxy that references an object owned by the connected RPCServer.

args : tuple

Arguments to pass to the remote call.

kwargs : dict

Keyword arguments to pass to the remote call.

kwds :

All extra keyword arguments are passed to send().

close()[source]

Close this client’s socket (but leave the server running).

close_server(sync='sync', timeout=1.0, **kwds)[source]

Ask the server to close.

The server returns True if it has closed. All clients known to the server will be informed that the server has disconnected.

If the server has already disconnected from this client, then the method returns True without error.

delete(obj, **kwds)[source]

Delete an object proxy.

This informs the remote process that an ObjectProxy is no longer needed. The remote process will decrement a reference counter and delete the referenced object if it is no longer held by any proxies.

Parameters:
obj : ObjectProxy

A proxy that references an object owned by the connected RPCServer.

kwds :

All extra keyword arguments are passed to send().

Notes

After a proxy is deleted, it cannot be used to access the remote object even if the server has not released the remote object yet. This also applies to proxies that are sent to a third process. For example, consider three processes A, B, C: first A acquires a proxy to an object owned by B. A sends the proxy to C, and then deletes the proxy. If C attempts to access this proxy, an exception will be raised because B has already remoted the reference held by this proxy. However, if C independently acquires a proxy to the same object owned by B, then that proxy will continue to function even after A deletes its proxy.

disconnected()[source]

Boolean indicating whether the server has disconnected from the client.

ensure_connection(timeout=1.0)[source]

Make sure RPC server is connected and available.

static get_client(address)[source]

Return the RPC client for this thread and a given server address.

If no client exists already, then a new one will be created. If the server is running in the current thread, then return None.

get_obj(obj, **kwds)[source]

Return a copy of a remote object.

Parameters:
obj : ObjectProxy

A proxy that references an object owned by the connected RPCServer. The object will be serialized and returned if possible, otherwise a new proxy is returned.

kwds :

All extra keyword arguments are passed to send().

measure_clock_diff()[source]

Measure the clock offset between this host and the remote host.

ping(sync='sync', **kwds)[source]

Ping the server.

This can be used to test connectivity to the server.

process_msg(msg)[source]

Handle one message received from the remote process.

This takes care of assigning return values or exceptions to existing Future instances.

process_until_future(future, timeout=None)[source]

Process all incoming messages until receiving a result for future.

If the future result is not raised before the timeout, then raise TimeoutError.

While waiting, the RPCServer for this thread (if any) is also allowed to process requests.

Parameters:
future : concurrent.Future instance

The Future to wait for. When the response for this Future arrives from the server, the method returns.

timeout : float

Maximum time (seconds) to wait for a response.

send(action, opts=None, return_type='auto', sync='sync', timeout=10.0)[source]

Send a request to the remote process.

It is not necessary to call this method directly; instead use call_obj(), get_obj(), __getitem__(), __setitem__(), transfer(), delete(), import(), or ping().

The request is given a unique ID that is included in the response from the server (if any).

Parameters:
action : str

The action to invoke on the remote process. See list of actions below.

opts : None or dict

Extra options to be sent with the request. Each action requires a different set of options. See list of actions below.

return_type : ‘auto’ | ‘proxy’

If ‘proxy’, then the return value is sent by proxy. If ‘auto’, then the server decides based on the return type whether to send a proxy.

sync : str

If ‘sync’, then block and return the result when it becomes available. If ‘async’, then return a Future instance immediately. If ‘off’, then ask the remote server NOT to send a response and return None immediately.

timeout : float

The amount of time to wait for a response when in synchronous operation (sync=’sync’). If the timeout elapses before a response is received, then raise TimeoutError.

Notes

The following table lists the actions that are recognized by RPCServer. The action argument to send() may be any string from the Action column below, and the opts argument must be a dict with the keys listed in the Options column.

Action Description Options
call_obj Invoke a callable
obj: a proxy to the callable object
args: a tuple of positional arguments
kwargs: a dict of keyword arguments
get_obj Return the object referenced by a proxy
obj: a proxy to the object to return
get_item Return a named object
name: string name of the object to return
set_item Set a named object
name: string name to set
value: object to assign to name
delete Delete a proxy reference
obj_id: proxy object ID
ref_id: proxy reference ID
import Import and return a proxy to a module
module: name of module to import
ping Return ‘pong’

transfer(obj, **kwds)[source]

Send an object to the remote process and return a proxy to it.

Parameters:
obj : object

Any object to send to the remote process. If the object is not serializable then a proxy will be sent if possible.

kwds :

All extra keyword arguments are passed to send().

class pyacq.core.rpc.RPCServer(address='tcp://127.0.0.1:*')[source]

Remote procedure call server for invoking requests on proxied objects.

RPCServer instances are automatically created when using ProcessSpawner. It is rarely necessary for the user to interact directly with RPCServer.

There may be at most one RPCServer per thread. RPCServers can be run in a few different modes:

  • Exclusive event loop: call run_forever() to cause the server to listen indefinitely for incoming request messages.
  • Lazy event loop: call run_lazy() to register the server with the current thread. The server’s socket will be polled whenever an RPCClient is waiting for a response (this allows reentrant function calls). You can also manually listen for requests with _read_and_process_one() in this mode.
  • Qt event loop: use QtRPCServer. In this mode, messages are polled in a separate thread, but then sent to the Qt event loop by signal and processed there. The server is registered as running in the Qt thread.
Parameters:
name : str

Name used to identify this server.

address : URL

Address for RPC server to bind to. Default is 'tcp://127.0.0.1:*'.

Note: binding RPCServer to a public IP address is a potential security hazard.

Notes

RPCServer is not a secure server. It is intended to be used only on trusted networks; anyone with tcp access to the server can execute arbitrary code on the server.

RPCServer is not a thread-safe class. Only use RPCClient to communicate with RPCServer from other threads.

Examples

# In host/process/thread 1:
server = RPCServer()
rpc_addr = server.address

# Publish an object for others to access easily
server['object_name'] = MyClass()

# In host/process/thread 2: (you must communicate rpc_addr manually)
client = RPCClient(rpc_addr)

# Get a proxy to published object; use this (almost) exactly as you
# would a local object:
remote_obj = client['object_name']
remote_obj.method(...)

# Or, you can remotely import and operate a module:
remote_module = client._import("my.module.name")
remote_obj = remote_module.MyClass()
remote_obj.method(...)

# See ObjectProxy for more information on interacting with remote
# objects, including (a)synchronous communication.
address = None

The zmq address where this server is listening (e.g. ‘tcp:///127.0.0.1:5678’)

close()[source]

Ask the server to close.

This method is thread-safe.

get_proxy(obj, **kwds)[source]

Return an ObjectProxy referring to a local object.

This proxy can be sent via RPC to any other node.

static get_server()[source]

Return the server running in this thread, or None if there is no server.

static local_client()[source]

Return the RPCClient used for accessing the server running in the current thread.

process_action(action, opts, return_type, caller)[source]

Invoke a single action and return the result.

static register_server(srv)[source]

Register a server as the (only) server running in this thread.

This static method fails if another server is already registered for this thread.

run_forever()[source]

Read and process RPC requests until the server is asked to close.

run_lazy()[source]

Register this server as being active for the current thread, but do not actually begin processing requests.

RPCClients in the same thread will allow the server to process requests while they are waiting for responses. This can prevent deadlocks that occur when

This can also be used to allow the user to manually process requests.

running()[source]

Boolean indicating whether the server is still running.

start_timer(callback, interval, **kwds)[source]

Start a timer that invokes callback at regular intervals.

Parameters:
callback : callable

Callable object to invoke. This must either be an ObjectProxy or an object that is safe to call from the server’s thread.

interval : float

Minimum time to wait between callback invocations (start to start).

static unregister_server(srv)[source]

Unregister a server from this thread.

unwrap_proxy(proxy)[source]

Return the local python object referenced by proxy.

class pyacq.core.rpc.QtRPCServer(address='tcp://127.0.0.1:*', quit_on_close=True)[source]

RPCServer that lives in a Qt GUI thread.

This server may be used to create and manage QObjects, QWidgets, etc. It uses a separate thread to poll for RPC requests, which are then sent to the Qt event loop using by signal. This allows the RPC actions to be executed in a Qt GUI thread without using a timer to poll the RPC socket. Responses are sent back to the poller thread by a secondary socket.

QtRPCServer may be started in newly spawned processes using ProcessSpawner.

Parameters:
address : str

ZMQ address to listen on. Default is 'tcp://127.0.0.1:*'.

Note: binding RPCServer to a public IP address is a potential security hazard. See RPCServer.

quit_on_close : bool

If True, then call QApplication.quit() when the server is closed.

Examples

Spawning in a new process:

# Create new process.
proc = ProcessSpawner(qt=True)

# Display a widget from the new process.
qtgui = proc._import('PyQt4.QtGui')
w = qtgui.QWidget()
w.show()

Starting in an existing Qt application:

# Create server.
server = QtRPCServer()

# Start listening for requests in a background thread (this call
# returns immediately).
server.run_forever()
process_action(action, opts, return_type, caller)[source]

Invoke a single action and return the result.

run_forever()[source]

Read and process RPC requests until the server is asked to close.