API reference¶
The API reference provides detailed descriptions of mango’s classes and functions.
- class mango.Agent[source]¶
Base class for all agents.
- property addr¶
Return the address of the agent as AgentAddress
- Returns:
AgentAddress: the address of the agent
- property aid¶
- property current_timestamp: float¶
Return the current Unix timestamp according to the clock of the container.
- handle_message(content, meta: dict[str, Any])[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- neighbors(state: State = State.NORMAL) list[AgentAddress] ¶
Return the neighbors of the agent (controlled by the topology api).
- Returns:
the list of agent addresses filtered by state
- Return type:
list[AgentAddress]
- property observable_tasks¶
- on_ready()¶
Called when all containers have been started using activate(…).
- on_register()[source]¶
Hook-in to define behavior of the agent directly after it got registered by a container
- on_start()¶
Called when the container in which the agent is contained has been started.
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)¶
Schedule a process task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – function creating the coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)¶
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
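The polling behaviour described by condition_func and lookup_delay can be pictured with plain asyncio. The following is only a minimal sketch of the idea, not mango’s implementation; run_when and demo_conditional are hypothetical names introduced for illustration.

```python
import asyncio


async def run_when(coroutine, condition_func, lookup_delay=0.1):
    """Check condition_func every lookup_delay seconds, then await coroutine."""
    while not condition_func():
        await asyncio.sleep(lookup_delay)
    return await coroutine


async def demo_conditional():
    state = {"ready": False}

    async def work():
        return "done"

    async def flip():
        # Make the condition true after a short while.
        await asyncio.sleep(0.05)
        state["ready"] = True

    flipper = asyncio.create_task(flip())
    result = await run_when(work(), lambda: state["ready"], lookup_delay=0.01)
    await flipper
    return result
```

A smaller lookup_delay reacts faster to the condition becoming true, at the cost of more frequent checks.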
- schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)¶
Schedules sending a message without any delay. This is equivalent to using the scheduler’s ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.
- Parameters:
content – The content of the message
receiver_addr – The address passed to the container
kwargs – Additional parameters to provide protocol specific settings
- Returns:
asyncio.Task for the scheduled coroutine
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)¶
Schedule an instantly executed task in another process.
- Parameters:
coroutine_creator – coroutine_creator creating coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)¶
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)¶
Schedule an open-ended, periodically executed task in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)¶
Schedule an open-ended, periodically executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
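The periodic variants take a coroutine function rather than a coroutine, because a coroutine object can only be awaited once and every cycle needs a fresh one. A minimal asyncio sketch of that idea follows; it is not mango’s implementation, run_periodically and demo_periodic are hypothetical names, and the cycles argument exists only so the sketch terminates (the real task is open-ended).

```python
import asyncio


async def run_periodically(coroutine_func, delay, cycles):
    """Await a fresh coroutine from coroutine_func, then sleep for delay."""
    for _ in range(cycles):
        await coroutine_func()  # a new coroutine object per cycle
        await asyncio.sleep(delay)


async def demo_periodic():
    ticks = []

    async def tick():
        ticks.append(len(ticks))

    await run_periodically(tick, delay=0.01, cycles=3)
    return ticks
```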
- schedule_process_task(task: ScheduledProcessTask, src=None)¶
Schedule a task with asyncio in another process. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledProcessTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)¶
Schedule a task using a fine-grained recurrency rule in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)¶
Schedule a task using a fine-grained recurrency rule.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None)¶
Schedule a task with asyncio. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)¶
Schedule a task at a specified Unix timestamp, dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)¶
Schedule a task at a specified Unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (float) – Unix timestamp defining when the task should start
src (Object) – creator of the task
- async send_message(content, receiver_addr: AgentAddress, **kwargs) bool ¶
See container.send_message(…)
- service_of_type(type: type, default: Any = None) Any ¶
Return the service of the given type, or set the default as service and return it.
- Parameters:
type (type) – the type of the service
default (Any (optional)) – the default if applicable
- Returns:
the service
- Return type:
Any
- property suspendable_tasks¶
- async tasks_complete(timeout=1)¶
Wait for all scheduled tasks to complete using a timeout.
- Parameters:
timeout – waiting timeout. Defaults to 1.
- class mango.AsyncioClock[source]¶
The AsyncioClock, a clock that reports the current time using the time module.
- property time: float¶
Current time using the time module
- class mango.ExternalClock(start_time: float = 0)[source]¶
An external clock that proceeds only when set_time is called
- property time: float¶
Current time of the external clock
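The difference to a wall clock is that an external clock stands still until it is told a new time. The following sketch shows the idea with a hypothetical stand-in class, ManualClock; it only mirrors the time property and the set_time call named above and is not mango’s implementation.

```python
class ManualClock:
    """Hypothetical stand-in: time advances only when set_time is called."""

    def __init__(self, start_time: float = 0):
        self._time = start_time

    @property
    def time(self) -> float:
        return self._time

    def set_time(self, t: float) -> None:
        self._time = t


clock = ManualClock(start_time=0)
before = clock.time      # stays at the start time until set_time is called
clock.set_time(60.0)     # e.g. a simulation stepping forward one minute
after = clock.time
```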
- class mango.JSON[source]¶
A Codec that uses JSON to encode and decode messages.
- add_serializer(otype, serialize, deserialize, type_id=None)¶
Add methods to serialize and deserialize objects typed otype.
This can be used to de-/encode objects that the codec otherwise couldn’t encode.
serialize will receive the unencoded object and needs to return an encodable serialization of it.
deserialize will receive an object’s representation and should return an instance of the original object.
- deserialize_obj(obj_repr)¶
Deserialize the original object from obj_repr.
- make_type_id(otype)¶
Create a type id for otype using the type name, the function names in the class, and the signature of the class, and return a 32-bit integer type id.
- serialize_obj(obj)¶
Serialize obj to something that the codec can encode.
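How a serialize/deserialize pair registered per type can make an otherwise unencodable object travel through JSON is sketched below. This is an illustration under stated assumptions, not mango’s codec: TinyCodec, its encode/decode methods, and the "__type__" wire format are hypothetical, and the type id is derived from the type name only (via CRC32) rather than from the full recipe described above.

```python
import json
import zlib


class TinyCodec:
    """Hypothetical codec keeping one serializer pair per registered type."""

    def __init__(self):
        self._by_type = {}  # otype -> (type_id, serialize)
        self._by_id = {}    # type_id -> deserialize

    def add_serializer(self, otype, serialize, deserialize, type_id=None):
        if type_id is None:
            # Assumption: a 32-bit id computed from the type name only.
            type_id = zlib.crc32(otype.__qualname__.encode())
        self._by_type[otype] = (type_id, serialize)
        self._by_id[type_id] = deserialize
        return type_id

    def encode(self, obj):
        type_id, serialize = self._by_type[type(obj)]
        return json.dumps({"__type__": [type_id, serialize(obj)]})

    def decode(self, data):
        type_id, obj_repr = json.loads(data)["__type__"]
        return self._by_id[type_id](obj_repr)


codec = TinyCodec()
complex_id = codec.add_serializer(
    complex,
    serialize=lambda c: [c.real, c.imag],  # encodable representation
    deserialize=lambda r: complex(*r),     # rebuild the original object
)
roundtrip = codec.decode(codec.encode(3 + 4j))
```

The type id travels with the payload so that the receiving side can pick the matching deserialize function.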
- class mango.PROTOBUF[source]¶
- add_serializer(otype, serialize, deserialize, type_id=None)¶
Add methods to serialize and deserialize objects typed otype.
This can be used to de-/encode objects that the codec otherwise couldn’t encode.
serialize will receive the unencoded object and needs to return an encodable serialization of it.
deserialize will receive an object’s representation and should return an instance of the original object.
- deserialize_obj(obj_repr)¶
Deserialize the original object from obj_repr.
- make_type_id(otype)¶
Create a type id for otype using the type name, the function names in the class, and the signature of the class, and return a 32-bit integer type id.
- class mango.Performatives(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Member values (which must be unique) could be used as priority values if not replaced by enum.auto. See http://www.fipa.org/specs/fipa00037/SC00037J.html for a description of performatives.
- accept_proposal = 1¶
- agree = 2¶
- call_for_proposal = 5¶
- cancel = 3¶
- cfp = 4¶
- confirm = 6¶
- disconfirm = 7¶
- failure = 8¶
- inform = 9¶
- inform_if = 20¶
- not_understood = 10¶
- propagate = 22¶
- propose = 11¶
- proxy = 21¶
- query_if = 12¶
- query_ref = 13¶
- refuse = 14¶
- reject_proposal = 15¶
- request = 16¶
- request_when = 17¶
- request_whenever = 18¶
- subscribe = 19¶
- class mango.Role[source]¶
General role class, defining the API every role can use. A role implements one responsibility of an agent.
Every role must be added to a RoleAgent and is defined by some lifecycle methods:
- Role.setup() is called when the Role is added to the agent, so it is the perfect place for initialization and scheduling of tasks
- Role.on_stop() is called when the container the agent lives in is shut down
To interact with the environment you have to use the context, accessible via Role.context.
- property context: RoleContext¶
Return the context of the role. This context can be seen as a bridge to the agent.
- Returns:
the context of the role
- on_change_model(model) None [source]¶
Will be invoked when a subscribed model changes via RoleContext.update().
- Parameters:
model – the model
- on_deactivation(src) None [source]¶
Hook in, which will be called when another role deactivates this instance (temporarily)
- class mango.RoleAgent[source]¶
Agent that supports the role API. To use the role API, you always need a RoleAgent as the base for your agents. A role can be added with RoleAgent.add_role().
- add_role(role: Role)[source]¶
Add a role to the agent. This will lead to the call of Role.setup().
- Parameters:
role – the role to add
- property addr¶
Return the address of the agent as AgentAddress
- Returns:
AgentAddress: the address of the agent
- property aid¶
- property current_timestamp: float¶
Return the current Unix timestamp according to the clock of the container.
- handle_message(content, meta: dict[str, Any])[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- neighbors(state: State = State.NORMAL) list[AgentAddress] ¶
Return the neighbors of the agent (controlled by the topology api).
- Returns:
the list of agent addresses filtered by state
- Return type:
list[AgentAddress]
- property observable_tasks¶
- on_register()[source]¶
Hook-in to define behavior of the agent directly after it got registered by a container
- remove_role(role: Role)[source]¶
Remove a role permanently from the agent.
- Parameters:
role (Role) – the role to remove
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)¶
Schedule a process task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – function creating the coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)¶
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)¶
Schedules sending a message without any delay. This is equivalent to using the scheduler’s ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.
- Parameters:
content – The content of the message
receiver_addr – The address passed to the container
kwargs – Additional parameters to provide protocol specific settings
- Returns:
asyncio.Task for the scheduled coroutine
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)¶
Schedule an instantly executed task in another process.
- Parameters:
coroutine_creator – coroutine_creator creating coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)¶
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)¶
Schedule an open-ended, periodically executed task in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)¶
Schedule an open-ended, periodically executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_process_task(task: ScheduledProcessTask, src=None)¶
Schedule a task with asyncio in another process. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledProcessTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)¶
Schedule a task using a fine-grained recurrency rule in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)¶
Schedule a task using a fine-grained recurrency rule.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None)¶
Schedule a task with asyncio. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)¶
Schedule a task at a specified Unix timestamp, dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)¶
Schedule a task at a specified Unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (float) – Unix timestamp defining when the task should start
src (Object) – creator of the task
- async send_message(content, receiver_addr: AgentAddress, **kwargs) bool ¶
See container.send_message(…)
- service_of_type(type: type, default: Any = None) Any ¶
Return the service of the given type, or set the default as service and return it.
- Parameters:
type (type) – the type of the service
default (Any (optional)) – the default if applicable
- Returns:
the service
- Return type:
Any
- property suspendable_tasks¶
- async tasks_complete(timeout=1)¶
Wait for all scheduled tasks to complete using a timeout.
- Parameters:
timeout – waiting timeout. Defaults to 1.
- class mango.RoleContext(role_handler: RoleHandler, aid: str, inbox)[source]¶
Implementation of the RoleContext.
- property addr¶
Return the address of the agent as AgentAddress
- Returns:
AgentAddress: the address of the agent
- property aid¶
- property current_timestamp: float¶
Return the current Unix timestamp according to the clock of the container.
- property data¶
Return data container of the agent
- Returns:
the data container
- Return type:
- emit_event(event: Any, event_source: Any = None)[source]¶
Emit a custom event to other roles.
- Parameters:
event (Any) – the event
event_source (Any, optional) – emitter of the event (mostly the emitting role), defaults to None
- handle_message(content, meta: dict[str, Any])[source]¶
Handle an incoming message, delegating it to all applicable subscribers:
    for role, message_condition, method, _ in self._message_subs:
        if self._is_role_active(role) and message_condition(content, meta):
            method(content, meta)
- Parameters:
content – the content of the message
meta – the meta details of the message
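The delegation loop above can be tried out in isolation. The sketch below is a simplified stand-in, assuming a hypothetical MiniContext class with hypothetical subscribe and activate helpers; only the loop in handle_message follows the snippet shown in the description.

```python
class MiniContext:
    """Hypothetical, reduced context that only dispatches messages."""

    def __init__(self):
        self._message_subs = []  # (role, message_condition, method)
        self._active_roles = set()

    def activate(self, role):
        self._active_roles.add(role)

    def subscribe(self, role, method, message_condition):
        self._message_subs.append((role, message_condition, method))

    def handle_message(self, content, meta):
        # A subscriber is called only if its role is active and its
        # condition accepts the message.
        for role, message_condition, method in self._message_subs:
            if role in self._active_roles and message_condition(content, meta):
                method(content, meta)


received = []
ctx = MiniContext()
ctx.activate("pinger")
ctx.subscribe("pinger", lambda c, m: received.append(c), lambda c, m: c == "ping")
ctx.subscribe("sleeper", lambda c, m: received.append("never"), lambda c, m: True)
ctx.handle_message("ping", {})  # delivered to the active, matching subscriber
ctx.handle_message("pong", {})  # rejected by the condition
```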
- neighbors(state: State = State.NORMAL) list[AgentAddress] ¶
Return the neighbors of the agent (controlled by the topology api).
- Returns:
the list of agent addresses filtered by state
- Return type:
list[AgentAddress]
- remove_role(role: Role)[source]¶
Remove a role and call on_stop for clean up
- Parameters:
role (Role) – the role to remove
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)¶
Schedule a process task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – function creating the coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)¶
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)¶
Schedules sending a message without any delay. This is equivalent to using the scheduler’s ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.
- Parameters:
content – The content of the message
receiver_addr – The address passed to the container
kwargs – Additional parameters to provide protocol specific settings
- Returns:
asyncio.Task for the scheduled coroutine
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)¶
Schedule an instantly executed task in another process.
- Parameters:
coroutine_creator – coroutine_creator creating coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)¶
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)¶
Schedule an open-ended, periodically executed task in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)¶
Schedule an open-ended, periodically executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_process_task(task: ScheduledProcessTask, src=None)¶
Schedule a task with asyncio in another process. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledProcessTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)¶
Schedule a task using a fine-grained recurrency rule in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)¶
Schedule a task using a fine-grained recurrency rule.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None)¶
Schedule a task with asyncio. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)¶
Schedule a task at a specified Unix timestamp, dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)¶
Schedule a task at a specified Unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (float) – Unix timestamp defining when the task should start
src (Object) – creator of the task
- async send_message(content, receiver_addr: AgentAddress, **kwargs) bool [source]¶
See container.send_message(…)
- service_of_type(type: type, default: Any = None) Any ¶
Return the service of the given type, or set the default as service and return it.
- Parameters:
type (type) – the type of the service
default (Any (optional)) – the default if applicable
- Returns:
the service
- Return type:
Any
- subscribe_event(role: Role, event_type: Any, handler_method: Callable)[source]¶
Subscribe to specific event types. The listeners will be evaluated in their order of subscription.
- Parameters:
role (Role) – the role in which you want to handle the event
event_type (Any) – the event type you want to handle
- async tasks_complete(timeout=1)¶
Wait for all scheduled tasks to complete using a timeout.
- Parameters:
timeout – waiting timeout. Defaults to 1.
- exception mango.SerializationError[source]¶
Raised when an object cannot be serialized.
- add_note()¶
Exception.add_note(note) – add a note to the exception
- args¶
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class mango.Topology(graph)[source]¶
- mango.activate(*containers: Container) ContainerActivationManager [source]¶
Create and return an async activation context manager. This can be used with the async with syntax to run code while the container(s) are active. The containers are started first, then your code under async with runs, and at the end the containers are shut down (even when an error occurs).
Example:
    # Single container
    async with activate(container) as container:
        # do your stuff

    # Multiple container
    async with activate(container_list) as container_list:
        # do your stuff
- Returns:
The context manager to be used as described
- Return type:
ContainerActivationManager
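The guarantee that the containers shut down even when an error occurs is the usual try/finally pattern of an async context manager. The sketch below illustrates that pattern with the standard library only; FakeContainer, activate_sketch and demo_activate are hypothetical names, and the code is not mango’s implementation of activate.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeContainer:
    """Hypothetical container that only records its lifecycle events."""

    def __init__(self):
        self.events = []

    async def start(self):
        self.events.append("start")

    async def shutdown(self):
        self.events.append("shutdown")


@asynccontextmanager
async def activate_sketch(*containers):
    for c in containers:
        await c.start()
    try:
        # Hand back the single container, or the list of containers.
        yield containers[0] if len(containers) == 1 else list(containers)
    finally:
        # Runs on normal exit and when the body raises.
        for c in containers:
            await c.shutdown()


async def demo_activate():
    container = FakeContainer()
    try:
        async with activate_sketch(container) as c:
            c.events.append("body")
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return container.events
```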
- mango.addr(protocol_addr: Any, aid: str) AgentAddress [source]¶
Create an AgentAddress from the protocol address and the agent id.
- Parameters:
protocol_addr (Any) – the container part of the addr, e.g. topic for mqtt, or host/port for tcp, …
aid (str) – the agent id
- Returns:
AgentAddress: the address
- mango.agent_composed_of(*roles: Role, register_in: None | Container = None, suggested_aid: None | str = None) ComposedAgent [source]¶
Create an agent composed of the given roles. If a container is provided, the created agent is automatically registered with the container register_in.
- Parameters:
register_in (None | Container) – container in which the created agent is registered, if provided
suggested_aid (str) – the suggested aid for registration
- Returns:
the composed agent
- Return type:
ComposedAgent
- mango.create_acl(content, receiver_addr: AgentAddress, sender_addr: AgentAddress, acl_metadata: None | dict[str, Any] = None, is_anonymous_acl=False)[source]¶
- mango.create_ec_container(codec: Codec = None, clock: Clock = None, addr: None | str | tuple[str, int] = None, **kwargs: dict[str, Any])¶
- mango.create_mqtt_container(broker_addr: tuple | dict | str, client_id: str, codec: Codec = None, clock: Clock = None, inbox_topic: str | None = None, copy_internal_messages: bool = False, **kwargs)¶
This method is called to instantiate an MQTT container
- Parameters:
broker_addr – The address of the broker this container will connect to. It has to be a tuple of (host, port).
client_id – The id of the MQTT Client
codec – Defines the codec to use. Defaults to JSON
clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock
inbox_topic – Default subscription to a specific MQTT topic
copy_internal_messages – Explicitly copy internal messages. Defaults to False
- Returns:
The instance of a MQTTContainer
- mango.create_tcp_container(addr: str | tuple[str, int], codec: Codec = None, clock: Clock = None, copy_internal_messages: bool = False, auto_port=False, **kwargs: dict[str, Any]) Container ¶
This method is called to instantiate a TCP container
- Parameters:
addr – The address to use. It has to be a tuple of (host, port).
codec – Defines the codec to use. Defaults to JSON
clock – The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock
copy_internal_messages – Explicitly copy internal messages. Defaults to False
auto_port – Whether you want to let the operating system pick the port. Defaults to False
- Returns:
The instance of a TCPContainer
- mango.create_topology(directed: bool = False)[source]¶
Create a topology, which will automatically inject the neighbors to the participating agents. Example:
    agents = [TopAgent(), TopAgent(), TopAgent()]
    with create_topology() as topology:
        id_1 = topology.add_node(agents[0])
        id_2 = topology.add_node(agents[1])
        id_3 = topology.add_node(agents[2])
        topology.add_edge(id_1, id_2)
        topology.add_edge(id_1, id_3)
- Parameters:
directed (bool, optional) – whether the edges of the topology are directed, defaults to False
- Yield:
the created topology
- mango.custom_topology(graph: Graph) Topology [source]¶
Create a custom topology based on an already created nx Graph.
- mango.json_serializable(cls=None, repr=True)[source]¶
This is a direct copy from aiomas: https://gitlab.com/sscherfke/aiomas/-/blob/master/src/aiomas/codecs.py
Class decorator that makes the decorated class serializable by the json codec (or any codec that can handle python dictionaries).
The decorator tries to extract all arguments to the class’ __init__(). That means, the arguments must be available as attributes with the same name.
The decorator adds the following methods to the decorated class:
- __asdict__(): Returns a dict with all __init__ parameters
- __fromdict__(dict): Creates a new class instance from dict
- __serializer__(): Returns a tuple with args for Codec.add_serializer()
- __repr__(): Returns a generic instance representation. Adding this method can be deactivated by passing repr=False to the decorator.
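Why the __init__ arguments must be available as attributes with the same name becomes clear when the mechanism is written out. The following is a reduced sketch of such a decorator using inspect; serializable_sketch and Point are hypothetical names, only __asdict__ and __fromdict__ are reproduced, and the code is not the aiomas or mango implementation.

```python
import inspect


def serializable_sketch(cls):
    """Attach __asdict__ and __fromdict__ based on the __init__ parameters."""
    params = [
        name
        for name in inspect.signature(cls.__init__).parameters
        if name != "self"
    ]

    def asdict(self):
        # Works only if every __init__ argument is stored under its own name.
        return {name: getattr(self, name) for name in params}

    def fromdict(klass, attrs):
        return klass(**attrs)

    cls.__asdict__ = asdict
    cls.__fromdict__ = classmethod(fromdict)
    return cls


@serializable_sketch
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y


point_dict = Point(1, 2).__asdict__()
restored = Point.__fromdict__(point_dict)
```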
- mango.per_node(topology: Topology)[source]¶
Assign agents per node of the already created topology. This method shall be used as iterator in a for in construct. The iterator will return nodes, which can be used to add (with node.add()) agents to the node.
    topology = complete_topology(3)
    for node in per_node(topology):
        node.add(TopAgent())
- Parameters:
topology (Topology) – the topology
- Yield:
AgentNode
- mango.run_with_mqtt(num: int, *agents: tuple[Agent, dict], broker_addr: tuple[str, int] = ('127.0.0.1', 1883), codec: None | Codec = None) RunWithMQTTManager [source]¶
Create and return an async context manager, which can be used to run the given agents in num automatically created MQTT containers. The agents are distributed according to the topic.
The function takes a list of agents to run. It is possible to provide a tuple (Agent, dict); the dict supports “aid” for the suggested_aid and “topics” as a list of topics the agent wants to subscribe to.
- Parameters:
num (int) – number of MQTT containers
broker_addr (tuple[str, int], optional) – Address of the broker the container shall connect to, defaults to (“127.0.0.1”, 1883)
codec (None | Codec, optional) – the codec of the container, defaults to None
- Returns:
the async context manager
- Return type:
RunWithMQTTManager
- mango.run_with_tcp(num: int, *agents: Agent | tuple[Agent, dict], addr: tuple[str, int] = ('127.0.0.1', 5555), codec: None | Codec = None, auto_port: bool = False) RunWithTCPManager [source]¶
Create and return an async context manager, which can be used to run the given agents in num automatically created TCP containers. The agents are distributed evenly.
    async with run_with_tcp(2, Agent(), Agent(), (Agent(), dict(aid="my_agent_id"))) as c:
        # do your stuff
- Parameters:
num (int) – number of TCP containers
addr (tuple[str, int], optional) – the starting addr of the containers, defaults to (“127.0.0.1”, 5555)
codec (None | Codec, optional) – the codec for the containers, defaults to None
auto_port (bool) – set if the port should be chosen automatically
- Returns:
the async context manager to run the agents with
- Return type:
RunWithTCPManager
- mango.sender_addr(meta: dict) AgentAddress [source]¶
Extract the sender_addr from the meta dict.
- Args:
meta (dict): the meta you received
- Returns:
AgentAddress: Extracted agent address to be used for replying to messages
- class mango.PrintingAgent[source]¶
- handle_message(content, meta: dict[str, Any])[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- class mango.DistributedClockManager(receiver_clock_addresses: list)[source]¶
- async broadcast(message, add_futures=True)[source]¶
Broadcast the given message to all receiver clock addresses. If add_futures is set, a future is added which is finished when an answer by the receiving clock agent was received.
- Args:
message (object): the given message
add_futures (bool, optional): Adds futures which can be awaited until a response to a message is given. Defaults to True.
- async distribute_time(time=None)[source]¶
Waits until the current container is done. Broadcasts the new time to all the other clock agents. Then waits until the work in the other agents is done and their next event is received.
- Args:
time (number, optional): The new time which is set. Defaults to None.
- Returns:
number or None: The time at which the next event happens
- async get_next_event()[source]¶
Get the next event from the scheduler by requesting all known clock agents
- handle_message(content: float, meta)[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- async send_current_time(time=None)[source]¶
Broadcasts the current time to all receiver clock addresses. Does not add futures to wait for responses, as no response is expected here.
- Args:
time (number, optional): The current time which is set. Defaults to None.
- async wait_all_online()[source]¶
Sends a broadcast asking for the next event to all expected addresses. Waits one second and repeats this behavior until a response from all addresses is received. This effectively waits until all agents are up and running and the manager can start the simulation.
This is needed, as there is no way in paho mqtt to check whether a message was retrieved, except by sending ping-pong messages.
- class mango.DistributedClockAgent[source]¶
- handle_message(content: float, meta)[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
Note
Note that most classes and functions described in the API reference should be imported using from mango import …, as the stable and public API will generally be available through mango, while the internal module structure might change, even in minor releases.