1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
7 This example implements the following scenario:
8 - Multiple workers consume jobs (Job) from a shared mailbox (worker)
9 - A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
10 - The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
11 - The client then displays the status of individual jobs.
15 from argparse import ArgumentParser
16 from dataclasses import dataclass
17 from typing import List
18 from uuid import uuid4
21 from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
# Simulated size, in bytes, of a job message sent from the client to a worker.
SIMULATED_JOB_SIZE_BYTES = 1024
# Simulated size, in bytes, of a result message sent back by a worker (1 MiB).
SIMULATED_RESULT_SIZE_BYTES = 1024 ** 2
def parse_requests(requests_str: str) -> List[float]:
    """Parse a comma-separated list of numbers into floats.

    Args:
        requests_str: text such as ``"1,2,3.5"``; whitespace around each
            item is ignored.

    Returns:
        The parsed values, in the order they appear in the string.

    Raises:
        ValueError: if any item (including an empty one) is not a valid
            float literal.
    """
    # Lazily strip each item so conversion still stops at the first bad one.
    fields = (field.strip() for field in requests_str.split(","))
    return list(map(float, fields))
32 def create_parser() -> ArgumentParser:
33 parser = ArgumentParser()
38 help='path to the platform description'
44 help="number of worker actors to start"
50 help="duration of individual jobs sent to the workers by the client"
56 help="number of seconds before the client gives up waiting for results and aborts the simulation"
65 result_mailbox: Mailbox
def worker(worker_id: str):
    """Serve jobs from the shared "worker" mailbox, one after another.

    For each job received, the worker sleeps for the job's duration to
    simulate processing, then sends its own identifier as the result to the
    job's result mailbox.

    The function never returns on its own: workers are started as daemon
    actors, so they are expected to keep serving until the simulation ends.

    Args:
        worker_id: name of this worker, used in log lines and as the result
            payload.
    """
    this_actor.info(f"{worker_id} started")
    mailbox: Mailbox = Mailbox.by_name("worker")
    # Keep serving: a single get() would leave later jobs unprocessed whenever
    # there are more jobs than workers.
    while True:
        job: Job = mailbox.get()
        this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
        # Simulate the processing time of the job.
        this_actor.sleep_for(job.duration)
        # Reply on the job's own mailbox so the client can match the result to the job.
        job.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
def complete(self) -> bool:
    """Report whether the result communication has finished.

    The answer is whatever ``self.comm.test()`` returns at the time of the
    call.

    NOTE(review): callers read this as an attribute (``self.complete``,
    ``result.complete``), so it is presumably exposed as a property; the
    decorator is not visible here -- confirm.
    """
    finished = self.comm.test()
    return finished
def status(self) -> str:
    """Return a human-readable state for the job result.

    Returns:
        ``"complete"`` when ``self.complete`` is truthy, ``"pending"``
        otherwise.
    """
    if self.complete:
        return "complete"
    return "pending"
def client(client_id: str, jobs: List[float], wait_timeout: float):
    """Dispatch one job per duration, wait for the results, then report on each job.

    Every job gets its own uniquely named result mailbox. After all jobs are
    dispatched, the client waits up to ``wait_timeout`` for the results and
    logs how many arrived, followed by one status line per job.

    Args:
        client_id: name of this client, used in the start-up log line.
        jobs: simulated duration, in seconds, of each job to dispatch.
        wait_timeout: maximum time, in seconds, to wait for all results.
    """
    worker_mailbox: Mailbox = Mailbox.by_name("worker")
    this_actor.info(f"{client_id} started")
    async_job_results: List[AsyncJobResult] = []
    for job_idx, job_duration in enumerate(jobs):
        # A random mailbox name per job keeps results from different jobs apart.
        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
        # Send the very Job that is tracked locally instead of building a second copy.
        out_comm = worker_mailbox.put_init(job, SIMULATED_JOB_SIZE_BYTES)
        # put_init() only prepares the send; detach() starts it fire-and-forget,
        # so the client never has to wait on the outgoing communication.
        out_comm.detach()
        # Post the receive for the result right away; it is only waited on below.
        result_comm = result_mailbox.get_async()
        async_job_results.append(AsyncJobResult(job=job, comm=result_comm))
    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
    completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
    # Missing results are reported at warning level, a full set at info level.
    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
    logger(f"received {completed_comms}/{len(async_job_results)} results")
    for result in async_job_results:
        # Only read the payload of finished communications.
        this_actor.info(f"{result.job.job_id}"
                        f" status={result.status}"
                        f" result_payload={result.comm.get_payload() if result.complete else ''}")
121 settings = create_parser().parse_known_args()[0]
123 e.load_platform(settings.platform)
124 Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
125 for worker_idx in range(settings.workers):
126 Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
130 if __name__ == "__main__":