1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12 "Logging specific to SMPI (base)");
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
23 static int match_recv(void* a, void* b) {
24 MPI_Request ref = (MPI_Request)a;
25 MPI_Request req = (MPI_Request)b;
27 xbt_assert(ref, "Cannot match recv against null reference");
28 xbt_assert(req, "Cannot match recv against null request");
29 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
30 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
33 static int match_send(void* a, void* b) {
34 MPI_Request ref = (MPI_Request)a;
35 MPI_Request req = (MPI_Request)b;
37 xbt_assert(ref, "Cannot match send against null reference");
38 xbt_assert(req, "Cannot match send against null request");
39 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
40 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocate a new request (xbt_new) and record the parameters of a
 * point-to-point communication.
 * size is stored in bytes: smpi_datatype_size(datatype) * count.
 * action starts NULL, i.e. the request is built but not started
 * (smpi_mpi_start asserts that action is not already set).
 * flags is a combination such as PERSISTENT | SEND or NON_PERSISTENT | RECV,
 * as passed by the *_init callers.
 * NOTE(review): this excerpt is missing lines of the definition. These
 * include the opening brace, the local `request` declaration, assignments of
 * fields other code reads (buf/src/dst/tag), and the return. It also carries
 * stray leading line numbers. Restore it from upstream before building. */
43 static MPI_Request build_request(void *buf, int count,
44 MPI_Datatype datatype, int src, int dst,
45 int tag, MPI_Comm comm, unsigned flags)
49 request = xbt_new(s_smpi_mpi_request_t, 1);
/* Byte count, not element count. */
51 request->size = smpi_datatype_size(datatype) * count;
/* No communication in flight yet. */
56 request->action = NULL;
57 request->flags = flags;
65 /* MPI Low level calls */
66 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
67 int dst, int tag, MPI_Comm comm)
70 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
71 comm, PERSISTENT | SEND);
76 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
77 int src, int tag, MPI_Comm comm)
80 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
81 comm, PERSISTENT | RECV);
/* Start a request by posting it to SIMIX and storing the resulting action.
 * Receives are posted on this process's mailbox (smpi_process_mailbox) with
 * match_recv as the matching predicate. Sends are posted on dst's mailbox
 * (smpi_process_remote_mailbox) with match_send.
 * Asserts that the request has no action in flight, so a request cannot be
 * started twice without completing.
 * NOTE(review): lines are missing from this excerpt: the opening brace, the
 * `mailbox` declaration, the `} else {` separating the send path, the closing
 * braces, and possibly a preprocessor guard around the tracing call. It also
 * carries stray leading line numbers. Restore from upstream before building. */
