From 2070401863db92e8198b0b8e5f29fbf0f7af2ff2 Mon Sep 17 00:00:00 2001 From: Tom Cornebize Date: Thu, 6 Apr 2017 13:55:38 +0200 Subject: [PATCH] Begin working on the communication optimization for partial shared malloc. --- include/smpi/smpi.h | 1 - include/smpi/smpi_shared_malloc.hpp | 16 +++++++++++ src/smpi/smpi_datatype.cpp | 3 ++ src/smpi/smpi_global.cpp | 43 +++++++++++++++++++++++++---- src/smpi/smpi_request.cpp | 6 ++-- src/smpi/smpi_shared.cpp | 40 +++++++++++++++++++-------- 6 files changed, 88 insertions(+), 21 deletions(-) create mode 100644 include/smpi/smpi_shared_malloc.hpp diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index 7f332247e5..b8a1475c52 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -872,7 +872,6 @@ XBT_PUBLIC(void) smpi_trace_set_call_location__(const char *file, int* line); #define SMPI_SAMPLE_DELAY(duration) for(smpi_execute(duration); 0; ) #define SMPI_SAMPLE_FLOPS(flops) for(smpi_execute_flops(flops); 0; ) -XBT_PUBLIC(int) smpi_is_shared(void *buf); XBT_PUBLIC(void *) smpi_shared_malloc(size_t size, const char *file, int line); #define SMPI_SHARED_MALLOC(size) smpi_shared_malloc(size, __FILE__, __LINE__) XBT_PUBLIC(void *) smpi_shared_malloc_global__(size_t size, const char *file, int line, int *shared_block_offsets, int nb_shared_blocks); diff --git a/include/smpi/smpi_shared_malloc.hpp b/include/smpi/smpi_shared_malloc.hpp new file mode 100644 index 0000000000..cf554aea70 --- /dev/null +++ b/include/smpi/smpi_shared_malloc.hpp @@ -0,0 +1,16 @@ +#ifndef SMPI_SHARED_HPP +#define SMPI_SHARED_HPP +#include +#include +#include + + +/* + * We cannot put this declaration in smpi.h, since we use C++ features. 
+ */
+
+
+XBT_PUBLIC(int) smpi_is_shared(void* ptr, std::vector<std::pair<int, int>> &private_blocks);
+
+
+#endif
diff --git a/src/smpi/smpi_datatype.cpp b/src/smpi/smpi_datatype.cpp
index 52ca497300..3b36cab2cd 100644
--- a/src/smpi/smpi_datatype.cpp
+++ b/src/smpi/smpi_datatype.cpp
@@ -271,11 +271,14 @@ int Datatype::copy(void *sendbuf, int sendcount, MPI_Datatype sendtype,
                    void *recvbuf, int recvcount, MPI_Datatype recvtype){
   int count;
+// FIXME Handle the case of a partial shared malloc.
+#if 0
   if(smpi_is_shared(sendbuf)){
     XBT_DEBUG("Copy input buf %p is shared. Let's ignore it.", sendbuf);
   }else if(smpi_is_shared(recvbuf)){
     XBT_DEBUG("Copy output buf %p is shared. Let's ignore it.", recvbuf);
   }
+#endif
 
   if(smpi_privatize_global_variables){
     smpi_switch_data_segment(smpi_process()->index());
diff --git a/src/smpi/smpi_global.cpp b/src/smpi/smpi_global.cpp
index 0acc0d4d59..a343824724 100644
--- a/src/smpi/smpi_global.cpp
+++ b/src/smpi/smpi_global.cpp
@@ -7,6 +7,7 @@
 #include "private.h"
 #include "private.hpp"
 #include "simgrid/s4u/Mailbox.hpp"
+#include "smpi/smpi_shared_malloc.hpp"
 #include "simgrid/sg_config.h"
 #include "src/kernel/activity/SynchroComm.hpp"
 #include "src/mc/mc_record.h"
@@ -103,17 +104,47 @@ void smpi_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, s
   smpi_comm_copy_data_callback = callback;
 }
 
+/**
+ * Compute the union of two lists of private blocks, given as (start, stop) pairs.
+ * Both lists are expected to be sorted by start offset: they are consumed by a single merge walk.
+ * Blocks that overlap or touch (start <= previous stop) are coalesced into a single block.
+ */
+std::vector<std::pair<int, int>> merge_private_blocks(std::vector<std::pair<int, int>> src, std::vector<std::pair<int, int>> dst) {
+  std::vector<std::pair<int, int>> result;
+  // Extend the last inserted block on overlap; empty() guards back(), which is undefined on an empty vector.
+  auto append = [&result](const std::pair<int, int> &block) {
+    if(!result.empty() && block.first <= result.back().second)
+      result.back().second = std::max(result.back().second, block.second);
+    else
+      result.push_back(block);
+  };
+  size_t i_src=0, i_dst=0;
+  while(i_src < src.size() && i_dst < dst.size()) {
+    if(src[i_src].first < dst[i_dst].first)
+      append(src[i_src++]);
+    else
+      append(dst[i_dst++]);
+  }
+  // Leftover blocks take the same path, so they are coalesced as well.
+  for(; i_src < src.size(); i_src++)
+    append(src[i_src]);
+  for(; i_dst < dst.size(); i_dst++)
+    append(dst[i_dst]);
+  return result;
+}
+
 void smpi_comm_copy_buffer_callback(smx_activity_t synchro, void *buff, size_t buff_size)
 {
-
   simgrid::kernel::activity::Comm *comm = dynamic_cast<simgrid::kernel::activity::Comm*>(synchro);
-
+  int src_shared=0, dst_shared=0;
+  std::vector<std::pair<int, int>> src_private_blocks;
+  std::vector<std::pair<int, int>> dst_private_blocks;
   XBT_DEBUG("Copy the data over");
-  if(smpi_is_shared(buff)){
+  if((src_shared=smpi_is_shared(buff, src_private_blocks))) // assignment intended: keep the flag for the test below
     XBT_DEBUG("Sender %p is shared. Let's ignore it.", buff);
-  }else if(smpi_is_shared((char*)comm->dst_buff)){
-    XBT_DEBUG("Receiver %p is shared. Let's ignore it.", (char*)comm->dst_buff);
-  }else{
+  if((dst_shared=smpi_is_shared((char*)comm->dst_buff, dst_private_blocks))) // must fill dst_private_blocks: smpi_is_shared clears its output, so reusing src_private_blocks would wipe the sender's blocks
+    XBT_DEBUG("Receiver %p is shared. Let's ignore it.", (char*)comm->dst_buff);
+  if(!src_shared && !dst_shared){
     void* tmpbuff=buff;
     if((smpi_privatize_global_variables) && (static_cast(buff) >= smpi_start_data_exe)
       && (static_cast(buff) < smpi_start_data_exe + smpi_size_data_exe )
diff --git a/src/smpi/smpi_request.cpp b/src/smpi/smpi_request.cpp
index fd91cf4eb2..b3988d883a 100644
--- a/src/smpi/smpi_request.cpp
+++ b/src/smpi/smpi_request.cpp
@@ -108,7 +108,8 @@ namespace smpi{
 Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags)
 {
   void *old_buf = nullptr;
-  if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED)) && (!smpi_is_shared(buf_))){
+// FIXME Handle the case of a partial shared malloc.
+  if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED))) { // && (!smpi_is_shared(buf_))){
     // This part handles the problem of non-contiguous memory
     old_buf = buf;
     buf_ = count==0 ? 
nullptr : xbt_malloc(count*datatype->size());
@@ -758,7 +759,8 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
   req->print_request("Finishing");
   MPI_Datatype datatype = req->old_type_;
 
-  if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED)) && (!smpi_is_shared(req->old_buf_))){
+// FIXME Handle the case of a partial shared malloc.
+  if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED))){// && (!smpi_is_shared(req->old_buf_))){
     if (!smpi_process()->replaying()){
       if( smpi_privatize_global_variables != 0 && (static_cast(req->old_buf_) >= smpi_start_data_exe)
diff --git a/src/smpi/smpi_shared.cpp b/src/smpi/smpi_shared.cpp
index c5208ce31c..731f4ae29d 100644
--- a/src/smpi/smpi_shared.cpp
+++ b/src/smpi/smpi_shared.cpp
@@ -114,6 +114,7 @@ typedef std::unordered_map::value_type shar
 
 typedef struct {
   size_t size;
+  std::vector<std::pair<int, int>> private_blocks; // (start, stop) offsets of the parts of the allocation that are NOT shared
   shared_data_key_type* data;
 } shared_metadata_t;
 
@@ -240,6 +241,9 @@ void *smpi_shared_malloc_global__(size_t size, const char *file, int line, int *
     xbt_assert(0 <= start_offset, "start_offset (%d) should be greater than 0", start_offset);
     xbt_assert(start_offset < stop_offset, "start_offset (%d) should be lower than stop offset (%d)", start_offset, stop_offset);
     xbt_assert(stop_offset <= size, "stop_offset (%d) should be lower than size (%lu)", stop_offset, size);
+    if(i_block < nb_shared_blocks-1) // the last block has no successor to compare against
+      xbt_assert(stop_offset < shared_block_offsets[2*i_block+2],
+              "stop_offset (%d) should be lower than its successor start offset (%d)", stop_offset, shared_block_offsets[2*i_block+2]);
 //    fprintf(stderr, "shared block 0x%x - 0x%x\n", start_offset, stop_offset);
     int start_block_offset = ALIGN_UP(start_offset, smpi_shared_malloc_blocksize);
     int stop_block_offset = ALIGN_DOWN(stop_offset, smpi_shared_malloc_blocksize);
@@ -281,16 +285,23 @@ void *smpi_shared_malloc_global__(size_t size, const char *file, int line, int *
     }
   }
 
-  if(nb_shared_blocks == 1 && shared_block_offsets[0] == 0 && shared_block_offsets[1] == size) {
-    shared_metadata_t newmeta;
-    //register metadata for memcpy avoidance
-    shared_data_key_type* data = (shared_data_key_type*)xbt_malloc(sizeof(shared_data_key_type));
-    data->second.fd = -1;
-    data->second.count = 1;
-    newmeta.size = size;
-    newmeta.data = data;
-    allocs_metadata[mem] = newmeta;
+  shared_metadata_t newmeta;
+  //register metadata for memcpy avoidance
+  shared_data_key_type* data = (shared_data_key_type*)xbt_malloc(sizeof(shared_data_key_type));
+  data->second.fd = -1;
+  data->second.count = 1;
+  newmeta.size = size;
+  newmeta.data = data;
+  if(shared_block_offsets[0] > 0) { // private region before the first shared block
+    newmeta.private_blocks.push_back(std::make_pair(0, shared_block_offsets[0]));
+  }
+  for(int i_block = 0; i_block < nb_shared_blocks-1; i_block ++) { // gap between the stop of block i and the start of block i+1
+    newmeta.private_blocks.push_back(std::make_pair(shared_block_offsets[2*i_block+1], shared_block_offsets[2*i_block+2]));
+  }
+  if(shared_block_offsets[2*nb_shared_blocks-1] < size) { // offsets are (start, stop) pairs: the last stop is at index 2*nb_shared_blocks-1
+    newmeta.private_blocks.push_back(std::make_pair(shared_block_offsets[2*nb_shared_blocks-1], size));
   }
+  allocs_metadata[mem] = newmeta;
 
   return mem;
 }
@@ -326,18 +337,23 @@ void *smpi_shared_malloc(size_t size, const char *file, int line) {
   return mem;
 }
 
-int smpi_is_shared(void* ptr){
+int smpi_is_shared(void* ptr, std::vector<std::pair<int, int>> &private_blocks){ // returns 1 and copies the allocation's private blocks when ptr falls inside a registered allocation, 0 otherwise
+  private_blocks.clear(); // being paranoid
   if (allocs_metadata.empty())
     return 0;
   if ( smpi_cfg_shared_malloc == shmalloc_local || smpi_cfg_shared_malloc == shmalloc_global) {
     auto low = allocs_metadata.lower_bound(ptr);
-    if (low->first==ptr)
+    if (low != allocs_metadata.end() && low->first==ptr) { // lower_bound yields end() when ptr is above every key; do not dereference it
+      private_blocks = low->second.private_blocks;
       return 1;
+    }
     if (low == allocs_metadata.begin())
       return 0;
     low --;
-    if (ptr < (char*)low->first + low->second.size)
+    if (ptr < (char*)low->first + low->second.size) { // ptr lies inside the allocation that starts just below it
+      private_blocks = low->second.private_blocks;
       return 1;
+    }
     return 0;
   } else {
     return 0;
-- 
2.20.1