// Logging setup: creates the "smpi_base" default subcategory under "smpi"
// and references the SMPI logging categories (including smpi_base itself)
// with XBT_LOG_EXTERNAL_CATEGORY.
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
// Message-size threshold in bytes, compared against
// smpi_datatype_size(datatype) * count in smpi_mpi_isend/smpi_mpi_irecv;
// larger messages are logged there as "RDV" (rendezvous) transfers.
16 #define EAGER_LIMIT 65536
// Attaches SMPI per-process data to the calling SIMIX process.
// The process index is parsed from (*argv)[1] with atoi(), which performs no
// error checking, so a non-numeric argument silently yields index 0. The
// matching data is fetched with smpi_process_remote_data() and stored on the
// process. The index argument is then removed from argv by shifting the
// remaining entries down one slot and NULL-terminating the array.
// NOTE(review): assumes *argc >= 2; nothing visible here checks it. Whether
// *argc is decremented afterwards is not visible in this excerpt; confirm.
19 void smpi_process_init(int* argc, char*** argv) {
21 smpi_process_data_t data;
24 proc = SIMIX_process_self();
25 index = atoi((*argv)[1]);
26 data = smpi_process_remote_data(index);
27 SIMIX_process_set_data(proc, data);
// Drop argv[1]: move argv[2..argc-1] down to argv[1..argc-2].
30 memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
31 (*argv)[(*argc) - 1] = NULL;
34 DEBUG2("<%d> New process in the game: %p", index, proc);
// Logs, at debug level, that the calling SMPI process (identified by
// smpi_process_index()) is leaving. Any further teardown is not visible in
// this excerpt.
37 void smpi_process_destroy(void) {
38 int index = smpi_process_index();
40 DEBUG1("<%d> Process left the game", index);
43 /* MPI Low level calls */
// Starts a non-blocking send of `count` elements of `datatype` from `buf`
// to rank `dst` of `comm` and returns the new request.
// The request is heap-allocated with xbt_new() and tagged with the sender's
// rank in comm. It is marked incomplete with no matching request yet, posted
// with smpi_process_post_send(), and its transfer is started with
// SIMIX_network_isend() on request->rdv.
// NOTE(review): request->rdv, request->size and the other request fields
// are assigned in lines not visible in this excerpt (rdv presumably by
// smpi_process_post_send()); confirm. The body of the size > EAGER_LIMIT
// branch is also only partly visible.
44 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
46 size_t size = smpi_datatype_size(datatype) * count;
48 if (size > EAGER_LIMIT) {
49 /* Warning: this (zero-length synchronous) call will come back here with size == 0 */
50 DEBUG1("RDV send to %d", dst);
52 request = xbt_new(s_smpi_mpi_request_t, 1);
54 request->src = smpi_comm_rank(comm);
58 request->complete = 0;
59 request->match = MPI_REQUEST_NULL;
60 smpi_process_post_send(comm, request);
61 DEBUG3("NEW send request %p with rdv %p and match %p", request, request->rdv, request->match);
62 request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, NULL);
// Starts a non-blocking receive of `count` elements of `datatype` into
// `buf` from rank `src` of `comm` and returns the new request.
// Mirrors smpi_mpi_isend(). The request is heap-allocated with xbt_new(),
// tagged with the receiver's rank in comm, marked incomplete with no match,
// and posted with smpi_process_post_recv(). Then SIMIX_network_irecv() is
// started on request->rdv and given &request->size (presumably so that the
// received size is reported back; confirm).
// NOTE(review): request->rdv and the remaining fields are assigned in lines
// not visible in this excerpt.
66 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
68 size_t size = smpi_datatype_size(datatype) * count;
70 if (size > EAGER_LIMIT) {
71 /* Warning: this (zero-length synchronous) call will come back here with size == 0 */
72 DEBUG1("RDV recv from %d", src);
74 request = xbt_new(s_smpi_mpi_request_t, 1);
77 request->dst = smpi_comm_rank(comm);
80 request->complete = 0;
81 request->match = MPI_REQUEST_NULL;
82 smpi_process_post_recv(request);
83 DEBUG3("NEW recv request %p with rdv %p and match %p", request, request->rdv, request->match);
84 request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
// Blocking receive: smpi_mpi_irecv() followed by smpi_mpi_wait(). The wait
// fills *status unless status is MPI_STATUS_IGNORE.
88 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
91 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
92 smpi_mpi_wait(&request, status);
// Blocking send: smpi_mpi_isend() followed by smpi_mpi_wait(); no status is
// reported to the caller.
95 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
98 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
99 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
// Combined send/receive. It posts the send to `dst` (requests[0]) and the
// receive from `src` (requests[1]) before waiting on either, so two peers
// exchanging data do not block on each other. It then waits for both with
// smpi_mpi_waitall() into a local `stats` array (declared in a line not
// visible here). Finally it copies stats[1], the receive's status, into
// *status unless status is MPI_STATUS_IGNORE.
// NOTE(review): this relies on smpi_mpi_waitall() writing the receive's
// status to stats[1]. Waitall's visible request compaction (see the FIXME
// there) can move a request to a different slot before its status is
// recorded; confirm stats[1] is always written.
102 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
103 MPI_Request requests[2];
106 requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
107 requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
108 smpi_mpi_waitall(2, requests, stats);
109 if(status != MPI_STATUS_IGNORE) {
110 // Copy receive status
111 memcpy(status, &stats[1], sizeof(MPI_Status));
// Common tail of the wait/test functions.
// - Unless status is MPI_STATUS_IGNORE, fills it with the request's stored
//   src and tag and MPI_SUCCESS. Whether src/tag are updated for wildcard
//   receives is not visible here.
// - Logs the request and its match.
// - If the request is already flagged complete, destroys its rendezvous
//   point with SIMIX_rdv_destroy(). Otherwise (the else line is not visible
//   in this excerpt) it flags the matching request complete and unlinks it,
//   presumably so that whichever side finishes second destroys the rdv.
// - Always resets *request to MPI_REQUEST_NULL.
// NOTE(review): the debug line below treats match as possibly NULL, but no
// NULL guard is visible before match is dereferenced; confirm. Freeing of
// the request struct is also not visible here.
115 static void finish_wait(MPI_Request* request, MPI_Status* status) {
116 if(status != MPI_STATUS_IGNORE) {
117 status->MPI_SOURCE = (*request)->src;
118 status->MPI_TAG = (*request)->tag;
119 status->MPI_ERROR = MPI_SUCCESS;
121 DEBUG5("finishing wait for %p [src = %d, dst = %d, tag = %d, complete = %d]",
122 *request, (*request)->src, (*request)->dst, (*request)->tag, (*request)->complete);
123 DEBUG2("match %p, complete %d", (*request)->match, (*request)->match ? (*request)->match->complete : -1);
124 if((*request)->complete == 1) {
125 SIMIX_rdv_destroy((*request)->rdv);
127 (*request)->match->complete = 1;
128 (*request)->match->match = MPI_REQUEST_NULL;
131 *request = MPI_REQUEST_NULL;
// Non-blocking completion check: reads the request's `complete` flag into
// `flag`. When the flag is set (the guarding condition is not visible here),
// the SIMIX communication is destroyed and the request is finished with
// finish_wait(), which fills *status and sets *request to MPI_REQUEST_NULL.
// NOTE(review): *request is dereferenced unconditionally; a pointer to
// MPI_REQUEST_NULL would crash unless it is guarded outside this excerpt.
134 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
135 int flag = (*request)->complete;
138 SIMIX_communication_destroy((*request)->pair);
139 finish_wait(request, status);
// Non-blocking "any" check. It scans requests[0..count-1], skipping
// MPI_REQUEST_NULL entries, for a request already flagged complete. For
// such a request it destroys the SIMIX communication and finishes it into
// *status. *index starts as MPI_UNDEFINED; presumably it is set to the
// found position and the scan stops there (those lines are not visible;
// confirm).
144 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
147 *index = MPI_UNDEFINED;
149 for(i = 0; i < count; i++) {
150 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
151 SIMIX_communication_destroy(requests[i]->pair);
152 finish_wait(&requests[i], status);
// Blocks until *request has finished, then finishes it with finish_wait(),
// which fills *status and sets *request to MPI_REQUEST_NULL.
// If the request is already flagged complete, its SIMIX communication is
// destroyed right away. Otherwise (the else line is not visible here),
// SIMIX_network_wait() is called on it with a timeout of -1.0.
161 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
162 DEBUG6("wait for request %p (%p) [src = %d, dst = %d, tag = %d, complete = %d]",
163 *request, (*request)->pair, (*request)->src, (*request)->dst, (*request)->tag, (*request)->complete);
164 // data is null if receiver waits before sender enters the rdv
165 if((*request)->complete) {
166 SIMIX_communication_destroy((*request)->pair);
168 SIMIX_network_wait((*request)->pair, -1.0);
170 finish_wait(request, status);
// Waits for one of requests[0..count-1] to finish, finishes it into *status
// and yields its index. Callers treat MPI_UNDEFINED as "nothing left to
// wait for".
// Pass 1 looks for a non-null request that is already flagged complete, so
// no blocking is needed.
// Pass 2 runs only if pass 1 found nothing. It collects the SIMIX
// communications of the pending requests into a dynar and blocks in
// SIMIX_network_waitany().
// NOTE(review): pass 1 uses requests[index], so it relies on a non-visible
// `index = i;`. `map` presumably translates the dynar position returned by
// SIMIX_network_waitany() back into an index in requests[]; that code, and
// the freeing of `map`, are not visible here. Confirm both.
173 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
178 index = MPI_UNDEFINED;
180 // First check for already completed requests
181 for(i = 0; i < count; i++) {
182 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
184 SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
188 if(index == MPI_UNDEFINED) {
189 // Otherwise, wait for a request to complete
190 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
191 map = xbt_new(int, count);
193 DEBUG0("Wait for one of");
194 for(i = 0; i < count; i++) {
195 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
196 DEBUG4(" request %p [src = %d, dst = %d, tag = %d]",
197 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
198 xbt_dynar_push(comms, &requests[i]->pair);
204 index = SIMIX_network_waitany(comms);
208 xbt_dynar_free(&comms);
210 if(index != MPI_UNDEFINED) {
211 finish_wait(&requests[index], status);
// Waits for all requests by calling smpi_mpi_waitany() repeatedly until it
// returns MPI_UNDEFINED (the loop header and exit lines are not visible in
// this excerpt). Unless status is MPI_STATUS_IGNORE, each completion's
// status is copied into status[index].
209 // NOTE(review): this comment line intentionally left unnumbered-free; see below.
217 void smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) {
222 index = smpi_mpi_waitany(count, requests, &stat);
223 if(index == MPI_UNDEFINED) {
226 if(status != MPI_STATUS_IGNORE) {
227 memcpy(&status[index], &stat, sizeof(stat));
229 // FIXME: check this -v
230 // Move the last request to the found position
// NOTE(review): after this move, the request that started at position
// count-1 is reported by later waitany() calls at its *new* position. Its
// status is then written to the wrong status[] slot, possibly overwriting
// another request's status, and its original slot is never written. The
// move also permutes the caller's requests[] array. It looks unnecessary:
// finish_wait() already sets the completed entry to MPI_REQUEST_NULL, and
// waitany() skips such entries. Fix it together with the (not visible)
// count bookkeeping.
231 requests[index] = requests[count - 1];
232 requests[count - 1] = MPI_REQUEST_NULL;
// The visible loop finishes every non-null request in
// requests[0..incount-1] that is already flagged complete. For each one it
// destroys the SIMIX communication and finishes it into status[i] (unless
// status is MPI_STATUS_IGNORE).
// NOTE(review): recording into `indices` and the returned count happen in
// lines not visible here. MPI_Waitsome blocks until at least one request
// completes, and it stores statuses in completion order (status[k] matches
// indices[k]). This code writes status[i] by request position and shows no
// blocking path. Confirm the intended semantics.
237 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
241 for(i = 0; i < incount; i++) {
242 if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
243 SIMIX_communication_destroy(requests[i]->pair);
244 finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
// Broadcasts `count` elements of `datatype` from `root` to every rank of
// `comm` by delegating to nary_tree_bcast() with a tree arity of 4.
252 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
253 // Tree arity: 2 gives a binary tree; 4 seems to be a good setting (see P2P-MPI).
254 nary_tree_bcast(buf, count, datatype, root, comm, 4);
// Synchronizes all ranks of `comm` by delegating to nary_tree_barrier()
// with a tree arity of 4.
257 void smpi_mpi_barrier(MPI_Comm comm) {
258 // Tree arity: 2 gives a binary tree; 4 seems to be a good setting (see P2P-MPI).
259 nary_tree_barrier(comm, 4);
// Gathers sendcount elements of sendtype from every rank into recvbuf at
// root. Rank r's data lands at byte offset
// r * recvcount * smpi_datatype_size(recvtype).
// Non-root ranks presumably send their buffer to root with the fixed
// internal tag system_tag; the rank test is not visible in this excerpt.
// Root copies its own contribution locally, posts one irecv per other rank
// (size - 1 requests) and waits for all of them.
262 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
263 int system_tag = 666;
264 int rank, size, src, index, sendsize, recvsize;
265 MPI_Request* requests;
267 rank = smpi_comm_rank(comm);
268 size = smpi_comm_size(comm);
270 // Send buffer to root
271 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
273 sendsize = smpi_datatype_size(sendtype);
274 recvsize = smpi_datatype_size(recvtype);
275 // Local copy from root
276 memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
277 // Receive buffers from senders
278 requests = xbt_new(MPI_Request, size - 1);
280 for(src = 0; src < size; src++) {
282 requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
286 // Wait for completion of irecv's.
287 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Gathers a variable amount of data from every rank into recvbuf at root.
// Rank r's recvcounts[r] elements of recvtype are placed at displacement
// displs[r]. As in MPI_Gatherv, displacements are counted in recvtype
// elements, not bytes, so the byte offset is
// displs[r] * smpi_datatype_size(recvtype). This is consistent with the
// datatype-size scaling that smpi_mpi_gather uses for its layout.
// Non-root ranks presumably send their buffer to root with the fixed
// internal tag system_tag; the rank test is not visible in this excerpt.
// Root copies its own contribution locally, posts one irecv per other rank
// and waits for all of them.
292 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
293 int system_tag = 666;
294 int rank, size, src, index, sendsize, recvsize;
295 MPI_Request* requests;
297 rank = smpi_comm_rank(comm);
298 size = smpi_comm_size(comm);
300 // Send buffer to root
301 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
303 sendsize = smpi_datatype_size(sendtype);
recvsize = smpi_datatype_size(recvtype);
304 // Local copy from root (displs[] is in recvtype elements, hence * recvsize)
305 memcpy(&((char*)recvbuf)[displs[root] * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
306 // Receive buffers from senders
307 requests = xbt_new(MPI_Request, size - 1);
309 for(src = 0; src < size; src++) {
311 requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src] * recvsize], recvcounts[src], recvtype, src, system_tag, comm);
315 // Wait for completion of irecv's.
316 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Every rank gathers sendcount elements of sendtype from every rank into
// its own recvbuf, ordered by rank: rank r's data is at byte offset
// r * recvcount * smpi_datatype_size(recvtype).
// Each rank copies its own contribution locally. It then posts an isend to
// and an irecv from every other rank, 2 * (size - 1) requests in total (the
// self-skip test is not visible in this excerpt), and waits for all of them.
321 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
322 int system_tag = 666;
323 int rank, size, other, index, sendsize, recvsize;
324 MPI_Request* requests;
326 rank = smpi_comm_rank(comm);
327 size = smpi_comm_size(comm);
328 sendsize = smpi_datatype_size(sendtype);
329 recvsize = smpi_datatype_size(recvtype);
330 // Local copy from self
331 memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
332 // Send/Recv buffers to/from others;
333 requests = xbt_new(MPI_Request, 2 * (size - 1));
335 for(other = 0; other < size; other++) {
337 requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
339 requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
343 // Wait for completion of all comms.
344 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Every rank gathers a variable amount of data from every rank into its own
// recvbuf. Rank r's recvcounts[r] elements of recvtype are placed at
// displacement displs[r]. As in MPI_Allgatherv, displacements are counted
// in recvtype elements, so the byte offset is displs[r] * recvsize. The
// original code computed recvsize but indexed recvbuf with raw displs.
// Each rank copies its own contribution locally. It then posts an isend to
// and an irecv from every other rank, 2 * (size - 1) requests in total (the
// self-skip test is not visible in this excerpt), and waits for all of them.
348 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
349 int system_tag = 666;
350 int rank, size, other, index, sendsize, recvsize;
351 MPI_Request* requests;
353 rank = smpi_comm_rank(comm);
354 size = smpi_comm_size(comm);
355 sendsize = smpi_datatype_size(sendtype);
356 recvsize = smpi_datatype_size(recvtype);
357 // Local copy from self (displs[] is in recvtype elements, hence * recvsize)
358 memcpy(&((char*)recvbuf)[displs[rank] * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
359 // Send/Recv buffers to/from others
360 requests = xbt_new(MPI_Request, 2 * (size - 1));
362 for(other = 0; other < size; other++) {
364 requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
366 requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other] * recvsize], recvcounts[other], recvtype, other, system_tag, comm);
370 // Wait for completion of all comms.
371 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Scatters consecutive blocks of sendcount elements of sendtype from
// sendbuf at root. Rank r receives the block at byte offset
// r * sendcount * smpi_datatype_size(sendtype) into its recvbuf.
// Non-root ranks presumably receive from root with the fixed internal tag
// system_tag; the rank test is not visible in this excerpt. Root copies its
// own block locally, posts one isend per other rank (size - 1 requests) and
// waits for all of them.
375 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
376 int system_tag = 666;
377 int rank, size, dst, index, sendsize, recvsize;
378 MPI_Request* requests;
380 rank = smpi_comm_rank(comm);
381 size = smpi_comm_size(comm);
383 // Recv buffer from root
384 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
386 sendsize = smpi_datatype_size(sendtype);
387 recvsize = smpi_datatype_size(recvtype);
388 // Local copy from root
389 memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
390 // Send buffers to receivers
391 requests = xbt_new(MPI_Request, size - 1);
393 for(dst = 0; dst < size; dst++) {
395 requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
399 // Wait for completion of isend's.
400 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Scatters variable-size blocks from sendbuf at root. Rank r receives
// sendcounts[r] elements of sendtype taken from displacement displs[r]. As
// in MPI_Scatterv, displacements are counted in sendtype elements, so the
// byte offset is displs[r] * sendsize. This matches the datatype-size
// scaling used by smpi_mpi_scatter.
// Non-root ranks presumably receive from root with the fixed internal tag
// system_tag; the rank test is not visible in this excerpt. Root copies its
// own block locally, posts one isend per other rank and waits for all of
// them.
405 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
406 int system_tag = 666;
407 int rank, size, dst, index, sendsize, recvsize;
408 MPI_Request* requests;
410 rank = smpi_comm_rank(comm);
411 size = smpi_comm_size(comm);
413 // Recv buffer from root
414 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
416 sendsize = smpi_datatype_size(sendtype);
417 recvsize = smpi_datatype_size(recvtype);
418 // Local copy from root (displs[] is in sendtype elements, hence * sendsize)
419 memcpy(recvbuf, &((char*)sendbuf)[displs[root] * sendsize], recvcount * recvsize * sizeof(char));
420 // Send buffers to receivers
421 requests = xbt_new(MPI_Request, size - 1);
423 for(dst = 0; dst < size; dst++) {
425 requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst] * sendsize], sendcounts[dst], sendtype, dst, system_tag, comm);
429 // Wait for completion of isend's.
430 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Reduces `count` elements of `datatype` from every rank into recvbuf at
// root using `op`.
// Non-root ranks presumably send their buffer to root with the fixed
// internal tag system_tag; the rank test is not visible in this excerpt.
// Root starts from a copy of its own sendbuf and receives every other
// rank's contribution into its own xbt_malloc'ed temporary buffer. It folds
// each one into recvbuf with smpi_op_apply() as soon as smpi_mpi_waitany()
// reports it, then frees the temporary buffers.
// NOTE(review): contributions are combined in completion order, not rank
// order. MPI requires rank order for non-commutative ops, so results for
// those depend on timing. Freeing of the `requests` and `tmpbufs` arrays is
// not visible in this excerpt.
435 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
436 int system_tag = 666;
437 int rank, size, src, index, datasize;
438 MPI_Request* requests;
441 rank = smpi_comm_rank(comm);
442 size = smpi_comm_size(comm);
444 // Send buffer to root
445 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
447 datasize = smpi_datatype_size(datatype);
448 // Local copy from root
449 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
450 // Receive buffers from senders
451 //TODO: make a MPI_barrier here ?
452 requests = xbt_new(MPI_Request, size - 1);
453 tmpbufs = xbt_new(void*, size - 1);
455 for(src = 0; src < size; src++) {
457 tmpbufs[index] = xbt_malloc(count * datasize);
458 requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
462 // Wait for completion of irecv's.
463 for(src = 0; src < size - 1; src++) {
464 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
465 if(index == MPI_UNDEFINED) {
468 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
470 for(index = 0; index < size - 1; index++) {
471 xbt_free(tmpbufs[index]);
// All-reduce implemented as smpi_mpi_reduce() to rank 0 followed by
// smpi_mpi_bcast() of the result from rank 0.
478 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
479 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
480 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
483 FIXME: buggy implementation
// NOTE(review): the lines below are an alternative all-to-all version that
// is marked buggy. They are presumably disabled; the enclosing comment
// delimiters are not visible in this excerpt, so confirm. One visible
// problem: the loop posts 2 * (size - 1) requests (isends at even indices,
// irecvs at odd ones) but calls smpi_mpi_waitany(size - 1, ...). The upper
// half of the request array is therefore never waited on. Like
// smpi_mpi_reduce, it also combines contributions in completion order.
485 int system_tag = 666;
486 int rank, size, other, index, datasize;
487 MPI_Request* requests;
490 rank = smpi_comm_rank(comm);
491 size = smpi_comm_size(comm);
492 datasize = smpi_datatype_size(datatype);
493 // Local copy from self
494 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
495 // Send/Recv buffers to/from others;
496 //TODO: make a MPI_barrier here ?
497 requests = xbt_new(MPI_Request, 2 * (size - 1));
498 tmpbufs = xbt_new(void*, size - 1);
500 for(other = 0; other < size; other++) {
502 tmpbufs[index / 2] = xbt_malloc(count * datasize);
503 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
504 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
508 // Wait for completion of all comms.
509 for(other = 0; other < 2 * (size - 1); other++) {
510 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
511 if(index == MPI_UNDEFINED) {
514 if((index & 1) == 1) {
515 // Request is odd: it's a irecv
516 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
519 for(index = 0; index < size - 1; index++) {
520 xbt_free(tmpbufs[index]);
// Inclusive prefix reduction. On return, rank r's recvbuf holds the `op`
// reduction of the sendbufs of ranks 0..r.
// Each rank starts from a copy of its own sendbuf. It posts one irecv from
// every lower rank, each into its own temporary buffer (requests[0..rank-1],
// tmpbufs[0..rank-1]), and one isend to every higher rank
// (requests[rank..total-1]). As smpi_mpi_waitany() reports completions, the
// irecvs (index below rank; that test is not visible in this excerpt) are
// folded into recvbuf with smpi_op_apply().
// Fixes over the previous version:
// - tmpbufs holds exactly `rank` buffers, so only `rank` are freed. Freeing
//   size - 1 read and freed past the end of tmpbufs on every rank except
//   the last.
// - waitany is given `total`, the real number of posted requests.
// NOTE(review): contributions are combined in completion order, not rank
// order, which is only correct for commutative ops.
527 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
528 int system_tag = 666;
529 int rank, size, other, index, datasize;
531 MPI_Request* requests;
534 rank = smpi_comm_rank(comm);
535 size = smpi_comm_size(comm);
536 datasize = smpi_datatype_size(datatype);
537 // Local copy from self
538 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
539 // Send/Recv buffers to/from others;
// rank irecvs (from lower ranks) + (size - rank - 1) isends (to higher ranks)
540 total = rank + (size - (rank + 1));
541 requests = xbt_new(MPI_Request, total);
542 tmpbufs = xbt_new(void*, rank);
544 for(other = 0; other < rank; other++) {
545 tmpbufs[index] = xbt_malloc(count * datasize);
546 requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, other, system_tag, comm);
549 for(other = rank + 1; other < size; other++) {
550 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
553 // Wait for completion of all comms.
554 for(other = 0; other < total; other++) {
555 index = smpi_mpi_waitany(total, requests, MPI_STATUS_IGNORE);
556 if(index == MPI_UNDEFINED) {
560 // Request index is below rank: it's an irecv
561 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
// Only `rank` temporary buffers were allocated above.
564 for(index = 0; index < rank; index++) {
565 xbt_free(tmpbufs[index]);