[docs]defhandle_message(self,content:float,meta):sender=sender_addr(meta)logger.debug("clockmanager: %s from %s",content,sender)ifcontent:assertisinstance(content,int|float),f"{content} was {type(content)}"self.schedules.append(content)ifnotself.futures[sender].done():self.futures[sender].set_result(True)else:# with as_agent_process - messages can come from ourselveslogger.debug("got another message from agent %s - %s",sender,content)
[docs]asyncdefbroadcast(self,message,add_futures=True):""" Broadcast the given message to all receiver clock addresses. If add_futures is set, a future is added which is finished when an answer by the receiving clock agent was received. Args: message (object): the given message add_futures (bool, optional): Adds futures which can be awaited until a response to a message is given. Defaults to True. """forreceiver_addrinself.receiver_clock_addresses:logger.debug("clockmanager send: %s - %s",message,receiver_addr)# in MQTT we can not be sure if the message was delivered# checking the return code here would only help for TCPawaitself.send_message(message,receiver_addr)ifadd_futures:self.futures[receiver_addr]=asyncio.Future()
[docs]asyncdefsend_current_time(self,time=None):""" Broadcasts the current time to all receiver clock addresses. Does not add futures to wait for responses, as no response is expected here. Args: time (number, optional): The current time which is set. Defaults to None. """time=timeorself.scheduler.clock.timeawaitself.broadcast(time,add_futures=False)
[docs]asyncdefwait_for_futures(self):""" Waits for all futures in self.futures Gives debug log output to see which agent is waited for. """forcontainer_id,futinlist(self.futures.items()):logger.debug("waiting for %s",container_id)# waits forever if manager was started first# as answer is never receivedawaitfut
[docs]asyncdefwait_all_online(self):""" sends a broadcast to ask for the next event to all expected addresses. Waits one second and repeats this behavior until a response by all addresses is receivd. This effectively waits until all agents are up and running and the manager can start the simulation. This is needed, as there is no way in paho mqtt to check whether a message was retrieved, except for by sending ping pong messages. """all_online=Falsewhilenotall_online:awaitself.broadcast("next_event")awaitasyncio.sleep(0)try:awaitasyncio.wait_for(self.wait_for_futures(),1)exceptTimeoutError:logger.info("waiting for all to come online")else:all_online=True
[docs]asyncdefget_next_event(self):"""Get the next event from the scheduler by requesting all known clock agents"""self.schedules=[]awaitself.broadcast("next_event")awaitasyncio.sleep(0)awaitself.wait_for_futures()# wait for our container tooawaitself.wait_all_done()next_activity=self.scheduler.clock.get_next_activity()ifnext_activityisnotNone:# logger.error(f"{next_activity}")self.schedules.append(next_activity)ifself.schedules:next_event=min(self.schedules)else:logger.info("%s: no new events, time stands still",self.aid)next_event=self.scheduler.clock.timeifnext_event<self.scheduler.clock.time:logger.info("%s: got old event, time stands still",self.aid)next_event=self.scheduler.clock.timelogger.debug("next event at %s",next_event)returnnext_event
[docs]asyncdefdistribute_time(self,time=None):""" Waits until the current container is done. Brodcasts the new time to all the other clock agents. Thn awaits until the work in the other agents is done and their next event is received. Args: time (number, optional): The new time which is set. Defaults to None. Returns: number or None: The time at which the next event happens """awaitself.wait_all_done()awaitself.send_current_time(time)ifnottime:time=awaitself.get_next_event()returntime
[docs]defhandle_message(self,content:float,meta):sender=sender_addr(meta)logger.info("clockagent: %s from %s",content,sender)ifcontent=="stop":ifnotself.stopped.done():self.stopped.set_result(True)elifcontent=="next_event":asyncdefwait_done():awaitself.wait_all_done()t=asyncio.create_task(wait_done())defrespond(fut:asyncio.Future=None):ifself.stopped.done():returnnext_time=self.scheduler.clock.get_next_activity()self.schedule_instant_message(next_time,sender_addr(meta))t.add_done_callback(respond)else:assertisinstance(content,int|float),f"{content} was {type(content)}"self.scheduler.clock.set_time(content)