Source code for mango.container.factory

import logging
from typing import Any

from mango.container.core import Container
from mango.container.external_coupling import ExternalSchedulingContainer
from mango.container.mqtt import MQTTContainer
from mango.container.tcp import TCPContainer
from mango.messages.codecs import JSON

from ..messages.codecs import Codec
from ..util.clock import AsyncioClock, Clock, ExternalClock

logger = logging.getLogger(__name__)

TCP_CONNECTION = "tcp"
MQTT_CONNECTION = "mqtt"
EXTERNAL_CONNECTION = "external_connection"


[docs] def create_mqtt( broker_addr: tuple | dict | str, client_id: str, codec: Codec = None, clock: Clock = None, inbox_topic: str | None = None, copy_internal_messages: bool = False, **kwargs, ): """ This method is called to instantiate an MQTT container :param broker_addr: The address of the broker this container will connect to. it has to be a tuple of (host, port). :param client_id: The id of the MQTT Client :param codec: Defines the codec to use. Defaults to JSON :param clock: The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock :param inbox_topic: Default subscription to the a specific MQTT topic :param copy_internal_messages: Explicitly copy internal messages. Defaults to False :return: The instance of a MQTTContainer """ if codec is None: codec = JSON() if clock is None: clock = AsyncioClock() return MQTTContainer( client_id=client_id, broker_addr=broker_addr, clock=clock, codec=codec, inbox_topic=inbox_topic, copy_internal_messages=copy_internal_messages, **kwargs, )
[docs] def create_external_coupling( codec: Codec = None, clock: Clock = None, addr: None | str | tuple[str, int] = None, **kwargs: dict[str, Any], ): if codec is None: codec = JSON() if clock is None: clock = ExternalClock() return ExternalSchedulingContainer(addr=addr, codec=codec, clock=clock, **kwargs)
[docs] def create_tcp( addr: str | tuple[str, int], codec: Codec = None, clock: Clock = None, copy_internal_messages: bool = False, auto_port=False, **kwargs: dict[str, Any], ) -> Container: """ This method is called to instantiate a tcp container :param addr: The address to use. it has to be a tuple of (host, port). :param codec: Defines the codec to use. Defaults to JSON :param clock: The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock :param copy_internal_messages: Explicitly copy internal messages. Defaults to False :param auto_port: Whether you want to let the operating system pick the port. Defaults to False :return: The instance of a TCPContainer """ if codec is None: codec = JSON() if clock is None: clock = AsyncioClock() if isinstance(addr, str): host, port = addr.split(":") addr = (host, int(port)) # initialize TCPContainer return TCPContainer( addr=(addr[0], 0) if auto_port else addr, codec=codec, clock=clock, copy_internal_messages=copy_internal_messages, **kwargs, )