1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
11 #include "src/mc/mc_replay.h"
/* Default log category of this file, declared as a subcategory of `simix`. */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
/* Global dictionary of named mailboxes (rendez-vous points), keyed by mailbox name.
 * Created by SIMIX_network_init() and freed by SIMIX_network_exit(). */
16 static xbt_dict_t rdv_points = NULL;
/* Exported communication counter. This file decrements it whenever a freshly
 * created comm synchro turns out to be redundant and is destroyed
 * (see simcall_HANDLER_comm_isend and SIMIX_comm_irecv).
 * NOTE(review): the matching increment is not in the lines shown; presumably SIMIX_comm_new(). */
17 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
/* Forward declarations of file-local helpers defined further down. */
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static inline void SIMIX_rdv_push(smx_mailbox_t rdv, smx_synchro_t comm);
23 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
24 int (*match_fun)(void *, void *,smx_synchro_t),
25 void *user_data, smx_synchro_t my_synchro);
26 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
27 int (*match_fun)(void *, void *,smx_synchro_t),
28 void *user_data, smx_synchro_t my_synchro);
29 static void SIMIX_rdv_free(void *data);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
/* Creates the global mailbox dictionary. SIMIX_rdv_free is registered as the
 * dictionary's value-free callback for the mailboxes stored in it. */
32 void SIMIX_network_init(void)
34 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/* Frees the global mailbox dictionary created by SIMIX_network_init(). */
37 void SIMIX_network_exit(void)
39 xbt_dict_free(&rdv_points);
42 /******************************************************************************/
43 /* Rendez-Vous Points */
44 /******************************************************************************/
/* Returns the mailbox named `name`, allocating and registering a new one when
 * the lookup finds nothing.
 *
 * The lookup comes first because two processes may issue the same rdv_create
 * simcall concurrently; the second one must reuse the first one's mailbox.
 * `name` may be NULL: the lookup is skipped and rdv->name stays NULL.
 * A new mailbox starts with empty comm_fifo / done_comm_fifo and no permanent receiver.
 */
46 smx_mailbox_t SIMIX_rdv_create(const char *name)
48 /* two processes may have pushed the same rdv_create simcall at the same time */
49 smx_mailbox_t rdv = name ? (smx_mailbox_t) xbt_dict_get_or_null(rdv_points, name) : NULL;
52 rdv = xbt_new0(s_smx_rvpoint_t, 1);
53 rdv->name = name ? xbt_strdup(name) : NULL;
54 rdv->comm_fifo = xbt_fifo_new();
55 rdv->done_comm_fifo = xbt_fifo_new();
56 rdv->permanent_receiver=NULL;
// name may be NULL (see above): passing a NULL pointer to %s is undefined behavior.
58 XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name ? name : "(null)");
// NOTE(review): confirm that anonymous (NULL-named) mailboxes are not registered here.
61 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
/* Unregisters the mailbox from rdv_points. SIMIX_rdv_free is the dictionary's
 * value-free callback, so removal presumably also frees the mailbox — confirm. */
66 void SIMIX_rdv_destroy(smx_mailbox_t rdv)
69 xbt_dict_remove(rdv_points, rdv->name);
/* Value-free callback of rdv_points (registered in SIMIX_network_init).
 * Releases the mailbox's queue of pending comms and its queue of completed comms. */
72 void SIMIX_rdv_free(void *data)
74 XBT_DEBUG("rdv free %p", data);
75 smx_mailbox_t rdv = (smx_mailbox_t) data;
77 xbt_fifo_free(rdv->comm_fifo);
78 xbt_fifo_free(rdv->done_comm_fifo);
/* Accessor for the global mailbox dictionary.
 * NOTE(review): presumably returns rdv_points — confirm. */
83 xbt_dict_t SIMIX_get_rdv_points()
/* Returns the mailbox registered under `name`, or NULL when there is none. */
88 smx_mailbox_t SIMIX_rdv_get_by_name(const char *name)
90 return (smx_mailbox_t) xbt_dict_get_or_null(rdv_points, name);
/* Counts the comms queued in rdv->comm_fifo whose sending process runs on `host`. */
93 int SIMIX_rdv_comm_count_by_host(smx_mailbox_t rdv, sg_host_t host)
95 smx_synchro_t comm = NULL;
96 xbt_fifo_item_t item = NULL;
99 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
100 if (comm->comm.src_proc->host == host)
/* Returns the oldest comm queued in rdv->comm_fifo without removing it.
 * NOTE(review): presumably NULL when the fifo is empty — confirm that
 * xbt_fifo_get_item_content() accepts the NULL first item. */
107 smx_synchro_t SIMIX_rdv_get_head(smx_mailbox_t rdv)
109 return (smx_synchro_t) xbt_fifo_get_item_content(
110 xbt_fifo_get_first_item(rdv->comm_fifo));
114 * \brief get the receiver (process associated to the mailbox)
115 * \param rdv The rendez-vous point
116 * \return process The receiving process (NULL if not set)
/* When this is non-NULL, simcall_HANDLER_comm_isend does not queue an unmatched send
 * in comm_fifo. Instead it pushes the send to done_comm_fifo with this process as
 * its destination (eager send). */
