1 /* Copyright (c) 2009, 2010, 2011, 2012. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include <xbt/replay.h>
/* Declares the smpi_replay log category that the XBT_* logging calls in this file use. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Written by action_comm_size() from action[2]; the double returned by
 * parse_double() is implicitly converted to int. */
15 int communicator_size = 0;
/* Set to smpi_process_count() in action_init(); smpi_replay_finalize() tests it
 * against zero to detect the last process still alive. */
16 static int active_processes = 0;
/* MPI_DEFAULT_TYPE is chosen once in action_init(). MPI_CURRENT_TYPE holds the
 * datatype of the action currently being replayed and is overwritten by the
 * communication handlers (directly or through decode_datatype()). */
19 MPI_Datatype MPI_DEFAULT_TYPE, MPI_CURRENT_TYPE;
/* Logs, at verbose level, the fields of `action` joined by single spaces,
 * followed by the simulated time elapsed since `clock`.
 *   action: the array of strings describing the replayed action
 *   clock:  a value of smpi_process_simulated_elapsed() sampled by the caller
 *           before it started processing the action
 */
21 static void log_timed_action (const char *const *action, double clock){
/* The string is only built when verbose logging is enabled for smpi_replay. */
22 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
/* NOTE(review): presumably xbt_str_join_array() returns a freshly allocated
 * string that must be freed after logging; the release is not visible here
 * - confirm. */
23 char *name = xbt_str_join_array(action, " ");
24 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Fields of the per-process replay state that action_init() allocates and
 * attaches with smpi_process_set_user_data().
 * isends receives every request created by action_Isend(); irecvs receives
 * every request created by action_Irecv() and is popped by action_wait(). */
30 xbt_dynar_t isends; /* of MPI_Request */
31 xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
/* Converts `string` to a double using strtod().
 * Throws unknown_error with the message "<string> is not a double".
 * NOTE(review): the condition guarding the THROWF is not visible here;
 * presumably it tests whether strtod() stopped before the end of the string
 * (via endptr) - confirm. */
36 static double parse_double(const char *string)
40 value = strtod(string, &endptr);
42 THROWF(unknown_error, 0, "%s is not a double", string);
/* Selects the MPI datatype for the current action: stores it in the global
 * MPI_CURRENT_TYPE and also returns it.
 * The candidates are MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG,
 * MPI_FLOAT, MPI_BYTE and MPI_DEFAULT_TYPE.
 * NOTE(review): the conditions that pick each candidate are not visible here;
 * presumably `action` (the datatype field of a trace line) is compared against
 * one identifier per type, with MPI_DEFAULT_TYPE as the fallback - confirm. */
46 static MPI_Datatype decode_datatype(const char *const action)
48 // Declared datatypes,
53 MPI_CURRENT_TYPE=MPI_DOUBLE;
56 MPI_CURRENT_TYPE=MPI_INT;
59 MPI_CURRENT_TYPE=MPI_CHAR;
62 MPI_CURRENT_TYPE=MPI_SHORT;
65 MPI_CURRENT_TYPE=MPI_LONG;
68 MPI_CURRENT_TYPE=MPI_FLOAT;
71 MPI_CURRENT_TYPE=MPI_BYTE;
74 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
/* The global that was just assigned is also handed back to the caller. */
77 return MPI_CURRENT_TYPE;
/* Handler for the "init" action: sets up the replay state of the calling
 * process.
 * It allocates the per-process globals (two empty dynars of MPI_Request),
 * picks MPI_DEFAULT_TYPE from the presence of action[2], attaches the globals
 * as process user data, starts the simulated timer, records the process count
 * in active_processes and creates reqq with one request dynar per process. */
80 static void action_init(const char *const *action)
83 XBT_DEBUG("Initialize the counters");
84 smpi_replay_globals_t globals = xbt_new(s_smpi_replay_globals_t, 1);
85 globals->isends = xbt_dynar_new(sizeof(MPI_Request),NULL);
86 globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
/* A third field on the init line selects MPI_DOUBLE as the default datatype;
 * without it the default is MPI_BYTE. */
88 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
89 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
91 smpi_process_set_user_data((void*) globals);
93 /* start a simulated timer */
94 smpi_process_simulated_start();
95 /*initialize the number of active processes */
96 active_processes = smpi_process_count();
/* reqq is indexed by rank in MPI_COMM_WORLD (see action_Isend, action_Irecv
 * and action_waitall).
 * NOTE(review): no guard is visible around this allocation; if every process
 * replays an "init" action, reqq would be re-created each time and the
 * previous array dropped - confirm. */
98 reqq=xbt_new0(xbt_dynar_t,active_processes);
100 for(i=0;i<active_processes;i++){
101 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
/* Handler for the "finalize" action: reports how many requests remain in the
 * calling process's isends/irecvs dynars, then releases both containers.
 * NOTE(review): the release of the globals structure itself is not visible
 * here - confirm it is freed. */
105 static void action_finalize(const char *const *action)
107 smpi_replay_globals_t globals =
108 (smpi_replay_globals_t) smpi_process_get_user_data();
110 XBT_DEBUG("There are %lu isends and %lu irecvs in the dynars",
111 xbt_dynar_length(globals->isends),xbt_dynar_length(globals->irecvs));
/* Only the containers are released; the MPI_Request values they hold are not
 * touched by these calls. */
112 xbt_dynar_free_container(&(globals->isends));
113 xbt_dynar_free_container(&(globals->irecvs));
/* Handler for the "comm_size" action: stores the value of action[2] in the
 * global communicator_size and logs the time spent. */
118 static void action_comm_size(const char *const *action)
120 double clock = smpi_process_simulated_elapsed();
/* parse_double() yields a double; it is implicitly converted to the int global. */
122 communicator_size = parse_double(action[2]);
123 log_timed_action (action, clock);
/* Handler for the "comm_split" action. The visible statements only sample the
 * simulated clock and log the action; no communicator is created here. */
126 static void action_comm_split(const char *const *action)
128 double clock = smpi_process_simulated_elapsed();
130 log_timed_action (action, clock);
/* Handler for the "comm_dup" action. The visible statements only sample the
 * simulated clock and log the action; no communicator is duplicated here. */
133 static void action_comm_dup(const char *const *action)
135 double clock = smpi_process_simulated_elapsed();
137 log_timed_action (action, clock);
/* Handler for the "compute" action: simulates the execution of the amount of
 * flops given in action[2], then logs the time spent. */
140 static void action_compute(const char *const *action)
142 double clock = smpi_process_simulated_elapsed();
143 smpi_execute_flops(parse_double(action[2]));
145 log_timed_action (action, clock);
/* Handler for the "send" action: blocking send to a peer in MPI_COMM_WORLD.
 *   action[2]: destination rank
 *   action[3]: message size, parsed as a double
 *   action[4]: datatype identifier passed to decode_datatype()
 * No payload is transferred: the buffer is NULL and the tag is always 0.
 * NOTE(review): the condition choosing between the two MPI_CURRENT_TYPE
 * assignments is not visible; the sibling handlers test whether action[4] is
 * present, so presumably this one does too - confirm. */
148 static void action_send(const char *const *action)
/* NOTE(review): atoi() gives no error indication on a malformed rank. */
150 int to = atoi(action[2]);
151 double size=parse_double(action[3]);
152 double clock = smpi_process_simulated_elapsed();
155 MPI_CURRENT_TYPE=decode_datatype(action[4]);
157 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
/* Tracing: leave the computing state and open a point-to-point event. */
161 int rank = smpi_comm_rank(MPI_COMM_WORLD);
162 TRACE_smpi_computing_out(rank);
163 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
164 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
165 TRACE_smpi_send(rank, rank, dst_traced);
/* `size` is a double passed as the element count. */
168 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
170 log_timed_action (action, clock);
/* Tracing: close the point-to-point event and return to the computing state. */
173 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
174 TRACE_smpi_computing_in(rank);
/* Handler for the "Isend" action: non-blocking send to a peer in
 * MPI_COMM_WORLD.
 *   action[2]: destination rank
 *   action[3]: message size, parsed as a double
 *   action[4]: optional datatype identifier; MPI_DEFAULT_TYPE when absent
 * The resulting request is recorded twice: in the calling process's isends
 * dynar and in the reqq entry of its rank (consumed by action_waitall). */
179 static void action_Isend(const char *const *action)
181 int to = atoi(action[2]);
182 double size=parse_double(action[3]);
183 double clock = smpi_process_simulated_elapsed();
186 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
187 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
189 smpi_replay_globals_t globals =
190 (smpi_replay_globals_t) smpi_process_get_user_data();
/* Tracing: leave the computing state and open a point-to-point event. */
192 int rank = smpi_comm_rank(MPI_COMM_WORLD);
193 TRACE_smpi_computing_out(rank);
194 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
195 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__);
196 TRACE_smpi_send(rank, rank, dst_traced);
/* NULL buffer and tag 0: only the size and datatype are simulated. */
199 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
202 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
204 TRACE_smpi_computing_in(rank);
/* NOTE(review): action_wait() only pops from irecvs and action_waitall() only
 * resets reqq, so no visible code ever removes entries from isends - confirm
 * whether that is intended. */
207 xbt_dynar_push(globals->isends,&request);
208 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
210 log_timed_action (action, clock);
/* Handler for the "recv" action: blocking receive from a peer in
 * MPI_COMM_WORLD.
 *   action[2]: source rank
 *   action[3]: message size, parsed as a double
 *   action[4]: optional datatype identifier; MPI_DEFAULT_TYPE when absent
 * The receive uses a NULL buffer and tag 0. */
213 static void action_recv(const char *const *action) {
214 int from = atoi(action[2]);
215 double size=parse_double(action[3]);
216 double clock = smpi_process_simulated_elapsed();
219 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
220 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
/* Tracing: leave the computing state and open a point-to-point event. */
223 int rank = smpi_comm_rank(MPI_COMM_WORLD);
224 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
225 TRACE_smpi_computing_out(rank);
227 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
230 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
/* Tracing: the receive event is emitted once the blocking call has returned. */
233 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
234 TRACE_smpi_recv(rank, src_traced, rank);
235 TRACE_smpi_computing_in(rank);
238 log_timed_action (action, clock);
/* Handler for the "Irecv" action: non-blocking receive from a peer in
 * MPI_COMM_WORLD.
 *   action[2]: source rank
 *   action[3]: message size, parsed as a double
 *   action[4]: optional datatype identifier; MPI_DEFAULT_TYPE when absent
 * The resulting request is recorded twice: in the calling process's irecvs
 * dynar (popped by action_wait) and in the reqq entry of its rank (consumed
 * by action_waitall). */
241 static void action_Irecv(const char *const *action)
243 int from = atoi(action[2]);
244 double size=parse_double(action[3]);
245 double clock = smpi_process_simulated_elapsed();
248 smpi_replay_globals_t globals =
249 (smpi_replay_globals_t) smpi_process_get_user_data();
251 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
252 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
/* Tracing: open a point-to-point event around the posting of the receive. */
255 int rank = smpi_comm_rank(MPI_COMM_WORLD);
256 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
257 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__);
/* NULL buffer and tag 0: only the size and datatype are simulated. */
260 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
263 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
266 xbt_dynar_push(globals->irecvs,&request);
267 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
269 log_timed_action (action, clock);
/* Handler for the "wait" action: waits for the most recently posted Irecv of
 * the calling process.
 * The request is popped from the irecvs dynar; the handler asserts that the
 * dynar is not empty. Requests created by action_Isend() are never taken from
 * here, since only irecvs is consulted. */
272 static void action_wait(const char *const *action){
273 double clock = smpi_process_simulated_elapsed();
276 smpi_replay_globals_t globals =
277 (smpi_replay_globals_t) smpi_process_get_user_data();
/* NOTE(review): presumably xbt_str_join_array() allocates the joined string;
 * nothing visible releases the one built for this message - confirm. */
279 xbt_assert(xbt_dynar_length(globals->irecvs),
280 "action wait not preceded by any irecv: %s",
281 xbt_str_join_array(action," "));
282 request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
/* NOTE(review): `request` is tested against NULL here but dereferenced without
 * a test a few statements below (request->comm, ->src, ->dst, ->recv) -
 * confirm whether a NULL request can actually be popped. */
284 int rank = request && request->comm != MPI_COMM_NULL
285 ? smpi_comm_rank(request->comm)
287 TRACE_smpi_computing_out(rank);
/* The request's fields are copied before the wait and used for tracing after
 * it. NOTE(review): presumably because smpi_mpi_wait(), which receives
 * &request, may invalidate the request - confirm. */
289 MPI_Group group = smpi_comm_group(request->comm);
290 int src_traced = smpi_group_rank(group, request->src);
291 int dst_traced = smpi_group_rank(group, request->dst);
292 int is_wait_for_receive = request->recv;
293 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__);
295 smpi_mpi_wait(&request, &status);
297 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
298 if (is_wait_for_receive) {
299 TRACE_smpi_recv(rank, src_traced, dst_traced);
301 TRACE_smpi_computing_in(rank);
304 log_timed_action (action, clock);
/* Handler for the "waitAll" action: waits for every request the calling rank
 * has accumulated in reqq (filled by action_Isend and action_Irecv), then
 * empties that reqq entry.
 * The src/dst/recv fields of each request are saved before the wait and used
 * afterwards to emit one receive trace event per completed receive. The
 * isends/irecvs dynars of the process globals are not modified here. */
307 static void action_waitall(const char *const *action){
308 double clock = smpi_process_simulated_elapsed();
309 int count_requests=0;
312 count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
314 if (count_requests>0) {
/* Variable-length arrays sized by the number of pending requests. */
315 MPI_Request requests[count_requests];
316 MPI_Status status[count_requests];
318 /* The reqq is an array of dynars. Its index corresponds to the rank.
319 Thus each rank saves its own requests to the array request. */
/* The loop has an empty body (note the trailing ';'): the foreach macro itself
 * is relied on to store element i into requests[i]. */
320 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
323 //save information from requests
325 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
326 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
327 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
328 for (i = 0; i < count_requests; i++) {
/* NOTE(review): the dynars are created with sizeof(int) elements, so
 * presumably xbt_dynar_insert_at() copies the int pointed to; in that case the
 * three xbt_new'd ints below are leaked on every iteration (no release is
 * visible) and plain stack ints would do - confirm. */
330 int *asrc = xbt_new(int, 1);
331 int *adst = xbt_new(int, 1);
332 int *arecv = xbt_new(int, 1);
333 *asrc = requests[i]->src;
334 *adst = requests[i]->dst;
335 *arecv = requests[i]->recv;
336 xbt_dynar_insert_at(srcs, i, asrc);
337 xbt_dynar_insert_at(dsts, i, adst);
338 xbt_dynar_insert_at(recvs, i, arecv);
/* NOTE(review): the condition selecting this alternative is not visible;
 * presumably it handles a NULL requests[i]. `t` is allocated with xbt_new
 * (not xbt_new0) and never assigned in the visible lines, so the value stored
 * - later read as is_wait_for_receive - looks indeterminate, and `t` is not
 * visibly released - confirm. */
343 int *t = xbt_new(int, 1);
344 xbt_dynar_insert_at(srcs, i, t);
345 xbt_dynar_insert_at(dsts, i, t);
346 xbt_dynar_insert_at(recvs, i, t);
/* Tracing uses smpi_process_index() here, whereas the point-to-point handlers
 * use smpi_comm_rank(MPI_COMM_WORLD). */
350 int rank_traced = smpi_process_index();
351 TRACE_smpi_computing_out(rank_traced);
353 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__);
356 smpi_mpi_waitall(count_requests, requests, status);
/* Emit one receive event per request that was a receive, using the values
 * saved before the wait. */
359 for (i = 0; i < count_requests; i++) {
360 int src_traced, dst_traced, is_wait_for_receive;
361 xbt_dynar_get_cpy(srcs, i, &src_traced);
362 xbt_dynar_get_cpy(dsts, i, &dst_traced);
363 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
364 if (is_wait_for_receive) {
365 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
368 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
370 xbt_dynar_free(&srcs);
371 xbt_dynar_free(&dsts);
372 xbt_dynar_free(&recvs);
373 TRACE_smpi_computing_in(rank_traced);
/* All requests of this rank have been waited for: drop them from reqq. */
376 xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
378 log_timed_action (action, clock);
/* Handler for the "barrier" action: performs a barrier on MPI_COMM_WORLD,
 * wrapped in collective tracing events, then logs the time spent. */
381 static void action_barrier(const char *const *action){
382 double clock = smpi_process_simulated_elapsed();
/* Tracing: -1 is passed as the root of the collective. */
384 int rank = smpi_comm_rank(MPI_COMM_WORLD);
385 TRACE_smpi_computing_out(rank);
386 TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
388 smpi_mpi_barrier(MPI_COMM_WORLD);
390 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
391 TRACE_smpi_computing_in(rank);
394 log_timed_action (action, clock);
/* Handler for the "bcast" action: broadcast on MPI_COMM_WORLD with a NULL
 * buffer.
 *   action[2]: message size, parsed as a double
 *   action[3]: root rank
 *   action[4]: datatype identifier passed to decode_datatype()
 * MPI_CURRENT_TYPE starts as MPI_DEFAULT_TYPE and is replaced when a datatype
 * is decoded.
 * NOTE(review): the conditions guarding the reads of action[3] and action[4]
 * are not visible; presumably both fields are optional - confirm. */
398 static void action_bcast(const char *const *action)
400 double size = parse_double(action[2]);
401 double clock = smpi_process_simulated_elapsed();
404 * Initialize MPI_CURRENT_TYPE in order to decrease
405 * the number of the checks
407 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
410 root= atoi(action[3]);
412 MPI_CURRENT_TYPE=decode_datatype(action[4]);
417 int rank = smpi_comm_rank(MPI_COMM_WORLD);
418 TRACE_smpi_computing_out(rank);
/* NOTE(review): the traced root is computed from rank 0, not from `root`,
 * while the broadcast below uses `root`; the trace looks wrong whenever the
 * root is not 0 - confirm. */
419 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
420 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
423 smpi_mpi_bcast(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
425 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
426 TRACE_smpi_computing_in(rank);
429 log_timed_action (action, clock);
/* Handler for the "reduce" action: reduction on MPI_COMM_WORLD followed by a
 * simulated computation.
 *   action[2]: communication size, parsed as a double
 *   action[3]: computation amount in flops, parsed as a double
 *   action[4]: root rank
 *   action[5]: datatype identifier passed to decode_datatype()
 * NOTE(review): the conditions guarding the reads of action[4] and action[5]
 * are not visible; presumably both fields are optional - confirm. */
432 static void action_reduce(const char *const *action)
434 double comm_size = parse_double(action[2]);
435 double comp_size = parse_double(action[3]);
436 double clock = smpi_process_simulated_elapsed();
438 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
441 root= atoi(action[4]);
443 MPI_CURRENT_TYPE=decode_datatype(action[5]);
448 int rank = smpi_comm_rank(MPI_COMM_WORLD);
449 TRACE_smpi_computing_out(rank);
/* NOTE(review): the traced root is computed from rank 0, not from `root`,
 * while the reduction below uses `root` - confirm. */
450 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
451 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__);
/* The reduction runs on NULL buffers with MPI_OP_NULL; the cost of the
 * operator is accounted for separately by executing comp_size flops. */
453 smpi_mpi_reduce(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
454 smpi_execute_flops(comp_size);
456 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
457 TRACE_smpi_computing_in(rank);
460 log_timed_action (action, clock);
/* Handler for the "allReduce" action, replayed as three steps on
 * MPI_COMM_WORLD: a reduction to rank 0, a simulated computation, and a
 * broadcast from rank 0.
 *   action[2]: communication size, parsed as a double
 *   action[3]: computation amount in flops, parsed as a double
 *   action[4]: optional datatype identifier; MPI_DEFAULT_TYPE when absent */
463 static void action_allReduce(const char *const *action) {
464 double comm_size = parse_double(action[2]);
465 double comp_size = parse_double(action[3]);
467 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
468 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
/* Unlike most handlers, the clock is sampled after the fields are parsed. */
470 double clock = smpi_process_simulated_elapsed();
472 int rank = smpi_comm_rank(MPI_COMM_WORLD);
473 TRACE_smpi_computing_out(rank);
474 TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
/* NULL buffers and MPI_OP_NULL: only sizes and the flop count are simulated. */
476 smpi_mpi_reduce(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
477 smpi_execute_flops(comp_size);
478 smpi_mpi_bcast(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
480 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
481 TRACE_smpi_computing_in(rank);
484 log_timed_action (action, clock);
/* Handler for the "allToAll" action: all-to-all exchange, using one of three
 * algorithms chosen from the send size and the communicator size.
 *   action[2]: per-peer send count (parsed as a double, stored in an int)
 *   action[3]: per-peer receive count (parsed as a double, stored in an int)
 *   action[4]: optional datatype identifier; MPI_DEFAULT_TYPE when absent */
487 static void action_allToAll(const char *const *action) {
488 double clock = smpi_process_simulated_elapsed();
489 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
490 int send_size = parse_double(action[2]);
491 int recv_size = parse_double(action[3]);
/* Both buffers are allocated as arrays of int, whatever MPI_CURRENT_TYPE is.
 * NOTE(review): the receive buffer is sized with send_size, not recv_size; it
 * looks too small whenever recv_size > send_size - confirm and fix.
 * NOTE(review): the release of `send` and `recv` is not visible here. */
492 void *send = xbt_new0(int, send_size*comm_size);
493 void *recv = xbt_new0(int, send_size*comm_size);
495 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
496 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
499 int rank = smpi_process_index();
500 TRACE_smpi_computing_out(rank);
501 TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
/* Algorithm selection: bruck when send_size < 200 and comm_size > 12,
 * otherwise basic_linear when send_size < 3000; the remaining call is
 * pairwise, presumably the final else branch. */
504 if (send_size < 200 && comm_size > 12) {
505 smpi_coll_tuned_alltoall_bruck(send, send_size, MPI_CURRENT_TYPE,
506 recv, recv_size, MPI_CURRENT_TYPE,
508 } else if (send_size < 3000) {
509 smpi_coll_tuned_alltoall_basic_linear(send, send_size, MPI_CURRENT_TYPE,
510 recv, recv_size, MPI_CURRENT_TYPE,
513 smpi_coll_tuned_alltoall_pairwise(send, send_size, MPI_CURRENT_TYPE,
514 recv, recv_size, MPI_CURRENT_TYPE,
519 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
520 TRACE_smpi_computing_in(rank);
523 log_timed_action (action, clock);
/* Handler for the "allToAllV" action: vector all-to-all exchange.
 * With n = size of MPI_COMM_WORLD, the fields are read as follows:
 *   action[2]                  send buffer size (number of ints allocated)
 *   action[3 .. 3+n-1]         send counts
 *   action[3+n .. 3+2n-1]      send displacements
 *   action[3+2n]               receive buffer size (number of ints allocated)
 *   action[4+2n .. 4+3n-1]     receive counts
 *   action[4+3n .. 4+4n-1]     receive displacements
 *   action[4+4n]               optional datatype identifier
 * No bounds check on the number of fields is visible: the handler relies on
 * the trace line being well formed. */
528 static void action_allToAllv(const char *const *action) {
530 The structure of the allToAllV action for the rank 0 (total 4 processes)
532 0 allToAllV 100 1 7 10 12 5 10 20 45 100 1 70 10 5 1 5 77 90
535 1) 100 is the size of the send buffer *sizeof(int),
536 2) 1 7 10 12 is the sendcounts array
537 3) 5 10 20 45 is the sdispls array
538 4) 100*sizeof(int) is the size of the receiver buffer
539 5) 1 70 10 5 is the recvcounts array
540 6) 1 5 77 90 is the rdispls array
545 double clock = smpi_process_simulated_elapsed();
547 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
548 int send_buf_size=0,recv_buf_size=0,i=0;
549 int *sendcounts = xbt_new0(int, comm_size);
550 int *recvcounts = xbt_new0(int, comm_size);
551 int *senddisps = xbt_new0(int, comm_size);
552 int *recvdisps = xbt_new0(int, comm_size);
554 send_buf_size=parse_double(action[2]);
555 recv_buf_size=parse_double(action[3+2*comm_size]);
/* Buffers are arrays of int whatever MPI_CURRENT_TYPE ends up being. */
557 int *sendbuf = xbt_new0(int, send_buf_size);
558 int *recvbuf = xbt_new0(int, recv_buf_size);
560 if(action[4+4*comm_size]) MPI_CURRENT_TYPE=decode_datatype(action[4+4*comm_size]);
561 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
/* The receive arrays start one field later than the send arrays because the
 * receive buffer size sits between the two groups. */
563 for(i=0;i<comm_size;i++) {
564 sendcounts[i] = atoi(action[i+3]);
565 senddisps[i] = atoi(action[i+3+comm_size]);
566 recvcounts[i] = atoi(action[i+4+2*comm_size]);
567 recvdisps[i] = atoi(action[i+4+3*comm_size]);
572 int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
573 TRACE_smpi_computing_out(rank);
574 TRACE_smpi_collective_in(rank, -1, __FUNCTION__);
576 smpi_coll_basic_alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
577 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
580 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
581 TRACE_smpi_computing_in(rank);
584 log_timed_action (action, clock);
/* NOTE(review): only these two releases are visible; confirm that sendbuf,
 * recvbuf, senddisps and recvdisps are released as well. */
587 xbt_free(sendcounts);
588 xbt_free(recvcounts);
/* Entry point of the replay: initializes MPI with PMPI_Init(), lets the
 * process whose index is 0 initialize the replay engine and register one
 * handler per trace action name, then runs the action runner with the
 * program arguments.
 *   argc, argv: pointers to the program arguments, forwarded to PMPI_Init()
 *               and dereferenced for xbt_replay_action_runner()
 * Action names are registered exactly as spelled below; note the mixed case
 * ("Isend", "waitAll", "allToAllV"). */
595 void smpi_replay_init(int *argc, char***argv){
596 PMPI_Init(argc, argv);
/* Registration is done once, by the process with index 0. */
597 if (!smpi_process_index()){
598 _xbt_replay_action_init();
599 xbt_replay_action_register("init", action_init);
600 xbt_replay_action_register("finalize", action_finalize);
601 xbt_replay_action_register("comm_size", action_comm_size);
602 xbt_replay_action_register("comm_split", action_comm_split);
603 xbt_replay_action_register("comm_dup", action_comm_dup);
604 xbt_replay_action_register("send", action_send);
605 xbt_replay_action_register("Isend", action_Isend);
606 xbt_replay_action_register("recv", action_recv);
607 xbt_replay_action_register("Irecv", action_Irecv);
608 xbt_replay_action_register("wait", action_wait);
609 xbt_replay_action_register("waitAll", action_waitall);
610 xbt_replay_action_register("barrier", action_barrier);
611 xbt_replay_action_register("bcast", action_bcast);
612 xbt_replay_action_register("reduce", action_reduce);
613 xbt_replay_action_register("allReduce", action_allReduce);
614 xbt_replay_action_register("allToAll", action_allToAll);
615 xbt_replay_action_register("allToAllV", action_allToAllv);
616 xbt_replay_action_register("compute", action_compute);
/* NOTE(review): presumably this runs for every process, outside the
 * index-0 branch; the closing brace is not visible here - confirm. */
619 xbt_replay_action_runner(*argc, *argv);
/* Ends the replay for the calling process and returns the result of
 * PMPI_Finalize().
 * When active_processes is zero (the last process alive), it also releases
 * reqq, reports the simulated time at info level and shuts the replay engine
 * down.
 * NOTE(review): the statement that decrements active_processes is not visible
 * here; the comment below implies it precedes the test - confirm. */
622 int smpi_replay_finalize(){
624 /* One active process will stop. Decrease the counter*/
626 if(!active_processes){
627 /* Last process alive speaking */
628 /* end the simulated timer */
/* NOTE(review): reqq is an array of dynars (one per process, allocated in
 * action_init). Presumably xbt_dynar_free() takes the address of a single
 * dynar, so this call would release only reqq[0] and leave the other dynars
 * and the array itself allocated - confirm. */
629 xbt_dynar_free(reqq);
630 sim_time = smpi_process_simulated_elapsed();
631 XBT_INFO("Simulation time %g", sim_time);
632 _xbt_replay_action_exit();
634 return PMPI_Finalize();