mango.container package
mango.container.core module
- class mango.container.core.AgentProcessHandle(init: Task, pid: int)
Bases: object
Represents the handle for an agent process. Awaiting the handle waits for the initialization of the agent process. The handle also contains the pid of the process.
- init: Task
- pid: int
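To illustrate what "awaitable handle" means here, the following is a minimal stand-in built only on the standard library. `HandleSketch` and the pid value are hypothetical; this is not mango's implementation, only a sketch of a handle whose `await` delegates to its initialization task.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class HandleSketch:
    """Hypothetical stand-in: bundles an initialization task with a pid."""

    init: asyncio.Task
    pid: int

    def __await__(self):
        # Awaiting the handle simply awaits the initialization task.
        return self.init.__await__()


async def demo() -> int:
    async def initialize() -> None:
        await asyncio.sleep(0)  # placeholder for the subprocess start-up

    handle = HandleSketch(init=asyncio.create_task(initialize()), pid=4242)
    await handle  # returns once initialization has finished
    assert handle.init.done()
    return handle.pid


result = asyncio.run(demo())
```

The pid stays available as a plain attribute, so a caller can await the handle first and address the process afterwards.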
- class mango.container.core.BaseContainerProcessManager
Bases: object
Base class for the two container process manager types: the mirror process manager and the main process manager. These process managers change the flow of the container to add the subprocess feature natively to the container itself. For this purpose, the base class defines hooks that are commonly used by all implementations.
However, some methods are exclusive to one of the two types of process managers. On the other type, these methods either return None or raise a NotImplementedError.
- property aids
List of aids living in subprocesses.
- create_agent_process(agent_creator, container, mirror_container_creator)
Creates a process with an agent, which is created by agent_creator. Further, the mirror_container_creator will create a mirror container, which replaces the original container for all agents which live in its process.
- Parameters:
agent_creator (Function(Container)) – function with one argument ‘container’, which creates all agents, which shall live in the process
container (Container) – the main container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates a mirror container, given container data and IPC connection data
- Raises:
NotImplementedError – generally raised if the manager is a mirror manager
- dispatch_to_agent_process(pid: int, coro_func, *args)
Dispatches a coroutine function to another process. The coroutine_func and its arguments are serialized and sent to the agent process, in which it is executed with the Container as first argument (followed by the defined arguments).
- Parameters:
pid (int) – the pid
coro_func (async def) – coro function, which shall be executed
- Raises:
NotImplementedError – raises a NotImplementedError if mirror manager
- handle_message_in_sp(message, receiver_id, priority, meta)
Called when a message should be handled by the process manager. This happens when the receiver id is unknown to the main container itself.
- Parameters:
message (Any) – the message
receiver_id (str) – aid of the receiver
priority (int) – the priority of the message
meta (dict) – the metadata of the message
- Raises:
NotImplementedError – generally not implemented in mirror container manager
- pre_hook_reserve_aid(suggested_aid=None)
Hook in before an aid is reserved. Capable of preventing the default reserve_aid call.
- Parameters:
suggested_aid (str, optional) – the aid, defaults to None
- Returns:
aid, can be None if the original reserve_aid should be executed
- Return type:
str
- pre_hook_send_internal_message(message, receiver_id, priority, default_meta)
Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects.
- Parameters:
message (Any) – the message
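receiver_id (str) – aid of the receiver
priority (int) – the priority of the message
default_meta (dict) – the default metadata of the message
- Returns:
Tuple of (status, inbox). A status of True or False means the hook handled the message (successfully or unsuccessfully) and the original send_internal_message is prevented; None means the original call continues. The second element is the Queue-like inbox into which the message should be redirected.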
- Return type:
Tuple[Boolean, Queue-like]
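The three-valued status can be easier to follow from the caller's side. The sketch below is one plausible reading of the documented return contract, written against the standard library only; `send_internal` and the lambda hooks are hypothetical and do not reproduce mango's internal code.

```python
import queue
from typing import Any, Callable, Optional, Tuple

HookResult = Tuple[Optional[bool], Optional[queue.Queue]]


def send_internal(hook: Callable[[Any, str], HookResult],
                  message: Any, receiver_id: str,
                  default_inbox: queue.Queue) -> bool:
    """Hypothetical caller that honours the documented return contract."""
    status, inbox_override = hook(message, receiver_id)
    if status is not None:
        # True/False: the hook already handled the message; skip the default path.
        return status
    # None: continue with the default path, optionally into a redirected inbox.
    target = inbox_override if inbox_override is not None else default_inbox
    target.put_nowait(message)
    return True


default_inbox, redirected_inbox = queue.Queue(), queue.Queue()

handled = send_internal(lambda m, r: (True, None), "a", "agent0", default_inbox)
passed_on = send_internal(lambda m, r: (None, None), "b", "agent0", default_inbox)
rerouted = send_internal(lambda m, r: (None, redirected_inbox), "c", "agent1", default_inbox)
```

Only the second call reaches the default inbox; the first is consumed by the hook and the third lands in the redirected inbox.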
- async shutdown()
Clean up all process-related resources.
- Raises:
NotImplementedError – Should never be raised
- class mango.container.core.Container(*, addr, name: str, codec, loop, clock: Clock, copy_internal_messages=False, mirror_data=None, **kwargs)
Bases: ABC
Superclass for a mango container
- as_agent_process(agent_creator, mirror_container_creator)
Spawn a new process with a container that mirrors the current container, and 1 to n agents created by agent_creator. This can be used to introduce real parallelization, with the agents as the unit of division.
Internally, this creates a process with its own asyncio loop and its own container, which mostly imitates the behavior of this container. IPC (async pipes) is used for this. The agents in the process behave exactly as they would in the parent process and in the main container (this one).
- Parameters:
agent_creator (Function(Container)) – function that creates 1…n agents; it takes exactly one argument, the mirror container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function that creates the mirror container; this parameter is generally set by the subclasses of Container
- Returns:
a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.
- Return type:
AgentProcessHandle
- deregister_agent(aid)
Deregister an agent.
- Parameters:
aid – the aid of the agent to deregister
- dispatch_to_agent_process(pid: int, coro_func, *args)
Dispatch a coroutine function and its necessary arguments to the process ‘pid’. This allows the user to execute arbitrary code in the agent subprocesses.
The coro_func accepts coro_func(Container, *args).
- Parameters:
pid (int) – the pid in which the coro func should get dispatched
coro_func (async function(Container, ...)) – async function(Container, …)
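A usage sketch for spawning an agent process and dispatching a coroutine function to it, following the signatures documented above. It assumes a concrete container (for example a TCPContainer) that supplies a default `mirror_container_creator`; `report`, `spawn_and_dispatch`, and the stub classes are hypothetical and exist only so the sketch runs without mango installed.

```python
import asyncio


async def report(container, greeting):
    # Would run inside the agent process; the container there is passed first.
    print(greeting, "from", type(container).__name__)


async def spawn_and_dispatch(container, agent_creator):
    handle = container.as_agent_process(agent_creator=agent_creator)
    await handle  # wait until the agents in the subprocess are initialized
    container.dispatch_to_agent_process(handle.pid, report, "hello")
    return handle.pid


class _StubHandle:
    """Stand-in for the awaitable handle (hypothetical pid)."""

    pid = 1234

    def __await__(self):
        return asyncio.sleep(0).__await__()


class _StubContainer:
    """Stand-in container that only records what was dispatched."""

    def __init__(self):
        self.dispatched = []

    def as_agent_process(self, agent_creator):
        return _StubHandle()

    def dispatch_to_agent_process(self, pid, coro_func, *args):
        self.dispatched.append((pid, coro_func.__name__, args))


stub = _StubContainer()
pid = asyncio.run(spawn_and_dispatch(stub, lambda container: None))
```

Because the coroutine function and its arguments are serialized before being sent to the agent process, defining `report` at module level rather than as a closure is the safer choice.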
- is_aid_available(aid)
Check if the aid is available and allowed. It is not possible to register aids matching the regular pattern “agentX”.
- Parameters:
aid – the aid you want to check
- Returns:
True if the aid is available, False if it is not
- register_agent(agent, suggested_aid: str = None)
Register agent and return the agent id
- Parameters:
agent – The agent instance
suggested_aid – (Optional) suggested aid, if the aid is already taken, a generated aid is used. Using the generated aid-style (“agentX”) is not allowed.
- Returns:
The agent ID
- async send_acl_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, is_anonymous_acl=False, **kwargs) bool
The Container sends a message, wrapped in an ACL message, to an agent according to the container protocol.
- Parameters:
content – The content of the message
receiver_addr – In case of TCP, this is a tuple of host, port. In case of MQTT, this is the topic to publish to.
receiver_id – The agent id of the receiver
acl_metadata – metadata for the acl_header.
is_anonymous_acl – If set to True, the sender information won’t be written in the ACL header
kwargs – Additional parameters to provide protocol specific settings
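A short usage sketch of send_acl_message with the keyword-only parameters listed above. The payload, address, and the `conversation_id` metadata key are illustrative assumptions, and `_RecordingContainer` is a hypothetical stand-in so the sketch runs without mango installed.

```python
import asyncio


async def notify(container, receiver_addr, receiver_id):
    """Send a small payload wrapped in an ACL message."""
    return await container.send_acl_message(
        {"temperature": 21.5},
        receiver_addr,
        receiver_id=receiver_id,
        acl_metadata={"conversation_id": "demo-1"},  # illustrative key only
        is_anonymous_acl=False,
    )


class _RecordingContainer:
    """Stand-in that records calls instead of sending anything."""

    def __init__(self):
        self.calls = []

    async def send_acl_message(self, content, receiver_addr, *, receiver_id=None,
                               acl_metadata=None, is_anonymous_acl=False, **kwargs):
        self.calls.append((content, receiver_addr, receiver_id, acl_metadata))
        return True


stub = _RecordingContainer()
sent = asyncio.run(notify(stub, ("localhost", 5556), "agent0"))
```

With a TCP container the address is a (host, port) tuple as shown; with an MQTT container the same argument would be the topic string.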
- abstractmethod async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs) bool
The Container sends a message to an agent according to the container protocol.
- Parameters:
content – The content of the message
receiver_addr – In case of TCP, this is a tuple of host, port. In case of MQTT, this is the topic to publish to.
receiver_id – The agent id of the receiver
kwargs – Additional parameters to provide protocol specific settings
- async shutdown()
Shutdown all agents in the container and the container itself
- class mango.container.core.ContainerData(addr: object, codec: Codec, clock: Clock, kwargs: dict)
Bases: object
Container for the data necessary for the creation of all container implementations
- addr: object
- kwargs: dict
- class mango.container.core.ContainerMirrorData(message_pipe: AioDuplex, event_pipe: AioDuplex, terminate_event: Event, main_queue: Queue)
Bases: object
Container for the data necessary for setting up a mirror container in another process
- main_queue: Queue
- terminate_event: Event
- class mango.container.core.IPCEvent(type: IPCEventType, data: object, pid: int)
Bases: object
IPCEvent data container.
- data: object
- pid: int
- type: IPCEventType
- class mango.container.core.IPCEventType(iterable, start=0)
Bases: enumerate
Available IPC event types for event process container communication
- AIDS = 1
- DISPATCH = 2
- class mango.container.core.MainContainerProcessManager(container)
Bases: BaseContainerProcessManager
Internal manager class, responsible for the implementation of operations necessary for the agent processes in the main container.
- property aids
List of aids living in subprocesses.
- create_agent_process(agent_creator, container, mirror_container_creator)
Creates a process with an agent, which is created by agent_creator. Further, the mirror_container_creator will create a mirror container, which replaces the original container for all agents which live in its process.
- Parameters:
agent_creator (Function(Container)) – function with one argument ‘container’, which creates all agents, which shall live in the process
container (Container) – the main container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function, which creates a mirror container, given container data and IPC connection data
- Raises:
NotImplementedError – generally raised if the manager is a mirror manager
- dispatch_to_agent_process(pid: int, coro_func, *args)
Dispatches a coroutine function to another process. The coroutine_func and its arguments are serialized and sent to the agent process, in which it is executed with the Container as first argument (followed by the defined arguments).
- Parameters:
pid (int) – the pid
coro_func (async def) – coro function, which shall be executed
- Raises:
NotImplementedError – raises a NotImplementedError if mirror manager
- handle_message_in_sp(message, receiver_id, priority, meta)
Called when a message should be handled by the process manager. This happens when the receiver id is unknown to the main container itself.
- Parameters:
message (Any) – the message
receiver_id (str) – aid of the receiver
priority (int) – the priority of the message
meta (dict) – the metadata of the message
- Raises:
NotImplementedError – generally not implemented in mirror container manager
- pre_hook_send_internal_message(message, receiver_id, priority, default_meta)
Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects.
- Parameters:
message (Any) – the message
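receiver_id (str) – aid of the receiver
priority (int) – the priority of the message
default_meta (dict) – the default metadata of the message
- Returns:
Tuple of (status, inbox). A status of True or False means the hook handled the message (successfully or unsuccessfully) and the original send_internal_message is prevented; None means the original call continues. The second element is the Queue-like inbox into which the message should be redirected.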
- Return type:
Tuple[Boolean, Queue-like]
- async shutdown()
Clean up all process-related resources.
- Raises:
NotImplementedError – Should never be raised
- class mango.container.core.MirrorContainerProcessManager(container, mirror_data: ContainerMirrorData)
Bases: BaseContainerProcessManager
Internal manager class, responsible for the implementation of operations necessary for the agent processes in the mirror container.
- pre_hook_reserve_aid(suggested_aid=None)
Hook in before an aid is reserved. Capable of preventing the default reserve_aid call.
- Parameters:
suggested_aid (str, optional) – the aid, defaults to None
- Returns:
aid, can be None if the original reserve_aid should be executed
- Return type:
str
- pre_hook_send_internal_message(message, receiver_id, priority, default_meta)
Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects.
- Parameters:
message (Any) – the message
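receiver_id (str) – aid of the receiver
priority (int) – the priority of the message
default_meta (dict) – the default metadata of the message
- Returns:
Tuple of (status, inbox). A status of True or False means the hook handled the message (successfully or unsuccessfully) and the original send_internal_message is prevented; None means the original call continues. The second element is the Queue-like inbox into which the message should be redirected.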
- Return type:
Tuple[Boolean, Queue-like]
- async shutdown()
Clean up all process-related resources.
- Raises:
NotImplementedError – Should never be raised
- async mango.container.core.cancel_and_wait_for_task(task)
Utility to cancel and wait for a task.
- Parameters:
task (asyncio.Task) – task to be canceled
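The cancel-and-wait pattern can be written with the standard library alone. The sketch below shows one common shape of such a helper; `cancel_and_wait` is a hypothetical name and is not claimed to match mango's implementation.

```python
import asyncio
import contextlib


async def cancel_and_wait(task: asyncio.Task) -> None:
    """Request cancellation, then wait until the task has actually stopped."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo() -> bool:
    task = asyncio.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # give the task a chance to start
    await cancel_and_wait(task)
    return task.cancelled()


was_cancelled = asyncio.run(demo())
```

Waiting after `cancel()` matters because cancellation is only requested at that point; the task finishes unwinding the next time it runs.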
- mango.container.core.create_agent_process_environment(container_data: ContainerData, agent_creator, mirror_container_creator, message_pipe: AioDuplex, main_queue: Queue, event_pipe: AioDuplex, terminate_event: Event, process_initialized_event: Event)
Create the agent process environment for using agent subprocesses in a mango container. This routine creates a new event loop and runs the so-called agent loop, which will
- initialize the mirror container and the agents
- wait and return to the event loop until there is a terminate signal; during this step, the container and its agents are responsive
- shut down the mirror container
- Parameters:
container_data (ContainerData) – data for the mirror container creation
agent_creator (Function(Container)) – function, which will be called with the mirror container to create and initialize all agents
mirror_container_creator (Function(ContainerData, AsyncIoEventLoop, AioDuplex, AioDuplex, )) – function, which will create a mirror container
message_pipe (AioDuplex) – Pipe for messages
event_pipe (AioDuplex) – Pipe for events
terminate_event (Event) – Event which signals termination of the main container
process_initialized_event (Event) – Event signaling to the main container, that the environment is done with initializing
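The three phases of the agent loop described above can be sketched in a single process with plain asyncio. Everything here is a simplified assumption: `agent_loop` and `run_environment` are hypothetical names, asyncio events stand in for the inter-process events, and the pipes are left out entirely.

```python
import asyncio


async def agent_loop(create_mirror, create_agents, terminate, initialized, log):
    container = create_mirror()   # phase 1: create the mirror container ...
    create_agents(container)      # ... and the agents living in it
    initialized.set()             # signal that initialization is done
    await terminate.wait()        # phase 2: yield to the loop until terminated
    log.append("shutdown")        # phase 3: shut the mirror container down


def run_environment(create_mirror, create_agents, log):
    loop = asyncio.new_event_loop()  # a fresh loop, as in a new process

    async def main():
        terminate, initialized = asyncio.Event(), asyncio.Event()
        loop.call_later(0.01, terminate.set)  # simulate the terminate signal
        await agent_loop(create_mirror, create_agents, terminate, initialized, log)
        return initialized.is_set()

    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()


log = []
ready = run_environment(
    lambda: log.append("mirror") or "mirror-container",
    lambda container: log.append("agents in " + container),
    log,
)
```

While the loop is parked on `terminate.wait()`, other tasks on the same event loop keep running, which is what keeps the container and its agents responsive in phase 2.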
mango.container.external_coupling module
- class mango.container.external_coupling.ExternalAgentMessage(message: bytes, time: float, receiver: str)
Bases: object
- message: bytes
- receiver: str
- time: float
- class mango.container.external_coupling.ExternalSchedulingContainer(*, addr: str, codec: Codec, loop: AbstractEventLoop, clock: ExternalClock = None, **kwargs)
Bases: Container
- as_agent_process(agent_creator, mirror_container_creator=<function ext_mirror_container_creator>)
Spawn a new process with a container that mirrors the current container, and 1 to n agents created by agent_creator. This can be used to introduce real parallelization, with the agents as the unit of division.
Internally, this creates a process with its own asyncio loop and its own container, which mostly imitates the behavior of this container. IPC (async pipes) is used for this. The agents in the process behave exactly as they would in the parent process and in the main container (this one).
- Parameters:
agent_creator (Function(Container)) – function that creates 1…n agents; it takes exactly one argument, the mirror container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function that creates the mirror container; this parameter is generally set by the subclasses of Container
- Returns:
a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.
- Return type:
AgentProcessHandle
- async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs) bool
The Container sends a message to an agent according to the container protocol.
- Parameters:
content – The content of the message
receiver_addr – Address of the receiving container
receiver_id – The agent id of the receiver
kwargs – Additional parameters to provide protocol specific settings
- async shutdown()
Calls shutdown() from the superclass Container.
- async step(simulation_time: float, incoming_messages: List[bytes]) ExternalSchedulingContainerOutput
- class mango.container.external_coupling.ExternalSchedulingContainerOutput(duration: float, messages: List[mango.container.external_coupling.ExternalAgentMessage], next_activity: float | None)
Bases: object
- duration: float
- messages: List[ExternalAgentMessage]
- next_activity: float | None
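A sketch of how an external simulator might drive step() and read the three output fields. It assumes that next_activity is an absolute simulation time and that None means nothing further is scheduled; the source does not state this. `run_until`, `_Output`, and `_StubContainer` are hypothetical stand-ins so the sketch runs without mango installed.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


async def run_until(container, end_time: float, start_time: float = 0.0):
    """Step the container through simulation time and collect its output."""
    now, outgoing = start_time, []
    while now <= end_time:
        output = await container.step(simulation_time=now, incoming_messages=[])
        outgoing.extend(output.messages)
        # Assumption: next_activity is absolute; None means nothing is scheduled.
        if output.next_activity is None or output.next_activity <= now:
            break
        now = output.next_activity
    return outgoing


@dataclass
class _Output:
    """Stand-in with the documented fields."""

    duration: float
    messages: List[bytes] = field(default_factory=list)
    next_activity: Optional[float] = None


class _StubContainer:
    async def step(self, simulation_time, incoming_messages):
        nxt = simulation_time + 1.0 if simulation_time < 2.0 else None
        return _Output(0.0, [f"t={simulation_time:.0f}".encode()], nxt)


collected = asyncio.run(run_until(_StubContainer(), end_time=10.0))
```

A real coupling would also feed the messages received from other simulators into `incoming_messages` on the following step.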
- mango.container.external_coupling.ext_mirror_container_creator(container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)
mango.container.factory module
- async mango.container.factory.create(*, connection_type: str = 'tcp', codec: Codec = None, clock: Clock = None, addr: str | Tuple[str, int] | None = None, copy_internal_messages: bool = False, mqtt_kwargs: Dict[str, Any] = None, **kwargs: Dict[str, Any]) Container
This method is called to instantiate a container instance, either a TCPContainer or a MQTTContainer, depending on the parameter connection_type.
- Parameters:
connection_type – Defines the connection type. So far only ‘tcp’ or ‘mqtt’ are allowed
codec – Defines the codec to use. Defaults to JSON
clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock
addr – the address to use. If connection_type == ‘tcp’, it has to be a tuple of (host, port). If connection_type == ‘mqtt’, this can optionally define an inbox_topic that is used similarly to a TCP address.
mqtt_kwargs – Dictionary of keyword arguments for connection to a mqtt broker. At least the keys ‘broker_addr’ and ‘client_id’ have to be provided. Ignored if connection_type != ‘mqtt’
- Returns:
The instance of a MQTTContainer or a TCPContainer
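A sketch of assembling the keyword arguments for both connection types. The helper names, host, port, and the (host, port) form of `broker_addr` are assumptions for illustration; only the parameter names and the two required mqtt_kwargs keys come from the description above. The import of `create` is deferred so the helpers work without mango installed.

```python
def tcp_settings(host="localhost", port=5555):
    """Keyword arguments for a TCP container."""
    return {"connection_type": "tcp", "addr": (host, port)}


def mqtt_settings(client_id, broker_addr, inbox_topic=None):
    """Keyword arguments for an MQTT container."""
    return {
        "connection_type": "mqtt",
        "addr": inbox_topic,  # optional inbox topic
        # 'broker_addr' and 'client_id' are the two required keys.
        "mqtt_kwargs": {"client_id": client_id, "broker_addr": broker_addr},
    }


async def make_container(settings):
    # Imported lazily; requires mango to be installed when actually called.
    from mango.container.factory import create

    return await create(**settings)


tcp = tcp_settings()
mqtt = mqtt_settings("client-1", ("localhost", 1883), inbox_topic="inbox/agent0")
```

Since create is a coroutine function, `make_container` has to be awaited from within a running event loop, for example via `asyncio.run(make_container(tcp))`.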
mango.container.mqtt module
- class mango.container.mqtt.MQTTContainer(*, client_id: str, addr: str | None, loop: AbstractEventLoop, clock: Clock, mqtt_client: Client, codec: Codec, **kwargs)
Bases: Container
Container for agents.
The container allows its agents to send messages to specific topics (via send_message()).
- as_agent_process(agent_creator, mirror_container_creator=None)
Spawn a new process with a container that mirrors the current container, and 1 to n agents created by agent_creator. This can be used to introduce real parallelization, with the agents as the unit of division.
Internally, this creates a process with its own asyncio loop and its own container, which mostly imitates the behavior of this container. IPC (async pipes) is used for this. The agents in the process behave exactly as they would in the parent process and in the main container (this one).
- Parameters:
agent_creator (Function(Container)) – function that creates 1…n agents; it takes exactly one argument, the mirror container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function that creates the mirror container; this parameter is generally set by the subclasses of Container
- Returns:
a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.
- Return type:
AgentProcessHandle
- decode_mqtt_message(*, topic, payload)
Deserializes an MQTT message. Checks if a special class is defined for the topic; otherwise assumes an ACLMessage.
- Parameters:
topic – the topic on which the message arrived
payload – the serialized message
- Returns:
content and meta
- deregister_agent(aid)
- Parameters:
aid
- Returns:
- async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs)
The container sends the message of one of its own agents to a specific topic.
- Parameters:
content – The content of the message
receiver_addr – The topic to publish to.
receiver_id – The agent id of the receiver
kwargs – Additional parameters to provide protocol-specific settings. Possible fields: qos: the quality of service to use for publishing; retain: indicates whether the retain flag should be set. Ignored if connection_type != ‘mqtt’.
- async shutdown()
Shutdown container, disconnect from broker and stop mqtt thread
- async subscribe_for_agent(*, aid: str, topic: str, qos: int = 0) bool
- Parameters:
aid – aid of the corresponding agent
topic – topic to subscribe (wildcards are allowed)
qos – The quality of service for the subscription
- Returns:
A boolean signaling whether the subscription was successful
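A usage sketch combining subscribe_for_agent with send_message and its `qos` and `retain` keyword arguments. The topic names and payload are illustrative, and `_StubMQTTContainer` is a hypothetical stand-in so the sketch runs without mango or a broker.

```python
import asyncio


async def subscribe_and_publish(container, aid, subscription, topic, content):
    subscribed = await container.subscribe_for_agent(aid=aid, topic=subscription, qos=1)
    if not subscribed:
        return False
    # qos and retain are the protocol-specific kwargs named for send_message.
    await container.send_message(content, receiver_addr=topic, qos=1, retain=False)
    return True


class _StubMQTTContainer:
    """Stand-in that records subscriptions and published messages."""

    def __init__(self):
        self.subscriptions, self.published = [], []

    async def subscribe_for_agent(self, *, aid, topic, qos=0):
        self.subscriptions.append((aid, topic, qos))
        return True

    async def send_message(self, content, receiver_addr, *, receiver_id=None, **kwargs):
        self.published.append((receiver_addr, content, kwargs))


stub = _StubMQTTContainer()
ok = asyncio.run(
    subscribe_and_publish(stub, "agent0", "sensors/#", "sensors/kitchen", "21.5")
)
```

The subscription uses a wildcard while the publish goes to a concrete topic, since MQTT permits wildcards only in subscriptions.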
- mango.container.mqtt.mqtt_mirror_container_creator(client_id, mqtt_client, container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)
mango.container.protocol module
- class mango.container.protocol.ContainerProtocol(*, container, loop, codec)
Bases: Protocol
Protocol for implementing the TCP Container connection. Internally reads the asyncio transport object into a buffer and moves the read messages asynchronously to the container inbox.
- connection_lost(exc)
- Parameters:
exc
- Returns:
- connection_made(transport)
- Parameters:
transport
- data_received(data)
- Parameters:
data
- Returns:
- eof_received()
- Returns:
- async shutdown()
Closes the transport and stops the writing task.
- write(msg_payload)
Write the message (as bytes) to the connection.
- Parameters:
msg_payload – message payload
- Returns:
mango.container.tcp module
This module contains the abstract Container class and the subclasses TCPContainer and MQTTContainer
- class mango.container.tcp.TCPConnectionPool(asyncio_loop, ttl_in_sec: float = 30.0, max_connections_per_target: int = 10)
Bases: object
Pool of async TCP connections. It is able to create arbitrary connections and manages them, which makes reusing connections possible. To obtain a connection, use obtain_connection. When you are done with the connection, use release_connection.
There are two parameters to be set: ttl_in_sec (time to live for a connection) and max_connections_per_target (maximum number of connections per connection target).
- async obtain_connection(host: str, port: int, protocol: ContainerProtocol)
Obtain a connection from the pool. If no connection is available, a new connection is created.
- Parameters:
host (str) – the host
port (int) – the port
protocol (ContainerProtocol) – ContainerProtocol
- Returns:
connection
- Return type:
ContainerProtocol with open transport object
- async release_connection(host: str, port: int, connection)
Release the connection to the pool. Must be called after a connection is obtained; otherwise the connection can never return to the pool.
- Parameters:
host (str) – the host
port (int) – the port
connection (ContainerProtocol) – the connection
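Because every obtained connection has to be released, a try/finally pairing is a natural fit. The sketch below follows the documented method signatures and ContainerProtocol.write; `send_via_pool` and the stub classes are hypothetical, and the stub accepts None where a real pool would expect a ContainerProtocol.

```python
import asyncio


async def send_via_pool(pool, host, port, protocol, payload: bytes):
    connection = await pool.obtain_connection(host, port, protocol)
    try:
        connection.write(payload)
    finally:
        # Always hand the connection back, even if the write fails.
        await pool.release_connection(host, port, connection)


class _StubConnection:
    def __init__(self):
        self.written = []

    def write(self, msg_payload):
        self.written.append(msg_payload)


class _StubPool:
    """Stand-in that counts connections currently handed out."""

    def __init__(self):
        self.in_use = 0
        self.connection = _StubConnection()

    async def obtain_connection(self, host, port, protocol):
        self.in_use += 1
        return self.connection

    async def release_connection(self, host, port, connection):
        self.in_use -= 1


pool = _StubPool()
asyncio.run(send_via_pool(pool, "localhost", 5555, None, b"ping"))
```

The same host and port are passed to both calls so the pool can return the connection to the right target.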
- async shutdown()
- class mango.container.tcp.TCPContainer(*, addr: Tuple[str, int], codec: Codec, loop: AbstractEventLoop, clock: Clock, **kwargs)
Bases: Container
A container that communicates directly with other containers via TCP.
- as_agent_process(agent_creator, mirror_container_creator=<function tcp_mirror_container_creator>)
Spawn a new process with a container that mirrors the current container, and 1 to n agents created by agent_creator. This can be used to introduce real parallelization, with the agents as the unit of division.
Internally, this creates a process with its own asyncio loop and its own container, which mostly imitates the behavior of this container. IPC (async pipes) is used for this. The agents in the process behave exactly as they would in the parent process and in the main container (this one).
- Parameters:
agent_creator (Function(Container)) – function that creates 1…n agents; it takes exactly one argument, the mirror container
mirror_container_creator (Function(ContainerData, AsyncioLoop, AioDuplex, AioDuplex, Event)) – function that creates the mirror container; this parameter is generally set by the subclasses of Container
- Returns:
a handle for the created process. It contains the pid as property ‘pid’ and can be awaited to make sure the initialization of the agents in the subprocess is actually done.
- Return type:
AgentProcessHandle
- async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs) bool
The Container sends a message to an agent using TCP.
- Parameters:
content – The content of the message
receiver_addr – Tuple of host, port
receiver_id – The agent id of the receiver
kwargs – Additional parameters to provide protocol specific settings
- async setup()
- async shutdown()
Calls shutdown() from the superclass Container and closes the server.
- mango.container.tcp.tcp_mirror_container_creator(container_data, loop, message_pipe, main_queue, event_pipe, terminate_event)