def unfinished_task_count(container: Container):
    """Return how many scheduled tasks in *container* are not sleeping.

    For every agent registered in ``container._agents`` the entries of the
    scheduler's ``_scheduled_tasks`` and ``_scheduled_process_tasks`` lists are
    inspected. An entry counts as unfinished when the ``_is_sleeping``
    awaitable of its scheduled task has not completed yet.

    :param container: the container whose agents should be inspected
    :return: the number of scheduled tasks that are currently not sleeping
    """
    pending = 0
    for agent in container._agents.values():
        scheduler = agent._scheduler
        entries = scheduler._scheduled_tasks + scheduler._scheduled_process_tasks
        # Sleeping tasks do not block progress, so only the entries that are
        # not (yet) sleeping are counted.
        pending += sum(
            1
            for scheduled_task, _task, _, _ in entries
            if not scheduled_task._is_sleeping.done()
        )
    return pending
async def _gather_scheduled_tasks(container, except_sources):
    """Collect the scheduled-task entries of all agents, minus excluded ones.

    Joins each agent's inbox before reading its scheduler's task lists and
    drops every entry whose fourth element is listed in *except_sources*.
    """
    entries = []
    for agent in container._agents.values():
        await agent.inbox.join()
        entries.extend(agent.scheduler._scheduled_tasks)
        entries.extend(agent.scheduler._scheduled_process_tasks)
    return [entry for entry in entries if entry[3] not in except_sources]


async def tasks_complete_or_sleeping(container: Container, except_sources=None):
    """Wait until all relevant scheduled tasks are sleeping or done.

    The inboxes of the container and of its agents are joined, then every
    scheduled task is awaited until either its ``_is_sleeping`` or its
    ``_is_done`` awaitable completes. The task list is rebuilt after every
    pass because new tasks might have been added in the meantime; the loop
    ends once the number of recorded sleeping tasks is no longer smaller than
    the number of collected tasks.

    :param container: the container whose agents' tasks are awaited
    :param except_sources: values of the fourth element of a task entry
        (presumably the task's source - confirm against the scheduler) that
        are ignored while waiting; defaults to ``["no_wait"]``
    """
    # A None sentinel avoids sharing one mutable default list between calls.
    if except_sources is None:
        except_sources = ["no_wait"]

    sleeping_tasks = []

    # is None for containers in MirrorContainerProcessManager
    if container.inbox is not None:
        await container.inbox.join()

    # Python has no do-while loop, so the task list is collected once up
    # front and again at the end of every pass.
    task_list = await _gather_scheduled_tasks(container, except_sources)

    while len(task_list) > len(sleeping_tasks):
        # NOTE(review): a yield via `await asyncio.sleep(0)` was once deemed
        # necessary here so that asyncio tasks of this time step wake up; it
        # is disabled - confirm that it is not needed.
        if container.inbox is not None:
            await container.inbox.join()

        for scheduled_task, _task, _, _ in task_list:
            await asyncio.wait(
                [scheduled_task._is_sleeping, scheduled_task._is_done],
                return_when=asyncio.FIRST_COMPLETED,
            )
            # Record each sleeping task once; the number of sleeping tasks
            # is what tells us whether all tasks are done.
            if (
                scheduled_task._is_sleeping.done()
                and scheduled_task not in sleeping_tasks
            ):
                sleeping_tasks.append(scheduled_task)

        # recreate task_list - as new tasks might have been added
        task_list = await _gather_scheduled_tasks(container, except_sources)