From ff3cdbac76f96b8a8f7f5ba4e3b8d5a737625348 Mon Sep 17 00:00:00 2001 From: mquinson Date: Mon, 5 Oct 2009 12:27:45 +0000 Subject: [PATCH] Improvements to the MSG port on top of SIMIX network. All MSG tests pass now. This commit breaks the implementation of MSG_get_task_from_host, and is going to be removed from the MSG API. Clean-ups in the SIMIX networking code. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6703 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras/Virtu/sg_process.c | 23 +------- src/include/simix/simix.h | 13 ++--- src/msg/msg_mailbox.c | 9 ++- src/simix/private.h | 22 +++++--- src/simix/smx_network.c | 107 ++++++++++++++---------------------- src/smpi/smpi_global.c | 21 +------ 6 files changed, 67 insertions(+), 128 deletions(-) diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index 1ecbf3e2e4..bc39cf2c7a 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -243,32 +243,13 @@ void gras_function_register(const char *name, xbt_main_func_t code) void gras_main() { - smx_action_t action; - xbt_fifo_t actions_done = xbt_fifo_new(); - xbt_fifo_t actions_failed = xbt_fifo_new(); - /* Clean IO before the run */ fflush(stdout); fflush(stderr); SIMIX_init(); - while (SIMIX_solve(actions_done, actions_failed) != -1.0) { - while ((action = xbt_fifo_pop(actions_failed))) { - DEBUG1("** %s failed **", SIMIX_action_get_name(action)); - SIMIX_action_signal_all (action); - /* action finished, destroy it */ - // SIMIX_action_destroy(action); - } - - while ((action = xbt_fifo_pop(actions_done))) { - DEBUG1("** %s done **", SIMIX_action_get_name(action)); - SIMIX_action_signal_all (action); - /* action finished, destroy it */ - //SIMIX_action_destroy(action); - } - } - xbt_fifo_free(actions_failed); - xbt_fifo_free(actions_done); + while (SIMIX_solve(NULL, NULL) != -1.0); + return; } diff --git a/src/include/simix/simix.h b/src/include/simix/simix.h index 0260a97c00..9479cdbf53 100644 --- a/src/include/simix/simix.h +++ b/src/include/simix/simix.h @@ -178,25 +178,20 @@ XBT_PUBLIC(void) SIMIX_display_process_status(void); XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(const char *name); XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp); XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, - double timeout, void *data, size_t data_size, - int (filter)(smx_comm_t, void *), void *arg); + double timeout, void *data, size_t data_size); XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, - size_t *data_size, int (filter)(smx_comm_t, void *), void *arg); + size_t *data_size); XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm); XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm); XBT_PUBLIC(int) SIMIX_communication_isSend(smx_comm_t comm); XBT_PUBLIC(int) SIMIX_communication_isRecv(smx_comm_t comm); -/* FIXME: Filter function */ -int comm_filter_get(smx_comm_t comm, void *arg); -int comm_filter_put(smx_comm_t comm, void *arg); - /* These should be private to SIMIX */ -smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv); +smx_comm_t SIMIX_communication_new(smx_comm_type_t type); void SIMIX_communication_destroy(smx_comm_t comm); static inline void SIMIX_communication_use(smx_comm_t comm); static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout); -smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, int (filter)(smx_comm_t, void *), void *arg); +smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type); static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_comm_t comm); static inline void SIMIX_rdv_remove(smx_rdv_t rdv, smx_comm_t comm); static inline smx_cond_t SIMIX_rdv_get_cond(smx_rdv_t rdv); diff --git a/src/msg/msg_mailbox.c b/src/msg/msg_mailbox.c index 48d2b824c1..b0ea9b1136 100644 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@ -176,8 +176,7 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task, smx_host = host ? host->simdata->smx_host : NULL; TRY{ - SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size, - comm_filter_get, smx_host); + SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size); } CATCH(e){ switch(e.category){ @@ -244,7 +243,7 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, TRY{ SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate, - timeout, &task, sizeof(void *), comm_filter_put, NULL); + timeout, &task, sizeof(void *)); } CATCH(e){ @@ -262,12 +261,12 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, ret = MSG_OK; RETHROW; break; - /*xbt_die("Unhandled SIMIX network exception");*/ + xbt_die("Unhandled SIMIX network exception"); } xbt_ex_free(e); MSG_RETURN(ret); } - /* t_simdata->refcount--;*/ + /*t_simdata->refcount--;*/ MSG_RETURN (MSG_OK); } diff --git a/src/simix/private.h b/src/simix/private.h index 3e45a08f39..ee9c9cedf3 100644 --- a/src/simix/private.h +++ b/src/simix/private.h @@ -129,19 +129,25 @@ typedef struct s_smx_rvpoint { } s_smx_rvpoint_t; typedef struct s_smx_comm { - smx_comm_type_t type; - smx_host_t src_host; - smx_host_t dst_host; - smx_rdv_t rdv; - smx_cond_t cond; + + + smx_comm_type_t type; /* Type of the communication (comm_send,comm_recv) */ + smx_rdv_t rdv; /* Rendez-vous where the comm is queued */ + smx_cond_t cond; /* Condition associated to the surf simulation */ + int refcount; /* Number of processes involved in the cond */ + + /* Surf action data */ + smx_process_t src_proc; + smx_process_t dst_proc; smx_action_t act; + double rate; + double task_size; + + /* Data to be transfered */ void *src_buff; size_t src_buff_size; void *dst_buff; size_t *dst_buff_size; - double rate; - double task_size; - int refcount; } s_smx_comm_t; /********************************* Action *************************************/ diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 400e8d0afb..0f7fd681d4 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -54,6 +54,7 @@ void SIMIX_rdv_destroy(smx_rdv_t rdv) static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_comm_t comm) { xbt_fifo_push(rdv->comm_fifo, comm); + comm->rdv = rdv; } /** @@ -64,28 +65,24 @@ static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_comm_t comm) static inline void SIMIX_rdv_remove(smx_rdv_t rdv, smx_comm_t comm) { xbt_fifo_remove(rdv->comm_fifo, comm); + comm->rdv = NULL; } - /** * \brief Checks if there is a communication request queued in a rendez-vous matching our needs - * \param rdv The rendez-vous with the queue - * \param look_for_src boolean. True: we are receiver looking for sender; False: other way round - * \return The communication request if found, or a newly created one otherwise. + * \param type The type of communication we are looking for (comm_send, comm_recv) + * \return The communication request if found, NULL otherwise. */ -smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, int (filter)(smx_comm_t, void*), void *arg) { - smx_comm_t comm; - xbt_fifo_item_t item; - - /* Traverse the rendez-vous queue looking for a comm request matching the - filter conditions. If found return it and remove it from the list. */ - xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_comm_t) { - if(filter(comm, arg)){ - SIMIX_communication_use(comm); - xbt_fifo_remove_item(rdv->comm_fifo, item); - DEBUG1("Communication request found! %p", comm); - return comm; - } +smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type) +{ + smx_comm_t comm = (smx_comm_t)xbt_fifo_get_item_content( + xbt_fifo_get_first_item(rdv->comm_fifo)); + + if(comm && comm->type == type){ + DEBUG0("Communication request found!"); + xbt_fifo_shift(rdv->comm_fifo); + SIMIX_communication_use(comm); + return comm; } /* no relevant request found. Return NULL */ @@ -99,17 +96,15 @@ smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, int (filter)(smx_comm_t, void*), /** * \brief Creates a new communication request - * \param sender The process starting the communication (by send) - * \param receiver The process receiving the communication (by recv) - * \return the communication request + * \param type The type of communication (comm_send, comm_recv) + * \return The new communication request */ -smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv) +smx_comm_t SIMIX_communication_new(smx_comm_type_t type) { /* alloc structures */ smx_comm_t comm = xbt_new0(s_smx_comm_t, 1); comm->type = type; comm->cond = SIMIX_cond_init(); - comm->rdv = rdv; comm->refcount = 1; return comm; @@ -151,11 +146,19 @@ static inline void SIMIX_communication_use(smx_comm_t comm) static inline void SIMIX_communication_start(smx_comm_t comm) { /* If both the sender and the receiver are already there, start the communication */ - if(comm->src_host != NULL && comm->dst_host != NULL){ + if(comm->src_proc && comm->dst_proc){ DEBUG1("Starting communication %p", comm); - comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, + comm->act = SIMIX_action_communicate(comm->src_proc->smx_host, + comm->dst_proc->smx_host, NULL, comm->task_size, comm->rate); + /* If any of the process is suspend, create the action but stop its execution, + it will be restart when the sender process resume */ + if(SIMIX_process_is_suspended(comm->src_proc) || + SIMIX_process_is_suspended(comm->dst_proc)) { + SIMIX_action_set_priority(comm->act, 0); + } + /* Add the communication as user data of the action */ comm->act->data = comm; @@ -235,26 +238,10 @@ void SIMIX_network_copy_data(smx_comm_t comm) memcpy(comm->dst_buff, comm->src_buff, dst_buff_size); DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", - comm, comm->src_host->name, comm->dst_host->name, dst_buff_size); -} - -/* FIXME: move to some other place */ -int comm_filter_get(smx_comm_t comm, void *arg) -{ - if(comm->type == comm_send){ - if(arg && comm->src_host != (smx_host_t)arg) - return FALSE; - else - return TRUE; - }else{ - return FALSE; - } + comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name, + dst_buff_size); } -int comm_filter_put(smx_comm_t comm, void *arg) -{ - return comm->type == comm_recv ? TRUE : FALSE; -} /******************************************************************************/ /* Synchronous Communication */ /******************************************************************************/ @@ -264,22 +251,21 @@ int comm_filter_put(smx_comm_t comm, void *arg) * - network_error if network failed or peer issued a timeout */ void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, - double timeout, void *data, size_t data_size, - int (filter)(smx_comm_t, void *), void *arg) + double timeout, void *data, size_t data_size) { smx_comm_t comm; /* Look for communication request matching our needs. If it is not found then create it and push it into the rendez-vous point */ - comm = SIMIX_rdv_get_request(rdv, filter, arg); + comm = SIMIX_rdv_get_request(rdv, comm_recv); - if(comm == NULL){ - comm = SIMIX_communication_new(comm_send, rdv); + if(!comm){ + comm = SIMIX_communication_new(comm_send); SIMIX_rdv_push(rdv, comm); } /* Setup the communication request */ - comm->src_host = SIMIX_host_self(); + comm->src_proc = SIMIX_process_self(); comm->task_size = task_size; comm->rate = rate; comm->src_buff = data; @@ -288,7 +274,6 @@ void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, SIMIX_communication_start(comm); /* Wait for communication completion */ - /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ SIMIX_communication_wait_for_completion(comm, timeout); SIMIX_communication_destroy(comm); @@ -299,29 +284,27 @@ void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, * - timeout_error if communication reached the timeout specified * - network_error if network failed or peer issued a timeout */ -void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, - size_t *data_size, int (filter)(smx_comm_t, void *), void *arg) +void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, size_t *data_size) { smx_comm_t comm; /* Look for communication request matching our needs. If it is not found then create it and push it into the rendez-vous point */ - comm = SIMIX_rdv_get_request(rdv, filter, arg); + comm = SIMIX_rdv_get_request(rdv, comm_send); - if(comm == NULL){ - comm = SIMIX_communication_new(comm_recv, rdv); + if(!comm){ + comm = SIMIX_communication_new(comm_recv); SIMIX_rdv_push(rdv, comm); } /* Setup communication request */ - comm->dst_host = SIMIX_host_self(); + comm->dst_proc = SIMIX_process_self(); comm->dst_buff = data; comm->dst_buff_size = data_size; SIMIX_communication_start(comm); /* Wait for communication completion */ - /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ SIMIX_communication_wait_for_completion(comm, timeout); SIMIX_communication_destroy(comm); @@ -334,20 +317,12 @@ void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, /* void SIMIX_network_wait(smx_action_t comm, double timeout) { - if (timeout > 0) - SIMIX_cond_wait_timeout(rdv_cond, rdv_comm_mutex, timeout - start_time); - else - SIMIX_cond_wait(rdv_cond, rdv_comm_mutex); - + TO BE IMPLEMENTED } - XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm) { - if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){ - memcpy(comm->data - - return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE; + TO BE IMPLEMENTED }*/ diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index 62f27d8d6e..bef49b1db8 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -315,11 +315,6 @@ static void smpi_cfg_cb_host_speed(const char *name, int pos) int smpi_run_simulation(int *argc, char **argv) { - smx_action_t action = NULL; - - xbt_fifo_t actions_failed = xbt_fifo_new(); - xbt_fifo_t actions_done = xbt_fifo_new(); - srand(SMPI_RAND_SEED); double default_reference_speed = 20000.0; @@ -350,21 +345,9 @@ int smpi_run_simulation(int *argc, char **argv) fflush(stderr); SIMIX_init(); - while (SIMIX_solve(actions_done, actions_failed) != -1.0) { - while ((action = xbt_fifo_pop(actions_failed))) { - DEBUG1("** %s failed **", SIMIX_action_get_name(action)); - SIMIX_action_signal_all(action); - } - while ((action = xbt_fifo_pop(actions_done))) { - DEBUG1("** %s done **", SIMIX_action_get_name(action)); - SIMIX_action_signal_all(action); - } - } - + while (SIMIX_solve(NULL, NULL) != -1.0); + // FIXME: cleanup incomplete - xbt_fifo_free(actions_failed); - xbt_fifo_free(actions_done); - if (xbt_cfg_get_int(_surf_cfg_set, "display_timing")) INFO1("simulation time %g", SIMIX_get_clock()); -- 2.20.1