SMPI:
* Implement some more MPI primitives:
- MPI_Waitany, MPI_Waitall, MPI_Allreduce
+ MPI_Waitany, MPI_Waitall, MPI_Reduce, MPI_Allreduce
+ * Add support for optimized collectives (Bcast is now binomial by default)
SURF:
* Extract the routing logic into its own object.
smpi/smpi_mpi.c \
smpi/smpi_sender.c \
smpi/smpi_receiver.c \
- smpi/smpi_util.c
+ smpi/smpi_util.c \
+ smpi/smpi_coll.c
MSG_SRC= msg/msg_config.c \
msg/task.c msg/host.c msg/m_process.c msg/gos.c \
return retval;
}
+/* Debug helper: dump the identifying fields of an MPI request to stdout. */
+void print_req( smpi_mpi_request_t r );
+void print_req( smpi_mpi_request_t r ) {
+  printf("***req %p-> src=%d dst=%d tag=%d completed=0x%x consumed=0x%x\n",r,r->src,r->dst,r->tag,r->completed,r->consumed);
+}
+
+
+/**
+ * wait and friends ...
+ **/
int smpi_mpi_wait(smpi_mpi_request_t request, smpi_mpi_status_t * status)
{
int retval = MPI_SUCCESS;
retval = MPI_ERR_INTERN;
} else {
SIMIX_mutex_lock(request->mutex);
+
+#ifdef DEBUG_STEPH
+ print_req( request ); //@@
+#endif
while (!request->completed) {
SIMIX_cond_wait(request->cond, request->mutex);
}
return retval;
}
+/**
+ * waitall: block until every request in the array has completed.
+ * Per-request statuses are collected one at a time through waitany;
+ * the caller may pass MPI_STATUS_IGNORE for the whole status array.
+ **/
 int smpi_mpi_waitall(int count, smpi_mpi_request_t requests[],
-                    smpi_mpi_status_t status[])
+                smpi_mpi_status_t status[])
 {
-  int cpt;
-  int index;
-  int retval;
-  smpi_mpi_status_t stat;
-
-  for (cpt = 0; cpt < count; cpt++) {
-    retval = smpi_mpi_waitany(count, requests, &index, &stat);
-    if (retval != MPI_SUCCESS)
-      return retval;
-    memcpy(&(status[index]), &stat, sizeof(stat));
-  }
-  return MPI_SUCCESS;
+	int cpt;
+	int index;
+	int retval;
+	smpi_mpi_status_t stat;
+
+	for (cpt = 0; cpt < count; cpt++) {
+		retval = smpi_mpi_waitany(count, requests, &index, &stat);
+		if (retval != MPI_SUCCESS)
+			return retval;
+		if (MPI_STATUS_IGNORE != status)
+			memcpy(&(status[index]), &stat, sizeof(stat));
+	}
+	return MPI_SUCCESS;
 }
