Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
This change introduce the new mailbox concept.
authorcherierm <cherierm@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 7 Jan 2008 16:36:32 +0000 (16:36 +0000)
committercherierm <cherierm@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 7 Jan 2008 16:36:32 +0000 (16:36 +0000)
A mailbox is a storage used to put or get an msg task. It is identified by an alias. For more information about the usage of the mailbox see the java examples alias0 and alias1 and the C example
masterslave_forwarder_with_alias in the directory /examples/msg/alias.

This change impacts the following files :

include/msg/msg.h Add the declaration of the functions related with the new concept of mailboxes.
src/msg/host.c Adapte the functions __MSG_host_create and __MSG_host_destroy to the new concept of mailboxes.
src/gos/gos.c Implementation of the functions related withe the new concept of mailboxes.

src/simix/smx_global.c Initialization et finalization of the mailbox module.

src/java/jmsg.c Implementation of the jni callback functions related with the mailbox concept.
src/java/jmsg.h Declaration of the jni callback functions related with the mailbox concept.

ApplicationHandler.java Implementation of the operations related with the properties of a process
Host.java Implementation of the operations related with the new concept of mailbox
MsgNative.java Implementation of the operations related with the new concept of mailbox
Process.java Implementation of the operations related with the new concept of mailbox
Task.java Implementation of the operations related with the new concept of mailbox

msg_mailbox.h Header containing the declaration of the functions related with the mailbox concept
msg_mailbox.c Source file containing the implementation of the functions related with the mailbox concept

directories alia0 et alias1 contain two examples to illustrate the different usages of the mailbox concept using the java interface
for msg.

directory alias contains the example masterslave_forwarder_with_alias which illustrate the usage of mailbox instead channel as in
the masterslave_forwarder example.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@5163 48e7efb5-ca39-0410-a469-dd3cf9ba447f

13 files changed:
build/vc7/simgrid/all.sln
build/vc7/simgrid/simgrid.vcproj
include/msg/msg.h
src/java/jmsg.c
src/java/jmsg.h
src/java/simgrid/msg/ApplicationHandler.java
src/java/simgrid/msg/Host.java
src/java/simgrid/msg/MsgNative.java
src/java/simgrid/msg/Process.java
src/java/simgrid/msg/Task.java
src/msg/gos.c
src/msg/host.c
src/simix/smx_global.c

index d577563..9552417 100644 (file)
@@ -69,6 +69,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "graphxml_usage", "..\testsu
 EndProject\r
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "teshsuite", "teshsuite", "{C48B8100-6A37-4206-9DFF-A7216B0748F8}"\r
 EndProject\r
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "masterslave_forwarder_with_alias", "..\examples\msg\masterslave_forwarder_with_alias\masterslave_forwarder_with_alias.vcproj", "{7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}"\r
+EndProject\r
 Global\r
        GlobalSection(SolutionConfigurationPlatforms) = preSolution\r
                Debug|Win32 = Debug|Win32\r
@@ -171,15 +173,19 @@ Global
                {05FB934F-D2CE-420F-9DAA-E7EE3FC0E381}.Debug|Win32.Build.0 = Debug|Win32\r
                {05FB934F-D2CE-420F-9DAA-E7EE3FC0E381}.Release|Win32.ActiveCfg = Release|Win32\r
                {05FB934F-D2CE-420F-9DAA-E7EE3FC0E381}.Release|Win32.Build.0 = Release|Win32\r
