+++ /dev/null
-#
-# Copyright 2010. The SimGrid Team. All right reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-require 'simgrid'
-include MSG
-$DEBUG = false
# Class Master
#################################################
-class Master < RbProcess
+class Master < MsgProcess
def initialize2()
super()
end
for i in 0..numberOfTask-1
- task = RbTask.new("Task_"+ i.to_s, taskComputeSize , taskCommunicationSize );
+ task = Task.new("Task_"+ i.to_s, taskComputeSize , taskCommunicationSize );
s_alias = "slave>>" + (i%slaveCount).to_s
- info("Master Sending "+ RbTask.name(task) + " to " + s_alias + " with Comput Size " + RbTask.compSize(task).to_s)
- RbTask.send(task,s_alias)
- info("Master Done Sending " +RbTask.name(task) + " to " + s_alias)
-# sameTask = RbTask.receive(s_alias)
+ info("Master Sending "+ Task.name(task) + " to " + s_alias + " with Comput Size " + Task.compSize(task).to_s)
+ Task.send(task,s_alias)
+ info("Master Done Sending " +Task.name(task) + " to " + s_alias)
+# sameTask = Task.receive(s_alias)
# puts "Master Receiving its Own Task"
end
for i in 0..slaveCount-1
s_alias = "slave " + i.to_s
info ("Master Sending Finalize to " + s_alias)
- RbTask.send(RbTask.new("finalize",0,0),s_alias)
+ Task.send(Task.new("finalize",0,0),s_alias)
end
info("Master : Everything's Done")
end
#################################################
# Class Slave
#################################################
-class Slave < RbProcess
+class Slave < MsgProcess
def initialize()
super()
while true
info("Ready to Receive Task")
- task = RbTask.receive(s_mailbox)
- task_name = RbTask.name(task)
+ task = Task.receive(s_mailbox)
+ task_name = Task.name(task)
info ("Task Received : " + task.name)
if (task_name == "finalize")
info("Slave" + s_mailbox + "got finalize msg")
break
end
- info("Slave " + s_mailbox + " ...Processing" + RbTask.name(task))
- RbTask.execute(task)
+ info("Slave " + s_mailbox + " ...Processing" + Task.name(task))
+ Task.execute(task)
end
info("Slave " + s_mailbox + "I'm Done , See You !!")
end
#include "bindings/ruby_bindings.h"
#include "surf/surfxml_parse.h"
-static VALUE application_handler_class; // The Current Instance of ApplicationHandler Class ,is it the same as Current Thread ??!!
-
-
-// #define MY_DEBUG
-
-static void r_init() {
- ruby_init();
- ruby_init_loadpath();
- rb_require("ApplicationHandler.rb");
-
-}
+static VALUE application_handler_class; // The Current Instance of ApplicationHandler Class
void rb_application_handler_on_start_document(void) {
-
- r_init();
- application_handler_class = rb_funcall3(rb_const_get(rb_cObject, rb_intern("ApplicationHandler")), rb_intern("new"), 0, 0);
- rb_funcall(application_handler_class,rb_intern("onStartDocument"),0);
- #ifdef MY_DEBUG
- printf ("application_handler_on_start_document ...Done\n" );
- #endif
-
+ application_handler_class = rb_funcall3(rb_const_get(rb_cObject, rb_intern("ApplicationHandler")), rb_intern("new"), 0, 0);
}
void rb_application_handler_on_end_document(void) {
- rb_funcall(application_handler_class,rb_intern("onEndDocument"),0);
+ application_handler_class = Qnil;
}
void rb_application_handler_on_begin_process(void) {
VALUE hostName = rb_str_new2(A_surfxml_process_host);
VALUE function = rb_str_new2(A_surfxml_process_function);
-#ifdef MY_DEBUG
- printf ("On_Begin_Process: %s : %s \n",RSTRING(hostName)->ptr,RSTRING(function)->ptr);
-#endif
- rb_funcall(application_handler_class,rb_intern("onBeginProcess"),2,hostName,function);
+ rb_funcall(application_handler_class,rb_intern("onBeginProcess"),2,hostName,function);
}
void rb_application_handler_on_process_arg(void) {
- VALUE arg = rb_str_new2(A_surfxml_argument_value);
-#ifdef MY_DEBUG
- printf ("On_Process_Args >> Sufxml argument value : %s\n",RSTRING(arg)->ptr);
-#endif
- rb_funcall(application_handler_class,rb_intern("onProcessArg"),1,arg);
+ VALUE arg = rb_str_new2(A_surfxml_argument_value);
+ rb_funcall(application_handler_class,rb_intern("onProcessArg"),1,arg);
}
void rb_application_handler_on_property(void) {
- VALUE id = rb_str_new2(A_surfxml_prop_id);
- VALUE val = rb_str_new2(A_surfxml_prop_value);
- rb_funcall(application_handler_class,rb_intern("onProperty"),2,id,val);
+ VALUE id = rb_str_new2(A_surfxml_prop_id);
+ VALUE val = rb_str_new2(A_surfxml_prop_value);
+ rb_funcall(application_handler_class,rb_intern("onProperty"),2,id,val);
}
void rb_application_handler_on_end_process(void) {
- rb_funcall(application_handler_class,rb_intern("onEndProcess"),0);
+ rb_funcall(application_handler_class,rb_intern("onEndProcess"),0);
}
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ruby,bindings,"Ruby Bindings");
-// Init Ruby
-void initRuby(void) {
- ruby_init();
- ruby_init_loadpath();
-// KILLME rb_require("RubyProcess.rb");
-}
-
-
/*
* Functions for Ruby Process Management (Up Calls)
*/
-
// get Ruby Process Name
VALUE rb_process_getName(VALUE ruby_process) {
- initRuby();
- // instance = rb_funcall3(rb_const_get(rb_cObject, rb_intern("RbProcess")), rb_intern("new"), 0, 0);
return rb_funcall(ruby_process,rb_intern("getName"),0);
-
}
// Get Process ID
VALUE rb_process_getID(VALUE ruby_process) {
- initRuby();
return rb_funcall(ruby_process,rb_intern("getID"),0);
}
// Get Bind
VALUE rb_process_getBind(VALUE ruby_process) {
- initRuby();
return rb_funcall(ruby_process,rb_intern("getBind"),0);
}
// Set Bind
void rb_process_setBind(VALUE ruby_process,long bind) {
- initRuby();
VALUE r_bind = LONG2FIX(bind);
rb_funcall(ruby_process,rb_intern("setBind"),1,r_bind);
}
// isAlive
VALUE rb_process_isAlive(VALUE ruby_process) {
- initRuby();
return rb_funcall(ruby_process,rb_intern("alive?"),0);
}
// Kill Process
void rb_process_kill_up(VALUE ruby_process) {
- initRuby();
rb_funcall(ruby_process,rb_intern("kill"),0);
}
// join Process
void rb_process_join( VALUE ruby_process ) {
- initRuby();
rb_funcall(ruby_process,rb_intern("join"),0);
}
// unschedule Process
void rb_process_unschedule( VALUE ruby_process ) {
- initRuby();
rb_funcall(ruby_process,rb_intern("unschedule"),0);
}
// schedule Process
void rb_process_schedule( VALUE ruby_process ) {
- initRuby();
rb_funcall(ruby_process,rb_intern("schedule"),0);
}
}
process->simdata->PID = msg_global->PID++; // msg_global ??
- DEBUG7("fill in process %s/%s (pid=%d) %p (sd=%p , host=%p, host->sd=%p)\n",
+ DEBUG7("fill in process %s/%s (pid=%d) %p (sd=%p , host=%p, host->sd=%p)",
process->name , process->simdata->m_host->name,process->simdata->PID,
process,process->simdata, process->simdata->m_host,
process->simdata->m_host->simdata);
+ /* FIXME: that's mainly for debugging. We could only allocate this if XBT_LOG_ISENABLED(ruby,debug) is true since I guess this leaks */
+ char **argv=xbt_new(char*,2);
+ argv[0] = bprintf("%s@%s",process->name,process->simdata->m_host->simdata->smx_host->name);
+ argv[1] = NULL;
process->simdata->s_process =
SIMIX_process_create(process->name,
(xbt_main_func_t)ruby_process,
(void *) process,
process->simdata->m_host->simdata->smx_host->name,
- 0,NULL,NULL);
+ 1,argv,NULL);
- DEBUG1("context created (s_process=%p)\n",process->simdata->s_process);
+ DEBUG1("context created (s_process=%p)",process->simdata->s_process);
if (SIMIX_process_self()) { // SomeOne Created Me !!
process->simdata->PPID = MSG_process_get_PID(SIMIX_process_self()->data);
// Wrap Ruby Value to m_task_t struct
m_task_t tk;
Data_Get_Struct(task, s_m_task_t, tk);
- xbt_backtrace_display_current();
int res = MSG_task_send(tk,RSTRING(mailbox)->ptr);
if(res != MSG_OK)
rb_raise(rb_eRuntimeError,"MSG_task_send failed");
VALUE rb_task_receive(VALUE class, VALUE mailbox) {
// Task
m_task_t task = NULL;
- INFO1("Receiving a task on mailbox %s",RSTRING(mailbox)->ptr);
- xbt_backtrace_display_current();
+ INFO1("Receiving a task on mailbox '%s'",RSTRING(mailbox)->ptr);
MSG_task_receive(&task,RSTRING(mailbox)->ptr);
INFO2("XXXXXXXXReceived a task %p %s",task,task->name);
return Data_Wrap_Struct(class, 0, rb_task_free, task);
include MSG
require 'thread'
-$DEBUG = true # This is a Global Variable Useful for Debugging
+$DEBUG = false # This is a Global Variable Useful for Debugging
###########################################################################
# Class Semaphore
###########################################################################
-class Semaphore
- Thread.abort_on_exception = true
- attr_accessor :permits
-
- def initialize ( permits )
- @permits = permits
+
+class Semaphore
+ def initialize(initvalue = 0)
+ @counter = initvalue
+ @waiting_list = []
end
-
- def acquire(mutex,cv)
- raise "Interrupted Thread " if (!Thread.current.alive?)
- mutex.synchronize {
- while @permits <= 0
- cv.wait(mutex)
- end
-
- @permits = @permits - 1
- cv.signal
- }
+ def acquire
+ Thread.critical = true
+ if (@counter -= 1) < 0
+ @waiting_list.push(Thread.current)
+ Thread.stop
+ end
+ self
+ ensure
+ Thread.critical = false
end
-
- def release(mutex,cv)
- mutex.synchronize{
- @permits += 1
- cv.signal
- }
+
+ def release
+ Thread.critical = true
+ begin
+ if (@counter += 1) <= 0
+ t = @waiting_list.shift
+ t.wakeup if t
+ end
+ rescue ThreadError
+ retry
+ end
+ self
+ ensure
+ Thread.critical = false
end
end
########################################################################
-# Class RbProcess
+# Class Process
########################################################################
-class RbProcess < Thread
+class MsgProcess < Thread
@@nextProcessId = 0
+
# Attributes
- attr_accessor :bind, :id, :properties, :name,
- :pargs, :schedBegin, :schedEnd, :mutex, :cv
+ attr_reader :bind, :id, :pargs # Read only
+ attr_accessor :name, :properties # R/W
# Initialize : Used from ApplicationHandler to fill it in
def initialize(*args)
+ @schedBegin = Semaphore.new(0)
+ @schedEnd = Semaphore.new(0)
+ @properties = Hash.new()
+ @id = @@nextProcessId++
argc = args.size
@bind = 0
@name = ""
@pargs = Array.new()
- init_var()
start()
if $DEBUG
puts "Init Default Initializer...Nothing to do...Bye"
puts @name
end
@pargs = Array.new() # No Args[] Passed in Arguments
- @@nextProcessId += 1
- @id = @@nextProcessId
- init_var()
start()
createProcess(self,host)
if $DEBUG
type = args[2].type()
raise "Third argument should be an Array" if type != "Array"
@pargs = args[3]
- @@nextProcessId +=1
- @id = @@nextProcessId
- init_var()
createProcess(self,host)
if $DEBUG
puts "Initilize with 3 args"
- end
-
-# sleep #keep the thread running
+ end
}
else
raise "Bad number of argument: Expecting either 1, 2 or 3, but got "+argc
end
end
-
- # Init_var Called By Initialize
- def init_var()
- @proprieties = Hash.new()
- @mutex = Mutex.new
- @cv = ConditionVariable.new
- # Process Synchronization Tools
- @schedBegin = Semaphore.new(0)
- @schedEnd = Semaphore.new(0)
- end
- #main
- def msg_main(args)
- # To Be Implemented within The Process...
- # The Main Code of The Process to be Executed ...
+ # main
+ def msg_main(args)
+ # To be overriden by childs
+ raise("You must define a msg_main() function in your process, containing the code of this process")
end
- # Start : To keep the Process Alive and waitin' via semaphore
+ # Start : To keep the process alive and waiting via semaphore
def start()
- @schedBegin.acquire(@mutex,@cv)
- #execute The Main Code of The Process ( Example Master ; Slave ...)
+ @schedBegin.acquire
+ # execute the main code of the process
+ debug("Begin execution")
msg_main(@pargs)
- processExit(self) #Exite the Native Process
- @schedEnd.release(@mutex,@cv)
+ processExit(self) # Exit the Native Process
+ @schedEnd.release
end
-# NetxId
- def nextId ()
- @@nextProcessId +=1
- return @@nextProcessId
- end
-
- if $DEBUG
- #Process List
- def processList()
- Thread.list.each {|t| p t}
- end
+ def processList()
+ Thread.list.each {|t| p t}
end
#Get Own ID
return @id
end
- # set Id
- def setID(id)
- @id = id
- end
-
#Get a Process ID
def processID(process)
return process.id
@bind = bind
end
- def unschedule()
-
- @schedEnd.release(@mutex,@cv)
-# info("@schedEnd.release(@mutex,@cv)")
- @schedBegin.acquire(@mutex,@cv)
-# info("@schedBegin.acquire(@mutex,@cv)")
-
+ def unschedule()
+ @schedEnd.release
+ @schedBegin.acquire
end
def schedule()
- @schedBegin.release(@mutex,@cv)
- @schedEnd.acquire(@mutex,@cv)
+ @schedBegin.release
+ @schedEnd.acquire
end
- #C Simualateur Process Equivalent Management
+ #C Simualator Process Equivalent Management
# After Binding Ruby Process to C Process
# pause
class ProcessFactory
# Attributes
- attr_accessor :args, :proprieties, :hostName, :function
-# Initlialize
+ attr_accessor :args, :properties, :hostName, :function
+# Initialize
def initialize()
@args = Array.new
- @proprieties = Hash.new
+ @properties = Hash.new
@hostName = nil
@function = nil
@function = function
if !args.empty?
- args.clear
+ args.clear
end
- if !proprieties.empty?
- proprieties.clear
+ if !properties.empty?
+ properties.clear
end
end
-# RegisterProcess
def registerProcessArg(arg)
-
@args.push(arg)
-
end
# CreateProcess
def createProcess()
-
- process = rubyNewInstance(@function) # process = rubyNewInstanceArgs(@function,@args) #
+ process = rubyNewInstance(@function)
size = @args.size
for i in 0..size-1
process.pargs.push(@args[i])
end
process.name = @function
- process.id = process.nextId() # This increment Automaticaly The Static ProcessNextId for The Class RbProces
- host = RbHost.getByName(@hostName)
+ host = Host.getByName(@hostName)
processCreate(process,host)
process.properties = @properties
- @proprieties = Hash.new
+ @properties = Hash.new
end
# SetProperty
def setProperty(id,value)
- @proprieties[id] = value
+ @properties[id] = value
end
end
#########################################################################
class ApplicationHandler
@processFactory
-# Initialize
- def initialize()
- #Nothing todo
- end
- # onStartDocument
- def onStartDocument()
-
+ def initialize()
@processFactory = ProcessFactory.new
- if ($DEBUG)
- puts "onStartDocument"
- end
-
end
-# onBeginProcess
def onBeginProcess(hostName,function)
-
@processFactory.setProcessIdentity(hostName,function)
-
- if ($DEBUG)
- puts "onBeginProcess"
- end
-
+ debug("onBeginProcess("+hostName+","+function+")")
end
-# onProperty
- def onProperty(id,value)
-
+ def onProperty(id,value)
@processFactory.setProperty(id,value)
-
- if ($DEBUG)
- puts "onProperty"
- end
-
end
-# RegisterProcessArg
- def onProcessArg(arg)
-
+ def onProcessArg(arg)
@processFactory.registerProcessArg(arg)
-
- if ($DEBUG)
- puts "onProcessArg"
- end
-
end
-# OnEndProcess
- def onEndProcess()
-
+ def onEndProcess()
@processFactory.createProcess()
-
- if ($DEBUG)
- puts "onEndProcess"
- end
-
- end
-
- # onEndDocument
- def onEndDocument()
-# Erm... Actually nothing to do !!
-
- if($DEBUG)
- puts "onEndDocument"
- end
- end
-
- # End Class
- end
-
-#########################
-# Class RbHost
-#########################
-
-class RbHost < Host
-# Attributes
- attr_accessor :bind, :data
-
-# Initialize
- def initialize()
- super()
- @bind = 0
- @data = nil
+ debug("onEndProcess")
end
-
-end
-
-#########################
-# Class RbTask
-#########################
-class RbTask < Task
- attr_accessor :bind
-
- def initialize(name,comp_size,comm_size)
- super(name,comp_size,comm_size)
- end
-
end
#########################
VALUE *ptr ;
// Testing The Args Type
type = TYPE(args);
- if (type != T_ARRAY )
- {
- rb_raise(rb_eRuntimeError,"Argh!!! Bad Arguments to msg_init");
+ if (type != T_ARRAY ) {
+ rb_raise(rb_eRuntimeError,"Bad arguments to msg_init (expecting an array)");
return;
}
ptr= RARRAY(args)->ptr;
argc++;
argv = xbt_new0(char *, argc);
argv[0] = strdup("ruby");
- for (i=0;i<argc-1;i++)
- {
+ for (i=0;i<argc-1;i++) {
VALUE value = ptr[i];
type = TYPE(value);
// if (type == T_STRING)
}
// Calling C Msg_Init Method
MSG_global_init(&argc,argv);
- MSG_set_channel_number(10); // Okey !! Okey !! This Must Be Fixed Dynamiclly , But Later ;)
- SIMIX_context_select_factory("ruby");
- // Free Stuffs
+ // Cleanups
for (i=0;i<argc;i++)
free(argv[i]) ;
-
free (argv);
- DEBUG0("Msg Init...Done");
- return;
}
//Init Msg_Run From Ruby
static void msg_run(VALUE class) {
const char *s = RSTRING(msg)->ptr;
INFO1("%s",s);
}
+static void msg_debug(VALUE class,VALUE msg) {
+ const char *s = RSTRING(msg)->ptr;
+ DEBUG1("%s",s);
+}
// Get Clock
static void msg_get_clock(VALUE class) {
}
+extern const char*xbt_ctx_factory_to_use; /*Hack: let msg load directly the right factory */
typedef VALUE(*rb_meth)(ANYARGS);
void Init_simgrid_ruby() {
+ xbt_ctx_factory_to_use = "ruby";
+
// Modules
rb_msg = rb_define_module("MSG");
//Associated Environment Methods!
rb_define_method(rb_msg,"createEnvironment",(rb_meth)msg_createEnvironment,1);
rb_define_method(rb_msg,"deployApplication",(rb_meth)msg_deployApplication,1);
rb_define_method(rb_msg,"info",(rb_meth)msg_info,1);
+ rb_define_method(rb_msg,"debug",(rb_meth)msg_debug,1);
rb_define_method(rb_msg,"getClock",(rb_meth)msg_get_clock,0);
rb_define_method(rb_msg,"rubyNewInstance",(rb_meth)msg_new_ruby_instance,1);
rb_define_method(rb_msg,"rubyNewInstanceArgs",(rb_meth)msg_new_ruby_instance_with_args,2);
xbt_ex_t e;
MSG_error_t ret = MSG_OK;
smx_comm_t comm;
+
+ /* We no longer support getting a task from a specific host */
+ if (host) THROW_UNIMPLEMENTED;
+
CHECK_HOST();
memset(&comm,0,sizeof(comm));
+
/* Kept for compatibility with older implementation */
xbt_assert1(!MSG_mailbox_get_cond(mailbox),
"A process is already blocked on this channel %s",
xbt_assert0(task, "Null pointer for the task storage");
if (*task)
- CRITICAL0
- ("MSG_task_get() was asked to write in a non empty task struct.");
-
- /* We no longer support getting a task from a specific host */
- if(host)
- THROW_UNIMPLEMENTED;
+ CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
/* Try to receive it by calling SIMIX network layer */
TRY{
SIMIX_network_recv(mailbox->rdv, timeout, task, NULL, &comm);
+ //INFO2("Got task %s from %s",(*task)->name,mailbox->alias);
(*task)->simdata->refcount--;
}
CATCH(e){
(*factory)->name = "smx_ruby_context_factory";
ruby_init();
ruby_init_loadpath();
- DEBUG0("SIMIX_ctx_ruby_factory_init...Done");
}
static int smx_ctx_ruby_factory_finalize(smx_context_factory_t *factory) {
smx_ctx_ruby_t context = xbt_new0(s_smx_ctx_ruby_t,1);
- /*if the user provided a function for the process , then use it
- Otherwise it's the context for maestro */
+ /* if the user provided a function for the process , then use it
+ Otherwise it's the context for maestro */
if (code) {
context->cleanup_func = cleanup_func;
context->cleanup_arg = cleanup_arg;
context->process = (VALUE)code;
+ context->argc=argc;
+ context->argv=argv;
- DEBUG0("smx_ctx_ruby_create_context...Done");
+ DEBUG1("smx_ctx_ruby_create_context(%s)...Done",argv[0]);
}
return (smx_context_t) context;
}
free(context);
context = NULL;
}*/
+ DEBUG1("smx_ctx_ruby_free_context(%s)",context->argv[0]);
free (context);
context = NULL;
- DEBUG0("smx_ctx_ruby_free_context...Done");
}
static void smx_ctx_ruby_start(smx_context_t context) {
- /* Already Done .. Since a Ruby Process is launched within initialization
- We Start it Within the Initializer ... We Use the Semaphore To Keep
- The Thread Alive Waitin' For Mutex Signal to Execute The Main*/
-
+ DEBUG1("smx_ctx_ruby_start(%s) (nothing to do)",context->argv[0]);
+ /* Already Done .. Since a Ruby process is launched within initialization
+ We Start it Within the Initializer ... We Use the Semaphore To Keep
+ the thread alive waiting for mutex signal to execute the main*/
}
static void smx_ctx_ruby_stop(smx_context_t context) {
+ DEBUG1("smx_ctx_ruby_stop(%s)",context->argv[0]);
VALUE process = Qnil;
smx_ctx_ruby_t ctx_ruby,current;
process = ctx_ruby->process;
ctx_ruby->process = Qnil;
}
- DEBUG0("smx_ctx_ruby_stop...Done\n");
+ DEBUG1("smx_ctx_ruby_stop(%s)...Done",context->argv[0]);
}
static void smx_ctx_ruby_suspend(smx_context_t context) {
- if (context) {
+ DEBUG1("smx_ctx_ruby_suspend(%s)",context->argv[0]);
smx_ctx_ruby_t ctx_ruby = (smx_ctx_ruby_t) context;
if (ctx_ruby->process)
rb_process_unschedule(ctx_ruby->process);
- DEBUG0("smx_ctx_ruby_unschedule...Done");
- } else
- rb_raise(rb_eRuntimeError,"smx_ctx_ruby_suspend failed");
}
static void smx_ctx_ruby_resume(smx_context_t old_context,smx_context_t new_context) {
+ DEBUG2("smx_ctx_ruby_resume(%s,%s)",
+ (old_context->argc?old_context->argv[0]:"maestro"),
+ (new_context->argc?new_context->argv[0]:"maestro"));
+
smx_ctx_ruby_t ctx_ruby = (smx_ctx_ruby_t) new_context;
rb_process_schedule(ctx_ruby->process);
- DEBUG0("smx_ctx_ruby_schedule...Done");
+// DEBUG1("smx_ctx_ruby_schedule(%s)...Done",
+// (new_context->argc?new_context->argv[0]:"maestro"));
}