Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d72490ba4bff46e63219d23113be1706ed585bea
[simgrid.git] / examples / python / comm-waitallfor / comm-waitallfor.py
1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 """
7 This example implements the following scenario:
8 - Multiple workers consume jobs (Job) from a shared mailbox (worker)
9 - A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
10 - The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
11 - The client then displays the status of individual jobs.
12 """
13
14
15 from argparse import ArgumentParser
16 from dataclasses import dataclass
17 from typing import List
18 from uuid import uuid4
19 import sys
20
21 from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
22
23
# Size passed along with each Job when the client puts it on the worker mailbox.
SIMULATED_JOB_SIZE_BYTES = 1024
# Size passed along with each result when a worker puts it on a job's result mailbox.
SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
26
27
def parse_requests(requests_str: str) -> List[float]:
    """Turn a comma-separated string of numbers into a list of floats.

    Each item is stripped of surrounding whitespace before conversion. An item
    that ``float`` cannot convert (including an empty one) raises ``ValueError``.
    """
    durations: List[float] = []
    for raw_item in requests_str.split(","):
        durations.append(float(raw_item.strip()))
    return durations
30
31
def create_parser() -> ArgumentParser:
    """Build the command-line parser for this example.

    The parser knows four options: ``--platform`` (required), ``--workers``,
    ``--jobs`` and ``--wait-timeout``.
    """
    # One (flag, keyword-arguments) entry per option, registered in this order.
    option_specs = (
        ("--platform", {
            "type": str,
            "required": True,
            "help": "path to the platform description",
        }),
        ("--workers", {
            "type": int,
            "default": 1,
            "help": "number of worker actors to start",
        }),
        ("--jobs", {
            "type": parse_requests,
            "default": "1,2,3,4,5",
            "help": "duration of individual jobs sent to the workers by the client",
        }),
        ("--wait-timeout", {
            "type": float,
            "default": 5.0,
            "help": "number of seconds before the client gives up waiting for results and aborts the simulation",
        }),
    )
    cli = ArgumentParser()
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    return cli
59
60
@dataclass
class Job:
    """Unit of work sent by the client to a worker through the shared mailbox."""

    # Label used in log messages; the client builds it as "job-<index>".
    job_id: str
    # Simulated processing time: the worker sleeps for this long.
    duration: float
    # Mailbox on which the worker puts its result for this job.
    result_mailbox: Mailbox
66
67
def worker(worker_id: str):
    """Serve jobs taken from the shared "worker" mailbox, forever.

    For every job received: log it, sleep for the job's duration, then put the
    worker id on the job's result mailbox. The loop has no exit condition.

    worker_id: label used in the log messages and sent back as the result payload.
    """
    this_actor.info(f"{worker_id} started")
    inbox: Mailbox = Mailbox.by_name("worker")
    while True:
        task: Job = inbox.get()
        progress_message = f"{worker_id} working on {task.job_id} (will take {task.duration}s to complete)"
        this_actor.info(progress_message)
        # The "processing" is simulated by sleeping for the requested duration.
        this_actor.sleep_for(task.duration)
        reply_payload = f"{worker_id}"
        task.result_mailbox.put(reply_payload, SIMULATED_RESULT_SIZE_BYTES)
76
77
@dataclass
class AsyncJobResult:
    """Pairs a dispatched job with the communication expected to carry its result."""

    job: Job
    comm: Comm

    @property
    def complete(self) -> bool:
        """Return what ``comm.test()`` reports for the result communication."""
        finished = self.comm.test()
        return finished

    @property
    def status(self) -> str:
        """Return ``"complete"`` when ``complete`` is truthy, ``"pending"`` otherwise."""
        if self.complete:
            return "complete"
        return "pending"
90
91
def client(client_id: str, jobs: List[float], wait_timeout: float):
    """Dispatch one job per requested duration, then wait (bounded) for the results.

    client_id: label used in the start-up log message.
    jobs: duration of each job to dispatch; one Job is sent per entry.
    wait_timeout: timeout handed to ``Comm.wait_all_for`` while waiting for results.

    After the wait, the number of received results is logged (as a warning when
    it is lower than the number of jobs), followed by one status line per job.
    """
    worker_mailbox: Mailbox = Mailbox.by_name("worker")
    this_actor.info(f"{client_id} started")
    async_job_results: List[AsyncJobResult] = []
    for job_idx, job_duration in enumerate(jobs):
        # Each job gets its own uniquely named mailbox for its result.
        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
        # Send the very instance that is tracked below instead of building a
        # second, identical Job just for the outgoing communication.
        out_comm = worker_mailbox.put_init(job, SIMULATED_JOB_SIZE_BYTES)
        # The outgoing communication is detached: the client never waits on it.
        out_comm.detach()
        result_comm = result_mailbox.get_async()
        async_job_results.append(AsyncJobResult(job=job, comm=result_comm))
    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
    completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
    # Fewer results than jobs is reported as a warning rather than as plain info.
    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
    logger(f"received {completed_comms}/{len(async_job_results)} results")
    for result in async_job_results:
        # The payload is only read for results that report themselves complete.
        this_actor.info(f"{result.job.job_id}"
                        f" status={result.status}"
                        f" result_payload={result.comm.get_payload() if result.complete else ''}")
118
119
def main():
    """Parse the command line, set up the actors and run the simulation.

    One "client" actor is created on host "Tremblay", and ``--workers`` daemonized
    "worker" actors are created on host "Ruby".
    """
    # parse_known_args tolerates options this parser does not declare; the
    # full sys.argv is still handed to the Engine below.
    settings, _unparsed = create_parser().parse_known_args()
    engine = Engine(sys.argv)
    engine.load_platform(settings.platform)
    Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
    for worker_idx in range(settings.workers):
        worker_actor = Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}")
        # Workers loop forever, so they are daemonized.
        # NOTE(review): presumably this lets the run end once the client is done; confirm.
        worker_actor.daemonize()
    engine.run()
128
129
# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()