import asyncio
import logging
from functools import partial
from typing import Any
import paho.mqtt.client as paho
from mango.container.core import AgentAddress, Container
from mango.container.mp import ContainerMirrorData
from ..messages.codecs import Codec
from ..messages.message import MangoMessage
from ..util.clock import Clock
logger = logging.getLogger(__name__)
[docs]
def mqtt_mirror_container_creator(
    client_id,
    inbox_topic,
    container_data,
    loop,
    message_pipe,
    main_queue,
    event_pipe,
    terminate_event,
):
    """
    Create the :class:`MQTTContainer` that acts as a mirror container.

    The pipes, the queue and the terminate event are bundled into a
    ``ContainerMirrorData`` object which is handed to the container together
    with the broker address, codec, clock and extra keyword arguments taken
    from ``container_data``.

    :param client_id: The client ID of the container to mirror
    :param inbox_topic: The inbox topic of the container to mirror
    :param container_data: Object providing ``addr``, ``codec``, ``clock``
        and ``kwargs`` for the new container
    :param loop: Forwarded unchanged to the container as ``loop``
    :param message_pipe: Stored in the mirror data
    :param main_queue: Stored in the mirror data
    :param event_pipe: Stored in the mirror data
    :param terminate_event: Stored in the mirror data
    :return: The newly created MQTTContainer
    """
    # bundle everything the mirror needs to talk to the main container
    mirror_data = ContainerMirrorData(
        message_pipe=message_pipe,
        event_pipe=event_pipe,
        terminate_event=terminate_event,
        main_queue=main_queue,
    )
    mirrored_container = MQTTContainer(
        client_id=client_id,
        inbox_topic=inbox_topic,
        broker_addr=container_data.addr,
        codec=container_data.codec,
        clock=container_data.clock,
        loop=loop,
        mirror_data=mirror_data,
        **container_data.kwargs,
    )
    return mirrored_container
[docs]
class MQTTContainer(Container):
    """
    Container for agents that communicates via an MQTT broker.

    The container allows its agents to send messages to specific topics
    (via :meth:`send_message()`).
    """

    def __init__(
        self,
        *,
        client_id: str,
        broker_addr: tuple | dict | str,
        clock: Clock,
        codec: Codec,
        inbox_topic: None | str = None,
        **kwargs,
    ):
        """
        Initializes a container. Do not directly call this method but use
        the factory method instead

        :param client_id: The ID that the container should use when connecting
            to the broker. It is also used as the container name and as the
            inbox topic if no ``inbox_topic`` is given
        :param broker_addr: The address of the broker, given as a string, a
            tuple or a dict. It is only stored here; it is validated and used
            when the container is started
        :param clock: The clock handed to the base container
        :param codec: The codec used to encode and decode messages
        :param inbox_topic: The unique inbox topic to use. No wildcards are
            allowed. If it is None (or empty), ``client_id`` is used instead
        :param kwargs: Additional keyword arguments, forwarded to the base
            container
        """
        # an empty/None inbox topic falls back to the client id
        effective_inbox_topic = inbox_topic or client_id
        super().__init__(
            codec=codec,
            addr=effective_inbox_topic,
            clock=clock,
            name=client_id,
            **kwargs,
        )
        self.client_id: str = client_id
        # the client will be created on start.
        self.mqtt_client: paho.Client | None = None
        self.inbox_topic: None | str = effective_inbox_topic
        self.broker_addr = broker_addr
        # dict mapping additionally subscribed topics to a set of aids
        self.additional_subscriptions: dict[str, set[str]] = {}
        # Future for pending sub requests, keyed by the message id of the request
        self.pending_sub_request: dict[int, asyncio.Future] = {}
