Utilities¶
Clock¶
- class mango.util.clock.AsyncioClock[source]¶
Bases:
Clock
The AsyncioClock, a clock that reports the current system time.
- property time: float¶
Current time using the time module
- class mango.util.clock.Clock[source]¶
Bases:
ABC
Abstract class for clocks that can be used in mango
- property time: float¶
Returns the current time of the clock
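The clock abstraction can be illustrated with a minimal sketch. This is not mango's actual code: the names `SketchClock`, `SystemClock`, `ManualClock`, and `set_time` are hypothetical, and only the read-only `time` property mirrors the interface documented above.

```python
import time
from abc import ABC, abstractmethod


class SketchClock(ABC):
    """Hypothetical stand-in for an abstract clock exposing a `time` property."""

    @property
    @abstractmethod
    def time(self) -> float:
        """Return the current time of the clock."""


class SystemClock(SketchClock):
    """Reads the wall-clock time from the standard `time` module."""

    @property
    def time(self) -> float:
        return time.time()


class ManualClock(SketchClock):
    """Assumed simulation variant: time only moves when it is set explicitly."""

    def __init__(self, start: float = 0.0) -> None:
        self._time = start

    @property
    def time(self) -> float:
        return self._time

    def set_time(self, new_time: float) -> None:
        # Refuse to move backwards so scheduled events stay ordered.
        if new_time < self._time:
            raise ValueError("time must not move backwards")
        self._time = new_time
```

Code that depends only on the `time` property can then run against either clock, which is the usual reason for hiding the time source behind an abstract class.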
Distributed clock¶
- class mango.util.distributed_clock.DistributedClockAgent[source]¶
Bases:
ClockAgent
- handle_message(content: float, meta)[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- class mango.util.distributed_clock.DistributedClockManager(receiver_clock_addresses: list)[source]¶
Bases:
ClockAgent
- async broadcast(message, add_futures=True)[source]¶
Broadcast the given message to all receiver clock addresses. If add_futures is set, a future is added which is finished when an answer by the receiving clock agent was received.
- Args:
message (object): the given message
add_futures (bool, optional): Adds futures which can be awaited until a response to a message is given. Defaults to True.
- async distribute_time(time=None)[source]¶
Waits until the current container is done. Broadcasts the new time to all the other clock agents. Then waits until the work in the other agents is done and their next event is received.
- Args:
time (number, optional): The new time which is set. Defaults to None.
- Returns:
number or None: The time at which the next event happens
- async get_next_event()[source]¶
Get the next event from the scheduler by requesting all known clock agents
- handle_message(content: float, meta)[source]¶
Has to be implemented by the user. This method is called when a message is received at the agent’s inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- async send_current_time(time=None)[source]¶
Broadcasts the current time to all receiver clock addresses. Does not add futures to wait for responses, as no response is expected here.
- Args:
time (number, optional): The current time which is set. Defaults to None.
- async wait_all_online()[source]¶
Sends a broadcast to all expected addresses asking for the next event. Waits one second and repeats this until a response from every address has been received. This effectively waits until all agents are up and running and the manager can start the simulation.
This is needed because paho MQTT offers no way to check whether a message was received other than sending ping-pong messages.
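The retry loop described for wait_all_online can be sketched as follows. This is only an illustration under assumptions: `wait_until_all_respond` and the `ping` callable are hypothetical stand-ins for the broadcast and the awaited response futures, and the sketch does not use any mango or MQTT code.

```python
import asyncio


async def wait_until_all_respond(addresses, ping, retry_delay=1.0):
    """Ping every pending address until each one has answered once.

    `ping` is a hypothetical async callable that returns True when the
    address answered. Returns the number of rounds that were needed.
    """
    pending = set(addresses)
    rounds = 0
    while pending:
        rounds += 1
        targets = list(pending)
        answers = await asyncio.gather(*(ping(addr) for addr in targets))
        # Drop every address that answered in this round.
        pending -= {addr for addr, ok in zip(targets, answers) if ok}
        if pending:
            await asyncio.sleep(retry_delay)
    return rounds


async def demo_wait_all_online():
    attempts = {"a": 0, "b": 0}
    online_after = {"a": 1, "b": 3}  # "b" only answers its third ping

    async def ping(addr):
        attempts[addr] += 1
        return attempts[addr] >= online_after[addr]

    return await wait_until_all_respond(["a", "b"], ping, retry_delay=0.01)
```

Running `asyncio.run(demo_wait_all_online())` returns the number of rounds needed until both simulated agents have answered.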
Scheduling¶
Module for commonly used time-based scheduled tasks executed inside one agent.
- class mango.util.scheduling.AwaitingProcessTask(coroutine_creator, awaited_coroutine_creator, clock=None, on_stop=None)[source]¶
Bases:
AwaitingTask, ScheduledProcessTask
Await a coroutine, then execute another.
- class mango.util.scheduling.AwaitingTask(coroutine, awaited_coroutine, clock=None, on_stop=None, observable=True)[source]¶
Bases:
ScheduledTask
Awaiting task. This task will execute a given coroutine after another given coroutine has been awaited. Can be useful if you want to execute something after a Future has finished.
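The "await one thing, then execute another" behavior can be sketched in plain asyncio. The helper `run_after` and the demo are hypothetical and are not taken from mango; they only illustrate the ordering that the description above implies.

```python
import asyncio


async def run_after(coroutine, awaited):
    """Await `awaited` first, then run `coroutine` and return its result."""
    await awaited
    return await coroutine


async def demo_awaiting():
    order = []
    ready = asyncio.get_running_loop().create_future()

    async def follow_up():
        order.append("follow-up")
        return "finished"

    task = asyncio.create_task(run_after(follow_up(), ready))
    await asyncio.sleep(0)  # let the task start and block on `ready`
    order.append("resolving future")
    ready.set_result(None)
    result = await task
    return order, result
```

In the demo, the follow-up coroutine runs only after the future has been resolved, so "resolving future" is recorded before "follow-up".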
- class mango.util.scheduling.ConditionalProcessTask(coro_func, condition_func, lookup_delay=0.1, clock: Clock = None, on_stop=None)[source]¶
Bases:
ConditionalTask, ScheduledProcessTask
Task which will get executed as soon as the given condition is fulfilled.
- class mango.util.scheduling.ConditionalTask(coroutine, condition_func, lookup_delay=0.1, clock: Clock = None, on_stop=None, observable=True)[source]¶
Bases:
ScheduledTask
Task which will get executed as soon as the given condition is fulfilled.
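One way to picture the role of `condition_func` and `lookup_delay` is a polling loop. The sketch below is an assumption about the general technique, not mango's implementation; `run_when` and the demo are hypothetical names.

```python
import asyncio


async def run_when(coroutine, condition_func, lookup_delay=0.1):
    """Poll `condition_func` every `lookup_delay` seconds, then run `coroutine`."""
    while not condition_func():
        await asyncio.sleep(lookup_delay)
    return await coroutine


async def demo_conditional():
    state = {"ready": False}
    log = []

    async def action():
        log.append("action ran")

    task = asyncio.create_task(
        run_when(action(), lambda: state["ready"], lookup_delay=0.01)
    )
    await asyncio.sleep(0.03)
    ran_early = bool(log)  # still False: the condition has not been met yet
    state["ready"] = True
    await task
    return ran_early, log
```

A smaller `lookup_delay` reacts faster to the condition becoming true, at the cost of more frequent checks.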
- class mango.util.scheduling.InstantScheduledProcessTask(coroutine_creator, clock: Clock = None, on_stop=None)[source]¶
Bases:
TimestampScheduledProcessTask
One-shot task, which will get executed instantly.
- class mango.util.scheduling.InstantScheduledTask(coroutine, clock: Clock = None, on_stop=None, observable=True)[source]¶
Bases:
TimestampScheduledTask
One-shot task, which will get executed instantly.
- class mango.util.scheduling.PeriodicScheduledProcessTask(coroutine_func, delay, clock: Clock = None, on_stop=None)[source]¶
- class mango.util.scheduling.PeriodicScheduledTask(coroutine_func, delay, clock: Clock = None, on_stop=None, observable=True)[source]¶
Bases:
ScheduledTask
Class for periodic scheduled tasks. It allows creating a task for an agent that is executed periodically with a specified delay.
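A periodic task takes a coroutine function rather than a coroutine object, because a coroutine object can be awaited only once while each cycle needs a fresh one. The following sketch shows the idea in plain asyncio; `run_periodically` and its `max_runs` parameter are hypothetical additions for the demo and are not part of the documented signature.

```python
import asyncio


async def run_periodically(coroutine_func, delay, max_runs=None):
    """Call `coroutine_func` repeatedly, sleeping `delay` seconds in between.

    `max_runs` is a hypothetical limit so the demo terminates; with None the
    loop is open-ended and would be stopped by cancelling the task.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        await coroutine_func()  # a fresh coroutine object per cycle
        runs += 1
        await asyncio.sleep(delay)
    return runs


async def demo_periodic():
    ticks = []

    async def tick():
        ticks.append(len(ticks))

    await run_periodically(tick, delay=0.01, max_runs=3)
    return ticks
```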
- class mango.util.scheduling.RecurrentScheduledProcessTask(coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None)[source]¶
- class mango.util.scheduling.RecurrentScheduledTask(coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None, observable=True)[source]¶
Bases:
ScheduledTask
Class for recurrent scheduled tasks. It allows creating a task for an agent that is executed repeatedly according to a given recurrency rule (rrule).
- class mango.util.scheduling.ScheduledProcessControl(run_task_event: multiprocessing.synchronize.Event, kill_process_event: multiprocessing.synchronize.Event)[source]¶
Bases:
object
- kill_process_event: Event¶
- run_task_event: Event¶
- class mango.util.scheduling.ScheduledProcessTask(clock: Clock, on_stop=None, observable=False)[source]¶
Bases:
ScheduledTask
This alt-name marks a ScheduledTask as process-compatible. This is necessary because not everything can be transferred to other processes: coroutines, for example, are bound to the current event loop (and thus the current thread), so they won’t work in other processes. Furthermore, when using a ProcessTask you have to ensure that the coroutine functions are not bound to complex objects; they should be static or bound to simple objects that are transferable via Python’s IPC implementation.
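The difference between a coroutine and a transferable coroutine creator can be checked with Python's standard `pickle` module, which `multiprocessing` uses by default. This is a sketch under the assumption that plain pickling is a fair proxy for transferability; the library may use a different serializer, and `is_transferable` and `check_transferability` are hypothetical helpers.

```python
import asyncio
import functools
import pickle


def is_transferable(obj) -> bool:
    """Return True if `obj` can be pickled with the standard library."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


def check_transferability():
    # A coroutine creator: a picklable callable that builds the coroutine later.
    creator = functools.partial(asyncio.sleep, 0)
    # A coroutine object: live interpreter state that cannot be pickled.
    coro = asyncio.sleep(0)
    try:
        return is_transferable(creator), is_transferable(coro)
    finally:
        coro.close()  # avoid a "never awaited" warning
```

This is why the process variants of the tasks accept a `coroutine_creator` instead of a ready-made coroutine: the creator is sent to the worker process and called there.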
- class mango.util.scheduling.ScheduledTask(clock: Clock = None, observable=True, on_stop=None)[source]¶
Bases:
object
Base class for scheduled tasks in mango. Within this class it is possible to define what to do on execution and on stop. In most cases the logic should be passed as a lambda, while the scheduling logic lives inside the classes inheriting from this one.
- class mango.util.scheduling.Scheduler(clock: Clock = None, num_process_parallel=16, suspendable=True, observable=True)[source]¶
Bases:
object
Scheduler for executing tasks.
- resume(given_src)[source]¶
Resume a set of tasks triggered by the given src object.
- Parameters:
given_src (object) – the src object
- schedule_awaiting_task(coroutine, awaited_coroutine, on_stop=None, src=None)[source]¶
Schedule a task that starts after another coroutine or future has completed.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
awaited_coroutine (asyncio.Future) – coroutine or future that is awaited before the task starts
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay: float = 0.1, on_stop=None, src=None)[source]¶
Schedule a task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition [s]
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay: float = 0.1, on_stop=None, src=None)[source]¶
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition [s]
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)[source]¶
Schedule an instantly executed task dispatched to another process.
- Parameters:
coroutine_creator – coroutine_creator to be scheduled
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)[source]¶
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)[source]¶
Schedule an open-ended, periodically executed task dispatched to another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)[source]¶
Schedule an open-ended, periodically executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_process_task(task: ScheduledProcessTask, src=None)[source]¶
Schedule a task with asyncio in a different process managed by a ProcessWorkerPool in this Scheduler object. For scheduling options see the subclasses of ScheduledProcessTask.
- Parameters:
task (ScheduledProcessTask) – task you want to schedule
src (Object) – creator of the task
- Returns:
future to check whether the task is done and to finally retrieve the result
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)[source]¶
Schedule a task using a fine-grained recurrency rule, dispatched to another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)[source]¶
Schedule a task using a fine-grained recurrency rule.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None) → Task[source]¶
Schedule a task with asyncio. If the task is finite, it is automatically removed once it has finished. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task (ScheduledTask) – task to be scheduled
src (Object) – creator of the task
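The automatic removal of finished tasks can be pictured with asyncio's done callbacks. The following is a minimal sketch, not the Scheduler's real bookkeeping: `TaskRegistry` and its methods are hypothetical, and only the idea of tracking each task together with its `src` is taken from the description above.

```python
import asyncio


class TaskRegistry:
    """Hypothetical registry that tracks running tasks with their `src`."""

    def __init__(self):
        self._tasks = {}

    def schedule(self, coroutine, src=None):
        task = asyncio.create_task(coroutine)
        self._tasks[task] = src
        # Remove the entry as soon as the task finishes or is cancelled.
        task.add_done_callback(self._remove)
        return task

    def _remove(self, task):
        self._tasks.pop(task, None)

    def __len__(self):
        return len(self._tasks)


async def demo_registry():
    registry = TaskRegistry()
    task = registry.schedule(asyncio.sleep(0.01), src="agent-1")
    while_running = len(registry)
    await task
    await asyncio.sleep(0)  # give pending done callbacks a chance to run
    return while_running, len(registry)
```

Keeping the `src` next to each task is what makes it possible to later address all tasks created by one source, for example to suspend or resume them as a group.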
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)[source]¶
Schedule a task at specified unix timestamp dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)[source]¶
Schedule a task at specified unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (float) – timestamp defining when the task should start (unix timestamp)
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- suspend(given_src)[source]¶
Suspend a set of tasks triggered by the given src object.
- Parameters:
given_src (object) – the src object
- class mango.util.scheduling.Suspendable(coro, ext_contr_event=None, kill_event=None)[source]¶
Bases:
object
Wraps a coroutine, intercepting __await__ to add the functionality of suspending.
- property coro¶
Return the coroutine
- Returns:
the coroutine
- Return type:
a coroutine
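Intercepting `__await__` to pause a coroutine can be sketched as below. This follows a commonly used asyncio pattern and is an assumption about the general technique, not a copy of mango's class: `SuspendableSketch`, `suspend`, and `resume` are hypothetical names, and the event and kill parameters from the documented constructor are left out.

```python
import asyncio


class SuspendableSketch:
    """Wraps a coroutine and only forwards its steps while an event is set."""

    def __init__(self, coro):
        self._coro = coro
        self._can_run = asyncio.Event()
        self._can_run.set()

    def suspend(self):
        self._can_run.clear()

    def resume(self):
        self._can_run.set()

    def __await__(self):
        inner = self._coro.__await__()
        send, message = inner.send, None
        while True:
            try:
                # Block here for as long as the wrapper is suspended.
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = inner.throw, err
            try:
                signal = send(message)  # advance the wrapped coroutine one step
            except StopIteration as stop:
                return stop.value
            else:
                send = inner.send
            try:
                message = yield signal  # hand the awaited future to the loop
            except BaseException as err:
                send, message = inner.throw, err


async def demo_suspend():
    results = []

    async def worker():
        for i in range(3):
            await asyncio.sleep(0.01)
            results.append(i)
        return "done"

    susp = SuspendableSketch(worker())
    susp.suspend()

    async def runner():
        return await susp

    task = asyncio.create_task(runner())
    await asyncio.sleep(0.05)
    paused = list(results)  # nothing has run while suspended
    susp.resume()
    value = await task
    return paused, results, value
```

The wrapped coroutine only advances between its own await points, so suspension takes effect at the next `await` inside the worker rather than immediately.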
- class mango.util.scheduling.TimestampScheduledProcessTask(coroutine_creator, timestamp: float, clock=None, on_stop=None)[source]¶
Bases:
TimestampScheduledTask, ScheduledProcessTask
Timestamp based one-shot task.
- class mango.util.scheduling.TimestampScheduledTask(coroutine, timestamp: float, clock=None, on_stop=None, observable=True)[source]¶
Bases:
ScheduledTask
Timestamp based one-shot task. This task will get executed when a given unix timestamp is reached.
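A timestamp-based one-shot task boils down to sleeping for the difference between the target timestamp and the current clock time. The sketch below assumes a wall-clock time source and is not mango's implementation; `seconds_until`, `run_at`, and the `now` parameter are hypothetical.

```python
import asyncio
import time


def seconds_until(timestamp, current_time):
    """Remaining seconds until `timestamp`; zero if it already passed."""
    return max(0.0, timestamp - current_time)


async def run_at(coroutine, timestamp, now=time.time):
    """Sleep until the unix `timestamp` is reached, then run `coroutine`.

    `now` is a hypothetical hook for the time source, so a simulated clock
    could be passed in instead of the system time.
    """
    await asyncio.sleep(seconds_until(timestamp, now()))
    return await coroutine


async def demo_timestamp():
    async def action():
        return "ran"

    return await run_at(action(), time.time() + 0.01)
```

A timestamp that lies in the past yields a delay of zero, so the coroutine runs right away, which matches the behavior one would expect from an instantly executed task.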