Source code for mango.agent.role

"""
API classes for using the role system. The role system is based on the idea, that
everything an agent can do, is described as role/responsibility and is implemented in
one separate class. For example participating in a coalition would be a separate role,
monitoring grid voltage another one.

A role is part of a :class:`RoleAgent` which inherits from :class:`Agent`.

There are essentially two APIs for acting resp reacting:

* [Reacting] :func:`RoleContext.subscribe_message`, which allows you to subscribe to certain message types and lets you handle the message
* [Acting] :func:`RoleContext.schedule_task`, this allows you to schedule a task with delay/repeating/...

To interact with the environment an instance of the role context is provided. This context
provides methods to share data with other roles and to communicate with other agents.

A message can be send using the method :func:`RoleContext.send_message`.

There are often dependencies between different parts of an agent, there are options to
interact with other roles: Roles have the possibility to use shared models and to act on
changes of these models. A role can subscribe specific data that another role provides.
To set this up, a model has to be created via
:func:`RoleContext.get_or_create_model`. To notify other roles
:func:`RoleContext.update` has to be called. In order to let a Role subscribe to a model you can use
:func:`subscribe_model`.
If you prefer a lightweight variant you can use :func:`RoleContext.data` to assign/access shared data.

Furthermore there are two lifecycle methods to know about:

* :func:`Role.setup` is called when the Role is added to the agent, so its the perfect place
                     for initialization and scheduling of tasks
* :func:`Role.on_stop` is called when the container the agent lives in, is shut down
"""

import asyncio
from abc import ABC
from collections.abc import Callable
from typing import Any

from mango.agent.core import Agent, AgentAddress, AgentDelegates


