1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/*
 * Logging setup for this file.
 * The first declaration creates the "smpi_base" log category under "smpi"
 * and (per the macro name) makes it the default category for this file.
 * The remaining declarations reference SMPI log categories by name.
 * NOTE(review): smpi_base is also listed as an external category right
 * after being created here -- presumably redundant; confirm.
 */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
/*
 * Binds the calling SIMIX process to its SMPI per-process data.
 *
 * argc, argv: pointers to the process' argument count and vector.
 *             (*argv)[1] is parsed as the SMPI process index and is then
 *             removed from the vector by shifting the later entries down.
 *
 * NOTE(review): atoi() reports no errors and (*argv)[1] is read without a
 * visible check that *argc >= 2 -- confirm the launcher always passes it.
 */
22 void smpi_process_init(int *argc, char ***argv)
25 smpi_process_data_t data;
28 proc = SIMIX_process_self();
29 index = atoi((*argv)[1]);
30 data = smpi_process_remote_data(index);
31 SIMIX_process_set_data(proc, data);
// Hide the index argument: move entries 2..argc-1 down by one slot, then
// clear the last slot, which would otherwise duplicate the final entry.
34 memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
35 (*argv)[(*argc) - 1] = NULL;
38 DEBUG2("<%d> New process in the game: %p", index, proc);
/*
 * Called when an SMPI process terminates. The visible code only looks up
 * the caller's process index and logs a debug message with it.
 */
41 void smpi_process_destroy(void)
43 int index = smpi_process_index();
45 DEBUG1("<%d> Process left the game", index);
/*
 * Allocates and initializes a new request descriptor.
 *
 * buf, count, datatype: user buffer, element count and element type.
 * src, dst:             sender and receiver ranks.
 * tag, comm:            message tag and communicator.
 * flags:                bit mask; callers in this file combine
 *                       PERSISTENT/NON_PERSISTENT with SEND/RECV.
 *
 * The request starts in the "not complete" state with no matching peer
 * request. Its size is the payload size: datatype size times count.
 */
48 static MPI_Request build_request(void *buf, int count,
49 MPI_Datatype datatype, int src, int dst,
50 int tag, MPI_Comm comm, unsigned flags)
54 request = xbt_new(s_smpi_mpi_request_t, 1);
56 request->size = smpi_datatype_size(datatype) * count;
63 request->complete = 0;
64 request->match = MPI_REQUEST_NULL;
65 request->flags = flags;
73 /* MPI Low level calls */
/*
 * Builds a persistent send request (flags PERSISTENT | SEND).
 * The source rank is the caller's rank in comm; dst is the destination.
 * Nothing is started here: the request is only built.
 */
74 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
75 int dst, int tag, MPI_Comm comm)
78 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
79 comm, PERSISTENT | SEND);
/*
 * Builds a persistent receive request (flags PERSISTENT | RECV).
 * The destination rank is the caller's rank in comm; src is the expected
 * sender. Nothing is started here: the request is only built.
 */
84 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
85 int src, int tag, MPI_Comm comm)
88 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
89 comm, PERSISTENT | RECV);
/*
 * Starts the communication described by request.
 *
 * The request must not be marked complete (asserted below). Depending on
 * the RECV bit in request->flags, the request is posted as a receive or as
 * a send and the corresponding asynchronous SIMIX network call is issued on
 * request->rdv.
 *
 * NOTE(review): the assertion requires complete == 0, but its message
 * ("Cannot start a non-finished communication") reads as the opposite
 * condition -- confirm the intended wording.
 */
