1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Logging setup: smpi_base (a subcategory of smpi) is this file's default
 * log category. The XBT_LOG_EXTERNAL_CATEGORY lines declare the other SMPI
 * categories, presumably so they can be referenced from this file (their
 * definitions are not visible here -- TODO confirm).
 * NOTE(review): smpi_base is both defined here and declared external just
 * below; the external declaration looks redundant for this file. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
/* File-local helper: allocate a new request with xbt_new and initialise it.
 * Visible initialisation: size = datatype size * count (bytes), complete = 0,
 * match = MPI_REQUEST_NULL, flags = the given flags (e.g. PERSISTENT|SEND).
 * The remaining fields (buf, src, dst, tag, comm, rdv, ...) are set on lines
 * not visible in this view -- TODO confirm.
 * The caller owns the returned request; it is released through
 * smpi_mpi_request_free. */
22 static MPI_Request build_request(void *buf, int count,
23 MPI_Datatype datatype, int src, int dst,
24 int tag, MPI_Comm comm, unsigned flags)
28 request = xbt_new(s_smpi_mpi_request_t, 1);
// Payload length in bytes (the size is used as a byte count by memcpy elsewhere in this file).
30 request->size = smpi_datatype_size(datatype) * count;
// A freshly built request has not completed and is not yet paired with a peer request.
37 request->complete = 0;
38 request->match = MPI_REQUEST_NULL;
39 request->flags = flags;
47 /* MPI Low level calls */
/* Build a persistent send request (flags PERSISTENT | SEND) from the calling
 * rank, smpi_comm_rank(comm), to rank dst. The visible code only builds the
 * request; it is started separately with smpi_mpi_start/smpi_mpi_startall. */
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49 int dst, int tag, MPI_Comm comm)
52 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53 comm, PERSISTENT | SEND);
/* Build a persistent receive request (flags PERSISTENT | RECV) from rank src
 * to the calling rank, smpi_comm_rank(comm). The visible code only builds
 * the request; it is started separately with smpi_mpi_start/smpi_mpi_startall. */
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59 int src, int tag, MPI_Comm comm)
62 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63 comm, PERSISTENT | RECV);
/* Post the SIMIX communication for a request that has not completed yet
 * (enforced by xbt_assert0).
 * - RECV requests: registered with smpi_process_post_recv, then an irecv is
 *   posted on the request's rendez-vous point into request->buf.
 * - Otherwise: registered with smpi_process_post_send, then an isend of
 *   request->size bytes from request->buf is posted on the rendez-vous point.
 * The SIMIX communication handle is then tagged with the current tracing
 * category via request->pair. The assignment of request->pair is on a line
 * not visible here -- TODO confirm. */
68 void smpi_mpi_start(MPI_Request request)
70 xbt_assert0(request->complete == 0,
71 "Cannot start a non-finished communication");
72 if ((request->flags & RECV) == RECV) {
73 smpi_process_post_recv(request);
74 print_request("New recv", request);
// &request->size is passed, presumably so SIMIX can report the received size -- TODO confirm.
76 SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size);
78 smpi_process_post_send(request->comm, request); // FIXME
79 print_request("New send", request);
// NOTE(review): argument roles (simulated size, rate -1.0, source buffer, buffer size, data) are assumed from the SIMIX API; verify there.
81 SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
82 request->buf, request->size, NULL);
84 SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category());
/* Start requests[0] .. requests[count - 1], in order, with smpi_mpi_start.
 * NOTE(review): entries equal to MPI_REQUEST_NULL are not skipped, and
 * smpi_mpi_start dereferences its argument. Callers must pass only live
 * requests. */
89 void smpi_mpi_startall(int count, MPI_Request * requests)
93 for (i = 0; i < count; i++) {
94 smpi_mpi_start(requests[i]);
/* Release a request and reset the caller's handle to MPI_REQUEST_NULL.
 * The releasing call itself is on a line not visible in this view. */
98 void smpi_mpi_request_free(MPI_Request * request)
101 *request = MPI_REQUEST_NULL;
/* Build, without starting, a non-persistent send request
 * (NON_PERSISTENT | SEND) from the calling rank to rank dst.
 * finish_wait frees non-persistent requests once they complete. */
104 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
105 int dst, int tag, MPI_Comm comm)
107 MPI_Request request =
108 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
109 comm, NON_PERSISTENT | SEND);
/* Non-blocking send: build a non-persistent send request with
 * smpi_isend_init and start it immediately. */
