Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
msg_private.h is a C++ header so add pp
[simgrid.git] / src / s4u / s4u_comm.cpp
1 /* Copyright (c) 2006-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "xbt/log.h"
8 #include "simgrid/s4u/comm.hpp"
9 #include <simgrid/s4u/Mailbox.hpp>
10 #include "../msg/msg_private.hpp"
11
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm,s4u_activity,"S4U asynchronous communications");
14
15 namespace simgrid {
16 namespace s4u {
17
18 Comm::~Comm() {
19
20 }
21
22
23
24 s4u::Comm &Comm::send_init(s4u::MailboxPtr chan) {
25   s4u::Comm *res = new s4u::Comm();
26   res->sender_ = SIMIX_process_self();
27   res->mailbox_ = chan;
28   return *res;
29 }
30
31 s4u::Comm &Comm::recv_init(s4u::MailboxPtr chan) {
32   s4u::Comm *res = new s4u::Comm();
33   res->receiver_ = SIMIX_process_self();
34   res->mailbox_ = chan;
35   return *res;
36 }
37
38 void Comm::setRate(double rate) {
39   xbt_assert(state_==inited);
40   rate_ = rate;
41 }
42
43 void Comm::setSrcData(void * buff) {
44   xbt_assert(state_==inited);
45   xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
46   srcBuff_ = buff;
47 }
48 void Comm::setSrcDataSize(size_t size){
49   xbt_assert(state_==inited);
50   srcBuffSize_ = size;
51 }
52 void Comm::setSrcData(void * buff, size_t size) {
53   xbt_assert(state_==inited);
54
55   xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
56   srcBuff_ = buff;
57   srcBuffSize_ = size;
58 }
59 void Comm::setDstData(void ** buff) {
60   xbt_assert(state_==inited);
61   xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
62   dstBuff_ = buff;
63 }
64 size_t Comm::getDstDataSize(){
65   xbt_assert(state_==finished);
66   return dstBuffSize_;
67 }
68 void Comm::setDstData(void ** buff, size_t size) {
69   xbt_assert(state_==inited);
70
71   xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
72   dstBuff_ = buff;
73   dstBuffSize_ = size;
74 }
75
76 void Comm::start() {
77   xbt_assert(state_ == inited);
78
79   if (srcBuff_ != nullptr) { // Sender side
80     pimpl_ = simcall_comm_isend(sender_, mailbox_->getImpl(), remains_, rate_,
81         srcBuff_, srcBuffSize_,
82         matchFunction_, cleanFunction_, copyDataFunction_,
83         userData_, detached_);
84   } else if (dstBuff_ != nullptr) { // Receiver side
85     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
86         matchFunction_, copyDataFunction_,
87         userData_, rate_);
88
89   } else {
90     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
91   }
92   state_ = started;
93 }
94 void Comm::wait() {
95   xbt_assert(state_ == started || state_ == inited);
96
97   if (state_ == started)
98     simcall_comm_wait(pimpl_, -1/*timeout*/);
99   else {// p_state == inited. Save a simcall and do directly a blocking send/recv
100     if (srcBuff_ != nullptr) {
101       simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
102           srcBuff_, srcBuffSize_,
103           matchFunction_, copyDataFunction_,
104           userData_, -1 /*timeout*/);
105     } else {
106       simcall_comm_recv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
107           matchFunction_, copyDataFunction_,
108           userData_, -1/*timeout*/, rate_);
109     }
110   }
111   state_ = finished;
112   delete this;
113 }
114 void Comm::wait(double timeout) {
115   xbt_assert(state_ == started || state_ == inited);
116
117   if (state_ == started) {
118     simcall_comm_wait(pimpl_, timeout);
119     state_ = finished;
120     return;
121   }
122
123   // It's not started yet. Do it in one simcall
124   if (srcBuff_ != nullptr) {
125     simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
126         srcBuff_, srcBuffSize_,
127         matchFunction_, copyDataFunction_,
128         userData_, timeout);
129   } else { // Receiver
130     simcall_comm_recv(receiver_, mailbox_->getImpl(), dstBuff_, &dstBuffSize_,
131         matchFunction_, copyDataFunction_,
132         userData_, timeout, rate_);
133   }
134   state_ = finished;
135   delete this;
136 }
137
138 s4u::Comm &Comm::send_async(MailboxPtr dest, void *data, int simulatedSize) {
139   s4u::Comm &res = s4u::Comm::send_init(dest);
140   res.setRemains(simulatedSize);
141   res.srcBuff_ = data;
142   res.srcBuffSize_ = sizeof(void*);
143   res.start();
144   return res;
145 }
146
147 s4u::Comm &Comm::recv_async(MailboxPtr dest, void **data) {
148   s4u::Comm &res = s4u::Comm::recv_init(dest);
149   res.setDstData(data);
150   res.start();
151   return res;
152 }
153
154 bool Comm::test() {
155   xbt_assert(state_ == inited || state_ == started || state_ == finished);
156   
157   if (state_ == finished) 
158     xbt_die("Don't call test on a finished comm.");
159   
160   if (state_ == inited) {
161     this->start();
162   }
163   
164   if(simcall_comm_test(pimpl_)){
165     state_ = finished;
166     delete this;
167     return true;
168   }
169   return false;
170 }
171
172 }
173 }