Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
implement receive async
author: Martin Quinson <martin.quinson@loria.fr>
Sun, 16 Aug 2015 14:43:19 +0000 (16:43 +0200)
committer: Martin Quinson <martin.quinson@loria.fr>
Sun, 16 Aug 2015 14:43:19 +0000 (16:43 +0200)
include/simgrid/s4u/comm.hpp
include/simgrid/s4u/mailbox.hpp
src/s4u/s4u_actor.cpp
src/s4u/s4u_comm.cpp

index f8cfd8f..c2d3a59 100644 (file)
@@ -30,7 +30,7 @@ public:
        /** Creates and start an async send to the mailbox #dest */
        static Comm &send_async(s4u::Actor *sender, Mailbox &dest, void *data, int simulatedByteAmount);
     /** Creates (but don't start) an async recv onto the mailbox #from */
-       //static Comm &recv_init(Mailbox &from);
+       static Comm &recv_init(Mailbox &from);
        /** Creates and start an async recv to the mailbox #from */
        //static Comm &recv_async(Mailbox &from, void *data);
 
@@ -45,6 +45,8 @@ public:
        void setRate(double rate);
 
 private:
+       void *p_dstBuff = NULL;
+       size_t p_dstBuffSize = 0;
        void *p_srcBuff = NULL;
        size_t p_srcBuffSize = sizeof(void*);
 public:
@@ -55,6 +57,14 @@ public:
        /** Specify the data to send and its size */
        void setSrcData(void * buff, size_t size);
 
+       /** Specify where to receive the data */
+       void setDstData(void ** buff);
+       /** Specify the buffer in which the data should be received */
+       void setDstData(void ** buff, size_t size);
+       /** Retrieve the size of the received data */
+       size_t getDstDataSize();
+
+
 private: /* FIXME: expose these elements in the API */
        int p_detached = 0;
     int (*p_matchFunction)(void *, void *, smx_synchro_t) = NULL;
