Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
06fabc523240eba5d7ed1629c26457e0aa3aa6a2
[simgrid.git] / examples / msg / bittorrent / peer.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26
27 /**
28  *  Number of blocks asked by each request
29  */
30 #define BLOCKS_REQUESTED 2
31
32
33 static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
34
35 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
36 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
37 void remove_current_piece(peer_t peer, connection_t remote_peer,
38                           int current_piece);
39
40
41   /**
42  * Peer main function
43  */
44 int peer(int argc, char *argv[])
45 {
46   s_peer_t peer;
47   //Check arguments
48   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
49   //Build peer object
50   if (argc == 4) {
51     peer_init(&peer, atoi(argv[1]), 1);
52   } else {
53     peer_init(&peer, atoi(argv[1]), 0);
54   }
55   //Retrieve deadline
56   double deadline = atof(argv[2]);
57   xbt_assert(deadline > 0, "Wrong deadline supplied");
58   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
59   //Getting peer data from the tracker.
60   if (get_peers_data(&peer)) {
61     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
62     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
63     peer.begin_receive_time = MSG_get_clock();
64     MSG_mailbox_set_async(peer.mailbox);
65     if (has_finished(peer.bitfield)) {
66       peer.pieces = FILE_PIECES;
67       send_handshake_all(&peer);
68       seed_loop(&peer, deadline);
69     } else {
70       leech_loop(&peer, deadline);
71       seed_loop(&peer, deadline);
72     }
73   } else {
74     XBT_INFO("Couldn't contact the tracker.");
75   }
76
77   XBT_INFO("Here is my current status: %s", peer.bitfield);
78   if (peer.comm_received) {
79     MSG_comm_destroy(peer.comm_received);
80   }
81
82   peer_free(&peer);
83   return 0;
84 }
85
86 /**
87  * Peer main loop when it is leeching.
88  * @param peer peer data
89  * @param deadline time at which the peer has to leave
90  */
91 void leech_loop(peer_t peer, double deadline)
92 {
93   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
94   XBT_DEBUG("Start downloading.");
95   /*
96    * Send a "handshake" message to all the peers it got
97    * (since it couldn't have gotten more than 50 peers)
98    */
99   send_handshake_all(peer);
100   //Wait for at least one "bitfield" message.
101 //  wait_for_pieces(peer, deadline);
102   XBT_DEBUG("Starting main leech loop");
103
104   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
105     if (peer->comm_received == NULL) {
106       peer->task_received = NULL;
107       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
108     }
109     if (MSG_comm_test(peer->comm_received)) {
110       msg_error_t status = MSG_comm_get_status(peer->comm_received);
111       MSG_comm_destroy(peer->comm_received);
112       peer->comm_received = NULL;
113       if (status == MSG_OK) {
114         handle_message(peer, peer->task_received);
115       }
116     } else {
117       //We don't execute the choke algorithm if we don't already have a piece
118       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
119         update_choked_peers(peer);
120         next_choked_update += UPDATE_CHOKED_INTERVAL;
121       } else {
122         MSG_process_sleep(1);
123       }
124     }
125   }
126 //  if (peer->pieces == FILE_PIECES)
127 //    XBT_INFO("%d becomes a seeder", peer->id);
128
129 }
130
131 /**
132  * Peer main loop when it is seeding
133  * @param peer peer data
134  * @param deadline time when the peer will leave
135  */
136 void seed_loop(peer_t peer, double deadline)
137 {
138   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
139   XBT_DEBUG("Start seeding.");
140   //start the main seed loop
141   while (MSG_get_clock() < deadline) {
142     if (peer->comm_received == NULL) {
143       peer->task_received = NULL;
144       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
145     }
146     if (MSG_comm_test(peer->comm_received)) {
147       msg_error_t status = MSG_comm_get_status(peer->comm_received);
148       MSG_comm_destroy(peer->comm_received);
149       peer->comm_received = NULL;
150       if (status == MSG_OK) {
151         handle_message(peer, peer->task_received);
152       }
153     } else {
154       if (MSG_get_clock() >= next_choked_update) {
155         update_choked_peers(peer);
156         //TODO: Change the choked peer algorithm when seeding.
157         next_choked_update += UPDATE_CHOKED_INTERVAL;
158       } else {
159         MSG_process_sleep(1);
160       }
161     }
162   }
163 }
164
165 /**
166  * Retrieves the peer list from the tracker
167  * @param peer current peer data
168  */
169 int get_peers_data(peer_t peer)
170 {
171   int success = 0, send_success = 0;
172   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
173   //Build the task to send to the tracker
174   tracker_task_data_t data =
175       tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
176                             peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
177   //Build the task to send.
178   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
179   msg_task_t task_received = NULL;
180   msg_comm_t comm_received;
181   while (!send_success && MSG_get_clock() < timeout) {
182     XBT_DEBUG("Sending a peer request to the tracker.");
183     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
184                                                     GET_PEERS_TIMEOUT);
185     if (status == MSG_OK) {
186       send_success = 1;
187     }
188   }
189   while (!success && MSG_get_clock() < timeout) {
190     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
191     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
192     if (status == MSG_OK) {
193       tracker_task_data_t data = MSG_task_get_data(task_received);
194       unsigned i;
195       int peer_id;
196       //Add the peers the tracker gave us to our peer list.
197       xbt_dynar_foreach(data->peers, i, peer_id) {
198         if (peer_id != peer->id)
199           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
200                            connection_new(peer_id), NULL);
201       }
202       success = 1;
203       //free the communication and the task
204       MSG_comm_destroy(comm_received);
205       tracker_task_data_free(data);
206       MSG_task_destroy(task_received);
207       comm_received = NULL;
208     }
209   }
210
211   return success;
212 }
213
214 /**
215  * Initialize the peer data.
216  * @param peer peer data
217  * @param id id of the peer to take in the network
218  * @param seed indicates if the peer is a seed.
219  */
220 void peer_init(peer_t peer, int id, int seed)
221 {
222   peer->id = id;
223   sprintf(peer->mailbox, "%d", id);
224   sprintf(peer->mailbox_tracker, "tracker_%d", id);
225   peer->peers = xbt_dict_new();
226   peer->active_peers = xbt_dict_new();
227   peer->hostname = MSG_host_get_name(MSG_host_self());
228
229   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
230   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
231   if (seed) {
232     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
233     memset(peer->bitfield_blocks, '1',
234            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
235   } else {
236     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
237     memset(peer->bitfield_blocks, '0',
238            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
239   }
240
241   peer->bitfield[FILE_PIECES] = '\0';
242   peer->pieces = 0;
243
244   peer->pieces_count = xbt_new0(short, FILE_PIECES);
245
246   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
247
248   peer->stream = RngStream_CreateStream("");
249   peer->comm_received = NULL;
250
251   peer->round = 0;
252
253 }
254
255 /**
256  * Destroys a poor peer object.
257  */
258 void peer_free(peer_t peer)
259 {
260   char *key;
261   connection_t connection;
262   xbt_dict_cursor_t cursor;
263   xbt_dict_foreach(peer->peers, cursor, key, connection) {
264     connection_free(connection);
265   }
266   xbt_dict_free(&peer->peers);
267   xbt_dict_free(&peer->active_peers);
268   xbt_dynar_free(&peer->current_pieces);
269   xbt_free(peer->pieces_count);
270   xbt_free(peer->bitfield);
271   xbt_free(peer->bitfield_blocks);
272
273   RngStream_DeleteStream(&peer->stream);
274 }
275
276 /**
277  * Returns if a peer has finished downloading the file
278  * @param bitfield peer bitfield
279  */
280 int has_finished(char *bitfield)
281 {
282   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
283 }
284
285 int nb_interested_peers(peer_t peer)
286 {
287   xbt_dict_cursor_t cursor = NULL;
288   char *key;
289   connection_t connection;
290   int nb = 0;
291   xbt_dict_foreach(peer->peers, cursor, key, connection) {
292     if (connection->interested)
293       nb++;
294   }
295   return nb;
296 }
297
298
299 void update_active_peers_set(peer_t peer, connection_t remote_peer)
300 {
301
302   if (remote_peer->interested && !remote_peer->choked_upload) {
303     //add in the active peers set
304     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
305                      sizeof(int), remote_peer, NULL);
306   } else {
307     //remove
308     xbt_ex_t e;
309     TRY {
310       xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
311                           sizeof(int));
312     }
313     CATCH(e) {
314       xbt_ex_free(e);
315     }
316   }
317 }
318
319
320 /**
321  * Handle a received message sent by another peer
322  * @param peer Peer data
323  * @param task task received.
324  */
325 void handle_message(peer_t peer, msg_task_t task)
326 {
327   message_t message = MSG_task_get_data(task);
328   connection_t remote_peer;
329   remote_peer =
330       xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
331                                sizeof(int));
332   switch (message->type) {
333
334   case MESSAGE_HANDSHAKE:
335     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
336               message->issuer_host_name);
337     //Check if the peer is in our connection list.
338     if (!remote_peer) {
339       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
340                        connection_new(message->peer_id), NULL);
341       send_handshake(peer, message->mailbox);
342     }
343     //Send our bitfield to the peer
344     send_bitfield(peer, message->mailbox);
345     break;
346   case MESSAGE_BITFIELD:
347     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
348               message->issuer_host_name);
349     //Update the pieces list
350     update_pieces_count_from_bitfield(peer, message->bitfield);
351     //Store the bitfield
352     remote_peer->bitfield = xbt_strdup(message->bitfield);
353     xbt_assert(!remote_peer->am_interested,
354                "Should not be interested at first");
355     if (is_interested(peer, remote_peer)) {
356       remote_peer->am_interested = 1;
357       send_interested(peer, message->mailbox);
358     }
359     break;
360   case MESSAGE_INTERESTED:
361     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
362               message->issuer_host_name);
363     xbt_assert((remote_peer != NULL),
364                "A non-in-our-list peer has sent us a message. WTH ?");
365     //Update the interested state of the peer.
366     remote_peer->interested = 1;
367     update_active_peers_set(peer, remote_peer);
368     break;
369   case MESSAGE_NOTINTERESTED:
370     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
371               message->mailbox, message->issuer_host_name);
372     xbt_assert((remote_peer != NULL),
373                "A non-in-our-list peer has sent us a message. WTH ?");
374     remote_peer->interested = 0;
375     update_active_peers_set(peer, remote_peer);
376     break;
377   case MESSAGE_UNCHOKE:
378     xbt_assert((remote_peer != NULL),
379                "A non-in-our-list peer has sent us a message. WTH ?");
380     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
381               message->issuer_host_name);
382     remote_peer->choked_download = 0;
383     //Send requests to the peer, since it has unchoked us
384     if (remote_peer->am_interested)
385       request_new_piece_to_peer(peer, remote_peer);
386     break;
387   case MESSAGE_CHOKE:
388     xbt_assert((remote_peer != NULL),
389                "A non-in-our-list peer has sent us a message. WTH ?");
390     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
391               message->issuer_host_name);
392     remote_peer->choked_download = 1;
393     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
394     break;
395   case MESSAGE_HAVE:
396     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
397               message->mailbox, message->issuer_host_name, message->index);
398     xbt_assert(remote_peer->bitfield, "bitfield not received");
399     xbt_assert((message->index >= 0
400                 && message->index < FILE_PIECES),
401                "Wrong HAVE message received");
402     remote_peer->bitfield[message->index] = '1';
403     peer->pieces_count[message->index]++;
404     //If the piece is in our pieces, we tell the peer that we are interested.
405     if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
406       remote_peer->am_interested = 1;
407       send_interested(peer, message->mailbox);
408       if (!remote_peer->choked_download)
409         request_new_piece_to_peer(peer, remote_peer);
410     }
411     break;
412   case MESSAGE_REQUEST:
413     xbt_assert((message->index >= 0
414                 && message->index < FILE_PIECES), "Wrong request received");
415     if (!remote_peer->choked_upload) {
416       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
417                 message->mailbox, message->issuer_host_name, message->peer_id,
418                 message->block_index,
419                 message->block_index + message->block_length);
420       if (peer->bitfield[message->index] == '1') {
421         send_piece(peer, message->mailbox, message->index, 0,
422                    message->block_index, message->block_length);
423       }
424     } else {
425       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
426                 message->mailbox, message->issuer_host_name, message->peer_id);
427     }
428     break;
429   case MESSAGE_PIECE:
430     xbt_assert((message->index >= 0
431                 && message->index < FILE_PIECES), "Wrong piece received");
432     //TODO: Execute Ã  computation.
433     if (message->stalled) {
434       XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
435                 message->index, message->mailbox, message->issuer_host_name);
436     } else {
437       XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
438                 message->block_index,
439                 message->block_index + message->block_length,
440                 message->mailbox, message->issuer_host_name);
441       if (peer->bitfield[message->index] == '0') {
442         update_bitfield_blocks(peer, message->index, message->block_index,
443                                message->block_length);
444         if (piece_complete(peer, message->index)) {
445           //Removing the piece from our piece list
446           remove_current_piece(peer, remote_peer, message->index);
447           //Setting the fact that we have the piece
448           peer->bitfield[message->index] = '1';
449           peer->pieces++;
450           XBT_DEBUG("My status is now %s", peer->bitfield);
451           //Sending the information to all the peers we are connected to
452           send_have(peer, message->index);
453           //sending UNINTERSTED to peers that doesn't have what we want.
454           update_interested_after_receive(peer);
455         } else {                // piece not completed
456           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
457         }
458       } else {
459         XBT_DEBUG("However, we already have it");
460         request_new_piece_to_peer(peer, remote_peer);
461       }
462     }
463     break;
464   case MESSAGE_CANCEL:
465     XBT_DEBUG("The received CANCEL from %s (%s)",
466               message->mailbox, message->issuer_host_name);
467     break;
468   }
469   //Update the peer speed.
470   if (remote_peer) {
471     connection_add_speed_value(remote_peer,
472                                1.0 / (MSG_get_clock() -
473                                       peer->begin_receive_time));
474   }
475   peer->begin_receive_time = MSG_get_clock();
476
477   task_message_free(task);
478 }
479
480 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
481 {
482   int piece = select_piece_to_download(peer, remote_peer);
483   if (piece != -1) {
484     xbt_dynar_push_as(peer->current_pieces, int, piece);
485     send_request_to_peer(peer, remote_peer, piece);
486   }
487 }
488
489 void remove_current_piece(peer_t peer, connection_t remote_peer,
490                           int current_piece)
491 {
492   int piece_index = -1, piece, i;
493   xbt_dynar_foreach(peer->current_pieces, i, piece) {
494     if (piece == current_piece) {
495       piece_index = i;
496       break;
497     }
498   }
499   if (piece_index != -1)
500     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
501   remote_peer->current_piece = -1;
502 }
503
504 /**
505  * Updates the list of who has a piece from a bitfield
506  * @param peer peer we want to update the list
507  * @param bitfield bitfield
508  */
509 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
510 {
511   int i;
512   for (i = 0; i < FILE_PIECES; i++) {
513     if (bitfield[i] == '1') {
514       peer->pieces_count[i]++;
515     }
516   }
517 }
518
519
520
521 /**
522  * Return the piece to be downloaded
523  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
524  * If a piece is partially downloaded, this piece will be selected prioritarily
525  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
526  * If the peer has more than pieces, he downloads the pieces that are the less
527  * replicated (rarest policy).
528  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
529  * @param peer: local peer
530  * @param remote_peer: information about the connection
531  * @return the piece to download if possible. -1 otherwise
532  */
533 int select_piece_to_download(peer_t peer, connection_t remote_peer)
534 {
535   int piece = -1;
536
537   piece = partially_downloaded_piece(peer, remote_peer);
538   // strict priority policy
539   if (piece != -1)
540     return piece;
541
542   // end game mode
543   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
544       && is_interested(peer, remote_peer)) {
545     int i;
546     int nb_interesting_pieces = 0;
547     int random_piece_index, current_index = 0;
548     // compute the number of interesting pieces
549     for (i = 0; i < FILE_PIECES; i++) {
550       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
551         nb_interesting_pieces++;
552       }
553     }
554     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
555     // get a random interesting piece
556     random_piece_index =
557         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
558     for (i = 0; i < FILE_PIECES; i++) {
559       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
560         if (random_piece_index == current_index) {
561           piece = i;
562           break;
563         }
564         current_index++;
565       }
566     }
567     xbt_assert(piece != -1, "WTF !!!");
568     return piece;
569   }
570   // Random first policy
571   if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
572     int i;
573     int nb_interesting_pieces = 0;
574     int random_piece_index, current_index = 0;
575     // compute the number of interesting pieces
576     for (i = 0; i < FILE_PIECES; i++) {
577       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
578           && !in_current_pieces(peer, i)) {
579         nb_interesting_pieces++;
580       }
581     }
582     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
583     // get a random interesting piece
584     random_piece_index =
585         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
586     for (i = 0; i < FILE_PIECES; i++) {
587       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
588           && !in_current_pieces(peer, i)) {
589         if (random_piece_index == current_index) {
590           piece = i;
591           break;
592         }
593         current_index++;
594       }
595     }
596     xbt_assert(piece != -1, "WTF !!!");
597     return piece;
598   } else {                      // Rarest first policy
599     int i;
600     short min = SHRT_MAX;
601     int nb_min_pieces = 0;
602     int random_rarest_index, current_index = 0;
603     // compute the smallest number of copies of available pieces
604     for (i = 0; i < FILE_PIECES; i++) {
605       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
606           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
607         min = peer->pieces_count[i];
608     }
609     xbt_assert(min != SHRT_MAX
610                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
611     // compute the number of rarest pieces
612     for (i = 0; i < FILE_PIECES; i++) {
613       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
614           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
615         nb_min_pieces++;
616     }
617     xbt_assert(nb_min_pieces != 0
618                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
619     // get a random rarest piece
620     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
621     for (i = 0; i < FILE_PIECES; i++) {
622       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
623           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
624         if (random_rarest_index == current_index) {
625           piece = i;
626           break;
627         }
628         current_index++;
629       }
630     }
631     xbt_assert(piece != -1
632                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
633     return piece;
634   }
635 }
636
637
638 /**
639  * Update the list of current choked and unchoked peers, using the
640  * choke algorithm
641  * @param peer the current peer
642  */
643 void update_choked_peers(peer_t peer)
644 {
645   if (nb_interested_peers(peer) == 0)
646     return;
647   //  if(xbt_dict_size(peer->active_peers) > 0)
648   //    return;
649   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
650             xbt_dict_size(peer->active_peers));
651   //update the current round
652   peer->round = (peer->round + 1) % 3;
653   char *key, *key_choked;
654   connection_t peer_choosed = NULL;
655   connection_t peer_choked = NULL;
656   //remove a peer from the list
657   xbt_dict_cursor_t cursor = NULL;
658   xbt_dict_cursor_first(peer->active_peers, &cursor);
659   if (xbt_dict_length(peer->active_peers) > 0) {
660     key_choked = xbt_dict_cursor_get_key(cursor);
661     peer_choked = xbt_dict_cursor_get_data(cursor);
662   }
663   xbt_dict_cursor_free(&cursor);
664
665   /**
666    * If we are currently seeding, we unchoke the peer which has
667    * been unchoke the least time.
668    */
669   if (peer->pieces == FILE_PIECES) {
670     connection_t connection;
671     double unchoke_time = MSG_get_clock() + 1;
672
673     xbt_dict_foreach(peer->peers, cursor, key, connection) {
674       if (connection->last_unchoke < unchoke_time && connection->interested
675           && connection->choked_upload) {
676         unchoke_time = connection->last_unchoke;
677         peer_choosed = connection;
678       }
679     }
680   } else {
681     //Random optimistic unchoking
682     if (peer->round == 0) {
683       int j = 0;
684       do {
685         //We choose a random peer to unchoke.
686         int id_chosen = RngStream_RandInt(peer->stream, 0,
687                                           xbt_dict_length(peer->peers) - 1);
688         int i = 0;
689         connection_t connection;
690         xbt_dict_foreach(peer->peers, cursor, key, connection) {
691           if (i == id_chosen) {
692             peer_choosed = connection;
693             break;
694           }
695           i++;
696         }
697         xbt_dict_cursor_free(&cursor);
698         if (!peer_choosed->interested || !peer_choosed->choked_upload) {
699           peer_choosed = NULL;
700         }
701         j++;
702       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
703     } else {
704       //Use the "fastest download" policy.
705       connection_t connection;
706       double fastest_speed = 0.0;
707       xbt_dict_foreach(peer->peers, cursor, key, connection) {
708         if (connection->peer_speed > fastest_speed
709             && connection->choked_upload && connection->interested) {
710           peer_choosed = connection;
711           fastest_speed = connection->peer_speed;
712         }
713       }
714     }
715
716   }
717   if (peer_choosed != NULL)
718     XBT_DEBUG
719         ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
720          peer->id, peer_choosed->id, peer_choosed->interested,
721          peer_choosed->choked_upload);
722
723   //  if (xbt_dict_size(peer->peers) > 0)
724   //    xbt_assert((xbt_dict_size(peer->active_peers)  != 0),
725   //        "No more active peers !");
726
727
728   //  if (peer_choked != NULL && peer_choked->choked_upload  != 0)
729   //    peer_choked = NULL;
730   //  if (peer_choosed != NULL && peer_choosed->choked_upload  == 0)
731   //    peer_choosed = NULL;
732
733   if (peer_choked != peer_choosed) {
734     if (peer_choked != NULL) {
735       xbt_assert((!peer_choked->choked_upload),
736                  "Tries to choked a choked peer");
737       peer_choked->choked_upload = 1;
738       xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
739       update_active_peers_set(peer, peer_choked);
740       //      xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
741       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
742       send_choked(peer, peer_choked->mailbox);
743     }
744     if (peer_choosed != NULL) {
745       xbt_assert((peer_choosed->choked_upload),
746                  "Tries to unchoked an unchoked peer");
747       peer_choosed->choked_upload = 0;
748       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
749                        sizeof(int), peer_choosed, NULL);
750       peer_choosed->last_unchoke = MSG_get_clock();
751       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
752       update_active_peers_set(peer, peer_choosed);
753       send_unchoked(peer, peer_choosed->mailbox);
754     }
755   }
756 }
757
758 /**
759  * Updates our "interested" state about peers: send "not interested" to peers
760  * that don't have any more pieces we want.
761  * @param peer our peer data
762  */
763 void update_interested_after_receive(peer_t peer)
764 {
765   char *key;
766   xbt_dict_cursor_t cursor;
767   connection_t connection;
768   unsigned cpt;
769   int interested, piece;
770   xbt_dict_foreach(peer->peers, cursor, key, connection) {
771     interested = 0;
772     if (connection->am_interested) {
773       xbt_assert(connection->bitfield, "Bitfield not received");
774       //Check if the peer still has a piece we want.
775       int i;
776       for (i = 0; i < FILE_PIECES; i++) {
777         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
778           interested = 1;
779           break;
780         }
781       }
782       if (!interested) {        //no more piece to download from connection
783         connection->am_interested = 0;
784         send_notinterested(peer, connection->mailbox);
785       }
786     }
787   }
788 }
789
790 void update_bitfield_blocks(peer_t peer, int index, int block_index,
791                             int block_length)
792 {
793   int i;
794   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
795   xbt_assert((block_index >= 0
796               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
797              block_index);
798   for (i = block_index; i < (block_index + block_length); i++) {
799     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
800   }
801 }
802
803 /**
804  * Returns if a peer has completed the download of a piece
805  */
806 int piece_complete(peer_t peer, int index)
807 {
808   int i;
809   for (i = 0; i < PIECES_BLOCKS; i++) {
810     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
811       return 0;
812     }
813   }
814   return 1;
815 }
816
817 /**
818  * Returns the first block that a peer doesn't have in a piece.
819  * If the peer has all blocks of the piece, returns -1.
820  */
821 int get_first_block(peer_t peer, int piece)
822 {
823   int i;
824   for (i = 0; i < PIECES_BLOCKS; i++) {
825     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
826       return i;
827     }
828   }
829   return -1;
830 }
831
832 /**
833  * Indicates if the remote peer has a piece not stored by the local peer
834  */
835 int is_interested(peer_t peer, connection_t remote_peer)
836 {
837   xbt_assert(remote_peer->bitfield, "Bitfield not received");
838   int i;
839   for (i = 0; i < FILE_PIECES; i++) {
840     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
841       return 1;
842     }
843   }
844   return 0;
845 }
846
847 /**
848  * Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer
849  */
850 int is_interested_and_free(peer_t peer, connection_t remote_peer)
851 {
852   xbt_assert(remote_peer->bitfield, "Bitfield not received");
853   int i;
854   for (i = 0; i < FILE_PIECES; i++) {
855     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
856         && !in_current_pieces(peer, i)) {
857       return 1;
858     }
859   }
860   return 0;
861 }
862
863
864 /**
865  * Returns a piece that is partially downloaded and stored by the remote peer if any
866  * -1 otherwise.
867  */
868 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
869 {
870   xbt_assert(remote_peer->bitfield, "Bitfield not received");
871   int i;
872   for (i = 0; i < FILE_PIECES; i++) {
873     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
874         && !in_current_pieces(peer, i)) {
875       if (get_first_block(peer, i) > 0)
876         return i;
877     }
878   }
879   return -1;
880 }
881
882
883 /**
884  * Send request messages to a peer that have unchoked us
885  * @param peer peer
886  * @param remote_peer peer data to the peer we want to send the request
887  */
888 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
889 {
890   remote_peer->current_piece = piece;
891   unsigned i;
892   int block_index, block_length;
893   xbt_assert(remote_peer->bitfield, "bitfield not received");
894   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
895   block_index = get_first_block(peer, piece);
896   if (block_index != -1) {
897     block_length = PIECES_BLOCKS - block_index;
898     block_length = min(BLOCKS_REQUESTED, block_length);
899     send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
900   }
901 }
902
903
904 /**
905  * Indicates if a piece is currently being downloaded by the peer.
906  */
907 int in_current_pieces(peer_t peer, int piece)
908 {
909   unsigned i;
910   int peer_piece;
911   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
912     if (peer_piece == piece) {
913       return 1;
914     }
915   }
916   return 0;
917 }
918
919
920
921
922 /***********************************************************
923  *
924  *  Low level message functions
925  *
926  ***********************************************************/
927
928
929
930 /**
931  * Send a "interested" message to a peer
932  * @param peer peer data
933  * @param mailbox destination mailbox
934  */
935 void send_interested(peer_t peer, const char *mailbox)
936 {
937   msg_task_t task =
938       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
939                        peer->id, task_message_size(MESSAGE_INTERESTED));
940   MSG_task_dsend(task, mailbox, task_message_free);
941   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
942
943 }
944
945 /**
946  * Send a "not interested" message to a peer
947  * @param peer peer data
948  * @param mailbox destination mailbox
949  */
950 void send_notinterested(peer_t peer, const char *mailbox)
951 {
952   msg_task_t task =
953       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
954                        peer->id, task_message_size(MESSAGE_NOTINTERESTED));
955   MSG_task_dsend(task, mailbox, task_message_free);
956   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
957
958 }
959
960 /**
961  * Send a handshake message to all the peers the peer has.
962  * @param peer peer data
963  */
964 void send_handshake_all(peer_t peer)
965 {
966   connection_t remote_peer;
967   xbt_dict_cursor_t cursor = NULL;
968   char *key;
969   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
970     msg_task_t task =
971         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
972                          peer->id, task_message_size(MESSAGE_HANDSHAKE));
973     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
974     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
975   }
976 }
977
978 /**
979  * Send a "handshake" message to an user
980  * @param peer peer data
981  * @param mailbox mailbox where to we send the message
982  */
983 void send_handshake(peer_t peer, const char *mailbox)
984 {
985   msg_task_t task =
986       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
987                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
988   MSG_task_dsend(task, mailbox, task_message_free);
989   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
990 }
991
992 /**
993  * Send a "choked" message to a peer.
994  */
995 void send_choked(peer_t peer, const char *mailbox)
996 {
997   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
998   msg_task_t task =
999       task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
1000                        peer->id, task_message_size(MESSAGE_CHOKE));
1001   MSG_task_dsend(task, mailbox, task_message_free);
1002 }
1003
1004 /**
1005  * Send a "unchoked" message to a peer
1006  */
1007 void send_unchoked(peer_t peer, const char *mailbox)
1008 {
1009   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
1010   msg_task_t task =
1011       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
1012                        peer->id, task_message_size(MESSAGE_UNCHOKE));
1013   MSG_task_dsend(task, mailbox, task_message_free);
1014 }
1015
1016 /**
1017  * Send a "HAVE" message to all peers we are connected to
1018  */
1019 void send_have(peer_t peer, int piece)
1020 {
1021   XBT_DEBUG("Sending HAVE message to all my peers");
1022   connection_t remote_peer;
1023   xbt_dict_cursor_t cursor = NULL;
1024   char *key;
1025   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
1026     msg_task_t task =
1027         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
1028                                peer->id, piece,
1029                                task_message_size(MESSAGE_HAVE));
1030     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1031   }
1032 }
1033
1034 /**
1035  * Send a bitfield message to all the peers the peer has.
1036  * @param peer peer data
1037  */
1038 void send_bitfield(peer_t peer, const char *mailbox)
1039 {
1040   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
1041   msg_task_t task =
1042       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1043                                 peer->bitfield, FILE_PIECES);
1044   MSG_task_dsend(task, mailbox, task_message_free);
1045 }
1046
1047 /**
1048  * Send a "request" message to a pair, containing a request for a piece
1049  */
1050 void send_request(peer_t peer, const char *mailbox, int piece,
1051                   int block_index, int block_length)
1052 {
1053   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1054             block_index, block_length);
1055   msg_task_t task =
1056       task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1057                                block_index, block_length);
1058   MSG_task_dsend(task, mailbox, task_message_free);
1059 }
1060
1061 /**
1062  * Send a "piece" message to a pair, containing a piece of the file
1063  */
1064 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
1065                 int block_index, int block_length)
1066 {
1067   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1068             block_length, mailbox);
1069   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1070   xbt_assert((peer->bitfield[piece] == '1'),
1071              "Tried to send a piece that we doesn't have.");
1072   msg_task_t task =
1073       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1074                              stalled, block_index, block_length, BLOCK_SIZE);
1075   MSG_task_dsend(task, mailbox, task_message_free);
1076 }