Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move datadesc and TCP sockets from GRAS to XBT.
[simgrid.git] / examples / gras / pmm / pmm.c
old mode 100755 (executable)
new mode 100644 (file)
index b29af45..38c834c
@@ -20,32 +20,34 @@ const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
 
 /* struct for recovering results */
-GRAS_DEFINE_TYPE(s_result, struct s_result {
-                 int linepos;
-                 int rowpos; xbt_matrix_t C GRAS_ANNOTE(subtype, double);});
+XBT_DEFINE_TYPE(s_result, struct s_result {
+                 int linepos; int rowpos;
+                 xbt_matrix_t C XBT_ANNOTE(subtype, double);
+                 });
 
 typedef struct s_result result_t;
 
 /* struct to send initial data to slave */
-GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
+XBT_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
                  int linepos;
                  int rowpos;
                  xbt_peer_t line[NEIGHBOR_COUNT];
                  xbt_peer_t row[NEIGHBOR_COUNT];
-                 xbt_matrix_t A GRAS_ANNOTE(subtype, double);
-                 xbt_matrix_t B GRAS_ANNOTE(subtype, double);});
+                 xbt_matrix_t A XBT_ANNOTE(subtype, double);
+                 xbt_matrix_t B XBT_ANNOTE(subtype, double);
+                 });
 
 typedef struct s_pmm_assignment s_pmm_assignment_t;
 
 /* register messages which may be sent (common to client and server) */
 static void register_messages(void)
 {
-  gras_datadesc_type_t result_type;
-  gras_datadesc_type_t pmm_assignment_type;
+  xbt_datadesc_type_t result_type;
+  xbt_datadesc_type_t pmm_assignment_type;
 
-  gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
-  result_type = gras_datadesc_by_symbol(s_result);
-  pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
+  xbt_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
+  result_type = xbt_datadesc_by_symbol(s_result);
+  pmm_assignment_type = xbt_datadesc_by_symbol(s_pmm_assignment);
 
   /* receive a final result from slave */
   gras_msgtype_declare("result", result_type);
@@ -55,11 +57,42 @@ static void register_messages(void)
 
   /* send data between slaves */
   gras_msgtype_declare("dataA",
-                       gras_datadesc_matrix(gras_datadesc_by_name("double"),
-                                            NULL));
+                       xbt_datadesc_matrix(xbt_datadesc_by_name
+                                            ("double"), NULL));
   gras_msgtype_declare("dataB",
-                       gras_datadesc_matrix(gras_datadesc_by_name("double"),
-                                            NULL));
+                       xbt_datadesc_matrix(xbt_datadesc_by_name
+                                            ("double"), NULL));
+
+  /* synchronization message */
+  gras_msgtype_declare("pmm_sync", 0);
+}
+
+static xbt_socket_t try_gras_socket_client_from_string(const char *host)
+{
+  volatile xbt_socket_t sock = NULL;
+  xbt_ex_t e;
+  TRY {
+    sock = gras_socket_client_from_string(host);
+  }
+  CATCH(e) {
+    if (e.category != system_error)
+      /* dunno what happened, let the exception go through */
+      RETHROWF("Unable to connect to the server: %s");
+    xbt_ex_free(e);
+  }
+  return sock;
+}
+
+static void my_gras_msg_wait(double timeout, const char* msgt_want,
+                             xbt_socket_t* expeditor, void *payload,
+                             const char *error_msg)
+{
+  TRY {
+    gras_msg_wait(timeout, msgt_want, expeditor, payload);
+  }
+  CATCH_ANONYMOUS {
+    RETHROWF("%s: %s", error_msg);
+  }
 }
 
 /* Function prototypes */
@@ -86,11 +119,11 @@ int master(int argc, char *argv[])
   xbt_matrix_t A, B, C;
   result_t result;
 
-  gras_socket_t from;
+  xbt_socket_t from;
 
   xbt_dynar_t peers;            /* group of slaves */
   xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
-  gras_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
+  xbt_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
 
   /* Init the GRAS's infrastructure */
   gras_init(&argc, argv);
@@ -103,24 +136,25 @@ int master(int argc, char *argv[])
   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
 
   /* Create the connexions */
-  xbt_assert0(argc > 1, "Usage: master <port>");
+  xbt_assert(argc > 1, "Usage: master <port>");
   gras_socket_server(atoi(argv[1]));
   peers = amok_pm_group_new("pmm");
 
   /* friends, we're ready. Come and play */
-  INFO0("Wait for peers for 2 sec");
+  XBT_INFO("Wait for peers for 2 sec");
   gras_msg_handleall(2);
