1 /* Copyright (c) 2012. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "connection.h"
11 #include <xbt/RngStream.h>
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers,
14 "Messages specific for the peers");
/* Entry point of a peer process: checks the argument count, initializes the
 * peer, asks the tracker for other peers, then runs the seed and/or leech
 * loops until the deadline. argv[1] is parsed as the peer id and argv[2] as
 * the deadline. */
21 int peer(int argc, char *argv[]) {
24 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
// NOTE(review): the condition selecting between the two peer_init calls is not
// visible here; presumably the optional 4th argument marks the peer as a seed
// (seed = 1) -- confirm.
27 peer_init(&peer, atoi(argv[1]),1);
30 peer_init(&peer, atoi(argv[1]),0);
// NOTE(review): atoi/atof do not report parse errors; only the deadline is
// range-checked below.
33 double deadline = atof(argv[2]);
34 xbt_assert(deadline > 0, "Wrong deadline supplied");
35 XBT_INFO("Hi, I'm joining the network with id %d", peer.id);
36 //Getting peer data from the tracker.
37 if (get_peers_data(&peer)) {
38 XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
39 XBT_DEBUG("Here is my current status: %s",peer.bitfield);
// Reference time used by handle_message to estimate the download speed.
40 peer.begin_receive_time = MSG_get_clock();
// A peer whose bitfield has no '0' owns the whole file: it only seeds.
41 if (has_finished(peer.bitfield)) {
42 peer.pieces = FILE_PIECES;
43 send_handshake_all(&peer);
44 seed_loop(&peer,deadline);
// Download first, then seed (presumably the else branch of the test above).
47 leech_loop(&peer,deadline);
48 seed_loop(&peer,deadline);
52 XBT_INFO("Couldn't contact the tracker.");
55 XBT_INFO("Here is my current status: %s", peer.bitfield);
// Release the receive that may still be pending when the loops stop.
56 if (peer.comm_received) {
57 MSG_comm_destroy(peer.comm_received);
64 * Peer main loop when it is leeching.
65 * @param peer peer data
66 * @param deadline time at which the peer has to leave
/* Download loop: sends a handshake to every known peer, calls
 * wait_for_pieces, then polls the mailbox until the deadline is reached or
 * the peer owns FILE_PIECES pieces. */
68 void leech_loop(peer_t peer, double deadline) {
// Simulated date of the next run of the choke algorithm.
69 double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
70 XBT_DEBUG("Start downloading.");
72 * Send a "handshake" message to all the peers it got
73 * (since it couldn't have gotten more than 50 peers)
75 send_handshake_all(peer);
76 //Wait for at least one "bitfield" message.
77 wait_for_pieces(peer,deadline);
78 XBT_DEBUG("Starting main leech loop");
79 while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
// Post an asynchronous receive only when none is pending.
80 if (peer->comm_received == NULL) {
81 peer->task_received = NULL;
82 peer->comm_received = MSG_task_irecv(&peer->task_received,peer->mailbox);
// Non-blocking test: once the receive is over, the comm is destroyed and the
// task is dispatched only when the transfer succeeded.
84 if (MSG_comm_test(peer->comm_received)) {
85 msg_error_t status = MSG_comm_get_status(peer->comm_received);
86 MSG_comm_destroy(peer->comm_received);
87 peer->comm_received = NULL;
88 if (status == MSG_OK) {
89 handle_message(peer,peer->task_received);
// NOTE(review): the else/brace lines structuring the statements below are not
// visible here; the nesting must be checked against the real file.
// A piece has been selected: advertise our interest to the peers owning it.
93 if (peer->current_piece != -1) {
94 send_interested_to_peers(peer);
97 //If the current interested pieces is < MAX
98 if (peer->pieces_requested < MAX_PIECES) {
99 update_current_piece(peer);
102 //We don't execute the choke algorithm if we don't already have a piece
103 if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
104 update_choked_peers(peer);
105 next_choked_update += UPDATE_CHOKED_INTERVAL;
// Let simulated time advance by one second before polling again.
108 MSG_process_sleep(1);
114 * Peer main loop when it is seeding
115 * @param peer peer data
116 * @param deadline time when the peer will leave
/* Seeding loop: until the deadline, polls the mailbox for incoming messages
 * and periodically runs the choke algorithm. */
118 void seed_loop(peer_t peer, double deadline) {
// Simulated date of the next run of the choke algorithm.
119 double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
120 XBT_DEBUG("Start seeding.");
121 //start the main seed loop
122 while (MSG_get_clock() < deadline) {
// Post an asynchronous receive only when none is pending.
123 if (peer->comm_received == NULL) {
124 peer->task_received = NULL;
125 peer->comm_received = MSG_task_irecv(&peer->task_received,peer->mailbox);
// Non-blocking test: once the receive is over, the comm is destroyed and the
// task is dispatched only when the transfer succeeded.
127 if (MSG_comm_test(peer->comm_received)) {
128 msg_error_t status = MSG_comm_get_status(peer->comm_received);
129 MSG_comm_destroy(peer->comm_received);
130 peer->comm_received = NULL;
131 if (status == MSG_OK) {
132 handle_message(peer,peer->task_received);
// NOTE(review): the else/brace lines structuring the statements below are not
// visible here; the nesting must be checked against the real file.
136 if (MSG_get_clock() >= next_choked_update) {
137 update_choked_peers(peer);
138 //TODO: Change the choked peer algorithm when seeding.
139 next_choked_update += UPDATE_CHOKED_INTERVAL;
// Let simulated time advance by one second before polling again.
142 MSG_process_sleep(1);
148 * Retrieves the peer list from the tracker
149 * @param peer current peer data
/* Asks the tracker for a list of peers and records every returned id (except
 * our own) as a new connection in peer->peers. Sending the request and waiting
 * for the answer are both bounded by the same absolute limit,
 * MSG_get_clock() + GET_PEERS_TIMEOUT computed on entry.
 * The caller treats a non-zero result as success; the return statement itself
 * is not visible here (presumably `success`) -- confirm. */
151 int get_peers_data(peer_t peer) {
152 int success = 0, send_success = 0;
153 double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
154 //Build the task to send to the tracker
155 tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()),
156 peer->mailbox_tracker,peer->id, 0, 0, FILE_SIZE);
157 //Build the task to send.
158 msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
159 msg_task_t task_received = NULL;
160 msg_comm_t comm_received;
161 while (!send_success && MSG_get_clock() < timeout) {
162 XBT_DEBUG("Sending a peer request to the tracker.");
// Each attempt is itself allowed GET_PEERS_TIMEOUT, so the last attempt can end
// after `timeout`.
163 msg_error_t status = MSG_task_send_with_timeout(task_send,TRACKER_MAILBOX,GET_PEERS_TIMEOUT);
164 if (status == MSG_OK) {
// NOTE(review): no release of task_send / the request data is visible for the
// case where the request never reaches the tracker -- confirm.
168 while (!success && MSG_get_clock() < timeout) {
169 comm_received = MSG_task_irecv(&task_received,peer->mailbox_tracker);
170 msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
171 if (status == MSG_OK) {
// This `data` is the tracker's answer; it shadows the request `data` declared
// at the top of the function.
172 tracker_task_data_t data = MSG_task_get_data(task_received);
174 //Add the peers the tracker gave us to our peer list.
175 xbt_dynar_foreach(data->peers,i,peer_id) {
176 if (peer_id != peer->id)
// Connections are keyed by the raw bytes of the integer peer id.
177 xbt_dict_set_ext(peer->peers,(char*)&peer_id,sizeof(int),connection_new(peer_id),NULL);
180 //free the communication and the task
181 MSG_comm_destroy(comm_received);
182 tracker_task_data_free(data);
183 MSG_task_destroy(task_received);
184 comm_received = NULL;
191 * Initialize the peer data.
192 * @param peer peer data
193 * @param id id of the peer to take in the network
194 * @param seed indicates if the peer is a seed.
/* Fills a peer structure: mailbox names derived from the id, empty peer
 * dictionaries, piece/block bitfields, per-piece counters, the list of pieces
 * being downloaded and the random stream. */
196 void peer_init(peer_t peer, int id, int seed) {
// NOTE(review): sprintf is unbounded; the size of the mailbox buffers is
// declared elsewhere -- confirm they can hold "tracker_" plus any int.
198 sprintf(peer->mailbox,"%d",id);
199 sprintf(peer->mailbox_tracker,"tracker_%d",id);
200 peer->peers = xbt_dict_new();
201 peer->active_peers = xbt_dict_new();
202 peer->hostname = MSG_host_get_name(MSG_host_self());
// One character per piece ('0' or '1') plus a terminator, so the bitfield can
// be printed with %s.
204 peer->bitfield = xbt_new(char,FILE_PIECES + 1);
205 peer->bitfield_blocks = xbt_new(char,(FILE_PIECES) * (PIECES_BLOCKS) + 1);
// Two fillings follow: all '1' and all '0'. The test choosing between them is
// not visible here; presumably `seed` selects the all-'1' variant -- confirm.
207 memset(peer->bitfield,'1',sizeof(char) * (FILE_PIECES + 1));
208 memset(peer->bitfield_blocks,'1',sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
211 memset(peer->bitfield,'0',sizeof(char) * (FILE_PIECES + 1));
212 memset(peer->bitfield_blocks,'0',sizeof(char) * FILE_PIECES * (PIECES_BLOCKS));
// Overwrites the last filled character with the string terminator.
// NOTE(review): no terminator is visibly written to bitfield_blocks, whose last
// allocated byte is not covered by the memset above.
215 peer->bitfield[FILE_PIECES] = '\0';
// Zero-initialized counters of how many known peers own each piece.
218 peer->pieces_count = xbt_new0(short,FILE_PIECES);
219 peer->pieces_requested = 0;
221 peer->current_pieces = xbt_dynar_new(sizeof(int),NULL);
// -1 means that no piece is currently selected.
222 peer->current_piece = -1;
224 peer->stream = RngStream_CreateStream("");
225 peer->comm_received = NULL;
230 * Destroys a poor peer object.
/* Releases what peer_init allocated: every connection stored in peer->peers,
 * both dictionaries, the list of current pieces, the counters, the two
 * bitfields and the random stream. */
232 void peer_free(peer_t peer) {
234 connection_t connection;
235 xbt_dict_cursor_t cursor;
// active_peers holds pointers to the same connection objects as peers (they are
// inserted there with a NULL free function), so they are freed only here.
236 xbt_dict_foreach(peer->peers,cursor,key,connection) {
237 connection_free(connection);
239 xbt_dict_free(&peer->peers);
240 xbt_dict_free(&peer->active_peers);
241 xbt_dynar_free(&peer->current_pieces);
242 xbt_free(peer->pieces_count);
243 xbt_free(peer->bitfield);
244 xbt_free(peer->bitfield_blocks);
246 RngStream_DeleteStream(&peer->stream);
249 * Returns if a peer has finished downloading the file
250 * @param bitfield peer bitfield
252 int has_finished(char *bitfield) {
253 return ((memchr(bitfield,'0',sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
256 * Handle a received message sent by another peer
257 * @param peer Peer data
258 * @param task task received.
/* Dispatches one message received from another peer according to its type and
 * finally releases the task with task_message_free. The sender's connection
 * is looked up in peer->peers; the lookup yields NULL for an unknown sender. */
260 void handle_message(peer_t peer, msg_task_t task) {
261 message_t message = MSG_task_get_data(task);
262 connection_t remote_peer;
263 remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char*)&message->peer_id, sizeof(int));
264 switch (message->type) {
266 case MESSAGE_HANDSHAKE:
267 XBT_DEBUG("Received a HANDSHAKE from %s (%s)",message->mailbox, message->issuer_host_name);
268 //Check if the peer is in our connection list.
// The test guarding the two statements below is not visible here (presumably
// "sender not known yet") -- confirm.
270 xbt_dict_set_ext(peer->peers,(char*)&message->peer_id,sizeof(int),connection_new(message->peer_id),NULL);
271 send_handshake(peer,message->mailbox);
273 //Send our bitfield to the peer
274 send_bitfield(peer,message->mailbox);
276 case MESSAGE_BITFIELD:
277 XBT_DEBUG("Recieved a BITFIELD message from %s (%s)",message->mailbox,message->issuer_host_name);
278 //Update the pieces list
279 update_pieces_count_from_bitfield(peer,message->bitfield);
// NOTE(review): remote_peer is dereferenced with no visible NULL assertion in
// this case, and no release of a previously stored bitfield is visible before
// it is overwritten -- confirm.
281 remote_peer->bitfield = xbt_strdup(message->bitfield);
282 //Update the current piece
283 if (peer->current_piece == -1 && peer->pieces < FILE_PIECES) {
284 update_current_piece(peer);
287 case MESSAGE_INTERESTED:
288 XBT_DEBUG("Recieved an INTERESTED message from %s (%s)",message->mailbox,message->issuer_host_name);
289 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
290 //Update the interested state of the peer.
291 remote_peer->interested = 1;
293 case MESSAGE_NOTINTERESTED:
294 XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
295 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
296 remote_peer->interested = 0;
298 case MESSAGE_UNCHOKE:
299 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
300 XBT_DEBUG("Received a UNCHOKE message from %s (%s)",message->mailbox,message->issuer_host_name);
301 remote_peer->choked_download = 0;
// The sender becomes an active peer; the dictionary stores the same connection
// object as peer->peers, with no free function.
302 xbt_dict_set_ext(peer->active_peers,(char*)&message->peer_id,sizeof(int),remote_peer,NULL);
303 //Send requests to the peer, since it has unchoked us
304 send_requests_to_peer(peer,remote_peer);
// The statements below handle a CHOKE message (its case label is not visible).
307 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
308 XBT_DEBUG("Received a CHOKE message from %s (%s)",message->mailbox,message->issuer_host_name);
309 remote_peer->choked_download = 1;
312 xbt_dict_remove_ext(peer->active_peers,(char*)&message->peer_id,sizeof(int));
// The statements below handle a HAVE message (its case label is not visible).
319 XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",message->mailbox, message->issuer_host_name, message->index);
320 xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
// NOTE(review): remote_peer is dereferenced with no visible NULL assertion; the
// statement executed when its bitfield is NULL is not visible either. If it
// leaves the function early, the task would not reach task_message_free.
321 if (remote_peer->bitfield == NULL)
323 remote_peer->bitfield[message->index] = '1';
324 peer->pieces_count[message->index]++;
325 //If the piece is in our pieces, we tell the peer that we are interested.
326 if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) {
327 remote_peer->am_interested = 1;
328 send_interested(peer,remote_peer->mailbox);
331 case MESSAGE_REQUEST:
332 xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
// NOTE(review): remote_peer is dereferenced with no visible NULL assertion.
333 if (!remote_peer->choked_upload) {
334 XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",message->mailbox,message->issuer_host_name,message->peer_id, message->block_index,message->block_index + message->block_length);
// The piece is only sent when we own it entirely.
335 if (peer->bitfield[message->index] == '1') {
336 send_piece(peer, message->mailbox, message->index, 0, message->block_index, message->block_length);
340 XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",message->mailbox,message->issuer_host_name,message->peer_id);
// The statements below handle a PIECE message (its case label is not visible).
344 xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
345 //TODO: Execute a computation.
346 if (message->stalled) {
347 XBT_DEBUG("The received piece %d from %s (%s) is STALLED",message->index, message->mailbox, message->issuer_host_name);
350 XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)",message->index, message->block_index, message->block_index + message->block_length, message->mailbox, message->issuer_host_name);
351 if (peer->bitfield[message->index] == '0') {
352 update_bitfield_blocks(peer,message->index,message->block_index,message->block_length);
353 if (piece_complete(peer,message->index)) {
354 peer->pieces_requested--;
355 //Removing the piece from our piece list
356 int piece_index = -1, i, piece;
357 xbt_dynar_foreach(peer->current_pieces, i, piece) {
358 if (piece == message->index) {
363 xbt_assert(piece_index != -1, "Received an incorrect piece");
364 xbt_dynar_remove_at(peer->current_pieces,piece_index,NULL);
365 //Setting the fact that we have the piece
366 peer->bitfield[message->index] = '1';
368 XBT_DEBUG("My status is now %s",peer->bitfield);
369 //Sending the information to all the peers we are connected to
370 send_have(peer,message->index);
371 //Sending NOTINTERESTED to the peers that don't have what we want.
372 update_interested_after_receive(peer);
376 XBT_DEBUG("However, we already have it");
381 //Update the peer speed.
// NOTE(review): the divisor is zero when this message is handled at the same
// simulated date as the previous one; no guard is visible here, and remote_peer
// may be NULL for the cases that do not assert it -- confirm.
383 connection_add_speed_value(remote_peer, 1.0 / ( MSG_get_clock() - peer->begin_receive_time));
385 peer->begin_receive_time = MSG_get_clock();
// The message and its task are released here; callers must not use them
// afterwards.
387 task_message_free(task);
391 * Wait for the node to receive interesting bitfield messages (i.e. non-empty)
393 * @param deadline peer deadline
394 * @param peer peer data
/* Blocks on the mailbox, handling every message received, until the deadline
 * passes or the `finished` flag is set. The statement setting that flag is not
 * visible here; it presumably follows the current_piece test at the end. */
396 void wait_for_pieces(peer_t peer, double deadline) {
398 while (MSG_get_clock() < deadline && !finished) {
// Post an asynchronous receive only when none is pending.
399 if (peer->comm_received == NULL) {
400 peer->task_received = NULL;
401 peer->comm_received = MSG_task_irecv(&peer->task_received,peer->mailbox);
// Unlike the leech/seed loops, this waits (up to TIMEOUT_MESSAGE) instead of
// polling.
403 msg_error_t status = MSG_comm_wait(peer->comm_received,TIMEOUT_MESSAGE);
404 //free the comm already, we don't need it anymore
405 MSG_comm_destroy(peer->comm_received);
406 peer->comm_received = NULL;
407 if (status == MSG_OK) {
// NOTE(review): `message` is not used in the visible lines, and handle_message
// releases the task, so it must not be read after the call below.
408 message_t message = MSG_task_get_data(peer->task_received);
409 handle_message(peer,peer->task_received);
// A piece has been selected while handling the message.
410 if (peer->current_piece != -1) {
417 * Updates the list of who has a piece from a bitfield
418 * @param peer peer we want to update the list
419 * @param bitfield bitfield
421 void update_pieces_count_from_bitfield(peer_t peer, char *bitfield) {
423 for (i = 0; i < FILE_PIECES; i++) {
424 if (bitfield[i] == '1') {
425 peer->pieces_count[i]++;
430 * Update the piece the peer is currently interested in.
431 * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
432 * If the peer has less than 3 pieces, he chooses a piece at random.
433 * If the peer has more than pieces, he downloads the pieces that are the less
436 void update_current_piece(peer_t peer) {
437 if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)) {
440 if (1 || peer->pieces < 3) {
443 peer->current_piece = RngStream_RandInt(peer->stream,0,FILE_PIECES - 1);;
445 } while (!(peer->bitfield[peer->current_piece] == '0' && !in_current_pieces(peer,peer->current_piece)));
448 //Trivial min algorithm.
451 for (i = 0; i < FILE_PIECES; i++) {
452 if (peer->bitfield[i] == '0') {
453 min = peer->pieces_count[i];
458 xbt_assert((min > -1), "Couldn't find a minimum");
459 for (i = 1; i < FILE_PIECES; i++) {
460 if (peer->pieces_count[i] < min && peer->bitfield[i] == '0') {
461 min = peer->pieces_count[i];
465 peer->current_piece = min_id;
467 xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
468 XBT_DEBUG("New interested piece: %d",peer->current_piece);
469 xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES, "Peer want to retrieve a piece that doesn't exist."));
472 * Update the list of current choked and unchoked peers, using the
474 * @param peer the current peer
/* Runs one round of the choke algorithm: chokes one currently active peer (if
 * any), selects a peer to unchoke with one of three policies, then unchokes
 * it and records it in peer->active_peers. */
476 void update_choked_peers(peer_t peer) {
477 //update the current round
// Rounds cycle over 0, 1, 2; round 0 is the one tested for optimistic unchoking.
478 peer->round = (peer->round + 1) % 3;
481 connection_t peer_choosed = NULL;
482 //remove a peer from the list
483 xbt_dict_cursor_t cursor = NULL;
484 xbt_dict_cursor_first(peer->active_peers,&cursor);
// The peer choked is the first entry returned by the dictionary cursor.
485 if (xbt_dict_length(peer->active_peers) > 0) {
486 key = xbt_dict_cursor_get_key(cursor);
487 connection_t peer_choked = xbt_dict_cursor_get_data(cursor);
489 send_choked(peer,peer_choked->mailbox);
490 peer_choked->choked_upload = 1;
492 xbt_dict_remove_ext(peer->active_peers,key,sizeof(int));
494 xbt_dict_cursor_free(&cursor);
497 * If we are currently seeding, we unchoke the peer which has
498 * been unchoke the least time.
500 if (peer->pieces == FILE_PIECES) {
501 connection_t connection;
// Starts above any possible last_unchoke date so that the first interested
// connection is always accepted.
502 double unchoke_time = MSG_get_clock() + 1;
504 xbt_dict_foreach(peer->peers,cursor,key,connection) {
505 if (connection->last_unchoke < unchoke_time && connection->interested) {
506 unchoke_time = connection->last_unchoke;
507 peer_choosed = connection;
512 //Random optimistic unchoking
513 if (peer->round == 0) {
516 //We choose a random peer to unchoke.
// NOTE(review): with an empty peer list the upper bound passed here is -1.
517 int id_chosen = RngStream_RandInt(peer->stream,0,xbt_dict_length(peer->peers) - 1);
519 connection_t connection;
520 xbt_dict_foreach(peer->peers,cursor,key,connection) {
521 if (i == id_chosen) {
522 peer_choosed = connection;
527 xbt_dict_cursor_free(&cursor);
// NOTE(review): peer_choosed is dereferenced here although it stays NULL when
// the loop above selected nothing; no guard is visible -- confirm.
528 if (peer_choosed->interested == 0) {
// Retry until an interested peer is drawn or MAXIMUM_PAIRS draws were made.
532 } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
535 //Use the "fastest download" policy.
536 connection_t connection;
537 double fastest_speed = 0.0;
// Only peers that are interested and currently choked by us are candidates;
// a peer whose measured speed is 0 is never selected.
538 xbt_dict_foreach(peer->peers,cursor,key,connection) {
539 if (connection->peer_speed > fastest_speed && connection->choked_upload && connection->interested) {
540 peer_choosed = connection;
541 fastest_speed = connection->peer_speed;
547 if (peer_choosed != NULL) {
548 peer_choosed->choked_upload = 0;
549 xbt_dict_set_ext(peer->active_peers,(char*)&peer_choosed->id,sizeof(int),peer_choosed,NULL);
550 peer_choosed->last_unchoke = MSG_get_clock();
551 send_unchoked(peer,peer_choosed->mailbox);
556 * Updates our "interested" state about peers: send "not interested" to peers
557 * that don't have any more pieces we want.
558 * @param peer our peer data
/* Walks over every known connection we declared interest in and checks its
 * bitfield against the pieces we are currently downloading; am_interested is
 * cleared and a "not interested" message is sent at the end of the body. */
560 void update_interested_after_receive(peer_t peer) {
562 xbt_dict_cursor_t cursor;
563 connection_t connection;
564 int interested, cpt, piece;
565 xbt_dict_foreach(peer->peers, cursor, key, connection) {
567 if (connection->am_interested) {
568 //Check if the peer still has a piece we want.
569 xbt_dynar_foreach(peer->current_pieces, cpt, piece) {
570 xbt_assert((piece >= 0), "Wrong piece.");
// A connection whose bitfield was never received cannot match any piece.
571 if (connection->bitfield && connection->bitfield[piece] == '1') {
// NOTE(review): the test guarding the two statements below is not visible here;
// presumably they run only when no wanted piece was found -- confirm.
577 connection->am_interested = 0;
578 send_notinterested(peer,connection->mailbox);
583 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length) {
585 xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
586 xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.",block_index);
587 for (i = block_index; i < (block_index + block_length); i++) {
588 peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1';
592 * Returns if a peer has completed the download of a piece
594 int piece_complete(peer_t peer, int index) {
596 for (i = 0; i < PIECES_BLOCKS; i++) {
597 if (peer->bitfield_blocks[index * PIECES_BLOCKS + i] == '0') {
604 * Returns the first block that a peer doesn't have in a piece
606 int get_first_block(peer_t peer, int piece) {
608 for (i = 0; i < PIECES_BLOCKS; i++) {
609 if (peer->bitfield_blocks[piece * PIECES_BLOCKS + i] == '0') {
616 * Send request messages to a peer that have unchoked us
618 * @param remote_peer peer data to the peer we want to send the request
620 void send_requests_to_peer(peer_t peer, connection_t remote_peer) {
621 int i, piece, block_index, block_length;
622 xbt_dynar_foreach(peer->current_pieces, i, piece) {
623 if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
624 block_index = get_first_block(peer,piece);
625 if (block_index != -1) {
626 block_length = PIECES_BLOCKS - block_index;
627 block_length = min(BLOCKS_REQUESTED,block_length);
628 send_request(peer, remote_peer->mailbox, piece, block_index, block_length);
635 * Find the peers that have the current interested piece and send them
636 * the "interested" message
638 void send_interested_to_peers(peer_t peer) {
640 xbt_dict_cursor_t cursor=NULL;
641 connection_t connection;
642 xbt_assert((peer->current_piece != -1), "Tried to send a interested message wheras the current_piece is -1");
643 xbt_dict_foreach(peer->peers, cursor, key, connection) {
644 if (connection->bitfield && connection->bitfield[peer->current_piece] == '1') {
645 connection->am_interested = 1;
646 msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id);
647 MSG_task_dsend(task,connection->mailbox,task_message_free);
648 XBT_DEBUG("Send INTERESTED to %s",connection->mailbox);
651 peer->current_piece = -1;
652 peer->pieces_requested++;
655 * Send a "interested" message to a peer
656 * @param peer peer data
657 * @param mailbox destination mailbox
659 void send_interested(peer_t peer, const char *mailbox) {
660 msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id);
661 MSG_task_dsend(task,mailbox,task_message_free);
662 XBT_DEBUG("Sending INTERESTED to %s",mailbox);
666 * Send a "not interested" message to a peer
667 * @param peer peer data
668 * @param mailbox destination mailbox
670 void send_notinterested(peer_t peer, const char *mailbox) {
671 msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id);
672 MSG_task_dsend(task,mailbox,task_message_free);
673 XBT_DEBUG("Sending NOTINTERESTED to %s",mailbox);
677 * Send a handshake message to all the peers the peer has.
678 * @param peer peer data
680 void send_handshake_all(peer_t peer) {
681 connection_t remote_peer;
682 xbt_dict_cursor_t cursor=NULL;
684 xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
685 msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,peer->id);
686 MSG_task_dsend(task,remote_peer->mailbox,task_message_free);
687 XBT_DEBUG("Sending a HANDSHAKE to %s",remote_peer->mailbox);
691 * Send a "handshake" message to an user
692 * @param peer peer data
693 * @param mailbox mailbox where to we send the message
695 void send_handshake(peer_t peer, const char *mailbox) {
696 msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,peer->id);
697 MSG_task_dsend(task,mailbox,task_message_free);
698 XBT_DEBUG("Sending a HANDSHAKE to %s",mailbox);
701 * Send a "choked" message to a peer.
703 void send_choked(peer_t peer, const char *mailbox) {
704 XBT_DEBUG("Sending a CHOKE to %s",mailbox);
705 msg_task_t task = task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
706 MSG_task_dsend(task,mailbox,task_message_free);
709 * Send a "unchoked" message to a peer
711 void send_unchoked(peer_t peer, const char *mailbox) {
712 XBT_DEBUG("Sending a UNCHOKE to %s",mailbox);
713 msg_task_t task = task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id);
714 MSG_task_dsend(task,mailbox,task_message_free);
717 * Send a "HAVE" message to all peers we are connected to
719 void send_have(peer_t peer, int piece) {
720 XBT_DEBUG("Sending HAVE message to all my peers");
721 connection_t remote_peer;
722 xbt_dict_cursor_t cursor= NULL;
724 xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
725 msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,peer->id,piece);
726 MSG_task_dsend(task,remote_peer->mailbox,task_message_free);
730 * Send a bitfield message to all the peers the peer has.
731 * @param peer peer data
733 void send_bitfield(peer_t peer, const char *mailbox) {
734 XBT_DEBUG("Sending a BITFIELD to %s",mailbox);
735 msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox,peer->id,peer->bitfield);
736 MSG_task_dsend(task,mailbox,task_message_free);
739 * Send a "request" message to a pair, containing a request for a piece
741 void send_request(peer_t peer, const char *mailbox, int piece, int block_index, int block_length) {
742 XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)",mailbox,piece,block_index,block_length);
743 msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length);
744 MSG_task_dsend(task,mailbox,task_message_free);
747 * Send a "piece" message to a pair, containing a piece of the file
749 void send_piece(peer_t peer, const char *mailbox, int piece, int stalled, int block_index, int block_length) {
750 XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s",piece,block_index, block_length,mailbox);
751 xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
752 xbt_assert((peer->bitfield[piece] == '1'), "Tried to send a piece that we doesn't have.");
753 msg_task_t task = task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, stalled, block_index, block_length);
754 MSG_task_dsend(task,mailbox,task_message_free);
757 int in_current_pieces(peer_t peer, int piece) {
758 int is_in = 0, i, peer_piece;
759 xbt_dynar_foreach(peer->current_pieces,i, peer_piece) {
760 if (peer_piece == piece) {