class Clock(ABC):
    """
    Abstract class for clocks that can be used in mango.

    Concrete clocks provide the current time through the ``time`` property
    and a ``sleep`` method returning something that can be awaited.
    """

    @property
    def time(self) -> float:
        """
        Returns the current time of the clock.

        :raises NotImplementedError: always; subclasses have to override it
        """
        raise NotImplementedError

    def sleep(self, t: float) -> asyncio.Future:
        """
        Sleeps for ``t`` according to this clock.

        Declared on the base class so that the interface shared by the
        concrete clocks is visible in one place.

        :param t: duration to sleep, measured on this clock
        :raises NotImplementedError: always; subclasses have to override it
        """
        raise NotImplementedError
class AsyncioClock(Clock):
    """
    Clock backed by the wall-clock time of the ``time`` module and by
    ``asyncio.sleep`` for waiting.
    """

    def __init__(self):
        """Nothing to set up; this clock keeps no state of its own."""

    @property
    def time(self) -> float:
        """
        Current time as reported by ``time.time()``.
        """
        now = time.time()
        return now

    def sleep(self, t) -> asyncio.Future:
        """
        Sleeping via asyncio sleep.

        :param t: delay handed unchanged to ``asyncio.sleep``
        :return: the awaitable produced by ``asyncio.sleep(t)``
        """
        pending_sleep = asyncio.sleep(t)
        return pending_sleep
class ExternalClock(Clock):
    """
    An external clock that proceeds only when set_time is called.

    Pending sleeps are stored in ``self._futures`` as ``(wake_time, future)``
    tuples ordered by wake time, so ``set_time`` only has to resolve a prefix
    of that list.
    """

    def __init__(self, start_time: float = 0):
        self._time: float = start_time
        # Pending sleeps as (wake_time, future), kept sorted by wake_time.
        self._futures: list[tuple[float, asyncio.Future]] = []

    @property
    def time(self) -> float:
        """
        Current time of the external clock.
        """
        return self._time

    def set_time(self, t: float):
        """
        Set the clock to ``t`` and resolve all pending sleeps that are due.

        Every pending future whose wake time is less than or equal to ``t``
        is removed from the pending list and, unless it is already done,
        resolved with ``True``.

        :param t: the new time; must not be smaller than the current time
            (setting the current time again is allowed)
        :raises ValueError: if ``t`` is smaller than the current time
        """
        if t < self._time:
            # Equal times are accepted, so the bound reported is inclusive.
            raise ValueError(f"Time must be >= {self._time} but is {t}.")
        self._time = t

        # Search on the wake times only: the futures stored next to them
        # cannot be ordered.
        keys = [wake_time for wake_time, _ in self._futures]
        threshold = bisect.bisect_right(keys, t)
        # Split into the futures due now and the ones that stay pending.
        current_futures, self._futures = (
            self._futures[:threshold],
            self._futures[threshold:],
        )
        for _, future in current_futures:
            # Skip futures that are already done (e.g. cancelled meanwhile);
            # setting a result on them would raise.
            if not future.done():
                future.set_result(True)

    def sleep(self, t: float) -> asyncio.Future:
        """
        Sleeps for t based on the external clock.

        :param t: duration to sleep, measured on this clock
        :return: a future that is already resolved with ``None`` if
            ``t <= 0``; otherwise a pending future that ``set_time`` resolves
            with ``True`` once the clock reaches ``self.time + t``
        """
        f = asyncio.Future()
        if t <= 0:
            # Nothing to wait for: trigger directly.
            f.set_result(None)
            return f

        wake_time = self.time + t
        # Insert behind entries with the same wake time so that the list
        # stays sorted and equal wake times keep their insertion order.
        keys = [pending_time for pending_time, _ in self._futures]
        index = bisect.bisect_right(keys, wake_time)
        self._futures.insert(index, (wake_time, f))
        return f