[docs]
async def start(self):
    """
    Connect to the broker, subscribe to the inbox topic and start the container.

    The following entries of the container keyword arguments are consumed:
    ``clean_session``, ``userdata``, ``protocol`` and ``transport`` are passed
    to the ``paho.Client`` constructor, ``tls_kwargs`` (a dict) is passed to
    ``tls_set`` and everything that remains is passed to ``connect``.

    ``broker_addr`` may be a host string, a tuple
    ``(host[, port[, keepalive]])`` or a dict of keyword arguments for
    ``connect``. In the dict form the host may be given as ``host`` or, for
    backward compatibility, as ``hostname``.

    :raises ValueError: if the client id, the broker address or the inbox
        topic is missing or malformed
    :raises ConnectionError: if the connection or the subscription to the
        inbox topic could not be established
    """
    self._loop = asyncio.get_running_loop()

    if not self.client_id:
        raise ValueError("client_id is required!")
    if not self.broker_addr:
        raise ValueError("broker_addr is required!")

    # check if addr is a valid topic without wildcards
    if self.inbox_topic is not None and (
        not isinstance(self.inbox_topic, str)
        or "#" in self.inbox_topic
        or "+" in self.inbox_topic
    ):
        raise ValueError(
            "inbox topic is not set correctly. It must be a string without any wildcards ('#' or '+')!"
        )

    # validate the broker address before a client is created, so that a bad
    # configuration never leaves a half-initialised client behind
    connect_args = ()
    connect_kwargs = {}
    if isinstance(self.broker_addr, tuple):
        if not 0 < len(self.broker_addr) < 4:
            raise ValueError("Invalid broker address argument count")
        if not isinstance(self.broker_addr[0], str):
            raise ValueError("Invalid broker address - host must be str")
        if len(self.broker_addr) > 1 and not isinstance(self.broker_addr[1], int):
            raise ValueError("Invalid broker address - port must be int")
        if len(self.broker_addr) > 2 and not isinstance(self.broker_addr[2], int):
            raise ValueError("Invalid broker address - keepalive must be int")
        connect_args = self.broker_addr
    elif isinstance(self.broker_addr, dict):
        # work on a copy, the configured address must not be modified
        connect_kwargs = dict(self.broker_addr)
        # paho's connect() names the parameter 'host'; 'hostname' is
        # translated so that it does not end up as an unknown keyword
        if "hostname" in connect_kwargs:
            legacy_host = connect_kwargs.pop("hostname")
            connect_kwargs.setdefault("host", legacy_host)
        if "host" not in connect_kwargs:
            raise ValueError("Invalid broker address - host not given")
    elif isinstance(self.broker_addr, str):
        connect_args = (self.broker_addr,)
    else:
        raise ValueError("Invalid broker address")

    # get parameters for Client.init()
    init_kwargs = {}
    for possible_kwarg in ("clean_session", "userdata", "protocol", "transport"):
        if possible_kwarg in self._kwargs:
            init_kwargs[possible_kwarg] = self._kwargs.pop(possible_kwarg)

    # create paho.Client object for mqtt communication
    mqtt_messenger: paho.Client = paho.Client(
        paho.CallbackAPIVersion.VERSION2, client_id=self.client_id, **init_kwargs
    )

    # set TLS options if provided
    # expected as a dict:
    # {ca_certs, certfile, keyfile, cert_eqs, tls_version, ciphers}
    tls_kwargs = self._kwargs.pop("tls_kwargs", None)
    if tls_kwargs:
        mqtt_messenger.tls_set(**tls_kwargs)

    def resolve(future, value):
        # a callback may fire more than once; only the first result counts
        if not future.done():
            future.set_result(value)

    async def pump_until_done(future):
        # Drive the client's network loop by hand (its own thread is only
        # started at the end of this method) for at most 100 rounds. Every
        # round blocks for at most 0.1 s inside paho and then yields 0.1 s
        # to the event loop so that the scheduled result can be set.
        rounds = 0
        while not future.done() and rounds < 100:
            mqtt_messenger.loop(timeout=0.1)
            await asyncio.sleep(0.1)
            rounds += 1
        return rounds

    # Future that is triggered, on successful connection
    connected = self._loop.create_future()

    # callback to check for successful connection
    def on_con(client, userdata, flags, reason_code, properties):
        logger.info("Connection Callback with the following flags: %s", flags)
        self._loop.call_soon_threadsafe(resolve, connected, reason_code)

    mqtt_messenger.on_connect = on_con

    logger.info(
        "[%s]: Going to connect to broker at %s..", self.client_id, self.broker_addr
    )
    mqtt_messenger.connect(*connect_args, **connect_kwargs, **self._kwargs)

    try:
        counter = await pump_until_done(connected)
        if not connected.done():
            # timeout
            raise ConnectionError(
                f"Connection to {self.broker_addr} could not be "
                f"established after {counter * 0.1} seconds"
            )
        if connected.result() != 0:
            raise ConnectionError(
                f"Connection to {self.broker_addr} could not be "
                f"set up. Callback returner error code "
                f"{connected.result()}"
            )
        logger.info("sucessfully connected to mqtt broker")

        if self.inbox_topic is not None:
            # connection has been set up, subscribe to inbox topic now
            logger.info(
                "[%s]: Going to subscribe to %s as inbox topic..",
                self.client_id,
                self.inbox_topic,
            )

            # Future that is triggered when the broker answers the request;
            # its result is the list of reason codes of the answer
            subscribed = self._loop.create_future()

            def on_sub(client, userdata, mid, reason_code_list, properties):
                self._loop.call_soon_threadsafe(
                    resolve, subscribed, reason_code_list
                )

            mqtt_messenger.on_subscribe = on_sub

            # subscribe topic
            # set maximum QoS to 2
            result, _ = mqtt_messenger.subscribe(self.inbox_topic, qos=2)
            if result != paho.MQTT_ERR_SUCCESS:
                # subscription to inbox topic was not successful
                raise ConnectionError(
                    f"Subscription request to {self.inbox_topic} at {self.broker_addr} "
                    f"returned error code: {result}"
                )

            counter = await pump_until_done(subscribed)
            if not subscribed.done():
                raise ConnectionError(
                    f"Subscription request to {self.inbox_topic} at {self.broker_addr} "
                    f"did not succeed after {counter * 0.1} seconds."
                )
            # the broker answered, but it may have refused the subscription
            if any(reason_code.is_failure for reason_code in subscribed.result()):
                raise ConnectionError(
                    f"Subscription request to {self.inbox_topic} at {self.broker_addr} "
                    f"was rejected by the broker: {subscribed.result()}"
                )
            logger.info("successfully subscribed to topic")
    except BaseException:
        # do not leave a connected client behind if the handshake fails for
        # any reason (this also covers a cancellation of this coroutine)
        mqtt_messenger.disconnect()
        raise

    # connection and subscription is successful, remove callbacks
    mqtt_messenger.on_subscribe = None
    mqtt_messenger.on_connect = None

    self.mqtt_client = mqtt_messenger

    # set the callbacks
    self._set_mqtt_callbacks()

    # start the mqtt client
    self.mqtt_client.loop_start()

    await super().start()
