Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
backport
[simgrid.git] / examples / msg / app-bittorrent / peer.c
1 /* Copyright (c) 2012-2016. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <simgrid/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26 #define ENABLE_END_GAME_MODE 1
27
28 /** Number of blocks asked by each request */
29 #define BLOCKS_REQUESTED 2
30
31 static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
32
33 #define SLEEP_DURATION 1
34
35 /** Peer main function */
36 int peer(int argc, char *argv[])
37 {
38   s_peer_t peer;
39   //Check arguments
40   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
41   //Build peer object
42   if (argc == 4) {
43     peer_init(&peer, xbt_str_parse_int(argv[1],"Invalid ID: %s"), 1);
44   } else {
45     peer_init(&peer, xbt_str_parse_int(argv[1],"Invalid ID: %s"), 0);
46   }
47   //Retrieve deadline
48   double deadline = xbt_str_parse_double(argv[2],"Invalid deadline: %s");
49   xbt_assert(deadline > 0, "Wrong deadline supplied");
50   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
51   //Getting peer data from the tracker.
52   if (get_peers_data(&peer)) {
53     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
54     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
55     peer.begin_receive_time = MSG_get_clock();
56     MSG_mailbox_set_async(peer.mailbox);
57     if (has_finished(peer.bitfield)) {
58       peer.pieces = FILE_PIECES;
59       send_handshake_all(&peer);
60       seed_loop(&peer, deadline);
61     } else {
62       leech_loop(&peer, deadline);
63       seed_loop(&peer, deadline);
64     }
65   } else {
66     XBT_INFO("Couldn't contact the tracker.");
67   }
68
69   XBT_INFO("Here is my current status: %s", peer.bitfield);
70   if (peer.comm_received) {
71     MSG_comm_destroy(peer.comm_received);
72   }
73
74   peer_free(&peer);
75   return 0;
76 }
77
78 /** @brief Peer main loop when it is leeching.
79  *  @param peer peer data
80  *  @param deadline time at which the peer has to leave
81  */
82 void leech_loop(peer_t peer, double deadline)
83 {
84   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
85   XBT_DEBUG("Start downloading.");
86   /*
87    * Send a "handshake" message to all the peers it got
88    * (since it couldn't have gotten more than 50 peers)
89    */
90   send_handshake_all(peer);
91   XBT_DEBUG("Starting main leech loop");
92
93   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
94     if (peer->comm_received == NULL) {
95       peer->task_received = NULL;
96       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
97     }
98     if (MSG_comm_test(peer->comm_received)) {
99       msg_error_t status = MSG_comm_get_status(peer->comm_received);
100       MSG_comm_destroy(peer->comm_received);
101       peer->comm_received = NULL;
102       if (status == MSG_OK) {
103         handle_message(peer, peer->task_received);
104       }
105     } else {
106       //We don't execute the choke algorithm if we don't already have a piece
107       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
108         update_choked_peers(peer);
109         next_choked_update += UPDATE_CHOKED_INTERVAL;
110       } else {
111         MSG_process_sleep(SLEEP_DURATION);
112       }
113     }
114   }
115   if (peer->pieces == FILE_PIECES)
116     XBT_DEBUG("%d becomes a seeder", peer->id);
117
118 }
119
120 /** @brief Peer main loop when it is seeding
121  *  @param peer peer data
122  *  @param deadline time when the peer will leave
123  */
124 void seed_loop(peer_t peer, double deadline)
125 {
126   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
127   XBT_DEBUG("Start seeding.");
128   //start the main seed loop
129   while (MSG_get_clock() < deadline) {
130     if (peer->comm_received == NULL) {
131       peer->task_received = NULL;
132       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
133     }
134     if (MSG_comm_test(peer->comm_received)) {
135       msg_error_t status = MSG_comm_get_status(peer->comm_received);
136       MSG_comm_destroy(peer->comm_received);
137       peer->comm_received = NULL;
138       if (status == MSG_OK) {
139         handle_message(peer, peer->task_received);
140       }
141     } else {
142       if (MSG_get_clock() >= next_choked_update) {
143         update_choked_peers(peer);
144         //TODO: Change the choked peer algorithm when seeding.
145         next_choked_update += UPDATE_CHOKED_INTERVAL;
146       } else {
147         MSG_process_sleep(SLEEP_DURATION);
148       }
149     }
150   }
151 }
152
153 /** @brief Retrieves the peer list from the tracker
154  *  @param peer current peer data
155  */
156 int get_peers_data(peer_t peer)
157 {
158   int success = 0;
159   int send_success = 0;
160   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
161   //Build the task to send to the tracker
162   tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker,
163                                                    peer->id, 0, 0, FILE_SIZE);
164   //Build the task to send.
165   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
166   msg_task_t task_received = NULL;
167   msg_comm_t comm_received;
168   while ((send_success == 0) && MSG_get_clock() < timeout) {
169     XBT_DEBUG("Sending a peer request to the tracker.");
170     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, GET_PEERS_TIMEOUT);
171     if (status == MSG_OK) {
172       send_success = 1;
173     }
174   }
175   while ((success ==0) && MSG_get_clock() < timeout) {
176     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
177     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
178     if (status == MSG_OK) {
179       tracker_task_data_t data = MSG_task_get_data(task_received);
180       unsigned i;
181       int peer_id;
182       //Add the peers the tracker gave us to our peer list.
183       xbt_dynar_foreach(data->peers, i, peer_id) {
184         if (peer_id != peer->id)
185           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int), connection_new(peer_id), NULL);
186       }
187       success = 1;
188       //free the communication and the task
189       MSG_comm_destroy(comm_received);
190       tracker_task_data_free(data);
191       MSG_task_destroy(task_received);
192     }
193   }
194
195   return success;
196 }
197
198 /** @brief Initialize the peer data.
199  *  @param peer peer data
200  *  @param id id of the peer to take in the network
201  *  @param seed indicates if the peer is a seed.
202  */
203 void peer_init(peer_t peer, int id, int seed)
204 {
205   peer->id = id;
206   snprintf(peer->mailbox,MAILBOX_SIZE-1, "%d", id);
207   snprintf(peer->mailbox_tracker,MAILBOX_SIZE-1, "tracker_%d", id);
208   peer->peers        = xbt_dict_new_homogeneous(NULL);
209   peer->active_peers = xbt_dict_new_homogeneous(NULL);
210   peer->hostname = MSG_host_get_name(MSG_host_self());
211
212   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
213   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
214   if (seed) {
215     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
216     memset(peer->bitfield_blocks, '1', sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
217   } else {
218     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
219     memset(peer->bitfield_blocks, '0', sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
220   }
221
222   peer->bitfield[FILE_PIECES] = '\0';
223   peer->pieces = 0;
224
225   peer->pieces_count = xbt_new0(short, FILE_PIECES);
226
227   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
228
229   peer->stream        = (RngStream)MSG_host_get_data(MSG_host_self());
230   peer->comm_received = NULL;
231
232   peer->round = 0;
233 }
234
235 /** Destroys a poor peer object. */
236 void peer_free(peer_t peer)
237 {
238   char *key;
239   connection_t connection;
240   xbt_dict_cursor_t cursor;
241   xbt_dict_foreach(peer->peers, cursor, key, connection) {
242     connection_free(connection);
243   }
244   xbt_dict_free(&peer->peers);
245   xbt_dict_free(&peer->active_peers);
246   xbt_dynar_free(&peer->current_pieces);
247   xbt_free(peer->pieces_count);
248   xbt_free(peer->bitfield);
249   xbt_free(peer->bitfield_blocks);
250 }
251
252 /** @brief Returns if a peer has finished downloading the file
253  *  @param bitfield peer bitfield
254  */
255 int has_finished(char *bitfield)
256 {
257   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
258 }
259
260 int nb_interested_peers(peer_t peer)
261 {
262   xbt_dict_cursor_t cursor = NULL;
263   char *key;
264   connection_t connection;
265   int nb = 0;
266   xbt_dict_foreach(peer->peers, cursor, key, connection) {
267     if (connection->interested)
268       nb++;
269   }
270   return nb;
271 }
272
273 void update_active_peers_set(peer_t peer, connection_t remote_peer)
274 {
275   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
276     //add in the active peers set
277     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int), remote_peer, NULL);
278   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int))) {
279     xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int));
280   }
281 }
282
283 /** @brief Handle a received message sent by another peer
284  * @param peer Peer data
285  * @param task task received.
286  */
287 void handle_message(peer_t peer, msg_task_t task)
288 {
289   message_t message = MSG_task_get_data(task);
290   connection_t remote_peer;
291   remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id, sizeof(int));
292   switch (message->type) {
293   case MESSAGE_HANDSHAKE:
294     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox, message->issuer_host_name);
295     //Check if the peer is in our connection list.
296     if (remote_peer == 0) {
297       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
298       send_handshake(peer, message->mailbox);
299     }
300     //Send our bitfield to the peer
301     send_bitfield(peer, message->mailbox);
302     break;
303   case MESSAGE_BITFIELD:
304     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox, message->issuer_host_name);
305     //Update the pieces list
306     update_pieces_count_from_bitfield(peer, message->bitfield);
307     //Store the bitfield
308     remote_peer->bitfield = xbt_strdup(message->bitfield);
309     xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
310     if (is_interested(peer, remote_peer)) {
311       remote_peer->am_interested = 1;
312       send_interested(peer, message->mailbox);
313     }
314     break;
315   case MESSAGE_INTERESTED:
316     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
317     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
318     //Update the interested state of the peer.
319     remote_peer->interested = 1;
320     update_active_peers_set(peer, remote_peer);
321     break;
322   case MESSAGE_NOTINTERESTED:
323     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
324     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
325     remote_peer->interested = 0;
326     update_active_peers_set(peer, remote_peer);
327     break;
328   case MESSAGE_UNCHOKE:
329     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
330     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox, message->issuer_host_name);
331     xbt_assert(remote_peer->choked_download);
332     remote_peer->choked_download = 0;
333     //Send requests to the peer, since it has unchoked us
334     if (remote_peer->am_interested)
335       request_new_piece_to_peer(peer, remote_peer);
336     break;
337   case MESSAGE_CHOKE:
338     xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
339     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox, message->issuer_host_name);
340     xbt_assert(!remote_peer->choked_download);
341     remote_peer->choked_download = 1;
342     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
343     break;
344   case MESSAGE_HAVE:
345     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d", message->mailbox, message->issuer_host_name,
346               message->index);
347     xbt_assert(remote_peer->bitfield, "bitfield not received");
348     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
349     remote_peer->bitfield[message->index] = '1';
350     peer->pieces_count[message->index]++;
351     //If the piece is in our pieces, we tell the peer that we are interested.
352     if ((remote_peer->am_interested == 0) &&
353          peer->bitfield[message->index] == '0') {
354       remote_peer->am_interested = 1;
355       send_interested(peer, message->mailbox);
356       if (remote_peer->choked_download == 0)
357         request_new_piece_to_peer(peer, remote_peer);
358     }
359     break;
360   case MESSAGE_REQUEST:
361     xbt_assert(remote_peer->interested);
362
363     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
364     if (remote_peer->choked_upload == 0) {
365       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)", message->mailbox, message->issuer_host_name,
366                 message->index, message->block_index, message->block_index + message->block_length);
367       if (peer->bitfield[message->index] == '1') {
368         send_piece(peer, message->mailbox, message->index, message->block_index, message->block_length);
369       }
370     } else {
371       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.", message->mailbox,
372                 message->issuer_host_name, message->peer_id);
373     }
374     break;
375   case MESSAGE_PIECE:
376     XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index, message->block_index,
377               message->block_index + message->block_length, message->mailbox, message->issuer_host_name);
378     xbt_assert(!remote_peer->choked_download);
379     xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
380                "Can't received a piece if I'm not interested wihtout end-game mode!"
381                "piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
382     xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
383     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
384     //TODO: Execute à computation.
385       if (peer->bitfield[message->index] == '0') {
386         update_bitfield_blocks(peer, message->index, message->block_index, message->block_length);
387         if (piece_complete(peer, message->index)) {
388           //Removing the piece from our piece list
389           remove_current_piece(peer, remote_peer, message->index);
390           //Setting the fact that we have the piece
391           peer->bitfield[message->index] = '1';
392           peer->pieces++;
393           XBT_DEBUG("My status is now %s", peer->bitfield);
394           //Sending the information to all the peers we are connected to
395           send_have(peer, message->index);
396           //sending UNINTERSTED to peers that doesn't have what we want.
397           update_interested_after_receive(peer);
398         } else {                // piece not completed
399           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
400         }
401       } else {
402         XBT_DEBUG("However, we already have it");
403         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
404         request_new_piece_to_peer(peer, remote_peer);
405       }
406     break;
407   case MESSAGE_CANCEL:
408     XBT_DEBUG("The received CANCEL from %s (%s)", message->mailbox, message->issuer_host_name);
409     break;
410   default:
411     THROW_IMPOSSIBLE;
412   }
413   //Update the peer speed.
414   if (remote_peer) {
415     connection_add_speed_value(remote_peer, 1.0 / (MSG_get_clock() - peer->begin_receive_time));
416   }
417   peer->begin_receive_time = MSG_get_clock();
418
419   task_message_free(task);
420 }
421
422 /** Selects the appropriate piece to download and requests it to the remote_peer */
423 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
424 {
425   int piece = select_piece_to_download(peer, remote_peer);
426   if (piece != -1) {
427     xbt_dynar_push_as(peer->current_pieces, int, piece);
428     send_request_to_peer(peer, remote_peer, piece);
429   }
430 }
431
432 /** remove current_piece from the list of currently downloaded pieces. */
433 void remove_current_piece(peer_t peer, connection_t remote_peer, int current_piece)
434 {
435   int piece_index = xbt_dynar_search_or_negative(peer->current_pieces, &current_piece);
436   if (piece_index != -1)
437     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
438   remote_peer->current_piece = -1;
439 }
440
441 /** @brief Updates the list of who has a piece from a bitfield
442  *  @param peer peer we want to update the list
443  *  @param bitfield bitfield
444  */
445 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
446 {
447   int i;
448   for (i = 0; i < FILE_PIECES; i++) {
449     if (bitfield[i] == '1') {
450       peer->pieces_count[i]++;
451     }
452   }
453 }
454
455 /** @brief Return the piece to be downloaded
456  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
457  * If a piece is partially downloaded, this piece will be selected prioritarily
458  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
459  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
460  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
461  * @param peer: local peer
462  * @param remote_peer: information about the connection
463  * @return the piece to download if possible. -1 otherwise
464  */
465 int select_piece_to_download(peer_t peer, connection_t remote_peer)
466 {
467   int piece = partially_downloaded_piece(peer, remote_peer);
468   // strict priority policy
469   if (piece != -1)
470     return piece;
471
472   // end game mode
473   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces) &&
474       (is_interested(peer, remote_peer) != 0)) {
475 #if ENABLE_END_GAME_MODE == 0
476       return -1;
477 #endif
478     int i;
479     int nb_interesting_pieces = 0;
480     int current_index = 0;
481     // compute the number of interesting pieces
482     for (i = 0; i < FILE_PIECES; i++) {
483       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
484         nb_interesting_pieces++;
485       }
486     }
487     xbt_assert(nb_interesting_pieces != 0);
488     // get a random interesting piece
489     int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
490     for (i = 0; i < FILE_PIECES; i++) {
491       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
492         if (random_piece_index == current_index) {
493           piece = i;
494           break;
495         }
496         current_index++;
497       }
498     }
499     xbt_assert(piece != -1);
500     return piece;
501   }
502   // Random first policy
503   if (peer->pieces < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
504     int i;
505     int nb_interesting_pieces = 0;
506     int current_index = 0;
507     // compute the number of interesting pieces
508     for (i = 0; i < FILE_PIECES; i++) {
509       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' &&
510           (in_current_pieces(peer, i) == 0)) {
511         nb_interesting_pieces++;
512       }
513     }
514     xbt_assert(nb_interesting_pieces != 0);
515     // get a random interesting piece
516     int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
517     for (i = 0; i < FILE_PIECES; i++) {
518       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' &&
519           (in_current_pieces(peer, i) == 0)) {
520         if (random_piece_index == current_index) {
521           piece = i;
522           break;
523         }
524         current_index++;
525       }
526     }
527     xbt_assert(piece != -1);
528     return piece;
529   } else {                      // Rarest first policy
530     int i;
531     short min = SHRT_MAX;
532     int nb_min_pieces = 0;
533     int current_index = 0;
534     // compute the smallest number of copies of available pieces
535     for (i = 0; i < FILE_PIECES; i++) {
536       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0' &&
537           remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i) == 0))
538         min = peer->pieces_count[i];
539     }
540     xbt_assert(min != SHRT_MAX ||
541                (is_interested_and_free(peer, remote_peer) ==0));
542     // compute the number of rarest pieces
543     for (i = 0; i < FILE_PIECES; i++) {
544       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' &&
545           remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i) ==0))
546         nb_min_pieces++;
547     }
548     xbt_assert(nb_min_pieces != 0 ||
549                (is_interested_and_free(peer, remote_peer)==0));
550     // get a random rarest piece
551     int random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
552     for (i = 0; i < FILE_PIECES; i++) {
553       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' &&
554           remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i)==0)) {
555         if (random_rarest_index == current_index) {
556           piece = i;
557           break;
558         }
559         current_index++;
560       }
561     }
562     xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
563     return piece;
564   }
565 }
566
567 /** @brief Update the list of current choked and unchoked peers, using the choke algorithm
568  *  @param peer the current peer
569  */
570 void update_choked_peers(peer_t peer)
571 {
572   if (nb_interested_peers(peer) == 0)
573     return;
574   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id, xbt_dict_size(peer->active_peers));
575   //update the current round
576   peer->round = (peer->round + 1) % 3;
577   char *key;
578   char *key_choked=NULL;
579   connection_t peer_choosed = NULL;
580   connection_t peer_choked = NULL;
581   //remove a peer from the list
582   xbt_dict_cursor_t cursor = NULL;
583   xbt_dict_cursor_first(peer->active_peers, &cursor);
584   if (xbt_dict_length(peer->active_peers) > 0) {
585     key_choked = xbt_dict_cursor_get_key(cursor);
586     peer_choked = xbt_dict_cursor_get_data(cursor);
587   }
588   xbt_dict_cursor_free(&cursor);
589
590   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
591   if (peer->pieces == FILE_PIECES) {
592     connection_t connection;
593     double unchoke_time = MSG_get_clock() + 1;
594
595     xbt_dict_foreach(peer->peers, cursor, key, connection) {
596       if (connection->last_unchoke < unchoke_time &&
597           (connection->interested != 0) && (connection->choked_upload != 0)) {
598         unchoke_time = connection->last_unchoke;
599         peer_choosed = connection;
600       }
601     }
602   } else {
603     //Random optimistic unchoking
604     if (peer->round == 0) {
605       int j = 0;
606       do {
607         //We choose a random peer to unchoke.
608         int id_chosen = RngStream_RandInt(peer->stream, 0, xbt_dict_length(peer->peers) - 1);
609         int i = 0;
610         connection_t connection;
611         xbt_dict_foreach(peer->peers, cursor, key, connection) {
612           if (i == id_chosen) {
613             peer_choosed = connection;
614             break;
615           }
616           i++;
617         }
618         xbt_dict_cursor_free(&cursor);
619         if (peer_choosed == NULL)
620           THROWF(unknown_error, 0, "A peer should have be selected at this point");
621         else if ((peer_choosed->interested == 0) || (peer_choosed->choked_upload == 0))
622           peer_choosed = NULL;
623         else
624           XBT_DEBUG("Nothing to do, keep going");
625         j++;
626       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
627     } else {
628       //Use the "fastest download" policy.
629       connection_t connection;
630       double fastest_speed = 0.0;
631       xbt_dict_foreach(peer->peers, cursor, key, connection) {
632         if (connection->peer_speed > fastest_speed &&
633             (connection->choked_upload != 0) && (connection->interested != 0)) {
634           peer_choosed = connection;
635           fastest_speed = connection->peer_speed;
636         }
637       }
638     }
639   }
640
641   if (peer_choosed != NULL)
642     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
643               peer->id, peer_choosed->id, peer_choosed->interested, peer_choosed->choked_upload);
644
645   if (peer_choked != peer_choosed) {
646     if (peer_choked != NULL) {
647       xbt_assert((!peer_choked->choked_upload), "Tries to choked a choked peer");
648       peer_choked->choked_upload = 1;
649       xbt_assert((*((int *) key_choked) == peer_choked->id));
650       update_active_peers_set(peer, peer_choked);
651       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
652       send_choked(peer, peer_choked->mailbox);
653     }
654     if (peer_choosed != NULL) {
655       xbt_assert((peer_choosed->choked_upload), "Tries to unchoked an unchoked peer");
656       peer_choosed->choked_upload = 0;
657       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id, sizeof(int), peer_choosed, NULL);
658       peer_choosed->last_unchoke = MSG_get_clock();
659       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
660       update_active_peers_set(peer, peer_choosed);
661       send_unchoked(peer, peer_choosed->mailbox);
662     }
663   }
664 }
665
666 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.
667  *  @param peer our peer data
668  */
669 void update_interested_after_receive(peer_t peer)
670 {
671   char *key;
672   xbt_dict_cursor_t cursor;
673   connection_t connection;
674   int interested;
675   xbt_dict_foreach(peer->peers, cursor, key, connection) {
676     interested = 0;
677     if (connection->am_interested != 0) {
678       xbt_assert(connection->bitfield, "Bitfield not received");
679       //Check if the peer still has a piece we want.
680       int i;
681       for (i = 0; i < FILE_PIECES; i++) {
682         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
683           interested = 1;
684           break;
685         }
686       }
687       if (!interested) {        //no more piece to download from connection
688         connection->am_interested = 0;
689         send_notinterested(peer, connection->mailbox);
690       }
691     }
692   }
693 }
694
695 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
696 {
697   int i;
698   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
699   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
700   for (i = block_index; i < (block_index + block_length); i++) {
701     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
702   }
703 }
704
705 /** Returns if a peer has completed the download of a piece */
706 int piece_complete(peer_t peer, int index)
707 {
708   int i;
709   for (i = 0; i < PIECES_BLOCKS; i++) {
710     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
711       return 0;
712     }
713   }
714   return 1;
715 }
716
717 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
718 int get_first_block(peer_t peer, int piece)
719 {
720   int i;
721   for (i = 0; i < PIECES_BLOCKS; i++) {
722     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
723       return i;
724     }
725   }
726   return -1;
727 }
728
729 /** Indicates if the remote peer has a piece not stored by the local peer */
730 int is_interested(peer_t peer, connection_t remote_peer)
731 {
732   xbt_assert(remote_peer->bitfield, "Bitfield not received");
733   for (int i = 0; i < FILE_PIECES; i++) {
734     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
735       return 1;
736     }
737   }
738   return 0;
739 }
740
741 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
742 int is_interested_and_free(peer_t peer, connection_t remote_peer)
743 {
744   xbt_assert(remote_peer->bitfield, "Bitfield not received");
745   for (int i = 0; i < FILE_PIECES; i++) {
746     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' &&
747         (in_current_pieces(peer, i) == 0)) {
748       return 1;
749     }
750   }
751   return 0;
752 }
753
754 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
755 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
756 {
757   xbt_assert(remote_peer->bitfield, "Bitfield not received");
758   for (int i = 0; i < FILE_PIECES; i++) {
759     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' &&
760         (in_current_pieces(peer, i) == 0)) {
761       if (get_first_block(peer, i) > 0)
762         return i;
763     }
764   }
765   return -1;
766 }
767
768 /** @brief Send request messages to a peer that have unchoked us
769  *  @param peer peer
770  *  @param remote_peer peer data to the peer we want to send the request
771  */
772 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
773 {
774   remote_peer->current_piece = piece;
775   xbt_assert(remote_peer->bitfield, "bitfield not received");
776   xbt_assert(remote_peer->bitfield[piece] == '1');
777   int block_index = get_first_block(peer, piece);
778   if (block_index != -1) {
779     int block_length = PIECES_BLOCKS - block_index;
780     block_length = MIN(BLOCKS_REQUESTED, block_length);
781     send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
782   }
783 }
784
785 /** Indicates if a piece is currently being downloaded by the peer. */
786 int in_current_pieces(peer_t peer, int piece)
787 {
788   return xbt_dynar_member(peer->current_pieces, &piece);
789 }
790
791 /***********************************************************
792  *
793  *  Low level message functions
794  *
795  ***********************************************************/
796
797 /** @brief Send a "interested" message to a peer
798  *  @param peer peer data
799  *  @param mailbox destination mailbox
800  */
801 void send_interested(peer_t peer, const char *mailbox)
802 {
803   msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id,
804                                      task_message_size(MESSAGE_INTERESTED));
805   MSG_task_dsend(task, mailbox, task_message_free);
806   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
807 }
808
809 /** @brief Send a "not interested" message to a peer
810  *  @param peer peer data
811  *  @param mailbox destination mailbox
812  */
813 void send_notinterested(peer_t peer, const char *mailbox)
814 {
815   msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id,
816                                      task_message_size(MESSAGE_NOTINTERESTED));
817   MSG_task_dsend(task, mailbox, task_message_free);
818   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
819 }
820
821 /** @brief Send a handshake message to all the peers the peer has.
822  *  @param peer peer data
823  */
824 void send_handshake_all(peer_t peer)
825 {
826   connection_t remote_peer;
827   xbt_dict_cursor_t cursor = NULL;
828   char *key;
829   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
830     msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
831                                        task_message_size(MESSAGE_HANDSHAKE));
832     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
833     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
834   }
835 }
836
837 /** @brief Send a "handshake" message to an user
838  *  @param peer peer data
839  *  @param mailbox mailbox where to we send the message
840  */
841 void send_handshake(peer_t peer, const char *mailbox)
842 {
843   msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
844                                      task_message_size(MESSAGE_HANDSHAKE));
845   MSG_task_dsend(task, mailbox, task_message_free);
846   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
847 }
848
849 /** Send a "choked" message to a peer. */
850 void send_choked(peer_t peer, const char *mailbox)
851 {
852   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
853   msg_task_t task = task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id,
854                     task_message_size(MESSAGE_CHOKE));
855   MSG_task_dsend(task, mailbox, task_message_free);
856 }
857
858 /** Send a "unchoked" message to a peer */
859 void send_unchoked(peer_t peer, const char *mailbox)
860 {
861   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
862   msg_task_t task = task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id,
863                                      task_message_size(MESSAGE_UNCHOKE));
864   MSG_task_dsend(task, mailbox, task_message_free);
865 }
866
867 /** Send a "HAVE" message to all peers we are connected to */
868 void send_have(peer_t peer, int piece)
869 {
870   XBT_DEBUG("Sending HAVE message to all my peers");
871   connection_t remote_peer;
872   xbt_dict_cursor_t cursor = NULL;
873   char *key;
874   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
875     msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, peer->id, piece,
876                                              task_message_size(MESSAGE_HAVE));
877     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
878   }
879 }
880
881 /** @brief Send a bitfield message to all the peers the peer has.
882  *  @param peer peer data
883  */
884 void send_bitfield(peer_t peer, const char *mailbox)
885 {
886   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
887   msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES);
888   MSG_task_dsend(task, mailbox, task_message_free);
889 }
890
891 /** Send a "request" message to a pair, containing a request for a piece */
892 void send_request(peer_t peer, const char *mailbox, int piece, int block_index, int block_length)
893 {
894   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece, block_index, block_length);
895   msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length);
896   MSG_task_dsend(task, mailbox, task_message_free);
897 }
898
899 /** Send a "piece" message to a pair, containing a piece of the file */
900 void send_piece(peer_t peer, const char *mailbox, int piece, int block_index, int block_length)
901 {
902   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, mailbox);
903   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
904   xbt_assert((peer->bitfield[piece] == '1'), "Tried to send a piece that we doesn't have.");
905   msg_task_t task = task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length,
906                                            BLOCK_SIZE);
907   MSG_task_dsend(task, mailbox, task_message_free);
908 }