slaveCount = Integer(args[3])
# Creates and sends the tasks
- for i in 0..numberOfTask-1
+ for i in 0..numberOfTask-1
task = MSG::Task.new("Task_"+ i.to_s, taskComputeSize , taskCommunicationSize);
- mailbox = "slave " + (i%slaveCount).to_s
- MSG::info("Master Sending "+ task.name + " to " + mailbox + " with Comput Size " +
- task.compSize.to_s)
-# task.compSize.to_s) # FIXME: This version creates a deadlock. Interesting
- task.send(mailbox)
- MSG::info("Master Done Sending " + task.name + " to " + mailbox)
- end
+ mailbox = "slave " + (i%slaveCount).to_s
+ MSG::info("Master Sending "+ task.name + " to " + mailbox + " with Comput Size " +
+ task.compSize.to_s)
+ task.send(mailbox)
+ MSG::info("Master Done Sending " + task.name + " to " + mailbox)
+ end
# Sending Finalize MSG::Tasks
MSG::info("Master: All tasks have been dispatched. Let's tell everybody the computation is over.")
finalize_task.send(mailbox)
end
MSG::info("Master : Everything's Done")
- Thread.list.each {|t| p t}
end
end
else
MSG.createEnvironment("platform.xml")
MSG.deployApplication("deploy.xml")
- #Thread.list.each {|t| p t}
+
end
MSG.run
-Thread.list.each {|t| p t}
puts "Simulation time : " + MSG.getClock .to_s
-
-# exit()
+MSG.exit
rb_funcall(ruby_process,rb_intern("schedule"),0);
}
-
/***************************************************
Function for Native Process ( Bound ) Management
require 'simgrid_ruby'
require 'thread'
-##########################################################
-# Class Semaphore
-##########################################################
-class MySemaphore
- Thread.abort_on_exception = true
- attr_accessor :permits
-
- def initialize (permits = 0)
- @permits = permits
- end
-
- def acquire(mutex,cv)
-
- raise "Interrupted Thread " if (!Thread.current.alive?)
- mutex.synchronize {
- while @permits <= 0
-
- cv.wait(mutex)
-
- end
- @permits = @permits - 1
- cv.signal
- }
-
- end
-
- def release(mutex,cv)
- mutex.synchronize{
- @permits += 1
- cv.signal
- }
- end
-end
#######################################
-# Another Semaphore
+# Semaphore
#######################################
class Semaphore
@@nextProcessId = 0
# Attributes
- attr_reader :name, :pargs ,:properties, :cv, :mutex # Read only
+ attr_reader :name, :pargs ,:properties # Read only
def initialize(*args)
super(){
raise "Bad number of arguments to create a Ruby process. Expected (name,args,prop) " if args.size < 3
- @cv = ConditionVariable.new
- @mutex = Mutex.new
- @schedBegin = MySemaphore.new(0)
- @schedEnd = MySemaphore.new(0)
+ @schedBegin = Semaphore.new(0)
+ @schedEnd = Semaphore.new(0)
@id = @@nextProcessId
@@nextProcessId +=1
@name = args[0]
end
def start()
- @schedBegin.acquire(@mutex,@cv)
-
+ @schedBegin.acquire
MSG::debug("Let's execute the main() of the Ruby process")
main(@pargs)
-<<<<<<< .mine
-# processExit(self) # FIXME : Fix the simix_context_ruby_stop to stop context process before quitting
- @schedEnd.release(@mutex,@cv)
-
-=======
- @schedEnd.release()
->>>>>>> .r7205
+ @schedEnd.release
MSG::debug("Released my schedEnd, bailing out")
processExit(self) # Exit the Native Process
+
end
def getBind()
end
def unschedule()
- @schedEnd.release(@mutex,@cv)
- @schedBegin.acquire(@mutex,@cv)
+ @schedEnd.release
+ @schedBegin.acquire
end
def schedule()
- @schedBegin.release(@mutex,@cv)
- @schedEnd.acquire(@mutex,@cv)
+ @schedBegin.release
+ @schedEnd.acquire
end
def pause()
def getHost()
processGetHost(self)
end
-
+
# The Rest of Methods !!! To be Continued ... FIXME: what's missing?
end
-
############################################
# Task Extend from the native Class RbTask
############################################
super(self,mailbox)
end
-
def source
super(self)
end
super(t_alias,host)
end
end
-
####################################################
# Host Extend from the native Class RbHost
####################################################
class MSG::Host < MSG::RbHost
-
def getByName(name)
super(name)
end
super()
end
end
-
#########################
# Main chunck
#########################
-MSG.init(ARGV)
+MSG.init(ARGV)
\ No newline at end of file
for (cpt=0;cpt<host_count;cpt++) {
rbHost = (VALUE)((hosts[cpt])->data);
}
-
- //FIXME Before Cleanin' up , we should stop process running to avoir a ThreadError
- /* if (MSG_OK != MSG_clean()){
- rb_raise(rb_eRuntimeError,"MSG_clean() failed");
- }*/
return;
}
+static void msg_clean(VALUE class)
+{
+ if (MSG_OK != MSG_clean())
+ rb_raise(rb_eRuntimeError,"MSG_clean() failed");
+
+}
static void msg_createEnvironment(VALUE class,VALUE plateformFile) {
int type = TYPE(plateformFile);
rb_define_module_function(rb_msg,"info",(rb_meth)msg_info,1);
rb_define_module_function(rb_msg,"debug",(rb_meth)msg_debug,1);
rb_define_module_function(rb_msg,"getClock",(rb_meth)msg_get_clock,0);
+ rb_define_module_function(rb_msg,"exit",(rb_meth)msg_clean,0);
//Associated Process Methods
rb_define_method(rb_msg,"processSuspend",(rb_meth)rb_process_suspend,1);