From aedc7f54098826e18dc8c2bf32594e9baeb3832f Mon Sep 17 00:00:00 2001
From: Augustin Degomme
Date: Fri, 25 Apr 2014 16:33:35 +0200
Subject: [PATCH] Begin to add an interface to instantiate SMPI
 programmatically.

This adds SMPI_init, SMPI_finalize, and SMPI_app_instance_register.

This allows users to deploy MSG+MPI applications in the same simulation, and to deploy several instances of SMPI running the same or different applications, without using smpirun. This means that we can now handle several MPI_COMM_WORLD communicators. A usage sketch follows the patch below.

This is only a very early step: for now the interface still depends on MSG calls to deploy the application and run the processes. We should work on an interface to do this for SMPI, or add a generic interface that would allow several SimGrid interfaces to run together.
---
 buildtools/Cmake/DefinePackages.cmake |   1 +
 include/smpi/smpi.h                   |   7 ++
 src/smpi/private.h                    |   6 ++
 src/smpi/smpi_comm.c                  |  18 ++++
 src/smpi/smpi_deployment.c            |  76 ++++++++++++++++
 src/smpi/smpi_global.c                | 123 +++++++++++++++++---------
 src/smpi/smpi_group.c                 |   9 +-
 src/smpi/smpirun.in                   |   1 +
 8 files changed, 195 insertions(+), 46 deletions(-)
 create mode 100644 src/smpi/smpi_deployment.c

diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake
index 8a1eb4574c..4d85ae8eb4 100644
--- a/buildtools/Cmake/DefinePackages.cmake
+++ b/buildtools/Cmake/DefinePackages.cmake
@@ -143,6 +143,7 @@ set(SMPI_SRC
   src/smpi/smpi_pmpi.c
   src/smpi/smpi_replay.c
   src/smpi/smpi_topo.c
+  src/smpi/smpi_deployment.c
   src/smpi/colls/smpi_openmpi_selector.c
   src/smpi/colls/smpi_mpich_selector.c
   src/smpi/colls/colls_global.c
diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h
index b91aece639..314636085e 100644
--- a/include/smpi/smpi.h
+++ b/include/smpi/smpi.h
@@ -747,5 +747,12 @@ XBT_PUBLIC(void) smpi_replay_init(int *argc, char***argv);
 XBT_PUBLIC(void) smpi_action_trace_run(char *);
 XBT_PUBLIC(int) smpi_replay_finalize(void);
 
+XBT_PUBLIC(void) SMPI_app_instance_register(const char *name, xbt_main_func_t code, int num_processes);
+XBT_PUBLIC(void) SMPI_init(void);
+XBT_PUBLIC(void) SMPI_finalize(void);
+
+
+
+
 SG_END_DECL()
 #endif
diff --git a/src/smpi/private.h b/src/smpi/private.h
index 2d3fd06a0e..2c53164f74 100644
--- a/src/smpi/private.h
+++ b/src/smpi/private.h
@@ -72,6 +72,9 @@ typedef struct s_smpi_mpi_datatype{
 #define COLL_TAG_GATHERV -2223
 #define COLL_TAG_BCAST -3334
 #define COLL_TAG_ALLREDUCE -4445
+
+#define MPI_COMM_UNINITIALIZED ((MPI_Comm)-1)
+
 //*****************************************************************************************
 
 typedef struct s_smpi_mpi_request {
@@ -129,6 +132,7 @@ smpi_process_data_t smpi_process_remote_data(int index);
 void smpi_process_set_user_data(void *);
 void* smpi_process_get_user_data(void);
 int smpi_process_count(void);
+MPI_Comm smpi_process_comm_world(void);
 smx_rdv_t smpi_process_mailbox(void);
 smx_rdv_t smpi_process_remote_mailbox(int index);
 smx_rdv_t smpi_process_mailbox_small(void);
@@ -139,6 +143,8 @@ double smpi_process_simulated_elapsed(void);
 void smpi_process_set_sampling(int s);
 int smpi_process_get_sampling(void);
 
+MPI_Comm* smpi_deployment_register_process(const char* instance_id, int rank, int index);
+
 void smpi_comm_copy_buffer_callback(smx_action_t comm,
                                     void *buff, size_t buff_size);
 
diff --git a/src/smpi/smpi_comm.c b/src/smpi/smpi_comm.c
index a1b5939a5e..d462877aeb 100644
--- a/src/smpi/smpi_comm.c
+++ b/src/smpi/smpi_comm.c
@@ -59,6 +59,8 @@ MPI_Comm smpi_comm_new(MPI_Group group, MPI_Topology topo)
 
 void smpi_comm_destroy(MPI_Comm comm)
 {
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
   smpi_group_unuse(comm->group);
   smpi_topo_destroy(comm->topo); // there's no use count on topos
   smpi_comm_unuse(comm);
@@ -66,6 +68,9 @@ void smpi_comm_destroy(MPI_Comm comm)
 
 MPI_Group smpi_comm_group(MPI_Comm comm)
 {
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
+
   return comm->group;
 }
 
@@ -77,16 +82,23 @@ MPI_Topology smpi_comm_topo(MPI_Comm comm) {
 
 int smpi_comm_size(MPI_Comm comm)
 {
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
+
   return smpi_group_size(smpi_comm_group(comm));
 }
 
 int smpi_comm_rank(MPI_Comm comm)
 {
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
   return smpi_group_rank(smpi_comm_group(comm), smpi_process_index());
 }
 
 void smpi_comm_get_name (MPI_Comm comm, char* name, int* len)
 {
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
   if(comm == MPI_COMM_WORLD) {
     strcpy(name, "WORLD");
     *len = 5;
@@ -97,6 +109,8 @@ void smpi_comm_get_name (MPI_Comm comm, char* name, int* len)
 
 MPI_Comm smpi_comm_split(MPI_Comm comm, int color, int key)
 {
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
   int system_tag = 123;
   int index, rank, size, i, j, count, reqs;
   int* sendbuf;
@@ -177,10 +191,14 @@ MPI_Comm smpi_comm_split(MPI_Comm comm, int color, int key)
 }
 
 void smpi_comm_use(MPI_Comm comm){
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
   comm->refcount++;
 }
 
 void smpi_comm_unuse(MPI_Comm comm){
+  if (comm == MPI_COMM_UNINITIALIZED)
+    comm = smpi_process_comm_world();
   comm->refcount--;
   if(comm->refcount==0)
     xbt_free(comm);
diff --git a/src/smpi/smpi_deployment.c b/src/smpi/smpi_deployment.c
new file mode 100644
index 0000000000..42f679f482
--- /dev/null
+++ b/src/smpi/smpi_deployment.c
@@ -0,0 +1,76 @@
+/* Copyright (c) 2004-2014. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "private.h"
+#include "xbt/sysdep.h"
+#include "xbt/log.h"
+#include "xbt/dict.h"
+
+static xbt_dict_t smpi_instances = NULL;
+extern int process_count;
+extern int* index_to_process_data;
+
+typedef struct s_smpi_mpi_instance{
+  const char* name;
+  int size;
+  int present_processes;
+  int index;
+  MPI_Comm comm_world;
+} s_smpi_mpi_instance_t;
+
+
+/** \ingroup smpi_simulation
+ * \brief Registers a running instance of an MPI program.
+ *
+ * FIXME: remove MSG from the loop at some point.
+ * \param name the reference name of the function.
+ * \param code the main MPI function (must have the same prototype as the main function of any C program: int ..(int argc, char *argv[]))
+ * \param num_processes the size of the instance we want to deploy
+ */
+void SMPI_app_instance_register(const char *name, xbt_main_func_t code, int num_processes)
+{
+  SIMIX_function_register(name, code);
+
+  s_smpi_mpi_instance_t* instance =
+      (s_smpi_mpi_instance_t*)xbt_malloc(sizeof(s_smpi_mpi_instance_t));
+
+  instance->name = name;
+  instance->size = num_processes;
+  instance->present_processes = 0;
+  instance->index = process_count;
+  instance->comm_world = MPI_COMM_NULL;
+  process_count+=num_processes;
+
+  if(!smpi_instances){
+    smpi_instances=xbt_dict_new_homogeneous(xbt_free);
+  }
+
+  xbt_dict_set(smpi_instances, name, (void*)instance, NULL);
+  return;
+}
+
+
+//get the index of the process in the process_data array
+MPI_Comm* smpi_deployment_register_process(const char* instance_id, int rank, int index){
+
+  if(!smpi_instances){//no instance registered, we probably used smpirun.
+    index_to_process_data[index]=index;
+    return NULL;
+  }
+
+  s_smpi_mpi_instance_t* instance = xbt_dict_get_or_null(smpi_instances, instance_id);
+  if (!instance)
+    xbt_die("Error, unknown instance %s", instance_id);
+
+  if(instance->comm_world == MPI_COMM_NULL){
+    MPI_Group group = smpi_group_new(instance->size);
+    instance->comm_world = smpi_comm_new(group, NULL);
+  }
+  instance->present_processes++;
+  index_to_process_data[index]=instance->index+rank;
+  smpi_group_set_mapping(smpi_comm_group(instance->comm_world), index, rank);
+  return & instance->comm_world;
+}
diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c
index 84d0a7ec10..332b929ab3 100644
--- a/src/smpi/smpi_global.c
+++ b/src/smpi/smpi_global.c
@@ -28,16 +28,20 @@ typedef struct s_smpi_process_data {
   smx_rdv_t mailbox_small;
   xbt_os_timer_t timer;
   MPI_Comm comm_self;
+  MPI_Comm* comm_world;
   void *data;                   /* user data */
   int index;
   char state;
   int sampling;                 /* inside an SMPI_SAMPLE_ block? */
+  char* instance_id;
 } s_smpi_process_data_t;
 
 static smpi_process_data_t *process_data = NULL;
-static int process_count = 0;
+int process_count = 0;
+int* index_to_process_data = NULL;
 
-MPI_Comm MPI_COMM_WORLD = MPI_COMM_NULL;
+
+MPI_Comm MPI_COMM_WORLD = MPI_COMM_UNINITIALIZED;
 int MPI_UNIVERSE_SIZE;
 
 MPI_Errhandler *MPI_ERRORS_RETURN = NULL;
@@ -62,25 +66,38 @@ static char *get_mailbox_name_small(char *str, int index)
 
 void smpi_process_init(int *argc, char ***argv)
 {
-  int index;
+  int index=-1;
   smpi_process_data_t data;
   smx_process_t proc;
 
   if (argc && argv) {
     proc = SIMIX_process_self();
-    index = atoi((*argv)[1]);
+    //FIXME: dirty cleanup method to avoid using msg cleanup functions on these processes when using MSG+SMPI
+    proc->context->cleanup_func=SIMIX_process_cleanup;
+    char* instance_id = (*argv)[1];
+    int rank = atoi((*argv)[2]);
+    index = SIMIX_process_get_PID(proc) -1;
+
 #ifdef SMPI_F2C
     smpi_current_rank = index;
 #endif
-
+    if(!index_to_process_data){
+      index_to_process_data=(int*)xbt_malloc(SIMIX_process_count()*sizeof(int));
+    }
+    MPI_Comm* temp_comm_world = smpi_deployment_register_process(instance_id, rank, index);
     data = smpi_process_remote_data(index);
+    data->comm_world = temp_comm_world;
+    data->index = index;
+    data->instance_id = instance_id;
     simcall_process_set_data(proc, data);
-    if (*argc > 2) {
+    if (*argc > 3) {
       free((*argv)[1]);
-      memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
+      free((*argv)[2]);
+      memmove(&(*argv)[1], &(*argv)[3], sizeof(char *) * (*argc - 3));
       (*argv)[(*argc) - 1] = NULL;
+      (*argv)[(*argc) - 2] = NULL;
     }
-    (*argc)--;
+    (*argc)-=2;
     data->argc = argc;
     data->argv = argv;
     // set the process attached to the mailbox
@@ -102,7 +119,7 @@ void smpi_process_destroy(void)
   if(smpi_privatize_global_variables){
     switch_data_segment(index);
   }
-  process_data[index]->state = SMPI_FINALIZED;
+  process_data[index_to_process_data[index]]->state = SMPI_FINALIZED;
   XBT_DEBUG("<%d> Process left the game", index);
 }
 
@@ -158,7 +175,7 @@ int smpi_process_finalized()
 {
   int index = smpi_process_index();
   if (index != MPI_UNDEFINED)
-    return (process_data[index]->state == SMPI_FINALIZED);
+    return (process_data[index_to_process_data[index]]->state == SMPI_FINALIZED);
   else
     return 0;
 }
@@ -170,7 +187,7 @@ int smpi_process_initialized(void)
 {
   int index = smpi_process_index();
   return ( (index != MPI_UNDEFINED)
-          && (process_data[index]->state == SMPI_INITIALIZED));
+          && (process_data[index_to_process_data[index]]->state == SMPI_INITIALIZED));
 }
 
 /**
@@ -179,8 +196,8 @@ int smpi_process_initialized(void)
 void smpi_process_mark_as_initialized(void)
 {
   int index = smpi_process_index();
-  if ((index != MPI_UNDEFINED) && (!process_data[index]->state != SMPI_FINALIZED))
-    process_data[index]->state = SMPI_INITIALIZED;
+  if ((index != MPI_UNDEFINED) && (!process_data[index_to_process_data[index]]->state != SMPI_FINALIZED))
+    process_data[index_to_process_data[index]]->state = SMPI_INITIALIZED;
 }
 
 
@@ -230,7 +247,7 @@ smpi_process_data_t smpi_process_data(void)
 
 smpi_process_data_t smpi_process_remote_data(int index)
 {
-  return process_data[index];
+  return process_data[index_to_process_data[index]];
 }
 
 void smpi_process_set_user_data(void *data)
@@ -257,6 +274,13 @@ int smpi_process_index(void)
   return data ? data->index : MPI_UNDEFINED;
 }
 
+MPI_Comm smpi_process_comm_world(void)
+{
+  smpi_process_data_t data = smpi_process_data();
+  //return MPI_COMM_NULL if not initialized
+  return data ? *data->comm_world : MPI_COMM_NULL;
+}
+
 smx_rdv_t smpi_process_mailbox(void)
 {
   smpi_process_data_t data = smpi_process_data();
@@ -332,7 +356,7 @@ void print_request(const char *message, MPI_Request request)
          request->dst, request->tag, request->flags);
 }
 
-static void smpi_comm_copy_buffer_callback(smx_action_t comm,
+void smpi_comm_copy_buffer_callback(smx_action_t comm,
                                            void *buff, size_t buff_size)
 {
   XBT_DEBUG("Copy the data over");
@@ -394,13 +418,17 @@ void smpi_global_init(void)
   int i;
   MPI_Group group;
   char name[MAILBOX_NAME_MAXLEN];
+  int smpirun=0;
 
-  SIMIX_comm_set_copy_data_callback(&smpi_comm_copy_buffer_callback);
-  process_count = SIMIX_process_count();
-  process_data = xbt_new(smpi_process_data_t, process_count);
+
+  if (process_count == 0){
+    process_count = SIMIX_process_count();
+    smpirun=1;
+  }
+  process_data = xbt_new0(smpi_process_data_t, process_count);
   for (i = 0; i < process_count; i++) {
     process_data[i] = xbt_new(s_smpi_process_data_t, 1);
-    process_data[i]->index = i;
+    //process_data[i]->index = i;
     process_data[i]->argc = NULL;
     process_data[i]->argv = NULL;
     process_data[i]->mailbox = simcall_rdv_create(get_mailbox_name(name, i));
@@ -410,29 +438,22 @@ void smpi_global_init(void)
     if (MC_is_active())
       MC_ignore_heap(process_data[i]->timer, xbt_os_timer_size());
     process_data[i]->comm_self = MPI_COMM_NULL;
+    process_data[i]->comm_world = NULL;
     process_data[i]->state = SMPI_UNINITIALIZED;
     process_data[i]->sampling = 0;
   }
-  group = smpi_group_new(process_count);
-  MPI_COMM_WORLD = smpi_comm_new(group, NULL);
-  MPI_UNIVERSE_SIZE = smpi_comm_size(MPI_COMM_WORLD);
-  for (i = 0; i < process_count; i++) {
-    smpi_group_set_mapping(group, i, i);
-  }
-
-  //check correctness of MPI parameters
-
-  xbt_assert(sg_cfg_get_int("smpi/async_small_thres") <=
-             sg_cfg_get_int("smpi/send_is_detached_thres"));
-
-  if (sg_cfg_is_default_value("smpi/running_power")) {
-    XBT_INFO("You did not set the power of the host running the simulation. "
-             "The timings will certainly not be accurate. "
-             "Use the option \"--cfg=smpi/running_power:\" to set its value."
-             "Check http://simgrid.org/simgrid/latest/doc/options.html#options_smpi_bench for more information. ");
+  //if the process was launched through the smpirun script
+  //we generate a global mpi_comm_world
+  //if not, we leave MPI_COMM_NULL, and the comm world
+  //will be private to each MPI instance
+  if(smpirun){
+    group = smpi_group_new(process_count);
+    MPI_COMM_WORLD = smpi_comm_new(group, NULL);
+    MPI_UNIVERSE_SIZE = smpi_comm_size(MPI_COMM_WORLD);
+    for (i = 0; i < process_count; i++) {
+      smpi_group_set_mapping(group, i, i);
+    }
   }
-  if(smpi_privatize_global_variables)
-    smpi_initialize_global_memory_segments();
 }
 
 void smpi_global_destroy(void)
@@ -441,8 +462,10 @@ void smpi_global_destroy(void)
   int i;
 
   smpi_bench_destroy();
-  while (smpi_group_unuse(smpi_comm_group(MPI_COMM_WORLD)) > 0);
-  xbt_free(MPI_COMM_WORLD);
+  if (MPI_COMM_WORLD != MPI_COMM_UNINITIALIZED){
+    while (smpi_group_unuse(smpi_comm_group(MPI_COMM_WORLD)) > 0);
+    xbt_free(MPI_COMM_WORLD);
+  }
   MPI_COMM_WORLD = MPI_COMM_NULL;
   for (i = 0; i < count; i++) {
     if(process_data[i]->comm_self!=MPI_COMM_NULL){
@@ -655,3 +678,23 @@ int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[])
 
   return 0;
 }
+
+// This function can be called from an external program to initialize the logs, options, and processes of SMPI
+// without the need of smpirun
+void SMPI_init(){
+  smpi_init_logs();
+  smpi_init_options();
+  smpi_global_init();
+  smpi_check_options();
+#ifdef HAVE_TRACING
+  if (TRACE_is_enabled() && TRACE_is_configured()) {
+    TRACE_smpi_alloc();
+  }
+#endif
+  if(smpi_privatize_global_variables)
+    smpi_initialize_global_memory_segments();
+}
+
+void SMPI_finalize(){
+  smpi_global_destroy();
+}
diff --git a/src/smpi/smpi_group.c b/src/smpi/smpi_group.c
index 587ab1df5e..921f71910e 100644
--- a/src/smpi/smpi_group.c
+++ b/src/smpi/smpi_group.c
@@ -30,7 +30,7 @@ MPI_Group smpi_group_new(int size)
   MPI_Group group;
   int i, count;
 
-  count = smpi_process_count();
+  count = SIMIX_process_count();
   group = xbt_new(s_smpi_mpi_group_t, 1);
   group->size = size;
   group->rank_to_index_map = xbt_new(int, size);
@@ -84,7 +84,7 @@ void smpi_group_destroy(MPI_Group group)
 
 void smpi_group_set_mapping(MPI_Group group, int index, int rank)
 {
-  if (rank < group->size && index < smpi_process_count()) {
+  if (rank < group->size && index < SIMIX_process_count()) {
     group->rank_to_index_map[rank] = index;
     if(index!=MPI_UNDEFINED)group->index_to_rank_map[index] = rank;
   }
@@ -103,10 +103,7 @@ int smpi_group_index(MPI_Group group, int rank)
 int smpi_group_rank(MPI_Group group, int index)
 {
   int rank = MPI_UNDEFINED;
-
-  if (index < smpi_process_count()) {
-    rank = group->index_to_rank_map[index];
-  }
+  rank = group->index_to_rank_map[index];
 
   return rank;
 }
diff --git a/src/smpi/smpirun.in b/src/smpi/smpirun.in
index 3a015ffbda..9882e0692a 100755
--- a/src/smpi/smpirun.in
+++ b/src/smpi/smpirun.in
@@ -356,6 +356,7 @@ do
     host="$(echo $hostnames|cut -d' ' -f$j)"
  fi
   echo "  " >> ${APPLICATIONTMP}
+  echo "  " >> ${APPLICATIONTMP}
   echo "  " >> ${APPLICATIONTMP}
   if [ "${EXTOPT}" = "smpi_replay" ]; then
     if [ ${NUMTRACES} -gt 1 ]; then
-- 
2.20.1