+               {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Debug|Win32.ActiveCfg = Debug|Win32\r
+               {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Debug|Win32.Build.0 = Debug|Win32\r
+               {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Release|Win32.ActiveCfg = Release|Win32\r
+               {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Release|Win32.Build.0 = Release|Win32\r
        EndGlobalSection\r
        GlobalSection(SolutionProperties) = preSolution\r
                HideSolutionNode = FALSE\r
        EndGlobalSection\r
        GlobalSection(NestedProjects) = preSolution\r
+               {617987DE-296D-4E0C-BD53-8D639E6FDA09} = {B24BD5D4-9D03-4644-9792-416396F9C34E}\r
                {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} = {F356BA40-006E-4F59-AE49-E25171DF2016}\r
                {0E867394-9B56-42DA-AB3F-E59080B653A5} = {F356BA40-006E-4F59-AE49-E25171DF2016}\r
                {65F6A759-3E01-4265-AB50-71EF60AD126C} = {F356BA40-006E-4F59-AE49-E25171DF2016}\r
-               {4A8A0642-2553-4BB1-ADB2-264F2E799EF9} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
                {D5E757F9-B469-488D-BBA4-C04B167961BD} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
                {EEB461EE-0D1C-4D77-8010-3FB5897ED347} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
                {0247C196-B310-4A81-AF0F-B65C8632F69C} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
@@ -187,6 +193,8 @@ Global
                {9A8719E2-B66F-4B33-854B-6F6D226DE0BC} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
                {D8A5626B-3188-4F74-95F0-732F84F53A1C} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
                {FD995103-FD5C-4A5B-95E6-AF512BC6D749} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
+               {4A8A0642-2553-4BB1-ADB2-264F2E799EF9} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
+               {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD}\r
                {F2CCCCB9-0342-44E1-B1E3-53E6103EC294} = {0E867394-9B56-42DA-AB3F-E59080B653A5}\r
                {DFDB47E0-83D2-41F1-AF4B-DC5EA312A08C} = {0E867394-9B56-42DA-AB3F-E59080B653A5}\r
                {A95FC8DF-52C7-45FD-B7D1-D93081DE6BB1} = {0E867394-9B56-42DA-AB3F-E59080B653A5}\r
@@ -201,7 +209,6 @@ Global
                {8147D9B1-528F-4D2F-BA5D-17793CDBFBBB} = {CEF665B7-5EB9-4055-993B-32F7B50D87A1}\r
                {5DD95E75-D229-4136-A2ED-216D23749A7B} = {CEF665B7-5EB9-4055-993B-32F7B50D87A1}\r
                {9EF11810-1CAF-4588-A390-D51E49BAED35} = {536BA0EC-107D-467E-BFF8-D7BA634023BC}\r
-               {617987DE-296D-4E0C-BD53-8D639E6FDA09} = {B24BD5D4-9D03-4644-9792-416396F9C34E}\r
                {63A06558-AAFC-491F-A294-3DA98A20EDA9} = {D4B14EC6-CCA0-4681-90D4-F435F05EFD3A}\r
                {DF89E558-10A4-42E8-B438-4C60904F87F4} = {D4B14EC6-CCA0-4681-90D4-F435F05EFD3A}\r
                {78D7DC8F-D24B-4160-95F5-7101C17A58BA} = {D4B14EC6-CCA0-4681-90D4-F435F05EFD3A}\r
index bf94a65..ad9e0df 100644 (file)
                                RelativePath="..\..\..\src\msg\msg_config.c"\r
                                >\r
                        </File>\r
+                       <File\r
+                               RelativePath="..\..\..\src\msg\msg_mailbox.c"\r
+                               >\r
+                       </File>\r
                        <File\r
                                RelativePath="..\..\..\src\surf\network.c"\r
                                >\r
                                RelativePath="..\..\..\src\surf\workstation.c"\r
                                >\r
                        </File>\r
-                       <File\r
-                               RelativePath="..\..\..\src\surf\workstation_KCCFLN05.c"\r
-                               >\r
-                       </File>\r
                        <File\r
                                RelativePath="..\..\..\src\surf\workstation_ptask_L07.c"\r
                                >\r
index 1424a4c..cf80731 100644 (file)
@@ -132,5 +132,36 @@ XBT_PUBLIC(double) MSG_task_get_compute_duration(m_task_t task);
 XBT_PUBLIC(double) MSG_task_get_remaining_computation(m_task_t task);
 XBT_PUBLIC(double) MSG_task_get_data_size(m_task_t task);
 
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host);
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_task_receive_with_time_out(m_task_t * task, const char* alias, double timeout);
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_task_receive(m_task_t * task, const char* alias);
+
+XBT_PUBLIC(int) 
+MSG_task_listen(const char* alias);
+
+XBT_PUBLIC(int) 
+MSG_task_listen_from_host(const char* alias, m_host_t host);
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_alias_select_from(const char* alias, double timeout, int* PID);
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout);
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_task_send(m_task_t task,const char* alias);
+
+XBT_PUBLIC(MSG_error_t) 
+MSG_task_send_bounded(m_task_t task, const char* alias, double rate);
+
+XBT_PUBLIC(int)
+MSG_task_listen_from(const char* alias);
+
 SG_END_DECL()
 #endif
index 5239f29..f0aae36 100644 (file)
@@ -23,6 +23,8 @@
 
 #include "jmsg.h"
 
+#include "msg/msg_mailbox.h"
+
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg);
 
 static JavaVM * __java_vm = NULL;
@@ -61,7 +63,8 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv* env, jclass cls, jobject jproce
   jstring jname;               /* the name of the java process instance                */
   const char* name;            /* the C name of the process                            */
   m_process_t process;         /* the native process to create                         */
-
+  char alias[MAX_ALIAS_NAME + 1] = {0};
+  msg_mailbox_t mailbox;
 
   DEBUG4("Java_simgrid_msg_MsgNative_processCreate(env=%p,cls=%p,jproc=%p,jhost=%p)",
         env,cls,jprocess_arg,jhost);
@@ -96,6 +99,8 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv* env, jclass cls, jobject jproce
   (*env)->ReleaseStringUTFChars(env, jname, name);
        
   process->simdata->m_host = jhost_get_native(env,jhost);
+
+
   if( ! (process->simdata->m_host) ) { /* not binded */
     free(process->simdata);
     free(process->data);
@@ -132,6 +137,12 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv* env, jclass cls, jobject jproce
     
   /* add the process to the list of the processes of the simulation */
   xbt_fifo_unshift(msg_global->process_list, process);
+
+  sprintf(alias,"%s:%s",(process->simdata->m_host->simdata->smx_host)->name,process->name);
+  
+  mailbox = MSG_mailbox_new(alias);
+  MSG_mailbox_set_hostname(mailbox, process->simdata->m_host->simdata->smx_host->name);
+
        
 }
 
@@ -733,6 +744,7 @@ Java_simgrid_msg_MsgNative_taskGet(JNIEnv* env, jclass cls,
   return (jobject)task->data;
 }
 
+
 JNIEXPORT jboolean JNICALL 
 Java_simgrid_msg_MsgNative_taskProbe(JNIEnv* env, jclass cls, jint chan_id) {
   return (jboolean)MSG_task_Iprobe(chan_id);
@@ -786,6 +798,8 @@ Java_simgrid_msg_MsgNative_hostPut(JNIEnv* env, jclass cls,
     jxbt_throw_native(env, xbt_strdup("MSG_task_put_with_timeout() failed"));
 }
 
+
+
 JNIEXPORT void JNICALL 
 Java_simgrid_msg_MsgNative_hostPutBounded(JNIEnv* env, jclass cls, 
                                    jobject jhost, jint chan_id, jobject jtask, 
@@ -987,4 +1001,135 @@ Java_simgrid_msg_MsgNative_selectContextFactory(JNIEnv * env, jclass class,jstri
        if(rv)
                jxbt_throw_native(env, xbt_strdup("xbt_select_context_factory() failed"));       
 }
+
+JNIEXPORT void JNICALL 
+Java_simgrid_msg_MsgNative_taskSend(JNIEnv* env, jclass cls, 
+                            jstring jalias, jobject jtask, 
+                            jdouble jtimeout) {
+       
+       MSG_error_t rv;
+       const char* alias = (*env)->GetStringUTFChars(env, jalias, 0);
+
+       m_task_t task = jtask_to_native_task(jtask,env);
+
+       
+       if(!task){
+               (*env)->ReleaseStringUTFChars(env, jalias, alias);
+               jxbt_throw_notbound(env,"task",jtask);
+               return;
+       }
+
+       rv = MSG_task_send_with_timeout(task,alias,(double)jtimeout);
+
+       (*env)->ReleaseStringUTFChars(env, jalias, alias);
+
+       if(MSG_OK != rv)
+               jxbt_throw_native(env, xbt_strdup("MSG_task_send_with_timeout() failed"));
+
+}
+
+JNIEXPORT void JNICALL 
+Java_simgrid_msg_MsgNative_taskSendBounded(JNIEnv* env, jclass cls, 
+                                   jstring jalias, jobject jtask, 
+                                   jdouble jmaxRate) {
+  m_task_t task = jtask_to_native_task(jtask,env);
+  MSG_error_t rv;
+  const char* alias;
+
+  if(!task){
+    jxbt_throw_notbound(env,"task",jtask);
+    return;
+  }
+  
+  alias = (*env)->GetStringUTFChars(env, jalias, 0);
+       
+  rv = MSG_task_send_bounded(task,alias,(double)jmaxRate);
+  
+  (*env)->ReleaseStringUTFChars(env, jalias, alias);
+   
+  if(MSG_OK != rv)
+    jxbt_throw_native(env, xbt_strdup("MSG_task_send_bounded() failed"));
+}
+
+JNIEXPORT jobject JNICALL 
+Java_simgrid_msg_MsgNative_taskReceive(JNIEnv* env, jclass cls, 
+                            jstring jalias, jdouble jtimeout, jobject jhost) {
+       MSG_error_t rv;
+       m_task_t task = NULL;
+       m_host_t host = NULL;
+       const char* alias;
+
+       if (jhost) {
+               host = jhost_get_native(env,jhost);
+               
+               if(!host){
+                       jxbt_throw_notbound(env,"host",jhost);
+                       return NULL;
+               }  
+       } 
+
+       alias = (*env)->GetStringUTFChars(env, jalias, 0);
+
+       rv = MSG_task_receive_ext(&task,alias,(double)jtimeout,host);   
+
+       (*env)->ReleaseStringUTFChars(env, jalias, alias);
+       
+       if (MSG_OK != rv) 
+       {
+               jxbt_throw_native(env, xbt_strdup("MSG_task_receive_ext() failed"));
+               return NULL;
+       }
+
+       return (jobject)task->data;
+}
+
+JNIEXPORT jboolean JNICALL 
+Java_simgrid_msg_MsgNative_taskListen(JNIEnv* env, jclass cls, jstring jalias) {
+       
+  const char* alias;
+  int rv;
+  
+  alias = (*env)->GetStringUTFChars(env, jalias, 0);
+  
+  rv = MSG_task_listen(alias);
+  
+  (*env)->ReleaseStringUTFChars(env, jalias, alias);
+  
+  return (jboolean)rv;
+}
+
+JNIEXPORT jint JNICALL 
+Java_simgrid_msg_MsgNative_taskListenFromHost(JNIEnv* env, jclass cls, jstring jalias, jobject jhost) {
+  
+  int rv;
+  const char* alias;
+  
+  m_host_t host = jhost_get_native(env,jhost);
+
+  if(!host){
+    jxbt_throw_notbound(env,"host",jhost);
+    return -1;
+  }
+  
+  alias = (*env)->GetStringUTFChars(env, jalias, 0);
+
+  rv = MSG_task_listen_from_host(alias,host);
+  
+  (*env)->ReleaseStringUTFChars(env, jalias, alias);
+  
+  return (jint)rv;
+}
+
+JNIEXPORT jint JNICALL 
+Java_simgrid_msg_MsgNative_taskListenFrom(JNIEnv* env, jclass cls, jstring jalias) {
+  
+  int rv;
+  const char* alias = (*env)->GetStringUTFChars(env, jalias, 0);
+
+  rv = MSG_task_listen_from(alias);
+  
+  (*env)->ReleaseStringUTFChars(env, jalias, alias);
+  
+  return (jint)rv;
+}
   
index 0ee91bb..93f4ee6 100644 (file)
@@ -277,6 +277,10 @@ JNIEXPORT void JNICALL Java_simgrid_msg_MsgNative_taskExecute
 JNIEXPORT jobject JNICALL Java_simgrid_msg_MsgNative_taskGet
   (JNIEnv *, jclass, jint, jdouble, jobject);
 
+JNIEXPORT jobject JNICALL 
+Java_simgrid_msg_MsgNative_taskReceive
+  (JNIEnv *, jclass, jstring, jdouble, jobject);
+
 /*
  * Class               simgrid_msg_Msg
  * Method              taskHasPendingCommunication
@@ -309,6 +313,10 @@ JNIEXPORT jint JNICALL Java_simgrid_msg_MsgNative_taskProbeHost
 JNIEXPORT void JNICALL Java_simgrid_msg_MsgNative_hostPut
   (JNIEnv *, jclass, jobject, jint, jobject, jdouble);
 
+JNIEXPORT void JNICALL 
+Java_simgrid_msg_MsgNative_taskSend
+  (JNIEnv *, jclass, jstring, jobject, jdouble);
+
 /*
  * Class               simgrid_msg_Msg
  * Method              hostPutBounded
@@ -370,4 +378,16 @@ Java_simgrid_msg_Msg_createEnvironment(JNIEnv* env, jclass cls,jstring jplatform
 JNIEXPORT void JNICALL 
 Java_simgrid_msg_MsgNative_selectContextFactory(JNIEnv *, jclass, jstring);
 
+JNIEXPORT void JNICALL 
+Java_simgrid_msg_MsgNative_taskSendBounded(JNIEnv*, jclass, jstring, jobject, jdouble);
+
+JNIEXPORT jboolean JNICALL 
+Java_simgrid_msg_MsgNative_taskListen(JNIEnv*, jclass, jstring);
+
+JNIEXPORT jint JNICALL 
+Java_simgrid_msg_MsgNative_taskListenFromHost(JNIEnv*, jclass, jstring, jobject);
+
+JNIEXPORT jint JNICALL 
+Java_simgrid_msg_MsgNative_taskListenFrom(JNIEnv*, jclass, jstring);
+
 #endif /* !MSG4JAVA_H */ 
index 14d5e57..fd59889 100644 (file)
@@ -12,6 +12,7 @@
 package simgrid.msg;
 
 import java.util.Vector;
+import java.util.Hashtable;
 import org.xml.sax.*;
 import org.xml.sax.helpers.*;
 
@@ -39,6 +40,8 @@ public final class ApplicationHandler extends DefaultHandler {
                 * of the process object.
                 */
     public Vector args;
+    
+    public Hashtable properties;
 
                 /**
                 * The name of the host of the process.
@@ -49,12 +52,14 @@ public final class ApplicationHandler extends DefaultHandler {
                 * The function of the process.
                 */
     private String function;
+    
 
                 /**
                 * Default constructor.
                 */
     public ProcessFactory() {
       this.args = new Vector();
+      this.properties = new Hashtable();
       this.hostName = null;
       this.function = null;
     }
@@ -72,8 +77,11 @@ public final class ApplicationHandler extends DefaultHandler {
       this.hostName = hostName;
       this.function = function;
 
-      if (!args.isEmpty())
-        args.clear();
+      if (!this.args.isEmpty())
+        this.args.clear();
+        
+      if(!this.properties.isEmpty())
+       this.properties.clear();
     }
                 /**
                 * This method is called by the startElement() handler.
@@ -85,6 +93,16 @@ public final class ApplicationHandler extends DefaultHandler {
                 */ public void registerProcessArg(String arg) {
       this.args.add(arg);
     }
+    
+    public void setProperty(String id, String value)
+    {
+       this.properties.put(id,value);  
+    }
+    
+    public String getHostName()
+    {
+       return this.hostName;
+    }
 
     public void createProcess() {
       try {
@@ -92,7 +110,7 @@ public final class ApplicationHandler extends DefaultHandler {
         Class cls = Class.forName(this.function);
 
         simgrid.msg.Process process = (simgrid.msg.Process) cls.newInstance();
-        process.name = process.getName();       //this.function;
+        process.name = /*process.getName();*/       this.function;
         process.id = simgrid.msg.Process.nextProcessId++;
         Host host = Host.getByName(this.hostName);
 
@@ -102,6 +120,9 @@ public final class ApplicationHandler extends DefaultHandler {
 
         for (int index = 0; index < size; index++)
           process.args.add(args.get(index));
+          
+        process.properties = this.properties;
+        this.properties = new Hashtable();
 
       } catch(JniException e) {
         System.out.println(e.toString());
@@ -159,6 +180,8 @@ public final class ApplicationHandler extends DefaultHandler {
                            Attributes attr) {
     if (localName.equals("process"))
       onProcessIdentity(attr);
+    else if(localName.equals("prop"))
+      onProperty(attr);
     else if (localName.equals("argument"))
       onProcessArg(attr);
   }
@@ -169,6 +192,10 @@ public final class ApplicationHandler extends DefaultHandler {
   public void onProcessIdentity(Attributes attr) {
     processFactory.setProcessIdentity(attr.getValue(0), attr.getValue(1));
   }
+  
+  public void onProperty(Attributes attr) {
+    processFactory.setProperty(attr.getValue(0), attr.getValue(1));
+  }
 
         /**
      * process arguments handler.
index 23094dd..69737f2 100644 (file)
@@ -190,10 +190,48 @@ try {
                     double timeout) throws JniException, NativeException {
     MsgNative.hostPut(this, channel, task, timeout);
   } 
+  
+  
    /** Send the given task to the given channel of the host (capping the emision rate to #maxrate) */ 
    
     public void putBounded(int channel, Task task,
                            double maxrate) throws JniException,
     NativeException {
     MsgNative.hostPutBounded(this, channel, task, maxrate);
-} } 
+       }
+       
+        /** Send the given task to mailbox identified by the default alias */ 
+       public void send(Task task) throws JniException, NativeException  {
+               String alias = this.getName() + ":" + Process.currentProcess().msgName();       
+       MsgNative.taskSend(alias, task, -1);
+       } 
+       
+       /** Send the given task to the mailbox associated with the specified alias */ 
+   
+    public void send(String alias, Task task) throws JniException, NativeException {
+       MsgNative.taskSend(alias, task, -1);
+       }
+       
+        /** Send the given task in the mailbox associated with the alias of the current host (waiting at most #timeout seconds) */
+       public void send(Task task, double timeout) throws JniException, NativeException {
+       String alias = this.getName() + ":" + Process.currentProcess().msgName();
+       MsgNative.taskSend(alias, task, timeout);
+       }
+       
+        /** Send the given task to mailbox associated with the specified alias (waiting at most #timeout seconds) */
+    public void send(String alias, Task task, double timeout) throws JniException, NativeException {
+       MsgNative.taskSend(alias, task, timeout);
+       }
+       
+        /** Send the given task to the mailbox associated with the default alias (capping the emision rate to #maxrate) */ 
+       public void sendBounded(Task task, double maxrate) throws JniException, NativeException {
+       String alias = this.getName() + ":" + Process.currentProcess().msgName();
+                       
+       MsgNative.taskSendBounded(alias, task, maxrate);
+       }  
+       
+        /** Send the given task to the mailbox associated with the specified alias (capping the emision rate to #maxrate) */
+       public void sendBounded(String alias, Task task, double maxrate) throws JniException, NativeException {
+       MsgNative.taskSendBounded(alias, task, maxrate);
+       } 
+} 
index 26d2a97..1cb47da 100644 (file)
@@ -529,6 +529,30 @@ final class MsgNative {
   final static native Task taskGet(int channel, double timeout,
                                    Host host) throws JniException,
     NativeException;
+    
+  
+  /******************************************************************
+     * Task methods relative with the alias                           *
+     ******************************************************************/
+  
+  
+  final static native void taskSend(String alias, Task task, double timeout) 
+  throws JniException, NativeException;
+  
+  
+  final static native Task taskReceive(String alias, double timeout, Host host) 
+  throws JniException, NativeException;
+  
+  final static native int taskListenFrom(String alias) 
+  throws JniException, NativeException;
+  
+  
+  final static native boolean taskListen(String alias) 
+  throws JniException;
+  
+  final static native int taskListenFromHost(String alias, Host host) 
+  throws JniException;
+    
 
 
     /**
@@ -574,6 +598,7 @@ final class MsgNative {
      */
   final static native int taskProbeHost(int channel,
                                         Host host) throws JniException;
+                                        
 
     /******************************************************************
      * Task emission methods                                          *
@@ -614,5 +639,20 @@ final class MsgNative {
   final static native void hostPutBounded(Host host, int channel, Task task,
                                           double max_rate) throws
     JniException, NativeException;
+    
+   /**
+     * The natively implemented method to send a task in a mailbox associated with an alias,  with a bounded transmition
+     * rate.
+     * 
+     * @param alias            The alias of the mailbox.
+     * @param task            The task to put.
+     * @param max_rate        The bounded transmition rate.
+     *
+     * @exception                InvalidTaskException if the task is not valid.
+     *                        InvalidHostException if the host is not valid.
+     *                        MsgException if the operation failed.
+     */ 
+  final static native void taskSendBounded(String alias, Task task, double maxrate) 
+  throws JniException, NativeException;
 
 }
index c678e65..04f26b9 100644 (file)
@@ -68,6 +68,8 @@ public abstract class Process extends Thread {
      * The native functions use this identifier to synchronize the process.
      */
   public long id;
+  
+  public Hashtable properties;
 
     /**
      * The name of the process.                                                        
@@ -92,6 +94,7 @@ public abstract class Process extends Thread {
     this.name = null;
     this.bind = 0;
     this.args = new Vector();
+    this.properties = null;
     schedBegin = new Sem(0);
     schedEnd = new Sem(0);
   }
@@ -165,7 +168,8 @@ public abstract class Process extends Thread {
     if (name == null)
       throw new NullPointerException("Process name cannot be NULL");
 
-
+         this.properties = null;
+         
       this.args = new Vector();
 
     if (null != args)
@@ -179,6 +183,8 @@ public abstract class Process extends Thread {
 
       MsgNative.processCreate(this, host);
   }
+  
     /**
      * This method kills all running process of the simulation.
      *
@@ -390,32 +396,118 @@ public abstract class Process extends Thread {
     } catch(InterruptedException e) {
     }
   }
-
-
-   /** Send the given task to given host on given channel */
-  public void taskSend(Host host, int channel,
+  
+  /** Send the given task to given host on given channel */
+  public void taskPut(Host host, int channel,
                        Task task) throws NativeException, JniException {
     MsgNative.hostPut(host, channel, task, -1);
   }
+  
+   /** Send the given task to given host on given channel (waiting at most given time)*/
+   public void taskPut(Host host, int channel,
+                       Task task, double timeout) throws NativeException, JniException {
+    MsgNative.hostPut(host, channel, task, timeout);
+  }
    /** Receive a task on given channel */
-    public Task taskReceive(int channel) throws NativeException,
+    public Task taskGet(int channel) throws NativeException,
     JniException {
     return MsgNative.taskGet(channel, -1, null);
   }
    /** Receive a task on given channel (waiting at most given time) */
-    public Task taskReceive(int channel,
+    public Task taskGet(int channel,
                             double timeout) throws NativeException,
     JniException {
     return MsgNative.taskGet(channel, timeout, null);
   }
    /** Receive a task on given channel from given sender */
-    public Task taskReceive(int channel, Host host) throws NativeException,
+    public Task taskGet(int channel, Host host) throws NativeException,
     JniException {
     return MsgNative.taskGet(channel, -1, host);
   }
    /** Receive a task on given channel from given sender (waiting at most given time) */
-    public Task taskReceive(int channel, double timeout,
+    public Task taskGet(int channel, double timeout,
                             Host host) throws NativeException, JniException {
     return MsgNative.taskGet(channel, timeout, host);
   }
+  
+  /** Send the given task in the mailbox associated with the specified alias  (waiting at most given time) */
+  public void taskSend(String alias,
+                       Task task, double timeout) throws NativeException, JniException {
+    MsgNative.taskSend(alias, task, timeout);
+  }
+  
+  /** Send the given task in the mailbox associated with the specified alias*/
+  public void taskSend(String alias,
+                       Task task) throws NativeException, JniException {
+    MsgNative.taskSend(alias, task, -1);
+  }
+  
+  /** Send the given task in the mailbox associated with the default alias  (defaultAlias = "hostName:processName") */
+  public void taskSend(Task task) throws NativeException, JniException {
+       
+       String alias = Host.currentHost().getName() + ":" + this.msgName();
+    MsgNative.taskSend(alias, task, -1);
+  }
+  
+  /** Send the given task in the mailbox associated with the default alias (waiting at most given time) */
+  public void taskSend(Task task, double timeout) throws NativeException, JniException {
+       
+       String alias = Host.currentHost().getName() + ":" + this.msgName();
+    MsgNative.taskSend(alias, task, timeout);
+  }
+  
+  
+   /** Receive a task on mailbox associated with the specified alias */
+    public Task taskReceive(String alias) throws NativeException,
+    JniException {
+    return MsgNative.taskReceive(alias, -1.0, null);
+  }
+  
+  /** Receive a task on mailbox associated with the default alias */
+   public Task taskReceive() throws NativeException,
+    JniException {
+    String alias = Host.currentHost().getName() + ":" + this.msgName();
+    return MsgNative.taskReceive(alias, -1.0, null);
+  }
+  
+  /** Receive a task on mailbox associated with the specified alias (waiting at most given time) */
+    public Task taskReceive(String alias,
+                            double timeout) throws NativeException,
+    JniException {
+    return MsgNative.taskReceive(alias, timeout, null);
+  }
+  
+  /** Receive a task on mailbox associated with the default alias (waiting at most given time) */
+   public Task taskReceive(double timeout) throws NativeException,
+    JniException {
+    String alias = Host.currentHost().getName() + ":" + this.msgName();
+    return MsgNative.taskReceive(alias, timeout, null);
+  }
+  
+   /** Receive a task on mailbox associated with the specified alias from given sender */
+    public Task taskReceive(String alias,
+                            double timeout, Host host) throws NativeException,
+    JniException {
+    return MsgNative.taskReceive(alias, timeout, host);
+  }
+  
+  /** Receive a task on mailbox associated with the default alias from given sender  (waiting at most given time) */
+  public Task taskReceive(double timeout, Host host) throws NativeException,
+    JniException {
+    String alias = Host.currentHost().getName() + ":" + this.msgName();
+    return MsgNative.taskReceive(alias, timeout, host);
+  }
+  
+   /** Receive a task on mailbox associated with the specified alias from given sender*/
+     public Task taskReceive(String alias,
+                            Host host) throws NativeException,
+    JniException {
+    return MsgNative.taskReceive(alias, -1.0, host);
+  }
+   /** Receive a task on mailbox associated with the default alias from given sender */
+   public Task taskReceive( Host host) throws NativeException,
+    JniException {
+       String alias = Host.currentHost().getName() + ":" + this.msgName();
+    return MsgNative.taskReceive(alias, -1.0, host);
+  }
 }
index 129f5f3..584fe57 100644 (file)
@@ -176,6 +176,7 @@ public class Task {
                               Host host) throws JniException, NativeException {
     return MsgNative.taskGet(channel, timeout, host);
   }
+  
    /**
     * Probes whether there is a waiting task on the given channel of local host
     *
@@ -191,6 +192,7 @@ public class Task {
     */ public static int probe(int channel, Host host) throws JniException {
     return MsgNative.taskProbeHost(channel, host);
   }
+  
   /* *                     * *
    * * Computation-related * *
          * *                     * *//**
@@ -219,4 +221,189 @@ public class Task {
         */ protected void finalize() throws JniException, NativeException {
     if (this.bind != 0)
       MsgNative.taskDestroy(this);
-}}
+       }
+       
+       /**
+    * Send the task on the mailbox identified by the default alias (defaultAlias = "currentHostName:CurrentProcessName")
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public void send() throws JniException,NativeException {
+       
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+                       
+               MsgNative.taskSend(alias, this, -1);
+       } 
+       
+       /**
+    * Send the task on the mailbox identified by the specified alias 
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public void send(String alias) throws JniException,NativeException {
+               MsgNative.taskSend(alias, this, -1);
+       } 
+       
+       /**
+    * Send the task on the mailbox identified by the default alias  (wait at most #timeout seconds)
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public void send(double timeout) throws JniException,NativeException {
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+               MsgNative.taskSend(alias, this, timeout);
+       } 
+       
+       /**
+    * Send the task on the mailbox identified by the specified alias  (wait at most #timeout seconds)
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public void send(String alias, double timeout) throws JniException,NativeException {
+               MsgNative.taskSend(alias, this, timeout);
+       } 
+       
+       
+       /**
+    * Send the task on the mailbox identified by the default alias  (capping the emision rate to #maxrate) 
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public void sendBounded(double maxrate) throws JniException,NativeException {
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+               MsgNative.taskSendBounded(alias, this, maxrate);
+       } 
+       
+       /**
+    * Send the task on the mailbox identified by the specified alias  (capping the emision rate to #maxrate) 
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public void sendBounded(String alias, double maxrate) throws JniException,NativeException {
+               MsgNative.taskSendBounded(alias, this, maxrate);
+       } 
+       
+       /**
+    * Retrieves next task from the mailbox identified by the default alias (defaultAlias = "currentHostName:CurrentProcessName")
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public static Task receive() throws JniException, NativeException {
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+               return MsgNative.taskReceive(alias, -1.0, null);
+       }
+       
+       /**
+    * Retrieves next task from the mailbox identified by the specified alias
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       
+       public static Task receive(String alias) throws JniException, NativeException {
+               return MsgNative.taskReceive(alias, -1.0, null);
+       }
+       
+       /**
+    * Retrieves next task on the mailbox identified by the specified alias (wait at most #timeout seconds)
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public static Task receive(String alias, double timeout) throws JniException, NativeException {
+               return MsgNative.taskReceive(alias, timeout, null);
+       }
+       
+       /**
+    * Retrieves next task sended by a given host on the mailbox identified by the specified alias 
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       
+       public static Task receive(String alias, Host host) throws JniException, NativeException {
+               return MsgNative.taskReceive(alias, -1.0, host);
+       }
+       
+       /**
+    * Retrieves next task sended by a given host on the mailbox identified by the specified alias (wait at most #timeout seconds)
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */
+       public static Task receive(String alias, double timeout, Host host) throws JniException, NativeException {
+               return MsgNative.taskReceive(alias, timeout, host);
+       }
+       
+       /**
+    * Listen whether there is a waiting task on the mailbox identified by the default alias of local host
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */ 
+       public static boolean listen() throws JniException, NativeException {
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+               
+               return MsgNative.taskListen(alias);
+       }
+       
+       /**
+    * Test whether there is a pending communication on the mailbox identified by the specified alias, and who sent it
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */ 
+       public static int listenFrom(String alias) throws JniException, NativeException  {
+               return MsgNative.taskListenFrom(alias);
+       }
+       
+       /**
+    * Test whether there is a pending communication on the mailbox identified by the default alias, of the current host, and who sent it
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */ 
+       public static int listenFrom() throws JniException, NativeException {
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+               
+               return MsgNative.taskListenFrom(alias);
+       }
+       
+       /**
+    * Listen whether there is a waiting task on the mailbox identified by the specified alias
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */ 
+       public static boolean listen(String alias) throws JniException, NativeException  {
+               return MsgNative.taskListen(alias);
+       }
+       
+        /**
+    * Counts the number of tasks waiting to be received on the #mailbox identified by the specified alias and sended by the current host.
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */ 
+       public static int listenFromHost(Host host) throws JniException, NativeException  {
+               String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName();
+               return MsgNative.taskListenFromHost(alias,host);
+       }
+       
+       /**
+    * Counts the number of tasks waiting to be received on the #mailbox identified by the specified alia and sended by the specified #host.
+    *
+    * @exception  JniException if the binding mecanism fails.
+    * @exception  NativeException if the retrival fails.
+    */ 
+       public static int listenFromHost(String alias, Host host) throws JniException, NativeException  {
+               return MsgNative.taskListenFromHost(alias, host);
+       }
+}
index 82ce8e6..8297d8f 100644 (file)
@@ -10,6 +10,8 @@
 #include "msg/private.h"
 #include "xbt/sysdep.h"
 #include "xbt/log.h"
+#include "msg_mailbox.h"
+
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
                                "Logging specific to MSG (gos)");
@@ -828,3 +830,455 @@ MSG_error_t MSG_get_errno(void)
 {
   return PROCESS_GET_ERRNO();
 }
+
+MSG_error_t 
+MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host)
+{
+       m_process_t process = MSG_process_self();
+       m_task_t t = NULL;
+       m_host_t h = NULL;
+       simdata_task_t t_simdata = NULL;
+       simdata_host_t h_simdata = NULL;
+       int first_time = 1;
+       xbt_fifo_item_t item = NULL;
+       
+       smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet
+       
+       /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */
+       msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
+       
+       CHECK_HOST();
+       
+       /* Sanity check */
+       xbt_assert0(task, "Null pointer for the task storage");
+       
+       if (*task)
+               CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
+       
+       /* Get the task */
+       h = MSG_host_self();
+       h_simdata = h->simdata;
+       
+       DEBUG2("Waiting for a task on channel aliased by %s (%s)", alias, h->name);
+       
+       SIMIX_mutex_lock(h->simdata->mutex);
+       
+       while (1) 
+       {
+               /* if the mailbox is empty (has no task */
+               if(!MSG_mailbox_is_empty(mailbox))
+               {
+                       if(!host) 
+                       {
+                               /* pop the head of the mailbox */
+                               t = MSG_mailbox_pop_head(mailbox);
+                               break;
+                       } 
+                       else 
+                       {
+                               /* get the first task of the host */
+                               if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))
+                                       break;
+                       }
+               }
+       
+               if(timeout > 0) 
+               {
+                       if (!first_time) 
+                       {
+                               SIMIX_mutex_unlock(h->simdata->mutex);
+                               /* set the simix condition of the mailbox to NULL */
+                               MSG_mailbox_set_cond(mailbox, NULL);
+                               SIMIX_cond_destroy(cond);
+                               MSG_RETURN(MSG_TRANSFER_FAILURE);
+                       }
+               }
+               
+               xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel aliased by %s", alias);
+               
+               cond = SIMIX_cond_init();
+               
+               /* set the condition of the mailbox */
+               MSG_mailbox_set_cond(mailbox, cond);
+               
+               if (timeout > 0)
+                       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);
+               else 
+                       SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);
+                       
+       
+               if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
+                       MSG_RETURN(MSG_HOST_FAILURE);
+       
+               first_time = 0;
+       }
+       
+       SIMIX_mutex_unlock(h->simdata->mutex);
+       
+       DEBUG1("OK, got a task (%s)", t->name);
+       /* clean conditional */
+       if (cond) 
+       {
+               SIMIX_cond_destroy(cond);
+               
+               MSG_mailbox_set_cond(mailbox,NULL);
+       }
+       
+       t_simdata = t->simdata;
+       t_simdata->receiver = process;
+       *task = t;
+       
+       SIMIX_mutex_lock(t_simdata->mutex);
+       
+       /* Transfer */
+       /* create SIMIX action to the communication */
+       t_simdata->comm = SIMIX_action_communicate(
+                                                                                               t_simdata->sender->simdata->m_host->simdata->smx_host,
+                                                                                               process->simdata->m_host->simdata->smx_host,
+                                                                                               t->name, 
+                                                                                               t_simdata->message_size,
+                                                                                               t_simdata->rate
+                                                                                       );
+       
+       /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
+       if (MSG_process_is_suspended(t_simdata->sender)) 
+       {
+               DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
+               SIMIX_action_set_priority(t_simdata->comm, 0);
+       }
+       
+       process->simdata->waiting_task = t;
+       SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
+       
+       while (1) 
+       {
+               SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
+               
+               if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
+                       break;
+       }
+       
+       SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
+               process->simdata->waiting_task = NULL;
+       
+       /* the task has already finished and the pointer must be null */
+       if (t->simdata->sender) 
+       {
+               t->simdata->sender->simdata->waiting_task = NULL;
+       }
+       
+       /* for this process, don't need to change in get function */
+       t->simdata->receiver = NULL;
+       SIMIX_mutex_unlock(t_simdata->mutex);
+       
+       
+       if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) 
+       {
+               SIMIX_action_destroy(t_simdata->comm);
+               t_simdata->comm = NULL;
+               t_simdata->using--;
+               MSG_RETURN(MSG_OK);
+       } 
+       else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
+       {
+               SIMIX_action_destroy(t_simdata->comm);
+               t_simdata->comm = NULL;
+               t_simdata->using--;
+               MSG_RETURN(MSG_HOST_FAILURE);
+       } 
+       else 
+       {
+               SIMIX_action_destroy(t_simdata->comm);
+               t_simdata->comm = NULL;
+               t_simdata->using--;
+               MSG_RETURN(MSG_TRANSFER_FAILURE);
+       }
+}
+
+MSG_error_t 
+MSG_task_receive_with_time_out(m_task_t * task, const char* alias, double timeout)
+{
+       return MSG_task_receive_ext(task, alias, timeout, NULL);                
+}
+                            
+MSG_error_t 
+MSG_task_receive(m_task_t * task, const char* alias)
+{
+       return MSG_task_receive_with_time_out(task, alias, -1); 
+}
+
+int 
+MSG_task_listen(const char* alias)
+{
+       CHECK_HOST();
+       
+       DEBUG2("Probing on channel aliased by %s (%s)", alias, MSG_host_self()->name);
+       
+       return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
+}
+
+int 
+MSG_task_listen_from_host(const char* alias, m_host_t host)
+{
+
+       return MSG_mailbox_get_count_host_tasks(MSG_mailbox_get_by_alias(alias),host);
+                       
+}
+
+MSG_error_t 
+MSG_alias_select_from(const char* alias, double timeout, int* PID)
+{
+       m_host_t h = NULL;
+       simdata_host_t h_simdata = NULL;
+       m_task_t t;
+       int first_time = 1;
+       smx_cond_t cond;
+       msg_mailbox_t mailbox;
+       
+       if (PID) 
+       {
+               *PID = -1;
+       }
+       
+       if(timeout == 0.0) 
+       {
+               *PID = MSG_task_listen_from(alias);
+               MSG_RETURN(MSG_OK);
+       } 
+       else 
+       {
+               CHECK_HOST();
+               h = MSG_host_self();
+               h_simdata = h->simdata;
+       
+               DEBUG2("Probing on alias %s (%s)", alias, h->name);
+               
+               mailbox = MSG_mailbox_get_by_alias(alias);      
+               
+               while(MSG_mailbox_is_empty(mailbox))
+               {
+                       if(timeout > 0) 
+                       {
+                               if (!first_time) 
+                               {
+                                       MSG_RETURN(MSG_OK);
+                               }
+                       }
+                       
+                       SIMIX_mutex_lock(h_simdata->mutex);
+                       
+                       xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this alias %s",alias);
+                       
+                       cond = SIMIX_cond_init();
+                       
+                       MSG_mailbox_set_cond(mailbox, cond);
+                       
+                       if (timeout > 0) 
+                       {
+                               SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
+                       } 
+                       else 
+                       {
+                               SIMIX_cond_wait(cond, h_simdata->mutex);
+                       }
+                       
+                       SIMIX_cond_destroy(cond);
+                       SIMIX_mutex_unlock(h_simdata->mutex);
+                       
+                       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
+                       {
+                               MSG_RETURN(MSG_HOST_FAILURE);
+                       }
+                       
+                       MSG_mailbox_set_cond(mailbox,NULL);
+                       first_time = 0;
+               }
+               
+               if(NULL == (t = MSG_mailbox_get_head(mailbox)))
+                       MSG_RETURN(MSG_OK);
+                               
+               
+               if (PID) 
+               {
+                       *PID = MSG_process_get_PID(t->simdata->sender);
+               }
+               
+               MSG_RETURN(MSG_OK);
+       }
+}
+                                   
+MSG_error_t 
+MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout)
+{
+       m_process_t process = MSG_process_self();
+       const char* hostname;
+       simdata_task_t task_simdata = NULL;
+       m_host_t local_host = NULL;
+       m_host_t remote_host = NULL;
+       smx_cond_t cond = NULL;
+       
+       /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */
+       msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
+
+       CHECK_HOST();
+       
+       task_simdata = task->simdata;
+       task_simdata->sender = process;
+       task_simdata->source = MSG_process_get_host(process);
+       
+       xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");
+       
+       task_simdata->comm = NULL;
+       
+       task_simdata->using++;
+       local_host = ((simdata_process_t) process->simdata)->m_host;
+       
+       /* get the host name containing the mailbox */
+       hostname = MSG_mailbox_get_hostname(mailbox);
+
+       remote_host = MSG_get_host_by_name(hostname);
+
+       if(NULL == remote_host)
+               THROW1(not_found_error,0,"Host %s not fount", hostname);
+
+
+       DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));
+       
+       SIMIX_mutex_lock(remote_host->simdata->mutex);
+
+       /* put the task in the mailbox */
+       MSG_mailbox_put(mailbox,task);
+       
+       if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))
+       {
+               DEBUG0("Somebody is listening. Let's wake him up!");
+               SIMIX_cond_signal(cond);
+       }
+
+       
+       
+       SIMIX_mutex_unlock(remote_host->simdata->mutex);
+       
+       SIMIX_mutex_lock(task->simdata->mutex);
+
+       process->simdata->waiting_task = task;
+       
+       if(timeout > 0) 
+       {
+               xbt_ex_t e;
+               double time;
+               double time_elapsed;
+               time = SIMIX_get_clock();
+               
+               TRY 
+               {
+                       /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
+                       while (1) 
+                       {
+                               time_elapsed = SIMIX_get_clock() - time;
+                               SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);
+                               
+                               if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
+                                       break;
+                       }
+               } 
+               CATCH(e) 
+               {
+                       if(e.category==timeout_error) 
+                       {
+                               xbt_ex_free(e);
+                               /* verify if the timeout happened and the communication didn't started yet */
+                               if (task->simdata->comm == NULL) 
+                               {
+                                       process->simdata->waiting_task = NULL;
+                                       
+                                       /* remove the task from the mailbox */
+                                       MSG_mailbox_remove(mailbox,task);
+                                       
+                                       if (task->simdata->receiver) 
+                                       {
+                                               task->simdata->receiver->simdata->waiting_task = NULL;
+                                       }
+                                       
+                                       task->simdata->sender = NULL;
+                                       
+                                       SIMIX_mutex_unlock(task->simdata->mutex);
+                                       MSG_RETURN(MSG_TRANSFER_FAILURE);
+                               }
+                       } 
+                       else 
+                       {
+                               RETHROW;
+                       }
+               }
+       } 
+       else 
+       {
+               while (1) 
+               {
+                       SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
+                       
+                       if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
+                               break;
+               }
+       }
+       
+       DEBUG1("Action terminated %s", task->name);
+       process->simdata->waiting_task = NULL;
+       
+       /* the task has already finished and the pointer must be null */
+       if (task->simdata->receiver) 
+       {
+               task->simdata->receiver->simdata->waiting_task = NULL;
+       }
+       
+       task->simdata->sender = NULL;
+       SIMIX_mutex_unlock(task->simdata->mutex);
+
+       
+       if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)
+       {
+               MSG_RETURN(MSG_OK);
+       } 
+       else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) 
+       {
+               MSG_RETURN(MSG_HOST_FAILURE);
+       } 
+       else 
+       {
+               MSG_RETURN(MSG_TRANSFER_FAILURE);
+       }
+}
+
+MSG_error_t 
+MSG_task_send(m_task_t task,const char* alias)
+{
+       return MSG_task_send_with_timeout(task, alias, -1);
+}
+
+
+MSG_error_t 
+MSG_task_send_bounded(m_task_t task, const char* alias, double rate)
+{
+       task->simdata->rate = rate;
+       return MSG_task_send(task, alias);
+}
+
+int
+MSG_task_listen_from(const char* alias)
+{
+       m_host_t h = NULL;
+       m_task_t t;
+
+       CHECK_HOST();
+
+       h = MSG_host_self();
+
+       DEBUG2("Probing on alias %s(%s)", alias, h->name);
+
+       if(NULL == (t = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
+               return -1;
+
+       return MSG_process_get_PID(t->simdata->sender);
+}
+                             
index 12b0d5c..9866873 100644 (file)
@@ -10,6 +10,7 @@
 #include "msg/private.h"
 #include "xbt/sysdep.h"
 #include "xbt/log.h"
+#include "msg_mailbox.h"
 
 /** \defgroup m_host_management Management functions of Hosts
  *  \brief This section describes the host structure of MSG
 /********************************* Host **************************************/
 m_host_t __MSG_host_create(smx_host_t workstation, void *data)
 {
-  const char *name;
-  simdata_host_t simdata = xbt_new0(s_simdata_host_t, 1);
-  m_host_t host = xbt_new0(s_m_host_t, 1);
-  int i;
-
-  name = SIMIX_host_get_name(workstation);
-  /* Host structure */
-  host->name = xbt_strdup(name);
-  host->simdata = simdata;
-  host->data = data;
-
-  simdata->smx_host = workstation;
-
-  simdata->mbox = xbt_new0(xbt_fifo_t, msg_global->max_channel);
-  for (i = 0; i < msg_global->max_channel; i++)
-    simdata->mbox[i] = xbt_fifo_new();
-
-  simdata->sleeping = xbt_new0(smx_cond_t, msg_global->max_channel);
-  simdata->mutex = SIMIX_mutex_init();
-  SIMIX_host_set_data(workstation, host);
-
-  /* Update global variables */
-  xbt_fifo_unshift(msg_global->host, host);
-
-  return host;
+       const char *name;
+       simdata_host_t simdata = xbt_new0(s_simdata_host_t, 1);
+       m_host_t host = xbt_new0(s_m_host_t, 1);
+       int i;
+       
+       char alias[MAX_ALIAS_NAME +1] = {0}; /* buffer used to build the key of the mailbox */
+       msg_mailbox_t mailbox;
+       
+       name = SIMIX_host_get_name(workstation);
+       /* Host structure */
+       host->name = xbt_strdup(name);
+       host->simdata = simdata;
+       host->data = data;
+       
+       simdata->smx_host = workstation;
+       
+       simdata->mbox = xbt_new0(xbt_fifo_t, msg_global->max_channel);
+       
+       for (i = 0; i < msg_global->max_channel; i++)
+       {
+               sprintf(alias,"%s:%d",name,i);
+               
+               /* the key of the mailbox (in this case) is build from the name of the host and the channel number */
+               mailbox = MSG_mailbox_new(alias);
+               MSG_mailbox_set_hostname(mailbox,name);
+               memset(alias,0,MAX_ALIAS_NAME +1);
+               
+               simdata->mbox[i] = xbt_fifo_new();
+       }
+       
+       simdata->sleeping = xbt_new0(smx_cond_t, msg_global->max_channel);
+       simdata->mutex = SIMIX_mutex_init();
+       SIMIX_host_set_data(workstation, host);
+       
+       /* Update global variables */
+       xbt_fifo_unshift(msg_global->host, host);
+       
+       return host;
 }
 
 /** \ingroup m_host_management
@@ -126,6 +140,7 @@ void __MSG_host_destroy(m_host_t host)
 {
   simdata_host_t simdata = NULL;
   int i = 0;
+  char alias[MAX_ALIAS_NAME +1] = {0}; /* buffer used to build the key of the mailbox */
 
   xbt_assert0((host != NULL), "Invalid parameters");
 
@@ -134,7 +149,14 @@ void __MSG_host_destroy(m_host_t host)
   simdata = (host)->simdata;
 
   for (i = 0; i < msg_global->max_channel; i++)
+  {
+       sprintf(alias,"%s:%d",host->name,i);
+       xbt_dict_remove(MSG_get_mailboxes(),alias);
+       memset(alias,0,MAX_ALIAS_NAME +1);
+       
     xbt_fifo_free(simdata->mbox[i]);
+  }
+
   free(simdata->mbox);
   free(simdata->sleeping);
   SIMIX_mutex_destroy(simdata->mutex);
index 192a816..f853c7d 100644 (file)
@@ -11,6 +11,7 @@
 #include "xbt/log.h"
 #include "xbt/str.h"
 #include "xbt/ex.h"            /* ex_backtrace_display */
+#include "msg/msg_mailbox.h"
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix,
                                "Logging specific to SIMIX (kernel)");
 
@@ -120,7 +121,10 @@ void SIMIX_global_init(int *argc, char **argv)
        xbt_swag_new(xbt_swag_offset(proc, process_hookup));
     simix_global->current_process = NULL;
     simix_global->registered_functions = xbt_dict_new();
-
+       
+       /* initialization of the mailbox module */
+       MSG_mailbox_mod_init();
+       
     simix_global->create_process_function = NULL;
     simix_global->kill_process_function = NULL;
     simix_global->cleanup_process_function = SIMIX_process_cleanup;
@@ -296,6 +300,10 @@ void SIMIX_clean(void)
   simix_config_finalize();
   free(simix_global);
   simix_global = NULL;
+  
+  /* cleanup all resources in the mailbox module */
+  MSG_mailbox_mod_exit();
+  
   surf_exit();
 
   return;