1 /* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "simgrid/actor.h"
7 #include "simgrid/comm.h"
8 #include "simgrid/engine.h"
9 #include "simgrid/forward.h"
10 #include "simgrid/mailbox.h"
11 #include "xbt/asserts.h"
15 #include <stdio.h> /* snprintf */
17 XBT_LOG_NEW_DEFAULT_CATEGORY(async_waitany, "Messages specific for this example");
19 static int sender(int argc, char* argv[])
21 xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
22 long messages_count = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");
23 long msg_size = xbt_str_parse_int(argv[2], "Invalid communication size: %s");
24 long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers: %s");
26 /* Array in which we store all ongoing communications */
27 sg_comm_t* pending_comms = malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
28 int pending_comms_count = 0;
30 /* Make an array of the mailboxes to use */
31 sg_mailbox_t* mboxes = malloc(sizeof(sg_mailbox_t) * receivers_count);
32 for (long i = 0; i < receivers_count; i++) {
33 char mailbox_name[80];
34 snprintf(mailbox_name, 79, "receiver-%ld", (i));
35 sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
39 /* Start dispatching all messages to receivers, in a round robin fashion */
40 for (int i = 0; i < messages_count; i++) {
42 snprintf(msg_content, 79, "Message_%d", i);
43 sg_mailbox_t mbox = mboxes[i % receivers_count];
44 XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
46 sg_comm_t comm = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), msg_size);
47 pending_comms[pending_comms_count++] = comm;
49 /* Start sending messages to let the workers know that they should stop */
50 for (int i = 0; i < receivers_count; i++) {
51 XBT_INFO("Send 'finalize' to 'receiver-%d'", i);
52 char* end_msg = xbt_strdup("finalize");
53 sg_mailbox_t mbox = mboxes[i % receivers_count];
54 sg_comm_t comm = sg_mailbox_put_async(mbox, end_msg, 0);
55 pending_comms[pending_comms_count++] = comm;
58 XBT_INFO("Done dispatching all messages");
60 /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
62 * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are
64 * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
66 while (pending_comms_count != 0) {
67 int changed_pos = sg_comm_wait_any_for(pending_comms, pending_comms_count, -1);
68 memmove(pending_comms + changed_pos, pending_comms + changed_pos + 1,
69 sizeof(sg_comm_t) * (pending_comms_count - changed_pos - 1));
70 pending_comms_count--;
73 XBT_INFO("Remove the %dth pending comm: it terminated earlier than another comm that was initiated first.",
80 XBT_INFO("Goodbye now!");
84 static int receiver(int argc, char* argv[])
86 xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
87 int id = xbt_str_parse_int(argv[1], "ID should be numerical, not %s");
88 char mailbox_name[80];
89 snprintf(mailbox_name, 79, "receiver-%d", id);
90 sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
91 XBT_INFO("Wait for my first message on '%s'", mailbox_name);
93 char* received = (char*)sg_mailbox_get(mbox);
94 XBT_INFO("I got a '%s'.", received);
95 if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
102 XBT_INFO("I'm done. See you!");
106 int main(int argc, char* argv[])
108 simgrid_init(&argc, argv);
110 "Usage: %s platform_file deployment_file\n"
111 "\tExample: %s msg_platform.xml msg_deployment.xml\n",
114 simgrid_load_platform(argv[1]);
116 simgrid_register_function("sender", sender);
117 simgrid_register_function("receiver", receiver);
118 simgrid_load_deployment(argv[2]);
122 XBT_INFO("Simulation time %g", simgrid_get_clock());