1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Logging setup. Going by the macro names, the first statement creates the
 * default log subcategory "smpi_base" under "smpi"; the following ones make
 * the other SMPI log categories visible in this file (presumably they are
 * defined elsewhere -- confirm). */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
/* Attaches the SMPI per-process data to the current SIMIX process.
 *
 * argc, argv: pointers to the process argument count and vector.
 * (*argv)[1] is parsed as the process index and then removed from the vector.
 *
 * NOTE(review): this listing omits some lines of the function (for instance
 * the declarations of `proc` and `index`); the comments describe only what
 * is shown. */
22 void smpi_process_init(int *argc, char ***argv)
25 smpi_process_data_t data;
// Fetch the data record for the index passed as first argument and bind it
// to the calling process.
// NOTE(review): (*argv)[1] is read with no visible check that *argc >= 2,
// and atoi() cannot report a malformed number -- confirm callers guarantee
// a valid index (strtol() would allow validation).
28 proc = SIMIX_process_self();
29 index = atoi((*argv)[1]);
30 data = smpi_process_remote_data(index);
31 SIMIX_process_set_data(proc, data);
// Remove (*argv)[1]: shift the following (*argc - 2) pointers down by one
// slot, then NULL-terminate the shortened vector. memmove is used because
// source and destination overlap.
34 memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
35 (*argv)[(*argc) - 1] = NULL;
38 DEBUG2("<%d> New process in the game: %p", index, proc);
/* Called when an SMPI process terminates. In the visible lines it only looks
 * up the process index and logs the departure at debug level. */
41 void smpi_process_destroy(void)
43 int index = smpi_process_index();
45 DEBUG1("<%d> Process left the game", index);
/* Allocates and initialises a new request object.
 *
 * buf, count, datatype: message buffer and its element count/type.
 * src, dst, tag, comm:  envelope of the communication.
 * flags:                request kind bits (callers in this file combine
 *                       PERSISTENT or NON_PERSISTENT with SEND or RECV).
 *
 * NOTE(review): several assignments and the return statement are on lines
 * omitted from this listing; presumably the remaining fields are copied from
 * the parameters and the new request is returned -- confirm. */
48 static MPI_Request build_request(void *buf, int count,
49 MPI_Datatype datatype, int src, int dst,
50 int tag, MPI_Comm comm, unsigned flags)
54 request = xbt_new(s_smpi_mpi_request_t, 1);
// Payload size in bytes: size of one element times the element count.
56 request->size = smpi_datatype_size(datatype) * count;
// A fresh request starts not complete and not matched with any peer request.
63 request->complete = 0;
64 request->match = MPI_REQUEST_NULL;
65 request->flags = flags;
73 /* MPI low-level calls */
/* Creates a persistent send request (flags PERSISTENT | SEND) from the
 * calling process -- its rank in `comm` is used as the source -- to `dst`.
 * The request is only built here; nothing in the visible lines starts it.
 * NOTE(review): the declaration/return lines are omitted from this listing;
 * presumably the built request is returned. */
74 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
75 int dst, int tag, MPI_Comm comm)
78 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
79 comm, PERSISTENT | SEND);
/* Creates a persistent receive request (flags PERSISTENT | RECV) from `src`
 * to the calling process, whose rank in `comm` is used as the destination.
 * The request is only built here; nothing in the visible lines starts it.
 * NOTE(review): the declaration/return lines are omitted from this listing;
 * presumably the built request is returned. */
84 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
85 int src, int tag, MPI_Comm comm)
88 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
89 comm, PERSISTENT | RECV);
/* Starts the communication described by `request`.
 *
 * The request must have complete == 0 (asserted). Depending on the RECV bit
 * of request->flags, the request is posted as a receive or as a send and the
 * matching SIMIX network operation is issued on request->rdv.
 *
 * NOTE(review): lines are omitted from this listing (the else keyword and,
 * presumably, the assignments that store the SIMIX call results); the
 * comments below describe only the visible statements. */
