Container API

Container core

class mango.container.core.Container(*, addr, name: str, codec, clock: Clock, copy_internal_messages=False, mirror_data=None, mp_method='spawn', **kwargs)[source]

Bases: ABC

Superclass for a mango container

async as_agent_process(agent_creator, mirror_container_creator=None)[source]

Spawn a new process with a container, mirroring the current container, and 1 to n agents, created by agent_creator. Can be used to introduce real parallelization using the agents as unit to divide.

Internally this will create a process with its own asyncio loop and an own container, which will mostly imitate the behavior of this container. For this, IPC is used (async pipes). The agent in the process will behave exactly like it would in the parent process and in the main container (this).

Parameters:
  • agent_creator (Function(Container)) – function, which creates 1…n agents, has exactly one argument the mirror container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates the mirror container, generally this parameter is set by the subclasses of container

Returns:

a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.

Return type:

AgentProcessHandle

as_agent_process_lazy(agent_creator, mirror_container_creator=None)[source]

Similar to as_agent_process, but does not wait for the agent process to be initialized. Does also not need a running event loop, making it suitable to add agent processes without an asyncio context.

deregister(aid)[source]

Deregister an agent

Parameters:

aid

Returns:

dispatch_to_agent_process(pid: int, coro_func, *args)[source]

Dispatch a coroutine function and its necessary arguments to the process ‘pid’. This allows the user to execute arbitrary code in the agent subprocesses.

The coro_func accepts coro_func(Container, *args).

Parameters:
  • pid (int) – the pid in which the coro func should get dispatched

  • coro_func (async function(Container, ...)) – async function(Container, …)

is_aid_available(aid)[source]

Check if the aid is available and allowed. It is not possible to register aids matching the regular pattern “agentX”. :param aid: the aid you want to check :return True if the aid is available, False if it is not

on_ready()[source]
register(agent: Agent, suggested_aid: str = None)[source]

Register agent and return the agent id

Parameters:
  • agent – The agent instance

  • suggested_aid – (Optional) suggested aid, if the aid is already taken, a generated aid is used. Using the generated aid-style (“agentX”) is not allowed.

:return The agent ID