-  while (xbt_dynar_length(peers)<9) {
-    INFO1("Got only %ld pals. Wait 2 more seconds", xbt_dynar_length(peers));
+  while (xbt_dynar_length(peers) < SLAVE_COUNT) {
+    XBT_INFO("Got only %ld pals (of %d). Wait 2 more seconds",
+        xbt_dynar_length(peers),SLAVE_COUNT);
     gras_msg_handleall(2);
   }
-  INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
+  XBT_INFO("Good. Got %ld pals", xbt_dynar_length(peers));
 
   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
     xbt_dynar_get_cpy(peers, i, &grid[i]);
     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
   }
-  xbt_assert2(i == SLAVE_COUNT,
+  xbt_assert(i == SLAVE_COUNT,
               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
               i, SLAVE_COUNT);
 
@@ -129,7 +163,7 @@ int master(int argc, char *argv[])
     xbt_peer_t h;
 
     xbt_dynar_remove_at(peers, i, &h);
-    INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
+    XBT_INFO("Too much slaves. Killing %s:%d", h->name, h->port);
     amok_pm_kill_hp(h->name, h->port);
     free(h);
   }
@@ -137,7 +171,7 @@ int master(int argc, char *argv[])
 
   /* Assign job to slaves */
   int row = 0, line = 0;
-  INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
+  XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
   for (i = 0; i < SLAVE_COUNT; i++) {
     s_pmm_assignment_t assignment;
     int j, k;
@@ -164,8 +198,9 @@ int master(int argc, char *argv[])
                                       submatrix_size * line,
                                       submatrix_size * row, NULL);
     assignment.B =
-      xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
-                         submatrix_size * line, submatrix_size * row, NULL);
+        xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
+                           submatrix_size * line, submatrix_size * row,
+                           NULL);
     row++;
     if (row >= PROC_MATRIX_SIZE) {
       row = 0;
@@ -177,12 +212,20 @@ int master(int argc, char *argv[])
     xbt_matrix_free(assignment.B);
   }
 
-  /* (have a rest while the slave perform the multiplication) */
+  /* synchronize slaves */
+  for (i = 0; i < PROC_MATRIX_SIZE; i++) {
+    int j;
+    for (j = 0; j < SLAVE_COUNT; j++)
+      gras_msg_wait(600, "pmm_sync", NULL, NULL);
+    for (j = 0; j < SLAVE_COUNT; j++)
+      gras_msg_send(socket[j], "pmm_sync", NULL);
+  }
 
   /* Retrieve the results */
   for (i = 0; i < SLAVE_COUNT; i++) {
     gras_msg_wait(6000, "result", &from, &result);
-    VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT);
+    XBT_VERB("%d slaves are done already. Waiting for %d", i + 1,
+          SLAVE_COUNT);
     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
                            submatrix_size * result.linepos,
                            submatrix_size * result.rowpos, 0, 0, NULL);
@@ -191,16 +234,16 @@ int master(int argc, char *argv[])
   /*    end of gather   */
 
   if (xbt_matrix_double_is_seq(C))
-    INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
+    XBT_INFO("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
   else {
-    WARN0("the result seems wrong");
-  if (DATA_MATRIX_SIZE < 30) {
-    INFO0("The Result of Multiplication is :");
-    xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
-  } else {
-    INFO1("Matrix size too big (%d>30) to be displayed here",
-          DATA_MATRIX_SIZE);
-  }
+    XBT_WARN("the result seems wrong");
+    if (DATA_MATRIX_SIZE < 30) {
+      XBT_INFO("The Result of Multiplication is :");
+      xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
+    } else {
+      XBT_INFO("Matrix size too big (%d>30) to be displayed here",
+            DATA_MATRIX_SIZE);
+    }
   }
 
   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
@@ -223,9 +266,7 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
 {
   /* Recover my initialized Data and My Position */
   s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
-  gras_socket_t master = gras_msg_cb_ctx_from(ctx);
-
-  xbt_ex_t e;
+  xbt_socket_t master = gras_msg_cb_ctx_from(ctx);
 
   int step, l;
   xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
@@ -236,15 +277,15 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   int myline, myrow;
   xbt_matrix_t mydataA, mydataB;
   xbt_matrix_t bC =
-    xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
+      xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
 
   result_t result;
 
-  gras_socket_t from;           /* to exchange data with my neighbor */
+  xbt_socket_t from;           /* to exchange data with my neighbor */
 
   /* sockets for brodcast to other slave */
-  gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
-  gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
+  xbt_socket_t socket_line[PROC_MATRIX_SIZE - 1];
+  xbt_socket_t socket_row[PROC_MATRIX_SIZE - 1];
   memset(socket_line, 0, sizeof(socket_line));
   memset(socket_row, 0, sizeof(socket_row));
 
@@ -258,9 +299,9 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   mydataB = assignment.B;
 
   if (gras_if_RL())
-    INFO0("Receive my pos and assignment");
+    XBT_INFO("Receive my pos and assignment");
   else
-    INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
+    XBT_INFO("Receive my pos (%d,%d) and assignment", myline, myrow);
 
   /* Get my neighborhood from the assignment message (skipping myself) */
   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
@@ -275,12 +316,15 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   }
 
   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
+    gras_msg_send(master, "pmm_sync", NULL);
+    gras_msg_wait(600, "pmm_sync", NULL, NULL);
 
     /* a line brodcast */
     if (myline == step) {
-      VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline);
+      XBT_VERB("LINE: step(%d) = Myline(%d). Broadcast my data.", step,
+            myline);
       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
-        VERB1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
+        XBT_VERB("LINE:   Send to %s", xbt_socket_peer_name(socket_row[l]));
         gras_msg_send(socket_row[l], "dataB", &mydataB);
       }
 
