1 /* Copyright (c) 2009, 2010, 2011, 2012. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include <xbt/replay.h>
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15 int communicator_size = 0;
16 static int active_processes = 0;
19 xbt_dynar_t isends; /* of MPI_Request */
20 xbt_dynar_t irecvs; /* of MPI_Request */
21 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
24 static double parse_double(const char *string)
28 value = strtod(string, &endptr);
30 THROWF(unknown_error, 0, "%s is not a double", string);
34 static void action_init(const char *const *action)
36 XBT_DEBUG("Initialize the counters");
37 smpi_replay_globals_t globals = xbt_new(s_smpi_replay_globals_t, 1);
38 globals->isends = xbt_dynar_new(sizeof(MPI_Request),NULL);
39 globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
41 smpi_process_set_user_data((void*) globals);
43 /* start a simulated timer */
44 smpi_process_simulated_start();
45 /*initialize the number of active processes */
46 active_processes = smpi_process_count();
49 static void action_finalize(const char *const *action)
51 smpi_replay_globals_t globals =
52 (smpi_replay_globals_t) smpi_process_get_user_data();
55 XBT_DEBUG("There are %lu isends and %lu irecvs in the dynars",
56 xbt_dynar_length(globals->isends),xbt_dynar_length(globals->irecvs));
57 xbt_dynar_free_container(&(globals->isends));
58 xbt_dynar_free_container(&(globals->irecvs));
63 static void action_comm_size(const char *const *action)
65 double clock = smpi_process_simulated_elapsed();
67 communicator_size = parse_double(action[2]);
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 char *name = xbt_str_join_array(action, " ");
71 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
77 static void action_compute(const char *const *action)
79 double clock = smpi_process_simulated_elapsed();
80 smpi_execute_flops(parse_double(action[2]));
82 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
83 char *name = xbt_str_join_array(action, " ");
84 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
89 static void action_send(const char *const *action)
91 int to = atoi(action[2]);
92 double size=parse_double(action[3]);
93 double clock = smpi_process_simulated_elapsed();
95 int rank = smpi_comm_rank(MPI_COMM_WORLD);
96 TRACE_smpi_computing_out(rank);
97 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
98 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
99 TRACE_smpi_send(rank, rank, dst_traced);
102 smpi_mpi_send(NULL, size, MPI_BYTE, to , 0, MPI_COMM_WORLD);
104 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
105 char *name = xbt_str_join_array(action, " ");
106 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
111 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
112 TRACE_smpi_computing_in(rank);
117 static void action_Isend(const char *const *action)
119 int to = atoi(action[2]);
120 double size=parse_double(action[3]);
121 double clock = smpi_process_simulated_elapsed();
122 smpi_replay_globals_t globals =
123 (smpi_replay_globals_t) smpi_process_get_user_data();
125 int rank = smpi_comm_rank(MPI_COMM_WORLD);
126 TRACE_smpi_computing_out(rank);
127 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
128 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
129 TRACE_smpi_send(rank, rank, dst_traced);
132 MPI_Request request = smpi_mpi_isend(NULL, size, MPI_BYTE, to, 0,
135 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
137 TRACE_smpi_computing_in(rank);
140 xbt_dynar_push(globals->isends,&request);
142 //TODO do the asynchronous cleanup
143 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
144 char *name = xbt_str_join_array(action, " ");
145 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
150 static void action_recv(const char *const *action) {
151 int from = atoi(action[2]);
152 double size=parse_double(action[3]);
153 double clock = smpi_process_simulated_elapsed();
156 int rank = smpi_comm_rank(MPI_COMM_WORLD);
157 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
158 TRACE_smpi_computing_out(rank);
160 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
163 smpi_mpi_recv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD, &status);
166 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
167 TRACE_smpi_recv(rank, src_traced, rank);
168 TRACE_smpi_computing_in(rank);
171 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
172 char *name = xbt_str_join_array(action, " ");
173 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
178 static void action_Irecv(const char *const *action)
180 int from = atoi(action[2]);
181 double size=parse_double(action[3]);
182 double clock = smpi_process_simulated_elapsed();
184 smpi_replay_globals_t globals =
185 (smpi_replay_globals_t) smpi_process_get_user_data();
188 int rank = smpi_comm_rank(MPI_COMM_WORLD);
189 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
190 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
193 request = smpi_mpi_irecv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD);
195 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
198 xbt_dynar_push(globals->irecvs,&request);
200 //TODO do the asynchronous cleanup
201 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
202 char *name = xbt_str_join_array(action, " ");
203 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
208 static void action_wait(const char *const *action){
209 double clock = smpi_process_simulated_elapsed();
212 smpi_replay_globals_t globals =
213 (smpi_replay_globals_t) smpi_process_get_user_data();
215 xbt_assert(xbt_dynar_length(globals->irecvs),
216 "action wait not preceded by any irecv: %s",
217 xbt_str_join_array(action," "));
218 request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
220 int rank = request && request->comm != MPI_COMM_NULL
221 ? smpi_comm_rank(request->comm)
223 TRACE_smpi_computing_out(rank);
225 MPI_Group group = smpi_comm_group(request->comm);
226 int src_traced = smpi_group_rank(group, request->src);
227 int dst_traced = smpi_group_rank(group, request->dst);
228 int is_wait_for_receive = request->recv;
229 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__);
231 smpi_mpi_wait(&request, &status);
233 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
234 if (is_wait_for_receive) {
235 TRACE_smpi_recv(rank, src_traced, dst_traced);
237 TRACE_smpi_computing_in(rank);
240 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
241 char *name = xbt_str_join_array(action, " ");
242 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
247 static void action_barrier(const char *const *action){
248 double clock = smpi_process_simulated_elapsed();
250 int rank = smpi_comm_rank(MPI_COMM_WORLD);
251 TRACE_smpi_computing_out(rank);
252 TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
254 smpi_mpi_barrier(MPI_COMM_WORLD);
256 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
257 TRACE_smpi_computing_in(rank);
260 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
261 char *name = xbt_str_join_array(action, " ");
262 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
267 static void action_bcast(const char *const *action)
269 double size = parse_double(action[2]);
270 double clock = smpi_process_simulated_elapsed();
272 int rank = smpi_comm_rank(MPI_COMM_WORLD);
273 TRACE_smpi_computing_out(rank);
274 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
275 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
278 smpi_mpi_bcast(NULL, size, MPI_BYTE, 0, MPI_COMM_WORLD);
280 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
281 TRACE_smpi_computing_in(rank);
284 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
285 char *name = xbt_str_join_array(action, " ");
286 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
291 static void action_reduce(const char *const *action)
293 double size = parse_double(action[2]);
294 double clock = smpi_process_simulated_elapsed();
296 int rank = smpi_comm_rank(MPI_COMM_WORLD);
297 TRACE_smpi_computing_out(rank);
298 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
299 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
301 smpi_mpi_reduce(NULL, NULL, size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
303 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
304 TRACE_smpi_computing_in(rank);
307 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
308 char *name = xbt_str_join_array(action, " ");
309 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
314 static void action_allReduce(const char *const *action) {
315 double comm_size = parse_double(action[2]);
316 double comp_size = parse_double(action[3]);
317 double clock = smpi_process_simulated_elapsed();
319 int rank = smpi_comm_rank(MPI_COMM_WORLD);
320 TRACE_smpi_computing_out(rank);
321 TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
323 smpi_mpi_reduce(NULL, NULL, comm_size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
324 smpi_execute_flops(comp_size);
325 smpi_mpi_bcast(NULL, comm_size, MPI_BYTE, 0, MPI_COMM_WORLD);
327 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
328 TRACE_smpi_computing_in(rank);
331 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
332 char *name = xbt_str_join_array(action, " ");
333 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
338 void smpi_replay_init(int *argc, char***argv){
339 PMPI_Init(argc, argv);
340 if (!smpi_process_index()){
341 _xbt_replay_action_init();
342 xbt_replay_action_register("init", action_init);
343 xbt_replay_action_register("finalize", action_finalize);
344 xbt_replay_action_register("comm_size",action_comm_size);
345 xbt_replay_action_register("send", action_send);
346 xbt_replay_action_register("Isend", action_Isend);
347 xbt_replay_action_register("recv", action_recv);
348 xbt_replay_action_register("Irecv", action_Irecv);
349 xbt_replay_action_register("wait", action_wait);
350 xbt_replay_action_register("barrier", action_barrier);
351 xbt_replay_action_register("bcast", action_bcast);
352 xbt_replay_action_register("reduce", action_reduce);
353 xbt_replay_action_register("allReduce",action_allReduce);
354 xbt_replay_action_register("compute", action_compute);
357 xbt_replay_action_runner(*argc, *argv);
360 int smpi_replay_finalize(){
362 /* One active process will stop. Decrease the counter*/
365 if(!active_processes){
366 /* Last process alive speaking */
367 /* end the simulated timer */
368 sim_time = smpi_process_simulated_elapsed();
369 XBT_INFO("Simulation time %g", sim_time);
370 _xbt_replay_action_exit();
372 return PMPI_Finalize();