# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.
import sys
import time
import os
import traceback
import socket
import threading
import builtins
import zmq
import logging
import numpy as np
import atexit
from pyqtgraph.Qt import QtCore, QtGui
from .serializer import all_serializers
from .proxy import ObjectProxy
from .timer import Timer
from . import log
logger = logging.getLogger(__name__)
[docs]class RPCServer(object):
"""Remote procedure call server for invoking requests on proxied objects.
RPCServer instances are automatically created when using :class:`ProcessSpawner`.
It is rarely necessary for the user to interact directly with RPCServer.
There may be at most one RPCServer per thread. RPCServers can be run in a
few different modes:
* **Exclusive event loop**: call `run_forever()` to cause the server to listen
indefinitely for incoming request messages.
* **Lazy event loop**: call `run_lazy()` to register the server with the current
thread. The server's socket will be polled whenever an RPCClient is waiting
for a response (this allows reentrant function calls). You can also manually
listen for requests with `_read_and_process_one()` in this mode.
* **Qt event loop**: use :class:`QtRPCServer`. In this mode, messages are polled in
a separate thread, but then sent to the Qt event loop by signal and
processed there. The server is registered as running in the Qt thread.
Parameters
----------
name : str
Name used to identify this server.
address : URL
Address for RPC server to bind to. Default is ``'tcp://127.0.0.1:*'``.
**Note:** binding RPCServer to a public IP address is a potential
security hazard.
Notes
-----
**RPCServer is not a secure server.** It is intended to be used only on trusted
networks; anyone with tcp access to the server can execute arbitrary code
on the server.
RPCServer is not a thread-safe class. Only use :class:`RPCClient` to communicate
with RPCServer from other threads.
Examples
--------
::
# In host/process/thread 1:
server = RPCServer()
rpc_addr = server.address
# Publish an object for others to access easily
server['object_name'] = MyClass()
# In host/process/thread 2: (you must communicate rpc_addr manually)
client = RPCClient(rpc_addr)
# Get a proxy to published object; use this (almost) exactly as you
# would a local object:
remote_obj = client['object_name']
remote_obj.method(...)
# Or, you can remotely import and operate a module:
remote_module = client._import("my.module.name")
remote_obj = remote_module.MyClass()
remote_obj.method(...)
# See ObjectProxy for more information on interacting with remote
# objects, including (a)synchronous communication.
"""
servers_by_thread = {}
servers_by_thread_lock = threading.Lock()
[docs] @staticmethod
def get_server():
"""Return the server running in this thread, or None if there is no server.
"""
with RPCServer.servers_by_thread_lock:
return RPCServer.servers_by_thread.get(threading.current_thread().ident, None)
[docs] @staticmethod
def register_server(srv):
"""Register a server as the (only) server running in this thread.
This static method fails if another server is already registered for
this thread.
"""
key = threading.current_thread().ident
if srv._thread == key:
return
assert srv._thread is None, "Server has already been registered."
with RPCServer.servers_by_thread_lock:
if key in RPCServer.servers_by_thread:
raise KeyError("An RPCServer is already running in this thread.")
RPCServer.servers_by_thread[key] = srv
srv._thread = key
[docs] @staticmethod
def unregister_server(srv):
"""Unregister a server from this thread.
"""
key = srv._thread
with RPCServer.servers_by_thread_lock:
assert RPCServer.servers_by_thread[key] is srv
RPCServer.servers_by_thread.pop(key)
[docs] @staticmethod
def local_client():
"""Return the RPCClient used for accessing the server running in the
current thread.
"""
from .client import RPCClient
srv = RPCServer.get_server()
return RPCClient.get_client(srv.address)
def __init__(self, address="tcp://127.0.0.1:*"):
self._socket = zmq.Context.instance().socket(zmq.ROUTER)
# socket will continue attempting to deliver messages up to 5 sec after
# it has closed. (default is -1, which can cause processes to hang
# on exit)
self._socket.linger = 5000
self._socket.bind(address)
#: The zmq address where this server is listening (e.g. 'tcp:///127.0.0.1:5678')
self.address = self._socket.getsockopt(zmq.LAST_ENDPOINT)
self._closed = False
# Clients may make requests using any supported serializer, so we should
# have one of each ready.
self._serializers = {}
for ser in all_serializers.values():
self._serializers[ser.type] = ser(server=self)
# keep track of all clients we have seen so that we can inform them
# when the server exits.
self._clients = {} # {socket_id: serializer_type}
# Id of thread that this server is registered to
self._thread = None
# types that are sent by value when return_type='auto'
self.no_proxy_types = [type(None), str, int, float, tuple, list, dict, ObjectProxy, np.ndarray]
# Objects that may be retrieved by name using client['obj_name']
self._namespace = {'self': self}
# Information about objects for which we have sent proxies to other machines.
# "object ID" is an integer that uniquely identifies each object that
# has been proxied. Multiple requests for the same object will return
# proxies with the same object ID. We do _not_ use id(obj) here because
# Python may re-use these IDs over time.
self._next_object_id = 0 # uniquely identifies proxied objects
# "ref ID" is an integer that uniquely identifies a single proxy as it
# is sent. Multiple requests for the same object will each have a
# different ref ID. These are used for remote reference counting to
# ensure that local objects stay alive as long as a remote proxy might
# still exist for the object.
self._next_ref_id = 0 # uniquely identifies a proxy reference
self._proxy_refs = {} # obj_id: [object, set(refs)]
self._proxy_id_map = {} # id(obj): obj_id
# Make sure we inform clients of closure
atexit.register(self._atexit)
[docs] def get_proxy(self, obj, **kwds):
"""Return an ObjectProxy referring to a local object.
This proxy can be sent via RPC to any other node.
"""
rid = self._next_ref_id
self._next_ref_id += 1
oid = self._get_object_id(obj)
type_str = str(type(obj))
proxy = ObjectProxy(self.address, oid, rid, type_str, attributes=(), **kwds)
proxy_ref = self._proxy_refs.setdefault(oid, [obj, set()])
proxy_ref[1].add(rid)
#logging.debug("server %s add proxy %d: %s", self.address, oid, obj)
return proxy
def _get_object_id(self, obj):
oid = self._proxy_id_map.get(id(obj), None)
if oid is None:
oid = self._next_object_id
self._next_object_id += 1
self._proxy_id_map[id(obj)] = oid
return oid
[docs] def unwrap_proxy(self, proxy):
"""Return the local python object referenced by *proxy*.
"""
try:
oid = proxy._obj_id
obj = self._proxy_refs[oid][0]
except KeyError:
raise KeyError("Invalid proxy object ID %r. The object may have "
"been released already." % proxy.obj_id)
for attr in proxy._attributes:
obj = getattr(obj, attr)
#logging.debug("server %s unwrap proxy %d: %s", self.address, oid, obj)
return obj
def __getitem__(self, key):
return self._namespace[key]
def __setitem__(self, key, value):
"""Define an object that may be retrieved by name from the client.
"""
self._namespace[key] = value
@staticmethod
def _read_one(socket):
name, req_id, action, return_type, ser_type, opts = socket.recv_multipart()
msg = {
'req_id': int(req_id),
'action': action.decode(),
'return_type': return_type.decode(),
'ser_type': ser_type.decode(),
'opts': opts,
}
return name, msg
def _read_and_process_one(self):
"""Read one message from the rpc socket and invoke the requested
action.
"""
if not self.running:
raise RuntimeError("RPC server socket is already closed.")
name, msg = self._read_one(self._socket)
self._process_one(name, msg)
def _process_one(self, caller, msg):
"""
Invoke the requested action.
This method sends back to the client either the return value or an
error message.
"""
ser_type = msg['ser_type']
action = msg['action']
req_id = msg['req_id']
return_type = msg.get('return_type', 'auto')
# remember this caller so we can deliver a disconnect message later
self._clients[caller] = ser_type
# Attempt to read message and invoke requested action
try:
try:
serializer = self._serializers[ser_type]
except KeyError:
raise ValueError("Unsupported serializer '%s'" % ser_type)
opts = msg.pop('opts', None)
logging.debug("RPC recv '%s' from %s [req_id=%s]", action, caller.decode(), req_id)
logging.debug(" => %s", msg)
if opts == b'':
opts = None
else:
opts = serializer.loads(opts)
logging.debug(" => opts: %s", opts)
result = self.process_action(action, opts, return_type, caller)
exc = None
except:
exc = sys.exc_info()
# Send result or error back to client
if req_id >= 0:
if exc is None:
#print "returnValue:", returnValue, result
if return_type == 'auto':
result = self.auto_proxy(result, self.no_proxy_types)
elif return_type == 'proxy':
result = self.get_proxy(result)
try:
self._send_result(caller, req_id, rval=result)
except:
logger.warn(" => Failed to send result for %d", req_id)
exc = sys.exc_info()
self._send_error(caller, req_id, exc)
else:
logger.warn(" => returning exception for %d: %s", req_id, exc)
self._send_error(caller, req_id, exc)
elif exc is not None:
# An exception occurred, but client did not request a response.
# Instead we will dump the exception here.
sys.excepthook(*exc)
if action == 'close':
self._final_close()
def _send_error(self, caller, req_id, exc):
exc_str = ["Error while processing request %s [%d]: " % (caller.decode(), req_id)]
exc_str += traceback.format_stack()
exc_str += [" < exception caught here >\n"]
exc_str += traceback.format_exception(*exc)
self._send_result(caller, req_id, error=(exc[0].__name__, exc_str))
def _send_result(self, caller, req_id, rval=None, error=None):
result = {'action': 'return', 'req_id': req_id,
'rval': rval, 'error': error}
logging.debug("RPC send result to %s [rpc_id=%s]", caller.decode(), result['req_id'])
logging.debug(" => %s", result)
# Select the correct serializer for this client
serializer = self._serializers[self._clients[caller]]
# Serialize and return the result
data = serializer.dumps(result)
self._socket.send_multipart([caller, data])
[docs] def process_action(self, action, opts, return_type, caller):
"""Invoke a single action and return the result.
"""
if action == 'call_obj':
obj = opts['obj']
fnargs = opts.get('args', ())
fnkwds = opts.get('kwargs', {})
if len(fnkwds) == 0: ## need to do this because some functions do not allow keyword arguments.
try:
result = obj(*fnargs)
except:
logger.warn("Failed to call object %s: %d, %s", obj, len(fnargs), fnargs[1:])
raise
else:
result = obj(*fnargs, **fnkwds)
#logging.debug(" => call_obj result: %r", result)
elif action == 'get_obj':
result = opts['obj']
elif action == 'delete':
proxy_ref = self._proxy_refs[opts['obj_id']]
proxy_ref[1].remove(opts['ref_id'])
if len(proxy_ref[1]) == 0:
del self._proxy_refs[opts['obj_id']]
del self._proxy_id_map[id(proxy_ref[0])]
result = None
elif action =='get_item':
result = self[opts['name']]
elif action =='set_item':
self[opts['name']] = opts['obj']
result = None
elif action == 'import':
name = opts['module']
fromlist = opts.get('fromlist', [])
mod = builtins.__import__(name, fromlist=fromlist)
if len(fromlist) == 0:
parts = name.lstrip('.').split('.')
result = mod
for part in parts[1:]:
result = getattr(result, part)
else:
result = map(mod.__getattr__, fromlist)
elif action == 'ping':
result = 'pong'
elif action == 'close':
self._closed = True
# Send a disconnect message to all known clients
data = {}
for client, ser_type in self._clients.items():
if client == caller:
# We will send an actual return value to confirm closure
# to the caller.
continue
# Select or generate the disconnect message that was serialized
# correctly for this client.
if ser_type not in data:
ser = self._serializers[ser_type]
data[ser_type] = ser.dumps({'action': 'disconnect'})
data_str = data[ser_type]
# Send disconnect message.
logger.debug("RPC server sending disconnect message to %r", client)
self._socket.send_multipart([client, data_str])
RPCServer.unregister_server(self)
result = True
else:
raise ValueError("Invalid RPC action '%s'" % action)
return result
def _atexit(self):
# Process is exiting; do any last-minute cleanup if necessary.
if self._closed is not True:
logger.warn("RPCServer exiting without close()!")
self.close()
[docs] def close(self):
"""Ask the server to close.
This method is thread-safe.
"""
from .client import RPCClient
cli = RPCClient.get_client(self.address)
if cli is None:
self.process_action('close', None, None, None)
else:
cli.close_server(sync='sync')
def _final_close(self):
# Called after the server has closed and sent its disconnect messages.
self._socket.close()
[docs] def running(self):
"""Boolean indicating whether the server is still running.
"""
return self._closed is False
[docs] def run_forever(self):
"""Read and process RPC requests until the server is asked to close.
"""
name = ('%s.%s.%s' % (log.get_host_name(), log.get_process_name(),
log.get_thread_name()))
logging.info("RPC start server: %s@%s", name, self.address.decode())
RPCServer.register_server(self)
while self.running():
name, msg = self._read_one(self._socket)
self._process_one(name, msg)
[docs] def run_lazy(self):
"""Register this server as being active for the current thread, but do
not actually begin processing requests.
RPCClients in the same thread will allow the server to process requests
while they are waiting for responses. This can prevent deadlocks that
occur when
This can also be used to allow the user to manually process requests.
"""
name = ('%s.%s.%s' % (log.get_host_name(), log.get_process_name(),
log.get_thread_name()))
logging.info("RPC lazy-start server: %s@%s", name, self.address.decode())
RPCServer.register_server(self)
def auto_proxy(self, obj, no_proxy_types):
## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
for typ in no_proxy_types:
if isinstance(obj, typ):
return obj
return self.get_proxy(obj)
[docs] def start_timer(self, callback, interval, **kwds):
"""Start a timer that invokes *callback* at regular intervals.
Parameters
----------
callback : callable
Callable object to invoke. This must either be an ObjectProxy or
an object that is safe to call from the server's thread.
interval : float
Minimum time to wait between callback invocations (start to start).
"""
kwds.setdefault('start', True)
if not isinstance(callback, ObjectProxy):
callback = self.get_proxy(callback)
return Timer(callback, interval, **kwds)
[docs]class QtRPCServer(RPCServer):
"""RPCServer that lives in a Qt GUI thread.
This server may be used to create and manage QObjects, QWidgets, etc. It
uses a separate thread to poll for RPC requests, which are then sent to the
Qt event loop using by signal. This allows the RPC actions to be executed
in a Qt GUI thread without using a timer to poll the RPC socket. Responses
are sent back to the poller thread by a secondary socket.
QtRPCServer may be started in newly spawned processes using
:class:`ProcessSpawner`.
Parameters
----------
address : str
ZMQ address to listen on. Default is ``'tcp://127.0.0.1:*'``.
**Note:** binding RPCServer to a public IP address is a potential
security hazard. See :class:`RPCServer`.
quit_on_close : bool
If True, then call `QApplication.quit()` when the server is closed.
Examples
--------
Spawning in a new process::
# Create new process.
proc = ProcessSpawner(qt=True)
# Display a widget from the new process.
qtgui = proc._import('PyQt4.QtGui')
w = qtgui.QWidget()
w.show()
Starting in an existing Qt application::
# Create server.
server = QtRPCServer()
# Start listening for requests in a background thread (this call
# returns immediately).
server.run_forever()
"""
def __init__(self, address="tcp://127.0.0.1:*", quit_on_close=True):
RPCServer.__init__(self, address)
self.quit_on_close = quit_on_close
self.poll_thread = QtPollThread(self)
[docs] def run_forever(self):
name = ('%s.%s.%s' % (log.get_host_name(), log.get_process_name(),
log.get_thread_name()))
logging.info("RPC start server: %s@%s", name, self.address.decode())
RPCServer.register_server(self)
self.poll_thread.start()
[docs] def process_action(self, action, opts, return_type, caller):
# this method is called from the Qt main thread.
if action == 'close':
if self.quit_on_close:
QtGui.QApplication.instance().quit()
# can't stop poller thread here--that would prevent the return
# message being sent. In general it should be safe to leave this thread
# running anyway.
#self.poll_thread.stop()
return RPCServer.process_action(self, action, opts, return_type, caller)
def _final_close(self):
# Block for a moment to allow the poller thread to flush any pending
# messages. Ideally, we could let the poller thread keep the process
# alive until it is done, but then we can end up with zombie processes..
time.sleep(0.1)
class QtPollThread(QtCore.QThread):
"""Thread that polls an RPCServer socket and sends incoming messages to the
server by Qt signal.
This allows the RPC actions to be executed in a Qt GUI thread without using
a timer to poll the RPC socket. Responses are sent back to the poller
thread by a secondary socket.
"""
new_request = QtCore.Signal(object, object) # client, msg
def __init__(self, server):
# Note: QThread behaves like threading.Thread(daemon=True); a running
# QThread will not prevent the process from exiting.
QtCore.QThread.__init__(self)
self.server = server
# Steal RPC socket from the server; it should not be touched outside the
# polling thread.
self.rpc_socket = server._socket
# Create a socket for the Qt thread to send results back to the poller
# thread
return_addr = 'inproc://%x' % id(self)
context = zmq.Context.instance()
self.return_socket = context.socket(zmq.PAIR)
self.return_socket.linger = 1000 # don't let socket deadlock when exiting
self.return_socket.bind(return_addr)
server._socket = context.socket(zmq.PAIR)
server._socket.linger = 1000 # don't let socket deadlock when exiting
server._socket.connect(return_addr)
self.new_request.connect(server._process_one)
def run(self):
poller = zmq.Poller()
poller.register(self.rpc_socket, zmq.POLLIN)
poller.register(self.return_socket, zmq.POLLIN)
while True:
# Note: poller needs to continue running until server has sent
# its final response (which can be after the server claims to be
# no longer running).
socks = dict(poller.poll(timeout=100))
if self.return_socket in socks:
name, data = self.return_socket.recv_multipart()
#logger.debug("poller return %s %s", name, data)
if name == 'STOP':
break
self.rpc_socket.send_multipart([name, data])
if self.rpc_socket in socks:
name, msg = RPCServer._read_one(self.rpc_socket)
#logger.debug("poller recv %s %s", name, msg)
self.new_request.emit(name, msg)
#logger.error("poller exit.")
def stop(self):
"""Ask the poller thread to stop.
This method may only be called from the Qt main thread.
"""
self.server._socket.send_multipart([b'STOP', b''])