94 void smpi_mpi_start(MPI_Request request)
96 xbt_assert0(request->complete == 0,
97 "Cannot start a non-finished communication");
98 if ((request->flags & RECV) == RECV) {
// Receive side: register the request, then post the asynchronous receive.
99 smpi_process_post_recv(request);
100 print_request("New recv", request);
102 SIMIX_network_irecv(request->rdv, request->buf, &request->size);
// Send side: register the request, then post the asynchronous send.
// request->size is passed twice; presumably once as the simulated task size
// and once as the buffer size, with -1.0 meaning "no rate limit" -- confirm
// against the SIMIX_network_isend prototype.
104 smpi_process_post_send(request->comm, request); // FIXME
105 print_request("New send", request);
107 SIMIX_network_isend(request->rdv, request->size, -1.0,
108 request->buf, request->size, NULL);
/*
 * Starts each of the count requests in requests[], in array order, by
 * calling smpi_mpi_start() on it.
 */
112 void smpi_mpi_startall(int count, MPI_Request * requests)
116 for (i = 0; i < count; i++) {
117 smpi_mpi_start(requests[i]);
/*
 * Releases a request handle. On return *request is MPI_REQUEST_NULL, so the
 * caller's handle can no longer be used to reach the old request.
 */
121 void smpi_mpi_request_free(MPI_Request * request)
124 *request = MPI_REQUEST_NULL;
/*
 * Builds a non-persistent send request (flags NON_PERSISTENT | SEND) from
 * the caller's rank in comm to dst. The request is built but not started.
 */
127 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
128 int dst, int tag, MPI_Comm comm)
130 MPI_Request request =
131 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132 comm, NON_PERSISTENT | SEND);
/*
 * Non-blocking send: builds a non-persistent send request with
 * smpi_isend_init() and immediately starts it with smpi_mpi_start().
 */
137 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
138 int dst, int tag, MPI_Comm comm)
140 MPI_Request request =
141 smpi_isend_init(buf, count, datatype, dst, tag, comm);
143 smpi_mpi_start(request);
/*
 * Builds a non-persistent receive request (flags NON_PERSISTENT | RECV)
 * from src to the caller's rank in comm. The request is built but not
 * started.
 */
147 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
148 int src, int tag, MPI_Comm comm)
150 MPI_Request request =
151 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
152 comm, NON_PERSISTENT | RECV);
/*
 * Non-blocking receive: builds a non-persistent receive request with
 * smpi_irecv_init() and immediately starts it with smpi_mpi_start().
 */
157 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
158 int src, int tag, MPI_Comm comm)
160 MPI_Request request =
161 smpi_irecv_init(buf, count, datatype, src, tag, comm);
163 smpi_mpi_start(request);
/*
 * Blocking receive, implemented as a non-blocking receive followed by a
 * wait on it. status is forwarded to smpi_mpi_wait() unchanged.
 */
167 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
168 int tag, MPI_Comm comm, MPI_Status * status)
172 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
173 smpi_mpi_wait(&request, status);
/*
 * Blocking send, implemented as a non-blocking send followed by a wait on
 * it. No status is collected (MPI_STATUS_IGNORE is passed to the wait).
 */
176 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
177 int tag, MPI_Comm comm)
181 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
182 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/*
 * Combined send and receive: builds one send request (to dst) and one
 * receive request (from src), starts both, and waits for both to finish.
 *
 * status: if not MPI_STATUS_IGNORE, receives a copy of the second entry of
 *         the status array filled by smpi_mpi_waitall().
 *
 * NOTE(review): this assumes the receive's status ends up at position 1 of
 * that array. smpi_mpi_waitall() in this file moves requests within the
 * array while waiting (see its FIXME), so confirm the position is stable.
 */
