Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update ChangeLog.
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009, 2010, 2011, 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16
17 typedef struct {
18   xbt_dynar_t isends; /* of MPI_Request */
19   xbt_dynar_t irecvs; /* of MPI_Request */
20 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
21
22 /* Helper function */
23 static double parse_double(const char *string)
24 {
25   double value;
26   char *endptr;
27   value = strtod(string, &endptr);
28   if (*endptr != '\0')
29     THROWF(unknown_error, 0, "%s is not a double", string);
30   return value;
31 }
32
33 static void action_init(const char *const *action)
34 {
35   XBT_DEBUG("Initialize the counters");
36   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
37   globals->isends = xbt_dynar_new(sizeof(MPI_Request),NULL);
38   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
39
40   smpi_process_set_user_data((void*) globals);
41
42   /* start a simulated timer */
43   smpi_process_simulated_start();
44 }
45
46 static void action_finalize(const char *const *action)
47 {
48   double sim_time= 1.;
49   smpi_replay_globals_t globals =
50       (smpi_replay_globals_t) smpi_process_get_user_data();
51
52   if (globals){
53     XBT_DEBUG("There are %lu isends and %lu irecvs in the dynars",
54          xbt_dynar_length(globals->isends),xbt_dynar_length(globals->irecvs));
55     xbt_dynar_free_container(&(globals->isends));
56     xbt_dynar_free_container(&(globals->irecvs));
57   }
58   free(globals);
59   /* end the simulated timer */
60   sim_time = smpi_process_simulated_elapsed();
61   if (!smpi_process_index())
62     XBT_INFO("Simulation time %g", sim_time);
63   smpi_process_finalize();
64   smpi_process_destroy();
65 }
66
67 static void action_comm_size(const char *const *action)
68 {
69   double clock = smpi_process_simulated_elapsed();
70
71   communicator_size = parse_double(action[2]);
72
73   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
74            smpi_process_simulated_elapsed()-clock);
75 }
76
77
78 static void action_compute(const char *const *action)
79 {
80   double clock = smpi_process_simulated_elapsed();
81   smpi_execute_flops(parse_double(action[2]));
82
83   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
84            smpi_process_simulated_elapsed()-clock);
85 }
86
87 static void action_send(const char *const *action)
88 {
89   int to = atoi(action[2]);
90   double size=parse_double(action[3]);
91   double clock = smpi_process_simulated_elapsed();
92 #ifdef HAVE_TRACING
93   int rank = smpi_comm_rank(MPI_COMM_WORLD);
94   TRACE_smpi_computing_out(rank);
95   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
96   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
97   TRACE_smpi_send(rank, rank, dst_traced);
98 #endif
99
100   smpi_mpi_send(NULL, size, MPI_BYTE, to , 0, MPI_COMM_WORLD);
101   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
102            smpi_process_simulated_elapsed()-clock);
103 #ifdef HAVE_TRACING
104   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
105   TRACE_smpi_computing_in(rank);
106 #endif
107
108 }
109
110 static void action_Isend(const char *const *action)
111 {
112   int to = atoi(action[2]);
113   double size=parse_double(action[3]);
114   double clock = smpi_process_simulated_elapsed();
115   smpi_replay_globals_t globals =
116      (smpi_replay_globals_t) smpi_process_get_user_data();
117 #ifdef HAVE_TRACING
118   int rank = smpi_comm_rank(MPI_COMM_WORLD);
119   TRACE_smpi_computing_out(rank);
120   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
121   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
122   TRACE_smpi_send(rank, rank, dst_traced);
123 #endif
124
125   MPI_Request request = smpi_mpi_isend(NULL, size, MPI_BYTE, to, 0,
126                                        MPI_COMM_WORLD);
127 #ifdef HAVE_TRACING
128   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
129   request->send = 1;
130   TRACE_smpi_computing_in(rank);
131 #endif
132
133   xbt_dynar_push(globals->isends,&request);
134
135   //TODO do the asynchronous cleanup
136   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
137            smpi_process_simulated_elapsed()-clock);
138 }
139
140 static void action_recv(const char *const *action) {
141   int from = atoi(action[2]);
142   double size=parse_double(action[3]);
143   double clock = smpi_process_simulated_elapsed();
144   MPI_Status status;
145 #ifdef HAVE_TRACING
146   int rank = smpi_comm_rank(MPI_COMM_WORLD);
147   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
148   TRACE_smpi_computing_out(rank);
149
150   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
151 #endif
152
153   smpi_mpi_recv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD, &status);
154
155 #ifdef HAVE_TRACING
156   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
157   TRACE_smpi_recv(rank, src_traced, rank);
158   TRACE_smpi_computing_in(rank);
159 #endif
160
161   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
162            smpi_process_simulated_elapsed()-clock);
163 }
164
165 static void action_Irecv(const char *const *action)
166 {
167   int from = atoi(action[2]);
168   double size=parse_double(action[3]);
169   double clock = smpi_process_simulated_elapsed();
170   MPI_Request request;
171   smpi_replay_globals_t globals =
172      (smpi_replay_globals_t) smpi_process_get_user_data();
173
174 #ifdef HAVE_TRACING
175   int rank = smpi_comm_rank(MPI_COMM_WORLD);
176   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
177   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
178 #endif
179
180   request = smpi_mpi_irecv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD);
181 #ifdef HAVE_TRACING
182   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
183   request->recv = 1;
184 #endif
185   xbt_dynar_push(globals->irecvs,&request);
186
187   //TODO do the asynchronous cleanup
188   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
189            smpi_process_simulated_elapsed()-clock);
190 }
191
192 static void action_wait(const char *const *action){
193   double clock = smpi_process_simulated_elapsed();
194   MPI_Request request;
195   MPI_Status status;
196   smpi_replay_globals_t globals =
197       (smpi_replay_globals_t) smpi_process_get_user_data();
198
199   xbt_assert(xbt_dynar_length(globals->irecvs),
200       "action wait not preceded by any irecv: %s",
201       xbt_str_join_array(action," "));
202   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
203 #ifdef HAVE_TRACING
204   int rank = request && request->comm != MPI_COMM_NULL
205       ? smpi_comm_rank(request->comm)
206       : -1;
207   TRACE_smpi_computing_out(rank);
208
209   MPI_Group group = smpi_comm_group(request->comm);
210   int src_traced = smpi_group_rank(group, request->src);
211   int dst_traced = smpi_group_rank(group, request->dst);
212   int is_wait_for_receive = request->recv;
213   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__);
214 #endif
215   smpi_mpi_wait(&request, &status);
216 #ifdef HAVE_TRACING
217   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
218   if (is_wait_for_receive) {
219     TRACE_smpi_recv(rank, src_traced, dst_traced);
220   }
221   TRACE_smpi_computing_in(rank);
222 #endif
223
224   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
225            smpi_process_simulated_elapsed()-clock);
226 }
227
228 static void action_barrier(const char *const *action){
229   double clock = smpi_process_simulated_elapsed();
230 #ifdef HAVE_TRACING
231   int rank = smpi_comm_rank(MPI_COMM_WORLD);
232   TRACE_smpi_computing_out(rank);
233   TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
234 #endif
235   smpi_mpi_barrier(MPI_COMM_WORLD);
236 #ifdef HAVE_TRACING
237   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
238   TRACE_smpi_computing_in(rank);
239 #endif
240
241   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
242            smpi_process_simulated_elapsed()-clock);
243 }
244
245 static void action_bcast(const char *const *action)
246 {
247   double size = parse_double(action[2]);
248   double clock = smpi_process_simulated_elapsed();
249 #ifdef HAVE_TRACING
250   int rank = smpi_comm_rank(MPI_COMM_WORLD);
251   TRACE_smpi_computing_out(rank);
252   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
253   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
254 #endif
255
256   smpi_mpi_bcast(NULL, size, MPI_BYTE, 0, MPI_COMM_WORLD);
257 #ifdef HAVE_TRACING
258   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
259   TRACE_smpi_computing_in(rank);
260 #endif
261
262   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
263            smpi_process_simulated_elapsed()-clock);
264 }
265
266 static void action_reduce(const char *const *action)
267 {
268   double size = parse_double(action[2]);
269   double clock = smpi_process_simulated_elapsed();
270 #ifdef HAVE_TRACING
271   int rank = smpi_comm_rank(MPI_COMM_WORLD);
272   TRACE_smpi_computing_out(rank);
273   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
274   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
275 #endif
276    smpi_mpi_reduce(NULL, NULL, size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
277 #ifdef HAVE_TRACING
278   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
279   TRACE_smpi_computing_in(rank);
280 #endif
281
282   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
283            smpi_process_simulated_elapsed()-clock);
284 }
285
286 static void action_allReduce(const char *const *action) {
287   double comm_size = parse_double(action[2]);
288   double comp_size = parse_double(action[3]);
289   double clock = smpi_process_simulated_elapsed();
290 #ifdef HAVE_TRACING
291   int rank = smpi_comm_rank(MPI_COMM_WORLD);
292   TRACE_smpi_computing_out(rank);
293   TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
294 #endif
295   smpi_mpi_reduce(NULL, NULL, comm_size, MPI_BYTE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
296   smpi_execute_flops(comp_size);
297   smpi_mpi_bcast(NULL, comm_size, MPI_BYTE, 0, MPI_COMM_WORLD);
298 #ifdef HAVE_TRACING
299   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
300   TRACE_smpi_computing_in(rank);
301 #endif
302   XBT_VERB("%s %f", xbt_str_join_array(action, " "),
303            smpi_process_simulated_elapsed()-clock);
304 }
305
306 void smpi_replay_init(int *argc, char***argv){
307   PMPI_Init(argc, argv);
308   _xbt_replay_action_init();
309
310   xbt_replay_action_register("init",     action_init);
311   xbt_replay_action_register("finalize", action_finalize);
312   xbt_replay_action_register("comm_size",action_comm_size);
313   xbt_replay_action_register("send",     action_send);
314   xbt_replay_action_register("Isend",    action_Isend);
315   xbt_replay_action_register("recv",     action_recv);
316   xbt_replay_action_register("Irecv",    action_Irecv);
317   xbt_replay_action_register("wait",     action_wait);
318   xbt_replay_action_register("barrier",  action_barrier);
319   xbt_replay_action_register("bcast",    action_bcast);
320   xbt_replay_action_register("reduce",   action_reduce);
321   xbt_replay_action_register("allReduce",action_allReduce);
322   xbt_replay_action_register("compute",  action_compute);
323
324   xbt_replay_action_runner(*argc, *argv);
325 }
326
327 int smpi_replay_finalize(){
328   return PMPI_Finalize();
329 }