X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/37baaf70ee95c42a6f4b80913db243c199230fb9..7e0b698c2cc09a28687fd9eb216a8ba1dc8aad74:/src/simix/smx_network.c?ds=sidebyside diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index da59e87a51..a6106bc1d2 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -19,9 +19,9 @@ static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall); static void SIMIX_comm_copy_data(smx_action_t comm); static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type); static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm); -static smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type, - int (*match_fun)(void *, void *,smx_action_t), - void *user_data, smx_action_t my_action); +static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type, + int (*match_fun)(void *, void *,smx_action_t), + void *user_data, smx_action_t my_action); static void SIMIX_rdv_free(void *data); void SIMIX_network_init(void) @@ -47,6 +47,10 @@ smx_rdv_t SIMIX_rdv_create(const char *name) rdv = xbt_new0(s_smx_rvpoint_t, 1); rdv->name = name ? xbt_strdup(name) : NULL; rdv->comm_fifo = xbt_fifo_new(); + rdv->done_comm_fifo = xbt_fifo_new(); + rdv->permanent_receiver=NULL; + + XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name); if (rdv->name) xbt_dict_set(rdv_points, rdv->name, rdv, NULL); @@ -62,9 +66,12 @@ void SIMIX_rdv_destroy(smx_rdv_t rdv) void SIMIX_rdv_free(void *data) { + XBT_DEBUG("rdv free %p", data); smx_rdv_t rdv = (smx_rdv_t) data; xbt_free(rdv->name); xbt_fifo_free(rdv->comm_fifo); + xbt_fifo_free(rdv->done_comm_fifo); + xbt_free(rdv); } @@ -97,6 +104,26 @@ smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv) return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo)); } +/** + * \brief get the receiver (process associated to the mailbox) + * \param rdv The rendez-vous point + * \return process The receiving process (NULL if not set) + */ +smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv) +{ + return rdv->permanent_receiver; +} + +/** + * \brief set the receiver of the rendez vous point to allow eager sends + * \param rdv The rendez-vous point + * \param process The receiving process + */ +void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process) +{ + rdv->permanent_receiver=process; +} + /** * \brief Pushes a communication action into a rendez-vous point * \param rdv The rendez-vous point @@ -120,11 +147,11 @@ XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm) } /** - * \brief Checks if there is a communication action queued in a rendez-vous matching our needs + * \brief Checks if there is a communication action queued in a fifo matching our needs * \param type The type of communication we are looking for (comm_send, comm_recv) * \return The communication action if found, NULL otherwise */ -smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type, +smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type, int (*match_fun)(void *, void *,smx_action_t), void *this_user_data, smx_action_t my_action) { @@ -132,7 +159,7 @@ smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type, xbt_fifo_item_t item; void* other_user_data = NULL; - xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) { + xbt_fifo_foreach(fifo, item, action, smx_action_t) { if (action->comm.type == SIMIX_COMM_SEND) { other_user_data = action->comm.src_data; } else if (action->comm.type == SIMIX_COMM_RECEIVE) { @@ -142,7 +169,7 @@ smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type, (!match_fun || match_fun(this_user_data, other_user_data, action)) && (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) { XBT_DEBUG("Found a matching communication action %p", action); - xbt_fifo_remove_item(rdv->comm_fifo, item); + xbt_fifo_remove_item(fifo, item); xbt_fifo_free_item(item); action->comm.refcount++; action->comm.rdv = NULL; @@ -263,6 +290,8 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv, void *data, int detached) { + XBT_DEBUG("send from %p\n", rdv); + /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */ smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND); @@ -270,17 +299,33 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv, * ourself so that the other side also gets a chance of choosing if it wants to match with us. * * If it is not found then push our communication into the rendez-vous point */ - smx_action_t other_action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_RECEIVE, match_fun, data, this_action); + smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action); if (!other_action) { other_action = this_action; - SIMIX_rdv_push(rdv, this_action); + + if (rdv->permanent_receiver!=NULL){ + //this mailbox is for small messages, which have to be sent right now + other_action->state = SIMIX_READY; + other_action->comm.dst_proc=rdv->permanent_receiver; + other_action->comm.refcount++; + other_action->comm.rdv = rdv; + xbt_fifo_push(rdv->done_comm_fifo,other_action); + other_action->comm.rdv=rdv; + XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm)); + + }else{ + SIMIX_rdv_push(rdv, this_action); + } } else { + XBT_DEBUG("Receive already pushed\n"); + SIMIX_comm_destroy(this_action); --smx_total_comms; // this creation was a pure waste other_action->state = SIMIX_READY; other_action->comm.type = SIMIX_COMM_READY; + } xbt_fifo_push(src_proc->comms, other_action); @@ -317,41 +362,85 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size, int (*match_fun)(void *, void *, smx_action_t), void *data) { - /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */ - smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE); - - /* Look for communication action matching our needs. We also provide a description of - * ourself so that the other side also gets a chance of choosing if it wants to match with us. - * - * If it is not found then push our communication into the rendez-vous point */ - smx_action_t other_action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data, this_action); - - if (!other_action) { - other_action = this_action; - SIMIX_rdv_push(rdv, this_action); - } else { - SIMIX_comm_destroy(this_action); - --smx_total_comms; // this creation was a pure waste - - other_action->state = SIMIX_READY; - other_action->comm.type = SIMIX_COMM_READY; - } - xbt_fifo_push(dst_proc->comms, other_action); - - /* Setup communication action */ - other_action->comm.dst_proc = dst_proc; - other_action->comm.dst_buff = dst_buff; - other_action->comm.dst_buff_size = dst_buff_size; - other_action->comm.dst_data = data; - - other_action->comm.match_fun = match_fun; - - if (MC_IS_ENABLED) { - other_action->state = SIMIX_RUNNING; - return other_action; - } - - SIMIX_comm_start(other_action); + XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo); + smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE); + + smx_action_t other_action; + //communication already done, get it inside the fifo of completed comms + //permanent receive v1 + //int already_received=0; + if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){ + + XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n"); + //find a match in the already received fifo + other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action); + //if not found, assume the receiver came first, register it to the mailbox in the classical way + if (!other_action) { + XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n"); + other_action = this_action; + SIMIX_rdv_push(rdv, this_action); + }else{ + if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0) + { + XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm)); + other_action->state = SIMIX_DONE; + other_action->comm.type = SIMIX_COMM_DONE; + other_action->comm.rdv = NULL; + SIMIX_comm_destroy(this_action); + --smx_total_comms; // this creation was a pure waste + //already_received=1; + other_action->comm.refcount--; + }/*else{ + XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo)); + }*/ + other_action->comm.refcount--; + SIMIX_comm_destroy(this_action); + --smx_total_comms; // this creation was a pure waste + } + }else{ + /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */ + + /* Look for communication action matching our needs. We also provide a description of + * ourself so that the other side also gets a chance of choosing if it wants to match with us. + * + * If it is not found then push our communication into the rendez-vous point */ + other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action); + + if (!other_action) { + XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo)); + other_action = this_action; + SIMIX_rdv_push(rdv, this_action); + } else { + SIMIX_comm_destroy(this_action); + --smx_total_comms; // this creation was a pure waste + other_action->state = SIMIX_READY; + other_action->comm.type = SIMIX_COMM_READY; + xbt_fifo_push(dst_proc->comms, other_action); + + } + } + + + /* Setup communication action */ + other_action->comm.dst_proc = dst_proc; + other_action->comm.dst_buff = dst_buff; + other_action->comm.dst_buff_size = dst_buff_size; + other_action->comm.dst_data = data; + + other_action->comm.match_fun = match_fun; + + + /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info + SIMIX_comm_copy_data(other_action);*/ + + + if (MC_IS_ENABLED) { + other_action->state = SIMIX_RUNNING; + return other_action; + } + + SIMIX_comm_start(other_action); + // } return other_action; } @@ -362,6 +451,8 @@ void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double time surf_action_t sleep; /* Associate this simcall to the wait action */ + XBT_DEBUG("SIMIX_pre_comm_wait, %p", action); + xbt_fifo_push(action->simcalls, simcall); simcall->issuer->waiting_action = action; @@ -518,7 +609,7 @@ XBT_INLINE void SIMIX_comm_start(smx_action_t action) /* If a link is failed, detect it immediately */ if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) { XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", - SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver)); + SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver)); action->state = SIMIX_LINK_FAILURE; SIMIX_comm_destroy_internal_actions(action); } @@ -589,14 +680,16 @@ void SIMIX_comm_finish(smx_action_t action) case SIMIX_SRC_HOST_FAILURE: if (simcall->issuer == action->comm.src_proc) - SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); + simcall->issuer->context->iwannadie = 1; +// SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); else SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); break; case SIMIX_DST_HOST_FAILURE: if (simcall->issuer == action->comm.dst_proc) - SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); + simcall->issuer->context->iwannadie = 1; +// SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); else SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); break; @@ -640,6 +733,11 @@ void SIMIX_comm_finish(smx_action_t action) } } + if (surf_workstation_model->extension. + workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) { + simcall->issuer->context->iwannadie = 1; + } + simcall->issuer->waiting_action = NULL; xbt_fifo_remove(simcall->issuer->comms, action); SIMIX_simcall_answer(simcall); @@ -671,7 +769,7 @@ void SIMIX_post_comm(smx_action_t action) action->state = SIMIX_DST_HOST_FAILURE; else if (action->comm.surf_comm && surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) { - XBT_DEBUG("Puta madre. Surf says that the link broke"); + XBT_DEBUG("Puta madre. Surf says that the link broke"); action->state = SIMIX_LINK_FAILURE; } else action->state = SIMIX_DONE; @@ -835,12 +933,6 @@ void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff } void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size) -{ - XBT_DEBUG("Copy the data over"); - memcpy(comm->comm.dst_buff, buff, buff_size); -} - -void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_size) { XBT_DEBUG("Copy the data over"); memcpy(comm->comm.dst_buff, buff, buff_size); @@ -850,6 +942,7 @@ void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_siz } } + /** * \brief Copy the communication data from the sender's buffer to the receiver's one * \param comm The communication @@ -858,7 +951,7 @@ void SIMIX_comm_copy_data(smx_action_t comm) { size_t buff_size = comm->comm.src_buff_size; /* If there is no data to be copy then return */ - if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1) + if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied) return; XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",