Source code for mango.util.clock
import asyncio
import bisect
import time
from abc import ABC, abstractmethod
from typing import Optional
[docs]
class Clock(ABC):
    """
    Common interface for all clocks that can be used in mango.

    ``sleep`` is abstract and has to be implemented by every concrete clock.
    ``time`` is a regular property whose default implementation raises
    ``NotImplementedError``, so concrete clocks are expected to override it
    as well.
    """

    @abstractmethod
    def sleep(self, t: float) -> asyncio.Future:
        """
        Return an awaitable that completes once ``t`` units of this clock's
        time have passed.
        """
        raise NotImplementedError

    @property
    def time(self) -> float:
        """
        The current time according to this clock.
        """
        raise NotImplementedError
[docs]
class AsyncioClock(Clock):
    """
    Clock backed by the wall clock: time is read from the ``time`` module
    and sleeping is delegated to ``asyncio.sleep``.
    """

    def __init__(self):
        # the clock keeps no state of its own
        pass

    @property
    def time(self) -> float:
        """
        Current wall-clock time as reported by ``time.time()``.
        """
        now = time.time()
        return now

    def sleep(self, t) -> asyncio.Future:
        """
        Sleep for ``t`` by handing over to ``asyncio.sleep``.

        The object returned is whatever ``asyncio.sleep(t)`` yields (an
        awaitable); it has to be awaited by the caller to take effect.
        """
        awaitable = asyncio.sleep(t)
        return awaitable
[docs]
class ExternalClock(Clock):
    """
    An external clock that proceeds only when ``set_time`` is called.

    Pending sleeps are stored in ``self._futures`` as ``(wake_time, future)``
    tuples, kept sorted by ascending ``wake_time``. Advancing the clock
    resolves every stored future whose wake time has been reached.
    """

    def __init__(self, start_time: float = 0):
        """
        :param start_time: initial value of the clock
        """
        self._time: float = start_time
        # all futures still to be triggered, as (wake_time, future),
        # sorted by wake_time
        self._futures: list[tuple[float, asyncio.Future]] = []

    @property
    def time(self) -> float:
        """
        Current time of the external clock
        """
        return self._time

    def set_time(self, t: float):
        """
        Set the clock to ``t`` and trigger all sleeps that are due.

        :param t: the new time; must not be smaller than the current time
            (setting the same time again is allowed)
        :raises ValueError: if ``t`` is smaller than the current time
        """
        if t < self._time:
            # equal values pass the check above, hence ">=" in the message
            raise ValueError(f"Time must be >= {self._time} but is {t}.")
        self._time = t

        # bisect only on the wake times: comparing the tuples themselves
        # would fall back to comparing futures when two times are equal
        wake_times = [wake_time for wake_time, _ in self._futures]
        # bisect_right -> everything with wake_time <= t is due
        threshold = bisect.bisect_right(wake_times, t)
        due, self._futures = (
            self._futures[:threshold],
            self._futures[threshold:],
        )

        for _, future in due:
            # a future may already be done, e.g. because it was cancelled
            if not future.done():
                future.set_result(True)

    def sleep(self, t: float) -> asyncio.Future:
        """
        Sleeps for ``t`` based on the external clock.

        :param t: duration to sleep, relative to the current clock time
        :return: a future that is resolved once ``set_time`` moves the clock
            to at least ``time + t``; for ``t <= 0`` the future is returned
            already resolved
        """
        f = asyncio.Future()
        if t <= 0:
            # trigger directly if time is <= 0
            f.set_result(None)
            return f

        wake_time = self.time + t
        # insert into the sorted list; bisect_right places the new entry
        # behind existing entries with the same wake time
        wake_times = [entry_time for entry_time, _ in self._futures]
        index = bisect.bisect_right(wake_times, wake_time)
        self._futures.insert(index, (wake_time, f))
        return f

    def get_next_activity(self) -> Optional[float]:
        """
        Return the wake time of the earliest stored sleep.

        :return: the smallest wake time in the list of pending futures, or
            ``None`` if the list is empty
        """
        return self._futures[0][0] if self._futures else None