Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Stuks When a process ( Slave ) Tries to Recieve a Task...
authorcoldpeace <coldpeace@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 22 Feb 2010 13:55:46 +0000 (13:55 +0000)
committercoldpeace <coldpeace@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Mon, 22 Feb 2010 13:55:46 +0000 (13:55 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@7115 48e7efb5-ca39-0410-a469-dd3cf9ba447f

12 files changed:
src/bindings/ruby/ApplicationHandler.rb
src/bindings/ruby/Master.rb
src/bindings/ruby/MasterSlave.rb
src/bindings/ruby/RubyProcess.rb
src/bindings/ruby/Semaphore.rb
src/bindings/ruby/Slave.rb
src/bindings/ruby/rb_application_handler.c
src/bindings/ruby/rb_msg.c
src/bindings/ruby/rb_msg.h
src/bindings/ruby/rb_msg_process.c
src/bindings/ruby/rb_msg_task.c
src/bindings/ruby/rb_msg_task.h

index b55dc8e..b335782 100644 (file)
@@ -81,5 +81,4 @@ class ApplicationHandler
  end
  
  #  End Class
- end
-
+ end
\ No newline at end of file
index f2f1913..eafe15d 100644 (file)
@@ -4,18 +4,24 @@ require 'RubyProcess'
 include MSG
 
 class Master < RbProcess  
-  def initialize()
+  def initialize2()
     super()
   end
-
  
+  
+  #for Testing
+  def msg_main2(args)
+   info("Hello From Master")
+  end
+  
+  
   # msg_main : that function that will be executed when Running Simulation
   def msg_main(args) # args is an array Containin' arguments for function master
-    puts "Hey From Ruby...I'm The Master" 
+    info("Hello From Master")
     size = args.size
-    puts "Number of Args for Master = " + size.to_s
+    info ("Number of Args for Master = " + size.to_s)
    for i in 0..size-1
-      puts  args[i]
+      info(args[i])
    end
    
    raise "Master needs 3 arguments" if size < 3 
@@ -25,24 +31,26 @@ class Master < RbProcess
    slaveCount = Integer(args[3]) 
    
    #Creating & Sending Task
-   for i in 0..numberOfTask
+   for i in 0..numberOfTask-1
   
-     task = RbTask.new("Task_" + i.to_s, taskComputeSize , taskCommunicationSize );
+     
+     task = RbTask.new("Task_"+ i.to_s, taskComputeSize , taskCommunicationSize );
      s_alias = "slave>>" + (i%slaveCount).to_s
-     puts "Master Sending "+ RbTask.name(task) + " to " + s_alias + " with Comput Size " + RbTask.compSize(task).to_s 
+     info("Master Sending "+ RbTask.name(task) + " to " + s_alias + " with Comput Size " + RbTask.compSize(task).to_s)
      RbTask.send(task,s_alias)
-     puts "Master Done Sending " +RbTask.name(task) + " to " + s_alias
-  
+     info("Master Done Sending " +RbTask.name(task) + " to " + s_alias)
+#       sameTask = RbTask.receive(s_alias)
+#      puts "Master Receiving its Own Task"
    end
   
    # Sending Finalize Tasks
-   puts "Master: All tasks have been dispatched. Let's tell everybody the computation is over."
+   info ("Master: All tasks have been dispatched. Let's tell everybody the computation is over.")
    for i in 0..slaveCount-1
      s_alias = "slave " + i.to_s
-     puts "Master Sending Finalize to " + s_alias
+     info ("Master Sending Finalize to " + s_alias)
      RbTask.send(RbTask.new("finalize",0,0),s_alias)
    end
-   puts "Master : Everything's Done"
+   info("Master : Everything's Done")
   end  
   
 end
\ No newline at end of file
index f1f358b..307f865 100644 (file)
@@ -5,16 +5,16 @@ require 'Slave'
 
 # include MSG
 
-# raise "Bad Number Of Arguments" if ARGV.length != 2 
-
-# info("Bye")
+raise "Bad Number Of Arguments" if ARGV.length != 2 
 
 MSG.init(ARGV)
+# Thread.list.each {|t| p t}
 raise "Bad Number Of Arguments" if (ARGV.length < 2)
-#  p  Host.number()
 MSG.createEnvironment(ARGV[0])
-#  p  Host.number()
+# Thread.list.each {|t| p t}
 MSG.deployApplication(ARGV[1])
-# p  Host.number()
- MSG.run()
+# Thread.list.each {|t| p t}
+MSG.run()
+# Thread.list.each {|t| p t}
+MSG.getClock()
 # exit()
index 5dce44c..1272a0e 100644 (file)
 require 'msg'
 require 'Semaphore'
 include MSG
-$DEBUG = false  # This is a Global Variable Useful for Debugging
+$DEBUG = true  # This is a Global Variable Useful for Debugging
 
 class RbProcess < Thread 
   @@nextProcessId = 0
 # Attributes
   attr_accessor :bind, :id, :proprieties, :name,
-      :pargs, :schedBegin, :schedEnd
+      :pargs, :schedBegin, :schedEnd, :mutex, :cv
   
 # Initialize : USED in ApplicationHandler to Initialize it
   def initialize(*args)
@@ -106,6 +106,8 @@ class RbProcess < Thread
   # Init_var Called By Initialize  
   def init_var()  
     @proprieties = Hash.new()
+    @mutex = Mutex.new
+    @cv = ConditionVariable.new
     # Process Synchronization Tools
     @schedBegin = Semaphore.new(0)
     @schedEnd = Semaphore.new(0)    
@@ -117,15 +119,13 @@ class RbProcess < Thread
     # The Main Code of The Process to be Executed ...
   end
      
-  
   # Start : To keep the Process Alive and waitin' via semaphore
   def start()
-    
-    @schedBegin.acquire()
+    @schedBegin.acquire(@mutex,@cv)
     #execute The Main Code of The Process ( Example Master ; Slave ...)     
     msg_main(@pargs)
     processExit(self) #Exite the Native Process
-    @schedEnd.release()
+    @schedEnd.release(@mutex,@cv)
   end
     
 #   NetxId
@@ -177,14 +177,17 @@ class RbProcess < Thread
   end
     
   def unschedule() 
-#     Thread.pass
-    @schedEnd.release()
-    @schedBegin.acquire()
+    
+    @schedEnd.release(@mutex,@cv)
+#     info("@schedEnd.release(@mutex,@cv)")
+    @schedBegin.acquire(@mutex,@cv)
+#     info("@schedBegin.acquire(@mutex,@cv)")
+     
   end
   
   def schedule()
-    @schedBegin.release()
-    @schedEnd.release()
+    @schedBegin.release(@mutex,@cv)
+    @schedEnd.acquire(@mutex,@cv)
   end
   
    #C Simualateur Process Equivalent  Management
index d8697c5..e087294 100644 (file)
@@ -7,38 +7,44 @@
 #  * This program is free software; you can redistribute 
 #  * it and/or modify it under the terms of the license 
 #  *(GNU LGPL) which comes with this package. 
+require 'msg'
 require 'thread'
 class Semaphore
   
-  Thread.abort_on_exception = true
-  attr_accessor :permits, :mutex, :cv
+   Thread.abort_on_exception = true
+    attr_accessor :permits
+
    
   def initialize ( permits )
     
-      @permits = permits
-      @mutex = Mutex.new
-      @cv = ConditionVariable.new
-    
+       @permits = permits
+
   end
    
 
-  def acquire()
+  def acquire(mutex,cv)
 
     raise "Interrupted Thread " if (!Thread.current.alive?)
-    @mutex.synchronize {
-    while @permits < 1
-       @cv.wait(@mutex)
+    mutex.synchronize {
+    while @permits <= 0
+       
+       cv.wait(mutex)
+       
     end
+    
     @permits = @permits - 1
+    cv.signal
+    
     }
+    
   end
     
-  def release()
-    @mutex.synchronize{
+  def release(mutex,cv)
+    mutex.synchronize{
       
       @permits += 1
-      @cv.signal
-       
+      cv.signal
+           
       }
   end
   
index 07e848e..e85da1f 100644 (file)
@@ -9,26 +9,34 @@ class Slave < RbProcess
     super()
   end
   
+  #for Testing
+  def msg_main2(args)
+    info("Hello From Slave")
+  end
+  
+  
+  
+  
+  
  # msg_main : that function that will be executed when Running Simulation
   def msg_main(args)
-    puts "Hello From Slave"
+    info("Hello From Slave")
     s_mailbox = "slave>>" + args[0]
+
     while true
-     
-      p "Hellow...................here3 "+s_mailbox
-      task = RbTask.receive(s_mailbox)
-      task_name = RbTask.name(task)
-      if ( task_name == "finalize" )
-       puts "Slave" + s_mailbox + "got finalize msg"
+        
+       info("Ready to Receive Task")
+       task = RbTask.receive(s_mailbox)
+       task_name = RbTask.name(task)
+       info ("Task Received : " + task_name)
+      if (task_name == "finalize")
+       info("Slave" + s_mailbox + "got finalize msg")
        break
       end
-      puts "Slave " + s_mailbox + "Processing" + RbTask.name(task)
-      RbTask.execute(task)
+      info("Slave " + s_mailbox + " ...Processing" + RbTask.name(task))
+       RbTask.execute(task)
     end
-    puts "Slave " + s_mailbox + "I'm Done , See You !!"
+    info("Slave " + s_mailbox +  "I'm Done , See You !!")
     end
     
   end
-  
-
-# slave = Slave.new
\ No newline at end of file
index 5d1f8e3..32be9a5 100644 (file)
@@ -12,7 +12,7 @@
 #include "surf/surfxml_parse.h"
 #include <stdio.h>
 
-// #define DEBUG 
+// #define MY_DEBUG 
 
 static void  r_init()
 {
@@ -30,7 +30,7 @@ static void  application_handler_on_start_document(void)
    //current One
    current = rb_funcall3(rb_const_get(rb_cObject, rb_intern("ApplicationHandler")),  rb_intern("new"), 0, 0);
    rb_funcall(current,rb_intern("onStartDocument"),0);
- #ifdef DEBUG
+ #ifdef MY_DEBUG
    printf ("application_handler_on_start_document ...Done\n" );
  #endif
   
@@ -48,7 +48,7 @@ static void application_handler_on_begin_process(void)
   //r_init();
   VALUE hostName = rb_str_new2(A_surfxml_process_host);
   VALUE function = rb_str_new2(A_surfxml_process_function);
-#ifdef DEBUG
+#ifdef MY_DEBUG
    printf ("On_Begin_Process: %s : %s \n",RSTRING(hostName)->ptr,RSTRING(function)->ptr);
 #endif 
    rb_funcall(current,rb_intern("onBeginProcess"),2,hostName,function); 
@@ -58,7 +58,7 @@ static void  application_handler_on_process_arg(void)
 {
   //r_init();
    VALUE arg = rb_str_new2(A_surfxml_argument_value);
-#ifdef DEBUG
+#ifdef MY_DEBUG
    printf ("On_Process_Args >> Sufxml argument value : %s\n",RSTRING(arg)->ptr);
 #endif
    rb_funcall(current,rb_intern("onProcessArg"),1,arg); 
index dcdc0a1..8638eae 100644 (file)
@@ -1,4 +1,4 @@
-/*
+/* 
  * $Id$
  *
  * Copyright 2010 Martin Quinson, Mehdi Fekari           
@@ -7,7 +7,7 @@
  * This program is free software; you can redistribute 
  * it and/or modify it under the terms of the license 
  *(GNU LGPL) which comes with this package. 
- */
+ */ 
 
 #include "rb_msg.h"
 #include "msg/msg.h"
 #include "rb_msg_process.c"
 #include "rb_application_handler.c"
 
-#define DEBUG
+#define MY_DEBUG
 //Init Msg_Init From Ruby
 static void msg_init(VALUE Class,VALUE args)
-{
-    
+{ 
   char **argv=NULL;    
   const char *tmp;
   int argc,type,i;
@@ -39,7 +38,7 @@ static void msg_init(VALUE Class,VALUE args)
   }
   ptr= RARRAY(args)->ptr;
   argc= RARRAY(args)->len;
-//   Create C Array to Hold Data_Get_Struct 
+//  Create C Array to Hold Data_Get_Struct 
   argc++; 
   argv = xbt_new0(char *, argc);  
   argv[0] = strdup("ruby");
@@ -61,8 +60,8 @@ static void msg_init(VALUE Class,VALUE args)
    free(argv[i]) ;
   
   free (argv);
-  #ifdef DEBUG
-  printf("Msg Init...Done\n");
+  #ifdef MY_DEBUG
+  INFO0("Msg Init...Done");
   #endif
   return;
 }
@@ -71,7 +70,9 @@ static void msg_init(VALUE Class,VALUE args)
 static void msg_run(VALUE class)
 {
   
- printf("msg_run msg_run msg_run msg_run...\n");
+ #ifdef MY_DEBUG
+ INFO0("Start Running...");
+ #endif
  xbt_fifo_item_t item = NULL;
  m_host_t host = NULL;
  VALUE rbHost;  
@@ -88,13 +89,15 @@ static void msg_run(VALUE class)
    xbt_fifo_foreach(msg_global->host, item, host, m_host_t) {
      //rbHost = (VALUE)host->data;// ??!!
       }
-    
-   printf("Let's Cleaaaaaaaaaaaaaaaaaaaaaaaan!!!\n"); 
+ #ifdef MY_DEBUG
+ INFO0("Start Cleaning...");
+ #endif
+   
    if (MSG_OK != MSG_clean()){
      rb_raise(rb_eRuntimeError,"MSG_clean() failed");
    }
     return;
-}
+} 
 
 //Create Environment
 static void msg_createEnvironment(VALUE class,VALUE plateformFile)
@@ -137,22 +140,23 @@ static void msg_deployApplication(VALUE class,VALUE deploymentFile )
        rb_raise(rb_eRuntimeError,"surf_parse() failed");
     surf_parse_close();   
     application_handler_on_end_document();
-    printf("Deploy Application...Done\n");
-   
+    #ifdef MY_DEBUG
+    INFO0("Deploy Application...Done");
+    #endif
 }
-
 // INFO
 static void msg_info(VALUE class,VALUE msg)
 {
  const char *s = RSTRING(msg)->ptr;
- INFO("%s",s);
+ INFO1("%s",s);
 }
 
 // Get Clock
-static VALUE msg_get_clock(VALUE class)
+static void msg_get_clock(VALUE class)
 {
  
-  return DBL2NUM(MSG_get_clock());
+  printf("Simulation time %f\n",MSG_get_clock());
   
 }   
 
@@ -183,15 +187,14 @@ static VALUE msg_new_ruby_instance_with_args(VALUE class,VALUE className,VALUE a
   ruby_init();
   ruby_init_loadpath();
   char * p_className = RSTRING(className)->ptr;
-  return rb_funcall(rb_const_get(rb_cObject, rb_intern(p_className)),rb_intern("new"), 1, args);
-  
-}  
+  return rb_funcall(rb_const_get(rb_cObject, rb_intern(p_className)),rb_intern("new"), 1, args); 
+}
 /*****************************************************************************************************************
 
 Wrapping MSG module and its Class ( Task,Host) & Methods ( Process's method...ect)
 To Ruby 
 
- the part after "Init_" is the name of the C extension specified in extconf.rb , not the name of C source file
+the part after "Init_" is the name of the C extension specified in extconf.rb , not the name of C source file
  
 *****************************************************************************************************************/
 void Init_msg()
@@ -221,7 +224,7 @@ void Init_msg()
    //Classes       
    rb_task = rb_define_class_under(rb_msg,"Task",rb_cObject);
    rb_host = rb_define_class_under(rb_msg,"Host",rb_cObject);
-     
+    
    //Task Methods    
    rb_define_module_function(rb_task,"new",task_new,3);
    rb_define_module_function(rb_task,"compSize",task_comp,1);
@@ -234,8 +237,10 @@ void Init_msg()
    rb_define_module_function(rb_task,"source",task_source,1);
    rb_define_module_function(rb_task,"listen",task_listen,2);
    rb_define_module_function(rb_task,"listenFromHost",task_listen_host,3);
-    
-   //Host Methods
+   rb_define_module_function(rb_task,"put",task_put,2);
+   rb_define_module_function(rb_task,"get",task_get,0);
+   
+   //Host Methods  
    rb_define_module_function(rb_host,"getByName",host_get_by_name,1);
    rb_define_module_function(rb_host,"name",host_name,1);
    rb_define_module_function(rb_host,"speed",host_speed,1);
index c14656d..d6a4ebc 100644 (file)
@@ -4,6 +4,8 @@
 #include "msg/msg.h"
 #include <ruby.h>
 
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
+                             "Messages specific for this msg example");
 
 // #include "msg/private.h"
 // #include "simix/private.h"
@@ -34,7 +36,7 @@ static void msg_deployApplication(VALUE Class,VALUE deploymntFile);
 static void msg_info(VALUE Class,VALUE msg);
 
 //get Clock  
-static VALUE msg_get_clock(VALUE Class);
+static void msg_get_clock(VALUE Class);
 
 //pajeOutput
 static void msg_paje_output(VALUE Class,VALUE pajeFile);
index bb2b81e..96280ee 100644 (file)
@@ -13,7 +13,6 @@
 
 #define DEBUG
 // Init Ruby
-
 static void initRuby()
 {
   
@@ -82,7 +81,6 @@ static VALUE process_isAlive(VALUE ruby_process)
 }
 
 // Kill Process
-
 static void process_kill(VALUE ruby_process)
 {
   
@@ -92,7 +90,6 @@ static void process_kill(VALUE ruby_process)
 }
 
 // join Process
-
 static void process_join( VALUE ruby_process )
 {
   
@@ -102,7 +99,6 @@ static void process_join( VALUE ruby_process )
 }
 
 // unschedule Process
-
 static void process_unschedule( VALUE ruby_process )
 {
  
@@ -112,7 +108,6 @@ static void process_unschedule( VALUE ruby_process )
 }
 
 // schedule Process
-
 static void process_schedule( VALUE ruby_process )
 {
    
index 8043af4..9334a12 100644 (file)
@@ -37,25 +37,18 @@ static VALUE task_comp(VALUE class,VALUE task)
   return rb_float_new(size);
 }
 
-
 //Get Name
-
 static VALUE task_name(VALUE class,VALUE task)
 {
   
   // Wrap Ruby Value to m_task_t struct
-  
   m_task_t tk;
   Data_Get_Struct(task, m_task_t, tk);
   return rb_str_new2(MSG_task_get_name(tk));
    
 }
 
-
-
-
 // Execute Task
-
 static VALUE task_execute(VALUE class,VALUE task)
 {
   
@@ -64,20 +57,16 @@ static VALUE task_execute(VALUE class,VALUE task)
   Data_Get_Struct(task, m_task_t, tk);
   return INT2NUM(MSG_task_execute(tk));
   
-  
 }
 
 // Sending Task
-
 static void task_send(VALUE class,VALUE task,VALUE mailbox)
 {
   
-    // Wrap Ruby Value to m_task_t struct
-  
+  // Wrap Ruby Value to m_task_t struct
   m_task_t tk;
   Data_Get_Struct(task, m_task_t, tk);
   int res = MSG_task_send(tk,RSTRING(mailbox)->ptr);
   if(res != MSG_OK)
    rb_raise(rb_eRuntimeError,"MSG_task_send failed");
   
@@ -92,22 +81,22 @@ static void task_send(VALUE class,VALUE task,VALUE mailbox)
 
 static VALUE task_receive(VALUE class,VALUE mailbox)
 {
-  m_task_t tk; 
-  MSG_task_receive(tk,RSTRING(mailbox)->ptr); 
-  return Data_Wrap_Struct(class, 0, task_free, tk);
+  // Task
+  m_task_t task = NULL;
+  MSG_task_receive(&task,RSTRING(mailbox)->ptr);
+  return Data_Wrap_Struct(class, 0, task_free, task);
 }
 
 // Recieve Task 2
 // Not Appreciated 
-static VALUE task_receive2(VALUE class,VALUE task,VALUE mailbox)
+static void task_receive2(VALUE class,VALUE task,VALUE mailbox)
 {
   m_task_t tk;
   Data_Get_Struct(task, m_task_t, tk);
-  return INT2NUM(MSG_task_receive(tk,RSTRING(mailbox)->ptr)); 
+  MSG_task_receive(&tk,RSTRING(mailbox)->ptr);
   
 }
 
-
 // It Return a Native Process ( m_process_t )
 static VALUE task_sender(VALUE class,VALUE task)
 {
@@ -170,6 +159,29 @@ static VALUE task_listen_host(VALUE class,VALUE task,VALUE alias,VALUE host)
  
  return Qfalse;
  
+}
+
+
+// Put
+static void task_put(VALUE class,VALUE task,VALUE host)
+{
+  
+ m_task_t tk;
+ m_host_t ht;
+ Data_Get_Struct(task,m_task_t,tk);
+ Data_Get_Struct(host,m_host_t,ht);
+ MSG_task_put(tk,ht,PORT_22);  //Channel set to 0
  
+}
+
+//get 
+static VALUE task_get(VALUE class)
+{
   
-}
\ No newline at end of file
+ m_task_t task = NULL;
+ int res = MSG_task_get(&task,PORT_22); // Channel set to 0
+ xbt_assert0(res == MSG_OK, "MSG_task_get failed");
+ return Data_Wrap_Struct(class, 0, task_free, task);
+}
index 9f82517..131196d 100644 (file)
@@ -9,6 +9,11 @@
 #include "xbt/log.h"
 #include "xbt/asserts.h"
 
+typedef enum {
+  PORT_22 = 0,
+  MAX_CHANNEL
+} channel_t;
+
 // Free Method
 static void task_free(m_task_t tk);
 
@@ -31,7 +36,7 @@ static void task_send(VALUE Class,VALUE task,VALUE mailbox);
 static VALUE task_receive(VALUE Class,VALUE mailbox);
 
 // Recieve Task 2 <<  Not Appreciated 
-static VALUE task_receive2(VALUE Class,VALUE task,VALUE mailbox);
+static void task_receive2(VALUE Class,VALUE task,VALUE mailbox);
 
 // Get Sender
 static VALUE task_sender(VALUE Class,VALUE task);
@@ -45,4 +50,9 @@ static VALUE task_listen(VALUE Class,VALUE task,VALUE alias);
 //Listen from Host
 static VALUE task_listen_host(VALUE Class,VALUE task,VALUE alias,VALUE host);
 
+// put
+static void task_put(VALUE Class,VALUE task,VALUE host);
+
+//get
+static VALUE task_get(VALUE Class);
 #endif
\ No newline at end of file