114 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
115 int dst, int tag, MPI_Comm comm)
117 MPI_Request request =
118 smpi_isend_init(buf, count, datatype, dst, tag, comm);
120 smpi_mpi_start(request);
/* Build, without starting, a non-persistent receive request
 * (NON_PERSISTENT | RECV) from rank src to the calling rank.
 * finish_wait frees non-persistent requests once they complete. */
124 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
125 int src, int tag, MPI_Comm comm)
127 MPI_Request request =
128 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
129 comm, NON_PERSISTENT | RECV);
/* Non-blocking receive: build a non-persistent receive request with
 * smpi_irecv_init and start it immediately. */
134 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
135 int src, int tag, MPI_Comm comm)
137 MPI_Request request =
138 smpi_irecv_init(buf, count, datatype, src, tag, comm);
140 smpi_mpi_start(request);
/* Blocking receive: post a non-blocking receive and wait for it.
 * *status is filled by the wait unless it is MPI_STATUS_IGNORE. */
144 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
145 int tag, MPI_Comm comm, MPI_Status * status)
149 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
150 smpi_mpi_wait(&request, status);
/* Blocking send: post a non-blocking send and wait for it. The send status
 * is discarded (MPI_STATUS_IGNORE). */
153 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
154 int tag, MPI_Comm comm)
158 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
159 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* Combined send/receive. Builds a non-persistent send to dst and a
 * non-persistent receive from src, starts both together and waits for both.
 * Starting both before waiting avoids the deadlock two blocking calls could
 * cause. Unless status is MPI_STATUS_IGNORE, the receive's status (slot 1)
 * is copied to *status. The assignments into requests[0] / requests[1] are
 * on lines not visible here. */
