teshsuite/smpi/coll-reduce-scatter/coll-reduce-scatter
teshsuite/smpi/coll-scatter/coll-scatter
teshsuite/smpi/macro-shared/macro-shared
+teshsuite/smpi/macro-partial-shared/macro-partial-shared
+teshsuite/smpi/macro-partial-shared-communication/macro-partial-shared-communication
teshsuite/smpi/type-struct/type-struct
teshsuite/smpi/type-vector/type-vector
+ teshsuite/s4u/actor/actor
+ teshsuite/s4u/concurrent_rw/concurrent_rw
+ teshsuite/s4u/host_on_off_wait/host_on_off_wait
+ teshsuite/s4u/listen_async/listen_async
+ teshsuite/s4u/pid/pid
+ teshsuite/s4u/storage_client_server/storage_client_server
teshsuite/surf/lmm_usage/lmm_usage
teshsuite/surf/maxmin_bench/maxmin_bench
teshsuite/surf/surf_usage/surf_usage
MPI_CALL(XBT_PUBLIC(int), MPI_Get_accumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
void* result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win));
+
+ MPI_CALL(XBT_PUBLIC(int), MPI_Rget,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+ MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Rput,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+ MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Raccumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
+ int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Rget_accumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
+ void* result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
+ int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request));
+
+ MPI_CALL(XBT_PUBLIC(int), MPI_Fetch_and_op,( void *origin_addr, void* result_addr, MPI_Datatype datatype,
+ int target_rank, MPI_Aint target_disp, MPI_Op op, MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Compare_and_swap, (void *origin_addr, void *compare_addr,
+ void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Win win));
+
MPI_CALL(XBT_PUBLIC(int), MPI_Alloc_mem, (MPI_Aint size, MPI_Info info, void *baseptr));
MPI_CALL(XBT_PUBLIC(int), MPI_Free_mem, (void *base));
MPI_Comm comm, MPI_Comm *intercomm, int* array_of_errcodes));
MPI_CALL(XBT_PUBLIC(int), MPI_Comm_get_parent,( MPI_Comm *parent));
MPI_CALL(XBT_PUBLIC(int), MPI_Win_complete,(MPI_Win win));
- MPI_CALL(XBT_PUBLIC(int), MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win));
+
MPI_CALL(XBT_PUBLIC(int), MPI_Win_post,(MPI_Group group, int assert, MPI_Win win));
MPI_CALL(XBT_PUBLIC(int), MPI_Win_start,(MPI_Group group, int assert, MPI_Win win));
MPI_CALL(XBT_PUBLIC(int), MPI_Win_test,(MPI_Win win, int *flag));
- MPI_CALL(XBT_PUBLIC(int), MPI_Win_unlock,(int rank, MPI_Win win));
MPI_CALL(XBT_PUBLIC(int), MPI_Win_wait,(MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_lock_all,(int assert, MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_unlock,(int rank, MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_unlock_all,(MPI_Win win));
+
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush,(int rank, MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush_local,(int rank, MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush_all,(MPI_Win win));
+ MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush_local_all,(MPI_Win win));
MPI_CALL(XBT_PUBLIC(int), MPI_File_get_errhandler , (MPI_File file, MPI_Errhandler *errhandler));
MPI_CALL(XBT_PUBLIC(int), MPI_File_set_errhandler, (MPI_File file, MPI_Errhandler errhandler));
#define SMPI_SAMPLE_DELAY(duration) for(smpi_execute(duration); 0; )
#define SMPI_SAMPLE_FLOPS(flops) for(smpi_execute_flops(flops); 0; )
-XBT_PUBLIC(int) smpi_is_shared(void *buf);
XBT_PUBLIC(void *) smpi_shared_malloc(size_t size, const char *file, int line);
#define SMPI_SHARED_MALLOC(size) smpi_shared_malloc(size, __FILE__, __LINE__)
+XBT_PUBLIC(void *) smpi_shared_malloc_global__(size_t size, const char *file, int line, size_t *shared_block_offsets, int nb_shared_blocks);
+#define SMPI_PARTIAL_SHARED_MALLOC(size, shared_block_offsets, nb_shared_blocks)\
+ smpi_shared_malloc_global__(size, __FILE__, __LINE__, shared_block_offsets, nb_shared_blocks)
XBT_PUBLIC(void) smpi_shared_free(void *data);
#define SMPI_SHARED_FREE(data) smpi_shared_free(data)
/* Fortran specific stuff */
- XBT_PUBLIC(int) __attribute__((weak)) smpi_simulated_main_(int argc, char** argv);
- XBT_PUBLIC(int) __attribute__((weak)) MAIN__();
- XBT_PUBLIC(int) smpi_main(int (*realmain) (int argc, char *argv[]),int argc, char *argv[]);
- XBT_PUBLIC(void) __attribute__((weak)) user_main_();
+ XBT_PUBLIC(int) smpi_main(const char* program, int argc, char *argv[]);
XBT_PUBLIC(int) smpi_process_index();
XBT_PUBLIC(void) smpi_process_init(int *argc, char ***argv);
void *recvbuf, int recvcount, MPI_Datatype recvtype){
int count;
+// FIXME Handle the case of a partial shared malloc.
+#if 0
if(smpi_is_shared(sendbuf)){
XBT_DEBUG("Copy input buf %p is shared. Let's ignore it.", sendbuf);
}else if(smpi_is_shared(recvbuf)){
XBT_DEBUG("Copy output buf %p is shared. Let's ignore it.", recvbuf);
}
+#endif
- if(smpi_privatize_global_variables){
+ if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){
smpi_switch_data_segment(smpi_process()->index());
}
/* First check if we really have something to do */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+ #include <spawn.h>
+ #include <sys/types.h>
+ #include <sys/wait.h>
+ #include <dlfcn.h>
+
#include "mc/mc.h"
#include "private.h"
#include "private.hpp"
#include "simgrid/s4u/Mailbox.hpp"
+#include "smpi/smpi_shared_malloc.hpp"
#include "simgrid/sg_config.h"
#include "src/kernel/activity/SynchroComm.hpp"
#include "src/mc/mc_record.h"
#include <stdio.h>
#include <stdlib.h>
#include <string>
+ #include <utility>
#include <vector>
+ #include <memory>
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_kernel, smpi, "Logging specific to SMPI (kernel)");
#include <boost/tokenizer.hpp>
#include <boost/algorithm/string.hpp> /* trim_right / trim_left */
+ #ifndef RTLD_DEEPBIND
+ /* RTLD_DEEPBIND is a bad idea of GNU ld that obviously does not exist on other platforms
+ * See https://www.akkadia.org/drepper/dsohowto.pdf
+ * and https://lists.freebsd.org/pipermail/freebsd-current/2016-March/060284.html
+ */
+ #define RTLD_DEEPBIND 0
+ #endif
+
+ /* Mac OSX does not have any header file providing that definition so we have to duplicate it here. Bummers. */
+ extern char** environ; /* we use it in posix_spawnp below */
+
#if HAVE_PAPI
#include "papi.h"
const char* papi_default_config_name = "default";
smpi_comm_copy_data_callback = callback;
}
+/**
+ * Debugging helper: dump a list of (first, second) offset pairs to stderr as
+ * "{(0x.., 0x..),...}" followed by a newline.
+ *
+ * The list is taken by const reference, so nothing is copied, and the offsets
+ * are printed with %zx, the conversion specifier that matches size_t.
+ */
+static void print(const std::vector<std::pair<size_t, size_t>>& vec) {
+  fprintf(stderr, "{");
+  for (const auto& elt : vec) {
+    fprintf(stderr, "(0x%zx, 0x%zx),", elt.first, elt.second);
+  }
+  fprintf(stderr, "}\n");
+}
+/**
+ * Copy only the byte ranges listed in private_blocks from src to dest.
+ *
+ * Each block is a (first, second) pair of offsets that applies to both
+ * buffers: the bytes in [first, second) are copied, and every byte of dest
+ * outside the listed blocks is left untouched.
+ *
+ * @param dest           destination buffer
+ * @param src            source buffer (only read)
+ * @param n              not used by the copy; the blocks alone decide what is transferred
+ * @param private_blocks offset ranges to copy
+ */
+static void memcpy_private(void *dest, const void *src, size_t n, std::vector<std::pair<size_t, size_t>> &private_blocks) {
+  (void)n;
+  uint8_t* const out      = static_cast<uint8_t*>(dest);
+  const uint8_t* const in = static_cast<const uint8_t*>(src); // keep the source const-qualified
+  for (const auto& block : private_blocks) {
+    memcpy(out + block.first, in + block.first, block.second - block.first);
+  }
+}
+
+/**
+ * Sanity-check a list of blocks against the buffer size: every (first, second)
+ * pair must satisfy first <= second <= buff_size, otherwise the assertion
+ * below fires.
+ */
+static void check_blocks(std::vector<std::pair<size_t, size_t>> &private_blocks, size_t buff_size) {
+  for (size_t idx = 0; idx < private_blocks.size(); idx++) {
+    const size_t begin = private_blocks[idx].first;
+    const size_t end   = private_blocks[idx].second;
+    xbt_assert(begin <= end && end <= buff_size, "Oops, bug in shared malloc.");
+  }
+}
+
/* Copy-data callback: copies the payload `buff` (buff_size bytes) of the communication `synchro` into
 * comm->dst_buff.
 *
 * In the added (+) version only the byte ranges produced by merge_private_blocks() from the source and
 * destination block lists are copied. A buffer for which smpi_is_shared() returns 0 is described by a
 * single block covering [0, buff_size).
 *
 * With SMPI_PRIVATIZE_MMAP, when a buffer lies inside [smpi_start_data_exe, smpi_start_data_exe +
 * smpi_size_data_exe), the data segment is switched to the process owning that buffer before it is
 * accessed; the source is first saved to a temporary buffer.
 *
 * For a detached send, buff is freed and comm->src_buff is reset to nullptr.
 */
void smpi_comm_copy_buffer_callback(smx_activity_t synchro, void *buff, size_t buff_size)
{
-
simgrid::kernel::activity::Comm *comm = dynamic_cast<simgrid::kernel::activity::Comm*>(synchro);
-
+ int src_shared=0, dst_shared=0; // results of smpi_is_shared() for the source / destination buffer
+ size_t src_offset=0, dst_offset=0; // filled in by smpi_is_shared() below
+ std::vector<std::pair<size_t, size_t>> src_private_blocks; // (first, second) offset ranges of the source
+ std::vector<std::pair<size_t, size_t>> dst_private_blocks; // (first, second) offset ranges of the destination
XBT_DEBUG("Copy the data over");
- if(smpi_is_shared(buff)){
+ if((src_shared=smpi_is_shared(buff, src_private_blocks, &src_offset))) {
XBT_DEBUG("Sender %p is shared. Let's ignore it.", buff);
- }else if(smpi_is_shared((char*)comm->dst_buff)){
+ src_private_blocks = shift_and_frame_private_blocks(src_private_blocks, src_offset, buff_size); // NOTE(review): presumably re-bases the blocks on buff and clips them to buff_size -- confirm in the helper
+ }
+ else {
+ src_private_blocks.clear();
+ src_private_blocks.push_back(std::make_pair(0, buff_size)); // not shared: one block covering the whole buffer
+ }
+ if((dst_shared=smpi_is_shared((char*)comm->dst_buff, dst_private_blocks, &dst_offset))) {
XBT_DEBUG("Receiver %p is shared. Let's ignore it.", (char*)comm->dst_buff);
- }else{
- void* tmpbuff=buff;
- if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP) && (static_cast<char*>(buff) >= smpi_start_data_exe)
- && (static_cast<char*>(buff) < smpi_start_data_exe + smpi_size_data_exe )
- ){
- XBT_DEBUG("Privatization : We are copying from a zone inside global memory... Saving data to temp buffer !");
-
- smpi_switch_data_segment(
- (static_cast<simgrid::smpi::Process*>((static_cast<simgrid::MsgActorExt*>(comm->src_proc->data)->data))->index()));
- tmpbuff = static_cast<void*>(xbt_malloc(buff_size));
- memcpy(tmpbuff, buff, buff_size);
- }
-
- if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP) && ((char*)comm->dst_buff >= smpi_start_data_exe)
- && ((char*)comm->dst_buff < smpi_start_data_exe + smpi_size_data_exe )){
- XBT_DEBUG("Privatization : We are copying to a zone inside global memory - Switch data segment");
- smpi_switch_data_segment(
- (static_cast<simgrid::smpi::Process*>((static_cast<simgrid::MsgActorExt*>(comm->dst_proc->data)->data))->index()));
- }
-
- XBT_DEBUG("Copying %zu bytes from %p to %p", buff_size, tmpbuff,comm->dst_buff);
- memcpy(comm->dst_buff, tmpbuff, buff_size);
+ dst_private_blocks = shift_and_frame_private_blocks(dst_private_blocks, dst_offset, buff_size);
+ }
+ else {
+ dst_private_blocks.clear();
+ dst_private_blocks.push_back(std::make_pair(0, buff_size)); // not shared: one block covering the whole buffer
+ }
+/*
+ fprintf(stderr, "size: 0x%x\n", buff_size);
+ fprintf(stderr, "src: ");
+ print(src_private_blocks);
+ fprintf(stderr, "src_offset = 0x%x\n", src_offset);
+ fprintf(stderr, "dst: ");
+ print(dst_private_blocks);
+ fprintf(stderr, "dst_offset = 0x%x\n", dst_offset);
+*/
+ check_blocks(src_private_blocks, buff_size);
+ check_blocks(dst_private_blocks, buff_size);
+ auto private_blocks = merge_private_blocks(src_private_blocks, dst_private_blocks); // only these ranges are copied by memcpy_private() below
+/*
+ fprintf(stderr, "Private blocks: ");
+ print(private_blocks);
+*/
+ check_blocks(private_blocks, buff_size);
+ void* tmpbuff=buff; // replaced by a temporary copy when buff lies inside the privatized data segment
- if((smpi_privatize_global_variables) && (static_cast<char*>(buff) >= smpi_start_data_exe)
++ if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP) && (static_cast<char*>(buff) >= smpi_start_data_exe)
+ && (static_cast<char*>(buff) < smpi_start_data_exe + smpi_size_data_exe )
+ ){
+ XBT_DEBUG("Privatization : We are copying from a zone inside global memory... Saving data to temp buffer !");
+
+ smpi_switch_data_segment(
+ (static_cast<simgrid::smpi::Process*>((static_cast<simgrid::MsgActorExt*>(comm->src_proc->data)->data))->index()));
+ tmpbuff = static_cast<void*>(xbt_malloc(buff_size));
+ memcpy_private(tmpbuff, buff, buff_size, private_blocks); // save the source ranges right after switching to the sender's segment
+ }
- if((smpi_privatize_global_variables) && ((char*)comm->dst_buff >= smpi_start_data_exe)
- if (comm->detached) {
- // if this is a detached send, the source buffer was duplicated by SMPI
- // sender to make the original buffer available to the application ASAP
- xbt_free(buff);
- //It seems that the request is used after the call there this should be free somewhere else but where???
- //xbt_free(comm->comm.src_data);// inside SMPI the request is kept inside the user data and should be free
- comm->src_buff = nullptr;
- }
- if(tmpbuff!=buff)xbt_free(tmpbuff);
++ if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP) && ((char*)comm->dst_buff >= smpi_start_data_exe)
+ && ((char*)comm->dst_buff < smpi_start_data_exe + smpi_size_data_exe )){
+ XBT_DEBUG("Privatization : We are copying to a zone inside global memory - Switch data segment");
+ smpi_switch_data_segment(
+ (static_cast<simgrid::smpi::Process*>((static_cast<simgrid::MsgActorExt*>(comm->dst_proc->data)->data))->index()));
+ }
-
+ XBT_DEBUG("Copying %zu bytes from %p to %p", buff_size, tmpbuff,comm->dst_buff);
+ memcpy_private(comm->dst_buff, tmpbuff, buff_size, private_blocks);
+
+ if (comm->detached) {
+ // if this is a detached send, the source buffer was duplicated by SMPI
+ // sender to make the original buffer available to the application ASAP
+ xbt_free(buff);
+ //It seems that the request is still used after this call, so it should be freed somewhere else -- but where?
+ //xbt_free(comm->comm.src_data);// inside SMPI the request is kept inside the user data and should be free
+ comm->src_buff = nullptr;
}
+ if(tmpbuff!=buff)xbt_free(tmpbuff); // release the temporary copy, if one was made above
}
void smpi_global_init()
{
- int i;
MPI_Group group;
- int smpirun=0;
if (!MC_is_active()) {
global_timer = xbt_os_timer_new();
}
}
#endif
+
+ int smpirun = 0;
+ msg_bar_t finalization_barrier = nullptr;
if (process_count == 0){
process_count = SIMIX_process_count();
smpirun=1;
+ finalization_barrier = MSG_barrier_init(process_count);
}
smpi_universe_size = process_count;
process_data = new simgrid::smpi::Process*[process_count];
- for (i = 0; i < process_count; i++) {
- process_data[i] = new simgrid::smpi::Process(i);
+ for (int i = 0; i < process_count; i++) {
+ process_data[i] = new simgrid::smpi::Process(i, finalization_barrier);
}
//if the process was launched through smpirun script we generate a global mpi_comm_world
//if not, we let MPI_COMM_NULL, and the comm world will be private to each mpi instance
- if(smpirun){
+ if (smpirun) {
group = new simgrid::smpi::Group(process_count);
MPI_COMM_WORLD = new simgrid::smpi::Comm(group, nullptr);
MPI_Attr_put(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, reinterpret_cast<void *>(process_count));
- msg_bar_t bar = MSG_barrier_init(process_count);
- for (i = 0; i < process_count; i++) {
+ for (int i = 0; i < process_count; i++)
group->set_mapping(i, i);
- process_data[i]->set_finalization_barrier(bar);
- }
}
}
}
xbt_free(index_to_process_data);
- if(smpi_privatize_global_variables)
+ if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP)
smpi_destroy_global_memory_segments();
smpi_free_static();
}
extern "C" {
- #ifndef WIN32
-
- void __attribute__ ((weak)) user_main_()
- {
- xbt_die("Should not be in this smpi_simulated_main");
- }
-
- int __attribute__ ((weak)) smpi_simulated_main_(int argc, char **argv)
- {
- simgrid::smpi::Process::init(&argc, &argv);
- user_main_();
- return 0;
- }
-
- inline static int smpi_main_wrapper(int argc, char **argv){
- int ret = smpi_simulated_main_(argc,argv);
- if(ret !=0){
- XBT_WARN("SMPI process did not return 0. Return value : %d", ret);
- smpi_process()->set_return_value(ret);
- }
- return 0;
- }
-
- int __attribute__ ((weak)) main(int argc, char **argv)
- {
- return smpi_main(smpi_main_wrapper, argc, argv);
- }
-
- #endif
-
static void smpi_init_logs(){
/* Connect log categories. See xbt/log.c */
simgrid::smpi::Colls::smpi_coll_cleanup_callback=nullptr;
smpi_cpu_threshold = xbt_cfg_get_double("smpi/cpu-threshold");
smpi_host_speed = xbt_cfg_get_double("smpi/host-speed");
- smpi_privatize_global_variables = xbt_cfg_get_boolean("smpi/privatize-global-variables");
+ const char* smpi_privatize_option = xbt_cfg_get_string("smpi/privatize-global-variables");
+ if (std::strcmp(smpi_privatize_option, "no") == 0)
+ smpi_privatize_global_variables = SMPI_PRIVATIZE_NONE;
+ else if (std::strcmp(smpi_privatize_option, "yes") == 0)
+ smpi_privatize_global_variables = SMPI_PRIVATIZE_DEFAULT;
+ else if (std::strcmp(smpi_privatize_option, "mmap") == 0)
+ smpi_privatize_global_variables = SMPI_PRIVATIZE_MMAP;
+ else if (std::strcmp(smpi_privatize_option, "dlopen") == 0)
+ smpi_privatize_global_variables = SMPI_PRIVATIZE_DLOPEN;
+
+ // Some compatibility stuff:
+ else if (std::strcmp(smpi_privatize_option, "1") == 0)
+ smpi_privatize_global_variables = SMPI_PRIVATIZE_DEFAULT;
+ else if (std::strcmp(smpi_privatize_option, "0") == 0)
+ smpi_privatize_global_variables = SMPI_PRIVATIZE_NONE;
+
+ else
+ xbt_die("Invalid value for smpi/privatize-global-variables: %s",
+ smpi_privatize_option);
+
if (smpi_cpu_threshold < 0)
smpi_cpu_threshold = DBL_MAX;
}
}
- int smpi_main(int (*realmain) (int argc, char *argv[]), int argc, char *argv[])
+ /**
+  * Run an external command and wait for it to finish.
+  *
+  * @param argv nullptr-terminated argument vector; argv[0] is resolved by
+  *             posix_spawnp() and the child receives the current `environ`.
+  * @return the exit code of the command, or 127 when it could not be spawned,
+  *         could not be waited for, or did not terminate normally (for
+  *         instance because it was killed by a signal).
+  */
+ static int execute_command(const char * const argv[])
+ {
+   pid_t pid;
+   // posix_spawnp() is declared with a char* const[] argument vector, hence the cast.
+   if (posix_spawnp(&pid, argv[0], nullptr, nullptr, const_cast<char* const*>(argv), environ) != 0)
+     return 127;
+   int status = 0;
+   if (waitpid(pid, &status, 0) != pid)
+     return 127;
+   // Decode the wait status instead of returning it raw, so that the result is
+   // a real exit code, consistent with the 127 used for the failures above.
+   return WIFEXITED(status) ? WEXITSTATUS(status) : 127;
+ }
+
+ typedef std::function<int(int argc, char *argv[])> smpi_entry_point_type; // generic (argc, argv) -> int entry point, as returned by smpi_resolve_function()
+ typedef int (* smpi_c_entry_point_type)(int argc, char **argv); // signature assumed for the "main" symbol looked up with dlsym()
+ typedef void (* smpi_fortran_entry_point_type)(void); // signature assumed for the "user_main_" symbol looked up with dlsym()
+
+ /**
+  * Invoke an entry point with a C-style argument vector built from args.
+  *
+  * A non-zero result of the entry point is logged as a warning and recorded
+  * through smpi_process()->set_return_value(); this function itself always
+  * returns 0.
+  */
+ static int smpi_run_entry_point(smpi_entry_point_type entry_point, std::vector<std::string> args)
+ {
+   const int argc = args.size();
+   // One slot per argument, plus the terminating nullptr.
+   std::unique_ptr<char*[]> argv(new char*[argc + 1]);
+   int slot = 0;
+   for (std::string& arg : args) {
+     if (arg.empty())
+       argv[slot] = const_cast<char*>("");
+     else
+       argv[slot] = &arg.front();
+     ++slot;
+   }
+   argv[argc] = nullptr;
+
+   const int res = entry_point(argc, argv.get());
+   if (res == 0)
+     return 0;
+
+   XBT_WARN("SMPI process did not return 0. Return value : %d", res);
+   smpi_process()->set_return_value(res);
+   return 0;
+ }
+
+ // TODO: reduce the number of functions involved here
+ /**
+  * Look up the program entry point in a dlopen()ed handle.
+  *
+  * "user_main_" is tried first; when present it is wrapped so that
+  * smpi_process_init() runs before it and the wrapper returns 0. Otherwise the
+  * "main" symbol is returned as is. An empty std::function is returned when
+  * neither symbol is found.
+  */
+ static smpi_entry_point_type smpi_resolve_function(void* handle)
+ {
+   void* const fortran_symbol = dlsym(handle, "user_main_");
+   if (fortran_symbol != nullptr) {
+     smpi_fortran_entry_point_type fortran_main = (smpi_fortran_entry_point_type) fortran_symbol;
+     return [fortran_main](int argc, char** argv) {
+       smpi_process_init(&argc, &argv);
+       fortran_main();
+       return 0;
+     };
+   }
+
+   void* const c_symbol = dlsym(handle, "main");
+   if (c_symbol == nullptr)
+     return smpi_entry_point_type();
+
+   smpi_c_entry_point_type c_main = (smpi_c_entry_point_type) c_symbol;
+   return c_main;
+ }
+
+ int smpi_main(const char* executable, int argc, char *argv[])
{
srand(SMPI_RAND_SEED);
// parse the platform file: get the host list
SIMIX_create_environment(argv[1]);
- SIMIX_comm_set_copy_data_callback(smpi_comm_copy_data_callback);
- SIMIX_function_register_default(realmain);
+ SIMIX_comm_set_copy_data_callback(smpi_comm_copy_buffer_callback);
+
+ static std::size_t rank = 0;
+
+ if (smpi_privatize_global_variables == SMPI_PRIVATIZE_DLOPEN) {
+
+ std::string executable_copy = executable;
+ simix_global->default_function = [executable_copy](std::vector<std::string> args) {
+ return std::function<void()>([executable_copy, args] {
+
+ // Copy the dynamic library:
+ std::string target_executable = executable_copy
+ + "_" + std::to_string(getpid())
+ + "_" + std::to_string(rank++) + ".so";
+ // TODO, execute directly instead of relying on cp
+ const char* command1 [] = {
+ "cp", "--reflink=auto", "--", executable_copy.c_str(), target_executable.c_str(),
+ nullptr
+ };
+ const char* command2 [] = {
+ "cp", "--", executable_copy.c_str(), target_executable.c_str(),
+ nullptr
+ };
+ if (execute_command(command1) != 0 && execute_command(command2) != 0)
+ xbt_die("copy failed");
+
+ // Load the copy and resolve the entry point:
+ void* handle = dlopen(target_executable.c_str(), RTLD_LAZY | RTLD_LOCAL | RTLD_DEEPBIND);
+ unlink(target_executable.c_str());
+ if (handle == nullptr)
+ xbt_die("dlopen failed");
+ smpi_entry_point_type entry_point = smpi_resolve_function(handle);
+ if (!entry_point)
+ xbt_die("Could not resolve entry point");
+
+ smpi_run_entry_point(entry_point, args);
+ });
+ };
+
+ }
+ else {
+
+ // Load the dynamic library and resolve the entry point:
+ void* handle = dlopen(executable, RTLD_LAZY | RTLD_LOCAL | RTLD_DEEPBIND);
+ if (handle == nullptr)
+ xbt_die("dlopen failed for %s", executable);
+ smpi_entry_point_type entry_point = smpi_resolve_function(handle);
+ if (!entry_point)
+ xbt_die("main not found in %s", executable);
+ // TODO, register the executable for SMPI privatization
+
+ // Execute the same entry point for each simulated process:
+ simix_global->default_function = [entry_point](std::vector<std::string> args) {
+ return std::function<void()>([entry_point, args] {
+ smpi_run_entry_point(entry_point, args);
+ });
+ };
+
+ }
+
SIMIX_launch_application(argv[2]);
smpi_global_init();
smpi_check_options();
- if(smpi_privatize_global_variables)
+ if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP)
smpi_initialize_global_memory_segments();
/* Clean IO before the run */
smpi_check_options();
if (TRACE_is_enabled() && TRACE_is_configured())
TRACE_smpi_alloc();
- if(smpi_privatize_global_variables)
+ if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP)
smpi_initialize_global_memory_segments();
}
Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags)
{
void *old_buf = nullptr;
- if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED)) && (!smpi_is_shared(buf_))){
+// FIXME Handle the case of a partial shared malloc.
+ if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED))) { // && (!smpi_is_shared(buf_))){
// This part handles the problem of non-contiguous memory
old_buf = buf;
- buf_ = count==0 ? nullptr : xbt_malloc(count*datatype->size());
- if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) {
- datatype->serialize(old_buf, buf_, count);
+ if (count==0){
+ buf_ = nullptr;
+ }else {
+ buf_ = xbt_malloc(count*datatype->size());
+ if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) {
+ datatype->serialize(old_buf, buf_, count);
+ }
}
}
// This part handles the problem of non-contiguous memory (for the unserialisation at the reception)
if ((flags_ & RECV) != 0) {
this->print_request("New recv");
+ simgrid::smpi::Process* process = smpi_process_remote(dst_);
+
int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh");
- xbt_mutex_t mut = smpi_process()->mailboxes_mutex();
+ xbt_mutex_t mut = process->mailboxes_mutex();
if (async_small_thresh != 0 || (flags_ & RMA) != 0)
xbt_mutex_acquire(mut);
if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) {
- mailbox = smpi_process()->mailbox();
+ mailbox = process->mailbox();
}
else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) {
//We have to check both mailboxes (because SSEND messages are sent to the large mbox).
//begin with the more appropriate one : the small one.
- mailbox = smpi_process()->mailbox_small();
+ mailbox = process->mailbox_small();
XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv,
static_cast<void*>(this));
if (action == nullptr) {
- mailbox = smpi_process()->mailbox();
+ mailbox = process->mailbox();
XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast<void*>(this));
if (action == nullptr) {
XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
- mailbox = smpi_process()->mailbox_small();
+ mailbox = process->mailbox_small();
}
} else {
XBT_DEBUG("yes there was something for us in the large mailbox");
}
} else {
- mailbox = smpi_process()->mailbox_small();
+ mailbox = process->mailbox_small();
XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast<void*>(this));
if (action == nullptr) {
XBT_DEBUG("No, nothing in the permanent receive mailbox");
- mailbox = smpi_process()->mailbox();
+ mailbox = process->mailbox();
} else {
XBT_DEBUG("yes there was something for us in the small mailbox");
}
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
real_size_=size_;
- action_ = simcall_comm_irecv(SIMIX_process_self(), mailbox, buf_, &real_size_, &match_recv,
- ! smpi_process()->replaying()? smpi_comm_copy_data_callback
+ action_ = simcall_comm_irecv(process->process(), mailbox, buf_, &real_size_, &match_recv,
+ ! process->replaying()? smpi_comm_copy_data_callback
: &smpi_comm_null_copy_buffer_callback, this, -1.0);
XBT_DEBUG("recv simcall posted");
if (async_small_thresh != 0 || (flags_ & RMA) != 0 )
xbt_mutex_release(mut);
} else { /* the RECV flag was not set, so this is a send */
- int receiver = dst_;
-
+ simgrid::smpi::Process* process = smpi_process_remote(dst_);
int rank = src_;
if (TRACE_smpi_view_internals()) {
- TRACE_smpi_send(rank, rank, receiver, tag_, size_);
+ TRACE_smpi_send(rank, rank, dst_, tag_, size_);
}
this->print_request("New send");
refcount_++;
if(!(old_type_->flags() & DT_FLAG_DERIVED)){
oldbuf = buf_;
- if (!smpi_process()->replaying() && oldbuf != nullptr && size_!=0){
- if((smpi_privatize_global_variables != 0)
+ if (!process->replaying() && oldbuf != nullptr && size_!=0){
+ if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP)
&& (static_cast<char*>(buf_) >= smpi_start_data_exe)
&& (static_cast<char*>(buf_) < smpi_start_data_exe + smpi_size_data_exe )){
XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh");
- xbt_mutex_t mut=smpi_process_remote(receiver)->mailboxes_mutex();
+ xbt_mutex_t mut=process->mailboxes_mutex();
if (async_small_thresh != 0 || (flags_ & RMA) != 0)
xbt_mutex_acquire(mut);
if (!(async_small_thresh != 0 || (flags_ & RMA) !=0)) {
- mailbox = smpi_process_remote(receiver)->mailbox();
+ mailbox = process->mailbox();
} else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) { // eager mode
- mailbox = smpi_process_remote(receiver)->mailbox();
+ mailbox = process->mailbox();
XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
smx_activity_t action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send,
static_cast<void*>(this));
if (action == nullptr) {
if ((flags_ & SSEND) == 0){
- mailbox = smpi_process_remote(receiver)->mailbox_small();
+ mailbox = process->mailbox_small();
XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
} else {
- mailbox = smpi_process_remote(receiver)->mailbox_small();
+ mailbox = process->mailbox_small();
XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast<void*>(this));
if (action == nullptr) {
XBT_DEBUG("No, we are first, send to large mailbox");
- mailbox = smpi_process_remote(receiver)->mailbox();
+ mailbox = process->mailbox();
}
}
} else {
XBT_DEBUG("Yes there was something for us in the large mailbox");
}
} else {
- mailbox = smpi_process_remote(receiver)->mailbox();
+ mailbox = process->mailbox();
XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, this,buf_);
}
action_ = simcall_comm_isend(SIMIX_process_from_PID(src_+1), mailbox, size_, -1.0,
buf, real_size_, &match_send,
&xbt_free_f, // how to free the userdata if a detached send fails
- !smpi_process()->replaying() ? smpi_comm_copy_data_callback
+ !process->replaying() ? smpi_comm_copy_data_callback
: &smpi_comm_null_copy_buffer_callback, this,
// detach if msg size < eager/rdv switch limit
detached_);
req->print_request("Finishing");
MPI_Datatype datatype = req->old_type_;
- if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED)) && (!smpi_is_shared(req->old_buf_))){
+// FIXME Handle the case of a partial shared malloc.
+ if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED))){// && (!smpi_is_shared(req->old_buf_))){
if (!smpi_process()->replaying()){
- if( smpi_privatize_global_variables != 0 && (static_cast<char*>(req->old_buf_) >= smpi_start_data_exe)
+ if( smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP && (static_cast<char*>(req->old_buf_) >= smpi_start_data_exe)
&& ((char*)req->old_buf_ < smpi_start_data_exe + smpi_size_data_exe )){
XBT_VERB("Privatization : We are unserializing to a zone in global memory Switch data segment ");
smpi_switch_data_segment(smpi_process()->index());
datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_);
xbt_free(req->buf_);
}else if(req->flags_ & RECV){//apply op on contiguous buffer for accumulate
- int n =req->real_size_/datatype->size();
- req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
+ if(datatype->size()!=0){
+ int n =req->real_size_/datatype->size();
+ req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
+ }
xbt_free(req->buf_);
}
}