import logging
import time
from dataclasses import dataclass
from mango.agent.core import AgentAddress
from mango.container.core import Container
from mango.container.mp import ContainerMirrorData
from ..messages.codecs import Codec
from ..messages.message import MangoMessage
from ..util.clock import ExternalClock
from ..util.termination_detection import tasks_complete_or_sleeping
logger = logging.getLogger(__name__)
# [docs]  (Sphinx "view source" link marker left over from HTML extraction; not code)
@dataclass
class ExternalAgentMessage:
    """
    A single outgoing message produced by an ExternalSchedulingContainer.

    Instances are created when the container sends a message to another
    container; they are collected in the container's message buffer and
    handed back to the caller of ``step``.

    :param message: The message as encoded by the container's codec
    :param time: Time stamp of the send: the container clock's time plus the
        wall-clock seconds elapsed since the current step started
    :param receiver: The protocol address of the receiving container
    """

    # encoded payload, exactly as returned by ``codec.encode``
    message: bytes
    # clock time + wall-clock offset inside the step in which it was sent
    time: float
    # ``protocol_addr`` of the destination
    receiver: str
# [docs]  (Sphinx "view source" link marker left over from HTML extraction; not code)
@dataclass
class ExternalSchedulingContainerOutput:
    """
    Result of one call to ``ExternalSchedulingContainer.step``.

    :param duration: Wall-clock seconds the step took (measured with
        ``time.time()`` from the start of the step to its end)
    :param messages: The external messages that were buffered during the step
    :param next_activity: Whatever the container clock's
        ``get_next_activity()`` reported at the end of the step; may be None
    """

    # wall-clock length of the step in seconds
    duration: float
    # outgoing messages collected while the step was running
    messages: list[ExternalAgentMessage]
    # value of ``clock.get_next_activity()`` after the step, or None
    next_activity: float | None
# [docs]  (Sphinx "view source" link marker left over from HTML extraction; not code)
def ext_mirror_container_creator(
    container_data, loop, message_pipe, main_queue, event_pipe, terminate_event
):
    """
    Build an ExternalSchedulingContainer that acts as a mirror container.

    The address, codec, clock and extra keyword arguments are taken from
    ``container_data``; the pipes, queue and terminate event are bundled into
    a ``ContainerMirrorData`` and passed on as ``mirror_data``.

    :param container_data: Object providing ``addr``, ``codec``, ``clock`` and ``kwargs``
    :param loop: The event loop handed to the new container
    :param message_pipe: Pipe stored in the mirror data as ``message_pipe``
    :param main_queue: Queue stored in the mirror data as ``main_queue``
    :param event_pipe: Pipe stored in the mirror data as ``event_pipe``
    :param terminate_event: Event stored in the mirror data as ``terminate_event``
    :return: The newly created ExternalSchedulingContainer
    """
    # bundle the inter-process communication handles first ...
    mirror = ContainerMirrorData(
        message_pipe=message_pipe,
        event_pipe=event_pipe,
        terminate_event=terminate_event,
        main_queue=main_queue,
    )
    # ... then create the container from the description of the original one
    return ExternalSchedulingContainer(
        addr=container_data.addr,
        codec=container_data.codec,
        clock=container_data.clock,
        loop=loop,
        mirror_data=mirror,
        **container_data.kwargs,
    )
