Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
-Cleaning a bit the code
[simgrid.git] / examples / msg / bittorrent / peer.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26
27 /**
28  *  Number of blocks asked by each request
29  */
30 #define BLOCKS_REQUESTED 2
31
32
33 static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
34
35 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
36 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
37 void remove_current_piece(peer_t peer, connection_t remote_peer,
38                           int current_piece);
39
40
41   /**
42  * Peer main function
43  */
44 int peer(int argc, char *argv[])
45 {
46   s_peer_t peer;
47   //Check arguments
48   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
49   //Build peer object
50   if (argc == 4) {
51     peer_init(&peer, atoi(argv[1]), 1);
52   } else {
53     peer_init(&peer, atoi(argv[1]), 0);
54   }
55   //Retrieve deadline
56   double deadline = atof(argv[2]);
57   xbt_assert(deadline > 0, "Wrong deadline supplied");
58   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
59   //Getting peer data from the tracker.
60   if (get_peers_data(&peer)) {
61     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
62     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
63     peer.begin_receive_time = MSG_get_clock();
64     MSG_mailbox_set_async(peer.mailbox);
65     if (has_finished(peer.bitfield)) {
66       peer.pieces = FILE_PIECES;
67       send_handshake_all(&peer);
68       seed_loop(&peer, deadline);
69     } else {
70       leech_loop(&peer, deadline);
71 //      XBT_INFO("%d becomes a seeder", peer.id);
72       seed_loop(&peer, deadline);
73     }
74   } else {
75     XBT_INFO("Couldn't contact the tracker.");
76   }
77
78   XBT_INFO("Here is my current status: %s", peer.bitfield);
79   if (peer.comm_received) {
80     MSG_comm_destroy(peer.comm_received);
81   }
82
83   peer_free(&peer);
84   return 0;
85 }
86
87 /**
88  * Peer main loop when it is leeching.
89  * @param peer peer data
90  * @param deadline time at which the peer has to leave
91  */
92 void leech_loop(peer_t peer, double deadline)
93 {
94   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
95   XBT_DEBUG("Start downloading.");
96   /*
97    * Send a "handshake" message to all the peers it got
98    * (since it couldn't have gotten more than 50 peers)
99    */
100   send_handshake_all(peer);
101   //Wait for at least one "bitfield" message.
102 //  wait_for_pieces(peer, deadline);
103   XBT_DEBUG("Starting main leech loop");
104
105   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
106     if (peer->comm_received == NULL) {
107 //      XBT_INFO("irecv");
108       peer->task_received = NULL;
109       peer->comm_received =
110         MSG_task_irecv(&peer->task_received, peer->mailbox);
111     }
112     if (MSG_comm_test(peer->comm_received)) {
113 //      XBT_INFO("comm_test OK");
114       msg_error_t status = MSG_comm_get_status(peer->comm_received);
115       MSG_comm_destroy(peer->comm_received);
116       peer->comm_received = NULL;
117       if (status == MSG_OK) {
118         handle_message(peer, peer->task_received);
119       }
120     } else {
121       handle_pending_sends(peer);
122       //We don't execute the choke algorithm if we don't already have a piece
123       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
124         update_choked_peers(peer);
125         next_choked_update += UPDATE_CHOKED_INTERVAL;
126       } else {
127         MSG_process_sleep(1);
128       }
129     }
130   }
131 }
132
133 /**
134  * Peer main loop when it is seeding
135  * @param peer peer data
136  * @param deadline time when the peer will leave
137  */
138 void seed_loop(peer_t peer, double deadline)
139 {
140   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
141   XBT_DEBUG("Start seeding.");
142   //start the main seed loop
143   while (MSG_get_clock() < deadline) {
144     if (peer->comm_received == NULL) {
145       peer->task_received = NULL;
146       peer->comm_received =
147         MSG_task_irecv(&peer->task_received, peer->mailbox);
148     }
149     if (MSG_comm_test(peer->comm_received)) {
150       msg_error_t status = MSG_comm_get_status(peer->comm_received);
151       MSG_comm_destroy(peer->comm_received);
152       peer->comm_received = NULL;
153       if (status == MSG_OK) {
154         handle_message(peer, peer->task_received);
155       }
156     } else {
157       if (MSG_get_clock() >= next_choked_update) {
158         update_choked_peers(peer);
159         //TODO: Change the choked peer algorithm when seeding.
160         next_choked_update += UPDATE_CHOKED_INTERVAL;
161       } else {
162         MSG_process_sleep(1);
163       }
164     }
165   }
166 }
167
168 /**
169  * Retrieves the peer list from the tracker
170  * @param peer current peer data
171  */
172 int get_peers_data(peer_t peer)
173 {
174   int success = 0, send_success = 0;
175   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
176   //Build the task to send to the tracker
177   tracker_task_data_t data =
178     tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
179                           peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
180   //Build the task to send.
181   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
182   msg_task_t task_received = NULL;
183   msg_comm_t comm_received;
184   while (!send_success && MSG_get_clock() < timeout) {
185     XBT_DEBUG("Sending a peer request to the tracker.");
186     msg_error_t status =
187       MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
188                                  GET_PEERS_TIMEOUT);
189     if (status == MSG_OK) {
190       send_success = 1;
191     }
192   }
193   while (!success && MSG_get_clock() < timeout) {
194     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
195     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
196     if (status == MSG_OK) {
197       tracker_task_data_t data = MSG_task_get_data(task_received);
198       unsigned i;
199       int peer_id;
200       //Add the peers the tracker gave us to our peer list.
201       xbt_dynar_foreach(data->peers, i, peer_id) {
202         if (peer_id != peer->id)
203           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
204                            connection_new(peer_id), NULL);
205       }
206       success = 1;
207       //free the communication and the task
208       MSG_comm_destroy(comm_received);
209       tracker_task_data_free(data);
210       MSG_task_destroy(task_received);
211       comm_received = NULL;
212     }
213   }
214
215   return success;
216 }
217
218 /**
219  * Initialize the peer data.
220  * @param peer peer data
221  * @param id id of the peer to take in the network
222  * @param seed indicates if the peer is a seed.
223  */
224 void peer_init(peer_t peer, int id, int seed)
225 {
226   peer->id = id;
227   sprintf(peer->mailbox, "%d", id);
228   sprintf(peer->mailbox_tracker, "tracker_%d", id);
229   peer->peers = xbt_dict_new();
230   peer->active_peers = xbt_dict_new();
231   peer->hostname = MSG_host_get_name(MSG_host_self());
232
233   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
234   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
235   if (seed) {
236     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
237     memset(peer->bitfield_blocks, '1',
238            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
239   } else {
240     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
241     memset(peer->bitfield_blocks, '0',
242            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
243   }
244
245   peer->bitfield[FILE_PIECES] = '\0';
246   peer->pieces = 0;
247
248   peer->pieces_count = xbt_new0(short, FILE_PIECES);
249
250   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
251
252   peer->stream = RngStream_CreateStream("");
253   peer->comm_received = NULL;
254
255   peer->round = 0;
256
257   peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
258 }
259
260 /**
261  * Destroys a poor peer object.
262  */
263 void peer_free(peer_t peer)
264 {
265   char *key;
266   connection_t connection;
267   xbt_dict_cursor_t cursor;
268   xbt_dict_foreach(peer->peers, cursor, key, connection) {
269     connection_free(connection);
270   }
271   xbt_dict_free(&peer->peers);
272   xbt_dict_free(&peer->active_peers);
273   xbt_dynar_free(&peer->current_pieces);
274   xbt_dynar_free(&peer->pending_sends);
275   xbt_free(peer->pieces_count);
276   xbt_free(peer->bitfield);
277   xbt_free(peer->bitfield_blocks);
278
279   RngStream_DeleteStream(&peer->stream);
280 }
281
282 /**
283  * Returns if a peer has finished downloading the file
284  * @param bitfield peer bitfield
285  */
286 int has_finished(char *bitfield)
287 {
288   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) ==
289            NULL) ? 1 : 0);
290 }
291
292 int nb_interested_peers(peer_t peer)
293 {
294   xbt_dict_cursor_t cursor = NULL;
295   char *key;
296   connection_t connection;
297   int nb = 0;
298   xbt_dict_foreach(peer->peers, cursor, key, connection) {
299     if (connection->interested)
300       nb++;
301   }
302   return nb;
303 }
304
305 /**
306  * Handle pending sends and remove those which are done
307  * @param peer Peer data
308  */
309 void handle_pending_sends(peer_t peer)
310 {
311   int index;
312
313   while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
314     msg_comm_t comm_send =
315       xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
316     int status = MSG_comm_get_status(comm_send);
317     xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
318     XBT_DEBUG
319       ("Communication %p is finished with status %d, dynar size is now %lu",
320        comm_send, status, xbt_dynar_length(peer->pending_sends));
321
322     msg_task_t task = MSG_comm_get_task(comm_send);
323     MSG_comm_destroy(comm_send);
324
325     if (status != MSG_OK) {
326       task_message_free(task);
327     }
328   }
329 }
330
331 void update_active_peers_set(peer_t peer, connection_t remote_peer)
332 {
333
334   if (remote_peer->interested && !remote_peer->choked_upload) {
335     //add in the active peers set
336     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
337                      sizeof(int), remote_peer, NULL);
338   } else {
339     //remove
340     xbt_ex_t e;
341     TRY {
342       xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
343                           sizeof(int));
344     }
345     CATCH(e) {
346       xbt_ex_free(e);
347     }
348   }
349 }
350
351
352 /**
353  * Handle a received message sent by another peer
354  * @param peer Peer data
355  * @param task task received.
356  */
357 void handle_message(peer_t peer, msg_task_t task)
358 {
359   message_t message = MSG_task_get_data(task);
360   connection_t remote_peer;
361   remote_peer =
362     xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
363                              sizeof(int));
364   switch (message->type) {
365
366   case MESSAGE_HANDSHAKE:
367     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
368               message->issuer_host_name);
369     //Check if the peer is in our connection list.
370     if (!remote_peer) {
371       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
372                        connection_new(message->peer_id), NULL);
373       send_handshake(peer, message->mailbox);
374     }
375     //Send our bitfield to the peer
376     send_bitfield(peer, message->mailbox);
377     break;
378   case MESSAGE_BITFIELD:
379     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
380               message->issuer_host_name);
381     //Update the pieces list
382     update_pieces_count_from_bitfield(peer, message->bitfield);
383     //Store the bitfield
384     remote_peer->bitfield = xbt_strdup(message->bitfield);
385     xbt_assert(!remote_peer->am_interested,
386                "Should not be interested at first");
387     if (is_interested(peer, remote_peer)) {
388       remote_peer->am_interested = 1;
389       send_interested(peer, message->mailbox);
390     }
391     break;
392   case MESSAGE_INTERESTED:
393     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
394               message->issuer_host_name);
395     xbt_assert((remote_peer != NULL),
396                "A non-in-our-list peer has sent us a message. WTH ?");
397     //Update the interested state of the peer.
398     remote_peer->interested = 1;
399     update_active_peers_set(peer, remote_peer);
400     break;
401   case MESSAGE_NOTINTERESTED:
402     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
403               message->mailbox, message->issuer_host_name);
404     xbt_assert((remote_peer != NULL),
405                "A non-in-our-list peer has sent us a message. WTH ?");
406     remote_peer->interested = 0;
407     update_active_peers_set(peer, remote_peer);
408     break;
409   case MESSAGE_UNCHOKE:
410     xbt_assert((remote_peer != NULL),
411                "A non-in-our-list peer has sent us a message. WTH ?");
412     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
413               message->issuer_host_name);
414     remote_peer->choked_download = 0;
415     //Send requests to the peer, since it has unchoked us
416     if (remote_peer->am_interested)
417       request_new_piece_to_peer(peer, remote_peer);
418     break;
419   case MESSAGE_CHOKE:
420     xbt_assert((remote_peer != NULL),
421                "A non-in-our-list peer has sent us a message. WTH ?");
422     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
423               message->issuer_host_name);
424     remote_peer->choked_download = 1;
425     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
426     break;
427   case MESSAGE_HAVE:
428     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
429               message->mailbox, message->issuer_host_name, message->index);
430     xbt_assert(remote_peer->bitfield, "bitfield not received");
431     xbt_assert((message->index >= 0
432                 && message->index < FILE_PIECES),
433                "Wrong HAVE message received");
434     remote_peer->bitfield[message->index] = '1';
435     peer->pieces_count[message->index]++;
436     //If the piece is in our pieces, we tell the peer that we are interested.
437     if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
438       remote_peer->am_interested = 1;
439       send_interested(peer, message->mailbox);
440       if (!remote_peer->choked_download)
441         request_new_piece_to_peer(peer, remote_peer);
442     }
443     break;
444   case MESSAGE_REQUEST:
445     xbt_assert((message->index >= 0
446                 && message->index < FILE_PIECES), "Wrong request received");
447     if (!remote_peer->choked_upload) {
448       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
449                 message->mailbox, message->issuer_host_name, message->peer_id,
450                 message->block_index,
451                 message->block_index + message->block_length);
452       if (peer->bitfield[message->index] == '1') {
453         send_piece(peer, message->mailbox, message->index, 0,
454                    message->block_index, message->block_length);
455       }
456     } else {
457       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
458                 message->mailbox, message->issuer_host_name,
459                 message->peer_id);
460     }
461     break;
462   case MESSAGE_PIECE:
463     xbt_assert((message->index >= 0
464                 && message->index < FILE_PIECES), "Wrong piece received");
465     //TODO: Execute Ã  computation.
466     if (message->stalled) {
467       XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
468                 message->index, message->mailbox, message->issuer_host_name);
469     } else {
470       XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
471                 message->block_index,
472                 message->block_index + message->block_length,
473                 message->mailbox, message->issuer_host_name);
474       if (peer->bitfield[message->index] == '0') {
475         update_bitfield_blocks(peer, message->index, message->block_index,
476                                message->block_length);
477         if (piece_complete(peer, message->index)) {
478           //Removing the piece from our piece list
479           remove_current_piece(peer, remote_peer, message->index);
480           //Setting the fact that we have the piece
481           peer->bitfield[message->index] = '1';
482           peer->pieces++;
483           XBT_DEBUG("My status is now %s", peer->bitfield);
484           //Sending the information to all the peers we are connected to
485           send_have(peer, message->index);
486           //sending UNINTERSTED to peers that doesn't have what we want.
487           update_interested_after_receive(peer);
488         } else {                // piece not completed
489           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
490         }
491       } else {
492         XBT_DEBUG("However, we already have it");
493         request_new_piece_to_peer(peer, remote_peer);
494       }
495     }
496     break;
497   case MESSAGE_CANCEL:
498     XBT_DEBUG("The received CANCEL from %s (%s)",
499               message->mailbox, message->issuer_host_name);
500     break;
501   }
502   //Update the peer speed.
503   if (remote_peer) {
504     connection_add_speed_value(remote_peer,
505                                1.0 / (MSG_get_clock() -
506                                       peer->begin_receive_time));
507   }
508   peer->begin_receive_time = MSG_get_clock();
509
510   task_message_free(task);
511 }
512
513 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
514 {
515   int piece = select_piece_to_download(peer, remote_peer);
516   if (piece != -1) {
517     xbt_dynar_push_as(peer->current_pieces, int, piece);
518     send_request_to_peer(peer, remote_peer, piece);
519   }
520 }
521
522 void remove_current_piece(peer_t peer, connection_t remote_peer,
523                           int current_piece)
524 {
525   int piece_index = -1, piece, i;
526   xbt_dynar_foreach(peer->current_pieces, i, piece) {
527     if (piece == current_piece) {
528       piece_index = i;
529       break;
530     }
531   }
532   if (piece_index != -1)
533     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
534   remote_peer->current_piece = -1;
535 }
536
537 /**
538  * Updates the list of who has a piece from a bitfield
539  * @param peer peer we want to update the list
540  * @param bitfield bitfield
541  */
542 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
543 {
544   int i;
545   for (i = 0; i < FILE_PIECES; i++) {
546     if (bitfield[i] == '1') {
547       peer->pieces_count[i]++;
548     }
549   }
550 }
551
552
553
554 /**
555  * Return the piece to be downloaded
556  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
557  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
558  * If the peer has more than pieces, he downloads the pieces that are the less
559  * replicated
560  * Note: "end game mode" policy is not implemented
561  * @param peer: local peer
562  * @param remote_peer: information about the connection
563  * @return the piece to download if possible. -1 otherwise
564  */
565 int select_piece_to_download(peer_t peer, connection_t remote_peer)
566 {
567   // TODO : add strict priority policy
568
569   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)) { // end game mode
570     return -1;
571   }
572   if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {  // Random first policy
573     int piece = 0;
574     do {
575       piece = RngStream_RandInt(peer->stream, 0, FILE_PIECES - 1);;
576     } while (!
577              (peer->bitfield[piece] == '0'
578               && remote_peer->bitfield[piece] == '1'
579               && !in_current_pieces(peer, piece)));
580     return piece;
581   } else {                      // Rarest first policy
582     int i, min_piece = -1;
583     short min = SHRT_MAX;
584     for (i = 0; i < FILE_PIECES; i++) {
585       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
586           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
587         min = peer->pieces_count[i];
588         min_piece = i;
589       }
590     }
591     // TODO: add randomness when selecting a rarest piece
592     return min_piece;
593   }
594 }
595
596
597 /**
598  * Update the list of current choked and unchoked peers, using the
599  * choke algorithm
600  * @param peer the current peer
601  */
602 void update_choked_peers(peer_t peer)
603 {
604   if (nb_interested_peers(peer) == 0)
605     return;
606   //  if(xbt_dict_size(peer->active_peers) > 0)
607   //    return;
608   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
609             xbt_dict_size(peer->active_peers));
610   //update the current round
611   peer->round = (peer->round + 1) % 3;
612   char *key, *key_choked;
613   connection_t peer_choosed = NULL;
614   connection_t peer_choked = NULL;
615   //remove a peer from the list
616   xbt_dict_cursor_t cursor = NULL;
617   xbt_dict_cursor_first(peer->active_peers, &cursor);
618   if (xbt_dict_length(peer->active_peers) > 0) {
619     key_choked = xbt_dict_cursor_get_key(cursor);
620     peer_choked = xbt_dict_cursor_get_data(cursor);
621   }
622   xbt_dict_cursor_free(&cursor);
623
624   /**
625    * If we are currently seeding, we unchoke the peer which has
626    * been unchoke the least time.
627    */
628   if (peer->pieces == FILE_PIECES) {
629     connection_t connection;
630     double unchoke_time = MSG_get_clock() + 1;
631
632     xbt_dict_foreach(peer->peers, cursor, key, connection) {
633       if (connection->last_unchoke < unchoke_time && connection->interested
634           && connection->choked_upload) {
635         unchoke_time = connection->last_unchoke;
636         peer_choosed = connection;
637       }
638     }
639   } else {
640     //Random optimistic unchoking
641     if (peer->round == 0) {
642       int j = 0;
643       do {
644         //We choose a random peer to unchoke.
645         int id_chosen = RngStream_RandInt(peer->stream, 0,
646                                           xbt_dict_length(peer->peers) - 1);
647         int i = 0;
648         connection_t connection;
649         xbt_dict_foreach(peer->peers, cursor, key, connection) {
650           if (i == id_chosen) {
651             peer_choosed = connection;
652             break;
653           }
654           i++;
655         }
656         xbt_dict_cursor_free(&cursor);
657         if (!peer_choosed->interested || !peer_choosed->choked_upload) {
658           peer_choosed = NULL;
659         }
660         j++;
661       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
662     } else {
663       //Use the "fastest download" policy.
664       connection_t connection;
665       double fastest_speed = 0.0;
666       xbt_dict_foreach(peer->peers, cursor, key, connection) {
667         if (connection->peer_speed > fastest_speed
668             && connection->choked_upload && connection->interested) {
669           peer_choosed = connection;
670           fastest_speed = connection->peer_speed;
671         }
672       }
673     }
674
675   }
676   if (peer_choosed != NULL)
677     XBT_DEBUG
678       ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
679        peer->id, peer_choosed->id, peer_choosed->interested,
680        peer_choosed->choked_upload);
681
682   //  if (xbt_dict_size(peer->peers) > 0)
683   //    xbt_assert((xbt_dict_size(peer->active_peers)  != 0),
684   //        "No more active peers !");
685
686
687   //  if (peer_choked != NULL && peer_choked->choked_upload  != 0)
688   //    peer_choked = NULL;
689   //  if (peer_choosed != NULL && peer_choosed->choked_upload  == 0)
690   //    peer_choosed = NULL;
691
692   if (peer_choked != peer_choosed) {
693     if (peer_choked != NULL) {
694       xbt_assert((!peer_choked->choked_upload),
695                  "Tries to choked a choked peer");
696       peer_choked->choked_upload = 1;
697       xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
698       update_active_peers_set(peer, peer_choked);
699       //      xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
700       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
701       send_choked(peer, peer_choked->mailbox);
702     }
703     if (peer_choosed != NULL) {
704       xbt_assert((peer_choosed->choked_upload),
705                  "Tries to unchoked an unchoked peer");
706       peer_choosed->choked_upload = 0;
707       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
708                        sizeof(int), peer_choosed, NULL);
709       peer_choosed->last_unchoke = MSG_get_clock();
710       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
711       update_active_peers_set(peer, peer_choosed);
712       send_unchoked(peer, peer_choosed->mailbox);
713     }
714   }
715 }
716
717 /**
718  * Updates our "interested" state about peers: send "not interested" to peers
719  * that don't have any more pieces we want.
720  * @param peer our peer data
721  */
722 void update_interested_after_receive(peer_t peer)
723 {
724   char *key;
725   xbt_dict_cursor_t cursor;
726   connection_t connection;
727   unsigned cpt;
728   int interested, piece;
729   xbt_dict_foreach(peer->peers, cursor, key, connection) {
730     interested = 0;
731     if (connection->am_interested) {
732       xbt_assert(connection->bitfield, "Bitfield not received");
733       //Check if the peer still has a piece we want.
734       int i;
735       for (i = 0; i < FILE_PIECES; i++) {
736         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
737           interested = 1;
738           break;
739         }
740       }
741       if (!interested) {        //no more piece to download from connection
742         connection->am_interested = 0;
743         send_notinterested(peer, connection->mailbox);
744       }
745     }
746   }
747 }
748
749 void update_bitfield_blocks(peer_t peer, int index, int block_index,
750                             int block_length)
751 {
752   int i;
753   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
754   xbt_assert((block_index >= 0
755               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
756              block_index);
757   for (i = block_index; i < (block_index + block_length); i++) {
758     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
759   }
760 }
761
762 /**
763  * Returns if a peer has completed the download of a piece
764  */
765 int piece_complete(peer_t peer, int index)
766 {
767   int i;
768   for (i = 0; i < PIECES_BLOCKS; i++) {
769     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
770       return 0;
771     }
772   }
773   return 1;
774 }
775
776 /**
777  * Returns the first block that a peer doesn't have in a piece.
778  * If the peer has all blocks of the piece, returns -1.
779  */
780 int get_first_block(peer_t peer, int piece)
781 {
782   int i;
783   for (i = 0; i < PIECES_BLOCKS; i++) {
784     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
785       return i;
786     }
787   }
788   return -1;
789 }
790
791 /**
792  * Returns a piece that is stored by the remote peer but not by the local peer.
793  * -1 otherwise.
794  */
795 int is_interested(peer_t peer, connection_t remote_peer)
796 {
797   xbt_assert(remote_peer->bitfield, "Bitfield not received");
798   int i;
799   for (i = 0; i < FILE_PIECES; i++) {
800     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
801       return 1;
802     }
803   }
804   return 0;
805 }
806
807 int is_interested_and_free(peer_t peer, connection_t remote_peer)
808 {
809   xbt_assert(remote_peer->bitfield, "Bitfield not received");
810   int i;
811   for (i = 0; i < FILE_PIECES; i++) {
812     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
813         && !in_current_pieces(peer, i)) {
814       return 1;
815     }
816   }
817   return 0;
818 }
819
820
821 /**
822  * Returns a piece that is partially downloaded and stored by the remote peer if any
823  * -1 otherwise.
824  */
825 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
826 {
827   xbt_assert(remote_peer->bitfield, "Bitfield not received");
828   int i;
829   for (i = 0; i < FILE_PIECES; i++) {
830     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
831         && !in_current_pieces(peer, i)) {
832       if (get_first_block(peer, i) > 0)
833         return i;
834     }
835   }
836   return -1;
837 }
838
839
840 /**
841  * Send request messages to a peer that have unchoked us
842  * @param peer peer
843  * @param remote_peer peer data to the peer we want to send the request
844  */
845 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
846 {
847   remote_peer->current_piece = piece;
848   unsigned i;
849   int block_index, block_length;
850   xbt_assert(remote_peer->bitfield, "bitfield not received");
851   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
852   block_index = get_first_block(peer, piece);
853   if (block_index != -1) {
854     block_length = PIECES_BLOCKS - block_index;
855     block_length = min(BLOCKS_REQUESTED, block_length);
856     send_request(peer, remote_peer->mailbox, piece, block_index,
857                  block_length);
858   }
859 }
860
861
862 /**
863  * Indicates if a piece is currently being downloaded by the peer.
864  */
865 int in_current_pieces(peer_t peer, int piece)
866 {
867   unsigned i;
868   int peer_piece;
869   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
870     if (peer_piece == piece) {
871       return 1;
872     }
873   }
874   return 0;
875 }
876
877
878
879
880 /***********************************************************
881  *
882  *  Low level message functions
883  *
884  ***********************************************************/
885
886
887
888 /**
889  * Send a "interested" message to a peer
890  * @param peer peer data
891  * @param mailbox destination mailbox
892  */
893 void send_interested(peer_t peer, const char *mailbox)
894 {
895   msg_task_t task =
896     task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
897                      peer->id, task_message_size(MESSAGE_INTERESTED));
898   MSG_task_dsend(task, mailbox, task_message_free);
899   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
900
901 }
902
903 /**
904  * Send a "not interested" message to a peer
905  * @param peer peer data
906  * @param mailbox destination mailbox
907  */
908 void send_notinterested(peer_t peer, const char *mailbox)
909 {
910   msg_task_t task =
911     task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
912                      peer->id, task_message_size(MESSAGE_NOTINTERESTED));
913   MSG_task_dsend(task, mailbox, task_message_free);
914   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
915
916 }
917
918 /**
919  * Send a handshake message to all the peers the peer has.
920  * @param peer peer data
921  */
922 void send_handshake_all(peer_t peer)
923 {
924   connection_t remote_peer;
925   xbt_dict_cursor_t cursor = NULL;
926   char *key;
927   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
928     msg_task_t task =
929       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
930                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
931     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
932     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
933   }
934 }
935
936 /**
937  * Send a "handshake" message to an user
938  * @param peer peer data
939  * @param mailbox mailbox where to we send the message
940  */
941 void send_handshake(peer_t peer, const char *mailbox)
942 {
943   msg_task_t task =
944     task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
945                      peer->id, task_message_size(MESSAGE_HANDSHAKE));
946   MSG_task_dsend(task, mailbox, task_message_free);
947   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
948 }
949
950 /**
951  * Send a "choked" message to a peer.
952  */
953 void send_choked(peer_t peer, const char *mailbox)
954 {
955   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
956   msg_task_t task =
957     task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
958                      peer->id, task_message_size(MESSAGE_CHOKE));
959   MSG_task_dsend(task, mailbox, task_message_free);
960 }
961
962 /**
963  * Send a "unchoked" message to a peer
964  */
965 void send_unchoked(peer_t peer, const char *mailbox)
966 {
967   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
968   msg_task_t task =
969     task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
970                      peer->id, task_message_size(MESSAGE_UNCHOKE));
971   MSG_task_dsend(task, mailbox, task_message_free);
972 }
973
974 /**
975  * Send a "HAVE" message to all peers we are connected to
976  */
977 void send_have(peer_t peer, int piece)
978 {
979   XBT_DEBUG("Sending HAVE message to all my peers");
980   connection_t remote_peer;
981   xbt_dict_cursor_t cursor = NULL;
982   char *key;
983   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
984     msg_task_t task =
985       task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
986                              peer->id, piece,
987                              task_message_size(MESSAGE_HAVE));
988     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
989   }
990 }
991
992 /**
993  * Send a bitfield message to all the peers the peer has.
994  * @param peer peer data
995  */
996 void send_bitfield(peer_t peer, const char *mailbox)
997 {
998   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
999   msg_task_t task =
1000     task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1001                               peer->bitfield, FILE_PIECES);
1002   //Async send and append to pending sends
1003   msg_comm_t comm = MSG_task_isend(task, mailbox);
1004   xbt_dynar_push(peer->pending_sends, &comm);
1005 }
1006
1007 /**
1008  * Send a "request" message to a pair, containing a request for a piece
1009  */
1010 void send_request(peer_t peer, const char *mailbox, int piece,
1011                   int block_index, int block_length)
1012 {
1013   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1014             block_index, block_length);
1015   msg_task_t task =
1016     task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1017                              block_index, block_length);
1018   MSG_task_dsend(task, mailbox, task_message_free);
1019 }
1020
1021 /**
1022  * Send a "piece" message to a pair, containing a piece of the file
1023  */
1024 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
1025                 int block_index, int block_length)
1026 {
1027   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1028             block_length, mailbox);
1029   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1030   xbt_assert((peer->bitfield[piece] == '1'),
1031              "Tried to send a piece that we doesn't have.");
1032   msg_task_t task =
1033     task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1034                            stalled, block_index, block_length, BLOCK_SIZE);
1035   MSG_task_dsend(task, mailbox, task_message_free);
1036 }