# Source code for pyacq.core.rpc.serializer

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

import ast
import base64
import datetime
import json

import numpy as np

# msgpack is an optional dependency; HAVE_MSGPACK records whether the import
# succeeded so that MsgpackSerializer is only registered when it is usable.
try:
    import msgpack
    HAVE_MSGPACK = True
except ImportError:
    HAVE_MSGPACK = False

from .proxy import ObjectProxy

# Global registry of supported serializers, keyed by each serializer's
# ``type`` string.
all_serializers = {}  # type_str: class

# Any type that is not supported by json/msgpack must be encoded as a dict.
# To distinguish these from plain dicts, we include a unique key in them:
encode_key = '___type_name___'


class Serializer:
    """Base serializer class on which msgpack and json serializers (and
    potentially others) are built.

    Subclasses must be registered by adding to the ``all_serializers`` global.

    Supports ndarray, date, datetime, None and numpy scalars for transfer in
    addition to the standard types supported by json and msgpack. All other
    types are converted to an object proxy that can be used to access
    methods / attributes of the object remotely (this requires that the
    object be owned by an RPC server).

    Note that tuples are converted to lists in transit.

    Parameters
    ----------
    server : optional
        RPC server used to create proxies for unsupported objects and to
        unwrap proxies that point back at local objects. If None, the server
        is looked up lazily via ``RPCServer.get_server()``.
    client : optional
        If given, its ``default_proxy_options`` are applied to every proxy
        produced by :meth:`decode`.
    """

    def __init__(self, server=None, client=None):
        self._server = server
        self.client = client

    @property
    def server(self):
        """The RPC server associated with this serializer (may be None)."""
        if self._server is None:
            # get the current server for this thread, if one exists
            from .server import RPCServer
            self._server = RPCServer.get_server()
        return self._server

    def dumps(self, obj):
        """Convert obj to serialized string.
        """
        raise NotImplementedError()

    def loads(self, msg):
        """Convert from serialized string to python object.

        Proxies that reference objects owned by the server are converted
        back into the local object. All other proxies are left as-is.
        """
        raise NotImplementedError()

    def encode(self, obj):
        """Convert various types to serializable objects.

        Provides support for ndarray, datetime, date, None and numpy scalar
        types. Other types are converted to proxies.

        Raises
        ------
        TypeError
            If *obj* needs a proxy but no RPC server is available.
        """
        if isinstance(obj, np.ndarray):
            # tobytes() always emits the elements in C order, so no explicit
            # contiguity conversion is needed (tostring() was removed in
            # NumPy 2.0).
            return {encode_key: 'ndarray',
                    'data': obj.tobytes(),
                    'dtype': str(obj.dtype),
                    'shape': obj.shape}
        # datetime must be tested before date: datetime is a date subclass.
        if isinstance(obj, datetime.datetime):
            return {encode_key: 'datetime',
                    'data': obj.strftime('%Y-%m-%dT%H:%M:%S.%f')}
        if isinstance(obj, datetime.date):
            return {encode_key: 'date', 'data': obj.strftime('%Y-%m-%d')}
        if obj is None:
            return {encode_key: 'none'}
        # numpy scalars of any width become the matching builtin type.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.integer):
            return int(obj)

        # All unrecognized types must be converted to proxy.
        if not isinstance(obj, ObjectProxy):
            if self.server is None:
                raise TypeError("Cannot make proxy to %r without proxy server." % obj)
            obj = self.server.get_proxy(obj)
        ser = {encode_key: 'proxy'}
        ser.update(obj._save())
        return ser

    def decode(self, dct):
        """Convert from serializable objects back to original types.

        Dicts carrying ``encode_key`` are turned back into the object they
        describe; the key is removed from *dct* in the process. Anything
        else is returned unchanged.
        """
        if not isinstance(dct, dict):
            return dct

        type_name = dct.pop(encode_key, None)
        if type_name is None:
            return dct

        if type_name == 'ndarray':
            dtype = dct['dtype']
            if dtype.startswith(('[', '{')):
                # Structured dtypes are transmitted as the str() of their
                # list/dict description. Parse it as a literal only: this
                # string comes off the wire and must never be executed.
                dtype = ast.literal_eval(dtype)
            flat = np.frombuffer(dct['data'], dtype=dtype)
            # frombuffer returns a read-only view on the message bytes; copy
            # so that callers receive an independent, writable array.
            return flat.reshape(dct['shape']).copy()
        if type_name == 'datetime':
            return datetime.datetime.strptime(dct['data'], '%Y-%m-%dT%H:%M:%S.%f')
        if type_name == 'date':
            return datetime.datetime.strptime(dct['data'], '%Y-%m-%d').date()
        if type_name == 'none':
            return None
        if type_name == 'proxy':
            if 'attributes' in dct:
                # tuples arrive as lists; restore the tuple form
                dct['attributes'] = tuple(dct['attributes'])
            proxy = ObjectProxy(**dct)
            if self.client is not None:
                proxy._set_proxy_options(**self.client.default_proxy_options)
            if self.server is not None and proxy._rpc_addr == self.server.address:
                # The proxy refers to an object owned by our own server.
                return self.server.unwrap_proxy(proxy)
            return proxy

        # Unknown type name: hand back the remaining dict.
        return dct
