Agents API¶
Agent core¶
This module implements the base class for agents (Agent).
- Every agent must live in a container. Containers are responsible for making
connections to other agents.
- class mango.agent.core.Agent[source]¶
Bases:
ABC, AgentDelegates
Base class for all agents.
- handle_message(content, meta: dict[str, Any])[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent's inbox.
- Parameters:
content – the deserialized message object
meta – meta details of the message. In the case of MQTT, this dict includes at least the field ‘topic’
- property observable_tasks¶
- on_register()[source]¶
Hook to define the behavior of the agent directly after it has been registered by a container
- property suspendable_tasks¶
- class mango.agent.core.AgentContext(container)[source]¶
Bases:
object
- property addr¶
- property current_timestamp: float¶
Returns the current Unix timestamp according to the clock of the container
- async send_message(content, receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs) bool [source]¶
See container.send_message(…)
- class mango.agent.core.AgentDelegates[source]¶
Bases:
object
- property addr¶
Return the address of the agent as AgentAddress
- Returns:
the address of the agent
- Return type:
AgentAddress
- property aid¶
- property current_timestamp: float¶
Returns the current Unix timestamp according to the clock of the container
- neighbors(state: State = State.NORMAL) list[AgentAddress] [source]¶
Return the neighbors of the agent (controlled by the topology api).
- Returns:
the list of agent addresses filtered by state
- Return type:
list[AgentAddress]
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)[source]¶
Schedule a process task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)[source]¶
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
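The polling behaviour described by condition_func and lookup_delay can be illustrated with plain asyncio. The following is a minimal sketch that does not use mango: run_when, greet, and the state dictionary are hypothetical names chosen for illustration, and the library's actual implementation may differ.

```python
import asyncio


async def run_when(coroutine, condition_func, lookup_delay=0.1):
    """Await `coroutine` once `condition_func()` returns True.

    Hypothetical stand-in for the idea behind a conditional task;
    this is not mango's implementation.
    """
    # Re-check the condition every `lookup_delay` seconds.
    while not condition_func():
        await asyncio.sleep(lookup_delay)
    return await coroutine


async def greet():
    return "condition met"


async def main():
    state = {"ready": False}

    async def flip_later():
        # Fulfil the condition after a short pause.
        await asyncio.sleep(0.05)
        state["ready"] = True

    flipper = asyncio.create_task(flip_later())
    result = await run_when(greet(), lambda: state["ready"], lookup_delay=0.01)
    await flipper
    return result


result = asyncio.run(main())
print(result)
```

A smaller lookup_delay reacts faster to the condition becoming true, at the cost of more frequent checks.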
- schedule_instant_message(content, receiver_addr: AgentAddress, **kwargs)[source]¶
Schedules sending a message without any delay. This is equivalent to using the scheduler's ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.
- Parameters:
content – The content of the message
receiver_addr – The address passed to the container
kwargs – Additional parameters to provide protocol specific settings
- Returns:
asyncio.Task for the scheduled coroutine
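The equivalence stated in the description can be pictured with plain asyncio. This is a sketch under assumptions, not mango code: send_message here is a hypothetical stand-in for container.send_message, the receiver address is a plain string rather than an AgentAddress, and schedule_instant_task is reduced to asyncio.create_task.

```python
import asyncio

sent = []


async def send_message(content, receiver_addr, **kwargs):
    # Hypothetical stand-in for container.send_message: records the message.
    sent.append((content, receiver_addr, kwargs))
    return True


def schedule_instant_task(coroutine):
    # Start the coroutine right away on the running event loop.
    return asyncio.create_task(coroutine)


def schedule_instant_message(content, receiver_addr, **kwargs):
    # Sending a message is just an instant task wrapping the send coroutine.
    return schedule_instant_task(send_message(content, receiver_addr, **kwargs))


async def main():
    task = schedule_instant_message("hello", "agent0")
    is_task = isinstance(task, asyncio.Task)
    delivered = await task
    return is_task, delivered


is_task, delivered = asyncio.run(main())
print(is_task, delivered, sent)
```

Because a task is returned, the caller can either await it to learn whether sending succeeded or let it run in the background.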
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)[source]¶
Schedule an instantly executed task in another process.
- Parameters:
coroutine_creator – coroutine_creator creating coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)[source]¶
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)[source]¶
Schedule an open-ended, periodically executed task in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)[source]¶
Schedule an open-ended, periodically executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
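The parameter is a coroutine function rather than a coroutine because a coroutine object can only be awaited once, so every cycle needs a fresh one. A minimal asyncio sketch of this idea follows; run_periodically and tick are hypothetical names, and mango's own scheduler may work differently.

```python
import asyncio


async def run_periodically(coroutine_func, delay):
    """Call `coroutine_func` forever, sleeping `delay` seconds between cycles.

    Hypothetical stand-in for a periodic task; not mango's implementation.
    """
    while True:
        # A new coroutine object is created for every cycle.
        await coroutine_func()
        await asyncio.sleep(delay)


async def main():
    ticks = []

    async def tick():
        ticks.append(len(ticks))

    task = asyncio.create_task(run_periodically(tick, delay=0.01))
    # The task is open-ended, so wait for three cycles and then cancel it.
    while len(ticks) < 3:
        await asyncio.sleep(0.005)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


ticks = asyncio.run(main())
print(ticks[:3])
```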
- schedule_process_task(task: ScheduledProcessTask, src=None)[source]¶
Schedule a task with asyncio in another process. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledProcessTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)[source]¶
Schedule a task using a fine-grained recurrency rule in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)[source]¶
Schedule a task using a fine-grained recurrency rule.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None)[source]¶
Schedule a task with asyncio. When the task is finished, if finite, it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)[source]¶
Schedule a task at a specified Unix timestamp, dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)[source]¶
Schedule a task at a specified Unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
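Starting a coroutine at a given Unix timestamp amounts to sleeping for the remaining time and then awaiting it. The sketch below uses plain asyncio and the system clock; run_at and report are hypothetical names, and mango itself takes the current time from the container's clock rather than from time.time.

```python
import asyncio
import time


async def run_at(coroutine, timestamp, now=time.time):
    """Await `coroutine` once the clock reaches `timestamp` (Unix seconds).

    Hypothetical stand-in for a timestamp task; not mango's implementation.
    """
    remaining = timestamp - now()
    if remaining > 0:
        await asyncio.sleep(remaining)
    return await coroutine


async def report():
    # Return the wall-clock time at which the coroutine actually ran.
    return time.time()


async def main():
    start_at = time.time() + 0.05
    finished_at = await run_at(report(), start_at)
    return start_at, finished_at


start_at, finished_at = asyncio.run(main())
print(f"started {finished_at - start_at:+.3f}s relative to the requested timestamp")
```

A timestamp that already lies in the past results in the coroutine being awaited immediately in this sketch.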
- async send_message(content, receiver_addr: AgentAddress, **kwargs) bool [source]¶
See container.send_message(…)
Roles¶
API classes for using the role system. The role system is based on the idea that everything an agent can do is described as a role/responsibility and is implemented in one separate class. For example, participating in a coalition would be one role, and monitoring grid voltage another.
A role is part of a RoleAgent, which inherits from Agent.
There are essentially two APIs for acting and reacting:
- [Reacting] RoleContext.subscribe_message(), which allows you to subscribe to certain message types and lets you handle the message
- [Acting] RoleContext.schedule_task(), which allows you to schedule a task with delay/repeating/…
To interact with the environment an instance of the role context is provided. This context provides methods to share data with other roles and to communicate with other agents.
A message can be sent using the method RoleContext.send_message().
There are often dependencies between different parts of an agent, so there are options to interact with other roles: roles can use shared models and act on changes to these models. A role can subscribe to specific data that another role provides. To set this up, a model has to be created via RoleContext.get_or_create_model(). To notify other roles, RoleContext.update() has to be called. To let a Role subscribe to a model, you can use subscribe_model().
If you prefer a lightweight variant, you can use RoleContext.data() to assign/access shared data.
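The shared-model mechanism described above (create a model once, update it, notify subscribed roles) can be sketched without mango as follows. SharedModels, VoltageModel, and MonitorRole are hypothetical names, and the argument order of subscribe_model is an assumption; only the method names get_or_create_model, update, subscribe_model, and on_change_model are taken from this page.

```python
class SharedModels:
    """Hypothetical stand-in for the shared-model part of the role context."""

    def __init__(self):
        self._models = {}       # model class -> single shared instance
        self._subscribers = {}  # model class -> roles to notify

    def get_or_create_model(self, cls):
        # Create the model on first access and reuse it afterwards.
        if cls not in self._models:
            self._models[cls] = cls()
        return self._models[cls]

    def subscribe_model(self, role, cls):
        self._subscribers.setdefault(cls, []).append(role)

    def update(self, model):
        # Notify every role subscribed to this model type.
        for role in self._subscribers.get(type(model), []):
            role.on_change_model(model)


class VoltageModel:
    def __init__(self):
        self.voltage = 0.0


class MonitorRole:
    def __init__(self):
        self.seen = []

    def on_change_model(self, model):
        self.seen.append(model.voltage)


shared = SharedModels()
monitor = MonitorRole()
shared.subscribe_model(monitor, VoltageModel)

model = shared.get_or_create_model(VoltageModel)
model.voltage = 230.0
shared.update(model)  # without this call, subscribers are not notified

print(shared.get_or_create_model(VoltageModel) is model)  # True
print(monitor.seen)  # [230.0]
```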
Furthermore, there are two lifecycle methods to know about:
- Role.setup() is called when the Role is added to the agent, so it is the perfect place for initialization and scheduling of tasks
- Role.on_stop() is called when the container the agent lives in is shut down
- class mango.agent.role.Role[source]¶
Bases:
ABC
General role class, defining the API every role can use. A role implements one responsibility of an agent.
Every role must be added to a RoleAgent and is defined by some lifecycle methods:
- Role.setup() is called when the Role is added to the agent, so it is the perfect place for initialization and scheduling of tasks
- Role.on_stop() is called when the container the agent lives in is shut down
To interact with the environment you have to use the context, accessible via Role.context.
- property context: RoleContext¶
Return the context of the role. This context can be seen as a bridge to the agent.
- Returns:
the context of the role
- on_change_model(model) None [source]¶
Will be invoked when a subscribed model changes via RoleContext.update().
- Parameters:
model – the model
- on_deactivation(src) None [source]¶
Hook that is called when another role (temporarily) deactivates this instance
- class mango.agent.role.RoleAgent[source]¶
Bases:
Agent
Agent that supports the role API. When you want to use the role API, you always need a RoleAgent as the base for your agents. A role can be added with RoleAgent.add_role().
- add_role(role: Role)[source]¶
Add a role to the agent. This will lead to the call of Role.setup().
- Parameters:
role – the role to add
- handle_message(content, meta: dict[str, Any])[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent's inbox.
- Parameters:
content – the deserialized message object
meta – meta details of the message. In the case of MQTT, this dict includes at least the field ‘topic’
- on_register()[source]¶
Hook to define the behavior of the agent directly after it has been registered by a container
- class mango.agent.role.RoleContext(role_handler: RoleHandler, aid: str, inbox)[source]¶
Bases:
AgentDelegates
Implementation of the RoleContext.
- property current_timestamp: float¶
Returns the current Unix timestamp according to the clock of the container
- property data¶
Return data container of the agent
- Returns:
the data container
- Return type:
- emit_event(event: Any, event_source: Any = None)[source]¶
Emit a custom event to other roles.
- Parameters:
event (Any) – the event
event_source (Any, optional) – emitter of the event (mostly the emitting role), defaults to None
- handle_message(content, meta: dict[str, Any])[source]¶
Handle an incoming message, delegating it to all applicable subscribers
    for role, message_condition, method, _ in self._message_subs:
        if self._is_role_active(role) and message_condition(content, meta):
            method(content, meta)
- Parameters:
content – content
meta – meta
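The dispatch loop quoted in the description can be tried out in isolation. The sketch below is a hypothetical stand-in, not mango's RoleContext: the Dispatcher class, the use of strings as roles, and the argument order of subscribe_message are assumptions, and the fourth tuple element that the original loop ignores is left out.

```python
class Dispatcher:
    """Hypothetical stand-in mirroring the dispatch loop in the description."""

    def __init__(self):
        self._message_subs = []  # (role, message_condition, method)
        self._inactive = set()   # roles that are currently deactivated

    def subscribe_message(self, role, method, message_condition):
        self._message_subs.append((role, message_condition, method))

    def deactivate(self, role):
        self._inactive.add(role)

    def _is_role_active(self, role):
        return role not in self._inactive

    def handle_message(self, content, meta):
        # Deliver the message to every active role whose condition matches.
        for role, message_condition, method in self._message_subs:
            if self._is_role_active(role) and message_condition(content, meta):
                method(content, meta)


received = []
dispatcher = Dispatcher()
dispatcher.subscribe_message(
    "text_role",
    lambda content, meta: received.append(("text", content)),
    lambda content, meta: isinstance(content, str),
)
dispatcher.subscribe_message(
    "number_role",
    lambda content, meta: received.append(("number", content)),
    lambda content, meta: isinstance(content, int),
)

dispatcher.handle_message("hello", {})
dispatcher.handle_message(42, {})
dispatcher.deactivate("number_role")
dispatcher.handle_message(7, {})  # ignored: the only matching role is inactive

print(received)  # [('text', 'hello'), ('number', 42)]
```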
- remove_role(role: Role)[source]¶
Remove a role and call on_stop for clean up
- Parameters:
role (Role) – the role to remove
- async send_message(content, receiver_addr: AgentAddress, **kwargs) bool [source]¶
See container.send_message(…)
- subscribe_event(role: Role, event_type: Any, handler_method: Callable)[source]¶
Subscribe to specific event types. Listeners are evaluated in the order of their subscription
- Parameters:
role (Role) – the role in which you want to handle the event
event_type (Any) – the event type you want to handle
- class mango.agent.role.RoleHandler(scheduler)[source]¶
Bases:
object
Contains all roles and their models. Implements the communication between roles.
- activate(role) None [source]¶
Activates the given role.
- Parameters:
role (Role) – the role to activate
- deactivate(role) None [source]¶
Deactivates the role. This includes all tasks (soft suspending)
- Parameters:
role (Role) – the role to deactivate
- get_or_create_model(cls)[source]¶
Creates or returns (if it already exists) a central role model.
- Returns:
the model
- handle_message(content, meta: dict[str, Any])[source]¶
Handle an incoming message, delegating it to all applicable subscribers
    for role, message_condition, method, _ in self._message_subs:
        if self._is_role_active(role) and message_condition(content, meta):
            method(content, meta)
- Parameters:
content – content
meta – meta