Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Lua: copy tasks so that the sender doesn't have to sleep
authorChristophe Thiéry <christopho128@gmail.com>
Tue, 15 Nov 2011 13:23:43 +0000 (14:23 +0100)
committerChristophe Thiéry <christopho128@gmail.com>
Tue, 15 Nov 2011 13:29:03 +0000 (14:29 +0100)
This will allow to implement asynchronous comms (isend/irecv)

src/bindings/lua/simgrid_lua.c

index ef4ace5..acd4e49 100644 (file)
@@ -161,6 +161,78 @@ static int l_task_execute(lua_State* L)
   }
 }
 
+/**
+ * \brief Pops the Lua task on top of the stack and registers it so that a
+ * receiver can retrieve it later knowing the C task.
+ *
+ * After calling this function, you can send the C task to someone and he
+ * will be able to also get the corresponding Lua task.
+ *
+ * \param L a lua state
+ */
+static void task_register(lua_State* L) {
+
+  m_task_t task = sglua_checktask(L, -1);
+                                  /* ... task */
+  /* put in the C task a ref to the lua task so that the receiver finds it */
+  unsigned long ref = luaL_ref(L, LUA_REGISTRYINDEX);
+                                  /* ... */
+  MSG_task_set_data(task, (void*) ref);
+}
+
+/**
+ * \brief Pushes onto the stack the Lua task corresponding to a C task.
+ *
+ * The Lua task must have been previously registered with task_register so
+ * that it can be retrieved knowing the C task.
+ *
+ * \param L a lua state
+ * \param task a C task
+ */
+static void task_unregister(lua_State* L, m_task_t task) {
+
+                                  /* ... */
+  /* the task is in my registry, put it onto my stack */
+  unsigned long ref = (unsigned long) MSG_task_get_data(task);
+  lua_rawgeti(L, LUA_REGISTRYINDEX, ref);
+                                  /* ... task */
+  luaL_unref(L, LUA_REGISTRYINDEX, ref);
+  MSG_task_set_data(task, NULL);
+}
+
+/**
+ * \brief When a C task has been received, retrieves the corresponding Lua
+ * task from the sender and pushes it onto the receiver's stack.
+ *
+ * This function should be called from the receiver process.
+ *
+ * \param dst the receiver
+ * \param task the task just received
+ */
+static void task_copy(lua_State* dst, m_task_t task) {
+
+  m_process_t src_proc = MSG_task_get_sender(task);
+  lua_State* src = MSG_process_get_data(src_proc);
+
+                                  /* src: ...
+                                     dst: ... */
+  task_unregister(src, task);
+                                  /* src: ... task */
+  sglua_copy_value(src, dst);
+                                  /* src: ... task
+                                     dst: ... task */
+
+  /* the receiver is the owner of the task and may destroy it:
+   * make the C task NULL on the sender side so that it doesn't garbage
+   * collect it */
+  lua_getfield(src, -1, "__simgrid_task");
+                                  /* src: ... task ctask */
+  m_task_t* udata = (m_task_t*) luaL_checkudata(src, -1, TASK_MODULE_NAME);
+  *udata = NULL;
+  lua_pop(src, 2);
+                                  /* src: ... */
+}
+
 /**
  * \brief Sends a task to a mailbox and waits for its completion.
  * \param L a Lua state
@@ -180,26 +252,19 @@ static int l_task_send(lua_State* L)
                                   /* task mailbox */
   lua_settop(L, 1);
                                   /* task */
-  /* copy my stack into the task, so that the receiver can copy the lua task */
-  MSG_task_set_data(task, L);
+  task_register(L);
+                                  /* -- */
   MSG_error_t res = MSG_task_send(task, mailbox);
-  while (MSG_task_get_data(task) != NULL) {
-    /* don't mess up with my stack: the receiver didn't copy the data yet */
-    MSG_process_sleep(0);
-  }
 
   if (res == MSG_OK) {
-    /* the receiver is the owner of the task and may destroy it:
-     * remove the C task on my side so that I don't garbage collect it */
-    lua_getfield(L, 1, "__simgrid_task");
-                                  /* task ctask */
-    m_task_t* udata = (m_task_t*) luaL_checkudata(L, -1, TASK_MODULE_NAME);
-    *udata = NULL;
     return 0;
   }
   else {
-    lua_settop(L, 0);
+    /* the communication has failed, I'm still the owner of the task */
+    task_unregister(L, task);
+                                  /* task */
     lua_pushstring(L, msg_errors[res]);
+                                  /* task error */
     return 1;
   }
 }
@@ -241,21 +306,19 @@ static int l_task_recv(lua_State* L)
     timeout = -1;
     /* no timeout by default */
   }
-  lua_settop(L, 0);
-                                  /* -- */
+                                  /* mailbox ... -- */
   MSG_error_t res = MSG_task_receive_with_timeout(&task, mailbox, timeout);
 
   if (res == MSG_OK) {
-    /* copy the data directly from sender's stack */
-    lua_State* sender_stack = MSG_task_get_data(task);
-    sglua_copy_value(sender_stack, L);
-                                  /* task */
-    MSG_task_set_data(task, NULL);
+    task_copy(L, task);
+                                  /* mailbox ... task */
     return 1;
   }
   else {
     lua_pushnil(L);
+                                  /* mailbox ... nil */
     lua_pushstring(L, msg_errors[res]);
+                                  /* mailbox ... nil error */
     return 2;
   }
 }
@@ -344,92 +407,97 @@ static msg_comm_t sglua_checkcomm(lua_State* L, int index)
  * \param L a Lua state
  * \return number of values returned to Lua
  *
