From 5d67901dba3dfd8e75f708c329b8144287490077 Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Fri, 6 May 2016 16:38:13 +0200 Subject: [PATCH] split smx_synchro_t into a hierarchy of C++ classes - The change is still rather crude since the code of these objects' methods is still spread around in the code (search for dynamic_cast in this patch to find the resulting horrors). I wanted to keep the patch small to have a chance to finish it without revert. - This breaks MC (at least) because I dunno how to read an object in the remote process --- include/simgrid/forward.h | 7 + include/simgrid/s4u/async.hpp | 3 +- include/simgrid/simix.h | 3 - src/mc/mc_base.cpp | 23 +- src/msg/msg_gos.cpp | 14 +- src/msg/msg_private.h | 8 +- src/msg/msg_task.cpp | 4 +- src/simix/Synchro.cpp | 15 ++ src/simix/Synchro.h | 32 +++ src/simix/SynchroComm.hpp | 62 +++++ src/simix/SynchroExec.hpp | 21 ++ src/simix/SynchroIo.hpp | 22 ++ src/simix/SynchroRaw.hpp | 22 ++ src/simix/SynchroSleep.hpp | 22 ++ src/simix/popping.cpp | 52 ++-- src/simix/smx_global.cpp | 69 ++--- src/simix/smx_host.cpp | 141 +++++----- src/simix/smx_io.cpp | 118 +++------ src/simix/smx_network.cpp | 442 ++++++++++++++++--------------- src/simix/smx_private.h | 99 ------- src/simix/smx_process.cpp | 232 ++++++++-------- src/simix/smx_synchro.cpp | 34 ++- src/smpi/smpi_base.cpp | 10 +- src/smpi/smpi_global.cpp | 21 +- tools/cmake/DefinePackages.cmake | 9 +- 25 files changed, 782 insertions(+), 703 deletions(-) create mode 100644 src/simix/Synchro.cpp create mode 100644 src/simix/Synchro.h create mode 100644 src/simix/SynchroComm.hpp create mode 100644 src/simix/SynchroExec.hpp create mode 100644 src/simix/SynchroIo.hpp create mode 100644 src/simix/SynchroRaw.hpp create mode 100644 src/simix/SynchroSleep.hpp diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 549c031112..437d7944bd 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -15,6 +15,9 @@ namespace simgrid { class Host; class Mailbox; } + namespace simix { + class Synchro; + } namespace surf { class Resource; class Cpu; @@ -30,6 +33,7 @@ namespace simgrid { typedef simgrid::s4u::As simgrid_As; typedef simgrid::s4u::Host simgrid_Host; typedef simgrid::s4u::Mailbox simgrid_Mailbox; +typedef simgrid::simix::Synchro simix_Synchro; typedef simgrid::surf::Cpu surf_Cpu; typedef simgrid::surf::NetCard surf_NetCard; typedef simgrid::surf::Link Link; @@ -41,6 +45,7 @@ typedef simgrid::trace_mgr::trace tmgr_Trace; typedef struct simgrid_As simgrid_As; typedef struct simgrid_Host simgrid_Host; typedef struct simgrid_Mailbox simgrid_Mailbox; +typedef struct simix_Synchro simix_Synchro; typedef struct surf_Cpu surf_Cpu; typedef struct surf_NetCard surf_NetCard; typedef struct surf_Resource surf_Resource; @@ -52,6 +57,8 @@ typedef simgrid_As *AS_t; typedef simgrid_Host* sg_host_t; typedef simgrid_Mailbox* sg_mbox_t; +typedef simix_Synchro *smx_synchro_t; + typedef surf_Cpu *surf_cpu_t; typedef surf_NetCard *sg_netcard_t; typedef surf_Resource *sg_resource_t; diff --git a/include/simgrid/s4u/async.hpp b/include/simgrid/s4u/async.hpp index 69981aa051..9ef1778171 100644 --- a/include/simgrid/s4u/async.hpp +++ b/include/simgrid/s4u/async.hpp @@ -12,6 +12,7 @@ #include #include +#include "simgrid/forward.h" SG_BEGIN_DECL(); typedef enum { @@ -33,7 +34,7 @@ protected: virtual ~Async(); private: - struct s_smx_synchro *inferior_ = NULL; + simgrid::simix::Synchro *inferior_ = NULL; private: e_s4u_async_state_t state_ = inited; diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index e0ec0ed653..85503b3709 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -91,9 +91,6 @@ typedef struct s_smx_file *smx_file_t; typedef xbt_dictelm_t smx_storage_t; typedef struct s_smx_storage_priv *smx_storage_priv_t; -/********************************** Synchro *************************************/ -typedef struct s_smx_synchro *smx_synchro_t; /* FIXME: replace by specialized synchro handlers */ - /* ****************************** Process *********************************** */ /** @brief Process datatype @ingroup simix_process_management diff --git a/src/mc/mc_base.cpp b/src/mc/mc_base.cpp index b188c03cad..56d1ebdf2e 100644 --- a/src/mc/mc_base.cpp +++ b/src/mc/mc_base.cpp @@ -20,6 +20,9 @@ #include "mc/mc.h" #include "src/mc/mc_protocol.h" +#include "src/simix/Synchro.h" +#include "src/simix/SynchroComm.hpp" + #if HAVE_MC #include "src/mc/mc_request.h" #include "src/mc/Process.hpp" @@ -70,9 +73,8 @@ void wait_for_requests(void) bool request_is_enabled(smx_simcall_t req) { unsigned int index = 0; - smx_synchro_t act = 0; #if HAVE_MC - s_smx_synchro_t temp_synchro; + simgrid::simix::Synchro temp_synchro; #endif switch (req->call) { @@ -80,8 +82,9 @@ bool request_is_enabled(smx_simcall_t req) return false; case SIMCALL_COMM_WAIT: + { /* FIXME: check also that src and dst processes are not suspended */ - act = simcall_comm_wait__get__comm(req); + simgrid::simix::Comm *act = static_cast(simcall_comm_wait__get__comm(req)); #if HAVE_MC // Fetch from MCed memory: @@ -98,13 +101,15 @@ bool request_is_enabled(smx_simcall_t req) return true; } /* On the other hand if it hasn't a timeout, check if the comm is ready.*/ - else if (act->comm.detached && act->comm.src_proc == nullptr - && act->comm.type == SIMIX_COMM_READY) - return (act->comm.dst_proc != nullptr); - return (act->comm.src_proc && act->comm.dst_proc); + else if (act->detached && act->src_proc == nullptr + && act->type == SIMIX_COMM_READY) + return (act->dst_proc != nullptr); + return (act->src_proc && act->dst_proc); + } case SIMCALL_COMM_WAITANY: { xbt_dynar_t comms; + simgrid::simix::Comm *act = static_cast(simcall_comm_wait__get__comm(req)); #if HAVE_MC s_xbt_dynar_t comms_buffer; @@ -138,8 +143,8 @@ bool request_is_enabled(smx_simcall_t req) } else #endif - act = xbt_dynar_get_as(comms, index, smx_synchro_t); - if (act->comm.src_proc && act->comm.dst_proc) + act = xbt_dynar_get_as(comms, index, simgrid::simix::Comm*); + if (act->src_proc && act->dst_proc) return true; } return false; diff --git a/src/msg/msg_gos.cpp b/src/msg/msg_gos.cpp index bdc1966599..ec226a7b94 100644 --- a/src/msg/msg_gos.cpp +++ b/src/msg/msg_gos.cpp @@ -67,9 +67,10 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) simdata->isused = (void*)1; if (simdata->host_nb > 0) { - simdata->compute = simcall_execution_parallel_start(task->name, simdata->host_nb,simdata->host_list, + simdata->compute = static_cast( + simcall_execution_parallel_start(task->name, simdata->host_nb,simdata->host_list, simdata->flops_parallel_amount, simdata->bytes_parallel_amount, - 1.0, -1.0); + 1.0, -1.0)); XBT_DEBUG("Parallel execution action created: %p", simdata->compute); } else { unsigned long affinity_mask = @@ -78,8 +79,9 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) XBT_DEBUG("execute %s@%s with affinity(0x%04lx)", MSG_task_get_name(task), MSG_host_get_name(p_simdata->m_host), affinity_mask); - simdata->compute = simcall_execution_start(task->name, simdata->flops_amount, simdata->priority, - simdata->bound, affinity_mask); + simdata->compute = static_cast( + simcall_execution_start(task->name, simdata->flops_amount, simdata->priority, + simdata->bound, affinity_mask)); } simcall_set_category(simdata->compute, task->category); p_simdata->waiting_action = simdata->compute; @@ -321,7 +323,7 @@ static inline msg_comm_t MSG_task_isend_internal(msg_task_t task, const char *al /* Send it by calling SIMIX network layer */ smx_synchro_t act = simcall_comm_isend(SIMIX_process_self(), mailbox, t_simdata->bytes_amount, t_simdata->rate, task, sizeof(void *), match_fun, cleanup, NULL, match_data,detached); - t_simdata->comm = act; /* FIXME: is the field t_simdata->comm still useful? */ + t_simdata->comm = static_cast(act); /* FIXME: is the field t_simdata->comm still useful? */ msg_comm_t comm; if (detached) { @@ -837,7 +839,7 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl t_simdata->rate, task, sizeof(void *), NULL, NULL, NULL, task, 0); if (TRACE_is_enabled()) simcall_set_category(comm, task->category); - t_simdata->comm = comm; + t_simdata->comm = static_cast(comm); simcall_comm_wait(comm, timeout); } diff --git a/src/msg/msg_private.h b/src/msg/msg_private.h index 3edff2f8d9..f39cdd0d62 100644 --- a/src/msg/msg_private.h +++ b/src/msg/msg_private.h @@ -17,6 +17,10 @@ #include "xbt/dict.h" #include "xbt/config.h" #include "src/instr/instr_private.h" + +#include "src/simix/SynchroExec.hpp" +#include "src/simix/SynchroComm.hpp" + SG_BEGIN_DECL() /**************** datatypes **********************************/ @@ -38,8 +42,8 @@ SG_BEGIN_DECL() ptr = _xbt_ex_t; } while(0) typedef struct simdata_task { - smx_synchro_t compute; /* SIMIX modeling of computation */ - smx_synchro_t comm; /* SIMIX modeling of communication */ + simgrid::simix::Exec *compute; /* SIMIX modeling of computation */ + simgrid::simix::Comm *comm; /* SIMIX modeling of communication */ double bytes_amount; /* Data size */ double flops_amount; /* Computation size */ msg_process_t sender; diff --git a/src/msg/msg_task.cpp b/src/msg/msg_task.cpp index 95382ab787..783715c3aa 100644 --- a/src/msg/msg_task.cpp +++ b/src/msg/msg_task.cpp @@ -390,8 +390,8 @@ void MSG_task_set_affinity(msg_task_t task, msg_host_t host, unsigned long mask) } { - smx_synchro_t compute = task->simdata->compute; - msg_host_t host_now = compute->execution.host; // simix_private.h is necessary + simgrid::simix::Exec *compute = task->simdata->compute; + msg_host_t host_now = compute->host; // simix_private.h is necessary if (host_now != host) { /* task is not yet executed on this host */ XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host), diff --git a/src/simix/Synchro.cpp b/src/simix/Synchro.cpp new file mode 100644 index 0000000000..c10c46c616 --- /dev/null +++ b/src/simix/Synchro.cpp @@ -0,0 +1,15 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "src/simix/Synchro.h" + +simgrid::simix::Synchro::Synchro() { + simcalls = xbt_fifo_new(); +} + +simgrid::simix::Synchro::~Synchro() { + xbt_fifo_free(simcalls); + xbt_free(name); +} diff --git a/src/simix/Synchro.h b/src/simix/Synchro.h new file mode 100644 index 0000000000..b55a2b9a4f --- /dev/null +++ b/src/simix/Synchro.h @@ -0,0 +1,32 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _SIMIX_SYNCHRO_HPP +#define _SIMIX_SYNCHRO_HPP +#include "simgrid/forward.h" + +#ifdef __cplusplus + +#include + +namespace simgrid { +namespace simix { + + class Synchro { + public: + Synchro(); + virtual ~Synchro(); + e_smx_state_t state; /* State of the synchro */ + char *name; /* synchro name if any */ + xbt_fifo_t simcalls; /* List of simcalls waiting for this synchro */ + char *category = nullptr; /* For instrumentation */ + }; +}} // namespace simgrid::simix +#else /* not C++ */ + + +#endif + +#endif diff --git a/src/simix/SynchroComm.hpp b/src/simix/SynchroComm.hpp new file mode 100644 index 0000000000..03c71d70e8 --- /dev/null +++ b/src/simix/SynchroComm.hpp @@ -0,0 +1,62 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _SIMIX_SYNCHRO_COMM_HPP +#define _SIMIX_SYNCHRO_COMM_HPP + +#include "src/simix/Synchro.h" + +typedef enum { + SIMIX_COMM_SEND, + SIMIX_COMM_RECEIVE, + SIMIX_COMM_READY, + SIMIX_COMM_DONE +} e_smx_comm_type_t; + +namespace simgrid { +namespace simix { + + XBT_PUBLIC_CLASS Comm : public Synchro { + public: + e_smx_comm_type_t type; /* Type of the communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE) */ + smx_mailbox_t mbox; /* Rendez-vous where the comm is queued */ + +#if HAVE_MC + smx_mailbox_t mbox_cpy; /* Copy of the rendez-vous where the comm is queued, MC needs it for DPOR + (comm.mbox set to NULL when the communication is removed from the mailbox + (used as garbage collector)) */ +#endif + int refcount; /* Number of processes involved in the cond */ + int detached; /* If detached or not */ + + void (*clean_fun)(void*); /* Function to clean the detached src_buf if something goes wrong */ + int (*match_fun)(void*,void*,smx_synchro_t); /* Filter function used by the other side. It is used when + looking if a given communication matches my needs. For that, myself must match the + expectations of the other side, too. See */ + void (*copy_data_fun) (smx_synchro_t, void*, size_t); + + /* Surf action data */ + surf_action_t surf_comm; /* The Surf communication action encapsulated */ + surf_action_t src_timeout; /* Surf's actions to instrument the timeouts */ + surf_action_t dst_timeout; /* Surf's actions to instrument the timeouts */ + smx_process_t src_proc; + smx_process_t dst_proc; + double rate; + double task_size; + + /* Data to be transfered */ + void *src_buff; + void *dst_buff; + size_t src_buff_size; + size_t *dst_buff_size; + unsigned copied:1; /* whether the data were already copied */ + + void* src_data; /* User data associated to communication */ + void* dst_data; + }; + +}} // namespace simgrid::simix + +#endif diff --git a/src/simix/SynchroExec.hpp b/src/simix/SynchroExec.hpp new file mode 100644 index 0000000000..27b62fdb41 --- /dev/null +++ b/src/simix/SynchroExec.hpp @@ -0,0 +1,21 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _SIMIX_SYNCHRO_EXEC_HPP +#define _SIMIX_SYNCHRO_EXEC_HPP + +#include "src/simix/Synchro.h" + +namespace simgrid { +namespace simix { + + XBT_PUBLIC_CLASS Exec : public Synchro { + public: + sg_host_t host; /* The host where the execution takes place */ + surf_action_t surf_exec; /* The Surf execution action encapsulated */ + }; + +}} // namespace simgrid::simix +#endif diff --git a/src/simix/SynchroIo.hpp b/src/simix/SynchroIo.hpp new file mode 100644 index 0000000000..a3039b75ef --- /dev/null +++ b/src/simix/SynchroIo.hpp @@ -0,0 +1,22 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _SIMIX_SYNCHRO_IO_HPP +#define _SIMIX_SYNCHRO_IO_HPP + +#include "src/simix/Synchro.h" + +namespace simgrid { +namespace simix { + + XBT_PUBLIC_CLASS Io : public Synchro { + public: + sg_host_t host; + surf_action_t surf_io; + }; + +}} // namespace simgrid::simix + +#endif diff --git a/src/simix/SynchroRaw.hpp b/src/simix/SynchroRaw.hpp new file mode 100644 index 0000000000..3d8a28e473 --- /dev/null +++ b/src/simix/SynchroRaw.hpp @@ -0,0 +1,22 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _SIMIX_SYNCHRO_RAW_HPP +#define _SIMIX_SYNCHRO_RAW_HPP + +#include "src/simix/Synchro.h" + +namespace simgrid { +namespace simix { + + /** Used to implement mutexes, semaphores and conditions */ + XBT_PUBLIC_CLASS Raw : public Synchro { + public: + surf_action_t sleep; + }; + +}} // namespace simgrid::simix + +#endif diff --git a/src/simix/SynchroSleep.hpp b/src/simix/SynchroSleep.hpp new file mode 100644 index 0000000000..8e4b9a156f --- /dev/null +++ b/src/simix/SynchroSleep.hpp @@ -0,0 +1,22 @@ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _SIMIX_SYNCHRO_SLEEP_HPP +#define _SIMIX_SYNCHRO_SLEEP_HPP + +#include "src/simix/Synchro.h" + +namespace simgrid { +namespace simix { + + XBT_PUBLIC_CLASS Sleep : public Synchro { + public: + sg_host_t host; /* The host that is sleeping */ + surf_action_t surf_sleep; /* The Surf sleeping action encapsulated */ + }; + +}} // namespace simgrid::simix + +#endif diff --git a/src/simix/popping.cpp b/src/simix/popping.cpp index 15ad87daf6..e4a621b274 100644 --- a/src/simix/popping.cpp +++ b/src/simix/popping.cpp @@ -11,6 +11,12 @@ #include "src/mc/mc_private.h" #endif +#include "src/simix/SynchroExec.hpp" +#include "src/simix/SynchroComm.hpp" +#include "src/simix/SynchroSleep.hpp" +#include "src/simix/SynchroRaw.hpp" +#include "src/simix/SynchroIo.hpp" + XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_popping, simix, "Popping part of SIMIX (transmuting from user request into kernel handlers)"); @@ -31,32 +37,34 @@ void SIMIX_simcall_answer(smx_simcall_t simcall) void SIMIX_simcall_exit(smx_synchro_t synchro) { - switch (synchro->type) { - - case SIMIX_SYNC_EXECUTE: - case SIMIX_SYNC_PARALLEL_EXECUTE: - SIMIX_post_host_execute(synchro); - break; - - case SIMIX_SYNC_COMMUNICATE: - SIMIX_post_comm(synchro); - break; + simgrid::simix::Exec *exec = dynamic_cast(synchro); + if (exec != nullptr) { + SIMIX_post_host_execute(synchro); + return; + } - case SIMIX_SYNC_SLEEP: - SIMIX_post_process_sleep(synchro); - break; + simgrid::simix::Comm *comm = dynamic_cast(synchro); + if (comm != nullptr) { + SIMIX_post_comm(synchro); + return; + } - case SIMIX_SYNC_JOIN: - SIMIX_post_process_sleep(synchro); - break; + simgrid::simix::Sleep *sleep = dynamic_cast(synchro); + if (sleep != nullptr) { + SIMIX_post_process_sleep(synchro); + return; + } - case SIMIX_SYNC_SYNCHRO: - SIMIX_post_synchro(synchro); - break; + simgrid::simix::Raw *raw = dynamic_cast(synchro); + if (raw != nullptr) { + SIMIX_post_synchro(synchro); + return; + } - case SIMIX_SYNC_IO: - SIMIX_post_io(synchro); - break; + simgrid::simix::Io *io = dynamic_cast(synchro); + if (io != nullptr) { + SIMIX_post_io(synchro); + return; } } diff --git a/src/simix/smx_global.cpp b/src/simix/smx_global.cpp index 856ccc8c49..d693ac8812 100644 --- a/src/simix/smx_global.cpp +++ b/src/simix/smx_global.cpp @@ -19,6 +19,12 @@ #include "src/mc/mc_replay.h" #include "simgrid/sg_config.h" +#include "src/simix/SynchroExec.hpp" +#include "src/simix/SynchroComm.hpp" +#include "src/simix/SynchroSleep.hpp" +#include "src/simix/SynchroIo.hpp" +#include "src/simix/SynchroRaw.hpp" + #if HAVE_MC #include "src/mc/mc_private.h" #include "src/mc/mc_protocol.h" @@ -49,11 +55,6 @@ typedef struct s_smx_timer { void (*SMPI_switch_data_segment)(int) = NULL; -static void* SIMIX_synchro_mallocator_new_f(void); -static void SIMIX_synchro_mallocator_free_f(void* synchro); -static void SIMIX_synchro_mallocator_reset_f(void* synchro); - - int _sg_do_verbose_exit = 1; static void inthandler(int ignored) { @@ -204,9 +205,6 @@ void SIMIX_global_init(int *argc, char **argv) simix_global->create_process_function = SIMIX_process_create; simix_global->kill_process_function = kill_process; simix_global->cleanup_process_function = SIMIX_process_cleanup; - simix_global->synchro_mallocator = xbt_mallocator_new(65536, - SIMIX_synchro_mallocator_new_f, SIMIX_synchro_mallocator_free_f, - SIMIX_synchro_mallocator_reset_f); simix_global->mutex = xbt_os_mutex_init(); surf_init(argc, argv); /* Initialize SURF structures */ @@ -309,7 +307,6 @@ void SIMIX_clean(void) surf_exit(); - xbt_mallocator_free(simix_global->synchro_mallocator); xbt_free(simix_global); simix_global = NULL; @@ -607,36 +604,34 @@ void SIMIX_display_process_status(void) if (process->waiting_synchro) { const char* synchro_description = "unknown"; - switch (process->waiting_synchro->type) { - case SIMIX_SYNC_EXECUTE: + if (dynamic_cast(process->waiting_synchro) != nullptr) synchro_description = "execution"; - break; - - case SIMIX_SYNC_PARALLEL_EXECUTE: - synchro_description = "parallel execution"; - break; - case SIMIX_SYNC_COMMUNICATE: + if (dynamic_cast(process->waiting_synchro) != nullptr) synchro_description = "communication"; - break; - case SIMIX_SYNC_SLEEP: + if (dynamic_cast(process->waiting_synchro) != nullptr) synchro_description = "sleeping"; + + if (dynamic_cast(process->waiting_synchro) != nullptr) + synchro_description = "synchronization"; + + if (dynamic_cast(process->waiting_synchro) != nullptr) + synchro_description = "I/O"; + + + /* + switch (process->waiting_synchro->type) { + case SIMIX_SYNC_PARALLEL_EXECUTE: + synchro_description = "parallel execution"; break; case SIMIX_SYNC_JOIN: synchro_description = "joining"; break; +*/ - case SIMIX_SYNC_SYNCHRO: - synchro_description = "synchronization"; - break; - - case SIMIX_SYNC_IO: - synchro_description = "I/O"; - break; - } XBT_INFO("Process %lu (%s@%s): waiting for %s synchro %p (%s) in state %d to finish", process->pid, process->name, sg_host_get_name(process->host), synchro_description, process->waiting_synchro, @@ -648,26 +643,6 @@ void SIMIX_display_process_status(void) } } -static void* SIMIX_synchro_mallocator_new_f(void) { - smx_synchro_t synchro = xbt_new(s_smx_synchro_t, 1); - synchro->simcalls = xbt_fifo_new(); - return synchro; -} - -static void SIMIX_synchro_mallocator_free_f(void* synchro) { - xbt_fifo_free(((smx_synchro_t) synchro)->simcalls); - xbt_free(synchro); -} - -static void SIMIX_synchro_mallocator_reset_f(void* synchro) { - - // we also recycle the simcall list - xbt_fifo_t fifo = ((smx_synchro_t) synchro)->simcalls; - xbt_fifo_reset(fifo); - memset(synchro, 0, sizeof(s_smx_synchro_t)); - ((smx_synchro_t) synchro)->simcalls = fifo; -} - xbt_dict_t simcall_HANDLER_asr_get_properties(smx_simcall_t simcall, const char *name){ return SIMIX_asr_get_properties(name); } diff --git a/src/simix/smx_host.cpp b/src/simix/smx_host.cpp index c3e5db0bc4..800a511cc1 100644 --- a/src/simix/smx_host.cpp +++ b/src/simix/smx_host.cpp @@ -11,8 +11,10 @@ #include "src/surf/virtual_machine.hpp" #include "src/surf/HostImpl.hpp" -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_host, simix, - "SIMIX hosts"); +#include "src/simix/SynchroExec.hpp" +#include "src/simix/SynchroComm.hpp" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_host, simix, "SIMIX hosts"); static void SIMIX_execution_finish(smx_synchro_t synchro); @@ -162,8 +164,7 @@ const char* SIMIX_host_self_get_name(void) void _SIMIX_host_free_process_arg(void *data) { smx_process_arg_t arg = *(smx_process_arg_t*)data; - int i; - for (i = 0; i < arg->argc; i++) + for (int i = 0; i < arg->argc; i++) xbt_free(arg->argv[i]); xbt_free(arg->argv); xbt_free(arg->name); @@ -266,35 +267,32 @@ smx_synchro_t SIMIX_execution_start(smx_process_t issuer, const char *name, double flops_amount, double priority, double bound, unsigned long affinity_mask){ /* alloc structures and initialize */ - smx_synchro_t synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_EXECUTE; - synchro->name = xbt_strdup(name); - synchro->state = SIMIX_RUNNING; - synchro->execution.host = issuer->host; - synchro->category = NULL; + simgrid::simix::Exec *exec = new simgrid::simix::Exec(); + exec->name = xbt_strdup(name); + exec->state = SIMIX_RUNNING; + exec->host = issuer->host; /* set surf's action */ if (!MC_is_active() && !MC_record_replay_is_active()) { - synchro->execution.surf_exec = issuer->host->pimpl_cpu->execution_start(flops_amount); - synchro->execution.surf_exec->setData(synchro); - synchro->execution.surf_exec->setPriority(priority); + exec->surf_exec = issuer->host->pimpl_cpu->execution_start(flops_amount); + exec->surf_exec->setData(exec); + exec->surf_exec->setPriority(priority); if (bound != 0) - static_cast(synchro->execution.surf_exec) - ->setBound(bound); + static_cast(exec->surf_exec)->setBound(bound); if (affinity_mask != 0) { /* just a double check to confirm that this host is the host where this task is running. */ - xbt_assert(synchro->execution.host == issuer->host); - static_cast(synchro->execution.surf_exec) + xbt_assert(exec->host == issuer->host); + static_cast(exec->surf_exec) ->setAffinity(issuer->host->pimpl_cpu, affinity_mask); } } - XBT_DEBUG("Create execute synchro %p: %s", synchro, synchro->name); + XBT_DEBUG("Create execute synchro %p: %s", exec, exec->name); - return synchro; + return exec; } smx_synchro_t SIMIX_execution_parallel_start(const char *name, @@ -302,16 +300,14 @@ smx_synchro_t SIMIX_execution_parallel_start(const char *name, double *flops_amount, double *bytes_amount, double amount, double rate){ - sg_host_t*host_list_cpy = NULL; + sg_host_t *host_list_cpy = NULL; int i; /* alloc structures and initialize */ - smx_synchro_t synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_PARALLEL_EXECUTE; - synchro->name = xbt_strdup(name); - synchro->state = SIMIX_RUNNING; - synchro->execution.host = NULL; /* FIXME: do we need the list of hosts? */ - synchro->category = NULL; + simgrid::simix::Exec *exec = new simgrid::simix::Exec(); + exec->name = xbt_strdup(name); + exec->state = SIMIX_RUNNING; + exec->host = nullptr; /* FIXME: do we need the list of hosts? */ /* set surf's synchro */ host_list_cpy = xbt_new0(sg_host_t, host_nb); @@ -328,43 +324,42 @@ smx_synchro_t SIMIX_execution_parallel_start(const char *name, /* set surf's synchro */ if (!MC_is_active() && !MC_record_replay_is_active()) { - synchro->execution.surf_exec = - surf_host_model->executeParallelTask( - host_nb, host_list_cpy, flops_amount, bytes_amount, rate); - - synchro->execution.surf_exec->setData(synchro); + exec->surf_exec = surf_host_model->executeParallelTask(host_nb, host_list_cpy, flops_amount, bytes_amount, rate); + exec->surf_exec->setData(exec); } - XBT_DEBUG("Create parallel execute synchro %p", synchro); + XBT_DEBUG("Create parallel execute synchro %p", exec); - return synchro; + return exec; } void SIMIX_execution_destroy(smx_synchro_t synchro) { XBT_DEBUG("Destroy synchro %p", synchro); + simgrid::simix::Exec *exec = static_cast(synchro); - if (synchro->execution.surf_exec) { - synchro->execution.surf_exec->unref(); - synchro->execution.surf_exec = NULL; + if (exec->surf_exec) { + exec->surf_exec->unref(); + exec->surf_exec = NULL; } - xbt_free(synchro->name); - xbt_mallocator_release(simix_global->synchro_mallocator, synchro); + delete exec; } void SIMIX_execution_cancel(smx_synchro_t synchro) { XBT_DEBUG("Cancel synchro %p", synchro); + simgrid::simix::Exec *exec = static_cast(synchro); - if (synchro->execution.surf_exec) - synchro->execution.surf_exec->cancel(); + if (exec->surf_exec) + exec->surf_exec->cancel(); } double SIMIX_execution_get_remains(smx_synchro_t synchro) { double result = 0.0; + simgrid::simix::Exec *exec = static_cast(synchro); if (synchro->state == SIMIX_RUNNING) - result = synchro->execution.surf_exec->getRemains(); + result = exec->surf_exec->getRemains(); return result; } @@ -376,24 +371,25 @@ e_smx_state_t SIMIX_execution_get_state(smx_synchro_t synchro) void SIMIX_execution_set_priority(smx_synchro_t synchro, double priority) { - if(synchro->execution.surf_exec) - synchro->execution.surf_exec->setPriority(priority); + simgrid::simix::Exec *exec = static_cast(synchro); + if(exec->surf_exec) + exec->surf_exec->setPriority(priority); } void SIMIX_execution_set_bound(smx_synchro_t synchro, double bound) { - if(synchro->execution.surf_exec) - static_cast(synchro->execution.surf_exec)->setBound(bound); + simgrid::simix::Exec *exec = static_cast(synchro); + if(exec->surf_exec) + static_cast(exec->surf_exec)->setBound(bound); } void SIMIX_execution_set_affinity(smx_synchro_t synchro, sg_host_t host, unsigned long mask) { - xbt_assert(synchro->type == SIMIX_SYNC_EXECUTE); - - if (synchro->execution.surf_exec) { + simgrid::simix::Exec *exec = static_cast(synchro); + if(exec->surf_exec) { /* just a double check to confirm that this host is the host where this task is running. */ - xbt_assert(synchro->execution.host == host); - static_cast(synchro->execution.surf_exec)->setAffinity(host->pimpl_cpu, mask); + xbt_assert(exec->host == host); + static_cast(exec->surf_exec)->setAffinity(host->pimpl_cpu, mask); } } @@ -420,14 +416,16 @@ void simcall_HANDLER_execution_wait(smx_simcall_t simcall, smx_synchro_t synchro void SIMIX_execution_suspend(smx_synchro_t synchro) { - if(synchro->execution.surf_exec) - synchro->execution.surf_exec->suspend(); + simgrid::simix::Exec *exec = static_cast(synchro); + if(exec->surf_exec) + exec->surf_exec->suspend(); } void SIMIX_execution_resume(smx_synchro_t synchro) { - if(synchro->execution.surf_exec) - synchro->execution.surf_exec->resume(); + simgrid::simix::Exec *exec = static_cast(synchro); + if(exec->surf_exec) + exec->surf_exec->resume(); } void SIMIX_execution_finish(smx_synchro_t synchro) @@ -476,23 +474,23 @@ void SIMIX_execution_finish(smx_synchro_t synchro) void SIMIX_post_host_execute(smx_synchro_t synchro) { - if (synchro->type == SIMIX_SYNC_EXECUTE && /* FIMXE: handle resource failure - * for parallel tasks too */ - synchro->execution.host->isOff()) { - /* If the host running the synchro failed, notice it so that the asking + simgrid::simix::Exec *exec = dynamic_cast(synchro); + + if (exec != nullptr && exec->host && /* FIMXE: handle resource failure for parallel tasks too */ + exec->host->isOff()) { + /* If the host running the synchro failed, notice it. This way, the asking * process can be killed if it runs on that host itself */ synchro->state = SIMIX_FAILED; - } else if (synchro->execution.surf_exec->getState() == simgrid::surf::Action::State::failed) { - /* If the host running the synchro didn't fail, then the synchro was - * canceled */ + } else if (exec->surf_exec->getState() == simgrid::surf::Action::State::failed) { + /* If the host running the synchro didn't fail, then the synchro was canceled */ synchro->state = SIMIX_CANCELED; } else { synchro->state = SIMIX_DONE; } - if (synchro->execution.surf_exec) { - synchro->execution.surf_exec->unref(); - synchro->execution.surf_exec = NULL; + if (exec != nullptr && exec->surf_exec) { + exec->surf_exec->unref(); + exec->surf_exec = NULL; } /* If there are simcalls associated with the synchro, then answer them */ @@ -505,9 +503,16 @@ void SIMIX_post_host_execute(smx_synchro_t synchro) void SIMIX_set_category(smx_synchro_t synchro, const char *category) { if (synchro->state != SIMIX_RUNNING) return; - if (synchro->type == SIMIX_SYNC_EXECUTE){ - synchro->execution.surf_exec->setCategory(category); - }else if (synchro->type == SIMIX_SYNC_COMMUNICATE){ - synchro->comm.surf_comm->setCategory(category); + + simgrid::simix::Exec *exec = dynamic_cast(synchro); + if (exec != nullptr) { + exec->surf_exec->setCategory(category); + return; + } + + simgrid::simix::Comm *comm = dynamic_cast(synchro); + if (comm != nullptr) { + comm->surf_comm->setCategory(category); + return; } } diff --git a/src/simix/smx_io.cpp b/src/simix/smx_io.cpp index 96224c608e..755f531da5 100644 --- a/src/simix/smx_io.cpp +++ b/src/simix/smx_io.cpp @@ -11,8 +11,9 @@ #include "xbt/dict.h" #include "mc/mc.h" -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, - "Logging specific to SIMIX (io)"); +#include "src/simix/SynchroIo.hpp" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)"); /** @@ -59,23 +60,17 @@ void simcall_HANDLER_file_read(smx_simcall_t simcall, smx_file_t fd, sg_size_t s smx_synchro_t SIMIX_file_read(smx_file_t fd, sg_size_t size, sg_host_t host) { - smx_synchro_t synchro; - /* check if the host is active */ - if (host->isOff()) { - THROWF(host_error, 0, "Host %s failed, you cannot call this function", - sg_host_get_name(host)); - } + if (host->isOff()) + THROWF(host_error, 0, "Host %s failed, you cannot call this function", sg_host_get_name(host)); - synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_IO; - synchro->name = NULL; - synchro->category = NULL; - synchro->io.host = host; - synchro->io.surf_io = surf_host_read(host, fd->surf_file, size); + simgrid::simix::Io *synchro = new simgrid::simix::Io(); + synchro->name = NULL; + synchro->host = host; + synchro->surf_io = surf_host_read(host, fd->surf_file, size); - synchro->io.surf_io->setData(synchro); + synchro->surf_io->setData(synchro); XBT_DEBUG("Create io synchro %p", synchro); return synchro; @@ -91,23 +86,14 @@ void simcall_HANDLER_file_write(smx_simcall_t simcall, smx_file_t fd, sg_size_t smx_synchro_t SIMIX_file_write(smx_file_t fd, sg_size_t size, sg_host_t host) { - smx_synchro_t synchro; + if (host->isOff()) + THROWF(host_error, 0, "Host %s failed, you cannot call this function", sg_host_get_name(host)); - /* check if the host is active */ - if (host->isOff()) { - THROWF(host_error, 0, "Host %s failed, you cannot call this function", - sg_host_get_name(host)); - } - - synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_IO; + simgrid::simix::Io *synchro = new simgrid::simix::Io(); synchro->name = NULL; - synchro->category = NULL; - - synchro->io.host = host; - synchro->io.surf_io = surf_host_write(host, fd->surf_file, size); - - synchro->io.surf_io->setData(synchro); + synchro->host = host; + synchro->surf_io = surf_host_write(host, fd->surf_file, size); + synchro->surf_io->setData(synchro); XBT_DEBUG("Create io synchro %p", synchro); return synchro; @@ -123,23 +109,14 @@ void simcall_HANDLER_file_open(smx_simcall_t simcall, const char* fullpath, sg_h smx_synchro_t SIMIX_file_open(const char* fullpath, sg_host_t host) { - smx_synchro_t synchro; - - /* check if the host is active */ - if (host->isOff()) { - THROWF(host_error, 0, "Host %s failed, you cannot call this function", - sg_host_get_name(host)); - } + if (host->isOff()) + THROWF(host_error, 0, "Host %s failed, you cannot call this function", sg_host_get_name(host)); - synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_IO; + simgrid::simix::Io *synchro = new simgrid::simix::Io(); synchro->name = NULL; - synchro->category = NULL; - - synchro->io.host = host; - synchro->io.surf_io = surf_host_open(host, fullpath); - - synchro->io.surf_io->setData(synchro); + synchro->host = host; + synchro->surf_io = surf_host_open(host, fullpath); + synchro->surf_io->setData(synchro); XBT_DEBUG("Create io synchro %p", synchro); return synchro; @@ -155,23 +132,14 @@ void simcall_HANDLER_file_close(smx_simcall_t simcall, smx_file_t fd, sg_host_t smx_synchro_t SIMIX_file_close(smx_file_t fd, sg_host_t host) { - smx_synchro_t synchro; - - /* check if the host is active */ - if (host->isOff()) { - THROWF(host_error, 0, "Host %s failed, you cannot call this function", - sg_host_get_name(host)); - } + if (host->isOff()) + THROWF(host_error, 0, "Host %s failed, you cannot call this function", sg_host_get_name(host)); - synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_IO; + simgrid::simix::Io *synchro = new simgrid::simix::Io(); synchro->name = NULL; - synchro->category = NULL; - - synchro->io.host = host; - synchro->io.surf_io = surf_host_close(host, fd->surf_file); - - synchro->io.surf_io->setData(synchro); + synchro->host = host; + synchro->surf_io = surf_host_close(host, fd->surf_file); + synchro->surf_io->setData(synchro); XBT_DEBUG("Create io synchro %p", synchro); return synchro; @@ -181,11 +149,8 @@ smx_synchro_t SIMIX_file_close(smx_file_t fd, sg_host_t host) //SIMIX FILE UNLINK int SIMIX_file_unlink(smx_file_t fd, sg_host_t host) { - /* check if the host is active */ - if (host->isOff()) { - THROWF(host_error, 0, "Host %s failed, you cannot call this function", - sg_host_get_name(host)); - } + if (host->isOff()) + THROWF(host_error, 0, "Host %s failed, you cannot call this function", sg_host_get_name(host)); int res = surf_host_unlink(host, fd->surf_file); xbt_free(fd); @@ -249,7 +214,6 @@ int SIMIX_file_move(smx_process_t process, smx_file_t file, const char* fullpath } sg_size_t SIMIX_storage_get_size(smx_storage_t storage){ - xbt_assert((storage != NULL), "Invalid parameters (simix storage is NULL)"); return surf_storage_get_size(storage); } @@ -278,17 +242,14 @@ xbt_dict_t SIMIX_storage_get_properties(smx_storage_t storage){ } const char* SIMIX_storage_get_name(smx_storage_t storage){ - xbt_assert((storage != NULL), "Invalid parameters"); return sg_storage_name(storage); } xbt_dict_t SIMIX_storage_get_content(smx_storage_t storage){ - xbt_assert((storage != NULL), "Invalid parameters (simix storage is NULL)"); return surf_storage_get_content(storage); } const char* SIMIX_storage_get_host(smx_storage_t storage){ - xbt_assert((storage != NULL), "Invalid parameters"); return surf_storage_get_host(storage); } @@ -297,11 +258,13 @@ void SIMIX_post_io(smx_synchro_t synchro) xbt_fifo_item_t i; smx_simcall_t simcall; + simgrid::simix::Io *io = static_cast(synchro); + xbt_fifo_foreach(synchro->simcalls,i,simcall,smx_simcall_t) { switch (simcall->call) { case SIMCALL_FILE_OPEN: { smx_file_t tmp = xbt_new(s_smx_file_t,1); - tmp->surf_file = surf_storage_action_get_file(synchro->io.surf_io); + tmp->surf_file = surf_storage_action_get_file(io->surf_io); simcall_file_open__set__result(simcall, tmp); break; } @@ -310,13 +273,11 @@ void SIMIX_post_io(smx_synchro_t synchro) simcall_file_close__set__result(simcall, 0); break; case SIMCALL_FILE_WRITE: - simcall_file_write__set__result(simcall, - synchro->io.surf_io->getCost()); + simcall_file_write__set__result(simcall, io->surf_io->getCost()); break; case SIMCALL_FILE_READ: - simcall_file_read__set__result(simcall, - synchro->io.surf_io->getCost()); + simcall_file_read__set__result(simcall, io->surf_io->getCost()); break; default: @@ -324,7 +285,7 @@ void SIMIX_post_io(smx_synchro_t synchro) } } - switch (synchro->io.surf_io->getState()) { + switch (io->surf_io->getState()) { case simgrid::surf::Action::State::failed: synchro->state = SIMIX_FAILED; @@ -344,10 +305,11 @@ void SIMIX_post_io(smx_synchro_t synchro) void SIMIX_io_destroy(smx_synchro_t synchro) { + simgrid::simix::Io *io = static_cast(synchro); XBT_DEBUG("Destroy synchro %p", synchro); - if (synchro->io.surf_io) - synchro->io.surf_io->unref(); - xbt_mallocator_release(simix_global->synchro_mallocator, synchro); + if (io->surf_io) + io->surf_io->unref(); + delete io; } void SIMIX_io_finish(smx_synchro_t synchro) diff --git a/src/simix/smx_network.cpp b/src/simix/smx_network.cpp index dcf2c73eb2..8d808d7136 100644 --- a/src/simix/smx_network.cpp +++ b/src/simix/smx_network.cpp @@ -11,6 +11,8 @@ #include "xbt/dict.h" #include "simgrid/s4u/mailbox.hpp" +#include "src/simix/SynchroComm.hpp" + XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization"); static void SIMIX_mbox_free(void *data); @@ -100,10 +102,12 @@ void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process) * \param mbox The mailbox * \param comm The communication synchro */ -static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm) +static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + mbox->comm_queue->push_back(comm); - comm->comm.mbox = mbox; + comm->mbox = mbox; } /** @@ -111,9 +115,11 @@ static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm) * \param mbox The rendez-vous point * \param comm The communication synchro */ -void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm) +void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro) { - comm->comm.mbox = NULL; + simgrid::simix::Comm *comm = static_cast(synchro); + + comm->mbox = NULL; for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++) if (*it == comm) { mbox->comm_queue->erase(it); @@ -134,27 +140,29 @@ static smx_synchro_t _find_matching_comm(std::deque *deque, e_smx for(auto it = deque->begin(); it != deque->end(); it++){ smx_synchro_t synchro = *it; - if (synchro->comm.type == SIMIX_COMM_SEND) { - other_user_data = synchro->comm.src_data; - } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) { - other_user_data = synchro->comm.dst_data; + simgrid::simix::Comm *comm = static_cast(synchro); + + if (comm->type == SIMIX_COMM_SEND) { + other_user_data = comm->src_data; + } else if (comm->type == SIMIX_COMM_RECEIVE) { + other_user_data = comm->dst_data; } - if (synchro->comm.type == type && - (!match_fun || match_fun(this_user_data, other_user_data, synchro)) && - (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) { - XBT_DEBUG("Found a matching communication synchro %p", synchro); + if (comm->type == type && + (! match_fun || match_fun(this_user_data, other_user_data, synchro)) && + (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) { + XBT_DEBUG("Found a matching communication synchro %p", comm); if (remove_matching) deque->erase(it); - synchro->comm.refcount++; + comm->refcount++; #if HAVE_MC - synchro->comm.mbox_cpy = synchro->comm.mbox; + comm->mbox_cpy = comm->mbox; #endif - synchro->comm.mbox = NULL; - return synchro; + comm->mbox = NULL; + return comm; } XBT_DEBUG("Sorry, communication synchro %p does not match our needs:" " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)", - synchro, (int)synchro->comm.type, (int)type); + comm, (int)comm->type, (int)type); } XBT_DEBUG("No matching communication synchro found"); return NULL; @@ -171,25 +179,16 @@ static smx_synchro_t _find_matching_comm(std::deque *deque, e_smx */ smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type) { - smx_synchro_t synchro; + simgrid::simix::Comm *comm = new simgrid::simix::Comm(); + comm->state = SIMIX_WAITING; + comm->type = type; + comm->refcount = 1; + comm->src_data=NULL; + comm->dst_data=NULL; - /* alloc structures */ - synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); + XBT_DEBUG("Create communicate synchro %p", comm); - synchro->type = SIMIX_SYNC_COMMUNICATE; - synchro->state = SIMIX_WAITING; - - /* set communication */ - synchro->comm.type = type; - synchro->comm.refcount = 1; - synchro->comm.src_data=NULL; - synchro->comm.dst_data=NULL; - - synchro->category = NULL; - - XBT_DEBUG("Create communicate synchro %p", synchro); - - return synchro; + return comm; } /** @@ -198,53 +197,53 @@ smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type) */ void SIMIX_comm_destroy(smx_synchro_t synchro) { - XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", - synchro, synchro->comm.refcount, (int)synchro->state); + simgrid::simix::Comm *comm = static_cast(synchro); + + XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state); - if (synchro->comm.refcount <= 0) { + if (comm->refcount <= 0) { xbt_backtrace_display_current(); xbt_die("The refcount of comm %p is already 0 before decreasing it. " "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro); } - synchro->comm.refcount--; - if (synchro->comm.refcount > 0) + comm->refcount--; + if (comm->refcount > 0) return; - XBT_DEBUG("Really free communication %p; refcount is now %d", synchro, - synchro->comm.refcount); + XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount); - xbt_free(synchro->name); SIMIX_comm_destroy_internal_actions(synchro); - if (synchro->comm.detached && synchro->state != SIMIX_DONE) { + if (comm->detached && comm->state != SIMIX_DONE) { /* the communication has failed and was detached: * we have to free the buffer */ - if (synchro->comm.clean_fun) { - synchro->comm.clean_fun(synchro->comm.src_buff); + if (comm->clean_fun) { + comm->clean_fun(comm->src_buff); } - synchro->comm.src_buff = NULL; + comm->src_buff = NULL; } - if(synchro->comm.mbox) - SIMIX_mbox_remove(synchro->comm.mbox, synchro); + if(comm->mbox) + SIMIX_mbox_remove(comm->mbox, comm); - xbt_mallocator_release(simix_global->synchro_mallocator, synchro); + delete comm; } void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro) { - if (synchro->comm.surf_comm){ - synchro->comm.surf_comm->unref(); - synchro->comm.surf_comm = NULL; + simgrid::simix::Comm *comm = static_cast(synchro); + if (comm->surf_comm){ + comm->surf_comm->unref(); + comm->surf_comm = NULL; } - if (synchro->comm.src_timeout){ - synchro->comm.src_timeout->unref(); - synchro->comm.src_timeout = NULL; + if (comm->src_timeout){ + comm->src_timeout->unref(); + comm->src_timeout = NULL; } - if (synchro->comm.dst_timeout){ - synchro->comm.dst_timeout->unref(); - synchro->comm.dst_timeout = NULL; + if (comm->dst_timeout){ + comm->dst_timeout->unref(); + comm->dst_timeout = NULL; } } @@ -279,18 +278,21 @@ smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sr * If it is not found then push our communication into the rendez-vous point */ smx_synchro_t other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true); + simgrid::simix::Comm *other_comm = static_cast(other_synchro); + if (!other_synchro) { other_synchro = this_synchro; + other_comm = static_cast(other_synchro); if (mbox->permanent_receiver!=NULL){ //this mailbox is for small messages, which have to be sent right now other_synchro->state = SIMIX_READY; - other_synchro->comm.dst_proc=mbox->permanent_receiver; - other_synchro->comm.refcount++; + other_comm->dst_proc=mbox->permanent_receiver; + other_comm->refcount++; mbox->done_comm_queue->push_back(other_synchro); - other_synchro->comm.mbox=mbox; - XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm)); + other_comm->mbox=mbox; + XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm)); }else{ SIMIX_mbox_push(mbox, this_synchro); @@ -300,8 +302,8 @@ smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sr SIMIX_comm_destroy(this_synchro); - other_synchro->state = SIMIX_READY; - other_synchro->comm.type = SIMIX_COMM_READY; + other_comm->state = SIMIX_READY; + other_comm->type = SIMIX_COMM_READY; } xbt_fifo_push(src_proc->comms, other_synchro); @@ -309,32 +311,32 @@ smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sr /* if the communication synchro is detached then decrease the refcount * by one, so it will be eliminated by the receiver's destroy call */ if (detached) { - other_synchro->comm.detached = 1; - other_synchro->comm.refcount--; - other_synchro->comm.clean_fun = clean_fun; + other_comm->detached = 1; + other_comm->refcount--; + other_comm->clean_fun = clean_fun; } else { - other_synchro->comm.clean_fun = NULL; + other_comm->clean_fun = NULL; } /* Setup the communication synchro */ - other_synchro->comm.src_proc = src_proc; - other_synchro->comm.task_size = task_size; - other_synchro->comm.rate = rate; - other_synchro->comm.src_buff = src_buff; - other_synchro->comm.src_buff_size = src_buff_size; - other_synchro->comm.src_data = data; + other_comm->src_proc = src_proc; + other_comm->task_size = task_size; + other_comm->rate = rate; + other_comm->src_buff = src_buff; + other_comm->src_buff_size = src_buff_size; + other_comm->src_data = data; - other_synchro->comm.match_fun = match_fun; - other_synchro->comm.copy_data_fun = copy_data_fun; + other_comm->match_fun = match_fun; + other_comm->copy_data_fun = copy_data_fun; if (MC_is_active() || MC_record_replay_is_active()) { - other_synchro->state = SIMIX_RUNNING; - return (detached ? NULL : other_synchro); + other_comm->state = SIMIX_RUNNING; + return (detached ? NULL : other_comm); } - SIMIX_comm_start(other_synchro); - return (detached ? NULL : other_synchro); + SIMIX_comm_start(other_comm); + return (detached ? NULL : other_comm); } void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, @@ -343,8 +345,7 @@ void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, sm void (*copy_data_fun)(smx_synchro_t, void*, size_t), void *data, double timeout, double rate) { - smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, - dst_buff_size, match_fun, copy_data_fun, data, rate); + smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate); SIMCALL_SET_MC_VALUE(simcall, 0); simcall_HANDLER_comm_wait(simcall, comm, timeout); } @@ -379,13 +380,15 @@ smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void other_synchro = this_synchro; SIMIX_mbox_push(mbox, this_synchro); } else { - if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) { - XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm)); - other_synchro->state = SIMIX_DONE; - other_synchro->comm.type = SIMIX_COMM_DONE; - other_synchro->comm.mbox = NULL; + simgrid::simix::Comm *other_comm = static_cast(other_synchro); + + if(other_comm->surf_comm && SIMIX_comm_get_remains(other_comm)==0.0) { + XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm); + other_comm->state = SIMIX_DONE; + other_comm->type = SIMIX_COMM_DONE; + other_comm->mbox = NULL; } - other_synchro->comm.refcount--; + other_comm->refcount--; SIMIX_comm_destroy(this_synchro); } } else { @@ -403,24 +406,26 @@ smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void SIMIX_mbox_push(mbox, this_synchro); } else { SIMIX_comm_destroy(this_synchro); - other_synchro->state = SIMIX_READY; - other_synchro->comm.type = SIMIX_COMM_READY; - //other_synchro->comm.refcount--; + simgrid::simix::Comm *other_comm = static_cast(other_synchro); + + other_comm->state = SIMIX_READY; + other_comm->type = SIMIX_COMM_READY; } xbt_fifo_push(dst_proc->comms, other_synchro); } /* Setup communication synchro */ - other_synchro->comm.dst_proc = dst_proc; - other_synchro->comm.dst_buff = dst_buff; - other_synchro->comm.dst_buff_size = dst_buff_size; - other_synchro->comm.dst_data = data; + simgrid::simix::Comm *other_comm = static_cast(other_synchro); + other_comm->dst_proc = dst_proc; + other_comm->dst_buff = dst_buff; + other_comm->dst_buff_size = dst_buff_size; + other_comm->dst_data = data; - if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate)) - other_synchro->comm.rate = rate; + if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate)) + other_comm->rate = rate; - other_synchro->comm.match_fun = match_fun; - other_synchro->comm.copy_data_fun = copy_data_fun; + other_comm->match_fun = match_fun; + other_comm->copy_data_fun = copy_data_fun; if (MC_is_active() || MC_record_replay_is_active()) { other_synchro->state = SIMIX_RUNNING; @@ -459,11 +464,13 @@ smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int } if (!other_synchro){ XBT_DEBUG("check if we have more luck in the normal mailbox"); - other_synchro = - _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false); + other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false); + } + + if(other_synchro) { + simgrid::simix::Comm *other_comm = static_cast(other_synchro); + other_comm->refcount--; } - if(other_synchro) - other_synchro->comm.refcount--; SIMIX_comm_destroy(this_synchro); return other_synchro; @@ -490,10 +497,11 @@ void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, dou if (timeout == -1) THROW_IMPOSSIBLE; - if (synchro->comm.src_proc == simcall->issuer) - synchro->state = SIMIX_SRC_TIMEOUT; + simgrid::simix::Comm *comm = static_cast(synchro); + if (comm->src_proc == simcall->issuer) + comm->state = SIMIX_SRC_TIMEOUT; else - synchro->state = SIMIX_DST_TIMEOUT; + comm->state = SIMIX_DST_TIMEOUT; } SIMIX_comm_finish(synchro); @@ -508,17 +516,20 @@ void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, dou sleep = surf_host_sleep(simcall->issuer->host, timeout); sleep->setData(synchro); - if (simcall->issuer == synchro->comm.src_proc) - synchro->comm.src_timeout = sleep; + simgrid::simix::Comm *comm = static_cast(synchro); + if (simcall->issuer == comm->src_proc) + comm->src_timeout = sleep; else - synchro->comm.dst_timeout = sleep; + comm->dst_timeout = sleep; } } void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + if(MC_is_active() || MC_record_replay_is_active()){ - simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc); + simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc); if(simcall_comm_test__get__result(simcall)){ synchro->state = SIMIX_DONE; xbt_fifo_push(synchro->simcalls, simcall); @@ -613,46 +624,42 @@ void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall) */ static inline void SIMIX_comm_start(smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + /* If both the sender and the receiver are already there, start the communication */ if (synchro->state == SIMIX_READY) { - sg_host_t sender = synchro->comm.src_proc->host; - sg_host_t receiver = synchro->comm.dst_proc->host; - - XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, - sg_host_get_name(sender), sg_host_get_name(receiver)); + sg_host_t sender = comm->src_proc->host; + sg_host_t receiver = comm->dst_proc->host; - synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model, - sender, receiver, - synchro->comm.task_size, synchro->comm.rate); + XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver)); - synchro->comm.surf_comm->setData(synchro); - - synchro->state = SIMIX_RUNNING; + comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate); + comm->surf_comm->setData(synchro); + comm->state = SIMIX_RUNNING; /* If a link is failed, detect it immediately */ - if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) { + if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) { XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sg_host_get_name(sender), sg_host_get_name(receiver)); - synchro->state = SIMIX_LINK_FAILURE; + comm->state = SIMIX_LINK_FAILURE; SIMIX_comm_destroy_internal_actions(synchro); } /* If any of the process is suspend, create the synchro but stop its execution, it will be restarted when the sender process resume */ - if (SIMIX_process_is_suspended(synchro->comm.src_proc) || - SIMIX_process_is_suspended(synchro->comm.dst_proc)) { + if (SIMIX_process_is_suspended(comm->src_proc) || + SIMIX_process_is_suspended(comm->dst_proc)) { /* FIXME: check what should happen with the synchro state */ - if (SIMIX_process_is_suspended(synchro->comm.src_proc)) + if (SIMIX_process_is_suspended(comm->src_proc)) XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication", - sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name); + sg_host_get_name(comm->src_proc->host), comm->src_proc->name); else XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication", - sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name); - - synchro->comm.surf_comm->suspend(); + sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name); + comm->surf_comm->suspend(); } } } @@ -663,6 +670,7 @@ static inline void SIMIX_comm_start(smx_synchro_t synchro) */ void SIMIX_comm_finish(smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); unsigned int destroy_count = 0; smx_simcall_t simcall; @@ -682,8 +690,8 @@ void SIMIX_comm_finish(smx_synchro_t synchro) } /* If the synchro is still in a rendez-vous point then remove from it */ - if (synchro->comm.mbox) - SIMIX_mbox_remove(synchro->comm.mbox, synchro); + if (comm->mbox) + SIMIX_mbox_remove(comm->mbox, synchro); XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state); @@ -702,17 +710,15 @@ void SIMIX_comm_finish(smx_synchro_t synchro) break; case SIMIX_SRC_TIMEOUT: - SMX_EXCEPTION(simcall->issuer, timeout_error, 0, - "Communication timeouted because of sender"); + SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender"); break; case SIMIX_DST_TIMEOUT: - SMX_EXCEPTION(simcall->issuer, timeout_error, 0, - "Communication timeouted because of receiver"); + SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver"); break; case SIMIX_SRC_HOST_FAILURE: - if (simcall->issuer == synchro->comm.src_proc) + if (simcall->issuer == comm->src_proc) simcall->issuer->context->iwannadie = 1; // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); else @@ -720,7 +726,7 @@ void SIMIX_comm_finish(smx_synchro_t synchro) break; case SIMIX_DST_HOST_FAILURE: - if (simcall->issuer == synchro->comm.dst_proc) + if (simcall->issuer == comm->dst_proc) simcall->issuer->context->iwannadie = 1; // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); else @@ -731,12 +737,12 @@ void SIMIX_comm_finish(smx_synchro_t synchro) XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d", synchro, - synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL, - synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL, - simcall->issuer->name, simcall->issuer, synchro->comm.detached); - if (synchro->comm.src_proc == simcall->issuer) { + comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL, + comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL, + simcall->issuer->name, simcall->issuer, comm->detached); + if (comm->src_proc == simcall->issuer) { XBT_DEBUG("I'm source"); - } else if (synchro->comm.dst_proc == simcall->issuer) { + } else if (comm->dst_proc == simcall->issuer) { XBT_DEBUG("I'm dest"); } else { XBT_DEBUG("I'm neither source nor dest"); @@ -745,12 +751,10 @@ void SIMIX_comm_finish(smx_synchro_t synchro) break; case SIMIX_CANCELED: - if (simcall->issuer == synchro->comm.dst_proc) - SMX_EXCEPTION(simcall->issuer, cancel_error, 0, - "Communication canceled by the sender"); + if (simcall->issuer == comm->dst_proc) + SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender"); else - SMX_EXCEPTION(simcall->issuer, cancel_error, 0, - "Communication canceled by the receiver"); + SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver"); break; default: @@ -773,14 +777,14 @@ void SIMIX_comm_finish(smx_synchro_t synchro) simcall->issuer->waiting_synchro = NULL; xbt_fifo_remove(simcall->issuer->comms, synchro); - if(synchro->comm.detached){ - if(simcall->issuer == synchro->comm.src_proc){ - if(synchro->comm.dst_proc) - xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro); + if(comm->detached){ + if(simcall->issuer == comm->src_proc){ + if(comm->dst_proc) + xbt_fifo_remove(comm->dst_proc->comms, synchro); } - if(simcall->issuer == synchro->comm.dst_proc){ - if(synchro->comm.src_proc) - xbt_fifo_remove(synchro->comm.src_proc->comms, synchro); + if(simcall->issuer == comm->dst_proc){ + if(comm->src_proc) + xbt_fifo_remove(comm->src_proc->comms, synchro); } } SIMIX_simcall_answer(simcall); @@ -797,67 +801,74 @@ void SIMIX_comm_finish(smx_synchro_t synchro) */ void SIMIX_post_comm(smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + /* Update synchro state */ - if (synchro->comm.src_timeout && - synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done) + if (comm->src_timeout && + comm->src_timeout->getState() == simgrid::surf::Action::State::done) synchro->state = SIMIX_SRC_TIMEOUT; - else if (synchro->comm.dst_timeout && - synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done) + else if (comm->dst_timeout && + comm->dst_timeout->getState() == simgrid::surf::Action::State::done) synchro->state = SIMIX_DST_TIMEOUT; - else if (synchro->comm.src_timeout && - synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed) + else if (comm->src_timeout && + comm->src_timeout->getState() == simgrid::surf::Action::State::failed) synchro->state = SIMIX_SRC_HOST_FAILURE; - else if (synchro->comm.dst_timeout && - synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed) + else if (comm->dst_timeout && + comm->dst_timeout->getState() == simgrid::surf::Action::State::failed) synchro->state = SIMIX_DST_HOST_FAILURE; - else if (synchro->comm.surf_comm && - synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) { - XBT_DEBUG("Puta madre. Surf says that the link broke"); + else if (comm->surf_comm && + comm->surf_comm->getState() == simgrid::surf::Action::State::failed) { synchro->state = SIMIX_LINK_FAILURE; } else synchro->state = SIMIX_DONE; XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", - synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached); + comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached); /* destroy the surf actions associated with the Simix communication */ - SIMIX_comm_destroy_internal_actions(synchro); + SIMIX_comm_destroy_internal_actions(comm); /* if there are simcalls associated with the synchro, then answer them */ if (xbt_fifo_size(synchro->simcalls)) { - SIMIX_comm_finish(synchro); + SIMIX_comm_finish(comm); } } void SIMIX_comm_cancel(smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + /* if the synchro is a waiting state means that it is still in a mbox */ /* so remove from it and delete it */ - if (synchro->state == SIMIX_WAITING) { - SIMIX_mbox_remove(synchro->comm.mbox, synchro); - synchro->state = SIMIX_CANCELED; + if (comm->state == SIMIX_WAITING) { + SIMIX_mbox_remove(comm->mbox, synchro); + comm->state = SIMIX_CANCELED; } else if (!MC_is_active() /* when running the MC there are no surf actions */ && !MC_record_replay_is_active() - && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) { + && (comm->state == SIMIX_READY || comm->state == SIMIX_RUNNING)) { - synchro->comm.surf_comm->cancel(); + comm->surf_comm->cancel(); } } void SIMIX_comm_suspend(smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + /*FIXME: shall we suspend also the timeout synchro? */ - if (synchro->comm.surf_comm) - synchro->comm.surf_comm->suspend(); + if (comm->surf_comm) + comm->surf_comm->suspend(); /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */ } void SIMIX_comm_resume(smx_synchro_t synchro) { + simgrid::simix::Comm *comm = static_cast(synchro); + /*FIXME: check what happen with the timeouts */ - if (synchro->comm.surf_comm) - synchro->comm.surf_comm->resume(); + if (comm->surf_comm) + comm->surf_comm->resume(); /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */ } @@ -872,12 +883,13 @@ double SIMIX_comm_get_remains(smx_synchro_t synchro) { if(!synchro) return 0; + simgrid::simix::Comm *comm = static_cast(synchro); double remains; switch (synchro->state) { case SIMIX_RUNNING: - remains = synchro->comm.surf_comm->getRemains(); + remains = comm->surf_comm->getRemains(); break; case SIMIX_WAITING: @@ -904,7 +916,9 @@ e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro) */ void* SIMIX_comm_get_src_data(smx_synchro_t synchro) { - return synchro->comm.src_data; + simgrid::simix::Comm *comm = static_cast(synchro); + + return comm->src_data; } /** @@ -914,44 +928,52 @@ void* SIMIX_comm_get_src_data(smx_synchro_t synchro) */ void* SIMIX_comm_get_dst_data(smx_synchro_t synchro) { - return synchro->comm.dst_data; + simgrid::simix::Comm *comm = static_cast(synchro); + + return comm->dst_data; } smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro) { - return synchro->comm.src_proc; + simgrid::simix::Comm *comm = static_cast(synchro); + + return comm->src_proc; } smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro) { - return synchro->comm.dst_proc; + simgrid::simix::Comm *comm = static_cast(synchro); + + return comm->dst_proc; } /******************************************************************************/ /* SIMIX_comm_copy_data callbacks */ /******************************************************************************/ -static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = - &SIMIX_comm_copy_pointer_callback; +static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback; -void -SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t)) +void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t)) { SIMIX_comm_copy_data_callback = callback; } -void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size) +void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size) { + simgrid::simix::Comm *comm = static_cast(synchro); + xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); - *(void **) (comm->comm.dst_buff) = buff; + *(void **) (comm->dst_buff) = buff; } -void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size) +void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size) { + simgrid::simix::Comm *comm = static_cast(synchro); + XBT_DEBUG("Copy the data over"); - memcpy(comm->comm.dst_buff, buff, buff_size); - if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP + memcpy(comm->dst_buff, buff, buff_size); + if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP xbt_free(buff); - comm->comm.src_buff = NULL; + comm->src_buff = NULL; } } @@ -960,37 +982,39 @@ void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff * \brief Copy the communication data from the sender's buffer to the receiver's one * \param comm The communication */ -void SIMIX_comm_copy_data(smx_synchro_t comm) +void SIMIX_comm_copy_data(smx_synchro_t synchro) { - size_t buff_size = comm->comm.src_buff_size; - /* If there is no data to be copy then return */ - if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied) + simgrid::simix::Comm *comm = static_cast(synchro); + + size_t buff_size = comm->src_buff_size; + /* If there is no data to copy then return */ + if (!comm->src_buff || !comm->dst_buff || comm->copied) return; XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm, - comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process", - comm->comm.src_buff, - comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process", - comm->comm.dst_buff, buff_size); + comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process", + comm->src_buff, + comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process", + comm->dst_buff, buff_size); /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ - if (comm->comm.dst_buff_size) - buff_size = MIN(buff_size, *(comm->comm.dst_buff_size)); + if (comm->dst_buff_size) + buff_size = MIN(buff_size, *(comm->dst_buff_size)); /* Update the receiver's buffer size to the copied amount */ - if (comm->comm.dst_buff_size) - *comm->comm.dst_buff_size = buff_size; + if (comm->dst_buff_size) + *comm->dst_buff_size = buff_size; if (buff_size > 0){ - if(comm->comm.copy_data_fun) - comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size); + if(comm->copy_data_fun) + comm->copy_data_fun (comm, comm->src_buff, buff_size); else - SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size); + SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size); } /* Set the copied flag so we copy data only once */ /* (this function might be called from both communication ends) */ - comm->comm.copied = 1; + comm->copied = 1; } diff --git a/src/simix/smx_private.h b/src/simix/smx_private.h index 0e542ca3e8..5f08cc025a 100644 --- a/src/simix/smx_private.h +++ b/src/simix/smx_private.h @@ -69,7 +69,6 @@ typedef struct s_smx_global { void_pfn_smxprocess_t kill_process_function; /** Callback used when killing a SMX_process */ void_pfn_smxprocess_t cleanup_process_function; - xbt_mallocator_t synchro_mallocator; xbt_os_mutex_t mutex; } s_smx_global_t, *smx_global_t; @@ -94,104 +93,6 @@ typedef struct s_smx_file { void* data; /**< @brief user data */ } s_smx_file_t; -/********************************* synchro *************************************/ - -typedef enum { - SIMIX_SYNC_EXECUTE, - SIMIX_SYNC_PARALLEL_EXECUTE, - SIMIX_SYNC_COMMUNICATE, - SIMIX_SYNC_JOIN, - SIMIX_SYNC_SLEEP, - SIMIX_SYNC_SYNCHRO, - SIMIX_SYNC_IO, -} e_smx_synchro_type_t; - -typedef enum { - SIMIX_COMM_SEND, - SIMIX_COMM_RECEIVE, - SIMIX_COMM_READY, - SIMIX_COMM_DONE -} e_smx_comm_type_t; - -typedef enum { - SIMIX_IO_OPEN, - SIMIX_IO_WRITE, - SIMIX_IO_READ, - SIMIX_IO_STAT -} e_smx_io_type_t; - -/** @brief synchro datatype */ -typedef struct s_smx_synchro { - - e_smx_synchro_type_t type; /* Type of SIMIX synchro */ - e_smx_state_t state; /* State of the synchro */ - char *name; /* synchro name if any */ - xbt_fifo_t simcalls; /* List of simcalls waiting for this synchro */ - - /* Data specific to each synchro type */ - union { - - struct { - sg_host_t host; /* The host where the execution takes place */ - surf_action_t surf_exec; /* The Surf execution action encapsulated */ - } execution; /* Possibly parallel execution */ - - struct { - e_smx_comm_type_t type; /* Type of the communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE) */ - smx_mailbox_t mbox; /* Rendez-vous where the comm is queued */ - -#if HAVE_MC - smx_mailbox_t mbox_cpy; /* Copy of the rendez-vous where the comm is queued, MC needs it for DPOR - (comm.mbox set to NULL when the communication is removed from the mailbox - (used as garbage collector)) */ -#endif - int refcount; /* Number of processes involved in the cond */ - int detached; /* If detached or not */ - - void (*clean_fun)(void*); /* Function to clean the detached src_buf if something goes wrong */ - int (*match_fun)(void*,void*,smx_synchro_t); /* Filter function used by the other side. It is used when - looking if a given communication matches my needs. For that, myself must match the - expectations of the other side, too. See */ - void (*copy_data_fun) (smx_synchro_t, void*, size_t); - - /* Surf action data */ - surf_action_t surf_comm; /* The Surf communication action encapsulated */ - surf_action_t src_timeout; /* Surf's actions to instrument the timeouts */ - surf_action_t dst_timeout; /* Surf's actions to instrument the timeouts */ - smx_process_t src_proc; - smx_process_t dst_proc; - double rate; - double task_size; - - /* Data to be transfered */ - void *src_buff; - void *dst_buff; - size_t src_buff_size; - size_t *dst_buff_size; - unsigned copied:1; /* whether the data were already copied */ - - void* src_data; /* User data associated to communication */ - void* dst_data; - } comm; - - struct { - sg_host_t host; /* The host that is sleeping */ - surf_action_t surf_sleep; /* The Surf sleeping action encapsulated */ - } sleep; - - struct { - surf_action_t sleep; - } synchro; - - struct { - sg_host_t host; - surf_action_t surf_io; - } io; - }; - - char *category; /* simix action category for instrumentation */ -} s_smx_synchro_t; - XBT_PRIVATE void SIMIX_context_mod_init(void); XBT_PRIVATE void SIMIX_context_mod_exit(void); diff --git a/src/simix/smx_process.cpp b/src/simix/smx_process.cpp index 2fb0858549..c20273ac17 100644 --- a/src/simix/smx_process.cpp +++ b/src/simix/smx_process.cpp @@ -15,12 +15,15 @@ #include "src/simix/smx_private.hpp" #include "src/msg/msg_private.h" +#include "src/simix/SynchroSleep.hpp" +#include "src/simix/SynchroRaw.hpp" +#include "src/simix/SynchroIo.hpp" + #ifdef HAVE_SMPI #include "src/smpi/private.h" #endif -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_process, simix, - "Logging specific to SIMIX (process)"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_process, simix, "Logging specific to SIMIX (process)"); unsigned long simix_process_maxpid = 0; @@ -66,38 +69,36 @@ void SIMIX_process_cleanup(smx_process_t process) /* cancel non-blocking communications */ smx_synchro_t synchro; while ((synchro = (smx_synchro_t) xbt_fifo_pop(process->comms))) { + simgrid::simix::Comm *comm = static_cast(synchro); /* make sure no one will finish the comm after this process is destroyed, * because src_proc or dst_proc would be an invalid pointer */ - SIMIX_comm_cancel(synchro); + SIMIX_comm_cancel(comm); - if (synchro->comm.src_proc == process) { + if (comm->src_proc == process) { XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d, src = %p, dst = %p", - synchro, synchro->comm.detached, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc); - synchro->comm.src_proc = NULL; + comm, comm->detached, (int)comm->state, comm->src_proc, comm->dst_proc); + comm->src_proc = NULL; /* I'm not supposed to destroy a detached comm from the sender side, */ - if (!synchro->comm.detached) - SIMIX_comm_destroy(synchro); + if (!comm->detached) + SIMIX_comm_destroy(comm); else XBT_DEBUG("Don't destroy it since it's a detached comm"); } - else if (synchro->comm.dst_proc == process){ + else if (comm->dst_proc == process){ XBT_DEBUG("Found an unfinished recv comm %p, state %d, src = %p, dst = %p", - synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc); - synchro->comm.dst_proc = NULL; + comm, (int)comm->state, comm->src_proc, comm->dst_proc); + comm->dst_proc = NULL; - if (synchro->comm.detached && synchro->comm.refcount == 1 - && synchro->comm.src_proc != NULL) { + if (comm->detached && comm->refcount == 1 && comm->src_proc != NULL) { /* the comm will be freed right now, remove it from the sender */ - xbt_fifo_remove(synchro->comm.src_proc->comms, synchro); + xbt_fifo_remove(comm->src_proc->comms, comm); } - SIMIX_comm_destroy(synchro); - } - else { - xbt_die("Communication synchro %p is in my list but I'm not the sender " - "or the receiver", synchro); + SIMIX_comm_destroy(comm); + } else { + xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro); } } @@ -496,39 +497,39 @@ void SIMIX_process_kill(smx_process_t process, smx_process_t issuer) { /* destroy the blocking synchro if any */ if (process->waiting_synchro) { - switch (process->waiting_synchro->type) { + simgrid::simix::Exec *exec = dynamic_cast(process->waiting_synchro); + simgrid::simix::Comm *comm = dynamic_cast(process->waiting_synchro); + simgrid::simix::Sleep *sleep = dynamic_cast(process->waiting_synchro); + simgrid::simix::Raw *raw = dynamic_cast(process->waiting_synchro); + simgrid::simix::Io *io = dynamic_cast(process->waiting_synchro); - case SIMIX_SYNC_EXECUTE: - case SIMIX_SYNC_PARALLEL_EXECUTE: + if (exec != nullptr) { SIMIX_execution_destroy(process->waiting_synchro); - break; - case SIMIX_SYNC_COMMUNICATE: + } else if (comm != nullptr) { xbt_fifo_remove(process->comms, process->waiting_synchro); SIMIX_comm_cancel(process->waiting_synchro); xbt_fifo_remove(process->waiting_synchro->simcalls, &process->simcall); SIMIX_comm_destroy(process->waiting_synchro); - break; - case SIMIX_SYNC_SLEEP: + } else if (sleep != nullptr) { SIMIX_process_sleep_destroy(process->waiting_synchro); - break; - case SIMIX_SYNC_JOIN: - SIMIX_process_sleep_destroy(process->waiting_synchro); - break; - - case SIMIX_SYNC_SYNCHRO: + } else if (raw != nullptr) { SIMIX_synchro_stop_waiting(process, &process->simcall); SIMIX_synchro_destroy(process->waiting_synchro); - break; - case SIMIX_SYNC_IO: + } else if (io != nullptr) { SIMIX_io_destroy(process->waiting_synchro); - break; - } + /* + switch (process->waiting_synchro->type) { + case SIMIX_SYNC_JOIN: + SIMIX_process_sleep_destroy(process->waiting_synchro); + break; + } */ + process->waiting_synchro = NULL; } if(!xbt_dynar_member(simix_global->process_to_run, &(process)) && process != issuer) { @@ -553,35 +554,34 @@ void SIMIX_process_throw(smx_process_t process, xbt_errcat_t cat, int value, con /* cancel the blocking synchro if any */ if (process->waiting_synchro) { - switch (process->waiting_synchro->type) { - - case SIMIX_SYNC_EXECUTE: - case SIMIX_SYNC_PARALLEL_EXECUTE: + simgrid::simix::Exec *exec = dynamic_cast(process->waiting_synchro); + if (exec != nullptr) { SIMIX_execution_cancel(process->waiting_synchro); - break; + } - case SIMIX_SYNC_COMMUNICATE: + simgrid::simix::Comm *comm = dynamic_cast(process->waiting_synchro); + if (comm != nullptr) { xbt_fifo_remove(process->comms, process->waiting_synchro); SIMIX_comm_cancel(process->waiting_synchro); - break; + } - case SIMIX_SYNC_SLEEP: - case SIMIX_SYNC_JOIN: + simgrid::simix::Sleep *sleep = dynamic_cast(process->waiting_synchro); + if (sleep != nullptr) { SIMIX_process_sleep_destroy(process->waiting_synchro); if (!xbt_dynar_member(simix_global->process_to_run, &(process)) && process != SIMIX_process_self()) { XBT_DEBUG("Inserting %s in the to_run list", process->name); xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, process); } - break; + } - case SIMIX_SYNC_SYNCHRO: + simgrid::simix::Raw *raw = dynamic_cast(process->waiting_synchro); + if (raw != nullptr) { SIMIX_synchro_stop_waiting(process, &process->simcall); - break; + } - case SIMIX_SYNC_IO: + simgrid::simix::Io *io = dynamic_cast(process->waiting_synchro); + if (io != nullptr) { SIMIX_io_destroy(process->waiting_synchro); - break; - } } process->waiting_synchro = NULL; @@ -653,38 +653,30 @@ smx_synchro_t SIMIX_process_suspend(smx_process_t process, smx_process_t issuer) process->suspended = 1; - /* If we are suspending another process, and it is waiting on a sync, - suspend its synchronization. */ + /* If we are suspending another process, and it is waiting on a sync, suspend its synchronization. */ if (process != issuer) { if (process->waiting_synchro) { - switch (process->waiting_synchro->type) { - - case SIMIX_SYNC_EXECUTE: - case SIMIX_SYNC_PARALLEL_EXECUTE: - SIMIX_execution_suspend(process->waiting_synchro); - break; - - case SIMIX_SYNC_COMMUNICATE: - SIMIX_comm_suspend(process->waiting_synchro); - break; - - case SIMIX_SYNC_SLEEP: - SIMIX_process_sleep_suspend(process->waiting_synchro); - break; + simgrid::simix::Exec *exec = dynamic_cast(process->waiting_synchro); + if (exec != nullptr) { + SIMIX_execution_suspend(process->waiting_synchro); + } - case SIMIX_SYNC_SYNCHRO: - /* Suspension is delayed to when the process is rescheduled. */ - break; + simgrid::simix::Comm *comm = dynamic_cast(process->waiting_synchro); + if (comm != nullptr) { + SIMIX_comm_suspend(process->waiting_synchro); + } - default: - xbt_die("Internal error in SIMIX_process_suspend: unexpected synchronization type %d", - (int)process->waiting_synchro->type); + simgrid::simix::Sleep *sleep = dynamic_cast(process->waiting_synchro); + if (sleep != nullptr) { + SIMIX_process_sleep_suspend(process->waiting_synchro); } + + /* The suspension of raw synchros is delayed to when the process is rescheduled. */ return NULL; } else { - /* Suspension is delayed to when the process is rescheduled. */ + /* If the other process is not waiting, its suspension is delayed to when the process is rescheduled. */ return NULL; } } else { @@ -714,31 +706,24 @@ void SIMIX_process_resume(smx_process_t process, smx_process_t issuer) if (process != issuer) { if (process->waiting_synchro) { + simgrid::simix::Exec *exec = dynamic_cast(process->waiting_synchro); + if (exec != nullptr) { + SIMIX_execution_resume(process->waiting_synchro); + } - switch (process->waiting_synchro->type) { - - case SIMIX_SYNC_EXECUTE: - case SIMIX_SYNC_PARALLEL_EXECUTE: - SIMIX_execution_resume(process->waiting_synchro); - break; - - case SIMIX_SYNC_COMMUNICATE: - SIMIX_comm_resume(process->waiting_synchro); - break; + simgrid::simix::Comm *comm = dynamic_cast(process->waiting_synchro); + if (comm != nullptr) { + SIMIX_comm_resume(process->waiting_synchro); + } - case SIMIX_SYNC_SLEEP: - SIMIX_process_sleep_resume(process->waiting_synchro); - break; + simgrid::simix::Sleep *sleep = dynamic_cast(process->waiting_synchro); + if (sleep != nullptr) { + SIMIX_process_sleep_resume(process->waiting_synchro); + } - case SIMIX_SYNC_SYNCHRO: - /* I cannot resume it now. This is delayed to when the process is rescheduled at - * the end of the synchro. */ - break; + /* I cannot resume raw synchros now. This is delayed to when the process is rescheduled at + * the end of the synchro. */ - default: - xbt_die("Internal error in SIMIX_process_resume: unexpected synchronization type %d", - (int)process->waiting_synchro->type); - } } } else XBT_WARN("Strange. Process %p is trying to resume himself.", issuer); @@ -849,12 +834,14 @@ void simcall_HANDLER_process_join(smx_simcall_t simcall, smx_process_t process, simcall->issuer->waiting_synchro = sync; } -static int SIMIX_process_join_finish(smx_process_exit_status_t status, smx_synchro_t sync){ - if (sync->sleep.surf_sleep) { - sync->sleep.surf_sleep->cancel(); +static int SIMIX_process_join_finish(smx_process_exit_status_t status, smx_synchro_t synchro){ + simgrid::simix::Sleep *sleep = static_cast(synchro); + + if (sleep->surf_sleep) { + sleep->surf_sleep->cancel(); smx_simcall_t simcall; - while ((simcall = (smx_simcall_t) xbt_fifo_shift(sync->simcalls))) { + while ((simcall = (smx_simcall_t) xbt_fifo_shift(sleep->simcalls))) { simcall_process_sleep__set__result(simcall, SIMIX_DONE); simcall->issuer->waiting_synchro = NULL; if (simcall->issuer->suspended) { @@ -865,17 +852,16 @@ static int SIMIX_process_join_finish(smx_process_exit_status_t status, smx_synch SIMIX_simcall_answer(simcall); } } - sync->sleep.surf_sleep->unref(); - sync->sleep.surf_sleep = NULL; + sleep->surf_sleep->unref(); + sleep->surf_sleep = NULL; } - xbt_mallocator_release(simix_global->synchro_mallocator, sync); + delete sleep; return 0; } smx_synchro_t SIMIX_process_join(smx_process_t issuer, smx_process_t process, double timeout) { smx_synchro_t res = SIMIX_process_sleep(issuer, timeout); - res->type = SIMIX_SYNC_JOIN; SIMIX_process_on_exit(process, (int_f_pvoid_pvoid_t)SIMIX_process_join_finish, res); return res; } @@ -898,20 +884,15 @@ smx_synchro_t SIMIX_process_sleep(smx_process_t process, double duration) sg_host_t host = process->host; /* check if the host is active */ - if (host->isOff()) { - THROWF(host_error, 0, "Host %s failed, you cannot call this function", - sg_host_get_name(host)); - } + if (host->isOff()) + THROWF(host_error, 0, "Host %s failed, you cannot call this function", sg_host_get_name(host)); - smx_synchro_t synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - synchro->type = SIMIX_SYNC_SLEEP; + simgrid::simix::Sleep *synchro = new simgrid::simix::Sleep(); synchro->name = NULL; - synchro->category = NULL; - - synchro->sleep.host = host; - synchro->sleep.surf_sleep = surf_host_sleep(host, duration); - synchro->sleep.surf_sleep->setData(synchro); + synchro->host = host; + synchro->surf_sleep = surf_host_sleep(host, duration); + synchro->surf_sleep->setData(synchro); XBT_DEBUG("Create sleep synchronization %p", synchro); return synchro; @@ -921,11 +902,11 @@ void SIMIX_post_process_sleep(smx_synchro_t synchro) { smx_simcall_t simcall; e_smx_state_t state; - xbt_assert(synchro->type == SIMIX_SYNC_SLEEP || synchro->type == SIMIX_SYNC_JOIN); + simgrid::simix::Sleep *sleep = static_cast(synchro); while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) { - switch (synchro->sleep.surf_sleep->getState()){ + switch (sleep->surf_sleep->getState()){ case simgrid::surf::Action::State::failed: simcall->issuer->context->iwannadie = 1; //SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); @@ -960,27 +941,26 @@ void SIMIX_post_process_sleep(smx_synchro_t synchro) void SIMIX_process_sleep_destroy(smx_synchro_t synchro) { XBT_DEBUG("Destroy synchro %p", synchro); - xbt_assert(synchro->type == SIMIX_SYNC_SLEEP || synchro->type == SIMIX_SYNC_JOIN); + simgrid::simix::Sleep *sleep = static_cast(synchro); - if (synchro->sleep.surf_sleep) { - synchro->sleep.surf_sleep->unref(); - synchro->sleep.surf_sleep = NULL; + if (sleep->surf_sleep) { + sleep->surf_sleep->unref(); + sleep->surf_sleep = NULL; } - if (synchro->type == SIMIX_SYNC_SLEEP) - xbt_mallocator_release(simix_global->synchro_mallocator, synchro); } void SIMIX_process_sleep_suspend(smx_synchro_t synchro) { - xbt_assert(synchro->type == SIMIX_SYNC_SLEEP); - synchro->sleep.surf_sleep->suspend(); + simgrid::simix::Sleep *sleep = static_cast(synchro); + sleep->surf_sleep->suspend(); } void SIMIX_process_sleep_resume(smx_synchro_t synchro) { XBT_DEBUG("Synchro state is %d on process_sleep_resume.", synchro->state); - xbt_assert(synchro->type == SIMIX_SYNC_SLEEP); - synchro->sleep.surf_sleep->resume(); + simgrid::simix::Sleep *sleep = static_cast(synchro); + + sleep->surf_sleep->resume(); } /** diff --git a/src/simix/smx_synchro.cpp b/src/simix/smx_synchro.cpp index de245a8ec7..afb57832b8 100644 --- a/src/simix/smx_synchro.cpp +++ b/src/simix/smx_synchro.cpp @@ -8,6 +8,7 @@ #include "smx_private.h" #include "xbt/log.h" +#include "src/simix/SynchroRaw.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_synchro, simix, "SIMIX Synchronization (mutex, semaphores and conditions)"); @@ -25,13 +26,10 @@ static smx_synchro_t SIMIX_synchro_wait(sg_host_t smx_host, double timeout) { XBT_IN("(%p, %f)",smx_host,timeout); - smx_synchro_t sync; - sync = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator); - sync->type = SIMIX_SYNC_SYNCHRO; - sync->name = xbt_strdup("synchro"); - sync->synchro.sleep = surf_host_sleep(smx_host, timeout); - - sync->synchro.sleep->setData(sync); + simgrid::simix::Raw *sync = new simgrid::simix::Raw(); + sync->name = nullptr; + sync->sleep = surf_host_sleep(smx_host, timeout); + sync->sleep->setData(sync); XBT_OUT(); return sync; } @@ -69,25 +67,23 @@ void SIMIX_synchro_stop_waiting(smx_process_t process, smx_simcall_t simcall) void SIMIX_synchro_destroy(smx_synchro_t synchro) { - XBT_IN("(%p)",synchro); XBT_DEBUG("Destroying synchro %p", synchro); - xbt_assert(synchro->type == SIMIX_SYNC_SYNCHRO); - synchro->synchro.sleep->unref(); - xbt_free(synchro->name); - xbt_mallocator_release(simix_global->synchro_mallocator, synchro); - XBT_OUT(); + simgrid::simix::Raw *raw = static_cast(synchro); + + raw->sleep->unref(); + delete raw; } void SIMIX_post_synchro(smx_synchro_t synchro) { XBT_IN("(%p)",synchro); - xbt_assert(synchro->type == SIMIX_SYNC_SYNCHRO); - if (synchro->synchro.sleep->getState() == simgrid::surf::Action::State::failed) - synchro->state = SIMIX_FAILED; - else if(synchro->synchro.sleep->getState() == simgrid::surf::Action::State::done) - synchro->state = SIMIX_SRC_TIMEOUT; + simgrid::simix::Raw *raw = static_cast(synchro); + if (raw->sleep->getState() == simgrid::surf::Action::State::failed) + raw->state = SIMIX_FAILED; + else if(raw->sleep->getState() == simgrid::surf::Action::State::done) + raw->state = SIMIX_SRC_TIMEOUT; - SIMIX_synchro_finish(synchro); + SIMIX_synchro_finish(raw); XBT_OUT(); } diff --git a/src/smpi/smpi_base.cpp b/src/smpi/smpi_base.cpp index aecb3cbbcf..613e09817e 100644 --- a/src/smpi/smpi_base.cpp +++ b/src/smpi/smpi_base.cpp @@ -17,6 +17,8 @@ #include "simgrid/sg_config.h" #include "colls/colls.h" +#include "src/simix/SynchroComm.hpp" + XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)"); static int match_recv(void* a, void* b, smx_synchro_t ignored) { @@ -884,9 +886,11 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status) if ((*request)->action != NULL) { // this is not a detached send simcall_comm_wait((*request)->action, -1.0); - if((MC_is_active() || MC_record_replay_is_active()) && (*request)->action) - (*request)->action->comm.dst_data = NULL; // dangling pointer : dst_data is freed with a wait, need to set it to - // NULL for system state comparison + if((MC_is_active() || MC_record_replay_is_active()) && (*request)->action) { + simgrid::simix::Comm *comm = dynamic_cast( (*request)->action ); + + comm->dst_data = NULL; // dangling pointer: dst_data is freed with a wait, need to set it to NULL for system state comparison + } } finish_wait(request, status); diff --git a/src/smpi/smpi_global.cpp b/src/smpi/smpi_global.cpp index 5783c39929..360866784e 100644 --- a/src/smpi/smpi_global.cpp +++ b/src/smpi/smpi_global.cpp @@ -14,6 +14,8 @@ #include "simgrid/sg_config.h" #include "src/mc/mc_replay.h" #include "src/msg/msg_private.h" +#include "src/simix/SynchroComm.hpp" + #include /* DBL_MAX */ #include @@ -335,34 +337,37 @@ void print_request(const char *message, MPI_Request request) message, request, request->buf, request->size, request->src, request->dst, request->tag, request->flags); } -void smpi_comm_copy_buffer_callback(smx_synchro_t comm, void *buff, size_t buff_size) +void smpi_comm_copy_buffer_callback(smx_synchro_t synchro, void *buff, size_t buff_size) { XBT_DEBUG("Copy the data over"); void* tmpbuff=buff; + simgrid::simix::Comm *comm = dynamic_cast(synchro); if((smpi_privatize_global_variables) && ((char*)buff >= smpi_start_data_exe) && ((char*)buff < smpi_start_data_exe + smpi_size_data_exe ) ){ XBT_DEBUG("Privatization : We are copying from a zone inside global memory... Saving data to temp buffer !"); - smpi_switch_data_segment(((smpi_process_data_t)(((simdata_process_t)SIMIX_process_get_data(comm->comm.src_proc))->data))->index); + + + smpi_switch_data_segment(((smpi_process_data_t)(((simdata_process_t)SIMIX_process_get_data(comm->src_proc))->data))->index); tmpbuff = (void*)xbt_malloc(buff_size); memcpy(tmpbuff, buff, buff_size); } - if((smpi_privatize_global_variables) && ((char*)comm->comm.dst_buff >= smpi_start_data_exe) - && ((char*)comm->comm.dst_buff < smpi_start_data_exe + smpi_size_data_exe )){ + if((smpi_privatize_global_variables) && ((char*)comm->dst_buff >= smpi_start_data_exe) + && ((char*)comm->dst_buff < smpi_start_data_exe + smpi_size_data_exe )){ XBT_DEBUG("Privatization : We are copying to a zone inside global memory - Switch data segment"); - smpi_switch_data_segment(((smpi_process_data_t)(((simdata_process_t)SIMIX_process_get_data(comm->comm.dst_proc))->data))->index); + smpi_switch_data_segment(((smpi_process_data_t)(((simdata_process_t)SIMIX_process_get_data(comm->dst_proc))->data))->index); } - memcpy(comm->comm.dst_buff, tmpbuff, buff_size); - if (comm->comm.detached) { + memcpy(comm->dst_buff, tmpbuff, buff_size); + if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI // sender to make the original buffer available to the application ASAP xbt_free(buff); //It seems that the request is used after the call there this should be free somewhere else but where??? //xbt_free(comm->comm.src_data);// inside SMPI the request is kept inside the user data and should be free - comm->comm.src_buff = NULL; + comm->src_buff = NULL; } if(tmpbuff!=buff)xbt_free(tmpbuff); diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 73202b4c49..d079dd865c 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -32,6 +32,12 @@ set(EXTRA_DIST src/simix/smx_private.hpp src/simix/smx_process_private.h src/simix/smx_synchro_private.h + src/simix/Synchro.h + src/simix/SynchroComm.hpp + src/simix/SynchroExec.hpp + src/simix/SynchroIo.hpp + src/simix/SynchroSleep.hpp + src/simix/SynchroRaw.hpp src/smpi/README src/smpi/colls/coll_tuned_topo.h src/smpi/colls/colls.h @@ -341,7 +347,8 @@ set(SIMIX_SRC src/simix/smx_synchro.cpp src/simix/smx_vm.cpp src/simix/popping.cpp - + src/simix/Synchro.cpp + ${SIMIX_GENERATED_SRC} ) -- 2.20.1