From: Christophe ThiƩry Date: Tue, 15 Nov 2011 13:23:43 +0000 (+0100) Subject: Lua: copy tasks so that the sender doesn't have to sleep X-Git-Tag: exp_20120216~264 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/40b49952bc589b2972e9936828afe7f8f5d8cef2 Lua: copy tasks so that the sender doesn't have to sleep This will allow to implement asynchronous comms (isend/irecv) --- diff --git a/src/bindings/lua/simgrid_lua.c b/src/bindings/lua/simgrid_lua.c index ef4ace54fc..acd4e49f3a 100644 --- a/src/bindings/lua/simgrid_lua.c +++ b/src/bindings/lua/simgrid_lua.c @@ -161,6 +161,78 @@ static int l_task_execute(lua_State* L) } } +/** + * \brief Pops the Lua task on top of the stack and registers it so that a + * receiver can retrieve it later knowing the C task. + * + * After calling this function, you can send the C task to someone and he + * will be able to also get the corresponding Lua task. + * + * \param L a lua state + */ +static void task_register(lua_State* L) { + + m_task_t task = sglua_checktask(L, -1); + /* ... task */ + /* put in the C task a ref to the lua task so that the receiver finds it */ + unsigned long ref = luaL_ref(L, LUA_REGISTRYINDEX); + /* ... */ + MSG_task_set_data(task, (void*) ref); +} + +/** + * \brief Pushes onto the stack the Lua task corresponding to a C task. + * + * The Lua task must have been previously registered with task_register so + * that it can be retrieved knowing the C task. + * + * \param L a lua state + * \param task a C task + */ +static void task_unregister(lua_State* L, m_task_t task) { + + /* ... */ + /* the task is in my registry, put it onto my stack */ + unsigned long ref = (unsigned long) MSG_task_get_data(task); + lua_rawgeti(L, LUA_REGISTRYINDEX, ref); + /* ... task */ + luaL_unref(L, LUA_REGISTRYINDEX, ref); + MSG_task_set_data(task, NULL); +} + +/** + * \brief When a C task has been received, retrieves the corresponding Lua + * task from the sender and pushes it onto the receiver's stack. + * + * This function should be called from the receiver process. + * + * \param dst the receiver + * \param task the task just received + */ +static void task_copy(lua_State* dst, m_task_t task) { + + m_process_t src_proc = MSG_task_get_sender(task); + lua_State* src = MSG_process_get_data(src_proc); + + /* src: ... + dst: ... */ + task_unregister(src, task); + /* src: ... task */ + sglua_copy_value(src, dst); + /* src: ... task + dst: ... task */ + + /* the receiver is the owner of the task and may destroy it: + * make the C task NULL on the sender side so that it doesn't garbage + * collect it */ + lua_getfield(src, -1, "__simgrid_task"); + /* src: ... task ctask */ + m_task_t* udata = (m_task_t*) luaL_checkudata(src, -1, TASK_MODULE_NAME); + *udata = NULL; + lua_pop(src, 2); + /* src: ... */ +} + /** * \brief Sends a task to a mailbox and waits for its completion. * \param L a Lua state @@ -180,26 +252,19 @@ static int l_task_send(lua_State* L) /* task mailbox */ lua_settop(L, 1); /* task */ - /* copy my stack into the task, so that the receiver can copy the lua task */ - MSG_task_set_data(task, L); + task_register(L); + /* -- */ MSG_error_t res = MSG_task_send(task, mailbox); - while (MSG_task_get_data(task) != NULL) { - /* don't mess up with my stack: the receiver didn't copy the data yet */ - MSG_process_sleep(0); - } if (res == MSG_OK) { - /* the receiver is the owner of the task and may destroy it: - * remove the C task on my side so that I don't garbage collect it */ - lua_getfield(L, 1, "__simgrid_task"); - /* task ctask */ - m_task_t* udata = (m_task_t*) luaL_checkudata(L, -1, TASK_MODULE_NAME); - *udata = NULL; return 0; } else { - lua_settop(L, 0); + /* the communication has failed, I'm still the owner of the task */ + task_unregister(L, task); + /* task */ lua_pushstring(L, msg_errors[res]); + /* task error */ return 1; } } @@ -241,21 +306,19 @@ static int l_task_recv(lua_State* L) timeout = -1; /* no timeout by default */ } - lua_settop(L, 0); - /* -- */ + /* mailbox ... -- */ MSG_error_t res = MSG_task_receive_with_timeout(&task, mailbox, timeout); if (res == MSG_OK) { - /* copy the data directly from sender's stack */ - lua_State* sender_stack = MSG_task_get_data(task); - sglua_copy_value(sender_stack, L); - /* task */ - MSG_task_set_data(task, NULL); + task_copy(L, task); + /* mailbox ... task */ return 1; } else { lua_pushnil(L); + /* mailbox ... nil */ lua_pushstring(L, msg_errors[res]); + /* mailbox ... nil error */ return 2; } } @@ -344,92 +407,97 @@ static msg_comm_t sglua_checkcomm(lua_State* L, int index) * \param L a Lua state * \return number of values returned to Lua * - * This function performs a waitany operation: you can specify an - * individual communication or a list of communications. - * If you provide a list, the function returns whenever one communication of - * the list is finished, and this communication will be removed from you list. - * This means you can make a waitall operation with successive calls to this - * function. - * - * - Argument 1 (comm or table): a comm or an array of comms - * - Argument 2 (number, optional): timeout (supported only when there is only - * one comm) - * - Return values (comm + string): the first comm of your list that finishes, - * plus an error string if this comm was unsuccessful. + * - Argument 1 (comm): a comm (previously created by isend or irecv) + * - Argument 2 (number, optional): timeout (default is no timeout) + * - Return values (task or nil + string): in case of success, returns the task + * received if you are the receiver and nil if you are the sender. In case of + * failure, returns nil plus an error string. */ static int l_comm_wait(lua_State* L) { - if (lua_istable(L, 1)) { - // TODO implement waitany - THROW_UNIMPLEMENTED; + msg_comm_t comm = sglua_checkcomm(L, 1); + double timeout = -1; + if (lua_gettop(L) >= 2) { + timeout = luaL_checknumber(L, 2); } - else { - /* only one comm */ - msg_comm_t comm = sglua_checkcomm(L, 1); - double timeout = -1; - if (lua_gettop(L) >= 2) { - timeout = luaL_checknumber(L, 2); - } /* comm ... */ - MSG_error_t res = MSG_comm_wait(comm, timeout); - lua_settop(L, 1); - /* comm */ - if (res == MSG_OK) { - return 1; + MSG_error_t res = MSG_comm_wait(comm, timeout); + + if (res == MSG_OK) { + m_task_t task = MSG_comm_get_task(comm); + if (MSG_task_get_sender(task) == MSG_process_self()) { + /* I'm the sender */ + return 0; } else { - lua_pushstring(L, msg_errors[res]); - /* comm error */ - return 2; + /* I'm the receiver: copy the Lua task from the sender */ + task_copy(L, task); + /* comm ... task */ + return 1; } } - return 0; + else { + /* the communication has failed */ + lua_pushnil(L); + /* comm ... nil */ + lua_pushstring(L, msg_errors[res]); + /* comm ... nil error */ + return 2; + } } /** * @brief Returns whether a communication is finished. * - * This function always returns immediately. - * It performs a testany operation: you can provide an individual - * communication or a list of communications. + * Unlike wait(), This function always returns immediately. * - * - Argument 1 (comm or table): a comm or an array of comms - * - Return values (nil or comm + string): if no comm from your list is finished - * yet, returns nil. If a comm is finished, removes it from your list and - * returns it, plus an error string if it was unsuccessful. + * - Argument 1 (comm): a comm (previously created by isend or irecv) + * - Return value 1 (boolean): indicates whether the comm is finished + * (note that a finished comm may have failed) + * - Return value 2 (task): if you are the receiver, returns the task received + * in case of success, or nil if the comm has failed + * - Return value 3 (string): if the comm has failed, returns an error string */ static int l_comm_test(lua_State* L) { - if (lua_istable(L, 1)) { - /* TODO implement testany */ - THROW_UNIMPLEMENTED; + msg_comm_t comm = sglua_checkcomm(L, 1); + /* comm ... */ + if (!MSG_comm_test(comm)) { + return 0; } else { - /* only one comm */ - msg_comm_t comm = sglua_checkcomm(L, 1); - /* comm ... */ - if (!MSG_comm_test(comm)) { - return 0; - } - else { - lua_settop(L, 1); - /* comm */ - MSG_error_t res = MSG_comm_get_status(comm); - if (res == MSG_OK) { + MSG_error_t res = MSG_comm_get_status(comm); + + if (res == MSG_OK) { + lua_pushboolean(L, 1); + /* comm ... true */ + m_task_t task = MSG_comm_get_task(comm); + if (MSG_task_get_sender(task) == MSG_process_self()) { + /* I'm the sender */ return 1; } else { - lua_pushstring(L, msg_errors[res]); - /* comm error */ + /* I'm the receiver: copy the Lua task from the sender */ + task_copy(L, task); + /* comm ... true task */ return 2; } } + else { + /* the communication has failed */ + lua_pushnil(L); + /* comm ... true nil */ + lua_pushstring(L, msg_errors[res]); + /* comm ... true nil error */ + return 3; + } } } static const luaL_reg comm_functions[] = { {"wait", l_comm_wait}, {"test", l_comm_test}, + /* TODO waitany, testany */ {NULL, NULL} }; @@ -1212,7 +1280,7 @@ static int run_lua_code(int argc, char **argv) /* create a new state, getting globals from maestro */ lua_State *L = sglua_clone_maestro(); - int res = 1; + MSG_process_set_data(MSG_process_self(), L); /* start the function */ lua_getglobal(L, argv[0]); @@ -1231,6 +1299,7 @@ static int run_lua_code(int argc, char **argv) lua_tostring(L, -1)); /* retrieve result */ + int res = 1; if (lua_isnumber(L, -1)) { res = lua_tonumber(L, -1); lua_pop(L, 1); /* pop returned value */