"""
This package imports the codecs that can be used for de- and encoding incoming
and outgoing messages:
- :class:`JSON` uses `JSON <http://www.json.org/>`_
- :class:`protobuf` uses protobuf
All codecs should implement the base class :class:`Codec`.
Most of this code is taken and adapted from Stefan Scherfkes aiomas:
https://gitlab.com/sscherfke/aiomas/
"""
import inspect
import json
from ctypes import c_int32
from hashlib import sha1
from mango.messages.message import (
ACLMessage,
AgentAddress,
MangoMessage,
Performatives,
enum_serializer,
)
from ..messages.acl_message_pb2 import ACLMessage as ACLProto
from ..messages.other_proto_msgs_pb2 import GenericMsg as GenericProtoMsg
[docs]
def json_serializable(cls=None, repr=True):
"""
This is a direct copy from aiomas:
https://gitlab.com/sscherfke/aiomas/-/blob/master/src/aiomas/codecs.py
Class decorator that makes the decorated class serializable by the json
codec (or any codec that can handle python dictionaries).
The decorator tries to extract all arguments to the class’ ``__init__()``.
That means, the arguments must be available as attributes with the same
name.
The decorator adds the following methods to the decorated class:
- ``__asdict__()``: Returns a dict with all __init__ parameters
- ``__fromdict__(dict)``: Creates a new class instance from *dict*
- ``__serializer__()``: Returns a tuple with args for
:meth:`Codec.add_serializer()`
- ``__repr__()``: Returns a generic instance representation. Adding this
method can be deactivated by passing ``repr=False`` to the decorator.
"""
def wrap(cls):
attrs = [a for a in inspect.signature(cls).parameters]
def __asdict__(self):
return {a: getattr(self, a) for a in attrs}
@classmethod
def __fromdict__(cls, attrs):
return cls(**attrs)
def __repr__(self):
args = (f"{a}={getattr(self, a)!r}" for a in attrs)
return "{}({})".format(self.__class__.__name__, ", ".join(args))
@classmethod
def __serializer__(cls):
return cls, cls.__asdict__, cls.__fromdict__
cls.__asdict__ = __asdict__
cls.__fromdict__ = __fromdict__
cls.__serializer__ = __serializer__
if repr:
cls.__repr__ = __repr__
return cls
# The type of "cls" depends on the usage of the decorator. It's a class if
# it's used as `@serializable` but ``None`` if used as `@serializable()`.
if cls is None:
return wrap
else:
return wrap(cls)
[docs]
class SerializationError(Exception):
"""Raised when an object cannot be serialized."""
[docs]
class DecodeError(Exception):
"""Raised when an object representation can not be decoded."""
[docs]
class Codec:
"""Base class for all Codecs.
Subclasses must implement :meth:`encode()` and :meth:`decode()`.
"""
def __init__(self):
self._serializers = {}
self._deserializers = {}
def __str__(self):
return "{}[{}]".format(
self.__class__.__name__,
", ".join(s.__name__ for s in self._serializers),
)
[docs]
def encode(self, data):
"""Encode the given *data* and return a :class:`bytes` object."""
raise NotImplementedError
[docs]
def decode(self, data):
"""Decode *data* from :class:`bytes` to the original data structure."""
raise NotImplementedError
[docs]
def make_type_id(self, otype):
"""Create a type id for *otype* using:
- type name
- function names in the class
- signature of the class
and return a 32 bit integer type id."""
class_funcs = sorted(inspect.getmembers(otype, predicate=inspect.isfunction))
data = otype.__name__
for d in class_funcs:
data += d[0]
try:
attrs = [a for a in inspect.signature(otype).parameters]
for d in attrs:
data += d
except ValueError:
# object type has no inspectable signature
pass
int_hash = int(sha1(data.encode("utf-8")).hexdigest(), 16)
# truncate to 32 bit for protobuf wrapper
type_id = c_int32(int_hash).value
return type_id
[docs]
def add_serializer(self, otype, serialize, deserialize, type_id=None):
"""Add methods to *serialize* and *deserialize* objects typed *otype*.
This can be used to de-/encode objects that the codec otherwise
couldn't encode.
*serialize* will receive the unencoded object and needs to return
an encodable serialization of it.
*deserialize* will receive an objects representation and should return
an instance of the original object.
"""
if otype in self._serializers:
raise ValueError(f'There is already a serializer for type "{otype}"')
if type_id is None:
type_id = self.make_type_id(otype)
if type_id in self._deserializers.keys():
raise ValueError(f'There is already a serializer with type id "{type_id}"')
# type_id = len(self._serializers)
self._serializers[otype] = (type_id, serialize)
self._deserializers[type_id] = deserialize
[docs]
def serialize_obj(self, obj):
"""Serialize *obj* to something that the codec can encode."""
orig_type = otype = type(obj)
if otype not in self._serializers:
# Fallback to a generic serializer (if available)
otype = object
try:
type_id, serialize = self._serializers[otype]
except KeyError:
raise SerializationError(
f'No serializer found for type "{orig_type}"'
) from None
try:
return {"__type__": (type_id, serialize(obj))}
except Exception as e:
raise SerializationError(
f'Could not serialize object "{obj!r}": {e}'
) from e
[docs]
def deserialize_obj(self, obj_repr):
"""Deserialize the original object from *obj_repr*."""
# This method is called for *all* dicts so we have to check if it
# contains a desrializable type.
if "__type__" in obj_repr:
type_id, data = obj_repr["__type__"]
obj_repr = self._deserializers[type_id](data)
return obj_repr
[docs]
class JSON(Codec):
"""A :class:`Codec` that uses *JSON* to encode and decode messages."""
def __init__(self):
super().__init__()
self.add_serializer(*ACLMessage.__json_serializer__())
self.add_serializer(*MangoMessage.__json_serializer__())
self.add_serializer(*AgentAddress.__serializer__())
self.add_serializer(*enum_serializer(Performatives))
[docs]
def encode(self, data):
return json.dumps(data, default=self.serialize_obj).encode()
[docs]
def decode(self, data):
return json.loads(data.decode(), object_hook=self.deserialize_obj)
[docs]
class PROTOBUF(Codec):
def __init__(self):
super().__init__()
# expected serializers: (obj, to_proto, from_proto)
# output of to_proto is the already serialized(!) proto object
# input of from_proto is the string representation of the proto object
# the codec merely handles the mapping of object types to these methods
# it does not require any knowledge of the actual proto classes
self.add_serializer(ACLMessage, self._acl_to_proto, self._proto_to_acl)
self.add_serializer(*MangoMessage.__protoserializer__())
[docs]
def encode(self, data):
# All known proto messages are wrapped in this generic proto msg.
# This is to have the type_id available to decoding later.
# Otherwise, we can not infer the original proto type from the serialized message.
proto_msg = GenericProtoMsg()
type_id, content = self.serialize_obj(data)
proto_msg.type_id = type_id
proto_msg.content = content.SerializeToString()
return proto_msg.SerializeToString()
[docs]
def decode(self, data):
proto_msg = GenericProtoMsg()
try:
proto_msg.ParseFromString(bytes(data))
except Exception as e:
raise DecodeError(f"Could not parse data: {data}") from e
obj_repr = {"__type__": (proto_msg.type_id, proto_msg.content)}
return self.deserialize_obj(obj_repr)
[docs]
def serialize_obj(self, obj):
serialized = super().serialize_obj(obj)
return serialized["__type__"]
[docs]
def register_proto_type(self, proto_class):
def deserialize(data):
proto_obj = proto_class()
proto_obj.ParseFromString(data)
return proto_obj
self.add_serializer(proto_class, lambda x: x, deserialize)
def _acl_to_proto(self, acl_message):
# ACLMessage to serialized proto object
msg = ACLProto()
msg.sender_id = acl_message.sender_id if acl_message.sender_id else ""
msg.receiver_id = acl_message.receiver_id if acl_message.receiver_id else ""
msg.conversation_id = (
acl_message.conversation_id if acl_message.conversation_id else ""
)
msg.performative = (
acl_message.performative.value
if acl_message.performative is not None
else 0
)
msg.protocol = acl_message.protocol if acl_message.protocol else ""
msg.language = acl_message.language if acl_message.language else ""
msg.encoding = acl_message.encoding if acl_message.encoding else ""
msg.ontology = acl_message.ontology if acl_message.ontology else ""
msg.reply_with = acl_message.reply_with if acl_message.reply_with else ""
msg.reply_by = acl_message.reply_by if acl_message.reply_by else ""
msg.in_reply_to = acl_message.in_reply_to if acl_message.in_reply_to else ""
if isinstance(acl_message.sender_addr, tuple | list):
msg.sender_addr = (
f"{acl_message.sender_addr[0]}:{acl_message.sender_addr[1]}"
)
elif acl_message.sender_addr:
msg.sender_addr = acl_message.sender_addr
if isinstance(acl_message.receiver_addr, tuple | list):
msg.receiver_addr = (
f"{acl_message.receiver_addr[0]}:{acl_message.receiver_addr[1]}"
)
elif acl_message.receiver_addr:
msg.receiver_addr = acl_message.receiver_addr
# content is only allowed to be a proto message known to the codec here
if acl_message.content is not None:
type_id, content = self.serialize_obj(acl_message.content)
msg.content = content.SerializeToString()
msg.content_type = type_id
return msg
def _proto_to_acl(self, data):
# serialized proto object to ACLMessage
msg = ACLProto()
acl = ACLMessage()
msg.ParseFromString(data)
acl.sender_id = msg.sender_id if msg.sender_id else None
acl.receiver_id = msg.receiver_id if msg.receiver_id else None
acl.conversation_id = msg.conversation_id if msg.conversation_id else None
acl.performative = Performatives(msg.performative) if msg.performative else None
acl.protocol = msg.protocol if msg.protocol else None
acl.language = msg.language if msg.language else None
acl.encoding = msg.encoding if msg.encoding else None
acl.ontology = msg.ontology if msg.ontology else None
acl.reply_with = msg.reply_with if msg.reply_with else None
acl.reply_by = msg.reply_by if msg.reply_by else None
acl.in_reply_to = msg.in_reply_to if msg.in_reply_to else None
acl.sender_addr = msg.sender_addr if msg.sender_addr else None
acl.receiver_addr = msg.receiver_addr if msg.receiver_addr else None
if msg.content and msg.content_type:
obj_repr = {"__type__": (msg.content_type, msg.content)}
acl.content = self.deserialize_obj(obj_repr)
return acl