mango.container package

mango.container.core module

class mango.container.core.AgentProcessHandle(init: Task, pid: int)

Bases: object

Represents the handle for the agent process. It is awaitable for the initializing of agent process. Further it contains the pid of the process.

init: Task
pid: int
class mango.container.core.BaseContainerProcessManager

Bases: object

Base class for the two different container process manager types: mirror process manager, and main process manager. These process manager change the flow of the container to add the subprocess feature natively in the container itself. For these purposes this base class defines some hook in’s which are commonly used by all implementations.

However, there are some methods exclusive to one of the two types of process managers. These methods will either, return None, or raise a NotImplementedError.

property aids

List of aids living in subprocesses.

create_agent_process(agent_creator, container, mirror_container_creator)

Creates a process with an agent, which is created by agent_creator. Further, the mirror_container_creator will create a mirror container, which replaces the original container for all agents which live in its process.

Parameters:
  • agent_creator (Function(Container)) – function with one argument ‘container’, which creates all agents, which shall live in the process

  • container (Container) – the main container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates a mirror container, given container data and IPC connection data

Raises:

NotImplementedError – generally raised if the manager is a mirror manager

dispatch_to_agent_process(pid: int, coro_func, *args)

Dispatches a coroutine function to another process. The coroutine_func and its arguments are serialized and sent to the agent process, in which it is executed with the Container as first argument (followed by the defined arguments).

Parameters:
  • pid (int) – the pid

  • coro_func (async def) – coro function, which shall be executed

Raises:

NotImplementedError – raises a NotImplementedError if mirror manager

handle_message_in_sp(message, receiver_id, priority, meta)

Called when a message should be handled by the process manager. This happens when the receiver id is unknown to the main container itself.

Parameters:
  • message (Any) – the message

  • receiver_id (str) – aid of the receiver

  • priority (int) – prio

  • meta (dict) – meta

Raises:

NotImplementedError – generally not implemented in mirror container manager

pre_hook_reserve_aid(suggested_aid=None)

Hook in before an aid is reserved. Capable of preventing the default reserve_aid call.

Parameters:

suggested_aid (str, optional) – the aid, defaults to None

Returns:

aid, can be None if the original reserve_aid should be executed

Return type:

str

pre_hook_send_internal_message(message, receiver_id, priority, default_meta)

Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects.

Parameters:
  • message (Any) – the message

  • receiver_id (str) – aid

  • priority (0) – prio

  • default_meta (dict) – meta

Returns:

Tuple, first the status (True, False = successful, unsuccessful and prevent the original send_internal_message, None = Continue original call), second the Queue-like inbox, in which the message should be redirected in

Return type:

Tuple[Boolean, Queue-like]

async shutdown()

Clean up all process related stuff.

Raises:

NotImplementedError – Should never be raised

class mango.container.core.Container(*, addr, name: str, codec, loop, clock: Clock, copy_internal_messages=False, mirror_data=None, **kwargs)

Bases: ABC

Superclass for a mango container

as_agent_process(agent_creator, mirror_container_creator)

Spawn a new process with a container, mirroring the current container, and 1 to n agents, created by agent_creator. Can be used to introduce real parallelization using the agents as unit to divide.

Internally this will create a process with its own asyncio loop and an own container, which will mostly imitate the behavior of this container. For this, IPC is used (async pipes). The agent in the process will behave exactly like it would in the parent process and in the main container (this).

Parameters:
  • agent_creator (Function(Container)) – function, which creates 1…n agents, has exactly one argument the mirror container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates the mirror container, generally this parameter is set by the subclasses of container

Returns:

a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.

Return type:

AgentProcessHandle

deregister_agent(aid)

Deregister an agent

Parameters:

aid

Returns:

dispatch_to_agent_process(pid: int, coro_func, *args)

Dispatch a coroutine function and its necessary arguments to the process ‘pid’. This allows the user to execute arbitrary code in the agent subprocesses.

The coro_func accepts coro_func(Container, *args).

Parameters:
  • pid (int) – the pid in which the coro func should get dispatched

  • coro_func (async function(Container, ...)) – async function(Container, …)

is_aid_available(aid)

Check if the aid is available and allowed. It is not possible to register aids matching the regular pattern “agentX”. :param aid: the aid you want to check :return True if the aid is available, False if it is not

register_agent(agent, suggested_aid: str = None)

Register agent and return the agent id

Parameters:
  • agent – The agent instance

  • suggested_aid – (Optional) suggested aid, if the aid is already taken, a generated aid is used. Using the generated aid-style (“agentX”) is not allowed.

:return The agent ID

async send_acl_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, is_anonymous_acl=False, **kwargs) bool

The Container sends a message, wrapped in an ACL message, to an agent according the container protocol.

