mango.agent package

mango.agent.core module

This module implements the base class for agents (Agent).

Every agent must live in a container. Containers are responsible for making

connections to other agents.

class mango.agent.core.Agent(container: Container, suggested_aid: str = None, suspendable_tasks=True, observable_tasks=True)

Bases: ABC, AgentDelegates

Base class for all agents.

handle_message(content, meta: Dict[str, Any])

Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

raise_exceptions(fut: Future)

Inline function used as a callback to raise exceptions :param fut: The Future object of the task

async shutdown()

Shutdown all tasks that are running and deregister from the container

class mango.agent.core.AgentContext(container)

Bases: object

property addr
property current_timestamp: float

Method that returns the current unix timestamp given the clock within the container

deregister_agent(aid)
register_agent(agent, suggested_aid)
async send_acl_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)

See container.send_acl_message(…)

async send_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, **kwargs)

See container.send_message(…)

class mango.agent.core.AgentDelegates(context, scheduler)

Bases: object

property addr
property current_timestamp: float

Method that returns the current unix timestamp given the clock within the container

schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a process task when a specified condition is met.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the confition is fullfiled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)

Schedule a task when a specified condition is met.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the confition is fullfiled

  • lookup_delay (float) – delay between checking the condition

  • src (Object) – creator of the task

schedule_instant_acl_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)

Schedules sending an acl message without any delay. This is equivalent to using the schedulers ‘schedule_instant_task’ with the coroutine created by ‘container.send_acl_message’.

Parameters:
  • content – The content of the message

  • receiver_addr – The address passed to the container

  • receiver_id – The agent id of the receiver

  • acl_metadata – Metadata for the acl message

  • kwargs – Additional parameters to provide protocol specific settings

Returns:

asyncio.Task for the scheduled coroutine

schedule_instant_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, **kwargs)

Schedules sending a message without any delay. This is equivalent to using the schedulers ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.

Parameters:
  • content – The content of the message

  • receiver_addr – The address passed to the container

  • receiver_id – The agent id of the receiver

  • kwargs – Additional parameters to provide protocol specific settings

Returns:

asyncio.Task for the scheduled coroutine

schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)

Schedule an instantly executed task in another processes.

Parameters:
  • coroutine_creator – coroutine_creator creating coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_instant_task(coroutine, on_stop=None, src=None)

Schedule an instantly executed task.

Parameters:
  • coroutine – coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)

Schedule an open end periodically executed task in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)

Schedule an open end peridocally executed task.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_process_task(task: ScheduledProcessTask, src=None)

Schedule a task with asyncio in another process. When the task is finished, if finite, its automatically removed afterwards. For scheduling options see the subclasses of ScheduledScheduledProcessTaskTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)

Schedule a task using a fine-grained recurrency rule in another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_task(task: ScheduledTask, src=None)

Schedule a task with asyncio. When the task is finished, if finite, its automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.

Parameters:
  • task – task to be scheduled

  • src – object, which represents the source of the task (for example the object in which the task got created)

schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)

Schedule a task at specified unix timestamp dispatched to another process.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • timestamp (float) – unix timestamp defining when the task should start

  • src (Object) – creator of the task

schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)

Schedule a task at specified unix timestamp.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • timestamp (timestamp) – timestamp defining when the task should start

  • src (Object) – creator of the task

async send_acl_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)

See container.send_acl_message(…)

async send_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, **kwargs)

See container.send_message(…)

async tasks_complete(timeout=1)

Wait for all scheduled tasks to complete using a timeout.

Parameters:

timeout – waiting timeout. Defaults to 1.

mango.agent.role module

API classes for using the role system. The role system is based on the idea, that everything an agent can do, is described as role/responsibility and is implemented in one separate class. For example participating in a coalition would be a separate role, monitoring grid voltage another one.

A role is part of a RoleAgent which inherits from Agent.

There are essentially two APIs for acting resp reacting:

  • [Reacting] RoleContext.subscribe_message(), which allows you to subscribe to certain message types and lets you handle the message

  • [Acting] RoleContext.schedule_task(), this allows you to schedule a task with delay/repeating/…

To interact with the environment an instance of the role context is provided. This context provides methods to share data with other roles and to communicate with other agents.

A message can be send using the method RoleContext.send_message().

There are often dependencies between different parts of an agent, there are options to interact with other roles: Roles have the possibility to use shared models and to act on changes of these models. A role can subscribe specific data that another role provides. To set this up, a model has to be created via RoleContext.get_or_create_model(). To notify other roles RoleContext.update() has to be called. In order to let a Role subscribe to a model you can use subscribe_model(). If you prefer a lightweight variant you can use RoleContext.data() to assign/access shared data.

Furthermore there are two lifecycle methods to know about:

  • Role.setup() is called when the Role is added to the agent, so its the perfect place

    for initialization and scheduling of tasks

  • Role.on_stop() is called when the container the agent lives in, is shut down

class mango.agent.role.DataContainer

Bases: object

get(key)
update(data: dict)
class mango.agent.role.Role

