Source code for pyacq.core.rpc.client

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

import sys
import os
import time
import weakref
import socket
import concurrent.futures
import threading
import zmq
import logging
import numpy as np
from pyqtgraph.Qt import QtGui

from .serializer import all_serializers
from .proxy import ObjectProxy
from .server import RPCServer, QtRPCServer
from . import log

logger = logging.getLogger(__name__)

[docs]class RPCClient(object): """Connection to an :class:`RPCServer`. Each RPCClient connects to only one server, and may be used from only one thread. RPCClient instances are created automatically either through :class:`ProcessSpawner` or by requesting attributes form an :class:`ObjectProxy`. In general, it is not necessary for the user to interact directly with RPCClient. Parameters ---------- address : URL Address of RPC server to connect to. reentrant : bool If True, then this client will allow the server running in the same thread (if any) to process requests whenever the client is waiting for a response. This is necessary to avoid deadlocks in case of reentrant RPC requests (eg, server A calls server B, which then calls server A again). Default is True. """ clients_by_thread = {} # (thread_id, rpc_addr): client clients_by_thread_lock = threading.Lock()
[docs] @staticmethod def get_client(address): """Return the RPC client for this thread and a given server address. If no client exists already, then a new one will be created. If the server is running in the current thread, then return None. See also -------- RPCServer.address """ if isinstance(address, str): address = address.encode() key = (threading.current_thread().ident, address) # Return an existing client if there is one with RPCClient.clients_by_thread_lock: if key in RPCClient.clients_by_thread: return RPCClient.clients_by_thread[key] return RPCClient(address)
def __init__(self, address, reentrant=True, serializer='msgpack'): # pick a unique name: = ("%s.%s.%s:%s" % (log.get_host_name(), log.get_process_name(), log.get_thread_name(), address.decode())).encode() if sys.platform == 'win32' and '' in str(address): logger.warn("RPC server address is likely to cause trouble on windows: %r" % address) self.address = address key = (threading.current_thread().ident, address) with RPCClient.clients_by_thread_lock: if key in RPCClient.clients_by_thread: raise KeyError("An RPCClient instance already exists for this address." " Use RPCClient.get_client(address) instead.") # DEALER is fully asynchronous--we can send or receive at any time, and # unlike ROUTER, it only connects to a single endpoint. self._socket = zmq.Context.instance().socket(zmq.DEALER) self._sock_name = self._socket.setsockopt(zmq.IDENTITY, self._sock_name) # socket will continue attempting to deliver messages up to 1 sec after # it has closed. (default is -1, which can cause processes to hang # on exit) self._socket.linger = 1000 # If this thread is running a server, then we need to allow the # server to process requests when the client is blocking. assert reentrant in (True, False) self._reentrant = reentrant self._poller = None"RPC connect to %s", address.decode()) self._socket.connect(address) self.next_request_id = 0 self.futures = weakref.WeakValueDictionary() with RPCClient.clients_by_thread_lock: RPCClient.clients_by_thread[key] = self # proxies generated by this client will be assigned these default options self.default_proxy_options = {} self.connect_established = False self.establishing_connect = False self._disconnected = False # For unserializing results returned from servers. This cannot be # used to send proxies of local objects unless there is also a server # for this thread.. try: self.serializer = all_serializers[serializer](client=self) except KeyError: raise ValueError("Unsupported serializer type '%s'" % serializer) self.ensure_connection() def _get_poller(self): # Return the poller that should be used to listen for incoming messages # This poller is responsible for ensuring that the RPC server in this # thread is able to process requests while we are blocked waiting # for responses from other servers. if not self._reentrant: return None if self._poller is None: server = RPCServer.get_server() if server is None: return None if isinstance(server, QtRPCServer): self._poller = 'qt' else: self._poller = zmq.Poller() self._poller.register(self._socket, zmq.POLLIN) self._poller.register(server._socket, zmq.POLLIN) return self._poller
[docs] def disconnected(self): """Boolean indicating whether the server has disconnected from the client. """ if self._disconnected: return True # check to see if we have received any new messages self._read_and_process_all() return self._disconnected
[docs] def send(self, action, opts=None, return_type='auto', sync='sync', timeout=10.0): """Send a request to the remote process. It is not necessary to call this method directly; instead use :func:`call_obj`, :func:`get_obj`, :func:`__getitem__`, :func:`__setitem__`, :func:`transfer`, :func:`delete`, :func:`import`, or :func:`ping`. The request is given a unique ID that is included in the response from the server (if any). Parameters ---------- action : str The action to invoke on the remote process. See list of actions below. opts : None or dict Extra options to be sent with the request. Each action requires a different set of options. See list of actions below. return_type : 'auto' | 'proxy' If 'proxy', then the return value is sent by proxy. If 'auto', then the server decides based on the return type whether to send a proxy. sync : str If 'sync', then block and return the result when it becomes available. If 'async', then return a Future instance immediately. If 'off', then ask the remote server NOT to send a response and return None immediately. timeout : float The amount of time to wait for a response when in synchronous operation (sync='sync'). If the timeout elapses before a response is received, then raise TimeoutError. Notes ----- The following table lists the actions that are recognized by RPCServer. The *action* argument to `send()` may be any string from the *Action* column below, and the *opts* argument must be a dict with the keys listed in the *Options* column. ======== ======================================= ========================================== Action Description Options -------- --------------------------------------- ------------------------------------------ call_obj Invoke a callable | obj: a proxy to the callable object | args: a tuple of positional arguments | kwargs: a dict of keyword arguments get_obj Return the object referenced by a proxy | obj: a proxy to the object to return get_item Return a named object | name: string name of the object to return set_item Set a named object | name: string name to set | value: object to assign to name delete Delete a proxy reference | obj_id: proxy object ID | ref_id: proxy reference ID import Import and return a proxy to a module | module: name of module to import ping Return 'pong' | ======== ======================================= ========================================== """ # This is nice, but very expensive! #if self.disconnected(): if self._disconnected: raise RuntimeError("Cannot send request; server has already disconnected.") if sync == 'off': req_id = -1 else: req_id = self.next_request_id self.next_request_id += 1"RPC request '%s' to %s [req_id=%s]", action, self.address.decode(), req_id) logger.debug(" => sync=%s return=%s opts=%s", sync, return_type, opts) if opts is None: opts_str = b'' else: opts_str = self.serializer.dumps(opts) ser_type = self.serializer.type.encode() msg = [str(req_id).encode(), action.encode(), return_type.encode(), ser_type, opts_str] self._socket.send_multipart(msg) if sync == 'off': return fut = Future(self, req_id) if action == 'close': # for server closure we require a little special handling fut.add_done_callback(self._close_request_returned) self.futures[req_id] = fut if sync == 'async': return fut elif sync == 'sync': return fut.result(timeout=timeout) else: raise ValueError('Invalid sync value: %s' % sync)
[docs] def call_obj(self, obj, args=None, kwargs=None, **kwds): """Invoke a remote callable object. Parameters ---------- obj : :class:`ObjectProxy` A proxy that references an object owned by the connected RPCServer. args : tuple Arguments to pass to the remote call. kwargs : dict Keyword arguments to pass to the remote call. kwds : All extra keyword arguments are passed to :func:`send() <RPCClient.send>`. """ opts = {'obj': obj, 'args': args, 'kwargs': kwargs} return self.send('call_obj', opts=opts, **kwds)
[docs] def get_obj(self, obj, **kwds): """Return a copy of a remote object. Parameters ---------- obj : :class:`ObjectProxy` A proxy that references an object owned by the connected RPCServer. The object will be serialized and returned if possible, otherwise a new proxy is returned. kwds : All extra keyword arguments are passed to :func:`send() <RPCClient.send>`. """ return self.send('get_obj', opts={'obj': obj}, **kwds)
[docs] def transfer(self, obj, **kwds): """Send an object to the remote process and return a proxy to it. Parameters ---------- obj : object Any object to send to the remote process. If the object is not serializable then a proxy will be sent if possible. kwds : All extra keyword arguments are passed to :func:`send() <RPCClient.send>`. """ kwds['return_type'] = 'proxy' return self.send('get_obj', opts={'obj': obj}, **kwds)
def _import(self, module, **kwds): """Import a module in the remote process and return a proxy to it. Parameters ---------- module : str The name of the module to import. kwds : All extra keyword arguments are passed to :func:`send() <RPCClient.send>`. """ return self.send('import', opts={'module': module}, **kwds)
[docs] def delete(self, obj, **kwds): """Delete an object proxy. This informs the remote process that an :class:`ObjectProxy` is no longer needed. The remote process will decrement a reference counter and delete the referenced object if it is no longer held by any proxies. Parameters ---------- obj : :class:`ObjectProxy` A proxy that references an object owned by the connected RPCServer. kwds : All extra keyword arguments are passed to :func:`send() <RPCClient.send>`. Notes ----- After a proxy is deleted, it cannot be used to access the remote object even if the server has not released the remote object yet. This also applies to proxies that are sent to a third process. For example, consider three processes A, B, C: first A acquires a proxy to an object owned by B. A sends the proxy to C, and then deletes the proxy. If C attempts to access this proxy, an exception will be raised because B has already remoted the reference held by this proxy. However, if C independently acquires a proxy to the same object owned by B, then that proxy will continue to function even after A deletes its proxy. """ assert obj._rpc_addr == self.address return self.send('delete', opts={'obj_id': obj._obj_id, 'ref_id': obj._ref_id}, **kwds)
def __getitem__(self, name): """Return a named item published by the remote server. This provides a sort of "global namespace" for clients to access objects that are explicitly published using either :func:`RPCServer.__setitem__` or :func:`RPCClient.__setitem__`. """ return self.send('get_item', opts={'name': name}, sync='sync') def __setitem__(self, name, obj): """Publish an object as a named item on the server. The item can be retrieved by the remote process using :func:`RPCServer.__getitem__`, or by any client connected to the remote server using :func:`RPCClient.__getitem__`. """ # We could make this sync='off', but probably it's safer to block until # the transaction is complete. return self.send('set_item', opts={'name': name, 'obj': obj}, sync='sync')
[docs] def ensure_connection(self, timeout=1.0): """Make sure RPC server is connected and available. """ if self.establishing_connect: return self.establishing_connect = True try: start = time.time() while time.time() < start + timeout: fut ='async') try: result = fut.result(timeout=0.1) self.connect_established = True return except TimeoutError: continue raise TimeoutError("Could not establish connection with RPC server.") finally: self.establishing_connect = False
[docs] def process_until_future(self, future, timeout=None): """Process all incoming messages until receiving a result for *future*. If the future result is not raised before the timeout, then raise TimeoutError. While waiting, the RPCServer for this thread (if any) is also allowed to process requests. Parameters ---------- future : concurrent.Future instance The Future to wait for. When the response for this Future arrives from the server, the method returns. timeout : float Maximum time (seconds) to wait for a response. """ start = time.perf_counter() while not future.done(): # wait patiently with blocking calls. if timeout is None: itimeout = None else: dt = time.perf_counter() - start itimeout = timeout - dt if itimeout < 0: raise TimeoutError("Timeout waiting for Future result.") poller = self._get_poller() if poller is None: self._read_and_process_one(itimeout) elif poller == 'qt': # Server runs in Qt thread; we need to time-share with Qt event # loop. QtGui.QApplication.processEvents() try: self._read_and_process_one(timeout=0.05) except TimeoutError: pass else: # Poll for input on both the client's socket and the server's # socket. This is necessary to avoid deadlocks. socks = [x[0] for x in poller.poll(itimeout)] if self._socket in socks: self._read_and_process_one(timeout=0) elif len(socks) > 0: server = RPCServer.get_server() if server is None: # this can happen after server has unregistered itself # at exit continue server._read_and_process_one()
def _read_and_process_one(self, timeout): """Read a single message from the remote server and process it by calling :func:`process_msg()`. Parameters ---------- timeout : float Maximum time (seconds) to wait for a message. """ # timeout is in seconds; convert to ms # use timeout=None to block indefinitely if timeout is None: timeout = -1 else: timeout = int(timeout * 1000) try: # NOTE: docs say timeout can only be set before bind, but this # seems to work for now. self._socket.setsockopt(zmq.RCVTIMEO, timeout) msg = self._socket.recv() msg = self.serializer.loads(msg) except zmq.error.Again: raise TimeoutError("Timeout waiting for Future result.") self.process_msg(msg) def _read_and_process_all(self): # process all messages until none are immediately available. try: while True: self._read_and_process_one(timeout=0) except TimeoutError: return
[docs] def process_msg(self, msg): """Handle one message received from the remote process. This takes care of assigning return values or exceptions to existing Future instances. """ logger.debug("RPC recv result from %s [req_id=%s]", self.address.decode(), msg.get('req_id', None)) logger.debug(" => %s" % msg) if msg['action'] == 'return': req_id = msg['req_id'] fut = self.futures.pop(req_id, None) if fut is None: return if msg['error'] is not None: exc = RemoteCallException(*msg['error']) fut.set_exception(exc) else: fut.set_result(msg['rval']) elif msg['action'] == 'disconnect': self._server_disconnected() else: raise ValueError("Invalid action '%s'" % msg['action'])
def _close_request_returned(self, fut): try: if fut.result() is True: # We requested a server closure and the server complied; now # handle the disconnect. self._server_disconnected() except RuntimeError: # might have already disconnected before this request finished. if self.disconnected(): pass raise def _server_disconnected(self): # server has disconnected; inform all pending futures. # This method can be called two different ways: # * this client requested that the server close and it returned True # * another client requested that the server close and this client # received a preemptive disconnect message from the server. self._disconnected = True logger.debug("Received server disconnect from %s", self.address) exc = RuntimeError("Cannot send request; server has already disconnected.") for fut in self.futures.values(): fut.set_exception(exc) self.futures.clear()
[docs] def ping(self, sync='sync', **kwds): """Ping the server. This can be used to test connectivity to the server. """ return self.send('ping', sync=sync, **kwds)
[docs] def close(self): """Close this client's socket (but leave the server running). """ # reference management is disabled for now.. #self.send('release_all', return_type=None) self._socket.close()
[docs] def close_server(self, sync='sync', timeout=1.0, **kwds): """Ask the server to close. The server returns True if it has closed. All clients known to the server will be informed that the server has disconnected. If the server has already disconnected from this client, then the method returns True without error. """ if self.disconnected(): return True return self.send('close', sync=sync, timeout=timeout, **kwds)
[docs] def measure_clock_diff(self): """Measure the clock offset between this host and the remote host. """ rcounter = self._import('time').perf_counter ltimes = [] rtimes = [] for i in range(10): ltimes.append(time.perf_counter()) rtimes.append(rcounter()) ltimes = np.array(ltimes) rtimes = np.array(rtimes[:-1]) dif = rtimes - ((ltimes[1:] + ltimes[:-1]) * 0.5) # we can probably constrain this estimate a bit more by looking at # min/max times and excluding outliers. return dif.mean()
def __del__(self): if hasattr(self, 'socket'): self.close()
class RemoteCallException(Exception): def __init__(self, type_str, tb_str): self.type_str = type_str self.tb_str = tb_str def __str__(self): msg = '\n===> Remote exception was:\n' + ''.join(self.tb_str) return msg
[docs]class Future(concurrent.futures.Future): """Represents a return value from a remote procedure call that has not yet arrived. Instances of Future are returned from :func:`ObjectProxy.__call__()` when used with ``_sync='async'``. This is the mechanism through which remote functions may be called asynchronously. Use :func:`done()` to determine whether the return value (or an error message) has arrived, and :func:`result()` to get the return value. If the remote call raised an exception, then calling :func:`result()` will raise RemoteCallException with a transcript of the original exception. See `concurrent.futures.Future` in the Python documentation for more information. """ def __init__(self, client, call_id): concurrent.futures.Future.__init__(self) self.client = client self.call_id = call_id
[docs] def cancel(self): return False
[docs] def result(self, timeout=None): """Return the result of this Future. If the result is not yet available, then this call will block until the result has arrived or the timeout elapses. """ self.client.process_until_future(self, timeout=timeout) return concurrent.futures.Future.result(self)