- * This function performs a waitany operation: you can specify an
- * individual communication or a list of communications.
- * If you provide a list, the function returns whenever one communication of
- * the list is finished, and this communication will be removed from you list.
- * This means you can make a waitall operation with successive calls to this
- * function.
- *
- * - Argument 1 (comm or table): a comm or an array of comms
- * - Argument 2 (number, optional): timeout (supported only when there is only
- * one comm)
- * - Return values (comm + string): the first comm of your list that finishes,
- * plus an error string if this comm was unsuccessful.
+ * - Argument 1 (comm): a comm (previously created by isend or irecv)
+ * - Argument 2 (number, optional): timeout (default is no timeout)
+ * - Return values (task or nil + string): in case of success, returns the task
+ * received if you are the receiver and nil if you are the sender. In case of
+ * failure, returns nil plus an error string.
  */
 static int l_comm_wait(lua_State* L) {
 
-  if (lua_istable(L, 1)) {
-    // TODO implement waitany
-    THROW_UNIMPLEMENTED;
+  msg_comm_t comm = sglua_checkcomm(L, 1);
+  double timeout = -1;
+  if (lua_gettop(L) >= 2) {
+    timeout = luaL_checknumber(L, 2);
   }
-  else {
-    /* only one comm */
-    msg_comm_t comm = sglua_checkcomm(L, 1);
-    double timeout = -1;
-    if (lua_gettop(L) >= 2) {
-      timeout = luaL_checknumber(L, 2);
-    }
                                   /* comm ... */
-    MSG_error_t res = MSG_comm_wait(comm, timeout);
-    lua_settop(L, 1);
-                                  /* comm */
-    if (res == MSG_OK) {
-      return 1;
+  MSG_error_t res = MSG_comm_wait(comm, timeout);
+
+  if (res == MSG_OK) {
+    m_task_t task = MSG_comm_get_task(comm);
+    if (MSG_task_get_sender(task) == MSG_process_self()) {
+      /* I'm the sender */
+      return 0;
     }
     else {
-      lua_pushstring(L, msg_errors[res]);
-                                  /* comm error */
-      return 2;
+      /* I'm the receiver: copy the Lua task from the sender */
+      task_copy(L, task);
+                                  /* comm ... task */
+      return 1;
     }
   }
-  return 0;
+  else {
+    /* the communication has failed */
+    lua_pushnil(L);
+                                  /* comm ... nil */
+    lua_pushstring(L, msg_errors[res]);
+                                  /* comm ... nil error */
+    return 2;
+  }
 }
 
 /**
  * @brief Returns whether a communication is finished.
  *
- * This function always returns immediately.
- * It performs a testany operation: you can provide an individual
- * communication or a list of communications.
+ * Unlike wait(), This function always returns immediately.
  *
- * - Argument 1 (comm or table): a comm or an array of comms
- * - Return values (nil or comm + string): if no comm from your list is finished
- * yet, returns nil. If a comm is finished, removes it from your list and
- * returns it, plus an error string if it was unsuccessful.
+ * - Argument 1 (comm): a comm (previously created by isend or irecv)
+ * - Return value 1 (boolean): indicates whether the comm is finished
+ * (note that a finished comm may have failed)
+ * - Return value 2 (task): if you are the receiver, returns the task received
+ * in case of success, or nil if the comm has failed
+ * - Return value 3 (string): if the comm has failed, returns an error string
  */
 static int l_comm_test(lua_State* L) {
 
-  if (lua_istable(L, 1)) {
-    /* TODO implement testany */
-    THROW_UNIMPLEMENTED;
+  msg_comm_t comm = sglua_checkcomm(L, 1);
+                                  /* comm ... */
+  if (!MSG_comm_test(comm)) {
+    return 0;
   }
   else {
-    /* only one comm */
-    msg_comm_t comm = sglua_checkcomm(L, 1);
-                                  /* comm ... */
-    if (!MSG_comm_test(comm)) {
-      return 0;
-    }
-    else {
-      lua_settop(L, 1);
-                                  /* comm */
-      MSG_error_t res = MSG_comm_get_status(comm);
-      if (res == MSG_OK) {
+    MSG_error_t res = MSG_comm_get_status(comm);
+
+    if (res == MSG_OK) {
+      lua_pushboolean(L, 1);
+                                  /* comm ... true */
+      m_task_t task = MSG_comm_get_task(comm);
+      if (MSG_task_get_sender(task) == MSG_process_self()) {
+        /* I'm the sender */
         return 1;
       }
       else {
-        lua_pushstring(L, msg_errors[res]);
-                                  /* comm error */
+        /* I'm the receiver: copy the Lua task from the sender */
+        task_copy(L, task);
+                                    /* comm ... true task */
         return 2;
       }
     }
+    else {
+      /* the communication has failed */
+      lua_pushnil(L);
+                                    /* comm ... true nil */
+      lua_pushstring(L, msg_errors[res]);
+                                    /* comm ... true nil error */
+      return 3;
+    }
   }
 }
 
 static const luaL_reg comm_functions[] = {
   {"wait", l_comm_wait},
   {"test", l_comm_test},
+  /* TODO waitany, testany */
   {NULL, NULL}
 };
 
@@ -1212,7 +1280,7 @@ static int run_lua_code(int argc, char **argv)
 
   /* create a new state, getting globals from maestro */
   lua_State *L = sglua_clone_maestro();
-  int res = 1;
+  MSG_process_set_data(MSG_process_self(), L);
 
   /* start the function */
   lua_getglobal(L, argv[0]);
@@ -1231,6 +1299,7 @@ static int run_lua_code(int argc, char **argv)
               lua_tostring(L, -1));
 
   /* retrieve result */
+  int res = 1;
   if (lua_isnumber(L, -1)) {
     res = lua_tonumber(L, -1);
     lua_pop(L, 1);              /* pop returned value */