Parameters:
  • content – The content of the message

  • receiver_addr – In case of TCP this is a tuple of host, port In case of MQTT this is the topic to publish to.

  • receiver_id – The agent id of the receiver

  • acl_metadata – metadata for the acl_header.

  • is_anonymous_acl – If set to True, the sender information won’t be written in the ACL header

  • kwargs – Additional parameters to provide protocol specific settings

abstractmethod async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs) bool

The Container sends a message to an agent according the container protocol.

Parameters:
  • content – The content of the message

  • receiver_addr – In case of TCP this is a tuple of host, port In case of MQTT this is the topic to publish to.

  • receiver_id – The agent id of the receiver

  • kwargs – Additional parameters to provide protocol specific settings

async shutdown()

Shutdown all agents in the container and the container itself

class mango.container.core.ContainerData(addr: object, codec: Codec, clock: Clock, kwargs: dict)

Bases: object

Container for the data neccessary for the creation of all container implementations

addr: object
clock: Clock
codec: Codec
kwargs: dict
class mango.container.core.ContainerMirrorData(message_pipe: AioDuplex, event_pipe: AioDuplex, terminate_event: Event, main_queue: Queue)

Bases: object

Container for the data necessary for setting up a mirror container in another process

event_pipe: AioDuplex
main_queue: Queue
message_pipe: AioDuplex
terminate_event: Event
class mango.container.core.IPCEvent(type: IPCEventType, data: object, pid: int)

Bases: object

IPCEvent data container.

data: object
pid: int
type: IPCEventType
class mango.container.core.IPCEventType(iterable, start=0)

Bases: enumerate

Available IPC event types for event process container communication

AIDS = 1
DISPATCH = 2
class mango.container.core.MainContainerProcessManager(container)

Bases: BaseContainerProcessManager

Internal Manager class, responsible for the implementation of operations necessary for the agent processes in the main container.

property aids

List of aids living in subprocesses.

create_agent_process(agent_creator, container, mirror_container_creator)

Creates a process with an agent, which is created by agent_creator. Further, the mirror_container_creator will create a mirror container, which replaces the original container for all agents which live in its process.

Parameters:
  • agent_creator (Function(Container)) – function with one argument ‘container’, which creates all agents, which shall live in the process

  • container (Container) – the main container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates a mirror container, given container data and IPC connection data

Raises:

NotImplementedError – generally raised if the manager is a mirror manager

dispatch_to_agent_process(pid: int, coro_func, *args)

Dispatches a coroutine function to another process. The coroutine_func and its arguments are serialized and sent to the agent process, in which it is executed with the Container as first argument (followed by the defined arguments).

Parameters:
  • pid (int) – the pid

  • coro_func (async def) – coro function, which shall be executed

Raises:

NotImplementedError – raises a NotImplementedError if mirror manager

handle_message_in_sp(message, receiver_id, priority, meta)

Called when a message should be handled by the process manager. This happens when the receiver id is unknown to the main container itself.

Parameters:
  • message (Any) – the message

  • receiver_id (str) – aid of the receiver

  • priority (int) – prio

  • meta (dict) – meta

Raises:

NotImplementedError – generally not implemented in mirror container manager

pre_hook_send_internal_message(message, receiver_id, priority, default_meta)

Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects.

Parameters:
  • message (Any) – the message

  • receiver_id (str) – aid

  • priority (0) – prio

  • default_meta (dict) – meta

Returns:

Tuple, first the status (True, False = successful, unsuccessful and prevent the original send_internal_message, None = Continue original call), second the Queue-like inbox, in which the message should be redirected in

Return type:

Tuple[Boolean, Queue-like]

async shutdown()

Clean up all process related stuff.

Raises:

NotImplementedError – Should never be raised

class mango.container.core.MirrorContainerProcessManager(container, mirror_data: ContainerMirrorData)

Bases: BaseContainerProcessManager

Internal Manager class, responsible for the implementation of operations necessary for the agent processes in the mirror container.

pre_hook_reserve_aid(suggested_aid=None)

Hook in before an aid is reserved. Capable of preventing the default reserve_aid call.

Parameters:

suggested_aid (str, optional) – the aid, defaults to None

Returns:

aid, can be None if the original reserve_aid should be executed

Return type:

str

pre_hook_send_internal_message(message, receiver_id, priority, default_meta)

Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects.

Parameters:
  • message (Any) – the message

  • receiver_id (str) – aid

  • priority (0) – prio

  • default_meta (dict) – meta

Returns:

Tuple, first the status (True, False = successful, unsuccessful and prevent the original send_internal_message, None = Continue original call), second the Queue-like inbox, in which the message should be redirected in

Return type:

Tuple[Boolean, Queue-like]

async shutdown()

Clean up all process related stuff.

Raises:

