Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'hypervisor' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid into hypervisor
[simgrid.git] / examples / msg / bittorrent / peer.c
1   /* Copyright (c) 2012. The SimGrid Team.
2    * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
14
15 //TODO: Let users change this
16 /*
17  * File transfered data
18  *
19  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
20  */
21 static int FILE_SIZE = 10 * 5 * 16384;
22 static int FILE_PIECES = 10;
23
24 static int PIECES_BLOCKS = 5;
25 static int BLOCK_SIZE = 16384;
26 static int BLOCKS_REQUESTED = 2;
27
28 /**
29  * Peer main function
30  */
31 int peer(int argc, char *argv[])
32 {
33   s_peer_t peer;
34   //Check arguments
35   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
36   //Build peer object
37   if (argc == 4) {
38     peer_init(&peer, atoi(argv[1]), 1);
39   } else {
40     peer_init(&peer, atoi(argv[1]), 0);
41   }
42   //Retrieve deadline
43   double deadline = atof(argv[2]);
44   xbt_assert(deadline > 0, "Wrong deadline supplied");
45   XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
46   //Getting peer data from the tracker.
47   if (get_peers_data(&peer)) {
48     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
49     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
50     peer.begin_receive_time = MSG_get_clock();
51     MSG_mailbox_set_async(peer.mailbox);
52     if (has_finished(peer.bitfield)) {
53       peer.pieces = FILE_PIECES;
54       send_handshake_all(&peer);
55       seed_loop(&peer, deadline);
56     } else {
57       leech_loop(&peer, deadline);
58       seed_loop(&peer, deadline);
59     }
60   } else {
61     XBT_INFO("Couldn't contact the tracker.");
62   }
63
64   XBT_INFO("Here is my current status: %s", peer.bitfield);
65   if (peer.comm_received) {
66     MSG_comm_destroy(peer.comm_received);
67   }
68
69   peer_free(&peer);
70   return 0;
71 }
72
73 /**
74  * Peer main loop when it is leeching.
75  * @param peer peer data
76  * @param deadline time at which the peer has to leave
77  */
78 void leech_loop(peer_t peer, double deadline)
79 {
80   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
81   XBT_DEBUG("Start downloading.");
82   /*
83    * Send a "handshake" message to all the peers it got
84    * (since it couldn't have gotten more than 50 peers)
85    */
86   send_handshake_all(peer);
87   //Wait for at least one "bitfield" message.
88   wait_for_pieces(peer, deadline);
89   XBT_DEBUG("Starting main leech loop");
90   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
91     if (peer->comm_received == NULL) {
92       peer->task_received = NULL;
93       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
94     }
95     if (MSG_comm_test(peer->comm_received)) {
96       msg_error_t status = MSG_comm_get_status(peer->comm_received);
97       MSG_comm_destroy(peer->comm_received);
98       peer->comm_received = NULL;
99       if (status == MSG_OK) {
100         handle_message(peer, peer->task_received);
101       }
102     } else {
103       handle_pending_sends(peer);
104       if (peer->current_piece != -1) {
105         send_interested_to_peers(peer);
106       } else {
107         //If the current interested pieces is < MAX
108         if (peer->pieces_requested < MAX_PIECES) {
109           update_current_piece(peer);
110         }
111       }
112       //We don't execute the choke algorithm if we don't already have a piece
113       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
114         update_choked_peers(peer);
115         next_choked_update += UPDATE_CHOKED_INTERVAL;
116       } else {
117         MSG_process_sleep(1);
118       }
119     }
120   }
121 }
122
123 /**
124  * Peer main loop when it is seeding
125  * @param peer peer data
126  * @param deadline time when the peer will leave
127  */
128 void seed_loop(peer_t peer, double deadline)
129 {
130   double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
131   XBT_DEBUG("Start seeding.");
132   //start the main seed loop
133   while (MSG_get_clock() < deadline) {
134     if (peer->comm_received == NULL) {
135       peer->task_received = NULL;
136       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
137     }
138     if (MSG_comm_test(peer->comm_received)) {
139       msg_error_t status = MSG_comm_get_status(peer->comm_received);
140       MSG_comm_destroy(peer->comm_received);
141       peer->comm_received = NULL;
142       if (status == MSG_OK) {
143         handle_message(peer, peer->task_received);
144       }
145     } else {
146       if (MSG_get_clock() >= next_choked_update) {
147         update_choked_peers(peer);
148         //TODO: Change the choked peer algorithm when seeding.
149         next_choked_update += UPDATE_CHOKED_INTERVAL;
150       } else {
151         MSG_process_sleep(1);
152       }
153     }
154   }
155 }
156
157 /**
158  * Retrieves the peer list from the tracker
159  * @param peer current peer data
160  */
161 int get_peers_data(peer_t peer)
162 {
163   int success = 0, send_success = 0;
164   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
165   //Build the task to send to the tracker
166   tracker_task_data_t data =
167       tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
168                             peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
169   //Build the task to send.
170   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
171   msg_task_t task_received = NULL;
172   msg_comm_t comm_received;
173   while (!send_success && MSG_get_clock() < timeout) {
174     XBT_DEBUG("Sending a peer request to the tracker.");
175     msg_error_t status =
176         MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
177                                    GET_PEERS_TIMEOUT);
178     if (status == MSG_OK) {
179       send_success = 1;
180     }
181   }
182   while (!success && MSG_get_clock() < timeout) {
183     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
184     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
185     if (status == MSG_OK) {
186       tracker_task_data_t data = MSG_task_get_data(task_received);
187       unsigned i;
188       int peer_id;
189       //Add the peers the tracker gave us to our peer list.
190       xbt_dynar_foreach(data->peers, i, peer_id) {
191         if (peer_id != peer->id)
192           xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
193                            connection_new(peer_id), NULL);
194       }
195       success = 1;
196       //free the communication and the task
197       MSG_comm_destroy(comm_received);
198       tracker_task_data_free(data);
199       MSG_task_destroy(task_received);
200       comm_received = NULL;
201     }
202   }
203
204   return success;
205 }
206
207 /**
208  * Initialize the peer data.
209  * @param peer peer data
210  * @param id id of the peer to take in the network
211  * @param seed indicates if the peer is a seed.
212  */
213 void peer_init(peer_t peer, int id, int seed)
214 {
215   peer->id = id;
216   sprintf(peer->mailbox, "%d", id);
217   sprintf(peer->mailbox_tracker, "tracker_%d", id);
218   peer->peers = xbt_dict_new();
219   peer->active_peers = xbt_dict_new();
220   peer->hostname = MSG_host_get_name(MSG_host_self());
221
222   peer->bitfield = xbt_new(char, FILE_PIECES + 1);
223   peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
224   if (seed) {
225     memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
226     memset(peer->bitfield_blocks, '1',
227            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
228   } else {
229     memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
230     memset(peer->bitfield_blocks, '0',
231            sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
232   }
233
234   peer->bitfield[FILE_PIECES] = '\0';
235   peer->pieces = 0;
236
237   peer->pieces_count = xbt_new0(short, FILE_PIECES);
238   peer->pieces_requested = 0;
239
240   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
241   peer->current_piece = -1;
242
243   peer->stream = MSG_host_get_data(MSG_host_self());
244   peer->comm_received = NULL;
245
246   peer->round = 0;
247
248   peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
249 }
250
251 /**
252  * Destroys a poor peer object.
253  */
254 void peer_free(peer_t peer)
255 {
256   char *key;
257   connection_t connection;
258   xbt_dict_cursor_t cursor;
259   xbt_dict_foreach(peer->peers, cursor, key, connection) {
260     connection_free(connection);
261   }
262   xbt_dict_free(&peer->peers);
263   xbt_dict_free(&peer->active_peers);
264   xbt_dynar_free(&peer->current_pieces);
265   xbt_dynar_free(&peer->pending_sends);
266   xbt_free(peer->pieces_count);
267   xbt_free(peer->bitfield);
268   xbt_free(peer->bitfield_blocks);
269 }
270
271 /**
272  * Returns if a peer has finished downloading the file
273  * @param bitfield peer bitfield
274  */
275 int has_finished(char *bitfield)
276 {
277   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
278 }
279
280 /**
281  * Handle pending sends and remove those which are done
282  * @param peer Peer data
283  */
284 void handle_pending_sends(peer_t peer)
285 {
286   int index;
287
288   while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
289     msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
290     int status = MSG_comm_get_status(comm_send);
291     xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
292     XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends));
293
294     msg_task_t task = MSG_comm_get_task(comm_send);
295     MSG_comm_destroy(comm_send);
296
297     if (status != MSG_OK) {
298       task_message_free(task);
299     }
300   }
301 }
302
303 /**
304  * Handle a received message sent by another peer
305  * @param peer Peer data
306  * @param task task received.
307  */
308 void handle_message(peer_t peer, msg_task_t task)
309 {
310   message_t message = MSG_task_get_data(task);
311   connection_t remote_peer;
312   remote_peer =
313       xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
314                                sizeof(int));
315   switch (message->type) {
316
317   case MESSAGE_HANDSHAKE:
318     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
319               message->issuer_host_name);
320     //Check if the peer is in our connection list.
321     if (!remote_peer) {
322       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
323                        connection_new(message->peer_id), NULL);
324       send_handshake(peer, message->mailbox);
325     }
326     //Send our bitfield to the peer
327     send_bitfield(peer, message->mailbox);
328     break;
329   case MESSAGE_BITFIELD:
330     XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
331               message->issuer_host_name);
332     //Update the pieces list
333     update_pieces_count_from_bitfield(peer, message->bitfield);
334     //Store the bitfield
335     remote_peer->bitfield = xbt_strdup(message->bitfield);
336     //Update the current piece
337     if (peer->current_piece == -1 && peer->pieces < FILE_PIECES) {
338       update_current_piece(peer);
339     }
340     break;
341   case MESSAGE_INTERESTED:
342     XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
343               message->issuer_host_name);
344     xbt_assert((remote_peer != NULL),
345                "A non-in-our-list peer has sent us a message. WTH ?");
346     //Update the interested state of the peer.
347     remote_peer->interested = 1;
348     break;
349   case MESSAGE_NOTINTERESTED:
350     XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox,
351               message->issuer_host_name);
352     xbt_assert((remote_peer != NULL),
353                "A non-in-our-list peer has sent us a message. WTH ?");
354     remote_peer->interested = 0;
355     break;
356   case MESSAGE_UNCHOKE:
357     xbt_assert((remote_peer != NULL),
358                "A non-in-our-list peer has sent us a message. WTH ?");
359     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
360               message->issuer_host_name);
361     remote_peer->choked_download = 0;
362     xbt_dict_set_ext(peer->active_peers, (char *) &message->peer_id,
363                      sizeof(int), remote_peer, NULL);
364     //Send requests to the peer, since it has unchoked us
365     send_requests_to_peer(peer, remote_peer);
366     break;
367   case MESSAGE_CHOKE:
368     xbt_assert((remote_peer != NULL),
369                "A non-in-our-list peer has sent us a message. WTH ?");
370     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
371               message->issuer_host_name);
372     remote_peer->choked_download = 1;
373     xbt_ex_t e;
374     TRY {
375       xbt_dict_remove_ext(peer->active_peers, (char *) &message->peer_id,
376                           sizeof(int));
377     }
378     CATCH(e) {
379       xbt_ex_free(e);
380     }
381     break;
382   case MESSAGE_HAVE:
383     XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
384               message->mailbox, message->issuer_host_name, message->index);
385     xbt_assert((message->index >= 0
386                 && message->index < FILE_PIECES),
387                "Wrong HAVE message received");
388     if (remote_peer->bitfield == NULL)
389       return;
390     remote_peer->bitfield[message->index] = '1';
391     peer->pieces_count[message->index]++;
392     //If the piece is in our pieces, we tell the peer that we are interested.
393     if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) {
394       remote_peer->am_interested = 1;
395       send_interested(peer, remote_peer->mailbox);
396     }
397     break;
398   case MESSAGE_REQUEST:
399     xbt_assert((message->index >= 0
400                 && message->index < FILE_PIECES), "Wrong request received");
401     if (!remote_peer->choked_upload) {
402       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
403                 message->mailbox, message->issuer_host_name, message->peer_id,
404                 message->block_index,
405                 message->block_index + message->block_length);
406       if (peer->bitfield[message->index] == '1') {
407         send_piece(peer, message->mailbox, message->index, 0,
408                    message->block_index, message->block_length);
409       }
410     } else {
411       XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
412                 message->mailbox, message->issuer_host_name, message->peer_id);
413     }
414     break;
415   case MESSAGE_PIECE:
416     xbt_assert((message->index >= 0
417                 && message->index < FILE_PIECES), "Wrong piece received");
418     //TODO: Execute Ã  computation.
419     if (message->stalled) {
420       XBT_DEBUG("The received piece %d from %s (%s) is STALLED", message->index,
421                 message->mailbox, message->issuer_host_name);
422     } else {
423       XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
424                 message->block_index,
425                 message->block_index + message->block_length, message->mailbox,
426                 message->issuer_host_name);
427       if (peer->bitfield[message->index] == '0') {
428         update_bitfield_blocks(peer, message->index, message->block_index,
429                                message->block_length);
430         if (piece_complete(peer, message->index)) {
431           peer->pieces_requested--;
432           //Removing the piece from our piece list
433           unsigned i;
434           int piece_index = -1, piece;
435           xbt_dynar_foreach(peer->current_pieces, i, piece) {
436             if (piece == message->index) {
437               piece_index = i;
438               break;
439             }
440           }
441           xbt_assert(piece_index != -1, "Received an incorrect piece");
442           xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
443           //Setting the fact that we have the piece
444           peer->bitfield[message->index] = '1';
445           peer->pieces++;
446           XBT_DEBUG("My status is now %s", peer->bitfield);
447           //Sending the information to all the peers we are connected to
448           send_have(peer, message->index);
449           //sending UNINTERSTED to peers that doesn't have what we want.
450           update_interested_after_receive(peer);
451         }
452       } else {
453         XBT_DEBUG("However, we already have it");
454       }
455     }
456     break;
457   case MESSAGE_CANCEL:
458     break;
459   }
460   //Update the peer speed.
461   if (remote_peer) {
462     connection_add_speed_value(remote_peer,
463                                1.0 / (MSG_get_clock() -
464                                       peer->begin_receive_time));
465   }
466   peer->begin_receive_time = MSG_get_clock();
467
468   task_message_free(task);
469 }
470
471 /**
472  * Wait for the node to receive interesting bitfield messages (ie: non empty)
473  * to be received
474  * @param deadline peer deadline
475  * @param peer peer data
476  */
477 void wait_for_pieces(peer_t peer, double deadline)
478 {
479   int finished = 0;
480   while (MSG_get_clock() < deadline && !finished) {
481     if (peer->comm_received == NULL) {
482       peer->task_received = NULL;
483       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
484     }
485     msg_error_t status = MSG_comm_wait(peer->comm_received, TIMEOUT_MESSAGE);
486     //free the comm already, we don't need it anymore
487     MSG_comm_destroy(peer->comm_received);
488     peer->comm_received = NULL;
489     if (status == MSG_OK) {
490       MSG_task_get_data(peer->task_received);
491       handle_message(peer, peer->task_received);
492       if (peer->current_piece != -1) {
493         finished = 1;
494       }
495     }
496   }
497 }
498
499 /**
500  * Updates the list of who has a piece from a bitfield
501  * @param peer peer we want to update the list
502  * @param bitfield bitfield
503  */
504 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
505 {
506   int i;
507   for (i = 0; i < FILE_PIECES; i++) {
508     if (bitfield[i] == '1') {
509       peer->pieces_count[i]++;
510     }
511   }
512 }
513
514 /**
515  * Update the piece the peer is currently interested in.
516  * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
517  * If the peer has less than 3 pieces, he chooses a piece at random.
518  * If the peer has more than pieces, he downloads the pieces that are the less
519  * replicated
520  */
521 void update_current_piece(peer_t peer)
522 {
523   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)) {
524     return;
525   }
526   if (1 || peer->pieces < 3) {
527     int i = 0;
528     do {
529       peer->current_piece =
530           RngStream_RandInt(peer->stream, 0, FILE_PIECES - 1);;
531       i++;
532     } while (!
533              (peer->bitfield[peer->current_piece] == '0'
534               && !in_current_pieces(peer, peer->current_piece)));
535   } else {
536     //Trivial min algorithm.
537     int i, min_id = -1;
538     short min = -1;
539     for (i = 0; i < FILE_PIECES; i++) {
540       if (peer->bitfield[i] == '0') {
541         min = peer->pieces_count[i];
542         min_id = i;
543         break;
544       }
545     }
546     xbt_assert((min > -1), "Couldn't find a minimum");
547     for (i = 1; i < FILE_PIECES; i++) {
548       if (peer->pieces_count[i] < min && peer->bitfield[i] == '0') {
549         min = peer->pieces_count[i];
550         min_id = i;
551       }
552     }
553     peer->current_piece = min_id;
554   }
555   xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
556   XBT_DEBUG("New interested piece: %d", peer->current_piece);
557   xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES),
558              "Peer want to retrieve a piece that doesn't exist.");
559 }
560
561 /**
562  * Update the list of current choked and unchoked peers, using the
563  * choke algorithm
564  * @param peer the current peer
565  */
566 void update_choked_peers(peer_t peer)
567 {
568   //update the current round
569   peer->round = (peer->round + 1) % 3;
570   char *key;
571   connection_t peer_choosed = NULL;
572   //remove a peer from the list
573   xbt_dict_cursor_t cursor = NULL;
574   xbt_dict_cursor_first(peer->active_peers, &cursor);
575   if (!xbt_dict_is_empty(peer->active_peers)) {
576     key = xbt_dict_cursor_get_key(cursor);
577     connection_t peer_choked = xbt_dict_cursor_get_data(cursor);
578     if (peer_choked) {
579       send_choked(peer, peer_choked->mailbox);
580       peer_choked->choked_upload = 1;
581     }
582     xbt_dict_remove_ext(peer->active_peers, key, sizeof(int));
583   }
584   xbt_dict_cursor_free(&cursor);
585
586         /**
587          * If we are currently seeding, we unchoke the peer which has
588          * been unchoke the least time.
589          */
590   if (peer->pieces == FILE_PIECES) {
591     connection_t connection;
592     double unchoke_time = MSG_get_clock() + 1;
593
594     xbt_dict_foreach(peer->peers, cursor, key, connection) {
595       if (connection->last_unchoke < unchoke_time && connection->interested) {
596         unchoke_time = connection->last_unchoke;
597         peer_choosed = connection;
598       }
599     }
600   } else {
601     //Random optimistic unchoking
602     if (peer->round == 0) {
603       int j = 0;
604       do {
605         //We choose a random peer to unchoke.
606         int id_chosen =
607             RngStream_RandInt(peer->stream, 0,
608                               xbt_dict_length(peer->peers) - 1);
609         int i = 0;
610         connection_t connection;
611         xbt_dict_foreach(peer->peers, cursor, key, connection) {
612           if (i == id_chosen) {
613             peer_choosed = connection;
614             break;
615           }
616           i++;
617         }
618         xbt_dict_cursor_free(&cursor);
619         if (peer_choosed->interested == 0) {
620           peer_choosed = NULL;
621         }
622         j++;
623       } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
624     } else {
625       //Use the "fastest download" policy.
626       connection_t connection;
627       double fastest_speed = 0.0;
628       xbt_dict_foreach(peer->peers, cursor, key, connection) {
629         if (connection->peer_speed > fastest_speed && connection->choked_upload
630             && connection->interested) {
631           peer_choosed = connection;
632           fastest_speed = connection->peer_speed;
633         }
634       }
635     }
636
637   }
638   if (peer_choosed != NULL) {
639     peer_choosed->choked_upload = 0;
640     xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
641                      sizeof(int), peer_choosed, NULL);
642     peer_choosed->last_unchoke = MSG_get_clock();
643     send_unchoked(peer, peer_choosed->mailbox);
644   }
645
646 }
647
648 /**
649  * Updates our "interested" state about peers: send "not interested" to peers
650  * that don't have any more pieces we want.
651  * @param peer our peer data
652  */
653 void update_interested_after_receive(peer_t peer)
654 {
655   char *key;
656   xbt_dict_cursor_t cursor;
657   connection_t connection;
658   unsigned cpt;
659   int interested, piece;
660   xbt_dict_foreach(peer->peers, cursor, key, connection) {
661     interested = 0;
662     if (connection->am_interested) {
663       //Check if the peer still has a piece we want.
664       xbt_dynar_foreach(peer->current_pieces, cpt, piece) {
665         xbt_assert((piece >= 0), "Wrong piece.");
666         if (connection->bitfield && connection->bitfield[piece] == '1') {
667           interested = 1;
668           break;
669         }
670       }
671       if (!interested) {
672         connection->am_interested = 0;
673         send_notinterested(peer, connection->mailbox);
674       }
675     }
676   }
677 }
678
679 void update_bitfield_blocks(peer_t peer, int index, int block_index,
680                             int block_length)
681 {
682   int i;
683   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
684   xbt_assert((block_index >= 0
685               && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
686              block_index);
687   for (i = block_index; i < (block_index + block_length); i++) {
688     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
689   }
690 }
691
692 /**
693  * Returns if a peer has completed the download of a piece
694  */
695 int piece_complete(peer_t peer, int index)
696 {
697   int i;
698   for (i = 0; i < PIECES_BLOCKS; i++) {
699     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
700       return 0;
701     }
702   }
703   return 1;
704 }
705
706 /**
707  * Returns the first block that a peer doesn't have in a piece
708  */
709 int get_first_block(peer_t peer, int piece)
710 {
711   int i;
712   for (i = 0; i < PIECES_BLOCKS; i++) {
713     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
714       return i;
715     }
716   }
717   return -1;
718 }
719
720 /**
721  * Send request messages to a peer that have unchoked us
722  * @param peer peer
723  * @param remote_peer peer data to the peer we want to send the request
724  */
725 void send_requests_to_peer(peer_t peer, connection_t remote_peer)
726 {
727   unsigned i;
728   int piece, block_index, block_length;
729   xbt_dynar_foreach(peer->current_pieces, i, piece) {
730     if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
731       block_index = get_first_block(peer, piece);
732       if (block_index != -1) {
733         block_length = PIECES_BLOCKS - block_index;
734         block_length = min(BLOCKS_REQUESTED, block_length);
735         send_request(peer, remote_peer->mailbox, piece, block_index,
736                      block_length);
737         break;
738       }
739     }
740   }
741 }
742
743 /**
744  * Find the peers that have the current interested piece and send them
745  * the "interested" message
746  */
747 void send_interested_to_peers(peer_t peer)
748 {
749   char *key;
750   xbt_dict_cursor_t cursor = NULL;
751   connection_t connection;
752   xbt_assert((peer->current_piece != -1),
753              "Tried to send a interested message wheras the current_piece is -1");
754   xbt_dict_foreach(peer->peers, cursor, key, connection) {
755     if (connection->bitfield
756         && connection->bitfield[peer->current_piece] == '1') {
757       connection->am_interested = 1;
758       msg_task_t task =
759           task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
760                            peer->id, task_message_size(MESSAGE_INTERESTED));
761       MSG_task_dsend(task, connection->mailbox, task_message_free);
762       XBT_DEBUG("Send INTERESTED to %s", connection->mailbox);
763     }
764   }
765   peer->current_piece = -1;
766   peer->pieces_requested++;
767 }
768
769 /**
770  * Send a "interested" message to a peer
771  * @param peer peer data
772  * @param mailbox destination mailbox
773  */
774 void send_interested(peer_t peer, const char *mailbox)
775 {
776   msg_task_t task =
777       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
778                        peer->id, task_message_size(MESSAGE_INTERESTED));
779   MSG_task_dsend(task, mailbox, task_message_free);
780   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
781
782 }
783
784 /**
785  * Send a "not interested" message to a peer
786  * @param peer peer data
787  * @param mailbox destination mailbox
788  */
789 void send_notinterested(peer_t peer, const char *mailbox)
790 {
791   msg_task_t task =
792       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
793                        peer->id, task_message_size(MESSAGE_NOTINTERESTED));
794   MSG_task_dsend(task, mailbox, task_message_free);
795   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
796
797 }
798
799 /**
800  * Send a handshake message to all the peers the peer has.
801  * @param peer peer data
802  */
803 void send_handshake_all(peer_t peer)
804 {
805   connection_t remote_peer;
806   xbt_dict_cursor_t cursor = NULL;
807   char *key;
808   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
809     msg_task_t task =
810         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
811                          peer->id, task_message_size(MESSAGE_HANDSHAKE));
812     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
813     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
814   }
815 }
816
817 /**
818  * Send a "handshake" message to an user
819  * @param peer peer data
820  * @param mailbox mailbox where to we send the message
821  */
822 void send_handshake(peer_t peer, const char *mailbox)
823 {
824   msg_task_t task =
825       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
826                        peer->id, task_message_size(MESSAGE_HANDSHAKE));
827   MSG_task_dsend(task, mailbox, task_message_free);
828   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
829 }
830
831 /**
832  * Send a "choked" message to a peer.
833  */
834 void send_choked(peer_t peer, const char *mailbox)
835 {
836   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
837   msg_task_t task =
838       task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, 
839                        peer->id, task_message_size(MESSAGE_CHOKE));
840   MSG_task_dsend(task, mailbox, task_message_free);
841 }
842
843 /**
844  * Send a "unchoked" message to a peer
845  */
846 void send_unchoked(peer_t peer, const char *mailbox)
847 {
848   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
849   msg_task_t task =
850       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
851                        peer->id, task_message_size(MESSAGE_UNCHOKE));
852   MSG_task_dsend(task, mailbox, task_message_free);
853 }
854
855 /**
856  * Send a "HAVE" message to all peers we are connected to
857  */
858 void send_have(peer_t peer, int piece)
859 {
860   XBT_DEBUG("Sending HAVE message to all my peers");
861   connection_t remote_peer;
862   xbt_dict_cursor_t cursor = NULL;
863   char *key;
864   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
865     msg_task_t task =
866         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
867                                peer->id, piece, task_message_size(MESSAGE_HAVE));
868     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
869   }
870 }
871
872 /**
873  * Send a bitfield message to all the peers the peer has.
874  * @param peer peer data
875  */
876 void send_bitfield(peer_t peer, const char *mailbox)
877 {
878   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
879   msg_task_t task =
880       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
881                                 peer->bitfield, FILE_PIECES);
882   //Async send and append to pending sends
883   msg_comm_t comm = MSG_task_isend(task, mailbox);
884   xbt_dynar_push(peer->pending_sends, &comm);
885 }
886
887 /**
888  * Send a "request" message to a pair, containing a request for a piece
889  */
890 void send_request(peer_t peer, const char *mailbox, int piece, int block_index,
891                   int block_length)
892 {
893   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
894             block_index, block_length);
895   msg_task_t task =
896       task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
897                                block_index, block_length);
898   MSG_task_dsend(task, mailbox, task_message_free);
899 }
900
901 /**
902  * Send a "piece" message to a pair, containing a piece of the file
903  */
904 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
905                 int block_index, int block_length)
906 {
907   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
908             block_length, mailbox);
909   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
910   xbt_assert((peer->bitfield[piece] == '1'),
911              "Tried to send a piece that we doesn't have.");
912   msg_task_t task =
913       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
914                              stalled, block_index, block_length, BLOCK_SIZE);
915   MSG_task_dsend(task, mailbox, task_message_free);
916 }
917
918 int in_current_pieces(peer_t peer, int piece)
919 {
920   unsigned i;
921   int is_in = 0, peer_piece;
922   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
923     if (peer_piece == piece) {
924       is_in = 1;
925       break;
926     }
927   }
928   return is_in;
929 }