118 smx_process_t SIMIX_rdv_get_receiver(smx_mailbox_t rdv)
120 return rdv->permanent_receiver;
124 * \brief set the receiver of the rendez vous point to allow eager sends
125 * \param rdv The rendez-vous point
126 * \param process The receiving process
/* Passing NULL turns eager sends off again: the send and receive paths only
 * take the eager / done_comm_fifo route when permanent_receiver is non-NULL. */
128 void SIMIX_rdv_set_receiver(smx_mailbox_t rdv, smx_process_t process)
130 rdv->permanent_receiver=process;
134 * \brief Pushes a communication synchro into a rendez-vous point
135 * \param rdv The rendez-vous point
136 * \param comm The communication synchro
/* Appends comm to the mailbox's pending queue. It also records the mailbox in
 * comm->comm.rdv, so SIMIX_comm_destroy / SIMIX_comm_finish can unlink it later. */
138 static inline void SIMIX_rdv_push(smx_mailbox_t rdv, smx_synchro_t comm)
140 xbt_fifo_push(rdv->comm_fifo, comm);
141 comm->comm.rdv = rdv;
145 * \brief Removes a communication synchro from a rendez-vous point
146 * \param rdv The rendez-vous point
147 * \param comm The communication synchro
/* Unlinks comm from the mailbox's pending queue and clears its back-pointer.
 * rdv is dereferenced, so it must not be NULL. */
149 void SIMIX_rdv_remove(smx_mailbox_t rdv, smx_synchro_t comm)
151 xbt_fifo_remove(rdv->comm_fifo, comm);
152 comm->comm.rdv = NULL;
156 * \brief Checks if there is a communication synchro queued in a fifo matching our needs
157 * \param type The type of communication we are looking for (comm_send, comm_recv)
158 * \return The communication synchro if found, NULL otherwise
/* Scans `fifo` in order and takes the first synchro of the requested `type` that
 * both filters accept:
 *  - the caller's match_fun(this_user_data, other, synchro), and
 *  - the queued synchro's own comm.match_fun(other, this_user_data, my_synchro).
 * A NULL filter accepts everything. `other` is the queued synchro's src_data for
 * sends and its dst_data for receives.
 * On a match, the item is unlinked from the fifo and the synchro's refcount is
 * incremented. Its mailbox is saved in comm.rdv_cpy and comm.rdv is cleared. */