class MsgpackSerializer(Serializer):
    """Class for serializing objects using msgpack.

    Supports ndarray, date, datetime, and bytes for transfer in addition to
    the standard list supported by msgpack. All other types are converted to
    an object proxy that can be used to access methods / attributes of the
    object remotely (this requires that the object be owned by an RPC
    server).

    Note that tuples are converted to lists in transit.
    """

    # used to tell server how to unserialize messages
    type = 'msgpack'

    def __init__(self, server=None, client=None):
        assert HAVE_MSGPACK, "MsgpackSerializer requires the msgpack package"
        Serializer.__init__(self, server, client)

    def dumps(self, obj):
        """Convert obj to msgpack string.
        """
        return msgpack.dumps(obj, use_bin_type=True, default=self.encode)

    def loads(self, msg):
        """Convert from msgpack string to python object.

        Proxies that reference objects owned by the server are converted
        back into the local object. All other proxies are left as-is.
        """
        # Lists/tuples are returned as lists (use_list is left at its
        # default) because json can't be configured otherwise and both
        # serializers should behave alike.
        # raw=False decodes msgpack strings to str; it replaces the
        # ``encoding='utf8'`` argument that msgpack 1.0 no longer accepts.
        return msgpack.loads(msg, raw=False, object_hook=self.decode)


class JsonSerializer(Serializer):
    """Class for serializing objects using json.

    Binary payloads (ndarray data and bytes) are base64-encoded because
    JSON has no bytes type. Everything else is handled by
    :class:`Serializer`.
    """

    # used to tell server how to unserialize messages
    type = 'json'

    def __init__(self, server=None, client=None):
        Serializer.__init__(self, server, client)

        serializer = self

        # We require a custom class to override json encode behavior.
        class EnhancedJSONEncoder(json.JSONEncoder):
            def default(self, obj):
                encoded = serializer.encode(obj)
                if encoded is obj:
                    # Nothing was converted: let the stock encoder raise its
                    # usual TypeError (called on the encoder instance, not on
                    # the serializer).
                    return json.JSONEncoder.default(self, obj)
                return encoded

        self.EnhancedJSONEncoder = EnhancedJSONEncoder

    def dumps(self, obj):
        """Convert obj to a UTF-8 encoded JSON byte string."""
        return json.dumps(obj, cls=self.EnhancedJSONEncoder).encode()

    def loads(self, msg):
        """Convert a JSON byte string back to a python object."""
        return json.loads(msg.decode(), object_hook=self.decode)

    def encode(self, obj):
        """Convert *obj* to a JSON-serializable object.

        ndarray and bytes are handled here (base64); None is passed through
        since JSON supports null; all other types are delegated to
        :meth:`Serializer.encode`.
        """
        if isinstance(obj, np.ndarray):
            # JSON doesn't support bytes, so we use base64 encoding instead.
            # tobytes() always emits the elements in C order.
            return {encode_key: 'ndarray',
                    'data': base64.b64encode(obj.tobytes()).decode(),
                    'dtype': str(obj.dtype),
                    'shape': obj.shape}
        elif isinstance(obj, bytes):
            return {encode_key: 'bytes', 'data': base64.b64encode(obj).decode()}
        elif obj is None:
            # JSON does support None/null:
            return None
        return Serializer.encode(self, obj)

    def decode(self, dct):
        """Convert from JSON-decoded objects back to original types."""
        if isinstance(dct, dict):
            type_name = dct.get(encode_key, None)
            if type_name == 'ndarray':
                dtype = dct['dtype']
                if dtype.startswith(('[', '{')):
                    # Structured dtypes travel as the str() of their
                    # description; parse as a literal, never execute it.
                    dtype = ast.literal_eval(dtype)
                data = base64.b64decode(dct['data'])
                # copy() turns the read-only frombuffer view into a writable
                # array that does not depend on the decoded message bytes.
                return np.frombuffer(data, dtype=dtype).reshape(dct['shape']).copy()
            elif type_name == 'bytes':
                return base64.b64decode(dct['data'])
            return Serializer.decode(self, dct)
        return dct


#: dict containing {name : SerializerSubclass} for all supported serializers
all_serializers[JsonSerializer.type] = JsonSerializer
if HAVE_MSGPACK:
    all_serializers[MsgpackSerializer.type] = MsgpackSerializer