1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Logging setup: create this file's default log category, smpi_base, as a
 * subcategory of smpi, and reference the other SMPI log categories by name
 * (they are presumably defined in other SMPI source files). */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
/* Allocate a new request descriptor for a transfer of `count` elements of
 * `datatype` from rank `src` to rank `dst` with tag `tag` on `comm`.
 * The request starts not complete, unmatched (match == MPI_REQUEST_NULL),
 * and carries `flags`. Callers in this file pass PERSISTENT or
 * NON_PERSISTENT combined with SEND or RECV. */
22 static MPI_Request build_request(void *buf, int count,
23 MPI_Datatype datatype, int src, int dst,
24 int tag, MPI_Comm comm, unsigned flags)
28 request = xbt_new(s_smpi_mpi_request_t, 1);
// Payload length: datatype size times element count (presumably bytes).
// smpi_mpi_start uses it both as the buffer length and as the transfer size.
30 request->size = smpi_datatype_size(datatype) * count;
37 request->complete = 0;
38 request->match = MPI_REQUEST_NULL;
39 request->flags = flags;
47 /* MPI Low level calls */
/* Create a persistent send request from the calling rank to `dst`.
 * The request is not started; launch it with smpi_mpi_start() or
 * smpi_mpi_startall(). */
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49 int dst, int tag, MPI_Comm comm)
52 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53 comm, PERSISTENT | SEND);
/* Create a persistent receive request from rank `src` to the calling rank.
 * The request is not started; launch it with smpi_mpi_start() or
 * smpi_mpi_startall(). */
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59 int src, int tag, MPI_Comm comm)
62 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63 comm, PERSISTENT | RECV);
/* Start a request built by one of the *_init functions. The request is
 * first posted to the local process, then the matching SIMIX network
 * operation (irecv or isend) is launched on the request's rdv point.
 * The request must not already be marked complete. */
68 void smpi_mpi_start(MPI_Request request)
// NOTE(review): the assertion fails when the request IS complete, but the
// message talks about a "non-finished" communication; confirm the wording.
70 xbt_assert0(request->complete == 0,
71 "Cannot start a non-finished communication");
72 if ((request->flags & RECV) == RECV) {
73 smpi_process_post_recv(request);
74 print_request("New recv", request);
// The address of request->size is passed, presumably so SIMIX can read and
// update the received length; confirm against the SIMIX API.
76 SIMIX_network_irecv(request->rdv, request->buf, &request->size);
// Send path: request->size is passed both as the simulated transfer size and
// as the number of bytes taken from request->buf.
78 smpi_process_post_send(request->comm, request); // FIXME
79 print_request("New send", request);
81 SIMIX_network_isend(request->rdv, request->size, -1.0,
82 request->buf, request->size, NULL);
/* Start each of the `count` requests in `requests`, in array order. */
86 void smpi_mpi_startall(int count, MPI_Request * requests)
90 for (i = 0; i < count; i++) {
91 smpi_mpi_start(requests[i]);
/* Release a request and reset the caller's handle to MPI_REQUEST_NULL.
 * NOTE(review): only the handle reset is present here; confirm that the
 * descriptor allocated by build_request is actually freed. */
95 void smpi_mpi_request_free(MPI_Request * request)
98 *request = MPI_REQUEST_NULL;
/* Create, without starting, a non-persistent send request from the calling
 * rank to `dst`. Non-persistent requests are freed by finish_wait once the
 * wait on them completes. */
101 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
102 int dst, int tag, MPI_Comm comm)
104 MPI_Request request =
105 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
106 comm, NON_PERSISTENT | SEND);
/* Non-blocking send: build a non-persistent send request and start it at
 * once. Complete it with smpi_mpi_wait() or one of its variants. */
111 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
112 int dst, int tag, MPI_Comm comm)
114 MPI_Request request =
115 smpi_isend_init(buf, count, datatype, dst, tag, comm);
117 smpi_mpi_start(request);
/* Create, without starting, a non-persistent receive request from `src` to
 * the calling rank. Non-persistent requests are freed by finish_wait once
 * the wait on them completes. */