+/**
+ * waitany
+ **/
int smpi_mpi_waitany(int count, smpi_mpi_request_t * requests, int *index,
smpi_mpi_status_t * status)
{
}
/* First check if one of them is already done */
for (cpt = 0; cpt < count; cpt++) {
+ printf("...exam req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
if (requests[cpt]->completed && !requests[cpt]->consumed) { /* got ya */
+ printf("...found match req[%d] of msg from [%d]\n",cpt,requests[cpt]->src);
*index = cpt;
goto found_request;
}
/* FIXME: should use a SIMIX_cond_waitany, when implemented. For now, block on the first one */
while (1) {
for (cpt = 0; cpt < count; cpt++) {
+
+#ifdef DEBUG_STEPH
+ print_req( requests[cpt] );
+#endif
if (!requests[cpt]->completed) { /* this one is not done, wait on it */
+ printf("... blocked waiting a msg %d->%d, tag=%d\n",requests[cpt]->src,requests[cpt]->dst,requests[cpt]->tag);
while (!requests[cpt]->completed)
SIMIX_cond_wait(requests[cpt]->cond, requests[cpt]->mutex);
}
found_request:
+#ifdef DEBUG_STEPH
+ print_req( requests[cpt] );
+#endif
requests[*index]->consumed = 1;
-
+#ifdef DEBUG_STEPH
+ print_req( requests[cpt] );
+#endif
+ printf("...accessing *req[%d]->consumed\n",cpt);
if (NULL != status) {
status->MPI_SOURCE = requests[*index]->src;
status->MPI_TAG = requests[*index]->tag;
--- /dev/null
+/* $Id$tag */
+
+/* smpi_coll.c -- various optimized routines for collectives */
+
+/* Copyright (c) 2009 Stephane Genaud. */
+/* All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "private.h"
+
+
+/* proctree taken and translated from P2P-MPI */
+
+/* Process tree used to organize collective communications.
+ * Taken and translated from P2P-MPI (see note above). */
+struct proc_tree {
+	int PROCTREE_A;   /* arity: max number of children per node */
+	int numChildren;  /* actual number of children of this node */
+	int * child;      /* ranks of the children (arity entries, -1 = unused) */
+	int parent;       /* rank of the parent, -1 for the root */
+	int me;           /* rank of the calling process */
+	int root;         /* rank of the tree root */
+	int isRoot;       /* 1 iff me == root */
+};
+typedef struct proc_tree * proc_tree_t;
+
+
+
+/* prototypes */
+void build_tree( int index, int extent, proc_tree_t *tree);
+proc_tree_t alloc_tree( int arity );
+void free_tree( proc_tree_t tree);
+void print_tree(proc_tree_t tree);
+int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
+
+
+
+/**
+ * Allocate and initialize a process tree of the given arity:
+ * all child slots set to -1 (empty), root and parent set to -1.
+ * NOTE(review): neither malloc() result is checked -- TODO confirm the
+ * intended out-of-memory policy (abort vs. propagate).
+ **/
+proc_tree_t alloc_tree( int arity ) {
+	proc_tree_t tree = malloc(1*sizeof(struct proc_tree));
+	int i;
+
+	tree->PROCTREE_A = arity;
+	tree->isRoot = 0;
+	tree->numChildren = 0;
+	tree->child = malloc(arity*sizeof(int));
+	for (i=0; i < arity; i++) {
+		tree->child[i] = -1;
+	}
+	tree->root = -1;
+	tree->parent = -1;
+	return( tree );
+}
+
+/**
+ * Release a tree allocated by alloc_tree(): frees the child array,
+ * then the tree structure itself.
+ **/
+void free_tree( proc_tree_t tree) {
+	free (tree->child );
+	free(tree);
+}
+
+
+
+/**
+ * Build the tree depending on a process rank (index) and the group size (extent)
+ * @param index the rank of the calling process
+ * @param extent the total number of processes
+ * @param tree in/out: tree whose me/root/parent/child/numChildren fields get filled
+ * Rank 0 is always used as the tree root here.
+ **/
+void build_tree( int index, int extent, proc_tree_t *tree) {
+	int places = (*tree)->PROCTREE_A * index;
+	int i;
+	int ch;
+	int pr;
+
+	(*tree)->me = index;
+	(*tree)->root = 0 ;
+
+	for (i = 1; i <= (*tree)->PROCTREE_A; i++) {
+		++places;
+		ch = (*tree)->PROCTREE_A * index + i + (*tree)->root;
+		//printf("places %d\n",places);
+		ch %= extent;
+		if (places < extent) {
+			//printf("ch <%d> = <%d>\n",i,ch);
+			//printf("adding to the tree at index <%d>\n\n",i-1);
+			(*tree)->child[i - 1] = ch;
+			(*tree)->numChildren++;
+		}
+		else {
+			//fprintf(stderr,"not adding to the tree\n");
+		}
+	}
+	//fprintf(stderr,"procTree.numChildren <%d>\n",(*tree)->numChildren);
+
+	if (index == (*tree)->root) {
+		(*tree)->isRoot = 1;
+	}
+	else {
+		(*tree)->isRoot = 0;
+		pr = (index - 1) / (*tree)->PROCTREE_A;
+		(*tree)->parent = pr;
+	}
+}
+
+
+/* Debug helper: pretty-print parent, self and children of the tree to stdout. */
+void print_tree(proc_tree_t tree) {
+	int i;
+	char *spacer;
+	if (-1 != tree->parent ) {
+		printf("[%d]\n   +---",tree->parent);
+		spacer= strdup("     ");
+	}
+	else {
+		spacer=strdup("");
+	}
+	printf("<%d>\n",tree->me);
+	for (i=0;i < tree->numChildren; i++) {
+		printf("%s   +--- %d\n", spacer,tree->child[i]);
+	}
+	free(spacer);
+}
+
+/**
+ * bcast over the process tree (arity 2 here): each non-root rank first
+ * receives the payload from its parent, then the rank forwards a copy
+ * to each of its children.
+ **/
+int binomial_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
+		MPI_Comm comm)
+{
+	int system_tag = 999;  // used negative int but smpi_create_request() declares this illegal (to be checked)
+	int rank;
+	int retval = MPI_SUCCESS;
+	int i;
+	smpi_mpi_request_t request;
+	smpi_mpi_request_t * requests;
+	void **tmpbufs;
+
+	rank = smpi_mpi_comm_rank(comm);
+	proc_tree_t tree = alloc_tree( 2 ); // arity=2: a binomial tree
+
+	build_tree( rank, comm->size, &tree );
+	/* wait for data from my parent in the tree */
+	if (!tree->isRoot) {
+#ifdef DEBUG_STEPH
+		printf("[%d] recv(%d from %d, tag=%d)\n",rank,rank, tree->parent, system_tag+rank);
+#endif
+		retval = smpi_create_request(buf, count, datatype,
+				tree->parent, rank,
+				system_tag + rank,
+				comm, &request);
+		if (MPI_SUCCESS != retval) {
+			printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
+					rank,retval,__FILE__,__LINE__);
+		}
+		smpi_mpi_irecv(request);
+#ifdef DEBUG_STEPH
+		printf("[%d] waiting on irecv from %d\n",rank , tree->parent);
+#endif
+		smpi_mpi_wait(request, MPI_STATUS_IGNORE);
+	}
+
+	tmpbufs = xbt_malloc( tree->numChildren * sizeof(void *));
+	requests = xbt_malloc( tree->numChildren * sizeof(smpi_mpi_request_t));
+#ifdef DEBUG_STEPH
+	printf("[%d] creates %d requests\n",rank,tree->numChildren);
+#endif
+
+	/* initiates sends to ranks lower in the tree */
+	for (i=0; i < tree->numChildren; i++) {
+		if (tree->child[i] != -1) {
+#ifdef DEBUG_STEPH
+			printf("[%d] send(%d->%d, tag=%d)\n",rank,rank, tree->child[i], system_tag+tree->child[i]);
+#endif
+			tmpbufs[i] = xbt_malloc( count * datatype->size);
+			memcpy( tmpbufs[i], buf, count * datatype->size * sizeof(char));
+			retval = smpi_create_request(tmpbufs[i], count, datatype,
+					rank, tree->child[i],
+					system_tag + tree->child[i],
+					comm, &(requests[i]));
+#ifdef DEBUG_STEPH
+			printf("[%d] after create req[%d]=%p req->(src=%d,dst=%d)\n",rank , i, requests[i],requests[i]->src,requests[i]->dst );
+#endif
+			if (MPI_SUCCESS != retval) {
+				printf("** internal error: smpi_create_request() rank=%d returned retval=%d, %s:%d\n",
+						rank,retval,__FILE__,__LINE__);
+			}
+			smpi_mpi_isend(requests[i]);
+			/* FIXME : we should not wait immediately here. See next FIXME. */
+			smpi_mpi_wait( requests[i], MPI_STATUS_IGNORE);
+			xbt_free(tmpbufs[i]);
+			/* NOTE(review): requests[i] comes from smpi_create_request();
+			 * flat_tree_bcast releases its request with
+			 * xbt_mallocator_release() instead of xbt_free() --
+			 * TODO confirm which is correct here. */
+			xbt_free(requests[i]);
+		}
+	}
+	/* FIXME : normally, we should wait only once all isend have been issued:
+	 * this is the following commented code. It deadlocks, probably because
+	 * of a bug in the sender process */
+
+	/* wait for completion of sends */
+	/* printf("[%d] wait for %d send completions\n",rank,tree->numChildren);
+	smpi_mpi_waitall( tree->numChildren, requests, MPI_STATUS_IGNORE);
+	printf("[%d] reqs completed\n)",rank);
+	*/
+
+	xbt_free(tmpbufs);
+	xbt_free(requests);
+	return(retval);
+}
+
+/**
+ * example usage
+ **/
+/*
+ * int main() {
+
+ int rank;
+ int size=12;
+
+ proc_tree_t tree;
+ for (rank=0;rank<size;rank++) {
+ printf("--------------tree for rank %d ----------\n",rank);
+ tree = alloc_tree( 2 );
+ build_tree( rank, size, &tree );
+ print_tree( tree );
+ free_tree( tree );
+
+ }
+ printf("-------------- bcast ----------\n");
+ for (rank=0;rank<size;rank++) {
+ bcast( rank, size );
+ }
+
+
+}
+*/
+
+
+
+
+
#include "private.h"
+#include "smpi_coll_private.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_mpi, smpi,
"Logging specific to SMPI (mpi)");
/**
* MPI_Bcast
**/
+
+/**
+ * flat bcast: the root posts a single send to rank (root+1)%size with
+ * request->forward = size-1, so the message gets relayed to the remaining
+ * ranks by the runtime (see the forward handling in the receiver);
+ * every non-root rank posts a matching recv from MPI_ANY_SOURCE.
+ **/
+int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
+int flat_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root,
+		MPI_Comm comm)
+{
+	int rank;
+	int retval = MPI_SUCCESS;
+	smpi_mpi_request_t request;
+
+	rank = smpi_mpi_comm_rank(comm);
+	if (rank == root) {
+		retval = smpi_create_request(buf, count, datatype, root,
+				(root + 1) % comm->size, 0, comm, &request);
+		request->forward = comm->size - 1;
+		smpi_mpi_isend(request);
+	} else {
+		retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank,
+				0, comm, &request);
+		smpi_mpi_irecv(request);
+	}
+
+	smpi_mpi_wait(request, MPI_STATUS_IGNORE);
+	xbt_mallocator_release(smpi_global->request_mallocator, request);
+
+	return(retval);
+
+}
+
+/**
+ * Bcast user entry point
+ **/
int SMPI_MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root,
MPI_Comm comm)
{
int retval = MPI_SUCCESS;
- int rank;
- smpi_mpi_request_t request;
smpi_bench_end();
- rank = smpi_mpi_comm_rank(comm);
-
- if (rank == root) {
- retval = smpi_create_request(buf, count, datatype, root,
- (root + 1) % comm->size, 0, comm, &request);
- request->forward = comm->size - 1;
- smpi_mpi_isend(request);
- } else {
- retval = smpi_create_request(buf, count, datatype, MPI_ANY_SOURCE, rank,
- 0, comm, &request);
- smpi_mpi_irecv(request);
- }
-
- smpi_mpi_wait(request, MPI_STATUS_IGNORE);
- xbt_mallocator_release(smpi_global->request_mallocator, request);
+ //retval = flat_tree_bcast(buf, count, datatype, root, comm);
+ retval = binomial_tree_bcast(buf, count, datatype, root, comm);
smpi_bench_begin();
request_queue = mydata->pending_recv_request_queue;
message_queue = mydata->received_message_queue;
+
+
while (1) {
// FIXME: better algorithm, maybe some kind of balanced tree? or a heap?
xbt_fifo_foreach(message_queue, message_item, message,
smpi_received_message_t) {
+//#define DEBUG_MATCH
+#ifdef DEBUG_MATCH
+ printf("[%s] try match (req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n",
+ __FILE__,request->src,message->src,request->tag, message->tag);
+#endif
if (request->comm == message->comm &&
(MPI_ANY_SOURCE == request->src || request->src == message->src)
&& (MPI_ANY_TAG == request->tag || request->tag == message->tag)) {
xbt_fifo_free_item(request_item);
xbt_fifo_remove_item(message_queue, message_item);
xbt_fifo_free_item(message_item);
+#ifdef DEBUG_MATCH
+ printf("[%s] found match: req_src=%d,msg_src=%d)x(req_tag=%d,msg_tag=%d)\n",
+ __FILE__,request->src,message->src,request->tag, message->tag);
+#endif
goto stopsearch;
}
}
(request->dst + message->forward + 1) % request->comm->size;
xbt_fifo_push(request_queue, request);
} else {
+//#define DEBUG_MATCH
+#ifdef DEBUG_MATCH
+ printf("**SENDER: request %p completed :=1\n",request);
+#endif
request->completed = 1;
}