Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make the pmm robust to the change of amount of slaves
[simgrid.git] / examples / gras / pmm / pmm.c
old mode 100755 (executable)
new mode 100644 (file)
index a09c143..7e7d35d
@@ -1,7 +1,7 @@
-/* $Id$ */
 /* pmm - parallel matrix multiplication "double diffusion"                  */
 
-/* Copyright (c) 2006-2008 The SimGrid team. All rights reserved.           */
+/* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
+ * All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -10,7 +10,7 @@
 #include "xbt/matrix.h"
 #include "amok/peermanagement.h"
 
-#define PROC_MATRIX_SIZE 3
+#define PROC_MATRIX_SIZE 2
 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
 
@@ -57,11 +57,14 @@ static void register_messages(void)
 
   /* send data between slaves */
   gras_msgtype_declare("dataA",
-                       gras_datadesc_matrix(gras_datadesc_by_name("double"),
-                                            NULL));
+                       gras_datadesc_matrix(gras_datadesc_by_name
+                                            ("double"), NULL));
   gras_msgtype_declare("dataB",
-                       gras_datadesc_matrix(gras_datadesc_by_name("double"),
-                                            NULL));
+                       gras_datadesc_matrix(gras_datadesc_by_name
+                                            ("double"), NULL));
+
+  /* synchronization message */
+  gras_msgtype_declare("pmm_sync", 0);
 }
 
 /* Function prototypes */
@@ -80,7 +83,6 @@ typedef struct {
   int remaining_ack;
 } master_data_t;
 
-
 int master(int argc, char *argv[])
 {
 
@@ -111,12 +113,16 @@ int master(int argc, char *argv[])
   peers = amok_pm_group_new("pmm");
 
   /* friends, we're ready. Come and play */
-  INFO0("Wait for peers for 5 sec");
-  gras_msg_handleall(5);
-  INFO1("Got %ld pals", xbt_dynar_length(peers));
+  INFO0("Wait for peers for 2 sec");
+  gras_msg_handleall(2);
+  do {
+    INFO2("Got only %ld pals (of %d). Wait 2 more seconds",
+        xbt_dynar_length(peers),SLAVE_COUNT);
+    gras_msg_handleall(2);
+  } while (xbt_dynar_length(peers) < SLAVE_COUNT);
+  INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
 
   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
-
     xbt_dynar_get_cpy(peers, i, &grid[i]);
     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
   }
@@ -164,8 +170,9 @@ int master(int argc, char *argv[])
                                       submatrix_size * line,
                                       submatrix_size * row, NULL);
     assignment.B =
-      xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
-                         submatrix_size * line, submatrix_size * row, NULL);
+        xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
+                           submatrix_size * line, submatrix_size * row,
+                           NULL);
     row++;
     if (row >= PROC_MATRIX_SIZE) {
       row = 0;
@@ -177,12 +184,20 @@ int master(int argc, char *argv[])
     xbt_matrix_free(assignment.B);
   }
 
-  /* (have a rest while the slave perform the multiplication) */
+  /* synchronize slaves */
+  for (i = 0; i < PROC_MATRIX_SIZE; i++) {
+    int j;
+    for (j = 0; j < SLAVE_COUNT; j++)
+      gras_msg_wait(600, "pmm_sync", NULL, NULL);
+    for (j = 0; j < SLAVE_COUNT; j++)
+      gras_msg_send(socket[j], "pmm_sync", NULL);
+  }
 
   /* Retrieve the results */
   for (i = 0; i < SLAVE_COUNT; i++) {
     gras_msg_wait(6000, "result", &from, &result);
-    VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT);
+    VERB2("%d slaves are done already. Waiting for %d", i + 1,
+          SLAVE_COUNT);
     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
                            submatrix_size * result.linepos,
                            submatrix_size * result.rowpos, 0, 0, NULL);
@@ -190,19 +205,23 @@ int master(int argc, char *argv[])
   }
   /*    end of gather   */
 
-  if (DATA_MATRIX_SIZE < 30) {
-    INFO0("The Result of Multiplication is :");
-    xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
-  } else {
-    INFO1("Matrix size too big (%d>30) to be displayed here",
-          DATA_MATRIX_SIZE);
+  if (xbt_matrix_double_is_seq(C))
+    INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
+  else {
+    WARN0("the result seems wrong");
+    if (DATA_MATRIX_SIZE < 30) {
+      INFO0("The Result of Multiplication is :");
+      xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
+    } else {
+      INFO1("Matrix size too big (%d>30) to be displayed here",
+            DATA_MATRIX_SIZE);
+    }
   }
 
   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
 
-  for (i = 0; i < SLAVE_COUNT; i++) {
+  for (i = 0; i < SLAVE_COUNT; i++)
     gras_socket_close(socket[i]);
-  }
 
   xbt_matrix_free(A);
   xbt_matrix_free(B);
@@ -232,7 +251,7 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   int myline, myrow;
   xbt_matrix_t mydataA, mydataB;
   xbt_matrix_t bC =
-    xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
+      xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
 
   result_t result;
 
@@ -253,7 +272,10 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   mydataA = assignment.A;
   mydataB = assignment.B;
 
-  INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
+  if (gras_if_RL())
+    INFO0("Receive my pos and assignment");
+  else
+    INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
 
   /* Get my neighborhood from the assignment message (skipping myself) */
   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
@@ -268,12 +290,15 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   }
 
   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
+    gras_msg_send(master, "pmm_sync", NULL);
+    gras_msg_wait(600, "pmm_sync", NULL, NULL);
 
     /* a line brodcast */
     if (myline == step) {
-      INFO2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline);
+      VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step,
+            myline);
       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
-        INFO1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
+        VERB1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
         gras_msg_send(socket_row[l], "dataB", &mydataB);
       }
 
@@ -289,15 +314,16 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
       CATCH(e) {
         RETHROW0("Can't get a data message from line : %s");
       }
-      INFO3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
+      VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
             myline, gras_socket_peer_name(from));
     }
 
     /* a row brodcast */
     if (myrow == step) {
-      INFO2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
+      VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
-        INFO1("ROW:   Send to %s", gras_socket_peer_name(socket_line[l - 1]));
+        VERB1("ROW:   Send to %s",
+              gras_socket_peer_name(socket_line[l - 1]));
         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
       }
       xbt_matrix_free(bA);
@@ -311,7 +337,7 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
       CATCH(e) {
         RETHROW0("Can't get a data message from row : %s");
       }
-      INFO3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
+      VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
             gras_socket_peer_name(from));
     }
     xbt_matrix_double_addmult(bA, bB, bC);
@@ -329,7 +355,7 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   CATCH(e) {
     RETHROW0("Failed to send answer to server: %s");
   }
-  INFO2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
+  VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
         gras_socket_peer_name(master), gras_socket_peer_port(master));
   /*  Free the allocated resources, and shut GRAS down */
 
@@ -389,9 +415,10 @@ int slave(int argc, char *argv[])
       gras_os_sleep(0.5);
     }
   }
+  INFO2("Connected to master: %s:%d",gras_socket_peer_name(master),gras_socket_peer_port(master));
 
   /* Join and run the group */
-  amok_pm_group_join(master, "pmm", rank);
+  rank = amok_pm_group_join(master, "pmm");
   amok_pm_mainloop(600);
 
   /* housekeeping */