Container API¶
Container core¶
- class mango.container.core.Container(*, addr, name: str, codec, clock: Clock, copy_internal_messages=False, mirror_data=None, mp_method='spawn', **kwargs)[source]¶
Bases:
ABC
Superclass for a mango container
- async as_agent_process(agent_creator, mirror_container_creator=None)[source]¶
Spawn a new process with a container that mirrors the current container and hosts 1 to n agents created by agent_creator. This can be used to introduce real parallelization, with the agents as the unit of division.
Internally, this creates a process with its own asyncio loop and its own container, which mostly imitates the behavior of this container. IPC (async pipes) is used for this. The agents in the process behave exactly as they would in the parent process and in the main container (this).
- Parameters:
agent_creator (Function(Container)) – function that creates 1…n agents; it takes exactly one argument, the mirror container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function that creates the mirror container; this parameter is generally set by the subclasses of Container
- Returns:
a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.
- Return type:
AgentProcessHandle
- as_agent_process_lazy(agent_creator, mirror_container_creator=None)[source]¶
Similar to as_agent_process, but does not wait for the agent process to be initialized. It also does not need a running event loop, which makes it suitable for adding agent processes outside an asyncio context.
- dispatch_to_agent_process(pid: int, coro_func, *args)[source]¶
Dispatch a coroutine function and its necessary arguments to the process ‘pid’. This allows the user to execute arbitrary code in the agent subprocesses.
The coro_func is called as coro_func(Container, *args).
- Parameters:
pid (int) – the pid in which the coro func should get dispatched
coro_func (async function(Container, ...)) – the coroutine function to dispatch; it receives the container as its first argument, followed by *args
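The calling convention described above can be sketched without mango. In the snippet below, `FakeContainer` and `run_in_subprocess_stub` are hypothetical stand-ins that only illustrate the shape of `coro_func`; the real method sends the function to the process identified by `pid`, which this sketch does not attempt.

```python
import asyncio


class FakeContainer:
    """Hypothetical stand-in for the container living in the agent subprocess."""

    def __init__(self):
        self.log = []


async def set_marker(container, key, value):
    # Documented shape: the first argument is the container,
    # the remaining arguments are whatever was passed as *args.
    container.log.append((key, value))
    return len(container.log)


async def run_in_subprocess_stub(container, coro_func, *args):
    # Illustration only: awaits the function locally instead of
    # shipping it to another process.
    return await coro_func(container, *args)


def demo():
    container = FakeContainer()
    count = asyncio.run(
        run_in_subprocess_stub(container, set_marker, "mode", "fast")
    )
    return container.log, count
```

Because the real function runs in another process, a module-level `async def` is likely a safer choice than a closure or lambda; that is an assumption based on how process boundaries usually work, not something this page states.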
- is_aid_available(aid)[source]¶
Check if the aid is available and allowed. It is not possible to register aids matching the pattern “agentX”.
- Parameters:
aid – the aid you want to check
- Returns:
True if the aid is available, False if it is not
- register(agent: Agent, suggested_aid: str = None)[source]¶
Register agent and return the agent id
- Parameters:
agent – The agent instance
suggested_aid – (Optional) suggested aid. If the aid is already taken, a generated aid is used. Using the generated aid style (“agentX”) is not allowed.
- Returns:
The agent ID
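The interplay between is_aid_available and register can be illustrated with a small stand-alone registry. This is a sketch, not mango's implementation: `ToyRegistry` is hypothetical, and it assumes that “agentX” means the prefix `agent` followed by a decimal counter.

```python
import re

# Assumption: generated aids look like "agent0", "agent1", ...
GENERATED_AID = re.compile(r"agent\d+")


class ToyRegistry:
    """Hypothetical registry mimicking the documented aid rules."""

    def __init__(self):
        self._agents = {}
        self._counter = 0

    def is_aid_available(self, aid):
        # Available means: not taken and not in the reserved generated style.
        return aid not in self._agents and GENERATED_AID.fullmatch(aid) is None

    def register(self, agent, suggested_aid=None):
        if suggested_aid is not None and self.is_aid_available(suggested_aid):
            aid = suggested_aid
        else:
            # Fall back to a generated aid when the suggestion is
            # missing, already taken, or uses the reserved style.
            aid = f"agent{self._counter}"
            self._counter += 1
        self._agents[aid] = agent
        return aid
```

Since the returned aid may differ from the suggested one, callers should use the return value of register rather than the string they passed in.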
- abstractmethod async send_message(content: Any, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool [source]¶
The container sends a message to an agent according to the container protocol.
- Parameters:
content – The content of the message
receiver_addr – The address the message is sent to. It should be constructed using agent_address(protocol_addr, aid) or address(agent) when sending messages, and sender_address(meta) when replying to messages.
kwargs – Can contain additional meta information
Container external coupling¶
- class mango.container.external_coupling.ExternalAgentMessage(message: bytes, time: float, receiver: str)[source]¶
Bases:
object
- message: bytes¶
- receiver: str¶
- time: float¶
- class mango.container.external_coupling.ExternalSchedulingContainer(*, addr: str, codec: Codec, clock: ExternalClock = None, **kwargs)[source]¶
Bases:
Container
- async send_message(content, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool [source]¶
The container sends a message to an agent according to the container protocol.
- Parameters:
content – The content of the message
receiver_addr – Address of the receiving container
kwargs – Additional parameters to provide protocol specific settings
- async step(simulation_time: float, incoming_messages: list[bytes]) ExternalSchedulingContainerOutput [source]¶
- class mango.container.external_coupling.ExternalSchedulingContainerOutput(duration: float, messages: list[mango.container.external_coupling.ExternalAgentMessage], next_activity: None | float)[source]¶
Bases:
object
- duration: float¶
- messages: list[ExternalAgentMessage]¶
- next_activity: None | float¶
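An external simulator would typically call step repeatedly and use the returned output to decide when to call it again. The sketch below mirrors the documented fields as plain dataclasses and drives a hypothetical `StubContainer`; it assumes that `next_activity` is the simulation time at which the container next wants to be stepped, and that `None` means nothing is scheduled. Neither assumption is stated on this page.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExternalAgentMessage:
    message: bytes
    time: float
    receiver: str


@dataclass
class ExternalSchedulingContainerOutput:
    duration: float
    messages: list
    next_activity: Optional[float]


class StubContainer:
    """Hypothetical stand-in: echoes each incoming message one second later."""

    async def step(self, simulation_time, incoming_messages):
        out = [
            ExternalAgentMessage(m, simulation_time + 1.0, "peer")
            for m in incoming_messages
        ]
        next_activity = simulation_time + 1.0 if out else None
        return ExternalSchedulingContainerOutput(0.0, out, next_activity)


async def drive(container, first_messages, max_steps=10):
    # Step the container until it reports no further activity.
    time, inbox, sent = 0.0, list(first_messages), []
    for _ in range(max_steps):
        output = await container.step(time, inbox)
        sent.extend(output.messages)
        if output.next_activity is None:
            break
        time, inbox = output.next_activity, []
    return time, sent
```

In a real coupling, the messages collected in `sent` would be handed to the external simulation for delivery at their `time` stamp instead of being accumulated in a list.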
Container creation¶
- mango.container.factory.create_external_coupling(codec: Codec = None, clock: Clock = None, addr: None | str | tuple[str, int] = None, **kwargs: dict[str, Any])[source]¶
- mango.container.factory.create_mqtt(broker_addr: tuple | dict | str, client_id: str, codec: Codec = None, clock: Clock = None, inbox_topic: str | None = None, copy_internal_messages: bool = False, **kwargs)[source]¶
This method is called to instantiate an MQTT container
- Parameters:
broker_addr – The address of the broker this container will connect to. It has to be a tuple of (host, port).
client_id – The id of the MQTT Client
codec – Defines the codec to use. Defaults to JSON
clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock
inbox_topic – Default subscription to a specific MQTT topic
copy_internal_messages – Explicitly copy internal messages. Defaults to False
- Returns:
The instance of a MQTTContainer
- mango.container.factory.create_tcp(addr: str | tuple[str, int], codec: Codec = None, clock: Clock = None, copy_internal_messages: bool = False, auto_port=False, **kwargs: dict[str, Any]) Container [source]¶
This method is called to instantiate a tcp container
- Parameters:
addr – The address to use. It has to be a tuple of (host, port).
codec – Defines the codec to use. Defaults to JSON
clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock
copy_internal_messages – Explicitly copy internal messages. Defaults to False
auto_port – Whether you want to let the operating system pick the port. Defaults to False
- Returns:
The instance of a TCPContainer
MQTT Container¶
- class mango.container.mqtt.MQTTContainer(*, client_id: str, broker_addr: tuple | dict | str, clock: Clock, codec: Codec, inbox_topic: None | str = None, **kwargs)[source]¶
Bases:
Container
Container for agents.
The container allows its agents to send messages to specific topics (via send_message()).
- decode_mqtt_message(*, topic, payload)[source]¶
Deserializes an MQTT message. Checks whether a special class is defined for the topic; otherwise assumes an ACLMessage.
- Parameters:
topic – the topic on which the message arrived
payload – the serialized message
- Returns:
content and meta
- async send_message(content: Any, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool [source]¶
The container sends the message of one of its own agents to a specific topic.
- Parameters:
content – The content of the message
receiver_addr – The topic to publish to.
sender_id – The sender aid
kwargs – Additional parameters to provide protocol-specific settings. Possible fields:
qos: The quality of service to use for publishing
retain: Indicates whether the retain flag should be set
Ignored if connection_type != ‘mqtt’
- async subscribe_for_agent(*, aid: str, topic: str, qos: int = 2) bool [source]¶
- Parameters:
aid – aid of the corresponding agent
topic – topic to subscribe (wildcards are allowed)
qos – The maximum quality of service this subscription supports
- Returns:
A boolean signaling whether the subscription was successful
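The wildcards allowed in topic follow the standard MQTT topic filter rules: `+` matches exactly one topic level, and `#` matches the parent level and any number of levels below it and must be the last level of the filter. The helper below is a minimal sketch of that matching rule for reasoning about subscriptions; `topic_matches` is a hypothetical name, not part of mango, and the special handling of topics starting with `$` is left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level;
            # matches the parent and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For example, the filter `plant/+/power` matches `plant/pv1/power` but not `plant/pv1/status`, while `plant/#` matches both as well as `plant` itself.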
TCP Container¶
This module contains the abstract Container class and the subclasses TCPContainer and MQTTContainer
- class mango.container.tcp.TCPConnectionPool(ttl_in_sec: float = 30.0, max_connections_per_target: int = 10)[source]¶
Bases:
object
Pool of async tcp connections. Is able to create arbitrary connections and manages them. This makes reusing connections possible. To obtain a connection, use obtain_connection. When you are done with the connection, use release_connection.
Two parameters can be set: ttl_in_sec (time to live for a connection) and max_connections_per_target (maximum number of connections per connection target).
- async obtain_connection(host: str, port: int, protocol: ContainerProtocol)[source]¶
Obtain a connection from the pool. If no connection is available, a new connection is created.
- Parameters:
host (str) – the host
port (int) – the port
protocol (ContainerProtocol) – ContainerProtocol
- Returns:
connection
- Return type:
ContainerProtocol with open transport object
- async release_connection(host: str, port: int, connection)[source]¶
Release the connection to the pool. This must be called after a connection is obtained; otherwise the connection can never return to the pool.
- Parameters:
host (str) – the host
port (int) – the port
connection (ContainerProtocol) – the connection
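The obtain/release contract can be illustrated with a toy pool built on asyncio only. This is a sketch under stated assumptions, not the TCPConnectionPool implementation: `ToyConnectionPool` and the `connect` factory argument are hypothetical, and the choice to wait (rather than fail) when the per-target limit is reached is an assumption, since this page does not say what the real pool does.

```python
import asyncio
import time


class ToyConnectionPool:
    """Hypothetical pool showing ttl and per-target limits."""

    def __init__(self, ttl_in_sec=30.0, max_connections_per_target=10):
        self._ttl = ttl_in_sec
        self._max = max_connections_per_target
        self._idle = {}    # (host, port) -> list of (connection, last_used)
        self._limits = {}  # (host, port) -> semaphore bounding handed-out connections

    async def obtain_connection(self, host, port, connect):
        key = (host, port)
        if key not in self._limits:
            self._limits[key] = asyncio.Semaphore(self._max)
        # Assumption: wait until a slot for this target is free.
        await self._limits[key].acquire()
        idle = self._idle.setdefault(key, [])
        now = time.monotonic()
        while idle:
            connection, last_used = idle.pop()
            if now - last_used <= self._ttl:
                return connection  # reuse a connection that is still fresh
            # Expired: dropped here; a real pool would also close it.
        try:
            return await connect(host, port)
        except Exception:
            self._limits[key].release()
            raise

    async def release_connection(self, host, port, connection):
        key = (host, port)
        self._idle.setdefault(key, []).append((connection, time.monotonic()))
        self._limits[key].release()


async def demo():
    created = []

    async def connect(host, port):
        # Hypothetical factory standing in for opening a TCP connection.
        created.append((host, port))
        return f"conn-{len(created)}"

    pool = ToyConnectionPool(ttl_in_sec=30.0, max_connections_per_target=1)
    first = await pool.obtain_connection("localhost", 5555, connect)
    await pool.release_connection("localhost", 5555, first)
    second = await pool.obtain_connection("localhost", 5555, connect)
    await pool.release_connection("localhost", 5555, second)
    return first, second, len(created)
```

The sketch also shows why a missing release matters: with a limit of one connection per target, a second obtain_connection for the same target would wait indefinitely if the first connection were never released.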
- class mango.container.tcp.TCPContainer(*, addr: tuple[str, int], codec: Codec, clock: Clock, **kwargs)[source]¶
Bases:
Container
This is a container that communicates directly with other containers via TCP.
- as_agent_process(agent_creator, mirror_container_creator=<function tcp_mirror_container_creator>)[source]¶
Spawn a new process with a container that mirrors the current container and hosts 1 to n agents created by agent_creator. This can be used to introduce real parallelization, with the agents as the unit of division.
Internally, this creates a process with its own asyncio loop and its own container, which mostly imitates the behavior of this container. IPC (async pipes) is used for this. The agents in the process behave exactly as they would in the parent process and in the main container (this).
- Parameters:
agent_creator (Function(Container)) – function that creates 1…n agents; it takes exactly one argument, the mirror container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function that creates the mirror container; this parameter is generally set by the subclasses of Container
- Returns:
a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.
- Return type:
AgentProcessHandle
- async send_message(content: Any, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool [source]¶
The Container sends a message to an agent using TCP.
- Parameters:
content – The content of the message
receiver_addr – Tuple of host, port
receiver_id – The agent id of the receiver
kwargs – Additional parameters to provide protocol specific settings