Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
please sonar
[simgrid.git] / examples / c / app-bittorrent / bittorrent-peer.c
1 /* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "bittorrent-peer.h"
7 #include "tracker.h"
8 #include <simgrid/forward.h>
9
10 #include <limits.h>
11 #include <stdio.h> /* snprintf */
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 #define FILE_PIECES 10UL
20 #define PIECES_BLOCKS 5UL
21 #define BLOCK_SIZE 16384
22
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2UL
25
26 #define SLEEP_DURATION 1
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 #ifndef MIN
30 #define MIN(a, b) ((a) < (b) ? (a) : (b))
31 #endif
32
33 static peer_t peer_init(int id, int seed)
34 {
35   peer_t peer = xbt_new(s_peer_t, 1);
36   peer->id    = id;
37
38   char mailbox_name[MAILBOX_SIZE];
39   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
40   peer->mailbox = sg_mailbox_by_name(mailbox_name);
41
42   peer->connected_peers = xbt_dict_new_homogeneous(NULL);
43   peer->active_peers    = xbt_dict_new_homogeneous(NULL);
44
45   if (seed) {
46     peer->bitfield        = (1U << FILE_PIECES) - 1U;
47     peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
48   } else {
49     peer->bitfield        = 0;
50     peer->bitfield_blocks = 0;
51   }
52
53   peer->current_pieces = 0;
54   peer->pieces_count   = xbt_new0(short, FILE_PIECES);
55   peer->comm_received  = NULL;
56   peer->round          = 0;
57
58   return peer;
59 }
60
61 static void peer_free(peer_t peer)
62 {
63   char* key;
64   connection_t connection;
65   xbt_dict_cursor_t cursor;
66   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
67     xbt_free(connection);
68
69   xbt_dict_free(&peer->connected_peers);
70   xbt_dict_free(&peer->active_peers);
71   xbt_free(peer->pieces_count);
72   xbt_free(peer);
73 }
74
75 /** Peer main function */
76 void peer(int argc, char* argv[])
77 {
78   // Check arguments
79   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
80
81   // Build peer object
82   peer_t peer = peer_init(xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0);
83
84   // Retrieve deadline
85   peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
86   xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
87
88   char* status = xbt_malloc0(FILE_PIECES + 1);
89   get_status(&status, peer->bitfield);
90
91   XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
92
93   // Getting peer data from the tracker.
94   if (get_peers_from_tracker(peer)) {
95     XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
96     peer->begin_receive_time = simgrid_get_clock();
97     sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
98
99     if (has_finished(peer->bitfield)) {
100       send_handshake_to_all_peers(peer);
101     } else {
102       leech(peer);
103     }
104     seed(peer);
105   } else {
106     XBT_INFO("Couldn't contact the tracker.");
107   }
108
109   get_status(&status, peer->bitfield);
110   XBT_INFO("Here is my current status: %s", status);
111   if (peer->comm_received) {
112     sg_comm_unref(peer->comm_received);
113   }
114
115   xbt_free(status);
116   peer_free(peer);
117 }
118
119 /** @brief Retrieves the peer list from the tracker */
120 int get_peers_from_tracker(const_peer_t peer)
121 {
122   sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
123
124   // Build the task to send to the tracker
125   tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
126
127   XBT_DEBUG("Sending a peer request to the tracker.");
128   sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
129   sg_error_t res    = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
130
131   if (res == SG_ERROR_TIMEOUT) {
132     XBT_DEBUG("Timeout expired when requesting peers to tracker");
133     xbt_free(peer_request);
134     return 0;
135   }
136
137   void* message           = NULL;
138   sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
139   res                     = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
140   if (res == SG_OK) {
141     const_tracker_answer_t ta = (const_tracker_answer_t)message;
142     // Add the peers the tracker gave us to our peer list.
143     unsigned i;
144     int peer_id;
145     // Add the peers the tracker gave us to our peer list.
146     xbt_dynar_foreach (ta->peers, i, peer_id) {
147       if (peer_id != peer->id)
148         xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
149     }
150     tracker_answer_free(message);
151   } else if (res == SG_ERROR_TIMEOUT) {
152     XBT_DEBUG("Timeout expired when requesting peers to tracker");
153     tracker_answer_free(message);
154     return 0;
155   }
156
157   return 1;
158 }
159
160 /** @brief Send a handshake message to all the peers the peer has. */
161 void send_handshake_to_all_peers(const_peer_t peer)
162 {
163   connection_t remote_peer;
164   xbt_dict_cursor_t cursor = NULL;
165   char* key;
166   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
167     message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
168     sg_comm_t comm      = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
169     sg_comm_detach(comm, NULL);
170     XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
171   }
172 }
173
174 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
175 {
176   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
177   XBT_DEBUG("Sending %s to %s", type_names[type], sg_mailbox_get_name(mailbox));
178   message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
179   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, size);
180   sg_comm_detach(comm, NULL);
181 }
182
183 /** @brief Send a bitfield message to all the peers the peer has */
184 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
185 {
186   XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
187   message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
188   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
189   sg_comm_detach(comm, NULL);
190 }
191
192 /** Send a "piece" message to a pair, containing a piece of the file */
193 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
194 {
195   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
196   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
197   xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
198   message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
199   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
200   sg_comm_detach(comm, NULL);
201 }
202
203 /** Send a "HAVE" message to all peers we are connected to */
204 void send_have_to_all_peers(const_peer_t peer, int piece)
205 {
206   XBT_DEBUG("Sending HAVE message to all my peers");
207   connection_t remote_peer;
208   xbt_dict_cursor_t cursor = NULL;
209   char* key;
210   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
211     message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
212     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
213     sg_comm_detach(comm, NULL);
214   }
215 }
216
217 /** @brief Send request messages to a peer that have unchoked us */
218 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
219 {
220   remote_peer->current_piece = piece;
221   xbt_assert(connection_has_piece(remote_peer, piece));
222   int block_index = get_first_missing_block_from(peer, piece);
223   if (block_index != -1) {
224     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
225     XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
226               block_index, block_length);
227     message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
228     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
229     sg_comm_detach(comm, NULL);
230   }
231 }
232
233 void get_status(char** status, unsigned int bitfield)
234 {
235   for (int i = FILE_PIECES - 1; i >= 0; i--)
236     (*status)[i] = (bitfield & (1U << i)) ? '1' : '0';
237   (*status)[FILE_PIECES] = '\0';
238 }
239
240 int has_finished(unsigned int bitfield)
241 {
242   return bitfield == (1U << FILE_PIECES) - 1U;
243 }
244
245 /** Indicates if the remote peer has a piece not stored by the local peer */
246 int is_interested(const_peer_t peer, const_connection_t remote_peer)
247 {
248   return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
249 }
250
251 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
252 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
253 {
254   for (int i = 0; i < FILE_PIECES; i++)
255     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
256       return 1;
257   return 0;
258 }
259
260 /** @brief Updates the list of who has a piece from a bitfield */
261 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
262 {
263   for (int i = 0; i < FILE_PIECES; i++)
264     if (bitfield & (1U << i))
265       peer->pieces_count[i]++;
266 }
267
268 int count_pieces(unsigned int bitfield)
269 {
270   int count      = 0;
271   unsigned int n = bitfield;
272   while (n) {
273     count += n & 1U;
274     n >>= 1U;
275   }
276   return count;
277 }
278
279 int nb_interested_peers(const_peer_t peer)
280 {
281   xbt_dict_cursor_t cursor = NULL;
282   char* key;
283   connection_t connection;
284   int nb = 0;
285   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
286     if (connection->interested)
287       nb++;
288
289   return nb;
290 }
291
292 /** @brief Peer main loop when it is leeching. */
293 void leech(peer_t peer)
294 {
295   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
296   XBT_DEBUG("Start downloading.");
297
298   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
299   send_handshake_to_all_peers(peer);
300   XBT_DEBUG("Starting main leech loop");
301
302   void* data = NULL;
303   while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
304     if (peer->comm_received == NULL)
305       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
306
307     if (sg_comm_test(peer->comm_received)) {
308       peer->message = (message_t)data;
309       handle_message(peer, peer->message);
310       xbt_free(peer->message);
311       peer->comm_received = NULL;
312     } else {
313       // We don't execute the choke algorithm if we don't already have a piece
314       if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
315         update_choked_peers(peer);
316         next_choked_update += UPDATE_CHOKED_INTERVAL;
317       } else {
318         sg_actor_sleep_for(SLEEP_DURATION);
319       }
320     }
321   }
322   if (has_finished(peer->bitfield))
323     XBT_DEBUG("%d becomes a seeder", peer->id);
324 }
325
326 /** @brief Peer main loop when it is seeding */
327 void seed(peer_t peer)
328 {
329   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
330   XBT_DEBUG("Start seeding.");
331   // start the main seed loop
332   void* data = NULL;
333   while (simgrid_get_clock() < peer->deadline) {
334     if (peer->comm_received == NULL)
335       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
336
337     if (sg_comm_test(peer->comm_received)) {
338       peer->message = (message_t)data;
339       handle_message(peer, peer->message);
340       xbt_free(peer->message);
341       peer->comm_received = NULL;
342     } else {
343       if (simgrid_get_clock() >= next_choked_update) {
344         update_choked_peers(peer);
345         // TODO: Change the choked peer algorithm when seeding.
346         next_choked_update += UPDATE_CHOKED_INTERVAL;
347       } else {
348         sg_actor_sleep_for(SLEEP_DURATION);
349       }
350     }
351   }
352 }
353
354 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
355 {
356   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
357     // add in the active peers set
358     xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
359   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
360     xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
361   }
362 }
363
364 /** @brief Handle a received message sent by another peer */
365 void handle_message(peer_t peer, message_t message)
366 {
367   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
368                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
369   XBT_DEBUG("Received a %s message from %s", type_names[message->type], sg_mailbox_get_name(message->return_mailbox));
370
371   connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
372   xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
373              "The impossible did happened: A not-in-our-list peer sent us a message.");
374
375   switch (message->type) {
376     case MESSAGE_HANDSHAKE:
377       // Check if the peer is in our connection list.
378       if (remote_peer == 0) {
379         xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
380                          connection_new(message->peer_id));
381         send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
382       }
383       // Send our bitfield to the peer
384       send_bitfield(peer, message->return_mailbox);
385       break;
386     case MESSAGE_BITFIELD:
387       // Update the pieces list
388       update_pieces_count_from_bitfield(peer, message->bitfield);
389       // Store the bitfield
390       remote_peer->bitfield = message->bitfield;
391       xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
392       if (is_interested(peer, remote_peer)) {
393         remote_peer->am_interested = 1;
394         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
395       }
396       break;
397     case MESSAGE_INTERESTED:
398       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
399       // Update the interested state of the peer.
400       remote_peer->interested = 1;
401       update_active_peers_set(peer, remote_peer);
402       break;
403     case MESSAGE_NOTINTERESTED:
404       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
405       remote_peer->interested = 0;
406       update_active_peers_set(peer, remote_peer);
407       break;
408     case MESSAGE_UNCHOKE:
409       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
410       xbt_assert(remote_peer->choked_download);
411       remote_peer->choked_download = 0;
412       // Send requests to the peer, since it has unchoked us
413       if (remote_peer->am_interested)
414         request_new_piece_to_peer(peer, remote_peer);
415       break;
416     case MESSAGE_CHOKE:
417       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
418       xbt_assert(!remote_peer->choked_download);
419       remote_peer->choked_download = 1;
420       if (remote_peer->current_piece != -1)
421         remove_current_piece(peer, remote_peer, remote_peer->current_piece);
422       break;
423     case MESSAGE_HAVE:
424       XBT_DEBUG("\t for piece %d", message->piece);
425       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received");
426       remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
427       peer->pieces_count[message->piece]++;
428       // If the piece is in our pieces, we tell the peer that we are interested.
429       if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
430         remote_peer->am_interested = 1;
431         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
432         if (remote_peer->choked_download == 0)
433           request_new_piece_to_peer(peer, remote_peer);
434       }
435       break;
436     case MESSAGE_REQUEST:
437       xbt_assert(remote_peer->interested);
438       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received");
439       if (remote_peer->choked_upload == 0) {
440         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
441                   message->block_index + message->block_length);
442         if (!peer_has_not_piece(peer, message->piece)) {
443           send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
444         }
445       } else {
446         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
447       }
448       break;
449     case MESSAGE_PIECE:
450       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
451                 message->block_index + message->block_length);
452       xbt_assert(!remote_peer->choked_download);
453       xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
454       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received");
455       // TODO: Execute Ã  computation.
456       if (peer_has_not_piece(peer, message->piece)) {
457         update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
458         if (piece_complete(peer, message->piece)) {
459           // Removing the piece from our piece list
460           remove_current_piece(peer, remote_peer, message->piece);
461           // Setting the fact that we have the piece
462           peer->bitfield = peer->bitfield | (1U << message->piece);
463           char* status   = xbt_malloc0(FILE_PIECES + 1);
464           get_status(&status, peer->bitfield);
465           XBT_DEBUG("My status is now %s", status);
466           xbt_free(status);
467           // Sending the information to all the peers we are connected to
468           send_have_to_all_peers(peer, message->piece);
469           // sending UNINTERESTED to peers that do not have what we want.
470           update_interested_after_receive(peer);
471         } else {                                                   // piece not completed
472           send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
473         }
474       } else {
475         XBT_DEBUG("However, we already have it");
476         request_new_piece_to_peer(peer, remote_peer);
477       }
478       break;
479     case MESSAGE_CANCEL:
480       break;
481     default:
482       THROW_IMPOSSIBLE;
483   }
484   // Update the peer speed.
485   if (remote_peer) {
486     connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
487   }
488   peer->begin_receive_time = simgrid_get_clock();
489 }
490
491 /** Selects the appropriate piece to download and requests it to the remote_peer */
492 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
493 {
494   int piece = select_piece_to_download(peer, remote_peer);
495   if (piece != -1) {
496     peer->current_pieces |= (1U << (unsigned int)piece);
497     send_request_to_peer(peer, remote_peer, piece);
498   }
499 }
500
501 /** remove current_piece from the list of currently downloaded pieces. */
502 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
503 {
504   peer->current_pieces &= ~(1U << current_piece);
505   remote_peer->current_piece = -1;
506 }
507
508 /** @brief Return the piece to be downloaded
509  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
510  * If a piece is partially downloaded, this piece will be selected prioritarily
511  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
512  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
513  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
514  * @param peer: local peer
515  * @param remote_peer: information about the connection
516  * @return the piece to download if possible. -1 otherwise
517  */
518 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
519 {
520   int piece = partially_downloaded_piece(peer, remote_peer);
521   // strict priority policy
522   if (piece != -1)
523     return piece;
524
525   // end game mode
526   if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
527       (is_interested(peer, remote_peer) != 0)) {
528     int nb_interesting_pieces = 0;
529     // compute the number of interesting pieces
530     for (int i = 0; i < FILE_PIECES; i++) {
531       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
532         nb_interesting_pieces++;
533       }
534     }
535     xbt_assert(nb_interesting_pieces != 0);
536     // get a random interesting piece
537     int random_piece_index = rand() % nb_interesting_pieces;
538     int current_index      = 0;
539     for (int i = 0; i < FILE_PIECES; i++) {
540       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
541         if (random_piece_index == current_index) {
542           piece = i;
543           break;
544         }
545         current_index++;
546       }
547     }
548     xbt_assert(piece != -1);
549     return piece;
550   }
551   // Random first policy
552   if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
553     int nb_interesting_pieces = 0;
554     // compute the number of interesting pieces
555     for (int i = 0; i < FILE_PIECES; i++) {
556       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
557           peer_is_not_downloading_piece(peer, i)) {
558         nb_interesting_pieces++;
559       }
560     }
561     xbt_assert(nb_interesting_pieces != 0);
562     // get a random interesting piece
563     int random_piece_index = rand() % nb_interesting_pieces;
564     int current_index      = 0;
565     for (int i = 0; i < FILE_PIECES; i++) {
566       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
567           peer_is_not_downloading_piece(peer, i)) {
568         if (random_piece_index == current_index) {
569           piece = i;
570           break;
571         }
572         current_index++;
573       }
574     }
575     xbt_assert(piece != -1);
576     return piece;
577   } else { // Rarest first policy
578     short min         = SHRT_MAX;
579     int nb_min_pieces = 0;
580     int current_index = 0;
581     // compute the smallest number of copies of available pieces
582     for (int i = 0; i < FILE_PIECES; i++) {
583       if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
584           peer_is_not_downloading_piece(peer, i))
585         min = peer->pieces_count[i];
586     }
587     xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
588     // compute the number of rarest pieces
589     for (int i = 0; i < FILE_PIECES; i++) {
590       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
591           peer_is_not_downloading_piece(peer, i))
592         nb_min_pieces++;
593     }
594     xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
595     // get a random rarest piece
596     int random_rarest_index = 0;
597     if (nb_min_pieces > 0) {
598       random_rarest_index = rand() % nb_min_pieces;
599     }
600     for (int i = 0; i < FILE_PIECES; i++) {
601       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
602           peer_is_not_downloading_piece(peer, i)) {
603         if (random_rarest_index == current_index) {
604           piece = i;
605           break;
606         }
607         current_index++;
608       }
609     }
610     xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
611     return piece;
612   }
613 }
614
615 /** Update the list of current choked and unchoked peers, using the choke algorithm */
616 void update_choked_peers(peer_t peer)
617 {
618   if (nb_interested_peers(peer) == 0)
619     return;
620   XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
621   // update the current round
622   peer->round = (peer->round + 1) % 3;
623   char* key;
624   char* choked_key         = NULL;
625   connection_t chosen_peer = NULL;
626   connection_t choked_peer = NULL;
627   // remove a peer from the list
628   xbt_dict_cursor_t cursor = NULL;
629   xbt_dict_cursor_first(peer->active_peers, &cursor);
630   if (!xbt_dict_is_empty(peer->active_peers)) {
631     choked_key  = xbt_dict_cursor_get_key(cursor);
632     choked_peer = xbt_dict_cursor_get_data(cursor);
633   }
634   xbt_dict_cursor_free(&cursor);
635
636   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
637   if (has_finished(peer->bitfield)) {
638     connection_t connection;
639     double unchoke_time = simgrid_get_clock() + 1;
640
641     xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
642       if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
643           (connection->choked_upload != 0)) {
644         unchoke_time = connection->last_unchoke;
645         chosen_peer  = connection;
646       }
647     }
648   } else {
649     // Random optimistic unchoking
650     if (peer->round == 0) {
651       int j = 0;
652       do {
653         // We choose a random peer to unchoke.
654         int id_chosen = 0;
655         if (xbt_dict_length(peer->connected_peers) > 0) {
656           id_chosen = rand() % xbt_dict_length(peer->connected_peers);
657         }
658         int i = 0;
659         connection_t connection;
660         xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
661           if (i == id_chosen) {
662             chosen_peer = connection;
663             break;
664           }
665           i++;
666         }
667         xbt_dict_cursor_free(&cursor);
668         xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
669         if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
670           chosen_peer = NULL;
671         else
672           XBT_DEBUG("Nothing to do, keep going");
673         j++;
674       } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
675     } else {
676       // Use the "fastest download" policy.
677       connection_t connection;
678       double fastest_speed = 0.0;
679       xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
680         if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
681             (connection->interested != 0)) {
682           chosen_peer   = connection;
683           fastest_speed = connection->peer_speed;
684         }
685       }
686     }
687   }
688
689   if (chosen_peer != NULL)
690     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
691               chosen_peer->interested, chosen_peer->choked_upload);
692
693   if (choked_peer != chosen_peer) {
694     if (choked_peer != NULL) {
695       xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
696       choked_peer->choked_upload = 1;
697       xbt_assert((*((int*)choked_key) == choked_peer->id));
698       update_active_peers_set(peer, choked_peer);
699       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
700       send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
701     }
702     if (chosen_peer != NULL) {
703       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
704       chosen_peer->choked_upload = 0;
705       xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
706       chosen_peer->last_unchoke = simgrid_get_clock();
707       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
708       update_active_peers_set(peer, chosen_peer);
709       send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
710     }
711   }
712 }
713
714 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
715 void update_interested_after_receive(const_peer_t peer)
716 {
717   char* key;
718   xbt_dict_cursor_t cursor;
719   connection_t connection;
720   xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
721     if (connection->am_interested != 0) {
722       int interested = 0;
723       // Check if the peer still has a piece we want.
724       for (int i = 0; i < FILE_PIECES; i++) {
725         if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
726           interested = 1;
727           break;
728         }
729       }
730       if (!interested) { // no more piece to download from connection
731         connection->am_interested = 0;
732         send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
733       }
734     }
735   }
736 }
737
738 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
739 {
740   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
741   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
742   for (int i = block_index; i < (block_index + block_length); i++) {
743     peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
744   }
745 }
746
747 /** Returns if a peer has completed the download of a piece */
748 int piece_complete(const_peer_t peer, int index)
749 {
750   for (int i = 0; i < PIECES_BLOCKS; i++) {
751     if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
752       return 0;
753     }
754   }
755   return 1;
756 }
757
758 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
759 int get_first_missing_block_from(const_peer_t peer, int piece)
760 {
761   for (int i = 0; i < PIECES_BLOCKS; i++) {
762     if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
763       return i;
764     }
765   }
766   return -1;
767 }
768
769 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
770 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
771 {
772   for (int i = 0; i < FILE_PIECES; i++) {
773     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
774         get_first_missing_block_from(peer, i) > 0)
775       return i;
776   }
777   return -1;
778 }
779
780 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
781 {
782   return !(peer->bitfield & 1U << piece);
783 }
784
785 /** Check that a piece is not currently being download by the peer. */
786 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
787 {
788   return !(peer->current_pieces & 1U << piece);
789 }
790
791 /***************** Connection internal functions ***********************/
792 connection_t connection_new(int id)
793 {
794   connection_t connection = xbt_new(s_connection_t, 1);
795   char mailbox_name[MAILBOX_SIZE];
796   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
797   connection->id              = id;
798   connection->mailbox         = sg_mailbox_by_name(mailbox_name);
799   connection->bitfield        = 0;
800   connection->peer_speed      = 0;
801   connection->last_unchoke    = 0;
802   connection->current_piece   = -1;
803   connection->am_interested   = 0;
804   connection->interested      = 0;
805   connection->choked_upload   = 1;
806   connection->choked_download = 1;
807
808   return connection;
809 }
810
811 void connection_add_speed_value(connection_t connection, double speed)
812 {
813   connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
814 }
815
816 int connection_has_piece(const_connection_t connection, unsigned int piece)
817 {
818   return (connection->bitfield & 1U << piece);
819 }
820
821 /***************** Messages creation functions ***********************/
822 /** @brief Build a new empty message */
823 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
824 {
825   message_t message       = xbt_new(s_message_t, 1);
826   message->peer_id        = peer_id;
827   message->return_mailbox = return_mailbox;
828   message->type           = type;
829   return message;
830 }
831
832 /** Builds a message containing an index. */
833 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
834 {
835   message_t message = message_new(type, peer_id, return_mailbox);
836   message->piece    = index;
837   return message;
838 }
839
840 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
841 {
842   message_t message = message_new(type, peer_id, return_mailbox);
843   message->bitfield = bitfield;
844   return message;
845 }
846
847 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
848 {
849   message_t message     = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
850   message->block_index  = block_index;
851   message->block_length = block_length;
852   return message;
853 }
854
855 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
856 {
857   message_t message     = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
858   message->block_index  = block_index;
859   message->block_length = block_length;
860   return message;
861 }