Agents API

Agent core

This module implements the base class for agents (Agent).

Every agent must live in a container. Containers are responsible for making

connections to other agents.

class mango.agent.core.Agent[source]

Bases: ABC, AgentDelegates

Base class for all agents.

handle_message(content, meta: dict[str, Any])[source]

Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

property observable_tasks
on_register()[source]

Hook-in to define behavior of the agent directly after it got registered by a container

async shutdown()[source]

Shutdown all tasks that are running and deregister from the container

property suspendable_tasks
class mango.agent.core.AgentContext(container)[source]

Bases: object

property addr
property clock: Clock
property current_timestamp: float

Method that returns the current unix timestamp given the clock within the container

deregister(aid)[source]
register(agent, suggested_aid)[source]
async send_message(content, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool[source]

See container.send_message(…)

class mango.agent.core.AgentDelegates[source]

Bases: object

property addr

Return the address of the agent as AgentAddress

Returns:

_type_: AgentAddress

property aid
property current_timestamp: float

Method that returns the current unix timestamp given the clock within the container

neighbors(state: State = State.NORMAL) list[AgentAddress][source]

Return the neighbors of the agent (controlled by the topology api).

Returns:

the list of agent addresses filtered by state

Return type:

list[AgentAddress]

on_ready()[source]

Called when all container has been started using activate(…).

on_start()[source]

Called when container started in which the agent is contained

schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)[source]

Schedule a process task when a specified condition is met.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the confition is fullfiled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)[source]

Schedule a task when a specified condition is met.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the confition is fullfiled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)[source]

Schedules sending a message without any delay. This is equivalent to using the schedulers ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.

Parameters:
  • content – The content of the message

  • receiver_addr – The address passed to the container

  • kwargs – Additional parameters to provide protocol specific settings

Returns:

asyncio.Task for the scheduled coroutine

schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)[source]

Schedule an instantly executed task in another processes.

Parameters:
  • coroutine_creator – coroutine_creator creating coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_instant_task(coroutine, on_stop=None, src=None)[source]

Schedule an instantly executed task.

Parameters:
  • coroutine – coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)[source]

Schedule an open end periodically executed task in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)[source]

Schedule an open end peridocally executed task.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_process_task(task: ScheduledProcessTask, src=None)[source]

Schedule a task with asyncio in another process. When the task is finished, if finite, its automatically removed afterwards. For scheduling options see the subclasses of ScheduledScheduledProcessTaskTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)[source]

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)[source]

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_task(task: ScheduledTask, src=None)[source]

Schedule a task with asyncio. When the task is finished, if finite, its automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)[source]

Schedule a task at specified unix timestamp dispatched to another process.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • timestamp (float) – unix timestamp defining when the task should start

  • src (Object) – creator of the task

schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)[source]

Schedule a task at specified unix timestamp.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • timestamp (timestamp) – timestamp defining when the task should start

  • src (Object) – creator of the task

async send_message(content, receiver_addr: AgentAddress, **kwargs) bool[source]

See container.send_message(…)

service_of_type(type: type, default: Any = None) Any[source]

Return the service of the type type or set the default as service and return it.

Parameters:
  • type (type) – the type of the service

  • default (Any (optional)) – the default if applicable

Returns:

the service

Return type:

Any

async tasks_complete(timeout=1)[source]

Wait for all scheduled tasks to complete using a timeout.

Parameters:

timeout – waiting timeout. Defaults to 1.