94 void smpi_mpi_start(MPI_Request request)
96 xbt_assert0(request->complete == 0,
97 "Cannot start a non-finished communication");
98 if ((request->flags & RECV) == RECV) {
// Receive side: register the request, then issue the asynchronous receive.
99 smpi_process_post_recv(request);
100 print_request("New recv", request);
102 SIMIX_network_irecv(request->rdv, request->buf, &request->size);
// Send side (presumably the else branch): register the request, then issue
// the asynchronous send.
104 smpi_process_post_send(request->comm, request); // FIXME
105 print_request("New send", request);
107 SIMIX_network_isend(request->rdv, request->size, -1.0,
108 request->buf, request->size, NULL);
/* Starts each of the `count` requests in `requests`, in array order, by
 * calling smpi_mpi_start() on it. */
112 void smpi_mpi_startall(int count, MPI_Request * requests)
116 for (i = 0; i < count; i++) {
117 smpi_mpi_start(requests[i]);
/* Releases a request handle and resets the caller's handle to
 * MPI_REQUEST_NULL.
 * NOTE(review): the line(s) before the assignment are omitted from this
 * listing; presumably the request memory is freed there -- confirm. */
121 void smpi_mpi_request_free(MPI_Request * request)
124 *request = MPI_REQUEST_NULL;
/* Builds a non-persistent send request (flags NON_PERSISTENT | SEND) from
 * the calling process (its rank in `comm`) to `dst`, without starting it.
 * NOTE(review): the return statement is on a line omitted from this listing;
 * presumably `request` is returned. */
127 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
128 int dst, int tag, MPI_Comm comm)
130 MPI_Request request =
131 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132 comm, NON_PERSISTENT | SEND);
/* Non-blocking send: builds a non-persistent send request with
 * smpi_isend_init() and starts it immediately.
 * NOTE(review): the return statement is on a line omitted from this listing;
 * presumably the started request is returned. */
137 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
138 int dst, int tag, MPI_Comm comm)
140 MPI_Request request =
141 smpi_isend_init(buf, count, datatype, dst, tag, comm);
143 smpi_mpi_start(request);
/* Builds a non-persistent receive request (flags NON_PERSISTENT | RECV) from
 * `src` to the calling process (its rank in `comm`), without starting it.
 * NOTE(review): the return statement is on a line omitted from this listing;
 * presumably `request` is returned. */
147 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
148 int src, int tag, MPI_Comm comm)
150 MPI_Request request =
151 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
152 comm, NON_PERSISTENT | RECV);
/* Non-blocking receive: builds a non-persistent receive request with
 * smpi_irecv_init() and starts it immediately.
 * NOTE(review): the return statement is on a line omitted from this listing;
 * presumably the started request is returned. */
157 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
158 int src, int tag, MPI_Comm comm)
160 MPI_Request request =
161 smpi_irecv_init(buf, count, datatype, src, tag, comm);
163 smpi_mpi_start(request);
/* Blocking receive: issues a non-blocking receive and waits for it.
 * `status` is forwarded to smpi_mpi_wait(), which fills it unless it is
 * MPI_STATUS_IGNORE. */
167 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
168 int tag, MPI_Comm comm, MPI_Status * status)
172 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
173 smpi_mpi_wait(&request, status);
/* Blocking send: issues a non-blocking send and waits for it, discarding the
 * completion status. */
176 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
177 int tag, MPI_Comm comm)
181 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
182 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* Combined send and receive: builds one send request to `dst` and one
 * receive request from `src`, starts both, and waits for both to finish.
 *
 * status: if not MPI_STATUS_IGNORE, receives a copy of stats[1].
 *
 * NOTE(review): the declaration of `stats` and the assignments into
 * requests[0]/requests[1] are on lines omitted from this listing; presumably
 * the send goes to slot 0 and the receive to slot 1 -- confirm. */
