Source code for mango.util.termination_detection

import asyncio

from mango.container.core import Container


[docs] def unfinished_task_count(container: Container): unfinished_tasks = 0 for agent in container._agents.values(): sleeping_tasks = [] scheduled_tasks = ( agent._scheduler._scheduled_tasks + agent._scheduler._scheduled_process_tasks ) for scheduled_task, task, _, _ in scheduled_tasks: if scheduled_task._is_sleeping.done(): # we need to recognize how many sleeping tasks we have in order to find out if all tasks are done sleeping_tasks.append(scheduled_task) unfinished_tasks += len(scheduled_tasks) - len(sleeping_tasks) return unfinished_tasks
[docs] async def tasks_complete_or_sleeping(container: Container, except_sources=["no_wait"]): sleeping_tasks = [] task_list = [] # is None for containers in MirrorContainerProcessManager if container.inbox is not None: await container.inbox.join() # python does not have do while pattern for agent in container._agents.values(): await agent.inbox.join() task_list.extend(agent.scheduler._scheduled_tasks) task_list.extend(agent.scheduler._scheduled_process_tasks) task_list = list(filter(lambda x: x[3] not in except_sources, task_list)) while len(task_list) > len(sleeping_tasks): # sleep needed so that asyncio tasks of this time step are correctly awaken. # await asyncio.sleep(0) if container.inbox is not None: await container.inbox.join() for scheduled_task, task, _, _ in task_list: await asyncio.wait( [scheduled_task._is_sleeping, scheduled_task._is_done], return_when=asyncio.FIRST_COMPLETED, ) if ( scheduled_task._is_sleeping.done() and scheduled_task not in sleeping_tasks ): # we need to recognize how many sleeping tasks we have in order to find out if all tasks are done sleeping_tasks.append(scheduled_task) # recreate task_list - as new tasks might have been added task_list = [] for agent in container._agents.values(): await agent.inbox.join() task_list.extend(agent.scheduler._scheduled_tasks) task_list.extend(agent.scheduler._scheduled_process_tasks) task_list = list(filter(lambda x: x[3] not in except_sources, task_list))