- Comm::waitallfor() is gone too. Its semantic was unclear on timeout anyway.
- Io::waitany() and waitanyfor() are gone. Please use ActivitySet() instead.
+C API:
+ - Introduce sg_activity_set_t and deprecate wait_all/wait_any/test_any for
+ Exec, Io and Comm.
+
----------------------------------------------------------------------------
SimGrid (3.34) June 26. 2023
actor-suspend actor-yield
activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-masterworker app-token-ring
- comm-pingpong comm-wait comm-waitany
+ comm-pingpong comm-wait
cloud-capping cloud-masterworker cloud-migration cloud-simple
dht-pastry
exec-async exec-basic exec-dvfs exec-remote
${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait2_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait3_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait4_d.xml
- ${CMAKE_CURRENT_SOURCE_DIR}/comm-waitany/comm-waitany_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/dht-kademlia_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/dht-pastry/dht-pastry_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/io-file-remote/io-file-remote_d.xml
actor-suspend actor-yield
activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-bittorrent app-chainsend app-masterworker app-token-ring
- comm-pingpong comm-wait comm-waitany
+ comm-pingpong comm-wait
cloud-capping cloud-masterworker cloud-migration cloud-simple
dht-kademlia dht-pastry
exec-async exec-basic exec-dvfs exec-remote
+++ /dev/null
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/comm.h"
-#include "simgrid/engine.h"
-#include "simgrid/forward.h"
-#include "simgrid/mailbox.h"
-#include "xbt/log.h"
-#include "xbt/str.h"
-#include "xbt/sysdep.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(comm_waitany, "Messages specific for this example");
-
-static void sender(int argc, char* argv[])
-{
- xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
- long messages_count = xbt_str_parse_int(argv[1], "Invalid message count");
- long msg_size = xbt_str_parse_int(argv[2], "Invalid message size");
- long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers");
- xbt_assert(receivers_count > 0);
-
- /* Array in which we store all ongoing communications */
- sg_comm_t* pending_comms = xbt_malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
- int pending_comms_count = 0;
-
- /* Make an array of the mailboxes to use */
- sg_mailbox_t* mboxes = xbt_malloc(sizeof(sg_mailbox_t) * receivers_count);
- for (long i = 0; i < receivers_count; i++) {
- char mailbox_name[80];
- snprintf(mailbox_name, 79, "receiver-%ld", i);
- sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
- mboxes[i] = mbox;
- }
-
- /* Start dispatching all messages to receivers, in a round robin fashion */
- for (long i = 0; i < messages_count; i++) {
- char msg_content[80];
- snprintf(msg_content, 79, "Message %ld", i);
- sg_mailbox_t mbox = mboxes[i % receivers_count];
- XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
-
- /* Create a communication representing the ongoing communication, and store it in pending_comms */
- pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), msg_size);
- }
- /* Start sending messages to let the workers know that they should stop */
- for (long i = 0; i < receivers_count; i++) {
- XBT_INFO("Send 'finalize' to 'receiver-%ld'", i);
- char* end_msg = xbt_strdup("finalize");
- sg_mailbox_t mbox = mboxes[i % receivers_count];
- pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
- }
-
- XBT_INFO("Done dispatching all messages");
-
- /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
- *
- * This loop waits for first terminating message with wait_any() and remove it from the array (with a memmove),
- * until all comms are terminated.
- * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
- */
- while (pending_comms_count != 0) {
- ssize_t changed_pos = sg_comm_wait_any(pending_comms, pending_comms_count);
- memmove(pending_comms + changed_pos, pending_comms + changed_pos + 1,
- sizeof(sg_comm_t) * (pending_comms_count - changed_pos - 1));
- pending_comms_count--;
-
- if (changed_pos != 0)
- XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
- changed_pos);
- }
-
- xbt_free(pending_comms);
- xbt_free(mboxes);
-
- XBT_INFO("Goodbye now!");
-}
-
-static void receiver(int argc, char* argv[])
-{
- xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
- int id = (int)xbt_str_parse_int(argv[1], "ID should be numerical");
- char mailbox_name[80];
- snprintf(mailbox_name, 79, "receiver-%d", id);
- sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
- XBT_INFO("Wait for my first message on '%s'", mailbox_name);
- while (1) {
- char* received = (char*)sg_mailbox_get(mbox);
- XBT_INFO("I got a '%s'.", received);
- if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
- xbt_free(received);
- break;
- }
- xbt_free(received);
- }
-
- XBT_INFO("I'm done. See you!");
-}
-
-int main(int argc, char* argv[])
-{
- simgrid_init(&argc, argv);
- xbt_assert(argc > 2,
- "Usage: %s platform_file deployment_file\n"
- "\tExample: %s platform.xml deployment.xml\n",
- argv[0], argv[0]);
-
- simgrid_load_platform(argv[1]);
-
- simgrid_register_function("sender", sender);
- simgrid_register_function("receiver", receiver);
- simgrid_load_deployment(argv[2]);
-
- simgrid_run();
- XBT_INFO("Simulation time %g", simgrid_get_clock());
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-comm-waitany ${platfdir:=.}/small_platform.xml ${srcdir:=.}/comm-waitany_d.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.000000] (2:receiver@Fafard) Wait for my first message on 'receiver-0'
-> [ 0.000000] (3:receiver@Jupiter) Wait for my first message on 'receiver-1'
-> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [ 0.500898] (2:receiver@Fafard) I'm done. See you!
-> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [ 0.526478] (0:maestro@) Simulation time 0.526478
-> [ 0.526478] (1:sender@Tremblay) Goodbye now!
-> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [ 0.526478] (3:receiver@Jupiter) I'm done. See you!
\ No newline at end of file
+++ /dev/null
-<?xml version='1.0'?>
-<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
-<platform version="4.1">
- <!-- The master actor (with some arguments) -->
- <actor host="Tremblay" function="sender">
- <argument value="6"/> <!-- Number of tasks -->
- <argument value="1000000"/> <!-- Communication size of tasks -->
- <argument value="2"/> <!-- Number of receivers -->
- </actor>
- <!-- The receiver actors -->
- <actor host="Fafard" function="receiver">
- <argument value="0"/>
- </actor>
- <actor host="Jupiter" function="receiver">
- <argument value="1"/>
- </actor>
-</platform>
XBT_PUBLIC sg_error_t sg_comm_wait_for(sg_comm_t comm, double timeout);
XBT_PUBLIC void sg_comm_unref(sg_comm_t comm);
+#ifndef DOXYGEN
+/* Legacy array-based wait functions, all deprecated in favor of sg_activity_set_t.
+ * The DOXYGEN guard presumably keeps them out of the generated API documentation -- TODO confirm. */
XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
void sg_comm_wait_all(sg_comm_t* comms, size_t count);
-XBT_PUBLIC ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+ ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+ ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count);
+#endif
SG_END_DECL
/*! \static take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done). */
static ssize_t test_any(const std::vector<CommPtr>& comms);
- /*! \static take a vector s4u::CommPtr and return when one of them is finished.
- * The return value is the rank of the first finished CommPtr. */
- static ssize_t wait_any(const std::vector<CommPtr>& comms) { return wait_any_for(comms, -1); }
- /*! \static Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
- static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout);
-
#ifndef DOXYGEN
+ /* Legacy entry points: both simply forward to deprecated_wait_any_for(); wait_any() passes -1 as the timeout. */
+ /* NOTE(review): unlike wait_all()/wait_all_for() just below, these two carry no XBT_ATTRIB_DEPRECATED_v339
+ * marker -- confirm whether leaving them unmarked is intentional. */
+ static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
+ static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
+
+ /* Shared implementation behind wait_any()/wait_any_for() and the C bindings sg_comm_wait_any()/sg_comm_wait_any_for(). */
+ static ssize_t deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout);
+
XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static void wait_all(const std::vector<CommPtr>& comms);
XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static size_t
wait_all_for(const std::vector<CommPtr>& comms, double timeout);
return this;
}
-ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
+ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout)
{
std::vector<ActivityPtr> activities;
for (const auto& comm : comms)
+/* Deprecated C binding: wait on the `count` communications in `comms` with a timeout of -1 and return the
+ * index reported by Comm::deprecated_wait_any_for().
+ * The body mirrors sg_comm_wait_any_for() instead of calling it, presumably because that function is now
+ * marked deprecated and calling it would trigger a deprecation warning -- TODO confirm. */
ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count)
{
- return sg_comm_wait_any_for(comms, count, -1);
+ // Wrap every raw handle in a CommPtr. The `false` argument is presumably the "add_ref" flag of the
+ // smart pointer, i.e. the wrapper adopts the caller's reference without incrementing it -- TODO confirm.
+ std::vector<simgrid::s4u::CommPtr> s4u_comms;
+ for (size_t i = 0; i < count; i++)
+ s4u_comms.emplace_back(comms[i], false);
+
+ ssize_t pos = simgrid::s4u::Comm::deprecated_wait_any_for(s4u_comms, -1);
+ // Take an extra reference on every comm except the one at `pos`. Presumably this compensates for the
+ // release done when s4u_comms is destroyed, so that only the comm at `pos` is actually dropped.
+ // NOTE(review): when pos == -1 no reference is added at all, so every comm looks released on return;
+ // confirm this is intended (or unreachable with a -1 timeout and count > 0).
+ for (size_t i = 0; i < count; i++) {
+ if (pos != -1 && static_cast<size_t>(pos) != i)
+ s4u_comms[i]->add_ref();
+ }
+ return pos;
}
ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout)
for (size_t i = 0; i < count; i++)
s4u_comms.emplace_back(comms[i], false);
- ssize_t pos = simgrid::s4u::Comm::wait_any_for(s4u_comms, timeout);
+ ssize_t pos = simgrid::s4u::Comm::deprecated_wait_any_for(s4u_comms, timeout);
for (size_t i = 0; i < count; i++) {
if (pos != -1 && static_cast<size_t>(pos) != i)
s4u_comms[i]->add_ref();