162 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
163 int dst, int sendtag, void *recvbuf, int recvcount,
164 MPI_Datatype recvtype, int src, int recvtag,
165 MPI_Comm comm, MPI_Status * status)
167 MPI_Request requests[2];
171 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
173 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
174 smpi_mpi_startall(2, requests);
175 smpi_mpi_waitall(2, requests, stats);
176 if (status != MPI_STATUS_IGNORE) {
177 // Copy receive status
178 memcpy(status, &stats[1], sizeof(MPI_Status));
/* Return the number of datatype elements described by status:
 * status->count (set by finish_wait from the communication's destination
 * buffer size, presumably in bytes) divided by the datatype size. The result
 * uses integer division.
 * NOTE(review): there is no guard against smpi_datatype_size(datatype)
 * being 0, which would make this a division by zero. */
182 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
184 return status->count / smpi_datatype_size(datatype);
/* File-local helper: finalise a request whose SIMIX communication is done.
 * - Unless status is MPI_STATUS_IGNORE, fills source, tag, MPI_SUCCESS and
 *   count (the communication's destination buffer size).
 * - Destroys the SIMIX communication (request->pair).
 * - If the request is already flagged complete, destroys its rendez-vous
 *   point. The other branch marks the matching request complete and
 *   unlinks it from this one.
 * - Non-persistent requests are freed, so *request becomes
 *   MPI_REQUEST_NULL. The other branch keeps the request and clears its
 *   rdv and pair.
 * NOTE(review): the else lines of both if statements are not visible in
 * this view. The two-branch reading above is inferred -- TODO confirm. */
187 static void finish_wait(MPI_Request * request, MPI_Status * status)
189 if (status != MPI_STATUS_IGNORE) {
190 status->MPI_SOURCE = (*request)->src;
191 status->MPI_TAG = (*request)->tag;
192 status->MPI_ERROR = MPI_SUCCESS;
// Read the size before the communication is destroyed below.
193 status->count = SIMIX_req_comm_get_dst_buff_size((*request)->pair);
195 SIMIX_req_comm_destroy((*request)->pair);
196 print_request("finishing wait", *request);
197 if ((*request)->complete == 1) {
198 SIMIX_req_rdv_destroy((*request)->rdv);
200 (*request)->match->complete = 1;
201 (*request)->match->match = MPI_REQUEST_NULL;
203 if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
204 smpi_mpi_request_free(request);
206 (*request)->rdv = NULL;
207 (*request)->pair = NULL;
/* Non-blocking completion test. Reads the request's complete flag and calls
 * smpi_mpi_wait to finalise it and fill *status. The wait is presumably
 * guarded by that flag, and the flag returned; neither line is visible
 * here -- TODO confirm.
 * NOTE(review): (*request) is dereferenced without an MPI_REQUEST_NULL
 * check, unlike smpi_mpi_testany. */
211 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
213 int flag = (*request)->complete;
216 smpi_mpi_wait(request, status);
/* Non-blocking test over an array. Sets *index to MPI_UNDEFINED, then scans
 * for the first non-null request already flagged complete and finalises it
 * with smpi_mpi_wait. Recording i in *index and the flag/return lines are
 * not visible here -- TODO confirm. */
221 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
226 *index = MPI_UNDEFINED;
228 for (i = 0; i < count; i++) {
229 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
230 smpi_mpi_wait(&requests[i], status);
/* Block until the request's SIMIX communication finishes, then finalise the
 * request with finish_wait. For non-persistent requests, finish_wait sets
 * *request to MPI_REQUEST_NULL.
 * The -1.0 timeout presumably means "no timeout" -- confirm in the SIMIX API.
 * NOTE(review): no MPI_REQUEST_NULL check before dereferencing *request. */
239 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
241 print_request("wait", *request);
242 SIMIX_req_comm_wait((*request)->pair, -1.0);
243 finish_wait(request, status);
/* Wait for any one of count requests and finalise it. Callers use the
 * result as the index of the finished request, or MPI_UNDEFINED when there
 * was nothing to wait for.
 * Pass 1: a non-null request already flagged complete is finalised directly.
 * Pass 2, when pass 1 found none: the SIMIX communications of pending
 * requests are pushed into a dynar and SIMIX_req_comm_waitany picks one.
 * map presumably translates dynar positions back to requests[] indices; the
 * lines filling and applying it are not visible here -- TODO confirm. */
246 int smpi_mpi_waitany(int count, MPI_Request requests[],
253 index = MPI_UNDEFINED;
255 // First check for already completed requests
256 for (i = 0; i < count; i++) {
257 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
// index is presumably set to i on the preceding (hidden) line.
259 smpi_mpi_wait(&requests[index], status);
263 if (index == MPI_UNDEFINED) {
264 // Otherwise, wait for a request to complete
265 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
266 map = xbt_new(int, count);
268 DEBUG0("Wait for one of");
269 for (i = 0; i < count; i++) {
270 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
271 print_request(" ", requests[i]);
272 xbt_dynar_push(comms, &requests[i]->pair);
// Returns a position in comms, not in requests[].
278 index = SIMIX_req_comm_waitany(comms);
280 finish_wait(&requests[index], status);
283 xbt_dynar_free(&comms);
/* Wait for every request by calling smpi_mpi_waitany until it returns
 * MPI_UNDEFINED. The loop header is not visible here.
 * Each finished request's status goes to a local MPI_Status and is then
 * copied to status[index]; with MPI_STATUS_IGNORE, no status is kept. */
289 void smpi_mpi_waitall(int count, MPI_Request requests[],
294 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
298 index = smpi_mpi_waitany(count, requests, pstat);
299 if (index == MPI_UNDEFINED) {
302 if (status != MPI_STATUS_IGNORE) {
303 memcpy(&status[index], pstat, sizeof *pstat);
/* Finalise every non-null request in requests[0 .. incount - 1] that is
 * already flagged complete, writing its status to status[i]. Recording
 * indices[] and the return value are on lines not visible here. */
309 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
315 for (i = 0; i < incount; i++) {
316 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
// NOTE(review): the start of the ternary condition below (original line 318) is hidden; it must be "status !=" so that &status[i] is used only when status is a real array.
317 smpi_mpi_wait(&requests[i],
319 MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
/* Broadcast buf (count elements of datatype) from root to all ranks of comm
 * by delegating to nary_tree_bcast with a tree arity of 4. */
327 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
330 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
331 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* Barrier over comm, delegated to nary_tree_barrier with a tree arity of 4. */
334 void smpi_mpi_barrier(MPI_Comm comm)
336 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
337 nary_tree_barrier(comm, 4);
/* Gather: each rank's block ends up in root's recvbuf in rank order. Rank i
 * is placed at byte offset i * recvcount * recvtype size.
 * - Non-root ranks: blocking send of sendbuf to root with tag 666.
 * - Root: copies its own block locally, builds one receive per other rank
 *   (size - 1 requests), starts them all and waits for all of them.
 * The rank/root test and the per-iteration index bookkeeping are on lines
 * not visible here.
 * NOTE(review): the local copy writes sendcount * sendtype-size bytes into
 * a slot sized recvcount * recvtype-size; nothing checks that they agree. */
340 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
341 void *recvbuf, int recvcount, MPI_Datatype recvtype,
342 int root, MPI_Comm comm)
// Fixed tag reserved for collective traffic.
344 int system_tag = 666;
345 int rank, size, src, index, sendsize, recvsize;
346 MPI_Request *requests;
348 rank = smpi_comm_rank(comm);
349 size = smpi_comm_size(comm);
351 // Send buffer to root
352 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
354 sendsize = smpi_datatype_size(sendtype);
355 recvsize = smpi_datatype_size(recvtype);
356 // Local copy from root
357 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
358 sendcount * sendsize * sizeof(char));
359 // Receive buffers from senders
360 requests = xbt_new(MPI_Request, size - 1);
361 // One receive per rank other than root.
362 for (src = 0; src < size; src++) {
364 requests[index] = smpi_irecv_init(&((char *) recvbuf)
365 [src * recvcount * recvsize],
366 recvcount, recvtype, src,
371 // Wait for completion of irecv's.
372 smpi_mpi_startall(size - 1, requests);
373 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Gatherv: like smpi_mpi_gather, but rank i contributes recvcounts[i]
 * elements, placed at recvbuf + displs[i].
 * - Non-root ranks: blocking send to root with tag 666.
 * - Root: copies its own block locally, then receives from every other
 *   rank (size - 1 requests) and waits for all of them.
 * NOTE(review): displs[] is used here as a byte offset into recvbuf. The
 * MPI standard defines MPI_Gatherv displacements in units of the recvtype
 * extent, so for types larger than one byte the data looks misplaced.
 * Confirm, and scale by the recvtype size if so. */
378 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
379 void *recvbuf, int *recvcounts, int *displs,
380 MPI_Datatype recvtype, int root, MPI_Comm comm)
// Fixed tag reserved for collective traffic.
382 int system_tag = 666;
383 int rank, size, src, index, sendsize;
384 MPI_Request *requests;
386 rank = smpi_comm_rank(comm);
387 size = smpi_comm_size(comm);
389 // Send buffer to root
390 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
392 sendsize = smpi_datatype_size(sendtype);
393 // Local copy from root
394 memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
395 sendcount * sendsize * sizeof(char));
396 // Receive buffers from senders
397 requests = xbt_new(MPI_Request, size - 1);
398 // One receive per rank other than root.
399 for (src = 0; src < size; src++) {
402 smpi_irecv_init(&((char *) recvbuf)[displs[src]],
403 recvcounts[src], recvtype, src, system_tag,
408 // Wait for completion of irecv's.
409 smpi_mpi_startall(size - 1, requests);
410 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Allgather: every rank ends up with all ranks' blocks in recvbuf, rank
 * order, at byte offset i * recvcount * recvtype size. Each rank copies its
 * own block locally, builds an isend/irecv pair (tag 666) per other rank,
 * then starts and waits for all 2 * (size - 1) requests.
 * NOTE(review): as in smpi_mpi_gather, the local copy's source and
 * destination sizes are not checked against each other. */
415 void smpi_mpi_allgather(void *sendbuf, int sendcount,
416 MPI_Datatype sendtype, void *recvbuf,
417 int recvcount, MPI_Datatype recvtype,
// Fixed tag reserved for collective traffic.
420 int system_tag = 666;
421 int rank, size, other, index, sendsize, recvsize;
422 MPI_Request *requests;
424 rank = smpi_comm_rank(comm);
425 size = smpi_comm_size(comm);
426 sendsize = smpi_datatype_size(sendtype);
427 recvsize = smpi_datatype_size(recvtype);
428 // Local copy from self
429 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
430 sendcount * sendsize * sizeof(char));
431 // Send/Recv buffers to/from others;
432 requests = xbt_new(MPI_Request, 2 * (size - 1));
433 // One send and one receive per peer other than self.
434 for (other = 0; other < size; other++) {
437 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
440 requests[index] = smpi_irecv_init(&((char *) recvbuf)
441 [other * recvcount * recvsize],
442 recvcount, recvtype, other,
447 // Wait for completion of all comms.
448 smpi_mpi_startall(2 * (size - 1), requests);
449 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Allgatherv: like smpi_mpi_allgather, but rank i's block has recvcounts[i]
 * elements and is placed at recvbuf + displs[i]. Each rank copies its own
 * block locally, then exchanges an isend/irecv pair (tag 666) with every
 * other rank and waits for all 2 * (size - 1) requests.
 * NOTE(review): displs[] is used as a byte offset. MPI defines these
 * displacements in units of the recvtype extent. recvsize is computed but
 * not used in the visible lines; it looks like it was meant to scale displs.
 * Confirm. */
453 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
454 MPI_Datatype sendtype, void *recvbuf,
455 int *recvcounts, int *displs,
456 MPI_Datatype recvtype, MPI_Comm comm)
// Fixed tag reserved for collective traffic.
458 int system_tag = 666;
459 int rank, size, other, index, sendsize, recvsize;
460 MPI_Request *requests;
462 rank = smpi_comm_rank(comm);
463 size = smpi_comm_size(comm);
464 sendsize = smpi_datatype_size(sendtype);
465 recvsize = smpi_datatype_size(recvtype);
466 // Local copy from self
467 memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
468 sendcount * sendsize * sizeof(char));
469 // Send/Recv buffers to/from others
470 requests = xbt_new(MPI_Request, 2 * (size - 1));
471 // One send and one receive per peer other than self.
472 for (other = 0; other < size; other++) {
475 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
479 smpi_irecv_init(&((char *) recvbuf)[displs[other]],
480 recvcounts[other], recvtype, other, system_tag,
485 // Wait for completion of all comms.
486 smpi_mpi_startall(2 * (size - 1), requests);
487 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatter: root's sendbuf holds one block of sendcount elements per rank,
 * in rank order. Rank i receives the block at byte offset
 * i * sendcount * sendtype size.
 * - Non-root ranks: blocking receive from root with tag 666.
 * - Root: copies its own block locally, sends each other rank its block
 *   (size - 1 requests) and waits for all sends.
 * NOTE(review): the local copy reads recvcount * recvtype-size bytes from a
 * slot sized sendcount * sendtype-size; nothing checks that they agree. */
491 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
492 void *recvbuf, int recvcount, MPI_Datatype recvtype,
493 int root, MPI_Comm comm)
// Fixed tag reserved for collective traffic.
495 int system_tag = 666;
496 int rank, size, dst, index, sendsize, recvsize;
497 MPI_Request *requests;
499 rank = smpi_comm_rank(comm);
500 size = smpi_comm_size(comm);
502 // Recv buffer from root
503 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
506 sendsize = smpi_datatype_size(sendtype);
507 recvsize = smpi_datatype_size(recvtype);
508 // Local copy from root
509 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
510 recvcount * recvsize * sizeof(char));
511 // Send buffers to receivers
512 requests = xbt_new(MPI_Request, size - 1);
513 // One send per rank other than root.
514 for (dst = 0; dst < size; dst++) {
516 requests[index] = smpi_isend_init(&((char *) sendbuf)
517 [dst * sendcount * sendsize],
518 sendcount, sendtype, dst,
523 // Wait for completion of isend's.
524 smpi_mpi_startall(size - 1, requests);
525 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Scatterv: like smpi_mpi_scatter, but rank i's block has sendcounts[i]
 * elements and starts at sendbuf + displs[i].
 * - Non-root ranks: blocking receive from root with tag 666.
 * - Root: copies its own block locally, sends each other rank its block
 *   (size - 1 requests) and waits for all sends.
 * NOTE(review): displs[] is used as a byte offset into sendbuf. MPI defines
 * MPI_Scatterv displacements in units of the sendtype extent, so for types
 * larger than one byte the wrong data looks like it is sent. Confirm, and
 * scale by the sendtype size if so. */
530 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
531 MPI_Datatype sendtype, void *recvbuf, int recvcount,
532 MPI_Datatype recvtype, int root, MPI_Comm comm)
// Fixed tag reserved for collective traffic.
534 int system_tag = 666;
535 int rank, size, dst, index, sendsize, recvsize;
536 MPI_Request *requests;
538 rank = smpi_comm_rank(comm);
539 size = smpi_comm_size(comm);
541 // Recv buffer from root
542 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
545 sendsize = smpi_datatype_size(sendtype);
546 recvsize = smpi_datatype_size(recvtype);
547 // Local copy from root
548 memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
549 recvcount * recvsize * sizeof(char));
550 // Send buffers to receivers
551 requests = xbt_new(MPI_Request, size - 1);
552 // One send per rank other than root.
553 for (dst = 0; dst < size; dst++) {
556 smpi_isend_init(&((char *) sendbuf)[displs[dst]],
557 sendcounts[dst], sendtype, dst, system_tag,
562 // Wait for completion of isend's.
563 smpi_mpi_startall(size - 1, requests);
564 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduce: combine every rank's sendbuf into root's recvbuf with op.
 * - Non-root ranks: blocking send of sendbuf to root with tag 666.
 * - Root: copies its own sendbuf into recvbuf and posts one receive per
 *   other rank into a freshly allocated temporary buffer. It then folds
 *   each contribution into recvbuf with smpi_op_apply, in the order the
 *   receives finish (smpi_mpi_waitany), and frees the temporary buffers.
 * NOTE(review): contributions are combined in arrival order, not rank
 * order. For a non-commutative user op, MPI requires the canonical rank
 * order, so the result may differ from what MPI_Reduce specifies. Confirm
 * whether only commutative ops reach this path. */
569 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
570 MPI_Datatype datatype, MPI_Op op, int root,
// Fixed tag reserved for collective traffic.
573 int system_tag = 666;
574 int rank, size, src, index, datasize;
575 MPI_Request *requests;
578 rank = smpi_comm_rank(comm);
579 size = smpi_comm_size(comm);
581 // Send buffer to root
582 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
584 datasize = smpi_datatype_size(datatype);
585 // Local copy from root
586 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
587 // Receive buffers from senders
588 //TODO: make a MPI_barrier here ?
589 requests = xbt_new(MPI_Request, size - 1);
590 tmpbufs = xbt_new(void *, size - 1);
591 // One temporary buffer and one receive per rank other than root.
592 for (src = 0; src < size; src++) {
594 tmpbufs[index] = xbt_malloc(count * datasize);
596 smpi_irecv_init(tmpbufs[index], count, datatype, src,
601 // Wait for completion of irecv's.
602 smpi_mpi_startall(size - 1, requests);
603 for (src = 0; src < size - 1; src++) {
604 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
605 if (index == MPI_UNDEFINED) {
606 // Fold this contribution into the running result in recvbuf.
608 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
610 for (index = 0; index < size - 1; index++) {
611 xbt_free(tmpbufs[index]);
/* Allreduce, implemented as smpi_mpi_reduce to rank 0 followed by
 * smpi_mpi_bcast of the result from rank 0. Every rank ends up with the
 * reduced value in recvbuf. It inherits smpi_mpi_reduce's arrival-order
 * combination of contributions. */
618 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
619 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
621 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
622 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
625 FIXME: buggy implementation -- e.g. smpi_mpi_waitany below is passed size - 1 although 2 * (size - 1) requests are outstanding
627 int system_tag = 666;
628 int rank, size, other, index, datasize;
629 MPI_Request* requests;
632 rank = smpi_comm_rank(comm);
633 size = smpi_comm_size(comm);
634 datasize = smpi_datatype_size(datatype);
635 // Local copy from self
636 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
637 // Send/Recv buffers to/from others;
638 //TODO: make a MPI_barrier here ?
639 requests = xbt_new(MPI_Request, 2 * (size - 1));
640 tmpbufs = xbt_new(void*, size - 1);
642 for(other = 0; other < size; other++) {
644 tmpbufs[index / 2] = xbt_malloc(count * datasize);
645 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
646 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
650 // Wait for completion of all comms.
651 for(other = 0; other < 2 * (size - 1); other++) {
652 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
653 if(index == MPI_UNDEFINED) {
656 if((index & 1) == 1) {
657 // Request is odd: it's a irecv
658 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
661 for(index = 0; index < size - 1; index++) {
662 xbt_free(tmpbufs[index]);
669 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
670 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
672 int system_tag = 666;
673 int rank, size, other, index, datasize;
675 MPI_Request *requests;
678 rank = smpi_comm_rank(comm);
679 size = smpi_comm_size(comm);
680 datasize = smpi_datatype_size(datatype);
681 // Local copy from self
682 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
683 // Send/Recv buffers to/from others;
684 total = rank + (size - (rank + 1));
685 requests = xbt_new(MPI_Request, total);
686 tmpbufs = xbt_new(void *, rank);
688 for (other = 0; other < rank; other++) {
689 tmpbufs[index] = xbt_malloc(count * datasize);
691 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
695 for (other = rank + 1; other < size; other++) {
697 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
700 // Wait for completion of all comms.
701 smpi_mpi_startall(size - 1, requests);
702 for (other = 0; other < total; other++) {
703 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
704 if (index == MPI_UNDEFINED) {
708 // #Request is below rank: it's a irecv
709 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
712 for (index = 0; index < size - 1; index++) {
713 xbt_free(tmpbufs[index]);