Scheduling and Clock¶
When implementing agents including proactive behavior there are some typical types of tasks you might want to create. For example it might be desired to let the agent check every minute whether some resources are available, or often you just want to execute a task at a specified time. To help achieving this kind of goals mango exposes the scheduling API.
The core of this API is the scheduler, which is part of every agent. To schedule a task its necessary to create a ScheduledTask (resp. an object of a subclass). In mango the following tasks are available:
Class |
Description |
---|---|
InstantScheduledTask |
Executes the coroutine without delay |
TimestampScheduledTask |
Executes the coroutine at a specified datetime |
PeriodicScheduledTask |
Executes a coroutine periodically with a static delay between the cycles |
RecurrentScheduledTask |
Executes a coroutine according to a dynamic repetition scheme provided by a rrule |
ConditionalScheduledTask |
Executes the coroutine when a specified condition evaluates to True |
AwaitingTask |
Execute a given coroutine after another given coroutine has been awaited |
RecurrentScheduledTask |
Will get executed periodically with a specified delay |
Furthermore there are convenience methods to get rid of the class imports when using these types of tasks.
from mango import Agent
class ScheduleAgent(Agent):
def schedule(self, other_addr):
self.schedule_instant_message(
"Hello world!"
other_addr
)
When using the scheduling another feature becomes available: suspendable tasks.
This makes it possible to pause and resume all tasks started with the scheduling API.
Using this it is necessary to specify an identifier when starting the task (using src=your_identifier
).
To suspend a task you can call scheduler.suspend(your_identifier)
, to resume them just call the
counterpart scheduler.resume(your_identifier)
. The scheduler is part of the agent and accessible
via self._scheduler
.
Dispatch Tasks to other Process¶
As asyncio does not provide real parallelism to utilize multiple cores and agents may have tasks, which need a lot computational power, the need to dispatch certain tasks to other processes appear. Handling inter process communication manually is quite exhausting and having multiple process pools across different roles or agents leads to inefficient resource allocations. As a result mango offers a way to dispatch tasks, based on coroutine-functions, to other processes, managed by the framework.
Analogues to the normal API there are two different ways, first you create a ScheduledProcessTask
and call schedule_process_task
, second you invoke the convnience methods with “process” in the name.
These methods exists on any Agent, the RoleContext and the Scheduler.
In mango the following process tasks are available:
Class |
Description |
---|---|
ScheduledProcessTask |
Marks a ScheduledTask as process compatible |
TimestampScheduledProcessTask |
Timestamp based one-shot task |
AwaitingProcessTask |
Await a coroutine, then execute another |
InstantScheduledProcessTask |
One-shot task, which will get executed instantly |
PeriodicScheduledProcessTask |
Executes a coroutine periodically with a static delay between the cycles |
RecurrentScheduledProcessTask |
Will get executed periodically with a specified delay |
ConditionalProcessTask |
Will get executed as soon as the given condition is fulfilled |
Using an external clock¶
Usually, the scheduler will schedule the tasks of a mango agent based on the real time.
This is the default behaviour of the scheduler.
However, in some contexts it is necessary to schedule the agent based on an external clock,
e. g. in simulations that run faster than real-time.
In mango, this is possible by defining the Clock
of a container, which will be used by the
scheduler of all agents within this container.
The default clock is the AsyncioClock
, which works as a real-time clock. An alternative clock
is the ExternalClock
. Time of this clock has to be set by an external process. That way you can
control how fast or slow time passes within your agent system:
import asyncio
from mango import create_tcp_container, Agent, AsyncioClock, ExternalClock, activate
class Caller(Agent):
def __init__(self, receiver_addr):
super().__init__()
self.receiver_addr = receiver_addr
def on_ready(self):
self.schedule_timestamp_task(coroutine=self.send_hello_world(self.receiver_addr),
timestamp=self.current_timestamp + 0.5)
async def send_hello_world(self, receiver_addr):
await self.send_message(receiver_addr=self.receiver_addr,
content='Hello World')
class Receiver(Agent):
def __init__(self):
super().__init__()
self.wait_for_reply = asyncio.Future()
def handle_message(self, content, meta):
print(f'Received a message with the following content {content}.')
self.wait_for_reply.set_result(True)
async def main():
clock = AsyncioClock()
addr = ('127.0.0.1', 5555)
c = create_tcp_container(addr=addr, clock=clock)
receiver = c.register(Receiver())
caller = c.register(Caller(receiver.addr))
async with activate(c):
await receiver.wait_for_reply
asyncio.run(main())
Received a message with the following content Hello World.
This code will terminate after 0.5 seconds.
If you change the clock to an ExternalClock
in the example above,
the program won’t terminate as the time of the clock is not proceeded by an external process.
If you comment in the ExternalClock and change your main() as follows, the program will terminate after one second:
async def main():
clock = ExternalClock(start_time=1000)
addr = ('127.0.0.1', 5555)
c = create_tcp_container(addr=addr, clock=clock)
receiver = c.register(Receiver())
caller = c.register(Caller(receiver.addr))
async with activate(c):
await asyncio.sleep(1)
clock.set_time(clock.time + 0.5)
await receiver.wait_for_reply
asyncio.run(main())
Received a message with the following content Hello World.
Using a distributed clock¶
To distribute simulations, mango provides a distributed clock, which is implemented with by two Agents: 1. DistributedClockAgent: this agent needs to be present in every participating container 2. DistributedClockManager: this agent shall exist exactly once
The clock is distributed by an DistributedClockManager Agent on the managing container, which listens to the current time.
In the other container DistributedClockAgent’s are running, which listen to messages from the ClockManager.
The ClockAgent sets the received time on the clock of its container with set_time and responds with its get_next_activity() after making sure that all tasks which are due at the current timestamp are finished.
The ClockManager only acts after all connected Containers have finished and have sent their next timestamp as response.
The response is then added as a Future on the manager, which makes sure, that the managers get_next_activity() shows the next action needed to run on all containers.
Caution: it is needed, that all agents are connected before starting the manager
In the following a simple example is shown.
import asyncio
from mango import DistributedClockAgent, DistributedClockManager, create_tcp_container, activate, ExternalClock
async def main():
container_man = create_tcp_container(("127.0.0.1", 1555), clock=ExternalClock())
container_ag = create_tcp_container(("127.0.0.1", 1556), clock=ExternalClock())
clock_agent = container_ag.register(DistributedClockAgent())
clock_manager = container_man.register(DistributedClockManager(
receiver_clock_addresses=[clock_agent.addr]
))
async with activate(container_man, container_ag) as cl:
# increasing the time
container_man.clock.set_time(100)
# first distribute the time - then wait for the agent to finish
next_event = await clock_manager.distribute_time()
# here no second distribute to wait for retrieval is needed
# the clock_manager distributed the time to the other container
assert container_ag.clock.time == 100
print("Time has been distributed!")
asyncio.run(main())
Time has been distributed!