Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
finally putting in proper use of mutexes. code still needs work.
[simgrid.git] / src / smpi / src / smpi_base.c
1 #include <stdio.h>
2
3 #include <signal.h>
4 #include <sys/time.h>
5 #include "xbt/xbt_portability.h"
6 #include "simix/simix.h"
7 #include "smpi.h"
8
9 xbt_fifo_t *smpi_pending_send_requests      = NULL;
10 xbt_fifo_t *smpi_pending_recv_requests      = NULL;
11 xbt_fifo_t *smpi_received_messages          = NULL;
12
13 int smpi_running_hosts = 0;
14
15 smpi_mpi_communicator_t smpi_mpi_comm_world;
16
17 smpi_mpi_status_t smpi_mpi_status_ignore;
18
19 smpi_mpi_datatype_t smpi_mpi_byte;
20 smpi_mpi_datatype_t smpi_mpi_int;
21 smpi_mpi_datatype_t smpi_mpi_double;
22
23 smpi_mpi_op_t smpi_mpi_land;
24 smpi_mpi_op_t smpi_mpi_sum;
25
26 static xbt_os_timer_t smpi_timer;
27 static int smpi_benchmarking;
28 static double smpi_reference_speed;
29
30 XBT_LOG_NEW_DEFAULT_CATEGORY(smpi, "SMPI");
31
32 // mutexes
33 smx_mutex_t smpi_running_hosts_mutex;
34
35 int smpi_sender(int argc, char **argv)
36 {
37         return 0;
38 }
39
40 int smpi_receiver(int argc, char **argv)
41 {
42         return 0;
43 }
44
45 int smpi_run_simulation(int argc, char **argv)
46 {
47         smx_cond_t   cond           = NULL;
48         smx_action_t action         = NULL;
49
50         xbt_fifo_t   actions_failed = xbt_fifo_new();
51         xbt_fifo_t   actions_done   = xbt_fifo_new();
52
53         srand(SMPI_RAND_SEED);
54
55         SIMIX_global_init(&argc, argv);
56         SIMIX_function_register("smpi_simulated_main", smpi_simulated_main);
57         SIMIX_create_environment(argv[1]);
58         SIMIX_launch_application(argv[2]);
59
60         /* Prepare to display some more info when dying on Ctrl-C pressing */
61         //signal(SIGINT, inthandler);
62
63         /* Clean IO before the run */
64         fflush(stdout);
65         fflush(stderr);
66
67         while (SIMIX_solve(actions_done, actions_failed) != -1.0) {
68                 while (action = xbt_fifo_pop(actions_failed)) {
69                         DEBUG1("** %s failed **", action->name);
70                         while (cond = xbt_fifo_pop(action->cond_list)) {
71                                 SIMIX_cond_broadcast(cond);
72                         }
73                         SIMIX_action_destroy(action);
74                 }
75                 while (action = xbt_fifo_pop(actions_done)) {
76                         DEBUG1("** %s done **",action->name);
77                         while (cond = xbt_fifo_pop(action->cond_list)) {
78                                 SIMIX_cond_broadcast(cond);
79                         }
80                         SIMIX_action_destroy(action);
81                 }
82         }
83         xbt_fifo_free(actions_failed);
84         xbt_fifo_free(actions_done);
85         INFO1("simulation time %g", SIMIX_get_clock());
86         SIMIX_clean();
87         return 0;
88 }
89
90 void smpi_mpi_land_func(void *x, void *y, void *z)
91 {
92         *(int *)z = *(int *)x && *(int *)y;
93 }
94
95 void smpi_mpi_sum_func(void *x, void *y, void *z)
96 {
97         *(int *)z = *(int *)x + *(int *)y;
98 }
99
100 int smpi_mpi_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
101 {
102         int i;
103
104         for(i = comm->size - 1; i > 0 && host != comm->hosts[i]; i--);
105
106         return i;
107 }
108
109 int inline smpi_mpi_rank_self(smpi_mpi_communicator_t *comm)
110 {
111         return smpi_mpi_rank(comm, SIMIX_host_self());
112 }
113
114 void smpi_mpi_init()
115 {
116         int i;
117         int size;
118         smx_host_t *hosts;
119         smx_host_t host;
120         double duration;
121
122         // initialize some local variables
123         host  = SIMIX_host_self();
124         hosts = SIMIX_host_get_table();
125         size  = SIMIX_host_get_number();
126
127         // node 0 sets the globals
128         if (host == hosts[0]) {
129
130                 // mutexes
131                 smpi_running_hosts_mutex          = SIMIX_mutex_init();
132
133                 // global communicator
134                 smpi_mpi_comm_world.size          = size;
135                 smpi_mpi_comm_world.barrier       = 0;
136                 smpi_mpi_comm_world.barrier_mutex = SIMIX_mutex_init();
137                 smpi_mpi_comm_world.barrier_cond  = SIMIX_cond_init();
138                 smpi_mpi_comm_world.hosts         = hosts;
139                 smpi_mpi_comm_world.processes     = xbt_new0(smx_process_t, size);
140                 smpi_mpi_comm_world.processes[0]  = SIMIX_process_self();
141
142                 // mpi datatypes
143                 smpi_mpi_byte.size                = (size_t)1;
144                 smpi_mpi_int.size                 = sizeof(int);
145                 smpi_mpi_double.size              = sizeof(double);
146
147                 // mpi operations
148                 smpi_mpi_land.func                = &smpi_mpi_land_func;
149                 smpi_mpi_sum.func                 = &smpi_mpi_sum_func;
150
151                 // smpi globals
152                 smpi_pending_send_requests        = xbt_new0(xbt_fifo_t, size);
153                 smpi_pending_recv_requests        = xbt_new0(xbt_fifo_t, size);
154                 smpi_received_messages            = xbt_new0(xbt_fifo_t, size);
155
156                 for(i = 0; i < size; i++) {
157                         smpi_pending_send_requests[i] = xbt_fifo_new();
158                         smpi_pending_recv_requests[i] = xbt_fifo_new();
159                         smpi_received_messages[i]     = xbt_fifo_new();
160                 }
161
162                 smpi_timer                      = xbt_os_timer_new();
163                 smpi_reference_speed            = SMPI_DEFAULT_SPEED;
164                 smpi_benchmarking               = 0;
165
166                 // FIXME: tell other nodes to initialize, and wait for all clear
167
168                 // FIXME: send everyone okay to begin
169
170         } else {
171                 // FIMXE: wait for node 0 
172                 smpi_mpi_comm_world.processes[smpi_mpi_rank_self(&smpi_mpi_comm_world)] = SIMIX_process_self();
173                 // FIXME: signal node 0
174                 // FIXME: wait for node 0
175         }
176
177         SIMIX_mutex_lock(smpi_running_hosts_mutex);
178         smpi_running_hosts++;
179         SIMIX_mutex_lock(smpi_running_hosts_mutex);
180 }
181
182 void smpi_mpi_finalize()
183 {
184         int i;
185
186         SIMIX_mutex_lock(smpi_running_hosts_mutex);
187         i = --smpi_running_hosts;
188         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
189
190         if (0 >= i) {
191
192                 SIMIX_mutex_destroy(smpi_running_hosts_mutex);
193
194                 for (i = 0 ; i < smpi_mpi_comm_world.size; i++) {
195                         xbt_fifo_free(smpi_pending_send_requests[i]);
196                         xbt_fifo_free(smpi_pending_recv_requests[i]);
197                         xbt_fifo_free(smpi_received_messages[i]);
198                 }
199
200                 xbt_free(smpi_pending_send_requests);
201                 xbt_free(smpi_pending_recv_requests);
202                 xbt_free(smpi_received_messages);
203                 xbt_free(smpi_mpi_comm_world.processes);
204                 xbt_os_timer_free(smpi_timer);
205         }
206
207 }
208
209 void smpi_bench_begin()
210 {
211         xbt_assert0(!smpi_benchmarking, "Already benchmarking");
212         smpi_benchmarking = 1;
213         xbt_os_timer_start(smpi_timer);
214         return;
215 }
216
217 void smpi_bench_end()
218 {
219         double duration;
220         xbt_assert0(smpi_benchmarking, "Not benchmarking yet");
221         smpi_benchmarking = 0;
222         xbt_os_timer_stop(smpi_timer);
223         duration = xbt_os_timer_elapsed(smpi_timer);
224         // FIXME: add simix call to perform computation
225         return;
226 }
227
228 void smpi_barrier(smpi_mpi_communicator_t *comm) {
229         int i;
230         SIMIX_mutex_lock(comm->barrier_mutex);
231         comm->barrier++;
232         if(i < comm->size) {
233                 SIMIX_cond_wait(comm->barrier_cond, comm->barrier_mutex);
234         } else {
235                 comm->barrier = 0;
236                 SIMIX_cond_broadcast(comm->barrier_cond);
237         }
238         SIMIX_mutex_unlock(comm->barrier_mutex);
239 }
240
241 int smpi_comm_rank(smpi_mpi_communicator_t *comm, smx_host_t host)
242 {
243         int i;
244         for(i = 0; i < comm->size && host != comm->hosts[i]; i++);
245         if (i >= comm->size) i = -1;
246         return i;
247 }
248
249 // FIXME: move into own file
250 int smpi_gettimeofday(struct timeval *tv, struct timezone *tz)
251 {
252         double now;
253         int retval = 0;
254         smpi_bench_end();
255         if (NULL == tv) {
256                 retval = -1;
257         } else {
258                 now = SIMIX_get_clock();
259                 tv->tv_sec  = now;
260                 tv->tv_usec = ((now - (double)tv->tv_sec) * 1000000.0);
261         }
262         smpi_bench_begin();
263         return retval;
264 }
265
266 unsigned int smpi_sleep(unsigned int seconds)
267 {
268         smx_host_t self;
269         smx_action_t sleep_action;
270
271         smpi_bench_end();
272         // FIXME: simix sleep
273         self = SIMIX_host_self();
274         sleep_action = SIMIX_action_sleep(self, seconds);
275         sleep(seconds);
276         smpi_bench_begin();
277         return 0;
278 }
279
280 void smpi_exit(int status)
281 {
282         smpi_bench_end();
283         SIMIX_mutex_lock(smpi_running_hosts_mutex);
284         smpi_running_hosts--;
285         SIMIX_mutex_unlock(smpi_running_hosts_mutex);
286         SIMIX_process_kill(SIMIX_process_self());
287         return;
288 }