def _set_mqtt_callbacks(self):
    """
    Sets the callbacks for the mqtt paho client

    The callbacks are invoked by the paho client. Everything that touches
    state of the container (futures, inbox) is therefore handed over to the
    event loop of the container via ``call_soon_threadsafe``.
    """

    def on_con(client, userdata, flags, reason_code, properties):
        if reason_code != 0:
            logger.info("Connection attempt to broker failed")
        else:
            logger.debug("Successfully reconnected to broker.")
            # NOTE(review): nothing is re-subscribed here. Whether the
            # subscriptions survive a reconnect presumably depends on the
            # session settings of the client - confirm.

    self.mqtt_client.on_connect = on_con

    def on_discon(client, userdata, disconnect_flags, reason_code, properties):
        if reason_code != 0:
            logger.warning("Unexpected disconnect from broker. Trying to reconnect")
        else:
            logger.debug("Successfully disconnected from broker.")

    self.mqtt_client.on_disconnect = on_discon

    def process_sub_request(mid):
        # runs inside the event loop; resolves the future of the request
        pending = self.pending_sub_request.get(mid)
        if pending is None:
            # nobody is waiting for this acknowledgement (any more)
            logger.debug(
                "Received a subscription acknowledgement without pending request;%s",
                mid,
            )
        elif not pending.done():
            # a future that is already done (e.g. cancelled) must not be set again
            pending.set_result(0)

    def on_sub(client, userdata, mid, reason_code_list, properties):
        self._loop.call_soon_threadsafe(process_sub_request, mid)

    self.mqtt_client.on_subscribe = on_sub

    def on_message(client, userdata, message):
        # extract the meta information first
        meta = {
            "network_protocol": "mqtt",
            "topic": message.topic,
            "qos": message.qos,
            "retain": message.retain,
        }
        # decode message and extract content and meta
        try:
            content, message_meta = self.decode_mqtt_message(
                payload=message.payload, topic=message.topic
            )
        except Exception:
            # A payload that cannot be decoded is dropped. Letting the error
            # escape would hand it to paho, which re-raises callback errors
            # unless it is configured to suppress them, and that would end
            # the processing of all further messages.
            logger.exception(
                "Could not decode the message received at topic %s, dropping it",
                message.topic,
            )
            return
        # update meta dict
        meta.update(message_meta)
        # put information to inbox
        self._loop.call_soon_threadsafe(self.inbox.put_nowait, (0, content, meta))

    self.mqtt_client.on_message = on_message
    self.mqtt_client.enable_logger(logger)
