"""
This module implements the base class for agents (:class:`Agent`).
Every agent must live in a container. Containers are responsible for making
connections to other agents.
"""
import asyncio
import logging
from abc import ABC
from enum import Enum
from typing import Any
from ..messages.message import AgentAddress
from ..util.clock import Clock
from ..util.scheduling import ScheduledProcessTask, ScheduledTask, Scheduler
logger = logging.getLogger(__name__)
class State(Enum):
    """State of the link between an agent and one of its neighbors."""

    # Regular neighbor.
    NORMAL = 0
    # The neighbor link exists but is not active; it could be activated/used.
    INACTIVE = 1
    # The neighbor link exists but is not usable; it cannot be activated.
    BROKEN = 2
class TopologyService:
    """Registry of neighbor providers, grouped by link state.

    ``state_to_neighbors`` maps a :class:`State` to a list of zero-argument
    callables; each callable is invoked by :meth:`neighbors` to produce one
    entry of the returned list.
    """

    def __init__(self) -> None:
        # Created per instance on purpose: a dict defined on the class body
        # is shared by every TopologyService, so entries registered on one
        # service instance would show up on all other instances as well.
        self.state_to_neighbors: dict[State, list] = {}

    def neighbors(self, state: State = State.NORMAL):
        """Return the neighbors registered for ``state``.

        :param state: the link state to filter by, defaults to ``State.NORMAL``
        :type state: State
        :return: the results of calling every provider registered for
            ``state``; an empty list if nothing is registered for it
        :rtype: list
        """
        return [provider() for provider in self.state_to_neighbors.get(state, [])]
class AgentContext:
    """Facade through which an agent talks to the container it lives in."""

    def __init__(self, container) -> None:
        """Wrap ``container``; every member of this class forwards to it."""
        self._container = container

    @property
    def current_timestamp(self) -> float:
        """
        The current time as reported by the clock of the container
        """
        container_clock = self._container.clock
        return container_clock.time

    @property
    def clock(self) -> Clock:
        """The clock object of the container"""
        return self._container.clock

    @property
    def addr(self):
        """The ``addr`` attribute of the container"""
        return self._container.addr

    def register(self, agent, suggested_aid):
        """
        Register ``agent`` at the container.

        :param agent: the agent to register
        :param suggested_aid: forwarded to the container as ``suggested_aid``
        :return: whatever the container's ``register`` returns
        """
        registration = self._container.register(agent, suggested_aid=suggested_aid)
        return registration

    def deregister(self, aid):
        """
        Deregister ``aid`` from the container.

        Nothing happens when the container is not running.

        :param aid: the id to deregister
        """
        if not self._container.running:
            return
        self._container.deregister(aid)

    async def send_message(
        self,
        content,
        receiver_addr: AgentAddress,
        sender_id: None | str = None,
        **kwargs,
    ) -> bool:
        """
        See container.send_message(...)

        :param content: the content of the message
        :param receiver_addr: the address of the receiver
        :param sender_id: the id of the sender, if any
        :param kwargs: forwarded unchanged to the container
        :return: the result of the container's ``send_message``
        """
        outcome = await self._container.send_message(
            content,
            receiver_addr=receiver_addr,
            sender_id=sender_id,
            **kwargs,
        )
        return outcome
