RPCClient and RPCServer
These classes implement the low-level communication between a server and client. They are rarely used directly by the user except occasionally for initial setup and closing.
class pyacq.core.rpc.RPCClient(address, reentrant=True, serializer='msgpack')
Connection to an RPCServer.
Each RPCClient connects to only one server, and may be used from only one thread. RPCClient instances are created automatically either through ProcessSpawner or by requesting attributes from an ObjectProxy. In general, it is not necessary for the user to interact directly with RPCClient.
Parameters:
- address : URL
Address of RPC server to connect to.
- reentrant : bool
If True, then this client will allow the server running in the same thread (if any) to process requests whenever the client is waiting for a response. This is necessary to avoid deadlocks in case of reentrant RPC requests (eg, server A calls server B, which then calls server A again). Default is True.
call_obj(obj, args=None, kwargs=None, **kwds)
Invoke a remote callable object.
Parameters:
- obj : ObjectProxy
A proxy that references an object owned by the connected RPCServer.
- args : tuple
Arguments to pass to the remote call.
- kwargs : dict
Keyword arguments to pass to the remote call.
- kwds :
All extra keyword arguments are passed to send().
close_server(sync='sync', timeout=1.0, **kwds)
Ask the server to close.
The server returns True if it has closed. All clients known to the server will be informed that the server has disconnected.
If the server has already disconnected from this client, then the method returns True without error.
delete(obj, **kwds)
Delete an object proxy.
This informs the remote process that an ObjectProxy is no longer needed. The remote process will decrement a reference counter and delete the referenced object if it is no longer held by any proxies.
Parameters:
- obj : ObjectProxy
A proxy that references an object owned by the connected RPCServer.
- kwds :
All extra keyword arguments are passed to send().
Notes
After a proxy is deleted, it cannot be used to access the remote object even if the server has not released the remote object yet. This also applies to proxies that are sent to a third process. For example, consider three processes A, B, C: first A acquires a proxy to an object owned by B. A sends the proxy to C, and then deletes the proxy. If C attempts to access this proxy, an exception will be raised because B has already removed the reference held by this proxy. However, if C independently acquires a proxy to the same object owned by B, then that proxy will continue to function even after A deletes its proxy.
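The three-process scenario in the note above can be mimicked in a single process. The sketch below is only an illustration: ToyServer and its methods are hypothetical stand-ins, not pyacq's RPCServer, and it assumes that each proxy carries an object ID plus its own reference ID.

```python
class ToyServer:
    """Hypothetical stand-in for an object-owning server (not pyacq's RPCServer)."""

    def __init__(self):
        self._objects = {}   # obj_id -> object
        self._refs = {}      # obj_id -> set of live ref_ids
        self._next_ref = 0

    def get_proxy(self, obj):
        """Publish obj and return an (obj_id, ref_id) pair acting as a proxy."""
        obj_id = id(obj)
        self._objects[obj_id] = obj
        ref_id = self._next_ref
        self._next_ref += 1
        self._refs.setdefault(obj_id, set()).add(ref_id)
        return (obj_id, ref_id)

    def delete(self, proxy):
        """Drop one reference; release the object when no references remain."""
        obj_id, ref_id = proxy
        self._refs[obj_id].discard(ref_id)
        if not self._refs[obj_id]:
            del self._refs[obj_id]
            del self._objects[obj_id]

    def access(self, proxy):
        """Resolve a proxy, failing if its reference was already deleted."""
        obj_id, ref_id = proxy
        if ref_id not in self._refs.get(obj_id, ()):
            raise KeyError("reference %d has been deleted" % ref_id)
        return self._objects[obj_id]


server_b = ToyServer()
shared = ["data owned by B"]

proxy_a = server_b.get_proxy(shared)      # A acquires a proxy
proxy_c_copy = proxy_a                    # A sends that same proxy to C
proxy_c_own = server_b.get_proxy(shared)  # C independently acquires its own

server_b.delete(proxy_a)                  # A deletes its proxy

try:
    server_b.access(proxy_c_copy)
    copy_still_works = True
except KeyError:
    copy_still_works = False

own_still_works = server_b.access(proxy_c_own) is shared
```

In this sketch the copy that A forwarded to C fails after the delete, while the proxy that C requested itself keeps working because it holds a separate reference.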
static get_client(address)
Return the RPC client for this thread and a given server address.
If no client exists already, then a new one will be created. If the server is running in the current thread, then return None.
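A per-thread lookup of this kind can be sketched with thread-local storage. Everything here is an assumption made for illustration (ToyClient, set_local_server_address, and the module-level get_client function are hypothetical); it is not how pyacq stores its clients.

```python
import threading

_local = threading.local()   # each thread sees its own attributes


class ToyClient:
    """Hypothetical placeholder for a client connected to one address."""

    def __init__(self, address):
        self.address = address


def set_local_server_address(address):
    """Record that a (hypothetical) server at address runs in this thread."""
    _local.server_address = address


def get_client(address):
    """Return this thread's client for address, creating one if needed."""
    if getattr(_local, "server_address", None) == address:
        return None   # the server lives in this thread; no client is needed
    clients = getattr(_local, "clients", None)
    if clients is None:
        clients = _local.clients = {}
    if address not in clients:
        clients[address] = ToyClient(address)
    return clients[address]


addr = "tcp://127.0.0.1:5678"
first = get_client(addr)
second = get_client(addr)           # same thread: the cached client

from_other_thread = []
worker = threading.Thread(target=lambda: from_other_thread.append(get_client(addr)))
worker.start()
worker.join()                       # other thread: a separate client

set_local_server_address("tcp://127.0.0.1:9999")
local_lookup = get_client("tcp://127.0.0.1:9999")
```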
get_obj(obj, **kwds)
Return a copy of a remote object.
Parameters:
- obj : ObjectProxy
A proxy that references an object owned by the connected RPCServer. The object will be serialized and returned if possible, otherwise a new proxy is returned.
- kwds :
All extra keyword arguments are passed to send().
ping(sync='sync', **kwds)
Ping the server.
This can be used to test connectivity to the server.
process_msg(msg)
Handle one message received from the remote process.
This takes care of assigning return values or exceptions to existing Future instances.
process_until_future(future, timeout=None)
Process all incoming messages until receiving a result for future.
If the result for the future is not received before the timeout, then raise TimeoutError.
While waiting, the RPCServer for this thread (if any) is also allowed to process requests.
Parameters:
- future : concurrent.futures.Future instance
The Future to wait for. When the response for this Future arrives from the server, the method returns.
- timeout : float
Maximum time (seconds) to wait for a response.
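The waiting behaviour described above can be approximated with a queue of responses and a dictionary of pending futures. This is a sketch under stated assumptions, not pyacq's implementation: the inbox, pending, and serve_local arguments are hypothetical, and responses are assumed to arrive as (request_id, result) pairs.

```python
import queue
import time
from concurrent.futures import Future


def process_until_future(inbox, pending, future, timeout=None, serve_local=None):
    """Drain inbox until future has a result, or raise TimeoutError.

    inbox:       queue.Queue of (request_id, result) responses (assumed shape)
    pending:     dict mapping request_id -> Future
    serve_local: optional callable that lets a same-thread server handle
                 work; it is called once per polling iteration
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not future.done():
        if serve_local is not None:
            serve_local()               # give the local server a turn
        wait = 0.01                     # short poll so the loop stays responsive
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("no response within %.2f s" % timeout)
            wait = min(wait, remaining)
        try:
            req_id, result = inbox.get(timeout=wait)
        except queue.Empty:
            continue
        fut = pending.pop(req_id, None)
        if fut is not None:
            fut.set_result(result)      # may be a future other than the awaited one
    return future.result()


inbox = queue.Queue()
wanted = Future()
pending = {1: wanted, 2: Future()}
inbox.put((2, "second"))                # arrives first, belongs to another future
inbox.put((1, "first"))

served = []
value = process_until_future(inbox, pending, wanted, timeout=1.0,
                             serve_local=lambda: served.append(1))

try:
    process_until_future(queue.Queue(), {}, Future(), timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Calling serve_local inside the loop is what lets a reentrant request (server A calls server B, which calls A again) make progress while the client is blocked.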
send(action, opts=None, return_type='auto', sync='sync', timeout=10.0)
Send a request to the remote process.
It is not necessary to call this method directly; instead use call_obj(), get_obj(), __getitem__(), __setitem__(), transfer(), delete(), _import(), or ping().
The request is given a unique ID that is included in the response from the server (if any).
Parameters:
- action : str
The action to invoke on the remote process. See list of actions below.
- opts : None or dict
Extra options to be sent with the request. Each action requires a different set of options. See list of actions below.
- return_type : ‘auto’ | ‘proxy’
If ‘proxy’, then the return value is sent by proxy. If ‘auto’, then the server decides based on the return type whether to send a proxy.
- sync : str
If ‘sync’, then block and return the result when it becomes available. If ‘async’, then return a Future instance immediately. If ‘off’, then ask the remote server NOT to send a response and return None immediately.
- timeout : float
The amount of time to wait for a response when in synchronous operation (sync='sync'). If the timeout elapses before a response is received, then raise TimeoutError.
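The three sync modes can be illustrated with the standard library alone. In this sketch a one-worker thread pool stands in for the remote process, and the toy send() takes a local callable instead of an action name; both are assumptions for illustration, not the pyacq signature.

```python
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=1)   # stands in for the remote process


def send(func, *args, sync="sync", timeout=10.0):
    """Run func(*args) 'remotely' and return according to the sync mode."""
    if sync not in ("sync", "async", "off"):
        raise ValueError("sync must be 'sync', 'async', or 'off'")
    future = _pool.submit(func, *args)
    if sync == "off":
        return None                     # caller does not want a response
    if sync == "async":
        return future                   # caller collects the result later
    # 'sync': block until done; raises concurrent.futures.TimeoutError on timeout
    return future.result(timeout=timeout)


sync_result = send(pow, 2, 5)
async_result = send(pow, 2, 5, sync="async")
off_result = send(pow, 2, 5, sync="off")
_pool.shutdown(wait=True)               # let all submitted work finish
```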
Notes
The following table lists the actions that are recognized by RPCServer. The action argument to send() may be any string from the Action column below, and the opts argument must be a dict with the keys listed in the Options column.
Action     Description                               Options
call_obj   Invoke a callable                         obj: a proxy to the callable object
                                                     args: a tuple of positional arguments
                                                     kwargs: a dict of keyword arguments
get_obj    Return the object referenced by a proxy   obj: a proxy to the object to return
get_item   Return a named object                     name: string name of the object to return
set_item   Set a named object                        name: string name to set
                                                     value: object to assign to name
delete     Delete a proxy reference                  obj_id: proxy object ID
                                                     ref_id: proxy reference ID
import     Import and return a proxy to a module     module: name of module to import
ping       Return ‘pong’
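One way to picture how an action string and its opts dict are consumed is a small dispatcher. The class below is hypothetical (ToyDispatcher is not part of pyacq); it borrows only the action names and option keys from the table, works on local objects instead of proxies, and leaves out delete because that action depends on reference bookkeeping.

```python
import importlib


class ToyDispatcher:
    """Hypothetical dispatcher; action names and option keys follow the table."""

    def __init__(self):
        self.items = {}   # named objects used by get_item / set_item

    def process_action(self, action, opts=None):
        opts = opts or {}
        handlers = {
            "call_obj": lambda: opts["obj"](*opts.get("args", ()),
                                            **opts.get("kwargs", {})),
            "get_obj": lambda: opts["obj"],
            "get_item": lambda: self.items[opts["name"]],
            "set_item": lambda: self.items.__setitem__(opts["name"], opts["value"]),
            "import": lambda: importlib.import_module(opts["module"]),
            "ping": lambda: "pong",
        }
        if action not in handlers:
            raise ValueError("unknown action: %r" % (action,))
        return handlers[action]()


dispatcher = ToyDispatcher()
pong = dispatcher.process_action("ping")
dispatcher.process_action("set_item", {"name": "answer", "value": 42})
answer = dispatcher.process_action("get_item", {"name": "answer"})
total = dispatcher.process_action(
    "call_obj", {"obj": sum, "args": ([1, 2, 3],), "kwargs": {}})
math_mod = dispatcher.process_action("import", {"module": "math"})
```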
class pyacq.core.rpc.RPCServer(address='tcp://127.0.0.1:*')
Remote procedure call server for invoking requests on proxied objects.
RPCServer instances are automatically created when using ProcessSpawner. It is rarely necessary for the user to interact directly with RPCServer.
There may be at most one RPCServer per thread. RPCServers can be run in a few different modes:
- Exclusive event loop: call run_forever() to cause the server to listen indefinitely for incoming request messages.
- Lazy event loop: call run_lazy() to register the server with the current thread. The server’s socket will be polled whenever an RPCClient is waiting for a response (this allows reentrant function calls). You can also manually listen for requests with _read_and_process_one() in this mode.
- Qt event loop: use QtRPCServer. In this mode, messages are polled in a separate thread, but then sent to the Qt event loop by signal and processed there. The server is registered as running in the Qt thread.
Parameters:
- name : str
Name used to identify this server.
- address : URL
Address for RPC server to bind to. Default is 'tcp://127.0.0.1:*'.
Note: binding RPCServer to a public IP address is a potential security hazard.
Notes
RPCServer is not a secure server. It is intended to be used only on trusted networks; anyone with tcp access to the server can execute arbitrary code on the server.
RPCServer is not a thread-safe class. Only use RPCClient to communicate with RPCServer from other threads.
Examples
    # In host/process/thread 1:
    from pyacq.core.rpc import RPCServer

    server = RPCServer()
    rpc_addr = server.address

    # Publish an object for others to access easily
    server['object_name'] = MyClass()

    # In host/process/thread 2: (you must communicate rpc_addr manually)
    from pyacq.core.rpc import RPCClient

    client = RPCClient(rpc_addr)

    # Get a proxy to published object; use this (almost) exactly as you
    # would a local object:
    remote_obj = client['object_name']
    remote_obj.method(...)

    # Or, you can remotely import and operate a module:
    remote_module = client._import("my.module.name")
    remote_obj = remote_module.MyClass()
    remote_obj.method(...)

    # See ObjectProxy for more information on interacting with remote
    # objects, including (a)synchronous communication.
address = None
The zmq address where this server is listening (e.g. 'tcp://127.0.0.1:5678')
get_proxy(obj, **kwds)
Return an ObjectProxy referring to a local object.
This proxy can be sent via RPC to any other node.
static get_server()
Return the server running in this thread, or None if there is no server.
static local_client()
Return the RPCClient used for accessing the server running in the current thread.
process_action(action, opts, return_type, caller)
Invoke a single action and return the result.
static register_server(srv)
Register a server as the (only) server running in this thread.
This static method fails if another server is already registered for this thread.
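The one-server-per-thread rule can be sketched with a registry keyed by thread identifier. The module-level functions and the choice of RuntimeError below are assumptions made for illustration; the text above does not say which exception pyacq raises.

```python
import threading

_servers = {}                 # thread ident -> registered server
_lock = threading.Lock()


def register_server(srv):
    """Register srv for the calling thread; fail if one is already registered."""
    key = threading.get_ident()
    with _lock:
        if key in _servers:
            raise RuntimeError("a server is already registered for this thread")
        _servers[key] = srv


def get_server():
    """Return the calling thread's server, or None if there is none."""
    with _lock:
        return _servers.get(threading.get_ident())


register_server("server-1")           # a string stands in for a server object
try:
    register_server("server-2")
    second_accepted = True
except RuntimeError:
    second_accepted = False

seen_elsewhere = []
worker = threading.Thread(target=lambda: seen_elsewhere.append(get_server()))
worker.start()
worker.join()
```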
run_lazy()
Register this server as being active for the current thread, but do not actually begin processing requests.
RPCClients in the same thread will allow the server to process requests while they are waiting for responses. This can prevent deadlocks that occur with reentrant RPC requests (e.g., server A calls server B, which then calls server A again).
This can also be used to allow the user to manually process requests.
start_timer(callback, interval, **kwds)
Start a timer that invokes callback at regular intervals.
Parameters:
- callback : callable
Callable object to invoke. This must either be an ObjectProxy or an object that is safe to call from the server’s thread.
- interval : float
Minimum time to wait between callback invocations (start to start).
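The "start to start" wording means the interval is measured from the beginning of one invocation to the beginning of the next, so time spent inside the callback counts toward the wait. A minimal sketch of that timing rule follows; the count parameter and the background thread are assumptions added so the example terminates, and none of this is pyacq's timer code.

```python
import threading
import time


def start_timer(callback, interval, count):
    """Call callback count times, at least interval seconds apart (start to start)."""
    def run():
        for _ in range(count):
            started = time.monotonic()
            callback()
            # Sleep only for whatever part of the interval the callback left over.
            delay = started + interval - time.monotonic()
            if delay > 0:
                time.sleep(delay)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


stamps = []
timer_thread = start_timer(lambda: stamps.append(time.monotonic()), 0.02, 3)
timer_thread.join()
gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
```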
class pyacq.core.rpc.QtRPCServer(address='tcp://127.0.0.1:*', quit_on_close=True)
RPCServer that lives in a Qt GUI thread.
This server may be used to create and manage QObjects, QWidgets, etc. It uses a separate thread to poll for RPC requests, which are then sent to the Qt event loop by signal. This allows the RPC actions to be executed in a Qt GUI thread without using a timer to poll the RPC socket. Responses are sent back to the poller thread by a secondary socket.
QtRPCServer may be started in newly spawned processes using ProcessSpawner.
Parameters:
- address : str
ZMQ address to listen on. Default is 'tcp://127.0.0.1:*'.
Note: binding RPCServer to a public IP address is a potential security hazard. See RPCServer.
- quit_on_close : bool
If True, then call QApplication.quit() when the server is closed.
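The poller-thread arrangement can be pictured without Qt by letting queues stand in for the moving parts. This is a loose analogy under stated assumptions: the three queues replace the RPC socket, the Qt signal, and the secondary response socket, and a plain loop in the main thread replaces the Qt event loop.

```python
import queue
import threading

requests = queue.Queue()    # stands in for the RPC socket
to_gui = queue.Queue()      # stands in for the Qt signal
responses = queue.Queue()   # stands in for the secondary response socket
handled_in = []             # names of the threads that executed requests


def poller():
    """Wait for requests and forward them; never execute them here."""
    while True:
        msg = requests.get()
        to_gui.put(msg)
        if msg is None:     # None is this sketch's shutdown marker
            break


def gui_loop():
    """Execute forwarded requests in the calling ('GUI') thread."""
    while True:
        msg = to_gui.get()
        if msg is None:
            break
        handled_in.append(threading.current_thread().name)
        responses.put(msg.upper())


poll_thread = threading.Thread(target=poller, name="poller")
poll_thread.start()
requests.put("show widget")
requests.put(None)
gui_loop()                  # runs in the thread that started the sketch
poll_thread.join()
reply = responses.get(timeout=1)
```

The point of the split is that the thread blocked on incoming requests never touches GUI objects; it only hands messages over.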
Examples
Spawning in a new process:
    # Create new process.
    proc = ProcessSpawner(qt=True)

    # Display a widget from the new process.
    qtgui = proc._import('PyQt4.QtGui')
    w = qtgui.QWidget()
    w.show()
Starting in an existing Qt application:
    # Create server.
    server = QtRPCServer()

    # Start listening for requests in a background thread (this call
    # returns immediately).
    server.run_forever()