Bases: ABC

General role class, defining the API every role can use. A role implements one responsibility of an agent.

Every role must be added to a RoleAgent and is defined by some lifecycle methods:

  • Role.setup() is called when the Role is added to the agent, so its the perfect place for

    initialization and scheduling of tasks

  • Role.on_stop() is called when the container the agent lives in, is shut down

To interact with the environment you have to use the context, accessible via :func:Role.context.

bind(context: RoleContext) None

Method used internal to set the context, do not override!

Parameters:

context – the role context

property context: RoleContext

Return the context of the role. This context can be send as bridge to the agent.

Returns:

the context of the role

on_change_model(model) None

Will be invoked when a subscribed model changes via RoleContext.update().

Parameters:

model – the model

on_deactivation(src) None

Hook in, which will be called when another role deactivates this instance (temporarily)

async on_stop() None

Lifecycle hook in, which will be called when the container is shut down or if the role got removed.

setup() None

Lifecycle hook in, which will be called on adding the role to agent. The role context is known from hereon.

class mango.agent.role.RoleAgent(container, suggested_aid: str = None, suspendable_tasks=True, observable_tasks=True)

Bases: Agent

Agent, which support the role API-system. When you want to use the role-api you always need a RoleAgent as base for your agents. A role can be added with RoleAgent.add_role().

add_role(role: Role)

Add a role to the agent. This will lead to the call of Role.setup().

Parameters:

role – the role to add

handle_message(content, meta: Dict[str, Any])

Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

remove_role(role: Role)

Remove a role permanently from the agent.

Parameters:

role (Role) – [description]

property roles: List[Role]

Returns list of roles

Returns:

list of roles

async shutdown()

Shutdown all tasks that are running and deregister from the container

class mango.agent.role.RoleContext(agent_context: AgentContext, scheduler: Scheduler, role_handler: RoleHandler, aid: str, inbox)

Bases: AgentDelegates

Implementation of the RoleContext.

activate(role) None
add_role(role: Role)

Add a role to the context.

Parameters:

role – the Role

property addr
property aid
property current_timestamp: float

Method that returns the current unix timestamp given the clock within the container

property data

Return data container of the agent

Returns:

the data container

Return type:

DataContainer

deactivate(role) None
emit_event(event: Any, event_source: Any = None)

Emit an custom event to other roles.

Parameters:
  • event (Any) – the event

  • event_source (Any, optional) – emitter of the event (mostly the emitting role), defaults to None

get_or_create_model(cls)
handle_message(content, meta: Dict[str, Any])

Handle an incoming message, delegating it to all applicable subscribers

for role, message_condition, method, _ in self._message_subs:
    if self._is_role_active(role) and message_condition(content, meta):
        method(content, meta)
Parameters:
  • content – content

  • meta – meta

inbox_length()
remove_role(role: Role)

Remove a role and call on_stop for clean up

Parameters:

role (Role) – the role to remove

async send_acl_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)

See container.send_acl_message(…)

async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs)

See container.send_message(…)

subscribe_event(role: Role, event_type: Any, handler_method: Callable)

Subscribe to specific event types. The listener will be evaluated based on their order of subscription

Parameters:
  • role (Role) – the role in which you want to handle the event

  • event_type (Any) – the event type you want to handle

subscribe_message(role, method, message_condition, priority=0)
subscribe_model(role, role_model_type)
subscribe_send(role, method)
update(role_model)
class mango.agent.role.RoleHandler(agent_context, scheduler)

Bases: object

Contains all roles and their models. Implements the communication between roles.

activate(role) None

Activates the given role.

Parameters:

role (Role) – the role to activate

add_role(role: Role) None

Add a new role

Args:

role ([type]): the role

deactivate(role) None

Deactivates the role. This includes all tasks (soft suspending)

Parameters:

role (Role) – the role to deactivate

emit_event(event: Any, event_source: Any = None)
get_or_create_model(cls)

Creates or return (when already created) a central role model.

Returns:

[type]: the model

handle_message(content, meta: Dict[str, Any])

Handle an incoming message, delegating it to all applicable subscribers

for role, message_condition, method, _ in self._message_subs:
    if self._is_role_active(role) and message_condition(content, meta):
        method(content, meta)
Parameters:
  • content – content

  • meta – meta

async on_stop()

Notify all roles when the container is shutdown

remove_role(role: Role) None

Remove a given role

Args:

role ([type]): the role

property roles: List[Role]

Returns all roles

Returns:

List[Role]: the roles hold by this handler

async send_acl_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)
async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs)
subscribe(role: Role, role_model_type) None

Subscribe a role to change events of a specific role model type

Args:

role ([type]): the role role_model_type ([type]): the type of the role model

subscribe_event(role: Role, event_type: type, method: Callable)
subscribe_message(role, method, message_condition, priority=0)
subscribe_send(role: Role, method: Callable)
update(role_model) None

Notifies all subscribers of an update of the given role_model.

Args:

role_model ([type]): the role model to notify about