Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
07f7af671c40990b8bc6f7d23faf6c1d1adb4552
[simgrid.git] / examples / msg / bittorrent / peer.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26
27 /**
28  *  Number of blocks asked by each request
29  */
30 #define BLOCKS_REQUESTED 2
31
32
33 static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
34
35 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
36 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
37 void remove_current_piece(peer_t peer, connection_t remote_peer, int current_piece);
38
39
40   /**
41  * Peer main function
42  */
43 int peer(int argc, char *argv[])
44 {
45   s_peer_t peer;
46   //Check arguments
47   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
48   //Build peer object
49   if (argc == 4) {
50     peer_init(&peer, atoi(argv[1]), 1);
51   } else {
52     peer_init(&peer, atoi(argv[1]), 0);
53   }
54   //Retrieve deadline
55   double deadline = atof(argv[2]);
56   xbt_assert(deadline > 0, "Wrong deadline supplied");
57   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
58   //Getting peer data from the tracker.
59   if (get_peers_data(&peer)) {
60     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
61     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
62     peer.begin_receive_time = MSG_get_clock();
63     MSG_mailbox_set_async(peer.mailbox);
64     if (has_finished(peer.bitfield)) {
65       peer.pieces = FILE_PIECES;
66       send_handshake_all(&peer);
67       seed_loop(&peer, deadline);
68     } else {
69       leech_loop(&peer, deadline);
70 //      XBT_INFO("%d becomes a seeder", peer.id);
71       seed_loop(&peer, deadline);
72     }
73   } else {
74     XBT_INFO("Couldn't contact the tracker.");
75   }
76
77   XBT_INFO("Here is my current status: %s", peer.bitfield);
78   if (peer.comm_received) {
79     MSG_comm_destroy(peer.comm_received);
80   }
81
82   peer_free(&peer);
83   return 0;
84 }
85
86 /**
87  * Peer main loop when it is leeching.
88  * @param peer peer data
89  * @param deadline time at which the peer has to leave
90  */
91 void leech_loop(peer_t peer, double deadline)
92 {
93   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
94   XBT_DEBUG("Start downloading.");
95   /*
96    * Send a "handshake" message to all the peers it got
97    * (since it couldn't have gotten more than 50 peers)
98    */
99   send_handshake_all(peer);
100   //Wait for at least one "bitfield" message.
101 //  wait_for_pieces(peer, deadline);
102   XBT_DEBUG("Starting main leech loop");
103
104   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
105     if (peer->comm_received == NULL) {
106 //      XBT_INFO("irecv");
107       peer->task_received = NULL;
108       peer->comm_received =
109         MSG_task_irecv(&peer->task_received, peer->mailbox);
110     }
111     if (MSG_comm_test(peer->comm_received)) {
112 //      XBT_INFO("comm_test OK");
113       msg_error_t status = MSG_comm_get_status(peer->comm_received);
114       MSG_comm_destroy(peer->comm_received);
115       peer->comm_received = NULL;
116       if (status == MSG_OK) {
117         handle_message(peer, peer->task_received);
118       }
119     } else {
120       handle_pending_sends(peer);
121       //We don't execute the choke algorithm if we don't already have a piece
122       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
123         update_choked_peers(peer);
124         next_choked_update += UPDATE_CHOKED_INTERVAL;
125       } else {
126         MSG_process_sleep(1);
127       }
128     }
129   }
130 }
131
132 /**
133  * Peer main loop when it is seeding
134  * @param peer peer data
135  * @param deadline time when the peer will leave
136  */
137 void seed_loop(peer_t peer, double deadline)
138 {
139   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
140   XBT_DEBUG("Start seeding.");
141   //start the main seed loop
142   while (MSG_get_clock() < deadline) {
143     if (peer->comm_received == NULL) {
144       peer->task_received = NULL;
145       peer->comm_received =
146         MSG_task_irecv(&peer->task_received, peer->mailbox);
147     }
148     if (MSG_comm_test(peer->comm_received)) {
149       msg_error_t status = MSG_comm_get_status(peer->comm_received);
150       MSG_comm_destroy(peer->comm_received);
151       peer->comm_received = NULL;
152       if (status == MSG_OK) {
153         handle_message(peer, peer->task_received);
154       }
155     } else {
156       if (MSG_get_clock() >= next_choked_update) {
157         update_choked_peers(peer);
158         //TODO: Change the choked peer algorithm when seeding.
159         next_choked_update += UPDATE_CHOKED_INTERVAL;
160       } else {
161         MSG_process_sleep(1);
162       }
163     }
164   }
165 }
166
167 /**
168  * Retrieves the peer list from the tracker
169  * @param peer current peer data
170  */
171 int get_peers_data(peer_t peer)
172 {
173   int success = 0, send_success = 0;
174   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
175   //Build the task to send to the tracker
176   tracker_task_data_t data =
177     tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
178                           peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
179   //Build the task to send.
180   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
181   msg_task_t task_received = NULL;
182   msg_comm_t comm_received;
183   while (!send_success && MSG_get_clock() < timeout) {
184     XBT_DEBUG("Sending a peer request to the tracker.");
185     msg_error_t status =
186       MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
187                                  GET_PEERS_TIMEOUT);
188     if (status == MSG_OK) {
189       send_success = 1;
190     }
191   }
192   while (!success && MSG_get_clock() < timeout) {
193     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
194     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
195     if (status == MSG_OK) {
196       tracker_task_data_t data = MSG_task_get_data(task_received);
197       unsigned i;
198       int peer_id;
199       //Add the peers the tracker gave us to our peer list.
200       xbt_dynar_foreach(data->peers, i, peer_id) {
201         if (peer_id != peer->id)
202           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
203                            connection_new(peer_id), NULL);
204       }
205       success = 1;
206       //free the communication and the task
207       MSG_comm_destroy(comm_received);
208       tracker_task_data_free(data);
209       MSG_task_destroy(task_received);
210       comm_received = NULL;
211     }
212   }
213
214   return success;
215 }
216
217 /**
218  * Initialize the peer data.
219  * @param peer peer data
220  * @param id id of the peer to take in the network
221  * @param seed indicates if the peer is a seed.
222  */
223 void peer_init(peer_t peer, int id, int seed)
224 {
225   peer->id = id;
226   sprintf(peer->mailbox, "%d", id);
227   sprintf(peer->mailbox_tracker, "tracker_%d", id);
228   peer->peers = xbt_dict_new();
229   peer->active_peers = xbt_dict_new();
230   peer->hostname = MSG_host_get_name(MSG_host_self());
231
232   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
233   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
234   if (seed) {
235     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
236     memset(peer->bitfield_blocks, '1',
237            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
238   } else {
239     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
240     memset(peer->bitfield_blocks, '0',
241            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
242   }
243
244   peer->bitfield[FILE_PIECES] = '\0';
245   peer->pieces = 0;
246
247   peer->pieces_count = xbt_new0(short, FILE_PIECES);
248
249   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
250
251   peer->stream = RngStream_CreateStream("");
252   peer->comm_received = NULL;
253
254   peer->round = 0;
255
256   peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
257 }
258
259 /**
260  * Destroys a poor peer object.
261  */
262 void peer_free(peer_t peer)
263 {
264   char *key;
265   connection_t connection;
266   xbt_dict_cursor_t cursor;
267   xbt_dict_foreach(peer->peers, cursor, key, connection) {
268     connection_free(connection);
269   }
270   xbt_dict_free(&peer->peers);
271   xbt_dict_free(&peer->active_peers);
272   xbt_dynar_free(&peer->current_pieces);
273   xbt_dynar_free(&peer->pending_sends);
274   xbt_free(peer->pieces_count);
275   xbt_free(peer->bitfield);
276   xbt_free(peer->bitfield_blocks);
277
278   RngStream_DeleteStream(&peer->stream);
279 }
280
281 /**
282  * Returns if a peer has finished downloading the file
283  * @param bitfield peer bitfield
284  */
285 int has_finished(char *bitfield)
286 {
287   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) ==
288            NULL) ? 1 : 0);
289 }
290
291 int nb_interested_peers(peer_t peer)
292 {
293   xbt_dict_cursor_t cursor = NULL;
294   char *key;
295   connection_t connection;
296   int nb = 0;
297   xbt_dict_foreach(peer->peers, cursor, key, connection) {
298     if (connection->interested)
299       nb++;
300   }
301   return nb;
302 }
303
304 /**
305  * Handle pending sends and remove those which are done
306  * @param peer Peer data
307  */
308 void handle_pending_sends(peer_t peer)
309 {
310   int index;
311
312   while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
313     msg_comm_t comm_send =
314       xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
315     int status = MSG_comm_get_status(comm_send);
316     xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
317     XBT_DEBUG
318       ("Communication %p is finished with status %d, dynar size is now %lu",
319        comm_send, status, xbt_dynar_length(peer->pending_sends));
320
321     msg_task_t task = MSG_comm_get_task(comm_send);
322     MSG_comm_destroy(comm_send);
323
324     if (status != MSG_OK) {
325       task_message_free(task);
326     }
327   }
328 }
329
330 void update_active_peers_set(peer_t peer, connection_t remote_peer)
331 {
332
333   if (remote_peer->interested && !remote_peer->choked_upload) {
334     //add in the active peers set
335     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
336                      sizeof(int), remote_peer, NULL);
337   } else {
338     //remove
339     xbt_ex_t e;
340     TRY {
341       xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
342                           sizeof(int));
343     }
344     CATCH(e) {
345       xbt_ex_free(e);
346     }
347   }
348 }
349
350
351 /**
352  * Handle a received message sent by another peer
353  * @param peer Peer data
354  * @param task task received.
355  */
356 void handle_message(peer_t peer, msg_task_t task)
357 {
358   message_t message = MSG_task_get_data(task);
359   connection_t remote_peer;
360   remote_peer =
361     xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
362                              sizeof(int));
363   switch (message->type) {
364
365   case MESSAGE_HANDSHAKE:
366     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
367               message->issuer_host_name);
368     //Check if the peer is in our connection list.
369     if (!remote_peer) {
370       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
371                        connection_new(message->peer_id), NULL);
372       send_handshake(peer, message->mailbox);
373     }
374     //Send our bitfield to the peer
375     send_bitfield(peer, message->mailbox);
376     break;
377   case MESSAGE_BITFIELD:
378     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
379               message->issuer_host_name);
380     //Update the pieces list
381     update_pieces_count_from_bitfield(peer, message->bitfield);
382     //Store the bitfield
383     remote_peer->bitfield = xbt_strdup(message->bitfield);
384     xbt_assert(!remote_peer->am_interested,
385                "Should not be interested at first");
386     if (is_interested(peer, remote_peer)) {
387       remote_peer->am_interested = 1;
388       send_interested(peer, message->mailbox);
389     }
390     break;
391   case MESSAGE_INTERESTED:
392     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
393               message->issuer_host_name);
394     xbt_assert((remote_peer != NULL),
395                "A non-in-our-list peer has sent us a message. WTH ?");
396     //Update the interested state of the peer.
397     remote_peer->interested = 1;
398     update_active_peers_set(peer, remote_peer);
399     break;
400   case MESSAGE_NOTINTERESTED:
401     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
402               message->mailbox, message->issuer_host_name);
403     xbt_assert((remote_peer != NULL),
404                "A non-in-our-list peer has sent us a message. WTH ?");
405     remote_peer->interested = 0;
406     update_active_peers_set(peer, remote_peer);
407     break;
408   case MESSAGE_UNCHOKE:
409     xbt_assert((remote_peer != NULL),
410                "A non-in-our-list peer has sent us a message. WTH ?");
411     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
412               message->issuer_host_name);
413     remote_peer->choked_download = 0;
414     //Send requests to the peer, since it has unchoked us
415     if (remote_peer->am_interested)
416       request_new_piece_to_peer(peer, remote_peer);
417     break;
418   case MESSAGE_CHOKE:
419     xbt_assert((remote_peer != NULL),
420                "A non-in-our-list peer has sent us a message. WTH ?");
421     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
422               message->issuer_host_name);
423     remote_peer->choked_download = 1;
424     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
425     break;
426   case MESSAGE_HAVE:
427     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
428               message->mailbox, message->issuer_host_name, message->index);
429     xbt_assert(remote_peer->bitfield, "bitfield not received");
430     xbt_assert((message->index >= 0
431                 && message->index < FILE_PIECES),
432                "Wrong HAVE message received");
433     remote_peer->bitfield[message->index] = '1';
434     peer->pieces_count[message->index]++;
435     //If the piece is in our pieces, we tell the peer that we are interested.
436     if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
437       remote_peer->am_interested = 1;
438       send_interested(peer, message->mailbox);
439       if (!remote_peer->choked_download)
440         request_new_piece_to_peer(peer, remote_peer);
441     }
442     break;
443   case MESSAGE_REQUEST:
444     xbt_assert((message->index >= 0
445                 && message->index < FILE_PIECES), "Wrong request received");
446     if (!remote_peer->choked_upload) {
447       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
448                 message->mailbox, message->issuer_host_name, message->peer_id,
449                 message->block_index,
450                 message->block_index + message->block_length);
451       if (peer->bitfield[message->index] == '1') {
452         send_piece(peer, message->mailbox, message->index, 0,
453                    message->block_index, message->block_length);
454       }
455     } else {
456       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
457                 message->mailbox, message->issuer_host_name,
458                 message->peer_id);
459     }
460     break;
461   case MESSAGE_PIECE:
462     xbt_assert((message->index >= 0
463                 && message->index < FILE_PIECES), "Wrong piece received");
464     //TODO: Execute Ã  computation.
465     if (message->stalled) {
466       XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
467                 message->index, message->mailbox, message->issuer_host_name);
468     } else {
469       XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
470                 message->block_index,
471                 message->block_index + message->block_length,
472                 message->mailbox, message->issuer_host_name);
473       if (peer->bitfield[message->index] == '0') {
474         update_bitfield_blocks(peer, message->index, message->block_index,
475                                message->block_length);
476         if (piece_complete(peer, message->index)) {
477           //Removing the piece from our piece list
478           remove_current_piece(peer, remote_peer, message->index);
479           //Setting the fact that we have the piece
480           peer->bitfield[message->index] = '1';
481           peer->pieces++;
482           XBT_DEBUG("My status is now %s", peer->bitfield);
483           //Sending the information to all the peers we are connected to
484           send_have(peer, message->index);
485           //sending UNINTERSTED to peers that doesn't have what we want.
486           update_interested_after_receive(peer);
487         } else {                // piece not completed
488           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
489         }
490       } else {
491         XBT_DEBUG("However, we already have it");
492         request_new_piece_to_peer(peer, remote_peer);
493       }
494     }
495     break;
496   case MESSAGE_CANCEL:
497     XBT_DEBUG("The received CANCEL from %s (%s)",
498               message->mailbox, message->issuer_host_name);
499     break;
500   }
501   //Update the peer speed.
502   if (remote_peer) {
503     connection_add_speed_value(remote_peer,
504                                1.0 / (MSG_get_clock() -
505                                       peer->begin_receive_time));
506   }
507   peer->begin_receive_time = MSG_get_clock();
508
509   task_message_free(task);
510 }
511
512 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
513 {
514   int piece = select_piece_to_download(peer, remote_peer);
515   if (piece != -1) {
516     xbt_dynar_push_as(peer->current_pieces, int, piece);
517     send_request_to_peer(peer, remote_peer, piece);
518   }
519 }
520
521 void remove_current_piece(peer_t peer, connection_t remote_peer,
522                           int current_piece)
523 {
524   int piece_index = -1, piece, i;
525   xbt_dynar_foreach(peer->current_pieces, i, piece) {
526     if (piece == current_piece) {
527       piece_index = i;
528       break;
529     }
530   }
531   if (piece_index != -1)
532     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
533   remote_peer->current_piece = -1;
534 }
535
536 /**
537  * Updates the list of who has a piece from a bitfield
538  * @param peer peer we want to update the list
539  * @param bitfield bitfield
540  */
541 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
542 {
543   int i;
544   for (i = 0; i < FILE_PIECES; i++) {
545     if (bitfield[i] == '1') {
546       peer->pieces_count[i]++;
547     }
548   }
549 }
550
551
552
553 /**
554  * Return the piece to be downloaded
555  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
556  * If a piece is partially downloaded, this piece will be selected prioritarily
557  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
558  * If the peer has more than pieces, he downloads the pieces that are the less
559  * replicated (rarest policy).
560  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
561
562  * @param peer: local peer
563  * @param remote_peer: information about the connection
564  * @return the piece to download if possible. -1 otherwise
565  */
566 int select_piece_to_download(peer_t peer, connection_t remote_peer)
567 {
568   int piece = -1;
569
570   piece = partially_downloaded_piece(peer, remote_peer);
571   // strict priority policy
572   if (piece != -1)
573     return piece;
574
575   // end game mode
576   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces) && is_interested(peer, remote_peer)) {
577     int i;
578     int nb_interesting_pieces = 0;
579     int random_piece_index, current_index = 0;
580     // compute the number of interesting pieces
581     for (i = 0; i < FILE_PIECES; i++) {
582       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
583         nb_interesting_pieces++;
584       }
585     }
586     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
587     // get a random interesting piece
588     random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces-1);
589     for (i = 0; i < FILE_PIECES; i++) {
590       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
591         if (random_piece_index == current_index) {
592           piece = i;
593           break;
594         }
595         current_index++;
596       }
597     }
598     xbt_assert(piece != -1,"WTF !!!");
599     return piece;
600   }
601
602   // Random first policy
603   if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
604     int i;
605     int nb_interesting_pieces = 0;
606     int random_piece_index, current_index = 0;
607     // compute the number of interesting pieces
608     for (i = 0; i < FILE_PIECES; i++) {
609       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
610         nb_interesting_pieces++;
611       }
612     }
613     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
614     // get a random interesting piece
615     random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces-1);
616     for (i = 0; i < FILE_PIECES; i++) {
617       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
618         if (random_piece_index == current_index) {
619           piece = i;
620           break;
621         }
622         current_index++;
623       }
624     }
625     xbt_assert(piece != -1,"WTF !!!");
626     return piece;
627   } else {                      // Rarest first policy
628     int i;
629     short min = SHRT_MAX;
630     int nb_min_pieces = 0;
631     int random_rarest_index, current_index = 0;
632     // compute the smallest number of copies of available pieces
633     for (i = 0; i < FILE_PIECES; i++) {
634       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
635           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
636         min = peer->pieces_count[i];
637     }
638     xbt_assert(min != SHRT_MAX || !is_interested_and_free(peer, remote_peer), "WTF !!!");
639     // compute the number of rarest pieces
640     for (i = 0; i < FILE_PIECES; i++) {
641       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
642           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
643         nb_min_pieces++;
644     }
645     xbt_assert(nb_min_pieces != 0 || !is_interested_and_free(peer, remote_peer), "WTF !!!");
646     // get a random rarest piece
647     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces-1);
648     for (i = 0; i < FILE_PIECES; i++) {
649       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
650           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
651         if (random_rarest_index == current_index) {
652           piece = i;
653           break;
654         }
655         current_index++;
656       }
657     }
658     xbt_assert(piece != -1 || !is_interested_and_free(peer, remote_peer),"WTF !!!");
659     return piece;
660   }
661 }
662
663
664 /**
665  * Update the list of current choked and unchoked peers, using the
666  * choke algorithm
667  * @param peer the current peer
668  */
669 void update_choked_peers(peer_t peer)
670 {
671   if (nb_interested_peers(peer) == 0)
672     return;
673   //  if(xbt_dict_size(peer->active_peers) > 0)
674   //    return;
675   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
676             xbt_dict_size(peer->active_peers));
677   //update the current round
678   peer->round = (peer->round + 1) % 3;
679   char *key, *key_choked;
680   connection_t peer_choosed = NULL;
681   connection_t peer_choked = NULL;
682   //remove a peer from the list
683   xbt_dict_cursor_t cursor = NULL;
684   xbt_dict_cursor_first(peer->active_peers, &cursor);
685   if (xbt_dict_length(peer->active_peers) > 0) {
686     key_choked = xbt_dict_cursor_get_key(cursor);
687     peer_choked = xbt_dict_cursor_get_data(cursor);
688   }
689   xbt_dict_cursor_free(&cursor);
690
691   /**
692    * If we are currently seeding, we unchoke the peer which has
693    * been unchoke the least time.
694    */
695   if (peer->pieces == FILE_PIECES) {
696     connection_t connection;
697     double unchoke_time = MSG_get_clock() + 1;
698
699     xbt_dict_foreach(peer->peers, cursor, key, connection) {
700       if (connection->last_unchoke < unchoke_time && connection->interested
701           && connection->choked_upload) {
702         unchoke_time = connection->last_unchoke;
703         peer_choosed = connection;
704       }
705     }
706   } else {
707     //Random optimistic unchoking
708     if (peer->round == 0) {
709       int j = 0;
710       do {
711         //We choose a random peer to unchoke.
712         int id_chosen = RngStream_RandInt(peer->stream, 0,
713                                           xbt_dict_length(peer->peers) - 1);
714         int i = 0;
715         connection_t connection;
716         xbt_dict_foreach(peer->peers, cursor, key, connection) {
717           if (i == id_chosen) {
718             peer_choosed = connection;
719             break;
720           }
721           i++;
722         }
723         xbt_dict_cursor_free(&cursor);
724         if (!peer_choosed->interested || !peer_choosed->choked_upload) {
725           peer_choosed = NULL;
726         }
727         j++;
728       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
729     } else {
730       //Use the "fastest download" policy.
731       connection_t connection;
732       double fastest_speed = 0.0;
733       xbt_dict_foreach(peer->peers, cursor, key, connection) {
734         if (connection->peer_speed > fastest_speed
735             && connection->choked_upload && connection->interested) {
736           peer_choosed = connection;
737           fastest_speed = connection->peer_speed;
738         }
739       }
740     }
741
742   }
743   if (peer_choosed != NULL)
744     XBT_DEBUG
745       ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
746        peer->id, peer_choosed->id, peer_choosed->interested,
747        peer_choosed->choked_upload);
748
749   //  if (xbt_dict_size(peer->peers) > 0)
750   //    xbt_assert((xbt_dict_size(peer->active_peers)  != 0),
751   //        "No more active peers !");
752
753
754   //  if (peer_choked != NULL && peer_choked->choked_upload  != 0)
755   //    peer_choked = NULL;
756   //  if (peer_choosed != NULL && peer_choosed->choked_upload  == 0)
757   //    peer_choosed = NULL;
758
759   if (peer_choked != peer_choosed) {
760     if (peer_choked != NULL) {
761       xbt_assert((!peer_choked->choked_upload),
762                  "Tries to choked a choked peer");
763       peer_choked->choked_upload = 1;
764       xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
765       update_active_peers_set(peer, peer_choked);
766       //      xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
767       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
768       send_choked(peer, peer_choked->mailbox);
769     }
770     if (peer_choosed != NULL) {
771       xbt_assert((peer_choosed->choked_upload),
772                  "Tries to unchoked an unchoked peer");
773       peer_choosed->choked_upload = 0;
774       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
775                        sizeof(int), peer_choosed, NULL);
776       peer_choosed->last_unchoke = MSG_get_clock();
777       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
778       update_active_peers_set(peer, peer_choosed);
779       send_unchoked(peer, peer_choosed->mailbox);
780     }
781   }
782 }
783
784 /**
785  * Updates our "interested" state about peers: send "not interested" to peers
786  * that don't have any more pieces we want.
787  * @param peer our peer data
788  */
789 void update_interested_after_receive(peer_t peer)
790 {
791   char *key;
792   xbt_dict_cursor_t cursor;
793   connection_t connection;
794   unsigned cpt;
795   int interested, piece;
796   xbt_dict_foreach(peer->peers, cursor, key, connection) {
797     interested = 0;
798     if (connection->am_interested) {
799       xbt_assert(connection->bitfield, "Bitfield not received");
800       //Check if the peer still has a piece we want.
801       int i;
802       for (i = 0; i < FILE_PIECES; i++) {
803         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
804           interested = 1;
805           break;
806         }
807       }
808       if (!interested) {        //no more piece to download from connection
809         connection->am_interested = 0;
810         send_notinterested(peer, connection->mailbox);
811       }
812     }
813   }
814 }
815
816 void update_bitfield_blocks(peer_t peer, int index, int block_index,
817                             int block_length)
818 {
819   int i;
820   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
821   xbt_assert((block_index >= 0
822               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
823              block_index);
824   for (i = block_index; i < (block_index + block_length); i++) {
825     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
826   }
827 }
828
829 /**
830  * Returns if a peer has completed the download of a piece
831  */
832 int piece_complete(peer_t peer, int index)
833 {
834   int i;
835   for (i = 0; i < PIECES_BLOCKS; i++) {
836     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
837       return 0;
838     }
839   }
840   return 1;
841 }
842
843 /**
844  * Returns the first block that a peer doesn't have in a piece.
845  * If the peer has all blocks of the piece, returns -1.
846  */
847 int get_first_block(peer_t peer, int piece)
848 {
849   int i;
850   for (i = 0; i < PIECES_BLOCKS; i++) {
851     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
852       return i;
853     }
854   }
855   return -1;
856 }
857
858 /**
859  * Indicates if the remote peer has a piece not stored by the local peer
860  */
861 int is_interested(peer_t peer, connection_t remote_peer)
862 {
863   xbt_assert(remote_peer->bitfield, "Bitfield not received");
864   int i;
865   for (i = 0; i < FILE_PIECES; i++) {
866     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
867       return 1;
868     }
869   }
870   return 0;
871 }
872
873 /**
874  * Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer
875  */
876 int is_interested_and_free(peer_t peer, connection_t remote_peer)
877 {
878   xbt_assert(remote_peer->bitfield, "Bitfield not received");
879   int i;
880   for (i = 0; i < FILE_PIECES; i++) {
881     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
882         && !in_current_pieces(peer, i)) {
883       return 1;
884     }
885   }
886   return 0;
887 }
888
889
890 /**
891  * Returns a piece that is partially downloaded and stored by the remote peer if any
892  * -1 otherwise.
893  */
894 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
895 {
896   xbt_assert(remote_peer->bitfield, "Bitfield not received");
897   int i;
898   for (i = 0; i < FILE_PIECES; i++) {
899     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
900         && !in_current_pieces(peer, i)) {
901       if (get_first_block(peer, i) > 0)
902         return i;
903     }
904   }
905   return -1;
906 }
907
908
909 /**
910  * Send request messages to a peer that have unchoked us
911  * @param peer peer
912  * @param remote_peer peer data to the peer we want to send the request
913  */
914 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
915 {
916   remote_peer->current_piece = piece;
917   unsigned i;
918   int block_index, block_length;
919   xbt_assert(remote_peer->bitfield, "bitfield not received");
920   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
921   block_index = get_first_block(peer, piece);
922   if (block_index != -1) {
923     block_length = PIECES_BLOCKS - block_index;
924     block_length = min(BLOCKS_REQUESTED, block_length);
925     send_request(peer, remote_peer->mailbox, piece, block_index,
926                  block_length);
927   }
928 }
929
930
931 /**
932  * Indicates if a piece is currently being downloaded by the peer.
933  */
934 int in_current_pieces(peer_t peer, int piece)
935 {
936   unsigned i;
937   int peer_piece;
938   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
939     if (peer_piece == piece) {
940       return 1;
941     }
942   }
943   return 0;
944 }
945
946
947
948
949 /***********************************************************
950  *
951  *  Low level message functions
952  *
953  ***********************************************************/
954
955
956
957 /**
958  * Send a "interested" message to a peer
959  * @param peer peer data
960  * @param mailbox destination mailbox
961  */
962 void send_interested(peer_t peer, const char *mailbox)
963 {
964   msg_task_t task =
965     task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
966                      peer->id, task_message_size(MESSAGE_INTERESTED));
967   MSG_task_dsend(task, mailbox, task_message_free);
968   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
969
970 }
971
972 /**
973  * Send a "not interested" message to a peer
974  * @param peer peer data
975  * @param mailbox destination mailbox
976  */
977 void send_notinterested(peer_t peer, const char *mailbox)
978 {
979   msg_task_t task =
980     task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
981                      peer->id, task_message_size(MESSAGE_NOTINTERESTED));
982   MSG_task_dsend(task, mailbox, task_message_free);
983   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
984
985 }
986
987 /**
988  * Send a handshake message to all the peers the peer has.
989  * @param peer peer data
990  */
991 void send_handshake_all(peer_t peer)
992 {
993   connection_t remote_peer;
994   xbt_dict_cursor_t cursor = NULL;
995   char *key;
996   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
997     msg_task_t task =
998       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
999                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
1000     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1001     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
1002   }
1003 }
1004
1005 /**
1006  * Send a "handshake" message to an user
1007  * @param peer peer data
1008  * @param mailbox mailbox where to we send the message
1009  */
1010 void send_handshake(peer_t peer, const char *mailbox)
1011 {
1012   msg_task_t task =
1013     task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
1014                      peer->id, task_message_size(MESSAGE_HANDSHAKE));
1015   MSG_task_dsend(task, mailbox, task_message_free);
1016   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
1017 }
1018
1019 /**
1020  * Send a "choked" message to a peer.
1021  */
1022 void send_choked(peer_t peer, const char *mailbox)
1023 {
1024   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
1025   msg_task_t task =
1026     task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
1027                      peer->id, task_message_size(MESSAGE_CHOKE));
1028   MSG_task_dsend(task, mailbox, task_message_free);
1029 }
1030
1031 /**
1032  * Send a "unchoked" message to a peer
1033  */
1034 void send_unchoked(peer_t peer, const char *mailbox)
1035 {
1036   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
1037   msg_task_t task =
1038     task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
1039                      peer->id, task_message_size(MESSAGE_UNCHOKE));
1040   MSG_task_dsend(task, mailbox, task_message_free);
1041 }
1042
1043 /**
1044  * Send a "HAVE" message to all peers we are connected to
1045  */
1046 void send_have(peer_t peer, int piece)
1047 {
1048   XBT_DEBUG("Sending HAVE message to all my peers");
1049   connection_t remote_peer;
1050   xbt_dict_cursor_t cursor = NULL;
1051   char *key;
1052   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
1053     msg_task_t task =
1054       task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
1055                              peer->id, piece,
1056                              task_message_size(MESSAGE_HAVE));
1057     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1058   }
1059 }
1060
1061 /**
1062  * Send a bitfield message to all the peers the peer has.
1063  * @param peer peer data
1064  */
1065 void send_bitfield(peer_t peer, const char *mailbox)
1066 {
1067   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
1068   msg_task_t task =
1069     task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1070                               peer->bitfield, FILE_PIECES);
1071   //Async send and append to pending sends
1072   msg_comm_t comm = MSG_task_isend(task, mailbox);
1073   xbt_dynar_push(peer->pending_sends, &comm);
1074 }
1075
1076 /**
1077  * Send a "request" message to a pair, containing a request for a piece
1078  */
1079 void send_request(peer_t peer, const char *mailbox, int piece,
1080                   int block_index, int block_length)
1081 {
1082   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1083             block_index, block_length);
1084   msg_task_t task =
1085     task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1086                              block_index, block_length);
1087   MSG_task_dsend(task, mailbox, task_message_free);
1088 }
1089
1090 /**
1091  * Send a "piece" message to a pair, containing a piece of the file
1092  */
1093 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
1094                 int block_index, int block_length)
1095 {
1096   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1097             block_length, mailbox);
1098   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1099   xbt_assert((peer->bitfield[piece] == '1'),
1100              "Tried to send a piece that we doesn't have.");
1101   msg_task_t task =
1102     task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1103                            stalled, block_index, block_length, BLOCK_SIZE);
1104   MSG_task_dsend(task, mailbox, task_message_free);
1105 }