Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[Lua] Updated the bittorent.lua test
[simgrid.git] / examples / lua / bittorrent / peer.lua
1 -- Copyright (c) 2012, 2014. The SimGrid Team.
2 -- All rights reserved.
3
4 -- This program is free software; you can redistribute it and/or modify it
5 -- under the terms of the license (GNU LGPL) which comes with this package.
6
7 -- A SimGrid Lua implementation of the Bittorrent protocol.
8
9 require("simgrid")
10 -- Common Constants
11 common = {
12         FILE_SIZE = 5120,
13         FILE_PIECE_SIZE = 512,
14         FILE_PIECES = 10,
15         
16         PIECE_COMM_SIZE = 1,
17         
18         MESSAGE_SIZE = 1,
19         MAXIMUM_PEERS = 50,
20         
21         TRACKER_QUERY_INTERVAL = 1000,
22         TRACKER_COMM_SIZE = 0.01,
23         
24         GET_PEERS_TIMEOUT = 10000,
25         TIMEOUT_MESSAGE = 10,
26         MAX_UNCHOKED_PEERS = 4,
27         UPDATE_CHOKED_INTERVAL = 50,
28         MAX_PIECES = 1,
29 }
30
31
32 -- Peer main function
33 function peer(...)
34         
35         local args = {...}
36         
37         if #args ~= 2 and #args ~= 3 then
38                 simgrid.info("Wrong number of arguments")
39         end
40                         
41         -- Setting the peer data
42         data = {
43                         -- Retrieving the peer id
44                         id = tonumber(args[1]),
45                         mailbox = tostring(tonumber(args[1])),
46                         mailbox_tracker = "tracker" .. args[1],
47                         peers = {},
48                         active_peers = {},
49                         current_pieces = {},
50                         pieces_requested = 0,
51                         bitfield = {},
52                         pieces_count = {},
53                         pieces = 0,
54                         deadline = tonumber(args[2]),
55                         round = 0
56         }
57         simgrid.info("Hi, I'm joining the network with id " .. data.id)
58
59         -- Checking if the peer is a seed
60         local bitfield_value = false
61         if args[3] == "1" then
62                 data.pieces = common.PIECES_COUNT
63                 bitfield_value = true
64         else
65                 data.pieces = 0
66         end
67         -- Building the peer bitfield and the pieces list
68         for i = 1, common.FILE_PIECES do
69                 data.pieces_count[i] = 0        
70                 data.bitfield[i] = bitfield_value
71         end
72         
73         if get_peers_data() == true then
74                 data.comm_received = simgrid.task.irecv(data.mailbox)
75                 if has_finished() then
76                         send_handshake_all()
77                         seed_loop()
78                 else
79                         leech_loop()
80                         seed_loop()
81                 end
82         else
83                 simgrid.info("Couldn't contact the tracker")
84         end
85         simgrid.info("My status is now " .. get_status())
86 end
87 -- Peer main loop when it is leeching
88 function leech_loop()
89         simgrid.info("Start downloading.")
90         local now = simgrid.get_clock()
91         local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
92         -- Send a "handshake" message to all the peers it got
93         -- it couldn't have gotten more than 50 peers anyway)
94         send_handshake_all()
95         -- Wait for at leaast one bitfield message
96         wait_for_pieces()
97         
98         simgrid.info("Starting main leech loop")
99         local task, err
100         while now < data.deadline and data.pieces < common.FILE_PIECES do
101         task, err = data.comm_received:test()
102         if task then
103             handle_message(task)
104             data.comm_received = simgrid.task.irecv(data.mailbox)
105             now = simgrid.get_clock()
106         elseif err then
107             data.comm_received = simgrid.task.irecv(data.mailbox)
108         else
109             -- If the user has a pending interesting
110             if data.current_piece ~= -1 then
111                 send_interested_to_peers()
112             else
113                 if #data.current_pieces < common.MAX_PIECES then
114                     update_current_piece()
115                 end
116             end
117             -- We don't execute the choke algorithm if we don't already have a piece
118             if now >= next_choked_update and data.pieces > 0 then
119                 update_choked_peers()
120                 next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
121                 now = simgrid.get_clock()
122             else
123                 simgrid.process.sleep(1)
124                 now = simgrid.get_clock()
125             end
126         end
127         end
128 end
129 -- Peer main loop when it is seeding
130 function seed_loop()
131         local now = simgrid.get_clock()
132         local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
133         simgrid.debug("Start seeding.")
134         -- Start the main seed loop
135         while now < data.deadline do
136                 task, err = data.comm_received:test()           
137                 if task then
138                         handle_message(task)
139                         data.comm_received = simgrid.task.irecv(data.mailbox)           
140                         now = simgrid.get_clock()
141                 elseif err then
142                         data.comm_received = simgrid.task.irecv(data.mailbox)           
143                 else
144                         if now >= next_choked_update then
145                                 update_choked_peers()
146                                 next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
147                                 now = simgrid.get_clock()
148                         else
149                                 simgrid.process.sleep(1)
150                                 now = simgrid.get_clock()
151                         end             
152                 end
153         end
154 end
155 -- Retrieve the peers list from the tracker
156 function get_peers_data() 
157         local success = false
158         local now = simgrid.get_clock()
159         local timeout = now + common.GET_PEERS_TIMEOUT
160         -- Build the task
161         local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
162         task_send.type = "REQUEST"
163         task_send.peer_id = data.id
164         task_send.mailbox = data.mailbox_tracker
165         -- Send the task
166         while not(success) and now < timeout do
167                 simgrid.debug("Sending a peer request to the tracker.")
168                 if task_send:send("tracker") then
169                         success = true
170                 end
171         end
172         now = simgrid.get_clock()
173         success = false
174         -- Wait for the answer
175         local comm_received = simgrid.task.irecv(data.mailbox_tracker)
176         while not(success) and now < timeout do
177                 local task_received = comm_received:wait(timeout)
178                 comm_received = simgrid.task.irecv(data.mailbox_tracker)
179                 if task_received then
180                         simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
181                         -- Add what we received to our peer list
182                         for i,v in pairs(task_received.peers) do
183                                 if v ~= data.id then
184                                         --Add the peer to our list and build its data
185                                         local peer_data = {}
186                                         peer_data.id = v;
187                                         peer_data.bitfield = nil
188                                         peer_data.mailbox = tostring(v);
189                                         peer_data.am_interested = false
190                                         peer_data.interested = false
191                                         peer_data.choked_upload = true
192                                         peer_data.choked_download = true
193                                         data.peers[v] = peer_data
194                                 end
195                         end
196                 else
197                         success = false
198                 end
199                 success = true
200         end
201         return success; 
202 end
203 -- Returns if the peer has finished downloading the piece
204 function has_finished() 
205         for i,v in pairs(data.bitfield) do
206                 if v == false then
207                         return false
208                 end
209         end
210         return true
211 end
212 -- Handle a received message sent by another peer
213 function handle_message(task)
214         local remote_peer = data.peers[task.peer_id]
215         
216         if task.type == "HANDSHAKE" then
217                 simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
218                 -- Check if the peer is in our connection list
219                 if data.peers[task.peer_id] == nil then
220                         local peer_data = {}
221                         peer_data.mailbox = task.mailbox
222                         peer_data.id = task.peer_id
223                         peer_data.am_interested = false
224                         peer_data.interested = false
225                         peer_data.choked_upload = true
226                         peer_data.choked_download = true
227                         peer_data.bitfield = nil
228                         data.peers[task.peer_id] = peer_data
229                         send_handshake(task.mailbox)
230                 end
231                 -- Send our bitfield to the peer
232                 send_bitfield(task.mailbox)
233         elseif task.type == "BITFIELD" then
234                 simgrid.debug("Received a BITFIELD from " .. task.mailbox)
235                 -- Update the pieces list
236                 update_piece_count_from_bitfield(task.bitfield)
237                 -- Update the current piece
238                 if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
239                         update_current_piece()
240                 end
241                 data.peers[task.peer_id].bitfield = task.bitfield
242         elseif task.type == "INTERESTED" then
243                 simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
244                 data.peers[task.peer_id].interested = true
245         elseif task.type == "NOTINTERESTED" then
246                 simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
247                 data.peers[task.peer_id].interested = false
248         elseif task.type == "UNCHOKE" then
249                 simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
250                 data.peers[task.peer_id].choked_download = false
251                 send_requests_to_peer(data.peers[task.peer_id])
252         elseif task.type == "CHOKE" then
253                 simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
254                 data.peers[task.peer_id].choked_download = true         
255         elseif task.type == "HAVE" then
256                 local remote_peer = data.peers[task.peer_id] 
257                 if remote_peer == nil or remote_peer.bitfield == nil then
258                         return
259                 end
260                 simgrid.debug("Received a HAVE message from " .. task.mailbox)
261                 data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
262                 -- Send interested message to the peer if he has what we want
263                 if not(remote_peer.am_interested) and data.current_pieces[task.piece] ~= nil then
264                         remote_peer.am_interested = true
265                         send_interested(remote_peer.mailbox)
266                 end
267                 if data.current_pieces[task.piece] ~= nil then
268                         send_request(task.mailbox,task.piece)
269                 end
270         elseif task.type == "REQUEST" then
271                 simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
272                 local remote_peer = data.peers[task.peer_id] 
273                 if remote_peer.choked_upload == false then
274                         if data.bitfield[task.piece] == true then
275                                 send_piece(task.mailbox,task.piece,false)
276                         end
277                 end
278         elseif task.type == "PIECE" then
279                 if task.stalled == true then
280                         simgrid.debug("The received piece is stalled")
281                 else
282                         simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
283                         if data.bitfield[task.piece] ~= true then
284                                 data.pieces_requested = data.pieces_requested - 1
285                                 -- Removing the piece from our piece list
286                                 data.current_pieces[task.piece] = nil
287                                 data.bitfield[task.piece] = true
288                                 data.pieces = data.pieces + 1
289                                 simgrid.debug("My status is now:" .. get_status())
290                                 -- Sending the information to all the peers we are connected to
291                                 send_have(task.piece)
292                                 -- Sending UNINTERESTED to the peers that doesn't have any more pieces
293                                 update_interested_after_receive()
294                         end
295                 end
296         end
297 end
298 -- Update the piece the peer is currently interested in.
299 -- There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
300 -- If the peer has less than 3 pieces, he chooses a piece at random.
301 -- If the peer has more than pieces, he downloads the pieces that are the less
302 -- replicated
303 function update_current_piece() 
304         if data.pieces_requested >= (common.FILE_PIECES - data.pieces) then
305                 return
306         end
307         if data.pieces < 3 or true then
308                 repeat
309                         data.current_piece = math.random(1,common.FILE_PIECES)
310 --                      simgrid.info("The new piece is:" .. data.current_piece)
311                 until data.bitfield[data.current_piece] ~= true and data.current_pieces[data.current_piece] == nil
312                 data.current_pieces[data.current_piece] = true
313                 data.pieces_requested = data.pieces_requested + 1
314         end             
315         
316 end
317 -- Updates the list of who has a piece from a bitfield
318 function update_piece_count_from_bitfield(bitfield)
319         for i,v in pairs(bitfield) do
320                 if v == true then       
321                         data.pieces_count[i] = data.pieces_count[i] + 1
322                 end
323         end
324 end
325 -- Wait for the node to receive interesting bitfield messages (ie: non empty)
326 function wait_for_pieces() 
327         local finished = false
328         local now = simgrid.get_clock()
329         local task
330         while now < data.deadline and not(finished) do
331                 task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
332                 if task then
333                         handle_message(task)
334                         if data.current_piece ~= -1 then
335                                 finished = true
336                         end     
337                 end
338                 data.comm_received = simgrid.task.irecv(data.mailbox)           
339         end
340 end
341 -- Update the list of current choked and unchoked peers, using the
342 -- choke algorithm
343 function update_choked_peers()
344         data.round = (data.round + 1) % 3
345         -- Remove a peer from the list
346         for i,v in pairs(data.active_peers) do
347                 data.active_peers[i] = nil
348                 send_choked(v.mailbox)
349                 break
350         end
351         -- Random optimistic unchoking
352         if true then
353                 local values = {}
354                 for key, value in pairs(data.peers) do
355                         values[#values + 1] = value
356                 end
357                 local peer_choosed = nil
358                 local j = 0
359
360                 repeat
361                         peer_choosed = values[math.random(#values)]             
362                         if peer_choosed.interested ~= true then
363                                 peer_choosed = nil
364                         end
365                         j = j + 1
366                 until peer_choosed ~= nil or j < common.MAXIMUM_PEERS
367                 if peer_choosed ~= nil then
368                         data.active_peers[peer_choosed.id] = peer_choosed
369                         peer_choosed.choked_upload = false
370                         send_unchoked(peer_choosed.mailbox)
371                 end
372         end
373         -- TODO: Use the leecher choke algorithm
374 end
375 --  Updates our "interested" state about peers: send "not interested" to peers
376 --  that don't have any more pieces we want.
377 function update_interested_after_receive()
378         local interested = false
379         for i,v in pairs(data.peers) do
380                 if v.am_interested then
381                         for piece,j in pairs(data.current_pieces) do
382                                 if v.bitfield ~= nil then
383                                         if v.bitfield[piece] == true then
384                                                 interested = true
385                                                 break
386                                         else
387                                         end
388                                 end
389                         end
390                         if not(interested) then
391                                 v.am_interested = false
392                                 send_not_interested(v.mailbox)
393                         end
394                 end
395         end
396 end
397 -- Find the peers that have the current interested piece and send them
398 -- the "interested" message
399 function send_interested_to_peers()
400         if data.current_piece == -1 then
401                 return
402         end
403         for i,v in pairs(data.peers) do
404                 if v.bitfield ~= nil then
405                         v.am_interested = true
406                         send_interested(v.mailbox)
407                 end
408         end
409         data.current_piece = -1
410 end 
411 -- Send a "interested" message to a peer.
412 function send_interested(mailbox)
413         simgrid.debug("Sending a INTERESTED to " .. mailbox)
414         local task = new_task("INTERESTED")
415         task:dsend(mailbox)
416 end
417 -- Send a "not interested" message to a peer.
418 function send_not_interested(mailbox)
419     simgrid.debug("Sending a send_not_interested")
420         local task = new_task("NOTINTERESTED")
421         task:dsend(mailbox)
422 end
423 -- Send a handshake message to all the peers the peer has
424 function send_handshake_all()
425         for i,v in pairs(data.peers) do
426                 local task = new_task("HANDSHAKE")
427                 task:dsend(v.mailbox)
428         end
429 end
430 -- Send a "handshake" message to an user
431 function send_handshake(mailbox)
432         simgrid.debug("Sending a HANDSHAKE to " .. mailbox)
433         local task = new_task("HANDSHAKE")
434         task:dsend(mailbox)             
435 end
436 -- Send a "choked" message to a peer
437 function send_choked(mailbox)
438         simgrid.debug("Sending a CHOKE to " .. mailbox)
439         local task = new_task("CHOKE")
440         task:dsend(mailbox)     
441 end
442 -- Send a "unchoked" message to a peer
443 function send_unchoked(mailbox)
444         simgrid.debug("Sending a UNCHOKE to " .. mailbox)
445         local task = new_task("UNCHOKE")
446         task:dsend(mailbox)     
447 end
448 -- Send a "HAVE" message to all peers we are connected to
449 function send_have(piece)
450       simgrid.debug("Sending a HAVE message")
451         for i,v in pairs(data.peers) do
452                 local task = new_task("HAVE")
453                 task.piece = piece
454                 task:dsend(v.mailbox)
455         end
456 end
457 -- Send request messages to a peer that have unchoked us        
458 function send_requests_to_peer(remote_peer)
459     simgrid.debug("Sending a request to peer " .. remote_peer.mailbox)
460         for i,v in pairs(data.current_pieces) do
461                 send_request(remote_peer.mailbox,i)
462         end
463 end
464 -- Send a bitfield message to a peer
465 function send_bitfield(mailbox)
466         simgrid.debug("Sending a BITFIELD to " .. mailbox)
467         local task = new_task("BITFIELD")
468         task.bitfield = data.bitfield
469         task:dsend(mailbox)
470 end
471 -- Send a "request" message to a pair, containing a request for a piece
472 function send_request(mailbox, piece)
473         simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
474         local task =  new_task("REQUEST")
475         task.piece = piece
476         task:dsend(mailbox)
477 end     
478 -- Send a "piece" messageto a pair, containing a piece of the file
479 function send_piece(mailbox, piece, stalled)
480         simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
481         local task = new_task("PIECE")
482         task.piece = piece
483         task.stalled = stalled
484         task:dsend(mailbox)     
485 end
486 function new_task(type)
487         local task = simgrid.task.new(type, 0, common.MESSAGE_SIZE)
488         task.type = type
489         task.mailbox = data.mailbox
490         task.peer_id = data.id  
491         return task
492 end
493 function get_status()
494         local s = ""
495         for i,v in pairs(data.bitfield) do
496                 if v == true then
497                          s = s .. '1'
498                 else
499                         s = s .. '0'
500                 end
501         end
502         return s
503 end