Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines.
[simgrid.git] / examples / c / app-bittorrent / bittorrent-peer.c
1 /* Copyright (c) 2012-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "bittorrent-peer.h"
7 #include "tracker.h"
8 #include <simgrid/forward.h>
9
10 #include <limits.h>
11 #include <stdio.h> /* snprintf */
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 #define FILE_PIECES 10UL
20 #define PIECES_BLOCKS 5UL
21 #define BLOCK_SIZE 16384
22
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2UL
25
26 #define SLEEP_DURATION 1
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 const char* const message_type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
30                                             "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
31
32 #ifndef MIN
33 #define MIN(a, b) ((a) < (b) ? (a) : (b))
34 #endif
35
36 static peer_t peer_init(int id, int seed)
37 {
38   peer_t peer = xbt_new(s_peer_t, 1);
39   peer->id    = id;
40
41   char mailbox_name[MAILBOX_SIZE];
42   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
43   peer->mailbox = sg_mailbox_by_name(mailbox_name);
44
45   peer->connected_peers = xbt_dict_new_homogeneous(NULL);
46   peer->active_peers    = xbt_dict_new_homogeneous(NULL);
47
48   if (seed) {
49     peer->bitfield        = (1U << FILE_PIECES) - 1U;
50     peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
51   } else {
52     peer->bitfield        = 0;
53     peer->bitfield_blocks = 0;
54   }
55
56   peer->current_pieces = 0;
57   peer->pieces_count   = xbt_new0(short, FILE_PIECES);
58   peer->comm_received  = NULL;
59   peer->round          = 0;
60
61   return peer;
62 }
63
64 static void peer_free(peer_t peer)
65 {
66   char* key;
67   connection_t connection;
68   xbt_dict_cursor_t cursor;
69   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
70     xbt_free(connection);
71
72   xbt_dict_free(&peer->connected_peers);
73   xbt_dict_free(&peer->active_peers);
74   xbt_free(peer->pieces_count);
75   xbt_free(peer);
76 }
77
78 /** Peer main function */
79 void peer(int argc, char* argv[])
80 {
81   // Check arguments
82   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
83
84   // Build peer object
85   peer_t peer = peer_init((int)xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0);
86
87   // Retrieve deadline
88   peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
89   xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
90
91   char* status = xbt_malloc0(FILE_PIECES + 1);
92   get_status(&status, peer->bitfield);
93
94   XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
95
96   // Getting peer data from the tracker.
97   if (get_peers_from_tracker(peer)) {
98     XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
99     peer->begin_receive_time = simgrid_get_clock();
100     sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
101
102     if (has_finished(peer->bitfield)) {
103       send_handshake_to_all_peers(peer);
104     } else {
105       leech(peer);
106     }
107     seed(peer);
108   } else {
109     XBT_INFO("Couldn't contact the tracker.");
110   }
111
112   get_status(&status, peer->bitfield);
113   XBT_INFO("Here is my current status: %s", status);
114   if (peer->comm_received) {
115     sg_comm_unref(peer->comm_received);
116   }
117
118   xbt_free(status);
119   peer_free(peer);
120 }
121
122 /** @brief Retrieves the peer list from the tracker */
123 int get_peers_from_tracker(const_peer_t peer)
124 {
125   sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
126
127   // Build the task to send to the tracker
128   tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
129
130   XBT_DEBUG("Sending a peer request to the tracker.");
131   sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
132   sg_error_t res    = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
133
134   if (res == SG_ERROR_TIMEOUT) {
135     XBT_DEBUG("Timeout expired when requesting peers to tracker");
136     xbt_free(peer_request);
137     return 0;
138   }
139
140   void* message           = NULL;
141   sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
142   res                     = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
143   if (res == SG_OK) {
144     const_tracker_answer_t ta = (const_tracker_answer_t)message;
145     // Add the peers the tracker gave us to our peer list.
146     unsigned i;
147     int peer_id;
148     // Add the peers the tracker gave us to our peer list.
149     xbt_dynar_foreach (ta->peers, i, peer_id) {
150       if (peer_id != peer->id)
151         xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
152     }
153     tracker_answer_free(message);
154   } else if (res == SG_ERROR_TIMEOUT) {
155     XBT_DEBUG("Timeout expired when requesting peers to tracker");
156     tracker_answer_free(message);
157     return 0;
158   }
159
160   return 1;
161 }
162
163 /** @brief Send a handshake message to all the peers the peer has. */
164 void send_handshake_to_all_peers(const_peer_t peer)
165 {
166   connection_t remote_peer;
167   xbt_dict_cursor_t cursor = NULL;
168   char* key;
169   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
170     message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
171     sg_comm_t comm      = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
172     sg_comm_detach(comm, NULL);
173     XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
174   }
175 }
176
177 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
178 {
179   XBT_DEBUG("Sending %s to %s", message_type_names[type], sg_mailbox_get_name(mailbox));
180   message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
181   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, size);
182   sg_comm_detach(comm, NULL);
183 }
184
185 /** @brief Send a bitfield message to all the peers the peer has */
186 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
187 {
188   XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
189   message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
190   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
191   sg_comm_detach(comm, NULL);
192 }
193
194 /** Send a "piece" message to a pair, containing a piece of the file */
195 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
196 {
197   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
198   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
199   xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
200   message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
201   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
202   sg_comm_detach(comm, NULL);
203 }
204
205 /** Send a "HAVE" message to all peers we are connected to */
206 void send_have_to_all_peers(const_peer_t peer, int piece)
207 {
208   XBT_DEBUG("Sending HAVE message to all my peers");
209   connection_t remote_peer;
210   xbt_dict_cursor_t cursor = NULL;
211   char* key;
212   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
213     message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
214     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
215     sg_comm_detach(comm, NULL);
216   }
217 }
218
219 /** @brief Send request messages to a peer that have unchoked us */
220 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
221 {
222   remote_peer->current_piece = piece;
223   xbt_assert(connection_has_piece(remote_peer, piece));
224   int block_index = get_first_missing_block_from(peer, piece);
225   if (block_index != -1) {
226     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
227     XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
228               block_index, block_length);
229     message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
230     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
231     sg_comm_detach(comm, NULL);
232   }
233 }
234
235 void get_status(char** status, unsigned int bitfield)
236 {
237   for (int i = FILE_PIECES - 1; i >= 0; i--)
238     (*status)[i] = (bitfield & (1U << i)) ? '1' : '0';
239   (*status)[FILE_PIECES] = '\0';
240 }
241
242 int has_finished(unsigned int bitfield)
243 {
244   return bitfield == (1U << FILE_PIECES) - 1U;
245 }
246
247 /** Indicates if the remote peer has a piece not stored by the local peer */
248 int is_interested(const_peer_t peer, const_connection_t remote_peer)
249 {
250   return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
251 }
252
253 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
254 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
255 {
256   for (int i = 0; i < FILE_PIECES; i++)
257     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
258       return 1;
259   return 0;
260 }
261
262 /** @brief Updates the list of who has a piece from a bitfield */
263 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
264 {
265   for (int i = 0; i < FILE_PIECES; i++)
266     if (bitfield & (1U << i))
267       peer->pieces_count[i]++;
268 }
269
270 int count_pieces(unsigned int bitfield)
271 {
272   int count      = 0;
273   unsigned int n = bitfield;
274   while (n) {
275     count += n & 1U;
276     n >>= 1U;
277   }
278   return count;
279 }
280
281 int nb_interested_peers(const_peer_t peer)
282 {
283   xbt_dict_cursor_t cursor = NULL;
284   char* key;
285   connection_t connection;
286   int nb = 0;
287   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
288     if (connection->interested)
289       nb++;
290
291   return nb;
292 }
293
294 /** @brief Peer main loop when it is leeching. */
295 void leech(peer_t peer)
296 {
297   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
298   XBT_DEBUG("Start downloading.");
299
300   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
301   send_handshake_to_all_peers(peer);
302   XBT_DEBUG("Starting main leech loop");
303
304   void* data = NULL;
305   while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
306     if (peer->comm_received == NULL)
307       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
308
309     if (sg_comm_test(peer->comm_received)) {
310       peer->message = (message_t)data;
311       handle_message(peer, peer->message);
312       xbt_free(peer->message);
313       peer->comm_received = NULL;
314     } else {
315       // We don't execute the choke algorithm if we don't already have a piece
316       if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
317         update_choked_peers(peer);
318         next_choked_update += UPDATE_CHOKED_INTERVAL;
319       } else {
320         sg_actor_sleep_for(SLEEP_DURATION);
321       }
322     }
323   }
324   if (has_finished(peer->bitfield))
325     XBT_DEBUG("%d becomes a seeder", peer->id);
326 }
327
328 /** @brief Peer main loop when it is seeding */
329 void seed(peer_t peer)
330 {
331   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
332   XBT_DEBUG("Start seeding.");
333   // start the main seed loop
334   void* data = NULL;
335   while (simgrid_get_clock() < peer->deadline) {
336     if (peer->comm_received == NULL)
337       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
338
339     if (sg_comm_test(peer->comm_received)) {
340       peer->message = (message_t)data;
341       handle_message(peer, peer->message);
342       xbt_free(peer->message);
343       peer->comm_received = NULL;
344     } else {
345       if (simgrid_get_clock() >= next_choked_update) {
346         update_choked_peers(peer);
347         // TODO: Change the choked peer algorithm when seeding.
348         next_choked_update += UPDATE_CHOKED_INTERVAL;
349       } else {
350         sg_actor_sleep_for(SLEEP_DURATION);
351       }
352     }
353   }
354 }
355
356 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
357 {
358   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
359     // add in the active peers set
360     xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
361   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
362     xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
363   }
364 }
365
366 /** @brief Handle a received message sent by another peer */
367 void handle_message(peer_t peer, message_t message)
368 {
369   XBT_DEBUG("Received a %s message from %s", message_type_names[message->type],
370             sg_mailbox_get_name(message->return_mailbox));
371
372   connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
373   xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
374              "The impossible did happened: A not-in-our-list peer sent us a message.");
375
376   switch (message->type) {
377     case MESSAGE_HANDSHAKE:
378       // Check if the peer is in our connection list.
379       if (remote_peer == 0) {
380         xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
381                          connection_new(message->peer_id));
382         send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
383       }
384       // Send our bitfield to the peer
385       send_bitfield(peer, message->return_mailbox);
386       break;
387     case MESSAGE_BITFIELD:
388       // Update the pieces list
389       update_pieces_count_from_bitfield(peer, message->bitfield);
390       // Store the bitfield
391       remote_peer->bitfield = message->bitfield;
392       xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
393       if (is_interested(peer, remote_peer)) {
394         remote_peer->am_interested = 1;
395         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
396       }
397       break;
398     case MESSAGE_INTERESTED:
399       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
400       // Update the interested state of the peer.
401       remote_peer->interested = 1;
402       update_active_peers_set(peer, remote_peer);
403       break;
404     case MESSAGE_NOTINTERESTED:
405       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
406       remote_peer->interested = 0;
407       update_active_peers_set(peer, remote_peer);
408       break;
409     case MESSAGE_UNCHOKE:
410       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
411       xbt_assert(remote_peer->choked_download);
412       remote_peer->choked_download = 0;
413       // Send requests to the peer, since it has unchoked us
414       if (remote_peer->am_interested)
415         request_new_piece_to_peer(peer, remote_peer);
416       break;
417     case MESSAGE_CHOKE:
418       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
419       xbt_assert(!remote_peer->choked_download);
420       remote_peer->choked_download = 1;
421       if (remote_peer->current_piece != -1)
422         remove_current_piece(peer, remote_peer, remote_peer->current_piece);
423       break;
424     case MESSAGE_HAVE:
425       XBT_DEBUG("\t for piece %d", message->piece);
426       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received");
427       remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
428       peer->pieces_count[message->piece]++;
429       // If the piece is in our pieces, we tell the peer that we are interested.
430       if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
431         remote_peer->am_interested = 1;
432         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
433         if (remote_peer->choked_download == 0)
434           request_new_piece_to_peer(peer, remote_peer);
435       }
436       break;
437     case MESSAGE_REQUEST:
438       xbt_assert(remote_peer->interested);
439       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received");
440       if (remote_peer->choked_upload == 0) {
441         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
442                   message->block_index + message->block_length);
443         if (!peer_has_not_piece(peer, message->piece)) {
444           send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
445         }
446       } else {
447         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
448       }
449       break;
450     case MESSAGE_PIECE:
451       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
452                 message->block_index + message->block_length);
453       xbt_assert(!remote_peer->choked_download);
454       xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
455       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received");
456       // TODO: Execute Ã  computation.
457       if (peer_has_not_piece(peer, message->piece)) {
458         update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
459         if (piece_complete(peer, message->piece)) {
460           // Removing the piece from our piece list
461           remove_current_piece(peer, remote_peer, message->piece);
462           // Setting the fact that we have the piece
463           peer->bitfield = peer->bitfield | (1U << message->piece);
464           char* status   = xbt_malloc0(FILE_PIECES + 1);
465           get_status(&status, peer->bitfield);
466           XBT_DEBUG("My status is now %s", status);
467           xbt_free(status);
468           // Sending the information to all the peers we are connected to
469           send_have_to_all_peers(peer, message->piece);
470           // sending UNINTERESTED to peers that do not have what we want.
471           update_interested_after_receive(peer);
472         } else {                                                   // piece not completed
473           send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
474         }
475       } else {
476         XBT_DEBUG("However, we already have it");
477         request_new_piece_to_peer(peer, remote_peer);
478       }
479       break;
480     case MESSAGE_CANCEL:
481       break;
482     default:
483       THROW_IMPOSSIBLE;
484   }
485   // Update the peer speed.
486   if (remote_peer) {
487     connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
488   }
489   peer->begin_receive_time = simgrid_get_clock();
490 }
491
492 /** Selects the appropriate piece to download and requests it to the remote_peer */
493 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
494 {
495   int piece = select_piece_to_download(peer, remote_peer);
496   if (piece != -1) {
497     peer->current_pieces |= (1U << (unsigned int)piece);
498     send_request_to_peer(peer, remote_peer, piece);
499   }
500 }
501
502 /** remove current_piece from the list of currently downloaded pieces. */
503 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
504 {
505   peer->current_pieces &= ~(1U << current_piece);
506   remote_peer->current_piece = -1;
507 }
508
509 /** @brief Return the piece to be downloaded
510  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
511  * If a piece is partially downloaded, this piece will be selected prioritarily
512  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
513  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
514  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
515  * @param peer: local peer
516  * @param remote_peer: information about the connection
517  * @return the piece to download if possible. -1 otherwise
518  */
519 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
520 {
521   int piece = partially_downloaded_piece(peer, remote_peer);
522   // strict priority policy
523   if (piece != -1)
524     return piece;
525
526   // end game mode
527   if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
528       (is_interested(peer, remote_peer) != 0)) {
529     int nb_interesting_pieces = 0;
530     // compute the number of interesting pieces
531     for (int i = 0; i < FILE_PIECES; i++) {
532       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
533         nb_interesting_pieces++;
534       }
535     }
536     xbt_assert(nb_interesting_pieces != 0);
537     // get a random interesting piece
538     int random_piece_index = rand() % nb_interesting_pieces;
539     int current_index      = 0;
540     for (int i = 0; i < FILE_PIECES; i++) {
541       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
542         if (random_piece_index == current_index) {
543           piece = i;
544           break;
545         }
546         current_index++;
547       }
548     }
549     xbt_assert(piece != -1);
550     return piece;
551   }
552   // Random first policy
553   if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
554     int nb_interesting_pieces = 0;
555     // compute the number of interesting pieces
556     for (int i = 0; i < FILE_PIECES; i++) {
557       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
558           peer_is_not_downloading_piece(peer, i)) {
559         nb_interesting_pieces++;
560       }
561     }
562     xbt_assert(nb_interesting_pieces != 0);
563     // get a random interesting piece
564     int random_piece_index = rand() % nb_interesting_pieces;
565     int current_index      = 0;
566     for (int i = 0; i < FILE_PIECES; i++) {
567       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
568           peer_is_not_downloading_piece(peer, i)) {
569         if (random_piece_index == current_index) {
570           piece = i;
571           break;
572         }
573         current_index++;
574       }
575     }
576     xbt_assert(piece != -1);
577     return piece;
578   } else { // Rarest first policy
579     short min         = SHRT_MAX;
580     int nb_min_pieces = 0;
581     int current_index = 0;
582     // compute the smallest number of copies of available pieces
583     for (int i = 0; i < FILE_PIECES; i++) {
584       if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
585           peer_is_not_downloading_piece(peer, i))
586         min = peer->pieces_count[i];
587     }
588     xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
589     // compute the number of rarest pieces
590     for (int i = 0; i < FILE_PIECES; i++) {
591       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
592           peer_is_not_downloading_piece(peer, i))
593         nb_min_pieces++;
594     }
595     xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
596     // get a random rarest piece
597     int random_rarest_index = 0;
598     if (nb_min_pieces > 0) {
599       random_rarest_index = rand() % nb_min_pieces;
600     }
601     for (int i = 0; i < FILE_PIECES; i++) {
602       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
603           peer_is_not_downloading_piece(peer, i)) {
604         if (random_rarest_index == current_index) {
605           piece = i;
606           break;
607         }
608         current_index++;
609       }
610     }
611     xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
612     return piece;
613   }
614 }
615
616 /** Update the list of current choked and unchoked peers, using the choke algorithm */
617 void update_choked_peers(peer_t peer)
618 {
619   if (nb_interested_peers(peer) == 0)
620     return;
621   XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
622   // update the current round
623   peer->round = (peer->round + 1) % 3;
624   char* key;
625   char* choked_key         = NULL;
626   connection_t chosen_peer = NULL;
627   connection_t choked_peer = NULL;
628   // remove a peer from the list
629   xbt_dict_cursor_t cursor = NULL;
630   xbt_dict_cursor_first(peer->active_peers, &cursor);
631   if (!xbt_dict_is_empty(peer->active_peers)) {
632     choked_key  = xbt_dict_cursor_get_key(cursor);
633     choked_peer = xbt_dict_cursor_get_data(cursor);
634   }
635   xbt_dict_cursor_free(&cursor);
636
637   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
638   if (has_finished(peer->bitfield)) {
639     connection_t connection;
640     double unchoke_time = simgrid_get_clock() + 1;
641
642     xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
643       if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
644           (connection->choked_upload != 0)) {
645         unchoke_time = connection->last_unchoke;
646         chosen_peer  = connection;
647       }
648     }
649   } else {
650     // Random optimistic unchoking
651     if (peer->round == 0) {
652       int j = 0;
653       do {
654         // We choose a random peer to unchoke.
655         int id_chosen = 0;
656         if (xbt_dict_length(peer->connected_peers) > 0) {
657           id_chosen = rand() % xbt_dict_length(peer->connected_peers);
658         }
659         int i = 0;
660         connection_t connection;
661         xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
662           if (i == id_chosen) {
663             chosen_peer = connection;
664             break;
665           }
666           i++;
667         }
668         xbt_dict_cursor_free(&cursor);
669         xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
670         if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
671           chosen_peer = NULL;
672         else
673           XBT_DEBUG("Nothing to do, keep going");
674         j++;
675       } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
676     } else {
677       // Use the "fastest download" policy.
678       connection_t connection;
679       double fastest_speed = 0.0;
680       xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
681         if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
682             (connection->interested != 0)) {
683           chosen_peer   = connection;
684           fastest_speed = connection->peer_speed;
685         }
686       }
687     }
688   }
689
690   if (chosen_peer != NULL)
691     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
692               chosen_peer->interested, chosen_peer->choked_upload);
693
694   if (choked_peer != chosen_peer) {
695     if (choked_peer != NULL) {
696       xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
697       choked_peer->choked_upload = 1;
698       xbt_assert((*((int*)choked_key) == choked_peer->id));
699       update_active_peers_set(peer, choked_peer);
700       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
701       send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
702     }
703     if (chosen_peer != NULL) {
704       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
705       chosen_peer->choked_upload = 0;
706       xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
707       chosen_peer->last_unchoke = simgrid_get_clock();
708       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
709       update_active_peers_set(peer, chosen_peer);
710       send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
711     }
712   }
713 }
714
715 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
716 void update_interested_after_receive(const_peer_t peer)
717 {
718   char* key;
719   xbt_dict_cursor_t cursor;
720   connection_t connection;
721   xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
722     if (connection->am_interested != 0) {
723       int interested = 0;
724       // Check if the peer still has a piece we want.
725       for (int i = 0; i < FILE_PIECES; i++) {
726         if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
727           interested = 1;
728           break;
729         }
730       }
731       if (!interested) { // no more piece to download from connection
732         connection->am_interested = 0;
733         send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
734       }
735     }
736   }
737 }
738
739 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
740 {
741   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
742   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
743   for (int i = block_index; i < (block_index + block_length); i++) {
744     peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
745   }
746 }
747
748 /** Returns if a peer has completed the download of a piece */
749 int piece_complete(const_peer_t peer, int index)
750 {
751   for (int i = 0; i < PIECES_BLOCKS; i++) {
752     if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
753       return 0;
754     }
755   }
756   return 1;
757 }
758
759 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
760 int get_first_missing_block_from(const_peer_t peer, int piece)
761 {
762   for (int i = 0; i < PIECES_BLOCKS; i++) {
763     if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
764       return i;
765     }
766   }
767   return -1;
768 }
769
770 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
771 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
772 {
773   for (int i = 0; i < FILE_PIECES; i++) {
774     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
775         get_first_missing_block_from(peer, i) > 0)
776       return i;
777   }
778   return -1;
779 }
780
781 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
782 {
783   return !(peer->bitfield & 1U << piece);
784 }
785
786 /** Check that a piece is not currently being download by the peer. */
787 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
788 {
789   return !(peer->current_pieces & 1U << piece);
790 }
791
792 /***************** Connection internal functions ***********************/
793 connection_t connection_new(int id)
794 {
795   connection_t connection = xbt_new(s_connection_t, 1);
796   char mailbox_name[MAILBOX_SIZE];
797   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
798   connection->id              = id;
799   connection->mailbox         = sg_mailbox_by_name(mailbox_name);
800   connection->bitfield        = 0;
801   connection->peer_speed      = 0;
802   connection->last_unchoke    = 0;
803   connection->current_piece   = -1;
804   connection->am_interested   = 0;
805   connection->interested      = 0;
806   connection->choked_upload   = 1;
807   connection->choked_download = 1;
808
809   return connection;
810 }
811
812 void connection_add_speed_value(connection_t connection, double speed)
813 {
814   connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
815 }
816
817 int connection_has_piece(const_connection_t connection, unsigned int piece)
818 {
819   return (connection->bitfield & 1U << piece);
820 }
821
822 /***************** Messages creation functions ***********************/
823 /** @brief Build a new empty message */
824 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
825 {
826   message_t message       = xbt_new(s_message_t, 1);
827   message->peer_id        = peer_id;
828   message->return_mailbox = return_mailbox;
829   message->type           = type;
830   return message;
831 }
832
833 /** Builds a message containing an index. */
834 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
835 {
836   message_t message = message_new(type, peer_id, return_mailbox);
837   message->piece    = index;
838   return message;
839 }
840
841 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
842 {
843   message_t message = message_new(type, peer_id, return_mailbox);
844   message->bitfield = bitfield;
845   return message;
846 }
847
848 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
849 {
850   message_t message     = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
851   message->block_index  = block_index;
852   message->block_length = block_length;
853   return message;
854 }
855
856 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
857 {
858   message_t message     = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
859   message->block_index  = block_index;
860   message->block_length = block_length;
861   return message;
862 }