class AgentDelegates:
    """Convenience API of an agent that delegates to its context and scheduler.

    Messaging helpers forward to ``self.context``; all ``schedule_*`` helpers
    forward to ``self.scheduler``. Both attributes are ``None`` after
    construction and have to be set before these helpers can be used
    (:class:`Agent` does so in ``_do_register``).
    """

    def __init__(self) -> None:
        # Both are None until the agent has been registered.
        self.context: AgentContext | None = None
        self.scheduler: Scheduler | None = None
        self._aid = None
        # Maps a service type to the service instance stored for it.
        self._services = {}

    def on_start(self):
        """Called when container started in which the agent is contained"""

    def on_ready(self):
        """Called when all containers have been started using activate(...)."""

    @property
    def current_timestamp(self) -> float:
        """
        Method that returns the current unix timestamp given the clock within the container
        """
        return self.context.current_timestamp

    @property
    def aid(self):
        """Return the id of the agent (``None`` as long as none was assigned)."""
        return self._aid

    @property
    def addr(self):
        """Return the address of the agent as AgentAddress

        :return: the address built from the context's address and the agent id,
            or ``None`` if the agent has no context yet
        :rtype: AgentAddress
        """
        if self.context is None:
            return None
        return AgentAddress(self.context.addr, self.aid)

    async def send_message(
        self,
        content,
        receiver_addr: AgentAddress,
        **kwargs,
    ) -> bool:
        """
        See container.send_message(...)

        The agent's own id is passed as ``sender_id``.

        :param content: The content of the message
        :param receiver_addr: The address of the receiver
        :param kwargs: Additional parameters forwarded to the context
        :return: the result of the context's ``send_message``
        """
        return await self.context.send_message(
            content, receiver_addr=receiver_addr, sender_id=self.aid, **kwargs
        )

    def schedule_instant_message(
        self,
        content,
        receiver_addr: AgentAddress,
        **kwargs,
    ):
        """
        Schedules sending a message without any delay. This is equivalent to using the schedulers 'schedule_instant_task' with the coroutine created by
        'container.send_message'.

        :param content: The content of the message
        :param receiver_addr: The address passed to the container
        :param kwargs: Additional parameters to provide protocol specific settings
        :returns: asyncio.Task for the scheduled coroutine
        """
        return self.schedule_instant_task(
            self.send_message(content, receiver_addr=receiver_addr, **kwargs)
        )

    def schedule_conditional_process_task(
        self,
        coroutine_creator,
        condition_func,
        lookup_delay=0.1,
        on_stop=None,
        src=None,
    ):
        """Schedule a process task when a specified condition is met.

        :param coroutine_creator: coroutine_creator creating coroutine to be scheduled
        :type coroutine_creator: coroutine_creator
        :param condition_func: function for determining whether the condition is fulfilled
        :type condition_func: lambda () -> bool
        :param lookup_delay: delay between checking the condition
        :type lookup_delay: float
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_conditional_process_task(
            coroutine_creator=coroutine_creator,
            condition_func=condition_func,
            lookup_delay=lookup_delay,
            on_stop=on_stop,
            src=src,
        )

    def schedule_conditional_task(
        self, coroutine, condition_func, lookup_delay=0.1, on_stop=None, src=None
    ):
        """Schedule a task when a specified condition is met.

        :param coroutine: coroutine to be scheduled
        :type coroutine: Coroutine
        :param condition_func: function for determining whether the condition is fulfilled
        :type condition_func: lambda () -> bool
        :param lookup_delay: delay between checking the condition
        :type lookup_delay: float
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_conditional_task(
            coroutine=coroutine,
            condition_func=condition_func,
            lookup_delay=lookup_delay,
            on_stop=on_stop,
            src=src,
        )

    def schedule_timestamp_task(
        self, coroutine, timestamp: float, on_stop=None, src=None
    ):
        """Schedule a task at specified unix timestamp.

        :param coroutine: coroutine to be scheduled
        :type coroutine: Coroutine
        :param timestamp: unix timestamp defining when the task should start
        :type timestamp: float
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_timestamp_task(
            coroutine=coroutine, timestamp=timestamp, on_stop=on_stop, src=src
        )

    def schedule_timestamp_process_task(
        self, coroutine_creator, timestamp: float, on_stop=None, src=None
    ):
        """Schedule a task at specified unix timestamp dispatched to another process.

        :param coroutine_creator: coroutine_creator creating coroutine to be scheduled
        :type coroutine_creator: coroutine_creator
        :param timestamp: unix timestamp defining when the task should start
        :type timestamp: float
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_timestamp_process_task(
            coroutine_creator=coroutine_creator,
            timestamp=timestamp,
            on_stop=on_stop,
            src=src,
        )

    def schedule_periodic_process_task(
        self, coroutine_creator, delay, on_stop=None, src=None
    ):
        """Schedule an open end periodically executed task in another process.

        :param coroutine_creator: coroutine function creating coros to be scheduled
        :type coroutine_creator: Coroutine Function
        :param delay: delay in between the cycles
        :type delay: float
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_periodic_process_task(
            coroutine_creator=coroutine_creator, delay=delay, on_stop=on_stop, src=src
        )

    def schedule_periodic_task(self, coroutine_func, delay, on_stop=None, src=None):
        """Schedule an open end periodically executed task.

        :param coroutine_func: coroutine function creating coros to be scheduled
        :type coroutine_func: Coroutine Function
        :param delay: delay in between the cycles
        :type delay: float
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_periodic_task(
            coroutine_func=coroutine_func, delay=delay, on_stop=on_stop, src=src
        )

    def schedule_recurrent_process_task(
        self, coroutine_creator, recurrency, on_stop=None, src=None
    ):
        """Schedule a task using a fine-grained recurrency rule in another process.

        :param coroutine_creator: coroutine function creating coros to be scheduled
        :type coroutine_creator: Coroutine Function
        :param recurrency: recurrency rule to calculate next event
        :type recurrency: dateutil.rrule.rrule
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_recurrent_process_task(
            coroutine_creator=coroutine_creator,
            recurrency=recurrency,
            on_stop=on_stop,
            src=src,
        )

    def schedule_recurrent_task(
        self, coroutine_func, recurrency, on_stop=None, src=None
    ):
        """Schedule a task using a fine-grained recurrency rule.

        :param coroutine_func: coroutine function creating coros to be scheduled
        :type coroutine_func: Coroutine Function
        :param recurrency: recurrency rule to calculate next event
        :type recurrency: dateutil.rrule.rrule
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_recurrent_task(
            coroutine_func=coroutine_func,
            recurrency=recurrency,
            on_stop=on_stop,
            src=src,
        )

    def schedule_instant_process_task(self, coroutine_creator, on_stop=None, src=None):
        """Schedule an instantly executed task in another process.

        :param coroutine_creator: coroutine_creator creating coroutine to be scheduled
        :type coroutine_creator: coroutine_creator
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_instant_process_task(
            coroutine_creator=coroutine_creator, on_stop=on_stop, src=src
        )

    def schedule_instant_task(self, coroutine, on_stop=None, src=None):
        """Schedule an instantly executed task.

        :param coroutine: coroutine to be scheduled
        :type coroutine: Coroutine
        :param on_stop: coroutine to run on stop
        :type on_stop: Object
        :param src: creator of the task
        :type src: Object
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_instant_task(
            coroutine=coroutine, on_stop=on_stop, src=src
        )

    def schedule_process_task(self, task: ScheduledProcessTask, src=None):
        """Schedule a task with asyncio in another process. When the task is finished, if finite, its automatically
        removed afterwards. For scheduling options see the subclasses of ScheduledProcessTask.

        :param task: task to be scheduled
        :param src: object, which represents the source of the task (for example the object in which the task got created)
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_process_task(task, src=src)

    def schedule_task(self, task: ScheduledTask, src=None):
        """Schedule a task with asyncio. When the task is finished, if finite, its automatically
        removed afterwards. For scheduling options see the subclasses of ScheduledTask.

        :param task: task to be scheduled
        :param src: object, which represents the source of the task (for example the object in which the task got created)
        :return: the result of the corresponding scheduler call
        """
        return self.scheduler.schedule_task(task, src=src)

    async def tasks_complete(self, timeout=1):
        """Wait for all scheduled tasks to complete using a timeout.

        :param timeout: waiting timeout. Defaults to 1.
        """
        await self.scheduler.tasks_complete(timeout=timeout)

    def service_of_type(self, type: type, default: Any = None) -> Any:
        """Return the service of the type ``type`` or set the default as service and return it.

        A lookup without a usable default leaves the registry untouched, so
        a later call that does provide a default can still install it.

        :param type: the type of the service
        :type type: type
        :param default: the default if applicable
        :type default: Any (optional)
        :return: the service, or ``None`` if there is none and no default was given
        :rtype: Any
        """
        service = self._services.get(type)
        if service is None and default is not None:
            # Nothing usable stored yet: install the default. A stored None
            # is treated like a missing entry so it cannot block the default.
            self._services[type] = default
            service = default
        return service

    def neighbors(self, state: State = State.NORMAL) -> list[AgentAddress]:
        """Return the neighbors of the agent (controlled by the topology api).

        :param state: the link state to filter by, defaults to ``State.NORMAL``
        :type state: State
        :return: the list of agent addresses filtered by state; empty if no
            topology service is registered for this agent
        :rtype: list[AgentAddress]
        """
        topology_service = self.service_of_type(TopologyService)
        if topology_service is None:
            # Without a topology service the agent has no known neighbors.
            return []
        return topology_service.neighbors(state)
class Agent(ABC, AgentDelegates):
    """Base class for all agents."""

    def __init__(
        self,
    ):
        """
        Initialize an agent
        """
        super().__init__()
        # Incoming messages are queued here as (priority, content, meta).
        self.inbox = asyncio.Queue()
        # Both are created in _do_start(); None marks "not started yet" so
        # that shutdown() can be called safely before the agent was started.
        self._check_inbox_task: asyncio.Task | None = None
        self._stopped: asyncio.Future | None = None

    @property
    def observable_tasks(self):
        """The ``observable`` flag of the agent's scheduler."""
        return self.scheduler.observable

    @observable_tasks.setter
    def observable_tasks(self, value: bool):
        self.scheduler.observable = value

    @property
    def suspendable_tasks(self):
        """The ``suspendable`` flag of the agent's scheduler."""
        return self.scheduler.suspendable

    @suspendable_tasks.setter
    def suspendable_tasks(self, value: bool):
        self.scheduler.suspendable = value

    def on_register(self):
        """
        Hook-in to define behavior of the agent directly after it got registered by a container
        """

    def _do_register(self, container, aid):
        """Bind the agent to ``container`` under the id ``aid``.

        Sets the agent id, creates the context and a scheduler using the
        container's clock, then calls :meth:`on_register`.

        :param container: the container the agent is registered at
        :param aid: the id assigned to the agent
        """
        self._aid = aid
        self.context = AgentContext(container)
        self.scheduler = Scheduler(
            suspendable=True, observable=True, clock=container.clock
        )
        self.on_register()

    def _do_start(self):
        """Start the inbox task and call :meth:`on_start`."""
        self._check_inbox_task = asyncio.create_task(self._check_inbox())
        self._check_inbox_task.add_done_callback(self._raise_exceptions)
        self._stopped = asyncio.Future()
        self.on_start()

    def _raise_exceptions(self, fut: asyncio.Future):
        """
        Inline function used as a callback to raise exceptions

        :param fut: The Future object of the task
        """
        try:
            exc = fut.exception()
            if exc is not None:
                # The format string needs a placeholder for the exception;
                # without it logging fails to format the record and the
                # actual error text never reaches the log.
                logger.error(
                    "Agent %s: Caught the following exception in _check_inbox: %s",
                    self.aid,
                    exc,
                )
                raise exc
        except asyncio.CancelledError:
            # fut.exception() raises CancelledError for a cancelled task;
            # cancellation is not treated as an error.
            pass

    async def _check_inbox(self):
        """Task for waiting on new message in the inbox

        Any exception raised while processing a message is logged and ends
        the loop; it is not re-raised.
        """
        try:
            logger.debug("Agent %s: Start waiting for messages", self.aid)
            while True:
                # run in infinite loop until it is cancelled from outside
                message = await self.inbox.get()
                logger.debug("Agent %s: Received message;%s", self.aid, message)
                # message should be tuples of (priority, content, meta)
                priority, content, meta = message
                meta["priority"] = priority
                self.handle_message(content=content, meta=meta)
                # signal to the Queue that the message is handled
                self.inbox.task_done()
        except Exception:
            logger.exception("The check inbox task of %s failed!", self.aid)

    def handle_message(self, content, meta: dict[str, Any]):
        """
        Has to be implemented by the user.
        This method is called when a message is received at the agents inbox.

        :param content: The deserialized message object
        :param meta: Meta details of the message. In case of mqtt this dict
            includes at least the field 'topic'
        :raises NotImplementedError: if the method is not overridden
        """
        raise NotImplementedError

    async def shutdown(self):
        """Shutdown all tasks that are running
        and deregister from the container

        The steps that concern the inbox task are skipped when the agent
        has never been started.
        """
        if self._stopped is not None and not self._stopped.done():
            self._stopped.set_result(True)
        self.context.deregister(self.aid)
        if self._check_inbox_task is not None:
            try:
                # Shutdown reactive inbox task
                self._check_inbox_task.remove_done_callback(self._raise_exceptions)
                self._check_inbox_task.cancel()
                await self._check_inbox_task
            except asyncio.CancelledError:
                pass
        try:
            await self.scheduler.shutdown()
        except asyncio.CancelledError:
            pass
        finally:
            logger.info("Agent %s: Shutdown successful", self.aid)