1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Log category of the SIMIX network module (rendez-vous points and
// communication actions).
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
// Named rendez-vous points, keyed by name. Entries are inserted by
// SIMIX_rdv_create with SIMIX_rdv_free as their free callback; the dictionary
// is allocated by SIMIX_network_init and released by SIMIX_network_exit.
15 static xbt_dict_t rdv_points = NULL;
// Forward declarations of file-local helpers.
17 static XBT_INLINE void SIMIX_comm_start(smx_action_t action);
18 static void SIMIX_comm_finish(smx_action_t action);
19 static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
20 static void SIMIX_comm_copy_data(smx_action_t comm);
21 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
// NOTE(review): the end of this prototype (original line 23) is missing from
// this listing, and no definition or use of SIMIX_comm_wait_for_completion
// appears in the visible file -- looks like a leftover declaration; confirm
// and drop it.
22 static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
24 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
25 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
// The unnamed last parameter is the caller's user data handed to match_fun.
26 static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
27 int (*match_fun)(void *, void *), void *);
28 static void SIMIX_rdv_free(void *data);
30 void SIMIX_network_init(void)
32 rdv_points = xbt_dict_new();
35 void SIMIX_network_exit(void)
37 xbt_dict_free(&rdv_points);
40 /******************************************************************************/
41 /* Rendez-Vous Points */
42 /******************************************************************************/
44 smx_rdv_t SIMIX_rdv_create(const char *name)
46 /* two processes may have pushed the same rdv_create request at the same time */
47 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
50 rdv = xbt_new0(s_smx_rvpoint_t, 1);
51 rdv->name = name ? xbt_strdup(name) : NULL;
52 rdv->comm_fifo = xbt_fifo_new();
55 xbt_dict_set(rdv_points, name, rdv, SIMIX_rdv_free);
60 void SIMIX_rdv_destroy(smx_rdv_t rdv)
63 xbt_dict_remove(rdv_points, rdv->name);
66 void SIMIX_rdv_free(void *data)
68 smx_rdv_t rdv = (smx_rdv_t) data;
71 xbt_fifo_free(rdv->comm_fifo);
75 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
77 return xbt_dict_get_or_null(rdv_points, name);
80 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
82 smx_action_t comm = NULL;
83 xbt_fifo_item_t item = NULL;
86 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
87 if (comm->comm.src_proc->smx_host == host)
94 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
96 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
100 * \brief Push a communication request into a rendez-vous point
101 * \param rdv The rendez-vous point
102 * \param comm The communication request
104 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
106 xbt_fifo_push(rdv->comm_fifo, comm);
107 comm->comm.rdv = rdv;
111 * \brief Remove a communication request from a rendez-vous point
112 * \param rdv The rendez-vous point
113 * \param comm The communication request
115 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
117 xbt_fifo_remove(rdv->comm_fifo, comm);
118 comm->comm.rdv = NULL;
122 * \brief Checks if there is a communication request queued in a rendez-vous matching our needs
123 * \param type The type of communication we are looking for (comm_send, comm_recv)
124 * \return The communication request if found, NULL otherwise
126 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
127 int (*match_fun)(void *, void *), void *data)
130 xbt_fifo_item_t item;
131 void* req_data = NULL;
133 xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){
134 if(req->comm.type == SIMIX_COMM_SEND) {
135 req_data = req->comm.src_data;
136 } else if(req->comm.type == SIMIX_COMM_RECEIVE) {
137 req_data = req->comm.dst_data;
139 if(req->comm.type == type && (!match_fun || match_fun(data, req_data))) {
140 xbt_fifo_remove_item(rdv->comm_fifo, item);
141 req->comm.refcount++;
142 req->comm.rdv = NULL;
146 DEBUG0("Communication request not found");
150 /******************************************************************************/
151 /* Communication Actions */
152 /******************************************************************************/
155 * \brief Creates a new communicate action
156 * \param type The type of request (comm_send, comm_recv)
157 * \return The new communicate action
159 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
// The action starts SIMIX_WAITING with an empty request list, the requested
// communication type and a single reference (comm.refcount = 1, dropped by
// SIMIX_comm_destroy); xbt_new0 presumably zero-fills the remaining fields.
// NOTE(review): several original lines are missing from this listing
// (numbering jumps 159->163, 171->174->177 and the end of the function):
// presumably the opening brace, the declaration of `act`, whatever surrounds
// the category assignment (possibly a tracing #ifdef/#endif) and
// `return act;`. Restore them from version control before editing.
163 /* alloc structures */
164 act = xbt_new0(s_smx_action_t, 1);
165 act->type = SIMIX_ACTION_COMMUNICATE;
166 act->state = SIMIX_WAITING;
167 act->request_list = xbt_fifo_new();
169 /* set communication */
170 act->comm.type = type;
171 act->comm.refcount = 1;
174 act->category = NULL;
177 DEBUG1("Create communicate action %p", act);
183 * \brief Destroy a communicate action
184 * \param action The communicate action to be destroyed
186 void SIMIX_comm_destroy(smx_action_t action)
// Drops one reference to the communication. The resources it owns (name,
// request list, surf actions) are only released once no reference is left.
// Calling it on an action whose refcount is already <= 0 aborts via xbt_die.
// NOTE(review): lines are missing from this listing (numbering jumps
// 199->203, 204->206, 207->210->214 and after 218). They presumably hold:
//   - the end of the latency-tracking block and its #endif;
//   - the early `return;` taken while references remain (as listed, the
//     VERB2 call would become the body of the `if` and the action would
//     always be freed);
//   - the tracing #ifdef/#endif around TRACE_smx_action_destroy;
//   - the final free of `action` itself.
188 DEBUG1("Destroy action %p", action);
190 if (action->comm.refcount <= 0)
191 xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
193 #ifdef HAVE_LATENCY_BOUND_TRACKING
194 //save the latency-limited flag (keyed by the action address) so that SIMIX_comm_is_latency_bounded can still report it afterwards
195 if (action->comm.surf_comm) {
196 DEBUG2("adding key %p with latency limited value %d to the dict", action,
197 SIMIX_comm_is_latency_bounded(action));
198 xbt_dicti_set(simix_global->latency_limited_dict, (uintptr_t) action,
199 SIMIX_comm_is_latency_bounded(action));
203 action->comm.refcount--;
204 if (action->comm.refcount > 0)
206 VERB2("Really free communication %p; refcount is now %d", action,
207 action->comm.refcount);
210 TRACE_smx_action_destroy(action);
214 xbt_free(action->name);
216 xbt_fifo_free(action->request_list);
218 SIMIX_comm_destroy_internal_actions(action);
223 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
225 if (action->comm.surf_comm){
226 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
227 action->comm.surf_comm = NULL;
230 if (action->comm.src_timeout){
231 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
232 action->comm.src_timeout = NULL;
235 if (action->comm.dst_timeout){
236 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
237 action->comm.dst_timeout = NULL;
// Post a send from `src_proc` on rendez-vous point `rdv`.
// Per the comment below:
//   - if a queued receive request is accepted by match_fun (see
//     SIMIX_rdv_get_request, which also gives it one more reference), it is
//     reused and marked SIMIX_READY with comm.type SIMIX_COMM_READY;
//   - otherwise a new SIMIX_COMM_SEND request is created and queued in `rdv`.
// The sender side (process, task size, rate, buffer, user data) is then
// filled in, and SIMIX_comm_start is called. It only launches the transfer
// when the action is SIMIX_READY.
// NOTE(review): several original lines are missing from this listing
// (numbering jumps 244->248, 250->253, 254->256, 257->260, 266->269, 269->273
// and after 273). They presumably hold:
//   - the declaration of `action`;
//   - the braces that separate the "request found" and "new request" branches;
//   - the guard in front of `action->state = SIMIX_RUNNING;` (its condition
//     cannot be recovered from here);
//   - the return statements (presumably returning `action`).
241 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
242 double task_size, double rate,
243 void *src_buff, size_t src_buff_size,
244 int (*match_fun)(void *, void *), void *data)
248 /* Look for communication request matching our needs.
249 If it is not found then create it and push it into the rendez-vous point */
250 action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
253 action = SIMIX_comm_new(SIMIX_COMM_SEND);
254 SIMIX_rdv_push(rdv, action);
256 action->state = SIMIX_READY;
257 action->comm.type = SIMIX_COMM_READY;
260 /* Setup the communication request */
261 action->comm.src_proc = src_proc;
262 action->comm.task_size = task_size;
263 action->comm.rate = rate;
264 action->comm.src_buff = src_buff;
265 action->comm.src_buff_size = src_buff_size;
266 action->comm.src_data = data;
269 action->state = SIMIX_RUNNING;
273 SIMIX_comm_start(action);
// Post a receive for `dst_proc` on rendez-vous point `rdv`; mirror image of
// SIMIX_comm_isend.
//   - A queued send request accepted by match_fun is reused and marked
//     SIMIX_READY / SIMIX_COMM_READY.
//   - Otherwise a new SIMIX_COMM_RECEIVE request is queued in `rdv`.
// The receiver side (process, buffer, pointer to the buffer size, user data)
// is then filled in and SIMIX_comm_start is called. dst_buff_size is later
// read and updated by SIMIX_comm_copy_data.
// NOTE(review): as in SIMIX_comm_isend, several lines are missing from this
// listing:
//   - the declaration of `action`;
//   - the closing `*/` of the comment below;
//   - the branch braces;
//   - the guard in front of `action->state = SIMIX_RUNNING;`;
//   - the return statements.
277 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
278 void *dst_buff, size_t *dst_buff_size,
279 int (*match_fun)(void *, void *), void *data)
283 /* Look for communication request matching our needs.
284 * If it is not found then create it and push it into the rendez-vous point
286 action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
289 action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
290 SIMIX_rdv_push(rdv, action);
292 action->state = SIMIX_READY;
293 action->comm.type = SIMIX_COMM_READY;
296 /* Setup communication request */
297 action->comm.dst_proc = dst_proc;
298 action->comm.dst_buff = dst_buff;
299 action->comm.dst_buff_size = dst_buff_size;
300 action->comm.dst_data = data;
303 action->state = SIMIX_RUNNING;
307 SIMIX_comm_start(action);
311 void SIMIX_pre_comm_wait(smx_req_t req)
// Handle a wait request on req->comm_wait.comm. The request is attached to
// the communication, and the issuer records it as its waiting action.
//   - If the communication is already over (neither WAITING nor RUNNING),
//     SIMIX_comm_finish answers the request at once.
//   - Otherwise a surf sleep of `timeout` is started on the issuer's host and
//     stored as the sender- or receiver-side timeout. SIMIX_post_comm uses it
//     to detect a timeout or a host failure.
// NOTE(review): lines are missing from this listing:
//   - the declaration of `sleep` (original line 315);
//   - the guard around original lines 322-323 and the return after them (as
//     listed, every wait would mark the communication DONE and finish it);
//   - the `else` of original line 337 (as listed, dst_timeout would be set
//     even when the sender issued the wait).
313 smx_action_t action = req->comm_wait.comm;
314 double timeout = req->comm_wait.timeout;
317 /* Associate this request to the action */
318 xbt_fifo_push(action->request_list, req);
319 req->issuer->waiting_action = action;
322 action->state = SIMIX_DONE;
323 SIMIX_comm_finish(action);
327 /* If the action has already finished, perform the error handling, */
328 /* otherwise set up a waiting timeout on the right side */
329 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
330 SIMIX_comm_finish(action);
331 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
332 sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
333 surf_workstation_model->action_data_set(sleep, action);
335 if (req->issuer == action->comm.src_proc)
336 action->comm.src_timeout = sleep;
338 action->comm.dst_timeout = sleep;
342 void SIMIX_pre_comm_test(smx_req_t req)
344 smx_action_t action = req->comm_test.comm;
345 req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
347 if (req->comm_test.result) {
348 xbt_fifo_push(action->request_list, req);
349 SIMIX_comm_finish(action);
352 SIMIX_request_answer(req);
356 void SIMIX_pre_comm_testany(smx_req_t req)
360 req->comm_testany.result = -1;
361 xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
362 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
363 req->comm_testany.result = cursor;
364 xbt_fifo_push(action->request_list, req);
365 SIMIX_comm_finish(action);
369 SIMIX_request_answer(req);
372 void SIMIX_pre_comm_waitany(smx_req_t req)
375 unsigned int cursor = 0;
376 xbt_dynar_t actions = req->comm_waitany.comms;
377 xbt_dynar_foreach(actions, cursor, action){
378 /* Associate this request to the action */
379 xbt_fifo_push(action->request_list, req);
380 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
381 SIMIX_comm_finish(action);
387 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
390 unsigned int cursor = 0;
391 xbt_dynar_t actions = req->comm_waitany.comms;
393 xbt_dynar_foreach(actions, cursor, action){
394 xbt_fifo_remove(action->request_list, req);
399 * \brief Start the simulation of a communication request
400 * \param action The communication action
// Only an action that is SIMIX_READY (both ends posted) is started. For such
// an action:
//   - a surf communication of comm.task_size at comm.rate is created between
//     the two processes' hosts;
//   - the action becomes SIMIX_RUNNING;
//   - it is immediately turned into SIMIX_LINK_FAILURE if surf already
//     reports the transfer as failed;
//   - the transfer is suspended right away if either process is suspended.
// NOTE(review): braces and a few short lines (presumably the tracing
// #ifdef/#endif around the TRACE call) are missing from this listing.
402 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
404 /* If both the sender and the receiver are already there, start the communication */
405 if (action->state == SIMIX_READY) {
406 smx_host_t sender = action->comm.src_proc->smx_host;
407 smx_host_t receiver = action->comm.dst_proc->smx_host;
409 DEBUG3("Starting communication %p from '%s' to '%s'", action,
410 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
412 action->comm.surf_comm = surf_workstation_model->extension.workstation.
413 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
415 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
417 action->state = SIMIX_RUNNING;
420 TRACE_smx_action_communicate(action, action->comm.src_proc);
423 /* If a link is failed, detect it immediately */
424 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
425 DEBUG2("Communication from '%s' to '%s' failed to start because of a link failure",
426 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
427 action->state = SIMIX_LINK_FAILURE;
428 SIMIX_comm_destroy_internal_actions(action);
// NOTE(review): after the link-failure branch above,
// SIMIX_comm_destroy_internal_actions has reset comm.surf_comm to NULL.
// The check below still hands comm.surf_comm to
// surf_workstation_model->suspend when a process is suspended. Consider
// skipping the suspension when comm.surf_comm is NULL.
431 /* If either process is suspended, create the action but stop its execution,
432 it will be restarted when the sender process resumes */
433 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
434 SIMIX_process_is_suspended(action->comm.dst_proc)) {
435 /* FIXME: check what should happen with the action state */
436 surf_workstation_model->suspend(action->comm.surf_comm);
441 void SIMIX_comm_finish(smx_action_t action)
// Answer every request attached to `action`; its request_list is drained.
// For each request:
//   - a waitany request is first detached from its other communications, and
//     its result is the index of `action` in its communication dynar;
//   - the action leaves its rendez-vous point if it is still queued in one;
//   - depending on action->state, either the payload is copied (the
//     "complete!" branch), or an exception is raised in the issuer's context
//     and req->issuer->doexception is set. The exception is:
//       * timeout_error for SIMIX_SRC_TIMEOUT / SIMIX_DST_TIMEOUT;
//       * host_error for the issuer whose own host failed, network_error for
//         its peer;
//       * network_error for SIMIX_LINK_FAILURE;
//   - the issuer stops waiting (waiting_action = NULL) and the request is
//     answered.
// NOTE(review): many lines are missing from this listing:
//   - the declaration of `req`;
//   - the label of the first case (presumably SIMIX_DONE);
//   - the TRY/else/break lines;
//   - the end of the switch.
// Restore them from version control before changing this function.
445 while ((req = xbt_fifo_shift(action->request_list))) {
447 /* If a waitany request is waiting for this action to finish, then remove
448 it from the other actions in the waitany list. Afterwards, get the
449 position of the actual action in the waitany request's actions dynar and
450 return it as the result of the call */
451 if (req->call == REQ_COMM_WAITANY) {
452 SIMIX_waitany_req_remove_from_actions(req);
453 req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
456 /* If the action is still in a rendez-vous point then remove from it */
457 if (action->comm.rdv)
458 SIMIX_rdv_remove(action->comm.rdv, action);
460 DEBUG1("SIMIX_comm_finish: action state = %d", action->state);
462 /* Check out for errors */
463 switch (action->state) {
466 DEBUG1("Communication %p complete!", action);
467 SIMIX_comm_copy_data(action);
470 case SIMIX_SRC_TIMEOUT:
472 THROW0(timeout_error, 0, "Communication timeouted because of sender");
474 CATCH(req->issuer->running_ctx->exception) {
475 req->issuer->doexception = 1;
479 case SIMIX_DST_TIMEOUT:
481 THROW0(timeout_error, 0, "Communication timeouted because of receiver");
483 CATCH(req->issuer->running_ctx->exception) {
484 req->issuer->doexception = 1;
488 case SIMIX_SRC_HOST_FAILURE:
490 if (req->issuer == action->comm.src_proc)
491 THROW0(host_error, 0, "Host failed");
493 THROW0(network_error, 0, "Remote peer failed");
495 CATCH(req->issuer->running_ctx->exception) {
496 req->issuer->doexception = 1;
500 case SIMIX_DST_HOST_FAILURE:
502 if (req->issuer == action->comm.dst_proc)
503 THROW0(host_error, 0, "Host failed");
505 THROW0(network_error, 0, "Remote peer failed");
507 CATCH(req->issuer->running_ctx->exception) {
508 req->issuer->doexception = 1;
512 case SIMIX_LINK_FAILURE:
514 DEBUG5("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
515 action, action->comm.src_proc->smx_host->name, action->comm.dst_proc->smx_host->name,
516 req->issuer->name, req->issuer);
517 THROW0(network_error, 0, "Link failure");
519 CATCH(req->issuer->running_ctx->exception) {
520 req->issuer->doexception = 1;
// Presumably after the switch: the issuer is released and answered whatever
// the outcome.
527 req->issuer->waiting_action = NULL;
528 SIMIX_request_answer(req);
532 void SIMIX_post_comm(smx_action_t action)
534 /* Update action state */
535 if (action->comm.src_timeout &&
536 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
537 action->state = SIMIX_SRC_TIMEOUT;
538 else if (action->comm.dst_timeout &&
539 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
540 action->state = SIMIX_DST_TIMEOUT;
541 else if (action->comm.src_timeout &&
542 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
543 action->state = SIMIX_SRC_HOST_FAILURE;
544 else if (action->comm.dst_timeout &&
545 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
546 action->state = SIMIX_DST_HOST_FAILURE;
547 else if (action->comm.surf_comm &&
548 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
549 action->state = SIMIX_LINK_FAILURE;
551 action->state = SIMIX_DONE;
553 DEBUG1("SIMIX_post_comm: action state = %d", action->state);
555 /* After this point the surf actions associated with the simix communicate
556 action are no longer needed, thus we delete them. */
557 SIMIX_comm_destroy_internal_actions(action);
559 /* If there are requests associated with the action, then answer them */
560 if (xbt_fifo_size(action->request_list))
561 SIMIX_comm_finish(action);
564 void SIMIX_comm_cancel(smx_action_t action)
566 /* If the action is a waiting state means that it is still in a rdv */
567 /* so remove from it and delete it */
568 if (action->state == SIMIX_WAITING) {
569 SIMIX_rdv_remove(action->comm.rdv, action);
570 action->state = SIMIX_FAILED;
572 surf_workstation_model->action_cancel(action->comm.surf_comm);
576 void SIMIX_comm_suspend(smx_action_t action)
578 /*FIXME: shall we suspend also the timeout actions? */
579 surf_workstation_model->suspend(action->comm.surf_comm);
582 void SIMIX_comm_resume(smx_action_t action)
584 /*FIXME: check what happen with the timeouts */
585 surf_workstation_model->resume(action->comm.surf_comm);
589 /************* Action Getters **************/
592 * \brief Get the amount remaining from the communication
593 * \param action The communication
// Switches on action->state:
//   - in one branch (presumably the running state) it returns what the
//     workstation model reports as remaining for comm.surf_comm;
//   - in the two other branches, both marked FIXME, it returns 0.
// NOTE(review): the case labels, the breaks, the declaration of `remains` and
// the final return are missing from this listing (numbering jumps), so the
// exact state-to-branch mapping cannot be checked here.
595 double SIMIX_comm_get_remains(smx_action_t action)
599 switch (action->state) {
602 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
607 remains = 0; /*FIXME: check what should be returned */
611 remains = 0; /*FIXME: is this correct? */
617 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
619 return action->state;
623 * \brief Return the user data associated to the sender of the communication
624 * \param action The communication
625 * \return the user data
627 void* SIMIX_comm_get_src_data(smx_action_t action)
629 return action->comm.src_data;
633 * \brief Return the user data associated to the receiver of the communication
634 * \param action The communication
635 * \return the user data
637 void* SIMIX_comm_get_dst_data(smx_action_t action)
639 return action->comm.dst_data;
642 void* SIMIX_comm_get_src_buff(smx_action_t action)
644 return action->comm.src_buff;
647 void* SIMIX_comm_get_dst_buff(smx_action_t action)
649 return action->comm.dst_buff;
652 size_t SIMIX_comm_get_src_buff_size(smx_action_t action)
654 return action->comm.src_buff_size;
657 size_t SIMIX_comm_get_dst_buff_size(smx_action_t action)
661 if (action->comm.dst_buff_size)
662 buff_size = *(action->comm.dst_buff_size);
669 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
671 return action->comm.src_proc;
674 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
676 return action->comm.dst_proc;
679 #ifdef HAVE_LATENCY_BOUND_TRACKING
681 * \brief verify if communication is latency bounded
682 * \param action The communication
// Behavior:
//   - if simix_global->latency_limited_dict holds an entry for `action`
//     (stored there by SIMIX_comm_destroy), it returns that recorded flag;
//   - otherwise it asks the workstation model about comm.surf_comm.
// NOTE(review): the dictionary is keyed by the action address, yet it is
// walked entry by entry; a direct keyed lookup would avoid the linear scan.
// The declarations of `key`/`data`, the return of the found value and the
// closing #endif are missing from this listing (numbering jumps).
684 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
686 //try to find comm on the list of finished flows
689 xbt_dict_cursor_t cursor;
690 xbt_dict_foreach(simix_global->latency_limited_dict, cursor, key, data) {
691 DEBUG2("comparing key=%p with comm=%p", (void *) key, (void *) action);
692 if ((void *) action == (void *) key) {
693 DEBUG2("key %p found, return value latency limited value %d",
694 (void *) key, (int) data);
695 xbt_dict_cursor_free(&cursor);
700 return surf_workstation_model->get_latency_limited(action->comm.surf_comm);
704 /******************************************************************************/
705 /* SIMIX_comm_copy_data callbacks */
706 /******************************************************************************/
707 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, size_t) =
708 &SIMIX_comm_copy_pointer_callback;
711 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t))
713 SIMIX_comm_copy_data_callback = callback;
716 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, size_t buff_size)
718 xbt_assert1((buff_size == sizeof(void *)),
719 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
720 *(void **) (comm->comm.dst_buff) = comm->comm.src_buff;
723 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
725 memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
729 * \brief Copy the communication data from the sender's buffer to the receiver's one
730 * \param comm The communication
732 void SIMIX_comm_copy_data(smx_action_t comm)
// Per the comments below, nothing is copied when either buffer is NULL or the
// data was already copied. Otherwise:
//   - the byte count starts at comm.src_buff_size and is capped by
//     *comm.dst_buff_size when the receiver gave a size pointer;
//   - the receiver's size is updated to that count;
//   - the copy callback chosen with SIMIX_comm_set_copy_data_callback is
//     invoked;
//   - comm.copied is set.
// NOTE(review): these original lines are missing from this listing:
//   - 737 (presumably the `return;` of the check below);
//   - 740 (presumably `comm,`, the first DEBUG6 argument: as listed, the
//     format has six conversions for five arguments);
//   - 751-754 (between the size update and the callback call).
734 size_t buff_size = comm->comm.src_buff_size;
735 /* If there is no data to be copied then return */
736 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
739 DEBUG6("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
741 comm->comm.src_proc->smx_host->name, comm->comm.src_buff,
742 comm->comm.dst_proc->smx_host->name, comm->comm.dst_buff, buff_size);
744 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
745 if (comm->comm.dst_buff_size)
746 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
748 /* Update the receiver's buffer size to the copied amount */
749 if (comm->comm.dst_buff_size)
750 *comm->comm.dst_buff_size = buff_size;
755 (*SIMIX_comm_copy_data_callback) (comm, buff_size);
757 /* Set the copied flag so we copy data only once */
758 /* (this function might be called from both communication ends) */
759 comm->comm.copied = 1;