Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deprecate Comm::wait_all(). Remove it in python
[simgrid.git] / examples / python / comm-waitallfor / comm-waitallfor.py
index 207b327..d72490b 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the license (GNU LGPL) which comes with this package.
@@ -14,18 +14,18 @@ This example implements the following scenario:
 
 from argparse import ArgumentParser
 from dataclasses import dataclass
+from typing import List
 from uuid import uuid4
 import sys
-import typing
 
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
 
 
 SIMULATED_JOB_SIZE_BYTES = 1024
 SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
 
 
-def parse_requests(requests_str: str) -> typing.List[float]:
+def parse_requests(requests_str: str) -> List[float]:
     return [float(item.strip()) for item in requests_str.split(",")]
 
 
@@ -34,6 +34,7 @@ def create_parser() -> ArgumentParser:
     parser.add_argument(
         '--platform',
         type=str,
+        required=True,
         help='path to the platform description'
     )
     parser.add_argument(
@@ -77,19 +78,18 @@ def worker(worker_id: str):
 @dataclass
 class AsyncJobResult:
     job: Job
-    result_comm: Comm
-    async_data: PyGetAsync
+    comm: Comm
 
     @property
     def complete(self) -> bool:
-        return self.result_comm.test()
+        return self.comm.test()
 
     @property
     def status(self) -> str:
         return "complete" if self.complete else "pending"
 
 
-def client(client_id: str, jobs: typing.List[float], wait_timeout: float):
+def client(client_id: str, jobs: List[float], wait_timeout: float):
     worker_mailbox: Mailbox = Mailbox.by_name("worker")
     this_actor.info(f"{client_id} started")
     async_job_results: list[AsyncJobResult] = []
@@ -102,20 +102,19 @@ def client(client_id: str, jobs: typing.List[float], wait_timeout: float):
             result_mailbox=result_mailbox
         ), SIMULATED_JOB_SIZE_BYTES)
         out_comm.detach()
-        result_comm, async_data = result_mailbox.get_async()
+        result_comm = result_mailbox.get_async()
         async_job_results.append(AsyncJobResult(
             job=job,
-            result_comm=result_comm,
-            async_data=async_data
+            comm=result_comm
         ))
     this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
-    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+    completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
     logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
     logger(f"received {completed_comms}/{len(async_job_results)} results")
     for result in async_job_results:
         this_actor.info(f"{result.job.job_id}"
                         f" status={result.status}"
-                        f" result_payload={result.async_data.get() if result.complete else ''}")
+                        f" result_payload={result.comm.get_payload() if result.complete else ''}")
 
 
 def main():