Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2022.
[simgrid.git] / src / msg / msg_comm.cpp
1 /* Copyright (c) 2004-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <cmath>
7
8 #include "simgrid/Exception.hpp"
9 #include "simgrid/s4u/Actor.hpp"
10 #include "simgrid/s4u/Comm.hpp"
11 #include "simgrid/s4u/Exec.hpp"
12 #include "simgrid/s4u/Mailbox.hpp"
13 #include "src/instr/instr_private.hpp"
14 #include "src/msg/msg_private.hpp"
15
16 namespace simgrid {
17 namespace msg {
18
19 bool Comm::test()
20 {
21   bool finished = false;
22
23   try {
24     finished = s_comm->test();
25     if (finished && task_received != nullptr) {
26       /* I am the receiver */
27       (*task_received)->set_not_used();
28     }
29   } catch (const simgrid::TimeoutException&) {
30     status_  = MSG_TIMEOUT;
31     finished = true;
32   } catch (const simgrid::CancelException&) {
33     status_  = MSG_TASK_CANCELED;
34     finished = true;
35   } catch (const simgrid::NetworkFailureException&) {
36     status_  = MSG_TRANSFER_FAILURE;
37     finished = true;
38   }
39
40   return finished;
41 }
42 msg_error_t Comm::wait_for(double timeout)
43 {
44   try {
45     s_comm->wait_for(timeout);
46
47     if (task_received != nullptr) {
48       /* I am the receiver */
49       (*task_received)->set_not_used();
50     }
51
52     /* FIXME: these functions are not traceable */
53   } catch (const simgrid::TimeoutException&) {
54     status_ = MSG_TIMEOUT;
55   } catch (const simgrid::CancelException&) {
56     status_ = MSG_TASK_CANCELED;
57   } catch (const simgrid::NetworkFailureException&) {
58     status_ = MSG_TRANSFER_FAILURE;
59   }
60
61   return status_;
62 }
63 } // namespace msg
64 } // namespace simgrid
65
66 /**
67  * @brief Checks whether a communication is done, and if yes, finalizes it.
68  * @param comm the communication to test
69  * @return 'true' if the communication is finished
70  * (but it may have failed, use MSG_comm_get_status() to know its status)
71  * or 'false' if the communication is not finished yet
72  * If the status is 'false', don't forget to use MSG_process_sleep() after the test.
73  */
74 int MSG_comm_test(msg_comm_t comm)
75 {
76   return comm->test();
77 }
78
79 /**
80  * @brief This function checks if a communication is finished.
81  * @param comms a vector of communications
82  * @return the position of the finished communication if any
83  * (but it may have failed, use MSG_comm_get_status() to know its status), or -1 if none is finished
84  */
85 int MSG_comm_testany(const_xbt_dynar_t comms)
86 {
87   ssize_t finished_index = -1;
88
89   /* Create the equivalent array with SIMIX objects: */
90   std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
91   s_comms.reserve(xbt_dynar_length(comms));
92   msg_comm_t comm;
93   unsigned int cursor;
94   xbt_dynar_foreach (comms, cursor, comm) {
95     s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl()));
96   }
97
98   msg_error_t status = MSG_OK;
99   try {
100     finished_index = simcall_comm_testany(s_comms.data(), s_comms.size());
101   } catch (const simgrid::TimeoutException& e) {
102     finished_index = e.get_value();
103     status         = MSG_TIMEOUT;
104   } catch (const simgrid::CancelException& e) {
105     finished_index = e.get_value();
106     status         = MSG_TASK_CANCELED;
107   } catch (const simgrid::NetworkFailureException& e) {
108     finished_index = e.get_value();
109     status         = MSG_TRANSFER_FAILURE;
110   }
111
112   if (finished_index != -1) {
113     comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
114     /* the communication is finished */
115     comm->set_status(status);
116
117     if (status == MSG_OK && comm->task_received != nullptr) {
118       /* I am the receiver */
119       (*comm->task_received)->set_not_used();
120     }
121   }
122
123   return static_cast<int>(finished_index);
124 }
125
126 /** @brief Destroys the provided communication. */
127 void MSG_comm_destroy(const_msg_comm_t comm)
128 {
129   delete comm;
130 }
131
132 /** @brief Wait for the completion of a communication.
133  *
134  * It takes two parameters.
135  * @param comm the communication to wait.
136  * @param timeout Wait until the communication terminates or the timeout occurs.
137  *                You can provide a -1 timeout to obtain an infinite timeout.
138  * @return msg_error_t
139  */
140 msg_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
141 {
142   return comm->wait_for(timeout);
143 }
144
145 /** @brief This function is called by a sender and permits waiting for each communication
146  *
147  * @param comm a vector of communication
148  * @param nb_elem is the size of the comm vector
149  * @param timeout for each call of MSG_comm_wait
150  */
151 void MSG_comm_waitall(msg_comm_t* comm, int nb_elem, double timeout)
152 {
153   for (int i = 0; i < nb_elem; i++)
154     comm[i]->wait_for(timeout);
155 }
156
157 /** @brief This function waits for the first communication finished in a list.
158  * @param comms a vector of communications
159  * @return the position of the first finished communication
160  * (but it may have failed, use MSG_comm_get_status() to know its status)
161  */
162 int MSG_comm_waitany(const_xbt_dynar_t comms)
163 {
164   ssize_t finished_index = -1;
165
166   /* Create the equivalent array with SIMIX objects: */
167   std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
168   s_comms.reserve(xbt_dynar_length(comms));
169   msg_comm_t comm;
170   unsigned int cursor;
171   xbt_dynar_foreach (comms, cursor, comm) {
172     s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl()));
173   }
174
175   msg_error_t status = MSG_OK;
176   try {
177     finished_index = simcall_comm_waitany(s_comms.data(), s_comms.size(), -1);
178   } catch (const simgrid::TimeoutException& e) {
179     finished_index = e.get_value();
180     status         = MSG_TIMEOUT;
181   } catch (const simgrid::CancelException& e) {
182     finished_index = e.get_value();
183     status         = MSG_TASK_CANCELED;
184   } catch (const simgrid::NetworkFailureException& e) {
185     finished_index = e.get_value();
186     status         = MSG_TRANSFER_FAILURE;
187   }
188
189   xbt_assert(finished_index != -1, "WaitAny returned -1");
190
191   comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
192   /* the communication is finished */
193   comm->set_status(status);
194
195   if (comm->task_received != nullptr) {
196     /* I am the receiver */
197     (*comm->task_received)->set_not_used();
198   }
199
200   return static_cast<int>(finished_index);
201 }
202
203 /**
204  * @brief Returns the error (if any) that occurred during a finished communication.
205  * @param comm a finished communication
206  * @return the status of the communication, or #MSG_OK if no error occurred during the communication
207  */
208 msg_error_t MSG_comm_get_status(const_msg_comm_t comm)
209 {
210   return comm->get_status();
211 }
212
213 /** @brief Get a task (#msg_task_t) from a communication
214  *
215  * @param comm the communication where to get the task
216  * @return the task from the communication
217  */
218 msg_task_t MSG_comm_get_task(const_msg_comm_t comm)
219 {
220   xbt_assert(comm, "Invalid parameter");
221
222   return comm->task_received ? *comm->task_received : comm->task_sent;
223 }