Scheduling and Clock

When implementing agents including proactive behavior there are some typical types of tasks you might want to create. For example it might be desired to let the agent check every minute whether some resources are available, or often you just want to execute a task at a specified time. To help achieving this kind of goals mango exposes the scheduling API.

The core of this API is the scheduler, which is part of every agent. To schedule a task its necessary to create a ScheduledTask (resp. an object of a subclass). In mango the following tasks are available:

Available ScheduledTasks

Class

Description

InstantScheduledTask

Executes the coroutine without delay

TimestampScheduledTask

Executes the coroutine at a specified datetime

PeriodicScheduledTask

Executes a coroutine periodically with a static delay between the cycles

ConditionalScheduledTask

Executes the coroutine when a specified condition evaluates to True

AwaitingTask

Execute a given coroutine after another given coroutine has been awaited

RecurrentScheduledTask

Will get executed periodically with a specified delay

Furthermore there are convenience methods to get rid of the class imports when using these types of tasks.

from mango import Agent
from mango.util.scheduling import InstantScheduledTask

    class ScheduleAgent(Agent):
        def __init__(self, container, other_addr, other_id):
            self.schedule_instant_acl_message(
                receiver_addr=other_addr,
                receiver_id=other_id,
                content="Hello world!")
            )
            # equivalent to
            self.schedule_instant_acl_message(
                receiver_addr=other_addr,
                receiver_id=other_id,
                content="Hello world!"))
            )

        def handle_message(self, content, meta: Dict[str, Any]):
            pass

When using the scheduling another feature becomes available: suspendable tasks. This makes it possible to pause and resume all tasks started with the scheduling API. Using this it is necessary to specify an identifier when starting the task (using src=your_identifier). To suspend a task you can call scheduler.suspend(your_identifier), to resume them just call the counterpart scheduler.resume(your_identifier). The scheduler is part of the agent and accessible via self._scheduler.

Dispatch Tasks to other Process

As asyncio does not provide real parallelism to utilize multiple cores and agents may have tasks, which need a lot computational power, the need to dispatch certain tasks to other processes appear. Handling inter process communication manually is quite exhausting and having multiple process pools across different roles or agents leads to inefficient resource allocations. As a result mango offers a way to dispatch tasks, based on coroutine-functions, to other processes, managed by the framework.

Analogues to the normal API there are two different ways, first you create a ScheduledProcessTask and call schedule_process_task, second you invoke the convnience methods with “process” in the name. These methods exists on any Agent, the RoleContext and the Scheduler. In mango the following process tasks are available:

Available ProcessTasks

Class

Description

ScheduledProcessTask

Marks a ScheduledTask as process compatible

TimestampScheduledProcessTask

Timestamp based one-shot task

AwaitingProcessTask

Await a coroutine, then execute another

InstantScheduledProcessTask

One-shot task, which will get executed instantly

PeriodicScheduledProcessTask

Executes a coroutine periodically with a static delay between the cycles

RecurrentScheduledProcessTask

Will get executed periodically with a specified delay

ConditionalProcessTask

Will get executed as soon as the given condition is fulfilled

from mango import Agent
from mango.util.scheduling import InstantScheduledProcessTask

    class ScheduleAgent(Agent):
        def __init__(self, container, other_addr, other_id):
            self.schedule_instant_process_task(coroutine_creator=lambda: self.context.send_acl_message(
                receiver_addr=other_addr,
                receiver_id=other_id,
                content="Hello world!")
            )
            # equivalent to
            self.schedule_process_task(InstantScheduledProcessTask(coroutine_creator=lambda: self.context.send_acl_message(
                receiver_addr=other_addr,
                receiver_id=other_id,
                content="Hello world!"))
            )

        def handle_message(self, content, meta: Dict[str, Any]):
            pass

Using an external clock

Usually, the scheduler will schedule the tasks of a mango agent based on the real time. This is the default behaviour of the scheduler. However, in some contexts it is necessary to schedule the agent based on an external clock, e. g. in simulations that run faster than real-time. In mango, this is possible by defining the Clock of a container, which will be used by the scheduler of all agents within this container. The default clock is the AsyncioClock, which works as a real-time clock. An alternative clock is the ExternalClock. Time of this clock has to be set by an external process. That way you can control how fast or slow time passes within your agent system:

import asyncio
from mango import create_container
from mango import Agent
from mango.util.clock import AsyncioClock, ExternalClock


class Caller(Agent):
    def __init__(self, container, receiver_addr, receiver_id):
        super().__init__(container)
        self.schedule_timestamp_task(coroutine=self.send_hello_world(receiver_addr, receiver_id),
                                     timestamp=self.current_timestamp + 5)

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.context.send_acl_message(receiver_addr=receiver_addr,
                                           receiver_id=receiver_id,
                                           content='Hello World')

    def handle_message(self, content, meta):
        pass


class Receiver(Agent):
    def __init__(self, container):
        super().__init__(container)
        self.wait_for_reply = asyncio.Future()

    def handle_message(self, content, meta):
        print(f'Received a message with the following content {content}.')
        self.wait_for_reply.set_result(True)


async def main():
    clock = AsyncioClock()
    # clock = ExternalClock(start_time=1000)
    addr = ('127.0.0.1', 5555)
    c = await create_container(addr=addr, clock=clock)
    receiver = Receiver(c)
    caller = Caller(c, addr, receiver.aid)
    await receiver.wait_for_reply
    await c.shutdown()


if __name__ == '__main__':
    asyncio.run(main())

This code will terminate after 5 seconds. If you change the clock to an ExternalClock by uncommenting the ExternalClock in the example above, the program won’t terminate as the time of the clock is not proceeded by an external process. If you comment in the ExternalClock and change your main() as follows, the program will terminate after one second:

async def main():
    # clock = AsyncioClock()
    clock = ExternalClock(start_time=1000)
    addr = ('127.0.0.1', 5555)

    c = await create_container(addr=addr, clock=clock)
    receiver = Receiver(c)
    caller = Caller(c, addr, receiver.aid)
    if isinstance(clock, ExternalClock):
        await asyncio.sleep(1)
        clock.set_time(clock.time + 5)
    await receiver.wait_for_reply
    await c.shutdown()