Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7214b99498f72841b84bff966b31a64dd6bc7b46
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009, 2010, 2011, 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16
17 typedef struct {
18   xbt_dynar_t isends; /* of MPI_Request */
19   xbt_dynar_t irecvs; /* of MPI_Request */
20 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
21
22 /* Helper function */
23 static double parse_double(const char *string)
24 {
25   double value;
26   char *endptr;
27   value = strtod(string, &endptr);
28   if (*endptr != '\0')
29     THROWF(unknown_error, 0, "%s is not a double", string);
30   return value;
31 }
32
33 static void action_init(const char *const *action)
34 {
35   XBT_DEBUG("Initialize the counters");
36   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
37   globals->isends = xbt_dynar_new(sizeof(MPI_Request),NULL);
38   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
39
40   smpi_process_set_user_data((void*) globals);
41
42   /* start a simulated timer */
43   smpi_process_simulated_start();
44 }
45
46 static void action_finalize(const char *const *action)
47 {
48   double sim_time= 1.;
49   smpi_replay_globals_t globals =
50       (smpi_replay_globals_t) smpi_process_get_user_data();
51
52   if (globals){
53     XBT_DEBUG("There are %lu isends and %lu irecvs in the dynars",
54          xbt_dynar_length(globals->isends),xbt_dynar_length(globals->irecvs));
55     xbt_dynar_free_container(&(globals->isends));
56     xbt_dynar_free_container(&(globals->irecvs));
57   }
58   free(globals);
59   /* end the simulated timer */
60   sim_time = smpi_process_simulated_elapsed();
61   if (!smpi_process_index())
62     XBT_INFO("Simulation time %g", sim_time);
63   smpi_process_finalize();
64   smpi_process_destroy();
65 }
66
67 static void action_comm_size(const char *const *action)
68 {
69   double clock = smpi_process_simulated_elapsed();
70
71   communicator_size = parse_double(action[2]);
72
73   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
74     char *name = xbt_str_join_array(action, " ");
75     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
76     free(name);
77   }
78 }
79
80
81 static void action_compute(const char *const *action)
82 {
83   double clock = smpi_process_simulated_elapsed();
84   smpi_execute_flops(parse_double(action[2]));
85
86   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
87     char *name = xbt_str_join_array(action, " ");
88     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
89     free(name);
90   }
91 }
92
93 static void action_send(const char *const *action)
94 {
95   int to = atoi(action[2]);
96   double size=parse_double(action[3]);
97   double clock = smpi_process_simulated_elapsed();
98 #ifdef HAVE_TRACING
99   int rank = smpi_comm_rank(MPI_COMM_WORLD);
100   TRACE_smpi_computing_out(rank);
101   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
102   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
103   TRACE_smpi_send(rank, rank, dst_traced);
104 #endif
105
106   smpi_mpi_send(NULL, size, MPI_BYTE, to , 0, MPI_COMM_WORLD);
107
108   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
109     char *name = xbt_str_join_array(action, " ");
110     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
111     free(name);
112   }
113
114   #ifdef HAVE_TRACING
115   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
116   TRACE_smpi_computing_in(rank);
117 #endif
118
119 }
120
121 static void action_Isend(const char *const *action)
122 {
123   int to = atoi(action[2]);
124   double size=parse_double(action[3]);
125   double clock = smpi_process_simulated_elapsed();
126   smpi_replay_globals_t globals =
127      (smpi_replay_globals_t) smpi_process_get_user_data();
128 #ifdef HAVE_TRACING
129   int rank = smpi_comm_rank(MPI_COMM_WORLD);
130   TRACE_smpi_computing_out(rank);
131   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
132   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
133   TRACE_smpi_send(rank, rank, dst_traced);
134 #endif
135
136   MPI_Request request = smpi_mpi_isend(NULL, size, MPI_BYTE, to, 0,
137                                        MPI_COMM_WORLD);
138 #ifdef HAVE_TRACING
139   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
140   request->send = 1;
141   TRACE_smpi_computing_in(rank);
142 #endif
143
144   xbt_dynar_push(globals->isends,&request);
145
146   //TODO do the asynchronous cleanup
147   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
148     char *name = xbt_str_join_array(action, " ");
149     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
150     free(name);
151   }
152 }
153
154 static void action_recv(const char *const *action) {
155   int from = atoi(action[2]);
156   double size=parse_double(action[3]);
157   double clock = smpi_process_simulated_elapsed();
158   MPI_Status status;
159 #ifdef HAVE_TRACING
160   int rank = smpi_comm_rank(MPI_COMM_WORLD);
161   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
162   TRACE_smpi_computing_out(rank);
163
164   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
165 #endif
166
167   smpi_mpi_recv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD, &status);
168
169 #ifdef HAVE_TRACING
170   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
171   TRACE_smpi_recv(rank, src_traced, rank);
172   TRACE_smpi_computing_in(rank);
173 #endif
174
175   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
176     char *name = xbt_str_join_array(action, " ");
177     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
178     free(name);
179   }
180 }
181
182 static void action_Irecv(const char *const *action)
183 {
184   int from = atoi(action[2]);
185   double size=parse_double(action[3]);
186   double clock = smpi_process_simulated_elapsed();
187   MPI_Request request;
188   smpi_replay_globals_t globals =
189      (smpi_replay_globals_t) smpi_process_get_user_data();
190
191 #ifdef HAVE_TRACING
192   int rank = smpi_comm_rank(MPI_COMM_WORLD);
193   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
194   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
195 #endif
196
197   request = smpi_mpi_irecv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD);
198 #ifdef HAVE_TRACING
199   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
200   request->recv = 1;
201 #endif
202   xbt_dynar_push(globals->irecvs,&request);
203
204   //TODO do the asynchronous cleanup
205   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
206     char *name = xbt_str_join_array(action, " ");
207     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
208     free(name);
209   }
210 }
211
212 static void action_wait(const char *const *action){
213   double clock = smpi_process_simulated_elapsed();
214   MPI_Request request;
215   MPI_Status status;
216   smpi_replay_globals_t globals =
217       (smpi_replay_globals_t) smpi_process_get_user_data();
218
219   xbt_assert(xbt_dynar_length(globals->irecvs),
220       "action wait not preceded by any irecv: %s",
221       xbt_str_join_array(action," "));
222   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
223 #ifdef HAVE_TRACING
224   int rank = request && request->comm != MPI_COMM_NULL
225       ? smpi_comm_rank(request->comm)
226       : -1;
227   TRACE_smpi_computing_out(rank);
228
229   MPI_Group group = smpi_comm_group(request->comm);
230   int src_traced = smpi_group_rank(group, request->src);
231   int dst_traced = smpi_group_rank(group, request->dst);
232   int is_wait_for_receive = request->recv;
233   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__);
234 #endif
235   smpi_mpi_wait(&request, &status);
236 #ifdef HAVE_TRACING
237   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
238   if (is_wait_for_receive) {
239     TRACE_smpi_recv(rank, src_traced, dst_traced);
240   }
241   TRACE_smpi_computing_in(rank);
242 #endif
243
244   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
245     char *name = xbt_str_join_array(action, " ");
246     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
247     free(name);
248   }
249 }
250
251 static void action_barrier(const char *const *action){
252   double clock = smpi_process_simulated_elapsed();
253 #ifdef HAVE_TRACING
254   int rank = smpi_comm_rank(MPI_COMM_WORLD);
255   TRACE_smpi_computing_out(rank);
256   TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
257 #endif
258   smpi_mpi_barrier(MPI_COMM_WORLD);
259 #ifdef HAVE_TRACING
260   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
261   TRACE_smpi_computing_in(rank);
262 #endif
263
264   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
265     char *name = xbt_str_join_array(action, " ");
266     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
267     free(name);
268   }
269 }
270
271 static void action_bcast(const char *const *action)
272 {
273   double size = parse_double(action[2]);
274   double clock = smpi_process_simulated_elapsed();
275 #ifdef HAVE_TRACING
276   int rank = smpi_comm_rank(MPI_COMM_WORLD);
277   TRACE_smpi_computing_out(rank);
278   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
279   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
280 #endif
281
282   smpi_mpi_bcast(NULL, size, MPI_BYTE, 0, MPI_COMM_WORLD);
283 #ifdef HAVE_TRACING
284   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
285   TRACE_smpi_computing_in(rank);
286 #endif
287
288   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
289     char *name = xbt_str_join_array(action, " ");
290     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
291     free(name);
292   }
293 }
294
295 static void action_reduce(const char *const *action)
296 {
297   double size = parse_double(action[2]);
298   double clock = smpi_process_simulated_elapsed();
299 #ifdef HAVE_TRACING
300   int rank = smpi_comm_rank(MPI_COMM_WORLD);
301   TRACE_smpi_computing_out(rank);
302   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
303   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
304 #endif
305    smpi_mpi_reduce(NULL, NULL, size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
306 #ifdef HAVE_TRACING
307   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
308   TRACE_smpi_computing_in(rank);
309 #endif
310
311   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
312     char *name = xbt_str_join_array(action, " ");
313     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
314     free(name);
315   }
316 }
317
318 static void action_allReduce(const char *const *action) {
319   double comm_size = parse_double(action[2]);
320   double comp_size = parse_double(action[3]);
321   double clock = smpi_process_simulated_elapsed();
322 #ifdef HAVE_TRACING
323   int rank = smpi_comm_rank(MPI_COMM_WORLD);
324   TRACE_smpi_computing_out(rank);
325   TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
326 #endif
327   smpi_mpi_reduce(NULL, NULL, comm_size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
328   smpi_execute_flops(comp_size);
329   smpi_mpi_bcast(NULL, comm_size, MPI_BYTE, 0, MPI_COMM_WORLD);
330 #ifdef HAVE_TRACING
331   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
332   TRACE_smpi_computing_in(rank);
333 #endif
334
335   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
336     char *name = xbt_str_join_array(action, " ");
337     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
338     free(name);
339   }
340 }
341
342 void smpi_replay_init(int *argc, char***argv){
343   PMPI_Init(argc, argv);
344   if (!smpi_process_index()){
345     _xbt_replay_action_init();
346     xbt_replay_action_register("init",     action_init);
347     xbt_replay_action_register("finalize", action_finalize);
348     xbt_replay_action_register("comm_size",action_comm_size);
349     xbt_replay_action_register("send",     action_send);
350     xbt_replay_action_register("Isend",    action_Isend);
351     xbt_replay_action_register("recv",     action_recv);
352     xbt_replay_action_register("Irecv",    action_Irecv);
353     xbt_replay_action_register("wait",     action_wait);
354     xbt_replay_action_register("barrier",  action_barrier);
355     xbt_replay_action_register("bcast",    action_bcast);
356     xbt_replay_action_register("reduce",   action_reduce);
357     xbt_replay_action_register("allReduce",action_allReduce);
358     xbt_replay_action_register("compute",  action_compute);
359   }
360
361   xbt_replay_action_runner(*argc, *argv);
362 }
363
364 int smpi_replay_finalize(){
365   if(!smpi_process_index())
366      _xbt_replay_action_exit();
367   return PMPI_Finalize();
368 }