1 /* Copyright (c) 2012. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "connection.h"
11 #include <xbt/RngStream.h>
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
16 //TODO: Let users change this
18 * File transferred data
19 * For the test, default values are :
20 * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
/* Geometry of the simulated file: FILE_PIECES pieces, each made of
 * PIECES_BLOCKS blocks of BLOCK_SIZE bytes. */
23 #define FILE_PIECES 10
24 #define PIECES_BLOCKS 5
25 #define BLOCK_SIZE 16384
28 * Number of blocks asked by each request
/* Upper bound on the number of blocks requested in one REQUEST message
 * (used by send_request_to_peer). */
30 #define BLOCKS_REQUESTED 2
/* Total size of the file in bytes, derived from the three constants above. */
33 static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
/* Forward declarations of helpers that are defined further down in this file
 * but called earlier (from handle_message). */
35 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
36 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
37 void remove_current_piece(peer_t peer, connection_t remote_peer, int current_piece);
/* Entry point of a peer process: validates its arguments, initialises the
 * peer, asks the tracker for other peers and then runs the leech and/or seed
 * loops until the deadline.
 * argv[1] is parsed as the peer id and argv[2] as the deadline.
 * NOTE(review): this listing has lost lines (the embedded numbering jumps
 * 43 -> 47 -> 50 ...), so the braces, the declaration of the local peer
 * object, the branch headers and the return statement are not visible. */
