mango.util package
mango.util.clock module
- class mango.util.clock.AsyncioClock
Bases: Clock
The AsyncioClock
- sleep(t) Future
Sleeping via asyncio sleep
- property time: float
Current time using the time module
- class mango.util.clock.Clock
Bases: ABC
Abstract class for clocks that can be used in mango
- abstractmethod sleep(t: float) Future
- property time: float
Returns the current time of the clock
- class mango.util.clock.ExternalClock(start_time: float = 0)
Bases: Clock
An external clock that proceeds only when set_time is called
- get_next_activity() float
- set_time(t: float)
New time is set
- sleep(t: float) Future
Sleeps for t based on the external clock
- property time: float
Current time of the external clock
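The behaviour described for ExternalClock (time only advances when set_time is called, and sleepers wake once their wake-up time is reached) can be illustrated with a small standalone sketch. It uses only the standard library; the class name SketchExternalClock and its internals are hypothetical and are not mango's implementation.

```python
import asyncio
import heapq


class SketchExternalClock:
    """Hypothetical stand-in for an externally driven clock (not mango's code)."""

    def __init__(self, start_time: float = 0):
        self._time = start_time
        self._waiting = []  # heap of (wake_time, counter, future)
        self._counter = 0   # tie-breaker so futures are never compared

    @property
    def time(self) -> float:
        return self._time

    def sleep(self, t: float) -> asyncio.Future:
        # The future resolves only once set_time() reaches the wake-up time.
        future = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiting, (self._time + t, self._counter, future))
        self._counter += 1
        return future

    def get_next_activity(self):
        # Earliest pending wake-up time, or None if nothing is waiting.
        return self._waiting[0][0] if self._waiting else None

    def set_time(self, t: float) -> None:
        self._time = t
        while self._waiting and self._waiting[0][0] <= t:
            _, _, future = heapq.heappop(self._waiting)
            if not future.done():
                future.set_result(True)


async def demo():
    clock = SketchExternalClock(start_time=0)
    woke = []

    async def sleeper(name, duration):
        await clock.sleep(duration)
        woke.append((name, clock.time))

    tasks = [asyncio.create_task(sleeper("a", 5)),
             asyncio.create_task(sleeper("b", 10))]
    await asyncio.sleep(0)          # let both sleepers register
    next_activity = clock.get_next_activity()
    clock.set_time(5)               # wakes "a" only
    await asyncio.sleep(0)
    after_first = list(woke)
    clock.set_time(10)              # wakes "b"
    await asyncio.gather(*tasks)
    return next_activity, after_first, woke
```

Running asyncio.run(demo()) shows that no wall-clock time passes: the sleepers only proceed when the simulation driver moves the clock forward.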
mango.util.distributed_clock module
- class mango.util.distributed_clock.ClockAgent(container: Container, suggested_aid: str = None, suspendable_tasks=True, observable_tasks=True)
Bases: Agent
- async wait_all_done()
- class mango.util.distributed_clock.DistributedClockAgent(container, suggested_aid='clock_agent')
Bases: ClockAgent
- handle_message(content: float, meta)
Has to be implemented by the user. This method is called when a message is received at the agent's inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- class mango.util.distributed_clock.DistributedClockManager(container, receiver_clock_addresses: list)
Bases: ClockAgent
- async broadcast(message, add_futures=True)
Broadcast the given message to all receiver clock addresses. If add_futures is set, a future is added which is finished when an answer by the receiving clock agent was received.
- Args:
message (object): the given message
add_futures (bool, optional): Adds futures which can be awaited until a response to a message is given. Defaults to True.
- async distribute_time(time=None)
Waits until the current container is done. Broadcasts the new time to all the other clock agents. Then waits until the work in the other agents is done and their next event is received.
- Args:
time (number, optional): The new time which is set. Defaults to None.
- Returns:
number or None: The time at which the next event happens
- async get_next_event()
Get the next event from the scheduler by requesting all known clock agents
- handle_message(content: float, meta)
Has to be implemented by the user. This method is called when a message is received at the agent's inbox.
- Parameters:
content – The deserialized message object
meta – Meta details of the message. In case of MQTT this dict includes at least the field ‘topic’
- async send_current_time(time=None)
Broadcasts the current time to all receiver clock addresses. Does not add futures to wait for responses, as no response is expected here.
- Args:
time (number, optional): The current time which is set. Defaults to None.
- async shutdown()
Shutdown all tasks that are running and deregister from the container
- async wait_all_online()
Sends a broadcast asking for the next event to all expected addresses. Waits one second and repeats this until a response from all addresses has been received. This effectively waits until all agents are up and running and the manager can start the simulation.
This is needed, as there is no way in paho mqtt to check whether a message was received, except by sending ping-pong messages.
- async wait_for_futures()
Waits for all futures in self.futures
Gives debug log output to see which agent is waited for.
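The retry behaviour described for wait_all_online (ask every expected address, wait, repeat until all have answered) can be sketched without any messaging backend. The helper wait_until_all_online and the fake_ping transport are hypothetical names for illustration only; mango's actual method broadcasts through the container.

```python
import asyncio


async def wait_until_all_online(addresses, ping, retry_delay=1.0):
    """Hypothetical helper: ping all addresses until every one has answered."""
    answered = set()
    rounds = 0
    while answered != set(addresses):
        rounds += 1
        for address in addresses:
            # Only addresses that have not answered yet are pinged again.
            if address not in answered and await ping(address):
                answered.add(address)
        if answered != set(addresses):
            await asyncio.sleep(retry_delay)
    return rounds


async def demo():
    attempts = {"clock_a": 0, "clock_b": 0}

    async def fake_ping(address):
        # Stand-in transport: "clock_b" only answers from the third ping on.
        attempts[address] += 1
        return address == "clock_a" or attempts[address] >= 3

    rounds = await wait_until_all_online(
        ["clock_a", "clock_b"], fake_ping, retry_delay=0.01
    )
    return rounds, attempts
```

The short retry_delay is chosen only to keep the demonstration fast; the documented behaviour waits one second between rounds.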
mango.util.multiprocessing module
Utility classes for handling multiprocessing in mango, especially focusing on IPC in an asyncio context.
The package contains two different variants of async pipes for IPC: duplex, and non-duplex pipes. For creating these pipes, use aiopipe() or aioduplex(). The idea of the code is based on the pypi package ‘aiopipe’.
These pipes provide async-compatible APIs. Here is a general example:
    main, sub = aioduplex()
    with sub.detach() as sub:
        # start your process with sub as inherited pipe
        ...
    async with main.open() as (rx, tx):
        item = await rx.read_object()
        tx.write_object(item)
        ...
Furthermore, there are internal connection objects, which can be used if synchronous access outside of the asyncio loop is necessary: ‘main.write_connection’ and ‘main.read_connection’. Note that you can’t use ‘write_connection’ if the pipe has been opened with ‘open()’, as this will lock the write access to the pipe. In that case you could use ‘open_readonly()’.
- class mango.util.multiprocessing.AioDuplex(rx: AioPipeReader, tx: AioPipeWriter)
Bases: object
Combines AioPipeReader and AioPipeWriter in one class. Therefore, ‘open’ will open both streams, and ‘detach’ will detach both streams/fds.
- open() AsyncContextManager[Tuple[StreamReader, StreamWriter]]
- open_readonly() AsyncContextManager[Tuple[StreamReader]]
- open_writeonly() AsyncContextManager[Tuple[StreamWriter]]
- property read_connection: Connection
- property write_connection: Connection
- class mango.util.multiprocessing.AioPipeReader(fd)
Bases: AioPipeStream
Reader for a pipe which can attach its file descriptor to an asyncio event loop, to enable asynchronous reading from the pipe fd.
- class mango.util.multiprocessing.AioPipeStream(fd)
Bases: object
Stream-like wrapper for a file descriptor, which implements ‘async with open’ and ‘with detach’
- close()
- detach() ContextManager[AioPipeStream]
- open()
- class mango.util.multiprocessing.AioPipeWriter(fd)
Bases: AioPipeStream
Writer for a pipe which can attach its file descriptor to an asyncio event loop, to enable asynchronous writing to the pipe fd.
- class mango.util.multiprocessing.ObjectStreamReader(stream_reader: StreamReader)
Bases: object
Wraps a StreamReader to add the ability to read pickled objects from the stream
- async read_bytes()
- async read_object()
- class mango.util.multiprocessing.ObjectStreamWriter(stream_writer: StreamWriter)
Bases: object
Wraps a StreamWriter to add the ability to write objects to the stream
- async drain()
- write_bytes(buf)
- write_object(object)
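To illustrate how pickled objects can travel over a byte stream, here is a minimal sketch that frames each pickle with a length prefix. The 4-byte big-endian prefix and the helper names encode_object and read_object are assumptions made for this example; the wire format actually used by ObjectStreamReader and ObjectStreamWriter is not specified in this reference.

```python
import asyncio
import pickle
import struct


def encode_object(obj) -> bytes:
    """Frame a pickled object with a 4-byte big-endian length prefix (assumed format)."""
    payload = pickle.dumps(obj)
    return struct.pack(">I", len(payload)) + payload


async def read_object(reader: asyncio.StreamReader):
    """Read one length-prefixed pickled object from the stream."""
    (length,) = struct.unpack(">I", await reader.readexactly(4))
    return pickle.loads(await reader.readexactly(length))


async def demo():
    reader = asyncio.StreamReader()
    # Feed two framed objects as if they had arrived through a pipe.
    reader.feed_data(encode_object({"time": 42.0}))
    reader.feed_data(encode_object(["a", "b"]))
    reader.feed_eof()
    return await read_object(reader), await read_object(reader)
```

The length prefix lets the reader know exactly how many bytes belong to each object, so several objects can be written back to back on the same stream.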
- class mango.util.multiprocessing.OwnershiplessConnection(handle, readable=True, writable=True)
Bases: Connection
Subclass of the mp Connection, which marks it as ownershipless. Consequently, this class won’t close the fd on its own under any circumstance.
- Parameters:
Connection (multiprocessing.connection.Connection) – Connection object
- close()
Close the connection
- recv()
Receive a (picklable) object
- send(obj)
Send a (picklable) object
- class mango.util.multiprocessing.PipeToWriteQueue(pipe: AioDuplex)
Bases: object
Helper class to make an aio pipe imitate the write/put part of asyncio.Queue.
- put(object)
- put_nowait(object)
- mango.util.multiprocessing.aioduplex() Tuple[AioDuplex, AioDuplex]
Create a pair of pipe endpoints, both readable and writable (duplex).
- Returns:
AioDuplex-Pair
- mango.util.multiprocessing.aiopipe() Tuple[AioPipeReader, AioPipeWriter]
Create a pair of pipe endpoints, one readable and one writable (non-duplex).
- Returns:
Reader-, Writer-Pair
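The difference between a one-way pipe and a duplex pipe can be seen with the standard library's multiprocessing.Pipe, which offers the same two variants synchronously. This sketch does not use aiopipe() or aioduplex(); it only illustrates the concept, with both ends kept in one process for brevity.

```python
from multiprocessing import Pipe


def demo():
    # One-way pipe: the first end can only receive, the second can only send.
    rx, tx = Pipe(duplex=False)
    tx.send({"msg": "one-way"})
    one_way = rx.recv()

    # Duplex pipe: both ends can send and receive.
    main, sub = Pipe(duplex=True)
    main.send("ping")
    request = sub.recv()
    sub.send("pong")
    reply = main.recv()

    for conn in (rx, tx, main, sub):
        conn.close()
    return one_way, request, reply
```

In real use one end would be handed to a child process, which is the role that detach() plays in the async variants documented above.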
mango.util.scheduling module
Module for commonly used time-based scheduled tasks executed inside one agent.
- class mango.util.scheduling.AwaitingProcessTask(coroutine_creator, awaited_coroutine_creator, clock=None, on_stop=None)
Bases: AwaitingTask, ScheduledProcessTask
Await a coroutine, then execute another.
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
- class mango.util.scheduling.AwaitingTask(coroutine, awaited_coroutine, clock=None, on_stop=None, observable=True)
Bases: ScheduledTask
Awaiting task. This task will execute a given coroutine after another given coroutine has been awaited. Can be useful if you want to execute something after a Future has finished.
- close()
Perform closing actions
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
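The idea behind an awaiting task (run one coroutine only after another awaitable has finished) can be sketched in plain asyncio. The helper run_after is a hypothetical name and is not part of mango; it leaves out the clock, on_stop and observability handling of the real task class.

```python
import asyncio


async def run_after(coroutine, awaited):
    """Hypothetical helper: wait for `awaited`, then run `coroutine`."""
    await awaited
    return await coroutine


async def demo():
    events = []
    gate = asyncio.get_running_loop().create_future()

    async def follow_up():
        events.append("follow-up ran")
        return "done"

    task = asyncio.create_task(run_after(follow_up(), gate))
    await asyncio.sleep(0)           # give the task a chance to start
    events.append("gate still pending")
    gate.set_result(None)            # releasing the gate triggers the follow-up
    result = await task
    return events, result
```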
- class mango.util.scheduling.ConditionalProcessTask(coro_func, condition_func, lookup_delay=0.1, clock: Clock = None, on_stop=None)
Bases: ConditionalTask, ScheduledProcessTask
Task which will get executed as soon as the given condition is fulfilled.
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
- class mango.util.scheduling.ConditionalTask(coroutine, condition_func, lookup_delay=0.1, clock: Clock = None, on_stop=None, observable=True)
Bases: ScheduledTask
Task which will get executed as soon as the given condition is fulfilled.
- close()
Perform closing actions
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
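A conditional task polls condition_func every lookup_delay seconds and runs the coroutine once the condition holds. The following standalone sketch shows that polling loop with asyncio.sleep in place of a mango clock; run_when is a hypothetical helper name, not mango API.

```python
import asyncio


async def run_when(coroutine, condition_func, lookup_delay=0.1):
    """Hypothetical helper: poll the condition, then run the coroutine."""
    while not condition_func():
        await asyncio.sleep(lookup_delay)
    return await coroutine


async def demo():
    state = {"ready": False}

    async def action():
        return "executed"

    async def flip():
        # Something else in the system fulfils the condition a bit later.
        await asyncio.sleep(0.03)
        state["ready"] = True

    flipper = asyncio.create_task(flip())
    result = await run_when(action(), lambda: state["ready"], lookup_delay=0.01)
    await flipper
    return result, state["ready"]
```

A smaller lookup_delay reacts faster to the condition but wakes the event loop more often.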
- class mango.util.scheduling.InstantScheduledProcessTask(coroutine_creator, clock: Clock = None, on_stop=None)
Bases: TimestampScheduledProcessTask
One-shot task, which will get executed instantly.
- class mango.util.scheduling.InstantScheduledTask(coroutine, clock: Clock = None, on_stop=None, observable=True)
Bases: TimestampScheduledTask
One-shot task, which will get executed instantly.
- close()
Perform closing actions
- class mango.util.scheduling.PeriodicScheduledProcessTask(coroutine_func, delay, clock: Clock = None, on_stop=None)
- class mango.util.scheduling.PeriodicScheduledTask(coroutine_func, delay, clock: Clock = None, on_stop=None, observable=True)
Bases: ScheduledTask
Class for periodic scheduled tasks. It allows creating a task for an agent which will get executed periodically with a specified delay.
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
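A periodic task repeatedly calls a coroutine function with a fixed delay between cycles until it is stopped. The sketch below shows one plausible loop using asyncio.sleep; the helper run_periodically is hypothetical, and the assumption that the coroutine runs first and the delay follows is an illustration choice, not a statement about mango's internals.

```python
import asyncio


async def run_periodically(coroutine_func, delay):
    """Hypothetical helper: call coroutine_func, sleep, repeat until cancelled."""
    while True:
        await coroutine_func()
        await asyncio.sleep(delay)


async def demo():
    ticks = []

    async def tick():
        ticks.append(len(ticks))

    task = asyncio.create_task(run_periodically(tick, delay=0.01))
    while len(ticks) < 3:
        await asyncio.sleep(0.005)
    task.cancel()                    # open-ended tasks are stopped by cancelling
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks[:3]
```

Note that coroutine_func is a function creating a fresh coroutine per cycle, since a coroutine object can only be awaited once.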
- class mango.util.scheduling.RecurrentScheduledProcessTask(coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None)
- class mango.util.scheduling.RecurrentScheduledTask(coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None, observable=True)
Bases: ScheduledTask
Class for recurrent scheduled tasks. It allows creating a task for an agent which will get executed repeatedly according to the given recurrency rule (rrule).
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
- class mango.util.scheduling.ScheduledProcessControl(run_task_event: multiprocessing.synchronize.Event, kill_process_event: multiprocessing.synchronize.Event)
Bases: object
- init_process()
- kill_process()
- kill_process_event: Event
- resume_task()
- run_task_event: Event
- suspend_task()
- class mango.util.scheduling.ScheduledProcessTask(clock: Clock, on_stop=None, observable=False)
Bases: ScheduledTask
This alt-name marks a ScheduledTask as process compatible. This is necessary because not everything can be transferred to other processes, i.e. coroutines are bound to the current event loop and thus to the current thread, so they won’t work in other processes. Furthermore, when using a ProcessTask you have to ensure that the coroutine functions are not bound to complex objects, meaning they should be static or bound to simple objects that are transferable via Python’s IPC implementation.
- class mango.util.scheduling.ScheduledTask(clock: Clock = None, observable=True, on_stop=None)
Bases: object
Base class for scheduled tasks in mango. Within this class it is possible to define what to do on execution and on stop. In most cases the logic should get passed as a lambda, while the scheduling logic lives inside a class inheriting from this one.
- close()
Perform closing actions
- notify_running()
- notify_sleeping()
- on_stop(fut: Future = None)
Called when the task is cancelled or finished.
- abstractmethod async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
- class mango.util.scheduling.Scheduler(clock: Clock = None, num_process_parallel=16, suspendable=True, observable=True)
Bases: object
Scheduler for executing tasks.
- resume(given_src)
Resume a set of tasks triggered by the given src object.
- Parameters:
given_src (object) – the src object
- schedule_awaiting_task(coroutine, awaited_coroutine, on_stop=None, src=None)
Schedule a task to run after the future of another task has returned.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
awaited_coroutine (asyncio.Future) – future that has to finish before the task starts
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_conditional_process_task(coroutine_creator, condition_func, lookup_delay: float = 0.1, on_stop=None, src=None)
Schedule a task when a specified condition is met.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition [s]
src (Object) – creator of the task
- schedule_conditional_task(coroutine, condition_func, lookup_delay: float = 0.1, on_stop=None, src=None)
Schedule a task when a specified condition is met.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
condition_func (lambda () -> bool) – function for determining whether the condition is fulfilled
lookup_delay (float) – delay between checking the condition [s]
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_instant_process_task(coroutine_creator, on_stop=None, src=None)
Schedule an instantly executed task dispatched to another process.
- Parameters:
coroutine_creator – coroutine_creator to be scheduled
src (Object) – creator of the task
- schedule_instant_task(coroutine, on_stop=None, src=None)
Schedule an instantly executed task.
- Parameters:
coroutine – coroutine to be scheduled
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_periodic_process_task(coroutine_creator, delay, on_stop=None, src=None)
Schedule an open end periodically executed task dispatched to another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
src (Object) – creator of the task
- schedule_periodic_task(coroutine_func, delay, on_stop=None, src=None)
Schedule an open end periodically executed task.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
delay (float) – delay in between the cycles
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_process_task(task: ScheduledProcessTask, src=None)
Schedule a task with asyncio in a different process managed by a ProcessWorkerPool in this Scheduler object. For scheduling options see the subclasses of ScheduledProcessTask.
- Parameters:
task (ScheduledProcessTask) – task you want to schedule
src (Object) – creator of the task
- Returns:
future to check whether the task is done and to finally retrieve the result
- schedule_recurrent_process_task(coroutine_creator, recurrency, on_stop=None, src=None)
Schedule a task using a recurrency rule, dispatched to another process.
- Parameters:
coroutine_creator (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
src (Object) – creator of the task
- schedule_recurrent_task(coroutine_func, recurrency, on_stop=None, src=None)
Schedule a task using a fine-grained recurrency rule.
- Parameters:
coroutine_func (Coroutine Function) – coroutine function creating coros to be scheduled
recurrency (dateutil.rrule.rrule) – recurrency rule to calculate next event
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- schedule_task(task: ScheduledTask, src=None) Task
Schedule a task with asyncio. When the task is finished (if it is finite), it is automatically removed afterwards. For scheduling options see the subclasses of ScheduledTask.
- Parameters:
task (ScheduledTask) – task to be scheduled
src (Object) – creator of the task
- schedule_timestamp_process_task(coroutine_creator, timestamp: float, on_stop=None, src=None)
Schedule a task at specified unix timestamp dispatched to another process.
- Parameters:
coroutine_creator (coroutine_creator) – coroutine_creator creating coroutine to be scheduled
timestamp (float) – unix timestamp defining when the task should start
src (Object) – creator of the task
- schedule_timestamp_task(coroutine, timestamp: float, on_stop=None, src=None)
Schedule a task at specified unix timestamp.
- Parameters:
coroutine (Coroutine) – coroutine to be scheduled
timestamp (float) – timestamp defining when the task should start (unix timestamp)
on_stop (Object) – coroutine to run on stop
src (Object) – creator of the task
- async shutdown()
Shutdown internal process executor pool.
- async sleep(t: float)
- Parameters:
t – The time to sleep [s]
- async stop()
Cancel all unfinished scheduled tasks
- suspend(given_src)
Suspend a set of tasks triggered by the given src object.
- Parameters:
given_src (object) – the src object
- async tasks_complete(timeout=1, recursive=False)
Finish all pending tasks using a timeout.
- Args:
timeout (int, optional): waiting timeout. Defaults to 1.
- async tasks_complete_or_sleeping()
- class mango.util.scheduling.Suspendable(coro, ext_contr_event=None, kill_event=None)
Bases: object
Wraps a coroutine, intercepting __await__ to add the functionality of suspending.
- property coro
Return the coroutine
- Returns:
the coroutine
- Return type:
a coroutine
- is_suspended()
Return whether the coro is suspended
- Returns:
True if suspended, False otherwise
- Return type:
bool
- resume()
Resume the coroutine
- suspend()
Suspend the wrapped coroutine (it has to be executed as a task externally)
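Intercepting __await__ to pause a coroutine can be sketched as follows: the wrapper steps the inner coroutine manually and, while suspended, waits on an asyncio.Event instead of forwarding the next step. The class SketchSuspendable is a hypothetical illustration built only on the standard library; it omits the external control and kill events that the documented constructor accepts, and it is not mango's implementation.

```python
import asyncio


class SketchSuspendable:
    """Hypothetical wrapper that can pause a coroutine between its await points."""

    def __init__(self, coro):
        self._coro = coro
        self._can_run = asyncio.Event()
        self._can_run.set()

    def __await__(self):
        inner = self._coro.__await__()
        send, message = inner.send, None
        while True:
            try:
                # While suspended, park here instead of stepping the coroutine.
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = inner.throw, err
            try:
                signal = send(message)
            except StopIteration as stop:
                return stop.value
            send = inner.send
            try:
                # Hand the inner awaitable to the event loop and wait for it.
                message = yield signal
            except BaseException as err:
                send, message = inner.throw, err

    def suspend(self):
        self._can_run.clear()

    def resume(self):
        self._can_run.set()

    def is_suspended(self):
        return not self._can_run.is_set()


async def demo():
    progress = []

    async def worker():
        for step in range(3):
            progress.append(step)
            await asyncio.sleep(0.01)
        return "finished"

    wrapped = SketchSuspendable(worker())

    async def drive():
        return await wrapped

    task = asyncio.create_task(drive())
    await asyncio.sleep(0)      # worker runs up to its first sleep
    wrapped.suspend()
    await asyncio.sleep(0.05)   # worker stays parked while suspended
    while_suspended = list(progress)
    suspended_flag = wrapped.is_suspended()
    wrapped.resume()
    result = await task
    return while_suspended, suspended_flag, progress, result
```

As in the documented class, the wrapper itself does not schedule anything; it has to be awaited from a task that is created externally.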
- class mango.util.scheduling.TimestampScheduledProcessTask(coroutine_creator, timestamp: float, clock=None, on_stop=None)
Bases: TimestampScheduledTask, ScheduledProcessTask
Timestamp based one-shot task.
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
- class mango.util.scheduling.TimestampScheduledTask(coroutine, timestamp: float, clock=None, on_stop=None, observable=True)
Bases: ScheduledTask
Timestamp based one-shot task. This task will get executed when a given unix timestamp is reached.
- close()
Perform closing actions
- async run()
Called via asyncio as asyncio.task.
- Raises:
NotImplementedError: must be overwritten
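A timestamp-based one-shot task sleeps until a given unix timestamp and then runs its coroutine once. The sketch below expresses this with time.time and asyncio.sleep; run_at is a hypothetical helper, and using the wall clock instead of a mango Clock is an assumption made to keep the example self-contained.

```python
import asyncio
import time


async def run_at(coroutine, timestamp, now=time.time):
    """Hypothetical helper: sleep until `timestamp`, then run the coroutine once."""
    delay = timestamp - now()
    if delay > 0:
        await asyncio.sleep(delay)
    return await coroutine


async def demo():
    async def action(label):
        return label

    start = time.time()
    future_result = await run_at(action("future"), start + 0.05)
    waited = time.time() - start
    # A timestamp in the past leads to immediate execution.
    past_result = await run_at(action("past"), start - 10)
    return future_result, waited, past_result
```

An instantly executed task corresponds to the second call: the target time has already been reached, so no sleeping takes place.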