"""
Module for commonly used time based scheduled task executed inside one agent.
"""
import asyncio
import concurrent.futures
import logging
from abc import abstractmethod
from asyncio import Future
from dataclasses import dataclass
from datetime import datetime, timezone
from multiprocessing import Manager
from multiprocessing.synchronize import Event as MultiprocessingEvent
from typing import Any
from dateutil.rrule import rrule
from mango.util.clock import AsyncioClock, Clock, ExternalClock
logger = logging.getLogger(__name__)
def _raise_exceptions(fut: asyncio.Future):
"""
Inline function used as a callback to raise exceptions.
:param fut: The Future object of the task
"""
try:
if fut.exception() is not None:
try:
raise fut.exception()
except BaseException:
logger.exception("got exception in scheduled event")
except asyncio.CancelledError:
pass # if this happens the task has been cancelled by mango
[docs]
@dataclass
class ScheduledProcessControl:
run_task_event: MultiprocessingEvent
kill_process_event: MultiprocessingEvent
[docs]
def kill_process(self):
self.kill_process_event.set()
[docs]
def init_process(self):
self.kill_process_event.clear()
[docs]
def resume_task(self):
self.run_task_event.set()
[docs]
def suspend_task(self):
self.run_task_event.clear()
[docs]
class Suspendable:
"""
Wraps a coroutine, intercepting __await__ to add the functionality of suspending.
"""
def __init__(self, coro, ext_contr_event=None, kill_event=None):
self._coro = coro
self._kill_event = kill_event
if ext_contr_event is not None:
self._can_run = ext_contr_event
else:
self._can_run = asyncio.Event()
self._can_run.set()
def __await__(self):
coro_iter = self._coro.__await__()
iter_send, iter_throw = coro_iter.send, coro_iter.throw
send, message = iter_send, None
while True:
try:
while not self._can_run.is_set():
if isinstance(self._can_run, asyncio.Event):
# essentially same as 'await self._can_run.wait()',
# not allowed here as this is not an async method
yield from self._can_run.wait().__await__()
else:
self._can_run.wait()
except BaseException as err:
send, message = iter_throw, err
if self._kill_event is not None and self._kill_event.is_set():
return None
try:
# throw error or resume coroutine
signal = send(message)
except StopIteration as err:
return err.value
else:
send = iter_send
try:
# pass signal via yielding it
message = yield signal
except BaseException as err:
send, message = iter_throw, err
[docs]
def suspend(self):
"""
Suspend the wrapped coroutine (have to executed as task externally)
"""
self._can_run.clear()
[docs]
def is_suspended(self):
"""
Return whether the coro is suspended
:return: True if suspended, False otherwise
:rtype: bool
"""
return not self._can_run.is_set()
[docs]
def resume(self):
"""
Resume the coroutine
"""
self._can_run.set()
@property
def coro(self):
"""
Return the coroutine
:return: the coroutine
:rtype: a coroutine
"""
return self._coro
# asyncio tasks
def _close_coro(coro):
try:
coro.close()
except Exception:
pass
[docs]
class ScheduledTask:
"""
Base class for scheduled tasks in mango. Within this class it is possible to
define what to do on execution and on stop. In most cases the logic should get
passed as lambda while the scheduling logic is inside of class inheriting from this one.
"""
def __init__(self, clock: Clock = None, observable=True, on_stop=None) -> None:
self.clock = clock if clock is not None else AsyncioClock()
self._on_stop_hook_in = on_stop
self._is_observable = observable
if self._is_observable:
self._is_sleeping = asyncio.Future()
self._is_done = asyncio.Future()
[docs]
def notify_sleeping(self):
if self._is_observable:
self._is_sleeping.set_result(True)
[docs]
def notify_running(self):
if self._is_observable:
self._is_sleeping = asyncio.Future()
[docs]
@abstractmethod
async def run(self):
"""Called via asyncio as asyncio.task.
Raises:
NotImplementedError: must be overwritten
"""
raise NotImplementedError
[docs]
def on_stop(self, fut: asyncio.Future = None):
"""
Called when the task is cancelled or finished.
"""
if self._on_stop_hook_in is not None:
self._on_stop_hook_in(fut)
if self._is_observable:
self._is_done.set_result(True)
self.close()
[docs]
def close(self):
"""Perform closing actions"""
[docs]
class TimestampScheduledTask(ScheduledTask):
"""
Timestamp based one-shot task. This task will get executed when a given unix timestamp is reached.
"""
def __init__(
self, coroutine, timestamp: float, clock=None, on_stop=None, observable=True
):
super().__init__(clock, on_stop=on_stop, observable=observable)
self._timestamp = timestamp
self._coro = coroutine
async def _wait(self, timestamp: float):
sleep_future: asyncio.Future = self.clock.sleep(timestamp - self.clock.time)
self.notify_sleeping()
await sleep_future
self.notify_running()
[docs]
async def run(self):
await self._wait(self._timestamp)
return await self._coro
[docs]
def close(self):
_close_coro(self._coro)
[docs]
class AwaitingTask(ScheduledTask):
"""
Awaiting task. This task will execute a given coroutine after another given coroutine has been awaited.
Can be useful if you want to execute something after a Future has finished.
"""
def __init__(
self, coroutine, awaited_coroutine, clock=None, on_stop=None, observable=True
):
super().__init__(clock, on_stop=on_stop, observable=observable)
self._coroutine = coroutine
self._awaited_coroutine = awaited_coroutine
[docs]
async def run(self):
self.notify_sleeping()
await self._awaited_coroutine
self.notify_running()
return await self._coroutine
[docs]
def close(self):
_close_coro(self._awaited_coroutine)
_close_coro(self._coroutine)
[docs]
class InstantScheduledTask(TimestampScheduledTask):
"""
One-shot task, which will get executed instantly.
"""
def __init__(self, coroutine, clock: Clock = None, on_stop=None, observable=True):
if clock is None:
clock = AsyncioClock()
super().__init__(
coroutine, clock.time, clock=clock, on_stop=on_stop, observable=observable
)
[docs]
def close(self):
_close_coro(self._coro)
[docs]
class PeriodicScheduledTask(ScheduledTask):
"""
Class for periodic scheduled tasks. It enables to create a task for an agent
which will get executed periodically with a specified delay.
"""
def __init__(
self, coroutine_func, delay, clock: Clock = None, on_stop=None, observable=True
):
super().__init__(clock, on_stop=on_stop, observable=observable)
self._stopped = False
self._coroutine_func = coroutine_func
self._delay = delay
[docs]
async def run(self):
while not self._stopped:
await self._coroutine_func()
sleep_future: asyncio.Future = self.clock.sleep(self._delay)
self.notify_sleeping()
await sleep_future
self.notify_running()
[docs]
class RecurrentScheduledTask(ScheduledTask):
"""
Class for periodic scheduled tasks. It enables to create a task for an agent
which will get executed periodically with a specified delay.
"""
def __init__(
self,
coroutine_func,
recurrency: rrule,
clock: Clock = None,
on_stop=None,
observable=True,
):
super().__init__(clock, on_stop=on_stop, observable=observable)
self._recurrency_rule = recurrency
self._stopped = False
self._coroutine_func = coroutine_func
[docs]
async def run(self):
while not self._stopped:
current_time = datetime.fromtimestamp(
self.clock.time, tz=timezone.utc
).replace(tzinfo=None)
after = self._recurrency_rule.after(current_time)
# after can be None, if until or count was set on the rrule
if after is None:
self._stopped = True
else:
delay = (after - current_time).total_seconds()
sleep_future: asyncio.Future = self.clock.sleep(delay)
self.notify_sleeping()
await sleep_future
self.notify_running()
await self._coroutine_func()
[docs]
class ConditionalTask(ScheduledTask):
"""Task which will get executed as soon as the given condition is fulfilled."""
def __init__(
self,
coroutine,
condition_func,
lookup_delay=0.1,
clock: Clock = None,
on_stop=None,
observable=True,
):
super().__init__(clock=clock, on_stop=on_stop, observable=observable)
assert coroutine is not None
self._condition = condition_func
self._coro = coroutine
self._delay = lookup_delay
[docs]
async def run(self):
while not self._condition():
sleep_future: asyncio.Future = self.clock.sleep(self._delay)
self.notify_sleeping()
await sleep_future
self.notify_running()
return await self._coro
[docs]
def close(self):
_close_coro(self._coro)
# process tasks
[docs]
class ScheduledProcessTask(ScheduledTask):
# Mark class as task for an external process
"""
This alt-name marks a ScheduledTask as process compatible.
This is necessary due to the fact that not everything can be transferred to other processes i.e. coroutines are
bound to the current event-loop resp the current thread, so they won't work in other processes.
Furthermore, when using a ProcessTask you have to ensure, that the coroutine functions should not be bound to
complex objects, meaning they should be static or bound to simple objects, which are transferable
via pythons IPC implementation.
"""
def __init__(self, clock: Clock, on_stop=None, observable=False):
if isinstance(clock, ExternalClock):
raise ValueError("Process Tasks do currently not work with external clocks")
super().__init__(clock=clock, observable=observable, on_stop=on_stop)
[docs]
class TimestampScheduledProcessTask(TimestampScheduledTask, ScheduledProcessTask):
"""
Timestamp based one-shot task.
"""
def __init__(self, coroutine_creator, timestamp: float, clock=None, on_stop=None):
super().__init__(
coroutine_creator, timestamp, clock, on_stop=on_stop, observable=False
)
[docs]
async def run(self):
await self._wait(self._timestamp)
return await self._coro()
[docs]
class AwaitingProcessTask(AwaitingTask, ScheduledProcessTask):
"""
Await a coroutine, then execute another.
"""
def __init__(
self, coroutine_creator, awaited_coroutine_creator, clock=None, on_stop=None
):
super().__init__(
coroutine_creator,
awaited_coroutine_creator,
clock,
on_stop=on_stop,
observable=False,
)
[docs]
async def run(self):
await self._awaited_coroutine()
return await self._coroutine()
[docs]
class InstantScheduledProcessTask(TimestampScheduledProcessTask):
"""One-shot task, which will get executed instantly."""
def __init__(self, coroutine_creator, clock: Clock = None, on_stop=None):
if clock is None:
clock = AsyncioClock()
super().__init__(
coroutine_creator,
timestamp=clock.time,
clock=clock,
on_stop=on_stop,
)
[docs]
class PeriodicScheduledProcessTask(PeriodicScheduledTask, ScheduledProcessTask):
def __init__(self, coroutine_func, delay, clock: Clock = None, on_stop=None):
super().__init__(coroutine_func, delay, clock, on_stop=on_stop)
[docs]
class RecurrentScheduledProcessTask(RecurrentScheduledTask, ScheduledProcessTask):
def __init__(
self, coroutine_func, recurrency: rrule, clock: Clock = None, on_stop=None
):
super().__init__(
coroutine_func, recurrency, clock, on_stop=on_stop, observable=False
)
[docs]
class ConditionalProcessTask(ConditionalTask, ScheduledProcessTask):
"""
Task which will get executed as soon as the given condition is fulfilled.
"""
def __init__(
self,
coro_func,
condition_func,
lookup_delay=0.1,
clock: Clock = None,
on_stop=None,
):
super().__init__(
coro_func,
condition_func,
lookup_delay,
clock=clock,
on_stop=on_stop,
observable=False,
)
[docs]
async def run(self):
while not self._condition():
await self.clock.sleep(self._delay)
return await self._coro()
def _create_asyncio_context():
asyncio.set_event_loop(asyncio.new_event_loop())
[docs]
class Scheduler:
"""Scheduler for executing tasks."""
def __init__(
self,
clock: Clock = None,
num_process_parallel=16,
suspendable=True,
observable=True,
):
# List of Tuples with asyncio.Future, ScheduledTask, Suspendable coro, Source
self._scheduled_tasks: list[
tuple[ScheduledTask, asyncio.Future, Suspendable, Any]
] = []
self.clock = clock if clock is not None else AsyncioClock()
self._scheduled_process_tasks: list[
tuple[ScheduledProcessTask, Future, ScheduledProcessControl, Any]
] = []
self._manager = None
self._num_process_parallel = num_process_parallel
self._process_pool_exec = None
self.suspendable = suspendable
self.observable = observable
@staticmethod
def _run_task_in_p_context(
task, scheduled_process_control: ScheduledProcessControl
):
try:
coro = Suspendable(
task.run(),
ext_contr_event=scheduled_process_control.run_task_event,
kill_event=scheduled_process_control.kill_process_event,
)
return asyncio.get_event_loop().run_until_complete(coro)
finally:
pass
[docs]
async def sleep(self, t: float):
"""
:param t: The time to sleep [s]
"""
return await self.clock.sleep(t)
# conv methods for asyncio Tasks
[docs]
def schedule_task(self, task: ScheduledTask, src=None) -> asyncio.Task:
"""
Schedule a task with asyncio. When the task is finished, if finite, its automatically removed afterwards.
For scheduling options see the subclasses of ScheduledTask.
:param task: task to be scheduled
:type task: ScheduledTask
:param src: creator of the task
:type: Object
"""
l_task = None
if self.suspendable:
coro = Suspendable(task.run())
l_task = asyncio.ensure_future(coro)
else:
coro = task.run()
l_task = asyncio.create_task(coro)
l_task.add_done_callback(task.on_stop)
l_task.add_done_callback(_raise_exceptions)
l_task.add_done_callback(self._remove_task)
self._scheduled_tasks.append((task, l_task, coro, src))
return l_task
[docs]
def schedule_timestamp_task(
self, coroutine, timestamp: float, on_stop=None, src=None
):
"""Schedule a task at specified unix timestamp.
:param coroutine: coroutine to be scheduled
:type coroutine: Coroutine
:param timestamp: timestamp defining when the task should start (unix timestamp)
:type timestamp: float
:param on_stop: coroutine to run on stop
:type on_stop: Object
:param src: creator of the task
:type src: Object
"""
return self.schedule_task(
TimestampScheduledTask(
coroutine=coroutine,
timestamp=timestamp,
clock=self.clock,
on_stop=on_stop,
observable=self.observable,
),
src=src,
)
[docs]
def schedule_instant_task(self, coroutine, on_stop=None, src=None):
"""Schedule an instantly executed task.
:param coroutine: coroutine to be scheduled
:type coroutine:
:param on_stop: coroutine to run on stop
:type on_stop: Object
:param src: creator of the task
:type src: Object
"""
return self.schedule_task(
InstantScheduledTask(
coroutine=coroutine,
clock=self.clock,
on_stop=on_stop,
observable=self.observable,
),
src=src,
)
[docs]
def schedule_periodic_task(self, coroutine_func, delay, on_stop=None, src=None):
"""
Schedule an open end periodically executed task.
:param coroutine_func: coroutine function creating coros to be scheduled
:type coroutine_func: Coroutine Function
:param delay: delay in between the cycles
:type delay: float
:param on_stop: coroutine to run on stop
:type on_stop: Object
:param src: creator of the task
:type src: Object
"""
return self.schedule_task(
PeriodicScheduledTask(
coroutine_func=coroutine_func,
delay=delay,
clock=self.clock,
on_stop=on_stop,
observable=self.observable,
),
src=src,
)
[docs]
def schedule_recurrent_task(
self, coroutine_func, recurrency, on_stop=None, src=None
):
"""Schedule a task using a fine-grained recurrency rule.
:param coroutine_func: coroutine function creating coros to be scheduled
:type coroutine_func: Coroutine Function
:param recurrency: recurrency rule to calculate next event
:type recurrency: dateutil.rrule.rrule
:param on_stop: coroutine to run on stop
:type on_stop: Object
:param src: creator of the task
:type src: Object
"""
return self.schedule_task(
RecurrentScheduledTask(
coroutine_func=coroutine_func,
recurrency=recurrency,
clock=self.clock,
on_stop=on_stop,
observable=self.observable,
),
src=src,
)
[docs]
def schedule_conditional_task(
self,
coroutine,
condition_func,
lookup_delay: float = 0.1,
on_stop=None,
src=None,
):
"""
Schedule a task when a specified condition is met.
:param coroutine: coroutine to be scheduled
:type coroutine: Coroutine
:param condition_func: function for determining whether the condition is fulfilled
:type condition_func: lambda () -> bool
:param lookup_delay: delay between checking the condition [s]
:type lookup_delay: float
:param on_stop: coroutine to run on stop
:type on_stop: Object
:param src: creator of the task
:type src: Object
"""
return self.schedule_task(
ConditionalTask(
coroutine=coroutine,
condition_func=condition_func,
clock=self.clock,
lookup_delay=lookup_delay,
on_stop=on_stop,
observable=self.observable,
),
src=src,
)
[docs]
def schedule_awaiting_task(
self, coroutine, awaited_coroutine, on_stop=None, src=None
):
"""Schedule a task after future of other task returned.
:param coroutine: coroutine to be scheduled
:type coroutine: Coroutine
:param awaited_coroutine: datetime defining when the task should start
:type awaited_coroutine: asyncio.Future
:param on_stop: coroutine to run on stop
:type on_stop: Object
:param src: creator of the task
:type src: Object
"""
return self.schedule_task(
AwaitingTask(
coroutine=coroutine,
awaited_coroutine=awaited_coroutine,
clock=self.clock,
on_stop=on_stop,
observable=self.observable,
),
src=src,
)
# conv. methods for process tasks
[docs]
def schedule_process_task(self, task: ScheduledProcessTask, src=None):
"""
Schedule as task with asyncio in a different process managed by a ProcessWorkerPool in this Scheduler-object.
For scheduling options see the subclasses of ScheduledProcessTask.
:param task: task you want to schedule
:type task: ScheduledProcessTask
:return: future to check whether the task is done and to finally retrieve the result
:rtype: _type_
:param src: creator of the task
:type src: Object
"""
if self._process_pool_exec is None:
self._process_pool_exec = concurrent.futures.ProcessPoolExecutor(
max_workers=self._num_process_parallel,
initializer=_create_asyncio_context,
)
loop = asyncio.get_running_loop()
if self._manager is None:
self._manager = Manager()
scheduled_process_control = ScheduledProcessControl(
run_task_event=self._manager.Event(),
kill_process_event=self._manager.Event(),
)
scheduled_process_control.init_process()
scheduled_process_control.resume_task()
l_task = asyncio.ensure_future(
loop.run_in_executor(
self._process_pool_exec,
Scheduler._run_task_in_p_context,
task,
scheduled_process_control,
)
)
l_task.add_done_callback(self._remove_process_task)
l_task.add_done_callback(task.on_stop)
l_task.add_done_callback(_raise_exceptions)
self._scheduled_process_tasks.append(
(task, l_task, scheduled_process_control, src)
)
return l_task
[docs]
def schedule_timestamp_process_task(
self, coroutine_creator, timestamp: float, on_stop=None, src=None
):
"""Schedule a task at specified unix timestamp dispatched to another process.
:param coroutine_creator: coroutine_creator creating coroutine to be scheduled
:type coroutine_creator: coroutine_creator
:param timestamp: unix timestamp defining when the task should start
:type timestamp: float
:param src: creator of the task
:type src: Object
"""
return self.schedule_process_task(
TimestampScheduledProcessTask(
coroutine_creator=coroutine_creator,
timestamp=timestamp,
clock=self.clock,
on_stop=on_stop,
),
src=src,
)
[docs]
def schedule_instant_process_task(self, coroutine_creator, on_stop=None, src=None):
"""
Schedule an instantly executed task dispatched to another process.
:param coroutine_creator: coroutine_creator to be scheduled
:type coroutine_creator:
:param src: creator of the task
:type src: Object
"""
return self.schedule_process_task(
InstantScheduledProcessTask(
coroutine_creator=coroutine_creator, on_stop=on_stop
),
src=src,
)
[docs]
def schedule_periodic_process_task(
self, coroutine_creator, delay, on_stop=None, src=None
):
"""Schedule an open end periodically executed task dispatched to another process.
:param coroutine_creator: coroutine function creating coros to be scheduled
:type coroutine_creator: Coroutine Function
:param delay: delay in between the cycles
:type delay: float
:param src: creator of the task
:type src: Object
"""
return self.schedule_process_task(
PeriodicScheduledProcessTask(
coroutine_func=coroutine_creator,
delay=delay,
clock=self.clock,
on_stop=on_stop,
),
src=src,
)
[docs]
def schedule_recurrent_process_task(
self, coroutine_creator, recurrency, on_stop=None, src=None
):
"""Schedule an open end periodically executed task dispatched to another process.
:param coroutine_creator: coroutine function creating coros to be scheduled
:type coroutine_creator: Coroutine Function
:param recurrency: rrule object which gets executed
:type recurrency: dateutil.rrule.rrule
:param src: creator of the task
:type src: Object
"""
return self.schedule_process_task(
RecurrentScheduledProcessTask(
coroutine_func=coroutine_creator,
recurrency=recurrency,
clock=self.clock,
on_stop=on_stop,
),
src=src,
)
[docs]
def schedule_conditional_process_task(
self,
coroutine_creator,
condition_func,
lookup_delay: float = 0.1,
on_stop=None,
src=None,
):
"""
Schedule a task when a specified condition is met.
:param coroutine_creator: coroutine_creator creating coroutine to be scheduled
:type coroutine_creator: coroutine_creator
:param condition_func: function for determining whether the condition is fulfilled
:type condition_func: lambda () -> bool
:param lookup_delay: delay between checking the condition [s]
:type lookup_delay: float
:param src: creator of the task
:type src: Object
"""
return self.schedule_process_task(
ConditionalProcessTask(
coro_func=coroutine_creator,
condition_func=condition_func,
lookup_delay=lookup_delay,
on_stop=on_stop,
clock=self.clock,
),
src=src,
)
# methods to suspend or resume tasks
[docs]
def suspend(self, given_src):
"""Suspend a set of tasks triggered by the given src object.
:param given_src: the src object
:type given_src: object
"""
if not self.suspendable:
raise Exception("The scheduler is configured as non-suspendable!")
for _, _, coro, src in self._scheduled_tasks:
if src == given_src and coro is not None:
coro.suspend()
for _, _, scheduled_process_control, src in self._scheduled_process_tasks:
if src == given_src:
scheduled_process_control.suspend_task()
[docs]
def resume(self, given_src):
"""Resume a set of tasks triggered by the given src object.
:param given_src: the src object
:type given_src: object
"""
if not self.suspendable:
raise Exception("The scheduler is configured as non-suspendable!")
for _, _, coro, src in self._scheduled_tasks:
if src == given_src and coro is not None:
coro.resume()
for _, _, scheduled_process_control, src in self._scheduled_process_tasks:
if src == given_src:
scheduled_process_control.resume_task()
def _remove_process_task(self, fut=asyncio.Future):
for i in range(len(self._scheduled_process_tasks)):
_, task, scheduled_process_control, _ = self._scheduled_process_tasks[i]
if task == fut:
scheduled_process_control.resume_task()
scheduled_process_control.kill_process()
del self._scheduled_process_tasks[i]
break
# methods for removing tasks, stopping or shutting down
def _remove_task(self, fut=asyncio.Future):
self._remove_generic_task(self._scheduled_tasks, fut=fut)
def _remove_generic_task(self, target_list, fut=asyncio.Future):
for i in range(len(target_list)):
_, task, _, _ = target_list[i]
if task == fut:
del target_list[i]
break
[docs]
async def stop_tasks(self, task_list):
for i in range(len(task_list) - 1, -1, -1):
_, task, _, _ = task_list[i]
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
[docs]
async def stop(self):
"""
Cancel all not finished scheduled tasks
"""
await self.stop_tasks(self._scheduled_tasks)
await self.stop_tasks(self._scheduled_process_tasks)
[docs]
async def tasks_complete(self, timeout=1, recursive=False):
"""Finish all pending tasks using a timeout.
Args:
timeout (int, optional): waiting timeout. Defaults to 1.
"""
for _, task, _, _ in self._scheduled_tasks + self._scheduled_process_tasks:
await asyncio.wait_for(task, timeout=timeout)
# As it might happen that tasks spawn new tasks, one might want to finish these tasks
# as well. Caution: this can result in an infinte loop, f.e. if a tasks spawns itself
# after completion
if recursive and len(self._scheduled_tasks + self._scheduled_process_tasks) > 0:
await self.tasks_complete(timeout=timeout, recursive=recursive)
[docs]
async def tasks_complete_or_sleeping(self):
""" """
sleeping_tasks = []
# we need to use the while loop here, as new tasks may have been scheduled while waiting for other tasks
while len(self._scheduled_tasks + self._scheduled_process_tasks) > len(
sleeping_tasks
):
for scheduled_task, task, _, _ in (
self._scheduled_tasks + self._scheduled_process_tasks
):
await asyncio.wait(
[scheduled_task._is_sleeping, scheduled_task._is_done],
return_when=asyncio.FIRST_COMPLETED,
)
if (
scheduled_task._is_sleeping.done()
and scheduled_task not in sleeping_tasks
):
# we need to recognize how many sleeping tasks we have in order to find out if all tasks are done
sleeping_tasks.append(scheduled_task)
[docs]
async def shutdown(self):
"""
Shutdown internal process executor pool.
"""
# resume all process so they can get shutdown
for _, _, scheduled_process_control, _ in self._scheduled_process_tasks:
scheduled_process_control.kill_process()
if len(self._scheduled_tasks) > 0:
logger.debug(
"There are still scheduled tasks running on shutdown %s",
self._scheduled_tasks,
)
await self.stop()
for task, _, _, _ in self._scheduled_tasks:
task.close()
if self._process_pool_exec is not None:
self._process_pool_exec.shutdown()