From: cherierm Date: Mon, 7 Jan 2008 16:36:32 +0000 (+0000) Subject: This change introduce the new mailbox concept. X-Git-Tag: v3.3~691 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/f3ba71e33e73c61ca467a37dda2bc3b9d5f9ed1b?hp=bb38b738b6810c3e4c0be80ffc5de5eb25e62435 This change introduce the new mailbox concept. A mailbox is a storage used to put or get an msg task. It is identified by an alias. For more information about the usage of the mailbox see the java examples alias0 and alias1 and the C example masterslave_forwarder_with_alias in the directory /examples/msg/alias. This change impacts the following files : include/msg/msg.h Add the declaration of the functions related with the new concept of mailboxes. src/msg/host.c Adapte the functions __MSG_host_create and __MSG_host_destroy to the new concept of mailboxes. src/gos/gos.c Implementation of the functions related withe the new concept of mailboxes. src/simix/smx_global.c Initialization et finalization of the mailbox module. src/java/jmsg.c Implementation of the jni callback functions related with the mailbox concept. src/java/jmsg.h Declaration of the jni callback functions related with the mailbox concept. ApplicationHandler.java Implementation of the operations related with the properties of a process Host.java Implementation of the operations related with the new concept of mailbox MsgNative.java Implementation of the operations related with the new concept of mailbox Process.java Implementation of the operations related with the new concept of mailbox Task.java Implementation of the operations related with the new concept of mailbox msg_mailbox.h Header containing the declaration of the functions related with the mailbox concept msg_mailbox.c Source file containing the implementation of the functions related with the mailbox concept directories alia0 et alias1 contain two examples to illustrate the different usages of the mailbox concept using the java interface for msg. directory alias contains the example masterslave_forwarder_with_alias which illustrate the usage of mailbox instead channel as in the masterslave_forwarder example. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@5163 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/build/vc7/simgrid/all.sln b/build/vc7/simgrid/all.sln index d577563ebf..95524176b1 100644 --- a/build/vc7/simgrid/all.sln +++ b/build/vc7/simgrid/all.sln @@ -69,6 +69,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "graphxml_usage", "..\testsu EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "teshsuite", "teshsuite", "{C48B8100-6A37-4206-9DFF-A7216B0748F8}" EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "masterslave_forwarder_with_alias", "..\examples\msg\masterslave_forwarder_with_alias\masterslave_forwarder_with_alias.vcproj", "{7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Win32 = Debug|Win32 @@ -171,15 +173,19 @@ Global {05FB934F-D2CE-420F-9DAA-E7EE3FC0E381}.Debug|Win32.Build.0 = Debug|Win32 {05FB934F-D2CE-420F-9DAA-E7EE3FC0E381}.Release|Win32.ActiveCfg = Release|Win32 {05FB934F-D2CE-420F-9DAA-E7EE3FC0E381}.Release|Win32.Build.0 = Release|Win32 + {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Debug|Win32.ActiveCfg = Debug|Win32 + {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Debug|Win32.Build.0 = Debug|Win32 + {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Release|Win32.ActiveCfg = Release|Win32 + {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1}.Release|Win32.Build.0 = Release|Win32 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution + {617987DE-296D-4E0C-BD53-8D639E6FDA09} = {B24BD5D4-9D03-4644-9792-416396F9C34E} {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} = {F356BA40-006E-4F59-AE49-E25171DF2016} {0E867394-9B56-42DA-AB3F-E59080B653A5} = {F356BA40-006E-4F59-AE49-E25171DF2016} {65F6A759-3E01-4265-AB50-71EF60AD126C} = {F356BA40-006E-4F59-AE49-E25171DF2016} - {4A8A0642-2553-4BB1-ADB2-264F2E799EF9} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} {D5E757F9-B469-488D-BBA4-C04B167961BD} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} {EEB461EE-0D1C-4D77-8010-3FB5897ED347} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} {0247C196-B310-4A81-AF0F-B65C8632F69C} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} @@ -187,6 +193,8 @@ Global {9A8719E2-B66F-4B33-854B-6F6D226DE0BC} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} {D8A5626B-3188-4F74-95F0-732F84F53A1C} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} {FD995103-FD5C-4A5B-95E6-AF512BC6D749} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} + {4A8A0642-2553-4BB1-ADB2-264F2E799EF9} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} + {7BD90322-A802-4ACB-9997-E8ABB0C8C6E1} = {61D5BE96-5482-49E0-8F2F-ED6944B04CAD} {F2CCCCB9-0342-44E1-B1E3-53E6103EC294} = {0E867394-9B56-42DA-AB3F-E59080B653A5} {DFDB47E0-83D2-41F1-AF4B-DC5EA312A08C} = {0E867394-9B56-42DA-AB3F-E59080B653A5} {A95FC8DF-52C7-45FD-B7D1-D93081DE6BB1} = {0E867394-9B56-42DA-AB3F-E59080B653A5} @@ -201,7 +209,6 @@ Global {8147D9B1-528F-4D2F-BA5D-17793CDBFBBB} = {CEF665B7-5EB9-4055-993B-32F7B50D87A1} {5DD95E75-D229-4136-A2ED-216D23749A7B} = {CEF665B7-5EB9-4055-993B-32F7B50D87A1} {9EF11810-1CAF-4588-A390-D51E49BAED35} = {536BA0EC-107D-467E-BFF8-D7BA634023BC} - {617987DE-296D-4E0C-BD53-8D639E6FDA09} = {B24BD5D4-9D03-4644-9792-416396F9C34E} {63A06558-AAFC-491F-A294-3DA98A20EDA9} = {D4B14EC6-CCA0-4681-90D4-F435F05EFD3A} {DF89E558-10A4-42E8-B438-4C60904F87F4} = {D4B14EC6-CCA0-4681-90D4-F435F05EFD3A} {78D7DC8F-D24B-4160-95F5-7101C17A58BA} = {D4B14EC6-CCA0-4681-90D4-F435F05EFD3A} diff --git a/build/vc7/simgrid/simgrid.vcproj b/build/vc7/simgrid/simgrid.vcproj index bf94a65b22..ad9e0dff0c 100644 --- a/build/vc7/simgrid/simgrid.vcproj +++ b/build/vc7/simgrid/simgrid.vcproj @@ -411,6 +411,10 @@ RelativePath="..\..\..\src\msg\msg_config.c" > + + @@ -571,10 +575,6 @@ RelativePath="..\..\..\src\surf\workstation.c" > - - diff --git a/include/msg/msg.h b/include/msg/msg.h index 1424a4c20a..cf80731360 100644 --- a/include/msg/msg.h +++ b/include/msg/msg.h @@ -132,5 +132,36 @@ XBT_PUBLIC(double) MSG_task_get_compute_duration(m_task_t task); XBT_PUBLIC(double) MSG_task_get_remaining_computation(m_task_t task); XBT_PUBLIC(double) MSG_task_get_data_size(m_task_t task); + +XBT_PUBLIC(MSG_error_t) +MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host); + +XBT_PUBLIC(MSG_error_t) +MSG_task_receive_with_time_out(m_task_t * task, const char* alias, double timeout); + +XBT_PUBLIC(MSG_error_t) +MSG_task_receive(m_task_t * task, const char* alias); + +XBT_PUBLIC(int) +MSG_task_listen(const char* alias); + +XBT_PUBLIC(int) +MSG_task_listen_from_host(const char* alias, m_host_t host); + +XBT_PUBLIC(MSG_error_t) +MSG_alias_select_from(const char* alias, double timeout, int* PID); + +XBT_PUBLIC(MSG_error_t) +MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout); + +XBT_PUBLIC(MSG_error_t) +MSG_task_send(m_task_t task,const char* alias); + +XBT_PUBLIC(MSG_error_t) +MSG_task_send_bounded(m_task_t task, const char* alias, double rate); + +XBT_PUBLIC(int) +MSG_task_listen_from(const char* alias); + SG_END_DECL() #endif diff --git a/src/java/jmsg.c b/src/java/jmsg.c index 5239f29f70..f0aae36f84 100644 --- a/src/java/jmsg.c +++ b/src/java/jmsg.c @@ -23,6 +23,8 @@ #include "jmsg.h" +#include "msg/msg_mailbox.h" + XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg); static JavaVM * __java_vm = NULL; @@ -61,7 +63,8 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv* env, jclass cls, jobject jproce jstring jname; /* the name of the java process instance */ const char* name; /* the C name of the process */ m_process_t process; /* the native process to create */ - + char alias[MAX_ALIAS_NAME + 1] = {0}; + msg_mailbox_t mailbox; DEBUG4("Java_simgrid_msg_MsgNative_processCreate(env=%p,cls=%p,jproc=%p,jhost=%p)", env,cls,jprocess_arg,jhost); @@ -96,6 +99,8 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv* env, jclass cls, jobject jproce (*env)->ReleaseStringUTFChars(env, jname, name); process->simdata->m_host = jhost_get_native(env,jhost); + + if( ! (process->simdata->m_host) ) { /* not binded */ free(process->simdata); free(process->data); @@ -132,6 +137,12 @@ Java_simgrid_msg_MsgNative_processCreate(JNIEnv* env, jclass cls, jobject jproce /* add the process to the list of the processes of the simulation */ xbt_fifo_unshift(msg_global->process_list, process); + + sprintf(alias,"%s:%s",(process->simdata->m_host->simdata->smx_host)->name,process->name); + + mailbox = MSG_mailbox_new(alias); + MSG_mailbox_set_hostname(mailbox, process->simdata->m_host->simdata->smx_host->name); + } @@ -733,6 +744,7 @@ Java_simgrid_msg_MsgNative_taskGet(JNIEnv* env, jclass cls, return (jobject)task->data; } + JNIEXPORT jboolean JNICALL Java_simgrid_msg_MsgNative_taskProbe(JNIEnv* env, jclass cls, jint chan_id) { return (jboolean)MSG_task_Iprobe(chan_id); @@ -786,6 +798,8 @@ Java_simgrid_msg_MsgNative_hostPut(JNIEnv* env, jclass cls, jxbt_throw_native(env, xbt_strdup("MSG_task_put_with_timeout() failed")); } + + JNIEXPORT void JNICALL Java_simgrid_msg_MsgNative_hostPutBounded(JNIEnv* env, jclass cls, jobject jhost, jint chan_id, jobject jtask, @@ -987,4 +1001,135 @@ Java_simgrid_msg_MsgNative_selectContextFactory(JNIEnv * env, jclass class,jstri if(rv) jxbt_throw_native(env, xbt_strdup("xbt_select_context_factory() failed")); } + +JNIEXPORT void JNICALL +Java_simgrid_msg_MsgNative_taskSend(JNIEnv* env, jclass cls, + jstring jalias, jobject jtask, + jdouble jtimeout) { + + MSG_error_t rv; + const char* alias = (*env)->GetStringUTFChars(env, jalias, 0); + + m_task_t task = jtask_to_native_task(jtask,env); + + + if(!task){ + (*env)->ReleaseStringUTFChars(env, jalias, alias); + jxbt_throw_notbound(env,"task",jtask); + return; + } + + rv = MSG_task_send_with_timeout(task,alias,(double)jtimeout); + + (*env)->ReleaseStringUTFChars(env, jalias, alias); + + if(MSG_OK != rv) + jxbt_throw_native(env, xbt_strdup("MSG_task_send_with_timeout() failed")); + +} + +JNIEXPORT void JNICALL +Java_simgrid_msg_MsgNative_taskSendBounded(JNIEnv* env, jclass cls, + jstring jalias, jobject jtask, + jdouble jmaxRate) { + m_task_t task = jtask_to_native_task(jtask,env); + MSG_error_t rv; + const char* alias; + + if(!task){ + jxbt_throw_notbound(env,"task",jtask); + return; + } + + alias = (*env)->GetStringUTFChars(env, jalias, 0); + + rv = MSG_task_send_bounded(task,alias,(double)jmaxRate); + + (*env)->ReleaseStringUTFChars(env, jalias, alias); + + if(MSG_OK != rv) + jxbt_throw_native(env, xbt_strdup("MSG_task_send_bounded() failed")); +} + +JNIEXPORT jobject JNICALL +Java_simgrid_msg_MsgNative_taskReceive(JNIEnv* env, jclass cls, + jstring jalias, jdouble jtimeout, jobject jhost) { + MSG_error_t rv; + m_task_t task = NULL; + m_host_t host = NULL; + const char* alias; + + if (jhost) { + host = jhost_get_native(env,jhost); + + if(!host){ + jxbt_throw_notbound(env,"host",jhost); + return NULL; + } + } + + alias = (*env)->GetStringUTFChars(env, jalias, 0); + + rv = MSG_task_receive_ext(&task,alias,(double)jtimeout,host); + + (*env)->ReleaseStringUTFChars(env, jalias, alias); + + if (MSG_OK != rv) + { + jxbt_throw_native(env, xbt_strdup("MSG_task_receive_ext() failed")); + return NULL; + } + + return (jobject)task->data; +} + +JNIEXPORT jboolean JNICALL +Java_simgrid_msg_MsgNative_taskListen(JNIEnv* env, jclass cls, jstring jalias) { + + const char* alias; + int rv; + + alias = (*env)->GetStringUTFChars(env, jalias, 0); + + rv = MSG_task_listen(alias); + + (*env)->ReleaseStringUTFChars(env, jalias, alias); + + return (jboolean)rv; +} + +JNIEXPORT jint JNICALL +Java_simgrid_msg_MsgNative_taskListenFromHost(JNIEnv* env, jclass cls, jstring jalias, jobject jhost) { + + int rv; + const char* alias; + + m_host_t host = jhost_get_native(env,jhost); + + if(!host){ + jxbt_throw_notbound(env,"host",jhost); + return -1; + } + + alias = (*env)->GetStringUTFChars(env, jalias, 0); + + rv = MSG_task_listen_from_host(alias,host); + + (*env)->ReleaseStringUTFChars(env, jalias, alias); + + return (jint)rv; +} + +JNIEXPORT jint JNICALL +Java_simgrid_msg_MsgNative_taskListenFrom(JNIEnv* env, jclass cls, jstring jalias) { + + int rv; + const char* alias = (*env)->GetStringUTFChars(env, jalias, 0); + + rv = MSG_task_listen_from(alias); + + (*env)->ReleaseStringUTFChars(env, jalias, alias); + + return (jint)rv; +} diff --git a/src/java/jmsg.h b/src/java/jmsg.h index 0ee91bb284..93f4ee668b 100644 --- a/src/java/jmsg.h +++ b/src/java/jmsg.h @@ -277,6 +277,10 @@ JNIEXPORT void JNICALL Java_simgrid_msg_MsgNative_taskExecute JNIEXPORT jobject JNICALL Java_simgrid_msg_MsgNative_taskGet (JNIEnv *, jclass, jint, jdouble, jobject); +JNIEXPORT jobject JNICALL +Java_simgrid_msg_MsgNative_taskReceive + (JNIEnv *, jclass, jstring, jdouble, jobject); + /* * Class simgrid_msg_Msg * Method taskHasPendingCommunication @@ -309,6 +313,10 @@ JNIEXPORT jint JNICALL Java_simgrid_msg_MsgNative_taskProbeHost JNIEXPORT void JNICALL Java_simgrid_msg_MsgNative_hostPut (JNIEnv *, jclass, jobject, jint, jobject, jdouble); +JNIEXPORT void JNICALL +Java_simgrid_msg_MsgNative_taskSend + (JNIEnv *, jclass, jstring, jobject, jdouble); + /* * Class simgrid_msg_Msg * Method hostPutBounded @@ -370,4 +378,16 @@ Java_simgrid_msg_Msg_createEnvironment(JNIEnv* env, jclass cls,jstring jplatform JNIEXPORT void JNICALL Java_simgrid_msg_MsgNative_selectContextFactory(JNIEnv *, jclass, jstring); +JNIEXPORT void JNICALL +Java_simgrid_msg_MsgNative_taskSendBounded(JNIEnv*, jclass, jstring, jobject, jdouble); + +JNIEXPORT jboolean JNICALL +Java_simgrid_msg_MsgNative_taskListen(JNIEnv*, jclass, jstring); + +JNIEXPORT jint JNICALL +Java_simgrid_msg_MsgNative_taskListenFromHost(JNIEnv*, jclass, jstring, jobject); + +JNIEXPORT jint JNICALL +Java_simgrid_msg_MsgNative_taskListenFrom(JNIEnv*, jclass, jstring); + #endif /* !MSG4JAVA_H */ diff --git a/src/java/simgrid/msg/ApplicationHandler.java b/src/java/simgrid/msg/ApplicationHandler.java index 14d5e5732d..fd59889e31 100644 --- a/src/java/simgrid/msg/ApplicationHandler.java +++ b/src/java/simgrid/msg/ApplicationHandler.java @@ -12,6 +12,7 @@ package simgrid.msg; import java.util.Vector; +import java.util.Hashtable; import org.xml.sax.*; import org.xml.sax.helpers.*; @@ -39,6 +40,8 @@ public final class ApplicationHandler extends DefaultHandler { * of the process object. */ public Vector args; + + public Hashtable properties; /** * The name of the host of the process. @@ -49,12 +52,14 @@ public final class ApplicationHandler extends DefaultHandler { * The function of the process. */ private String function; + /** * Default constructor. */ public ProcessFactory() { this.args = new Vector(); + this.properties = new Hashtable(); this.hostName = null; this.function = null; } @@ -72,8 +77,11 @@ public final class ApplicationHandler extends DefaultHandler { this.hostName = hostName; this.function = function; - if (!args.isEmpty()) - args.clear(); + if (!this.args.isEmpty()) + this.args.clear(); + + if(!this.properties.isEmpty()) + this.properties.clear(); } /** * This method is called by the startElement() handler. @@ -85,6 +93,16 @@ public final class ApplicationHandler extends DefaultHandler { */ public void registerProcessArg(String arg) { this.args.add(arg); } + + public void setProperty(String id, String value) + { + this.properties.put(id,value); + } + + public String getHostName() + { + return this.hostName; + } public void createProcess() { try { @@ -92,7 +110,7 @@ public final class ApplicationHandler extends DefaultHandler { Class cls = Class.forName(this.function); simgrid.msg.Process process = (simgrid.msg.Process) cls.newInstance(); - process.name = process.getName(); //this.function; + process.name = /*process.getName();*/ this.function; process.id = simgrid.msg.Process.nextProcessId++; Host host = Host.getByName(this.hostName); @@ -102,6 +120,9 @@ public final class ApplicationHandler extends DefaultHandler { for (int index = 0; index < size; index++) process.args.add(args.get(index)); + + process.properties = this.properties; + this.properties = new Hashtable(); } catch(JniException e) { System.out.println(e.toString()); @@ -159,6 +180,8 @@ public final class ApplicationHandler extends DefaultHandler { Attributes attr) { if (localName.equals("process")) onProcessIdentity(attr); + else if(localName.equals("prop")) + onProperty(attr); else if (localName.equals("argument")) onProcessArg(attr); } @@ -169,6 +192,10 @@ public final class ApplicationHandler extends DefaultHandler { public void onProcessIdentity(Attributes attr) { processFactory.setProcessIdentity(attr.getValue(0), attr.getValue(1)); } + + public void onProperty(Attributes attr) { + processFactory.setProperty(attr.getValue(0), attr.getValue(1)); + } /** * process arguments handler. diff --git a/src/java/simgrid/msg/Host.java b/src/java/simgrid/msg/Host.java index 23094dd0f2..69737f24f0 100644 --- a/src/java/simgrid/msg/Host.java +++ b/src/java/simgrid/msg/Host.java @@ -190,10 +190,48 @@ try { double timeout) throws JniException, NativeException { MsgNative.hostPut(this, channel, task, timeout); } + + /** Send the given task to the given channel of the host (capping the emision rate to #maxrate) */ public void putBounded(int channel, Task task, double maxrate) throws JniException, NativeException { MsgNative.hostPutBounded(this, channel, task, maxrate); -} } + } + + /** Send the given task to mailbox identified by the default alias */ + public void send(Task task) throws JniException, NativeException { + String alias = this.getName() + ":" + Process.currentProcess().msgName(); + MsgNative.taskSend(alias, task, -1); + } + + /** Send the given task to the mailbox associated with the specified alias */ + + public void send(String alias, Task task) throws JniException, NativeException { + MsgNative.taskSend(alias, task, -1); + } + + /** Send the given task in the mailbox associated with the alias of the current host (waiting at most #timeout seconds) */ + public void send(Task task, double timeout) throws JniException, NativeException { + String alias = this.getName() + ":" + Process.currentProcess().msgName(); + MsgNative.taskSend(alias, task, timeout); + } + + /** Send the given task to mailbox associated with the specified alias (waiting at most #timeout seconds) */ + public void send(String alias, Task task, double timeout) throws JniException, NativeException { + MsgNative.taskSend(alias, task, timeout); + } + + /** Send the given task to the mailbox associated with the default alias (capping the emision rate to #maxrate) */ + public void sendBounded(Task task, double maxrate) throws JniException, NativeException { + String alias = this.getName() + ":" + Process.currentProcess().msgName(); + + MsgNative.taskSendBounded(alias, task, maxrate); + } + + /** Send the given task to the mailbox associated with the specified alias (capping the emision rate to #maxrate) */ + public void sendBounded(String alias, Task task, double maxrate) throws JniException, NativeException { + MsgNative.taskSendBounded(alias, task, maxrate); + } +} diff --git a/src/java/simgrid/msg/MsgNative.java b/src/java/simgrid/msg/MsgNative.java index 26d2a97b6f..1cb47da32e 100644 --- a/src/java/simgrid/msg/MsgNative.java +++ b/src/java/simgrid/msg/MsgNative.java @@ -529,6 +529,30 @@ final class MsgNative { final static native Task taskGet(int channel, double timeout, Host host) throws JniException, NativeException; + + + /****************************************************************** + * Task methods relative with the alias * + ******************************************************************/ + + + final static native void taskSend(String alias, Task task, double timeout) + throws JniException, NativeException; + + + final static native Task taskReceive(String alias, double timeout, Host host) + throws JniException, NativeException; + + final static native int taskListenFrom(String alias) + throws JniException, NativeException; + + + final static native boolean taskListen(String alias) + throws JniException; + + final static native int taskListenFromHost(String alias, Host host) + throws JniException; + /** @@ -574,6 +598,7 @@ final class MsgNative { */ final static native int taskProbeHost(int channel, Host host) throws JniException; + /****************************************************************** * Task emission methods * @@ -614,5 +639,20 @@ final class MsgNative { final static native void hostPutBounded(Host host, int channel, Task task, double max_rate) throws JniException, NativeException; + + /** + * The natively implemented method to send a task in a mailbox associated with an alias, with a bounded transmition + * rate. + * + * @param alias The alias of the mailbox. + * @param task The task to put. + * @param max_rate The bounded transmition rate. + * + * @exception InvalidTaskException if the task is not valid. + * InvalidHostException if the host is not valid. + * MsgException if the operation failed. + */ + final static native void taskSendBounded(String alias, Task task, double maxrate) + throws JniException, NativeException; } diff --git a/src/java/simgrid/msg/Process.java b/src/java/simgrid/msg/Process.java index c678e6576f..04f26b9945 100644 --- a/src/java/simgrid/msg/Process.java +++ b/src/java/simgrid/msg/Process.java @@ -68,6 +68,8 @@ public abstract class Process extends Thread { * The native functions use this identifier to synchronize the process. */ public long id; + + public Hashtable properties; /** * The name of the process. @@ -92,6 +94,7 @@ public abstract class Process extends Thread { this.name = null; this.bind = 0; this.args = new Vector(); + this.properties = null; schedBegin = new Sem(0); schedEnd = new Sem(0); } @@ -165,7 +168,8 @@ public abstract class Process extends Thread { if (name == null) throw new NullPointerException("Process name cannot be NULL"); - + this.properties = null; + this.args = new Vector(); if (null != args) @@ -179,6 +183,8 @@ public abstract class Process extends Thread { MsgNative.processCreate(this, host); } + + /** * This method kills all running process of the simulation. * @@ -390,32 +396,118 @@ public abstract class Process extends Thread { } catch(InterruptedException e) { } } - - - /** Send the given task to given host on given channel */ - public void taskSend(Host host, int channel, + + /** Send the given task to given host on given channel */ + public void taskPut(Host host, int channel, Task task) throws NativeException, JniException { MsgNative.hostPut(host, channel, task, -1); } + + /** Send the given task to given host on given channel (waiting at most given time)*/ + public void taskPut(Host host, int channel, + Task task, double timeout) throws NativeException, JniException { + MsgNative.hostPut(host, channel, task, timeout); + } /** Receive a task on given channel */ - public Task taskReceive(int channel) throws NativeException, + public Task taskGet(int channel) throws NativeException, JniException { return MsgNative.taskGet(channel, -1, null); } /** Receive a task on given channel (waiting at most given time) */ - public Task taskReceive(int channel, + public Task taskGet(int channel, double timeout) throws NativeException, JniException { return MsgNative.taskGet(channel, timeout, null); } /** Receive a task on given channel from given sender */ - public Task taskReceive(int channel, Host host) throws NativeException, + public Task taskGet(int channel, Host host) throws NativeException, JniException { return MsgNative.taskGet(channel, -1, host); } /** Receive a task on given channel from given sender (waiting at most given time) */ - public Task taskReceive(int channel, double timeout, + public Task taskGet(int channel, double timeout, Host host) throws NativeException, JniException { return MsgNative.taskGet(channel, timeout, host); } + + /** Send the given task in the mailbox associated with the specified alias (waiting at most given time) */ + public void taskSend(String alias, + Task task, double timeout) throws NativeException, JniException { + MsgNative.taskSend(alias, task, timeout); + } + + /** Send the given task in the mailbox associated with the specified alias*/ + public void taskSend(String alias, + Task task) throws NativeException, JniException { + MsgNative.taskSend(alias, task, -1); + } + + /** Send the given task in the mailbox associated with the default alias (defaultAlias = "hostName:processName") */ + public void taskSend(Task task) throws NativeException, JniException { + + String alias = Host.currentHost().getName() + ":" + this.msgName(); + MsgNative.taskSend(alias, task, -1); + } + + /** Send the given task in the mailbox associated with the default alias (waiting at most given time) */ + public void taskSend(Task task, double timeout) throws NativeException, JniException { + + String alias = Host.currentHost().getName() + ":" + this.msgName(); + MsgNative.taskSend(alias, task, timeout); + } + + + /** Receive a task on mailbox associated with the specified alias */ + public Task taskReceive(String alias) throws NativeException, + JniException { + return MsgNative.taskReceive(alias, -1.0, null); + } + + /** Receive a task on mailbox associated with the default alias */ + public Task taskReceive() throws NativeException, + JniException { + String alias = Host.currentHost().getName() + ":" + this.msgName(); + return MsgNative.taskReceive(alias, -1.0, null); + } + + /** Receive a task on mailbox associated with the specified alias (waiting at most given time) */ + public Task taskReceive(String alias, + double timeout) throws NativeException, + JniException { + return MsgNative.taskReceive(alias, timeout, null); + } + + /** Receive a task on mailbox associated with the default alias (waiting at most given time) */ + public Task taskReceive(double timeout) throws NativeException, + JniException { + String alias = Host.currentHost().getName() + ":" + this.msgName(); + return MsgNative.taskReceive(alias, timeout, null); + } + + /** Receive a task on mailbox associated with the specified alias from given sender */ + public Task taskReceive(String alias, + double timeout, Host host) throws NativeException, + JniException { + return MsgNative.taskReceive(alias, timeout, host); + } + + /** Receive a task on mailbox associated with the default alias from given sender (waiting at most given time) */ + public Task taskReceive(double timeout, Host host) throws NativeException, + JniException { + String alias = Host.currentHost().getName() + ":" + this.msgName(); + return MsgNative.taskReceive(alias, timeout, host); + } + + /** Receive a task on mailbox associated with the specified alias from given sender*/ + public Task taskReceive(String alias, + Host host) throws NativeException, + JniException { + return MsgNative.taskReceive(alias, -1.0, host); + } + /** Receive a task on mailbox associated with the default alias from given sender */ + public Task taskReceive( Host host) throws NativeException, + JniException { + String alias = Host.currentHost().getName() + ":" + this.msgName(); + return MsgNative.taskReceive(alias, -1.0, host); + } } diff --git a/src/java/simgrid/msg/Task.java b/src/java/simgrid/msg/Task.java index 129f5f3c1c..584fe57ec7 100644 --- a/src/java/simgrid/msg/Task.java +++ b/src/java/simgrid/msg/Task.java @@ -176,6 +176,7 @@ public class Task { Host host) throws JniException, NativeException { return MsgNative.taskGet(channel, timeout, host); } + /** * Probes whether there is a waiting task on the given channel of local host * @@ -191,6 +192,7 @@ public class Task { */ public static int probe(int channel, Host host) throws JniException { return MsgNative.taskProbeHost(channel, host); } + /* * * * * * Computation-related * * * * * *//** @@ -219,4 +221,189 @@ public class Task { */ protected void finalize() throws JniException, NativeException { if (this.bind != 0) MsgNative.taskDestroy(this); -}} + } + + /** + * Send the task on the mailbox identified by the default alias (defaultAlias = "currentHostName:CurrentProcessName") + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public void send() throws JniException,NativeException { + + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + + MsgNative.taskSend(alias, this, -1); + } + + /** + * Send the task on the mailbox identified by the specified alias + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public void send(String alias) throws JniException,NativeException { + MsgNative.taskSend(alias, this, -1); + } + + /** + * Send the task on the mailbox identified by the default alias (wait at most #timeout seconds) + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public void send(double timeout) throws JniException,NativeException { + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + MsgNative.taskSend(alias, this, timeout); + } + + /** + * Send the task on the mailbox identified by the specified alias (wait at most #timeout seconds) + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public void send(String alias, double timeout) throws JniException,NativeException { + MsgNative.taskSend(alias, this, timeout); + } + + + /** + * Send the task on the mailbox identified by the default alias (capping the emision rate to #maxrate) + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public void sendBounded(double maxrate) throws JniException,NativeException { + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + MsgNative.taskSendBounded(alias, this, maxrate); + } + + /** + * Send the task on the mailbox identified by the specified alias (capping the emision rate to #maxrate) + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public void sendBounded(String alias, double maxrate) throws JniException,NativeException { + MsgNative.taskSendBounded(alias, this, maxrate); + } + + /** + * Retrieves next task from the mailbox identified by the default alias (defaultAlias = "currentHostName:CurrentProcessName") + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static Task receive() throws JniException, NativeException { + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + return MsgNative.taskReceive(alias, -1.0, null); + } + + /** + * Retrieves next task from the mailbox identified by the specified alias + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + + public static Task receive(String alias) throws JniException, NativeException { + return MsgNative.taskReceive(alias, -1.0, null); + } + + /** + * Retrieves next task on the mailbox identified by the specified alias (wait at most #timeout seconds) + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static Task receive(String alias, double timeout) throws JniException, NativeException { + return MsgNative.taskReceive(alias, timeout, null); + } + + /** + * Retrieves next task sended by a given host on the mailbox identified by the specified alias + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + + public static Task receive(String alias, Host host) throws JniException, NativeException { + return MsgNative.taskReceive(alias, -1.0, host); + } + + /** + * Retrieves next task sended by a given host on the mailbox identified by the specified alias (wait at most #timeout seconds) + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static Task receive(String alias, double timeout, Host host) throws JniException, NativeException { + return MsgNative.taskReceive(alias, timeout, host); + } + + /** + * Listen whether there is a waiting task on the mailbox identified by the default alias of local host + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static boolean listen() throws JniException, NativeException { + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + + return MsgNative.taskListen(alias); + } + + /** + * Test whether there is a pending communication on the mailbox identified by the specified alias, and who sent it + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static int listenFrom(String alias) throws JniException, NativeException { + return MsgNative.taskListenFrom(alias); + } + + /** + * Test whether there is a pending communication on the mailbox identified by the default alias, of the current host, and who sent it + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static int listenFrom() throws JniException, NativeException { + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + + return MsgNative.taskListenFrom(alias); + } + + /** + * Listen whether there is a waiting task on the mailbox identified by the specified alias + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static boolean listen(String alias) throws JniException, NativeException { + return MsgNative.taskListen(alias); + } + + /** + * Counts the number of tasks waiting to be received on the #mailbox identified by the specified alias and sended by the current host. + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static int listenFromHost(Host host) throws JniException, NativeException { + String alias = Host.currentHost().getName() + ":" + Process.currentProcess().msgName(); + return MsgNative.taskListenFromHost(alias,host); + } + + /** + * Counts the number of tasks waiting to be received on the #mailbox identified by the specified alia and sended by the specified #host. + * + * @exception JniException if the binding mecanism fails. + * @exception NativeException if the retrival fails. + */ + public static int listenFromHost(String alias, Host host) throws JniException, NativeException { + return MsgNative.taskListenFromHost(alias, host); + } +} diff --git a/src/msg/gos.c b/src/msg/gos.c index 82ce8e6630..8297d8f71d 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -10,6 +10,8 @@ #include "msg/private.h" #include "xbt/sysdep.h" #include "xbt/log.h" +#include "msg_mailbox.h" + XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)"); @@ -828,3 +830,455 @@ MSG_error_t MSG_get_errno(void) { return PROCESS_GET_ERRNO(); } + +MSG_error_t +MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host) +{ + m_process_t process = MSG_process_self(); + m_task_t t = NULL; + m_host_t h = NULL; + simdata_task_t t_simdata = NULL; + simdata_host_t h_simdata = NULL; + int first_time = 1; + xbt_fifo_item_t item = NULL; + + smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet + + /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */ + msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias); + + CHECK_HOST(); + + /* Sanity check */ + xbt_assert0(task, "Null pointer for the task storage"); + + if (*task) + CRITICAL0("MSG_task_get() was asked to write in a non empty task struct."); + + /* Get the task */ + h = MSG_host_self(); + h_simdata = h->simdata; + + DEBUG2("Waiting for a task on channel aliased by %s (%s)", alias, h->name); + + SIMIX_mutex_lock(h->simdata->mutex); + + while (1) + { + /* if the mailbox is empty (has no task */ + if(!MSG_mailbox_is_empty(mailbox)) + { + if(!host) + { + /* pop the head of the mailbox */ + t = MSG_mailbox_pop_head(mailbox); + break; + } + else + { + /* get the first task of the host */ + if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host))) + break; + } + } + + if(timeout > 0) + { + if (!first_time) + { + SIMIX_mutex_unlock(h->simdata->mutex); + /* set the simix condition of the mailbox to NULL */ + MSG_mailbox_set_cond(mailbox, NULL); + SIMIX_cond_destroy(cond); + MSG_RETURN(MSG_TRANSFER_FAILURE); + } + } + + xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel aliased by %s", alias); + + cond = SIMIX_cond_init(); + + /* set the condition of the mailbox */ + MSG_mailbox_set_cond(mailbox, cond); + + if (timeout > 0) + SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout); + else + SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex); + + + if (SIMIX_host_get_state(h_simdata->smx_host) == 0) + MSG_RETURN(MSG_HOST_FAILURE); + + first_time = 0; + } + + SIMIX_mutex_unlock(h->simdata->mutex); + + DEBUG1("OK, got a task (%s)", t->name); + /* clean conditional */ + if (cond) + { + SIMIX_cond_destroy(cond); + + MSG_mailbox_set_cond(mailbox,NULL); + } + + t_simdata = t->simdata; + t_simdata->receiver = process; + *task = t; + + SIMIX_mutex_lock(t_simdata->mutex); + + /* Transfer */ + /* create SIMIX action to the communication */ + t_simdata->comm = SIMIX_action_communicate( + t_simdata->sender->simdata->m_host->simdata->smx_host, + process->simdata->m_host->simdata->smx_host, + t->name, + t_simdata->message_size, + t_simdata->rate + ); + + /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */ + if (MSG_process_is_suspended(t_simdata->sender)) + { + DEBUG1("Process sender (%s) suspended", t_simdata->sender->name); + SIMIX_action_set_priority(t_simdata->comm, 0); + } + + process->simdata->waiting_task = t; + SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond); + + while (1) + { + SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex); + + if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING) + break; + } + + SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond); + process->simdata->waiting_task = NULL; + + /* the task has already finished and the pointer must be null */ + if (t->simdata->sender) + { + t->simdata->sender->simdata->waiting_task = NULL; + } + + /* for this process, don't need to change in get function */ + t->simdata->receiver = NULL; + SIMIX_mutex_unlock(t_simdata->mutex); + + + if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) + { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + t_simdata->using--; + MSG_RETURN(MSG_OK); + } + else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) + { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + t_simdata->using--; + MSG_RETURN(MSG_HOST_FAILURE); + } + else + { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + t_simdata->using--; + MSG_RETURN(MSG_TRANSFER_FAILURE); + } +} + +MSG_error_t +MSG_task_receive_with_time_out(m_task_t * task, const char* alias, double timeout) +{ + return MSG_task_receive_ext(task, alias, timeout, NULL); +} + +MSG_error_t +MSG_task_receive(m_task_t * task, const char* alias) +{ + return MSG_task_receive_with_time_out(task, alias, -1); +} + +int +MSG_task_listen(const char* alias) +{ + CHECK_HOST(); + + DEBUG2("Probing on channel aliased by %s (%s)", alias, MSG_host_self()->name); + + return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias)); +} + +int +MSG_task_listen_from_host(const char* alias, m_host_t host) +{ + + return MSG_mailbox_get_count_host_tasks(MSG_mailbox_get_by_alias(alias),host); + +} + +MSG_error_t +MSG_alias_select_from(const char* alias, double timeout, int* PID) +{ + m_host_t h = NULL; + simdata_host_t h_simdata = NULL; + m_task_t t; + int first_time = 1; + smx_cond_t cond; + msg_mailbox_t mailbox; + + if (PID) + { + *PID = -1; + } + + if(timeout == 0.0) + { + *PID = MSG_task_listen_from(alias); + MSG_RETURN(MSG_OK); + } + else + { + CHECK_HOST(); + h = MSG_host_self(); + h_simdata = h->simdata; + + DEBUG2("Probing on alias %s (%s)", alias, h->name); + + mailbox = MSG_mailbox_get_by_alias(alias); + + while(MSG_mailbox_is_empty(mailbox)) + { + if(timeout > 0) + { + if (!first_time) + { + MSG_RETURN(MSG_OK); + } + } + + SIMIX_mutex_lock(h_simdata->mutex); + + xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this alias %s",alias); + + cond = SIMIX_cond_init(); + + MSG_mailbox_set_cond(mailbox, cond); + + if (timeout > 0) + { + SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout); + } + else + { + SIMIX_cond_wait(cond, h_simdata->mutex); + } + + SIMIX_cond_destroy(cond); + SIMIX_mutex_unlock(h_simdata->mutex); + + if (SIMIX_host_get_state(h_simdata->smx_host) == 0) + { + MSG_RETURN(MSG_HOST_FAILURE); + } + + MSG_mailbox_set_cond(mailbox,NULL); + first_time = 0; + } + + if(NULL == (t = MSG_mailbox_get_head(mailbox))) + MSG_RETURN(MSG_OK); + + + if (PID) + { + *PID = MSG_process_get_PID(t->simdata->sender); + } + + MSG_RETURN(MSG_OK); + } +} + +MSG_error_t +MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout) +{ + m_process_t process = MSG_process_self(); + const char* hostname; + simdata_task_t task_simdata = NULL; + m_host_t local_host = NULL; + m_host_t remote_host = NULL; + smx_cond_t cond = NULL; + + /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */ + msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias); + + CHECK_HOST(); + + task_simdata = task->simdata; + task_simdata->sender = process; + task_simdata->source = MSG_process_get_host(process); + + xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!"); + + task_simdata->comm = NULL; + + task_simdata->using++; + local_host = ((simdata_process_t) process->simdata)->m_host; + + /* get the host name containing the mailbox */ + hostname = MSG_mailbox_get_hostname(mailbox); + + remote_host = MSG_get_host_by_name(hostname); + + if(NULL == remote_host) + THROW1(not_found_error,0,"Host %s not fount", hostname); + + + DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox)); + + SIMIX_mutex_lock(remote_host->simdata->mutex); + + /* put the task in the mailbox */ + MSG_mailbox_put(mailbox,task); + + if(NULL != (cond = MSG_mailbox_get_cond(mailbox))) + { + DEBUG0("Somebody is listening. Let's wake him up!"); + SIMIX_cond_signal(cond); + } + + + + SIMIX_mutex_unlock(remote_host->simdata->mutex); + + SIMIX_mutex_lock(task->simdata->mutex); + + process->simdata->waiting_task = task; + + if(timeout > 0) + { + xbt_ex_t e; + double time; + double time_elapsed; + time = SIMIX_get_clock(); + + TRY + { + /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */ + while (1) + { + time_elapsed = SIMIX_get_clock() - time; + SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed); + + if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)) + break; + } + } + CATCH(e) + { + if(e.category==timeout_error) + { + xbt_ex_free(e); + /* verify if the timeout happened and the communication didn't started yet */ + if (task->simdata->comm == NULL) + { + process->simdata->waiting_task = NULL; + + /* remove the task from the mailbox */ + MSG_mailbox_remove(mailbox,task); + + if (task->simdata->receiver) + { + task->simdata->receiver->simdata->waiting_task = NULL; + } + + task->simdata->sender = NULL; + + SIMIX_mutex_unlock(task->simdata->mutex); + MSG_RETURN(MSG_TRANSFER_FAILURE); + } + } + else + { + RETHROW; + } + } + } + else + { + while (1) + { + SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex); + + if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING) + break; + } + } + + DEBUG1("Action terminated %s", task->name); + process->simdata->waiting_task = NULL; + + /* the task has already finished and the pointer must be null */ + if (task->simdata->receiver) + { + task->simdata->receiver->simdata->waiting_task = NULL; + } + + task->simdata->sender = NULL; + SIMIX_mutex_unlock(task->simdata->mutex); + + + if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) + { + MSG_RETURN(MSG_OK); + } + else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) + { + MSG_RETURN(MSG_HOST_FAILURE); + } + else + { + MSG_RETURN(MSG_TRANSFER_FAILURE); + } +} + +MSG_error_t +MSG_task_send(m_task_t task,const char* alias) +{ + return MSG_task_send_with_timeout(task, alias, -1); +} + + +MSG_error_t +MSG_task_send_bounded(m_task_t task, const char* alias, double rate) +{ + task->simdata->rate = rate; + return MSG_task_send(task, alias); +} + +int +MSG_task_listen_from(const char* alias) +{ + m_host_t h = NULL; + m_task_t t; + + CHECK_HOST(); + + h = MSG_host_self(); + + DEBUG2("Probing on alias %s(%s)", alias, h->name); + + if(NULL == (t = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias)))) + return -1; + + return MSG_process_get_PID(t->simdata->sender); +} + diff --git a/src/msg/host.c b/src/msg/host.c index 12b0d5c7e7..98668736fa 100644 --- a/src/msg/host.c +++ b/src/msg/host.c @@ -10,6 +10,7 @@ #include "msg/private.h" #include "xbt/sysdep.h" #include "xbt/log.h" +#include "msg_mailbox.h" /** \defgroup m_host_management Management functions of Hosts * \brief This section describes the host structure of MSG @@ -30,31 +31,44 @@ /********************************* Host **************************************/ m_host_t __MSG_host_create(smx_host_t workstation, void *data) { - const char *name; - simdata_host_t simdata = xbt_new0(s_simdata_host_t, 1); - m_host_t host = xbt_new0(s_m_host_t, 1); - int i; - - name = SIMIX_host_get_name(workstation); - /* Host structure */ - host->name = xbt_strdup(name); - host->simdata = simdata; - host->data = data; - - simdata->smx_host = workstation; - - simdata->mbox = xbt_new0(xbt_fifo_t, msg_global->max_channel); - for (i = 0; i < msg_global->max_channel; i++) - simdata->mbox[i] = xbt_fifo_new(); - - simdata->sleeping = xbt_new0(smx_cond_t, msg_global->max_channel); - simdata->mutex = SIMIX_mutex_init(); - SIMIX_host_set_data(workstation, host); - - /* Update global variables */ - xbt_fifo_unshift(msg_global->host, host); - - return host; + const char *name; + simdata_host_t simdata = xbt_new0(s_simdata_host_t, 1); + m_host_t host = xbt_new0(s_m_host_t, 1); + int i; + + char alias[MAX_ALIAS_NAME +1] = {0}; /* buffer used to build the key of the mailbox */ + msg_mailbox_t mailbox; + + name = SIMIX_host_get_name(workstation); + /* Host structure */ + host->name = xbt_strdup(name); + host->simdata = simdata; + host->data = data; + + simdata->smx_host = workstation; + + simdata->mbox = xbt_new0(xbt_fifo_t, msg_global->max_channel); + + for (i = 0; i < msg_global->max_channel; i++) + { + sprintf(alias,"%s:%d",name,i); + + /* the key of the mailbox (in this case) is build from the name of the host and the channel number */ + mailbox = MSG_mailbox_new(alias); + MSG_mailbox_set_hostname(mailbox,name); + memset(alias,0,MAX_ALIAS_NAME +1); + + simdata->mbox[i] = xbt_fifo_new(); + } + + simdata->sleeping = xbt_new0(smx_cond_t, msg_global->max_channel); + simdata->mutex = SIMIX_mutex_init(); + SIMIX_host_set_data(workstation, host); + + /* Update global variables */ + xbt_fifo_unshift(msg_global->host, host); + + return host; } /** \ingroup m_host_management @@ -126,6 +140,7 @@ void __MSG_host_destroy(m_host_t host) { simdata_host_t simdata = NULL; int i = 0; + char alias[MAX_ALIAS_NAME +1] = {0}; /* buffer used to build the key of the mailbox */ xbt_assert0((host != NULL), "Invalid parameters"); @@ -134,7 +149,14 @@ void __MSG_host_destroy(m_host_t host) simdata = (host)->simdata; for (i = 0; i < msg_global->max_channel; i++) + { + sprintf(alias,"%s:%d",host->name,i); + xbt_dict_remove(MSG_get_mailboxes(),alias); + memset(alias,0,MAX_ALIAS_NAME +1); + xbt_fifo_free(simdata->mbox[i]); + } + free(simdata->mbox); free(simdata->sleeping); SIMIX_mutex_destroy(simdata->mutex); diff --git a/src/simix/smx_global.c b/src/simix/smx_global.c index 192a816082..f853c7d7a7 100644 --- a/src/simix/smx_global.c +++ b/src/simix/smx_global.c @@ -11,6 +11,7 @@ #include "xbt/log.h" #include "xbt/str.h" #include "xbt/ex.h" /* ex_backtrace_display */ +#include "msg/msg_mailbox.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix, "Logging specific to SIMIX (kernel)"); @@ -120,7 +121,10 @@ void SIMIX_global_init(int *argc, char **argv) xbt_swag_new(xbt_swag_offset(proc, process_hookup)); simix_global->current_process = NULL; simix_global->registered_functions = xbt_dict_new(); - + + /* initialization of the mailbox module */ + MSG_mailbox_mod_init(); + simix_global->create_process_function = NULL; simix_global->kill_process_function = NULL; simix_global->cleanup_process_function = SIMIX_process_cleanup; @@ -296,6 +300,10 @@ void SIMIX_clean(void) simix_config_finalize(); free(simix_global); simix_global = NULL; + + /* cleanup all resources in the mailbox module */ + MSG_mailbox_mod_exit(); + surf_exit(); return;