[docs]
def decode_mqtt_message(self, *, topic, payload):
    """
    Deserializes the payload of a mqtt message with the codec of the container.

    If the decoded object offers ``split_content_and_meta()``, it is split
    into content and meta. Every other object is used as content as it is
    and the meta dict stays empty.

    :param topic: the topic on which the message arrived (currently not
        evaluated)
    :param payload: the serialized message
    :return: content and meta
    """
    decoded_message = self.codec.decode(payload)

    # plain objects do not carry any meta information
    if not hasattr(decoded_message, "split_content_and_meta"):
        return decoded_message, {}

    content, meta = decoded_message.split_content_and_meta()
    return content, meta
async def _handle_message(self, *, priority: int, content, meta: dict[str, Any]):
    """
    This is called as a separate task for every message that is read

    Messages that arrived at the inbox topic are passed to the handling of
    the base container. All other messages are put into the inbox of every
    agent that subscribed to a topic matching the topic of the message.

    :param priority: priority of the message
    :param content: Deserialized content of the message
    :param meta: Dict with additional information; it has to contain the
        key ``topic``
    """
    topic = meta["topic"]
    logger.debug("Received message with content and meta;%s;%s", content, meta)

    if topic == self.inbox_topic:
        await super()._handle_message(priority=priority, content=content, meta=meta)
        return

    # no inbox topic. Collect the aids of all matching subscriptions.
    matching_aid_sets = [
        subscriber_aids
        for subscribed_topic, subscriber_aids in self.additional_subscriptions.items()
        if paho.topic_matches_sub(subscribed_topic, topic)
    ]
    receivers = set().union(*matching_aid_sets)

    if not receivers:
        logger.warning(
            "Received a message at a topic which no agent subscribed;%s", topic
        )
        return

    for receiver_id in receivers:
        receiver = self._agents[receiver_id]
        await receiver.inbox.put((priority, content, meta))
[docs]
async def send_message(
    self,
    content: Any,
    receiver_addr: AgentAddress,
    sender_id: None | str = None,
    **kwargs,
) -> bool:
    """
    The container sends the message of one of its own agents to a specific topic.

    A message addressed to the own inbox topic is delivered internally
    without using the broker, unless the retain flag is set. Every other
    message is published via the broker. Content that does not offer
    ``split_content_and_meta()`` is wrapped into a ``MangoMessage`` together
    with the meta information before it is published.

    :param content: The content of the message
    :param receiver_addr: The address of the receiver; its ``protocol_addr``
        is the topic to publish to and its ``aid`` the receiving agent
    :param sender_id: The sender aid
    :param kwargs: Additional parameters to provide protocol specific settings.
        They are all copied into the meta information of the message.
        Possible fields:
        qos: The quality of service to use for publishing (default 0)
        retain: Indicates whether the retain flag should be set
        (default False)
    :return: The result of the internal delivery or of the publishing
    """
    qos = kwargs.get("qos", 0)
    retain = kwargs.get("retain", False)

    meta = {
        **kwargs,
        "sender_id": sender_id,
        "sender_addr": self.inbox_topic,
        "receiver_id": receiver_addr.aid,
    }

    # a retained message has to be sent to the broker, even if it is
    # addressed to the own inbox
    is_internal = (
        self.inbox_topic
        and receiver_addr.protocol_addr == self.inbox_topic
        and not retain
    )
    if is_internal:
        # add the information a message received via the broker would carry
        meta.update(
            {
                "topic": self.inbox_topic,
                "qos": qos,
                "retain": False,
                "network_protocol": "mqtt",
            }
        )
        return self._send_internal_message(
            content, receiver_addr.aid, default_meta=meta, inbox=self.inbox
        )

    message = content
    if not hasattr(content, "split_content_and_meta"):
        message = MangoMessage(content, meta)
    return self._send_external_message(
        topic=receiver_addr.protocol_addr,
        message=message,
        qos=qos,
        retain=retain,
    )