86 void smpi_mpi_start(MPI_Request request)
90 xbt_assert(!request->action,
91 "Cannot (re)start a non-finished communication");
92 if(request->flags & RECV) {
93 print_request("New recv", request);
94 mailbox = smpi_process_mailbox();
/* &request->size is passed by address; presumably SIMIX updates it with the
 * received size — confirm against the SIMIX API. */
95 request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
97 print_request("New send", request);
98 mailbox = smpi_process_remote_mailbox(request->dst);
/* The -1.0 argument's meaning is not visible here (presumably "no rate
 * limit") — confirm against the SIMIX API. */
99 request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
100 request->buf, request->size, &match_send, request, 0);
/* Presumably tags the action with the current SMPI tracing category. */
102 SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
107 void smpi_mpi_startall(int count, MPI_Request * requests)
111 for(i = 0; i < count; i++) {
112 smpi_mpi_start(requests[i]);
/* Reset the caller's handle to MPI_REQUEST_NULL, so the request is no longer
 * reachable through it (smpi_mpi_waitany skips MPI_REQUEST_NULL entries).
 * NOTE(review): only the handle reset is visible in this excerpt. The freeing
 * of *request that the name implies is not shown; confirm it is present
 * upstream, otherwise every completed non-persistent request leaks. */
116 void smpi_mpi_request_free(MPI_Request * request)
119 *request = MPI_REQUEST_NULL;
122 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
123 int dst, int tag, MPI_Comm comm)
125 MPI_Request request =
126 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
127 comm, NON_PERSISTENT | SEND);
132 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
133 int dst, int tag, MPI_Comm comm)
135 MPI_Request request =
136 smpi_isend_init(buf, count, datatype, dst, tag, comm);
138 smpi_mpi_start(request);
142 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
143 int src, int tag, MPI_Comm comm)
145 MPI_Request request =
146 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
147 comm, NON_PERSISTENT | RECV);
152 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
153 int src, int tag, MPI_Comm comm)
155 MPI_Request request =
156 smpi_irecv_init(buf, count, datatype, src, tag, comm);
158 smpi_mpi_start(request);
162 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
163 int tag, MPI_Comm comm, MPI_Status * status)
167 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
168 smpi_mpi_wait(&request, status);
171 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
172 int tag, MPI_Comm comm)
176 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
177 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
180 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
181 int dst, int sendtag, void *recvbuf, int recvcount,
182 MPI_Datatype recvtype, int src, int recvtag,
183 MPI_Comm comm, MPI_Status * status)
185 MPI_Request requests[2];
189 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
191 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
192 smpi_mpi_startall(2, requests);
193 smpi_mpi_waitall(2, requests, stats);
194 if(status != MPI_STATUS_IGNORE) {
195 // Copy receive status
196 memcpy(status, &stats[1], sizeof(MPI_Status));
200 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
202 return status->count / smpi_datatype_size(datatype);
/* Finalize a request whose communication has completed.
 * Unless status is MPI_STATUS_IGNORE, fill it with the request's src and
 * tag, MPI_SUCCESS, and req->size, which is a byte count per build_request.
 * Non-persistent requests are then released through smpi_mpi_request_free,
 * which also sets *request to MPI_REQUEST_NULL.
 * NOTE(review): the tail of this function is missing from this excerpt.
 *   - As shown, persistent requests keep their action set. That would trip
 *     the "(re)start" assert in smpi_mpi_start on restart; confirm the
 *     missing lines reset it.
 *   - For MPI_ANY_SOURCE / MPI_ANY_TAG receives, status reports whatever
 *     req->src / req->tag hold. No update of those fields to the actual
 *     sender is visible in this file. */
205 static void finish_wait(MPI_Request * request, MPI_Status * status)
207 MPI_Request req = *request;
209 if(status != MPI_STATUS_IGNORE) {
210 status->MPI_SOURCE = req->src;
211 status->MPI_TAG = req->tag;
212 status->MPI_ERROR = MPI_SUCCESS;
/* Byte count; smpi_mpi_get_count converts it to elements. */
213 status->count = req->size;
215 print_request("Finishing", req);
216 if(req->flags & NON_PERSISTENT) {
217 smpi_mpi_request_free(request);
223 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
225 int flag = SIMIX_req_comm_test((*request)->action);
228 smpi_mpi_wait(request, status);
233 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
240 *index = MPI_UNDEFINED;
243 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
244 map = xbt_new(int, count);
246 for(i = 0; i < count; i++) {
247 if(requests[i]->action) {
248 xbt_dynar_push(comms, &requests[i]->action);
254 *index = SIMIX_req_comm_testany(comms);
255 *index = map[*index];
256 if(*index != MPI_UNDEFINED) {
257 smpi_mpi_wait(&requests[*index], status);
262 xbt_dynar_free(&comms);
267 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
269 print_request("Waiting", *request);
270 SIMIX_req_comm_wait((*request)->action, -1.0);
271 finish_wait(request, status);
274 int smpi_mpi_waitany(int count, MPI_Request requests[],
281 index = MPI_UNDEFINED;
283 // Wait for a request to complete
284 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
285 map = xbt_new(int, count);
287 XBT_DEBUG("Wait for one of");
288 for(i = 0; i < count; i++) {
289 if(requests[i] != MPI_REQUEST_NULL) {
290 print_request(" ", requests[i]);
291 xbt_dynar_push(comms, &requests[i]->action);
297 index = SIMIX_req_comm_waitany(comms);
299 finish_wait(&requests[index], status);
302 xbt_dynar_free(&comms);
/* Wait for all count requests to complete.
 * pstat is MPI_STATUS_IGNORE when the caller ignores statuses. Otherwise it
 * points at a scratch status (stat), which is copied into status[index]
 * after each completion.
 * NOTE(review): lines are missing from this excerpt, including:
 *   - the rest of the signature and the index/c/stat declarations;
 *   - the condition choosing between the in-order smpi_mpi_wait path and
 *     the smpi_mpi_waitany path, and how index is set on the former;
 *   - the body of the MPI_UNDEFINED branch.
 * Restore from upstream before building. */
307 void smpi_mpi_waitall(int count, MPI_Request requests[],
312 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
314 for(c = 0; c < count; c++) {
316 smpi_mpi_wait(&requests[c], pstat);
319 index = smpi_mpi_waitany(count, requests, pstat);
/* waitany reports MPI_UNDEFINED when no non-null request is left. */
320 if(index == MPI_UNDEFINED) {
324 if(status != MPI_STATUS_IGNORE) {
325 memcpy(&status[index], pstat, sizeof(*pstat));
330 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
336 for(i = 0; i < incount; i++) {
337 if(smpi_mpi_testany(incount, requests, &index, status)) {
338 indices[count] = index;
345 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
348 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
349 nary_tree_bcast(buf, count, datatype, root, comm, 4);
352 void smpi_mpi_barrier(MPI_Comm comm)
354 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
355 nary_tree_barrier(comm, 4);
/* Gather sendcount elements from every rank into recvbuf on root.
 * Rank i's block lands at byte offset i * recvcount * recvsize.
 * Senders (presumably the non-root ranks) use smpi_mpi_send with the fixed
 * collective tag system_tag.
 * Root copies its own block with memcpy. It then receives the size - 1 other
 * blocks through irecv requests, which are started together and waited on.
 * NOTE(review): lines are missing from this excerpt:
 *   - the rank/root test separating the send path from the root path;
 *   - the src != root filter and index bookkeeping in the loop;
 *   - the tail of the irecv call;
 *   - any release of requests (none is visible).
 * Restore from upstream before building. */
358 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
359 void *recvbuf, int recvcount, MPI_Datatype recvtype,
360 int root, MPI_Comm comm)
/* Fixed tag for collective traffic; presumably chosen not to clash with user
 * tags — confirm. */
362 int system_tag = 666;
363 int rank, size, src, index, sendsize, recvsize;
364 MPI_Request *requests;
366 rank = smpi_comm_rank(comm);
367 size = smpi_comm_size(comm);
369 // Send buffer to root
370 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
372 sendsize = smpi_datatype_size(sendtype);
373 recvsize = smpi_datatype_size(recvtype);
374 // Local copy from root
375 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
376 sendcount * sendsize * sizeof(char));
377 // Receive buffers from senders
378 requests = xbt_new(MPI_Request, size - 1);
380 for(src = 0; src < size; src++) {
382 requests[index] = smpi_irecv_init(&((char *) recvbuf)
383 [src * recvcount * recvsize],
384 recvcount, recvtype, src,
389 // Wait for completion of irecv's.
390 smpi_mpi_startall(size - 1, requests);
391 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Variable-size gather into recvbuf on root.
 * Rank i's recvcounts[i] elements land at byte offset displs[i] * recvsize.
 * Senders (presumably the non-root ranks) use smpi_mpi_send with the fixed
 * tag system_tag.
 * Root copies its own block with memcpy and receives the size - 1 other
 * blocks through irecv requests, which are started together and waited on.
 * NOTE(review): lines are missing from this excerpt: the rank/root test, the
 * src != root filter and index bookkeeping, the request assignment and tail
 * of the irecv call, and any release of requests (none is visible). Restore
 * from upstream before building. */
396 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
397 void *recvbuf, int *recvcounts, int *displs,
398 MPI_Datatype recvtype, int root, MPI_Comm comm)
400 int system_tag = 666;
401 int rank, size, src, index, sendsize, recvsize;
402 MPI_Request *requests;
404 rank = smpi_comm_rank(comm);
405 size = smpi_comm_size(comm);
407 // Send buffer to root
408 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
410 sendsize = smpi_datatype_size(sendtype);
411 recvsize = smpi_datatype_size(recvtype);
412 // Local copy from root
413 memcpy(&((char *) recvbuf)[displs[root] * recvsize], sendbuf,
414 sendcount * sendsize * sizeof(char));
415 // Receive buffers from senders
416 requests = xbt_new(MPI_Request, size - 1);
418 for(src = 0; src < size; src++) {
421 smpi_irecv_init(&((char *) recvbuf)[displs[src] * recvsize],
422 recvcounts[src], recvtype, src, system_tag,
427 // Wait for completion of irecv's.
428 smpi_mpi_startall(size - 1, requests);
429 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* All-gather: every rank ends up with every rank's block in recvbuf.
 * Rank i's block sits at byte offset i * recvcount * recvsize.
 * Each rank copies its own block locally. It then exchanges blocks with the
 * other ranks through 2 * (size - 1) requests: one isend of sendbuf and one
 * irecv into the peer's slot per peer. These are started together and
 * waited on.
 * NOTE(review): lines are missing from this excerpt: the remaining
 * parameters (presumably MPI_Comm comm), the other != rank filter and index
 * bookkeeping, the isend assignment and tails of both init calls, and any
 * release of requests. Restore from upstream before building. */
434 void smpi_mpi_allgather(void *sendbuf, int sendcount,
435 MPI_Datatype sendtype, void *recvbuf,
436 int recvcount, MPI_Datatype recvtype,
439 int system_tag = 666;
440 int rank, size, other, index, sendsize, recvsize;
441 MPI_Request *requests;
443 rank = smpi_comm_rank(comm);
444 size = smpi_comm_size(comm);
445 sendsize = smpi_datatype_size(sendtype);
446 recvsize = smpi_datatype_size(recvtype);
447 // Local copy from self
448 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
449 sendcount * sendsize * sizeof(char));
450 // Send/Recv buffers to/from others;
451 requests = xbt_new(MPI_Request, 2 * (size - 1));
453 for(other = 0; other < size; other++) {
456 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
459 requests[index] = smpi_irecv_init(&((char *) recvbuf)
460 [other * recvcount * recvsize],
461 recvcount, recvtype, other,
466 // Wait for completion of all comms.
467 smpi_mpi_startall(2 * (size - 1), requests);
468 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Variable-size all-gather.
 * Rank i's recvcounts[i] elements land at byte offset displs[i] * recvsize
 * in every rank's recvbuf.
 * Each rank copies its own block locally. It then exchanges blocks with the
 * other ranks through 2 * (size - 1) requests (an isend and an irecv per
 * peer), started together and waited on.
 * NOTE(review): lines are missing from this excerpt: the other != rank
 * filter and index bookkeeping, the request assignments and tails of both
 * init calls, and any release of requests. Restore from upstream before
 * building. */
472 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
473 MPI_Datatype sendtype, void *recvbuf,
474 int *recvcounts, int *displs,
475 MPI_Datatype recvtype, MPI_Comm comm)
477 int system_tag = 666;
478 int rank, size, other, index, sendsize, recvsize;
479 MPI_Request *requests;
481 rank = smpi_comm_rank(comm);
482 size = smpi_comm_size(comm);
483 sendsize = smpi_datatype_size(sendtype);
484 recvsize = smpi_datatype_size(recvtype);
485 // Local copy from self
486 memcpy(&((char *) recvbuf)[displs[rank] * recvsize], sendbuf,
487 sendcount * sendsize * sizeof(char));
488 // Send buffers to others;
489 requests = xbt_new(MPI_Request, 2 * (size - 1));
491 for(other = 0; other < size; other++) {
494 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
498 smpi_irecv_init(&((char *) recvbuf)[displs[other] * recvsize],
499 recvcounts[other], recvtype, other, system_tag,
504 // Wait for completion of all comms.
505 smpi_mpi_startall(2 * (size - 1), requests);
506 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatter blocks of sendbuf on root to every rank.
 * Block i is at byte offset i * sendcount * sendsize in root's sendbuf.
 * Receivers (presumably the non-root ranks) use smpi_mpi_recv with the fixed
 * tag system_tag.
 * Root copies its own block into recvbuf with memcpy. It then sends block dst
 * to each other rank through size - 1 isend requests, started together and
 * waited on.
 * NOTE(review): lines are missing from this excerpt: the rank/root test, the
 * status argument of smpi_mpi_recv, the dst != root filter and index
 * bookkeeping, the tail of the isend call, and any release of requests.
 * Restore from upstream before building. */
510 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
511 void *recvbuf, int recvcount, MPI_Datatype recvtype,
512 int root, MPI_Comm comm)
514 int system_tag = 666;
515 int rank, size, dst, index, sendsize, recvsize;
516 MPI_Request *requests;
518 rank = smpi_comm_rank(comm);
519 size = smpi_comm_size(comm);
521 // Recv buffer from root
522 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
525 sendsize = smpi_datatype_size(sendtype);
526 recvsize = smpi_datatype_size(recvtype);
527 // Local copy from root
528 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
529 recvcount * recvsize * sizeof(char));
530 // Send buffers to receivers
531 requests = xbt_new(MPI_Request, size - 1);
533 for(dst = 0; dst < size; dst++) {
535 requests[index] = smpi_isend_init(&((char *) sendbuf)
536 [dst * sendcount * sendsize],
537 sendcount, sendtype, dst,
542 // Wait for completion of isend's.
543 smpi_mpi_startall(size - 1, requests);
544 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Variable-size scatter from root.
 * Block i (sendcounts[i] elements) starts at byte offset displs[i] * sendsize
 * in root's sendbuf.
 * Receivers (presumably the non-root ranks) use smpi_mpi_recv with the fixed
 * tag system_tag.
 * Root copies its own block with memcpy and sends the others through
 * size - 1 isend requests, started together and waited on.
 * NOTE(review): lines are missing from this excerpt: the rank/root test, the
 * status argument of smpi_mpi_recv, the dst != root filter and index
 * bookkeeping, the request assignment and tail of the isend call, and any
 * release of requests. Restore from upstream before building. */
549 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
550 MPI_Datatype sendtype, void *recvbuf, int recvcount,
551 MPI_Datatype recvtype, int root, MPI_Comm comm)
553 int system_tag = 666;
554 int rank, size, dst, index, sendsize, recvsize;
555 MPI_Request *requests;
557 rank = smpi_comm_rank(comm);
558 size = smpi_comm_size(comm);
560 // Recv buffer from root
561 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
564 sendsize = smpi_datatype_size(sendtype);
565 recvsize = smpi_datatype_size(recvtype);
566 // Local copy from root
567 memcpy(recvbuf, &((char *) sendbuf)[displs[root] * sendsize],
568 recvcount * recvsize * sizeof(char));
569 // Send buffers to receivers
570 requests = xbt_new(MPI_Request, size - 1);
572 for(dst = 0; dst < size; dst++) {
575 smpi_isend_init(&((char *) sendbuf)[displs[dst] * sendsize],
576 sendcounts[dst], sendtype, dst, system_tag,
581 // Wait for completion of isend's.
582 smpi_mpi_startall(size - 1, requests);
583 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduce count elements from every rank into recvbuf on root with op.
 * Senders (presumably the non-root ranks) send sendbuf to root with the
 * fixed tag system_tag.
 * Root proceeds as follows:
 *   - copy its own sendbuf into recvbuf;
 *   - receive each other contribution into its own temporary buffer
 *     (tmpbufs[index], count * datasize bytes);
 *   - fold each one into recvbuf with smpi_op_apply as soon as waitany
 *     reports it.
 * NOTE(review): contributions are combined in completion order, not rank
 * order. For a non-commutative op the result can therefore differ from what
 * MPI_Reduce specifies.
 * NOTE(review): lines are missing from this excerpt, including:
 *   - the MPI_Comm parameter and the tmpbufs declaration;
 *   - the rank/root test;
 *   - the src != root filter and index bookkeeping;
 *   - the tail of the irecv call and the body of the MPI_UNDEFINED branch;
 *   - the frees of requests/tmpbufs.
 * Restore from upstream before building. */
588 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
589 MPI_Datatype datatype, MPI_Op op, int root,
592 int system_tag = 666;
593 int rank, size, src, index, datasize;
594 MPI_Request *requests;
597 rank = smpi_comm_rank(comm);
598 size = smpi_comm_size(comm);
600 // Send buffer to root
601 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
603 datasize = smpi_datatype_size(datatype);
604 // Local copy from root
605 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
606 // Receive buffers from senders
607 //TODO: make a MPI_barrier here ?
608 requests = xbt_new(MPI_Request, size - 1);
609 tmpbufs = xbt_new(void *, size - 1);
611 for(src = 0; src < size; src++) {
613 tmpbufs[index] = xbt_malloc(count * datasize);
615 smpi_irecv_init(tmpbufs[index], count, datatype, src,
620 // Wait for completion of irecv's.
621 smpi_mpi_startall(size - 1, requests);
622 for(src = 0; src < size - 1; src++) {
623 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
624 if(index == MPI_UNDEFINED) {
627 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
629 for(index = 0; index < size - 1; index++) {
630 xbt_free(tmpbufs[index]);
/* All-reduce implemented as smpi_mpi_reduce to rank 0 followed by
 * smpi_mpi_bcast of the result from rank 0.
 * The lines from "FIXME: buggy implementation" onward look like a disabled
 * direct implementation. Its comment opener is not in this excerpt, so as
 * shown those lines are not inside a comment.
 * NOTE(review): that disabled code creates 2 * (size - 1) requests but calls
 * smpi_mpi_waitany over only size - 1 of them.
 * NOTE(review): lines are missing from this excerpt, including the opening
 * brace, the tmpbufs declaration, loop bookkeeping, and the closing of the
 * function. Restore from upstream before building. */
637 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
638 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
640 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
641 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
644 FIXME: buggy implementation
646 int system_tag = 666;
647 int rank, size, other, index, datasize;
648 MPI_Request* requests;
651 rank = smpi_comm_rank(comm);
652 size = smpi_comm_size(comm);
653 datasize = smpi_datatype_size(datatype);
654 // Local copy from self
655 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
656 // Send/Recv buffers to/from others;
657 //TODO: make a MPI_barrier here ?
658 requests = xbt_new(MPI_Request, 2 * (size - 1));
659 tmpbufs = xbt_new(void*, size - 1);
661 for(other = 0; other < size; other++) {
663 tmpbufs[index / 2] = xbt_malloc(count * datasize);
664 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
665 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
669 // Wait for completion of all comms.
670 for(other = 0; other < 2 * (size - 1); other++) {
671 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
672 if(index == MPI_UNDEFINED) {
675 if((index & 1) == 1) {
676 // Request is odd: it's a irecv
677 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
680 for(index = 0; index < size - 1; index++) {
681 xbt_free(tmpbufs[index]);
/* Inclusive prefix reduction.
 * Each rank proceeds as follows:
 *   - copy its own sendbuf into recvbuf;
 *   - post irecvs from every lower rank into temporary buffers (tmpbufs);
 *   - post isends of sendbuf to every higher rank;
 *   - start all of them;
 *   - fold each received buffer into recvbuf with smpi_op_apply as waitany
 *     reports it.
 * total = rank + (size - (rank + 1)) is rank receives plus the sends to
 * higher ranks, which always equals size - 1.
 * NOTE(review): contributions are combined in completion order. For a
 * non-commutative op the result can differ from the rank-ordered prefix
 * MPI_Scan specifies.
 * NOTE(review): lines are missing from this excerpt, including:
 *   - the total/tmpbufs declarations and index bookkeeping;
 *   - the request assignments and tail of the irecv call;
 *   - the "index below rank" test guarding smpi_op_apply;
 *   - the end of the function, which also runs past this excerpt.
 * Restore from upstream before building. */
688 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
689 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
691 int system_tag = 666;
692 int rank, size, other, index, datasize;
694 MPI_Request *requests;
697 rank = smpi_comm_rank(comm);
698 size = smpi_comm_size(comm);
699 datasize = smpi_datatype_size(datatype);
700 // Local copy from self
701 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
702 // Send/Recv buffers to/from others;
703 total = rank + (size - (rank + 1));
704 requests = xbt_new(MPI_Request, total);
705 tmpbufs = xbt_new(void *, rank);
707 for(other = 0; other < rank; other++) {
708 tmpbufs[index] = xbt_malloc(count * datasize);
710 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
714 for(other = rank + 1; other < size; other++) {
716 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
719 // Wait for completion of all comms.
720 smpi_mpi_startall(size - 1, requests);
721 for(other = 0; other < total; other++) {
722 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
723 if(index == MPI_UNDEFINED) {
727 // #Request is below rank: it's a irecv
728 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
731 for(index = 0; index < rank; index++) {
732 xbt_free(tmpbufs[index]);