1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Log category for this file: smpi_base, created as a child of the smpi
// category and (as the macro name indicates) made the default category for
// the logging calls in this file.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
// External declarations of the SMPI log categories, presumably defined in the
// other SMPI source files; where they are used is not visible in this excerpt.
// NOTE(review): smpi_base is the category created just above, so declaring it
// external here looks redundant -- confirm it is actually needed.
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
// build_request: allocate a request object describing one point-to-point
// transfer, without posting anything to the network.
//   buf, count, datatype -- user buffer; the request stores its size in bytes,
//                           computed as smpi_datatype_size(datatype) * count.
//   src, dst, tag, comm  -- message envelope; the callers in this file pass the
//                           local rank as src for sends and as dst for receives.
//   flags                -- PERSISTENT or NON_PERSISTENT, or-ed with SEND or RECV.
// A new request starts with complete == 0 and no matched peer request.
// NOTE(review): part of the body (local declaration, remaining field
// initialisation, return) is not visible in this excerpt.
22 static MPI_Request build_request(void *buf, int count,
23 MPI_Datatype datatype, int src, int dst,
24 int tag, MPI_Comm comm, unsigned flags)
28 request = xbt_new(s_smpi_mpi_request_t, 1);
// NOTE(review): if smpi_datatype_size returns int, this product can overflow
// for very large counts.
30 request->size = smpi_datatype_size(datatype) * count;
37 request->complete = 0;
38 request->match = MPI_REQUEST_NULL;
39 request->flags = flags;
47 /* MPI Low level calls */
// Create a persistent send request from the calling rank to dst.
// Nothing is sent until the request is started with smpi_mpi_start() or
// smpi_mpi_startall(). Because it is PERSISTENT, the wait path does not free
// it; only NON_PERSISTENT requests are freed on completion.
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49 int dst, int tag, MPI_Comm comm)
52 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53 comm, PERSISTENT | SEND);
// Create a persistent receive request for a message from src to the calling
// rank. Nothing is posted until the request is started with smpi_mpi_start()
// or smpi_mpi_startall(). Because it is PERSISTENT, the wait path does not free
// it; only NON_PERSISTENT requests are freed on completion.
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59 int src, int tag, MPI_Comm comm)
62 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63 comm, PERSISTENT | RECV);
// Post a request on the simulated network.
// The assertion rejects a request whose complete flag is already set.
// NOTE(review): the message "Cannot start a non-finished communication" reads
// as the opposite of the checked condition (complete == 0); it probably means
// "already finished" -- confirm before rewording the message.
// RECV requests: registered with smpi_process_post_recv, then an asynchronous
// SIMIX receive is posted into request->buf (&request->size is passed,
// presumably so SIMIX can report the received byte count).
// SEND requests: registered with smpi_process_post_send, then an asynchronous
// SIMIX send of request->size bytes from request->buf is posted.
// NOTE(review): the lines that keep the SIMIX handles returned here (presumably
// request->pair, which smpi_mpi_wait waits on) and the `else` of the RECV test
// are not visible in this excerpt.
68 void smpi_mpi_start(MPI_Request request)
70 xbt_assert0(request->complete == 0,
71 "Cannot start a non-finished communication");
72 if ((request->flags & RECV) == RECV) {
73 smpi_process_post_recv(request);
74 print_request("New recv", request);
76 SIMIX_network_irecv(request->rdv, request->buf, &request->size);
// Send path.
78 smpi_process_post_send(request->comm, request); // FIXME
79 print_request("New send", request);
81 SIMIX_network_isend(request->rdv, request->size, -1.0,
82 request->buf, request->size, NULL);
// Start the first count requests of the array, in index order, via
// smpi_mpi_start.
86 void smpi_mpi_startall(int count, MPI_Request * requests)
90 for (i = 0; i < count; i++) {
91 smpi_mpi_start(requests[i]);
// Release a request and reset the caller's handle to MPI_REQUEST_NULL.
// NOTE(review): the statement that actually releases the request memory is not
// visible in this excerpt.
95 void smpi_mpi_request_free(MPI_Request * request)
98 *request = MPI_REQUEST_NULL;
// Create a NON_PERSISTENT send request (not yet started) from the calling rank
// to dst. Non-persistent requests are freed by the wait path once they
// complete, which resets the caller's handle to MPI_REQUEST_NULL.
101 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
102 int dst, int tag, MPI_Comm comm)
104 MPI_Request request =
105 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
106 comm, NON_PERSISTENT | SEND);
// Non-blocking send: build a non-persistent send request and start it
// immediately. The started request is returned (return line not visible here)
// and must be completed with one of the wait/test calls.
111 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
112 int dst, int tag, MPI_Comm comm)
114 MPI_Request request =
115 smpi_isend_init(buf, count, datatype, dst, tag, comm);
117 smpi_mpi_start(request);
// Create a NON_PERSISTENT receive request (not yet started) for a message from
// src to the calling rank. Non-persistent requests are freed by the wait path
// once they complete, which resets the caller's handle to MPI_REQUEST_NULL.
121 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
122 int src, int tag, MPI_Comm comm)
124 MPI_Request request =
125 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
126 comm, NON_PERSISTENT | RECV);
// Non-blocking receive: build a non-persistent receive request and start it
// immediately. The started request is returned (return line not visible here)
// and must be completed with one of the wait/test calls.
131 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
132 int src, int tag, MPI_Comm comm)
134 MPI_Request request =
135 smpi_irecv_init(buf, count, datatype, src, tag, comm);
137 smpi_mpi_start(request);
// Blocking receive: post a non-blocking receive and wait for it. The wait fills
// *status unless the caller passed MPI_STATUS_IGNORE.
141 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
142 int tag, MPI_Comm comm, MPI_Status * status)
146 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
147 smpi_mpi_wait(&request, status);
// Blocking send: post a non-blocking send and wait until its simulated
// communication finishes. The send status is discarded.
150 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
151 int tag, MPI_Comm comm)
155 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
156 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
// Combined send and receive. Both requests are built without being started,
// then started together and waited on together, so neither transfer has to
// finish before the other is posted. Only the receive status is copied back.
// NOTE(review): stats[1] is assumed to hold the receive status because the
// receive request is built second. The declaration of `stats` and the
// `requests[i] =` assignments are on lines not visible in this excerpt.
159 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
160 int dst, int sendtag, void *recvbuf, int recvcount,
161 MPI_Datatype recvtype, int src, int recvtag,
162 MPI_Comm comm, MPI_Status * status)
164 MPI_Request requests[2];
168 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
170 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
171 smpi_mpi_startall(2, requests);
172 smpi_mpi_waitall(2, requests, stats);
173 if (status != MPI_STATUS_IGNORE) {
174 // Copy receive status
175 memcpy(status, &stats[1], sizeof(MPI_Status));
// Number of whole datatype elements in a completed transfer, derived from the
// byte count stored in the status. Integer division drops a trailing partial
// element.
// NOTE(review): there is no guard against a zero-sized datatype.
179 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
181 return status->count / smpi_datatype_size(datatype);
// finish_wait: common completion step run after a request's SIMIX
// communication has finished (smpi_mpi_wait and smpi_mpi_waitany call it).
//  - Fills *status, unless MPI_STATUS_IGNORE, with the request's src and tag,
//    MPI_SUCCESS, and the destination buffer size reported by SIMIX.
//  - Destroys the SIMIX communication.
//  - If this request is already flagged complete, destroys its rendez-vous
//    point. Otherwise (the `else` line is presumably on a hidden line), it
//    marks the matched peer request complete and unlinks it from this one.
//  - Frees NON_PERSISTENT requests, which resets *request to MPI_REQUEST_NULL.
//    Otherwise (again presumably an `else` on a hidden line), it clears
//    rdv/pair so the persistent request no longer references freed SIMIX
//    objects.
// NOTE(review): MPI_SOURCE/MPI_TAG are copied from (*request)->src/tag. For
// wildcard receives, the status would report the wildcard instead of the
// actual sender and tag, unless those fields are updated on matching (not
// visible here).
184 static void finish_wait(MPI_Request * request, MPI_Status * status)
186 if (status != MPI_STATUS_IGNORE) {
187 status->MPI_SOURCE = (*request)->src;
188 status->MPI_TAG = (*request)->tag;
189 status->MPI_ERROR = MPI_SUCCESS;
190 status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
192 SIMIX_communication_destroy((*request)->pair);
193 print_request("finishing wait", *request);
194 if ((*request)->complete == 1) {
195 SIMIX_rdv_destroy((*request)->rdv);
197 (*request)->match->complete = 1;
198 (*request)->match->match = MPI_REQUEST_NULL;
200 if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
201 smpi_mpi_request_free(request);
203 (*request)->rdv = NULL;
204 (*request)->pair = NULL;
// Non-blocking completion check. Reads the request's complete flag and, when
// it is set, finishes the request through smpi_mpi_wait (filling *status).
// The `if (flag)` guard and the return of `flag` are on hidden lines.
// NOTE(review): the only setter of `complete` visible in this excerpt is
// finish_wait on the matched peer request, so this reports completion only
// after the peer side has been finished.
208 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
210 int flag = (*request)->complete;
213 smpi_mpi_wait(request, status);
// Test a set of requests without blocking: any non-null request already
// flagged complete is finished through smpi_mpi_wait. *index starts as
// MPI_UNDEFINED. The lines that record the finished index, stop the scan and
// return the flag are not visible in this excerpt (presumably the scan stops
// at the first completed request).
218 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
223 *index = MPI_UNDEFINED;
225 for (i = 0; i < count; i++) {
226 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
227 smpi_mpi_wait(&requests[i], status);
// Block until the request's SIMIX communication finishes, then complete the
// request with finish_wait. The -1.0 timeout presumably means "no timeout".
// NOTE(review): there is no MPI_REQUEST_NULL check, so waiting on a null
// request would dereference it.
236 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
238 print_request("wait", *request);
239 SIMIX_network_wait((*request)->pair, -1.0);
240 finish_wait(request, status);
// Wait for any one request to complete and return its index, or MPI_UNDEFINED
// when there is nothing left to wait for.
// Phase 1: finish a non-null request that is already flagged complete.
// Phase 2: otherwise collect the SIMIX handles of all non-null, incomplete
//          requests and block in SIMIX_network_waitany on them.
// NOTE(review): SIMIX_network_waitany returns a position in `comms`. That
// position differs from the request index whenever a request was skipped.
// `map` is allocated for this translation, but the remapping lines are not
// visible -- confirm `index` is remapped before finish_wait(&requests[index]).
// Also confirm that the hidden lines skip the SIMIX wait when `comms` is empty
// and release `map`.
243 int smpi_mpi_waitany(int count, MPI_Request requests[],
250 index = MPI_UNDEFINED;
252 // First check for already completed requests
253 for (i = 0; i < count; i++) {
254 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
256 smpi_mpi_wait(&requests[index], status);
260 if (index == MPI_UNDEFINED) {
261 // Otherwise, wait for a request to complete
262 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
263 map = xbt_new(int, count);
265 DEBUG0("Wait for one of");
266 for (i = 0; i < count; i++) {
267 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
268 print_request(" ", requests[i]);
269 xbt_dynar_push(comms, &requests[i]->pair);
275 index = SIMIX_network_waitany(comms);
277 finish_wait(&requests[index], status);
280 xbt_dynar_free(&comms);
// Wait for every request by calling smpi_mpi_waitany repeatedly until it
// returns MPI_UNDEFINED. Each completed request's status is copied into
// status[index] unless the caller passed MPI_STATUS_IGNORE. The surrounding
// loop and the `break` are on hidden lines.
// NOTE(review): MPI's sentinel for ignoring an array of statuses is
// MPI_STATUSES_IGNORE; this function compares against MPI_STATUS_IGNORE.
// Each waitany call rescans the whole array, so the cost is O(count^2) checks.
286 void smpi_mpi_waitall(int count, MPI_Request requests[],
294 index = smpi_mpi_waitany(count, requests, &stat);
295 if (index == MPI_UNDEFINED) {
298 if (status != MPI_STATUS_IGNORE) {
299 memcpy(&status[index], &stat, sizeof(stat));
// Finish every non-null request that is already flagged complete, storing
// each status in status[i] unless statuses are ignored. The first half of the
// status conditional and the bookkeeping of indices/outcount are on hidden
// lines.
// NOTE(review): in the visible lines nothing blocks. If no request is already
// complete, no wait happens at all, whereas MPI_Waitsome must block until at
// least one request completes -- confirm the hidden lines handle this.
305 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
311 for (i = 0; i < incount; i++) {
312 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
313 smpi_mpi_wait(&requests[i],
315 MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
// Broadcast buf from root to every rank of comm using an n-ary tree of
// arity 4 (nary_tree_bcast).
323 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
326 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
327 nary_tree_bcast(buf, count, datatype, root, comm, 4);
// Synchronise all ranks of comm using an n-ary tree of arity 4
// (nary_tree_barrier).
330 void smpi_mpi_barrier(MPI_Comm comm)
332 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
333 nary_tree_barrier(comm, 4);
// Linear gather. Non-root ranks send their buffer to root. Root copies its own
// contribution into recvbuf[root * recvcount * recvsize], posts one receive
// per other rank into recvbuf[src * recvcount * recvsize], then starts and
// waits for all of them.
// The rank/root branch conditions, the skip of src == root and the index
// bookkeeping are on lines not visible in this excerpt.
// NOTE(review): the local copy moves sendcount * sendsize bytes into a slot of
// recvcount * recvsize bytes, which assumes both describe the same amount of
// data. All collectives here use the same fixed tag (666) on comm.
336 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
337 void *recvbuf, int recvcount, MPI_Datatype recvtype,
338 int root, MPI_Comm comm)
340 int system_tag = 666;
341 int rank, size, src, index, sendsize, recvsize;
342 MPI_Request *requests;
344 rank = smpi_comm_rank(comm);
345 size = smpi_comm_size(comm);
347 // Send buffer to root
348 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
350 sendsize = smpi_datatype_size(sendtype);
351 recvsize = smpi_datatype_size(recvtype);
352 // Local copy from root
353 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
354 sendcount * sendsize * sizeof(char));
355 // Receive buffers from senders
356 requests = xbt_new(MPI_Request, size - 1);
358 for (src = 0; src < size; src++) {
360 requests[index] = smpi_irecv_init(&((char *) recvbuf)
361 [src * recvcount * recvsize],
362 recvcount, recvtype, src,
367 // Wait for completion of irecv's.
368 smpi_mpi_startall(size - 1, requests);
369 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Linear gatherv. Non-root ranks send their buffer to root. Root copies its own
// contribution to displs[root], posts one receive of recvcounts[src] elements
// per other rank at displs[src], then starts and waits for all of them.
// The rank/root branch conditions and the index bookkeeping are on hidden
// lines.
// NOTE(review): displs[] is used directly as a byte offset into recvbuf. MPI
// defines these displacements in units of recvtype's extent, so for any
// datatype wider than one byte the data lands at the wrong offset. The
// expected form is presumably displs[i] * smpi_datatype_size(recvtype).
374 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
375 void *recvbuf, int *recvcounts, int *displs,
376 MPI_Datatype recvtype, int root, MPI_Comm comm)
378 int system_tag = 666;
379 int rank, size, src, index, sendsize;
380 MPI_Request *requests;
382 rank = smpi_comm_rank(comm);
383 size = smpi_comm_size(comm);
385 // Send buffer to root
386 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
388 sendsize = smpi_datatype_size(sendtype);
389 // Local copy from root
390 memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
391 sendcount * sendsize * sizeof(char));
392 // Receive buffers from senders
393 requests = xbt_new(MPI_Request, size - 1);
395 for (src = 0; src < size; src++) {
398 smpi_irecv_init(&((char *) recvbuf)[displs[src]],
399 recvcounts[src], recvtype, src, system_tag,
404 // Wait for completion of irecv's.
405 smpi_mpi_startall(size - 1, requests);
406 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Linear allgather. Each rank copies its own block into
// recvbuf[rank * recvcount * recvsize], then exchanges blocks with every other
// rank using one send and one receive per peer (2 * (size - 1) requests). The
// requests are started together and waited on as a group.
// The skip of other == rank and the request index bookkeeping are on hidden
// lines.
// NOTE(review): the local copy moves sendcount * sendsize bytes into a slot of
// recvcount * recvsize bytes, which assumes both describe the same amount.
411 void smpi_mpi_allgather(void *sendbuf, int sendcount,
412 MPI_Datatype sendtype, void *recvbuf,
413 int recvcount, MPI_Datatype recvtype,
416 int system_tag = 666;
417 int rank, size, other, index, sendsize, recvsize;
418 MPI_Request *requests;
420 rank = smpi_comm_rank(comm);
421 size = smpi_comm_size(comm);
422 sendsize = smpi_datatype_size(sendtype);
423 recvsize = smpi_datatype_size(recvtype);
424 // Local copy from self
425 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
426 sendcount * sendsize * sizeof(char));
427 // Send/Recv buffers to/from others;
428 requests = xbt_new(MPI_Request, 2 * (size - 1));
430 for (other = 0; other < size; other++) {
433 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
436 requests[index] = smpi_irecv_init(&((char *) recvbuf)
437 [other * recvcount * recvsize],
438 recvcount, recvtype, other,
443 // Wait for completion of all comms.
444 smpi_mpi_startall(2 * (size - 1), requests);
445 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Linear allgatherv. Each rank copies its own block to displs[rank] in
// recvbuf, then exchanges data with every other rank using one send and one
// receive of recvcounts[other] elements at displs[other] per peer
// (2 * (size - 1) requests, started and waited on together).
// The skip of other == rank and the index bookkeeping are on hidden lines.
// NOTE(review): displs[] is used directly as a byte offset into recvbuf,
// whereas MPI defines it in units of recvtype's extent. recvsize is computed
// below but not used in any visible line, which suggests the intended offset
// is displs[i] * recvsize.
449 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
450 MPI_Datatype sendtype, void *recvbuf,
451 int *recvcounts, int *displs,
452 MPI_Datatype recvtype, MPI_Comm comm)
454 int system_tag = 666;
455 int rank, size, other, index, sendsize, recvsize;
456 MPI_Request *requests;
458 rank = smpi_comm_rank(comm);
459 size = smpi_comm_size(comm);
460 sendsize = smpi_datatype_size(sendtype);
461 recvsize = smpi_datatype_size(recvtype);
462 // Local copy from self
463 memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
464 sendcount * sendsize * sizeof(char));
465 // Send/recv buffers to/from others
466 requests = xbt_new(MPI_Request, 2 * (size - 1));
468 for (other = 0; other < size; other++) {
471 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
475 smpi_irecv_init(&((char *) recvbuf)[displs[other]],
476 recvcounts[other], recvtype, other, system_tag,
481 // Wait for completion of all comms.
482 smpi_mpi_startall(2 * (size - 1), requests);
483 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Linear scatter. Non-root ranks receive their block from root. Root copies
// its own block sendbuf[root * sendcount * sendsize] into recvbuf, sends block
// sendbuf[dst * sendcount * sendsize] to every other rank, then starts and
// waits for all sends.
// The rank/root branch conditions, the skip of dst == root and the index
// bookkeeping are on hidden lines.
// NOTE(review): the local copy moves recvcount * recvsize bytes out of a slot
// of sendcount * sendsize bytes, which assumes both describe the same amount.
487 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
488 void *recvbuf, int recvcount, MPI_Datatype recvtype,
489 int root, MPI_Comm comm)
491 int system_tag = 666;
492 int rank, size, dst, index, sendsize, recvsize;
493 MPI_Request *requests;
495 rank = smpi_comm_rank(comm);
496 size = smpi_comm_size(comm);
498 // Recv buffer from root
499 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
502 sendsize = smpi_datatype_size(sendtype);
503 recvsize = smpi_datatype_size(recvtype);
504 // Local copy from root
505 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
506 recvcount * recvsize * sizeof(char));
507 // Send buffers to receivers
508 requests = xbt_new(MPI_Request, size - 1);
510 for (dst = 0; dst < size; dst++) {
512 requests[index] = smpi_isend_init(&((char *) sendbuf)
513 [dst * sendcount * sendsize],
514 sendcount, sendtype, dst,
519 // Wait for completion of isend's.
520 smpi_mpi_startall(size - 1, requests);
521 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Linear scatterv. Non-root ranks receive their block from root. Root copies
// its own block from displs[root] into recvbuf, sends sendcounts[dst] elements
// starting at displs[dst] to every other rank, then starts and waits for all
// sends.
// The rank/root branch conditions and the index bookkeeping are on hidden
// lines.
// NOTE(review): displs[] is used directly as a byte offset into sendbuf,
// whereas MPI defines it in units of sendtype's extent. sendsize is computed
// below but not used in any visible line, which suggests the intended offset
// is displs[i] * sendsize.
526 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
527 MPI_Datatype sendtype, void *recvbuf, int recvcount,
528 MPI_Datatype recvtype, int root, MPI_Comm comm)
530 int system_tag = 666;
531 int rank, size, dst, index, sendsize, recvsize;
532 MPI_Request *requests;
534 rank = smpi_comm_rank(comm);
535 size = smpi_comm_size(comm);
537 // Recv buffer from root
538 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
541 sendsize = smpi_datatype_size(sendtype);
542 recvsize = smpi_datatype_size(recvtype);
543 // Local copy from root
544 memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
545 recvcount * recvsize * sizeof(char));
546 // Send buffers to receivers
547 requests = xbt_new(MPI_Request, size - 1);
549 for (dst = 0; dst < size; dst++) {
552 smpi_isend_init(&((char *) sendbuf)[displs[dst]],
553 sendcounts[dst], sendtype, dst, system_tag,
558 // Wait for completion of isend's.
559 smpi_mpi_startall(size - 1, requests);
560 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Linear reduce. Non-root ranks send their buffer to root. Root copies its own
// data into recvbuf and receives every other rank's data into its own
// temporary buffer. It folds each temporary buffer into recvbuf with
// smpi_op_apply as soon as smpi_mpi_waitany reports it complete, then frees
// the temporary buffers.
// The rank/root branch conditions and the index bookkeeping are on hidden
// lines.
// NOTE(review): folding in arrival order is only correct for commutative
// operations. MPI requires rank order when the operation is not commutative.
565 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
566 MPI_Datatype datatype, MPI_Op op, int root,
569 int system_tag = 666;
570 int rank, size, src, index, datasize;
571 MPI_Request *requests;
574 rank = smpi_comm_rank(comm);
575 size = smpi_comm_size(comm);
577 // Send buffer to root
578 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
580 datasize = smpi_datatype_size(datatype);
581 // Local copy from root
582 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
583 // Receive buffers from senders
584 //TODO: make a MPI_barrier here ?
585 requests = xbt_new(MPI_Request, size - 1);
586 tmpbufs = xbt_new(void *, size - 1);
588 for (src = 0; src < size; src++) {
590 tmpbufs[index] = xbt_malloc(count * datasize);
592 smpi_irecv_init(tmpbufs[index], count, datatype, src,
597 // Wait for completion of irecv's.
598 smpi_mpi_startall(size - 1, requests);
599 for (src = 0; src < size - 1; src++) {
600 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
601 if (index == MPI_UNDEFINED) {
604 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
606 for (index = 0; index < size - 1; index++) {
607 xbt_free(tmpbufs[index]);
// Allreduce implemented as a reduce to rank 0 followed by a broadcast of the
// result from rank 0.
614 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
615 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
617 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
618 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
// NOTE(review): the following lines appear to be a disabled alternative
// implementation kept inside a comment. Its delimiters are not visible in this
// excerpt, but the bare "FIXME" text below cannot be live code.
621 FIXME: buggy implementation
623 int system_tag = 666;
624 int rank, size, other, index, datasize;
625 MPI_Request* requests;
628 rank = smpi_comm_rank(comm);
629 size = smpi_comm_size(comm);
630 datasize = smpi_datatype_size(datatype);
631 // Local copy from self
632 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
633 // Send/Recv buffers to/from others;
634 //TODO: make a MPI_barrier here ?
635 requests = xbt_new(MPI_Request, 2 * (size - 1));
636 tmpbufs = xbt_new(void*, size - 1);
638 for(other = 0; other < size; other++) {
640 tmpbufs[index / 2] = xbt_malloc(count * datasize);
641 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
642 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
646 // Wait for completion of all comms.
647 for(other = 0; other < 2 * (size - 1); other++) {
// NOTE(review): one evident defect in this disabled version is that the wait
// below passes size - 1 as the request count although 2 * (size - 1) requests
// were created. Only the first half of the array is ever examined.
648 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
649 if(index == MPI_UNDEFINED) {
652 if((index & 1) == 1) {
653 // Request is odd: it's an irecv
654 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
657 for(index = 0; index < size - 1; index++) {
658 xbt_free(tmpbufs[index]);
// Linear inclusive scan. Each rank copies its own data into recvbuf, receives
// the send buffer of every lower rank into a temporary buffer, and sends its
// own buffer to every higher rank. Each received buffer is folded into recvbuf
// with smpi_op_apply as it completes.
// total = rank + (size - (rank + 1)) always equals size - 1, so the
// startall/waitany calls that pass size - 1 cover every request.
// NOTE(review): folding in arrival order is only correct for commutative
// operations. MPI requires rank order for non-commutative ones.
665 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
666 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
668 int system_tag = 666;
669 int rank, size, other, index, datasize;
671 MPI_Request *requests;
674 rank = smpi_comm_rank(comm);
675 size = smpi_comm_size(comm);
676 datasize = smpi_datatype_size(datatype);
677 // Local copy from self
678 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
679 // Send/Recv buffers to/from others;
680 total = rank + (size - (rank + 1));
681 requests = xbt_new(MPI_Request, total);
682 tmpbufs = xbt_new(void *, rank);
684 for (other = 0; other < rank; other++) {
685 tmpbufs[index] = xbt_malloc(count * datasize);
687 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
691 for (other = rank + 1; other < size; other++) {
693 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
696 // Wait for completion of all comms.
697 smpi_mpi_startall(size - 1, requests);
698 for (other = 0; other < total; other++) {
699 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
700 if (index == MPI_UNDEFINED) {
704 // Request index is below rank: it is an irecv
705 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// NOTE(review): BUG -- tmpbufs was allocated with only `rank` entries
// (xbt_new(void *, rank) above), but this loop frees size - 1 entries. On
// every rank except the last it reads past the array and frees garbage
// pointers. The loop bound should be `index < rank`.
708 for (index = 0; index < size - 1; index++) {
709 xbt_free(tmpbufs[index]);