1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Log categories used by this file: smpi_base is created here as a
 * subcategory of smpi, the remaining categories are only referenced. */
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12 "Logging specific to SMPI (base)");
// NOTE(review): smpi_base is declared external right after being defined
// above -- looks redundant; confirm before removing.
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
/* Matching predicate handed to SIMIX_req_comm_irecv by smpi_mpi_start.
 * a is used as the reference request (ref) and b as the candidate (req).
 * The result is non-zero when both share the same communicator and the
 * candidate's source and tag agree with the reference; MPI_ANY_SOURCE /
 * MPI_ANY_TAG on the reference side accept any value. */
23 static int match_recv(void* a, void* b) {
24 MPI_Request ref = (MPI_Request)a;
25 MPI_Request req = (MPI_Request)b;
// Both requests are dereferenced below, so neither may be NULL.
27 xbt_assert0(ref, "Cannot match recv against null reference");
28 xbt_assert0(req, "Cannot match recv against null request");
29 return req->comm == ref->comm
30 && (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
31 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Matching predicate handed to SIMIX_req_comm_isend by smpi_mpi_start.
 * Mirror image of match_recv: a is the reference request (ref), b the
 * candidate (req), and here the wildcards MPI_ANY_SOURCE / MPI_ANY_TAG are
 * honoured on the candidate side. Returns non-zero on a match. */
34 static int match_send(void* a, void* b) {
35 MPI_Request ref = (MPI_Request)a;
36 MPI_Request req = (MPI_Request)b;
// Both requests are dereferenced below, so neither may be NULL.
38 xbt_assert0(ref, "Cannot match send against null reference");
39 xbt_assert0(req, "Cannot match send against null request");
40 return req->comm == ref->comm
41 && (req->src == MPI_ANY_SOURCE || req->src == ref->src)
42 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocates one s_smpi_mpi_request_t and fills it in from the arguments.
 * The visible lines set the payload size, clear the action and store flags.
 * NOTE(review): the statements that presumably store buf, src, dst, tag and
 * comm, and the return of the new request, are not visible in this excerpt. */
45 static MPI_Request build_request(void *buf, int count,
46 MPI_Datatype datatype, int src, int dst,
47 int tag, MPI_Comm comm, unsigned flags)
51 request = xbt_new(s_smpi_mpi_request_t, 1);
// Payload size is the datatype size multiplied by the element count.
53 request->size = smpi_datatype_size(datatype) * count;
// No communication is attached yet; smpi_mpi_start asserts on this.
58 request->action = NULL;
59 request->flags = flags;
67 /* MPI Low level calls */
/* Builds a send request flagged PERSISTENT | SEND. The source is the
 * caller's rank in comm; dst and tag are taken from the arguments.
 * No call that starts the communication is visible here. */
68 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
69 int dst, int tag, MPI_Comm comm)
72 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
73 comm, PERSISTENT | SEND);
/* Builds a receive request flagged PERSISTENT | RECV. The destination is
 * the caller's rank in comm; src and tag are taken from the arguments.
 * No call that starts the communication is visible here. */
78 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
79 int src, int tag, MPI_Comm comm)
82 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
83 comm, PERSISTENT | RECV);
/* Posts the communication described by request and stores the resulting
 * SIMIX action in request->action. A request flagged RECV is posted on the
 * calling process's own mailbox with match_recv as matching function; the
 * send code posts on the mailbox of rank request->dst with match_send. */
88 void smpi_mpi_start(MPI_Request request)
// A non-NULL action means a previous start of this request is still pending.
92 xbt_assert0(!request->action,
93 "Cannot (re)start a non-finished communication");
94 if(request->flags & RECV) {
95 print_request("New recv", request);
96 mailbox = smpi_process_mailbox();
97 request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
// NOTE(review): the line separating the receive branch from the send code
// below is not visible in this excerpt; presumably an else.
99 print_request("New send", request);
100 mailbox = smpi_process_remote_mailbox(request->dst);
// request->size is passed twice; -1.0 is presumably "no rate limit" --
// TODO confirm against the SIMIX API.
101 request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
102 request->buf, request->size, &match_send, request);
// Tag the action with the current SMPI tracing category.
104 SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
/* Starts the first count requests of the array, in index order, by calling
 * smpi_mpi_start on each of them. */
109 void smpi_mpi_startall(int count, MPI_Request * requests)
113 for(i = 0; i < count; i++) {
114 smpi_mpi_start(requests[i]);
/* Resets the caller's handle to MPI_REQUEST_NULL.
 * NOTE(review): any release of the request object itself is not visible in
 * this excerpt -- confirm it happens before the handle is overwritten. */
118 void smpi_mpi_request_free(MPI_Request * request)
121 *request = MPI_REQUEST_NULL;
/* Builds a send request flagged NON_PERSISTENT | SEND, with the caller's
 * rank in comm as source. finish_wait frees NON_PERSISTENT requests once
 * they complete. No start call is visible here. */
124 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
125 int dst, int tag, MPI_Comm comm)
127 MPI_Request request =
128 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
129 comm, NON_PERSISTENT | SEND);
/* Non-blocking send: builds a non-persistent send request with
 * smpi_isend_init and starts it immediately. */
134 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
135 int dst, int tag, MPI_Comm comm)
137 MPI_Request request =
138 smpi_isend_init(buf, count, datatype, dst, tag, comm);
140 smpi_mpi_start(request);
/* Builds a receive request flagged NON_PERSISTENT | RECV, with the caller's
 * rank in comm as destination. finish_wait frees NON_PERSISTENT requests
 * once they complete. No start call is visible here. */
144 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
145 int src, int tag, MPI_Comm comm)
147 MPI_Request request =
148 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
149 comm, NON_PERSISTENT | RECV);
/* Non-blocking receive: builds a non-persistent receive request with
 * smpi_irecv_init and starts it immediately. */
154 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
155 int src, int tag, MPI_Comm comm)
157 MPI_Request request =
158 smpi_irecv_init(buf, count, datatype, src, tag, comm);
160 smpi_mpi_start(request);
/* Blocking receive: posts a non-blocking receive and waits for it, letting
 * smpi_mpi_wait fill status. */
164 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
165 int tag, MPI_Comm comm, MPI_Status * status)
169 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
170 smpi_mpi_wait(&request, status);
/* Blocking send: posts a non-blocking send and waits for it. No status is
 * collected (MPI_STATUS_IGNORE). */
173 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
174 int tag, MPI_Comm comm)
178 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
179 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* Combined send and receive: builds one send and one receive request,
 * starts both together and waits for both. When status is not
 * MPI_STATUS_IGNORE it receives a copy of the status stored at index 1. */
182 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
183 int dst, int sendtag, void *recvbuf, int recvcount,
184 MPI_Datatype recvtype, int src, int recvtag,
185 MPI_Comm comm, MPI_Status * status)
187 MPI_Request requests[2];
// NOTE(review): the assignment targets of the two calls below are not
// visible here; the status copy further down implies the receive is slot 1.
191 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
193 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
194 smpi_mpi_startall(2, requests);
195 smpi_mpi_waitall(2, requests, stats);
196 if(status != MPI_STATUS_IGNORE) {
197 // Copy receive status
198 memcpy(status, &stats[1], sizeof(MPI_Status));
/* Returns status->count divided by the size of datatype (integer division).
 * finish_wait sets status->count from the action's destination buffer size.
 * NOTE(review): divides by zero if smpi_datatype_size(datatype) can be 0. */
202 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
204 return status->count / smpi_datatype_size(datatype);
/* Completion bookkeeping for a finished request: optionally fills status,
 * destroys the SIMIX action, logs, and frees the request through
 * smpi_mpi_request_free when it is flagged NON_PERSISTENT. */
207 static void finish_wait(MPI_Request * request, MPI_Status * status)
209 MPI_Request req = *request;
211 if(status != MPI_STATUS_IGNORE) {
// NOTE(review): source and tag are copied from the request itself; for a
// receive posted with MPI_ANY_SOURCE / MPI_ANY_TAG this would report the
// wildcard unless req->src / req->tag are updated elsewhere -- confirm.
212 status->MPI_SOURCE = req->src;
213 status->MPI_TAG = req->tag;
214 status->MPI_ERROR = MPI_SUCCESS;
215 status->count = SIMIX_req_comm_get_dst_buff_size(req->action);
217 SIMIX_req_comm_destroy(req->action);
218 print_request("Finishing", req);
219 if(req->flags & NON_PERSISTENT) {
220 smpi_mpi_request_free(request);
/* Polls the request's action with SIMIX_req_comm_test and completes the
 * request through smpi_mpi_wait.
 * NOTE(review): the condition guarding the wait (presumably flag being set)
 * and the return statement are not visible in this excerpt. */
226 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
228 int flag = SIMIX_req_comm_test((*request)->action);
231 smpi_mpi_wait(request, status);
/* Tests whether any request with a pending action has completed. The
 * actions are collected into a dynar, SIMIX_req_comm_testany is asked for a
 * finished one, its position is translated back through map into an index
 * of requests[], and that request is completed with smpi_mpi_wait.
 * *index starts as MPI_UNDEFINED. */
236 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
243 *index = MPI_UNDEFINED;
246 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
247 map = xbt_new(int, count);
249 for(i = 0; i < count; i++) {
// NOTE(review): requests[i] is dereferenced without an MPI_REQUEST_NULL
// check, unlike smpi_mpi_waitany.
250 if(requests[i]->action) {
251 xbt_dynar_push(comms, &requests[i]->action);
// NOTE(review): the testany result is used to index map before it is checked.
// If SIMIX_req_comm_testany can report "nothing finished" (e.g. -1), map is
// read out of bounds before the MPI_UNDEFINED test below -- confirm.
257 *index = SIMIX_req_comm_testany(comms);
258 *index = map[*index];
259 if(*index != MPI_UNDEFINED) {
260 smpi_mpi_wait(&requests[*index], status);
265 xbt_dynar_free(&comms);
/* Waits for *request to complete, then runs finish_wait, which fills
 * status and may reset *request to MPI_REQUEST_NULL. */
270 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
272 print_request("Waiting", *request);
// -1.0 is presumably "no timeout" -- TODO confirm against the SIMIX API.
273 SIMIX_req_comm_wait((*request)->action, -1.0);
274 finish_wait(request, status);
/* Waits until one of the non-null requests completes and finishes it with
 * finish_wait. index starts as MPI_UNDEFINED; the return statement is not
 * visible in this excerpt (callers compare the result to MPI_UNDEFINED). */
277 int smpi_mpi_waitany(int count, MPI_Request requests[],
284 index = MPI_UNDEFINED;
286 // Wait for a request to complete
287 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
288 map = xbt_new(int, count);
290 DEBUG0("Wait for one of");
291 for(i = 0; i < count; i++) {
// Slots already reset to MPI_REQUEST_NULL are skipped.
292 if(requests[i] != MPI_REQUEST_NULL) {
293 print_request(" ", requests[i]);
294 xbt_dynar_push(comms, &requests[i]->action);
// NOTE(review): how the waitany result is translated back through map into
// an index of requests[] is not visible in this excerpt.
300 index = SIMIX_req_comm_waitany(comms);
302 finish_wait(&requests[index], status);
305 xbt_dynar_free(&comms);
/* Waits for the requests of the array, running at most count iterations.
 * Each completion is collected into a local status (unless the caller
 * passed MPI_STATUS_IGNORE) and then copied into status[index]. */
310 void smpi_mpi_waitall(int count, MPI_Request requests[],
// pstat targets a local scratch status, or stays MPI_STATUS_IGNORE.
315 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
317 for(c = 0; c < count; c++) {
// NOTE(review): two waiting strategies are visible (wait on request c in
// order, or waitany over the whole array); the condition choosing between
// them is not visible in this excerpt.
319 smpi_mpi_wait(&requests[c], pstat);
322 index = smpi_mpi_waitany(count, requests, pstat);
323 if(index == MPI_UNDEFINED) {
327 if(status != MPI_STATUS_IGNORE) {
328 memcpy(&status[index], pstat, sizeof(*pstat));
/* Runs smpi_mpi_testany over the whole array incount times and records the
 * index reported by each successful call in indices[].
 * NOTE(review): the same status pointer is passed to every testany call,
 * and only polling calls are visible here -- confirm the intended blocking
 * semantics and per-request status handling. */
333 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
339 for(i = 0; i < incount; i++) {
340 if(smpi_mpi_testany(incount, requests, &index, status)) {
341 indices[count] = index;
/* Broadcast: delegates to nary_tree_bcast with a fixed arity of 4. */
348 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
351 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
352 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* Barrier: delegates to nary_tree_barrier with a fixed arity of 4. */
355 void smpi_mpi_barrier(MPI_Comm comm)
357 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
358 nary_tree_barrier(comm, 4);
/* Gather: two code paths are visible. One sends sendbuf to root with a
 * blocking send. The other copies sendbuf into slot root of recvbuf, then
 * posts one receive per src at byte offset src * recvcount * recvsize and
 * waits for all size - 1 of them.
 * NOTE(review): the rank tests selecting these paths and the handling of
 * index are not visible in this excerpt. */
361 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
362 void *recvbuf, int recvcount, MPI_Datatype recvtype,
363 int root, MPI_Comm comm)
// Tag shared by the collectives in this file.
365 int system_tag = 666;
366 int rank, size, src, index, sendsize, recvsize;
367 MPI_Request *requests;
369 rank = smpi_comm_rank(comm);
370 size = smpi_comm_size(comm);
372 // Send buffer to root
373 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
375 sendsize = smpi_datatype_size(sendtype);
376 recvsize = smpi_datatype_size(recvtype);
377 // Local copy from root
378 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
379 sendcount * sendsize * sizeof(char));
380 // Receive buffers from senders
// One slot per remote rank; presumably src == root is skipped in the loop.
381 requests = xbt_new(MPI_Request, size - 1);
383 for(src = 0; src < size; src++) {
385 requests[index] = smpi_irecv_init(&((char *) recvbuf)
386 [src * recvcount * recvsize],
387 recvcount, recvtype, src,
392 // Wait for completion of irecv's.
393 smpi_mpi_startall(size - 1, requests);
394 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Gatherv: like smpi_mpi_gather, but each rank's data lands at
 * recvbuf + displs[src] and its length is recvcounts[src].
 * NOTE(review): displs[] is applied directly as a byte offset (recvbuf is
 * cast to char *), whereas smpi_mpi_gather scales by the receive type size.
 * MPI defines displacements in units of recvtype -- confirm and fix.
 * NOTE(review): the rank tests selecting the send/receive paths are not
 * visible in this excerpt. */
399 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
400 void *recvbuf, int *recvcounts, int *displs,
401 MPI_Datatype recvtype, int root, MPI_Comm comm)
403 int system_tag = 666;
404 int rank, size, src, index, sendsize;
405 MPI_Request *requests;
407 rank = smpi_comm_rank(comm);
408 size = smpi_comm_size(comm);
410 // Send buffer to root
411 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
413 sendsize = smpi_datatype_size(sendtype);
414 // Local copy from root
415 memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
416 sendcount * sendsize * sizeof(char));
417 // Receive buffers from senders
418 requests = xbt_new(MPI_Request, size - 1);
420 for(src = 0; src < size; src++) {
423 smpi_irecv_init(&((char *) recvbuf)[displs[src]],
424 recvcounts[src], recvtype, src, system_tag,
429 // Wait for completion of irecv's.
430 smpi_mpi_startall(size - 1, requests);
431 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Allgather: copies the caller's own contribution into slot rank of
 * recvbuf, then builds a send of sendbuf to each other rank and a receive
 * from each other rank into byte offset other * recvcount * recvsize.
 * All 2 * (size - 1) requests are started together and waited for.
 * NOTE(review): the test that presumably skips other == rank and the
 * handling of index are not visible in this excerpt. */
436 void smpi_mpi_allgather(void *sendbuf, int sendcount,
437 MPI_Datatype sendtype, void *recvbuf,
438 int recvcount, MPI_Datatype recvtype,
441 int system_tag = 666;
442 int rank, size, other, index, sendsize, recvsize;
443 MPI_Request *requests;
445 rank = smpi_comm_rank(comm);
446 size = smpi_comm_size(comm);
447 sendsize = smpi_datatype_size(sendtype);
448 recvsize = smpi_datatype_size(recvtype);
449 // Local copy from self
450 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
451 sendcount * sendsize * sizeof(char));
452 // Send/Recv buffers to/from others;
// One send plus one receive per remote rank.
453 requests = xbt_new(MPI_Request, 2 * (size - 1));
455 for(other = 0; other < size; other++) {
458 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
461 requests[index] = smpi_irecv_init(&((char *) recvbuf)
462 [other * recvcount * recvsize],
463 recvcount, recvtype, other,
468 // Wait for completion of all comms.
469 smpi_mpi_startall(2 * (size - 1), requests);
470 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Allgatherv: like smpi_mpi_allgather, but data from rank other lands at
 * recvbuf + displs[other] and its length is recvcounts[other].
 * NOTE(review): displs[] is applied directly as a byte offset (recvbuf is
 * cast to char *); recvsize is computed but not used in the visible lines.
 * MPI defines displacements in units of recvtype -- confirm and fix.
 * NOTE(review): the test that presumably skips other == rank and the
 * handling of index are not visible in this excerpt. */
474 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
475 MPI_Datatype sendtype, void *recvbuf,
476 int *recvcounts, int *displs,
477 MPI_Datatype recvtype, MPI_Comm comm)
479 int system_tag = 666;
480 int rank, size, other, index, sendsize, recvsize;
481 MPI_Request *requests;
483 rank = smpi_comm_rank(comm);
484 size = smpi_comm_size(comm);
485 sendsize = smpi_datatype_size(sendtype);
486 recvsize = smpi_datatype_size(recvtype);
487 // Local copy from self
488 memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
489 sendcount * sendsize * sizeof(char));
490 // Send buffers to others;
// One send plus one receive per remote rank.
491 requests = xbt_new(MPI_Request, 2 * (size - 1));
493 for(other = 0; other < size; other++) {
496 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
500 smpi_irecv_init(&((char *) recvbuf)[displs[other]],
501 recvcounts[other], recvtype, other, system_tag,
506 // Wait for completion of all comms.
507 smpi_mpi_startall(2 * (size - 1), requests);
508 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatter: two code paths are visible. One receives recvbuf from root with
 * a blocking receive. The other copies slot root of sendbuf into recvbuf,
 * then builds one send per dst from byte offset dst * sendcount * sendsize
 * and waits for all size - 1 of them.
 * NOTE(review): the rank tests selecting these paths and the handling of
 * index are not visible in this excerpt. */
512 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
513 void *recvbuf, int recvcount, MPI_Datatype recvtype,
514 int root, MPI_Comm comm)
516 int system_tag = 666;
517 int rank, size, dst, index, sendsize, recvsize;
518 MPI_Request *requests;
520 rank = smpi_comm_rank(comm);
521 size = smpi_comm_size(comm);
523 // Recv buffer from root
524 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
527 sendsize = smpi_datatype_size(sendtype);
528 recvsize = smpi_datatype_size(recvtype);
529 // Local copy from root
530 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
531 recvcount * recvsize * sizeof(char));
532 // Send buffers to receivers
// One slot per remote rank; presumably dst == root is skipped in the loop.
533 requests = xbt_new(MPI_Request, size - 1);
535 for(dst = 0; dst < size; dst++) {
537 requests[index] = smpi_isend_init(&((char *) sendbuf)
538 [dst * sendcount * sendsize],
539 sendcount, sendtype, dst,
544 // Wait for completion of isend's.
545 smpi_mpi_startall(size - 1, requests);
546 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Scatterv: like smpi_mpi_scatter, but the chunk for rank dst starts at
 * sendbuf + displs[dst] and its length is sendcounts[dst].
 * NOTE(review): displs[] is applied directly as a byte offset (sendbuf is
 * cast to char *); sendsize is computed but not used in the visible lines.
 * MPI defines displacements in units of sendtype -- confirm and fix.
 * NOTE(review): the rank tests selecting the receive/send paths are not
 * visible in this excerpt. */
551 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
552 MPI_Datatype sendtype, void *recvbuf, int recvcount,
553 MPI_Datatype recvtype, int root, MPI_Comm comm)
555 int system_tag = 666;
556 int rank, size, dst, index, sendsize, recvsize;
557 MPI_Request *requests;
559 rank = smpi_comm_rank(comm);
560 size = smpi_comm_size(comm);
562 // Recv buffer from root
563 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
566 sendsize = smpi_datatype_size(sendtype);
567 recvsize = smpi_datatype_size(recvtype);
568 // Local copy from root
569 memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
570 recvcount * recvsize * sizeof(char));
571 // Send buffers to receivers
572 requests = xbt_new(MPI_Request, size - 1);
574 for(dst = 0; dst < size; dst++) {
577 smpi_isend_init(&((char *) sendbuf)[displs[dst]],
578 sendcounts[dst], sendtype, dst, system_tag,
583 // Wait for completion of isend's.
584 smpi_mpi_startall(size - 1, requests);
585 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduce: two code paths are visible. One sends sendbuf to root with a
 * blocking send. The other copies sendbuf into recvbuf, receives every
 * other contribution into its own temporary buffer and folds each into
 * recvbuf with smpi_op_apply as soon as waitany reports it.
 * NOTE(review): contributions are combined in completion order, not rank
 * order -- confirm this is acceptable for non-commutative operations.
 * NOTE(review): the rank tests selecting these paths and the handling of
 * index are not visible in this excerpt. */
590 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
591 MPI_Datatype datatype, MPI_Op op, int root,
594 int system_tag = 666;
595 int rank, size, src, index, datasize;
596 MPI_Request *requests;
599 rank = smpi_comm_rank(comm);
600 size = smpi_comm_size(comm);
602 // Send buffer to root
603 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
605 datasize = smpi_datatype_size(datatype);
606 // Local copy from root
607 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
608 // Receive buffers from senders
609 //TODO: make a MPI_barrier here ?
// One request and one temporary buffer per remote rank.
610 requests = xbt_new(MPI_Request, size - 1);
611 tmpbufs = xbt_new(void *, size - 1);
613 for(src = 0; src < size; src++) {
615 tmpbufs[index] = xbt_malloc(count * datasize);
617 smpi_irecv_init(tmpbufs[index], count, datatype, src,
622 // Wait for completion of irecv's.
623 smpi_mpi_startall(size - 1, requests);
624 for(src = 0; src < size - 1; src++) {
625 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
626 if(index == MPI_UNDEFINED) {
// Fold the contribution that just arrived into the running result.
629 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Release the per-sender temporary buffers.
631 for(index = 0; index < size - 1; index++) {
632 xbt_free(tmpbufs[index]);
/* Allreduce: reduces onto rank 0 with smpi_mpi_reduce, then broadcasts the
 * result in recvbuf from rank 0 to every rank with smpi_mpi_bcast. */
639 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
640 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
642 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
643 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
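646 FIXME: buggy implementation (kept disabled): it posts 2 * (size - 1) requests but calls waitany on only the first size - 1 of them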
648 int system_tag = 666;
649 int rank, size, other, index, datasize;
650 MPI_Request* requests;
653 rank = smpi_comm_rank(comm);
654 size = smpi_comm_size(comm);
655 datasize = smpi_datatype_size(datatype);
656 // Local copy from self
657 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
658 // Send/Recv buffers to/from others;
659 //TODO: make a MPI_barrier here ?
660 requests = xbt_new(MPI_Request, 2 * (size - 1));
661 tmpbufs = xbt_new(void*, size - 1);
663 for(other = 0; other < size; other++) {
665 tmpbufs[index / 2] = xbt_malloc(count * datasize);
666 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
667 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
671 // Wait for completion of all comms.
672 for(other = 0; other < 2 * (size - 1); other++) {
673 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
674 if(index == MPI_UNDEFINED) {
677 if((index & 1) == 1) {
678 // Request is odd: it's a irecv
679 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
682 for(index = 0; index < size - 1; index++) {
683 xbt_free(tmpbufs[index]);
/* Scan: copies sendbuf into recvbuf, builds one receive from every lower
 * rank (each into its own temporary buffer) and one send of sendbuf to
 * every higher rank, then folds received buffers into recvbuf with
 * smpi_op_apply as waitany reports completions.
 * NOTE(review): the guard in front of smpi_op_apply (presumably a test that
 * index refers to a receive) and the handling of index in the setup loops
 * are not visible in this excerpt. */
690 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
691 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
693 int system_tag = 666;
694 int rank, size, other, index, datasize;
696 MPI_Request *requests;
699 rank = smpi_comm_rank(comm);
700 size = smpi_comm_size(comm);
701 datasize = smpi_datatype_size(datatype);
702 // Local copy from self
703 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
704 // Send/Recv buffers to/from others;
// rank receives plus (size - rank - 1) sends, i.e. size - 1 in total.
705 total = rank + (size - (rank + 1));
706 requests = xbt_new(MPI_Request, total);
// Only the rank receives need a temporary buffer.
707 tmpbufs = xbt_new(void *, rank);
709 for(other = 0; other < rank; other++) {
710 tmpbufs[index] = xbt_malloc(count * datasize);
712 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
716 for(other = rank + 1; other < size; other++) {
718 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
721 // Wait for completion of all comms.
722 smpi_mpi_startall(size - 1, requests);
723 for(other = 0; other < total; other++) {
724 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
725 if(index == MPI_UNDEFINED) {
729 // #Request is below rank: it's a irecv
730 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// NOTE(review): tmpbufs holds only rank entries, but this loop frees
// size - 1 of them -- reads and frees past the end of the array whenever
// rank < size - 1. The bound should presumably be rank; confirm and fix.
733 for(index = 0; index < size - 1; index++) {
734 xbt_free(tmpbufs[index]);