Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
1996437ec56ad185d4a62500f9b9192fde3c68c6
[simgrid.git] / examples / c / app-bittorrent / bittorrent-peer.c
1 /* Copyright (c) 2012-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "bittorrent-peer.h"
7 #include "tracker.h"
8 #include <simgrid/forward.h>
9 #include <xbt/ex.h>
10
11 #include <limits.h>
12 #include <stdio.h> /* snprintf */
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
15
16 /*
17  * User parameters for transferred file data. For the test, the default values are :
18  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
19  */
20 #define FILE_PIECES 10UL
21 #define PIECES_BLOCKS 5UL
22 #define BLOCK_SIZE 16384
23
24 /** Number of blocks asked by each request */
25 #define BLOCKS_REQUESTED 2UL
26
27 #define SLEEP_DURATION 1
28 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
29
30 const char* const message_type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
31                                             "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
32
33 #ifndef MIN
34 #define MIN(a, b) ((a) < (b) ? (a) : (b))
35 #endif
36
37 static peer_t peer_init(int id, int seed)
38 {
39   peer_t peer = xbt_new(s_peer_t, 1);
40   peer->id    = id;
41
42   char mailbox_name[MAILBOX_SIZE];
43   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
44   peer->mailbox = sg_mailbox_by_name(mailbox_name);
45
46   peer->connected_peers = xbt_dict_new_homogeneous(NULL);
47   peer->active_peers    = xbt_dict_new_homogeneous(NULL);
48
49   if (seed) {
50     peer->bitfield        = (1U << FILE_PIECES) - 1U;
51     peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
52   } else {
53     peer->bitfield        = 0;
54     peer->bitfield_blocks = 0;
55   }
56
57   peer->current_pieces = 0;
58   peer->pieces_count   = xbt_new0(short, FILE_PIECES);
59   peer->comm_received  = NULL;
60   peer->round          = 0;
61
62   return peer;
63 }
64
65 static void peer_free(peer_t peer)
66 {
67   char* key;
68   connection_t connection;
69   xbt_dict_cursor_t cursor;
70   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
71     xbt_free(connection);
72
73   xbt_dict_free(&peer->connected_peers);
74   xbt_dict_free(&peer->active_peers);
75   xbt_free(peer->pieces_count);
76   xbt_free(peer);
77 }
78
79 /** Peer main function */
80 void peer(int argc, char* argv[])
81 {
82   // Check arguments
83   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
84
85   // Build peer object
86   peer_t peer = peer_init((int)xbt_str_parse_int(argv[1], "Invalid ID"), argc == 4 ? 1 : 0);
87
88   // Retrieve deadline
89   peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline");
90   xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
91
92   char* status = xbt_malloc0(FILE_PIECES + 1);
93   get_status(status, peer->bitfield);
94
95   XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
96
97   // Getting peer data from the tracker.
98   if (get_peers_from_tracker(peer)) {
99     XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
100     peer->begin_receive_time = simgrid_get_clock();
101     sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
102
103     if (has_finished(peer->bitfield)) {
104       send_handshake_to_all_peers(peer);
105     } else {
106       leech(peer);
107     }
108     seed(peer);
109   } else {
110     XBT_INFO("Couldn't contact the tracker.");
111   }
112
113   get_status(status, peer->bitfield);
114   XBT_INFO("Here is my current status: %s", status);
115   if (peer->comm_received) {
116     sg_comm_unref(peer->comm_received);
117   }
118
119   xbt_free(status);
120   peer_free(peer);
121 }
122
123 /** @brief Retrieves the peer list from the tracker */
124 int get_peers_from_tracker(const_peer_t peer)
125 {
126   sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
127
128   // Build the task to send to the tracker
129   tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
130
131   XBT_DEBUG("Sending a peer request to the tracker.");
132   sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
133   sg_error_t res    = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
134
135   if (res == SG_ERROR_TIMEOUT) {
136     XBT_DEBUG("Timeout expired when requesting peers to tracker");
137     xbt_free(peer_request);
138     return 0;
139   }
140
141   void* message           = NULL;
142   sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
143   res                     = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
144   if (res == SG_OK) {
145     const_tracker_answer_t ta = (const_tracker_answer_t)message;
146     // Add the peers the tracker gave us to our peer list.
147     unsigned i;
148     int peer_id;
149     // Add the peers the tracker gave us to our peer list.
150     xbt_dynar_foreach (ta->peers, i, peer_id) {
151       if (peer_id != peer->id)
152         xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
153     }
154     tracker_answer_free(message);
155   } else if (res == SG_ERROR_TIMEOUT) {
156     XBT_DEBUG("Timeout expired when requesting peers to tracker");
157     tracker_answer_free(message);
158     return 0;
159   }
160
161   return 1;
162 }
163
164 /** @brief Send a handshake message to all the peers the peer has. */
165 void send_handshake_to_all_peers(const_peer_t peer)
166 {
167   connection_t remote_peer;
168   xbt_dict_cursor_t cursor = NULL;
169   char* key;
170   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
171     message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
172     sg_comm_t comm      = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
173     sg_comm_detach(comm, NULL);
174     XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
175   }
176 }
177
178 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
179 {
180   XBT_DEBUG("Sending %s to %s", message_type_names[type], sg_mailbox_get_name(mailbox));
181   message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
182   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, size);
183   sg_comm_detach(comm, NULL);
184 }
185
186 /** @brief Send a bitfield message to all the peers the peer has */
187 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
188 {
189   XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
190   message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
191   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
192   sg_comm_detach(comm, NULL);
193 }
194
195 /** Send a "piece" message to a pair, containing a piece of the file */
196 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
197 {
198   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
199   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
200   xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
201   message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
202   sg_comm_t comm    = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
203   sg_comm_detach(comm, NULL);
204 }
205
206 /** Send a "HAVE" message to all peers we are connected to */
207 void send_have_to_all_peers(const_peer_t peer, int piece)
208 {
209   XBT_DEBUG("Sending HAVE message to all my peers");
210   connection_t remote_peer;
211   xbt_dict_cursor_t cursor = NULL;
212   char* key;
213   xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
214     message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
215     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
216     sg_comm_detach(comm, NULL);
217   }
218 }
219
220 /** @brief Send request messages to a peer that have unchoked us */
221 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
222 {
223   remote_peer->current_piece = piece;
224   xbt_assert(connection_has_piece(remote_peer, piece));
225   int block_index = get_first_missing_block_from(peer, piece);
226   if (block_index != -1) {
227     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
228     XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
229               block_index, block_length);
230     message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
231     sg_comm_t comm    = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
232     sg_comm_detach(comm, NULL);
233   }
234 }
235
236 void get_status(char* status, unsigned int bitfield)
237 {
238   for (int i = FILE_PIECES - 1; i >= 0; i--)
239     status[i] = (bitfield & (1U << i)) ? '1' : '0';
240   status[FILE_PIECES] = '\0';
241 }
242
243 int has_finished(unsigned int bitfield)
244 {
245   return bitfield == (1U << FILE_PIECES) - 1U;
246 }
247
248 /** Indicates if the remote peer has a piece not stored by the local peer */
249 int is_interested(const_peer_t peer, const_connection_t remote_peer)
250 {
251   return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
252 }
253
254 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
255 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
256 {
257   for (int i = 0; i < FILE_PIECES; i++)
258     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
259       return 1;
260   return 0;
261 }
262
263 /** @brief Updates the list of who has a piece from a bitfield */
264 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
265 {
266   for (int i = 0; i < FILE_PIECES; i++)
267     if (bitfield & (1U << i))
268       peer->pieces_count[i]++;
269 }
270
271 int count_pieces(unsigned int bitfield)
272 {
273   int count      = 0;
274   unsigned int n = bitfield;
275   while (n) {
276     count += n & 1U;
277     n >>= 1U;
278   }
279   return count;
280 }
281
282 int nb_interested_peers(const_peer_t peer)
283 {
284   xbt_dict_cursor_t cursor = NULL;
285   char* key;
286   connection_t connection;
287   int nb = 0;
288   xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
289     if (connection->interested)
290       nb++;
291
292   return nb;
293 }
294
295 /** @brief Peer main loop when it is leeching. */
296 void leech(peer_t peer)
297 {
298   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
299   XBT_DEBUG("Start downloading.");
300
301   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
302   send_handshake_to_all_peers(peer);
303   XBT_DEBUG("Starting main leech loop");
304
305   void* data = NULL;
306   while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
307     if (peer->comm_received == NULL)
308       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
309
310     if (sg_comm_test(peer->comm_received)) {
311       peer->message = (message_t)data;
312       handle_message(peer, peer->message);
313       xbt_free(peer->message);
314       peer->comm_received = NULL;
315     } else {
316       // We don't execute the choke algorithm if we don't already have a piece
317       if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
318         update_choked_peers(peer);
319         next_choked_update += UPDATE_CHOKED_INTERVAL;
320       } else {
321         sg_actor_sleep_for(SLEEP_DURATION);
322       }
323     }
324   }
325   if (has_finished(peer->bitfield))
326     XBT_DEBUG("%d becomes a seeder", peer->id);
327 }
328
329 /** @brief Peer main loop when it is seeding */
330 void seed(peer_t peer)
331 {
332   double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
333   XBT_DEBUG("Start seeding.");
334   // start the main seed loop
335   void* data = NULL;
336   while (simgrid_get_clock() < peer->deadline) {
337     if (peer->comm_received == NULL)
338       peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
339
340     if (sg_comm_test(peer->comm_received)) {
341       peer->message = (message_t)data;
342       handle_message(peer, peer->message);
343       xbt_free(peer->message);
344       peer->comm_received = NULL;
345     } else {
346       if (simgrid_get_clock() >= next_choked_update) {
347         update_choked_peers(peer);
348         // TODO: Change the choked peer algorithm when seeding.
349         next_choked_update += UPDATE_CHOKED_INTERVAL;
350       } else {
351         sg_actor_sleep_for(SLEEP_DURATION);
352       }
353     }
354   }
355 }
356
357 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
358 {
359   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
360     // add in the active peers set
361     xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
362   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
363     xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
364   }
365 }
366
367 /** @brief Handle a received message sent by another peer */
368 void handle_message(peer_t peer, message_t message)
369 {
370   XBT_DEBUG("Received a %s message from %s", message_type_names[message->type],
371             sg_mailbox_get_name(message->return_mailbox));
372
373   connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
374   xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
375              "The impossible did happened: A not-in-our-list peer sent us a message.");
376
377   switch (message->type) {
378     case MESSAGE_HANDSHAKE:
379       // Check if the peer is in our connection list.
380       if (remote_peer == 0) {
381         xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
382                          connection_new(message->peer_id));
383         send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
384       }
385       // Send our bitfield to the peer
386       send_bitfield(peer, message->return_mailbox);
387       break;
388     case MESSAGE_BITFIELD:
389       // Update the pieces list
390       update_pieces_count_from_bitfield(peer, message->bitfield);
391       // Store the bitfield
392       remote_peer->bitfield = message->bitfield;
393       xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
394       if (is_interested(peer, remote_peer)) {
395         remote_peer->am_interested = 1;
396         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
397       }
398       break;
399     case MESSAGE_INTERESTED:
400       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
401       // Update the interested state of the peer.
402       remote_peer->interested = 1;
403       update_active_peers_set(peer, remote_peer);
404       break;
405     case MESSAGE_NOTINTERESTED:
406       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
407       remote_peer->interested = 0;
408       update_active_peers_set(peer, remote_peer);
409       break;
410     case MESSAGE_UNCHOKE:
411       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
412       xbt_assert(remote_peer->choked_download);
413       remote_peer->choked_download = 0;
414       // Send requests to the peer, since it has unchoked us
415       if (remote_peer->am_interested)
416         request_new_piece_to_peer(peer, remote_peer);
417       break;
418     case MESSAGE_CHOKE:
419       xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
420       xbt_assert(!remote_peer->choked_download);
421       remote_peer->choked_download = 1;
422       if (remote_peer->current_piece != -1)
423         remove_current_piece(peer, remote_peer, remote_peer->current_piece);
424       break;
425     case MESSAGE_HAVE:
426       XBT_DEBUG("\t for piece %d", message->piece);
427       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received");
428       remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
429       peer->pieces_count[message->piece]++;
430       // If the piece is in our pieces, we tell the peer that we are interested.
431       if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
432         remote_peer->am_interested = 1;
433         send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
434         if (remote_peer->choked_download == 0)
435           request_new_piece_to_peer(peer, remote_peer);
436       }
437       break;
438     case MESSAGE_REQUEST:
439       xbt_assert(remote_peer->interested);
440       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received");
441       if (remote_peer->choked_upload == 0) {
442         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
443                   message->block_index + message->block_length);
444         if (!peer_has_not_piece(peer, message->piece)) {
445           send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
446         }
447       } else {
448         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
449       }
450       break;
451     case MESSAGE_PIECE:
452       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
453                 message->block_index + message->block_length);
454       xbt_assert(!remote_peer->choked_download);
455       xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
456       xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received");
457       // TODO: Execute a computation.
458       if (peer_has_not_piece(peer, message->piece)) {
459         update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
460         if (piece_complete(peer, message->piece)) {
461           // Removing the piece from our piece list
462           remove_current_piece(peer, remote_peer, message->piece);
463           // Setting the fact that we have the piece
464           peer->bitfield = peer->bitfield | (1U << message->piece);
465           char* status   = xbt_malloc0(FILE_PIECES + 1);
466           get_status(status, peer->bitfield);
467           XBT_DEBUG("My status is now %s", status);
468           xbt_free(status);
469           // Sending the information to all the peers we are connected to
470           send_have_to_all_peers(peer, message->piece);
471           // sending UNINTERESTED to peers that do not have what we want.
472           update_interested_after_receive(peer);
473         } else {                                                   // piece not completed
474           send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
475         }
476       } else {
477         XBT_DEBUG("However, we already have it");
478         request_new_piece_to_peer(peer, remote_peer);
479       }
480       break;
481     case MESSAGE_CANCEL:
482       break;
483     default:
484       THROW_IMPOSSIBLE;
485   }
486   // Update the peer speed.
487   if (remote_peer) {
488     connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
489   }
490   peer->begin_receive_time = simgrid_get_clock();
491 }
492
493 /** Selects the appropriate piece to download and requests it to the remote_peer */
494 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
495 {
496   int piece = select_piece_to_download(peer, remote_peer);
497   if (piece != -1) {
498     peer->current_pieces |= (1U << (unsigned int)piece);
499     send_request_to_peer(peer, remote_peer, piece);
500   }
501 }
502
503 /** remove current_piece from the list of currently downloaded pieces. */
504 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
505 {
506   peer->current_pieces &= ~(1U << current_piece);
507   remote_peer->current_piece = -1;
508 }
509
510 /** @brief Return the piece to be downloaded
511  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
512  * If a piece is partially downloaded, this piece will be selected prioritarily
513  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
514  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
515  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
516  * @param peer: local peer
517  * @param remote_peer: information about the connection
518  * @return the piece to download if possible. -1 otherwise
519  */
520 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
521 {
522   int piece = partially_downloaded_piece(peer, remote_peer);
523   // strict priority policy
524   if (piece != -1)
525     return piece;
526
527   // end game mode
528   if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
529       (is_interested(peer, remote_peer) != 0)) {
530     int nb_interesting_pieces = 0;
531     // compute the number of interesting pieces
532     for (int i = 0; i < FILE_PIECES; i++) {
533       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
534         nb_interesting_pieces++;
535       }
536     }
537     xbt_assert(nb_interesting_pieces != 0);
538     // get a random interesting piece
539     int random_piece_index = rand() % nb_interesting_pieces;
540     int current_index      = 0;
541     for (int i = 0; i < FILE_PIECES; i++) {
542       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
543         if (random_piece_index == current_index) {
544           piece = i;
545           break;
546         }
547         current_index++;
548       }
549     }
550     xbt_assert(piece != -1);
551     return piece;
552   }
553   // Random first policy
554   if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
555     int nb_interesting_pieces = 0;
556     // compute the number of interesting pieces
557     for (int i = 0; i < FILE_PIECES; i++) {
558       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
559           peer_is_not_downloading_piece(peer, i)) {
560         nb_interesting_pieces++;
561       }
562     }
563     xbt_assert(nb_interesting_pieces != 0);
564     // get a random interesting piece
565     int random_piece_index = rand() % nb_interesting_pieces;
566     int current_index      = 0;
567     for (int i = 0; i < FILE_PIECES; i++) {
568       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
569           peer_is_not_downloading_piece(peer, i)) {
570         if (random_piece_index == current_index) {
571           piece = i;
572           break;
573         }
574         current_index++;
575       }
576     }
577     xbt_assert(piece != -1);
578     return piece;
579   } else { // Rarest first policy
580     short min         = SHRT_MAX;
581     int nb_min_pieces = 0;
582     int current_index = 0;
583     // compute the smallest number of copies of available pieces
584     for (int i = 0; i < FILE_PIECES; i++) {
585       if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
586           peer_is_not_downloading_piece(peer, i))
587         min = peer->pieces_count[i];
588     }
589     xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
590     // compute the number of rarest pieces
591     for (int i = 0; i < FILE_PIECES; i++) {
592       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
593           peer_is_not_downloading_piece(peer, i))
594         nb_min_pieces++;
595     }
596     xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
597     // get a random rarest piece
598     int random_rarest_index = 0;
599     if (nb_min_pieces > 0) {
600       random_rarest_index = rand() % nb_min_pieces;
601     }
602     for (int i = 0; i < FILE_PIECES; i++) {
603       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
604           peer_is_not_downloading_piece(peer, i)) {
605         if (random_rarest_index == current_index) {
606           piece = i;
607           break;
608         }
609         current_index++;
610       }
611     }
612     xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
613     return piece;
614   }
615 }
616
617 /** Update the list of current choked and unchoked peers, using the choke algorithm */
618 void update_choked_peers(peer_t peer)
619 {
620   if (nb_interested_peers(peer) == 0)
621     return;
622   XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
623   // update the current round
624   peer->round = (peer->round + 1) % 3;
625   char* key;
626   char* choked_key         = NULL;
627   connection_t chosen_peer = NULL;
628   connection_t choked_peer = NULL;
629   // remove a peer from the list
630   xbt_dict_cursor_t cursor = NULL;
631   xbt_dict_cursor_first(peer->active_peers, &cursor);
632   if (!xbt_dict_is_empty(peer->active_peers)) {
633     choked_key  = xbt_dict_cursor_get_key(cursor);
634     choked_peer = xbt_dict_cursor_get_data(cursor);
635   }
636   xbt_dict_cursor_free(&cursor);
637
638   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
639   if (has_finished(peer->bitfield)) {
640     connection_t connection;
641     double unchoke_time = simgrid_get_clock() + 1;
642
643     xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
644       if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
645           (connection->choked_upload != 0)) {
646         unchoke_time = connection->last_unchoke;
647         chosen_peer  = connection;
648       }
649     }
650   } else {
651     // Random optimistic unchoking
652     if (peer->round == 0) {
653       int j = 0;
654       do {
655         // We choose a random peer to unchoke.
656         int id_chosen = 0;
657         if (xbt_dict_length(peer->connected_peers) > 0) {
658           id_chosen = rand() % xbt_dict_length(peer->connected_peers);
659         }
660         int i = 0;
661         connection_t connection;
662         xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
663           if (i == id_chosen) {
664             chosen_peer = connection;
665             break;
666           }
667           i++;
668         }
669         xbt_dict_cursor_free(&cursor);
670         xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
671         if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
672           chosen_peer = NULL;
673         else
674           XBT_DEBUG("Nothing to do, keep going");
675         j++;
676       } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
677     } else {
678       // Use the "fastest download" policy.
679       connection_t connection;
680       double fastest_speed = 0.0;
681       xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
682         if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
683             (connection->interested != 0)) {
684           chosen_peer   = connection;
685           fastest_speed = connection->peer_speed;
686         }
687       }
688     }
689   }
690
691   if (chosen_peer != NULL)
692     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
693               chosen_peer->interested, chosen_peer->choked_upload);
694
695   if (choked_peer != chosen_peer) {
696     if (choked_peer != NULL) {
697       xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
698       choked_peer->choked_upload = 1;
699       xbt_assert((*((int*)choked_key) == choked_peer->id));
700       update_active_peers_set(peer, choked_peer);
701       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
702       send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
703     }
704     if (chosen_peer != NULL) {
705       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
706       chosen_peer->choked_upload = 0;
707       xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
708       chosen_peer->last_unchoke = simgrid_get_clock();
709       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
710       update_active_peers_set(peer, chosen_peer);
711       send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
712     }
713   }
714 }
715
716 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
717 void update_interested_after_receive(const_peer_t peer)
718 {
719   char* key;
720   xbt_dict_cursor_t cursor;
721   connection_t connection;
722   xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
723     if (connection->am_interested != 0) {
724       int interested = 0;
725       // Check if the peer still has a piece we want.
726       for (int i = 0; i < FILE_PIECES; i++) {
727         if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
728           interested = 1;
729           break;
730         }
731       }
732       if (!interested) { // no more piece to download from connection
733         connection->am_interested = 0;
734         send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
735       }
736     }
737   }
738 }
739
740 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
741 {
742   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
743   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
744   for (int i = block_index; i < (block_index + block_length); i++) {
745     peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
746   }
747 }
748
749 /** Returns if a peer has completed the download of a piece */
750 int piece_complete(const_peer_t peer, int index)
751 {
752   for (int i = 0; i < PIECES_BLOCKS; i++) {
753     if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
754       return 0;
755     }
756   }
757   return 1;
758 }
759
760 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
761 int get_first_missing_block_from(const_peer_t peer, int piece)
762 {
763   for (int i = 0; i < PIECES_BLOCKS; i++) {
764     if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
765       return i;
766     }
767   }
768   return -1;
769 }
770
771 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
772 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
773 {
774   for (int i = 0; i < FILE_PIECES; i++) {
775     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
776         get_first_missing_block_from(peer, i) > 0)
777       return i;
778   }
779   return -1;
780 }
781
782 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
783 {
784   return !(peer->bitfield & 1U << piece);
785 }
786
787 /** Check that a piece is not currently being download by the peer. */
788 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
789 {
790   return !(peer->current_pieces & 1U << piece);
791 }
792
793 /***************** Connection internal functions ***********************/
794 connection_t connection_new(int id)
795 {
796   connection_t connection = xbt_new(s_connection_t, 1);
797   char mailbox_name[MAILBOX_SIZE];
798   snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
799   connection->id              = id;
800   connection->mailbox         = sg_mailbox_by_name(mailbox_name);
801   connection->bitfield        = 0;
802   connection->peer_speed      = 0;
803   connection->last_unchoke    = 0;
804   connection->current_piece   = -1;
805   connection->am_interested   = 0;
806   connection->interested      = 0;
807   connection->choked_upload   = 1;
808   connection->choked_download = 1;
809
810   return connection;
811 }
812
813 void connection_add_speed_value(connection_t connection, double speed)
814 {
815   connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
816 }
817
818 int connection_has_piece(const_connection_t connection, unsigned int piece)
819 {
820   return (connection->bitfield & 1U << piece);
821 }
822
823 /***************** Messages creation functions ***********************/
824 /** @brief Build a new empty message */
825 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
826 {
827   message_t message       = xbt_new(s_message_t, 1);
828   message->peer_id        = peer_id;
829   message->return_mailbox = return_mailbox;
830   message->type           = type;
831   return message;
832 }
833
834 /** Builds a message containing an index. */
835 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
836 {
837   message_t message = message_new(type, peer_id, return_mailbox);
838   message->piece    = index;
839   return message;
840 }
841
842 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
843 {
844   message_t message = message_new(type, peer_id, return_mailbox);
845   message->bitfield = bitfield;
846   return message;
847 }
848
849 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
850 {
851   message_t message     = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
852   message->block_index  = block_index;
853   message->block_length = block_length;
854   return message;
855 }
856
857 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
858 {
859   message_t message     = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
860   message->block_index  = block_index;
861   message->block_length = block_length;
862   return message;
863 }