Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cosmetics
[simgrid.git] / examples / msg / app-bittorrent / peer.c
1 /* Copyright (c) 2012-2016. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <simgrid/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26 #define ENABLE_END_GAME_MODE 1
27
28 /** Number of blocks asked by each request */
29 #define BLOCKS_REQUESTED 2
30
31 static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
32
33 #define SLEEP_DURATION 1
34
35 /** Peer main function */
36 int peer(int argc, char *argv[])
37 {
38   s_peer_t peer;
39   //Check arguments
40   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
41   //Build peer object
42   if (argc == 4) {
43     peer_init(&peer, xbt_str_parse_int(argv[1],"Invalid ID: %s"), 1);
44   } else {
45     peer_init(&peer, xbt_str_parse_int(argv[1],"Invalid ID: %s"), 0);
46   }
47   //Retrieve deadline
48   double deadline = xbt_str_parse_double(argv[2],"Invalid deadline: %s");
49   xbt_assert(deadline > 0, "Wrong deadline supplied");
50   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
51   //Getting peer data from the tracker.
52   if (get_peers_data(&peer)) {
53     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
54     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
55     peer.begin_receive_time = MSG_get_clock();
56     MSG_mailbox_set_async(peer.mailbox);
57     if (has_finished(peer.bitfield)) {
58       peer.pieces = FILE_PIECES;
59       send_handshake_all(&peer);
60       seed_loop(&peer, deadline);
61     } else {
62       leech_loop(&peer, deadline);
63       seed_loop(&peer, deadline);
64     }
65   } else {
66     XBT_INFO("Couldn't contact the tracker.");
67   }
68
69   XBT_INFO("Here is my current status: %s", peer.bitfield);
70   if (peer.comm_received) {
71     MSG_comm_destroy(peer.comm_received);
72   }
73
74   peer_free(&peer);
75   return 0;
76 }
77
78 /** @brief Peer main loop when it is leeching.
79  *  @param peer peer data
80  *  @param deadline time at which the peer has to leave
81  */
82 void leech_loop(peer_t peer, double deadline)
83 {
84   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
85   XBT_DEBUG("Start downloading.");
86   /*
87    * Send a "handshake" message to all the peers it got
88    * (since it couldn't have gotten more than 50 peers)
89    */
90   send_handshake_all(peer);
91   XBT_DEBUG("Starting main leech loop");
92
93   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
94     if (peer->comm_received == NULL) {
95       peer->task_received = NULL;
96       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
97     }
98     if (MSG_comm_test(peer->comm_received)) {
99       msg_error_t status = MSG_comm_get_status(peer->comm_received);
100       MSG_comm_destroy(peer->comm_received);
101       peer->comm_received = NULL;
102       if (status == MSG_OK) {
103         handle_message(peer, peer->task_received);
104       }
105     } else {
106       //We don't execute the choke algorithm if we don't already have a piece
107       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
108         update_choked_peers(peer);
109         next_choked_update += UPDATE_CHOKED_INTERVAL;
110       } else {
111         MSG_process_sleep(SLEEP_DURATION);
112       }
113     }
114   }
115   if (peer->pieces == FILE_PIECES)
116     XBT_DEBUG("%d becomes a seeder", peer->id);
117
118 }
119
120 /** @brief Peer main loop when it is seeding
121  *  @param peer peer data
122  *  @param deadline time when the peer will leave
123  */
124 void seed_loop(peer_t peer, double deadline)
125 {
126   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
127   XBT_DEBUG("Start seeding.");
128   //start the main seed loop
129   while (MSG_get_clock() < deadline) {
130     if (peer->comm_received == NULL) {
131       peer->task_received = NULL;
132       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
133     }
134     if (MSG_comm_test(peer->comm_received)) {
135       msg_error_t status = MSG_comm_get_status(peer->comm_received);
136       MSG_comm_destroy(peer->comm_received);
137       peer->comm_received = NULL;
138       if (status == MSG_OK) {
139         handle_message(peer, peer->task_received);
140       }
141     } else {
142       if (MSG_get_clock() >= next_choked_update) {
143         update_choked_peers(peer);
144         //TODO: Change the choked peer algorithm when seeding.
145         next_choked_update += UPDATE_CHOKED_INTERVAL;
146       } else {
147         MSG_process_sleep(SLEEP_DURATION);
148       }
149     }
150   }
151 }
152
153 /** @brief Retrieves the peer list from the tracker
154  *  @param peer current peer data
155  */
156 int get_peers_data(peer_t peer)
157 {
158   int success = 0;
159   int send_success = 0;
160   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
161   //Build the task to send to the tracker
162   tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker,
163                                                    peer->id, 0, 0, FILE_SIZE);
164   //Build the task to send.
165   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
166   msg_task_t task_received = NULL;
167   msg_comm_t comm_received;
168   while ((send_success == 0) && MSG_get_clock() < timeout) {
169     XBT_DEBUG("Sending a peer request to the tracker.");
170     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, GET_PEERS_TIMEOUT);
171     if (status == MSG_OK) {
172       send_success = 1;
173     }
174   }
175   while ((success ==0) && MSG_get_clock() < timeout) {
176     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
177     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
178     if (status == MSG_OK) {
179       tracker_task_data_t data = MSG_task_get_data(task_received);
180       unsigned i;
181       int peer_id;
182       //Add the peers the tracker gave us to our peer list.
183       xbt_dynar_foreach(data->peers, i, peer_id) {
184         if (peer_id != peer->id)
185           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int), connection_new(peer_id), NULL);
186       }
187       success = 1;
188       //free the communication and the task
189       MSG_comm_destroy(comm_received);
190       tracker_task_data_free(data);
191       MSG_task_destroy(task_received);
192     }
193   }
194
195   return success;
196 }
197
198 /** @brief Initialize the peer data.
199  *  @param peer peer data
200  *  @param id id of the peer to take in the network
201  *  @param seed indicates if the peer is a seed.
202  */
203 void peer_init(peer_t peer, int id, int seed)
204 {
205   peer->id = id;
206   snprintf(peer->mailbox,MAILBOX_SIZE-1, "%d", id);
207   snprintf(peer->mailbox_tracker,MAILBOX_SIZE-1, "tracker_%d", id);
208   peer->peers        = xbt_dict_new_homogeneous(NULL);
209   peer->active_peers = xbt_dict_new_homogeneous(NULL);
210   peer->hostname = MSG_host_get_name(MSG_host_self());
211
212   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
213   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
214   if (seed) {
215     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
216     memset(peer->bitfield_blocks, '1', sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
217   } else {
218     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
219     memset(peer->bitfield_blocks, '0', sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
220   }
221
222   peer->bitfield[FILE_PIECES] = '\0';
223   peer->pieces = 0;
224
225   peer->pieces_count = xbt_new0(short, FILE_PIECES);
226
227   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
228
229   peer->stream        = (RngStream)MSG_host_get_data(MSG_host_self());
230   peer->comm_received = NULL;
231
232   peer->round = 0;
233 }
234
235 /** Destroys a poor peer object. */
236 void peer_free(peer_t peer)
237 {
238   char *key;
239   connection_t connection;
240   xbt_dict_cursor_t cursor;
241   xbt_dict_foreach(peer->peers, cursor, key, connection) {
242     connection_free(connection);
243   }
244   xbt_dict_free(&peer->peers);
245   xbt_dict_free(&peer->active_peers);
246   xbt_dynar_free(&peer->current_pieces);
247   xbt_free(peer->pieces_count);
248   xbt_free(peer->bitfield);
249   xbt_free(peer->bitfield_blocks);
250 }
251
252 /** @brief Returns if a peer has finished downloading the file
253  *  @param bitfield peer bitfield
254  */
255 int has_finished(char *bitfield)
256 {
257   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
258 }
259
260 int nb_interested_peers(peer_t peer)
261 {
262   xbt_dict_cursor_t cursor = NULL;
263   char *key;
264   connection_t connection;
265   int nb = 0;
266   xbt_dict_foreach(peer->peers, cursor, key, connection) {
267     if (connection->interested)
268       nb++;
269   }
270   return nb;
271 }
272
273 void update_active_peers_set(peer_t peer, connection_t remote_peer)
274 {
275   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
276     //add in the active peers set
277     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int), remote_peer, NULL);
278   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int))) {
279     xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int));
280   }
281 }
282
283 /** @brief Handle a received message sent by another peer
284  * @param peer Peer data
285  * @param task task received.
286  */
287 void handle_message(peer_t peer, msg_task_t task)
288 {
289   message_t message = MSG_task_get_data(task);
290   connection_t remote_peer;
291   remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id, sizeof(int));
292   switch (message->type) {
293   case MESSAGE_HANDSHAKE:
294     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox, message->issuer_host_name);
295     //Check if the peer is in our connection list.
296     if (remote_peer == 0) {
297       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
298       send_handshake(peer, message->mailbox);
299     }
300     //Send our bitfield to the peer
301     send_bitfield(peer, message->mailbox);
302     break;
303   case MESSAGE_BITFIELD:
304     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox, message->issuer_host_name);
305     //Update the pieces list
306     update_pieces_count_from_bitfield(peer, message->bitfield);
307     //Store the bitfield
308     remote_peer->bitfield = xbt_strdup(message->bitfield);
309     xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
310     if (is_interested(peer, remote_peer)) {
311       remote_peer->am_interested = 1;
312       send_interested(peer, message->mailbox);
313     }
314     break;
315   case MESSAGE_INTERESTED:
316     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
317     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
318     //Update the interested state of the peer.
319     remote_peer->interested = 1;
320     update_active_peers_set(peer, remote_peer);
321     break;
322   case MESSAGE_NOTINTERESTED:
323     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
324     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
325     remote_peer->interested = 0;
326     update_active_peers_set(peer, remote_peer);
327     break;
328   case MESSAGE_UNCHOKE:
329     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
330     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox, message->issuer_host_name);
331     xbt_assert(remote_peer->choked_download);
332     remote_peer->choked_download = 0;
333     //Send requests to the peer, since it has unchoked us
334     if (remote_peer->am_interested)
335       request_new_piece_to_peer(peer, remote_peer);
336     break;
337   case MESSAGE_CHOKE:
338     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
339     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox, message->issuer_host_name);
340     xbt_assert(!remote_peer->choked_download);
341     remote_peer->choked_download = 1;
342     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
343     break;
344   case MESSAGE_HAVE:
345     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d", message->mailbox, message->issuer_host_name,
346               message->index);
347     xbt_assert(remote_peer->bitfield, "bitfield not received");
348     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
349     remote_peer->bitfield[message->index] = '1';
350     peer->pieces_count[message->index]++;
351     //If the piece is in our pieces, we tell the peer that we are interested.
352     if ((remote_peer->am_interested == 0) &&
353          peer->bitfield[message->index] == '0') {
354       remote_peer->am_interested = 1;
355       send_interested(peer, message->mailbox);
356       if (remote_peer->choked_download == 0)
357         request_new_piece_to_peer(peer, remote_peer);
358     }
359     break;
360   case MESSAGE_REQUEST:
361     xbt_assert(remote_peer->interested);
362
363     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
364     if (remote_peer->choked_upload == 0) {
365       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)", message->mailbox, message->issuer_host_name,
366                 message->index, message->block_index, message->block_index + message->block_length);
367       if (peer->bitfield[message->index] == '1') {
368         send_piece(peer, message->mailbox, message->index, message->block_index, message->block_length);
369       }
370     } else {
371       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.", message->mailbox,
372                 message->issuer_host_name, message->peer_id);
373     }
374     break;
375   case MESSAGE_PIECE:
376     XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index, message->block_index,
377               message->block_index + message->block_length, message->mailbox, message->issuer_host_name);
378     xbt_assert(!remote_peer->choked_download);
379     xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
380                "Can't received a piece if I'm not interested wihtout end-game mode!"
381                "piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
382     xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
383     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
384     //TODO: Execute à computation.
385       if (peer->bitfield[message->index] == '0') {
386         update_bitfield_blocks(peer, message->index, message->block_index, message->block_length);
387         if (piece_complete(peer, message->index)) {
388           //Removing the piece from our piece list
389           remove_current_piece(peer, remote_peer, message->index);
390           //Setting the fact that we have the piece
391           peer->bitfield[message->index] = '1';
392           peer->pieces++;
393           XBT_DEBUG("My status is now %s", peer->bitfield);
394           //Sending the information to all the peers we are connected to
395           send_have(peer, message->index);
396           //sending UNINTERSTED to peers that doesn't have what we want.
397           update_interested_after_receive(peer);
398         } else {                // piece not completed
399           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
400         }
401       } else {
402         XBT_DEBUG("However, we already have it");
403         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
404         request_new_piece_to_peer(peer, remote_peer);
405       }
406     break;
407   case MESSAGE_CANCEL:
408     XBT_DEBUG("The received CANCEL from %s (%s)", message->mailbox, message->issuer_host_name);
409     break;
410   default:
411     THROW_IMPOSSIBLE;
412   }
413   //Update the peer speed.
414   if (remote_peer) {
415     connection_add_speed_value(remote_peer, 1.0 / (MSG_get_clock() - peer->begin_receive_time));
416   }
417   peer->begin_receive_time = MSG_get_clock();
418
419   task_message_free(task);
420 }
421
422 /** Selects the appropriate piece to download and requests it to the remote_peer */
423 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
424 {
425   int piece = select_piece_to_download(peer, remote_peer);
426   if (piece != -1) {
427     xbt_dynar_push_as(peer->current_pieces, int, piece);
428     send_request_to_peer(peer, remote_peer, piece);
429   }
430 }
431
432 /** remove current_piece from the list of currently downloaded pieces. */
433 void remove_current_piece(peer_t peer, connection_t remote_peer, int current_piece)
434 {
435   int piece_index = xbt_dynar_search_or_negative(peer->current_pieces, &current_piece);
436   if (piece_index != -1)
437     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
438   remote_peer->current_piece = -1;
439 }
440
441 /** @brief Updates the list of who has a piece from a bitfield
442  *  @param peer peer we want to update the list
443  *  @param bitfield bitfield
444  */
445 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
446 {
447   int i;
448   for (i = 0; i < FILE_PIECES; i++) {
449     if (bitfield[i] == '1') {
450       peer->pieces_count[i]++;
451     }
452   }
453 }
454
455 /** @brief Return the piece to be downloaded
456  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
457  * If a piece is partially downloaded, this piece will be selected prioritarily
458  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
459  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
460  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
461  * @param peer: local peer
462  * @param remote_peer: information about the connection
463  * @return the piece to download if possible. -1 otherwise
464  */
465 int select_piece_to_download(peer_t peer, connection_t remote_peer)
466 {
467   int piece = -1;
468
469   piece = partially_downloaded_piece(peer, remote_peer);
470   // strict priority policy
471   if (piece != -1)
472     return piece;
473
474   // end game mode
475   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces) &&
476       (is_interested(peer, remote_peer) != 0)) {
477 #if ENABLE_END_GAME_MODE == 0
478       return -1;
479 #endif
480     int i;
481     int nb_interesting_pieces = 0;
482     int current_index = 0;
483     // compute the number of interesting pieces
484     for (i = 0; i < FILE_PIECES; i++) {
485       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
486         nb_interesting_pieces++;
487       }
488     }
489     xbt_assert(nb_interesting_pieces != 0);
490     // get a random interesting piece
491     int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
492     for (i = 0; i < FILE_PIECES; i++) {
493       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
494         if (random_piece_index == current_index) {
495           piece = i;
496           break;
497         }
498         current_index++;
499       }
500     }
501     xbt_assert(piece != -1);
502     return piece;
503   }
504   // Random first policy
505   if (peer->pieces < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
506     int i;
507     int nb_interesting_pieces = 0;
508     int current_index = 0;
509     // compute the number of interesting pieces
510     for (i = 0; i < FILE_PIECES; i++) {
511       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' &&
512           (in_current_pieces(peer, i) == 0)) {
513         nb_interesting_pieces++;
514       }
515     }
516     xbt_assert(nb_interesting_pieces != 0);
517     // get a random interesting piece
518     int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
519     for (i = 0; i < FILE_PIECES; i++) {
520       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' &&
521           (in_current_pieces(peer, i) == 0)) {
522         if (random_piece_index == current_index) {
523           piece = i;
524           break;
525         }
526         current_index++;
527       }
528     }
529     xbt_assert(piece != -1);
530     return piece;
531   } else {                      // Rarest first policy
532     int i;
533     short min = SHRT_MAX;
534     int nb_min_pieces = 0;
535     int current_index = 0;
536     // compute the smallest number of copies of available pieces
537     for (i = 0; i < FILE_PIECES; i++) {
538       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0' &&
539           remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i) == 0))
540         min = peer->pieces_count[i];
541     }
542     xbt_assert(min != SHRT_MAX ||
543                (is_interested_and_free(peer, remote_peer) ==0));
544     // compute the number of rarest pieces
545     for (i = 0; i < FILE_PIECES; i++) {
546       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' &&
547           remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i) ==0))
548         nb_min_pieces++;
549     }
550     xbt_assert(nb_min_pieces != 0 ||
551                (is_interested_and_free(peer, remote_peer)==0));
552     // get a random rarest piece
553     int random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
554     for (i = 0; i < FILE_PIECES; i++) {
555       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' &&
556           remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i)==0)) {
557         if (random_rarest_index == current_index) {
558           piece = i;
559           break;
560         }
561         current_index++;
562       }
563     }
564     xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
565     return piece;
566   }
567 }
568
569 /** @brief Update the list of current choked and unchoked peers, using the choke algorithm
570  *  @param peer the current peer
571  */
572 void update_choked_peers(peer_t peer)
573 {
574   if (nb_interested_peers(peer) == 0)
575     return;
576   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id, xbt_dict_size(peer->active_peers));
577   //update the current round
578   peer->round = (peer->round + 1) % 3;
579   char *key;
580   char *key_choked=NULL;
581   connection_t peer_choosed = NULL;
582   connection_t peer_choked = NULL;
583   //remove a peer from the list
584   xbt_dict_cursor_t cursor = NULL;
585   xbt_dict_cursor_first(peer->active_peers, &cursor);
586   if (xbt_dict_length(peer->active_peers) > 0) {
587     key_choked = xbt_dict_cursor_get_key(cursor);
588     peer_choked = xbt_dict_cursor_get_data(cursor);
589   }
590   xbt_dict_cursor_free(&cursor);
591
592   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
593   if (peer->pieces == FILE_PIECES) {
594     connection_t connection;
595     double unchoke_time = MSG_get_clock() + 1;
596
597     xbt_dict_foreach(peer->peers, cursor, key, connection) {
598       if (connection->last_unchoke < unchoke_time &&
599           (connection->interested != 0) && (connection->choked_upload != 0)) {
600         unchoke_time = connection->last_unchoke;
601         peer_choosed = connection;
602       }
603     }
604   } else {
605     //Random optimistic unchoking
606     if (peer->round == 0) {
607       int j = 0;
608       do {
609         //We choose a random peer to unchoke.
610         int id_chosen = RngStream_RandInt(peer->stream, 0, xbt_dict_length(peer->peers) - 1);
611         int i = 0;
612         connection_t connection;
613         xbt_dict_foreach(peer->peers, cursor, key, connection) {
614           if (i == id_chosen) {
615             peer_choosed = connection;
616             break;
617           }
618           i++;
619         }
620         xbt_dict_cursor_free(&cursor);
621         if (peer_choosed == NULL)
622           THROWF(unknown_error, 0, "A peer should have be selected at this point");
623         else if ((peer_choosed->interested == 0) || (peer_choosed->choked_upload == 0))
624           peer_choosed = NULL;
625         else
626           XBT_DEBUG("Nothing to do, keep going");
627         j++;
628       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
629     } else {
630       //Use the "fastest download" policy.
631       connection_t connection;
632       double fastest_speed = 0.0;
633       xbt_dict_foreach(peer->peers, cursor, key, connection) {
634         if (connection->peer_speed > fastest_speed &&
635             (connection->choked_upload != 0) && (connection->interested != 0)) {
636           peer_choosed = connection;
637           fastest_speed = connection->peer_speed;
638         }
639       }
640     }
641   }
642
643   if (peer_choosed != NULL)
644     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
645               peer->id, peer_choosed->id, peer_choosed->interested, peer_choosed->choked_upload);
646
647   if (peer_choked != peer_choosed) {
648     if (peer_choked != NULL) {
649       xbt_assert((!peer_choked->choked_upload), "Tries to choked a choked peer");
650       peer_choked->choked_upload = 1;
651       xbt_assert((*((int *) key_choked) == peer_choked->id));
652       update_active_peers_set(peer, peer_choked);
653       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
654       send_choked(peer, peer_choked->mailbox);
655     }
656     if (peer_choosed != NULL) {
657       xbt_assert((peer_choosed->choked_upload), "Tries to unchoked an unchoked peer");
658       peer_choosed->choked_upload = 0;
659       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id, sizeof(int), peer_choosed, NULL);
660       peer_choosed->last_unchoke = MSG_get_clock();
661       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
662       update_active_peers_set(peer, peer_choosed);
663       send_unchoked(peer, peer_choosed->mailbox);
664     }
665   }
666 }
667
668 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.
669  *  @param peer our peer data
670  */
671 void update_interested_after_receive(peer_t peer)
672 {
673   char *key;
674   xbt_dict_cursor_t cursor;
675   connection_t connection;
676   int interested;
677   xbt_dict_foreach(peer->peers, cursor, key, connection) {
678     interested = 0;
679     if (connection->am_interested != 0) {
680       xbt_assert(connection->bitfield, "Bitfield not received");
681       //Check if the peer still has a piece we want.
682       int i;
683       for (i = 0; i < FILE_PIECES; i++) {
684         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
685           interested = 1;
686           break;
687         }
688       }
689       if (!interested) {        //no more piece to download from connection
690         connection->am_interested = 0;
691         send_notinterested(peer, connection->mailbox);
692       }
693     }
694   }
695 }
696
697 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
698 {
699   int i;
700   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
701   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
702   for (i = block_index; i < (block_index + block_length); i++) {
703     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
704   }
705 }
706
707 /** Returns if a peer has completed the download of a piece */
708 int piece_complete(peer_t peer, int index)
709 {
710   int i;
711   for (i = 0; i < PIECES_BLOCKS; i++) {
712     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
713       return 0;
714     }
715   }
716   return 1;
717 }
718
719 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
720 int get_first_block(peer_t peer, int piece)
721 {
722   int i;
723   for (i = 0; i < PIECES_BLOCKS; i++) {
724     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
725       return i;
726     }
727   }
728   return -1;
729 }
730
731 /** Indicates if the remote peer has a piece not stored by the local peer */
732 int is_interested(peer_t peer, connection_t remote_peer)
733 {
734   xbt_assert(remote_peer->bitfield, "Bitfield not received");
735   for (int i = 0; i < FILE_PIECES; i++) {
736     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
737       return 1;
738     }
739   }
740   return 0;
741 }
742
743 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
744 int is_interested_and_free(peer_t peer, connection_t remote_peer)
745 {
746   xbt_assert(remote_peer->bitfield, "Bitfield not received");
747   for (int i = 0; i < FILE_PIECES; i++) {
748     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' &&
749         (in_current_pieces(peer, i) == 0)) {
750       return 1;
751     }
752   }
753   return 0;
754 }
755
756 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
757 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
758 {
759   xbt_assert(remote_peer->bitfield, "Bitfield not received");
760   for (int i = 0; i < FILE_PIECES; i++) {
761     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' &&
762         (in_current_pieces(peer, i) == 0)) {
763       if (get_first_block(peer, i) > 0)
764         return i;
765     }
766   }
767   return -1;
768 }
769
770 /** @brief Send request messages to a peer that have unchoked us
771  *  @param peer peer
772  *  @param remote_peer peer data to the peer we want to send the request
773  */
774 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
775 {
776   remote_peer->current_piece = piece;
777   xbt_assert(remote_peer->bitfield, "bitfield not received");
778   xbt_assert(remote_peer->bitfield[piece] == '1');
779   int block_index = get_first_block(peer, piece);
780   if (block_index != -1) {
781     int block_length = PIECES_BLOCKS - block_index;
782     block_length = MIN(BLOCKS_REQUESTED, block_length);
783     send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
784   }
785 }
786
787 /** Indicates if a piece is currently being downloaded by the peer. */
788 int in_current_pieces(peer_t peer, int piece)
789 {
790   return xbt_dynar_member(peer->current_pieces, &piece);
791 }
792
793 /***********************************************************
794  *
795  *  Low level message functions
796  *
797  ***********************************************************/
798
799 /** @brief Send a "interested" message to a peer
800  *  @param peer peer data
801  *  @param mailbox destination mailbox
802  */
803 void send_interested(peer_t peer, const char *mailbox)
804 {
805   msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id,
806                                      task_message_size(MESSAGE_INTERESTED));
807   MSG_task_dsend(task, mailbox, task_message_free);
808   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
809 }
810
811 /** @brief Send a "not interested" message to a peer
812  *  @param peer peer data
813  *  @param mailbox destination mailbox
814  */
815 void send_notinterested(peer_t peer, const char *mailbox)
816 {
817   msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id,
818                                      task_message_size(MESSAGE_NOTINTERESTED));
819   MSG_task_dsend(task, mailbox, task_message_free);
820   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
821 }
822
823 /** @brief Send a handshake message to all the peers the peer has.
824  *  @param peer peer data
825  */
826 void send_handshake_all(peer_t peer)
827 {
828   connection_t remote_peer;
829   xbt_dict_cursor_t cursor = NULL;
830   char *key;
831   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
832     msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
833                                        task_message_size(MESSAGE_HANDSHAKE));
834     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
835     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
836   }
837 }
838
839 /** @brief Send a "handshake" message to an user
840  *  @param peer peer data
841  *  @param mailbox mailbox where to we send the message
842  */
843 void send_handshake(peer_t peer, const char *mailbox)
844 {
845   msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
846                                      task_message_size(MESSAGE_HANDSHAKE));
847   MSG_task_dsend(task, mailbox, task_message_free);
848   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
849 }
850
851 /** Send a "choked" message to a peer. */
852 void send_choked(peer_t peer, const char *mailbox)
853 {
854   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
855   msg_task_t task = task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id,
856                     task_message_size(MESSAGE_CHOKE));
857   MSG_task_dsend(task, mailbox, task_message_free);
858 }
859
860 /** Send a "unchoked" message to a peer */
861 void send_unchoked(peer_t peer, const char *mailbox)
862 {
863   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
864   msg_task_t task = task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id,
865                                      task_message_size(MESSAGE_UNCHOKE));
866   MSG_task_dsend(task, mailbox, task_message_free);
867 }
868
869 /** Send a "HAVE" message to all peers we are connected to */
870 void send_have(peer_t peer, int piece)
871 {
872   XBT_DEBUG("Sending HAVE message to all my peers");
873   connection_t remote_peer;
874   xbt_dict_cursor_t cursor = NULL;
875   char *key;
876   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
877     msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, peer->id, piece,
878                                              task_message_size(MESSAGE_HAVE));
879     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
880   }
881 }
882
883 /** @brief Send a bitfield message to all the peers the peer has.
884  *  @param peer peer data
885  */
886 void send_bitfield(peer_t peer, const char *mailbox)
887 {
888   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
889   msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES);
890   MSG_task_dsend(task, mailbox, task_message_free);
891 }
892
893 /** Send a "request" message to a pair, containing a request for a piece */
894 void send_request(peer_t peer, const char *mailbox, int piece, int block_index, int block_length)
895 {
896   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece, block_index, block_length);
897   msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length);
898   MSG_task_dsend(task, mailbox, task_message_free);
899 }
900
901 /** Send a "piece" message to a pair, containing a piece of the file */
902 void send_piece(peer_t peer, const char *mailbox, int piece, int block_index, int block_length)
903 {
904   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, mailbox);
905   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
906   xbt_assert((peer->bitfield[piece] == '1'), "Tried to send a piece that we doesn't have.");
907   msg_task_t task = task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length,
908                                            BLOCK_SIZE);
909   MSG_task_dsend(task, mailbox, task_message_free);
910 }