Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Dig through git history, and update copyright lines.
[simgrid.git] / examples / msg / bittorrent / peer.c
1 /* Copyright (c) 2012-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26 #define ENABLE_END_GAME_MODE 1
27
28 /**
29  *  Number of blocks asked by each request
30  */
31 #define BLOCKS_REQUESTED 2
32
33
34 static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
35
36
37 #define SLEEP_DURATION 1
38
39   /**
40  * Peer main function
41  */
42 int peer(int argc, char *argv[])
43 {
44   s_peer_t peer;
45   //Check arguments
46   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
47   //Build peer object
48   if (argc == 4) {
49     peer_init(&peer, atoi(argv[1]), 1);
50   } else {
51     peer_init(&peer, atoi(argv[1]), 0);
52   }
53   //Retrieve deadline
54   double deadline = atof(argv[2]);
55   xbt_assert(deadline > 0, "Wrong deadline supplied");
56   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
57   //Getting peer data from the tracker.
58   if (get_peers_data(&peer)) {
59     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
60     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
61     peer.begin_receive_time = MSG_get_clock();
62     MSG_mailbox_set_async(peer.mailbox);
63     if (has_finished(peer.bitfield)) {
64       peer.pieces = FILE_PIECES;
65       send_handshake_all(&peer);
66       seed_loop(&peer, deadline);
67     } else {
68       leech_loop(&peer, deadline);
69       seed_loop(&peer, deadline);
70     }
71   } else {
72     XBT_INFO("Couldn't contact the tracker.");
73   }
74
75   XBT_INFO("Here is my current status: %s", peer.bitfield);
76   if (peer.comm_received) {
77     MSG_comm_destroy(peer.comm_received);
78   }
79
80   peer_free(&peer);
81   return 0;
82 }
83
84 /**
85  * Peer main loop when it is leeching.
86  * @param peer peer data
87  * @param deadline time at which the peer has to leave
88  */
89 void leech_loop(peer_t peer, double deadline)
90 {
91   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
92   XBT_DEBUG("Start downloading.");
93   /*
94    * Send a "handshake" message to all the peers it got
95    * (since it couldn't have gotten more than 50 peers)
96    */
97   send_handshake_all(peer);
98   XBT_DEBUG("Starting main leech loop");
99
100   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
101     if (peer->comm_received == NULL) {
102       peer->task_received = NULL;
103       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
104     }
105     if (MSG_comm_test(peer->comm_received)) {
106       msg_error_t status = MSG_comm_get_status(peer->comm_received);
107       MSG_comm_destroy(peer->comm_received);
108       peer->comm_received = NULL;
109       if (status == MSG_OK) {
110         handle_message(peer, peer->task_received);
111       }
112     } else {
113       //We don't execute the choke algorithm if we don't already have a piece
114       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
115         update_choked_peers(peer);
116         next_choked_update += UPDATE_CHOKED_INTERVAL;
117       } else {
118         MSG_process_sleep(SLEEP_DURATION);
119       }
120     }
121   }
122   if (peer->pieces == FILE_PIECES)
123     XBT_DEBUG("%d becomes a seeder", peer->id);
124
125 }
126
127 /**
128  * Peer main loop when it is seeding
129  * @param peer peer data
130  * @param deadline time when the peer will leave
131  */
132 void seed_loop(peer_t peer, double deadline)
133 {
134   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
135   XBT_DEBUG("Start seeding.");
136   //start the main seed loop
137   while (MSG_get_clock() < deadline) {
138     if (peer->comm_received == NULL) {
139       peer->task_received = NULL;
140       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
141     }
142     if (MSG_comm_test(peer->comm_received)) {
143       msg_error_t status = MSG_comm_get_status(peer->comm_received);
144       MSG_comm_destroy(peer->comm_received);
145       peer->comm_received = NULL;
146       if (status == MSG_OK) {
147         handle_message(peer, peer->task_received);
148       }
149     } else {
150       if (MSG_get_clock() >= next_choked_update) {
151         update_choked_peers(peer);
152         //TODO: Change the choked peer algorithm when seeding.
153         next_choked_update += UPDATE_CHOKED_INTERVAL;
154       } else {
155         MSG_process_sleep(SLEEP_DURATION);
156       }
157     }
158   }
159 }
160
161 /**
162  * Retrieves the peer list from the tracker
163  * @param peer current peer data
164  */
165 int get_peers_data(peer_t peer)
166 {
167   int success = 0, send_success = 0;
168   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
169   //Build the task to send to the tracker
170   tracker_task_data_t data =
171       tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
172                             peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
173   //Build the task to send.
174   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
175   msg_task_t task_received = NULL;
176   msg_comm_t comm_received;
177   while (!send_success && MSG_get_clock() < timeout) {
178     XBT_DEBUG("Sending a peer request to the tracker.");
179     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
180                                                     GET_PEERS_TIMEOUT);
181     if (status == MSG_OK) {
182       send_success = 1;
183     }
184   }
185   while (!success && MSG_get_clock() < timeout) {
186     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
187     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
188     if (status == MSG_OK) {
189       tracker_task_data_t data = MSG_task_get_data(task_received);
190       unsigned i;
191       int peer_id;
192       //Add the peers the tracker gave us to our peer list.
193       xbt_dynar_foreach(data->peers, i, peer_id) {
194         if (peer_id != peer->id)
195           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
196                            connection_new(peer_id), NULL);
197       }
198       success = 1;
199       //free the communication and the task
200       MSG_comm_destroy(comm_received);
201       tracker_task_data_free(data);
202       MSG_task_destroy(task_received);
203       comm_received = NULL;
204     }
205   }
206
207   return success;
208 }
209
210 /**
211  * Initialize the peer data.
212  * @param peer peer data
213  * @param id id of the peer to take in the network
214  * @param seed indicates if the peer is a seed.
215  */
216 void peer_init(peer_t peer, int id, int seed)
217 {
218   peer->id = id;
219   sprintf(peer->mailbox, "%d", id);
220   sprintf(peer->mailbox_tracker, "tracker_%d", id);
221   peer->peers = xbt_dict_new();
222   peer->active_peers = xbt_dict_new();
223   peer->hostname = MSG_host_get_name(MSG_host_self());
224
225   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
226   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
227   if (seed) {
228     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
229     memset(peer->bitfield_blocks, '1',
230            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
231   } else {
232     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
233     memset(peer->bitfield_blocks, '0',
234            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
235   }
236
237   peer->bitfield[FILE_PIECES] = '\0';
238   peer->pieces = 0;
239
240   peer->pieces_count = xbt_new0(short, FILE_PIECES);
241
242   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
243
244   peer->stream = RngStream_CreateStream("");
245   peer->comm_received = NULL;
246
247   peer->round = 0;
248
249 }
250
251 /**
252  * Destroys a poor peer object.
253  */
254 void peer_free(peer_t peer)
255 {
256   char *key;
257   connection_t connection;
258   xbt_dict_cursor_t cursor;
259   xbt_dict_foreach(peer->peers, cursor, key, connection) {
260     connection_free(connection);
261   }
262   xbt_dict_free(&peer->peers);
263   xbt_dict_free(&peer->active_peers);
264   xbt_dynar_free(&peer->current_pieces);
265   xbt_free(peer->pieces_count);
266   xbt_free(peer->bitfield);
267   xbt_free(peer->bitfield_blocks);
268
269   RngStream_DeleteStream(&peer->stream);
270 }
271
272 /**
273  * Returns if a peer has finished downloading the file
274  * @param bitfield peer bitfield
275  */
276 int has_finished(char *bitfield)
277 {
278   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
279 }
280
281 int nb_interested_peers(peer_t peer)
282 {
283   xbt_dict_cursor_t cursor = NULL;
284   char *key;
285   connection_t connection;
286   int nb = 0;
287   xbt_dict_foreach(peer->peers, cursor, key, connection) {
288     if (connection->interested)
289       nb++;
290   }
291   return nb;
292 }
293
294
295 void update_active_peers_set(peer_t peer, connection_t remote_peer)
296 {
297
298   if (remote_peer->interested && !remote_peer->choked_upload) {
299     //add in the active peers set
300     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
301                      sizeof(int), remote_peer, NULL);
302   } else {
303     //remove
304     xbt_ex_t e;
305     TRY {
306       xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
307                           sizeof(int));
308     }
309     CATCH(e) {
310       xbt_ex_free(e);
311     }
312   }
313 }
314
315
316 /**
317  * Handle a received message sent by another peer
318  * @param peer Peer data
319  * @param task task received.
320  */
321 void handle_message(peer_t peer, msg_task_t task)
322 {
323   message_t message = MSG_task_get_data(task);
324   connection_t remote_peer;
325   remote_peer =
326       xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
327                                sizeof(int));
328   switch (message->type) {
329
330   case MESSAGE_HANDSHAKE:
331     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
332               message->issuer_host_name);
333     //Check if the peer is in our connection list.
334     if (!remote_peer) {
335       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
336                        connection_new(message->peer_id), NULL);
337       send_handshake(peer, message->mailbox);
338     }
339     //Send our bitfield to the peer
340     send_bitfield(peer, message->mailbox);
341     break;
342   case MESSAGE_BITFIELD:
343     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
344               message->issuer_host_name);
345     //Update the pieces list
346     update_pieces_count_from_bitfield(peer, message->bitfield);
347     //Store the bitfield
348     remote_peer->bitfield = xbt_strdup(message->bitfield);
349     xbt_assert(!remote_peer->am_interested,
350                "Should not be interested at first");
351     if (is_interested(peer, remote_peer)) {
352       remote_peer->am_interested = 1;
353       send_interested(peer, message->mailbox);
354     }
355     break;
356   case MESSAGE_INTERESTED:
357     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
358               message->issuer_host_name);
359     xbt_assert((remote_peer != NULL),
360                "A non-in-our-list peer has sent us a message. WTH ?");
361     //Update the interested state of the peer.
362     remote_peer->interested = 1;
363     update_active_peers_set(peer, remote_peer);
364     break;
365   case MESSAGE_NOTINTERESTED:
366     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
367               message->mailbox, message->issuer_host_name);
368     xbt_assert((remote_peer != NULL),
369                "A non-in-our-list peer has sent us a message. WTH ?");
370     remote_peer->interested = 0;
371     update_active_peers_set(peer, remote_peer);
372     break;
373   case MESSAGE_UNCHOKE:
374     xbt_assert((remote_peer != NULL),
375                "A non-in-our-list peer has sent us a message. WTH ?");
376     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
377               message->issuer_host_name);
378     xbt_assert(remote_peer->choked_download, "WTF !!!");
379     remote_peer->choked_download = 0;
380     //Send requests to the peer, since it has unchoked us
381     if (remote_peer->am_interested)
382       request_new_piece_to_peer(peer, remote_peer);
383     break;
384   case MESSAGE_CHOKE:
385     xbt_assert((remote_peer != NULL),
386                "A non-in-our-list peer has sent us a message. WTH ?");
387     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
388               message->issuer_host_name);
389     xbt_assert(!remote_peer->choked_download, "WTF !!!");
390     remote_peer->choked_download = 1;
391     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
392     break;
393   case MESSAGE_HAVE:
394     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
395               message->mailbox, message->issuer_host_name, message->index);
396     xbt_assert(remote_peer->bitfield, "bitfield not received");
397     xbt_assert((message->index >= 0
398                 && message->index < FILE_PIECES),
399                "Wrong HAVE message received");
400     remote_peer->bitfield[message->index] = '1';
401     peer->pieces_count[message->index]++;
402     //If the piece is in our pieces, we tell the peer that we are interested.
403     if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
404       remote_peer->am_interested = 1;
405       send_interested(peer, message->mailbox);
406       if (!remote_peer->choked_download)
407         request_new_piece_to_peer(peer, remote_peer);
408     }
409     break;
410   case MESSAGE_REQUEST:
411     xbt_assert(remote_peer->interested, "WTF !!!");
412
413     xbt_assert((message->index >= 0
414                 && message->index < FILE_PIECES), "Wrong request received");
415     if (!remote_peer->choked_upload) {
416       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
417                 message->mailbox, message->issuer_host_name, message->index,
418                 message->block_index,
419                 message->block_index + message->block_length);
420       if (peer->bitfield[message->index] == '1') {
421         send_piece(peer, message->mailbox, message->index,
422                    message->block_index, message->block_length);
423       }
424     } else {
425       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
426                 message->mailbox, message->issuer_host_name, message->peer_id);
427     }
428     break;
429   case MESSAGE_PIECE:
430     XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
431               message->block_index,
432               message->block_index + message->block_length,
433               message->mailbox, message->issuer_host_name);
434     xbt_assert(!remote_peer->choked_download, "WTF !!!");
435     xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, "Can't received a piece if I'm not interested wihtout end-game mode! piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
436     xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
437     xbt_assert((message->index >= 0
438                 && message->index < FILE_PIECES), "Wrong piece received");
439     //TODO: Execute Ã  computation.
440       if (peer->bitfield[message->index] == '0') {
441         update_bitfield_blocks(peer, message->index, message->block_index,
442                                message->block_length);
443         if (piece_complete(peer, message->index)) {
444           //Removing the piece from our piece list
445           remove_current_piece(peer, remote_peer, message->index);
446           //Setting the fact that we have the piece
447           peer->bitfield[message->index] = '1';
448           peer->pieces++;
449           XBT_DEBUG("My status is now %s", peer->bitfield);
450           //Sending the information to all the peers we are connected to
451           send_have(peer, message->index);
452           //sending UNINTERSTED to peers that doesn't have what we want.
453           update_interested_after_receive(peer);
454         } else {                // piece not completed
455           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
456         }
457       } else {
458         XBT_DEBUG("However, we already have it");
459         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
460         request_new_piece_to_peer(peer, remote_peer);
461       }
462     break;
463   case MESSAGE_CANCEL:
464     XBT_DEBUG("The received CANCEL from %s (%s)",
465               message->mailbox, message->issuer_host_name);
466     break;
467   }
468   //Update the peer speed.
469   if (remote_peer) {
470     connection_add_speed_value(remote_peer,
471                                1.0 / (MSG_get_clock() -
472                                       peer->begin_receive_time));
473   }
474   peer->begin_receive_time = MSG_get_clock();
475
476   task_message_free(task);
477 }
478
479 /**
480  * Selects the appropriate piece to download and requests it to the remote_peer
481  */
482 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
483 {
484   int piece = select_piece_to_download(peer, remote_peer);
485   if (piece != -1) {
486     xbt_dynar_push_as(peer->current_pieces, int, piece);
487     send_request_to_peer(peer, remote_peer, piece);
488   }
489 }
490
491 /**
492  * remove current_piece from the list of currently downloaded pieces.
493  */
494 void remove_current_piece(peer_t peer, connection_t remote_peer,
495                           int current_piece)
496 {
497   int piece_index = -1, piece;
498   unsigned int i;
499   xbt_dynar_foreach(peer->current_pieces, i, piece) {
500     if (piece == current_piece) {
501       piece_index = i;
502       break;
503     }
504   }
505   if (piece_index != -1)
506     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
507   remote_peer->current_piece = -1;
508 }
509
510 /**
511  * Updates the list of who has a piece from a bitfield
512  * @param peer peer we want to update the list
513  * @param bitfield bitfield
514  */
515 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
516 {
517   int i;
518   for (i = 0; i < FILE_PIECES; i++) {
519     if (bitfield[i] == '1') {
520       peer->pieces_count[i]++;
521     }
522   }
523 }
524
525
526
527 /**
528  * Return the piece to be downloaded
529  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
530  * If a piece is partially downloaded, this piece will be selected prioritarily
531  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
532  * If the peer has more than pieces, he downloads the pieces that are the less
533  * replicated (rarest policy).
534  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
535  * @param peer: local peer
536  * @param remote_peer: information about the connection
537  * @return the piece to download if possible. -1 otherwise
538  */
539 int select_piece_to_download(peer_t peer, connection_t remote_peer)
540 {
541   int piece = -1;
542
543   piece = partially_downloaded_piece(peer, remote_peer);
544   // strict priority policy
545   if (piece != -1)
546     return piece;
547
548   // end game mode
549   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
550       && is_interested(peer, remote_peer)) {
551     if(!ENABLE_END_GAME_MODE)
552       return -1;
553     int i;
554     int nb_interesting_pieces = 0;
555     int random_piece_index, current_index = 0;
556     // compute the number of interesting pieces
557     for (i = 0; i < FILE_PIECES; i++) {
558       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
559         nb_interesting_pieces++;
560       }
561     }
562     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
563     // get a random interesting piece
564     random_piece_index =
565         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
566     for (i = 0; i < FILE_PIECES; i++) {
567       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
568         if (random_piece_index == current_index) {
569           piece = i;
570           break;
571         }
572         current_index++;
573       }
574     }
575     xbt_assert(piece != -1, "WTF !!!");
576     return piece;
577   }
578   // Random first policy
579   if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
580     int i;
581     int nb_interesting_pieces = 0;
582     int random_piece_index, current_index = 0;
583     // compute the number of interesting pieces
584     for (i = 0; i < FILE_PIECES; i++) {
585       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
586           && !in_current_pieces(peer, i)) {
587         nb_interesting_pieces++;
588       }
589     }
590     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
591     // get a random interesting piece
592     random_piece_index =
593         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
594     for (i = 0; i < FILE_PIECES; i++) {
595       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
596           && !in_current_pieces(peer, i)) {
597         if (random_piece_index == current_index) {
598           piece = i;
599           break;
600         }
601         current_index++;
602       }
603     }
604     xbt_assert(piece != -1, "WTF !!!");
605     return piece;
606   } else {                      // Rarest first policy
607     int i;
608     short min = SHRT_MAX;
609     int nb_min_pieces = 0;
610     int random_rarest_index, current_index = 0;
611     // compute the smallest number of copies of available pieces
612     for (i = 0; i < FILE_PIECES; i++) {
613       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
614           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
615         min = peer->pieces_count[i];
616     }
617     xbt_assert(min != SHRT_MAX
618                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
619     // compute the number of rarest pieces
620     for (i = 0; i < FILE_PIECES; i++) {
621       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
622           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
623         nb_min_pieces++;
624     }
625     xbt_assert(nb_min_pieces != 0
626                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
627     // get a random rarest piece
628     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
629     for (i = 0; i < FILE_PIECES; i++) {
630       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
631           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
632         if (random_rarest_index == current_index) {
633           piece = i;
634           break;
635         }
636         current_index++;
637       }
638     }
639     xbt_assert(piece != -1
640                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
641     return piece;
642   }
643 }
644
645
646 /**
647  * Update the list of current choked and unchoked peers, using the
648  * choke algorithm
649  * @param peer the current peer
650  */
651 void update_choked_peers(peer_t peer)
652 {
653   if (nb_interested_peers(peer) == 0)
654     return;
655   //  if(xbt_dict_size(peer->active_peers) > 0)
656   //    return;
657   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
658             xbt_dict_size(peer->active_peers));
659   //update the current round
660   peer->round = (peer->round + 1) % 3;
661   char *key, *key_choked=NULL;
662   connection_t peer_choosed = NULL;
663   connection_t peer_choked = NULL;
664   //remove a peer from the list
665   xbt_dict_cursor_t cursor = NULL;
666   xbt_dict_cursor_first(peer->active_peers, &cursor);
667   if (xbt_dict_length(peer->active_peers) > 0) {
668     key_choked = xbt_dict_cursor_get_key(cursor);
669     peer_choked = xbt_dict_cursor_get_data(cursor);
670   }
671   xbt_dict_cursor_free(&cursor);
672
673   /**
674    * If we are currently seeding, we unchoke the peer which has
675    * been unchoke the least time.
676    */
677   if (peer->pieces == FILE_PIECES) {
678     connection_t connection;
679     double unchoke_time = MSG_get_clock() + 1;
680
681     xbt_dict_foreach(peer->peers, cursor, key, connection) {
682       if (connection->last_unchoke < unchoke_time && connection->interested
683           && connection->choked_upload) {
684         unchoke_time = connection->last_unchoke;
685         peer_choosed = connection;
686       }
687     }
688   } else {
689     //Random optimistic unchoking
690     if (peer->round == 0) {
691       int j = 0;
692       do {
693         //We choose a random peer to unchoke.
694         int id_chosen = RngStream_RandInt(peer->stream, 0,
695                                           xbt_dict_length(peer->peers) - 1);
696         int i = 0;
697         connection_t connection;
698         xbt_dict_foreach(peer->peers, cursor, key, connection) {
699           if (i == id_chosen) {
700             peer_choosed = connection;
701             break;
702           }
703           i++;
704         }
705         xbt_dict_cursor_free(&cursor);
706         if (!peer_choosed->interested || !peer_choosed->choked_upload) {
707           peer_choosed = NULL;
708         }
709         j++;
710       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
711     } else {
712       //Use the "fastest download" policy.
713       connection_t connection;
714       double fastest_speed = 0.0;
715       xbt_dict_foreach(peer->peers, cursor, key, connection) {
716         if (connection->peer_speed > fastest_speed
717             && connection->choked_upload && connection->interested) {
718           peer_choosed = connection;
719           fastest_speed = connection->peer_speed;
720         }
721       }
722     }
723
724   }
725   if (peer_choosed != NULL)
726     XBT_DEBUG
727         ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
728          peer->id, peer_choosed->id, peer_choosed->interested,
729          peer_choosed->choked_upload);
730
731   //  if (xbt_dict_size(peer->peers) > 0)
732   //    xbt_assert((xbt_dict_size(peer->active_peers)  != 0),
733   //        "No more active peers !");
734
735
736   //  if (peer_choked != NULL && peer_choked->choked_upload  != 0)
737   //    peer_choked = NULL;
738   //  if (peer_choosed != NULL && peer_choosed->choked_upload  == 0)
739   //    peer_choosed = NULL;
740
741   if (peer_choked != peer_choosed) {
742     if (peer_choked != NULL) {
743       xbt_assert((!peer_choked->choked_upload),
744                  "Tries to choked a choked peer");
745       peer_choked->choked_upload = 1;
746       xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
747       update_active_peers_set(peer, peer_choked);
748       //      xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
749       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
750       send_choked(peer, peer_choked->mailbox);
751     }
752     if (peer_choosed != NULL) {
753       xbt_assert((peer_choosed->choked_upload),
754                  "Tries to unchoked an unchoked peer");
755       peer_choosed->choked_upload = 0;
756       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
757                        sizeof(int), peer_choosed, NULL);
758       peer_choosed->last_unchoke = MSG_get_clock();
759       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
760       update_active_peers_set(peer, peer_choosed);
761       send_unchoked(peer, peer_choosed->mailbox);
762     }
763   }
764 }
765
766 /**
767  * Updates our "interested" state about peers: send "not interested" to peers
768  * that don't have any more pieces we want.
769  * @param peer our peer data
770  */
771 void update_interested_after_receive(peer_t peer)
772 {
773   char *key;
774   xbt_dict_cursor_t cursor;
775   connection_t connection;
776   int interested;
777   xbt_dict_foreach(peer->peers, cursor, key, connection) {
778     interested = 0;
779     if (connection->am_interested) {
780       xbt_assert(connection->bitfield, "Bitfield not received");
781       //Check if the peer still has a piece we want.
782       int i;
783       for (i = 0; i < FILE_PIECES; i++) {
784         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
785           interested = 1;
786           break;
787         }
788       }
789       if (!interested) {        //no more piece to download from connection
790         connection->am_interested = 0;
791         send_notinterested(peer, connection->mailbox);
792       }
793     }
794   }
795 }
796
797 void update_bitfield_blocks(peer_t peer, int index, int block_index,
798                             int block_length)
799 {
800   int i;
801   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
802   xbt_assert((block_index >= 0
803               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
804              block_index);
805   for (i = block_index; i < (block_index + block_length); i++) {
806     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
807   }
808 }
809
810 /**
811  * Returns if a peer has completed the download of a piece
812  */
813 int piece_complete(peer_t peer, int index)
814 {
815   int i;
816   for (i = 0; i < PIECES_BLOCKS; i++) {
817     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
818       return 0;
819     }
820   }
821   return 1;
822 }
823
824 /**
825  * Returns the first block that a peer doesn't have in a piece.
826  * If the peer has all blocks of the piece, returns -1.
827  */
828 int get_first_block(peer_t peer, int piece)
829 {
830   int i;
831   for (i = 0; i < PIECES_BLOCKS; i++) {
832     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
833       return i;
834     }
835   }
836   return -1;
837 }
838
839 /**
840  * Indicates if the remote peer has a piece not stored by the local peer
841  */
842 int is_interested(peer_t peer, connection_t remote_peer)
843 {
844   xbt_assert(remote_peer->bitfield, "Bitfield not received");
845   int i;
846   for (i = 0; i < FILE_PIECES; i++) {
847     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
848       return 1;
849     }
850   }
851   return 0;
852 }
853
854 /**
855  * Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer
856  */
857 int is_interested_and_free(peer_t peer, connection_t remote_peer)
858 {
859   xbt_assert(remote_peer->bitfield, "Bitfield not received");
860   int i;
861   for (i = 0; i < FILE_PIECES; i++) {
862     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
863         && !in_current_pieces(peer, i)) {
864       return 1;
865     }
866   }
867   return 0;
868 }
869
870
871 /**
872  * Returns a piece that is partially downloaded and stored by the remote peer if any
873  * -1 otherwise.
874  */
875 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
876 {
877   xbt_assert(remote_peer->bitfield, "Bitfield not received");
878   int i;
879   for (i = 0; i < FILE_PIECES; i++) {
880     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
881         && !in_current_pieces(peer, i)) {
882       if (get_first_block(peer, i) > 0)
883         return i;
884     }
885   }
886   return -1;
887 }
888
889
890 /**
891  * Send request messages to a peer that have unchoked us
892  * @param peer peer
893  * @param remote_peer peer data to the peer we want to send the request
894  */
895 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
896 {
897   remote_peer->current_piece = piece;
898   int block_index, block_length;
899   xbt_assert(remote_peer->bitfield, "bitfield not received");
900   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
901   block_index = get_first_block(peer, piece);
902   if (block_index != -1) {
903     block_length = PIECES_BLOCKS - block_index;
904     block_length = min(BLOCKS_REQUESTED, block_length);
905     send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
906   }
907 }
908
909
910 /**
911  * Indicates if a piece is currently being downloaded by the peer.
912  */
913 int in_current_pieces(peer_t peer, int piece)
914 {
915   unsigned i;
916   int peer_piece;
917   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
918     if (peer_piece == piece) {
919       return 1;
920     }
921   }
922   return 0;
923 }
924
925
926
927
928 /***********************************************************
929  *
930  *  Low level message functions
931  *
932  ***********************************************************/
933
934
935
936 /**
937  * Send a "interested" message to a peer
938  * @param peer peer data
939  * @param mailbox destination mailbox
940  */
941 void send_interested(peer_t peer, const char *mailbox)
942 {
943   msg_task_t task =
944       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
945                        peer->id, task_message_size(MESSAGE_INTERESTED));
946   MSG_task_dsend(task, mailbox, task_message_free);
947   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
948
949 }
950
951 /**
952  * Send a "not interested" message to a peer
953  * @param peer peer data
954  * @param mailbox destination mailbox
955  */
956 void send_notinterested(peer_t peer, const char *mailbox)
957 {
958   msg_task_t task =
959       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
960                        peer->id, task_message_size(MESSAGE_NOTINTERESTED));
961   MSG_task_dsend(task, mailbox, task_message_free);
962   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
963
964 }
965
966 /**
967  * Send a handshake message to all the peers the peer has.
968  * @param peer peer data
969  */
970 void send_handshake_all(peer_t peer)
971 {
972   connection_t remote_peer;
973   xbt_dict_cursor_t cursor = NULL;
974   char *key;
975   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
976     msg_task_t task =
977         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
978                          peer->id, task_message_size(MESSAGE_HANDSHAKE));
979     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
980     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
981   }
982 }
983
984 /**
985  * Send a "handshake" message to an user
986  * @param peer peer data
987  * @param mailbox mailbox where to we send the message
988  */
989 void send_handshake(peer_t peer, const char *mailbox)
990 {
991   msg_task_t task =
992       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
993                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
994   MSG_task_dsend(task, mailbox, task_message_free);
995   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
996 }
997
998 /**
999  * Send a "choked" message to a peer.
1000  */
1001 void send_choked(peer_t peer, const char *mailbox)
1002 {
1003   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
1004   msg_task_t task =
1005       task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
1006                        peer->id, task_message_size(MESSAGE_CHOKE));
1007   MSG_task_dsend(task, mailbox, task_message_free);
1008 }
1009
1010 /**
1011  * Send a "unchoked" message to a peer
1012  */
1013 void send_unchoked(peer_t peer, const char *mailbox)
1014 {
1015   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
1016   msg_task_t task =
1017       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
1018                        peer->id, task_message_size(MESSAGE_UNCHOKE));
1019   MSG_task_dsend(task, mailbox, task_message_free);
1020 }
1021
1022 /**
1023  * Send a "HAVE" message to all peers we are connected to
1024  */
1025 void send_have(peer_t peer, int piece)
1026 {
1027   XBT_DEBUG("Sending HAVE message to all my peers");
1028   connection_t remote_peer;
1029   xbt_dict_cursor_t cursor = NULL;
1030   char *key;
1031   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
1032     msg_task_t task =
1033         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
1034                                peer->id, piece,
1035                                task_message_size(MESSAGE_HAVE));
1036     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1037   }
1038 }
1039
1040 /**
1041  * Send a bitfield message to all the peers the peer has.
1042  * @param peer peer data
1043  */
1044 void send_bitfield(peer_t peer, const char *mailbox)
1045 {
1046   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
1047   msg_task_t task =
1048       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1049                                 peer->bitfield, FILE_PIECES);
1050   MSG_task_dsend(task, mailbox, task_message_free);
1051 }
1052
1053 /**
1054  * Send a "request" message to a pair, containing a request for a piece
1055  */
1056 void send_request(peer_t peer, const char *mailbox, int piece,
1057                   int block_index, int block_length)
1058 {
1059   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1060             block_index, block_length);
1061   msg_task_t task =
1062       task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1063                                block_index, block_length);
1064   MSG_task_dsend(task, mailbox, task_message_free);
1065 }
1066
1067 /**
1068  * Send a "piece" message to a pair, containing a piece of the file
1069  */
1070 void send_piece(peer_t peer, const char *mailbox, int piece,
1071                 int block_index, int block_length)
1072 {
1073   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1074             block_length, mailbox);
1075   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1076   xbt_assert((peer->bitfield[piece] == '1'),
1077              "Tried to send a piece that we doesn't have.");
1078   msg_task_t task =
1079       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1080                              block_index, block_length, BLOCK_SIZE);
1081   MSG_task_dsend(task, mailbox, task_message_free);
1082 }