Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2022.
[simgrid.git] / examples / c / comm-waitany / comm-waitany.c
1 /* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/actor.h"
7 #include "simgrid/comm.h"
8 #include "simgrid/engine.h"
9 #include "simgrid/forward.h"
10 #include "simgrid/mailbox.h"
11 #include "xbt/log.h"
12 #include "xbt/str.h"
13 #include "xbt/sysdep.h"
14
15 #include <stdio.h> /* snprintf */
16
17 XBT_LOG_NEW_DEFAULT_CATEGORY(comm_waitany, "Messages specific for this example");
18
19 static void sender(int argc, char* argv[])
20 {
21   xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
22   long messages_count  = xbt_str_parse_int(argv[1], "Invalid message count");
23   long msg_size        = xbt_str_parse_int(argv[2], "Invalid message size");
24   long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers");
25   xbt_assert(receivers_count > 0);
26
27   /* Array in which we store all ongoing communications */
28   sg_comm_t* pending_comms = xbt_malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
29   int pending_comms_count  = 0;
30
31   /* Make an array of the mailboxes to use */
32   sg_mailbox_t* mboxes = xbt_malloc(sizeof(sg_mailbox_t) * receivers_count);
33   for (long i = 0; i < receivers_count; i++) {
34     char mailbox_name[80];
35     snprintf(mailbox_name, 79, "receiver-%ld", i);
36     sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
37     mboxes[i]         = mbox;
38   }
39
40   /* Start dispatching all messages to receivers, in a round robin fashion */
41   for (long i = 0; i < messages_count; i++) {
42     char msg_content[80];
43     snprintf(msg_content, 79, "Message %ld", i);
44     sg_mailbox_t mbox = mboxes[i % receivers_count];
45     XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
46
47     /* Create a communication representing the ongoing communication, and store it in pending_comms */
48     pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), msg_size);
49   }
50   /* Start sending messages to let the workers know that they should stop */
51   for (long i = 0; i < receivers_count; i++) {
52     XBT_INFO("Send 'finalize' to 'receiver-%ld'", i);
53     char* end_msg                        = xbt_strdup("finalize");
54     sg_mailbox_t mbox                    = mboxes[i % receivers_count];
55     pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
56   }
57
58   XBT_INFO("Done dispatching all messages");
59
60   /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
61    *
62    * This loop waits for first terminating message with wait_any() and remove it from the array (with a memmove),
63    *  until all comms are terminated.
64    * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
65    */
66   while (pending_comms_count != 0) {
67     ssize_t changed_pos = sg_comm_wait_any(pending_comms, pending_comms_count);
68     memmove(pending_comms + changed_pos, pending_comms + changed_pos + 1,
69             sizeof(sg_comm_t) * (pending_comms_count - changed_pos - 1));
70     pending_comms_count--;
71
72     if (changed_pos != 0)
73       XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
74                changed_pos);
75   }
76
77   free(pending_comms);
78   free(mboxes);
79
80   XBT_INFO("Goodbye now!");
81 }
82
83 static void receiver(int argc, char* argv[])
84 {
85   xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
86   int id = (int)xbt_str_parse_int(argv[1], "ID should be numerical");
87   char mailbox_name[80];
88   snprintf(mailbox_name, 79, "receiver-%d", id);
89   sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
90   XBT_INFO("Wait for my first message on '%s'", mailbox_name);
91   while (1) {
92     char* received = (char*)sg_mailbox_get(mbox);
93     XBT_INFO("I got a '%s'.", received);
94     if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
95       xbt_free(received);
96       break;
97     }
98     xbt_free(received);
99   }
100
101   XBT_INFO("I'm done. See you!");
102 }
103
104 int main(int argc, char* argv[])
105 {
106   simgrid_init(&argc, argv);
107   xbt_assert(argc > 2,
108              "Usage: %s platform_file deployment_file\n"
109              "\tExample: %s platform.xml deployment.xml\n",
110              argv[0], argv[0]);
111
112   simgrid_load_platform(argv[1]);
113
114   simgrid_register_function("sender", sender);
115   simgrid_register_function("receiver", receiver);
116   simgrid_load_deployment(argv[2]);
117
118   simgrid_run();
119   XBT_INFO("Simulation time %g", simgrid_get_clock());
120
121   return 0;
122 }