def _send_external_message(
    self, *, topic: str, message, qos: int = 0, retain: bool = False
) -> bool:
    """
    Encodes a message with the codec of the container and publishes it.

    :param topic: MQTT topic
    :param message: The message to encode and publish
    :param qos: The quality of service param passed to mqtt publish
    :param retain: The retain param passed to mqtt publish
    :return: True if the client reports the message as published right after
        the publish call, False if it is not (yet) published or if the
        publish request failed
    """
    encoded_message = self.codec.encode(message)
    logger.debug("Sending message;%s;%s", message, topic)
    msg_info = self.mqtt_client.publish(topic, encoded_message, qos, retain)
    try:
        return msg_info.is_published()
    except (ValueError, RuntimeError) as err:
        # paho's is_published() raises instead of returning False if the
        # publish request itself was rejected (e.g. full queue or missing
        # connection); report this through the return value
        logger.warning("Publishing to topic %s failed: %s", topic, err)
        return False
[docs]
async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 2) -> bool:
    """
    Subscribes an agent of this container to a topic.

    If another agent already subscribed to exactly this topic, the agent is
    only added to the receivers of the topic and no request is sent to the
    broker. Otherwise the topic is subscribed at the broker and the method
    waits until the broker has acknowledged the request.

    :param aid: aid of the corresponding agent
    :param topic: topic to subscribe (wildcards are allowed)
    :param qos: The maximum quality of service this subscription supports
    :return: A boolean signaling if subscription was true or not
    :raises ValueError: if the aid does not belong to an agent of this container
    """
    if aid not in self._agents:
        raise ValueError("Given aid is not known")

    subscribers = self.additional_subscriptions.get(topic)
    if subscribers is not None:
        subscribers.add(aid)
        return True

    result, mid = self.mqtt_client.subscribe(topic, qos=qos)
    if result != paho.MQTT_ERR_SUCCESS:
        # nothing has been recorded yet, so a later call will try again
        return False

    # Record the subscription only now that the request has been accepted
    # by the client, but before waiting, so that messages arriving right
    # after the acknowledgement already find their receiver.
    self.additional_subscriptions[topic] = {aid}

    acknowledged = asyncio.get_running_loop().create_future()
    self.pending_sub_request[mid] = acknowledged
    try:
        await acknowledged
    finally:
        # also clean up if waiting is cancelled
        self.pending_sub_request.pop(mid, None)
    return True
[docs]
def deregister(self, aid):
    """
    Deregisters an agent and cleans up its topic subscriptions.

    After the deregistration in the base container, the agent is removed
    from the receivers of all additionally subscribed topics. Topics that
    are left without any receiver are unsubscribed at the broker.

    :param aid: aid of the agent to deregister
    :return: None
    """
    super().deregister(aid)

    # collect first, the dict must not change its size while iterating
    orphaned_topics = []
    for topic, subscriber_aids in self.additional_subscriptions.items():
        subscriber_aids.discard(aid)
        if not subscriber_aids:
            orphaned_topics.append(topic)

    for topic in orphaned_topics:
        del self.additional_subscriptions[topic]
        self.mqtt_client.unsubscribe(topic=topic)
def _create_mirror_container(self):
    """
    Returns the creator function for a mirror of this container.

    The client id and the inbox topic of this container are already bound
    as the first two arguments of :func:`mqtt_mirror_container_creator`.

    :return: A partial of the creator function
    """
    bound_arguments = (self.client_id, self.inbox_topic)
    return partial(mqtt_mirror_container_creator, *bound_arguments)
[docs]
async def shutdown(self):
    """
    Shutdown container, disconnect from broker and stop mqtt thread

    The connection to the broker is closed even if the shutdown of the base
    container raises an error; the error is propagated afterwards.
    """
    try:
        await super().shutdown()
    finally:
        # disconnect from the broker; there is no client if the container
        # has never been started
        if self.mqtt_client is not None:
            self.mqtt_client.disconnect()
            self.mqtt_client.loop_stop()