160 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
161 int (*match_fun)(void *, void *,smx_synchro_t),
162 void *this_user_data, smx_synchro_t my_synchro)
164 smx_synchro_t synchro;
165 xbt_fifo_item_t item;
166 void* other_user_data = NULL;
168 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
// pick the user data describing the queued side, according to its direction
169 if (synchro->comm.type == SIMIX_COMM_SEND) {
170 other_user_data = synchro->comm.src_data;
171 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
172 other_user_data = synchro->comm.dst_data;
// the type test short-circuits first, so the filters never see a stale other_user_data
174 if (synchro->comm.type == type &&
175 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
176 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
177 XBT_DEBUG("Found a matching communication synchro %p", synchro);
178 xbt_fifo_remove_item(fifo, item);
179 xbt_fifo_free_item(item);
180 synchro->comm.refcount++;
182 synchro->comm.rdv_cpy = synchro->comm.rdv;
184 synchro->comm.rdv = NULL;
187 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
188 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
189 synchro, (int)synchro->comm.type, (int)type);
191 XBT_DEBUG("No matching communication synchro found");
197 * \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
198 * \param type The type of communication we are looking for (comm_send, comm_recv)
199 * \return The communication synchro if found, NULL otherwise
/* Same matching rules as SIMIX_fifo_get_comm, but the matching synchro stays in
 * the fifo and keeps its mailbox. Its refcount is still incremented;
 * SIMIX_comm_iprobe decrements it again right after probing. */
201 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
202 int (*match_fun)(void *, void *,smx_synchro_t),
203 void *this_user_data, smx_synchro_t my_synchro)
205 smx_synchro_t synchro;
206 xbt_fifo_item_t item;
207 void* other_user_data = NULL;
209 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
210 if (synchro->comm.type == SIMIX_COMM_SEND) {
211 other_user_data = synchro->comm.src_data;
212 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
213 other_user_data = synchro->comm.dst_data;
215 if (synchro->comm.type == type &&
216 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
217 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
218 XBT_DEBUG("Found a matching communication synchro %p", synchro);
219 synchro->comm.refcount++;
223 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
224 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
225 synchro, (int)synchro->comm.type, (int)type);
227 XBT_DEBUG("No matching communication synchro found");
230 /******************************************************************************/
231 /* Communication synchros */
232 /******************************************************************************/
235 * \brief Creates a new communicate synchro
236 * \param type The direction of communication (comm_send, comm_recv)
237 * \return The new communicate synchro
/* Takes a synchro from simix_global->synchro_mallocator and sets it up as a
 * WAITING communication of the given direction, with refcount 1, no user data
 * and no tracing category.
 * NOTE(review): fields such as detached, copied, rdv, src/dst_proc, surf_comm and
 * the timeouts are not set here, but other functions read them. Confirm that the
 * mallocator's reset hook clears them. */
239 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
241 smx_synchro_t synchro;
243 /* alloc structures */
244 synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
246 synchro->type = SIMIX_SYNC_COMMUNICATE;
247 synchro->state = SIMIX_WAITING;
249 /* set communication */
250 synchro->comm.type = type;
251 synchro->comm.refcount = 1;
252 synchro->comm.src_data=NULL;
253 synchro->comm.dst_data=NULL;
255 synchro->category = NULL;
257 XBT_DEBUG("Create communicate synchro %p", synchro);
264 * \brief Destroy a communicate synchro
265 * \param synchro The communicate synchro to be destroyed
/* Drops one reference to the comm. The process dies with a backtrace if the
 * refcount is already <= 0, which means the same comm was released twice.
 * The synchro is really freed only once the count reaches zero. Freeing:
 *  - releases its name and surf actions;
 *  - for a detached comm that did not complete (state != SIMIX_DONE), hands the
 *    source buffer to clean_fun (if set) and forgets it;
 *  - unlinks it from its mailbox if it is still queued;
 *  - returns it to the synchro mallocator. */
267 void SIMIX_comm_destroy(smx_synchro_t synchro)
269 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
270 synchro, synchro->comm.refcount, (int)synchro->state);
272 if (synchro->comm.refcount <= 0) {
273 xbt_backtrace_display_current();
274 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
275 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
277 synchro->comm.refcount--;
278 if (synchro->comm.refcount > 0)
280 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
281 synchro->comm.refcount);
283 xbt_free(synchro->name);
284 SIMIX_comm_destroy_internal_actions(synchro);
286 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
287 /* the communication has failed and was detached:
288 * we have to free the buffer */
289 if (synchro->comm.clean_fun) {
290 synchro->comm.clean_fun(synchro->comm.src_buff);
292 synchro->comm.src_buff = NULL;
295 if(synchro->comm.rdv)
296 SIMIX_rdv_remove(synchro->comm.rdv, synchro);
298 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
/* Releases the surf actions attached to the comm: the network transfer and the
 * sender-side and receiver-side timeout/sleep actions.
 * Each pointer is reset to NULL after unref(), so calling this again is harmless. */
301 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
303 if (synchro->comm.surf_comm){
304 synchro->comm.surf_comm->unref();
305 synchro->comm.surf_comm = NULL;
308 if (synchro->comm.src_timeout){
309 synchro->comm.src_timeout->unref();
310 synchro->comm.src_timeout = NULL;
313 if (synchro->comm.dst_timeout){
314 synchro->comm.dst_timeout->unref();
315 synchro->comm.dst_timeout = NULL;
/* Blocking send: starts the comm through simcall_HANDLER_comm_isend (no clean_fun;
 * presumably non-detached — confirm), then waits on it with `timeout`.
 * The simcall's MC value is set to 0 before waiting. */
319 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t rdv,
320 double task_size, double rate,
321 void *src_buff, size_t src_buff_size,
322 int (*match_fun)(void *, void *,smx_synchro_t),
323 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
324 void *data, double timeout){
325 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, rdv, task_size, rate,
326 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
328 SIMCALL_SET_MC_VALUE(simcall, 0);
329 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Non-blocking send on mailbox `rdv`.
 *
 * It looks for a queued RECEIVE that matches (see SIMIX_fifo_get_comm):
 *  - None found, mailbox has a permanent receiver (eager send): the new comm is
 *    marked READY, addressed to that receiver, given an extra reference and
 *    pushed to rdv->done_comm_fifo.
 *  - None found, otherwise: the new comm is queued in rdv->comm_fifo.
 *  - Match found: the freshly created comm is discarded and the matched one
 *    becomes READY.
 * The comm is then added to src_proc->comms and filled with the sender's
 * parameters. Outside MC/replay it is started with SIMIX_comm_start; under MC or
 * replay it is only marked RUNNING.
 * A detached send gives up one reference and records clean_fun so the buffer can
 * be freed on failure.
 * Returns NULL for detached sends, and the comm synchro otherwise. */
331 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t rdv,
332 double task_size, double rate,
333 void *src_buff, size_t src_buff_size,
334 int (*match_fun)(void *, void *,smx_synchro_t),
335 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
336 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
337 void *data, int detached)
339 XBT_DEBUG("send from %p", rdv);
341 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
342 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
344 /* Look for communication synchro matching our needs. We also provide a description of
345 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
347 * If it is not found then push our communication into the rendez-vous point */
348 smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
350 if (!other_synchro) {
351 other_synchro = this_synchro;
353 if (rdv->permanent_receiver!=NULL){
354 //this mailbox is for small messages, which have to be sent right now
355 other_synchro->state = SIMIX_READY;
356 other_synchro->comm.dst_proc=rdv->permanent_receiver;
// extra reference for the entry in done_comm_fifo
357 other_synchro->comm.refcount++;
358 xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
359 other_synchro->comm.rdv=rdv;
360 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
363 SIMIX_rdv_push(rdv, this_synchro);
366 XBT_DEBUG("Receive already pushed");
368 SIMIX_comm_destroy(this_synchro);
369 --smx_total_comms; // this creation was a pure waste
371 other_synchro->state = SIMIX_READY;
372 other_synchro->comm.type = SIMIX_COMM_READY;
375 xbt_fifo_push(src_proc->comms, other_synchro);
377 /* if the communication synchro is detached then decrease the refcount
378 * by one, so it will be eliminated by the receiver's destroy call */
380 other_synchro->comm.detached = 1;
381 other_synchro->comm.refcount--;
382 other_synchro->comm.clean_fun = clean_fun;
384 other_synchro->comm.clean_fun = NULL;
387 /* Setup the communication synchro */
388 other_synchro->comm.src_proc = src_proc;
389 other_synchro->comm.task_size = task_size;
390 other_synchro->comm.rate = rate;
391 other_synchro->comm.src_buff = src_buff;
392 other_synchro->comm.src_buff_size = src_buff_size;
393 other_synchro->comm.src_data = data;
// NOTE(review): this overwrites any match_fun/copy_data_fun the receiver stored
// when it arrived first (SIMIX_comm_irecv does the same); confirm that is intended.
395 other_synchro->comm.match_fun = match_fun;
396 other_synchro->comm.copy_data_fun = copy_data_fun;
// Under the model checker / replay no surf action is created; the comm is only marked running.
399 if (MC_is_active() || MC_record_replay_is_active()) {
400 other_synchro->state = SIMIX_RUNNING;
401 return (detached ? NULL : other_synchro);
404 SIMIX_comm_start(other_synchro);
405 return (detached ? NULL : other_synchro);
/* Blocking receive: posts the receive with SIMIX_comm_irecv, sets the simcall's
 * MC value to 0, then waits on the comm with `timeout`. */
408 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t rdv,
409 void *dst_buff, size_t *dst_buff_size,
410 int (*match_fun)(void *, void *, smx_synchro_t),
411 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
412 void *data, double timeout, double rate)
414 smx_synchro_t comm = SIMIX_comm_irecv(receiver, rdv, dst_buff,
415 dst_buff_size, match_fun, copy_data_fun, data, rate);
416 SIMCALL_SET_MC_VALUE(simcall, 0);
417 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall entry point for a non-blocking receive; forwards to SIMIX_comm_irecv
 * (the simcall itself is not used). */
420 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t rdv,
421 void *dst_buff, size_t *dst_buff_size,
422 int (*match_fun)(void *, void *, smx_synchro_t),
423 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
424 void *data, double rate)
426 return SIMIX_comm_irecv(receiver, rdv, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/* Non-blocking receive on mailbox `rdv` for process dst_proc.
 *
 * Mailbox with a permanent receiver and already-delivered comms: a matching
 * SEND is first searched in done_comm_fifo.
 *  - If a match is found and its surf transfer has nothing left to send, it is
 *    marked DONE. The temporary receive synchro is then discarded.
 *  - If nothing matches, the receive is queued in comm_fifo.
 * Otherwise: comm_fifo is searched for a matching SEND. When there is none the
 * receive is queued; when there is one the temporary synchro is discarded and
 * the match becomes READY.
 * The result is added to dst_proc->comms, filled with the receiver's
 * parameters, and started (or only marked RUNNING under MC/replay).
 * Returns the comm synchro. */
429 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t rdv, void *dst_buff, size_t *dst_buff_size,
430 int (*match_fun)(void *, void *, smx_synchro_t),
431 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
432 void *data, double rate)
434 XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
435 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
437 smx_synchro_t other_synchro;
438 //communication already done, get it inside the fifo of completed comms
439 if (rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0) {
441 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
442 //find a match in the already received fifo
443 other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
444 //if not found, assume the receiver came first, register it to the mailbox in the classical way
445 if (!other_synchro) {
446 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
447 other_synchro = this_synchro;
448 SIMIX_rdv_push(rdv, this_synchro);
// A surf transfer with 0 remaining means the eager send already completed.
450 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
451 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
452 other_synchro->state = SIMIX_DONE;
453 other_synchro->comm.type = SIMIX_COMM_DONE;
454 other_synchro->comm.rdv = NULL;
456 other_synchro->comm.refcount--;
457 SIMIX_comm_destroy(this_synchro);
458 --smx_total_comms; // this creation was a pure waste
461 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
463 /* Look for communication synchro matching our needs. We also provide a description of
464 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
466 * If it is not found then push our communication into the rendez-vous point */
467 other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
469 if (!other_synchro) {
470 XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
471 other_synchro = this_synchro;
472 SIMIX_rdv_push(rdv, this_synchro);
474 SIMIX_comm_destroy(this_synchro);
475 --smx_total_comms; // this creation was a pure waste
476 other_synchro->state = SIMIX_READY;
477 other_synchro->comm.type = SIMIX_COMM_READY;
478 //other_synchro->comm.refcount--;
480 xbt_fifo_push(dst_proc->comms, other_synchro);
483 /* Setup communication synchro */
484 other_synchro->comm.dst_proc = dst_proc;
485 other_synchro->comm.dst_buff = dst_buff;
486 other_synchro->comm.dst_buff_size = dst_buff_size;
487 other_synchro->comm.dst_data = data;
// Keep the smaller of the two rates; -1.0 is treated as "no rate given".
489 if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
490 other_synchro->comm.rate = rate;
// NOTE(review): this overwrites the match_fun/copy_data_fun the sender may have set in
// simcall_HANDLER_comm_isend, so the side arriving last wins; confirm that is intended.
492 other_synchro->comm.match_fun = match_fun;
493 other_synchro->comm.copy_data_fun = copy_data_fun;
495 if (MC_is_active() || MC_record_replay_is_active()) {
496 other_synchro->state = SIMIX_RUNNING;
497 return other_synchro;
500 SIMIX_comm_start(other_synchro);
501 return other_synchro;
/* Simcall entry point for a non-blocking probe; forwards to SIMIX_comm_iprobe on
 * behalf of the issuing process. */
504 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t rdv,
505 int type, int src, int tag,
506 int (*match_fun)(void *, void *, smx_synchro_t),
508 return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
/* Checks, without consuming anything, whether `rdv` holds a comm matching the caller.
 * Depending on `type`, the caller is described either as a sender (and looks for
 * queued receives) or as a receiver (and looks for queued sends). The exact test
 * on `type` is on a line not shown.
 * done_comm_fifo is probed first when the mailbox has a permanent receiver,
 * presumably followed by comm_fifo if nothing was found — confirm.
 * The probe's refcount bump is undone and the temporary synchro is destroyed.
 * The returned pointer (possibly NULL) therefore carries no reference of its own.
 * NOTE(review): `src` and `tag` are not referenced in the lines shown. */
511 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t rdv, int type, int src,
512 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
514 XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
515 smx_synchro_t this_synchro;
518 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
519 smx_type = SIMIX_COMM_RECEIVE;
521 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
522 smx_type = SIMIX_COMM_SEND;
524 smx_synchro_t other_synchro=NULL;
525 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
526 //find a match in the already received fifo
527 XBT_DEBUG("first try in the perm recv mailbox");
529 other_synchro = SIMIX_fifo_probe_comm(
530 rdv->done_comm_fifo, (e_smx_comm_type_t) smx_type,
531 match_fun, data, this_synchro);
535 XBT_DEBUG("try in the normal mailbox");
536 other_synchro = SIMIX_fifo_probe_comm(
537 rdv->comm_fifo, (e_smx_comm_type_t) smx_type,
538 match_fun, data, this_synchro);
// undo the reference taken by SIMIX_fifo_probe_comm: probing must not retain the comm
541 if(other_synchro)other_synchro->comm.refcount--;
543 SIMIX_comm_destroy(this_synchro);
545 return other_synchro;
/* Makes the issuer wait on `synchro`.
 * The simcall is attached to synchro->simcalls and the synchro becomes the
 * issuer's waiting_synchro.
 * Under MC/replay, the recorded MC value decides the outcome: either the comm is
 * DONE, or it timed out on the issuer's side (SRC_TIMEOUT when the issuer is the
 * sender, DST_TIMEOUT otherwise). The comm is then finished.
 * Otherwise an already-finished comm is finished right away. An unfinished one gets
 * a surf sleep action of length `timeout` on the issuer's host, stored as the
 * sender's or receiver's timeout. */
548 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
550 /* the simcall may be a wait, a send or a recv */
553 /* Associate this simcall to the wait synchro */
554 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
556 xbt_fifo_push(synchro->simcalls, simcall);
557 simcall->issuer->waiting_synchro = synchro;
559 if (MC_is_active() || MC_record_replay_is_active()) {
560 int idx = SIMCALL_GET_MC_VALUE(simcall);
562 synchro->state = SIMIX_DONE;
564 /* If we reached this point, the wait simcall must have a timeout */
565 /* Otherwise it shouldn't be enabled and executed by the MC */
569 if (synchro->comm.src_proc == simcall->issuer)
570 synchro->state = SIMIX_SRC_TIMEOUT;
572 synchro->state = SIMIX_DST_TIMEOUT;
575 SIMIX_comm_finish(synchro);
579 /* If the synchro has already finished, perform the error handling, */
580 /* otherwise set up a waiting timeout on the right side */
581 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
582 SIMIX_comm_finish(synchro);
583 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
584 sleep = surf_host_sleep(simcall->issuer->host, timeout);
// the sleep action points back to the comm so its completion/failure can be traced to it
585 sleep->setData(synchro);
587 if (simcall->issuer == synchro->comm.src_proc)
588 synchro->comm.src_timeout = sleep;
590 synchro->comm.dst_timeout = sleep;
/* Non-blocking completion test of `synchro`.
 * Under MC/replay, the comm counts as complete once both endpoints are known. In
 * that case it is forced to DONE and finished.
 * Otherwise it counts as complete when it is no longer WAITING or RUNNING.
 * A complete comm is finished through SIMIX_comm_finish, which answers the simcall.
 * An incomplete one gets its simcall answered immediately. */
594 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
596 if(MC_is_active() || MC_record_replay_is_active()){
597 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
598 if(simcall_comm_test__get__result(simcall)){
599 synchro->state = SIMIX_DONE;
600 xbt_fifo_push(synchro->simcalls, simcall);
601 SIMIX_comm_finish(synchro);
603 SIMIX_simcall_answer(simcall);
608 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
609 if (simcall_comm_test__get__result(simcall)) {
610 xbt_fifo_push(synchro->simcalls, simcall);
611 SIMIX_comm_finish(synchro);
613 SIMIX_simcall_answer(simcall);
/* Non-blocking test over a set of comms. The result is the index of a finished
 * comm, or -1 by default.
 * Under MC/replay, the model checker's recorded index selects the comm, which is
 * forced to DONE and finished. Presumably an index of -1 answers immediately — confirm.
 * Otherwise the first comm that is no longer WAITING/RUNNING is reported and finished.
 * If none is finished, the simcall is answered with -1.
 * NOTE(review): the loop iterates simcall_comm_testany__get__comms(simcall) rather
 * than the `synchros` parameter; presumably the same dynar. */
617 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
620 smx_synchro_t synchro;
621 simcall_comm_testany__set__result(simcall, -1);
623 if (MC_is_active() || MC_record_replay_is_active()){
624 int idx = SIMCALL_GET_MC_VALUE(simcall);
626 SIMIX_simcall_answer(simcall);
628 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
629 simcall_comm_testany__set__result(simcall, idx);
630 xbt_fifo_push(synchro->simcalls, simcall);
631 synchro->state = SIMIX_DONE;
632 SIMIX_comm_finish(synchro);
637 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
638 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
639 simcall_comm_testany__set__result(simcall, cursor);
640 xbt_fifo_push(synchro->simcalls, simcall);
641 SIMIX_comm_finish(synchro);
645 SIMIX_simcall_answer(simcall);
/* Waits for the first of several comms to finish.
 * Under MC/replay, the recorded index selects the comm; it is forced to DONE and finished.
 * Otherwise the simcall is attached to every comm, so whichever completes first
 * finishes it. SIMIX_comm_finish then detaches it from the others and computes
 * the index. A comm that is already finished is finished immediately. */
648 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
650 smx_synchro_t synchro;
651 unsigned int cursor = 0;
653 if (MC_is_active() || MC_record_replay_is_active()){
654 int idx = SIMCALL_GET_MC_VALUE(simcall);
655 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
656 xbt_fifo_push(synchro->simcalls, simcall);
657 simcall_comm_waitany__set__result(simcall, idx);
658 synchro->state = SIMIX_DONE;
659 SIMIX_comm_finish(synchro);
663 xbt_dynar_foreach(synchros, cursor, synchro){
664 /* associate this simcall to the synchro */
665 xbt_fifo_push(synchro->simcalls, simcall);
667 /* see if the synchro is already finished */
668 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
669 SIMIX_comm_finish(synchro);
/* Detaches a waitany simcall from every comm it was waiting on. SIMIX_comm_finish
 * calls this once one of the comms completes, so the others no longer refer to it. */
675 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
677 smx_synchro_t synchro;
678 unsigned int cursor = 0;
679 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
681 xbt_dynar_foreach(synchros, cursor, synchro) {
682 xbt_fifo_remove(synchro->simcalls, simcall);
687 * \brief Starts the simulation of a communication synchro.
688 * \param synchro the communication synchro
/* Starts a READY comm; comms in any other state are left untouched.
 * It creates the surf network action and marks the comm RUNNING.
 * If the surf action has already failed (broken link), the comm becomes
 * LINK_FAILURE and its surf actions are released at once.
 * If either endpoint process is suspended, the surf action is suspended and will
 * be resumed when the process resumes. */
690 static inline void SIMIX_comm_start(smx_synchro_t synchro)
692 /* If both the sender and the receiver are already there, start the communication */
693 if (synchro->state == SIMIX_READY) {
695 sg_host_t sender = synchro->comm.src_proc->host;
696 sg_host_t receiver = synchro->comm.dst_proc->host;
698 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
699 sg_host_get_name(sender), sg_host_get_name(receiver));
701 synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
703 synchro->comm.task_size, synchro->comm.rate);
705 synchro->comm.surf_comm->setData(synchro);
707 synchro->state = SIMIX_RUNNING;
709 /* If a link is failed, detect it immediately */
710 if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
711 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
712 sg_host_get_name(sender), sg_host_get_name(receiver));
713 synchro->state = SIMIX_LINK_FAILURE;
714 SIMIX_comm_destroy_internal_actions(synchro);
717 /* If any of the processes is suspended, create the synchro but stop its execution,
718 it will be restarted when the sender process resumes */
719 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
720 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
721 /* FIXME: check what should happen with the synchro state */
723 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
724 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
725 sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
727 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
728 sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
// After a link failure at startup, SIMIX_comm_destroy_internal_actions() above has
// already released surf_comm and set it to NULL; only suspend a live surf action.
730 if (synchro->comm.surf_comm) synchro->comm.surf_comm->suspend();
737 * \brief Answers the SIMIX simcalls associated to a communication synchro.
738 * \param synchro a finished communication synchro
/* Drains synchro->simcalls and answers each simcall according to the comm's
 * final state:
 *  - DONE copies the data;
 *  - a timeout raises timeout_error;
 *  - an endpoint host failure raises network_error and, on the failed side,
 *    marks the issuer to die;
 *  - a link failure raises network_error;
 *  - cancellation raises cancel_error.
 * If the issuer's own host is off, it gets host_error and is marked to die.
 * For waitany/testany, the failing comm's index is stored in the exception value.
 * The comm is removed from its mailbox and from the issuer's comms list, and from
 * the peer's list too when detached.
 * After the loop, SIMIX_comm_destroy is called destroy_count times.
 * NOTE(review): the increment of destroy_count is on a line not shown; presumably once per answered simcall. */
740 void SIMIX_comm_finish(smx_synchro_t synchro)
742 unsigned int destroy_count = 0;
743 smx_simcall_t simcall;
745 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
747 /* If a waitany simcall is waiting for this synchro to finish, then remove
748 it from the other synchros in the waitany list. Afterwards, get the
749 position of the actual synchro in the waitany dynar and
750 return it as the result of the simcall */
752 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
753 continue; // if process handling comm is killed
754 if (simcall->call == SIMCALL_COMM_WAITANY) {
755 SIMIX_waitany_remove_simcall_from_actions(simcall);
756 if (!MC_is_active() && !MC_record_replay_is_active())
757 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
760 /* If the synchro is still in a rendez-vous point then remove from it */
761 if (synchro->comm.rdv)
762 SIMIX_rdv_remove(synchro->comm.rdv, synchro);
764 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
766 /* Check out for errors */
768 if (simcall->issuer->host->isOff()) {
769 simcall->issuer->context->iwannadie = 1;
770 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
773 switch (synchro->state) {
776 XBT_DEBUG("Communication %p complete!", synchro);
777 SIMIX_comm_copy_data(synchro);
780 case SIMIX_SRC_TIMEOUT:
781 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
782 "Communication timeouted because of sender");
785 case SIMIX_DST_TIMEOUT:
786 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
787 "Communication timeouted because of receiver");
790 case SIMIX_SRC_HOST_FAILURE:
791 if (simcall->issuer == synchro->comm.src_proc)
792 simcall->issuer->context->iwannadie = 1;
793 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
795 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
798 case SIMIX_DST_HOST_FAILURE:
799 if (simcall->issuer == synchro->comm.dst_proc)
800 simcall->issuer->context->iwannadie = 1;
801 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
803 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
806 case SIMIX_LINK_FAILURE:
// An endpoint may be NULL here; print "(null)" explicitly instead of handing NULL to %s (undefined behavior).
808 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
810 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : "(null)",
811 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : "(null)",
812 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
813 if (synchro->comm.src_proc == simcall->issuer) {
814 XBT_DEBUG("I'm source");
815 } else if (synchro->comm.dst_proc == simcall->issuer) {
816 XBT_DEBUG("I'm dest");
818 XBT_DEBUG("I'm neither source nor dest");
820 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
824 if (simcall->issuer == synchro->comm.dst_proc)
825 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
826 "Communication canceled by the sender");
828 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
829 "Communication canceled by the receiver");
833 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
836 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
837 if (simcall->issuer->doexception) {
838 if (simcall->call == SIMCALL_COMM_WAITANY) {
839 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
841 else if (simcall->call == SIMCALL_COMM_TESTANY) {
842 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
846 if (simcall->issuer->host->isOff()) {
847 simcall->issuer->context->iwannadie = 1;
850 simcall->issuer->waiting_synchro = NULL;
851 xbt_fifo_remove(simcall->issuer->comms, synchro);
// A detached comm has no owner left on the other side, so also drop it from the peer's comms list.
852 if(synchro->comm.detached){
853 if(simcall->issuer == synchro->comm.src_proc){
854 if(synchro->comm.dst_proc)
855 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
857 if(simcall->issuer == synchro->comm.dst_proc){
858 if(synchro->comm.src_proc)
859 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
862 SIMIX_simcall_answer(simcall);
866 while (destroy_count-- > 0)
867 SIMIX_comm_destroy(synchro);
871 * \brief This function is called when a Surf communication synchro is finished.
872 * \param synchro the corresponding Simix communication
/* Derives the comm's final state from its surf actions. Checks are made in this
 * priority order:
 *  1. sender timeout done      -> SRC_TIMEOUT
 *  2. receiver timeout done    -> DST_TIMEOUT
 *  3. sender sleep failed      -> SRC_HOST_FAILURE
 *  4. receiver sleep failed    -> DST_HOST_FAILURE
 *  5. network action failed    -> LINK_FAILURE
 *  6. otherwise                -> DONE
 * The surf actions are then released, and pending simcalls (if any) are answered
 * through SIMIX_comm_finish. */
874 void SIMIX_post_comm(smx_synchro_t synchro)
876 /* Update synchro state */
877 if (synchro->comm.src_timeout &&
878 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
879 synchro->state = SIMIX_SRC_TIMEOUT;
880 else if (synchro->comm.dst_timeout &&
881 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
882 synchro->state = SIMIX_DST_TIMEOUT;
883 else if (synchro->comm.src_timeout &&
884 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
885 synchro->state = SIMIX_SRC_HOST_FAILURE;
886 else if (synchro->comm.dst_timeout &&
887 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
888 synchro->state = SIMIX_DST_HOST_FAILURE;
889 else if (synchro->comm.surf_comm &&
890 synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
891 XBT_DEBUG("Puta madre. Surf says that the link broke");
892 synchro->state = SIMIX_LINK_FAILURE;
894 synchro->state = SIMIX_DONE;
896 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
897 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
899 /* destroy the surf actions associated with the Simix communication */
900 SIMIX_comm_destroy_internal_actions(synchro);
902 /* if there are simcalls associated with the synchro, then answer them */
903 if (xbt_fifo_size(synchro->simcalls)) {
904 SIMIX_comm_finish(synchro);
/* Cancels a comm.
 * A WAITING comm is unlinked from its mailbox and marked CANCELED.
 * Outside MC/replay, a READY or RUNNING comm has its surf network action cancelled.
 * Comms in any other state are left untouched.
 * NOTE(review): SIMIX_rdv_remove dereferences comm.rdv; this relies on every
 * WAITING comm still being queued in a mailbox. */
908 void SIMIX_comm_cancel(smx_synchro_t synchro)
910 /* if the synchro is a waiting state means that it is still in a rdv */
911 /* so remove from it and delete it */
912 if (synchro->state == SIMIX_WAITING) {
913 SIMIX_rdv_remove(synchro->comm.rdv, synchro);
914 synchro->state = SIMIX_CANCELED;
916 else if (!MC_is_active() /* when running the MC there are no surf actions */
917 && !MC_record_replay_is_active()
918 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
920 synchro->comm.surf_comm->cancel();
/* Suspends the comm's surf network action if it exists. The timeout actions are
 * not suspended (see FIXME). */
924 void SIMIX_comm_suspend(smx_synchro_t synchro)
926 /*FIXME: shall we suspend also the timeout synchro? */
927 if (synchro->comm.surf_comm)
928 synchro->comm.surf_comm->suspend();
929 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
/* Resumes the comm's surf network action if it exists; the counterpart of
 * SIMIX_comm_suspend. */
932 void SIMIX_comm_resume(smx_synchro_t synchro)
934 /*FIXME: check what happen with the timeouts */
935 if (synchro->comm.surf_comm)
936 synchro->comm.surf_comm->resume();
937 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
941 /************* synchro Getters **************/
944 * \brief get the amount remaining from the communication
945 * \param synchro The communication
/* The amount is read from the surf network action in one state (presumably
 * SIMIX_RUNNING, which is the state that owns a live surf_comm). Other handled
 * states report 0; see the FIXMEs.
 * SIMIX_comm_irecv treats a result of 0.0 as "transfer already completed". */
947 double SIMIX_comm_get_remains(smx_synchro_t synchro)
955 switch (synchro->state) {
958 remains = synchro->comm.surf_comm->getRemains();
963 remains = 0; /*FIXME: check what should be returned */
967 remains = 0; /*FIXME: is this correct? */
/* Returns the comm's current synchro state (e.g. SIMIX_WAITING, SIMIX_RUNNING, SIMIX_DONE). */
973 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
975 return synchro->state;
979 * \brief Return the user data associated to the sender of the communication
980 * \param synchro The communication
981 * \return the user data
/* src_data is the `data` argument of the send (set in simcall_HANDLER_comm_isend);
 * it is NULL until a sender has been attached. */
983 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
985 return synchro->comm.src_data;
989 * \brief Return the user data associated to the receiver of the communication
990 * \param synchro The communication
991 * \return the user data
/* dst_data is the `data` argument of the receive (set in SIMIX_comm_irecv);
 * it is NULL until a receiver has been attached. */
993 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
995 return synchro->comm.dst_data;
/* Returns the sending process of the comm (may be NULL, as SIMIX_comm_finish's checks show). */
998 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1000 return synchro->comm.src_proc;
/* Returns the receiving process of the comm (may be NULL, as SIMIX_comm_finish's checks show). */
1003 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1005 return synchro->comm.dst_proc;
1008 /******************************************************************************/
1009 /* SIMIX_comm_copy_data callbacks */
1010 /******************************************************************************/
/* Global default copy function, used by SIMIX_comm_copy_data when a comm has no
 * copy_data_fun of its own. By default it copies the payload pointer itself. */
1011 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1012 &SIMIX_comm_copy_pointer_callback;
/* Installs `callback` as the global default copy function used by SIMIX_comm_copy_data. */
1015 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1017 SIMIX_comm_copy_data_callback = callback;
/* Pointer-passing copy: the receiver's dst_buff is treated as a void* slot and
 * receives `buff` itself; no bytes are copied. The transferred size must be
 * exactly sizeof(void*) (asserted). */
1020 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1022 xbt_assert((buff_size == sizeof(void *)),
1023 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1024 *(void **) (comm->comm.dst_buff) = buff;
/* Byte-copy: memcpy's buff_size bytes from `buff` into the receiver's dst_buff.
 * For detached comms the source buffer is a private duplicate made by the sender
 * (see the inline comment), so src_buff is dropped afterwards.
 * NOTE(review): presumably the duplicate is freed on the line not shown — confirm. */
1027 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1029 XBT_DEBUG("Copy the data over");
1030 memcpy(comm->comm.dst_buff, buff, buff_size);
1031 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1033 comm->comm.src_buff = NULL;
1039 * \brief Copy the communication data from the sender's buffer to the receiver's one
1040 * \param comm The communication
/* Nothing is done when either buffer is missing or the data was already copied.
 * The size copied is src_buff_size, clamped to *dst_buff_size when the receiver
 * gave a size pointer; the clamped value is written back through that pointer.
 * The comm's own copy_data_fun is used when set, otherwise the global
 * SIMIX_comm_copy_data_callback. */
1042 void SIMIX_comm_copy_data(smx_synchro_t comm)
1044 size_t buff_size = comm->comm.src_buff_size;
1045 /* If there is no data to be copy then return */
1046 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1049 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1051 comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1052 comm->comm.src_buff,
1053 comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1054 comm->comm.dst_buff, buff_size);
1056 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1057 if (comm->comm.dst_buff_size)
1058 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1060 /* Update the receiver's buffer size to the copied amount */
1061 if (comm->comm.dst_buff_size)
1062 *comm->comm.dst_buff_size = buff_size;
1065 if(comm->comm.copy_data_fun)
1066 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1068 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1072 /* Set the copied flag so we copy data only once */
1073 /* (this function might be called from both communication ends) */
1074 comm->comm.copied = 1;