121 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
122 int src, int tag, MPI_Comm comm)
124 MPI_Request request =
125 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
126 comm, NON_PERSISTENT | RECV);
/* Non-blocking receive: build a non-persistent receive request and start it
 * at once. Complete it with smpi_mpi_wait() or one of its variants. */
131 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
132 int src, int tag, MPI_Comm comm)
134 MPI_Request request =
135 smpi_irecv_init(buf, count, datatype, src, tag, comm);
137 smpi_mpi_start(request);
/* Blocking receive: post a non-persistent receive and wait for it, filling
 * `status` unless it is MPI_STATUS_IGNORE. */
141 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
142 int tag, MPI_Comm comm, MPI_Status * status)
146 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
147 smpi_mpi_wait(&request, status);
/* Blocking send: post a non-persistent send and wait for it to complete. */
150 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
151 int tag, MPI_Comm comm)
155 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
156 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* Combined send and receive. Both requests are built, started together and
 * waited on together, so the send and the receive can progress concurrently.
 * Only the receive's status (the second request) is reported through
 * `status`, unless it is MPI_STATUS_IGNORE. */
159 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
160 int dst, int sendtag, void *recvbuf, int recvcount,
161 MPI_Datatype recvtype, int src, int recvtag,
162 MPI_Comm comm, MPI_Status * status)
164 MPI_Request requests[2];
168 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
170 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
171 smpi_mpi_startall(2, requests);
172 smpi_mpi_waitall(2, requests, stats);
173 if (status != MPI_STATUS_IGNORE) {
174 // Copy receive status
175 memcpy(status, &stats[1], sizeof(MPI_Status));
/* Return the number of `datatype` elements described by `status`.
 * finish_wait fills status->count from SIMIX_communication_get_dst_buf_size
 * (presumably a byte count), so it is divided by the datatype size here.
 * Integer division drops any partial trailing element. */
179 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
181 return status->count / smpi_datatype_size(datatype);
/* Common tail of every wait.
 *  - Fills `status` unless it is MPI_STATUS_IGNORE.
 *  - Destroys the request's SIMIX communication.
 *  - Updates completion bookkeeping with the matching request: a request
 *    already marked complete destroys the rdv point; otherwise the match is
 *    marked complete and unlinked.
 *  - Frees non-persistent requests (the caller's handle becomes
 *    MPI_REQUEST_NULL). Otherwise the request's rdv and pair pointers are
 *    cleared, presumably so a persistent request can be restarted. */
184 static void finish_wait(MPI_Request * request, MPI_Status * status)
186 if (status != MPI_STATUS_IGNORE) {
// NOTE(review): these report the src/tag stored in the request; for a receive
// posted with a wildcard, confirm they reflect the actual sender and tag.
187 status->MPI_SOURCE = (*request)->src;
188 status->MPI_TAG = (*request)->tag;
189 status->MPI_ERROR = MPI_SUCCESS;
190 status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
192 SIMIX_communication_destroy((*request)->pair);
193 print_request("finishing wait", *request);
194 if ((*request)->complete == 1) {
195 SIMIX_rdv_destroy((*request)->rdv);
197 (*request)->match->complete = 1;
198 (*request)->match->match = MPI_REQUEST_NULL;
200 if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
201 smpi_mpi_request_free(request);
203 (*request)->rdv = NULL;
204 (*request)->pair = NULL;
/* Test a request without waiting on a pending one. The completion flag is
 * read first, and only a request already marked complete is finished through
 * smpi_mpi_wait(), which fills `status`.
 * NOTE(review): confirm the function returns this sampled flag. */
208 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
210 int flag = (*request)->complete;
213 smpi_mpi_wait(request, status);
/* Test `count` requests. *index starts as MPI_UNDEFINED. Scanning from
 * index 0, a non-null request already marked complete is finished with
 * smpi_mpi_wait(), which fills `status`. */
218 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
223 *index = MPI_UNDEFINED;
225 for (i = 0; i < count; i++) {
226 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
227 smpi_mpi_wait(&requests[i], status);
/* Block until the request's SIMIX communication finishes, then run
 * finish_wait. The -1.0 timeout presumably means "wait forever". */
236 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
238 print_request("wait", *request);
239 SIMIX_network_wait((*request)->pair, -1.0);
240 finish_wait(request, status);
/* Wait for one of `count` requests and return its index. MPI_UNDEFINED is
 * returned when there is nothing left to wait for; smpi_mpi_waitall relies
 * on this to stop.
 * Requests already marked complete are served first, without blocking.
 * Otherwise the SIMIX communications of the pending requests are collected
 * in a dynar and handed to SIMIX_network_waitany. */
243 int smpi_mpi_waitany(int count, MPI_Request requests[],
250 index = MPI_UNDEFINED;
252 // First check for already completed requests
253 for (i = 0; i < count; i++) {
254 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
256 smpi_mpi_wait(&requests[index], status);
260 if (index == MPI_UNDEFINED) {
261 // Otherwise, wait for a request to complete
262 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
// map presumably translates a position in `comms` back to an index in
// `requests`, since null and completed requests are skipped.
263 map = xbt_new(int, count);
265 DEBUG0("Wait for one of");
266 for (i = 0; i < count; i++) {
267 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
268 print_request(" ", requests[i]);
269 xbt_dynar_push(comms, &requests[i]->pair);
275 index = SIMIX_network_waitany(comms);
277 finish_wait(&requests[index], status);
280 xbt_dynar_free(&comms);
/* Wait for all `count` requests by calling smpi_mpi_waitany() until it
 * returns MPI_UNDEFINED. Each completed request's status is copied to the
 * same index in `status`, unless `status` is MPI_STATUS_IGNORE. */
286 void smpi_mpi_waitall(int count, MPI_Request requests[],
// Scratch status for each waitany call; skipped entirely when the caller
// ignores statuses.
291 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
295 index = smpi_mpi_waitany(count, requests, pstat);
296 if (index == MPI_UNDEFINED) {
299 if (status != MPI_STATUS_IGNORE) {
300 memcpy(&status[index], pstat, sizeof *pstat);
/* Finish every non-null request among the first `incount` that is already
 * marked complete. The status slot passed to smpi_mpi_wait() is either
 * &status[i] or MPI_STATUS_IGNORE.
 * NOTE(review): this loop only harvests requests that are already complete.
 * MPI_Waitsome must block until at least one request completes; confirm the
 * blocking case is handled. */
306 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
312 for (i = 0; i < incount; i++) {
313 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
314 smpi_mpi_wait(&requests[i],
316 MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
/* Broadcast `count` elements of `datatype` from `root` over an n-ary tree
 * of arity 4. */
324 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
// arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI).
328 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* Barrier over `comm`, implemented with an n-ary tree of arity 4. */
331 void smpi_mpi_barrier(MPI_Comm comm)
// arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI).
334 nary_tree_barrier(comm, 4);
/* Gather `sendcount` elements from every rank into `recvbuf` on `root`,
 * with rank r's block at element offset r * recvcount.
 * Non-root ranks send their buffer to root with the internal tag system_tag.
 * Root copies its own block locally, then receives the size - 1 other
 * blocks concurrently.
 * NOTE(review): every collective here uses the same fixed tag (666) on the
 * user's communicator, so user messages with that tag could interfere. */
337 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
338 void *recvbuf, int recvcount, MPI_Datatype recvtype,
339 int root, MPI_Comm comm)
341 int system_tag = 666;
342 int rank, size, src, index, sendsize, recvsize;
343 MPI_Request *requests;
345 rank = smpi_comm_rank(comm);
346 size = smpi_comm_size(comm);
348 // Send buffer to root
349 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
351 sendsize = smpi_datatype_size(sendtype);
352 recvsize = smpi_datatype_size(recvtype);
353 // Local copy from root
354 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
355 sendcount * sendsize * sizeof(char));
356 // Receive buffers from senders
357 requests = xbt_new(MPI_Request, size - 1);
359 for (src = 0; src < size; src++) {
361 requests[index] = smpi_irecv_init(&((char *) recvbuf)
362 [src * recvcount * recvsize],
363 recvcount, recvtype, src,
368 // Wait for completion of irecv's.
369 smpi_mpi_startall(size - 1, requests);
370 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Gather a variable number of elements from every rank onto `root`.
 * Rank i contributes `sendcount` elements. On root they are stored in
 * `recvbuf` starting at displs[i] and spanning recvcounts[i] elements.
 * As with MPI_Gatherv, displs[i] counts elements of `recvtype`, not bytes,
 * so it is scaled by the recvtype size before being used as a byte offset.
 * Non-root ranks send to root with the internal tag system_tag. Root copies
 * its own contribution locally and receives all the others concurrently. */
375 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
376 void *recvbuf, int *recvcounts, int *displs,
377 MPI_Datatype recvtype, int root, MPI_Comm comm)
379 int system_tag = 666;
380 int rank, size, src, index, sendsize;
381 MPI_Request *requests;
383 rank = smpi_comm_rank(comm);
384 size = smpi_comm_size(comm);
386 // Send buffer to root
387 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
389 sendsize = smpi_datatype_size(sendtype);
390 // Local copy from root (displs[] is in recvtype elements, not bytes)
391 memcpy(&((char *) recvbuf)[displs[root] * smpi_datatype_size(recvtype)], sendbuf,
392 sendcount * sendsize * sizeof(char));
393 // Receive buffers from senders
394 requests = xbt_new(MPI_Request, size - 1);
396 for (src = 0; src < size; src++) {
399 smpi_irecv_init(&((char *) recvbuf)[displs[src] * smpi_datatype_size(recvtype)],
400 recvcounts[src], recvtype, src, system_tag,
405 // Wait for completion of irecv's.
406 smpi_mpi_startall(size - 1, requests);
407 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Every rank contributes `sendcount` elements and receives every rank's
 * contribution into `recvbuf`, with rank r's block at element offset
 * r * recvcount. Each rank copies its own block locally, then posts one send
 * to and one receive from every other rank: 2 * (size - 1) requests, all
 * started and waited on together. */
412 void smpi_mpi_allgather(void *sendbuf, int sendcount,
413 MPI_Datatype sendtype, void *recvbuf,
414 int recvcount, MPI_Datatype recvtype,
417 int system_tag = 666;
418 int rank, size, other, index, sendsize, recvsize;
419 MPI_Request *requests;
421 rank = smpi_comm_rank(comm);
422 size = smpi_comm_size(comm);
423 sendsize = smpi_datatype_size(sendtype);
424 recvsize = smpi_datatype_size(recvtype);
425 // Local copy from self
426 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
427 sendcount * sendsize * sizeof(char));
428 // Send/Recv buffers to/from others;
429 requests = xbt_new(MPI_Request, 2 * (size - 1));
431 for (other = 0; other < size; other++) {
434 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
437 requests[index] = smpi_irecv_init(&((char *) recvbuf)
438 [other * recvcount * recvsize],
439 recvcount, recvtype, other,
444 // Wait for completion of all comms.
445 smpi_mpi_startall(2 * (size - 1), requests);
446 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Every rank contributes `sendcount` elements and receives every rank's
 * contribution into `recvbuf`. Rank i's block starts at displs[i] and spans
 * recvcounts[i] elements.
 * As with MPI_Allgatherv, displs[] counts elements of `recvtype`, so it is
 * scaled by recvsize before being used as a byte offset. Each rank copies its
 * own block locally, then posts one send to and one receive from every other
 * rank: 2 * (size - 1) requests, all started and waited on together. */
450 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
451 MPI_Datatype sendtype, void *recvbuf,
452 int *recvcounts, int *displs,
453 MPI_Datatype recvtype, MPI_Comm comm)
455 int system_tag = 666;
456 int rank, size, other, index, sendsize, recvsize;
457 MPI_Request *requests;
459 rank = smpi_comm_rank(comm);
460 size = smpi_comm_size(comm);
461 sendsize = smpi_datatype_size(sendtype);
462 recvsize = smpi_datatype_size(recvtype);
463 // Local copy from self (displs[] is in recvtype elements, not bytes)
464 memcpy(&((char *) recvbuf)[displs[rank] * recvsize], sendbuf,
465 sendcount * sendsize * sizeof(char));
466 // Send/Recv buffers to/from others;
467 requests = xbt_new(MPI_Request, 2 * (size - 1));
469 for (other = 0; other < size; other++) {
472 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
476 smpi_irecv_init(&((char *) recvbuf)[displs[other] * recvsize],
477 recvcounts[other], recvtype, other, system_tag,
482 // Wait for completion of all comms.
483 smpi_mpi_startall(2 * (size - 1), requests);
484 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatter `sendbuf` from `root`: block dst (sendcount elements, at element
 * offset dst * sendcount) goes to rank dst.
 * Non-root ranks receive their block from root with the internal tag
 * system_tag. Root copies its own block into `recvbuf`, then sends the
 * size - 1 other blocks concurrently. */
488 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
489 void *recvbuf, int recvcount, MPI_Datatype recvtype,
490 int root, MPI_Comm comm)
492 int system_tag = 666;
493 int rank, size, dst, index, sendsize, recvsize;
494 MPI_Request *requests;
496 rank = smpi_comm_rank(comm);
497 size = smpi_comm_size(comm);
499 // Recv buffer from root
500 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
503 sendsize = smpi_datatype_size(sendtype);
504 recvsize = smpi_datatype_size(recvtype);
505 // Local copy from root
506 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
507 recvcount * recvsize * sizeof(char));
508 // Send buffers to receivers
509 requests = xbt_new(MPI_Request, size - 1);
511 for (dst = 0; dst < size; dst++) {
513 requests[index] = smpi_isend_init(&((char *) sendbuf)
514 [dst * sendcount * sendsize],
515 sendcount, sendtype, dst,
520 // Wait for completion of isend's.
521 smpi_mpi_startall(size - 1, requests);
522 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Scatter variable-sized blocks of `sendbuf` from `root`. Block i starts at
 * displs[i] and spans sendcounts[i] elements; it is delivered to rank i.
 * As with MPI_Scatterv, displs[] counts elements of `sendtype`, so it is
 * scaled by sendsize before being used as a byte offset.
 * Non-root ranks receive their block from root with the internal tag
 * system_tag. Root copies its own block into `recvbuf`, then sends the
 * others concurrently. */
527 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
528 MPI_Datatype sendtype, void *recvbuf, int recvcount,
529 MPI_Datatype recvtype, int root, MPI_Comm comm)
531 int system_tag = 666;
532 int rank, size, dst, index, sendsize, recvsize;
533 MPI_Request *requests;
535 rank = smpi_comm_rank(comm);
536 size = smpi_comm_size(comm);
538 // Recv buffer from root
539 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
542 sendsize = smpi_datatype_size(sendtype);
543 recvsize = smpi_datatype_size(recvtype);
544 // Local copy from root (displs[] is in sendtype elements, not bytes)
545 memcpy(recvbuf, &((char *) sendbuf)[displs[root] * sendsize],
546 recvcount * recvsize * sizeof(char));
547 // Send buffers to receivers
548 requests = xbt_new(MPI_Request, size - 1);
550 for (dst = 0; dst < size; dst++) {
553 smpi_isend_init(&((char *) sendbuf)[displs[dst] * sendsize],
554 sendcounts[dst], sendtype, dst, system_tag,
559 // Wait for completion of isend's.
560 smpi_mpi_startall(size - 1, requests);
561 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduce `count` elements of `datatype` from every rank into `recvbuf` on
 * `root` using `op`.
 * Non-root ranks send their buffer to root. Root starts from a copy of its
 * own data, receives every other rank's buffer into a temporary buffer, and
 * folds each one into `recvbuf` as soon as it arrives.
 * Because the fold order follows completion order (smpi_mpi_waitany), the
 * result is only well-defined for commutative operations. */
566 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
567 MPI_Datatype datatype, MPI_Op op, int root,
570 int system_tag = 666;
571 int rank, size, src, index, datasize;
572 MPI_Request *requests;
575 rank = smpi_comm_rank(comm);
576 size = smpi_comm_size(comm);
578 // Send buffer to root
579 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
581 datasize = smpi_datatype_size(datatype);
582 // Local copy from root
583 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
584 // Receive buffers from senders
585 //TODO: make a MPI_barrier here ?
586 requests = xbt_new(MPI_Request, size - 1);
587 tmpbufs = xbt_new(void *, size - 1);
589 for (src = 0; src < size; src++) {
591 tmpbufs[index] = xbt_malloc(count * datasize);
593 smpi_irecv_init(tmpbufs[index], count, datatype, src,
598 // Wait for completion of irecv's.
599 smpi_mpi_startall(size - 1, requests);
600 for (src = 0; src < size - 1; src++) {
601 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
602 if (index == MPI_UNDEFINED) {
605 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
607 for (index = 0; index < size - 1; index++) {
608 xbt_free(tmpbufs[index]);
/* Allreduce, implemented as a reduce onto rank 0 followed by a broadcast of
 * the result from rank 0. */
615 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
616 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
618 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
619 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
622 FIXME: buggy implementation
624 int system_tag = 666;
625 int rank, size, other, index, datasize;
626 MPI_Request* requests;
629 rank = smpi_comm_rank(comm);
630 size = smpi_comm_size(comm);
631 datasize = smpi_datatype_size(datatype);
632 // Local copy from self
633 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
634 // Send/Recv buffers to/from others;
635 //TODO: make a MPI_barrier here ?
636 requests = xbt_new(MPI_Request, 2 * (size - 1));
637 tmpbufs = xbt_new(void*, size - 1);
639 for(other = 0; other < size; other++) {
641 tmpbufs[index / 2] = xbt_malloc(count * datasize);
642 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
643 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
647 // Wait for completion of all comms.
648 for(other = 0; other < 2 * (size - 1); other++) {
649 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
650 if(index == MPI_UNDEFINED) {
653 if((index & 1) == 1) {
654 // Request is odd: it's a irecv
655 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
658 for(index = 0; index < size - 1; index++) {
659 xbt_free(tmpbufs[index]);
/* Inclusive prefix reduction: each rank r ends with its own data combined,
 * via `op`, with the data of ranks 0..r-1.
 * Each rank receives from every lower rank (one temporary buffer each, so
 * `rank` buffers) and sends its own buffer to every higher rank, for
 * total = size - 1 requests. Received buffers are folded into `recvbuf` in
 * completion order, so `op` should be commutative. */
666 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
667 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
669 int system_tag = 666;
670 int rank, size, other, index, datasize;
672 MPI_Request *requests;
675 rank = smpi_comm_rank(comm);
676 size = smpi_comm_size(comm);
677 datasize = smpi_datatype_size(datatype);
678 // Local copy from self
679 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
680 // Send/Recv buffers to/from others;
// total = receives from the `rank` lower ranks + sends to the higher ranks
681 total = rank + (size - (rank + 1));
682 requests = xbt_new(MPI_Request, total);
// One temporary receive buffer per lower rank
683 tmpbufs = xbt_new(void *, rank);
685 for (other = 0; other < rank; other++) {
686 tmpbufs[index] = xbt_malloc(count * datasize);
688 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
692 for (other = rank + 1; other < size; other++) {
694 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
697 // Wait for completion of all comms.
698 smpi_mpi_startall(size - 1, requests);
699 for (other = 0; other < total; other++) {
700 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
701 if (index == MPI_UNDEFINED) {
705 // Request index is below rank: it's an irecv
706 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Only `rank` temporary buffers were allocated; freeing size - 1 of them
// would read past the array and free garbage pointers.
709 for (index = 0; index < rank; index++) {
710 xbt_free(tmpbufs[index]);