43 int peer(int argc, char *argv[])
// Exactly two or three arguments are expected after the process name.
47 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
// Two alternative initialisations: the last argument is the seed flag of
// peer_init (1 = seed, 0 = leecher). Presumably chosen on argc == 4; the
// condition itself is not visible here -- TODO confirm.
50 peer_init(&peer, atoi(argv[1]), 1);
52 peer_init(&peer, atoi(argv[1]), 0);
55 double deadline = atof(argv[2]);
56 xbt_assert(deadline > 0, "Wrong deadline supplied");
57 XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
58 //Getting peer data from the tracker.
59 if (get_peers_data(&peer)) {
60 XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
61 XBT_DEBUG("Here is my current status: %s", peer.bitfield);
// Reference time used by handle_message to compute the receive speed.
62 peer.begin_receive_time = MSG_get_clock();
63 MSG_mailbox_set_async(peer.mailbox);
// A peer whose bitfield has no missing piece goes straight to seeding.
64 if (has_finished(peer.bitfield)) {
65 peer.pieces = FILE_PIECES;
66 send_handshake_all(&peer);
67 seed_loop(&peer, deadline);
// Presumably the else branch: download first, then seed with whatever
// time is left before the deadline.
69 leech_loop(&peer, deadline);
70 // XBT_INFO("%d becomes a seeder", peer.id);
71 seed_loop(&peer, deadline);
// Presumably the else branch of the get_peers_data() test above.
74 XBT_INFO("Couldn't contact the tracker.");
77 XBT_INFO("Here is my current status: %s", peer.bitfield);
// Release the receive communication that may still be pending when the
// loops stop.
78 if (peer.comm_received) {
79 MSG_comm_destroy(peer.comm_received);
87 * Peer main loop when it is leeching.
88 * @param peer peer data
89 * @param deadline time at which the peer has to leave
/* Main loop of a downloading peer. Runs until the deadline is reached or
 * peer->pieces reaches FILE_PIECES.
 * Each iteration:
 *  - posts an asynchronous receive on the peer mailbox if none is pending;
 *  - tests that receive; when MSG_comm_test reports it done, the status is
 *    read, the communication is destroyed and, on MSG_OK, the task is handed
 *    to handle_message;
 *  - calls handle_pending_sends, runs update_choked_peers when its timer has
 *    expired and at least one piece is owned, and sleeps one simulated second.
 * NOTE(review): the braces and else keywords are missing from this listing,
 * so which of the last three steps sit in an else branch cannot be told from
 * here. */
91 void leech_loop(peer_t peer, double deadline)
93 double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
94 XBT_DEBUG("Start downloading.");
96 * Send a "handshake" message to all the peers it got
97 * (since it couldn't have gotten more than 50 peers)
99 send_handshake_all(peer);
100 //Wait for at least one "bitfield" message.
101 // wait_for_pieces(peer, deadline);
102 XBT_DEBUG("Starting main leech loop");
104 while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
105 if (peer->comm_received == NULL) {
106 // XBT_INFO("irecv");
107 peer->task_received = NULL;
108 peer->comm_received =
109 MSG_task_irecv(&peer->task_received, peer->mailbox);
111 if (MSG_comm_test(peer->comm_received)) {
112 // XBT_INFO("comm_test OK");
113 msg_error_t status = MSG_comm_get_status(peer->comm_received);
114 MSG_comm_destroy(peer->comm_received);
115 peer->comm_received = NULL;
116 if (status == MSG_OK) {
117 handle_message(peer, peer->task_received);
120 handle_pending_sends(peer);
121 //We don't execute the choke algorithm if we don't already have a piece
122 if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
123 update_choked_peers(peer);
// The next deadline is advanced from the previous one, not from the current clock.
124 next_choked_update += UPDATE_CHOKED_INTERVAL;
126 MSG_process_sleep(1);
133 * Peer main loop when it is seeding
134 * @param peer peer data
135 * @param deadline time when the peer will leave
/* Main loop of a seeding peer: until the deadline, receives and handles
 * incoming messages and periodically re-runs the choke algorithm.
 * NOTE(review): braces and else keywords are missing from this listing, so
 * the exact nesting of the steps below is not visible. */
137 void seed_loop(peer_t peer, double deadline)
139 double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
140 XBT_DEBUG("Start seeding.");
141 //start the main seed loop
142 while (MSG_get_clock() < deadline) {
// Keep exactly one asynchronous receive posted on our mailbox.
143 if (peer->comm_received == NULL) {
144 peer->task_received = NULL;
145 peer->comm_received =
146 MSG_task_irecv(&peer->task_received, peer->mailbox);
// Receive finished: read its status, release it, and only handle the
// task when the transfer succeeded.
148 if (MSG_comm_test(peer->comm_received)) {
149 msg_error_t status = MSG_comm_get_status(peer->comm_received);
150 MSG_comm_destroy(peer->comm_received);
151 peer->comm_received = NULL;
152 if (status == MSG_OK) {
153 handle_message(peer, peer->task_received);
// Unlike leech_loop, there is no condition on the number of pieces owned here.
156 if (MSG_get_clock() >= next_choked_update) {
157 update_choked_peers(peer);
158 //TODO: Change the choked peer algorithm when seeding.
159 next_choked_update += UPDATE_CHOKED_INTERVAL;
161 MSG_process_sleep(1);
168 * Retrieves the peer list from the tracker
169 * @param peer current peer data
/* Asks the tracker for a list of peers and stores one connection per returned
 * peer id in peer->peers. Both phases (send, then receive) share one overall
 * time budget of GET_PEERS_TIMEOUT.
 * main() uses the result as a boolean (non-zero means the tracker answered).
 * NOTE(review): several lines are missing from this listing: the status
 * declaration of the send, the assignments to send_success and success, the
 * declarations of i and peer_id, the closing braces and the return. */
171 int get_peers_data(peer_t peer)
173 int success = 0, send_success = 0;
// Absolute simulated time after which we give up.
174 double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
175 //Build the task to send to the tracker
176 tracker_task_data_t data =
177 tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
178 peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
179 //Build the task to send.
180 msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
181 msg_task_t task_received = NULL;
182 msg_comm_t comm_received;
// Phase 1: retry the request until it is delivered or the budget is spent.
183 while (!send_success && MSG_get_clock() < timeout) {
184 XBT_DEBUG("Sending a peer request to the tracker.");
186 MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
188 if (status == MSG_OK) {
// Phase 2: wait for the answer on the dedicated tracker mailbox.
192 while (!success && MSG_get_clock() < timeout) {
193 comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
194 msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
195 if (status == MSG_OK) {
// This inner data shadows the request payload declared at the top.
196 tracker_task_data_t data = MSG_task_get_data(task_received);
199 //Add the peers the tracker gave us to our peer list.
200 xbt_dynar_foreach(data->peers, i, peer_id) {
// Skip our own id; the dictionary key is the raw bytes of the int id.
201 if (peer_id != peer->id)
202 xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
203 connection_new(peer_id), NULL);
206 //free the communication and the task
// NOTE(review): as far as visible, the communication is only destroyed
// on the MSG_OK path -- confirm that a failed wait does not leak it.
207 MSG_comm_destroy(comm_received);
208 tracker_task_data_free(data);
209 MSG_task_destroy(task_received);
210 comm_received = NULL;
218 * Initialize the peer data.
219 * @param peer peer data
220 * @param id id of the peer to take in the network
221 * @param seed indicates if the peer is a seed.
/* Fills in a peer structure: mailbox names derived from the id, empty peer
 * dictionaries, piece and block bitfields, per-piece counters, the list of
 * pieces being downloaded, a random stream and the list of pending sends.
 * NOTE(review): lines are missing from this listing (for instance the branch
 * on seed around the memset calls and whatever initialises id, pieces and
 * round), so only the visible assignments are described. */
223 void peer_init(peer_t peer, int id, int seed)
// The peer mailbox is its id in decimal; the tracker answers on tracker_<id>.
226 sprintf(peer->mailbox, "%d", id);
227 sprintf(peer->mailbox_tracker, "tracker_%d", id);
228 peer->peers = xbt_dict_new();
229 peer->active_peers = xbt_dict_new();
230 peer->hostname = MSG_host_get_name(MSG_host_self());
// One character per piece (resp. per block) plus one extra slot.
232 peer->bitfield = xbt_new(char, FILE_PIECES + 1);
233 peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
// Presumably the seed case: everything is marked as owned.
235 memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
236 memset(peer->bitfield_blocks, '1',
237 sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
// Presumably the non-seed case: nothing is owned yet.
239 memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
240 memset(peer->bitfield_blocks, '0',
241 sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
// Terminate the piece bitfield so that it can be logged with %s.
// NOTE(review): no matching terminator for bitfield_blocks is visible.
244 peer->bitfield[FILE_PIECES] = '\0';
// Zero-initialised counters of how many remote peers own each piece.
247 peer->pieces_count = xbt_new0(short, FILE_PIECES);
249 peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
251 peer->stream = RngStream_CreateStream("");
252 peer->comm_received = NULL;
256 peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
260 * Destroys a poor peer object.
/* Releases everything owned by a peer: its connections, both dictionaries,
 * both dynars, the counters, the bitfields and the random stream.
 * NOTE(review): the declaration of key and some lines are missing from this
 * listing. */
262 void peer_free(peer_t peer)
265 connection_t connection;
266 xbt_dict_cursor_t cursor;
// Connections are stored in the dictionary with a NULL free function
// (see get_peers_data and handle_message), so they are freed by hand here.
267 xbt_dict_foreach(peer->peers, cursor, key, connection) {
268 connection_free(connection);
270 xbt_dict_free(&peer->peers);
// active_peers holds connections that are also stored in peers
// (see update_active_peers_set), so only the container is released.
271 xbt_dict_free(&peer->active_peers);
272 xbt_dynar_free(&peer->current_pieces);
273 xbt_dynar_free(&peer->pending_sends);
274 xbt_free(peer->pieces_count);
275 xbt_free(peer->bitfield);
276 xbt_free(peer->bitfield_blocks);
278 RngStream_DeleteStream(&peer->stream);
282 * Returns if a peer has finished downloading the file
283 * @param bitfield peer bitfield
/* Tells whether a bitfield describes a complete file by searching its first
 * FILE_PIECES characters for a 0 entry (a missing piece).
 * NOTE(review): the right-hand side of the comparison is on a line missing
 * from this listing; presumably the memchr result is compared with NULL. */
285 int has_finished(char *bitfield)
287 return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) ==
/* Walks every known connection and tests its interested flag.
 * NOTE(review): the counter, its increment and the return statement are
 * missing from this listing; update_choked_peers compares the result with 0,
 * so it is presumably the number of interested peers -- confirm. */
291 int nb_interested_peers(peer_t peer)
293 xbt_dict_cursor_t cursor = NULL;
295 connection_t connection;
297 xbt_dict_foreach(peer->peers, cursor, key, connection) {
298 if (connection->interested)
305 * Handle pending sends and remove those which are done
306 * @param peer Peer data
/* Reaps the asynchronous sends stored in peer->pending_sends: every
 * communication reported by MSG_comm_testany is removed from the dynar and
 * destroyed, until MSG_comm_testany returns -1.
 * NOTE(review): the declaration of index and the name of the logging macro
 * are on lines missing from this listing. */
308 void handle_pending_sends(peer_t peer)
312 while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
313 msg_comm_t comm_send =
314 xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
// Read the status before the communication is destroyed below.
315 int status = MSG_comm_get_status(comm_send);
316 xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
318 ("Communication %p is finished with status %d, dynar size is now %lu",
319 comm_send, status, xbt_dynar_length(peer->pending_sends));
// Fetch the task first: it is still needed after the comm is gone.
321 msg_task_t task = MSG_comm_get_task(comm_send);
322 MSG_comm_destroy(comm_send);
// The message is only freed here when the send did not succeed.
324 if (status != MSG_OK) {
325 task_message_free(task);
/* Keeps peer->active_peers in sync with the state of one connection: a remote
 * peer that is interested and not choked by us is stored in the set, keyed by
 * the raw bytes of its id.
 * NOTE(review): the else branch header, any guard around the removal and the
 * last argument of xbt_dict_remove_ext are on lines missing from this
 * listing. */
330 void update_active_peers_set(peer_t peer, connection_t remote_peer)
333 if (remote_peer->interested && !remote_peer->choked_upload) {
334 //add in the active peers set
// NULL free function: the set does not own the connection.
335 xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id,
336 sizeof(int), remote_peer, NULL);
// Presumably the other case: drop the peer from the set.
341 xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id,
352 * Handle a received message sent by another peer
353 * @param peer Peer data
354 * @param task task received.
/* Dispatches one received message according to its type, updates the state of
 * the sending connection and finally frees the message.
 * The sender is looked up in peer->peers by message->peer_id; the lookup
 * function returns NULL when it is not found, which is why several cases
 * assert that remote_peer is not NULL.
 * NOTE(review): this listing has lost lines, among them the assignment to
 * remote_peer, the break statements and the case labels of the CHOKE, HAVE,
 * PIECE and CANCEL sections, which are identified below from their log
 * messages only. */
356 void handle_message(peer_t peer, msg_task_t task)
358 message_t message = MSG_task_get_data(task);
359 connection_t remote_peer;
361 xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
363 switch (message->type) {
365 case MESSAGE_HANDSHAKE:
366 XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
367 message->issuer_host_name);
368 //Check if the peer is in our connection list.
// Presumably only done for a sender we did not know yet (condition not
// visible): register a new connection and answer with our own handshake.
370 xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
371 connection_new(message->peer_id), NULL);
372 send_handshake(peer, message->mailbox);
374 //Send our bitfield to the peer
375 send_bitfield(peer, message->mailbox);
377 case MESSAGE_BITFIELD:
378 XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
379 message->issuer_host_name);
380 //Update the pieces list
381 update_pieces_count_from_bitfield(peer, message->bitfield);
// Keep a private copy of the remote bitfield on the connection.
383 remote_peer->bitfield = xbt_strdup(message->bitfield);
384 xbt_assert(!remote_peer->am_interested,
385 "Should not be interested at first");
386 if (is_interested(peer, remote_peer)) {
387 remote_peer->am_interested = 1;
388 send_interested(peer, message->mailbox);
391 case MESSAGE_INTERESTED:
392 XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
393 message->issuer_host_name);
394 xbt_assert((remote_peer != NULL),
395 "A non-in-our-list peer has sent us a message. WTH ?");
396 //Update the interested state of the peer.
397 remote_peer->interested = 1;
398 update_active_peers_set(peer, remote_peer);
400 case MESSAGE_NOTINTERESTED:
401 XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)",
402 message->mailbox, message->issuer_host_name);
403 xbt_assert((remote_peer != NULL),
404 "A non-in-our-list peer has sent us a message. WTH ?");
405 remote_peer->interested = 0;
406 update_active_peers_set(peer, remote_peer);
408 case MESSAGE_UNCHOKE:
409 xbt_assert((remote_peer != NULL),
410 "A non-in-our-list peer has sent us a message. WTH ?");
411 XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
412 message->issuer_host_name);
413 remote_peer->choked_download = 0;
414 //Send requests to the peer, since it has unchoked us
415 if (remote_peer->am_interested)
416 request_new_piece_to_peer(peer, remote_peer);
// CHOKE section (case label not visible): the remote peer stops serving
// us, so the piece we were fetching from it is given up.
419 xbt_assert((remote_peer != NULL),
420 "A non-in-our-list peer has sent us a message. WTH ?");
421 XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
422 message->issuer_host_name);
423 remote_peer->choked_download = 1;
424 remove_current_piece(peer, remote_peer, remote_peer->current_piece);
// HAVE section (case label not visible): the remote peer announces one
// more piece; record it in its bitfield and in our per-piece counters.
427 XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
428 message->mailbox, message->issuer_host_name, message->index);
429 xbt_assert(remote_peer->bitfield, "bitfield not received");
430 xbt_assert((message->index >= 0
431 && message->index < FILE_PIECES),
432 "Wrong HAVE message received");
433 remote_peer->bitfield[message->index] = '1';
434 peer->pieces_count[message->index]++;
435 //If we do not own the announced piece yet, we tell the peer that we are interested.
436 if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
437 remote_peer->am_interested = 1;
438 send_interested(peer, message->mailbox);
439 if (!remote_peer->choked_download)
440 request_new_piece_to_peer(peer, remote_peer);
443 case MESSAGE_REQUEST:
444 xbt_assert((message->index >= 0
445 && message->index < FILE_PIECES), "Wrong request received");
// Requests are only served for peers we have unchoked, and only for
// pieces we own.
446 if (!remote_peer->choked_upload) {
447 XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
448 message->mailbox, message->issuer_host_name, message->peer_id,
449 message->block_index,
450 message->block_index + message->block_length);
451 if (peer->bitfield[message->index] == '1') {
452 send_piece(peer, message->mailbox, message->index, 0,
453 message->block_index, message->block_length);
456 XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
457 message->mailbox, message->issuer_host_name,
// PIECE section (case label not visible): a run of blocks has arrived.
462 xbt_assert((message->index >= 0
463 && message->index < FILE_PIECES), "Wrong piece received");
464 //TODO: Execute a computation.
465 if (message->stalled) {
466 XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
467 message->index, message->mailbox, message->issuer_host_name);
469 XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
470 message->block_index,
471 message->block_index + message->block_length,
472 message->mailbox, message->issuer_host_name);
473 if (peer->bitfield[message->index] == '0') {
474 update_bitfield_blocks(peer, message->index, message->block_index,
475 message->block_length);
476 if (piece_complete(peer, message->index)) {
477 //Removing the piece from our piece list
478 remove_current_piece(peer, remote_peer, message->index);
479 //Setting the fact that we have the piece
480 peer->bitfield[message->index] = '1';
482 XBT_DEBUG("My status is now %s", peer->bitfield);
483 //Sending the information to all the peers we are connected to
484 send_have(peer, message->index);
485 //Sending NOTINTERESTED to the peers that no longer have anything we want.
486 update_interested_after_receive(peer);
487 } else { // piece not completed
488 send_request_to_peer(peer, remote_peer, message->index); // ask for the next block
491 XBT_DEBUG("However, we already have it");
492 request_new_piece_to_peer(peer, remote_peer);
// CANCEL section (case label not visible): only logged.
497 XBT_DEBUG("The received CANCEL from %s (%s)",
498 message->mailbox, message->issuer_host_name);
501 //Update the peer speed.
// The recorded value is the inverse of the time elapsed since the previous
// message. NOTE(review): any guard around this call is not visible; confirm
// that remote_peer cannot be NULL and the elapsed time cannot be zero here.
503 connection_add_speed_value(remote_peer,
504 1.0 / (MSG_get_clock() -
505 peer->begin_receive_time));
507 peer->begin_receive_time = MSG_get_clock();
509 task_message_free(task);
/* Picks a piece with select_piece_to_download, records it in
 * peer->current_pieces and sends the first request for it to remote_peer.
 * NOTE(review): select_piece_to_download is documented as returning -1 when
 * nothing can be downloaded; the guard that presumably handles that case is
 * on a line missing from this listing. */
512 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
514 int piece = select_piece_to_download(peer, remote_peer);
516 xbt_dynar_push_as(peer->current_pieces, int, piece);
517 send_request_to_peer(peer, remote_peer, piece);
/* Removes current_piece from the list of pieces being downloaded, if it is
 * there, and resets remote_peer->current_piece to -1.
 * The third parameter is int current_piece (see the forward declaration near
 * the top of the file); its line is missing from this listing, as is the body
 * of the search loop that presumably records the matching position. */
521 void remove_current_piece(peer_t peer, connection_t remote_peer,
// piece_index stays at -1 when the piece is not in the list.
524 int piece_index = -1, piece, i;
525 xbt_dynar_foreach(peer->current_pieces, i, piece) {
526 if (piece == current_piece) {
531 if (piece_index != -1)
532 xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
533 remote_peer->current_piece = -1;
537 * Updates the list of who has a piece from a bitfield
538 * @param peer peer we want to update the list
539 * @param bitfield bitfield
/* Increments peer->pieces_count[i] for every piece i that is marked as owned
 * in the given bitfield (one character per piece, FILE_PIECES entries read).
 * NOTE(review): the declaration of i is on a line missing from this listing. */
541 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
544 for (i = 0; i < FILE_PIECES; i++) {
545 if (bitfield[i] == '1') {
546 peer->pieces_count[i]++;
554 * Return the piece to be downloaded
555 * The policies are the following (as described in "Bittorrent Architecture Protocol", Ryan Toole):
556 * If a piece is partially downloaded, this piece is selected first (strict priority).
557 * If the peer has strictly fewer than 4 pieces, it chooses a piece at random.
558 * If the peer has 4 pieces or more, it downloads the pieces that are the least
559 * replicated (rarest first policy).
560 * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
562 * @param peer: local peer
563 * @param remote_peer: information about the connection
564 * @return the piece to download if possible. -1 otherwise
/* Chooses the next piece to request from remote_peer. The visible code tries,
 * in this order: a partially downloaded piece, then a random interesting
 * piece when every missing piece is already being requested, then a random
 * free piece while fewer than 4 pieces are owned, and otherwise one of the
 * least replicated free pieces.
 * NOTE(review): this listing has lost lines, among them the declarations of
 * piece and i, the returns, the increments of nb_min_pieces and current_index
 * and the assignments to piece inside the selection loops. */
566 int select_piece_to_download(peer_t peer, connection_t remote_peer)
570 piece = partially_downloaded_piece(peer, remote_peer);
571 // strict priority policy
// Endgame: as many pieces in flight as pieces still missing, so a piece
// that is already requested elsewhere may be picked again.
576 if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces) && is_interested(peer, remote_peer)) {
578 int nb_interesting_pieces = 0;
579 int random_piece_index, current_index = 0;
580 // compute the number of interesting pieces
581 for (i = 0; i < FILE_PIECES; i++) {
582 if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
583 nb_interesting_pieces++;
586 xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
587 // get a random interesting piece
// Draw a rank among the candidates, then walk the pieces again to find
// the candidate holding that rank.
588 random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces-1);
589 for (i = 0; i < FILE_PIECES; i++) {
590 if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1') {
591 if (random_piece_index == current_index) {
598 xbt_assert(piece != -1,"WTF !!!");
602 // Random first policy
// Same count-then-pick scheme, restricted to pieces not yet requested.
603 if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
605 int nb_interesting_pieces = 0;
606 int random_piece_index, current_index = 0;
607 // compute the number of interesting pieces
608 for (i = 0; i < FILE_PIECES; i++) {
609 if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
610 nb_interesting_pieces++;
613 xbt_assert(nb_interesting_pieces != 0, "WTF !!!");
614 // get a random interesting piece
615 random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces-1);
616 for (i = 0; i < FILE_PIECES; i++) {
617 if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
618 if (random_piece_index == current_index) {
625 xbt_assert(piece != -1,"WTF !!!");
627 } else { // Rarest first policy
629 short min = SHRT_MAX;
630 int nb_min_pieces = 0;
631 int random_rarest_index, current_index = 0;
632 // compute the smallest number of copies of available pieces
633 for (i = 0; i < FILE_PIECES; i++) {
634 if (peer->pieces_count[i] < min && peer->bitfield[i] == '0'
635 && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
636 min = peer->pieces_count[i];
// The assertions of this branch tolerate an empty candidate set when the
// remote peer has no free interesting piece.
638 xbt_assert(min != SHRT_MAX || !is_interested_and_free(peer, remote_peer), "WTF !!!");
639 // compute the number of rarest pieces
640 for (i = 0; i < FILE_PIECES; i++) {
641 if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
642 && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i))
645 xbt_assert(nb_min_pieces != 0 || !is_interested_and_free(peer, remote_peer), "WTF !!!");
646 // get a random rarest piece
// NOTE(review): if nb_min_pieces can be 0 here (which the assertion
// above allows), the upper bound of the draw is -1 -- confirm that
// RngStream_RandInt accepts this or that the case is excluded earlier.
647 random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces-1);
648 for (i = 0; i < FILE_PIECES; i++) {
649 if (peer->pieces_count[i] == min && peer->bitfield[i] == '0'
650 && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
651 if (random_rarest_index == current_index) {
658 xbt_assert(piece != -1 || !is_interested_and_free(peer, remote_peer),"WTF !!!");
665 * Update the list of current choked and unchoked peers, using the
667 * @param peer the current peer
/* Runs one round of the choke algorithm: selects at most one active peer to
 * choke and at most one choked, interested peer to unchoke, then updates
 * their flags and the active set and notifies them.
 * The peer to unchoke is chosen by one of three policies that are visible
 * below: least recently unchoked when we own every piece, a random peer every
 * third round, and the fastest peer otherwise.
 * NOTE(review): this listing has lost lines, among them the early return, the
 * else keywords between the three policies, the do keyword and the counters i
 * and j of the random policy, and the name of one logging macro. */
669 void update_choked_peers(peer_t peer)
// Nothing to do when nobody wants anything from us.
671 if (nb_interested_peers(peer) == 0)
673 // if(xbt_dict_size(peer->active_peers) > 0)
675 XBT_DEBUG("(%d) update_choked peers %d active peers", peer->id,
676 xbt_dict_size(peer->active_peers));
677 //update the current round
// Rounds cycle through 0, 1, 2; round 0 triggers the random policy below.
678 peer->round = (peer->round + 1) % 3;
679 char *key, *key_choked;
680 connection_t peer_choosed = NULL;
681 connection_t peer_choked = NULL;
682 //remove a peer from the list
// The candidate to choke is whatever entry the cursor yields first in the
// active set.
683 xbt_dict_cursor_t cursor = NULL;
684 xbt_dict_cursor_first(peer->active_peers, &cursor);
685 if (xbt_dict_length(peer->active_peers) > 0) {
686 key_choked = xbt_dict_cursor_get_key(cursor);
687 peer_choked = xbt_dict_cursor_get_data(cursor);
689 xbt_dict_cursor_free(&cursor);
692 * If we are currently seeding, we unchoke the peer which has
693 * been unchoke the least time.
695 if (peer->pieces == FILE_PIECES) {
696 connection_t connection;
// Start above the current clock so that any recorded unchoke time is lower.
697 double unchoke_time = MSG_get_clock() + 1;
699 xbt_dict_foreach(peer->peers, cursor, key, connection) {
700 if (connection->last_unchoke < unchoke_time && connection->interested
701 && connection->choked_upload) {
702 unchoke_time = connection->last_unchoke;
703 peer_choosed = connection;
707 //Random optimistic unchoking
708 if (peer->round == 0) {
711 //We choose a random peer to unchoke.
712 int id_chosen = RngStream_RandInt(peer->stream, 0,
713 xbt_dict_length(peer->peers) - 1);
715 connection_t connection;
// Walk the dictionary up to the drawn rank.
716 xbt_dict_foreach(peer->peers, cursor, key, connection) {
717 if (i == id_chosen) {
718 peer_choosed = connection;
723 xbt_dict_cursor_free(&cursor);
// A drawn peer that is not interested or is already unchoked is
// rejected; the draw is retried at most MAXIMUM_PAIRS times.
724 if (!peer_choosed->interested || !peer_choosed->choked_upload) {
728 } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
730 //Use the "fastest download" policy.
731 connection_t connection;
732 double fastest_speed = 0.0;
// Only peers with a strictly positive recorded speed can be selected.
733 xbt_dict_foreach(peer->peers, cursor, key, connection) {
734 if (connection->peer_speed > fastest_speed
735 && connection->choked_upload && connection->interested) {
736 peer_choosed = connection;
737 fastest_speed = connection->peer_speed;
743 if (peer_choosed != NULL)
745 ("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
746 peer->id, peer_choosed->id, peer_choosed->interested,
747 peer_choosed->choked_upload);
749 // if (xbt_dict_size(peer->peers) > 0)
750 // xbt_assert((xbt_dict_size(peer->active_peers) != 0),
751 // "No more active peers !");
754 // if (peer_choked != NULL && peer_choked->choked_upload != 0)
755 // peer_choked = NULL;
756 // if (peer_choosed != NULL && peer_choosed->choked_upload == 0)
757 // peer_choosed = NULL;
// Nothing changes when the same peer was selected on both sides.
759 if (peer_choked != peer_choosed) {
760 if (peer_choked != NULL) {
761 xbt_assert((!peer_choked->choked_upload),
762 "Tries to choked a choked peer");
763 peer_choked->choked_upload = 1;
// The dictionary key is the raw bytes of the peer id.
764 xbt_assert((*((int *) key_choked) == peer_choked->id), "WTF !!!");
765 update_active_peers_set(peer, peer_choked);
766 // xbt_dict_remove_ext(peer->active_peers, key_choked, sizeof(int));
767 XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
768 send_choked(peer, peer_choked->mailbox);
770 if (peer_choosed != NULL) {
771 xbt_assert((peer_choosed->choked_upload),
772 "Tries to unchoked an unchoked peer");
773 peer_choosed->choked_upload = 0;
774 xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
775 sizeof(int), peer_choosed, NULL);
// Remembered for the least-recently-unchoked policy used when seeding.
776 peer_choosed->last_unchoke = MSG_get_clock();
777 XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
778 update_active_peers_set(peer, peer_choosed);
779 send_unchoked(peer, peer_choosed->mailbox);
785 * Updates our "interested" state about peers: send "not interested" to peers
786 * that don't have any more pieces we want.
787 * @param peer our peer data
/* After a piece has been completed, re-evaluates every connection we are
 * interested in: when the remote bitfield no longer offers a piece we miss,
 * the am_interested flag is cleared and a NOTINTERESTED message is sent.
 * NOTE(review): the declarations of key and i and the lines that set the
 * interested flag before and inside the scan are missing from this listing. */
789 void update_interested_after_receive(peer_t peer)
792 xbt_dict_cursor_t cursor;
793 connection_t connection;
795 int interested, piece;
796 xbt_dict_foreach(peer->peers, cursor, key, connection) {
// Connections we were not interested in are left untouched.
798 if (connection->am_interested) {
799 xbt_assert(connection->bitfield, "Bitfield not received");
800 //Check if the peer still has a piece we want.
802 for (i = 0; i < FILE_PIECES; i++) {
803 if (connection->bitfield[i] == '1' && peer->bitfield[i] == '0') {
808 if (!interested) { //no more piece to download from connection
809 connection->am_interested = 0;
810 send_notinterested(peer, connection->mailbox);
816 void update_bitfield_blocks(peer_t peer, int index, int block_index,
820 xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
821 xbt_assert((block_index >= 0
822 && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
824 for (i = block_index; i < (block_index + block_length); i++) {
825 peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
830 * Returns if a peer has completed the download of a piece
/* Scans the PIECES_BLOCKS block entries of piece index in
 * peer->bitfield_blocks, looking for a block that has not been received.
 * NOTE(review): the declaration of i and both return statements are missing
 * from this listing; handle_message uses the result as a boolean that is true
 * when the piece is complete. */
832 int piece_complete(peer_t peer, int index)
835 for (i = 0; i < PIECES_BLOCKS; i++) {
836 if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
844 * Returns the first block that a peer doesn't have in a piece.
845 * If the peer has all blocks of the piece, returns -1.
/* Scans the blocks of the given piece in order, looking for the first one
 * that has not been received yet.
 * NOTE(review): the declaration of i and the return statements are missing
 * from this listing; send_request_to_peer treats -1 as the result for a piece
 * with no missing block. */
847 int get_first_block(peer_t peer, int piece)
850 for (i = 0; i < PIECES_BLOCKS; i++) {
851 if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
859 * Indicates if the remote peer has a piece not stored by the local peer
/* Looks for a piece that remote_peer owns and the local peer does not.
 * The remote bitfield must already have been received (asserted).
 * NOTE(review): the declaration of i and the return statements are missing
 * from this listing; callers use the result as a boolean. */
861 int is_interested(peer_t peer, connection_t remote_peer)
863 xbt_assert(remote_peer->bitfield, "Bitfield not received");
865 for (i = 0; i < FILE_PIECES; i++) {
866 if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
874 * Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer
/* Same test as is_interested, with one more condition: the piece must not
 * already be in the list of pieces being downloaded (in_current_pieces).
 * NOTE(review): the declaration of i and the return statements are missing
 * from this listing; callers use the result as a boolean. */
876 int is_interested_and_free(peer_t peer, connection_t remote_peer)
878 xbt_assert(remote_peer->bitfield, "Bitfield not received");
880 for (i = 0; i < FILE_PIECES; i++) {
881 if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
882 && !in_current_pieces(peer, i)) {
891 * Returns a piece that is partially downloaded and stored by the remote peer if any
/* Looks for a piece that the remote peer owns, that we miss, that is not
 * currently being requested, and whose first missing block is not block 0,
 * i.e. a piece for which at least the first block has already been received.
 * NOTE(review): the declaration of i and the return statements are missing
 * from this listing; select_piece_to_download stores the result as a piece
 * index. */
894 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
896 xbt_assert(remote_peer->bitfield, "Bitfield not received");
898 for (i = 0; i < FILE_PIECES; i++) {
899 if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0'
900 && !in_current_pieces(peer, i)) {
901 if (get_first_block(peer, i) > 0)
910 * Send request messages to a peer that has unchoked us
912 * @param remote_peer peer data to the peer we want to send the request
/* Requests the next missing blocks of a piece from remote_peer and records
 * the piece as the one currently fetched from that connection.
 * At most BLOCKS_REQUESTED blocks are asked for, starting at the first block
 * we do not have; nothing is sent when get_first_block returns -1.
 * NOTE(review): the last argument line of the send_request call is missing
 * from this listing. */
914 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
916 remote_peer->current_piece = piece;
918 int block_index, block_length;
919 xbt_assert(remote_peer->bitfield, "bitfield not received");
920 xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
921 block_index = get_first_block(peer, piece);
922 if (block_index != -1) {
// Number of blocks left in the piece, capped by the request size.
923 block_length = PIECES_BLOCKS - block_index;
924 block_length = min(BLOCKS_REQUESTED, block_length);
925 send_request(peer, remote_peer->mailbox, piece, block_index,
932 * Indicates if a piece is currently being downloaded by the peer.
/* Searches peer->current_pieces for the given piece index.
 * NOTE(review): the declarations of i and peer_piece and the return
 * statements are missing from this listing; callers use the result as a
 * boolean. */
934 int in_current_pieces(peer_t peer, int piece)
938 xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
939 if (peer_piece == piece) {
949 /***********************************************************
951 * Low level message functions
953 ***********************************************************/
958 * Send a "interested" message to a peer
959 * @param peer peer data
960 * @param mailbox destination mailbox
/* Builds an INTERESTED message carrying our host name, mailbox and id, and
 * sends it to the given mailbox with a detached send; task_message_free is
 * passed as the cleanup function of that send.
 * NOTE(review): the line declaring task is missing from this listing. */
962 void send_interested(peer_t peer, const char *mailbox)
965 task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
966 peer->id, task_message_size(MESSAGE_INTERESTED));
967 MSG_task_dsend(task, mailbox, task_message_free);
968 XBT_DEBUG("Sending INTERESTED to %s", mailbox);
973 * Send a "not interested" message to a peer
974 * @param peer peer data
975 * @param mailbox destination mailbox
/* Builds a NOTINTERESTED message carrying our host name, mailbox and id, and
 * sends it to the given mailbox with a detached send; task_message_free is
 * passed as the cleanup function of that send.
 * NOTE(review): the line declaring task is missing from this listing. */
977 void send_notinterested(peer_t peer, const char *mailbox)
980 task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
981 peer->id, task_message_size(MESSAGE_NOTINTERESTED));
982 MSG_task_dsend(task, mailbox, task_message_free);
983 XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
988 * Send a handshake message to all the peers the peer has.
989 * @param peer peer data
/* Sends one HANDSHAKE message to the mailbox of every connection stored in
 * peer->peers. A fresh task is built for each recipient and sent detached.
 * NOTE(review): the declarations of key and task are on lines missing from
 * this listing. */
991 void send_handshake_all(peer_t peer)
993 connection_t remote_peer;
994 xbt_dict_cursor_t cursor = NULL;
996 xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
998 task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
999 peer->id, task_message_size(MESSAGE_HANDSHAKE));
1000 MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1001 XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
1006 * Send a "handshake" message to a peer
1007 * @param peer peer data
1008 * @param mailbox mailbox to which the message is sent
/* Builds a HANDSHAKE message carrying our host name, mailbox and id, and
 * sends it to the given mailbox with a detached send.
 * NOTE(review): the line declaring task is missing from this listing. */
1010 void send_handshake(peer_t peer, const char *mailbox)
1013 task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
1014 peer->id, task_message_size(MESSAGE_HANDSHAKE));
1015 MSG_task_dsend(task, mailbox, task_message_free);
1016 XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
1020 * Send a "choked" message to a peer.
/* Builds a CHOKE message carrying our host name, mailbox and id, and sends it
 * to the given mailbox with a detached send.
 * @param peer peer data
 * @param mailbox destination mailbox
 * NOTE(review): the line declaring task is missing from this listing. */
1022 void send_choked(peer_t peer, const char *mailbox)
1024 XBT_DEBUG("Sending a CHOKE to %s", mailbox);
1026 task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
1027 peer->id, task_message_size(MESSAGE_CHOKE));
1028 MSG_task_dsend(task, mailbox, task_message_free);
1032 * Send a "unchoked" message to a peer
/* Builds an UNCHOKE message carrying our host name, mailbox and id, and sends
 * it to the given mailbox with a detached send.
 * @param peer peer data
 * @param mailbox destination mailbox
 * NOTE(review): the line declaring task is missing from this listing. */
1034 void send_unchoked(peer_t peer, const char *mailbox)
1036 XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
1038 task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
1039 peer->id, task_message_size(MESSAGE_UNCHOKE));
1040 MSG_task_dsend(task, mailbox, task_message_free);
1044 * Send a "HAVE" message to all peers we are connected to
/* Announces a newly owned piece: one HAVE message is built and sent, with a
 * detached send, to the mailbox of every connection stored in peer->peers.
 * @param peer peer data
 * @param piece index of the piece being announced
 * NOTE(review): the declarations of key and task and one argument line of
 * task_message_index_new (presumably the one carrying the piece index) are
 * missing from this listing. */
1046 void send_have(peer_t peer, int piece)
1048 XBT_DEBUG("Sending HAVE message to all my peers");
1049 connection_t remote_peer;
1050 xbt_dict_cursor_t cursor = NULL;
1052 xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
1054 task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
1056 task_message_size(MESSAGE_HAVE));
1057 MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
1062 * Send a bitfield message to the peer listening on the given mailbox.
1063 * @param peer peer data
/* Builds a BITFIELD message from our current piece bitfield and sends it to
 * the given mailbox. Unlike the other send helpers of this file, this one
 * uses MSG_task_isend and keeps the resulting communication in
 * peer->pending_sends, where handle_pending_sends later reaps it.
 * NOTE(review): the line declaring task is missing from this listing. */
1065 void send_bitfield(peer_t peer, const char *mailbox)
1067 XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
1069 task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
1070 peer->bitfield, FILE_PIECES);
1071 //Async send and append to pending sends
1072 msg_comm_t comm = MSG_task_isend(task, mailbox);
1073 xbt_dynar_push(peer->pending_sends, &comm);
1077 * Send a "request" message to a peer, containing a request for a piece
/* Builds a REQUEST message for block_length blocks of the given piece,
 * starting at block_index, and sends it to the given mailbox with a detached
 * send.
 * @param peer peer data
 * @param mailbox destination mailbox
 * @param piece index of the requested piece
 * @param block_index first requested block within the piece
 * @param block_length number of requested blocks
 * NOTE(review): the line declaring task is missing from this listing. */
1079 void send_request(peer_t peer, const char *mailbox, int piece,
1080 int block_index, int block_length)
1082 XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
1083 block_index, block_length);
1085 task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
1086 block_index, block_length);
1087 MSG_task_dsend(task, mailbox, task_message_free);
1091 * Send a "piece" message to a peer, containing a piece of the file
/* Builds a PIECE message for block_length blocks of the given piece, starting
 * at block_index, and sends it to the given mailbox with a detached send.
 * The piece index must not be negative and the piece must be marked as owned
 * in our bitfield (both asserted).
 * @param peer peer data
 * @param mailbox destination mailbox
 * @param piece index of the piece being sent
 * @param stalled flag copied into the message (handle_message only logs a
 *        piece received with this flag set)
 * @param block_index first block sent within the piece
 * @param block_length number of blocks sent
 * NOTE(review): the line declaring task is missing from this listing. */
1093 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
1094 int block_index, int block_length)
1096 XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
1097 block_length, mailbox);
1098 xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
1099 xbt_assert((peer->bitfield[piece] == '1'),
1100 "Tried to send a piece that we doesn't have.");
1102 task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
1103 stalled, block_index, block_length, BLOCK_SIZE);
1104 MSG_task_dsend(task, mailbox, task_message_free);