185 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
186 int dst, int sendtag, void *recvbuf, int recvcount,
187 MPI_Datatype recvtype, int src, int recvtag,
188 MPI_Comm comm, MPI_Status * status)
190 MPI_Request requests[2];
194 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
196 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
197 smpi_mpi_startall(2, requests);
198 smpi_mpi_waitall(2, requests, stats);
199 if (status != MPI_STATUS_IGNORE) {
200 // Copy receive status
201 memcpy(status, &stats[1], sizeof(MPI_Status));
/* Returns the number of `datatype` elements described by `status`:
 * status->count divided (integer division) by the size of one element.
 * NOTE(review): a datatype whose size is 0 would make this divide by zero;
 * confirm such datatypes cannot reach this function. */
205 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
207 return status->count / smpi_datatype_size(datatype);
/* Finalises a request whose SIMIX communication has ended.
 *
 * request: handle of the finished request; may be freed/reset here.
 * status:  filled with source, tag, MPI_SUCCESS and the destination buffer
 *          size reported by SIMIX, unless it is MPI_STATUS_IGNORE.
 *
 * NOTE(review): the else keywords and closing braces are on lines omitted
 * from this listing; the branch structure described below is inferred from
 * the original line numbering -- confirm against the full file. */
210 static void finish_wait(MPI_Request * request, MPI_Status * status)
212 if (status != MPI_STATUS_IGNORE) {
213 status->MPI_SOURCE = (*request)->src;
214 status->MPI_TAG = (*request)->tag;
215 status->MPI_ERROR = MPI_SUCCESS;
216 status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
218 SIMIX_communication_destroy((*request)->pair);
219 print_request("finishing wait", *request);
// If this request is already flagged complete, the rendez-vous point is
// destroyed; otherwise (presumably the else branch) the matched peer request
// is flagged complete and unlinked.
220 if ((*request)->complete == 1) {
221 SIMIX_rdv_destroy((*request)->rdv);
223 (*request)->match->complete = 1;
224 (*request)->match->match = MPI_REQUEST_NULL;
// Non-persistent requests are released; for the others (presumably the else
// branch) the per-communication handles are cleared.
226 if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
227 smpi_mpi_request_free(request);
229 (*request)->rdv = NULL;
230 (*request)->pair = NULL;
/* Tests a request for completion by reading its `complete` flag.
 * NOTE(review): the condition guarding the smpi_mpi_wait() call and the
 * return statement are on lines omitted from this listing; presumably the
 * wait runs only when the flag is set and the flag is returned -- confirm. */
234 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
236 int flag = (*request)->complete;
239 smpi_mpi_wait(request, status);
/* Scans `requests` for a non-null request whose `complete` flag is set and
 * finalises it through smpi_mpi_wait(). *index starts as MPI_UNDEFINED.
 * NOTE(review): the end of the parameter list, the statements that record
 * the found index/flag, and the return are on lines omitted from this
 * listing, so the exact result contract cannot be confirmed from here. */
244 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
249 *index = MPI_UNDEFINED;
251 for (i = 0; i < count; i++) {
252 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
253 smpi_mpi_wait(&requests[i], status);
/* Blocks until the SIMIX communication attached to *request ends (the -1.0
 * argument is passed as the timeout parameter), then finalises the request
 * and fills `status` through finish_wait(). */
262 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
264 print_request("wait", *request);
265 SIMIX_network_wait((*request)->pair, -1.0);
266 finish_wait(request, status);
/* Waits for one of `count` requests to finish.
 *
 * First looks for a request already flagged complete; if none is found,
 * collects the communications of all pending requests and blocks in
 * SIMIX_network_waitany() until one of them ends, then finalises it.
 *
 * NOTE(review): several lines are omitted from this listing (rest of the
 * parameter list, local declarations, the bookkeeping that fills `map`, the
 * translation of the SIMIX result back to a request index, the free of `map`
 * and the return). Presumably the index of the finished request, or
 * MPI_UNDEFINED when there is nothing to wait for, is returned -- confirm. */
269 int smpi_mpi_waitany(int count, MPI_Request requests[],
276 index = MPI_UNDEFINED;
278 // First check for already completed requests
279 for (i = 0; i < count; i++) {
280 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
282 smpi_mpi_wait(&requests[index], status);
286 if (index == MPI_UNDEFINED) {
287 // Otherwise, wait for a request to complete
// `comms` holds the SIMIX communications to wait on; `map` has one int slot
// per request.
288 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
289 map = xbt_new(int, count);
291 DEBUG0("Wait for one of");
292 for (i = 0; i < count; i++) {
293 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
294 print_request(" ", requests[i]);
295 xbt_dynar_push(comms, &requests[i]->pair);
301 index = SIMIX_network_waitany(comms);
303 finish_wait(&requests[index], status);
306 xbt_dynar_free(&comms);
/* Waits for all requests in `requests` by calling smpi_mpi_waitany()
 * repeatedly until it reports MPI_UNDEFINED. When `status` is not
 * MPI_STATUS_IGNORE, each completion status is copied to status[index].
 *
 * NOTE(review): the loop header, local declarations and any update of
 * `count` are on lines omitted from this listing.
 * NOTE(review): after a finished request is replaced by the last one, later
 * statuses are stored at the request's *current* array position, which may
 * differ from its original position in the caller's array -- this looks
 * like what the FIXME below refers to; confirm before relying on
 * status[i] <-> requests[i] correspondence. */
312 void smpi_mpi_waitall(int count, MPI_Request requests[],
319 index = smpi_mpi_waitany(count, requests, &stat);
320 if (index == MPI_UNDEFINED) {
323 if (status != MPI_STATUS_IGNORE) {
324 memcpy(&status[index], &stat, sizeof(stat));
326 // FIXME: check this -v
327 // Move the last request to the found position
328 requests[index] = requests[count - 1];
329 requests[count - 1] = MPI_REQUEST_NULL;
/* Finalises every non-null request in `requests` whose `complete` flag is
 * already set, via smpi_mpi_wait().
 * NOTE(review): the rest of the parameter list, the statements recording
 * finished indices, the first half of the conditional expression passed as
 * status argument, and the return are on lines omitted from this listing;
 * presumably the number of finished requests is returned -- confirm. */
334 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
340 for (i = 0; i < incount; i++) {
341 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
342 smpi_mpi_wait(&requests[i],
344 MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
/* Broadcasts `count` elements of `datatype` in `buf` from `root` by
 * delegating to nary_tree_bcast() with a fixed arity of 4.
 * (The end of the parameter list is on a line omitted from this listing.) */
352 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
355 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
356 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* Barrier over `comm`, delegated to nary_tree_barrier() with a fixed arity
 * of 4. */
359 void smpi_mpi_barrier(MPI_Comm comm)
361 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
362 nary_tree_barrier(comm, 4);
/* Gather: collects one block of `sendcount` elements from every rank of
 * `comm` into `recvbuf` on `root`. Block i is stored at byte offset
 * i * recvcount * size-of(recvtype).
 *
 * NOTE(review): the if/else on `rank != root`, the `index` bookkeeping, the
 * test that skips `root` in the receive loop and the final free of
 * `requests` are on lines omitted from this listing; the structure described
 * here is inferred from the visible comments -- confirm. */
365 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
366 void *recvbuf, int recvcount, MPI_Datatype recvtype,
367 int root, MPI_Comm comm)
// Tag passed to every internal send/receive of this collective.
369 int system_tag = 666;
370 int rank, size, src, index, sendsize, recvsize;
371 MPI_Request *requests;
373 rank = smpi_comm_rank(comm);
374 size = smpi_comm_size(comm);
376 // Send buffer to root
377 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
379 sendsize = smpi_datatype_size(sendtype);
380 recvsize = smpi_datatype_size(recvtype);
381 // Local copy from root
382 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
383 sendcount * sendsize * sizeof(char));
384 // Receive buffers from senders
// One request slot per rank other than root.
385 requests = xbt_new(MPI_Request, size - 1);
387 for (src = 0; src < size; src++) {
389 requests[index] = smpi_irecv_init(&((char *) recvbuf)
390 [src * recvcount * recvsize],
391 recvcount, recvtype, src,
396 // Wait for completion of irecv's.
397 smpi_mpi_startall(size - 1, requests);
398 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Gather with per-rank counts: like smpi_mpi_gather(), but rank i's block
 * has recvcounts[i] elements and is stored at recvbuf + displs[i].
 *
 * NOTE(review): displs[] is applied directly as an index into a char array,
 * i.e. as a byte offset, not scaled by the size of recvtype -- confirm this
 * is the unit callers pass.
 * NOTE(review): the if/else on `rank != root`, the `index` bookkeeping, the
 * test that skips `root` and the final free of `requests` are on lines
 * omitted from this listing. */
403 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
404 void *recvbuf, int *recvcounts, int *displs,
405 MPI_Datatype recvtype, int root, MPI_Comm comm)
// Tag passed to every internal send/receive of this collective.
407 int system_tag = 666;
408 int rank, size, src, index, sendsize;
409 MPI_Request *requests;
411 rank = smpi_comm_rank(comm);
412 size = smpi_comm_size(comm);
414 // Send buffer to root
415 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
417 sendsize = smpi_datatype_size(sendtype);
418 // Local copy from root
419 memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
420 sendcount * sendsize * sizeof(char));
421 // Receive buffers from senders
422 requests = xbt_new(MPI_Request, size - 1);
424 for (src = 0; src < size; src++) {
427 smpi_irecv_init(&((char *) recvbuf)[displs[src]],
428 recvcounts[src], recvtype, src, system_tag,
433 // Wait for completion of irecv's.
434 smpi_mpi_startall(size - 1, requests);
435 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* All-gather: every rank copies its own block into its slot of `recvbuf`
 * (byte offset rank * recvcount * size-of(recvtype)), then exchanges blocks
 * with every other rank using one send and one receive request per peer
 * (2 * (size - 1) requests in total).
 *
 * NOTE(review): the end of the parameter list, the `index` bookkeeping, the
 * test that skips `other == rank` and the final free of `requests` are on
 * lines omitted from this listing. */
440 void smpi_mpi_allgather(void *sendbuf, int sendcount,
441 MPI_Datatype sendtype, void *recvbuf,
442 int recvcount, MPI_Datatype recvtype,
// Tag passed to every internal send/receive of this collective.
445 int system_tag = 666;
446 int rank, size, other, index, sendsize, recvsize;
447 MPI_Request *requests;
449 rank = smpi_comm_rank(comm);
450 size = smpi_comm_size(comm);
451 sendsize = smpi_datatype_size(sendtype);
452 recvsize = smpi_datatype_size(recvtype);
453 // Local copy from self
454 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
455 sendcount * sendsize * sizeof(char));
456 // Send/Recv buffers to/from others;
457 requests = xbt_new(MPI_Request, 2 * (size - 1));
459 for (other = 0; other < size; other++) {
462 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
465 requests[index] = smpi_irecv_init(&((char *) recvbuf)
466 [other * recvcount * recvsize],
467 recvcount, recvtype, other,
472 // Wait for completion of all comms.
473 smpi_mpi_startall(2 * (size - 1), requests);
474 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* All-gather with per-rank counts: like smpi_mpi_allgather(), but rank i's
 * block has recvcounts[i] elements and is stored at recvbuf + displs[i].
 *
 * NOTE(review): displs[] is applied directly as an index into a char array,
 * i.e. as a byte offset, not scaled by the size of recvtype -- confirm this
 * is the unit callers pass.
 * NOTE(review): `recvsize` is computed but not used in the visible lines.
 * NOTE(review): the `index` bookkeeping, the test that skips
 * `other == rank` and the final free of `requests` are on lines omitted from
 * this listing. */
478 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
479 MPI_Datatype sendtype, void *recvbuf,
480 int *recvcounts, int *displs,
481 MPI_Datatype recvtype, MPI_Comm comm)
// Tag passed to every internal send/receive of this collective.
483 int system_tag = 666;
484 int rank, size, other, index, sendsize, recvsize;
485 MPI_Request *requests;
487 rank = smpi_comm_rank(comm);
488 size = smpi_comm_size(comm);
489 sendsize = smpi_datatype_size(sendtype);
490 recvsize = smpi_datatype_size(recvtype);
491 // Local copy from self
492 memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
493 sendcount * sendsize * sizeof(char));
494 // Send buffers to others;
495 requests = xbt_new(MPI_Request, 2 * (size - 1));
497 for (other = 0; other < size; other++) {
500 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
504 smpi_irecv_init(&((char *) recvbuf)[displs[other]],
505 recvcounts[other], recvtype, other, system_tag,
510 // Wait for completion of all comms.
511 smpi_mpi_startall(2 * (size - 1), requests);
512 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatter: `root` hands block i of `sendbuf` (byte offset
 * i * sendcount * size-of(sendtype)) to rank i; every rank ends up with its
 * block in `recvbuf`.
 *
 * NOTE(review): the if/else on `rank != root`, the status argument of the
 * receive, the `index` bookkeeping, the test that skips `root` in the send
 * loop and the final free of `requests` are on lines omitted from this
 * listing; the structure described here is inferred from the visible
 * comments -- confirm. */
516 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
517 void *recvbuf, int recvcount, MPI_Datatype recvtype,
518 int root, MPI_Comm comm)
// Tag passed to every internal send/receive of this collective.
520 int system_tag = 666;
521 int rank, size, dst, index, sendsize, recvsize;
522 MPI_Request *requests;
524 rank = smpi_comm_rank(comm);
525 size = smpi_comm_size(comm);
527 // Recv buffer from root
528 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
531 sendsize = smpi_datatype_size(sendtype);
532 recvsize = smpi_datatype_size(recvtype);
533 // Local copy from root
534 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
535 recvcount * recvsize * sizeof(char));
536 // Send buffers to receivers
// One request slot per rank other than root.
537 requests = xbt_new(MPI_Request, size - 1);
539 for (dst = 0; dst < size; dst++) {
541 requests[index] = smpi_isend_init(&((char *) sendbuf)
542 [dst * sendcount * sendsize],
543 sendcount, sendtype, dst,
548 // Wait for completion of isend's.
549 smpi_mpi_startall(size - 1, requests);
550 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Scatter with per-rank counts: like smpi_mpi_scatter(), but rank i's block
 * has sendcounts[i] elements and starts at sendbuf + displs[i].
 *
 * NOTE(review): displs[] is applied directly as an index into a char array,
 * i.e. as a byte offset, not scaled by the size of sendtype -- confirm this
 * is the unit callers pass.
 * NOTE(review): `sendsize` is computed but not used in the visible lines.
 * NOTE(review): the if/else on `rank != root`, the status argument of the
 * receive, the `index` bookkeeping, the test that skips `root` and the final
 * free of `requests` are on lines omitted from this listing. */
555 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
556 MPI_Datatype sendtype, void *recvbuf, int recvcount,
557 MPI_Datatype recvtype, int root, MPI_Comm comm)
// Tag passed to every internal send/receive of this collective.
559 int system_tag = 666;
560 int rank, size, dst, index, sendsize, recvsize;
561 MPI_Request *requests;
563 rank = smpi_comm_rank(comm);
564 size = smpi_comm_size(comm);
566 // Recv buffer from root
567 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
570 sendsize = smpi_datatype_size(sendtype);
571 recvsize = smpi_datatype_size(recvtype);
572 // Local copy from root
573 memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
574 recvcount * recvsize * sizeof(char));
575 // Send buffers to receivers
576 requests = xbt_new(MPI_Request, size - 1);
578 for (dst = 0; dst < size; dst++) {
581 smpi_isend_init(&((char *) sendbuf)[displs[dst]],
582 sendcounts[dst], sendtype, dst, system_tag,
587 // Wait for completion of isend's.
588 smpi_mpi_startall(size - 1, requests);
589 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduce: combines the `count`-element buffers of all ranks with `op` and
 * leaves the result in `recvbuf` on `root`.
 *
 * Root copies its own contribution into `recvbuf`, receives every other
 * rank's contribution into a temporary buffer, and applies `op` to each
 * temporary as its receive finishes.
 *
 * NOTE(review): contributions are folded in the order smpi_mpi_waitany()
 * reports them, i.e. completion order rather than rank order -- confirm
 * this is acceptable for non-commutative operations.
 * NOTE(review): the end of the parameter list, the if/else on
 * `rank != root`, the declaration of `tmpbufs`, the `index` bookkeeping, the
 * test that skips `root`, the loop exit on MPI_UNDEFINED and the final frees
 * of `tmpbufs`/`requests` are on lines omitted from this listing. */
594 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
595 MPI_Datatype datatype, MPI_Op op, int root,
// Tag passed to every internal send/receive of this collective.
598 int system_tag = 666;
599 int rank, size, src, index, datasize;
600 MPI_Request *requests;
603 rank = smpi_comm_rank(comm);
604 size = smpi_comm_size(comm);
606 // Send buffer to root
607 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
609 datasize = smpi_datatype_size(datatype);
610 // Local copy from root
611 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
612 // Receive buffers from senders
613 //TODO: make a MPI_barrier here ?
// One request and one temporary receive buffer per rank other than root.
614 requests = xbt_new(MPI_Request, size - 1);
615 tmpbufs = xbt_new(void *, size - 1);
617 for (src = 0; src < size; src++) {
619 tmpbufs[index] = xbt_malloc(count * datasize);
621 smpi_irecv_init(tmpbufs[index], count, datatype, src,
626 // Wait for completion of irecv's.
627 smpi_mpi_startall(size - 1, requests);
// Fold each received contribution into recvbuf as soon as it arrives.
628 for (src = 0; src < size - 1; src++) {
629 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
630 if (index == MPI_UNDEFINED) {
633 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Release the temporary receive buffers.
635 for (index = 0; index < size - 1; index++) {
636 xbt_free(tmpbufs[index]);
/* All-reduce: reduces onto rank 0 with smpi_mpi_reduce(), then broadcasts
 * the result from rank 0 to every rank with smpi_mpi_bcast().
 *
 * NOTE(review): everything from the "FIXME: buggy implementation" line
 * onwards is not valid C as shown and is presumably inside a block comment
 * whose delimiters are on lines omitted from this listing, i.e. a disabled
 * alternative implementation -- confirm against the full file. It is left
 * untouched here. */
643 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
644 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
646 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
647 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
650 FIXME: buggy implementation
652 int system_tag = 666;
653 int rank, size, other, index, datasize;
654 MPI_Request* requests;
657 rank = smpi_comm_rank(comm);
658 size = smpi_comm_size(comm);
659 datasize = smpi_datatype_size(datatype);
660 // Local copy from self
661 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
662 // Send/Recv buffers to/from others;
663 //TODO: make a MPI_barrier here ?
664 requests = xbt_new(MPI_Request, 2 * (size - 1));
665 tmpbufs = xbt_new(void*, size - 1);
667 for(other = 0; other < size; other++) {
669 tmpbufs[index / 2] = xbt_malloc(count * datasize);
670 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
671 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
675 // Wait for completion of all comms.
676 for(other = 0; other < 2 * (size - 1); other++) {
677 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
678 if(index == MPI_UNDEFINED) {
681 if((index & 1) == 1) {
682 // Request is odd: it's a irecv
683 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
686 for(index = 0; index < size - 1; index++) {
687 xbt_free(tmpbufs[index]);
/* Scan (prefix reduction): each rank starts from its own `sendbuf`, receives
 * the contribution of every lower rank into a temporary buffer and folds it
 * into `recvbuf` with `op`, and sends its own `sendbuf` to every higher rank.
 *
 * NOTE(review): the declarations of `total` and `tmpbufs`, the `index`
 * bookkeeping, the loop exit on MPI_UNDEFINED, the test guarding the
 * smpi_op_apply() call and the end of the function are on lines omitted from
 * (or beyond the end of) this listing.
 * NOTE(review): contributions are folded in the order smpi_mpi_waitany()
 * reports them, i.e. completion order rather than rank order -- confirm
 * this is acceptable for non-commutative operations. */
694 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
695 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
// Tag passed to every internal send/receive of this collective.
697 int system_tag = 666;
698 int rank, size, other, index, datasize;
700 MPI_Request *requests;
703 rank = smpi_comm_rank(comm);
704 size = smpi_comm_size(comm);
705 datasize = smpi_datatype_size(datatype);
706 // Local copy from self
707 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
708 // Send/Recv buffers to/from others;
// `rank` receives (from lower ranks) plus `size - (rank + 1)` sends (to
// higher ranks); this sum always equals size - 1.
709 total = rank + (size - (rank + 1));
710 requests = xbt_new(MPI_Request, total);
// Only the `rank` receives need a temporary buffer.
711 tmpbufs = xbt_new(void *, rank);
713 for (other = 0; other < rank; other++) {
714 tmpbufs[index] = xbt_malloc(count * datasize);
716 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
720 for (other = rank + 1; other < size; other++) {
722 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
725 // Wait for completion of all comms.
726 smpi_mpi_startall(size - 1, requests);
727 for (other = 0; other < total; other++) {
728 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
729 if (index == MPI_UNDEFINED) {
733 // #Request is below rank: it's a irecv
734 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// NOTE(review): `tmpbufs` was allocated with only `rank` entries, but this
// loop frees `size - 1` entries. For every rank below size - 1 it reads past
// the end of the array and passes indeterminate pointers to xbt_free(); the
// bound should most likely be `rank`.
737 for (index = 0; index < size - 1; index++) {
738 xbt_free(tmpbufs[index]);