Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
5dc1078c54a50bc0507a630ac102f16491d82c31
[simgrid.git] / examples / msg / bittorrent / peer.c
1   /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "peer.h"
7 #include "tracker.h"
8 #include "connection.h"
9 #include "messages.h"
10 #include <msg/msg.h>
11 #include <xbt/RngStream.h>
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers,
14                              "Messages specific for the peers");
15
16
17
18 /**
19  * Peer main function
20  */
21 int peer(int argc, char *argv[]) {
22         s_peer_t peer;
23         //Check arguments
24         xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
25         //Build peer object
26         if (argc == 4) {
27                 peer_init(&peer, atoi(argv[1]),1);
28         }
29         else {
30                 peer_init(&peer, atoi(argv[1]),0);
31         }
32         //Retrieve deadline
33         double deadline = atof(argv[2]);
34         xbt_assert(deadline > 0, "Wrong deadline supplied");
35         XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
36         //Getting peer data from the tracker.
37         if (get_peers_data(&peer)) {
38                 XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
39                 XBT_DEBUG("Here is my current status: %s",peer.bitfield);
40                 peer.begin_receive_time = MSG_get_clock();
41                 if (has_finished(peer.bitfield)) {
42                         peer.pieces = FILE_PIECES;
43                         send_handshake_all(&peer);
44                         seed_loop(&peer,deadline);
45                 }
46                 else {
47                         leech_loop(&peer,deadline);
48                         seed_loop(&peer,deadline);
49                 }
50         }
51         else {
52                 XBT_INFO("Couldn't contact the tracker.");
53         }
54
55         XBT_INFO("Here is my current status: %s", peer.bitfield);
56         if (peer.comm_received) {
57                 MSG_comm_destroy(peer.comm_received);
58         }
59
60         peer_free(&peer);
61         return 0;
62 }
63 /**
64  * Peer main loop when it is leeching.
65  * @param peer peer data
66  * @param deadline time at which the peer has to leave
67  */
68 void leech_loop(peer_t peer, double deadline) {
69         double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
70         XBT_DEBUG("Start downloading.");
71         /*
72          * Send a "handshake" message to all the peers it got
73          * (since it couldn't have gotten more than 50 peers)
74          */
75         send_handshake_all(peer);
76         //Wait for at least one "bitfield" message.
77         wait_for_pieces(peer,deadline);
78         XBT_DEBUG("Starting main leech loop");
79         while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
80                 if (peer->comm_received == NULL) {
81                         peer->task_received = NULL;
82                         peer->comm_received = MSG_task_irecv(&peer->task_received,peer->mailbox);
83                 }
84                 if (MSG_comm_test(peer->comm_received)) {
85                         msg_error_t status = MSG_comm_get_status(peer->comm_received);
86                         MSG_comm_destroy(peer->comm_received);
87                         peer->comm_received = NULL;
88                         if (status == MSG_OK) {
89                                 handle_message(peer,peer->task_received);
90                         }
91                 }
92                 else {
93                         if (peer->current_piece != -1) {
94                                 send_interested_to_peers(peer);
95                         }
96                         else {
97                                 //If the current interested pieces is < MAX
98                                 if (peer->pieces_requested < MAX_PIECES) {
99                                         update_current_piece(peer);
100                                 }
101                         }
102                         //We don't execute the choke algorithm if we don't already have a piece
103                         if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
104                                 update_choked_peers(peer);
105                                 next_choked_update += UPDATE_CHOKED_INTERVAL;
106                         }
107                         else {
108                                 MSG_process_sleep(1);
109                         }
110                 }
111         }
112 }
113 /**
114  * Peer main loop when it is seeding
115  * @param peer peer data
116  * @param deadline time when the peer will leave
117  */
118 void seed_loop(peer_t peer, double deadline) {
119         double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
120         XBT_DEBUG("Start seeding.");
121         //start the main seed loop
122         while (MSG_get_clock() < deadline) {
123                 if (peer->comm_received == NULL) {
124                         peer->task_received = NULL;
125                         peer->comm_received = MSG_task_irecv(&peer->task_received,peer->mailbox);
126                 }
127                 if (MSG_comm_test(peer->comm_received)) {
128                         msg_error_t status = MSG_comm_get_status(peer->comm_received);
129                         MSG_comm_destroy(peer->comm_received);
130                         peer->comm_received = NULL;
131                         if (status == MSG_OK) {
132                                 handle_message(peer,peer->task_received);
133                         }
134                 }
135                 else {
136                         if (MSG_get_clock() >= next_choked_update) {
137                                 update_choked_peers(peer);
138                                 //TODO: Change the choked peer algorithm when seeding.
139                                 next_choked_update += UPDATE_CHOKED_INTERVAL;
140                         }
141                         else {
142                                 MSG_process_sleep(1);
143                         }
144                 }
145         }
146 }
147 /**
148  * Retrieves the peer list from the tracker
149  * @param peer current peer data
150  */
151 int get_peers_data(peer_t peer) {
152         int success = 0, send_success = 0;
153         double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
154         //Build the task to send to the tracker
155         tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
156         peer->mailbox_tracker,peer->id, 0, 0, FILE_SIZE);
157         //Build the task to send.
158         msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
159         msg_task_t task_received = NULL;
160         msg_comm_t comm_received;
161         while (!send_success && MSG_get_clock() < timeout) {
162                 XBT_DEBUG("Sending a peer request to the tracker.");
163                 msg_error_t status = MSG_task_send_with_timeout(task_send,TRACKER_MAILBOX,GET_PEERS_TIMEOUT);
164                 if (status == MSG_OK) {
165                         send_success = 1;
166                 }
167         }
168         while (!success && MSG_get_clock() < timeout) {
169                 comm_received = MSG_task_irecv(&task_received,peer->mailbox_tracker);
170                 msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
171                 if (status == MSG_OK) {
172                         tracker_task_data_t data = MSG_task_get_data(task_received);
173                         int i, peer_id;
174                         //Add the peers the tracker gave us to our peer list.
175                         xbt_dynar_foreach(data->peers,i,peer_id) {
176                                 if (peer_id != peer->id)
177                                         xbt_dict_set_ext(peer->peers,(char*)&peer_id,sizeof(int),connection_new(peer_id),NULL);
178                         }
179                         success = 1;
180                         //free the communication and the task
181                         MSG_comm_destroy(comm_received);
182                         tracker_task_data_free(data);
183                         MSG_task_destroy(task_received);
184                         comm_received = NULL;
185                 }
186         }
187
188         return success;
189 }
190 /**
191  * Initialize the peer data.
192  * @param peer peer data
193  * @param id id of the peer to take in the network
194  * @param seed indicates if the peer is a seed.
195  */
196 void peer_init(peer_t peer, int id, int seed) {
197         peer->id = id;
198         sprintf(peer->mailbox,"%d",id);
199         sprintf(peer->mailbox_tracker,"tracker_%d",id);
200         peer->peers = xbt_dict_new();
201         peer->active_peers = xbt_dict_new();
202         peer->hostname = MSG_host_get_name(MSG_host_self());
203
204         peer->bitfield = xbt_new(char,FILE_PIECES + 1);
205         peer->bitfield_blocks = xbt_new(char,(FILE_PIECES) * (PIECES_BLOCKS) + 1);
206         if (seed) {
207                 memset(peer->bitfield,'1',sizeof(char) * (FILE_PIECES + 1));
208                 memset(peer->bitfield_blocks,'1',sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
209         }
210         else {
211                 memset(peer->bitfield,'0',sizeof(char) * (FILE_PIECES + 1));
212         memset(peer->bitfield_blocks,'0',sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
213         }
214
215         peer->bitfield[FILE_PIECES] = '\0';
216     peer->pieces = 0;
217
218         peer->pieces_count = xbt_new0(short,FILE_PIECES);
219         peer->pieces_requested = 0;
220
221         peer->current_pieces = xbt_dynar_new(sizeof(int),NULL);
222         peer->current_piece = -1;
223
224         peer->stream = RngStream_CreateStream("");
225         peer->comm_received = NULL;
226
227         peer->round = 0;
228 }
229 /**
230  * Destroys a poor peer object.
231  */
232 void peer_free(peer_t peer) {
233         char *key;
234         connection_t connection;
235         xbt_dict_cursor_t cursor;
236         xbt_dict_foreach(peer->peers,cursor,key,connection) {
237                 connection_free(connection);
238         }
239         xbt_dict_free(&peer->peers);
240         xbt_dict_free(&peer->active_peers);
241         xbt_dynar_free(&peer->current_pieces);
242         xbt_free(peer->pieces_count);
243         xbt_free(peer->bitfield);
244         xbt_free(peer->bitfield_blocks);
245
246         RngStream_DeleteStream(&peer->stream);
247 }
248 /**
249  * Returns if a peer has finished downloading the file
250  * @param bitfield peer bitfield
251  */
252 int has_finished(char *bitfield) {
253         return ((memchr(bitfield,'0',sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
254 }
255 /**
256  * Handle a received message sent by another peer
257  * @param peer Peer data
258  * @param task task received.
259  */
260 void handle_message(peer_t peer, msg_task_t task) {
261         message_t message = MSG_task_get_data(task);
262         connection_t remote_peer;
263         remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char*)&message->peer_id, sizeof(int));
264         switch (message->type) {
265
266                 case MESSAGE_HANDSHAKE:
267                         XBT_DEBUG("Received a HANDSHAKE from %s (%s)",message->mailbox, message->issuer_host_name);
268                         //Check if the peer is in our connection list.
269                         if (!remote_peer) {
270                                 xbt_dict_set_ext(peer->peers,(char*)&message->peer_id,sizeof(int),connection_new(message->peer_id),NULL);
271                                 send_handshake(peer,message->mailbox);
272                         }
273                         //Send our bitfield to the peer
274                         send_bitfield(peer,message->mailbox);
275                 break;
276                 case MESSAGE_BITFIELD:
277                         XBT_DEBUG("Recieved a BITFIELD message from %s (%s)",message->mailbox,message->issuer_host_name);
278                         //Update the pieces list
279                         update_pieces_count_from_bitfield(peer,message->bitfield);
280                         //Store the bitfield
281                         remote_peer->bitfield = xbt_strdup(message->bitfield);
282                         //Update the current piece
283                         if (peer->current_piece == -1 && peer->pieces < FILE_PIECES) {
284                                 update_current_piece(peer);
285                         }
286                 break;
287                 case MESSAGE_INTERESTED:
288                         XBT_DEBUG("Recieved an INTERESTED message from %s (%s)",message->mailbox,message->issuer_host_name);
289                         xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
290                         //Update the interested state of the peer.
291                         remote_peer->interested = 1;
292                 break;
293                 case MESSAGE_NOTINTERESTED:
294                         XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
295                         xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
296                         remote_peer->interested = 0;
297                 break;
298                 case MESSAGE_UNCHOKE:
299                         xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
300                         XBT_DEBUG("Received a UNCHOKE message from %s (%s)",message->mailbox,message->issuer_host_name);
301                         remote_peer->choked_download = 0;
302                         xbt_dict_set_ext(peer->active_peers,(char*)&message->peer_id,sizeof(int),remote_peer,NULL);
303                         //Send requests to the peer, since it has unchoked us
304                         send_requests_to_peer(peer,remote_peer);
305                         break;
306                 case MESSAGE_CHOKE:
307                         xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
308                         XBT_DEBUG("Received a CHOKE message from %s (%s)",message->mailbox,message->issuer_host_name);
309                         remote_peer->choked_download = 1;
310                         xbt_ex_t e;
311                         TRY {
312                                 xbt_dict_remove_ext(peer->active_peers,(char*)&message->peer_id,sizeof(int));
313                         }
314                         CATCH(e) {
315                                 xbt_ex_free(e);
316                         }
317                 break;
318                 case MESSAGE_HAVE:
319                         XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",message->mailbox, message->issuer_host_name, message->index);
320                         xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
321                         if (remote_peer->bitfield == NULL)
322                                 return;
323                         remote_peer->bitfield[message->index] = '1';
324                         peer->pieces_count[message->index]++;
325                         //If the piece is in our pieces, we tell the peer that we are interested.
326                         if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) {
327                                 remote_peer->am_interested = 1;
328                                 send_interested(peer,remote_peer->mailbox);
329                         }
330                         break;
331                 case MESSAGE_REQUEST:
332                         xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
333                         if (!remote_peer->choked_upload) {
334                                 XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",message->mailbox,message->issuer_host_name,message->peer_id, message->block_index,message->block_index + message->block_length);
335                                 if (peer->bitfield[message->index] == '1') {
336                                         send_piece(peer, message->mailbox, message->index, 0, message->block_index, message->block_length);
337                                 }
338                         }
339                         else {
340                           XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",message->mailbox,message->issuer_host_name,message->peer_id);
341                         }
342                 break;
343                 case MESSAGE_PIECE:
344                         xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
345                         //TODO: Execute Ã  computation.
346                         if (message->stalled) {
347                                 XBT_DEBUG("The received piece %d from %s (%s) is STALLED",message->index, message->mailbox, message->issuer_host_name);
348                         }
349                         else {
350                           XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)",message->index, message->block_index, message->block_index + message->block_length, message->mailbox, message->issuer_host_name);
351                                 if (peer->bitfield[message->index] == '0') {
352                                     update_bitfield_blocks(peer,message->index,message->block_index,message->block_length);
353                                     if (piece_complete(peer,message->index)) {
354                       peer->pieces_requested--;
355                       //Removing the piece from our piece list
356                       int piece_index = -1, i, piece;
357                       xbt_dynar_foreach(peer->current_pieces, i, piece) {
358                           if (piece == message->index) {
359                               piece_index = i;
360                               break;
361                           }
362                       }
363                       xbt_assert(piece_index != -1, "Received an incorrect piece");
364                       xbt_dynar_remove_at(peer->current_pieces,piece_index,NULL);
365                       //Setting the fact that we have the piece
366                       peer->bitfield[message->index] = '1';
367                       peer->pieces++;
368                       XBT_DEBUG("My status is now %s",peer->bitfield);
369                       //Sending the information to all the peers we are connected to
370                       send_have(peer,message->index);
371                       //sending UNINTERSTED to peers that doesn't have what we want.
372                       update_interested_after_receive(peer);
373                                   }
374                                 }
375                                 else {
376                                         XBT_DEBUG("However, we already have it");
377                                 }
378                         }
379                 break;
380         }
381         //Update the peer speed.
382         if (remote_peer) {
383           connection_add_speed_value(remote_peer, 1.0 / ( MSG_get_clock() - peer->begin_receive_time));
384         }
385         peer->begin_receive_time = MSG_get_clock();
386
387         task_message_free(task);
388 }
389
390 /**
391  * Wait for the node to receive interesting bitfield messages (ie: non empty)
392  * to be received
393  * @param deadline peer deadline
394  * @param peer peer data
395  */
396 void wait_for_pieces(peer_t peer, double deadline) {
397         int finished = 0;
398         while (MSG_get_clock() < deadline && !finished) {
399                 if (peer->comm_received == NULL) {
400                         peer->task_received = NULL;
401                         peer->comm_received = MSG_task_irecv(&peer->task_received,peer->mailbox);
402                 }
403                 msg_error_t status = MSG_comm_wait(peer->comm_received,TIMEOUT_MESSAGE);
404                 //free the comm already, we don't need it anymore
405                 MSG_comm_destroy(peer->comm_received);
406                 peer->comm_received = NULL;
407                 if (status == MSG_OK) {
408                         message_t message = MSG_task_get_data(peer->task_received);
409                         handle_message(peer,peer->task_received);
410                         if (peer->current_piece != -1) {
411                                 finished = 1;
412                         }
413                 }
414         }
415 }
416 /**
417  * Updates the list of who has a piece from a bitfield
418  * @param peer peer we want to update the list
419  * @param bitfield bitfield
420  */
421 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield) {
422         int i;
423         for (i = 0; i < FILE_PIECES; i++) {
424                 if (bitfield[i] == '1') {
425                         peer->pieces_count[i]++;
426                 }
427         }
428 }
429 /**
430  * Update the piece the peer is currently interested in.
431  * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
432  * If the peer has less than 3 pieces, he chooses a piece at random.
433  * If the peer has more than pieces, he downloads the pieces that are the less
434  * replicated
435  */
436 void update_current_piece(peer_t peer) {
437         if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)) {
438                 return;
439         }
440         if (1 || peer->pieces < 3) {
441                 int i = 0       ;
442                 do {
443                         peer->current_piece = RngStream_RandInt(peer->stream,0,FILE_PIECES - 1);;
444                         i++;
445                 } while (!(peer->bitfield[peer->current_piece] == '0' && !in_current_pieces(peer,peer->current_piece)));
446         }
447         else {
448                 //Trivial min algorithm.
449                 int i, min_id = -1;
450                 short min = -1;
451                 for (i = 0; i < FILE_PIECES; i++) {
452                         if (peer->bitfield[i] == '0') {
453                                 min = peer->pieces_count[i];
454                                 min_id = i;
455                                 break;
456                         }
457                 }
458                 xbt_assert((min > -1), "Couldn't find a minimum");
459                 for (i = 1; i < FILE_PIECES; i++) {
460                         if (peer->pieces_count[i] < min && peer->bitfield[i] == '0') {
461                                 min = peer->pieces_count[i];
462                                 min_id = i;
463                         }
464                 }
465                 peer->current_piece = min_id;
466         }
467         xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
468         XBT_DEBUG("New interested piece: %d",peer->current_piece);
469         xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES, "Peer want to retrieve a piece that doesn't exist."));
470 }
471 /**
472  * Update the list of current choked and unchoked peers, using the
473  * choke algorithm
474  * @param peer the current peer
475  */
476 void update_choked_peers(peer_t peer) {
477         //update the current round
478         peer->round = (peer->round + 1) % 3;
479         int i;
480         char *key;
481     connection_t peer_choosed = NULL;
482         //remove a peer from the list
483         xbt_dict_cursor_t cursor = NULL;
484         xbt_dict_cursor_first(peer->active_peers,&cursor);
485         if (xbt_dict_length(peer->active_peers) > 0) {
486                 key = xbt_dict_cursor_get_key(cursor);
487                 connection_t peer_choked = xbt_dict_cursor_get_data(cursor);
488                 if (peer_choked) {
489                         send_choked(peer,peer_choked->mailbox);
490                         peer_choked->choked_upload = 1;
491                 }
492                 xbt_dict_remove_ext(peer->active_peers,key,sizeof(int));
493         }
494         xbt_dict_cursor_free(&cursor);
495
496         /**
497          * If we are currently seeding, we unchoke the peer which has
498          * been unchoke the least time.
499          */
500         if (peer->pieces == FILE_PIECES) {
501       connection_t connection;
502       double unchoke_time = MSG_get_clock() + 1;
503
504       xbt_dict_foreach(peer->peers,cursor,key,connection) {
505         if (connection->last_unchoke < unchoke_time && connection->interested) {
506           unchoke_time = connection->last_unchoke;
507           peer_choosed = connection;
508         }
509       }
510         }
511         else {
512       //Random optimistic unchoking
513       if (peer->round == 0) {
514           int j = 0;
515           do {
516               //We choose a random peer to unchoke.
517               int id_chosen = RngStream_RandInt(peer->stream,0,xbt_dict_length(peer->peers) - 1);
518               int i = 0;
519               connection_t connection;
520               xbt_dict_foreach(peer->peers,cursor,key,connection) {
521                   if (i == id_chosen) {
522                       peer_choosed = connection;
523                       break;
524                   }
525                   i++;
526               }
527               xbt_dict_cursor_free(&cursor);
528               if (peer_choosed->interested == 0) {
529                   peer_choosed = NULL;
530               }
531               j++;
532           } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
533       }
534       else {
535         //Use the "fastest download" policy.
536         connection_t connection;
537         double fastest_speed = 0.0;
538         xbt_dict_foreach(peer->peers,cursor,key,connection) {
539           if (connection->peer_speed > fastest_speed && connection->choked_upload && connection->interested) {
540             peer_choosed = connection;
541             fastest_speed = connection->peer_speed;
542           }
543         }
544       }
545
546         }
547     if (peer_choosed != NULL) {
548         peer_choosed->choked_upload = 0;
549         xbt_dict_set_ext(peer->active_peers,(char*)&peer_choosed->id,sizeof(int),peer_choosed,NULL);
550         peer_choosed->last_unchoke = MSG_get_clock();
551         send_unchoked(peer,peer_choosed->mailbox);
552     }
553
554 }
555 /**
556  * Updates our "interested" state about peers: send "not interested" to peers
557  * that don't have any more pieces we want.
558  * @param peer our peer data
559  */
560 void update_interested_after_receive(peer_t peer) {
561         char *key;
562         xbt_dict_cursor_t cursor;
563         connection_t connection;
564         int interested, cpt, piece;
565         xbt_dict_foreach(peer->peers, cursor, key, connection) {
566                 interested = 0;
567                 if (connection->am_interested) {
568                     //Check if the peer still has a piece we want.
569                         xbt_dynar_foreach(peer->current_pieces, cpt, piece) {
570                                 xbt_assert((piece >= 0), "Wrong piece.");
571                                 if (connection->bitfield && connection->bitfield[piece] == '1') {
572                                         interested = 1;
573                                         break;
574                                 }
575                         }
576                         if (!interested) {
577                                 connection->am_interested = 0;
578                                 send_notinterested(peer,connection->mailbox);
579                         }
580                 }
581         }
582 }
583 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length) {
584   int i;
585   xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
586   xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.",block_index);
587   for (i = block_index; i < (block_index + block_length); i++) {
588     peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
589   }
590 }
591 /**
592  * Returns if a peer has completed the download of a piece
593  */
594 int piece_complete(peer_t peer, int index) {
595   int i;
596   for (i = 0; i < PIECES_BLOCKS; i++) {
597     if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
598       return 0;
599     }
600   }
601   return 1;
602 }
603 /**
604  * Returns the first block that a peer doesn't have in a piece
605  */
606 int get_first_block(peer_t peer, int piece) {
607   int i;
608   for (i = 0; i < PIECES_BLOCKS; i++) {
609     if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
610       return i;
611     }
612   }
613   return -1;
614 }
615 /**
616  * Send request messages to a peer that have unchoked us
617  * @param peer peer
618  * @param remote_peer peer data to the peer we want to send the request
619  */
620 void send_requests_to_peer(peer_t peer, connection_t remote_peer) {
621         int i, piece, block_index, block_length;
622         xbt_dynar_foreach(peer->current_pieces, i, piece) {
623                 if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
624                   block_index = get_first_block(peer,piece);
625                   if (block_index != -1) {
626             block_length = PIECES_BLOCKS - block_index;
627             block_length = min(BLOCKS_REQUESTED,block_length);
628             send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
629             break;
630                   }
631                 }
632         }
633 }
634 /**
635  * Find the peers that have the current interested piece and send them
636  * the "interested" message
637  */
638 void send_interested_to_peers(peer_t peer) {
639         char *key;
640         xbt_dict_cursor_t cursor=NULL;
641         connection_t connection;
642         xbt_assert((peer->current_piece != -1), "Tried to send a interested message wheras the current_piece is -1");
643         xbt_dict_foreach(peer->peers, cursor, key, connection) {
644                 if (connection->bitfield && connection->bitfield[peer->current_piece] == '1') {
645                         connection->am_interested = 1;
646                         msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id);
647                         MSG_task_dsend(task,connection->mailbox,task_message_free);
648                         XBT_DEBUG("Send INTERESTED to %s",connection->mailbox);
649                 }
650         }
651         peer->current_piece = -1;
652         peer->pieces_requested++;
653 }
654 /**
655  * Send a "interested" message to a peer
656  * @param peer peer data
657  * @param mailbox destination mailbox
658  */
659 void send_interested(peer_t peer, const char *mailbox) {
660         msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id);
661         MSG_task_dsend(task,mailbox,task_message_free);
662         XBT_DEBUG("Sending INTERESTED to %s",mailbox);
663
664 }
665 /**
666  * Send a "not interested" message to a peer
667  * @param peer peer data
668  * @param mailbox destination mailbox
669  */
670 void send_notinterested(peer_t peer, const char *mailbox) {
671         msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id);
672         MSG_task_dsend(task,mailbox,task_message_free);
673         XBT_DEBUG("Sending NOTINTERESTED to %s",mailbox);
674
675 }
676 /**
677  * Send a handshake message to all the peers the peer has.
678  * @param peer peer data
679  */
680 void send_handshake_all(peer_t peer) {
681         connection_t remote_peer;
682         xbt_dict_cursor_t cursor=NULL;
683         char *key;
684         xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
685                 msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,peer->id);
686                 MSG_task_dsend(task,remote_peer->mailbox,task_message_free);
687                 XBT_DEBUG("Sending a HANDSHAKE to %s",remote_peer->mailbox);
688         }
689 }
690 /**
691  * Send a "handshake" message to an user
692  * @param peer peer data
693  * @param mailbox mailbox where to we send the message
694  */
695 void send_handshake(peer_t peer, const char *mailbox) {
696         msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,peer->id);
697         MSG_task_dsend(task,mailbox,task_message_free);
698         XBT_DEBUG("Sending a HANDSHAKE to %s",mailbox);
699 }
700 /**
701  * Send a "choked" message to a peer.
702  */
703 void send_choked(peer_t peer, const char *mailbox) {
704         XBT_DEBUG("Sending a CHOKE to %s",mailbox);
705         msg_task_t task = task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
706         MSG_task_dsend(task,mailbox,task_message_free);
707 }
708 /**
709  * Send a "unchoked" message to a peer
710  */
711 void send_unchoked(peer_t peer, const char *mailbox) {
712         XBT_DEBUG("Sending a UNCHOKE to %s",mailbox);
713         msg_task_t task = task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id);
714         MSG_task_dsend(task,mailbox,task_message_free);
715 }
716 /**
717  * Send a "HAVE" message to all peers we are connected to
718  */
719 void send_have(peer_t peer, int piece) {
720         XBT_DEBUG("Sending HAVE message to all my peers");
721         connection_t remote_peer;
722         xbt_dict_cursor_t cursor= NULL;
723         char *key;
724         xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
725                 msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,peer->id,piece);
726                 MSG_task_dsend(task,remote_peer->mailbox,task_message_free);
727         }
728 }
729 /**
730  * Send a bitfield message to all the peers the peer has.
731  * @param peer peer data
732  */
733 void send_bitfield(peer_t peer, const char *mailbox) {
734         XBT_DEBUG("Sending a BITFIELD to %s",mailbox);
735         msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox,peer->id,peer->bitfield);
736         MSG_task_dsend(task,mailbox,task_message_free);
737 }
738 /**
739  * Send a "request" message to a pair, containing a request for a piece
740  */
741 void send_request(peer_t peer, const char *mailbox, int piece, int block_index, int block_length) {
742         XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)",mailbox,piece,block_index,block_length);
743         msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length);
744         MSG_task_dsend(task,mailbox,task_message_free);
745 }
746 /**
747  * Send a "piece" message to a pair, containing a piece of the file
748  */
749 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled, int block_index, int block_length) {
750         XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s",piece,block_index, block_length,mailbox);
751         xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
752         xbt_assert((peer->bitfield[piece] == '1'), "Tried to send a piece that we doesn't have.");
753         msg_task_t task = task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, stalled, block_index, block_length);
754         MSG_task_dsend(task,mailbox,task_message_free);
755 }
756
757 int in_current_pieces(peer_t peer, int piece) {
758         int is_in = 0, i, peer_piece;
759         xbt_dynar_foreach(peer->current_pieces,i, peer_piece) {
760                 if (peer_piece == piece) {
761                         is_in = 1;
762                         break;
763                 }
764         }
765         return is_in;
766 }