# [docs]  (Sphinx "view source" link marker left over from HTML extraction; not code)
class ExternalSchedulingContainer(Container):
    """
    Container that is driven step by step from the outside.

    The container does not transmit messages to other containers itself.
    Messages addressed to another container are encoded and stored in
    ``message_buffer``; each call to :meth:`step` sets the clock to the given
    simulation time, feeds the supplied incoming messages into the inbox,
    waits for the agents to settle and returns the buffered outgoing messages.
    """

    def __init__(
        self,
        *,
        addr: str,
        codec: Codec,
        clock: ExternalClock | None = None,
        **kwargs,
    ):
        """
        Initializes an ExternalSchedulingContainer. Do not directly call this method but use
        the factory method of **Container** instead

        :param addr: The container sid / eid respectively; it is also used as the container name
        :param codec: The codec to use
        :param clock: The clock to use; a new ExternalClock is created if None is given
        :param kwargs: Additional keyword arguments forwarded unchanged to the
            Container constructor (the mirror container factory in this module
            passes ``loop`` and ``mirror_data`` this way)
        """
        # explicit None check: only a missing clock is replaced by a default one
        if clock is None:
            clock = ExternalClock()

        super().__init__(
            addr=addr,
            name=addr,
            codec=codec,
            clock=clock,
            **kwargs,
        )

        # wall-clock reference used to time-stamp messages sent within a step
        self.current_start_time_of_step = time.time()
        # set by send_message whenever a message stays inside this container
        self._new_internal_message: bool = False
        # outgoing external messages, emptied at the end of every step
        self.message_buffer: list[ExternalAgentMessage] = []

    async def send_message(
        self,
        content,
        receiver_addr: AgentAddress,
        sender_id: None | str = None,
        **kwargs,
    ) -> bool:
        """
        The container sends a message to an agent according to the container protocol.

        If the receiver lives in this container the message is delivered
        internally; otherwise it is encoded and buffered until the current
        step ends.

        :param content: The content of the message
        :param receiver_addr: Address of the receiving agent (protocol address of
            its container plus agent id)
        :param sender_id: The id of the sending agent, stored in the meta data
        :param kwargs: Additional parameters to provide protocol specific settings;
            they are copied into the meta data of the message
        :return: Success or not
        """
        # the caller's kwargs form the basis of the meta data; the routing
        # keys below deliberately overwrite equally named kwargs
        meta = dict(kwargs)
        meta["sender_id"] = sender_id
        meta["sender_addr"] = self.addr
        meta["receiver_id"] = receiver_addr.aid
        meta["receiver_addr"] = receiver_addr.protocol_addr

        if receiver_addr.protocol_addr == self.addr:
            # receiver is in this container: deliver directly
            meta["network_protocol"] = "external_connection"
            success = self._send_internal_message(
                message=content, receiver_id=receiver_addr.aid, default_meta=meta
            )
            # tell a running step() that the agents may have new work to do
            self._new_internal_message = True
            return success

        # content that already knows how to split itself into content and
        # meta is sent as is, everything else is wrapped in a MangoMessage
        message = content
        if not hasattr(content, "split_content_and_meta"):
            message = MangoMessage(content, meta)
        return await self._send_external_message(receiver_addr, message)

    async def _send_external_message(self, addr, message) -> bool:
        """
        Sends an external message to another container.

        The message is not transmitted here; it is encoded and appended to
        ``message_buffer``, which is handed out by the next return of ``step``.

        :param addr: The address of the receiver; its ``protocol_addr`` is stored
            as the receiver of the buffered message
        :param message: The non-encoded message
        :return: Success or not (always True)
        """
        encoded_msg = self.codec.encode(message)
        # time stamp = clock time + wall-clock time spent in the current step
        send_time = time.time() - self.current_start_time_of_step + self.clock.time
        # store message in the buffer, which will be emptied in step
        self.message_buffer.append(
            ExternalAgentMessage(
                time=send_time,
                receiver=addr.protocol_addr,
                message=encoded_msg,
            )
        )
        return True

    async def step(
        self, simulation_time: float, incoming_messages: list[bytes]
    ) -> ExternalSchedulingContainerOutput:
        """
        Performs one externally triggered step of the container.

        The clock is set to ``simulation_time``, the incoming messages are
        decoded and put into the inbox, and the method then waits until the
        inbox has been processed and the agents have settled.

        :param simulation_time: The time the container clock is set to
        :param incoming_messages: Encoded messages to deliver in this step
        :return: The wall-clock duration of the step, the external messages
            buffered during the step and the clock's next activity
        """
        if self.message_buffer:
            logger.warning(
                "There are messages in the message buffer to be sent, at the start when step was called."
            )

        self.current_start_time_of_step = time.time()
        self.clock.set_time(simulation_time)

        # now we will decode and distribute the incoming messages
        for encoded_msg in incoming_messages:
            message = self.codec.decode(encoded_msg)
            content, meta = message.split_content_and_meta()
            meta["network_protocol"] = "external_connection"
            await self.inbox.put((0, content, meta))

        # now wait for the msg_queue to be empty
        await self.inbox.join()

        # now wait for all agents to terminate
        # we need to loop here, because agents may send internal messages
        # while we wait, which in turn creates new work for other agents
        while True:
            self._new_internal_message = False
            # presumably returns once every agent task is complete or sleeping
            await tasks_complete_or_sleeping(self)
            if not self._new_internal_message:
                # no internal message was sent during this pass, so we are done
                break

        # now all agents in this container should be done
        end_time = time.time()

        # hand out the buffered messages and start the next step with an empty buffer
        messages_this_step, self.message_buffer = self.message_buffer, []

        return ExternalSchedulingContainerOutput(
            duration=end_time - self.current_start_time_of_step,
            messages=messages_this_step,
            next_activity=self.clock.get_next_activity(),
        )

    def _create_mirror_container(self):
        """
        Returns the factory function that creates a mirror of this container type.

        :return: ``ext_mirror_container_creator`` (the function itself, not a container)
        """
        return ext_mirror_container_creator