class mango.agent.core.State(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

BROKEN = 2
INACTIVE = 1
NORMAL = 0
class mango.agent.core.TopologyService[source]

Bases: object

neighbors(state: State = State.NORMAL)[source]
state_to_neighbors: dict[State, list] = {}

Roles

API classes for using the role system. The role system is based on the idea, that everything an agent can do, is described as role/responsibility and is implemented in one separate class. For example participating in a coalition would be a separate role, monitoring grid voltage another one.

A role is part of a RoleAgent which inherits from Agent.

There are essentially two APIs for acting resp reacting:

  • [Reacting] RoleContext.subscribe_message(), which allows you to subscribe to certain message types and lets you handle the message

  • [Acting] RoleContext.schedule_task(), this allows you to schedule a task with delay/repeating/…

To interact with the environment an instance of the role context is provided. This context provides methods to share data with other roles and to communicate with other agents.

A message can be send using the method RoleContext.send_message().

There are often dependencies between different parts of an agent, there are options to interact with other roles: Roles have the possibility to use shared models and to act on changes of these models. A role can subscribe specific data that another role provides. To set this up, a model has to be created via RoleContext.get_or_create_model(). To notify other roles RoleContext.update() has to be called. In order to let a Role subscribe to a model you can use subscribe_model(). If you prefer a lightweight variant you can use RoleContext.data() to assign/access shared data.

Furthermore there are two lifecycle methods to know about:

  • Role.setup() is called when the Role is added to the agent, so its the perfect place

    for initialization and scheduling of tasks

  • Role.on_stop() is called when the container the agent lives in, is shut down

class mango.agent.role.DataContainer[source]

Bases: object

get(key, default=None)[source]
update(data: dict)[source]
class mango.agent.role.Role[source]

Bases: ABC

General role class, defining the API every role can use. A role implements one responsibility of an agent.

Every role must be added to a RoleAgent and is defined by some lifecycle methods:

  • Role.setup() is called when the Role is added to the agent, so its the perfect place for

    initialization and scheduling of tasks

  • Role.on_stop() is called when the container the agent lives in, is shut down

To interact with the environment you have to use the context, accessible via :func:Role.context.

property context: RoleContext

Return the context of the role. This context can be send as bridge to the agent.

Returns:

the context of the role

handle_message(content: Any, meta: dict)[source]
on_change_model(model) None[source]

Will be invoked when a subscribed model changes via RoleContext.update().

Parameters:

model – the model

on_deactivation(src) None[source]

Hook in, which will be called when another role deactivates this instance (temporarily)

on_ready()[source]

Called after the start of all container using activate

on_start() None[source]

Called when container started in which the agent is contained

async on_stop() None[source]

Lifecycle hook in, which will be called when the container is shut down or if the role got removed.

setup() None[source]

Lifecycle hook in, which will be called on adding the role to agent. The role context is known from hereon.

class mango.agent.role.RoleAgent[source]

Bases: Agent

Agent, which support the role API-system. When you want to use the role-api you always need a RoleAgent as base for your agents. A role can be added with RoleAgent.add_role().

add_role(role: Role)[source]

Add a role to the agent. This will lead to the call of Role.setup().

Parameters:

role – the role to add

handle_message(content, meta: dict[str, Any])[source]

Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

on_ready()[source]

Called when all container has been started using activate(…).

on_register()[source]

Hook-in to define behavior of the agent directly after it got registered by a container

on_start()[source]

Called when container started in which the agent is contained

remove_role(role: Role)[source]

Remove a role permanently from the agent.

Parameters:

role (Role) – [description]

property roles: list[Role]

Returns list of roles

Returns:

list of roles

async shutdown()[source]

Shutdown all tasks that are running and deregister from the container

class mango.agent.role.RoleContext(role_handler: RoleHandler, aid: str, inbox)[source]

Bases: AgentDelegates

Implementation of the RoleContext.

activate(role) None[source]
add_role(role: Role)[source]

Add a role to the context.

Parameters:

role – the Role

property current_timestamp: float

Method that returns the current unix timestamp given the clock within the container

property data

Return data container of the agent

Returns:

the data container

Return type:

DataContainer

deactivate(role) None[source]
emit_event(event: Any, event_source: Any = None)[source]

Emit an custom event to other roles.

Parameters:
  • event (Any) – the event

  • event_source (Any, optional) – emitter of the event (mostly the emitting role), defaults to None

get_or_create_model(cls)[source]
handle_message(content, meta: dict[str, Any])[source]

Handle an incoming message, delegating it to all applicable subscribers

for role, message_condition, method, _ in self._message_subs:
    if self._is_role_active(role) and message_condition(content, meta):
        method(content, meta)
Parameters:
  • content – content

  • meta – meta

inbox_length()[source]
on_ready()[source]

Called when all container has been started using activate(…).

on_start()[source]

Called when container started in which the agent is contained

remove_role(role: Role)[source]

Remove a role and call on_stop for clean up

Parameters:

role (Role) – the role to remove

async send_message(content, receiver_addr: AgentAddress, **kwargs) bool[source]

See container.send_message(…)

subscribe_event(role: Role, event_type: Any, handler_method: Callable)[source]

Subscribe to specific event types. The listener will be evaluated based on their order of subscription

Parameters:
  • role (Role) – the role in which you want to handle the event

  • event_type (Any) – the event type you want to handle

subscribe_message(role, method, message_condition, priority=0)[source]
subscribe_model(role, role_model_type)[source]
subscribe_send(role, method)[source]
update(role_model)[source]
class mango.agent.role.RoleHandler(scheduler)[source]

Bases: object

Contains all roles and their models. Implements the communication between roles.

activate(role) None[source]

Activates the given role.

Parameters:

role (Role) – the role to activate

add_role(role: Role) None[source]

Add a new role

Args:

role ([type]): the role

deactivate(role) None[source]

Deactivates the role. This includes all tasks (soft suspending)

Parameters:

role (Role) – the role to deactivate

emit_event(event: Any, event_source: Any = None)[source]
get_or_create_model(cls)[source]

Creates or return (when already created) a central role model.

Returns:

[type]: the model

handle_message(content, meta: dict[str, Any])[source]

Handle an incoming message, delegating it to all applicable subscribers

for role, message_condition, method, _ in self._message_subs:
    if self._is_role_active(role) and message_condition(content, meta):
        method(content, meta)
Parameters:
  • content – content

  • meta – meta

on_ready()[source]
on_start()[source]
async on_stop()[source]

Notify all roles when the container is shutdown

remove_role(role: Role) None[source]

Remove a given role

Args:

role ([type]): the role

property roles: list[Role]

Returns all roles

Returns:

List[Role]: the roles hold by this handler

subscribe(role: Role, role_model_type) None[source]

Subscribe a role to change events of a specific role model type

Args:

role ([type]): the role role_model_type ([type]): the type of the role model

subscribe_event(role: Role, event_type: type, method: Callable)[source]
subscribe_message(role, method, message_condition, priority=0)[source]
subscribe_send(role: Role, method: Callable)[source]
update(role_model) None[source]

Notifies all subscribers of an update of the given role_model.

Args:

role_model ([type]): the role model to notify about