1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Logging setup: define this file's default log category, smpi_base, as a
// subcategory of smpi, and declare the other SMPI categories as external
// (presumably defined in other translation units).
// NOTE(review): smpi_base is defined on the first line and then also declared
// external right after; that external declaration looks redundant — confirm.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
// Size threshold of the eager protocol. A request's size is the datatype size
// times the element count.
// - Size <= EAGER_LIMIT: the request gets no rendezvous ack
//   (ack = MPI_REQUEST_NULL).
// - Larger size: build_request creates an ack request, and smpi_mpi_start waits
//   on it before posting the actual transfer.
22 #define EAGER_LIMIT 65536
// Attach the SMPI per-process data to the calling SIMIX process.
// The process index is parsed from (*argv)[1] and used to look up the data with
// smpi_process_remote_data(). That argument is then removed from argv: the
// following entries are shifted down one slot and the last slot is set to NULL.
// NOTE(review): atoi() performs no error checking, and the memmove length
// (*argc - 2) assumes *argc >= 2. Whether *argc itself is decremented is not
// shown here — confirm.
25 void smpi_process_init(int* argc, char*** argv) {
27 smpi_process_data_t data;
30 proc = SIMIX_process_self();
31 index = atoi((*argv)[1]); // index passed as the first program argument
32 data = smpi_process_remote_data(index);
33 SIMIX_process_set_data(proc, data);
36 memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2)); // drop argv[1]
37 (*argv)[(*argc) - 1] = NULL;
40 DEBUG2("<%d> New process in the game: %p", index, proc);
// Called when the calling SMPI process leaves the simulation. It logs the
// departure together with the process index from smpi_process_index().
43 void smpi_process_destroy(void) {
44 int index = smpi_process_index();
46 DEBUG1("<%d> Process left the game", index);
// Allocate and initialise a request for a point-to-point transfer of `count`
// elements of `datatype` from rank `src` to rank `dst` with `tag` on `comm`.
// `flags` combines PERSISTENT/NON_PERSISTENT with SEND/RECV.
// The new request starts incomplete (complete = 0) and unmatched
// (match = MPI_REQUEST_NULL).
// If the request's size exceeds EAGER_LIMIT, a companion "ack" request is built
// and started immediately. The ack:
// - flows in the opposite direction (src/dst swapped, opposite SEND/RECV flag);
// - uses tag RDV_TAG and carries no payload (buf NULL, size 0);
// - is NON_PERSISTENT.
// smpi_mpi_start waits on the ack before posting the real transfer. Because the
// ack has size 0, starting it never reads ack->ack, which is left unset.
// NOTE(review): the request is heap-allocated with xbt_new. Ownership presumably
// passes to the caller (released via smpi_mpi_request_free) — confirm.
49 static MPI_Request build_request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) {
52 request = xbt_new(s_smpi_mpi_request_t, 1);
54 request->size = smpi_datatype_size(datatype) * count; // payload size (element size * count)
61 request->complete = 0;
62 request->match = MPI_REQUEST_NULL;
63 request->flags = flags;
64 if(request->size <= EAGER_LIMIT) {
65 request->ack = MPI_REQUEST_NULL; // eager: no rendezvous handshake
67 request->ack = xbt_new(s_smpi_mpi_request_t, 1);
68 request->ack->buf = NULL;
69 request->ack->size = 0;
70 request->ack->src = dst;
71 request->ack->dst = src;
72 request->ack->tag = RDV_TAG;
73 request->ack->comm = comm;
74 request->ack->rdv = NULL;
75 request->ack->pair = NULL;
76 request->ack->complete = 0;
77 request->ack->match = MPI_REQUEST_NULL;
78 request->ack->flags = NON_PERSISTENT | ((request->flags & RECV) == RECV ? SEND : RECV); // ack travels the opposite way
79 smpi_mpi_start(request->ack);
84 /* MPI Low level calls */
// Create, but do not start, a PERSISTENT send request from the calling rank
// (smpi_comm_rank(comm)) to `dst`.
85 MPI_Request smpi_mpi_send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
86 MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, PERSISTENT | SEND);
// Create, but do not start, a PERSISTENT receive request from `src` to the
// calling rank (smpi_comm_rank(comm)).
91 MPI_Request smpi_mpi_recv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
92 MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
// Start (post) a request that is not currently complete (asserted).
// 1. For requests larger than EAGER_LIMIT, block on the rendezvous ack request
//    created by build_request.
// 2. Register the request with the local process (smpi_process_post_recv or
//    smpi_process_post_send).
// 3. Post the SIMIX receive or send on request->rdv, keeping the resulting
//    SIMIX communication in request->pair.
// NOTE(review): the ack is NON_PERSISTENT, so smpi_mpi_wait -> finish_wait frees
// it and resets request->ack to MPI_REQUEST_NULL. Restarting a PERSISTENT
// request larger than EAGER_LIMIT would therefore wait on MPI_REQUEST_NULL —
// confirm.
97 void smpi_mpi_start(MPI_Request request) {
98 xbt_assert0(request->complete == 0, "Cannot start a non-finished communication");
99 if(request->size > EAGER_LIMIT) {
100 print_request("RDV ack", request->ack);
101 smpi_mpi_wait(&request->ack, MPI_STATUS_IGNORE); // blocks until the ack request completes
103 if((request->flags & RECV) == RECV) {
104 smpi_process_post_recv(request);
105 print_request("New recv", request);
106 request->pair = SIMIX_network_irecv(request->rdv, request->buf, &request->size);
108 smpi_process_post_send(request->comm, request); // FIXME
109 print_request("New send", request);
110 request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, request->buf, request->size, NULL); // request->size passed both as simulated amount and as buffer size (per SIMIX API — confirm)
// Start each of the `count` requests, in array order, via smpi_mpi_start.
// A request larger than EAGER_LIMIT blocks inside smpi_mpi_start on its ack,
// which delays starting the requests that follow it.
114 void smpi_mpi_startall(int count, MPI_Request* requests) {
117 for(i = 0; i < count; i++) {
118 smpi_mpi_start(requests[i]);
// Release a request and reset the caller's handle to MPI_REQUEST_NULL.
// NOTE(review): presumably also frees *request (allocated by build_request)
// before the handle is reset — confirm.
122 void smpi_mpi_request_free(MPI_Request* request) {
124 *request = MPI_REQUEST_NULL;
// Create, but do not start, a NON_PERSISTENT send request from the calling rank
// to `dst`. finish_wait frees such requests once they complete.
127 MPI_Request smpi_isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
128 MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, NON_PERSISTENT | SEND);
// Non-blocking send: create a NON_PERSISTENT send request and start it
// immediately. Complete it later with a wait/test call.
133 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
134 MPI_Request request = smpi_isend_init(buf, count, datatype, dst, tag, comm);
136 smpi_mpi_start(request);
// Create, but do not start, a NON_PERSISTENT receive request from `src` to the
// calling rank. finish_wait frees such requests once they complete.
140 MPI_Request smpi_irecv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
141 MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, NON_PERSISTENT | RECV);
// Non-blocking receive: create a NON_PERSISTENT receive request and start it
// immediately. Complete it later with a wait/test call.
146 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
147 MPI_Request request = smpi_irecv_init(buf, count, datatype, src, tag, comm);
149 smpi_mpi_start(request);
// Blocking receive: post an irecv and wait for it. finish_wait fills `status`
// unless it is MPI_STATUS_IGNORE.
153 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
156 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
157 smpi_mpi_wait(&request, status);
// Blocking send: post an isend and wait for it, ignoring the status.
// For messages larger than EAGER_LIMIT, the start itself first waits for the
// rendezvous ack.
160 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
163 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
164 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
// Combined send to `dst` and receive from `src`. Both requests are created,
// started together with smpi_mpi_startall and completed with smpi_mpi_waitall.
// The receive status (slot 1) is copied to *status unless it is
// MPI_STATUS_IGNORE.
// NOTE(review): smpi_mpi_waitall stores each status at the request's current,
// possibly compacted, array position. If the send completes first, stats[1] may
// never receive the receive status — confirm.
167 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
168 MPI_Request requests[2];
171 requests[0] = smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
172 requests[1] = smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
173 smpi_mpi_startall(2, requests);
174 smpi_mpi_waitall(2, requests, stats);
175 if(status != MPI_STATUS_IGNORE) {
176 // Copy receive status
177 memcpy(status, &stats[1], sizeof(MPI_Status));
// Common completion path shared by the wait/test variants:
// - fill *status (unless MPI_STATUS_IGNORE) with the request's src and tag and
//   MPI_SUCCESS;
// - destroy the rendezvous point when the request is flagged complete;
// - flag the matched peer request (request->match) complete and unlink it;
// - free NON_PERSISTENT requests, resetting the handle to MPI_REQUEST_NULL.
// The branch structure joining the rendezvous and match steps is not fully
// shown here — TODO confirm.
// NOTE(review): for receives posted with a wildcard source or tag, the status
// reports whatever src/tag are stored in the request. Confirm that matching
// rewrites them with the actual sender's values.
181 static void finish_wait(MPI_Request* request, MPI_Status* status) {
182 if(status != MPI_STATUS_IGNORE) {
183 status->MPI_SOURCE = (*request)->src;
184 status->MPI_TAG = (*request)->tag;
185 status->MPI_ERROR = MPI_SUCCESS;
187 print_request("finishing wait", *request);
188 if((*request)->complete == 1) {
189 SIMIX_rdv_destroy((*request)->rdv);
191 (*request)->match->complete = 1;
192 (*request)->match->match = MPI_REQUEST_NULL;
194 if(((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
195 smpi_mpi_request_free(request);
// Non-blocking completion check. The request's complete flag is read into
// `flag`. For a completed request, the SIMIX communication is destroyed and
// finish_wait fills the status and frees NON_PERSISTENT requests.
// NOTE(review): the cleanup is presumably guarded by `flag`, and `flag` is
// presumably the return value — confirm.
199 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
200 int flag = (*request)->complete;
203 SIMIX_communication_destroy((*request)->pair);
204 finish_wait(request, status);
// Non-blocking check over `count` requests. *index starts as MPI_UNDEFINED.
// A non-null request already flagged complete has its SIMIX communication
// destroyed and is finished via finish_wait.
// NOTE(review): presumably only the first such request is handled, *index is
// set to its position, and the return value is the completion flag — confirm.
209 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
212 *index = MPI_UNDEFINED;
214 for(i = 0; i < count; i++) {
215 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
216 SIMIX_communication_destroy(requests[i]->pair);
217 finish_wait(&requests[i], status);
// Block until *request completes, then finish it via finish_wait (fill status,
// free if NON_PERSISTENT).
// - If the request is already flagged complete, its SIMIX communication is just
//   destroyed.
// - Otherwise SIMIX_network_wait blocks on it (presumably the else branch —
//   confirm).
// NOTE(review): *request is dereferenced without any check against
// MPI_REQUEST_NULL. The "data is null" remark below does not obviously match the
// surrounding code and may be stale.
226 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
227 print_request("wait", *request);
228 // data is null if receiver waits before sender enters the rdv
229 if((*request)->complete) {
230 SIMIX_communication_destroy((*request)->pair);
232 SIMIX_network_wait((*request)->pair, -1.0);
234 finish_wait(request, status);
// Wait for any one of `count` requests to complete and finish it via
// finish_wait. Null requests are ignored; `index` stays MPI_UNDEFINED if nothing
// was selected. Presumably `index` is the return value — confirm.
// 1. First pass: pick a non-null request already flagged complete and destroy
//    its SIMIX communication.
// 2. Otherwise: push the SIMIX communications of all pending requests into a
//    dynar and block in SIMIX_network_waitany. `map` presumably translates the
//    dynar slot back to the position in `requests` — confirm.
// NOTE(review): `map` is allocated with xbt_new, but its release is not shown —
// confirm it is freed.
237 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
242 index = MPI_UNDEFINED;
244 // First check for already completed requests
245 for(i = 0; i < count; i++) {
246 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
248 SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
252 if(index == MPI_UNDEFINED) {
253 // Otherwise, wait for a request to complete
254 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
255 map = xbt_new(int, count);
257 DEBUG0("Wait for one of");
258 for(i = 0; i < count; i++) {
259 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
260 print_request(" ", requests[i]);
261 xbt_dynar_push(comms, &requests[i]->pair);
267 index = SIMIX_network_waitany(comms);
271 xbt_dynar_free(&comms);
273 if(index != MPI_UNDEFINED) {
274 finish_wait(&requests[index], status);
// Wait for all requests by calling smpi_mpi_waitany until it reports
// MPI_UNDEFINED. After each completion:
// - its status is copied to status[index] (unless status is MPI_STATUS_IGNORE);
// - the last request is moved into the freed slot.
// NOTE(review): because requests are moved, later `index` values are positions
// in the compacted array, not the caller's original positions. status[i] may
// therefore not describe requests[i] as MPI_Waitall requires, and the caller's
// array is reordered. Whether `count` is decremented is not shown — confirm.
280 void smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) {
285 index = smpi_mpi_waitany(count, requests, &stat);
286 if(index == MPI_UNDEFINED) {
289 if(status != MPI_STATUS_IGNORE) {
290 memcpy(&status[index], &stat, sizeof(stat));
292 // FIXME: check this -v
293 // Move the last request to the found position
294 requests[index] = requests[count - 1];
295 requests[count - 1] = MPI_REQUEST_NULL;
// Finish every non-null request already flagged complete: destroy its SIMIX
// communication and call finish_wait, filling status[i] unless status is
// MPI_STATUS_IGNORE. Presumably records each position in `indices` and returns
// the number finished — confirm.
// NOTE(review): MPI_Waitsome blocks until at least one request completes; this
// loop only polls. Also, statuses are written at the request's position i rather
// than compactly alongside `indices` — confirm against callers.
300 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
304 for(i = 0; i < incount; i++) {
305 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
306 SIMIX_communication_destroy(requests[i]->pair);
307 finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
// Broadcast `count` elements of `datatype` from `root` to every rank of `comm`,
// using an n-ary tree (nary_tree_bcast) of arity 4.
315 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
316 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
317 nary_tree_bcast(buf, count, datatype, root, comm, 4);
// Synchronise all ranks of `comm` using an n-ary tree barrier
// (nary_tree_barrier) of arity 4.
320 void smpi_mpi_barrier(MPI_Comm comm) {
321 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
322 nary_tree_barrier(comm, 4);
// Gather `sendcount` elements from every rank into recvbuf on `root`. Rank r's
// data lands at byte offset r * recvcount * recvsize.
// - Non-root ranks (presumably; the guard is not shown) send their buffer to
//   root with a blocking send.
// - Root copies its own contribution locally, then creates size - 1 receive
//   requests (presumably skipping itself), starts them and waits for all.
// NOTE(review): the local copy writes sendcount * sendsize bytes into a slot of
// recvcount * recvsize bytes; these are assumed equal. The fixed internal tag
// 666 could presumably collide with user messages on the same communicator.
325 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
326 int system_tag = 666;
327 int rank, size, src, index, sendsize, recvsize;
328 MPI_Request* requests;
330 rank = smpi_comm_rank(comm);
331 size = smpi_comm_size(comm);
333 // Send buffer to root
334 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
336 sendsize = smpi_datatype_size(sendtype);
337 recvsize = smpi_datatype_size(recvtype);
338 // Local copy from root
339 memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
340 // Receive buffers from senders
341 requests = xbt_new(MPI_Request, size - 1);
343 for(src = 0; src < size; src++) {
345 requests[index] = smpi_irecv_init(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
349 // Wait for completion of irecv's.
350 smpi_mpi_startall(size - 1, requests);
351 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Variable-count gather. Rank r's recvcounts[r] elements are received into
// recvbuf at displs[r] on `root`.
// - Non-root ranks (presumably; the guard is not shown) send to root.
// - Root copies its own contribution, then starts and waits on size - 1
//   receive requests (presumably skipping itself).
// NOTE(review): displs[] is used directly as a byte offset into recvbuf, whereas
// MPI defines displacements in units of recvtype's extent. Either callers pass
// byte offsets or these need scaling by the type size — confirm.
356 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
357 int system_tag = 666;
358 int rank, size, src, index, sendsize;
359 MPI_Request* requests;
361 rank = smpi_comm_rank(comm);
362 size = smpi_comm_size(comm);
364 // Send buffer to root
365 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
367 sendsize = smpi_datatype_size(sendtype);
368 // Local copy from root
369 memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
370 // Receive buffers from senders
371 requests = xbt_new(MPI_Request, size - 1);
373 for(src = 0; src < size; src++) {
375 requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
379 // Wait for completion of irecv's.
380 smpi_mpi_startall(size - 1, requests);
381 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// All-to-all gather. Every rank copies its own contribution into its slot of
// recvbuf (byte offset rank * recvcount * recvsize). It then creates, for every
// other rank, one send of sendbuf and one receive into that rank's slot:
// 2 * (size - 1) requests, started together and waited on with
// smpi_mpi_waitall.
// NOTE(review): the self-slot skip and the index increments are not shown —
// presumably the loop skips `rank`.
386 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
387 int system_tag = 666;
388 int rank, size, other, index, sendsize, recvsize;
389 MPI_Request* requests;
391 rank = smpi_comm_rank(comm);
392 size = smpi_comm_size(comm);
393 sendsize = smpi_datatype_size(sendtype);
394 recvsize = smpi_datatype_size(recvtype);
395 // Local copy from self
396 memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
397 // Send/Recv buffers to/from others;
398 requests = xbt_new(MPI_Request, 2 * (size - 1));
400 for(other = 0; other < size; other++) {
402 requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
404 requests[index] = smpi_irecv_init(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
408 // Wait for completion of all comms.
409 smpi_mpi_startall(2 * (size - 1), requests);
410 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Variable-count all-to-all gather. Every rank copies its own contribution to
// recvbuf at displs[rank]. It then exchanges with every other rank: one send of
// sendbuf, plus one receive of recvcounts[other] elements at displs[other].
// The 2 * (size - 1) requests are started together and waited on.
// NOTE(review): displs[] is used directly as a byte offset, whereas MPI defines
// it in units of recvtype's extent — confirm. recvsize is computed but not used
// in the code shown here.
414 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
415 int system_tag = 666;
416 int rank, size, other, index, sendsize, recvsize;
417 MPI_Request* requests;
419 rank = smpi_comm_rank(comm);
420 size = smpi_comm_size(comm);
421 sendsize = smpi_datatype_size(sendtype);
422 recvsize = smpi_datatype_size(recvtype);
423 // Local copy from self
424 memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
425 // Send buffers to others;
426 requests = xbt_new(MPI_Request, 2 * (size - 1));
428 for(other = 0; other < size; other++) {
430 requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
432 requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
436 // Wait for completion of all comms.
437 smpi_mpi_startall(2 * (size - 1), requests);
438 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Scatter slices of root's sendbuf. Rank r receives the slice at byte offset
// r * sendcount * sendsize.
// - Non-root ranks (presumably; the guard is not shown) receive their slice
//   with a blocking receive.
// - Root copies its own slice into recvbuf (recvcount * recvsize bytes), then
//   starts and waits on size - 1 send requests (presumably skipping itself).
// NOTE(review): the fixed internal tag 666 could presumably collide with user
// messages on the same communicator.
442 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
443 int system_tag = 666;
444 int rank, size, dst, index, sendsize, recvsize;
445 MPI_Request* requests;
447 rank = smpi_comm_rank(comm);
448 size = smpi_comm_size(comm);
450 // Recv buffer from root
451 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
453 sendsize = smpi_datatype_size(sendtype);
454 recvsize = smpi_datatype_size(recvtype);
455 // Local copy from root
456 memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
457 // Send buffers to receivers
458 requests = xbt_new(MPI_Request, size - 1);
460 for(dst = 0; dst < size; dst++) {
462 requests[index] = smpi_isend_init(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
466 // Wait for completion of isend's.
467 smpi_mpi_startall(size - 1, requests);
468 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Variable-count scatter. Rank r receives sendcounts[r] elements taken from
// root's sendbuf at displs[r].
// - Non-root ranks (presumably; the guard is not shown) receive with a blocking
//   receive.
// - Root copies its own slice into recvbuf, then starts and waits on size - 1
//   send requests (presumably skipping itself).
// NOTE(review): displs[] is used directly as a byte offset into sendbuf, whereas
// MPI defines it in units of sendtype's extent — confirm.
473 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
474 int system_tag = 666;
475 int rank, size, dst, index, sendsize, recvsize;
476 MPI_Request* requests;
478 rank = smpi_comm_rank(comm);
479 size = smpi_comm_size(comm);
481 // Recv buffer from root
482 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
484 sendsize = smpi_datatype_size(sendtype);
485 recvsize = smpi_datatype_size(recvtype);
486 // Local copy from root
487 memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
488 // Send buffers to receivers
489 requests = xbt_new(MPI_Request, size - 1);
491 for(dst = 0; dst < size; dst++) {
493 requests[index] = smpi_isend_init(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
497 // Wait for completion of isend's.
498 smpi_mpi_startall(size - 1, requests);
499 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Reduce `count` elements from every rank into recvbuf on `root` using `op`.
// - Non-root ranks (presumably; the guard is not shown) send their buffer to
//   root with a blocking send.
// - Root copies its own sendbuf into recvbuf and receives every other
//   contribution into its own temporary buffer (size - 1 receives).
// - As each receive completes (smpi_mpi_waitany), root folds that buffer into
//   recvbuf with smpi_op_apply, then frees the temporaries.
// NOTE(review): contributions are combined in completion order, not rank order.
// This is only correct for commutative operations.
504 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
505 int system_tag = 666;
506 int rank, size, src, index, datasize;
507 MPI_Request* requests;
510 rank = smpi_comm_rank(comm);
511 size = smpi_comm_size(comm);
513 // Send buffer to root
514 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
516 datasize = smpi_datatype_size(datatype);
517 // Local copy from root
518 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
519 // Receive buffers from senders
520 //TODO: make a MPI_barrier here ?
521 requests = xbt_new(MPI_Request, size - 1);
522 tmpbufs = xbt_new(void*, size - 1);
524 for(src = 0; src < size; src++) {
526 tmpbufs[index] = xbt_malloc(count * datasize);
527 requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
531 // Wait for completion of irecv's.
532 smpi_mpi_startall(size - 1, requests);
533 for(src = 0; src < size - 1; src++) {
534 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
535 if(index == MPI_UNDEFINED) {
538 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
540 for(index = 0; index < size - 1; index++) {
541 xbt_free(tmpbufs[index]);
// All-reduce built from two collectives: reduce to rank 0 with `op`, then
// broadcast the result in recvbuf from rank 0 to every rank.
548 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
549 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
550 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
553 FIXME: buggy implementation
555 int system_tag = 666;
556 int rank, size, other, index, datasize;
557 MPI_Request* requests;
560 rank = smpi_comm_rank(comm);
561 size = smpi_comm_size(comm);
562 datasize = smpi_datatype_size(datatype);
563 // Local copy from self
564 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
565 // Send/Recv buffers to/from others;
566 //TODO: make a MPI_barrier here ?
567 requests = xbt_new(MPI_Request, 2 * (size - 1));
568 tmpbufs = xbt_new(void*, size - 1);
570 for(other = 0; other < size; other++) {
572 tmpbufs[index / 2] = xbt_malloc(count * datasize);
573 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
574 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
578 // Wait for completion of all comms.
579 for(other = 0; other < 2 * (size - 1); other++) {
580 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
581 if(index == MPI_UNDEFINED) {
584 if((index & 1) == 1) {
585 // Request is odd: it's a irecv
586 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
589 for(index = 0; index < size - 1; index++) {
590 xbt_free(tmpbufs[index]);
597 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
598 int system_tag = 666;
599 int rank, size, other, index, datasize;
601 MPI_Request* requests;
604 rank = smpi_comm_rank(comm);
605 size = smpi_comm_size(comm);
606 datasize = smpi_datatype_size(datatype);
607 // Local copy from self
608 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
609 // Send/Recv buffers to/from others;
610 total = rank + (size - (rank + 1));
611 requests = xbt_new(MPI_Request, total);
612 tmpbufs = xbt_new(void*, rank);
614 for(other = 0; other < rank; other++) {
615 tmpbufs[index] = xbt_malloc(count * datasize);
616 requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
619 for(other = rank + 1; other < size; other++) {
620 requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
623 // Wait for completion of all comms.
624 smpi_mpi_startall(size - 1, requests);
625 for(other = 0; other < total; other++) {
626 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
627 if(index == MPI_UNDEFINED) {
631 // #Request is below rank: it's a irecv
632 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
635 for(index = 0; index < size - 1; index++) {
636 xbt_free(tmpbufs[index]);