Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / examples / msg / bittorrent / peer.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26 #define ENABLE_END_GAME_MODE 1
27
28 /**
29  *  Number of blocks asked by each request
30  */
31 #define BLOCKS_REQUESTED 2
32
33
34 static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
35
36
37 #define SLEEP_DURATION 1
38
39   /**
40  * Peer main function
41  */
42 int peer(int argc, char *argv[])
43 {
44   s_peer_t peer;
45   //Check arguments
46   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
47   //Build peer object
48   if (argc == 4) {
49     peer_init(&peer, atoi(argv[1]), 1);
50   } else {
51     peer_init(&peer, atoi(argv[1]), 0);
52   }
53   //Retrieve deadline
54   double deadline = atof(argv[2]);
55   xbt_assert(deadline > 0, "Wrong deadline supplied");
56   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
57   //Getting peer data from the tracker.
58   if (get_peers_data(&peer)) {
59     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
60     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
61     peer.begin_receive_time = MSG_get_clock();
62     MSG_mailbox_set_async(peer.mailbox);
63     if (has_finished(peer.bitfield)) {
64       peer.pieces = FILE_PIECES;
65       send_handshake_all(&peer);
66       seed_loop(&peer, deadline);
67     } else {
68       leech_loop(&peer, deadline);
69       seed_loop(&peer, deadline);
70     }
71   } else {
72     XBT_INFO("Couldn't contact the tracker.");
73   }
74
75   XBT_INFO("Here is my current status: %s", peer.bitfield);
76   if (peer.comm_received) {
77     MSG_comm_destroy(peer.comm_received);
78   }
79
80   peer_free(&peer);
81   return 0;
82 }
83
84 /**
85  * Peer main loop when it is leeching.
86  * @param peer peer data
87  * @param deadline time at which the peer has to leave
88  */
89 void leech_loop(peer_t peer, double deadline)
90 {
91   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
92   XBT_DEBUG("Start downloading.");
93   /*
94    * Send a "handshake" message to all the peers it got
95    * (since it couldn't have gotten more than 50 peers)
96    */
97   send_handshake_all(peer);
98   //Wait for at least one "bitfield" message.
99 //  wait_for_pieces(peer, deadline);
100   XBT_DEBUG("Starting main leech loop");
101
102   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
103     if (peer->comm_received == NULL) {
104       peer->task_received = NULL;
105       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
106     }
107     if (MSG_comm_test(peer->comm_received)) {
108       msg_error_t status = MSG_comm_get_status(peer->comm_received);
109       MSG_comm_destroy(peer->comm_received);
110       peer->comm_received = NULL;
111       if (status == MSG_OK) {
112         handle_message(peer, peer->task_received);
113       }
114     } else {
115       //We don't execute the choke algorithm if we don't already have a piece
116       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
117         update_choked_peers(peer);
118         next_choked_update += UPDATE_CHOKED_INTERVAL;
119       } else {
120         MSG_process_sleep(SLEEP_DURATION);
121       }
122     }
123   }
124   if (peer->pieces == FILE_PIECES)
125     XBT_DEBUG("%d becomes a seeder", peer->id);
126
127 }
128
129 /**
130  * Peer main loop when it is seeding
131  * @param peer peer data
132  * @param deadline time when the peer will leave
133  */
134 void seed_loop(peer_t peer, double deadline)
135 {
136   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
137   XBT_DEBUG("Start seeding.");
138   //start the main seed loop
139   while (MSG_get_clock() < deadline) {
140     if (peer->comm_received == NULL) {
141       peer->task_received = NULL;
142       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
143     }
144     if (MSG_comm_test(peer->comm_received)) {
145       msg_error_t status = MSG_comm_get_status(peer->comm_received);
146       MSG_comm_destroy(peer->comm_received);
147       peer->comm_received = NULL;
148       if (status == MSG_OK) {
149         handle_message(peer, peer->task_received);
150       }
151     } else {
152       if (MSG_get_clock() >= next_choked_update) {
153         update_choked_peers(peer);
154         //TODO: Change the choked peer algorithm when seeding.
155         next_choked_update += UPDATE_CHOKED_INTERVAL;
156       } else {
157         MSG_process_sleep(SLEEP_DURATION);
158       }
159     }
160   }
161 }
162
163 /**
164  * Retrieves the peer list from the tracker
165  * @param peer current peer data
166  */
167 int get_peers_data(peer_t peer)
168 {
169   int success = 0, send_success = 0;
170   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
171   //Build the task to send to the tracker
172   tracker_task_data_t data =
173       tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
174                             peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
175   //Build the task to send.
176   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
177   msg_task_t task_received = NULL;
178   msg_comm_t comm_received;
179   while (!send_success && MSG_get_clock() < timeout) {
180     XBT_DEBUG("Sending a peer request to the tracker.");
181     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
182                                                     GET_PEERS_TIMEOUT);
183     if (status == MSG_OK) {
184       send_success = 1;
185     }
186   }
187   while (!success && MSG_get_clock() < timeout) {
188     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
189     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
190     if (status == MSG_OK) {
191       tracker_task_data_t data = MSG_task_get_data(task_received);
192       unsigned i;
193       int peer_id;
194       //Add the peers the tracker gave us to our peer list.
195       xbt_dynar_foreach(data->peers, i, peer_id) {
196         if (peer_id != peer->id)
197           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
198                            connection_new(peer_id), NULL);
199       }
200       success = 1;
201       //free the communication and the task
202       MSG_comm_destroy(comm_received);
203       tracker_task_data_free(data);
204       MSG_task_destroy(task_received);
205       comm_received = NULL;
206     }
207   }
208
209   return success;
210 }
211
212 /**
213  * Initialize the peer data.
214  * @param peer peer data
215  * @param id id of the peer to take in the network
216  * @param seed indicates if the peer is a seed.
217  */
218 void peer_init(peer_t peer, int id, int seed)
219 {
220   peer->id = id;
221   sprintf(peer->mailbox, "%d", id);
222   sprintf(peer->mailbox_tracker, "tracker_%d", id);
223   peer->peers = xbt_dict_new();
224   peer->active_peers = xbt_dict_new();
225   peer->hostname = MSG_host_get_name(MSG_host_self());
226
227   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
228   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
229   if (seed) {
230     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
231     memset(peer->bitfield_blocks, '1',
232            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
233   } else {
234     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
235     memset(peer->bitfield_blocks, '0',
236            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
237   }
238
239   peer->bitfield[FILE_PIECES] = '\0';
240   peer->pieces = 0;
241
242   peer->pieces_count = xbt_new0(short, FILE_PIECES);
243
244   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
245
246   peer->stream = RngStream_CreateStream("");
247   peer->comm_received = NULL;
248
249   peer->round = 0;
250
251 }
252
253 /**
254  * Destroys a poor peer object.
255  */
256 void peer_free(peer_t peer)
257 {
258   char *key;
259   connection_t connection;
260   xbt_dict_cursor_t cursor;
261   xbt_dict_foreach(peer->peers, cursor, key, connection) {
262     connection_free(connection);
263   }
264   xbt_dict_free(&peer->peers);
265   xbt_dict_free(&peer->active_peers);
266   xbt_dynar_free(&peer->current_pieces);
267   xbt_free(peer->pieces_count);
268   xbt_free(peer->bitfield);
269   xbt_free(peer->bitfield_blocks);
270
271   RngStream_DeleteStream(&peer->stream);
272 }
273
274 /**
275  * Returns if a peer has finished downloading the file
276  * @param bitfield peer bitfield
277  */
278 int has_finished(char *bitfield)
279 {
280   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
281 }
282
283 int nb_interested_peers(peer_t peer)
284 {
285   xbt_dict_cursor_t cursor = NULL;
286   char *key;
287   connection_t connection;
288   int nb = 0;
289   xbt_dict_foreach(peer->peers, cursor, key, connection) {
290     if (connection->interested)
291       nb++;
292   }
293   return nb;
294 }
295
296
297 void update_active_peers_set(peer_t peer, connection_t remote_peer)
298 {
299
300   if (remote_peer->interested && !remote_peer->choked_upload) {
301     //add in the active peers set
302     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
303                      sizeof(int), remote_peer, NULL);
304   } else {
305     //remove
306     xbt_ex_t e;
307     TRY {
308       xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
309                           sizeof(int));
310     }
311     CATCH(e) {
312       xbt_ex_free(e);
313     }
314   }
315 }
316
317
318 /**
319  * Handle a received message sent by another peer
320  * @param peer Peer data
321  * @param task task received.
322  */
323 void handle_message(peer_t peer, msg_task_t task)
324 {
325   message_t message = MSG_task_get_data(task);
326   connection_t remote_peer;
327   remote_peer =
328       xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
329                                sizeof(int));
330   switch (message->type) {
331
332   case MESSAGE_HANDSHAKE:
333     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
334               message->issuer_host_name);
335     //Check if the peer is in our connection list.
336     if (!remote_peer) {
337       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
338                        connection_new(message->peer_id), NULL);
339       send_handshake(peer, message->mailbox);
340     }
341     //Send our bitfield to the peer
342     send_bitfield(peer, message->mailbox);
343     break;
344   case MESSAGE_BITFIELD:
345     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
346               message->issuer_host_name);
347     //Update the pieces list
348     update_pieces_count_from_bitfield(peer, message->bitfield);
349     //Store the bitfield
350     remote_peer->bitfield = xbt_strdup(message->bitfield);
351     xbt_assert(!remote_peer->am_interested,
352                "Should not be interested at first");
353     if (is_interested(peer, remote_peer)) {
354       remote_peer->am_interested = 1;
355       send_interested(peer, message->mailbox);
356     }
357     break;
358   case MESSAGE_INTERESTED:
359     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
360               message->issuer_host_name);
361     xbt_assert((remote_peer != NULL),
362                "A non-in-our-list peer has sent us a message. WTH ?");
363     //Update the interested state of the peer.
364     remote_peer->interested = 1;
365     update_active_peers_set(peer, remote_peer);
366     break;
367   case MESSAGE_NOTINTERESTED:
368     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
369               message->mailbox, message->issuer_host_name);
370     xbt_assert((remote_peer != NULL),
371                "A non-in-our-list peer has sent us a message. WTH ?");
372     remote_peer->interested = 0;
373     update_active_peers_set(peer, remote_peer);
374     break;
375   case MESSAGE_UNCHOKE:
376     xbt_assert((remote_peer != NULL),
377                "A non-in-our-list peer has sent us a message. WTH ?");
378     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
379               message->issuer_host_name);
380     xbt_assert(remote_peer->choked_download, "WTF !!!");
381     remote_peer->choked_download = 0;
382     //Send requests to the peer, since it has unchoked us
383     if (remote_peer->am_interested)
384       request_new_piece_to_peer(peer, remote_peer);
385     break;
386   case MESSAGE_CHOKE:
387     xbt_assert((remote_peer != NULL),
388                "A non-in-our-list peer has sent us a message. WTH ?");
389     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
390               message->issuer_host_name);
391     xbt_assert(!remote_peer->choked_download, "WTF !!!");
392     remote_peer->choked_download = 1;
393     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
394     break;
395   case MESSAGE_HAVE:
396     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
397               message->mailbox, message->issuer_host_name, message->index);
398     xbt_assert(remote_peer->bitfield, "bitfield not received");
399     xbt_assert((message->index >= 0
400                 && message->index < FILE_PIECES),
401                "Wrong HAVE message received");
402     remote_peer->bitfield[message->index] = '1';
403     peer->pieces_count[message->index]++;
404     //If the piece is in our pieces, we tell the peer that we are interested.
405     if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
406       remote_peer->am_interested = 1;
407       send_interested(peer, message->mailbox);
408       if (!remote_peer->choked_download)
409         request_new_piece_to_peer(peer, remote_peer);
410     }
411     break;
412   case MESSAGE_REQUEST:
413     xbt_assert(remote_peer->interested, "WTF !!!");
414
415     xbt_assert((message->index >= 0
416                 && message->index < FILE_PIECES), "Wrong request received");
417     if (!remote_peer->choked_upload) {
418       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
419                 message->mailbox, message->issuer_host_name, message->index,
420                 message->block_index,
421                 message->block_index + message->block_length);
422       if (peer->bitfield[message->index] == '1') {
423         send_piece(peer, message->mailbox, message->index,
424                    message->block_index, message->block_length);
425       }
426     } else {
427       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
428                 message->mailbox, message->issuer_host_name, message->peer_id);
429     }
430     break;
431   case MESSAGE_PIECE:
432     XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
433               message->block_index,
434               message->block_index + message->block_length,
435               message->mailbox, message->issuer_host_name);
436     xbt_assert(!remote_peer->choked_download, "WTF !!!");
437     xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, "Can't received a piece if I'm not interested wihtout end-game mode! piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
438     xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
439     xbt_assert((message->index >= 0
440                 && message->index < FILE_PIECES), "Wrong piece received");
441     //TODO: Execute Ã  computation.
442       if (peer->bitfield[message->index] == '0') {
443         update_bitfield_blocks(peer, message->index, message->block_index,
444                                message->block_length);
445         if (piece_complete(peer, message->index)) {
446           //Removing the piece from our piece list
447           remove_current_piece(peer, remote_peer, message->index);
448           //Setting the fact that we have the piece
449           peer->bitfield[message->index] = '1';
450           peer->pieces++;
451           XBT_DEBUG("My status is now %s", peer->bitfield);
452           //Sending the information to all the peers we are connected to
453           send_have(peer, message->index);
454           //sending UNINTERSTED to peers that doesn't have what we want.
455           update_interested_after_receive(peer);
456         } else {                // piece not completed
457           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
458         }
459       } else {
460         XBT_DEBUG("However, we already have it");
461         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
462         request_new_piece_to_peer(peer, remote_peer);
463       }
464     break;
465   case MESSAGE_CANCEL:
466     XBT_DEBUG("The received CANCEL from %s (%s)",
467               message->mailbox, message->issuer_host_name);
468     break;
469   }
470   //Update the peer speed.
471   if (remote_peer) {
472     connection_add_speed_value(remote_peer,
473                                1.0 / (MSG_get_clock() -
474                                       peer->begin_receive_time));
475   }
476   peer->begin_receive_time = MSG_get_clock();
477
478   task_message_free(task);
479 }
480
481 /**
482  * Selects the appropriate piece to download and requests it to the remote_peer
483  */
484 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
485 {
486   int piece = select_piece_to_download(peer, remote_peer);
487   if (piece != -1) {
488     xbt_dynar_push_as(peer->current_pieces, int, piece);
489     send_request_to_peer(peer, remote_peer, piece);
490   }
491 }
492
493 /**
494  * remove current_piece from the list of currently downloaded pieces.
495  */
496 void remove_current_piece(peer_t peer, connection_t remote_peer,
497                           int current_piece)
498 {
499   int piece_index = -1, piece;
500   unsigned int i;
501   xbt_dynar_foreach(peer->current_pieces, i, piece) {
502     if (piece == current_piece) {
503       piece_index = i;
504       break;
505     }
506   }
507   if (piece_index != -1)
508     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
509   remote_peer->current_piece = -1;
510 }
511
512 /**
513  * Updates the list of who has a piece from a bitfield
514  * @param peer peer we want to update the list
515  * @param bitfield bitfield
516  */
517 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
518 {
519   int i;
520   for (i = 0; i < FILE_PIECES; i++) {
521     if (bitfield[i] == '1') {
522       peer->pieces_count[i]++;
523     }
524   }
525 }
526
527
528
529 /**
530  * Return the piece to be downloaded
531  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
532  * If a piece is partially downloaded, this piece will be selected prioritarily
533  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
534  * If the peer has more than pieces, he downloads the pieces that are the less
535  * replicated (rarest policy).
536  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
537  * @param peer: local peer
538  * @param remote_peer: information about the connection
539  * @return the piece to download if possible. -1 otherwise
540  */
541 int select_piece_to_download(peer_t peer, connection_t remote_peer)
542 {
543   int piece = -1;
544
545   piece = partially_downloaded_piece(peer, remote_peer);
546   // strict priority policy
547   if (piece != -1)
548     return piece;
549
550   // end game mode
551   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
552       && is_interested(peer, remote_peer)) {
553     if(!ENABLE_END_GAME_MODE)
554       return -1;
555     int i;
556     int nb_interesting_pieces = 0;
557     int random_piece_index, current_index = 0;
558     // compute the number of interesting pieces
559     for (i = 0; i < FILE_PIECES; i++) {
560       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
561         nb_interesting_pieces++;
562       }
563     }
564     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
565     // get a random interesting piece
566     random_piece_index =
567         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
568     for (i = 0; i < FILE_PIECES; i++) {
569       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
570         if (random_piece_index == current_index) {
571           piece = i;
572           break;
573         }
574         current_index++;
575       }
576     }
577     xbt_assert(piece != -1, "WTF !!!");
578     return piece;
579   }
580   // Random first policy
581   if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
582     int i;
583     int nb_interesting_pieces = 0;
584     int random_piece_index, current_index = 0;
585     // compute the number of interesting pieces
586     for (i = 0; i < FILE_PIECES; i++) {
587       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
588           && !in_current_pieces(peer, i)) {
589         nb_interesting_pieces++;
590       }
591     }
592     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
593     // get a random interesting piece
594     random_piece_index =
595         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
596     for (i = 0; i < FILE_PIECES; i++) {
597       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
598           && !in_current_pieces(peer, i)) {
599         if (random_piece_index == current_index) {
600           piece = i;
601           break;
602         }
603         current_index++;
604       }
605     }
606     xbt_assert(piece != -1, "WTF !!!");
607     return piece;
608   } else {                      // Rarest first policy
609     int i;
610     short min = SHRT_MAX;
611     int nb_min_pieces = 0;
612     int random_rarest_index, current_index = 0;
613     // compute the smallest number of copies of available pieces
614     for (i = 0; i < FILE_PIECES; i++) {
615       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
616           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
617         min = peer->pieces_count[i];
618     }
619     xbt_assert(min != SHRT_MAX
620                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
621     // compute the number of rarest pieces
622     for (i = 0; i < FILE_PIECES; i++) {
623       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
624           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
625         nb_min_pieces++;
626     }
627     xbt_assert(nb_min_pieces != 0
628                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
629     // get a random rarest piece
630     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
631     for (i = 0; i < FILE_PIECES; i++) {
632       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
633           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
634         if (random_rarest_index == current_index) {
635           piece = i;
636           break;
637         }
638         current_index++;
639       }
640     }
641     xbt_assert(piece != -1
642                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
643     return piece;
644   }
645 }
646
647
648 /**
649  * Update the list of current choked and unchoked peers, using the
650  * choke algorithm
651  * @param peer the current peer
652  */
653 void update_choked_peers(peer_t peer)
654 {
655   if (nb_interested_peers(peer) == 0)
656     return;
657   //  if(xbt_dict_size(peer->active_peers) > 0)
658   //    return;
659   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
660             xbt_dict_size(peer->active_peers));
661   //update the current round
662   peer->round = (peer->round + 1) % 3;
663   char *key, *key_choked;
664   connection_t peer_choosed = NULL;
665   connection_t peer_choked = NULL;
666   //remove a peer from the list
667   xbt_dict_cursor_t cursor = NULL;
668   xbt_dict_cursor_first(peer->active_peers, &cursor);
669   if (xbt_dict_length(peer->active_peers) > 0) {
670     key_choked = xbt_dict_cursor_get_key(cursor);
671     peer_choked = xbt_dict_cursor_get_data(cursor);
672   }
673   xbt_dict_cursor_free(&cursor);
674
675   /**
676    * If we are currently seeding, we unchoke the peer which has
677    * been unchoke the least time.
678    */
679   if (peer->pieces == FILE_PIECES) {
680     connection_t connection;
681     double unchoke_time = MSG_get_clock() + 1;
682
683     xbt_dict_foreach(peer->peers, cursor, key, connection) {
684       if (connection->last_unchoke < unchoke_time && connection->interested
685           && connection->choked_upload) {
686         unchoke_time = connection->last_unchoke;
687         peer_choosed = connection;
688       }
689     }
690   } else {
691     //Random optimistic unchoking
692     if (peer->round == 0) {
693       int j = 0;
694       do {
695         //We choose a random peer to unchoke.
696         int id_chosen = RngStream_RandInt(peer->stream, 0,
697                                           xbt_dict_length(peer->peers) - 1);
698         int i = 0;
699         connection_t connection;
700         xbt_dict_foreach(peer->peers, cursor, key, connection) {
701           if (i == id_chosen) {
702             peer_choosed = connection;
703             break;
704           }
705           i++;
706         }
707         xbt_dict_cursor_free(&cursor);
708         if (!peer_choosed->interested || !peer_choosed->choked_upload) {
709           peer_choosed = NULL;
710         }
711         j++;
712       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
713     } else {
714       //Use the "fastest download" policy.
715       connection_t connection;
716       double fastest_speed = 0.0;
717       xbt_dict_foreach(peer->peers, cursor, key, connection) {
718         if (connection->peer_speed > fastest_speed
719             && connection->choked_upload && connection->interested) {
720           peer_choosed = connection;
721           fastest_speed = connection->peer_speed;
722         }
723       }
724     }
725
726   }
727   if (peer_choosed != NULL)
728     XBT_DEBUG
729         ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
730          peer->id, peer_choosed->id, peer_choosed->interested,
731          peer_choosed->choked_upload);
732
733   //  if (xbt_dict_size(peer->peers) > 0)
734   //    xbt_assert((xbt_dict_size(peer->active_peers)  != 0),
735   //        "No more active peers !");
736
737
738   //  if (peer_choked != NULL && peer_choked->choked_upload  != 0)
739   //    peer_choked = NULL;
740   //  if (peer_choosed != NULL && peer_choosed->choked_upload  == 0)
741   //    peer_choosed = NULL;
742
743   if (peer_choked != peer_choosed) {
744     if (peer_choked != NULL) {
745       xbt_assert((!peer_choked->choked_upload),
746                  "Tries to choked a choked peer");
747       peer_choked->choked_upload = 1;
748       xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
749       update_active_peers_set(peer, peer_choked);
750       //      xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
751       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
752       send_choked(peer, peer_choked->mailbox);
753     }
754     if (peer_choosed != NULL) {
755       xbt_assert((peer_choosed->choked_upload),
756                  "Tries to unchoked an unchoked peer");
757       peer_choosed->choked_upload = 0;
758       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
759                        sizeof(int), peer_choosed, NULL);
760       peer_choosed->last_unchoke = MSG_get_clock();
761       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
762       update_active_peers_set(peer, peer_choosed);
763       send_unchoked(peer, peer_choosed->mailbox);
764     }
765   }
766 }
767
768 /**
769  * Updates our "interested" state about peers: send "not interested" to peers
770  * that don't have any more pieces we want.
771  * @param peer our peer data
772  */
773 void update_interested_after_receive(peer_t peer)
774 {
775   char *key;
776   xbt_dict_cursor_t cursor;
777   connection_t connection;
778   int interested;
779   xbt_dict_foreach(peer->peers, cursor, key, connection) {
780     interested = 0;
781     if (connection->am_interested) {
782       xbt_assert(connection->bitfield, "Bitfield not received");
783       //Check if the peer still has a piece we want.
784       int i;
785       for (i = 0; i < FILE_PIECES; i++) {
786         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
787           interested = 1;
788           break;
789         }
790       }
791       if (!interested) {        //no more piece to download from connection
792         connection->am_interested = 0;
793         send_notinterested(peer, connection->mailbox);
794       }
795     }
796   }
797 }
798
799 void update_bitfield_blocks(peer_t peer, int index, int block_index,
800                             int block_length)
801 {
802   int i;
803   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
804   xbt_assert((block_index >= 0
805               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
806              block_index);
807   for (i = block_index; i < (block_index + block_length); i++) {
808     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
809   }
810 }
811
812 /**
813  * Returns if a peer has completed the download of a piece
814  */
815 int piece_complete(peer_t peer, int index)
816 {
817   int i;
818   for (i = 0; i < PIECES_BLOCKS; i++) {
819     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
820       return 0;
821     }
822   }
823   return 1;
824 }
825
826 /**
827  * Returns the first block that a peer doesn't have in a piece.
828  * If the peer has all blocks of the piece, returns -1.
829  */
830 int get_first_block(peer_t peer, int piece)
831 {
832   int i;
833   for (i = 0; i < PIECES_BLOCKS; i++) {
834     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
835       return i;
836     }
837   }
838   return -1;
839 }
840
841 /**
842  * Indicates if the remote peer has a piece not stored by the local peer
843  */
844 int is_interested(peer_t peer, connection_t remote_peer)
845 {
846   xbt_assert(remote_peer->bitfield, "Bitfield not received");
847   int i;
848   for (i = 0; i < FILE_PIECES; i++) {
849     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
850       return 1;
851     }
852   }
853   return 0;
854 }
855
856 /**
857  * Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer
858  */
859 int is_interested_and_free(peer_t peer, connection_t remote_peer)
860 {
861   xbt_assert(remote_peer->bitfield, "Bitfield not received");
862   int i;
863   for (i = 0; i < FILE_PIECES; i++) {
864     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
865         && !in_current_pieces(peer, i)) {
866       return 1;
867     }
868   }
869   return 0;
870 }
871
872
873 /**
874  * Returns a piece that is partially downloaded and stored by the remote peer if any
875  * -1 otherwise.
876  */
877 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
878 {
879   xbt_assert(remote_peer->bitfield, "Bitfield not received");
880   int i;
881   for (i = 0; i < FILE_PIECES; i++) {
882     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
883         && !in_current_pieces(peer, i)) {
884       if (get_first_block(peer, i) > 0)
885         return i;
886     }
887   }
888   return -1;
889 }
890
891
892 /**
893  * Send request messages to a peer that have unchoked us
894  * @param peer peer
895  * @param remote_peer peer data to the peer we want to send the request
896  */
897 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
898 {
899   remote_peer->current_piece = piece;
900   int block_index, block_length;
901   xbt_assert(remote_peer->bitfield, "bitfield not received");
902   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
903   block_index = get_first_block(peer, piece);
904   if (block_index != -1) {
905     block_length = PIECES_BLOCKS - block_index;
906     block_length = min(BLOCKS_REQUESTED, block_length);
907     send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
908   }
909 }
910
911
912 /**
913  * Indicates if a piece is currently being downloaded by the peer.
914  */
915 int in_current_pieces(peer_t peer, int piece)
916 {
917   unsigned i;
918   int peer_piece;
919   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
920     if (peer_piece == piece) {
921       return 1;
922     }
923   }
924   return 0;
925 }
926
927
928
929
930 /***********************************************************
931  *
932  *  Low level message functions
933  *
934  ***********************************************************/
935
936
937
938 /**
939  * Send a "interested" message to a peer
940  * @param peer peer data
941  * @param mailbox destination mailbox
942  */
943 void send_interested(peer_t peer, const char *mailbox)
944 {
945   msg_task_t task =
946       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
947                        peer->id, task_message_size(MESSAGE_INTERESTED));
948   MSG_task_dsend(task, mailbox, task_message_free);
949   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
950
951 }
952
953 /**
954  * Send a "not interested" message to a peer
955  * @param peer peer data
956  * @param mailbox destination mailbox
957  */
958 void send_notinterested(peer_t peer, const char *mailbox)
959 {
960   msg_task_t task =
961       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
962                        peer->id, task_message_size(MESSAGE_NOTINTERESTED));
963   MSG_task_dsend(task, mailbox, task_message_free);
964   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
965
966 }
967
968 /**
969  * Send a handshake message to all the peers the peer has.
970  * @param peer peer data
971  */
972 void send_handshake_all(peer_t peer)
973 {
974   connection_t remote_peer;
975   xbt_dict_cursor_t cursor = NULL;
976   char *key;
977   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
978     msg_task_t task =
979         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
980                          peer->id, task_message_size(MESSAGE_HANDSHAKE));
981     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
982     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
983   }
984 }
985
986 /**
987  * Send a "handshake" message to an user
988  * @param peer peer data
989  * @param mailbox mailbox where to we send the message
990  */
991 void send_handshake(peer_t peer, const char *mailbox)
992 {
993   msg_task_t task =
994       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
995                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
996   MSG_task_dsend(task, mailbox, task_message_free);
997   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
998 }
999
1000 /**
1001  * Send a "choked" message to a peer.
1002  */
1003 void send_choked(peer_t peer, const char *mailbox)
1004 {
1005   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
1006   msg_task_t task =
1007       task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
1008                        peer->id, task_message_size(MESSAGE_CHOKE));
1009   MSG_task_dsend(task, mailbox, task_message_free);
1010 }
1011
1012 /**
1013  * Send a "unchoked" message to a peer
1014  */
1015 void send_unchoked(peer_t peer, const char *mailbox)
1016 {
1017   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
1018   msg_task_t task =
1019       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
1020                        peer->id, task_message_size(MESSAGE_UNCHOKE));
1021   MSG_task_dsend(task, mailbox, task_message_free);
1022 }
1023
1024 /**
1025  * Send a "HAVE" message to all peers we are connected to
1026  */
1027 void send_have(peer_t peer, int piece)
1028 {
1029   XBT_DEBUG("Sending HAVE message to all my peers");
1030   connection_t remote_peer;
1031   xbt_dict_cursor_t cursor = NULL;
1032   char *key;
1033   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
1034     msg_task_t task =
1035         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
1036                                peer->id, piece,
1037                                task_message_size(MESSAGE_HAVE));
1038     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1039   }
1040 }
1041
1042 /**
1043  * Send a bitfield message to all the peers the peer has.
1044  * @param peer peer data
1045  */
1046 void send_bitfield(peer_t peer, const char *mailbox)
1047 {
1048   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
1049   msg_task_t task =
1050       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1051                                 peer->bitfield, FILE_PIECES);
1052   MSG_task_dsend(task, mailbox, task_message_free);
1053 }
1054
1055 /**
1056  * Send a "request" message to a pair, containing a request for a piece
1057  */
1058 void send_request(peer_t peer, const char *mailbox, int piece,
1059                   int block_index, int block_length)
1060 {
1061   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1062             block_index, block_length);
1063   msg_task_t task =
1064       task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1065                                block_index, block_length);
1066   MSG_task_dsend(task, mailbox, task_message_free);
1067 }
1068
1069 /**
1070  * Send a "piece" message to a pair, containing a piece of the file
1071  */
1072 void send_piece(peer_t peer, const char *mailbox, int piece,
1073                 int block_index, int block_length)
1074 {
1075   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1076             block_length, mailbox);
1077   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1078   xbt_assert((peer->bitfield[piece] == '1'),
1079              "Tried to send a piece that we doesn't have.");
1080   msg_task_t task =
1081       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1082                              block_index, block_length, BLOCK_SIZE);
1083   MSG_task_dsend(task, mailbox, task_message_free);
1084 }