Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
22991a3fbb20b17cf43852f1c3bb2035730bec1b
[simgrid.git] / examples / c / app-chainsend / peer.c
1 /* Copyright (c) 2012-2020. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "chainsend.h"
8
9 XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend_peer, "Messages specific for the peer");
10
11 static void peer_join_chain(peer_t p)
12 {
13   chain_message_t msg = (chain_message_t)sg_mailbox_get(p->me);
14   p->prev             = msg->prev_;
15   p->next             = msg->next_;
16   p->total_pieces     = msg->num_pieces;
17   XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", sg_mailbox_get_name(p->me),
18             p->prev ? sg_mailbox_get_name(p->prev) : NULL, p->next ? sg_mailbox_get_name(p->next) : NULL);
19   free(msg);
20 }
21
22 static void peer_forward_file(peer_t p)
23 {
24   void* received;
25   int done                = 0;
26   size_t nb_pending_sends = 0;
27   size_t nb_pending_recvs = 0;
28
29   while (!done) {
30     p->pending_recvs[nb_pending_recvs] = sg_mailbox_get_async(p->me, &received);
31     nb_pending_recvs++;
32
33     int idx = sg_comm_wait_any(p->pending_recvs, nb_pending_recvs);
34     if (idx != -1) {
35       XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
36       /* move the last pending comm where the finished one was, and decrement */
37       p->pending_recvs[idx] = p->pending_recvs[--nb_pending_recvs];
38
39       if (p->next != NULL) {
40         XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
41                   sg_mailbox_get_name(p->next));
42         sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
43         p->pending_sends[nb_pending_sends] = send;
44         nb_pending_sends++;
45       } else
46         free(received);
47
48       p->received_pieces++;
49       p->received_bytes += PIECE_SIZE;
50       XBT_DEBUG("%u pieces received, %llu bytes received", p->received_pieces, p->received_bytes);
51       if (p->received_pieces >= p->total_pieces) {
52         done = 1;
53       }
54     }
55   }
56   sg_comm_wait_all(p->pending_sends, nb_pending_sends);
57 }
58
59 static peer_t peer_init(int argc, char* argv[])
60 {
61   peer_t p           = xbt_malloc(sizeof(s_peer_t));
62   p->prev            = NULL;
63   p->next            = NULL;
64   p->received_pieces = 0;
65   p->received_bytes  = 0;
66   p->pending_recvs   = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
67   p->pending_sends   = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
68
69   p->me = sg_mailbox_by_name(sg_host_self_get_name());
70
71   return p;
72 }
73
74 static void peer_delete(peer_t p)
75 {
76   free(p->pending_recvs);
77   free(p->pending_sends);
78
79   free(p);
80 }
81
82 void peer(int argc, char* argv[])
83 {
84   XBT_DEBUG("peer");
85
86   peer_t p          = peer_init(argc, argv);
87   double start_time = simgrid_get_clock();
88   peer_join_chain(p);
89   peer_forward_file(p);
90   double end_time = simgrid_get_clock();
91
92   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
93            p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
94
95   peer_delete(p);
96 }