Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deprecate sg_comm_wait_all in C
[simgrid.git] / examples / c / app-chainsend / peer.c
1 /* Copyright (c) 2012-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "chainsend.h"
7
8 XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend_peer, "Messages specific for the peer");
9
10 static void peer_join_chain(peer_t p)
11 {
12   chain_message_t msg = (chain_message_t)sg_mailbox_get(p->me);
13   p->prev             = msg->prev_;
14   p->next             = msg->next_;
15   p->total_pieces     = msg->num_pieces;
16   XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", sg_mailbox_get_name(p->me),
17             p->prev ? sg_mailbox_get_name(p->prev) : NULL, p->next ? sg_mailbox_get_name(p->next) : NULL);
18   xbt_free(msg);
19 }
20
21 static void peer_forward_file(peer_t p)
22 {
23   void* received;
24   int done = 0;
25
26   while (!done) {
27     sg_activity_set_push(p->pending_recvs, (sg_activity_t)sg_mailbox_get_async(p->me, &received));
28
29     sg_activity_t acti = sg_activity_set_wait_any(p->pending_recvs);
30     if (acti != NULL) {
31       sg_comm_unref((sg_comm_t)acti);
32       XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
33
34       if (p->next != NULL) {
35         XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
36                   sg_mailbox_get_name(p->next));
37         sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
38         sg_activity_set_push(p->pending_sends, (sg_activity_t)send);
39       } else
40         free(received);
41
42       p->received_pieces++;
43       p->received_bytes += PIECE_SIZE;
44       XBT_DEBUG("%u pieces received, %llu bytes received", p->received_pieces, p->received_bytes);
45       if (p->received_pieces >= p->total_pieces) {
46         done = 1;
47       }
48     }
49   }
50   sg_activity_set_wait_all(p->pending_sends);
51 }
52
53 static peer_t peer_init(int argc, char* argv[])
54 {
55   peer_t p           = xbt_malloc(sizeof(s_peer_t));
56   p->prev            = NULL;
57   p->next            = NULL;
58   p->received_pieces = 0;
59   p->received_bytes  = 0;
60   p->pending_recvs   = sg_activity_set_init();
61   p->pending_sends   = sg_activity_set_init();
62
63   p->me = sg_mailbox_by_name(sg_host_self_get_name());
64
65   return p;
66 }
67
68 static void peer_delete(peer_t p)
69 {
70   sg_activity_set_delete(p->pending_recvs);
71   sg_activity_set_delete(p->pending_sends);
72
73   xbt_free(p);
74 }
75
76 void peer(int argc, char* argv[])
77 {
78   XBT_DEBUG("peer");
79
80   peer_t p          = peer_init(argc, argv);
81   double start_time = simgrid_get_clock();
82   peer_join_chain(p);
83   peer_forward_file(p);
84   double end_time = simgrid_get_clock();
85
86   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
87            p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
88
89   peer_delete(p);
90 }