1 /* pmm - parallel matrix multiplication "double diffusion" */
3 /* Copyright (c) 2006- Ahmed Harbaoui. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Declare "pmm" as this file's default logging category; the string is the
 * category description. Presumably the category used by the INFOn() calls
 * further down - confirm against the XBT logging documentation. */
11 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm,"Messages specific to this example");
/* Matrix structure, declared through GRAS_DEFINE_TYPE under the symbol
 * s_matrix. The rest of the file reads its rows, cols and data members. */
13 GRAS_DEFINE_TYPE(s_matrix,struct s_matrix {
/* Element buffer; the annotation states its element count as rows*cols.
 * maestro() and display() index it row after row. */
16 double *data GRAS_ANNOTE(size, rows*cols);
/* Short name used by initmatrix() and display(). */
18 typedef struct s_matrix matrix_t;
20 /* struct for recovering results: payload of the "result" message.
 * maestro() stores result.value at position (result.i, result.j) of C. */
21 GRAS_DEFINE_TYPE(s_result,struct s_result {
26 typedef struct s_result result_t;
28 /* struct to send initial data to sensor: payload of the "init_data" message.
 * maestro() fills its myrow, mycol, a and b members before sending it. */
29 GRAS_DEFINE_TYPE(s_init_data,struct s_init_data {
35 typedef struct s_init_data init_data_t;
37 /* register messages which may be sent (common to client and server) */
/* Declares the six message types used in this file:
 *   "result"     - payload described by the s_result symbol
 *   "init_data"  - payload described by the s_init_data symbol
 *   "ask_result", "step", "step_ack", "data" - payload described as "int"
 * NOTE(review): sensor() sends &mydataB and &mydataA (declared double) as
 * "data" messages, while "data" is declared here with an "int" payload -
 * confirm which type is intended. */
38 static void register_messages(void) {
39 gras_datadesc_type_t result_type;
40 gras_datadesc_type_t init_data_type;
/* Look up the data descriptions created by the GRAS_DEFINE_TYPE blocks. */
41 result_type=gras_datadesc_by_symbol(s_result);
42 init_data_type=gras_datadesc_by_symbol(s_init_data);
/* Messages carrying a structure. */
44 gras_msgtype_declare("result", result_type);
45 gras_msgtype_declare("init_data", init_data_type);
/* Messages carrying a single int. */
47 gras_msgtype_declare("ask_result", gras_datadesc_by_name("int"));
48 gras_msgtype_declare("step", gras_datadesc_by_name("int"));
49 gras_msgtype_declare("step_ack", gras_datadesc_by_name("int"));
50 gras_msgtype_declare("data", gras_datadesc_by_name("int"));
53 /* Function prototypes */
/* Entry points of the two kinds of process defined in this file; both take
 * the argc/argv pair and return an int. */
54 int maestro (int argc,char *argv[]);
55 int sensor (int argc,char *argv[]);
57 /* **********************************************************************
59 * **********************************************************************/
61 /* Global private data */
/* Message callback: reads an int from the payload, obtains a socket from the
 * callback context and closes that socket.
 * NOTE(review): the parameter is named `payload` but the body dereferences
 * `payload_data` - confirm where payload_data is defined. */
69 static int maestro_cb_data_handler(gras_msg_cb_ctx_t ctx, void *payload) {
72 /* 1. Get the payload into the msg variable */
73 int msg=*(int*)payload_data;
/* Socket taken from the callback context - presumably the sender's. */
75 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
79 /* 8. Make sure we don't leak sockets */
80 gras_socket_close(expeditor);
82 /* 9. Tell GRAS that we consumed this message */
84 } /* end_of_maestro_cb_data_handler */
86 /*** Function: initialize a matrix ***/
/* Sets each of the X->rows * X->cols elements of X->data to 1.0.
 * X is a pointer to the matrix, so its members are reached with `->`.
 * The trailing comment on the assignment keeps the random-fill alternative. */
88 static void initmatrix(matrix_t *X){
90 for(i=0 ; i<X->rows*X->cols; i++)
91 X->data[i]=1.0;//1.0*rand()/(RAND_MAX+1.0);
93 } /* end_of_initmatrixs */
95 /*** Function: sequential scatter ***/
/* File-local helper taking no arguments and returning nothing. */
97 static void scatter(){
101 /*** Function: Scatter // ***/
/* File-local helper taking no arguments and returning nothing; presumably
 * the parallel counterpart of scatter() - confirm. */
103 static void scatter_parl(){
105 }/* end_of_Scatter // */
107 /*** Function: multiplication ***/
/* For every step from 1 to MATRIX_SIZE, sends a "step" message carrying the
 * step number to nbr_sensor sockets taken from proc[][], then waits for an
 * "init_data" message.
 * NOTE(review): the equivalent loop in maestro() waits for "step_ack" at this
 * point, not "init_data" - confirm which message is expected here. */
109 static void multiplication(){
113 for (step=1; step <= MATRIX_SIZE; step++){
114 for (i=0; i< nbr_sensor; i++){
/* The linear index i is mapped to proc[row][col] with 3 sensors per row and
 * both indices starting at 1. */
116 gras_msg_send(proc[(i/3)+1][(i%3)+1], gras_msgtype_by_name("step"), &step); /* initialize Mycol, MyRow, mydataA,mydataB*/
118 myrow,mycol,mydataA,mydataB
/* Presumably the failure path of the send: close the socket, then rethrow. */
120 gras_socket_close(proc[(i/3)+1][(i%3)+1]);
121 RETHROW0("Unable to send the msg : %s");
124 /* wait for computing and sensor messages exchange */
/* First argument 600 is presumably a timeout - confirm unit in the GRAS API. */
126 gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
128 RETHROW0("I Can't get a init Data message from Maestro : %s");
132 }/* end_of_multiplication */
134 /*** Function: gather ***/
/* File-local helper taking no arguments and returning nothing. */
136 static void gather(){
141 /*** Function: Display Matrix ***/
/* Prints matrix X (passed by value) on standard output. The elements are
 * emitted row by row: t walks X.data sequentially and each value is printed
 * with the "%.3g " format. */
143 static void display(matrix_t X){
/* Per-column loops outside the row loop - presumably header and separator
 * lines; confirm. */
148 for(j=0;j<X.cols;j++)
152 for(j=0;j<X.cols;j++)
/* One pass per matrix row. */
156 for(i=0;i<X.rows;i++){
158 for(j=0;j<X.cols;j++)
159 printf("%.3g ",X.data[t++]);
163 for(j=0;j<X.cols;j++)
167 }/* end_of_display */
/* Maestro process. Phases visible in this function:
 *   1. size and allocate the three MATRIX_SIZE x MATRIX_SIZE matrices A, B, C;
 *   2. walk argv three entries at a time from index 2, connect to each sensor
 *      and send it an "init_data" message (its position plus one element of A
 *      and one of B);
 *   3. for each step from 1 to MATRIX_SIZE, send "step" to every sensor and
 *      wait for the "step_ack" replies;
 *   4. send "ask_result" and store each received "result" value into C. */
169 int maestro (int argc,char *argv[]) {
172 int i,ask_result,step;
178 gras_socket_t socket[MATRIX_SIZE*MATRIX_SIZE]; /* sockets for broadcast to the sensors */
181 /* Initialize Matrices */
183 A.rows=A.cols=MATRIX_SIZE;
184 B.rows=B.cols=MATRIX_SIZE;
185 C.rows=C.cols=MATRIX_SIZE;
/* One double per element of each square matrix. */
187 A.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
188 B.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
189 C.data=xbt_malloc0(sizeof(double)*MATRIX_SIZE*MATRIX_SIZE);
194 /* Init the GRAS's infrastructure */
195 gras_init(&argc, argv);
196 /* Get arguments and create sockets */
198 //scatter();multiplication();gather();
200 /****************************** Init Data Send *********************************/
/* Each sensor is described by three consecutive arguments:
 * argv[i] = host, argv[i+1] = row rank, argv[i+2] = column rank. */
203 for( i=2;i< argc;i+=3){
206 socket[j]=gras_socket_client(argv[i],port);
209 RETHROW0("Unable to connect to the server: %s");
211 INFO2("Connected to %s:%d.",argv[i],port);
/* NOTE(review): argv[i+1] and argv[i+2] are strings but are assigned as-is,
 * while myrow/mycol are used as numbers just below - a string-to-integer
 * conversion looks missing; confirm. */
213 mydata.myrow=argv[i+1]; // My rank of row
214 mydata.mycol=argv[i+2]; // My rank of column
/* Ranks are used 1-based: element (myrow,mycol) is read at offset
 * (myrow-1)*MATRIX_SIZE+(mycol-1). */
215 mydata.a=A.data[(mydata.myrow-1)*MATRIX_SIZE+(mydata.mycol-1)];
216 mydata.b=B.data[(mydata.myrow-1)*MATRIX_SIZE+(mydata.mycol-1)];;
218 gras_msg_send(socket[j],gras_msgtype_by_name("init_data"),&mydata);
220 } // end init Data Send
222 /******************************* multiplication ********************************/
224 for (step=1; step <= MATRIX_SIZE; step++){
225 for (i=0; i< nbr_sensor; i++){
227 gras_msg_send(socket[i], gras_msgtype_by_name("step"), &step); /* initialize Mycol, MyRow, mydataA,mydataB*/
229 myrow,mycol,mydataA,mydataB
/* Presumably the failure path of the send: close the socket, then rethrow. */
231 gras_socket_close(socket[i]);
232 RETHROW0("Unable to send the msg : %s");
235 INFO0("send to sensor to begin a next step");
236 /* wait for computing and sensor messages exchange */
/* Collect acknowledgements while i is below nbr_sensor. */
238 while ( i< nbr_sensor){
240 gras_msg_wait(600,gras_msgtype_by_name("step_ack"),&from,&mydata);
242 RETHROW0("I Can't get a init Data message from Maestro : %s");
246 INFO1("Recive step ack from %s",gras_socket_peer_name(from));
250 /********************************* gather ***************************************/
/* NOTE(review): the two loops below run i from 1 to argc-1 and index socket[]
 * with it, whereas the sockets were stored at socket[j] and used as
 * socket[0..nbr_sensor-1] above - confirm the intended bounds. */
253 for( i=1;i< argc;i++){
254 gras_msg_send(socket[i],gras_msgtype_by_name("ask_result"),&ask_result);
256 /* wait for results */
257 for( i=1;i< argc;i++){
258 gras_msg_wait(600,gras_msgtype_by_name("result"),&from,&result);
/* result.i and result.j are used 1-based to place the value into C. */
259 C.data[(result.i-1)*MATRIX_SIZE+(result.j-1)]=result.value;
265 } /* end_of_maestro */
267 /* **********************************************************************
269 * **********************************************************************/
/* Sensor process. Phases visible in this function:
 *   1. open client sockets towards the peers listed in argv, plus a server
 *      socket on `port`;
 *   2. wait for the "init_data" message;
 *   3. loop on "step" messages: while step < MATRIX_SIZE, send mydataB on the
 *      row sockets and mydataA on the column sockets, wait for "data"
 *      messages, then acknowledge the step with "step_ack";
 *   4. wait for "ask_result" and answer with a "result" message. */
271 int sensor(int argc,char *argv[]) {
276 static int myrow,mycol;
277 static double mydataA,mydataB;
279 int step,l,result_ack=0;
281 gras_socket_t from; /* to receive from server for steps */
283 gras_socket_t socket_row[2],socket_column[2]; /* sockets for broadcast to other sensors */
285 /* Init the GRAS's infrastructure */
287 gras_init(&argc, argv);
289 /* Get arguments and create sockets */
/* Row peers are read from argv[2..MATRIX_SIZE] and column peers from
 * argv[MATRIX_SIZE+1..2*MATRIX_SIZE-1].
 * NOTE(review): socket_row and socket_column have 2 elements but are indexed
 * with i up to MATRIX_SIZE-1 - in bounds only when MATRIX_SIZE <= 2; confirm. */
293 for (i=1;i<MATRIX_SIZE;i++){
294 socket_row[i]=gras_socket_client(argv[i+1],port);
295 socket_column[i]=gras_socket_client(argv[i+MATRIX_SIZE],port);
297 INFO2("Launch %s (port=%d)",argv[0],port);
299 /* Create my master socket */
300 sock = gras_socket_server(port);
302 /* Register the known messages */
305 /* Recover my initialized Data and My Position*/
/* First argument 600 is presumably a timeout - confirm unit in the GRAS API. */
309 gras_msg_wait(600,gras_msgtype_by_name("init_data"),&from,&mydata);
311 RETHROW0("I Can't get a init Data message from Maestro : %s");
318 INFO4("Recover MY POSITION (%d,%d) and MY INIT DATA ( A=%.3g | B=%.3g )",
319 myrow,mycol,mydataA,mydataB);
322 do { //repeat until compute Cb
323 step=MATRIX_SIZE+1; // just an initialization for the loop
/* The received step number overwrites the initial value above. */
326 gras_msg_wait(600,gras_msgtype_by_name("step"),&from,&step);
328 RETHROW0("I Can't get a Next Step message from Maestro : %s");
331 /* Wait for sensors startup */
334 if (step < MATRIX_SIZE){
/* a row broadcast: send mydataB on every row socket */
337 for (l=1;l < MATRIX_SIZE ;l++){
338 gras_msg_send(socket_row[l], gras_msgtype_by_name("data"), &mydataB);
345 gras_msg_wait(600,gras_msgtype_by_name("data"),
348 RETHROW0("I Can't get a data message from row : %s");
351 /* a column broadcast */
353 for (l=1;l < MATRIX_SIZE ;l++){
354 gras_msg_send(socket_column[l],gras_msgtype_by_name("data"), &mydataA);
361 gras_msg_wait(600,gras_msgtype_by_name("data"),
364 RETHROW0("I Can't get a data message from column : %s");
369 /* send an ack msg to Maestro */
/* `from` is the socket filled in by the preceding gras_msg_wait call. */
371 gras_msg_send(from,gras_msgtype_by_name("step_ack"),&step);
373 INFO1("Send ack to maestro for to end %d th step",step);
/* Leave the loop once step MATRIX_SIZE-1 has been acknowledged. */
375 if(step==MATRIX_SIZE-1) break;
377 } while (step < MATRIX_SIZE);
378 /* wait Message from maestro to send the result */
385 gras_msg_wait(600,gras_msgtype_by_name("ask_result"),
388 RETHROW0("I Can't get a data message from row : %s");
390 /* 5. send Result to Maestro */
392 gras_msg_send(from, gras_msgtype_by_name("result"),&result);
394 // gras_socket_close(from);
395 RETHROW0("Failed to send PING to server: %s");
397 INFO3(">>>>>>>> Result: %d sent to %s:%d <<<<<<<<",
399 gras_socket_peer_name(from),gras_socket_peer_port(from));
400 /* Free the allocated resources, and shut GRAS down */
401 gras_socket_close(from);
405 } /* end_of_sensor */