NotImplementedError – Should never be raised

async mango.container.core.cancel_and_wait_for_task(task)

Utility to cancel and wait for a task.

Parameters:

task (asyncio.Task) – task to be canceled

mango.container.core.create_agent_process_environment(container_data: ContainerData, agent_creator, mirror_container_creator, message_pipe: AioDuplex, main_queue: Queue, event_pipe: AioDuplex, terminate_event: Event, process_initialized_event: Event)

Create the agent process environment for using agent subprocesses in a mango container. This routine will create a new event loop and run the so-called agent-loop, which will

  1. initialize the mirror-container and the agents

  2. will wait and return to the event loop until there is a terminate signal
    • while this step, the container and its agents are responsive

  3. shutdown the mirror container

Parameters:
  • container_data (ContainerData) – data for the mirror container creation

  • agent_creator (Function(Container)) – function, which will be called with the mirror container to create and initialize all agents

  • mirror_container_creator (Function(ContainerData, AsyncIoEventLoop, AioDuplex, AioDuplex, )) – function, which will create a mirror container

  • message_pipe (AioDuplex) – Pipe for messages

  • event_pipe (AioDuplex) – Pipe for events

  • terminate_event (Event) – Event which signals termination of the main container

  • process_initialized_event (Event) – Event signaling to the main container, that the environment is done with initializing

mango.container.external_coupling module

class mango.container.external_coupling.ExternalAgentMessage(message: bytes, time: float, receiver: str)

Bases: object

message: bytes
receiver: str
time: float
class mango.container.external_coupling.ExternalSchedulingContainer(*, addr: str, codec: Codec, loop: AbstractEventLoop, clock: ExternalClock = None, **kwargs)

Bases: Container

as_agent_process(agent_creator, mirror_container_creator=<function ext_mirror_container_creator>)

Spawn a new process with a container, mirroring the current container, and 1 to n agents, created by agent_creator. Can be used to introduce real parallelization using the agents as unit to divide.

Internally this will create a process with its own asyncio loop and an own container, which will mostly imitate the behavior of this container. For this, IPC is used (async pipes). The agent in the process will behave exactly like it would in the parent process and in the main container (this).

Parameters:
  • agent_creator (Function(Container)) – function, which creates 1…n agents, has exactly one argument the mirror container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates the mirror container, generally this parameter is set by the subclasses of container

Returns:

a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.

Return type:

AgentProcessHandle

async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs) bool

The Container sends a message to an agent according the container protocol.

Parameters:
  • content – The content of the message

  • receiver_addr – Address if the receiving container

  • receiver_id – The agent id of the receiver

  • kwargs – Additional parameters to provide protocol specific settings

async shutdown()

calls shutdown() from super class Container

async step(simulation_time: float, incoming_messages: List[bytes]) ExternalSchedulingContainerOutput
class mango.container.external_coupling.ExternalSchedulingContainerOutput(duration: float, messages: List[mango.container.external_coupling.ExternalAgentMessage], next_activity: float | None)

Bases: object

duration: float
messages: List[ExternalAgentMessage]
next_activity: float | None
mango.container.external_coupling.ext_mirror_container_creator(container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)

mango.container.factory module

async mango.container.factory.create(*, connection_type: str = 'tcp', codec: Codec = None, clock: Clock = None, addr: str | Tuple[str, int] | None = None, copy_internal_messages: bool = False, mqtt_kwargs: Dict[str, Any] = None, **kwargs: Dict[str, Any]) Container

This method is called to instantiate a container instance, either a TCPContainer or a MQTTContainer, depending on the parameter connection_type.

Parameters:
  • connection_type – Defines the connection type. So far only ‘tcp’ or ‘mqtt’ are allowed

  • codec – Defines the codec to use. Defaults to JSON

  • clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock

  • addr – the address to use. If connection_type == ‘tcp’: it has to be a tuple of (host, port). If connection_type == ‘mqtt’ this can optionally define an inbox_topic that is used similarly than a tcp address.

  • mqtt_kwargs – Dictionary of keyword arguments for connection to a mqtt broker. At least the keys ‘broker_addr’ and ‘client_id’ have to be provided. Ignored if connection_type != ‘mqtt’

Returns:

The instance of a MQTTContainer or a TCPContainer

mango.container.mqtt module

class mango.container.mqtt.MQTTContainer(*, client_id: str, addr: str | None, loop: AbstractEventLoop, clock: Clock, mqtt_client: Client, codec: Codec, **kwargs)

Bases: Container

Container for agents.

The container allows its agents to send messages to specific topics (via send_message()).

as_agent_process(agent_creator, mirror_container_creator=None)

Spawn a new process with a container, mirroring the current container, and 1 to n agents, created by agent_creator. Can be used to introduce real parallelization using the agents as unit to divide.

