# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.
import os
import weakref
[docs]class ObjectProxy(object):
"""
Proxy to an object stored by a remote :class:`RPCServer`.
A proxy behaves in most ways like the object that it wraps--you can request
the same attributes, call methods, etc. There are a few important
differences:
* A proxy can be on a different thread, process, or machine than its object,
so long as the object's thread has an RPCServer and the proxy's thread has
an associated RPCClient.
* Attribute lookups and method calls can be slower because the request and
response must traverse a socket. These can also be performed asynchronously
to avoid blocking the client thread.
* Function argument and return types must be serializable or proxyable.
Most basic types can be serialized, including numpy arrays. All other
objects are automatically proxied, but there are some cases when this will
not work well.
* :func:`__repr__` and :func:`__str__` are overridden on proxies to allow
safe debugging.
* :func:`__hash__` is overridden to ensure that remote hash values are not
used locally.
For the most part, object proxies can be used exactly as if they are
local objects::
client = RPCClient(address) # connect to RPCServer
rsys = client._import('sys') # returns proxy to sys module on remote process
rsys.stdout.write # proxy to remote sys.stdout.write
rsys.stdout.write('hello') # calls sys.stdout.write('hello') on remote machine
# and returns the result (None)
When calling a proxy to a remote function, the call can be made synchronous
(caller is blocked until result can be returned), asynchronous (Future is
returned immediately and result can be accessed later), or return can be
disabled entirely::
ros = proc._import('os')
# synchronous call; caller blocks until result is returned
pid = ros.getpid()
# asynchronous call
request = ros.getpid(_sync='async')
while not request.hasResult():
time.sleep(0.01)
pid = request.result()
# disable return when we know it isn't needed.
rsys.stdout.write('hello', _sync='off')
Additionally, values returned from a remote function call are automatically
returned either by value (must be picklable) or by proxy.
This behavior can be forced::
rnp = proc._import('numpy')
arrProxy = rnp.array([1,2,3,4], _return_type='proxy')
arrValue = rnp.array([1,2,3,4], _return_type='value')
The default sync and return_type behaviors (as well as others) can be set
for each proxy individually using ObjectProxy._set_proxy_options() or globally using
proc.set_proxy_options().
It is also possible to send arguments by proxy if an RPCServer is running
in the caller's thread (this can be used, for example, to connect Qt
signals across network connections)::
def callback():
print("called back.")
# Remote process can invoke our callback function as long as there is
# a server running here to process the request.
remote_object.set_callback(proxy(callback))
"""
def __init__(self, rpc_addr, obj_id, ref_id, type_str='', attributes=(), **kwds):
object.__init__(self)
## can't set attributes directly because setattr is overridden.
if isinstance(rpc_addr, str):
rpc_addr = rpc_addr.encode()
self.__dict__.update(dict(
_rpc_addr=rpc_addr,
_obj_id=obj_id,
_ref_id=ref_id,
_type_str=type_str,
_attributes=attributes,
_parent_proxy=None,
_hash=hash((rpc_addr, obj_id, attributes)),
# identify local client/server instances this proxy belongs to
_client_=None,
_server_=None,
))
## attributes that affect the behavior of the proxy.
self.__dict__['_proxy_options'] = {
'sync': 'sync', ## 'sync', 'async', 'off'
'timeout': 10, ## float
'return_type': 'auto', ## 'proxy', 'value', 'auto'
#'auto_proxy_args': False, ## bool
'defer_getattr': True, ## True, False
'no_proxy_types': [type(None), str, int, float, tuple, list, dict, ObjectProxy],
'auto_delete': False,
}
self._set_proxy_options(**kwds)
def _copy(self):
# Return a copy of this proxy.
# This is used for transferring proxies across threads (because an
# ObjectProxy should only be used in one thread.
prox = ObjectProxy(self._rpc_addr, self._obj_id, self._ref_id, self._type_str,
self._attributes, **self._proxy_options)
if self._parent_proxy is not None:
prox._parent_proxy = self._parent_proxy.copy()
return prox
def _client(self):
if self._client_ is None:
from .client import RPCClient
self.__dict__['_client_'] = RPCClient.get_client(self._rpc_addr)
return self._client_
def _server(self):
if self._server_ is None:
from .server import RPCServer
self.__dict__['_server_'] = RPCServer.get_server()
return self._server_
[docs] def _set_proxy_options(self, **kwds):
"""
Change the behavior of this proxy. For all options, a value of None
will cause the proxy to instead use the default behavior defined
by its parent Process.
Parameters
----------
sync : 'sync', 'async', 'off', or None
If 'async', then calling methods will return a :class:`Future` object
that can be used to inquire later about the result of the
method call.
If 'sync', then calling a method
will block until the remote process has returned its result
or the timeout has elapsed (in this case, a Request object
is returned instead).
If 'off', then the remote process is instructed *not* to
reply and the method call will return None immediately.
This option can be overridden by supplying a ``_sync`` keyword
argument when calling the method (see :func:`__call__`).
return_type : 'auto', 'proxy', 'value', or None
If 'proxy', then the value returned when calling a method
will be a proxy to the object on the remote process.
If 'value', then attempt to pickle the returned object and
send it back.
If 'auto', then the decision is made by consulting the
'no_proxy_types' option.
This option can be overridden by supplying a ``_return_type`` keyword
argument when calling the method (see :func:`__call__`).
auto_proxy : bool or None
If True, arguments to __call__ are
automatically converted to proxy unless their type is
listed in no_proxy_types (see below). If False, arguments
are left untouched. Use proxy(obj) to manually convert
arguments before sending.
timeout : float or None
Length of time to wait during synchronous
requests before returning a Request object instead.
This option can be overridden by supplying a ``_timeout`` keyword
argument when calling a method (see :func:`__call__`).
defer_getattr : True, False, or None
If False, all attribute requests will be sent to the remote
process immediately and will block until a response is
received (or timeout has elapsed).
If True, requesting an attribute from the proxy returns a
new proxy immediately. The remote process is *not* contacted
to make this request. This is faster, but it is possible to
request an attribute that does not exist on the proxied
object. In this case, AttributeError will not be raised
until an attempt is made to look up the attribute on the
remote process.
no_proxy_types : list
List of object types that should *not* be proxied when
sent to the remote process.
auto_delete : bool
If True, then the proxy will automatically call
`self._delete()` when it is collected by Python.
"""
for k in kwds:
if k not in self._proxy_options:
raise KeyError("Unrecognized proxy option '%s'" % k)
self._proxy_options.update(kwds)
def _save(self):
"""Convert this proxy to a serializable structure.
"""
addr = self._rpc_addr
if isinstance(addr, bytes):
addr = addr.decode()
state = {
'rpc_addr': addr,
'obj_id': self._obj_id,
'ref_id': self._ref_id,
'type_str': self._type_str,
'attributes': self._attributes,
}
# TODO: opts DO need to be sent in some cases, like when sending
# callbacks.
#state.update(self._proxy_options)
return state
[docs] def _get_value(self):
"""
Return the value of the proxied object.
If the object is not serializable, then raise an exception.
"""
if self._client() is None:
return self._server().unwrap_proxy(self)
else:
return self._client().get_obj(self, return_type='value')
def __repr__(self):
orep = '.'.join((self._type_str,) + self._attributes)
rep = '<ObjectProxy for %s[%d] %s >' % (self._rpc_addr.decode(), self._obj_id, orep)
return rep
def _undefer(self, sync='sync', return_type='auto', **kwds):
"""Process any deferred attribute lookups and return the result.
"""
if len(self._attributes) == 0:
return self
# Transfer sends this object to the remote process and returns a new proxy.
# In the process, this invokes any deferred attributes.
return self._client().get_obj(self, sync=sync, return_type=return_type, **kwds)
[docs] def _delete(self, sync='sync', **kwds):
"""Ask the RPC server to release the reference held by this proxy.
Note: this does not guarantee the remote object will be deleted; only
that its reference count will be reduced. Any copies of this proxy will
no longer be usable.
"""
self._client().delete(self, sync=sync, **kwds)
def __del__(self):
if self._proxy_options['auto_delete'] is True:
self._delete()
[docs] def __getattr__(self, attr):
"""
Calls __getattr__ on the remote object and returns the attribute
by value or by proxy depending on the options set (see
ObjectProxy._set_proxy_options and RemoteEventHandler.set_proxy_options)
If the option 'defer_getattr' is True for this proxy, then a new proxy object
is returned _without_ asking the remote object whether the named attribute exists.
This can save time when making multiple chained attribute requests,
but may also defer a possible AttributeError until later, making
them more difficult to debug.
"""
prox = self._deferred_attr(attr)
if self._proxy_options['defer_getattr'] is True:
return prox
else:
return prox._undefer()
def _deferred_attr(self, attr, **kwds):
"""Return a proxy to an attribute of this object. The attribute lookup
is deferred.
"""
opts = self._proxy_options.copy()
opts.update(kwds)
proxy = ObjectProxy(self._rpc_addr, self._obj_id, self._ref_id,
self._type_str, self._attributes + (attr,), **opts)
# Keep a reference to the parent proxy so that the remote object cannot be
# released as long as this proxy is alive.
proxy.__dict__['_parent_proxy'] = self
return proxy
[docs] def __call__(self, *args, **kwargs):
"""Call the proxied object from the remote process.
All positional and keyword arguments (except those listed below) are
sent to the remote procedure call.
In synchronous mode (see parameters below), this method blocks until
the remote return value has been received, and then returns that value.
In asynchronous mode, this method returns a :class:`Future` instance
immediately, which may be used to retrieve the return value later. If
return is disabled, then the method immediately returns None.
If the remote call raises an exception on the remote process, then this
method will raise RemoteCallException if in synchronous mode, or calling
:func:`Future.result()` will raise RemoteCallException if in
asynchronous mode. If return is disabled, then remote exceptions will
be ignored.
Parameters
----------
_sync: 'off', 'sync', or 'async'
Set the sync mode for this call. The default value is determined by
the 'sync' argument to :func:`_set_proxy_options()`.
_return_type: 'value', 'proxy', or 'auto'
Set the return type for this call. The default value is determined by
the 'return_type' argument to :func:`_set_proxy_options()`.
_timeout: float
Set the timeout for this call. The default value is determined by
the 'timeout' argument to :func:`_set_proxy_options()`.
See also
--------
RPCClient.call_obj()
Future
"""
opts = {
'sync': self._proxy_options['sync'],
'return_type': self._proxy_options['return_type'],
'timeout': self._proxy_options['timeout'],
}
for k in opts:
opts[k] = kwargs.pop('_'+k, opts[k])
return self._client().call_obj(obj=self, args=args, kwargs=kwargs, **opts)
def __hash__(self):
"""Override __hash__ because we need to avoid comparing remote and local
hashes.
"""
#return id(self)
return self._hash
# Explicitly proxy special methods. Is there a better way to do this??
def __getitem__(self, *args):
return self._deferred_attr('__getitem__')(*args)
def __setitem__(self, *args):
return self._deferred_attr('__setitem__')(*args, _sync='off')
[docs] def __setattr__(self, *args):
return self._deferred_attr('__setattr__')(*args, _sync='off')
def __str__(self, *args):
# for safe printing
return repr(self)
#return self._deferred_attr('__str__')(*args, _return_type='value')
def __len__(self, *args):
return self._deferred_attr('__len__')(*args)
def __add__(self, *args):
return self._deferred_attr('__add__')(*args)
def __sub__(self, *args):
return self._deferred_attr('__sub__')(*args)
def __div__(self, *args):
return self._deferred_attr('__div__')(*args)
def __truediv__(self, *args):
return self._deferred_attr('__truediv__')(*args)
def __floordiv__(self, *args):
return self._deferred_attr('__floordiv__')(*args)
def __mul__(self, *args):
return self._deferred_attr('__mul__')(*args)
def __pow__(self, *args):
return self._deferred_attr('__pow__')(*args)
def __iadd__(self, *args):
return self._deferred_attr('__iadd__')(*args, _sync='off')
def __isub__(self, *args):
return self._deferred_attr('__isub__')(*args, _sync='off')
def __idiv__(self, *args):
return self._deferred_attr('__idiv__')(*args, _sync='off')
def __itruediv__(self, *args):
return self._deferred_attr('__itruediv__')(*args, _sync='off')
def __ifloordiv__(self, *args):
return self._deferred_attr('__ifloordiv__')(*args, _sync='off')
def __imul__(self, *args):
return self._deferred_attr('__imul__')(*args, _sync='off')
def __ipow__(self, *args):
return self._deferred_attr('__ipow__')(*args, _sync='off')
def __rshift__(self, *args):
return self._deferred_attr('__rshift__')(*args)
def __lshift__(self, *args):
return self._deferred_attr('__lshift__')(*args)
def __irshift__(self, *args):
return self._deferred_attr('__irshift__')(*args, _sync='off')
def __ilshift__(self, *args):
return self._deferred_attr('__ilshift__')(*args, _sync='off')
def __eq__(self, *args):
# If checking equality between two proxies to the same object, then
# we can immediately return True.
if (isinstance(args[0], ObjectProxy) and
args[0]._rpc_addr == self._rpc_addr and
args[0]._obj_id == self._obj_id):
return True
return self._deferred_attr('__eq__')(*args)
def __ne__(self, *args):
return self._deferred_attr('__ne__')(*args)
def __lt__(self, *args):
return self._deferred_attr('__lt__')(*args)
def __gt__(self, *args):
return self._deferred_attr('__gt__')(*args)
def __le__(self, *args):
return self._deferred_attr('__le__')(*args)
def __ge__(self, *args):
return self._deferred_attr('__ge__')(*args)
def __and__(self, *args):
return self._deferred_attr('__and__')(*args)
def __or__(self, *args):
return self._deferred_attr('__or__')(*args)
def __xor__(self, *args):
return self._deferred_attr('__xor__')(*args)
def __iand__(self, *args):
return self._deferred_attr('__iand__')(*args, _sync='off')
def __ior__(self, *args):
return self._deferred_attr('__ior__')(*args, _sync='off')
def __ixor__(self, *args):
return self._deferred_attr('__ixor__')(*args, _sync='off')
def __mod__(self, *args):
return self._deferred_attr('__mod__')(*args)
def __radd__(self, *args):
return self._deferred_attr('__radd__')(*args)
def __rsub__(self, *args):
return self._deferred_attr('__rsub__')(*args)
def __rdiv__(self, *args):
return self._deferred_attr('__rdiv__')(*args)
def __rfloordiv__(self, *args):
return self._deferred_attr('__rfloordiv__')(*args)
def __rtruediv__(self, *args):
return self._deferred_attr('__rtruediv__')(*args)
def __rmul__(self, *args):
return self._deferred_attr('__rmul__')(*args)
def __rpow__(self, *args):
return self._deferred_attr('__rpow__')(*args)
def __rrshift__(self, *args):
return self._deferred_attr('__rrshift__')(*args)
def __rlshift__(self, *args):
return self._deferred_attr('__rlshift__')(*args)
def __rand__(self, *args):
return self._deferred_attr('__rand__')(*args)
def __ror__(self, *args):
return self._deferred_attr('__ror__')(*args)
def __rxor__(self, *args):
return self._deferred_attr('__ror__')(*args)
def __rmod__(self, *args):
return self._deferred_attr('__rmod__')(*args)