-# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the license (GNU LGPL) which comes with this package.
from argparse import ArgumentParser
from dataclasses import dataclass
+from typing import List
from uuid import uuid4
import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
SIMULATED_JOB_SIZE_BYTES = 1024
SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
# Turn a comma-separated string into a list of floats. Each item is stripped
# of surrounding whitespace before conversion; float() raises ValueError on
# an empty or non-numeric item.
# This change only swaps the return annotation from the builtin generic
# list[float] to typing.List[float].
-def parse_requests(requests_str: str) -> list[float]:
+def parse_requests(requests_str: str) -> List[float]:
return [float(item.strip()) for item in requests_str.split(",")]
# Command-line option carrying the path to the platform description.
parser.add_argument(
'--platform',
type=str,
# Added by this change: the option is now marked as mandatory.
+ required=True,
help='path to the platform description'
)
parser.add_argument(
# Record tying a submitted job to the asynchronous communication on which
# its result is awaited.
@dataclass
class AsyncJobResult:
# The job this entry tracks (Job is declared outside the lines shown here).
job: Job
- result_comm: Comm
- async_data: PyGetAsync
# Single Comm handle; it replaces the former result_comm / async_data pair.
+ comm: Comm
@property
def complete(self) -> bool:
# Delegates to test() on the stored communication.
- return self.result_comm.test()
+ return self.comm.test()
@property
def status(self) -> str:
# Human-readable form of `complete`: "complete" when true, "pending" otherwise.
return "complete" if self.complete else "pending"
# Client actor body: looks up the "worker" mailbox, records one AsyncJobResult
# per job, waits up to wait_timeout for the result communications, then logs
# the status (and payload, when complete) of every job.
# This change swaps the `jobs` annotation from list[float] to typing.List[float].
-def client(client_id: str, jobs: list[float], wait_timeout: float):
+def client(client_id: str, jobs: List[float], wait_timeout: float):
worker_mailbox: Mailbox = Mailbox.by_name("worker")
this_actor.info(f"{client_id} started")
# NOTE(review): this annotation still uses the builtin generic list[...] while
# the signatures in this change move to typing.List. Assuming this line sits
# inside the function body (indentation is not visible here), the annotation
# is never evaluated at runtime, so it is only a style inconsistency.
async_job_results: list[AsyncJobResult] = []
# NOTE(review): the lines between the initialisation above and the argument
# below are not shown in this excerpt; `job`, `out_comm` and `result_mailbox`
# are bound there. Presumably a per-job loop that sends the job through
# worker_mailbox -- confirm against the full file.
result_mailbox=result_mailbox
), SIMULATED_JOB_SIZE_BYTES)
out_comm.detach()
# get_async() used to be unpacked into (comm, async_data); it is now bound
# to a single value that is stored as the entry's `comm`.
- result_comm, async_data = result_mailbox.get_async()
+ result_comm = result_mailbox.get_async()
async_job_results.append(AsyncJobResult(
job=job,
- result_comm=result_comm,
- async_data=async_data
+ comm=result_comm
))
this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
# The value returned by wait_all_for is used below as the number of completed
# communications.
- completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+ completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
# Log at warning level when fewer results arrived than jobs were recorded.
logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
logger(f"received {completed_comms}/{len(async_job_results)} results")
# The payload is now read from the Comm itself via get_payload(), and only
# for completed entries; pending entries log an empty payload.
for result in async_job_results:
this_actor.info(f"{result.job.job_id}"
f" status={result.status}"
- f" result_payload={result.async_data.get() if result.complete else ''}")
+ f" result_payload={result.comm.get_payload() if result.complete else ''}")
def main():