Scheduling and Clock¶
When implementing agents including proactive behavior there are some typical types of tasks you might want to create. For example it might be desired to let the agent check every minute whether some resources are available, or often you just want to execute a task at a specified time. To help achieving this kind of goals mango exposes the scheduling API.
The core of this API is the scheduler, which is part of every agent. To schedule a task its necessary to create a ScheduledTask (resp. an object of a subclass). In mango the following tasks are available:
Class |
Description |
---|---|
InstantScheduledTask |
Executes the coroutine without delay |
TimestampScheduledTask |
Executes the coroutine at a specified datetime |
PeriodicScheduledTask |
Executes a coroutine periodically with a static delay between the cycles |
ConditionalScheduledTask |
Executes the coroutine when a specified condition evaluates to True |
AwaitingTask |
Execute a given coroutine after another given coroutine has been awaited |
RecurrentScheduledTask |
Will get executed periodically with a specified delay |
Furthermore there are convenience methods to get rid of the class imports when using these types of tasks.
from mango import Agent
from mango.util.scheduling import InstantScheduledTask
class ScheduleAgent(Agent):
def __init__(self, container, other_addr, other_id):
self.schedule_instant_acl_message(
receiver_addr=other_addr,
receiver_id=other_id,
content="Hello world!")
)
# equivalent to
self.schedule_instant_acl_message(
receiver_addr=other_addr,
receiver_id=other_id,
content="Hello world!"))
)
def handle_message(self, content, meta: Dict[str, Any]):
pass
When using the scheduling another feature becomes available: suspendable tasks. This makes it possible to pause and resume all tasks started with the scheduling API. Using this it is necessary to specify an identifier when starting the task (using src=your_identifier). To suspend a task you can call scheduler.suspend(your_identifier), to resume them just call the counterpart scheduler.resume(your_identifier). The scheduler is part of the agent and accessible via self._scheduler.
Dispatch Tasks to other Process¶
As asyncio does not provide real parallelism to utilize multiple cores and agents may have tasks, which need a lot computational power, the need to dispatch certain tasks to other processes appear. Handling inter process communication manually is quite exhausting and having multiple process pools across different roles or agents leads to inefficient resource allocations. As a result mango offers a way to dispatch tasks, based on coroutine-functions, to other processes, managed by the framework.
Analogues to the normal API there are two different ways, first you create a ScheduledProcessTask and call schedule_process_task
, second you invoke the convnience methods with “process” in the name. These methods exists on any Agent, the RoleContext and the Scheduler.
In mango the following process tasks are available:
Class |
Description |
---|---|
ScheduledProcessTask |
Marks a ScheduledTask as process compatible |
TimestampScheduledProcessTask |
Timestamp based one-shot task |
AwaitingProcessTask |
Await a coroutine, then execute another |
InstantScheduledProcessTask |
One-shot task, which will get executed instantly |
PeriodicScheduledProcessTask |
Executes a coroutine periodically with a static delay between the cycles |
RecurrentScheduledProcessTask |
Will get executed periodically with a specified delay |
ConditionalProcessTask |
Will get executed as soon as the given condition is fulfilled |
from mango import Agent
from mango.util.scheduling import InstantScheduledProcessTask
class ScheduleAgent(Agent):
def __init__(self, container, other_addr, other_id):
self.schedule_instant_process_task(coroutine_creator=lambda: self.context.send_acl_message(
receiver_addr=other_addr,
receiver_id=other_id,
content="Hello world!")
)
# equivalent to
self.schedule_process_task(InstantScheduledProcessTask(coroutine_creator=lambda: self.context.send_acl_message(
receiver_addr=other_addr,
receiver_id=other_id,
content="Hello world!"))
)
def handle_message(self, content, meta: Dict[str, Any]):
pass
Using an external clock¶
Usually, the scheduler will schedule the tasks of a mango agent based on the real time.
This is the default behaviour of the scheduler.
However, in some contexts it is necessary to schedule the agent based on an external clock,
e. g. in simulations that run faster than real-time.
In mango, this is possible by defining the Clock
of a container, which will be used by the
scheduler of all agents within this container.
The default clock is the AsyncioClock
, which works as a real-time clock. An alternative clock
is the ExternalClock
. Time of this clock has to be set by an external process. That way you can
control how fast or slow time passes within your agent system:
import asyncio
from mango import create_container
from mango import Agent
from mango.util.clock import AsyncioClock, ExternalClock
class Caller(Agent):
def __init__(self, container, receiver_addr, receiver_id):
super().__init__(container)
self.schedule_timestamp_task(coroutine=self.send_hello_world(receiver_addr, receiver_id),
timestamp=self.current_timestamp + 5)
async def send_hello_world(self, receiver_addr, receiver_id):
await self.context.send_acl_message(receiver_addr=receiver_addr,
receiver_id=receiver_id,
content='Hello World')
def handle_message(self, content, meta):
pass
class Receiver(Agent):
def __init__(self, container):
super().__init__(container)
self.wait_for_reply = asyncio.Future()
def handle_message(self, content, meta):
print(f'Received a message with the following content {content}.')
self.wait_for_reply.set_result(True)
async def main():
clock = AsyncioClock()
# clock = ExternalClock(start_time=1000)
addr = ('127.0.0.1', 5555)
c = await create_container(addr=addr, clock=clock)
receiver = Receiver(c)
caller = Caller(c, addr, receiver.aid)
await receiver.wait_for_reply
await c.shutdown()
if __name__ == '__main__':
asyncio.run(main())
This code will terminate after 5 seconds.
If you change the clock to an ExternalClock
by uncommenting the ExternalClock in the example above,
the program won’t terminate as the time of the clock is not proceeded by an external process.
If you comment in the ExternalClock and change your main() as follows, the program will terminate after one second:
async def main():
# clock = AsyncioClock()
clock = ExternalClock(start_time=1000)
addr = ('127.0.0.1', 5555)
c = await create_container(addr=addr, clock=clock)
receiver = Receiver(c)
caller = Caller(c, addr, receiver.aid)
if isinstance(clock, ExternalClock):
await asyncio.sleep(1)
clock.set_time(clock.time + 5)
await receiver.wait_for_reply
await c.shutdown()