Express API

API

class mango.express.api.ComposedAgent[source]

Bases: RoleAgent

class mango.express.api.ContainerActivationManager(containers: list[Container])[source]

Bases: object

class mango.express.api.PrintingAgent[source]

Bases: Agent

handle_message(content, meta: dict[str, Any])[source]

Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

class mango.express.api.RunWithContainer(num: int, *agents: tuple[Agent, dict])[source]

Bases: ABC

async after_start(container_list, agents)[source]
abstractmethod create_container_list(num) list[Container][source]
class mango.express.api.RunWithMQTTManager(num: int, *agents: Agent | tuple[Agent, dict], broker_addr: tuple[str, int] = ('127.0.0.1', 5555), codec: None | Codec = None)[source]

Bases: RunWithContainer

async after_start(container_list, agents)[source]
create_container_list(num)[source]
class mango.express.api.RunWithTCPManager(num: int, *agents: Agent | tuple[Agent, dict], addr: tuple[str, int] = ('127.0.0.1', 5555), codec: None | Codec = None, auto_port: bool = False)[source]

Bases: RunWithContainer

create_container_list(num)[source]
mango.express.api.activate(*containers: Container) ContainerActivationManager[source]

Create and return an async activation context manager. This can be used with the async with syntax to run code while the container(s) are active. The containers are started first, after your code under async with will run, and at the end the container will shut down (even when an error occurs).

Example:

# Single container
async with activate(container) as container:
    # do your stuff

# Multiple container
async with activate(container_list) as container_list:
    # do your stuff
Returns:

The context manager to be used as described

Return type:

ContainerActivationManager

mango.express.api.addr(protocol_addr: Any, aid: str) AgentAddress[source]

Create an Address from the topic.

Args:

protocol_addr (Any): the container part of the addr, e.g. topic for mqtt, or host/port for tcp, … aid (str): the agent id

Returns:

AgentAddress: the address

mango.express.api.agent_composed_of(*roles: Role, register_in: None | Container = None, suggested_aid: None | str = None) ComposedAgent[source]

Create an agent composed of the given roles. If a container is provided, the created agent is automatically registered with the container register_in.

Parameters:
  • register_in (None | Container) – container in which the created agent is registered, if provided

  • suggested_aid (str) – the suggested aid for registration

Returns:

the composed agent

Return type:

ComposedAgent

mango.express.api.run_with_mqtt(num: int, *agents: tuple[Agent, dict], broker_addr: tuple[str, int] = ('127.0.0.1', 1883), codec: None | Codec = None) RunWithMQTTManager[source]

Create and return an async context manager, which can be used to run the given agents in num automatically created mqtt container. The agents are distributed according to the topic

The function takes a list of agents which shall run, it is possible to provide a tuple (Agent, dict), the dict supports “aid” for the suggested_aid and “topics” as list of topics the agent wants to subscribe to.

Parameters:
  • num (int) – _description_

  • broker_addr (tuple[str, int], optional) – Address of the broker the container shall connect to, defaults to (“127.0.0.1”, 1883)

  • codec (None | Codec, optional, The codec of the container) – _description_, defaults to None

Returns:

the async context manager

Return type:

RunWithMQTTManager

mango.express.api.run_with_tcp(num: int, *agents: Agent | tuple[Agent, dict], addr: tuple[str, int] = ('127.0.0.1', 5555), codec: None | Codec = None, auto_port: bool = False) RunWithTCPManager[source]

Create and return an async context manager, which can be used to run the given agents in num automatically created tcp container. The agents are distributed evenly.

async with run_with_tcp(2, Agent(), Agent(), (Agent(), dict(aid="my_agent_id"))) as c:
    # do your stuff
Parameters:
  • num (int) – number of tcp container

  • addr (tuple[str, int], optional) – the starting addr of the containers, defaults to (“127.0.0.1”, 5555)

  • codec (None | Codec, optional) – the codec for the containers, defaults to None

  • auto_port (bool) – set if the port should be chosen automatically

Returns:

the async context manager to run the agents with

Return type:

RunWithTCPManager

mango.express.api.sender_addr(meta: dict) AgentAddress[source]

Extract the sender_addr from the meta dict.

Args:

meta (dict): the meta you received

Returns:

AgentAddress: Extracted agent address to be used for replying to messages