1 /* Copyright (c) 2012. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "connection.h"
11 #include <xbt/RngStream.h>
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
15 //TODO: Let users change this
17 * File transferred data
/* Size of the shared file; sent to the tracker in get_peers_data(). Unit is not stated here, presumably bytes (TODO confirm). */
19 static int FILE_SIZE = 5120;
/* Number of pieces the file is split into; also the length of a peer bitfield (one char per piece). */
20 static int FILE_PIECES = 10;
/* Number of blocks in every piece; bitfield_blocks holds FILE_PIECES * PIECES_BLOCKS chars. */
22 static int PIECES_BLOCKS = 5;
/* Upper bound on the number of blocks asked for in one request (see send_requests_to_peer()). */
23 static int BLOCKS_REQUESTED = 2;
/*
 * Entry point of a peer process.
 * argv[1] is parsed as the peer id and argv[2] as the deadline; argc must be 3 or 4.
 * NOTE(review): atoi()/atof() report no errors. A malformed deadline is caught by the
 * "deadline > 0" assertion, but a malformed id silently becomes 0.
 * NOTE(review): several lines of this function are missing from this excerpt (the local
 * peer declaration, branch conditions, braces, the return statement).
 */
28 int peer(int argc, char *argv[])
32 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
/* The two calls differ only in the last argument (seed flag 1 or 0). The condition that selects one of them is not visible here. */
35 peer_init(&peer, atoi(argv[1]), 1);
37 peer_init(&peer, atoi(argv[1]), 0);
40 double deadline = atof(argv[2]);
41 xbt_assert(deadline > 0, "Wrong deadline supplied");
42 XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
43 //Getting peer data from the tracker.
44 if (get_peers_data(&peer)) {
45 XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
46 XBT_DEBUG("Here is my current status: %s", peer.bitfield);
/* Reference time used by handle_message() when it computes a download speed sample. */
47 peer.begin_receive_time = MSG_get_clock();
/* A peer whose bitfield has no '0' already owns the whole file: it only seeds. */
48 if (has_finished(peer.bitfield)) {
49 peer.pieces = FILE_PIECES;
50 send_handshake_all(&peer);
51 seed_loop(&peer, deadline);
/* Presumably the else branch: download first, then seed for the remaining time. */
53 leech_loop(&peer, deadline);
54 seed_loop(&peer, deadline);
/* Presumably the else branch of the get_peers_data() test above. */
57 XBT_INFO("Couldn't contact the tracker.");
60 XBT_INFO("Here is my current status: %s", peer.bitfield);
/* Release a receive that is still pending when the peer leaves. */
61 if (peer.comm_received) {
62 MSG_comm_destroy(peer.comm_received);
70 * Peer main loop when it is leeching.
71 * @param peer peer data
72 * @param deadline time at which the peer has to leave
/*
 * Download loop. Sends handshakes, waits in wait_for_pieces(), then iterates until the
 * clock reaches deadline or peer->pieces reaches FILE_PIECES.
 * NOTE(review): brace and else lines are missing from this excerpt, so the exact nesting
 * of the statements that follow the MSG_comm_test() branch cannot be confirmed here.
 */
74 void leech_loop(peer_t peer, double deadline)
76 double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
77 XBT_DEBUG("Start downloading.");
79 * Send a "handshake" message to all the peers it got
80 * (since it couldn't have gotten more than 50 peers)
82 send_handshake_all(peer);
83 //Wait for at least one "bitfield" message.
84 wait_for_pieces(peer, deadline);
85 XBT_DEBUG("Starting main leech loop");
86 while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
/* Post a new asynchronous receive only when none is pending. */
87 if (peer->comm_received == NULL) {
88 peer->task_received = NULL;
89 peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
/* Poll the pending receive; when it is over, read its status, release it and clear the handle. */
91 if (MSG_comm_test(peer->comm_received)) {
92 msg_error_t status = MSG_comm_get_status(peer->comm_received);
93 MSG_comm_destroy(peer->comm_received);
94 peer->comm_received = NULL;
/* Only a successfully received task is dispatched. */
95 if (status == MSG_OK) {
96 handle_message(peer, peer->task_received);
/* A piece has been selected (current_piece is not -1): tell the peers that own it. */
99 if (peer->current_piece != -1) {
100 send_interested_to_peers(peer);
102 //If the current interested pieces is < MAX
103 if (peer->pieces_requested < MAX_PIECES) {
104 update_current_piece(peer);
107 //We don't execute the choke algorithm if we don't already have a piece
108 if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
109 update_choked_peers(peer);
/* The next update is scheduled relative to the previous target, not to the current clock. */
110 next_choked_update += UPDATE_CHOKED_INTERVAL;
112 MSG_process_sleep(1);
119 * Peer main loop when it is seeding
120 * @param peer peer data
121 * @param deadline time when the peer will leave
/*
 * Seeding loop. Iterates until the clock reaches deadline, dispatching received messages
 * and periodically calling update_choked_peers().
 * NOTE(review): brace and else lines are missing from this excerpt, so the nesting of the
 * statements after the MSG_comm_test() branch cannot be confirmed here.
 */
123 void seed_loop(peer_t peer, double deadline)
125 double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
126 XBT_DEBUG("Start seeding.");
127 //start the main seed loop
128 while (MSG_get_clock() < deadline) {
/* Post a new asynchronous receive only when none is pending. */
129 if (peer->comm_received == NULL) {
130 peer->task_received = NULL;
131 peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
/* Poll the pending receive; when it is over, read its status, release it and clear the handle. */
133 if (MSG_comm_test(peer->comm_received)) {
134 msg_error_t status = MSG_comm_get_status(peer->comm_received);
135 MSG_comm_destroy(peer->comm_received);
136 peer->comm_received = NULL;
137 if (status == MSG_OK) {
138 handle_message(peer, peer->task_received);
/* Unlike leech_loop(), there is no "peer->pieces > 0" condition here. */
141 if (MSG_get_clock() >= next_choked_update) {
142 update_choked_peers(peer);
143 //TODO: Change the choked peer algorithm when seeding.
144 next_choked_update += UPDATE_CHOKED_INTERVAL;
146 MSG_process_sleep(1);
153 * Retrieves the peer list from the tracker
154 * @param peer current peer data
/*
 * Sends a request to the tracker and stores every peer id of the answer, except our own,
 * as a new connection in peer->peers.
 * Both phases (send, then receive) are bounded by the same absolute timeout computed on entry.
 * NOTE(review): the return statement and the lines that set send_success / success are
 * missing from this excerpt; presumably success is returned (the caller tests the result
 * as a boolean). Confirm.
 */
156 int get_peers_data(peer_t peer)
158 int success = 0, send_success = 0;
159 double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
160 //Build the task to send to the tracker
161 tracker_task_data_t data =
162 tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
163 peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
164 //Build the task to send.
165 msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
166 msg_task_t task_received = NULL;
167 msg_comm_t comm_received;
/* Phase 1: retry the send until it succeeds or the timeout expires. */
168 while (!send_success && MSG_get_clock() < timeout) {
169 XBT_DEBUG("Sending a peer request to the tracker.");
171 MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX,
173 if (status == MSG_OK) {
/* Phase 2: wait for the answer on our tracker mailbox. */
/* NOTE(review): this loop is not conditioned on send_success in the visible lines. */
177 while (!success && MSG_get_clock() < timeout) {
/* NOTE(review): a new receive is posted on every iteration, and MSG_comm_destroy() is only visible on the MSG_OK path. Check that a failed comm is released. */
178 comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
179 msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
180 if (status == MSG_OK) {
/* NOTE(review): this declaration shadows the request payload named data declared above. */
181 tracker_task_data_t data = MSG_task_get_data(task_received);
184 //Add the peers the tracker gave us to our peer list.
185 xbt_dynar_foreach(data->peers, i, peer_id) {
186 if (peer_id != peer->id)
/* The dictionary key is the raw bytes of the int peer id (key length sizeof(int)). */
187 xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int),
188 connection_new(peer_id), NULL);
191 //free the communication and the task
192 MSG_comm_destroy(comm_received);
/* Presumably frees the received payload (inner data), since braces are not visible here. No cleanup of task_send or of the request payload on failure is visible. Confirm. */
193 tracker_task_data_free(data);
194 MSG_task_destroy(task_received);
195 comm_received = NULL;
203 * Initialize the peer data.
204 * @param peer peer data
205 * @param id id of the peer to take in the network
206 * @param seed indicates if the peer is a seed.
/*
 * Fills in a peer structure: mailbox names, peer dictionaries, host name, bitfields,
 * piece counters, list of wanted pieces and random stream.
 * NOTE(review): some lines of this function are missing from this excerpt (for instance
 * the assignment of peer->id and the branch on seed).
 */
208 void peer_init(peer_t peer, int id, int seed)
/* NOTE(review): unbounded sprintf(); the sizes of peer->mailbox and peer->mailbox_tracker are declared elsewhere. Confirm they fit any int id, or use snprintf(). */
211 sprintf(peer->mailbox, "%d", id);
212 sprintf(peer->mailbox_tracker, "tracker_%d", id);
213 peer->peers = xbt_dict_new();
214 peer->active_peers = xbt_dict_new();
215 peer->hostname = MSG_host_get_name(MSG_host_self());
/* One char per piece, plus one char for the terminator written below. */
217 peer->bitfield = xbt_new(char, FILE_PIECES + 1);
/* One char per block of every piece, plus one extra char. */
218 peer->bitfield_blocks = xbt_new(char, (FILE_PIECES) * (PIECES_BLOCKS) + 1);
/* Two fills follow, all '1' and all '0'; presumably selected by seed (the branch lines are not visible). */
220 memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1));
221 memset(peer->bitfield_blocks, '1',
222 sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
224 memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1));
225 memset(peer->bitfield_blocks, '0',
226 sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
/* Overwrites the last byte written by the memset so that the bitfield is a C string (it is logged with %s). */
/* NOTE(review): the memsets on bitfield_blocks stop one byte short of its allocation, and no terminator for it is visible here. */
229 peer->bitfield[FILE_PIECES] = '\0';
/* Per-piece counter, incremented when a remote peer announces the piece (BITFIELD / HAVE handling). */
232 peer->pieces_count = xbt_new0(short, FILE_PIECES);
233 peer->pieces_requested = 0;
/* Pieces currently wanted; -1 means that no piece is selected. */
235 peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
236 peer->current_piece = -1;
238 peer->stream = RngStream_CreateStream("");
239 peer->comm_received = NULL;
245 * Destroys a poor peer object.
/*
 * Releases everything peer_init() allocated inside the peer structure.
 * The structure itself is not freed in the visible lines.
 */
247 void peer_free(peer_t peer)
250 connection_t connection;
251 xbt_dict_cursor_t cursor;
/* Connections are inserted into peer->peers with a NULL last argument to xbt_dict_set_ext() (presumably the free callback), so they are released by hand here. */
252 xbt_dict_foreach(peer->peers, cursor, key, connection) {
253 connection_free(connection);
255 xbt_dict_free(&peer->peers);
/* active_peers stores the same connection pointers as peers (see the UNCHOKE handling), so only the dictionary is freed. */
256 xbt_dict_free(&peer->active_peers);
257 xbt_dynar_free(&peer->current_pieces);
258 xbt_free(peer->pieces_count);
259 xbt_free(peer->bitfield);
260 xbt_free(peer->bitfield_blocks);
262 RngStream_DeleteStream(&peer->stream);
266 * Returns if a peer has finished downloading the file
267 * @param bitfield peer bitfield
269 int has_finished(char *bitfield)
271 return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
275 * Handle a received message sent by another peer
276 * @param peer Peer data
277 * @param task task received.
/*
 * Dispatches one received task according to message->type, then releases it with
 * task_message_free().
 * remote_peer is the connection found in peer->peers under message->peer_id; the lookup
 * is the get_or_null variant, so it can be NULL for an unknown sender.
 * NOTE(review): many lines of this function are missing from this excerpt (some case
 * labels, break statements, conditions, braces). The comments below only describe what
 * the visible lines do.
 */
279 void handle_message(peer_t peer, msg_task_t task)
281 message_t message = MSG_task_get_data(task);
282 connection_t remote_peer;
284 xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
286 switch (message->type) {
288 case MESSAGE_HANDSHAKE:
289 XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
290 message->issuer_host_name);
291 //Check if the peer is in our connection list.
/* Registers the sender and answers with our own handshake; the guarding condition is not visible here. */
293 xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
294 connection_new(message->peer_id), NULL);
295 send_handshake(peer, message->mailbox);
297 //Send our bitfield to the peer
298 send_bitfield(peer, message->mailbox);
300 case MESSAGE_BITFIELD:
301 XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
302 message->issuer_host_name);
303 //Update the pieces list
304 update_pieces_count_from_bitfield(peer, message->bitfield);
/* Keeps a private copy of the remote bitfield. */
/* NOTE(review): no free of a previous remote_peer->bitfield and no NULL check on remote_peer are visible before this line. Confirm. */
306 remote_peer->bitfield = xbt_strdup(message->bitfield);
307 //Update the current piece
308 if (peer->current_piece == -1 && peer->pieces < FILE_PIECES) {
309 update_current_piece(peer);
312 case MESSAGE_INTERESTED:
313 XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
314 message->issuer_host_name);
315 xbt_assert((remote_peer != NULL),
316 "A non-in-our-list peer has sent us a message. WTH ?");
317 //Update the interested state of the peer.
318 remote_peer->interested = 1;
320 case MESSAGE_NOTINTERESTED:
321 XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox,
322 message->issuer_host_name);
323 xbt_assert((remote_peer != NULL),
324 "A non-in-our-list peer has sent us a message. WTH ?");
325 remote_peer->interested = 0;
327 case MESSAGE_UNCHOKE:
328 xbt_assert((remote_peer != NULL),
329 "A non-in-our-list peer has sent us a message. WTH ?");
330 XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
331 message->issuer_host_name);
332 remote_peer->choked_download = 0;
/* active_peers stores the same connection pointer as peers, with no free callback. */
333 xbt_dict_set_ext(peer->active_peers, (char *) &message->peer_id,
334 sizeof(int), remote_peer, NULL);
335 //Send requests to the peer, since it has unchoked us
336 send_requests_to_peer(peer, remote_peer);
/* CHOKE handling according to the log message below; the case label line is not visible. */
339 xbt_assert((remote_peer != NULL),
340 "A non-in-our-list peer has sent us a message. WTH ?");
341 XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
342 message->issuer_host_name);
343 remote_peer->choked_download = 1;
346 xbt_dict_remove_ext(peer->active_peers, (char *) &message->peer_id,
/* HAVE handling according to the log message below; the case label line is not visible. */
354 XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
355 message->mailbox, message->issuer_host_name, message->index);
356 xbt_assert((message->index >= 0
357 && message->index < FILE_PIECES),
358 "Wrong HAVE message received");
/* NOTE(review): the body of this test is not visible. If it leaves the function early, check that the task is still released. remote_peer itself is dereferenced with no visible NULL assertion. */
359 if (remote_peer->bitfield == NULL)
361 remote_peer->bitfield[message->index] = '1';
362 peer->pieces_count[message->index]++;
363 //If the piece is in our pieces, we tell the peer that we are interested.
364 if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) {
365 remote_peer->am_interested = 1;
366 send_interested(peer, remote_peer->mailbox);
369 case MESSAGE_REQUEST:
370 xbt_assert((message->index >= 0
371 && message->index < FILE_PIECES), "Wrong request received");
/* NOTE(review): remote_peer is dereferenced with no visible NULL assertion in this case. */
372 if (!remote_peer->choked_upload) {
373 XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
374 message->mailbox, message->issuer_host_name, message->peer_id,
375 message->block_index,
376 message->block_index + message->block_length);
/* The piece is only sent when we own it; the stalled argument of send_piece() is 0 here. */
377 if (peer->bitfield[message->index] == '1') {
378 send_piece(peer, message->mailbox, message->index, 0,
379 message->block_index, message->block_length);
382 XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
383 message->mailbox, message->issuer_host_name, message->peer_id);
/* PIECE handling according to the assertion message below; the case label line is not visible. */
387 xbt_assert((message->index >= 0
388 && message->index < FILE_PIECES), "Wrong piece received");
389 //TODO: Execute à computation.
390 if (message->stalled) {
391 XBT_DEBUG("The received piece %d from %s (%s) is STALLED", message->index,
392 message->mailbox, message->issuer_host_name);
394 XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
395 message->block_index,
396 message->block_index + message->block_length, message->mailbox,
397 message->issuer_host_name);
/* Blocks are only recorded for a piece we do not own yet. */
398 if (peer->bitfield[message->index] == '0') {
399 update_bitfield_blocks(peer, message->index, message->block_index,
400 message->block_length);
401 if (piece_complete(peer, message->index)) {
402 peer->pieces_requested--;
403 //Removing the piece from our piece list
/* Linear search of the piece in current_pieces; -1 means not found. */
405 int piece_index = -1, piece;
406 xbt_dynar_foreach(peer->current_pieces, i, piece) {
407 if (piece == message->index) {
412 xbt_assert(piece_index != -1, "Received an incorrect piece");
413 xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
414 //Setting the fact that we have the piece
415 peer->bitfield[message->index] = '1';
417 XBT_DEBUG("My status is now %s", peer->bitfield);
418 //Sending the information to all the peers we are connected to
419 send_have(peer, message->index);
420 //sending UNINTERSTED to peers that doesn't have what we want.
421 update_interested_after_receive(peer);
424 XBT_DEBUG("However, we already have it");
429 //Update the peer speed.
/* The speed sample is the inverse of the time elapsed since begin_receive_time. */
/* NOTE(review): if two messages are handled at the same clock value the divisor is 0.0. No NULL check on remote_peer is visible here either. */
431 connection_add_speed_value(remote_peer,
432 1.0 / (MSG_get_clock() -
433 peer->begin_receive_time));
435 peer->begin_receive_time = MSG_get_clock();
/* The task is released whatever its type (subject to the hidden control flow noted above). */
437 task_message_free(task);
441 * Wait for the node to receive interesting bitfield messages (ie: non empty)
443 * @param deadline peer deadline
444 * @param peer peer data
/*
 * Blocking receive loop used before the main leech loop. Runs until the clock reaches
 * deadline or the local flag finished becomes true.
 * NOTE(review): the declaration of finished and the statement executed when
 * current_piece is not -1 are missing from this excerpt; presumably that statement sets
 * finished. Confirm.
 */
446 void wait_for_pieces(peer_t peer, double deadline)
449 while (MSG_get_clock() < deadline && !finished) {
/* Post a new asynchronous receive only when none is pending. */
450 if (peer->comm_received == NULL) {
451 peer->task_received = NULL;
452 peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
/* Unlike the main loops, this waits (up to TIMEOUT_MESSAGE) instead of polling. */
454 msg_error_t status = MSG_comm_wait(peer->comm_received, TIMEOUT_MESSAGE);
455 //free the comm already, we don't need it anymore
456 MSG_comm_destroy(peer->comm_received);
457 peer->comm_received = NULL;
458 if (status == MSG_OK) {
/* NOTE(review): the result of this call is discarded; handle_message() fetches the payload itself, so the statement looks removable. */
459 MSG_task_get_data(peer->task_received);
460 handle_message(peer, peer->task_received);
/* handle_message() may have selected a piece (BITFIELD handling calls update_current_piece()). */
461 if (peer->current_piece != -1) {
469 * Updates the list of who has a piece from a bitfield
470 * @param peer peer we want to update the list
471 * @param bitfield bitfield
/*
 * Increments peer->pieces_count[i] for every piece i marked '1' in bitfield.
 * Reads the first FILE_PIECES characters of bitfield; its length is not checked here.
 */
473 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
476 for (i = 0; i < FILE_PIECES; i++) {
477 if (bitfield[i] == '1') {
478 peer->pieces_count[i]++;
484 * Update the piece the peer is currently interested in.
485 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
486 * If the peer has less than 3 pieces, he chooses a piece at random.
487 * If the peer has 3 pieces or more, he downloads the pieces that are the less
/*
 * Selects a new piece to download, stores it in peer->current_piece and appends it to
 * peer->current_pieces.
 * NOTE(review): several lines are missing from this excerpt (local declarations, the
 * loop keyword around the random draw, break statements, braces).
 */
490 void update_current_piece(peer_t peer)
/* Nothing left to select when every piece still missing is already in current_pieces; the body of this test is not visible (presumably an early return). */
492 if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)) {
/* NOTE(review): "1 ||" makes this condition always true, so the min-based selection further down is never reached. */
495 if (1 || peer->pieces < 3) {
/* Uniform random draw in [0, FILE_PIECES - 1]. The doubled semicolon is a harmless empty statement. */
498 peer->current_piece =
499 RngStream_RandInt(peer->stream, 0, FILE_PIECES - 1);;
/* Tail of the loop condition: the accepted piece is one we miss and that is not already wanted (the start of the condition is not visible). */
502 (peer->bitfield[peer->current_piece] == '0'
503 && !in_current_pieces(peer, peer->current_piece)));
505 //Trivial min algorithm.
/* First pass: take the count of a missing piece as the starting minimum. */
508 for (i = 0; i < FILE_PIECES; i++) {
509 if (peer->bitfield[i] == '0') {
510 min = peer->pieces_count[i];
515 xbt_assert((min > -1), "Couldn't find a minimum");
/* Second pass: look for a missing piece with a strictly smaller count. The visible lines do not exclude pieces already in current_pieces. */
516 for (i = 1; i < FILE_PIECES; i++) {
517 if (peer->pieces_count[i] < min && peer->bitfield[i] == '0') {
518 min = peer->pieces_count[i];
522 peer->current_piece = min_id;
524 xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
525 XBT_DEBUG("New interested piece: %d", peer->current_piece);
/* NOTE(review): this sanity check runs after the value has already been pushed. */
526 xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES),
527 "Peer want to retrieve a piece that doesn't exist.");
531 * Update the list of current choked and unchoked peers, using the
533 * @param peer the current peer
/*
 * Chokes one currently active peer (if any) and selects one peer to unchoke.
 * Three selection policies are visible: least recently unchoked when we own every piece,
 * random when peer->round is 0, fastest otherwise.
 * NOTE(review): several lines are missing from this excerpt (declarations of key, i, j,
 * id_chosen, else keywords, the opening of the do loop, braces).
 */
535 void update_choked_peers(peer_t peer)
537 //update the current round
/* round cycles through 0, 1, 2. */
538 peer->round = (peer->round + 1) % 3;
540 connection_t peer_choosed = NULL;
541 //remove a peer from the list
542 xbt_dict_cursor_t cursor = NULL;
/* The cursor is positioned before the emptiness test; it is only read inside the test. */
543 xbt_dict_cursor_first(peer->active_peers, &cursor);
544 if (xbt_dict_length(peer->active_peers) > 0) {
/* The entry under the cursor (first one returned by the dictionary) is choked and removed. */
545 key = xbt_dict_cursor_get_key(cursor);
546 connection_t peer_choked = xbt_dict_cursor_get_data(cursor);
548 send_choked(peer, peer_choked->mailbox);
549 peer_choked->choked_upload = 1;
551 xbt_dict_remove_ext(peer->active_peers, key, sizeof(int));
553 xbt_dict_cursor_free(&cursor);
556 * If we are currently seeding, we unchoke the peer which has
557 * been unchoke the least time.
559 if (peer->pieces == FILE_PIECES) {
560 connection_t connection;
/* Starting bound is later than any recorded last_unchoke (those are set from MSG_get_clock()). */
561 double unchoke_time = MSG_get_clock() + 1;
563 xbt_dict_foreach(peer->peers, cursor, key, connection) {
/* Only interested peers are candidates; choked_upload is not tested in this policy. */
564 if (connection->last_unchoke < unchoke_time && connection->interested) {
565 unchoke_time = connection->last_unchoke;
566 peer_choosed = connection;
570 //Random optimistic unchoking
571 if (peer->round == 0) {
574 //We choose a random peer to unchoke.
/* NOTE(review): with an empty peers dictionary the upper bound passed here is -1. */
576 RngStream_RandInt(peer->stream, 0,
577 xbt_dict_length(peer->peers) - 1);
579 connection_t connection;
/* Walks the dictionary up to the drawn position; the increment of i is not visible here. */
580 xbt_dict_foreach(peer->peers, cursor, key, connection) {
581 if (i == id_chosen) {
582 peer_choosed = connection;
587 xbt_dict_cursor_free(&cursor);
/* NOTE(review): peer_choosed is dereferenced before the NULL test of the loop condition; this relies on the walk above always finding an entry. */
588 if (peer_choosed->interested == 0) {
/* Retries while nothing was selected, at most MAXIMUM_PAIRS times (the update of j is not visible). */
592 } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
594 //Use the "fastest download" policy.
595 connection_t connection;
596 double fastest_speed = 0.0;
597 xbt_dict_foreach(peer->peers, cursor, key, connection) {
/* Strict comparison against 0.0: a peer whose peer_speed is 0 is never selected by this policy. */
598 if (connection->peer_speed > fastest_speed && connection->choked_upload
599 && connection->interested) {
600 peer_choosed = connection;
601 fastest_speed = connection->peer_speed;
/* Unchoke the selected peer, if any: mark it, register it as active, timestamp it, notify it. */
607 if (peer_choosed != NULL) {
608 peer_choosed->choked_upload = 0;
609 xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id,
610 sizeof(int), peer_choosed, NULL);
611 peer_choosed->last_unchoke = MSG_get_clock();
612 send_unchoked(peer, peer_choosed->mailbox);
618 * Updates our "interested" state about peers: send "not interested" to peers
619 * that don't have any more pieces we want.
620 * @param peer our peer data
/*
 * For every connection we are interested in, checks whether its bitfield still marks one
 * of our wanted pieces as '1'; the visible tail clears am_interested and sends
 * NOTINTERESTED.
 * NOTE(review): the lines that set and test the local flag interested are missing from
 * this excerpt; presumably the NOTINTERESTED message is only sent when no wanted piece
 * was found. Confirm.
 */
622 void update_interested_after_receive(peer_t peer)
625 xbt_dict_cursor_t cursor;
626 connection_t connection;
628 int interested, piece;
629 xbt_dict_foreach(peer->peers, cursor, key, connection) {
631 if (connection->am_interested) {
632 //Check if the peer still has a piece we want.
633 xbt_dynar_foreach(peer->current_pieces, cpt, piece) {
/* Only the lower bound of piece is checked before it indexes the remote bitfield. */
634 xbt_assert((piece >= 0), "Wrong piece.");
/* A connection with no bitfield received yet is treated as owning nothing. */
635 if (connection->bitfield && connection->bitfield[piece] == '1') {
641 connection->am_interested = 0;
642 send_notinterested(peer, connection->mailbox);
648 void update_bitfield_blocks(peer_t peer, int index, int block_index,
652 xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
653 xbt_assert((block_index >= 0
654 && block_index <= PIECES_BLOCKS), "Wrong block : %d.",
656 for (i = block_index; i < (block_index + block_length); i++) {
657 peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
662 * Returns if a peer has completed the download of a piece
/*
 * Scans the PIECES_BLOCKS block flags of piece index, looking for a block still at '0'.
 * NOTE(review): the return statements are missing from this excerpt; the caller uses
 * the result as a boolean (non-zero meaning the piece is complete).
 * index is not range-checked here.
 */
664 int piece_complete(peer_t peer, int index)
667 for (i = 0; i < PIECES_BLOCKS; i++) {
668 if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
676 * Returns the first block that a peer doesn't have in a piece
/*
 * Scans the PIECES_BLOCKS block flags of a piece, looking for the first block at '0'.
 * NOTE(review): the return statements are missing from this excerpt; the caller in
 * send_requests_to_peer() treats -1 as "no missing block".
 * piece is not range-checked here.
 */
678 int get_first_block(peer_t peer, int piece)
681 for (i = 0; i < PIECES_BLOCKS; i++) {
682 if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
690 * Send request messages to a peer that have unchoked us
692 * @param remote_peer peer data to the peer we want to send the request
/*
 * For every wanted piece that remote_peer owns, requests the blocks starting at the
 * first missing one.
 * NOTE(review): the end of the send_request() call and the closing braces are missing
 * from this excerpt.
 */
694 void send_requests_to_peer(peer_t peer, connection_t remote_peer)
697 int piece, block_index, block_length;
698 xbt_dynar_foreach(peer->current_pieces, i, piece) {
/* A remote peer whose bitfield has not been received yet is skipped. */
699 if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
700 block_index = get_first_block(peer, piece);
701 if (block_index != -1) {
/* Number of blocks from block_index to the end of the piece, capped at BLOCKS_REQUESTED. */
702 block_length = PIECES_BLOCKS - block_index;
703 block_length = min(BLOCKS_REQUESTED, block_length);
704 send_request(peer, remote_peer->mailbox, piece, block_index,
713 * Find the peers that have the current interested piece and send them
714 * the "interested" message
/*
 * Sends INTERESTED to every known peer whose bitfield marks peer->current_piece as '1',
 * then resets current_piece to -1 and increments pieces_requested.
 * NOTE(review): some lines are missing from this excerpt (the task declaration, the end
 * of the task_message_new() call, braces).
 */
716 void send_interested_to_peers(peer_t peer)
719 xbt_dict_cursor_t cursor = NULL;
720 connection_t connection;
721 xbt_assert((peer->current_piece != -1),
722 "Tried to send a interested message wheras the current_piece is -1");
723 xbt_dict_foreach(peer->peers, cursor, key, connection) {
/* Connections with no bitfield received yet are skipped. */
724 if (connection->bitfield
725 && connection->bitfield[peer->current_piece] == '1') {
726 connection->am_interested = 1;
728 task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
/* Detached send; task_message_free is passed as the third argument (presumably the cleanup callback, confirm against the MSG API). */
730 MSG_task_dsend(task, connection->mailbox, task_message_free);
731 XBT_DEBUG("Send INTERESTED to %s", connection->mailbox);
/* The counter is incremented even when no peer owned the piece (as far as the visible lines show). */
734 peer->current_piece = -1;
735 peer->pieces_requested++;
739 * Send a "interested" message to a peer
740 * @param peer peer data
741 * @param mailbox destination mailbox
/*
 * Builds a MESSAGE_INTERESTED task and sends it to mailbox with a detached send.
 * The am_interested flag of the connection is not touched here; the caller in
 * handle_message() sets it before calling.
 */
743 void send_interested(peer_t peer, const char *mailbox)
746 task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
748 MSG_task_dsend(task, mailbox, task_message_free);
749 XBT_DEBUG("Sending INTERESTED to %s", mailbox);
754 * Send a "not interested" message to a peer
755 * @param peer peer data
756 * @param mailbox destination mailbox
/*
 * Builds a MESSAGE_NOTINTERESTED task and sends it to mailbox with a detached send.
 */
758 void send_notinterested(peer_t peer, const char *mailbox)
761 task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
763 MSG_task_dsend(task, mailbox, task_message_free);
764 XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
769 * Send a handshake message to all the peers the peer has.
770 * @param peer peer data
/*
 * Sends one MESSAGE_HANDSHAKE task to the mailbox of every connection in peer->peers.
 * A new task is built for each destination.
 */
772 void send_handshake_all(peer_t peer)
774 connection_t remote_peer;
775 xbt_dict_cursor_t cursor = NULL;
777 xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
779 task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
781 MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
782 XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
787 * Send a "handshake" message to an user
788 * @param peer peer data
789 * @param mailbox mailbox where to we send the message
/*
 * Builds a MESSAGE_HANDSHAKE task and sends it to mailbox with a detached send.
 */
791 void send_handshake(peer_t peer, const char *mailbox)
794 task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
796 MSG_task_dsend(task, mailbox, task_message_free);
797 XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
801 * Send a "choked" message to a peer.
/*
 * Builds a MESSAGE_CHOKE task carrying our host name, mailbox and id, and sends it to
 * mailbox with a detached send. The choked_upload flag is updated by the caller.
 */
803 void send_choked(peer_t peer, const char *mailbox)
805 XBT_DEBUG("Sending a CHOKE to %s", mailbox);
807 task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
808 MSG_task_dsend(task, mailbox, task_message_free);
812 * Send a "unchoked" message to a peer
/*
 * Builds a MESSAGE_UNCHOKE task and sends it to mailbox with a detached send.
 * The choked_upload flag is updated by the caller.
 */
814 void send_unchoked(peer_t peer, const char *mailbox)
816 XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
818 task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
820 MSG_task_dsend(task, mailbox, task_message_free);
824 * Send a "HAVE" message to all peers we are connected to
/*
 * Sends one MESSAGE_HAVE task to the mailbox of every connection in peer->peers.
 * NOTE(review): the end of the task_message_index_new() call is missing from this
 * excerpt; presumably it carries the piece index. Confirm.
 */
826 void send_have(peer_t peer, int piece)
828 XBT_DEBUG("Sending HAVE message to all my peers");
829 connection_t remote_peer;
830 xbt_dict_cursor_t cursor = NULL;
832 xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
834 task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
836 MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
841 * Send a bitfield message to a peer.
842 * @param peer peer data
/*
 * Builds a bitfield task and sends it to the single destination mailbox with a
 * detached send.
 * NOTE(review): the end of the task_message_bitfield_new() call is missing from this
 * excerpt; presumably it passes peer->bitfield. Confirm.
 */
844 void send_bitfield(peer_t peer, const char *mailbox)
846 XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
848 task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
850 MSG_task_dsend(task, mailbox, task_message_free);
854 * Send a "request" message to a peer, containing a request for a piece
/*
 * Builds a request task for block_length blocks of piece starting at block_index and
 * sends it to mailbox with a detached send.
 * The log prints (block_index, block_length), whereas the receiving side in
 * handle_message() prints (block_index, block_index + block_length).
 */
856 void send_request(peer_t peer, const char *mailbox, int piece, int block_index,
859 XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece,
860 block_index, block_length);
862 task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece,
863 block_index, block_length);
864 MSG_task_dsend(task, mailbox, task_message_free);
868 * Send a "piece" message to a peer, containing a piece of the file
/*
 * Builds a piece task for block_length blocks of piece starting at block_index and
 * sends it to mailbox with a detached send. stalled is forwarded unchanged to the task.
 */
870 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
871 int block_index, int block_length)
873 XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
874 block_length, mailbox);
/* NOTE(review): only the lower bound of piece is asserted before it indexes peer->bitfield below; the visible caller validates the upper bound itself. */
875 xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
876 xbt_assert((peer->bitfield[piece] == '1'),
877 "Tried to send a piece that we doesn't have.");
879 task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
880 stalled, block_index, block_length);
881 MSG_task_dsend(task, mailbox, task_message_free);
884 int in_current_pieces(peer_t peer, int piece)
887 int is_in = 0, peer_piece;
888 xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
889 if (peer_piece == piece) {