API reference

The API reference provides detailed descriptions of mango’s classes and functions.

class mango.Agent[source]

Base class for all agents.

property addr

Return the address of the agent as AgentAddress

Returns:

AgentAddress: the address of the agent

property aid
property current_timestamp: float

Return the current Unix timestamp according to the clock of the container.

handle_message(content, meta: dict[str, Any])[source]

Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’

neighbors(state: State = State.NORMAL) list[AgentAddress]

Return the neighbors of the agent (controlled by the topology api).

Returns:

the list of agent addresses filtered by state

Return type:

list[AgentAddress]

property observable_tasks
on_ready()

Called when all containers have been started using activate(…).

on_register()[source]

Hook-in to define behavior of the agent directly after it got registered by a container

on_start()

Called when the container in which the agent is contained has started.

schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a process task when a specified condition is met.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a task when a specified condition is met.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task
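The polling implied by condition_func and lookup_delay can be pictured with plain asyncio. The following is a minimal, stdlib-only sketch and not mango’s implementation; run_when and demo are hypothetical names used only for illustration.

```python
import asyncio


async def run_when(coroutine, condition_func, lookup_delay=0.1):
    """Poll condition_func every lookup_delay seconds, then await coroutine."""
    while not condition_func():
        await asyncio.sleep(lookup_delay)
    return await coroutine


async def demo():
    state = {"ready": False}

    async def work():
        return "done"

    async def flip():
        # Make the condition true shortly after the demo starts.
        await asyncio.sleep(0.02)
        state["ready"] = True

    flipper = asyncio.create_task(flip())
    result = await run_when(work(), lambda: state["ready"], lookup_delay=0.01)
    await flipper
    return result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A smaller lookup_delay reacts faster to the condition becoming true, at the cost of more frequent checks.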

schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)

Schedules sending a message without any delay. This is equivalent to using the scheduler’s ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.

Parameters:
  • content – The content of the message

  • receiver_addr – The address passed to the container

  • kwargs – Additional parameters to provide protocol specific settings

Returns:

asyncio.Task for the scheduled coroutine

schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)

Schedule an instantly executed task in another process.

Parameters:
  • coroutine_creator – coroutine_creator creating coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_instant_task(coroutine, on_stop=None, src=None)

Schedule an instantly executed task.

Parameters:
  • coroutine – coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)

Schedule an open-ended, periodically executed task in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)

Schedule an open-ended, periodically executed task.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task
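A periodic task takes a coroutine function rather than a coroutine object because a coroutine object can be awaited only once, while each cycle needs a fresh one. The stdlib-only sketch below illustrates that idea; it is not mango’s implementation, and periodic and demo are hypothetical names.

```python
import asyncio


async def periodic(coroutine_func, delay):
    """Call coroutine_func() forever, sleeping delay seconds between cycles."""
    while True:
        await coroutine_func()  # a new coroutine object for every cycle
        await asyncio.sleep(delay)


async def demo():
    ticks = []

    async def tick():
        ticks.append(len(ticks))

    task = asyncio.create_task(periodic(tick, delay=0.01))
    await asyncio.sleep(0.055)
    # The task is open-ended, so it only stops when it is cancelled.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


if __name__ == "__main__":
    print(len(asyncio.run(demo())), "cycles ran before cancellation")
```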

schedule_process_task(task: ScheduledProcessTask, src=None)

Schedule a task with asyncio in another process. A finite task is removed automatically once it has finished. For scheduling options see the subclasses of ScheduledProcessTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_task(task: ScheduledTask, src=None)

Schedule a task with asyncio. A finite task is removed automatically once it has finished. For scheduling options see the subclasses of ScheduledTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)

Schedule a task at a specified Unix timestamp, dispatched to another process.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • timestamp (float) – unix timestamp defining when the task should start

  • src (Object) – creator of the task

schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)

Schedule a task at a specified Unix timestamp.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • timestamp (float) – Unix timestamp defining when the task should start

  • src (Object) – creator of the task
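Scheduling at a Unix timestamp amounts to waiting for the difference between the target and the current time. The sketch below shows this with the system clock and plain asyncio; it is an illustration under that assumption, not mango’s implementation (which uses the container clock), and run_at and demo are hypothetical names.

```python
import asyncio
import time


async def run_at(coroutine, timestamp):
    """Sleep until the given Unix timestamp, then await coroutine."""
    delay = max(0.0, timestamp - time.time())  # timestamps in the past run at once
    await asyncio.sleep(delay)
    return await coroutine


async def demo():
    async def work():
        return time.time()

    target = time.time() + 0.05
    finished = await run_at(work(), target)
    return target, finished


if __name__ == "__main__":
    target, finished = asyncio.run(demo())
    print(f"ran {finished - target:+.3f}s relative to the target")
```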

async send_message(content, receiver_addr: AgentAddress, **kwargs) bool

See container.send_message(…)

service_of_type(type: type, default: Any = None) Any

Return the service of the given type. If no such service exists, set default as the service and return it.

Parameters:
  • type (type) – the type of the service

  • default (Any (optional)) – the default if applicable

Returns:

the service

Return type:

Any
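The "return or set the default" behaviour resembles dict.setdefault keyed by type. The following stdlib-only sketch illustrates it; ServiceRegistry and Counter are hypothetical stand-ins and not part of mango.

```python
class ServiceRegistry:
    """Hypothetical stand-in for an agent's service lookup."""

    def __init__(self):
        self._services = {}

    def service_of_type(self, type, default=None):
        # Store the default only if no service of this type exists yet.
        if type not in self._services:
            self._services[type] = default
        return self._services[type]


class Counter:
    def __init__(self):
        self.value = 0


registry = ServiceRegistry()
first = registry.service_of_type(Counter, Counter())
first.value += 1
# The second default is ignored because a Counter service is already stored.
second = registry.service_of_type(Counter, Counter())
```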

async shutdown()[source]

Shut down all running tasks and deregister from the container.

property suspendable_tasks
async tasks_complete(timeout=1)

Wait for all scheduled tasks to complete using a timeout.

Parameters:

timeout – waiting timeout. Defaults to 1.

class mango.AgentAddress(protocol_addr: Any, aid: str)[source]
aid: str
protocol_addr: Any
class mango.AsyncioClock[source]

A clock that uses the system time and sleeps via asyncio.

sleep(t) Future[source]

Sleeping via asyncio sleep

property time: float

Current time using the time module

class mango.ExternalClock(start_time: float = 0)[source]

An external clock that proceeds only when set_time is called

get_next_activity() float[source]
set_time(t: float)[source]

New time is set

sleep(t: float) Future[source]

Sleeps for t based on the external clock

property time: float

Current time of the external clock
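To picture a clock that proceeds only when its time is set from outside, the sketch below keeps pending sleeps in a heap and resolves them in set_time. It is a stdlib-only illustration, not mango’s ExternalClock; ManualClock and demo are hypothetical names, and returning None from get_next_activity when nothing is waiting is a choice of this sketch.

```python
import asyncio
import heapq
import itertools


class ManualClock:
    def __init__(self, start_time=0.0):
        self._time = start_time
        self._waiting = []  # heap of (wake-up time, tie-breaker, future)
        self._counter = itertools.count()

    @property
    def time(self):
        return self._time

    def sleep(self, t):
        """Return a future that resolves once the clock reaches time + t."""
        future = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiting, (self._time + t, next(self._counter), future))
        return future

    def get_next_activity(self):
        return self._waiting[0][0] if self._waiting else None

    def set_time(self, t):
        """Advance the clock and wake every sleeper that is due."""
        self._time = t
        while self._waiting and self._waiting[0][0] <= t:
            _, _, future = heapq.heappop(self._waiting)
            if not future.done():
                future.set_result(None)


async def demo():
    clock = ManualClock(start_time=100.0)
    log = []

    async def sleeper():
        await clock.sleep(5)
        log.append(clock.time)

    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # let the sleeper register its wake-up time
    next_activity = clock.get_next_activity()
    clock.set_time(103.0)  # too early, the sleeper keeps waiting
    await asyncio.sleep(0)
    still_waiting = not task.done()
    clock.set_time(next_activity)
    await task
    return next_activity, still_waiting, log
```

Driving the clock with the value of get_next_activity lets a simulation jump directly from one event to the next without waiting in real time.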

class mango.JSON[source]

A Codec that uses JSON to encode and decode messages.

add_serializer(otype, serialize, deserialize, type_id=None)

Add methods to serialize and deserialize objects typed otype.

This can be used to de-/encode objects that the codec otherwise couldn’t encode.

serialize will receive the unencoded object and needs to return an encodable serialization of it.

deserialize will receive an object’s representation and should return an instance of the original object.
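The contract between serialize and deserialize can be illustrated with the standard json module. The sketch below is not mango’s codec: TinyCodec, the "__type__"/"__data__" wrapper keys, and the use of the class name as default type id are all assumptions made for this example.

```python
import json
from dataclasses import dataclass


class TinyCodec:
    def __init__(self):
        self._by_type = {}  # otype -> (type_id, serialize)
        self._by_id = {}    # type_id -> deserialize

    def add_serializer(self, otype, serialize, deserialize, type_id=None):
        type_id = otype.__name__ if type_id is None else type_id
        self._by_type[otype] = (type_id, serialize)
        self._by_id[type_id] = deserialize

    def encode(self, data):
        return json.dumps(data, default=self._serialize_obj).encode("utf-8")

    def decode(self, data):
        return json.loads(data.decode("utf-8"), object_hook=self._deserialize_obj)

    def _serialize_obj(self, obj):
        # json calls this only for objects it cannot encode on its own.
        if type(obj) not in self._by_type:
            raise TypeError(f"no serializer registered for {type(obj).__name__}")
        type_id, serialize = self._by_type[type(obj)]
        return {"__type__": type_id, "__data__": serialize(obj)}

    def _deserialize_obj(self, obj_repr):
        if "__type__" in obj_repr:
            return self._by_id[obj_repr["__type__"]](obj_repr["__data__"])
        return obj_repr


@dataclass
class Point:
    x: int
    y: int


codec = TinyCodec()
codec.add_serializer(Point, lambda p: {"x": p.x, "y": p.y}, lambda d: Point(**d))
payload = codec.encode({"origin": Point(0, 0), "target": Point(3, 4)})
restored = codec.decode(payload)
```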

decode(data)[source]

Decode data from bytes to the original data structure.

deserialize_obj(obj_repr)

Deserialize the original object from obj_repr.

encode(data)[source]

Encode the given data and return a bytes object.

make_type_id(otype)

Create a type id for otype and return it as a 32 bit integer. The id is created using:

  • type name

  • function names in the class

  • signature of the class

serialize_obj(obj)

Serialize obj to something that the codec can encode.

class mango.PROTOBUF[source]
add_serializer(otype, serialize, deserialize, type_id=None)

Add methods to serialize and deserialize objects typed otype.

This can be used to de-/encode objects that the codec otherwise couldn’t encode.

serialize will receive the unencoded object and needs to return an encodable serialization of it.

deserialize will receive an object’s representation and should return an instance of the original object.

decode(data)[source]

Decode data from bytes to the original data structure.

deserialize_obj(obj_repr)

Deserialize the original object from obj_repr.

encode(data)[source]

Encode the given data and return a bytes object.

make_type_id(otype)

Create a type id for otype and return it as a 32 bit integer. The id is created using:

  • type name

  • function names in the class

  • signature of the class

register_proto_type(proto_class)[source]
serialize_obj(obj)[source]

Serialize obj to something that the codec can encode.

class mango.Performatives(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Member values (which must be unique) can be used as priority values unless they are replaced by enum.auto. See http://www.fipa.org/specs/fipa00037/SC00037J.html for a description of the performatives.

accept_proposal = 1
agree = 2
call_for_proposal = 5
cancel = 3
cfp = 4
confirm = 6
disconfirm = 7
failure = 8
inform = 9
inform_if = 20
not_understood = 10
propagate = 22
propose = 11
proxy = 21
query_if = 12
query_ref = 13
refuse = 14
reject_proposal = 15
request = 16
request_when = 17
request_whenever = 18
subscribe = 19
class mango.Role[source]

General role class, defining the API every role can use. A role implements one responsibility of an agent.

Every role must be added to a RoleAgent and is defined by some lifecycle methods:

  • Role.setup() is called when the Role is added to the agent, so it’s the perfect place for initialization and scheduling of tasks

  • Role.on_stop() is called when the container the agent lives in is shut down

To interact with the environment you have to use the context, accessible via Role.context.

property context: RoleContext

Return the context of the role. This context can be seen as a bridge to the agent.

Returns:

the context of the role

handle_message(content: Any, meta: dict)[source]
on_change_model(model) None[source]

Will be invoked when a subscribed model changes via RoleContext.update().

Parameters:

model – the model

on_deactivation(src) None[source]

Hook in, which will be called when another role deactivates this instance (temporarily)

on_ready()[source]

Called after the start of all containers using activate

on_start() None[source]

Called when the container in which the agent is contained has started.

async on_stop() None[source]

Lifecycle hook in, which will be called when the container is shut down or if the role got removed.

setup() None[source]

Lifecycle hook in, which will be called when the role is added to the agent. The role context is known from here on.

class mango.RoleAgent[source]

Agent that supports the role API. When you want to use the role API, you always need a RoleAgent as the base for your agents. A role can be added with RoleAgent.add_role().

add_role(role: Role)[source]

Add a role to the agent. This will lead to the call of Role.setup().

Parameters:

role – the role to add

property addr

Return the address of the agent as AgentAddress

Returns:

AgentAddress: the address of the agent

property aid
property current_timestamp: float

Return the current Unix timestamp according to the clock of the container.

handle_message(content, meta: dict[str, Any])[source]

Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’

neighbors(state: State = State.NORMAL) list[AgentAddress]

Return the neighbors of the agent (controlled by the topology api).

Returns:

the list of agent addresses filtered by state

Return type:

list[AgentAddress]

property observable_tasks
on_ready()[source]

Called when all containers have been started using activate(…).

on_register()[source]

Hook-in to define behavior of the agent directly after it got registered by a container

on_start()[source]

Called when the container in which the agent is contained has started.

remove_role(role: Role)[source]

Remove a role permanently from the agent.

Parameters:

role (Role) – the role to remove

property roles: list[Role]

Returns list of roles

Returns:

list of roles

schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a process task when a specified condition is met.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a task when a specified condition is met.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)

Schedules sending a message without any delay. This is equivalent to using the scheduler’s ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.

Parameters:
  • content – The content of the message

  • receiver_addr – The address passed to the container

  • kwargs – Additional parameters to provide protocol specific settings

Returns:

asyncio.Task for the scheduled coroutine

schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)

Schedule an instantly executed task in another process.

Parameters:
  • coroutine_creator – coroutine_creator creating coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_instant_task(coroutine, on_stop=None, src=None)

Schedule an instantly executed task.

Parameters:
  • coroutine – coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)

Schedule an open-ended, periodically executed task in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)

Schedule an open-ended, periodically executed task.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_process_task(task: ScheduledProcessTask, src=None)

Schedule a task with asyncio in another process. A finite task is removed automatically once it has finished. For scheduling options see the subclasses of ScheduledProcessTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_task(task: ScheduledTask, src=None)

Schedule a task with asyncio. A finite task is removed automatically once it has finished. For scheduling options see the subclasses of ScheduledTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)

Schedule a task at a specified Unix timestamp, dispatched to another process.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • timestamp (float) – unix timestamp defining when the task should start

  • src (Object) – creator of the task

schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)

Schedule a task at a specified Unix timestamp.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • timestamp (float) – Unix timestamp defining when the task should start

  • src (Object) – creator of the task

async send_message(content, receiver_addr: AgentAddress, **kwargs) bool

See container.send_message(…)

service_of_type(type: type, default: Any = None) Any

Return the service of the given type. If no such service exists, set default as the service and return it.

Parameters:
  • type (type) – the type of the service

  • default (Any (optional)) – the default if applicable

Returns:

the service

Return type:

Any

async shutdown()[source]

Shut down all running tasks and deregister from the container.

property suspendable_tasks
async tasks_complete(timeout=1)

Wait for all scheduled tasks to complete using a timeout.

Parameters:

timeout – waiting timeout. Defaults to 1.

class mango.RoleContext(role_handler: RoleHandler, aid: str, inbox)[source]

Implementation of the RoleContext.

activate(role) None[source]
add_role(role: Role)[source]

Add a role to the context.

Parameters:

role – the Role

property addr

Return the address of the agent as AgentAddress

Returns:

AgentAddress: the address of the agent

property aid
property current_timestamp: float

Return the current Unix timestamp according to the clock of the container.

property data

Return data container of the agent

Returns:

the data container

Return type:

DataContainer

deactivate(role) None[source]
emit_event(event: Any, event_source: Any = None)[source]

Emit a custom event to other roles.

Parameters:
  • event (Any) – the event

  • event_source (Any, optional) – emitter of the event (mostly the emitting role), defaults to None

get_or_create_model(cls)[source]
handle_message(content, meta: dict[str, Any])[source]

Handle an incoming message, delegating it to all applicable subscribers

for role, message_condition, method, _ in self._message_subs:
    if self._is_role_active(role) and message_condition(content, meta):
        method(content, meta)
Parameters:
  • content – content

  • meta – meta
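The delegation loop shown for handle_message can be tried out in isolation. The sketch below rebuilds just enough state around it with the standard library; MiniContext is a hypothetical stand-in, roles are represented by plain strings, and the priority argument is stored but not used for ordering here.

```python
class MiniContext:
    def __init__(self):
        self._message_subs = []
        self._active = set()

    def subscribe_message(self, role, method, message_condition, priority=0):
        self._active.add(role)
        self._message_subs.append((role, message_condition, method, priority))

    def deactivate(self, role):
        self._active.discard(role)

    def _is_role_active(self, role):
        return role in self._active

    def handle_message(self, content, meta):
        # Same loop as in the description: only active roles whose
        # condition matches receive the message.
        for role, message_condition, method, _ in self._message_subs:
            if self._is_role_active(role) and message_condition(content, meta):
                method(content, meta)


received = []
context = MiniContext()
context.subscribe_message(
    "pinger", lambda c, m: received.append(("pinger", c)), lambda c, m: c == "ping"
)
context.subscribe_message(
    "logger", lambda c, m: received.append(("logger", c)), lambda c, m: True
)
context.handle_message("ping", {"topic": "demo"})
context.deactivate("logger")
context.handle_message("pong", {"topic": "demo"})  # nobody matches
context.handle_message("ping", {"topic": "demo"})  # only the pinger is active
```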

inbox_length()[source]
neighbors(state: State = State.NORMAL) list[AgentAddress]

Return the neighbors of the agent (controlled by the topology api).

Returns:

the list of agent addresses filtered by state

Return type:

list[AgentAddress]

on_ready()[source]

Called when all containers have been started using activate(…).

on_start()[source]

Called when the container in which the agent is contained has started.

remove_role(role: Role)[source]

Remove a role and call on_stop for clean up

Parameters:

role (Role) – the role to remove

schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a process task when a specified condition is met.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a task when a specified condition is met.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)

Schedules sending a message without any delay. This is equivalent to using the scheduler’s ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.

Parameters:
  • content – The content of the message

  • receiver_addr – The address passed to the container

  • kwargs – Additional parameters to provide protocol specific settings

Returns:

asyncio.Task for the scheduled coroutine

schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)

Schedule an instantly executed task in another process.

Parameters:
  • coroutine_creator – coroutine_creator creating coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_instant_task(coroutine, on_stop=None, src=None)

Schedule an instantly executed task.

Parameters:
  • coroutine – coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)

Schedule an open-ended, periodically executed task in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)

Schedule an open-ended, periodically executed task.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_process_task(task: ScheduledProcessTask, src=None)

Schedule a task with asyncio in another process. A finite task is removed automatically once it has finished. For scheduling options see the subclasses of ScheduledProcessTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_task(task: ScheduledTask, src=None)

Schedule a task with asyncio. A finite task is removed automatically once it has finished. For scheduling options see the subclasses of ScheduledTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)

Schedule a task at a specified Unix timestamp, dispatched to another process.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • timestamp (float) – unix timestamp defining when the task should start

  • src (Object) – creator of the task

schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)

Schedule a task at a specified Unix timestamp.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • timestamp (float) – Unix timestamp defining when the task should start

  • src (Object) – creator of the task

async send_message(content, receiver_addr: AgentAddress, **kwargs) bool[source]

See container.send_message(…)

service_of_type(type: type, default: Any = None) Any

Return the service of the given type. If no such service exists, set default as the service and return it.

Parameters:
  • type (type) – the type of the service

  • default (Any (optional)) – the default if applicable

Returns:

the service

Return type:

Any

subscribe_event(role: Role, event_type: Any, handler_method: Callable)[source]

Subscribe to specific event types. The listeners are evaluated in the order of their subscription.

Parameters:
  • role (Role) – the role in which you want to handle the event

  • event_type (Any) – the event type you want to handle

  • handler_method (Callable) – the method that handles the event
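The interplay of subscribe_event and emit_event can be pictured as a small publish/subscribe registry. The sketch below is stdlib-only and not mango’s implementation; MiniEventBus and PriceChanged are hypothetical, and both the lookup by the exact type of the event and the handler signature (event, event_source) are assumptions of this example.

```python
from collections import defaultdict


class MiniEventBus:
    def __init__(self):
        self._handlers = defaultdict(list)  # event type -> [(role, handler)]

    def subscribe_event(self, role, event_type, handler_method):
        self._handlers[event_type].append((role, handler_method))

    def emit_event(self, event, event_source=None):
        # Handlers run in the order in which they were subscribed.
        for _role, handler_method in self._handlers[type(event)]:
            handler_method(event, event_source)


class PriceChanged:
    def __init__(self, price):
        self.price = price


calls = []
bus = MiniEventBus()
bus.subscribe_event(
    "trader", PriceChanged, lambda e, src: calls.append(("trader", e.price, src))
)
bus.subscribe_event(
    "auditor", PriceChanged, lambda e, src: calls.append(("auditor", e.price, src))
)
bus.emit_event(PriceChanged(42), event_source="market")
bus.emit_event("unrelated event")  # no handler is subscribed to str
```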

subscribe_message(role, method, message_condition, priority=0)[source]
subscribe_model(role, role_model_type)[source]
subscribe_send(role, method)[source]
async tasks_complete(timeout=1)

Wait for all scheduled tasks to complete using a timeout.

Parameters:

timeout – waiting timeout. Defaults to 1.

update(role_model)[source]
exception mango.SerializationError[source]

Raised when an object cannot be serialized.

add_note()

Exception.add_note(note) – add a note to the exception

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class mango.Topology(graph)[source]
add_edge(node_from, node_to, state: State = State.NORMAL)[source]
add_node(*agents: Agent)[source]
property agents: list[Agent]

Return all agents controlled by the topology after the neighborhoods were injected.

Returns:

the list of agents

Return type:

list[Agent]

inject()[source]
set_edge_state(node_id_from: int, node_id_to: int, state: State)[source]
mango.activate(*containers: Container) ContainerActivationManager[source]

Create and return an async activation context manager. This can be used with the async with syntax to run code while the container(s) are active. The containers are started first, then your code under async with runs, and at the end the containers are shut down (even when an error occurs).

Example:

# Single container
async with activate(container) as container:
    # do your stuff

# Multiple containers
async with activate(container_list) as container_list:
    # do your stuff
Returns:

The context manager to be used as described

Return type:

ContainerActivationManager
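The start, run, shut down sequence, including the shutdown after an error, follows the usual async context manager pattern. The sketch below reproduces that pattern with contextlib; it is not mango’s ContainerActivationManager, and FakeContainer with its start and shutdown methods as well as activate_sketch are hypothetical stand-ins.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeContainer:
    def __init__(self, name, log):
        self.name = name
        self.log = log

    async def start(self):
        self.log.append(f"start {self.name}")

    async def shutdown(self):
        self.log.append(f"shutdown {self.name}")


@asynccontextmanager
async def activate_sketch(*containers):
    for container in containers:
        await container.start()
    try:
        # Hand back one container or the list, depending on how many were given.
        yield containers[0] if len(containers) == 1 else list(containers)
    finally:
        # Runs on normal exit and when the body raises.
        for container in containers:
            await container.shutdown()


async def demo():
    log = []
    a, b = FakeContainer("a", log), FakeContainer("b", log)
    try:
        async with activate_sketch(a, b):
            log.append("body")
            raise RuntimeError("boom")
    except RuntimeError:
        log.append("error seen")
    return log
```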

mango.addr(protocol_addr: Any, aid: str) AgentAddress[source]

Create an AgentAddress from a protocol address and an agent id.

Parameters:
  • protocol_addr (Any) – the container part of the address, e.g. the topic for MQTT or host/port for TCP, …

  • aid (str) – the agent id

Returns:

AgentAddress: the address

mango.agent_composed_of(*roles: Role, register_in: None | Container = None, suggested_aid: None | str = None) ComposedAgent[source]

Create an agent composed of the given roles. If a container is provided, the created agent is automatically registered with the container register_in.

Parameters:
  • register_in (None | Container) – container in which the created agent is registered, if provided

  • suggested_aid (str) – the suggested aid for registration

Returns:

the composed agent

Return type:

ComposedAgent

mango.complete_topology(number_of_nodes: int) Topology[source]

Create a fully-connected topology.
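In a fully-connected topology every node is a neighbor of every other node, so n nodes yield n·(n−1)/2 undirected edges. The stdlib-only sketch below enumerates them; complete_edges and neighbors_of are hypothetical helpers, and numbering the nodes from 0 is an assumption of this example rather than a statement about mango’s node ids.

```python
from itertools import combinations


def complete_edges(number_of_nodes):
    """Return every undirected edge (i, j) with i < j between the node ids."""
    return list(combinations(range(number_of_nodes), 2))


def neighbors_of(node, edges):
    """Return the sorted ids of all nodes that share an edge with node."""
    return sorted(b if a == node else a for a, b in edges if node in (a, b))


edges = complete_edges(4)
print(len(edges), "edges;", "neighbors of node 0:", neighbors_of(0, edges))
```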

mango.create_acl(content, receiver_addr: AgentAddress, sender_addr: AgentAddress, acl_metadata: None | dict[str, Any] = None, is_anonymous_acl=False)[source]
mango.create_ec_container(codec: Codec = None, clock: Clock = None, addr: None | str | tuple[str, int] = None, **kwargs: dict[str, Any])
mango.create_mqtt_container(broker_addr: tuple | dict | str, client_id: str, codec: Codec = None, clock: Clock = None, inbox_topic: str | None = None, copy_internal_messages: bool = False, **kwargs)

This method is called to instantiate an MQTT container

Parameters:
  • broker_addr – The address of the broker this container will connect to. It has to be a tuple of (host, port).

  • client_id – The id of the MQTT Client

  • codec – Defines the codec to use. Defaults to JSON

  • clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock

  • inbox_topic – Default subscription to a specific MQTT topic

  • copy_internal_messages – Explicitly copy internal messages. Defaults to False

Returns:

The instance of an MQTTContainer

mango.create_tcp_container(addr: str | tuple[str, int], codec: Codec = None, clock: Clock = None, copy_internal_messages: bool = False, auto_port=False, **kwargs: dict[str, Any]) Container

This method is called to instantiate a TCP container

Parameters:
  • addr – The address to use. It has to be a tuple of (host, port).

  • codec – Defines the codec to use. Defaults to JSON

  • clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock

  • copy_internal_messages – Explicitly copy internal messages. Defaults to False

  • auto_port – Whether you want to let the operating system pick the port. Defaults to False

Returns:

The instance of a TCPContainer

mango.create_topology(directed: bool = False)[source]

Create a topology, which will automatically inject the neighbors to the participating agents. Example:

agents = [TopAgent(), TopAgent(), TopAgent()]
with create_topology() as topology:
    id_1 = topology.add_node(agents[0])
    id_2 = topology.add_node(agents[1])
    id_3 = topology.add_node(agents[2])
    topology.add_edge(id_1, id_2)
    topology.add_edge(id_1, id_3)
Parameters:

directed (bool, optional) – whether the edges of the topology are directed, defaults to False

Yield:

the topology

Return type:

Topology

mango.custom_topology(graph: Graph) Topology[source]

Create a custom topology based on an existing networkx Graph.

mango.json_serializable(cls=None, repr=True)[source]

This is a direct copy from aiomas: https://gitlab.com/sscherfke/aiomas/-/blob/master/src/aiomas/codecs.py

Class decorator that makes the decorated class serializable by the json codec (or any codec that can handle python dictionaries).

The decorator tries to extract all arguments to the class’ __init__(). That means, the arguments must be available as attributes with the same name.

The decorator adds the following methods to the decorated class:

  • __asdict__(): Returns a dict with all __init__ parameters

  • __fromdict__(dict): Creates a new class instance from dict

  • __serializer__(): Returns a tuple with args for Codec.add_serializer()

  • __repr__(): Returns a generic instance representation. Adding this method can be deactivated by passing repr=False to the decorator.
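
The mechanism described above can be sketched with the standard library alone. The decorator below is a simplified stand-in, not the code from mango or aiomas: it only adds __asdict__() and __fromdict__(), and the names dict_serializable and Reading are hypothetical.

```python
import inspect
import json


def dict_serializable(cls):
    """Simplified stand-in for the decorator described above (not mango's code)."""
    # Names of the __init__ parameters, skipping "self".
    params = list(inspect.signature(cls.__init__).parameters)[1:]

    def __asdict__(self):
        # Requires every __init__ argument to be stored under the same name.
        return {name: getattr(self, name) for name in params}

    @classmethod
    def __fromdict__(klass, attrs):
        return klass(**attrs)

    cls.__asdict__ = __asdict__
    cls.__fromdict__ = __fromdict__
    return cls


@dict_serializable
class Reading:
    def __init__(self, sensor, value):
        self.sensor = sensor
        self.value = value


original = Reading("temp", 21.5)
payload = json.dumps(original.__asdict__())
restored = Reading.__fromdict__(json.loads(payload))
```

The sketch shows why the __init__ arguments must be available as attributes with the same name: __asdict__() looks them up with getattr, and __fromdict__() passes them back to the constructor as keyword arguments.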

mango.per_node(topology: Topology)[source]

Assign agents per node of the already created topology. This method shall be used as an iterator in a for … in construct. The iterator returns nodes, to which agents can be added with node.add().

topology = complete_topology(3)
for node in per_node(topology):
    node.add(TopAgent())
Parameters:

topology (Topology) – the topology

Yield:

AgentNode

Return type:

_type_

mango.run_with_mqtt(num: int, *agents: tuple[Agent, dict], broker_addr: tuple[str, int] = ('127.0.0.1', 1883), codec: None | Codec = None) RunWithMQTTManager[source]

Create and return an async context manager, which can be used to run the given agents in num automatically created MQTT containers. The agents are distributed according to the topic.

The function takes the agents which shall run. It is possible to provide each one as a tuple (Agent, dict), where the dict supports “aid” for the suggested_aid and “topics” as a list of topics the agent wants to subscribe to.

Parameters:
  • num (int) – number of MQTT containers

  • broker_addr (tuple[str, int], optional) – Address of the broker the container shall connect to, defaults to (“127.0.0.1”, 1883)

  • codec (None | Codec, optional) – the codec of the containers, defaults to None

Returns:

the async context manager

Return type:

RunWithMQTTManager

mango.run_with_tcp(num: int, *agents: Agent | tuple[Agent, dict], addr: tuple[str, int] = ('127.0.0.1', 5555), codec: None | Codec = None, auto_port: bool = False) RunWithTCPManager[source]

Create and return an async context manager, which can be used to run the given agents in num automatically created TCP containers. The agents are distributed evenly.

async with run_with_tcp(2, Agent(), Agent(), (Agent(), dict(aid="my_agent_id"))) as c:
    # do your stuff
    ...
Parameters:
  • num (int) – number of TCP containers

  • addr (tuple[str, int], optional) – the starting addr of the containers, defaults to (“127.0.0.1”, 5555)

  • codec (None | Codec, optional) – the codec for the containers, defaults to None

  • auto_port (bool) – set if the port should be chosen automatically

Returns:

the async context manager to run the agents with

Return type:

RunWithTCPManager
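
One plausible reading of "distributed evenly" and "starting addr" is a round-robin assignment over containers whose ports count up from the given one. The sketch below illustrates that reading with plain data structures. It does not call mango, the helper plan_distribution is hypothetical, and both the round-robin order and the port increment are assumptions rather than documented behavior.

```python
def plan_distribution(num, agent_names, addr=("127.0.0.1", 5555)):
    """Map container addresses to agent names, round-robin (illustrative only)."""
    host, start_port = addr
    # Assumption: container i listens on start_port + i.
    plan = {(host, start_port + i): [] for i in range(num)}
    addresses = list(plan)
    for index, name in enumerate(agent_names):
        # Assumption: agent k goes to container k modulo num.
        plan[addresses[index % num]].append(name)
    return plan


plan = plan_distribution(2, ["a0", "a1", "my_agent_id"])
for address, names in plan.items():
    print(address, names)
```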

mango.sender_addr(meta: dict) AgentAddress[source]

Extract the sender_addr from the meta dict.

Args:

meta (dict): the meta you received

Returns:

AgentAddress: Extracted agent address to be used for replying to messages
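
The idea can be sketched without mango as follows. The meta keys 'sender_addr' and 'sender_id', the stand-in class Address, and its field names are assumptions made for illustration; consult the mango source for the actual keys and the real AgentAddress type.

```python
from typing import Any, NamedTuple


class Address(NamedTuple):
    """Stand-in for AgentAddress (field names are assumptions)."""

    protocol_addr: Any
    aid: str


def extract_sender(meta: dict) -> Address:
    """Build a reply address from the assumed keys 'sender_addr' and 'sender_id'."""
    addr = meta["sender_addr"]
    # JSON decoding turns tuples into lists; convert back so the address is hashable.
    if isinstance(addr, list):
        addr = tuple(addr)
    return Address(addr, meta["sender_id"])


meta = {"sender_addr": ["127.0.0.1", 5555], "sender_id": "agent0"}
reply_to = extract_sender(meta)
print(reply_to)
```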

class mango.PrintingAgent[source]
handle_message(content, meta: dict[str, Any])[source]

Has to be implemented by the user. This method is called when a message is received at the agent's inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

class mango.DistributedClockManager(receiver_clock_addresses: list)[source]
async broadcast(message, add_futures=True)[source]

Broadcast the given message to all receiver clock addresses. If add_futures is set, a future is added which is finished when an answer by the receiving clock agent was received.

Args:

message (object): the given message

add_futures (bool, optional): Adds futures which can be awaited until a response to a message is given. Defaults to True.
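
The pattern of adding one future per receiver and finishing it when the answer arrives can be sketched with asyncio alone. The class FutureTracker and its methods are hypothetical and only mimic the described behavior; no messages are actually sent, and this is not how DistributedClockManager is necessarily implemented.

```python
import asyncio


class FutureTracker:
    """Illustrative only: tracks one pending future per receiver address."""

    def __init__(self, receivers):
        self.receivers = list(receivers)
        self.futures = {}
        self.sent = []

    def broadcast(self, message, add_futures=True):
        loop = asyncio.get_running_loop()
        for addr in self.receivers:
            self.sent.append((addr, message))  # stands in for sending a message
            if add_futures:
                self.futures[addr] = loop.create_future()

    def on_reply(self, addr, content):
        # Resolve the future that belongs to the replying receiver.
        future = self.futures.pop(addr, None)
        if future is not None and not future.done():
            future.set_result(content)


async def demo():
    tracker = FutureTracker(["clock_a", "clock_b"])
    tracker.broadcast("next_event")
    pending = list(tracker.futures.values())
    tracker.on_reply("clock_a", 10.0)
    tracker.on_reply("clock_b", 12.5)
    # Waiting on all futures returns once every receiver has answered.
    return await asyncio.gather(*pending)


replies = asyncio.run(demo())
print(replies)
```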

async distribute_time(time=None)[source]

Waits until the current container is done. Broadcasts the new time to all the other clock agents. Then waits until the work in the other agents is done and their next event is received.

Args:

time (number, optional): The new time which is set. Defaults to None.

Returns:

number or None: The time at which the next event happens

async get_next_event()[source]

Get the next event from the scheduler by requesting all known clock agents

handle_message(content: float, meta)[source]

Has to be implemented by the user. This method is called when a message is received at the agent's inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

on_ready()[source]

Called when all containers have been started using activate(…).

async send_current_time(time=None)[source]

Broadcasts the current time to all receiver clock addresses. Does not add futures to wait for responses, as no response is expected here.

Args:

time (number, optional): The current time which is set. Defaults to None.

async shutdown()[source]

Shutdown all tasks that are running and deregister from the container

async wait_all_online()[source]

Sends a broadcast to all expected addresses to ask for the next event. Waits one second and repeats this behavior until a response from all addresses is received. This effectively waits until all agents are up and running and the manager can start the simulation.

This is needed, as there is no way in paho mqtt to check whether a message was received, except by sending ping-pong messages.
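
The repeat-until-everyone-answered loop can be sketched with asyncio as follows. The function wait_until_all_responded and the simulated ping callback are hypothetical; in the real manager the broadcast and the replies travel over the container, which this sketch does not model.

```python
import asyncio


async def wait_until_all_responded(expected, responded, ping, interval=1.0):
    """Call ping() every `interval` seconds until all expected addresses answered."""
    attempts = 0
    while not set(expected) <= responded:
        ping()  # stands in for broadcasting the request
        attempts += 1
        await asyncio.sleep(interval)
    return attempts


async def demo():
    expected = ["clock_a", "clock_b"]
    responded = set()
    late = iter(expected)

    def ping():
        # Simulate one additional agent coming online per broadcast.
        responded.add(next(late))

    # A short interval keeps the demo fast; the text above describes one second.
    return await wait_until_all_responded(expected, responded, ping, interval=0.01)


attempts = asyncio.run(demo())
print(attempts)
```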

async wait_for_futures()[source]

Waits for all futures in self.futures

Gives debug log output to see which agent is waited for.

class mango.DistributedClockAgent[source]
handle_message(content: float, meta)[source]

Has to be implemented by the user. This method is called when a message is received at the agent's inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

Note

Most classes and functions described in the API reference should be imported using from mango import …, because the stable, public API is generally available from the top-level mango package, while the internal module structure might change, even in minor releases.

By subpackages