Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
99f1c6b7ce033c5ed612c02dc6804093a07ce338
[simgrid.git] / examples / smpi / smpi_s4u_masterworker / masterworker_mailbox_smpi.cpp
1 /* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "mpi.h"
7 #include "simgrid/s4u.hpp"
8
9 #include <array>
10 #include <cstdio> /* snprintf */
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");
13
14 static void master(std::vector<std::string> args)
15 {
16   xbt_assert(args.size() > 4, "The master function expects at least 3 arguments");
17
18   long tasks_count        = std::stol(args[1]);
19   double compute_cost     = std::stod(args[2]);
20   long communication_cost = std::stol(args[3]);
21   std::vector<simgrid::s4u::Mailbox*> workers;
22   for (unsigned int i = 4; i < args.size(); i++)
23     workers.push_back(simgrid::s4u::Mailbox::by_name(args[i]));
24
25   XBT_INFO("Got %zu workers and %ld tasks to process", workers.size(), tasks_count);
26
27   for (int i = 0; i < tasks_count; i++) { /* For each task to be executed: */
28     /* - Select a worker in a round-robin way */
29     simgrid::s4u::Mailbox* mailbox = workers[i % workers.size()];
30
31     /* - Send the computation cost to that worker */
32     XBT_INFO("Sending task %d of %ld to mailbox '%s'", i, tasks_count, mailbox->get_cname());
33     mailbox->put(new double(compute_cost), communication_cost);
34   }
35
36   XBT_INFO("All tasks have been dispatched. Request all workers to stop.");
37   for (unsigned int i = 0; i < workers.size(); i++) {
38     /* The workers stop when receiving a negative compute_cost */
39     simgrid::s4u::Mailbox* mailbox = workers[i % workers.size()];
40
41     mailbox->put(new double(-1.0), 0);
42   }
43 }
44
45 static void worker(std::vector<std::string> args)
46 {
47   xbt_assert(args.size() == 1, "The worker expects no argument");
48
49   const simgrid::s4u::Host* my_host = simgrid::s4u::this_actor::get_host();
50   simgrid::s4u::Mailbox* mailbox    = simgrid::s4u::Mailbox::by_name(my_host->get_name());
51
52   double compute_cost;
53   do {
54     auto msg     = mailbox->get_unique<double>();
55     compute_cost = *msg;
56
57     if (compute_cost > 0) /* If compute_cost is valid, execute a computation of that cost */
58       simgrid::s4u::this_actor::execute(compute_cost);
59   } while (compute_cost > 0); /* Stop when receiving an invalid compute_cost */
60
61   XBT_INFO("Exiting now.");
62 }
63
64 static void master_mpi(int argc, char* argv[])
65 {
66   MPI_Init(&argc, &argv);
67
68   int rank;
69   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
70   XBT_INFO("here for rank %d", rank);
71   std::array<int, 1000> test{{rank}};
72   if (rank == 0)
73     MPI_Send(test.data(), 1000, MPI_INT, 1, 1, MPI_COMM_WORLD);
74   else
75     MPI_Recv(test.data(), 1000, MPI_INT, 0, 1, MPI_COMM_WORLD, MPI_STATUSES_IGNORE);
76
77   XBT_INFO("After comm %d", rank);
78   MPI_Finalize();
79
80   XBT_INFO("After finalize %d %d", rank, test[0]);
81 }
82
83 static void alltoall_mpi(int argc, char* argv[])
84 {
85   MPI_Init(&argc, &argv);
86
87   int rank;
88   int size;
89   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
90   MPI_Comm_size(MPI_COMM_WORLD, &size);
91   XBT_INFO("alltoall for rank %d", rank);
92   std::vector<int> out(1000 * size);
93   std::vector<int> in(1000 * size);
94   MPI_Alltoall(out.data(), 1000, MPI_INT, in.data(), 1000, MPI_INT, MPI_COMM_WORLD);
95
96   XBT_INFO("after alltoall %d", rank);
97   MPI_Finalize();
98 }
99
100 int main(int argc, char* argv[])
101 {
102   simgrid::s4u::Engine e(&argc, argv);
103
104   SMPI_init();
105
106   xbt_assert(argc > 2,
107              "Usage: %s platform_file deployment_file\n"
108              "\nexample: %s platform.xml deployment.xml\n",
109              argv[0], argv[0]);
110
111   e.load_platform(argv[1]);
112
113   e.register_function("master", master);
114   e.register_function("worker", worker);
115   // launch two MPI applications as well, one using master_mpi function as main on 2 nodes
116   SMPI_app_instance_register("master_mpi", master_mpi, 2);
117   // the second performing an alltoall on 4 nodes
118   SMPI_app_instance_register("alltoall_mpi", alltoall_mpi, 4);
119   e.load_deployment(argv[2]);
120
121   e.run();
122
123   XBT_INFO("Simulation time %g", simgrid::s4u::Engine::get_clock());
124
125   SMPI_finalize();
126   return 0;
127 }