Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
XBT_ATTRIB_UNUSED for unused parameters.
[simgrid.git] / examples / c / app-chainsend / peer.c
1 /* Copyright (c) 2012-2020. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "chainsend.h"
8
9 XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend_peer, "Messages specific for the peer");
10
11 static void peer_join_chain(peer_t p)
12 {
13   const chain_message_t msg = (chain_message_t)sg_mailbox_get(p->me);
14   p->prev                   = msg->prev_;
15   p->next                   = msg->next_;
16   p->total_pieces           = msg->num_pieces;
17   XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", sg_mailbox_get_name(p->me),
18             p->prev ? sg_mailbox_get_name(p->prev) : NULL, p->next ? sg_mailbox_get_name(p->next) : NULL);
19   free(msg);
20 }
21
22 static void peer_forward_file(const peer_t p)
23 {
24   void* received;
25   int done                = 0;
26   size_t nb_pending_sends = 0;
27   size_t nb_pending_recvs = 0;
28
29   while (!done) {
30     sg_comm_t comm                     = sg_mailbox_get_async(p->me, &received);
31     p->pending_recvs[nb_pending_recvs] = comm;
32     nb_pending_recvs++;
33
34     int idx = sg_comm_wait_any(p->pending_recvs, nb_pending_recvs);
35     if (idx != -1) {
36       comm = p->pending_recvs[idx];
37       XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
38       /* move the last pending comm where the finished one was, and decrement */
39       p->pending_recvs[idx] = p->pending_recvs[--nb_pending_recvs];
40
41       if (p->next != NULL) {
42         XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
43                   sg_mailbox_get_name(p->next));
44         sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
45         p->pending_sends[nb_pending_sends] = send;
46         nb_pending_sends++;
47       } else
48         free(received);
49
50       p->received_pieces++;
51       p->received_bytes += PIECE_SIZE;
52       XBT_DEBUG("%u pieces received, %llu bytes received", p->received_pieces, p->received_bytes);
53       if (p->received_pieces >= p->total_pieces) {
54         done = 1;
55       }
56     }
57   }
58   sg_comm_wait_all(p->pending_sends, nb_pending_sends);
59 }
60
61 static peer_t peer_init(XBT_ATTRIB_UNUSED int argc, XBT_ATTRIB_UNUSED char* argv[])
62 {
63   peer_t p           = (peer_t)malloc(sizeof(s_peer_t));
64   p->prev            = NULL;
65   p->next            = NULL;
66   p->received_pieces = 0;
67   p->received_bytes  = 0;
68   p->pending_recvs   = (sg_comm_t*)malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
69   p->pending_sends   = (sg_comm_t*)malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
70
71   p->me = sg_mailbox_by_name(sg_host_self_get_name());
72
73   return p;
74 }
75
76 static void peer_delete(peer_t p)
77 {
78   free(p->pending_recvs);
79   free(p->pending_sends);
80
81   free(p);
82 }
83
84 void peer(int argc, char* argv[])
85 {
86   XBT_DEBUG("peer");
87
88   peer_t p          = peer_init(argc, argv);
89   double start_time = simgrid_get_clock();
90   peer_join_chain(p);
91   peer_forward_file(p);
92   double end_time = simgrid_get_clock();
93
94   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
95            p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
96
97   peer_delete(p);
98 }