185 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
186 int dst, int sendtag, void *recvbuf, int recvcount,
187 MPI_Datatype recvtype, int src, int recvtag,
188 MPI_Comm comm, MPI_Status * status)
190 MPI_Request requests[2];
194 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
196 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
197 smpi_mpi_startall(2, requests);
198 smpi_mpi_waitall(2, requests, stats);
199 if (status != MPI_STATUS_IGNORE) {
200 // Copy receive status
201 memcpy(status, &stats[1], sizeof(MPI_Status));
/*
 * Returns the number of datatype elements described by status, computed as
 * status->count (which finish_wait() fills from the communication's
 * destination buffer size) divided by the size of datatype. The division is
 * an integer division.
 *
 * NOTE(review): if smpi_datatype_size() can return 0 for some datatype this
 * divides by zero -- confirm whether zero-size datatypes can reach here.
 */
205 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
207 return status->count / smpi_datatype_size(datatype);
/*
 * Post-processing shared by the wait functions once the underlying
 * communication of *request has been waited for.
 *
 * request: in/out handle; non-persistent requests are released through
 *          smpi_mpi_request_free(), which resets *request.
 * status:  if not MPI_STATUS_IGNORE, filled with the request's source and
 *          tag, MPI_SUCCESS, and the communication's destination buffer
 *          size.
 */
210 static void finish_wait(MPI_Request * request, MPI_Status * status)
212 if (status != MPI_STATUS_IGNORE) {
213 status->MPI_SOURCE = (*request)->src;
214 status->MPI_TAG = (*request)->tag;
215 status->MPI_ERROR = MPI_SUCCESS;
216 status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
218 print_request("finishing wait", *request);
// The rendez-vous point is destroyed when this request is already flagged
// complete. The matching peer request gets flagged complete and unlinked.
219 if ((*request)->complete == 1) {
220 SIMIX_rdv_destroy((*request)->rdv);
222 (*request)->match->complete = 1;
223 (*request)->match->match = MPI_REQUEST_NULL;
// Non-persistent requests are released here. The rdv/pair fields are
// cleared on a request that is kept.
225 if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
226 smpi_mpi_request_free(request);
228 (*request)->rdv = NULL;
229 (*request)->pair = NULL;
/*
 * Non-blocking completion test for *request. The request's complete flag
 * is sampled into a local; smpi_mpi_wait() is used to finalize the request
 * and fill status.
 * NOTE(review): *request is dereferenced without a visible check against
 * MPI_REQUEST_NULL -- confirm callers never pass a null request.
 */
233 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
235 int flag = (*request)->complete;
238 smpi_mpi_wait(request, status);
/*
 * Scans requests[0..count-1] for a non-null request already flagged
 * complete and finalizes it with smpi_mpi_wait(), filling status.
 * *index starts as MPI_UNDEFINED.
 */
243 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
248 *index = MPI_UNDEFINED;
250 for (i = 0; i < count; i++) {
251 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
252 smpi_mpi_wait(&requests[i], status);
/*
 * Blocks until the communication attached to *request finishes, then runs
 * finish_wait() to fill status and release or reset the request.
 * The -1.0 passed to SIMIX_network_wait() is presumably "no timeout" --
 * confirm against the SIMIX API.
 */
261 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
263 print_request("wait", *request);
264 SIMIX_network_wait((*request)->pair, -1.0);
265 finish_wait(request, status);
/*
 * Waits for one of the count requests in requests[] to complete.
 *
 * Two phases are visible:
 *  1. look for a non-null request that is already flagged complete and
 *     finalize it with smpi_mpi_wait();
 *  2. if none was found, collect the communications of all non-null,
 *     not-yet-complete requests into a dynar, block in
 *     SIMIX_network_waitany() and finalize the selected request with
 *     finish_wait().
 *
 * index starts as MPI_UNDEFINED; callers in this file compare the return
 * value against MPI_UNDEFINED to detect "nothing left to wait for".
 */
268 int smpi_mpi_waitany(int count, MPI_Request requests[],
275 index = MPI_UNDEFINED;
277 // First check for already completed requests
278 for (i = 0; i < count; i++) {
279 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
281 smpi_mpi_wait(&requests[index], status);
285 if (index == MPI_UNDEFINED) {
286 // Otherwise, wait for a request to complete
287 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
// map has one int slot per request; presumably it translates a position in
// comms back to a position in requests[] -- confirm.
288 map = xbt_new(int, count);
290 DEBUG0("Wait for one of");
291 for (i = 0; i < count; i++) {
292 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
293 print_request(" ", requests[i]);
294 xbt_dynar_push(comms, &requests[i]->pair);
300 index = SIMIX_network_waitany(comms);
302 finish_wait(&requests[index], status);
305 xbt_dynar_free(&comms);
/*
 * Waits for all requests in requests[] by calling smpi_mpi_waitany()
 * repeatedly, treating MPI_UNDEFINED as "nothing left to wait for".
 *
 * status: if not MPI_STATUS_IGNORE, status[index] receives the status of
 *         the request found at position index by smpi_mpi_waitany().
 *
 * NOTE(review): after each completion the last request is moved into the
 * freed position (see the FIXME below). From then on a position in
 * requests[] no longer identifies the request originally stored there, so
 * status[index] may be written for a different request than the caller
 * expects, and the caller's array is left reordered. Confirm and fix
 * together with callers that read specific status slots.
 */
311 void smpi_mpi_waitall(int count, MPI_Request requests[],
318 index = smpi_mpi_waitany(count, requests, &stat);
319 if (index == MPI_UNDEFINED) {
322 if (status != MPI_STATUS_IGNORE) {
323 memcpy(&status[index], &stat, sizeof(stat));
325 // FIXME: check this -v
326 // Move the last request to the found position
327 requests[index] = requests[count - 1];
328 requests[count - 1] = MPI_REQUEST_NULL;
/*
 * Scans requests[0..incount-1] and finalizes, with smpi_mpi_wait(), every
 * non-null request that is already flagged complete. The second argument
 * of the wait selects between the per-request slot &status[i] and
 * MPI_STATUS_IGNORE.
 */
333 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
339 for (i = 0; i < incount; i++) {
340 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
341 smpi_mpi_wait(&requests[i],
343 MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
/*
 * Broadcast of count elements of datatype from root, delegated to the
 * n-ary tree implementation with a fixed arity of 4.
 */
351 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
354 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
355 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/*
 * Barrier over comm, delegated to the n-ary tree implementation with a
 * fixed arity of 4.
 */
358 void smpi_mpi_barrier(MPI_Comm comm)
360 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
361 nary_tree_barrier(comm, 4);
/*
 * Gather: collects every rank's contribution into recvbuf at root.
 *
 * Two paths are visible, presumably selected by rank != root (confirm):
 *  - sender path: send sendbuf to root with the internal tag;
 *  - root path: copy the root's own contribution into slot `root` of
 *    recvbuf, post one receive per source into slot `src`, start them all
 *    and wait for completion.
 * Slot k of recvbuf starts at byte offset k * recvcount * sizeof(recvtype).
 *
 * system_tag (666) is the fixed tag the collectives in this file use for
 * their internal point-to-point messages.
 */
364 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
365 void *recvbuf, int recvcount, MPI_Datatype recvtype,
366 int root, MPI_Comm comm)
368 int system_tag = 666;
369 int rank, size, src, index, sendsize, recvsize;
370 MPI_Request *requests;
372 rank = smpi_comm_rank(comm);
373 size = smpi_comm_size(comm);
375 // Send buffer to root
376 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
378 sendsize = smpi_datatype_size(sendtype);
379 recvsize = smpi_datatype_size(recvtype);
380 // Local copy from root
381 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
382 sendcount * sendsize * sizeof(char));
383 // Receive buffers from senders
// size - 1 requests: one per rank other than the root.
384 requests = xbt_new(MPI_Request, size - 1);
386 for (src = 0; src < size; src++) {
388 requests[index] = smpi_irecv_init(&((char *) recvbuf)
389 [src * recvcount * recvsize],
390 recvcount, recvtype, src,
395 // Wait for completion of irecv's.
396 smpi_mpi_startall(size - 1, requests);
397 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/*
 * Gatherv: like gather, but rank k contributes recvcounts[k] elements that
 * are stored at the position given by displs[k] in recvbuf.
 *
 * Two paths are visible, presumably selected by rank != root (confirm):
 * the sender path sends sendbuf to root; the root path copies its own
 * contribution locally, then posts, starts and waits for one receive per
 * source.
 *
 * NOTE(review): displs[] is used directly as an index into recvbuf viewed
 * as char *, i.e. as a byte offset. The MPI standard defines displs in
 * units of recvtype elements, so this looks wrong for any datatype larger
 * than one byte -- confirm and scale by the datatype size/extent.
 */
402 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
403 void *recvbuf, int *recvcounts, int *displs,
404 MPI_Datatype recvtype, int root, MPI_Comm comm)
406 int system_tag = 666;
407 int rank, size, src, index, sendsize;
408 MPI_Request *requests;
410 rank = smpi_comm_rank(comm);
411 size = smpi_comm_size(comm);
413 // Send buffer to root
414 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
416 sendsize = smpi_datatype_size(sendtype);
417 // Local copy from root
418 memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
419 sendcount * sendsize * sizeof(char));
420 // Receive buffers from senders
// size - 1 requests: one per rank other than the root.
421 requests = xbt_new(MPI_Request, size - 1);
423 for (src = 0; src < size; src++) {
426 smpi_irecv_init(&((char *) recvbuf)[displs[src]],
427 recvcounts[src], recvtype, src, system_tag,
432 // Wait for completion of irecv's.
433 smpi_mpi_startall(size - 1, requests);
434 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/*
 * Allgather: every rank ends up with every rank's contribution in recvbuf.
 *
 * Each rank copies its own contribution into slot `rank` of recvbuf, then
 * builds a send and a receive request for every other rank
 * (2 * (size - 1) requests in total), starts them all and waits.
 * Slot k of recvbuf starts at byte offset k * recvcount * sizeof(recvtype).
 */
439 void smpi_mpi_allgather(void *sendbuf, int sendcount,
440 MPI_Datatype sendtype, void *recvbuf,
441 int recvcount, MPI_Datatype recvtype,
444 int system_tag = 666;
445 int rank, size, other, index, sendsize, recvsize;
446 MPI_Request *requests;
448 rank = smpi_comm_rank(comm);
449 size = smpi_comm_size(comm);
450 sendsize = smpi_datatype_size(sendtype);
451 recvsize = smpi_datatype_size(recvtype);
452 // Local copy from self
453 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
454 sendcount * sendsize * sizeof(char));
455 // Send/Recv buffers to/from others;
456 requests = xbt_new(MPI_Request, 2 * (size - 1));
458 for (other = 0; other < size; other++) {
461 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
464 requests[index] = smpi_irecv_init(&((char *) recvbuf)
465 [other * recvcount * recvsize],
466 recvcount, recvtype, other,
471 // Wait for completion of all comms.
472 smpi_mpi_startall(2 * (size - 1), requests);
473 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/*
 * Allgatherv: like allgather, but rank k contributes recvcounts[k] elements
 * stored at the position given by displs[k] in recvbuf.
 *
 * Each rank copies its own contribution locally, then builds a send and a
 * receive request for every other rank (2 * (size - 1) requests), starts
 * them all and waits.
 *
 * NOTE(review): displs[] is used directly as an index into recvbuf viewed
 * as char *, i.e. as a byte offset. The MPI standard defines displs in
 * units of recvtype elements -- confirm and scale by the datatype
 * size/extent.
 */
477 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
478 MPI_Datatype sendtype, void *recvbuf,
479 int *recvcounts, int *displs,
480 MPI_Datatype recvtype, MPI_Comm comm)
482 int system_tag = 666;
483 int rank, size, other, index, sendsize, recvsize;
484 MPI_Request *requests;
486 rank = smpi_comm_rank(comm);
487 size = smpi_comm_size(comm);
488 sendsize = smpi_datatype_size(sendtype);
489 recvsize = smpi_datatype_size(recvtype);
490 // Local copy from self
491 memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
492 sendcount * sendsize * sizeof(char));
493 // Send buffers to others;
494 requests = xbt_new(MPI_Request, 2 * (size - 1));
496 for (other = 0; other < size; other++) {
499 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
503 smpi_irecv_init(&((char *) recvbuf)[displs[other]],
504 recvcounts[other], recvtype, other, system_tag,
509 // Wait for completion of all comms.
510 smpi_mpi_startall(2 * (size - 1), requests);
511 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/*
 * Scatter: root distributes consecutive slices of sendbuf, one per rank.
 *
 * Two paths are visible, presumably selected by rank != root (confirm):
 *  - receiver path: receive the slice from root into recvbuf;
 *  - root path: copy the root's own slice into recvbuf, then build one send
 *    request per destination, start them all and wait.
 * Slice k of sendbuf starts at byte offset k * sendcount * sizeof(sendtype).
 */
515 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
516 void *recvbuf, int recvcount, MPI_Datatype recvtype,
517 int root, MPI_Comm comm)
519 int system_tag = 666;
520 int rank, size, dst, index, sendsize, recvsize;
521 MPI_Request *requests;
523 rank = smpi_comm_rank(comm);
524 size = smpi_comm_size(comm);
526 // Recv buffer from root
527 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
530 sendsize = smpi_datatype_size(sendtype);
531 recvsize = smpi_datatype_size(recvtype);
532 // Local copy from root
533 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
534 recvcount * recvsize * sizeof(char));
535 // Send buffers to receivers
// size - 1 requests: one per rank other than the root.
536 requests = xbt_new(MPI_Request, size - 1);
538 for (dst = 0; dst < size; dst++) {
540 requests[index] = smpi_isend_init(&((char *) sendbuf)
541 [dst * sendcount * sendsize],
542 sendcount, sendtype, dst,
547 // Wait for completion of isend's.
548 smpi_mpi_startall(size - 1, requests);
549 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/*
 * Scatterv: like scatter, but rank k receives sendcounts[k] elements taken
 * from the position given by displs[k] in sendbuf.
 *
 * Two paths are visible, presumably selected by rank != root (confirm):
 * the receiver path receives from root; the root path copies its own slice
 * locally, then builds, starts and waits for one send per destination.
 *
 * NOTE(review): displs[] is used directly as an index into sendbuf viewed
 * as char *, i.e. as a byte offset. The MPI standard defines displs in
 * units of sendtype elements -- confirm and scale by the datatype
 * size/extent.
 */
554 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
555 MPI_Datatype sendtype, void *recvbuf, int recvcount,
556 MPI_Datatype recvtype, int root, MPI_Comm comm)
558 int system_tag = 666;
559 int rank, size, dst, index, sendsize, recvsize;
560 MPI_Request *requests;
562 rank = smpi_comm_rank(comm);
563 size = smpi_comm_size(comm);
565 // Recv buffer from root
566 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
569 sendsize = smpi_datatype_size(sendtype);
570 recvsize = smpi_datatype_size(recvtype);
571 // Local copy from root
572 memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
573 recvcount * recvsize * sizeof(char));
574 // Send buffers to receivers
// size - 1 requests: one per rank other than the root.
575 requests = xbt_new(MPI_Request, size - 1);
577 for (dst = 0; dst < size; dst++) {
580 smpi_isend_init(&((char *) sendbuf)[displs[dst]],
581 sendcounts[dst], sendtype, dst, system_tag,
586 // Wait for completion of isend's.
587 smpi_mpi_startall(size - 1, requests);
588 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/*
 * Reduce: combines every rank's sendbuf with op and leaves the result in
 * recvbuf at root.
 *
 * Two paths are visible, presumably selected by rank != root (confirm):
 *  - sender path: send sendbuf to root;
 *  - root path: seed recvbuf with the root's own data, receive each other
 *    rank's data into its own temporary buffer, and fold each buffer into
 *    recvbuf with smpi_op_apply() as its receive completes.
 *
 * NOTE(review): contributions are folded in completion order (the index
 * returned by smpi_mpi_waitany), not in rank order. That is only safe for
 * commutative operations -- confirm whether non-commutative MPI_Op values
 * must be supported.
 */
593 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
594 MPI_Datatype datatype, MPI_Op op, int root,
597 int system_tag = 666;
598 int rank, size, src, index, datasize;
599 MPI_Request *requests;
602 rank = smpi_comm_rank(comm);
603 size = smpi_comm_size(comm);
605 // Send buffer to root
606 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
608 datasize = smpi_datatype_size(datatype);
609 // Local copy from root
610 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
611 // Receive buffers from senders
612 //TODO: make a MPI_barrier here ?
// One request and one temporary buffer per rank other than the root.
613 requests = xbt_new(MPI_Request, size - 1);
614 tmpbufs = xbt_new(void *, size - 1);
616 for (src = 0; src < size; src++) {
618 tmpbufs[index] = xbt_malloc(count * datasize);
620 smpi_irecv_init(tmpbufs[index], count, datatype, src,
625 // Wait for completion of irecv's.
626 smpi_mpi_startall(size - 1, requests);
627 for (src = 0; src < size - 1; src++) {
628 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
629 if (index == MPI_UNDEFINED) {
// tmpbufs[index] holds the data of the request that just completed.
632 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Release the per-source temporary buffers.
634 for (index = 0; index < size - 1; index++) {
635 xbt_free(tmpbufs[index]);
/*
 * Allreduce, implemented as a reduce to rank 0 followed by a broadcast of
 * recvbuf from rank 0, so that every rank ends up with the reduced result
 * in recvbuf.
 */
642 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
643 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
645 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
646 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
649 FIXME: buggy implementation
651 int system_tag = 666;
652 int rank, size, other, index, datasize;
653 MPI_Request* requests;
656 rank = smpi_comm_rank(comm);
657 size = smpi_comm_size(comm);
658 datasize = smpi_datatype_size(datatype);
659 // Local copy from self
660 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
661 // Send/Recv buffers to/from others;
662 //TODO: make a MPI_barrier here ?
663 requests = xbt_new(MPI_Request, 2 * (size - 1));
664 tmpbufs = xbt_new(void*, size - 1);
666 for(other = 0; other < size; other++) {
668 tmpbufs[index / 2] = xbt_malloc(count * datasize);
669 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
670 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
674 // Wait for completion of all comms.
675 for(other = 0; other < 2 * (size - 1); other++) {
676 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
677 if(index == MPI_UNDEFINED) {
680 if((index & 1) == 1) {
681 // Request is odd: it's a irecv
682 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
685 for(index = 0; index < size - 1; index++) {
686 xbt_free(tmpbufs[index]);
/*
 * Scan (prefix reduction): recvbuf is seeded with the caller's own sendbuf,
 * then the contributions received from lower ranks are folded in with
 * smpi_op_apply().
 *
 * Each rank builds one receive per lower rank (other < rank), each into its
 * own temporary buffer, and one send of sendbuf per higher rank
 * (other > rank). total = rank + (size - (rank + 1)), i.e. size - 1
 * requests.
 *
 * NOTE(review): contributions are folded in completion order, which is only
 * safe for commutative operations -- confirm.
 */
693 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
694 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
696 int system_tag = 666;
697 int rank, size, other, index, datasize;
699 MPI_Request *requests;
702 rank = smpi_comm_rank(comm);
703 size = smpi_comm_size(comm);
704 datasize = smpi_datatype_size(datatype);
705 // Local copy from self
706 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
707 // Send/Recv buffers to/from others;
708 total = rank + (size - (rank + 1));
709 requests = xbt_new(MPI_Request, total);
// Only the receives need a temporary buffer, hence `rank` slots.
710 tmpbufs = xbt_new(void *, rank);
712 for (other = 0; other < rank; other++) {
713 tmpbufs[index] = xbt_malloc(count * datasize);
715 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
719 for (other = rank + 1; other < size; other++) {
721 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
724 // Wait for completion of all comms.
725 smpi_mpi_startall(size - 1, requests);
726 for (other = 0; other < total; other++) {
727 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
728 if (index == MPI_UNDEFINED) {
732 // #Request is below rank: it's a irecv
733 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// NOTE(review): tmpbufs was allocated with `rank` slots, but this loop
// frees size - 1 entries. For every rank below size - 1 this reads past
// the end of tmpbufs and frees indeterminate pointers; the bound looks
// like it should be `rank` -- confirm and fix.
736 for (index = 0; index < size - 1; index++) {
737 xbt_free(tmpbufs[index]);