@@ -289,41 +333,34 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
       bB = xbt_matrix_new_sub(mydataB,
                               submatrix_size, submatrix_size, 0, 0, NULL);
     } else {
-      TRY {
-        xbt_matrix_free(bB);
-        gras_msg_wait(600, "dataB", &from, &bB);
-      }
-      CATCH(e) {
-        RETHROW0("Can't get a data message from line : %s");
-      }
-      VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
-            myline, gras_socket_peer_name(from));
+      xbt_matrix_free(bB);
+      my_gras_msg_wait(600, "dataB", &from, &bB,
+                       "Can't get a data message from line");
+      XBT_VERB("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
+            myline, xbt_socket_peer_name(from));
     }
 
     /* a row brodcast */
     if (myrow == step) {
-      VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
+      XBT_VERB("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
-        VERB1("ROW:   Send to %s", gras_socket_peer_name(socket_line[l - 1]));
+        XBT_VERB("ROW:   Send to %s",
+              xbt_socket_peer_name(socket_line[l - 1]));
         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
       }
       xbt_matrix_free(bA);
       bA = xbt_matrix_new_sub(mydataA,
                               submatrix_size, submatrix_size, 0, 0, NULL);
     } else {
-      TRY {
-        xbt_matrix_free(bA);
-        gras_msg_wait(1200, "dataA", &from, &bA);
-      }
-      CATCH(e) {
-        RETHROW0("Can't get a data message from row : %s");
-      }
-      VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
-            gras_socket_peer_name(from));
+      xbt_matrix_free(bA);
+      my_gras_msg_wait(1200, "dataA", &from, &bA,
+                       "Can't get a data message from row");
+      XBT_VERB("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
+            xbt_socket_peer_name(from));
     }
     xbt_matrix_double_addmult(bA, bB, bC);
 
-  };
+  }
 
   /* send Result to master */
   result.C = bC;
@@ -333,11 +370,11 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
   TRY {
     gras_msg_send(master, "result", &result);
   }
-  CATCH(e) {
-    RETHROW0("Failed to send answer to server: %s");
+  CATCH_ANONYMOUS {
+    RETHROWF("Failed to send answer to server: %s");
   }
-  VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
-        gras_socket_peer_name(master), gras_socket_peer_port(master));
+  XBT_VERB(">>>>>>>> Result sent to %s:%d <<<<<<<<",
+        xbt_socket_peer_name(master), xbt_socket_peer_port(master));
   /*  Free the allocated resources, and shut GRAS down */
 
   xbt_matrix_free(bA);
@@ -361,9 +398,8 @@ static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
 
 int slave(int argc, char *argv[])
 {
-  gras_socket_t mysock;
-  gras_socket_t master = NULL;
-  int connected = 0;
+  xbt_socket_t mysock;
+  xbt_socket_t master = NULL;
   int rank;
 
   /* Init the GRAS's infrastructure */
@@ -382,20 +418,9 @@ int slave(int argc, char *argv[])
 
   /* Create the connexions */
   mysock = gras_socket_server_range(3000, 9999, 0, 0);
-  INFO1("Sensor %d starting", rank);
-  while (!connected) {
-    xbt_ex_t e;
-    TRY {
-      master = gras_socket_client_from_string(argv[1]);
-      connected = 1;
-    }
-    CATCH(e) {
-      if (e.category != system_error)
-        RETHROW;
-      xbt_ex_free(e);
-      gras_os_sleep(0.5);
-    }
-  }
+  XBT_INFO("Sensor %d starting", rank);
+  while (!(master = try_gras_socket_client_from_string(argv[1])))
+    gras_os_sleep(0.5);
 
   /* Join and run the group */
   rank = amok_pm_group_join(master, "pmm");