index c519700..d98da45 100644 (file)
@@ -23,7 +23,6 @@ class Comm;
  * sender and receiver.
  */
 class Mailbox {
-       friend Actor; // FIXME: remove it when recv async exist
        friend Comm;
 
 private:
index 173d633..74264b5 100644 (file)
@@ -83,13 +83,13 @@ void s4u::Actor::execute(double flops) {
 }
 
 char *s4u::Actor::recvstr(Mailbox &chan) {
-       char *res=NULL;
-       size_t res_size=sizeof(res);
+       void *res=NULL;
 
+       Comm c = Comm::recv_init(chan);
+       c.setDstData(&res,sizeof(res));
+       c.wait();
 
-       simcall_comm_recv(chan.getInferior(),&res,&res_size,NULL,NULL,NULL,-1 /* timeout */,-1 /*rate*/);
-
-    return res;
+    return (char*)res;
 }
 void s4u::Actor::sendstr(Mailbox &chan, const char*msg) {
        Comm c = Comm::send_init(this,chan);
index 4ef20b7..44f526e 100644 (file)
@@ -24,6 +24,12 @@ s4u::Comm &s4u::Comm::send_init(s4u::Actor *sender, s4u::Mailbox &chan) {
 
        return *res;
 }
+s4u::Comm &s4u::Comm::recv_init(s4u::Mailbox &chan) {
+       s4u::Comm *res = new s4u::Comm();
+       res->p_mailbox = &chan;
+
+       return *res;
+}
 
 void s4u::Comm::setRate(double rate) {
        xbt_assert(p_state==inited);
@@ -32,6 +38,7 @@ void s4u::Comm::setRate(double rate) {
 
 void s4u::Comm::setSrcData(void * buff) {
        xbt_assert(p_state==inited);
+       xbt_assert(p_dstBuff == NULL, "Cannot set the src and dst buffers at the same time");
        p_srcBuff = buff;
 }
 void s4u::Comm::setSrcDataSize(size_t size){
@@ -41,17 +48,43 @@ void s4u::Comm::setSrcDataSize(size_t size){
 void s4u::Comm::setSrcData(void * buff, size_t size) {
        xbt_assert(p_state==inited);
 
+       xbt_assert(p_dstBuff == NULL, "Cannot set the src and dst buffers at the same time");
        p_srcBuff = buff;
        p_srcBuffSize = size;
 }
+void s4u::Comm::setDstData(void ** buff) {
+       xbt_assert(p_state==inited);
+       xbt_assert(p_srcBuff == NULL, "Cannot set the src and dst buffers at the same time");
+       p_dstBuff = buff;
+}
+size_t s4u::Comm::getDstDataSize(){
+       xbt_assert(p_state==finished);
+       return p_dstBuffSize;
+}
+void s4u::Comm::setDstData(void ** buff, size_t size) {
+       xbt_assert(p_state==inited);
+
+       xbt_assert(p_srcBuff == NULL, "Cannot set the src and dst buffers at the same time");
+       p_dstBuff = buff;
+       p_dstBuffSize = size;
+}
 
 void s4u::Comm::start() {
        xbt_assert(p_state == inited);
 
-       p_inferior = simcall_comm_isend(p_sender->getInferior(), p_mailbox->getInferior(), p_remains, p_rate,
-                       p_srcBuff, p_srcBuffSize,
-                       p_matchFunction, p_cleanFunction, p_copyDataFunction,
-                       p_userData, p_detached);
+       if (p_srcBuff != NULL) { // Sender side
+               p_inferior = simcall_comm_isend(p_sender->getInferior(), p_mailbox->getInferior(), p_remains, p_rate,
+                               p_srcBuff, p_srcBuffSize,
+                               p_matchFunction, p_cleanFunction, p_copyDataFunction,
+                               p_userData, p_detached);
+       } else if (p_dstBuff != NULL) { // Receiver side
+               p_inferior = simcall_comm_irecv(p_mailbox->getInferior(), p_dstBuff, &p_dstBuffSize,
+                               p_matchFunction, p_copyDataFunction,
+                               p_userData, p_rate);
+
+       } else {
+               xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
+       }
        p_state = started;
 }
 void s4u::Comm::wait() {
@@ -59,20 +92,41 @@ void s4u::Comm::wait() {
 
        if (p_state == started)
                simcall_comm_wait(p_inferior, -1/*timeout*/);
-       else // p_state == inited
-               /* Save a simcall and do directly a blocking send */
-               simcall_comm_send(p_sender->getInferior(), p_mailbox->getInferior(), p_remains, p_rate,
-                               p_srcBuff, p_srcBuffSize,
-                               p_matchFunction, p_copyDataFunction,
-                               p_userData, p_detached);
+       else {// p_state == inited. Save a simcall and do directly a blocking send/recv
+               if (p_srcBuff != NULL) {
+                       simcall_comm_send(p_sender->getInferior(), p_mailbox->getInferior(), p_remains, p_rate,
+                                       p_srcBuff, p_srcBuffSize,
+                                       p_matchFunction, p_copyDataFunction,
+                                       p_userData, -1 /*timeout*/);
+               } else {
+                       simcall_comm_recv(p_mailbox->getInferior(), p_dstBuff, &p_dstBuffSize,
+                                       p_matchFunction, p_copyDataFunction,
+                                       p_userData, -1/*timeout*/, p_rate);
+               }
+       }
        p_state = finished;
 }
 void s4u::Comm::wait(double timeout) {
        xbt_assert(p_state == started || p_state == inited);
 
-       if (p_state == inited)
-               start();
-       simcall_comm_wait(p_inferior, timeout);
+       if (p_state == started) {
+               simcall_comm_wait(p_inferior, timeout);
+               p_state = finished;
+               return;
+       }
+
+       // It's not started yet. Do it in one simcall
+       if (p_srcBuff != NULL) {
+               simcall_comm_send(p_sender->getInferior(), p_mailbox->getInferior(), p_remains, p_rate,
+                               p_srcBuff, p_srcBuffSize,
+                               p_matchFunction, p_copyDataFunction,
+                               p_userData, timeout);
+       } else { // Receiver
+               simcall_comm_recv(p_mailbox->getInferior(), p_dstBuff, &p_dstBuffSize,
+                               p_matchFunction, p_copyDataFunction,
+                               p_userData, timeout, p_rate);
+       }
+       p_state = finished;
 }
 
 s4u::Comm &s4u::Comm::send_async(s4u::Actor *sender, Mailbox &dest, void *data, int simulatedSize) {