Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cleanups
[simgrid.git] / examples / c / app-bittorrent / bittorrent-peer.c
1 /* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "bittorrent-peer.h"
7 #include "tracker.h"
8 #include <simgrid/forward.h>
9
10 #include <limits.h>
11 #include <stdio.h> /* snprintf */
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 #define FILE_PIECES 10UL
20 #define PIECES_BLOCKS 5UL
21 #define BLOCK_SIZE 16384
22
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2UL
25
26 #define SLEEP_DURATION 1
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 #ifndef MIN
30 #define MIN(a, b) ((a) < (b) ? (a) : (b))
31 #endif
32
33 static peer_t peer_init(int id, int seed)
34 {
35   peer_t peer = xbt_new(s_peer_t, 1);
36   peer->id    = id;
37
38   char mailbox_name[MAILBOX_SIZE];
39   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
40   peer->mailbox = sg_mailbox_by_name(mailbox_name);
41
42   peer->connected_peers = xbt_dict_new_homogeneous(NULL);
43   peer->active_peers    = xbt_dict_new_homogeneous(NULL);
44
45   if (seed) {
46     peer->bitfield        = (1U << FILE_PIECES) - 1U;
47     peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
48   } else {
49     peer->bitfield        = 0;
50     peer->bitfield_blocks = 0;
51   }
52
53   peer->current_pieces = 0;
54   peer->pieces_count   = xbt_new0(short, FILE_PIECES);
55   peer->comm_received  = NULL;
56   peer->round          = 0;
57
58   return peer;
59 }
60
61 static void peer_free(peer_t peer)
62 {
63   char* key;
64   connection_t connection;
65   xbt_dict_cursor_t cursor;
66   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
67     xbt_free(connection);
68
69   xbt_dict_free(&peer->connected_peers);
70   xbt_dict_free(&peer->active_peers);
71   xbt_free(peer->pieces_count);
72   xbt_free(peer);
73 }
74
75 /** Peer main function */
76 void peer(int argc, char* argv[])
77 {
78   // Check arguments
79   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
80
81   // Build peer object
82   peer_t peer = peer_init(xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0);
83
84   // Retrieve deadline
85   peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
86   xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
87
88   char* status = xbt_malloc0(FILE_PIECES + 1);
89   get_status(&status, peer->bitfield);
90
91   XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
92
93   // Getting peer data from the tracker.
94   if (get_peers_from_tracker(peer)) {
95     XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
96     peer->begin_receive_time = simgrid_get_clock();
97     sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
98
99     if (has_finished(peer->bitfield)) {
100       send_handshake_to_all_peers(peer);
101     } else {
102       leech(peer);
103     }
104     seed(peer);
105   } else {
106     XBT_INFO("Couldn't contact the tracker.");
107   }
108
109   get_status(&status, peer->bitfield);
110   XBT_INFO("Here is my current status: %s", status);
111   if (peer->comm_received) {
112     sg_comm_unref(peer->comm_received);
113   }
114
115   xbt_free(status);
116   peer_free(peer);
117 }
118
119 /** @brief Retrieves the peer list from the tracker */
120 int get_peers_from_tracker(const_peer_t peer)
121 {
122   sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
123
124   // Build the task to send to the tracker
125   tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
126
127   XBT_DEBUG("Sending a peer request to the tracker.");
128   sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
129   sg_error_t res    = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
130
131   if (res == SG_ERROR_TIMEOUT) {
132     XBT_DEBUG("Timeout expired when requesting peers to tracker");
133     xbt_free(peer_request);
134     return 0;
135   }
136
137   void* message           = NULL;
138   tracker_answer_t ta     = NULL;
139   sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
140   res                     = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
141   if (res == SG_OK) {
142     ta = (tracker_answer_t)message;
143     // Add the peers the tracker gave us to our peer list.
144     unsigned i;
145     int peer_id;
146     // Add the peers the tracker gave us to our peer list.
147     xbt_dynar_foreach (ta->peers, i, peer_id) {
148       if (peer_id != peer->id)
149         xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
150     }
151     tracker_answer_free(message);
152   } else if (res == SG_ERROR_TIMEOUT) {
153     XBT_DEBUG("Timeout expired when requesting peers to tracker");
154     tracker_answer_free(message);
155     return 0;
156   }
157
158   return 1;
159 }
160
161 /** @brief Send a handshake message to all the peers the peer has. */
162 void send_handshake_to_all_peers(const_peer_t peer)
163 {
164   connection_t remote_peer;
165   xbt_dict_cursor_t cursor = NULL;
166   char* key;
167   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
168     message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
169     sg_comm_t comm      = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
170     sg_comm_detach(comm, NULL);
171     XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
172   }
173 }
174
175 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
176 {
177   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
178   XBT_DEBUG("Sending %s to %s", type_names[type], sg_mailbox_get_name(mailbox));
179   message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
180   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, size);
181   sg_comm_detach(comm, NULL);
182 }
183
184 /** @brief Send a bitfield message to all the peers the peer has */
185 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
186 {
187   XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
188   message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
189   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
190   sg_comm_detach(comm, NULL);
191 }
192
193 /** Send a "piece" message to a pair, containing a piece of the file */
194 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
195 {
196   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
197   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
198   xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
199   message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
200   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
201   sg_comm_detach(comm, NULL);
202 }
203
204 /** Send a "HAVE" message to all peers we are connected to */
205 void send_have_to_all_peers(const_peer_t peer, int piece)
206 {
207   XBT_DEBUG("Sending HAVE message to all my peers");
208   connection_t remote_peer;
209   xbt_dict_cursor_t cursor = NULL;
210   char* key;
211   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
212     message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
213     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
214     sg_comm_detach(comm, NULL);
215   }
216 }
217
218 /** @brief Send request messages to a peer that have unchoked us */
219 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
220 {
221   remote_peer->current_piece = piece;
222   xbt_assert(connection_has_piece(remote_peer, piece));
223   int block_index = get_first_missing_block_from(peer, piece);
224   if (block_index != -1) {
225     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
226     XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
227               block_index, block_length);
228     message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
229     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
230     sg_comm_detach(comm, NULL);
231   }
232 }
233
234 void get_status(char** status, unsigned int bitfield)
235 {
236   for (int i = FILE_PIECES - 1; i >= 0; i--)
237     (*status)[i] = (bitfield & (1U << i)) ? '1' : '0';
238   (*status)[FILE_PIECES] = '\0';
239 }
240
241 int has_finished(unsigned int bitfield)
242 {
243   return bitfield == (1U << FILE_PIECES) - 1U;
244 }
245
246 /** Indicates if the remote peer has a piece not stored by the local peer */
247 int is_interested(const_peer_t peer, const_connection_t remote_peer)
248 {
249   return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
250 }
251
252 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
253 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
254 {
255   for (int i = 0; i < FILE_PIECES; i++)
256     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
257       return 1;
258   return 0;
259 }
260
261 /** @brief Updates the list of who has a piece from a bitfield */
262 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
263 {
264   for (int i = 0; i < FILE_PIECES; i++)
265     if (bitfield & (1U << i))
266       peer->pieces_count[i]++;
267 }
268
269 int count_pieces(unsigned int bitfield)
270 {
271   int count      = 0;
272   unsigned int n = bitfield;
273   while (n) {
274     count += n & 1U;
275     n >>= 1U;
276   }
277   return count;
278 }
279
280 int nb_interested_peers(const_peer_t peer)
281 {
282   xbt_dict_cursor_t cursor = NULL;
283   char* key;
284   connection_t connection;
285   int nb = 0;
286   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
287     if (connection->interested)
288       nb++;
289
290   return nb;
291 }
292
293 /** @brief Peer main loop when it is leeching. */
294 void leech(peer_t peer)
295 {
296   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
297   XBT_DEBUG("Start downloading.");
298
299   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
300   send_handshake_to_all_peers(peer);
301   XBT_DEBUG("Starting main leech loop");
302
303   void* data = NULL;
304   while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
305     if (peer->comm_received == NULL)
306       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
307
308     if (sg_comm_test(peer->comm_received)) {
309       peer->message = (message_t)data;
310       handle_message(peer, peer->message);
311       xbt_free(peer->message);
312       peer->comm_received = NULL;
313     } else {
314       // We don't execute the choke algorithm if we don't already have a piece
315       if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
316         update_choked_peers(peer);
317         next_choked_update += UPDATE_CHOKED_INTERVAL;
318       } else {
319         sg_actor_sleep_for(SLEEP_DURATION);
320       }
321     }
322   }
323   if (has_finished(peer->bitfield))
324     XBT_DEBUG("%d becomes a seeder", peer->id);
325 }
326
327 /** @brief Peer main loop when it is seeding */
328 void seed(peer_t peer)
329 {
330   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
331   XBT_DEBUG("Start seeding.");
332   // start the main seed loop
333   void* data = NULL;
334   while (simgrid_get_clock() < peer->deadline) {
335     if (peer->comm_received == NULL)
336       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
337
338     if (sg_comm_test(peer->comm_received)) {
339       peer->message = (message_t)data;
340       handle_message(peer, peer->message);
341       xbt_free(peer->message);
342       peer->comm_received = NULL;
343     } else {
344       if (simgrid_get_clock() >= next_choked_update) {
345         update_choked_peers(peer);
346         // TODO: Change the choked peer algorithm when seeding.
347         next_choked_update += UPDATE_CHOKED_INTERVAL;
348       } else {
349         sg_actor_sleep_for(SLEEP_DURATION);
350       }
351     }
352   }
353 }
354
355 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
356 {
357   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
358     // add in the active peers set
359     xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
360   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
361     xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
362   }
363 }
364
365 /** @brief Handle a received message sent by another peer */
366 void handle_message(peer_t peer, message_t message)
367 {
368   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
369                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
370   XBT_DEBUG("Received a %s message from %s", type_names[message->type], sg_mailbox_get_name(message->return_mailbox));
371
372   connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
373   xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
374              "The impossible did happened: A not-in-our-list peer sent us a message.");
375
376   switch (message->type) {
377     case MESSAGE_HANDSHAKE:
378       // Check if the peer is in our connection list.
379       if (remote_peer == 0) {
380         xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
381                          connection_new(message->peer_id));
382         send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
383       }
384       // Send our bitfield to the peer
385       send_bitfield(peer, message->return_mailbox);
386       break;
387     case MESSAGE_BITFIELD:
388       // Update the pieces list
389       update_pieces_count_from_bitfield(peer, message->bitfield);
390       // Store the bitfield
391       remote_peer->bitfield = message->bitfield;
392       xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
393       if (is_interested(peer, remote_peer)) {
394         remote_peer->am_interested = 1;
395         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
396       }
397       break;
398     case MESSAGE_INTERESTED:
399       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
400       // Update the interested state of the peer.
401       remote_peer->interested = 1;
402       update_active_peers_set(peer, remote_peer);
403       break;
404     case MESSAGE_NOTINTERESTED:
405       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
406       remote_peer->interested = 0;
407       update_active_peers_set(peer, remote_peer);
408       break;
409     case MESSAGE_UNCHOKE:
410       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
411       xbt_assert(remote_peer->choked_download);
412       remote_peer->choked_download = 0;
413       // Send requests to the peer, since it has unchoked us
414       if (remote_peer->am_interested)
415         request_new_piece_to_peer(peer, remote_peer);
416       break;
417     case MESSAGE_CHOKE:
418       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
419       xbt_assert(!remote_peer->choked_download);
420       remote_peer->choked_download = 1;
421       if (remote_peer->current_piece != -1)
422         remove_current_piece(peer, remote_peer, remote_peer->current_piece);
423       break;
424     case MESSAGE_HAVE:
425       XBT_DEBUG("\t for piece %d", message->piece);
426       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received");
427       remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
428       peer->pieces_count[message->piece]++;
429       // If the piece is in our pieces, we tell the peer that we are interested.
430       if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
431         remote_peer->am_interested = 1;
432         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
433         if (remote_peer->choked_download == 0)
434           request_new_piece_to_peer(peer, remote_peer);
435       }
436       break;
437     case MESSAGE_REQUEST:
438       xbt_assert(remote_peer->interested);
439       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received");
440       if (remote_peer->choked_upload == 0) {
441         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
442                   message->block_index + message->block_length);
443         if (!peer_has_not_piece(peer, message->piece)) {
444           send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
445         }
446       } else {
447         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
448       }
449       break;
450     case MESSAGE_PIECE:
451       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
452                 message->block_index + message->block_length);
453       xbt_assert(!remote_peer->choked_download);
454       xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
455       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received");
456       // TODO: Execute Ã  computation.
457       if (peer_has_not_piece(peer, message->piece)) {
458         update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
459         if (piece_complete(peer, message->piece)) {
460           // Removing the piece from our piece list
461           remove_current_piece(peer, remote_peer, message->piece);
462           // Setting the fact that we have the piece
463           peer->bitfield = peer->bitfield | (1U << message->piece);
464           char* status   = xbt_malloc0(FILE_PIECES + 1);
465           get_status(&status, peer->bitfield);
466           XBT_DEBUG("My status is now %s", status);
467           xbt_free(status);
468           // Sending the information to all the peers we are connected to
469           send_have_to_all_peers(peer, message->piece);
470           // sending UNINTERESTED to peers that do not have what we want.
471           update_interested_after_receive(peer);
472         } else {                                                   // piece not completed
473           send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
474         }
475       } else {
476         XBT_DEBUG("However, we already have it");
477         request_new_piece_to_peer(peer, remote_peer);
478       }
479       break;
480     case MESSAGE_CANCEL:
481       break;
482     default:
483       THROW_IMPOSSIBLE;
484   }
485   // Update the peer speed.
486   if (remote_peer) {
487     connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
488   }
489   peer->begin_receive_time = simgrid_get_clock();
490 }
491
492 /** Selects the appropriate piece to download and requests it to the remote_peer */
493 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
494 {
495   int piece = select_piece_to_download(peer, remote_peer);
496   if (piece != -1) {
497     peer->current_pieces |= (1U << (unsigned int)piece);
498     send_request_to_peer(peer, remote_peer, piece);
499   }
500 }
501
502 /** remove current_piece from the list of currently downloaded pieces. */
503 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
504 {
505   peer->current_pieces &= ~(1U << current_piece);
506   remote_peer->current_piece = -1;
507 }
508
509 /** @brief Return the piece to be downloaded
510  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
511  * If a piece is partially downloaded, this piece will be selected prioritarily
512  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
513  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
514  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
515  * @param peer: local peer
516  * @param remote_peer: information about the connection
517  * @return the piece to download if possible. -1 otherwise
518  */
519 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
520 {
521   int piece = partially_downloaded_piece(peer, remote_peer);
522   // strict priority policy
523   if (piece != -1)
524     return piece;
525
526   // end game mode
527   if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
528       (is_interested(peer, remote_peer) != 0)) {
529     int nb_interesting_pieces = 0;
530     // compute the number of interesting pieces
531     for (int i = 0; i < FILE_PIECES; i++) {
532       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
533         nb_interesting_pieces++;
534       }
535     }
536     xbt_assert(nb_interesting_pieces != 0);
537     // get a random interesting piece
538     int random_piece_index = rand() % nb_interesting_pieces;
539     int current_index      = 0;
540     for (int i = 0; i < FILE_PIECES; i++) {
541       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
542         if (random_piece_index == current_index) {
543           piece = i;
544           break;
545         }
546         current_index++;
547       }
548     }
549     xbt_assert(piece != -1);
550     return piece;
551   }
552   // Random first policy
553   if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
554     int nb_interesting_pieces = 0;
555     // compute the number of interesting pieces
556     for (int i = 0; i < FILE_PIECES; i++) {
557       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
558           peer_is_not_downloading_piece(peer, i)) {
559         nb_interesting_pieces++;
560       }
561     }
562     xbt_assert(nb_interesting_pieces != 0);
563     // get a random interesting piece
564     int random_piece_index = rand() % nb_interesting_pieces;
565     int current_index      = 0;
566     for (int i = 0; i < FILE_PIECES; i++) {
567       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
568           peer_is_not_downloading_piece(peer, i)) {
569         if (random_piece_index == current_index) {
570           piece = i;
571           break;
572         }
573         current_index++;
574       }
575     }
576     xbt_assert(piece != -1);
577     return piece;
578   } else { // Rarest first policy
579     short min         = SHRT_MAX;
580     int nb_min_pieces = 0;
581     int current_index = 0;
582     // compute the smallest number of copies of available pieces
583     for (int i = 0; i < FILE_PIECES; i++) {
584       if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
585           peer_is_not_downloading_piece(peer, i))
586         min = peer->pieces_count[i];
587     }
588     xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
589     // compute the number of rarest pieces
590     for (int i = 0; i < FILE_PIECES; i++) {
591       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
592           peer_is_not_downloading_piece(peer, i))
593         nb_min_pieces++;
594     }
595     xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
596     // get a random rarest piece
597     int random_rarest_index = 0;
598     if (nb_min_pieces > 0) {
599       random_rarest_index = rand() % nb_min_pieces;
600     }
601     for (int i = 0; i < FILE_PIECES; i++) {
602       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
603           peer_is_not_downloading_piece(peer, i)) {
604         if (random_rarest_index == current_index) {
605           piece = i;
606           break;
607         }
608         current_index++;
609       }
610     }
611     xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
612     return piece;
613   }
614 }
615
616 /** Update the list of current choked and unchoked peers, using the choke algorithm */
617 void update_choked_peers(peer_t peer)
618 {
619   if (nb_interested_peers(peer) == 0)
620     return;
621   XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
622   // update the current round
623   peer->round = (peer->round + 1) % 3;
624   char* key;
625   char* choked_key         = NULL;
626   connection_t chosen_peer = NULL;
627   connection_t choked_peer = NULL;
628   // remove a peer from the list
629   xbt_dict_cursor_t cursor = NULL;
630   xbt_dict_cursor_first(peer->active_peers, &cursor);
631   if (!xbt_dict_is_empty(peer->active_peers)) {
632     choked_key  = xbt_dict_cursor_get_key(cursor);
633     choked_peer = xbt_dict_cursor_get_data(cursor);
634   }
635   xbt_dict_cursor_free(&cursor);
636
637   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
638   if (has_finished(peer->bitfield)) {
639     connection_t connection;
640     double unchoke_time = simgrid_get_clock() + 1;
641
642     xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
643       if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
644           (connection->choked_upload != 0)) {
645         unchoke_time = connection->last_unchoke;
646         chosen_peer  = connection;
647       }
648     }
649   } else {
650     // Random optimistic unchoking
651     if (peer->round == 0) {
652       int j = 0;
653       do {
654         // We choose a random peer to unchoke.
655         int id_chosen = 0;
656         if (xbt_dict_length(peer->connected_peers) > 0) {
657           id_chosen = rand() % xbt_dict_length(peer->connected_peers);
658         }
659         int i = 0;
660         connection_t connection;
661         xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
662           if (i == id_chosen) {
663             chosen_peer = connection;
664             break;
665           }
666           i++;
667         }
668         xbt_dict_cursor_free(&cursor);
669         xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
670         if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
671           chosen_peer = NULL;
672         else
673           XBT_DEBUG("Nothing to do, keep going");
674         j++;
675       } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
676     } else {
677       // Use the "fastest download" policy.
678       connection_t connection;
679       double fastest_speed = 0.0;
680       xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
681         if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
682             (connection->interested != 0)) {
683           chosen_peer   = connection;
684           fastest_speed = connection->peer_speed;
685         }
686       }
687     }
688   }
689
690   if (chosen_peer != NULL)
691     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
692               chosen_peer->interested, chosen_peer->choked_upload);
693
694   if (choked_peer != chosen_peer) {
695     if (choked_peer != NULL) {
696       xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
697       choked_peer->choked_upload = 1;
698       xbt_assert((*((int*)choked_key) == choked_peer->id));
699       update_active_peers_set(peer, choked_peer);
700       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
701       send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
702     }
703     if (chosen_peer != NULL) {
704       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
705       chosen_peer->choked_upload = 0;
706       xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
707       chosen_peer->last_unchoke = simgrid_get_clock();
708       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
709       update_active_peers_set(peer, chosen_peer);
710       send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
711     }
712   }
713 }
714
715 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
716 void update_interested_after_receive(const_peer_t peer)
717 {
718   char* key;
719   xbt_dict_cursor_t cursor;
720   connection_t connection;
721   xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
722     if (connection->am_interested != 0) {
723       int interested = 0;
724       // Check if the peer still has a piece we want.
725       for (int i = 0; i < FILE_PIECES; i++) {
726         if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
727           interested = 1;
728           break;
729         }
730       }
731       if (!interested) { // no more piece to download from connection
732         connection->am_interested = 0;
733         send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
734       }
735     }
736   }
737 }
738
739 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
740 {
741   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
742   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
743   for (int i = block_index; i < (block_index + block_length); i++) {
744     peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
745   }
746 }
747
748 /** Returns if a peer has completed the download of a piece */
749 int piece_complete(const_peer_t peer, int index)
750 {
751   for (int i = 0; i < PIECES_BLOCKS; i++) {
752     if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
753       return 0;
754     }
755   }
756   return 1;
757 }
758
759 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
760 int get_first_missing_block_from(const_peer_t peer, int piece)
761 {
762   for (int i = 0; i < PIECES_BLOCKS; i++) {
763     if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
764       return i;
765     }
766   }
767   return -1;
768 }
769
770 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
771 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
772 {
773   for (int i = 0; i < FILE_PIECES; i++) {
774     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
775         get_first_missing_block_from(peer, i) > 0)
776       return i;
777   }
778   return -1;
779 }
780
781 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
782 {
783   return !(peer->bitfield & 1U << piece);
784 }
785
786 /** Check that a piece is not currently being download by the peer. */
787 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
788 {
789   return !(peer->current_pieces & 1U << piece);
790 }
791
792 /***************** Connection internal functions ***********************/
793 connection_t connection_new(int id)
794 {
795   connection_t connection = xbt_new(s_connection_t, 1);
796   char mailbox_name[MAILBOX_SIZE];
797   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
798   connection->id              = id;
799   connection->mailbox         = sg_mailbox_by_name(mailbox_name);
800   connection->bitfield        = 0;
801   connection->peer_speed      = 0;
802   connection->last_unchoke    = 0;
803   connection->current_piece   = -1;
804   connection->am_interested   = 0;
805   connection->interested      = 0;
806   connection->choked_upload   = 1;
807   connection->choked_download = 1;
808
809   return connection;
810 }
811
812 void connection_add_speed_value(connection_t connection, double speed)
813 {
814   connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
815 }
816
817 int connection_has_piece(const_connection_t connection, unsigned int piece)
818 {
819   return (connection->bitfield & 1U << piece);
820 }
821
822 /***************** Messages creation functions ***********************/
823 /** @brief Build a new empty message */
824 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
825 {
826   message_t message       = xbt_new(s_message_t, 1);
827   message->peer_id        = peer_id;
828   message->return_mailbox = return_mailbox;
829   message->type           = type;
830   return message;
831 }
832
833 /** Builds a message containing an index. */
834 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
835 {
836   message_t message = message_new(type, peer_id, return_mailbox);
837   message->piece    = index;
838   return message;
839 }
840
841 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
842 {
843   message_t message = message_new(type, peer_id, return_mailbox);
844   message->bitfield = bitfield;
845   return message;
846 }
847
848 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
849 {
850   message_t message     = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
851   message->block_index  = block_index;
852   message->block_length = block_length;
853   return message;
854 }
855
856 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
857 {
858   message_t message     = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
859   message->block_index  = block_index;
860   message->block_length = block_length;
861   return message;
862 }