Utilities

Clock

class mango.util.clock.AsyncioClock[source]

Bases: Clock

The AsyncioClock

sleep(t) → Future[source]

Sleeping via asyncio sleep

property time: float

Current time using the time module

class mango.util.clock.Clock[source]

Bases: ABC

Abstract class for clocks that can be used in mango

abstractmethod sleep(t: float) → Future[source]
property time: float

Returns the current time of the clock

class mango.util.clock.ExternalClock(start_time: float = 0)[source]

Bases: Clock

An external clock that proceeds only when set_time is called

get_next_activity() → float[source]
set_time(t: float)[source]

New time is set

sleep(t: float) → Future[source]

Sleeps for t based on the external clock

property time: float

Current time of the external clock
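The behaviour described for ExternalClock (time proceeds only when set_time is called, and sleep returns a Future) can be illustrated with a small stand-in. This is a minimal sketch under stated assumptions, not mango's implementation: the class name SketchExternalClock and its internals are hypothetical, and the meaning given to get_next_activity (earliest pending wake-up time) is an assumption, since the reference above does not describe it.

```python
import asyncio
import heapq
import itertools


class SketchExternalClock:
    """Hypothetical stand-in: time only moves when set_time() is called."""

    def __init__(self, start_time: float = 0):
        self._time = start_time
        self._waiters = []  # heap of (wake_time, tie_breaker, future)
        self._counter = itertools.count()

    @property
    def time(self) -> float:
        return self._time

    def sleep(self, t: float) -> asyncio.Future:
        # The future resolves once the clock has reached now + t.
        future = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (self._time + t, next(self._counter), future))
        return future

    def set_time(self, t: float) -> None:
        self._time = t
        # Wake every sleeper whose wake-up time has been reached.
        while self._waiters and self._waiters[0][0] <= t:
            _, _, future = heapq.heappop(self._waiters)
            if not future.done():
                future.set_result(True)

    def get_next_activity(self):
        # Assumption: earliest pending wake-up time, or None if nobody sleeps.
        return self._waiters[0][0] if self._waiters else None


async def demo():
    clock = SketchExternalClock(start_time=0)
    woke_at = []

    async def sleeper():
        await clock.sleep(10)
        woke_at.append(clock.time)

    task = asyncio.ensure_future(sleeper())
    await asyncio.sleep(0)       # let the sleeper register its future
    next_activity = clock.get_next_activity()
    clock.set_time(5)            # too early, the sleeper stays asleep
    await asyncio.sleep(0)
    still_sleeping = not task.done()
    clock.set_time(10)           # reaches the wake-up time
    await task
    return next_activity, still_sleeping, woke_at


print(asyncio.run(demo()))
```

Note that no real time passes in the demo: the sleeper wakes up purely because the clock was advanced from outside, which is what makes such a clock useful for simulations.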

Distributed clock

class mango.util.distributed_clock.ClockAgent[source]

Bases: Agent

async wait_all_done()[source]

class mango.util.distributed_clock.DistributedClockAgent[source]

Bases: ClockAgent

handle_message(content: float, meta)[source]

Has to be implemented by the user. This method is called when a message is received at the agent's inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

class mango.util.distributed_clock.DistributedClockManager(receiver_clock_addresses: list)[source]

Bases: ClockAgent

async broadcast(message, add_futures=True)[source]

Broadcast the given message to all receiver clock addresses. If add_futures is set, a future is added that completes once an answer from the receiving clock agent has been received.

Args:

message (object): the given message

add_futures (bool, optional): Adds futures which can be awaited until a response to a message is given. Defaults to True.

async distribute_time(time=None)[source]

Waits until the current container is done. Broadcasts the new time to all the other clock agents. Then waits until the work in the other agents is done and their next event has been received.

Args:

time (number, optional): The new time which is set. Defaults to None.

Returns:

number or None: The time at which the next event happens
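The round trip described for distribute_time (broadcast a new time, wait for every clock agent to finish, collect their next events) can be sketched with in-memory stand-ins. Everything here is hypothetical and only illustrates the control flow: SketchClockAgent, on_new_time, and the choice to take the earliest reported event as the overall next event are assumptions, and no real messaging is involved.

```python
import asyncio


class SketchClockAgent:
    """Hypothetical remote clock agent holding its own pending event times."""

    def __init__(self, name, event_times):
        self.name = name
        self.time = 0.0
        self.event_times = sorted(event_times)

    async def on_new_time(self, new_time):
        # Pretend to work through everything due at or before new_time.
        await asyncio.sleep(0)
        self.time = new_time
        self.event_times = [t for t in self.event_times if t > new_time]
        # Reply with the next pending event, or None if there is none.
        return self.event_times[0] if self.event_times else None


async def distribute_time(agents, new_time):
    # Broadcast the new time and wait until every agent has answered.
    replies = await asyncio.gather(*(a.on_new_time(new_time) for a in agents))
    pending = [reply for reply in replies if reply is not None]
    # Assumption: the overall next event is the earliest one reported.
    return min(pending) if pending else None


async def demo():
    agents = [SketchClockAgent("a", [5, 20]), SketchClockAgent("b", [12])]
    steps = []
    next_time = 5
    while next_time is not None:
        steps.append(next_time)
        next_time = await distribute_time(agents, next_time)
    return steps


print(asyncio.run(demo()))  # [5, 12, 20]
```

The loop in demo() mirrors how a simulation driver could step from event to event until no agent reports further work.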

async get_next_event()[source]

Get the next event from the scheduler by requesting all known clock agents

handle_message(content: float, meta)[source]

Has to be implemented by the user. This method is called when a message is received at the agent's inbox.

Parameters:
  • content – The deserialized message object

  • meta – Meta details of the message. In case of mqtt this dict includes at least the field ‘topic’

on_ready()[source]

Called when all containers have been started using activate(…).

async send_current_time(time=None)[source]

Broadcasts the current time to all receiver clock addresses. Does not add futures to wait for responses, as no response is expected here.

Args:

time (number, optional): The current time which is set. Defaults to None.

async shutdown()[source]

Shutdown all tasks that are running and deregister from the container

async wait_all_online()[source]

Sends a broadcast to all expected addresses asking for their next event. Waits one second and repeats this until a response from every address has been received. This effectively waits until all agents are up and running and the manager can start the simulation.

This is needed because paho mqtt offers no way to check whether a message was received other than sending ping-pong messages.
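The ping-and-retry idea behind wait_all_online can be sketched as a plain asyncio loop. This is an illustration under assumptions, not the library's code: the free function wait_all_online, the ping callback, and the retry_delay parameter are hypothetical, and for brevity the sketch only re-pings addresses that have not answered yet rather than broadcasting to all of them.

```python
import asyncio


async def wait_all_online(expected, ping, retry_delay=1.0):
    """Ping until every expected address has answered at least once."""
    online = set()
    rounds = 0
    while online != set(expected):
        rounds += 1
        for address in expected:
            if address not in online and await ping(address):
                online.add(address)
        if online != set(expected):
            # Not everybody answered yet: wait, then try again.
            await asyncio.sleep(retry_delay)
    return rounds


async def demo():
    # Hypothetical peers: "b" only answers from its third ping onwards.
    attempts = {"a": 0, "b": 0}
    ready_after = {"a": 1, "b": 3}

    async def ping(address):
        attempts[address] += 1
        return attempts[address] >= ready_after[address]

    return await wait_all_online(["a", "b"], ping, retry_delay=0.01)


print(asyncio.run(demo()))  # 3 rounds were needed
```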

async wait_for_futures()[source]

Waits for all futures in self.futures

Gives debug log output to see which agent is waited for.

Scheduling

Module for commonly used time-based scheduled tasks executed inside one agent.

class mango.util.scheduling.AwaitingProcessTask(coroutine_creator, awaited_coroutine_creator, clock=None, on_stop=None)[source]

Bases: AwaitingTask, ScheduledProcessTask

Await a coroutine, then execute another.

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.AwaitingTask(coroutine, awaited_coroutine, clock=None, on_stop=None, observable=True)[source]

Bases: ScheduledTask

Awaiting task. This task will execute a given coroutine after another given coroutine has been awaited. Can be useful if you want to execute something after a Future has finished.
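The pattern described here, running one coroutine only after another awaitable has finished, looks roughly like this in plain asyncio. The helper run_after is a hypothetical name used for illustration; it is a sketch of the idea, not the AwaitingTask implementation.

```python
import asyncio


async def run_after(coroutine, awaited):
    """Await `awaited` first, then run `coroutine` and return its result."""
    await awaited
    return await coroutine


async def demo():
    order = []
    gate = asyncio.get_running_loop().create_future()

    async def follow_up():
        order.append("follow-up ran")
        return "done"

    task = asyncio.ensure_future(run_after(follow_up(), gate))
    await asyncio.sleep(0)          # give the task a chance to start
    order.append("gate still pending")
    gate.set_result(None)           # the awaited Future finishes here
    result = await task
    return order, result


print(asyncio.run(demo()))
```

The recorded order shows that the follow-up coroutine does not start before the awaited Future has a result.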

close()[source]

Perform closing actions

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.ConditionalProcessTask(coro_func, condition_func, lookup_delay=0.1, clock: Clock = None, on_stop=None)[source]

Bases: ConditionalTask, ScheduledProcessTask

Task which will get executed as soon as the given condition is fulfilled.

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.ConditionalTask(coroutine, condition_func, lookup_delay=0.1, clock: Clock = None, on_stop=None, observable=True)[source]

Bases: ScheduledTask

Task which will get executed as soon as the given condition is fulfilled.
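A conditional task of this kind can be pictured as a polling loop: check the condition, sleep for lookup_delay, and run the coroutine once the condition holds. The helper run_when below is a hypothetical stand-in that uses asyncio.sleep instead of a mango clock; it is a sketch of the pattern only.

```python
import asyncio


async def run_when(coroutine, condition_func, lookup_delay=0.1):
    """Poll condition_func every lookup_delay seconds, then run coroutine."""
    failed_checks = 0
    while not condition_func():
        failed_checks += 1
        await asyncio.sleep(lookup_delay)
    result = await coroutine
    return failed_checks, result


async def demo():
    calls = {"n": 0}

    def condition():
        # Becomes true on the third check.
        calls["n"] += 1
        return calls["n"] >= 3

    async def action():
        return "executed"

    return await run_when(action(), condition, lookup_delay=0.01)


print(asyncio.run(demo()))  # (2, 'executed')
```

A smaller lookup_delay reacts faster to the condition becoming true, at the cost of more frequent checks.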

close()[source]

Perform closing actions

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.InstantScheduledProcessTask(coroutine_creator, clock: Clock = None, on_stop=None)[source]

Bases: TimestampScheduledProcessTask

One-shot task, which will get executed instantly.

class mango.util.scheduling.InstantScheduledTask(coroutine, clock: Clock = None, on_stop=None, observable=True)[source]

Bases: TimestampScheduledTask

One-shot task, which will get executed instantly.

close()[source]

Perform closing actions

class mango.util.scheduling.PeriodicScheduledProcessTask(coroutine_func, delay, clock: Clock = None, on_stop=None)[source]

Bases: PeriodicScheduledTask, ScheduledProcessTask

class mango.util.scheduling.PeriodicScheduledTask(coroutine_func, delay, clock: Clock = None, on_stop=None, observable=True)[source]

Bases: ScheduledTask

Class for periodic scheduled tasks. It creates a task for an agent that is executed periodically with a specified delay.
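The periodic pattern can be sketched as an open-ended loop that calls a coroutine function and then sleeps for the delay. The helper run_periodically is hypothetical and uses asyncio.sleep rather than a mango clock. It also shows why a coroutine function is passed instead of a coroutine: a coroutine object can be awaited only once, so each cycle needs a fresh one.

```python
import asyncio


async def run_periodically(coroutine_func, delay):
    """Call coroutine_func, sleep for delay seconds, repeat until cancelled."""
    while True:
        await coroutine_func()   # a fresh coroutine for every cycle
        await asyncio.sleep(delay)


async def demo():
    ticks = []

    async def tick():
        ticks.append(len(ticks))

    task = asyncio.ensure_future(run_periodically(tick, delay=0.01))
    while len(ticks) < 3:
        await asyncio.sleep(0.005)
    # The task never ends on its own, so it is stopped by cancelling it.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks[:3], task.cancelled()


print(asyncio.run(demo()))
```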

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.RecurrentScheduledProcessTask(coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None)[source]

Bases: RecurrentScheduledTask, ScheduledProcessTask

class mango.util.scheduling.RecurrentScheduledTask(coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None, observable=True)[source]

Bases: ScheduledTask

Class for recurrent scheduled tasks. It creates a task for an agent that is executed repeatedly according to the given recurrency rule (rrule).

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.ScheduledProcessControl(run_task_event: multiprocessing.synchronize.Event, kill_process_event: multiprocessing.synchronize.Event)[source]

Bases: object

init_process()[source]
kill_process()[source]
kill_process_event: Event
resume_task()[source]
run_task_event: Event
suspend_task()[source]

class mango.util.scheduling.ScheduledProcessTask(clock: Clock, on_stop=None, observable=False)[source]

Bases: ScheduledTask

This marker class flags a ScheduledTask as process-compatible. This is necessary because not everything can be transferred to other processes: coroutines, for example, are bound to the current event loop and thus to the current thread, so they will not work in other processes. Furthermore, when using a ProcessTask you have to ensure that the coroutine functions are not bound to complex objects; they should be static or bound to simple objects that are transferable via Python's IPC implementation.
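The transfer restriction can be checked with the standard library alone. The sketch below assumes that the transfer relies on pickle-style serialization, which is what Python's multiprocessing module uses by default; the helper can_pickle is a hypothetical name, and asyncio.sleep merely serves as an example of an importable coroutine function.

```python
import asyncio
import pickle


def can_pickle(obj):
    """Return True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# A module-level coroutine function is pickled by reference to its name.
creator_ok = can_pickle(asyncio.sleep)

# A coroutine object is tied to live interpreter state and cannot be pickled.
coro = asyncio.sleep(0)
coroutine_ok = can_pickle(coro)
coro.close()  # avoid a "never awaited" warning

# A lambda has no importable name, so pickling it by reference fails too.
lambda_ok = can_pickle(lambda: asyncio.sleep(0))

print(creator_ok, coroutine_ok, lambda_ok)  # True False False
```

This is why the process variants of the tasks take a coroutine_creator: the creating function travels to the other process, and the coroutine itself is only created there.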

class mango.util.scheduling.ScheduledTask(clock: Clock = None, observable=True, on_stop=None)[source]

Bases: object

Base class for scheduled tasks in mango. Within this class it is possible to define what to do on execution and on stop. In most cases the logic should be passed as a lambda, while the scheduling logic lives inside a class inheriting from this one.

close()[source]

Perform closing actions

notify_running()[source]
notify_sleeping()[source]
on_stop(fut: Future = None)[source]

Called when the task is cancelled or finished.

abstractmethod async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.Scheduler(clock: Clock = None, num_process_parallel=16, suspendable=True, observable=True)[source]

Bases: object

Scheduler for executing tasks.

resume(given_src)[source]

Resume a set of tasks triggered by the given src object.

Parameters:

given_src (object) – the src object

schedule_awaiting_task(coroutine, awaited_coroutine, on_stop=None, src=None)[source]

Schedule a task that runs after the given awaited coroutine or future has completed.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • awaited_coroutine (asyncio.Future) – coroutine or future that is awaited before the task starts

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay: float = 0.1, on_stop=None, src=None)[source]

Schedule a task when a specified condition is met.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition [s]

  • src (Object) – creator of the task

schedule_conditional_task(coroutine, condition_func, lookup_delay: float = 0.1, on_stop=None, src=None)[source]

Schedule a task when a specified condition is met.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled

  • lookup_delay (float) – delay between checking the condition [s]

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)[source]

Schedule an instantly executed task dispatched to another process.

Parameters:
  • coroutine_creator – coroutine_creator to be scheduled

  • src (Object) – creator of the task

schedule_instant_task(coroutine, on_stop=None, src=None)[source]

Schedule an instantly executed task.

Parameters:
  • coroutine – coroutine to be scheduled

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)[source]

Schedule an open-ended, periodically executed task dispatched to another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • src (Object) – creator of the task

schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)[source]

Schedule an open-ended, periodically executed task.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • delay (float) – delay in between the cycles

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_process_task(task: ScheduledProcessTask, src=None)[source]

Schedule a task with asyncio in a different process managed by a ProcessWorkerPool in this Scheduler object. For scheduling options see the subclasses of ScheduledProcessTask.

Returns:

future to check whether the task is done and to finally retrieve the result

Return type:

_type_

schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)[source]

Schedule a task using a recurrency rule, dispatched to another process.

Parameters:
  • coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – rrule object which gets executed

  • src (Object) – creator of the task

schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)[source]

Schedule a task using a fine-grained recurrency rule.

Parameters:
  • coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled

  • recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task

schedule_task(task: ScheduledTask, src=None) → Task[source]

Schedule a task with asyncio. If the task is finite, it is automatically removed once it has finished. For scheduling options see the subclasses of ScheduledTask.

Parameters:
  • task (ScheduledTask) – task to be scheduled

  • src (Object) – creator of the task

schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)[source]

Schedule a task at specified unix timestamp dispatched to another process.

Parameters:
  • coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled

  • timestamp (float) – unix timestamp defining when the task should start

  • src (Object) – creator of the task

schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)[source]

Schedule a task at specified unix timestamp.

Parameters:
  • coroutine (Coroutine) – coroutine to be scheduled

  • timestamp (float) – timestamp defining when the task should start (unix timestamp)

  • on_stop (Object) – coroutine to run on stop

  • src (Object) – creator of the task
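Scheduling at a unix timestamp boils down to computing the remaining delay and sleeping for it. The sketch below uses time.time and asyncio.sleep as stand-ins; the helpers delay_until and run_at are hypothetical names, and a scheduler driven by a mango clock would consult that clock instead of the system time.

```python
import asyncio
import time


def delay_until(timestamp, now):
    """Seconds to wait until timestamp; zero if it already lies in the past."""
    return max(0.0, timestamp - now)


async def run_at(coroutine, timestamp):
    """Sleep until the unix timestamp is reached, then run coroutine."""
    await asyncio.sleep(delay_until(timestamp, time.time()))
    return await coroutine


async def demo():
    async def action():
        return "executed"

    # Start the action roughly 50 ms from now.
    return await run_at(action(), time.time() + 0.05)


print(delay_until(105.0, 100.0))  # 5.0
print(asyncio.run(demo()))        # executed
```

Clamping the delay at zero means a timestamp in the past simply runs the coroutine right away.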

async shutdown()[source]

Shutdown internal process executor pool.

async sleep(t: float)[source]
Parameters:

t – The time to sleep [s]

async stop()[source]

Cancel all scheduled tasks that have not finished yet

async stop_tasks(task_list)[source]
suspend(given_src)[source]

Suspend a set of tasks triggered by the given src object.

Parameters:

given_src (object) – the src object

async tasks_complete(timeout=1, recursive=False)[source]

Finish all pending tasks using a timeout.

Args:

timeout (int, optional): waiting timeout. Defaults to 1.
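Waiting for a set of tasks with an upper time bound can be expressed with asyncio.wait. The following is only a sketch of that general idea: the helper wait_with_timeout is a hypothetical name, and how the Scheduler treats tasks that are still pending after the timeout is not covered by the description above.

```python
import asyncio


async def wait_with_timeout(tasks, timeout=1):
    """Wait up to timeout seconds; return (done, pending) sets of tasks."""
    if not tasks:
        # asyncio.wait rejects an empty collection.
        return set(), set()
    return await asyncio.wait(tasks, timeout=timeout)


async def demo():
    async def work(duration, name):
        await asyncio.sleep(duration)
        return name

    quick = asyncio.ensure_future(work(0.01, "quick"))
    slow = asyncio.ensure_future(work(10, "slow"))
    done, pending = await wait_with_timeout([quick, slow], timeout=0.2)
    finished = sorted(task.result() for task in done)
    # Clean up the task that did not make it within the timeout.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, len(pending)


print(asyncio.run(demo()))
```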

async tasks_complete_or_sleeping()[source]

class mango.util.scheduling.Suspendable(coro, ext_contr_event=None, kill_event=None)[source]

Bases: object

Wraps a coroutine, intercepting __await__ to add the functionality of suspending.
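Intercepting __await__ means the wrapper sits between the event loop and the wrapped coroutine and decides at each step whether to pass control on. The following is a simplified sketch of that general technique, not mango's code: the class SketchSuspendable is hypothetical and leaves out the ext_contr_event and kill_event parameters listed in the signature above.

```python
import asyncio


class SketchSuspendable:
    """Hypothetical wrapper that can pause a coroutine between its awaits."""

    def __init__(self, coro):
        self._coro = coro
        self._can_run = asyncio.Event()
        self._can_run.set()

    def __await__(self):
        inner = self._coro.__await__()
        send, message = inner.send, None
        while True:
            # Block here for as long as the wrapper is suspended.
            while not self._can_run.is_set():
                yield from self._can_run.wait().__await__()
            try:
                signal = send(message)
            except StopIteration as stop:
                return stop.value
            send = inner.send
            try:
                # Hand the inner future to the event loop and wait for it.
                message = yield signal
            except BaseException as err:
                # Forward cancellation and errors into the wrapped coroutine.
                send, message = inner.throw, err

    def suspend(self):
        self._can_run.clear()

    def resume(self):
        self._can_run.set()

    def is_suspended(self):
        return not self._can_run.is_set()


async def demo():
    log = []

    async def worker():
        for step in range(3):
            log.append(step)
            await asyncio.sleep(0.01)

    wrapped = SketchSuspendable(worker())
    task = asyncio.ensure_future(wrapped)
    await asyncio.sleep(0)       # let the worker perform its first step
    wrapped.suspend()
    at_suspend = list(log)
    await asyncio.sleep(0.05)    # long enough for an unpaused worker to finish
    frozen = list(log) == at_suspend
    wrapped.resume()
    await task
    return frozen, wrapped.is_suspended(), log


print(asyncio.run(demo()))
```

Suspension only takes effect at the wrapped coroutine's own await points, so a coroutine that never awaits cannot be paused this way.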

property coro

Return the coroutine

Returns:

the coroutine

Return type:

a coroutine

is_suspended()[source]

Return whether the coro is suspended

Returns:

True if suspended, False otherwise

Return type:

bool

resume()[source]

Resume the coroutine

suspend()[source]

Suspend the wrapped coroutine (it has to be executed as a task externally)

class mango.util.scheduling.TimestampScheduledProcessTask(coroutine_creator, timestamp: float, clock=None, on_stop=None)[source]

Bases: TimestampScheduledTask, ScheduledProcessTask

Timestamp based one-shot task.

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

class mango.util.scheduling.TimestampScheduledTask(coroutine, timestamp: float, clock=None, on_stop=None, observable=True)[source]

Bases: ScheduledTask

Timestamp based one-shot task. This task will get executed when a given unix timestamp is reached.

close()[source]

Perform closing actions

async run()[source]

Called via asyncio as asyncio.task.

Raises:

NotImplementedError: must be overwritten

Termination detection

async mango.util.termination_detection.tasks_complete_or_sleeping(container: Container, except_sources=['no_wait'])[source]
mango.util.termination_detection.unfinished_task_count(container: Container)[source]