abstractmethod async send_message(content: Any, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool[source]

The Container sends a message to an agent according the container protocol.

Parameters:
  • content – The content of the message

  • receiver_addr – The address the message is sent to, should be constructed using agent_address(protocol_addr, aid) or address(agent) on sending messages, and sender_address(meta) on replying to messages.

  • kwargs – Can contain additional meta information

async shutdown()[source]

Shutdown all agents in the container and the container itself

async start()[source]

Container external coupling

class mango.container.external_coupling.ExternalAgentMessage(message: bytes, time: float, receiver: str)[source]

Bases: object

message: bytes
receiver: str
time: float
class mango.container.external_coupling.ExternalSchedulingContainer(*, addr: str, codec: Codec, clock: ExternalClock = None, **kwargs)[source]

Bases: Container

async send_message(content, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool[source]

The Container sends a message to an agent according the container protocol.

Parameters:
  • content – The content of the message

  • receiver_addr – Address of the receiving container

  • kwargs – Additional parameters to provide protocol specific settings

async step(simulation_time: float, incoming_messages: list[bytes]) ExternalSchedulingContainerOutput[source]
class mango.container.external_coupling.ExternalSchedulingContainerOutput(duration: float, messages: list[mango.container.external_coupling.ExternalAgentMessage], next_activity: None | float)[source]

Bases: object

duration: float
messages: list[ExternalAgentMessage]
next_activity: None | float
mango.container.external_coupling.ext_mirror_container_creator(container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)[source]

Container creation

mango.container.factory.create_external_coupling(codec: Codec = None, clock: Clock = None, addr: None | str | tuple[str, int] = None, **kwargs: dict[str, Any])[source]
mango.container.factory.create_mqtt(broker_addr: tuple | dict | str, client_id: str, codec: Codec = None, clock: Clock = None, inbox_topic: str | None = None, copy_internal_messages: bool = False, **kwargs)[source]

This method is called to instantiate an MQTT container

Parameters:
  • broker_addr – The address of the broker this container will connect to. it has to be a tuple of (host, port).

  • client_id – The id of the MQTT Client

  • codec – Defines the codec to use. Defaults to JSON

  • clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock

  • inbox_topic – Default subscription to the a specific MQTT topic

  • copy_internal_messages – Explicitly copy internal messages. Defaults to False

Returns:

The instance of a MQTTContainer

mango.container.factory.create_tcp(addr: str | tuple[str, int], codec: Codec = None, clock: Clock = None, copy_internal_messages: bool = False, auto_port=False, **kwargs: dict[str, Any]) Container[source]

This method is called to instantiate a tcp container

Parameters:
  • addr – The address to use. it has to be a tuple of (host, port).

  • codec – Defines the codec to use. Defaults to JSON

  • clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock

  • copy_internal_messages – Explicitly copy internal messages. Defaults to False

  • auto_port – Whether you want to let the operating system pick the port. Defaults to False

Returns:

The instance of a TCPContainer

MQTT Container

class mango.container.mqtt.MQTTContainer(*, client_id: str, broker_addr: tuple | dict | str, clock: Clock, codec: Codec, inbox_topic: None | str = None, **kwargs)[source]

Bases: Container

Container for agents.

The container allows its agents to send messages to specific topics (via send_message()).

decode_mqtt_message(*, topic, payload)[source]

deserializes a mqtt message. Checks if for the topic a special class is defined, otherwise assumes an ACLMessage :param topic: the topic on which the message arrived :param payload: the serialized message :return: content and meta

deregister(aid)[source]
Parameters:

aid

Returns:

async send_message(content: Any, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool[source]

The container sends the message of one of its own agents to a specific topic.

Parameters:
  • content – The content of the message

  • receiver_addr – The topic to publish to.

  • sender_id – The sender aid

  • kwargs – Additional parameters to provide protocol specific settings Possible fields: qos: The quality of service to use for publishing retain: Indicates, weather the retain flag should be set Ignored if connection_type != ‘mqtt’

async shutdown()[source]

Shutdown container, disconnect from broker and stop mqtt thread

async start()[source]
async subscribe_for_agent(*, aid: str, topic: str, qos: int = 2) bool[source]
Parameters:
  • aid – aid of the corresponding agent

  • topic – topic to subscribe (wildcards are allowed)

  • qos – The maximum quality of service this subscription supports

Returns:

A boolean signaling if subscription was true or not

mango.container.mqtt.mqtt_mirror_container_creator(client_id, inbox_topic, container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)[source]

TCP Container

This module contains the abstract Container class and the subclasses TCPContainer and MQTTContainer

class mango.container.tcp.TCPConnectionPool(ttl_in_sec: float = 30.0, max_connections_per_target: int = 10)[source]

Bases: object

Pool of async tcp connections. Is able to create arbitrary connections and manages them. This makes reusing connections possible. To obtain a connection, use obtain_connection. When you are done with the connection, use release_connection.

There are two parameters to be set, ttl_in_sec (time to live for a connection), max_connections_per_target (max number of connections per connection target).

async obtain_connection(host: str, port: int, protocol: ContainerProtocol)[source]

Obtain a connection from the pool. If no connection is available, a new connection is created.

Parameters:
  • host (str) – the host

  • port (int) – the port

  • protocol (ContainerProtocol) – ContainerProtocol

Returns:

connection

Return type:

ContainerProtocol with open transport object

async release_connection(host: str, port: int, connection)[source]

Release the connection to the pool. Have to be called after a connection is obtained, otherwise the connection can never return to the pool.

Parameters:
  • host (str) – the host

  • port (int) – the port

  • connection (ContainerProtocol) – the connection

async shutdown()[source]
class mango.container.tcp.TCPContainer(*, addr: tuple[str, int], codec: Codec, clock: Clock, **kwargs)[source]

Bases: Container

This is a container that communicate directly with other containers via tcp

as_agent_process(agent_creator, mirror_container_creator=<function tcp_mirror_container_creator>)[source]

Spawn a new process with a container, mirroring the current container, and 1 to n agents, created by agent_creator. Can be used to introduce real parallelization using the agents as unit to divide.

Internally this will create a process with its own asyncio loop and an own container, which will mostly imitate the behavior of this container. For this, IPC is used (async pipes). The agent in the process will behave exactly like it would in the parent process and in the main container (this).

Parameters:
  • agent_creator (Function(Container)) – function, which creates 1…n agents, has exactly one argument the mirror container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates the mirror container, generally this parameter is set by the subclasses of container

Returns:

a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.

Return type:

AgentProcessHandle

async send_message(content: Any, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool[source]

The Container sends a message to an agent using TCP.

Parameters:
  • content – The content of the message

  • receiver_addr – Tuple of host, port

  • receiver_id – The agent id of the receiver

  • kwargs – Additional parameters to provide protocol specific settings

async shutdown()[source]

calls shutdown() from super class Container and closes the server

async start()[source]
mango.container.tcp.tcp_mirror_container_creator(container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)[source]