[docs] class DataContainer: def __getitem__(self, key): return self.__getattribute__(key) def __setitem__(self, key, newvalue): self.__setattr__(key, newvalue) def __contains__(self, key): return hasattr(self, key)
[docs] def get(self, key, default=None): if key in self: return self[key] else: return default
[docs] def update(self, data: dict): for k, v in data.items(): self.__setattr__(k, v)
class Role: pass
[docs] class RoleHandler: """Contains all roles and their models. Implements the communication between roles.""" def __init__(self, scheduler): self._role_models = {} self._roles = [] self._role_to_active = {} self._role_model_type_to_subs = {} self._message_subs = [] self._send_msg_subs = {} self._role_event_type_to_handler = {} self._scheduler = scheduler self._data = DataContainer()
[docs] def get_or_create_model(self, cls): """Creates or return (when already created) a central role model. Returns: [type]: the model """ if cls in self._role_models: return self._role_models[cls] self._role_models[cls] = cls() return self._role_models[cls]
[docs] def update(self, role_model) -> None: """Notifies all subscribers of an update of the given role_model. Args: role_model ([type]): the role model to notify about """ role_model_type = type(role_model) self._role_models[role_model_type] = role_model # Notify all subscribing agents if role_model_type in self._role_model_type_to_subs: for role in self._role_model_type_to_subs[role_model_type]: if self._is_role_active(role): role.on_change_model(role_model)
[docs] def subscribe(self, role: Role, role_model_type) -> None: """Subscribe a role to change events of a specific role model type Args: role ([type]): the role role_model_type ([type]): the type of the role model """ if role_model_type in self._role_model_type_to_subs: self._role_model_type_to_subs[role_model_type].append(role) else: self._role_model_type_to_subs[role_model_type] = [role]
[docs] def add_role(self, role: Role) -> None: """Add a new role Args: role ([type]): the role """ self._roles.append(role) self._role_to_active[role] = True
[docs] def remove_role(self, role: Role) -> None: """Remove a given role Args: role ([type]): the role """ self._roles.remove(role) del self._role_to_active[role]
@property def roles(self) -> list[Role]: """Returns all roles Returns: List[Role]: the roles hold by this handler """ return self._roles
[docs] def deactivate(self, role) -> None: """Deactivates the role. This includes all tasks (soft suspending) :param role: the role to deactivate :type role: Role """ self._role_to_active[role] = False self._scheduler.suspend(role)
[docs] def activate(self, role) -> None: """Activates the given role. :param role: the role to activate :type role: Role """ self._role_to_active[role] = True self._scheduler.resume(role)
def _is_role_active(self, role) -> bool: if role in self._role_to_active: return self._role_to_active[role] return True
[docs] async def on_stop(self): """Notify all roles when the container is shutdown""" for role in self._roles: await role.on_stop()
[docs] def handle_message(self, content, meta: dict[str, Any]): """Handle an incoming message, delegating it to all applicable subscribers .. code-block:: python for role, message_condition, method, _ in self._message_subs: if self._is_role_active(role) and message_condition(content, meta): method(content, meta) :param content: content :param meta: meta """ handle_message_found = False for role, message_condition, method, _ in self._message_subs: # do not execute handle_message twice if role has subscription as well if method.__name__ == "handle_message": handle_message_found = True if self._is_role_active(role) and message_condition(content, meta): method(content, meta) if not handle_message_found: for role in self.roles: role.handle_message(content, meta)
def _notify_send_message_subs(self, content, receiver_addr: AgentAddress, **kwargs): for role in self._send_msg_subs: for sub in self._send_msg_subs[role]: if self._is_role_active(role): sub( content=content, receiver_addr=receiver_addr, **kwargs, )
[docs] def subscribe_message(self, role, method, message_condition, priority=0): if len(self._message_subs) == 0: self._message_subs.append((role, message_condition, method, priority)) return for i in range(len(self._message_subs)): _, _, _, other_prio = self._message_subs[i] if priority < other_prio: self._message_subs.insert( i, (role, message_condition, method, priority) ) break elif i == len(self._message_subs) - 1: self._message_subs.append((role, message_condition, method, priority))
[docs] def subscribe_send(self, role: Role, method: Callable): if role in self._send_msg_subs: self._send_msg_subs[role].append(method) else: self._send_msg_subs[role] = [method]
[docs] def emit_event(self, event: Any, event_source: Any = None): subs = self._role_event_type_to_handler[type(event)] for _, method in subs: method(event, event_source)
[docs] def subscribe_event(self, role: Role, event_type: type, method: Callable): if event_type not in self._role_event_type_to_handler: self._role_event_type_to_handler[event_type] = [] self._role_event_type_to_handler[event_type] += [(role, method)]
[docs] def on_start(self): for role in self.roles: role.on_start()
[docs] def on_ready(self): for role in self.roles: role.on_ready()
[docs] class RoleContext(AgentDelegates): """Implementation of the RoleContext.""" def __init__( self, role_handler: RoleHandler, aid: str, inbox, ): super().__init__() self._role_handler = role_handler self._aid = aid self._inbox = inbox @property def data(self): """Return data container of the agent :return: the data container :rtype: DataContainer """ return self._get_container() @property def current_timestamp(self) -> float: return self.context.current_timestamp def _get_container(self): return self._role_handler._data
[docs] def inbox_length(self): return self._inbox.qsize()
[docs] def get_or_create_model(self, cls): return self._role_handler.get_or_create_model(cls)
[docs] def update(self, role_model): self._role_handler.update(role_model)
[docs] def subscribe_model(self, role, role_model_type): self._role_handler.subscribe(role, role_model_type)
[docs] def subscribe_message(self, role, method, message_condition, priority=0): self._role_handler.subscribe_message( role, method, message_condition, priority=priority )
[docs] def subscribe_send(self, role, method): self._role_handler.subscribe_send(role, method)
[docs] def add_role(self, role: Role): """Add a role to the context. :param role: the Role """ role._bind(self) self._role_handler.add_role(role) # Setup role role.setup()
[docs] def remove_role(self, role: Role): """Remove a role and call on_stop for clean up :param role: the role to remove :type role: Role """ self._role_handler.remove_role(role) asyncio.create_task(role.on_stop())
[docs] def handle_message(self, content, meta: dict[str, Any]): """Handle an incoming message, delegating it to all applicable subscribers .. code-block:: python for role, message_condition, method, _ in self._message_subs: if self._is_role_active(role) and message_condition(content, meta): method(content, meta) :param content: content :param meta: meta """ self._role_handler.handle_message(content, meta)
[docs] async def send_message( self, content, receiver_addr: AgentAddress, **kwargs, ) -> bool: self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs) return await self.context.send_message( content=content, receiver_addr=receiver_addr, sender_id=self.aid, **kwargs, )
[docs] def emit_event(self, event: Any, event_source: Any = None): """Emit an custom event to other roles. :param event: the event :type event: Any :param event_source: emitter of the event (mostly the emitting role), defaults to None :type event_source: Any, optional """ self._role_handler.emit_event(event, event_source)
[docs] def subscribe_event(self, role: Role, event_type: Any, handler_method: Callable): """Subscribe to specific event types. The listener will be evaluated based on their order of subscription :param role: the role in which you want to handle the event :type role: Role :param event_type: the event type you want to handle :type event_type: Any """ self._role_handler.subscribe_event(role, event_type, handler_method)
[docs] def deactivate(self, role) -> None: self._role_handler.deactivate(role)
[docs] def activate(self, role) -> None: self._role_handler.activate(role)
[docs] def on_start(self): self._role_handler.on_start()
[docs] def on_ready(self): self._role_handler.on_ready()
[docs] class RoleAgent(Agent): """Agent, which support the role API-system. When you want to use the role-api you always need a RoleAgent as base for your agents. A role can be added with :func:`RoleAgent.add_role`. """ def __init__(self): """Create a role-agent :param container: container the agent lives in :param suggested_aid: (Optional) suggested aid, if the aid is already taken, a generated aid is used. Using the generated aid-style ("agentX") is not allowed. """ super().__init__() self._role_handler = RoleHandler(None) self._role_context = RoleContext(self._role_handler, self.aid, self.inbox)
[docs] def on_start(self): self._role_context.on_start()
[docs] def on_ready(self): self._role_context.on_ready()
[docs] def on_register(self): self._role_context.context = self.context self._role_context.scheduler = self.scheduler self._role_handler._scheduler = self.scheduler self._role_context._aid = self.aid
[docs] def add_role(self, role: Role): """Add a role to the agent. This will lead to the call of :func:`Role.setup`. :param role: the role to add """ self._role_context.add_role(role)
[docs] def remove_role(self, role: Role): """Remove a role permanently from the agent. :param role: [description] :type role: Role """ self._role_context.remove_role(role)
@property def roles(self) -> list[Role]: """Returns list of roles :return: list of roles """ return self._role_handler.roles
[docs] def handle_message(self, content, meta: dict[str, Any]): self._role_context.handle_message(content, meta)
[docs] async def shutdown(self): await self._role_handler.on_stop() await super().shutdown()
[docs] class Role(ABC): """General role class, defining the API every role can use. A role implements one responsibility of an agent. Every role must be added to a :class:`RoleAgent` and is defined by some lifecycle methods: * :func:`Role.setup` is called when the Role is added to the agent, so its the perfect place for initialization and scheduling of tasks * :func:`Role.on_stop` is called when the container the agent lives in, is shut down To interact with the environment you have to use the context, accessible via :func:Role.context. """ def __init__(self) -> None: """Initialize the roles internals. !!Care!! the role context is unknown at this point! """ self._context = None def _bind(self, context: RoleContext) -> None: """Method used internal to set the context, do not override! :param context: the role context """ self._context = context @property def context(self) -> RoleContext: """Return the context of the role. This context can be send as bridge to the agent. :return: the context of the role """ return self._context
[docs] def setup(self) -> None: """Lifecycle hook in, which will be called on adding the role to agent. The role context is known from hereon. """
[docs] def on_change_model(self, model) -> None: """Will be invoked when a subscribed model changes via :func:`RoleContext.update`. :param model: the model """
[docs] def on_deactivation(self, src) -> None: """Hook in, which will be called when another role deactivates this instance (temporarily)"""
[docs] async def on_stop(self) -> None: """Lifecycle hook in, which will be called when the container is shut down or if the role got removed."""
[docs] def on_start(self) -> None: """Called when container started in which the agent is contained"""
[docs] def on_ready(self): """Called after the start of all container using activate"""
[docs] def handle_message(self, content: Any, meta: dict): pass