--- /dev/null
+dofile "sim_splay.lua"
+between, call, thread, ping = misc.between_c, rpc.call, events.thread, rpc.ping
+n, predecessor, finger, timeout, m = {}, nil, {}, 5, 24
+function join(n0) -- n0: some node in the ring
+ simgrid.info("Euh...")
+ predecessor = nil
+ finger[1] = call(n0, {'find_successor', n.id})
+ simgrid.info("8Here..")
+ call(finger[1], {'notify', n})
+end
+
+function closest_preceding_node(id)
+ for i = m, 1, -1 do
+ if finger[i] and between(finger[i].id, n.id, id) then
+ return finger[i]
+ end
+ end
+ return n
+end
+
+function find_successor(id)
+ if finger[1].id == n.id or between(id, n.id, (finger[1].id + 1) % 2^m) then
+ return finger[1]
+ else
+ local n0 = closest_preceding_node(id)
+ return call(n0, {'find_successor', id})
+ end
+end
+function stabilize()
+ local x = call(finger[1], 'predecessor')
+ if x and between(x.id, n.id, finger[1].id) then
+ finger[1] = x -- new successor
+ call(finger[1], {'notify', n})
+ end
+end
+function notify(n0)
+ if n0.id ~= n.id and
+ (not predecessor or between(n0.id, predecessor.id, n.id)) then
+ predecessor = n0
+ end
+end
+function fix_fingers()
+ refresh = (refresh and (refresh % m) + 1) or 1 -- 1 <= next <= m
+ finger[refresh] = find_successor((n.id + 2^(refresh - 1)) % 2^m)
+end
+function check_predecessor()
+ if predecessor and not rpc.ping(predecessor) then
+ predecessor = nil
+ end
+end
+
+n.id = math.random(1, 2^m)
+finger[1] = n
+if job then
+ n.ip, n.port = job.me.ip, job.me.port
+ join({ip = "192.42.43.42", port = 20000})
+else
+ simgrid.info("bizzaaaaaar...")
+ n.ip, n.port = "127.0.0.1", 20000
+ if arg[1] then n.ip = arg[1] end
+ if arg[2] then n.port = tonumber(arg[2]) end
+ if not arg[3] then
+ print("RDV")
+ else
+ print("JOIN")
+ thread(function() join({ip = arg[3], port = tonumber(arg[4])}) end)
+ end
+end
+rpc.server(n.port)
+events.periodic(stabilize, timeout)
+events.periodic(check_predecessor, timeout)
+events.periodic(fix_fingers, timeout)
+events.loop()
rpc = {}
log = {}
job = {}
-event = {}
+events = {}
os = {}
start = {}
+misc = {}
-- Splay global variables
job.me ={}
job.nodes = {}
job.list_type = "random"
-
--Init nodes tables
function init_nodes()
for i= 1,simgrid.Host.number() do
init_nodes()
end
+
-- Job methods
function job.me.ip()
return simgrid.Host.getPropValue(simgrid.Host.self(),"ip");
end
+
function job.me.port()
return simgrid.Host.getPropValue(simgrid.Host.self(),"port");
end
+
function job.position()
return simgrid.Host.getPropValue(simgrid.Host.self(),"position");
end
arg = "empty"
mailbox = node
+ if type(node) == "table" then
+ mailbox = node.ip..":"..node.port
+ end
+
if type(call) == "table" then
func = call[1]
arg = call[2]
task_call = simgrid.Task.new("splay_task",10000,10000);
task_call['func_call_name'] = func;
task_call['func_call_arg'] = arg;
- --log:print("Sending Task to mailbox "..mailbox.." to call "..func.." with arg "..arg);
+ log:print("Sending Task to mailbox "..mailbox.." to call '"..func.."' with arg '"..arg.."'");
simgrid.Task.iSend(task_call,mailbox);
- call_function(func,arg)
+
end
+function rpc.server(port)
+ -- nothing really to do : no need to open Socket since it's a Simulation
+end
+
+
-- event Methods
-function event.sleep(time)
+function events.sleep(time)
my_mailbox = job.me.ip()..":"..job.me.port()
- task = simgrid.Task.splay_recv(my_mailbox, time)
+ tk = simgrid.Task.splay_recv(my_mailbox, time)
+
+ if type(tk) == "table" then
+ call_function(task['func_call_name'],task['func_call_arg'])
+ else log:print("task type is :"..type(tk).." it must be table?!");
+ end
end
+
-- main func for each process, this is equivalent to the Deploiment file
-function event.thread(main_func)
+function events.thread(main_func)
dofile("platform_script.lua");
init_jobs()
end
-- Start Methods
function start.loop()
simgrid.run()
- simgrid.clean()
+ --simgrid.clean()
end
+
+-- Misc Methods
+function misc.between(a,b)
+ return a
+end
+
-- useful functions
function call_function(fct,arg)
_G[fct](arg)
end
-function SPLAYschool()
- simgrid.info("Calling me...")
+function SPLAYschool(arg)
+ simgrid.info("Calling me..."..arg)
end
function SPLAYschool()
log:print("My ip is :" ..job.me.ip())
- event.sleep(5)
- rpc.call(job.nodes[3],{"call_me","Helloooooow"})
- event.sleep(5)
+ events.sleep(5)
+ rpc.call(job.nodes[3],{"call_me","Arg_test"})
+ events.sleep(5)
os.exit()
end
log:print("I received an RPC from node "..position);
end
-event.thread("SPLAYschool")
+events.thread("SPLAYschool")
start.loop()
("MSG_task_receive failed : Unexpected error , please report this bug");
break;
}
-
return 1;
}
MSG_comm_wait(comm, timeout);
if (MSG_comm_get_status(comm) == MSG_OK)
{
+ XBT_DEBUG("Receiving task : %s",MSG_task_get_name(task));
lua_State *sender_stack = MSG_task_get_data(task);
lua_xmove(sender_stack, L, 1); // copy the data directly from sender's stack
MSG_task_set_data(task, NULL);
- MSG_comm_destroy(comm);
}
-
- return 1;
+ MSG_comm_destroy(comm);
+ return 1;
}
static int Task_splay_isend(lua_State *L)