Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Refactor spawned_send and bcast_spawned_send
[simgrid.git] / examples / msg / actions / actions.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2009. The SimGrid team. All rights reserved.               */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
11 #include "xbt.h"                /* calloc, printf */
12
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                  "Messages specific for this msg example");
16 int communicator_size=0;
17
18 typedef struct coll_ctr_t{
19   int bcast_counter;
20 } *coll_ctr;
21
22 /* Helper function */
23 static double parse_double(const char *string) {
24   double value;
25   char *endptr;
26
27   value=strtod(string, &endptr);
28   if (*endptr != '\0')
29           THROW1(unknown_error, 0, "%s is not a double", string);
30   return value;
31 }
32
33
34 /* My actions */
35 static void send(xbt_dynar_t action)
36 {
37   char *name = xbt_str_join(action, " ");
38   char *to = xbt_dynar_get_as(action, 2, char *);
39   char *size = xbt_dynar_get_as(action, 3, char *);
40   INFO2("Send: %s (size: %lg)", name, parse_double(size));
41   MSG_task_send(MSG_task_create(name, 0, parse_double(size), NULL), to);
42   INFO1("Sent %s", name);
43   free(name);
44 }
45
46
47 static int spawned_send(int argc, char *argv[])
48 {
49   INFO3("%s: Sending %s on %s", MSG_process_self()->name, 
50         argv[1],argv[0]);
51   MSG_task_send(MSG_task_create(argv[0], 0, parse_double(argv[1]), NULL), 
52                 argv[0]);
53   return 0;
54 }
55
56 static void Isend(xbt_dynar_t action)
57 {
58   char spawn_name[80];
59   char *to = xbt_dynar_get_as(action, 2, char *);
60   char *size = xbt_dynar_get_as(action, 3, char *);
61   char **myargv;
62   m_process_t comm_helper;
63
64   INFO1("Isend on %s: spawn process ", 
65          MSG_process_self()->name);
66
67   myargv = (char**) calloc (3, sizeof (char*));
68   
69   myargv[0] = xbt_strdup(to);
70   myargv[1] = xbt_strdup(size);
71   myargv[2] = NULL;
72
73   sprintf(spawn_name,"%s_wait",MSG_process_self()->name);
74   comm_helper =
75     MSG_process_create_with_arguments(spawn_name, spawned_send, 
76                                       NULL, MSG_host_self(), 2, myargv);
77 }
78
79
80 static void recv(xbt_dynar_t action)
81 {
82   char *name = xbt_str_join(action, " ");
83   m_task_t task = NULL;
84   INFO1("Receiving: %s", name);
85   //FIXME: argument of action ignored so far; semantic not clear
86   //char *from=xbt_dynar_get_as(action,2,char*);
87   MSG_task_receive(&task, MSG_process_get_name(MSG_process_self()));
88   INFO1("Received %s", MSG_task_get_name(task));
89   MSG_task_destroy(task);
90   free(name);
91 }
92
93 static int spawned_recv(int argc, char *argv[])
94 {
95   m_task_t task = NULL;
96   char* name = (char *) MSG_process_get_data(MSG_process_self());
97   INFO1("Receiving on %s", name);
98   MSG_task_receive(&task, name);
99   INFO1("Received %s", MSG_task_get_name(task));
100   MSG_task_send(MSG_task_create("waiter",0,0,NULL),MSG_process_self()->name); 
101   
102   MSG_task_destroy(task);
103   return 0;
104 }
105
106
107 static void Irecv(xbt_dynar_t action)
108 {
109   char *name = xbt_str_join(action, " ");
110   m_process_t comm_helper;
111
112   INFO1("Irecv on %s: spawn process ", 
113          MSG_process_get_name(MSG_process_self()));
114
115   sprintf(name,"%s_wait",MSG_process_self()->name);
116   comm_helper = MSG_process_create(name,spawned_recv,
117                  (void *) MSG_process_get_name(MSG_process_self()),
118                  MSG_host_self());
119
120
121   free(name);
122 }
123
124
125 static void wait(xbt_dynar_t action)
126 {
127   char *name = xbt_str_join(action, " ");
128   char task_name[80];
129   m_task_t task = NULL;
130   
131   INFO1("wait: %s", name);
132   sprintf(task_name,"%s_wait",MSG_process_self()->name);
133   MSG_task_receive(&task,task_name);
134   INFO1("waited: %s", name);
135   free(name);
136 }
137
138 static void barrier (xbt_dynar_t action)
139 {
140   char *name = xbt_str_join(action, " ");
141   INFO1("barrier: %s", name);
142   
143
144   free(name);
145
146 }
147
148 static void bcast (xbt_dynar_t action)
149 {
150   int i;
151   char *name;
152   const char* process_name;
153   char task_name[80];
154   char spawn_name[80];
155   char **myargv;
156   m_process_t comm_helper=NULL;
157   m_task_t task=NULL;
158   char *size = xbt_dynar_get_as(action, 2, char *);
159   coll_ctr counters =  (coll_ctr) MSG_process_get_data(MSG_process_self());
160
161   xbt_assert0(communicator_size, "Size of Communicator is not defined"
162               ", can't use collective operations");
163
164   MSG_process_self()->data=NULL;
165
166   process_name = MSG_process_self()->name;
167   if (!counters){
168     DEBUG0("Initialize the counters");
169     counters = (coll_ctr) calloc (1, sizeof(struct coll_ctr_t));
170   }
171
172   name = bprintf("bcast_%d", counters->bcast_counter++);
173   if (!strcmp(process_name, "process0")){
174     INFO2("%s: %s is the Root",name, process_name);
175
176     for(i=1;i<communicator_size;i++){
177       myargv = (char**) calloc (3, sizeof (char*));
178       myargv[0] = xbt_strdup(name);
179       myargv[1] = xbt_strdup(size);
180       myargv[2] = NULL;
181
182       sprintf(spawn_name,"%s_%d", myargv[0], i);
183       comm_helper = 
184         MSG_process_create_with_arguments(spawn_name, spawned_send, 
185                                           NULL, MSG_host_self(), 2, myargv);
186     }
187     
188     for(i=1;i<communicator_size;i++){
189       sprintf(task_name,"process%d_wait", i);
190       DEBUG1("get on %s", task_name);
191       MSG_task_receive(&task,task_name);      
192       MSG_task_destroy(task);
193       task=NULL;
194     }
195     INFO2("%s: all messages sent by %s have been received",
196           name, process_name);
197   } else {
198     INFO2("%s: %s receives", name, process_name);
199     MSG_task_receive(&task, name);
200     MSG_task_destroy(task);
201     INFO2("%s: %s has received", name,process_name);
202     sprintf(task_name,"%s_wait", process_name);
203     DEBUG1("put on %s", task_name);
204     MSG_task_send(MSG_task_create("waiter",0,0,NULL),task_name);
205   }
206
207   MSG_process_set_data(MSG_process_self(), (void*)counters);
208   free(name);
209 }
210
211
212 static void sleep(xbt_dynar_t action)
213 {
214   char *name = xbt_str_join(action, " ");
215   char *duration = xbt_dynar_get_as(action, 2, char *);
216   INFO1("sleep: %s", name);
217   MSG_process_sleep(parse_double(duration));
218   INFO1("sleept: %s", name);
219   free(name);
220 }
221
222 static void comm_size(xbt_dynar_t action)
223 {
224   char *size = xbt_dynar_get_as(action, 2, char *);
225   communicator_size = parse_double(size);
226 }
227
228 static void compute(xbt_dynar_t action)
229 {
230   char *name = xbt_str_join(action, " ");
231   char *amout = xbt_dynar_get_as(action, 2, char *);
232   m_task_t task = MSG_task_create(name, parse_double(amout), 0, NULL);
233   INFO1("computing: %s", name);
234   MSG_task_execute(task);
235   MSG_task_destroy(task);
236   INFO1("computed: %s", name);
237   free(name);
238 }
239
240 /** Main function */
241 int main(int argc, char *argv[])
242 {
243   MSG_error_t res = MSG_OK;
244   
245   /* Check the given arguments */
246   MSG_global_init(&argc, argv);
247   if (argc < 4) {
248     printf("Usage: %s platform_file deployment_file action_files\n", argv[0]);
249     printf("example: %s msg_platform.xml msg_deployment.xml actions\n",
250            argv[0]);
251     exit(1);
252   }
253
254   /*  Simulation setting */
255   MSG_create_environment(argv[1]);
256
257   /* No need to register functions as in classical MSG programs: the actions get started anyway */
258   MSG_launch_application(argv[2]);
259
260   /*   Action registration */
261   MSG_action_register("comm_size", comm_size);
262   MSG_action_register("send", send);
263   MSG_action_register("Isend", Isend);
264   MSG_action_register("recv", recv);
265   MSG_action_register("Irecv", Irecv);
266   MSG_action_register("wait", wait);
267   MSG_action_register("barrier", barrier);
268   MSG_action_register("bcast", bcast);
269   MSG_action_register("sleep", sleep);
270   MSG_action_register("compute", compute);
271
272
273   /* Actually do the simulation using MSG_action_trace_run */
274   res = MSG_action_trace_run(argv[3]);
275
276   INFO1("Simulation time %g", MSG_get_clock());
277   MSG_clean();
278
279   if (res == MSG_OK)
280     return 0;
281   else
282     return 1;
283 }                               /* end_of_main */