mango.agent package
mango.agent.core module
This module implements the base class for agents (Agent).
- Every agent must live in a container. Containers are responsible for making
connections to other agents.
- class mango.agent.core.Agent(container: Container, suggested_aid: str = None, suspendable_tasks=True, observable_tasks=True)
Bases:
ABC,AgentDelegatesBase class for all agents.
- handle_message(content, meta: Dict[str, Any])
Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’
- raise_exceptions(fut: Future)
Inline function used as a callback to raise exceptions :param fut: The Future object of the task
- async shutdown()
Shutdown all tasks that are running and deregister from the container
- class mango.agent.core.AgentContext(container)
Bases:
object- property addr
- property current_timestamp: float
Method that returns the current unix timestamp given the clock within the container
- deregister_agent(aid)
- register_agent(agent, suggested_aid)
- async send_acl_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)
See container.send_acl_message(…)
- async send_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, **kwargs)
See container.send_message(…)
- class mango.agent.core.AgentDelegates(context, scheduler)
Bases:
object- property addr
- property current_timestamp: float
Method that returns the current unix timestamp given the clock within the container
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay=0.1, on_stop=None, src=None)
Schedule a process task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the confition is fullfiled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None)
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the confition is fullfiled
lookup_delay (float) – delay between checking the condition
src (Object) – creator of the task
- schedule_instant_acl_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)
Schedules sending an acl message without any delay. This is equivalent to using the schedulers ‘schedule_instant_task’ with the coroutine created by ‘container.send_acl_message’.
- Parameters:
content – The content of the message
receiver_addr – The address passed to the container
receiver_id – The agent id of the receiver
acl_metadata – Metadata for the acl message
kwargs – Additional parameters to provide protocol specific settings
- Returns:
asyncio.Task for the scheduled coroutine
- schedule_instant_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, **kwargs)
Schedules sending a message without any delay. This is equivalent to using the schedulers ‘schedule_instant_task’ with the coroutine created by ‘container.send_message’.
- Parameters:
content – The content of the message
receiver_addr – The address passed to the container
receiver_id – The agent id of the receiver
kwargs – Additional parameters to provide protocol specific settings
- Returns:
asyncio.Task for the scheduled coroutine
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)
Schedule an instantly executed task in another processes.
- Parameters:
coroutine_creator – coroutine_creator creating coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)
Schedule an open end periodically executed task in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)
Schedule an open end peridocally executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_process_task(task: ScheduledProcessTask, src=None)
Schedule a task with asyncio in another process. When the task is finished, if finite, its automatically removed afterwards. For scheduling options see the subclasses of ScheduledScheduledProcessTaskTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)
Schedule a task using a fine-grained recurrency rule in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)
Schedule a task using a fine-grained recurrency rule in another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None)
Schedule a task with asyncio. When the task is finished, if finite, its automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task – task to be scheduled
src – object, which represents the source of the task (for example the object in which the task got created)
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)
Schedule a task at specified unix timestamp dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)
Schedule a task at specified unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (timestamp) – timestamp defining when the task should start
src (Object) – creator of the task
- async send_acl_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)
See container.send_acl_message(…)
- async send_message(content, receiver_addr: str | Tuple[str, int], receiver_id: str | None = None, **kwargs)
See container.send_message(…)
- async tasks_complete(timeout=1)
Wait for all scheduled tasks to complete using a timeout.
- Parameters:
timeout – waiting timeout. Defaults to 1.
mango.agent.role module
API classes for using the role system. The role system is based on the idea, that everything an agent can do, is described as role/responsibility and is implemented in one separate class. For example participating in a coalition would be a separate role, monitoring grid voltage another one.
A role is part of a RoleAgent which inherits from Agent.
There are essentially two APIs for acting resp reacting:
[Reacting]
RoleContext.subscribe_message(), which allows you to subscribe to certain message types and lets you handle the message[Acting]
RoleContext.schedule_task(), this allows you to schedule a task with delay/repeating/…
To interact with the environment an instance of the role context is provided. This context provides methods to share data with other roles and to communicate with other agents.
A message can be send using the method RoleContext.send_message().
There are often dependencies between different parts of an agent, there are options to
interact with other roles: Roles have the possibility to use shared models and to act on
changes of these models. A role can subscribe specific data that another role provides.
To set this up, a model has to be created via
RoleContext.get_or_create_model(). To notify other roles
RoleContext.update() has to be called. In order to let a Role subscribe to a model you can use
subscribe_model().
If you prefer a lightweight variant you can use RoleContext.data() to assign/access shared data.
Furthermore there are two lifecycle methods to know about:
Role.setup()is called when the Role is added to the agent, so its the perfect placefor initialization and scheduling of tasks
Role.on_stop()is called when the container the agent lives in, is shut down
- class mango.agent.role.Role
Bases:
ABCGeneral role class, defining the API every role can use. A role implements one responsibility of an agent.
Every role must be added to a
RoleAgentand is defined by some lifecycle methods:Role.setup()is called when the Role is added to the agent, so its the perfect place forinitialization and scheduling of tasks
Role.on_stop()is called when the container the agent lives in, is shut down
To interact with the environment you have to use the context, accessible via :func:Role.context.
- bind(context: RoleContext) None
Method used internal to set the context, do not override!
- Parameters:
context – the role context
- property context: RoleContext
Return the context of the role. This context can be send as bridge to the agent.
- Returns:
the context of the role
- on_change_model(model) None
Will be invoked when a subscribed model changes via
RoleContext.update().- Parameters:
model – the model
- on_deactivation(src) None
Hook in, which will be called when another role deactivates this instance (temporarily)
- async on_stop() None
Lifecycle hook in, which will be called when the container is shut down or if the role got removed.
- setup() None
Lifecycle hook in, which will be called on adding the role to agent. The role context is known from hereon.
- class mango.agent.role.RoleAgent(container, suggested_aid: str = None, suspendable_tasks=True, observable_tasks=True)
Bases:
AgentAgent, which support the role API-system. When you want to use the role-api you always need a RoleAgent as base for your agents. A role can be added with
RoleAgent.add_role().- add_role(role: Role)
Add a role to the agent. This will lead to the call of
Role.setup().- Parameters:
role – the role to add
- handle_message(content, meta: Dict[str, Any])
Has to be implemented by the user. This method is called when a message is received at the agents inbox. :param content: The deserialized message object :param meta: Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’
- remove_role(role: Role)
Remove a role permanently from the agent.
- Parameters:
role (Role) – [description]
- async shutdown()
Shutdown all tasks that are running and deregister from the container
- class mango.agent.role.RoleContext(agent_context: AgentContext, scheduler: Scheduler, role_handler: RoleHandler, aid: str, inbox)
Bases:
AgentDelegatesImplementation of the RoleContext.
- activate(role) None
- property addr
- property aid
- property current_timestamp: float
Method that returns the current unix timestamp given the clock within the container
- property data
Return data container of the agent
- Returns:
the data container
- Return type:
- deactivate(role) None
- emit_event(event: Any, event_source: Any = None)
Emit an custom event to other roles.
- Parameters:
event (Any) – the event
event_source (Any, optional) – emitter of the event (mostly the emitting role), defaults to None
- get_or_create_model(cls)
- handle_message(content, meta: Dict[str, Any])
Handle an incoming message, delegating it to all applicable subscribers
for role, message_condition, method, _ in self._message_subs: if self._is_role_active(role) and message_condition(content, meta): method(content, meta)
- Parameters:
content – content
meta – meta
- inbox_length()
- remove_role(role: Role)
Remove a role and call on_stop for clean up
- Parameters:
role (Role) – the role to remove
- async send_acl_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)
See container.send_acl_message(…)
- async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs)
See container.send_message(…)
- subscribe_event(role: Role, event_type: Any, handler_method: Callable)
Subscribe to specific event types. The listener will be evaluated based on their order of subscription
- Parameters:
role (Role) – the role in which you want to handle the event
event_type (Any) – the event type you want to handle
- subscribe_message(role, method, message_condition, priority=0)
- subscribe_model(role, role_model_type)
- subscribe_send(role, method)
- update(role_model)
- class mango.agent.role.RoleHandler(agent_context, scheduler)
Bases:
objectContains all roles and their models. Implements the communication between roles.
- deactivate(role) None
Deactivates the role. This includes all tasks (soft suspending)
- Parameters:
role (Role) – the role to deactivate
- emit_event(event: Any, event_source: Any = None)
- get_or_create_model(cls)
Creates or return (when already created) a central role model.
- Returns:
[type]: the model
- handle_message(content, meta: Dict[str, Any])
Handle an incoming message, delegating it to all applicable subscribers
for role, message_condition, method, _ in self._message_subs: if self._is_role_active(role) and message_condition(content, meta): method(content, meta)
- Parameters:
content – content
meta – meta
- async on_stop()
Notify all roles when the container is shutdown
- async send_acl_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, acl_metadata: Dict[str, Any] | None = None, **kwargs)
- async send_message(content, receiver_addr: str | Tuple[str, int], *, receiver_id: str | None = None, **kwargs)
- subscribe(role: Role, role_model_type) None
Subscribe a role to change events of a specific role model type
- Args:
role ([type]): the role role_model_type ([type]): the type of the role model
- subscribe_message(role, method, message_condition, priority=0)
- update(role_model) None
Notifies all subscribers of an update of the given role_model.
- Args:
role_model ([type]): the role model to notify about