Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
88b287f69b703e66ccdf4e3e90e022cac69ced0f
[simgrid.git] / examples / msg / bittorrent / peer.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12 #include <limits.h>
13
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15
16 //TODO: Let users change this
17 /*
18  * File transfered data
19  * For the test, default values are :
20  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
21  */
22
23 #define FILE_PIECES  10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE  16384
26
27 /**
28  *  Number of blocks asked by each request
29  */
30 #define BLOCKS_REQUESTED 2
31
32
33 static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
34
35 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
36 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
37 void remove_current_piece(peer_t peer, connection_t remote_peer,
38                           int current_piece);
39
40
41   /**
42  * Peer main function
43  */
44 int peer(int argc, char *argv[])
45 {
46   s_peer_t peer;
47   //Check arguments
48   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
49   //Build peer object
50   if (argc == 4) {
51     peer_init(&peer, atoi(argv[1]), 1);
52   } else {
53     peer_init(&peer, atoi(argv[1]), 0);
54   }
55   //Retrieve deadline
56   double deadline = atof(argv[2]);
57   xbt_assert(deadline > 0, "Wrong deadline supplied");
58   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
59   //Getting peer data from the tracker.
60   if (get_peers_data(&peer)) {
61     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
62     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
63     peer.begin_receive_time = MSG_get_clock();
64     MSG_mailbox_set_async(peer.mailbox);
65     if (has_finished(peer.bitfield)) {
66       peer.pieces = FILE_PIECES;
67       send_handshake_all(&peer);
68       seed_loop(&peer, deadline);
69     } else {
70       leech_loop(&peer, deadline);
71 //      XBT_INFO("%d becomes a seeder", peer.id);
72       seed_loop(&peer, deadline);
73     }
74   } else {
75     XBT_INFO("Couldn't contact the tracker.");
76   }
77
78   XBT_INFO("Here is my current status: %s", peer.bitfield);
79   if (peer.comm_received) {
80     MSG_comm_destroy(peer.comm_received);
81   }
82
83   peer_free(&peer);
84   return 0;
85 }
86
87 /**
88  * Peer main loop when it is leeching.
89  * @param peer peer data
90  * @param deadline time at which the peer has to leave
91  */
92 void leech_loop(peer_t peer, double deadline)
93 {
94   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
95   XBT_DEBUG("Start downloading.");
96   /*
97    * Send a "handshake" message to all the peers it got
98    * (since it couldn't have gotten more than 50 peers)
99    */
100   send_handshake_all(peer);
101   //Wait for at least one "bitfield" message.
102 //  wait_for_pieces(peer, deadline);
103   XBT_DEBUG("Starting main leech loop");
104
105   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
106     if (peer->comm_received == NULL) {
107 //      XBT_INFO("irecv");
108       peer->task_received = NULL;
109       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
110     }
111     if (MSG_comm_test(peer->comm_received)) {
112 //      XBT_INFO("comm_test OK");
113       msg_error_t status = MSG_comm_get_status(peer->comm_received);
114       MSG_comm_destroy(peer->comm_received);
115       peer->comm_received = NULL;
116       if (status == MSG_OK) {
117         handle_message(peer, peer->task_received);
118       }
119     } else {
120       handle_pending_sends(peer);
121       //We don't execute the choke algorithm if we don't already have a piece
122       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
123         update_choked_peers(peer);
124         next_choked_update += UPDATE_CHOKED_INTERVAL;
125       } else {
126         MSG_process_sleep(1);
127       }
128     }
129   }
130 }
131
132 /**
133  * Peer main loop when it is seeding
134  * @param peer peer data
135  * @param deadline time when the peer will leave
136  */
137 void seed_loop(peer_t peer, double deadline)
138 {
139   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
140   XBT_DEBUG("Start seeding.");
141   //start the main seed loop
142   while (MSG_get_clock() < deadline) {
143     if (peer->comm_received == NULL) {
144       peer->task_received = NULL;
145       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
146     }
147     if (MSG_comm_test(peer->comm_received)) {
148       msg_error_t status = MSG_comm_get_status(peer->comm_received);
149       MSG_comm_destroy(peer->comm_received);
150       peer->comm_received = NULL;
151       if (status == MSG_OK) {
152         handle_message(peer, peer->task_received);
153       }
154     } else {
155       if (MSG_get_clock() >= next_choked_update) {
156         update_choked_peers(peer);
157         //TODO: Change the choked peer algorithm when seeding.
158         next_choked_update += UPDATE_CHOKED_INTERVAL;
159       } else {
160         MSG_process_sleep(1);
161       }
162     }
163   }
164 }
165
166 /**
167  * Retrieves the peer list from the tracker
168  * @param peer current peer data
169  */
170 int get_peers_data(peer_t peer)
171 {
172   int success = 0, send_success = 0;
173   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
174   //Build the task to send to the tracker
175   tracker_task_data_t data =
176       tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
177                             peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
178   //Build the task to send.
179   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
180   msg_task_t task_received = NULL;
181   msg_comm_t comm_received;
182   while (!send_success && MSG_get_clock() < timeout) {
183     XBT_DEBUG("Sending a peer request to the tracker.");
184     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
185                                                     GET_PEERS_TIMEOUT);
186     if (status == MSG_OK) {
187       send_success = 1;
188     }
189   }
190   while (!success && MSG_get_clock() < timeout) {
191     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
192     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
193     if (status == MSG_OK) {
194       tracker_task_data_t data = MSG_task_get_data(task_received);
195       unsigned i;
196       int peer_id;
197       //Add the peers the tracker gave us to our peer list.
198       xbt_dynar_foreach(data->peers, i, peer_id) {
199         if (peer_id != peer->id)
200           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
201                            connection_new(peer_id), NULL);
202       }
203       success = 1;
204       //free the communication and the task
205       MSG_comm_destroy(comm_received);
206       tracker_task_data_free(data);
207       MSG_task_destroy(task_received);
208       comm_received = NULL;
209     }
210   }
211
212   return success;
213 }
214
215 /**
216  * Initialize the peer data.
217  * @param peer peer data
218  * @param id id of the peer to take in the network
219  * @param seed indicates if the peer is a seed.
220  */
221 void peer_init(peer_t peer, int id, int seed)
222 {
223   peer->id = id;
224   sprintf(peer->mailbox, "%d", id);
225   sprintf(peer->mailbox_tracker, "tracker_%d", id);
226   peer->peers = xbt_dict_new();
227   peer->active_peers = xbt_dict_new();
228   peer->hostname = MSG_host_get_name(MSG_host_self());
229
230   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
231   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
232   if (seed) {
233     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
234     memset(peer->bitfield_blocks, '1',
235            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
236   } else {
237     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
238     memset(peer->bitfield_blocks, '0',
239            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
240   }
241
242   peer->bitfield[FILE_PIECES] = '\0';
243   peer->pieces = 0;
244
245   peer->pieces_count = xbt_new0(short, FILE_PIECES);
246
247   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
248
249   peer->stream = RngStream_CreateStream("");
250   peer->comm_received = NULL;
251
252   peer->round = 0;
253
254   peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
255 }
256
257 /**
258  * Destroys a poor peer object.
259  */
260 void peer_free(peer_t peer)
261 {
262   char *key;
263   connection_t connection;
264   xbt_dict_cursor_t cursor;
265   xbt_dict_foreach(peer->peers, cursor, key, connection) {
266     connection_free(connection);
267   }
268   xbt_dict_free(&peer->peers);
269   xbt_dict_free(&peer->active_peers);
270   xbt_dynar_free(&peer->current_pieces);
271   xbt_dynar_free(&peer->pending_sends);
272   xbt_free(peer->pieces_count);
273   xbt_free(peer->bitfield);
274   xbt_free(peer->bitfield_blocks);
275
276   RngStream_DeleteStream(&peer->stream);
277 }
278
279 /**
280  * Returns if a peer has finished downloading the file
281  * @param bitfield peer bitfield
282  */
283 int has_finished(char *bitfield)
284 {
285   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
286 }
287
288 int nb_interested_peers(peer_t peer)
289 {
290   xbt_dict_cursor_t cursor = NULL;
291   char *key;
292   connection_t connection;
293   int nb = 0;
294   xbt_dict_foreach(peer->peers, cursor, key, connection) {
295     if (connection->interested)
296       nb++;
297   }
298   return nb;
299 }
300
301 /**
302  * Handle pending sends and remove those which are done
303  * @param peer Peer data
304  */
305 void handle_pending_sends(peer_t peer)
306 {
307   int index;
308
309   while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
310     msg_comm_t comm_send =
311         xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
312     int status = MSG_comm_get_status(comm_send);
313     xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
314     XBT_DEBUG
315         ("Communication %p is finished with status %d, dynar size is now %lu",
316          comm_send, status, xbt_dynar_length(peer->pending_sends));
317
318     msg_task_t task = MSG_comm_get_task(comm_send);
319     MSG_comm_destroy(comm_send);
320
321     if (status != MSG_OK) {
322       task_message_free(task);
323     }
324   }
325 }
326
327 void update_active_peers_set(peer_t peer, connection_t remote_peer)
328 {
329
330   if (remote_peer->interested && !remote_peer->choked_upload) {
331     //add in the active peers set
332     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
333                      sizeof(int), remote_peer, NULL);
334   } else {
335     //remove
336     xbt_ex_t e;
337     TRY {
338       xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
339                           sizeof(int));
340     }
341     CATCH(e) {
342       xbt_ex_free(e);
343     }
344   }
345 }
346
347
348 /**
349  * Handle a received message sent by another peer
350  * @param peer Peer data
351  * @param task task received.
352  */
353 void handle_message(peer_t peer, msg_task_t task)
354 {
355   message_t message = MSG_task_get_data(task);
356   connection_t remote_peer;
357   remote_peer =
358       xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
359                                sizeof(int));
360   switch (message->type) {
361
362   case MESSAGE_HANDSHAKE:
363     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
364               message->issuer_host_name);
365     //Check if the peer is in our connection list.
366     if (!remote_peer) {
367       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
368                        connection_new(message->peer_id), NULL);
369       send_handshake(peer, message->mailbox);
370     }
371     //Send our bitfield to the peer
372     send_bitfield(peer, message->mailbox);
373     break;
374   case MESSAGE_BITFIELD:
375     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
376               message->issuer_host_name);
377     //Update the pieces list
378     update_pieces_count_from_bitfield(peer, message->bitfield);
379     //Store the bitfield
380     remote_peer->bitfield = xbt_strdup(message->bitfield);
381     xbt_assert(!remote_peer->am_interested,
382                "Should not be interested at first");
383     if (is_interested(peer, remote_peer)) {
384       remote_peer->am_interested = 1;
385       send_interested(peer, message->mailbox);
386     }
387     break;
388   case MESSAGE_INTERESTED:
389     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
390               message->issuer_host_name);
391     xbt_assert((remote_peer != NULL),
392                "A non-in-our-list peer has sent us a message. WTH ?");
393     //Update the interested state of the peer.
394     remote_peer->interested = 1;
395     update_active_peers_set(peer, remote_peer);
396     break;
397   case MESSAGE_NOTINTERESTED:
398     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
399               message->mailbox, message->issuer_host_name);
400     xbt_assert((remote_peer != NULL),
401                "A non-in-our-list peer has sent us a message. WTH ?");
402     remote_peer->interested = 0;
403     update_active_peers_set(peer, remote_peer);
404     break;
405   case MESSAGE_UNCHOKE:
406     xbt_assert((remote_peer != NULL),
407                "A non-in-our-list peer has sent us a message. WTH ?");
408     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
409               message->issuer_host_name);
410     remote_peer->choked_download = 0;
411     //Send requests to the peer, since it has unchoked us
412     if (remote_peer->am_interested)
413       request_new_piece_to_peer(peer, remote_peer);
414     break;
415   case MESSAGE_CHOKE:
416     xbt_assert((remote_peer != NULL),
417                "A non-in-our-list peer has sent us a message. WTH ?");
418     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
419               message->issuer_host_name);
420     remote_peer->choked_download = 1;
421     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
422     break;
423   case MESSAGE_HAVE:
424     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
425               message->mailbox, message->issuer_host_name, message->index);
426     xbt_assert(remote_peer->bitfield, "bitfield not received");
427     xbt_assert((message->index >= 0
428                 && message->index < FILE_PIECES),
429                "Wrong HAVE message received");
430     remote_peer->bitfield[message->index] = '1';
431     peer->pieces_count[message->index]++;
432     //If the piece is in our pieces, we tell the peer that we are interested.
433     if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
434       remote_peer->am_interested = 1;
435       send_interested(peer, message->mailbox);
436       if (!remote_peer->choked_download)
437         request_new_piece_to_peer(peer, remote_peer);
438     }
439     break;
440   case MESSAGE_REQUEST:
441     xbt_assert((message->index >= 0
442                 && message->index < FILE_PIECES), "Wrong request received");
443     if (!remote_peer->choked_upload) {
444       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
445                 message->mailbox, message->issuer_host_name, message->peer_id,
446                 message->block_index,
447                 message->block_index + message->block_length);
448       if (peer->bitfield[message->index] == '1') {
449         send_piece(peer, message->mailbox, message->index, 0,
450                    message->block_index, message->block_length);
451       }
452     } else {
453       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
454                 message->mailbox, message->issuer_host_name, message->peer_id);
455     }
456     break;
457   case MESSAGE_PIECE:
458     xbt_assert((message->index >= 0
459                 && message->index < FILE_PIECES), "Wrong piece received");
460     //TODO: Execute Ã  computation.
461     if (message->stalled) {
462       XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
463                 message->index, message->mailbox, message->issuer_host_name);
464     } else {
465       XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
466                 message->block_index,
467                 message->block_index + message->block_length,
468                 message->mailbox, message->issuer_host_name);
469       if (peer->bitfield[message->index] == '0') {
470         update_bitfield_blocks(peer, message->index, message->block_index,
471                                message->block_length);
472         if (piece_complete(peer, message->index)) {
473           //Removing the piece from our piece list
474           remove_current_piece(peer, remote_peer, message->index);
475           //Setting the fact that we have the piece
476           peer->bitfield[message->index] = '1';
477           peer->pieces++;
478           XBT_DEBUG("My status is now %s", peer->bitfield);
479           //Sending the information to all the peers we are connected to
480           send_have(peer, message->index);
481           //sending UNINTERSTED to peers that doesn't have what we want.
482           update_interested_after_receive(peer);
483         } else {                // piece not completed
484           send_request_to_peer(peer, remote_peer, message->index);      // ask for the next block
485         }
486       } else {
487         XBT_DEBUG("However, we already have it");
488         request_new_piece_to_peer(peer, remote_peer);
489       }
490     }
491     break;
492   case MESSAGE_CANCEL:
493     XBT_DEBUG("The received CANCEL from %s (%s)",
494               message->mailbox, message->issuer_host_name);
495     break;
496   }
497   //Update the peer speed.
498   if (remote_peer) {
499     connection_add_speed_value(remote_peer,
500                                1.0 / (MSG_get_clock() -
501                                       peer->begin_receive_time));
502   }
503   peer->begin_receive_time = MSG_get_clock();
504
505   task_message_free(task);
506 }
507
508 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
509 {
510   int piece = select_piece_to_download(peer, remote_peer);
511   if (piece != -1) {
512     xbt_dynar_push_as(peer->current_pieces, int, piece);
513     send_request_to_peer(peer, remote_peer, piece);
514   }
515 }
516
517 void remove_current_piece(peer_t peer, connection_t remote_peer,
518                           int current_piece)
519 {
520   int piece_index = -1, piece, i;
521   xbt_dynar_foreach(peer->current_pieces, i, piece) {
522     if (piece == current_piece) {
523       piece_index = i;
524       break;
525     }
526   }
527   if (piece_index != -1)
528     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
529   remote_peer->current_piece = -1;
530 }
531
532 /**
533  * Updates the list of who has a piece from a bitfield
534  * @param peer peer we want to update the list
535  * @param bitfield bitfield
536  */
537 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
538 {
539   int i;
540   for (i = 0; i < FILE_PIECES; i++) {
541     if (bitfield[i] == '1') {
542       peer->pieces_count[i]++;
543     }
544   }
545 }
546
547
548
549 /**
550  * Return the piece to be downloaded
551  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
552  * If a piece is partially downloaded, this piece will be selected prioritarily
553  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
554  * If the peer has more than pieces, he downloads the pieces that are the less
555  * replicated (rarest policy).
556  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
557
558  * @param peer: local peer
559  * @param remote_peer: information about the connection
560  * @return the piece to download if possible. -1 otherwise
561  */
562 int select_piece_to_download(peer_t peer, connection_t remote_peer)
563 {
564   int piece = -1;
565
566   piece = partially_downloaded_piece(peer, remote_peer);
567   // strict priority policy
568   if (piece != -1)
569     return piece;
570
571   // end game mode
572   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
573       && is_interested(peer, remote_peer)) {
574     int i;
575     int nb_interesting_pieces = 0;
576     int random_piece_index, current_index = 0;
577     // compute the number of interesting pieces
578     for (i = 0; i < FILE_PIECES; i++) {
579       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
580         nb_interesting_pieces++;
581       }
582     }
583     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
584     // get a random interesting piece
585     random_piece_index =
586         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
587     for (i = 0; i < FILE_PIECES; i++) {
588       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
589         if (random_piece_index == current_index) {
590           piece = i;
591           break;
592         }
593         current_index++;
594       }
595     }
596     xbt_assert(piece != -1, "WTF !!!");
597     return piece;
598   }
599   // Random first policy
600   if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
601     int i;
602     int nb_interesting_pieces = 0;
603     int random_piece_index, current_index = 0;
604     // compute the number of interesting pieces
605     for (i = 0; i < FILE_PIECES; i++) {
606       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
607           && !in_current_pieces(peer, i)) {
608         nb_interesting_pieces++;
609       }
610     }
611     xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
612     // get a random interesting piece
613     random_piece_index =
614         RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
615     for (i = 0; i < FILE_PIECES; i++) {
616       if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
617           && !in_current_pieces(peer, i)) {
618         if (random_piece_index == current_index) {
619           piece = i;
620           break;
621         }
622         current_index++;
623       }
624     }
625     xbt_assert(piece != -1, "WTF !!!");
626     return piece;
627   } else {                      // Rarest first policy
628     int i;
629     short min = SHRT_MAX;
630     int nb_min_pieces = 0;
631     int random_rarest_index, current_index = 0;
632     // compute the smallest number of copies of available pieces
633     for (i = 0; i < FILE_PIECES; i++) {
634       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
635           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
636         min = peer->pieces_count[i];
637     }
638     xbt_assert(min != SHRT_MAX
639                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
640     // compute the number of rarest pieces
641     for (i = 0; i < FILE_PIECES; i++) {
642       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
643           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
644         nb_min_pieces++;
645     }
646     xbt_assert(nb_min_pieces != 0
647                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
648     // get a random rarest piece
649     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
650     for (i = 0; i < FILE_PIECES; i++) {
651       if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
652           && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
653         if (random_rarest_index == current_index) {
654           piece = i;
655           break;
656         }
657         current_index++;
658       }
659     }
660     xbt_assert(piece != -1
661                || !is_interested_and_free(peer, remote_peer), "WTF !!!");
662     return piece;
663   }
664 }
665
666
667 /**
668  * Update the list of current choked and unchoked peers, using the
669  * choke algorithm
670  * @param peer the current peer
671  */
672 void update_choked_peers(peer_t peer)
673 {
674   if (nb_interested_peers(peer) == 0)
675     return;
676   //  if(xbt_dict_size(peer->active_peers) > 0)
677   //    return;
678   XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
679             xbt_dict_size(peer->active_peers));
680   //update the current round
681   peer->round = (peer->round + 1) % 3;
682   char *key, *key_choked;
683   connection_t peer_choosed = NULL;
684   connection_t peer_choked = NULL;
685   //remove a peer from the list
686   xbt_dict_cursor_t cursor = NULL;
687   xbt_dict_cursor_first(peer->active_peers, &cursor);
688   if (xbt_dict_length(peer->active_peers) > 0) {
689     key_choked = xbt_dict_cursor_get_key(cursor);
690     peer_choked = xbt_dict_cursor_get_data(cursor);
691   }
692   xbt_dict_cursor_free(&cursor);
693
694   /**
695    * If we are currently seeding, we unchoke the peer which has
696    * been unchoke the least time.
697    */
698   if (peer->pieces == FILE_PIECES) {
699     connection_t connection;
700     double unchoke_time = MSG_get_clock() + 1;
701
702     xbt_dict_foreach(peer->peers, cursor, key, connection) {
703       if (connection->last_unchoke < unchoke_time && connection->interested
704           && connection->choked_upload) {
705         unchoke_time = connection->last_unchoke;
706         peer_choosed = connection;
707       }
708     }
709   } else {
710     //Random optimistic unchoking
711     if (peer->round == 0) {
712       int j = 0;
713       do {
714         //We choose a random peer to unchoke.
715         int id_chosen = RngStream_RandInt(peer->stream, 0,
716                                           xbt_dict_length(peer->peers) - 1);
717         int i = 0;
718         connection_t connection;
719         xbt_dict_foreach(peer->peers, cursor, key, connection) {
720           if (i == id_chosen) {
721             peer_choosed = connection;
722             break;
723           }
724           i++;
725         }
726         xbt_dict_cursor_free(&cursor);
727         if (!peer_choosed->interested || !peer_choosed->choked_upload) {
728           peer_choosed = NULL;
729         }
730         j++;
731       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
732     } else {
733       //Use the "fastest download" policy.
734       connection_t connection;
735       double fastest_speed = 0.0;
736       xbt_dict_foreach(peer->peers, cursor, key, connection) {
737         if (connection->peer_speed > fastest_speed
738             && connection->choked_upload && connection->interested) {
739           peer_choosed = connection;
740           fastest_speed = connection->peer_speed;
741         }
742       }
743     }
744
745   }
746   if (peer_choosed != NULL)
747     XBT_DEBUG
748         ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
749          peer->id, peer_choosed->id, peer_choosed->interested,
750          peer_choosed->choked_upload);
751
752   //  if (xbt_dict_size(peer->peers) > 0)
753   //    xbt_assert((xbt_dict_size(peer->active_peers)  != 0),
754   //        "No more active peers !");
755
756
757   //  if (peer_choked != NULL && peer_choked->choked_upload  != 0)
758   //    peer_choked = NULL;
759   //  if (peer_choosed != NULL && peer_choosed->choked_upload  == 0)
760   //    peer_choosed = NULL;
761
762   if (peer_choked != peer_choosed) {
763     if (peer_choked != NULL) {
764       xbt_assert((!peer_choked->choked_upload),
765                  "Tries to choked a choked peer");
766       peer_choked->choked_upload = 1;
767       xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
768       update_active_peers_set(peer, peer_choked);
769       //      xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
770       XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
771       send_choked(peer, peer_choked->mailbox);
772     }
773     if (peer_choosed != NULL) {
774       xbt_assert((peer_choosed->choked_upload),
775                  "Tries to unchoked an unchoked peer");
776       peer_choosed->choked_upload = 0;
777       xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
778                        sizeof(int), peer_choosed, NULL);
779       peer_choosed->last_unchoke = MSG_get_clock();
780       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
781       update_active_peers_set(peer, peer_choosed);
782       send_unchoked(peer, peer_choosed->mailbox);
783     }
784   }
785 }
786
787 /**
788  * Updates our "interested" state about peers: send "not interested" to peers
789  * that don't have any more pieces we want.
790  * @param peer our peer data
791  */
792 void update_interested_after_receive(peer_t peer)
793 {
794   char *key;
795   xbt_dict_cursor_t cursor;
796   connection_t connection;
797   unsigned cpt;
798   int interested, piece;
799   xbt_dict_foreach(peer->peers, cursor, key, connection) {
800     interested = 0;
801     if (connection->am_interested) {
802       xbt_assert(connection->bitfield, "Bitfield not received");
803       //Check if the peer still has a piece we want.
804       int i;
805       for (i = 0; i < FILE_PIECES; i++) {
806         if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
807           interested = 1;
808           break;
809         }
810       }
811       if (!interested) {        //no more piece to download from connection
812         connection->am_interested = 0;
813         send_notinterested(peer, connection->mailbox);
814       }
815     }
816   }
817 }
818
819 void update_bitfield_blocks(peer_t peer, int index, int block_index,
820                             int block_length)
821 {
822   int i;
823   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
824   xbt_assert((block_index >= 0
825               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
826              block_index);
827   for (i = block_index; i < (block_index + block_length); i++) {
828     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
829   }
830 }
831
832 /**
833  * Returns if a peer has completed the download of a piece
834  */
835 int piece_complete(peer_t peer, int index)
836 {
837   int i;
838   for (i = 0; i < PIECES_BLOCKS; i++) {
839     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
840       return 0;
841     }
842   }
843   return 1;
844 }
845
846 /**
847  * Returns the first block that a peer doesn't have in a piece.
848  * If the peer has all blocks of the piece, returns -1.
849  */
850 int get_first_block(peer_t peer, int piece)
851 {
852   int i;
853   for (i = 0; i < PIECES_BLOCKS; i++) {
854     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
855       return i;
856     }
857   }
858   return -1;
859 }
860
861 /**
862  * Indicates if the remote peer has a piece not stored by the local peer
863  */
864 int is_interested(peer_t peer, connection_t remote_peer)
865 {
866   xbt_assert(remote_peer->bitfield, "Bitfield not received");
867   int i;
868   for (i = 0; i < FILE_PIECES; i++) {
869     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
870       return 1;
871     }
872   }
873   return 0;
874 }
875
876 /**
877  * Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer
878  */
879 int is_interested_and_free(peer_t peer, connection_t remote_peer)
880 {
881   xbt_assert(remote_peer->bitfield, "Bitfield not received");
882   int i;
883   for (i = 0; i < FILE_PIECES; i++) {
884     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
885         && !in_current_pieces(peer, i)) {
886       return 1;
887     }
888   }
889   return 0;
890 }
891
892
893 /**
894  * Returns a piece that is partially downloaded and stored by the remote peer if any
895  * -1 otherwise.
896  */
897 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
898 {
899   xbt_assert(remote_peer->bitfield, "Bitfield not received");
900   int i;
901   for (i = 0; i < FILE_PIECES; i++) {
902     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
903         && !in_current_pieces(peer, i)) {
904       if (get_first_block(peer, i) > 0)
905         return i;
906     }
907   }
908   return -1;
909 }
910
911
912 /**
913  * Send request messages to a peer that have unchoked us
914  * @param peer peer
915  * @param remote_peer peer data to the peer we want to send the request
916  */
917 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
918 {
919   remote_peer->current_piece = piece;
920   unsigned i;
921   int block_index, block_length;
922   xbt_assert(remote_peer->bitfield, "bitfield not received");
923   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
924   block_index = get_first_block(peer, piece);
925   if (block_index != -1) {
926     block_length = PIECES_BLOCKS - block_index;
927     block_length = min(BLOCKS_REQUESTED, block_length);
928     send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
929   }
930 }
931
932
933 /**
934  * Indicates if a piece is currently being downloaded by the peer.
935  */
936 int in_current_pieces(peer_t peer, int piece)
937 {
938   unsigned i;
939   int peer_piece;
940   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
941     if (peer_piece == piece) {
942       return 1;
943     }
944   }
945   return 0;
946 }
947
948
949
950
951 /***********************************************************
952  *
953  *  Low level message functions
954  *
955  ***********************************************************/
956
957
958
959 /**
960  * Send a "interested" message to a peer
961  * @param peer peer data
962  * @param mailbox destination mailbox
963  */
964 void send_interested(peer_t peer, const char *mailbox)
965 {
966   msg_task_t task =
967       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
968                        peer->id, task_message_size(MESSAGE_INTERESTED));
969   MSG_task_dsend(task, mailbox, task_message_free);
970   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
971
972 }
973
974 /**
975  * Send a "not interested" message to a peer
976  * @param peer peer data
977  * @param mailbox destination mailbox
978  */
979 void send_notinterested(peer_t peer, const char *mailbox)
980 {
981   msg_task_t task =
982       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
983                        peer->id, task_message_size(MESSAGE_NOTINTERESTED));
984   MSG_task_dsend(task, mailbox, task_message_free);
985   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
986
987 }
988
989 /**
990  * Send a handshake message to all the peers the peer has.
991  * @param peer peer data
992  */
993 void send_handshake_all(peer_t peer)
994 {
995   connection_t remote_peer;
996   xbt_dict_cursor_t cursor = NULL;
997   char *key;
998   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
999     msg_task_t task =
1000         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
1001                          peer->id, task_message_size(MESSAGE_HANDSHAKE));
1002     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1003     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
1004   }
1005 }
1006
1007 /**
1008  * Send a "handshake" message to an user
1009  * @param peer peer data
1010  * @param mailbox mailbox where to we send the message
1011  */
1012 void send_handshake(peer_t peer, const char *mailbox)
1013 {
1014   msg_task_t task =
1015       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
1016                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
1017   MSG_task_dsend(task, mailbox, task_message_free);
1018   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
1019 }
1020
1021 /**
1022  * Send a "choked" message to a peer.
1023  */
1024 void send_choked(peer_t peer, const char *mailbox)
1025 {
1026   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
1027   msg_task_t task =
1028       task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
1029                        peer->id, task_message_size(MESSAGE_CHOKE));
1030   MSG_task_dsend(task, mailbox, task_message_free);
1031 }
1032
1033 /**
1034  * Send a "unchoked" message to a peer
1035  */
1036 void send_unchoked(peer_t peer, const char *mailbox)
1037 {
1038   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
1039   msg_task_t task =
1040       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
1041                        peer->id, task_message_size(MESSAGE_UNCHOKE));
1042   MSG_task_dsend(task, mailbox, task_message_free);
1043 }
1044
1045 /**
1046  * Send a "HAVE" message to all peers we are connected to
1047  */
1048 void send_have(peer_t peer, int piece)
1049 {
1050   XBT_DEBUG("Sending HAVE message to all my peers");
1051   connection_t remote_peer;
1052   xbt_dict_cursor_t cursor = NULL;
1053   char *key;
1054   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
1055     msg_task_t task =
1056         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
1057                                peer->id, piece,
1058                                task_message_size(MESSAGE_HAVE));
1059     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1060   }
1061 }
1062
1063 /**
1064  * Send a bitfield message to all the peers the peer has.
1065  * @param peer peer data
1066  */
1067 void send_bitfield(peer_t peer, const char *mailbox)
1068 {
1069   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
1070   msg_task_t task =
1071       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1072                                 peer->bitfield, FILE_PIECES);
1073   //Async send and append to pending sends
1074   msg_comm_t comm = MSG_task_isend(task, mailbox);
1075   xbt_dynar_push(peer->pending_sends, &comm);
1076 }
1077
1078 /**
1079  * Send a "request" message to a pair, containing a request for a piece
1080  */
1081 void send_request(peer_t peer, const char *mailbox, int piece,
1082                   int block_index, int block_length)
1083 {
1084   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1085             block_index, block_length);
1086   msg_task_t task =
1087       task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1088                                block_index, block_length);
1089   MSG_task_dsend(task, mailbox, task_message_free);
1090 }
1091
1092 /**
1093  * Send a "piece" message to a pair, containing a piece of the file
1094  */
1095 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
1096                 int block_index, int block_length)
1097 {
1098   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1099             block_length, mailbox);
1100   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1101   xbt_assert((peer->bitfield[piece] == '1'),
1102              "Tried to send a piece that we doesn't have.");
1103   msg_task_t task =
1104       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1105                              stalled, block_index, block_length, BLOCK_SIZE);
1106   MSG_task_dsend(task, mailbox, task_message_free);
1107 }