Internally this will create a process with its own asyncio loop and an own container, which will mostly imitate the behavior of this container. For this, IPC is used (async pipes). The agent in the process will behave exactly like it would in the parent process and in the main container (this).

Parameters:
  • agent_creator (Function(Container)) – function, which creates 1…n agents, has exactly one argument the mirror container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates the mirror container, generally this parameter is set by the subclasses of container

Returns:

a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.

Return type:

AgentProcessHandle

decode_mqtt_message(*, topic, payload)

deserializes a mqtt message. Checks if for the topic a special class is defined, otherwise assumes an ACLMessage :param topic: the topic on which the message arrived :param payload: the serialized message :return: content and meta

deregister_agent(aid)
Parameters:

aid

Returns:

async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs)

The container sends the message of one of its own agents to a specific topic.

Parameters:
  • content – The content of the message

  • receiver_addr – The topic to publish to.

  • receiver_id – The agent id of the receiver

  • kwargs – Additional parameters to provide protocol specific settings Possible fields: qos: The quality of service to use for publishing retain: Indicates, weather the retain flag should be set Ignored if connection_type != ‘mqtt’

async shutdown()

Shutdown container, disconnect from broker and stop mqtt thread

async subscribe_for_agent(*, aid: str, topic: str, qos: int = 0) bool
Parameters:
  • aid – aid of the corresponding agent

  • topic – topic to subscribe (wildcards are allowed)

  • qos – The quality of service for the subscription

Returns:

A boolean signaling if subscription was true or not

mango.container.mqtt.mqtt_mirror_container_creator(client_id, mqtt_client, container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)

mango.container.protocol module

class mango.container.protocol.ContainerProtocol(*, container, loop, codec)

Bases: Protocol

Protocol for implementing the TCP Container connection. Internally reads the asyncio transport object into a buffer and moves the read messages async to the container inbox.

connection_lost(exc)
Parameters:

exc

Returns:

connection_made(transport)
Parameters:

transport

data_received(data)
Parameters:

data

Returns:

eof_received()
Returns:

async shutdown()

Will close the transport and stop the writing task :return:

write(msg_payload)

Write the message (as bytes) to the connection.

Parameters:

msg_payload – message payload

Returns:

mango.container.tcp module

This module contains the abstract Container class and the subclasses TCPContainer and MQTTContainer

class mango.container.tcp.TCPConnectionPool(asyncio_loop, ttl_in_sec: float = 30.0, max_connections_per_target: int = 10)

Bases: object

Pool of async tcp connections. Is able to create arbitrary connections and manages them. This makes reusing connections possible. To obtain a connection, use obtain_connection. When you are done with the connection, use release_connection.

There are two parameters to be set, ttl_in_sec (time to live for a connection), max_connections_per_target (max number of connections per connection target).

async obtain_connection(host: str, port: int, protocol: ContainerProtocol)

Obtain a connection from the pool. If no connection is available, a new connection is created.

Parameters:
  • host (str) – the host

  • port (int) – the port

  • protocol (ContainerProtocol) – ContainerProtocol

Returns:

connection

Return type:

ContainerProtocol with open transport object

async release_connection(host: str, port: int, connection)

Release the connection to the pool. Have to be called after a connection is obtained, otherwise the connection can never return to the pool.

Parameters:
  • host (str) – the host

  • port (int) – the port

  • connection (ContainerProtocol) – the connection

async shutdown()
class mango.container.tcp.TCPContainer(*, addr: Tuple[str, int], codec: Codec, loop: AbstractEventLoop, clock: Clock, **kwargs)

Bases: Container

This is a container that communicate directly with other containers via tcp

as_agent_process(agent_creator, mirror_container_creator=<function tcp_mirror_container_creator>)

Spawn a new process with a container, mirroring the current container, and 1 to n agents, created by agent_creator. Can be used to introduce real parallelization using the agents as unit to divide.

Internally this will create a process with its own asyncio loop and an own container, which will mostly imitate the behavior of this container. For this, IPC is used (async pipes). The agent in the process will behave exactly like it would in the parent process and in the main container (this).

Parameters:
  • agent_creator (Function(Container)) – function, which creates 1…n agents, has exactly one argument the mirror container

  • mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates the mirror container, generally this parameter is set by the subclasses of container

Returns:

a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.

Return type:

AgentProcessHandle

async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs) bool

The Container sends a message to an agent using TCP.

Parameters:
  • content – The content of the message

  • receiver_addr – Tuple of host, port

  • receiver_id – The agent id of the receiver

  • kwargs – Additional parameters to provide protocol specific settings

async setup()
async shutdown()

calls shutdown() from super class Container and closes the server

mango.container.tcp.tcp_mirror_container_creator(container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)