include MSG
require 'thread'
-$DEBUG = true # This is a Global Variable Useful for Debugging
+$DEBUG = false # This is a Global Variable Useful for Debugging
###########################################################################
# Class Semaphore
###########################################################################
-class Semaphore
- Thread.abort_on_exception = true
- attr_accessor :permits
-
- def initialize ( permits )
- @permits = permits
+
+class Semaphore
+ def initialize(initvalue = 0)
+ @counter = initvalue
+ @waiting_list = []
end
-
- def acquire(mutex,cv)
- raise "Interrupted Thread " if (!Thread.current.alive?)
- mutex.synchronize {
- while @permits <= 0
- cv.wait(mutex)
- end
-
- @permits = @permits - 1
- cv.signal
- }
+ def acquire
+ Thread.critical = true
+ if (@counter -= 1) < 0
+ @waiting_list.push(Thread.current)
+ Thread.stop
+ end
+ self
+ ensure
+ Thread.critical = false
end
-
- def release(mutex,cv)
- mutex.synchronize{
- @permits += 1
- cv.signal
- }
+
+ def release
+ Thread.critical = true
+ begin
+ if (@counter += 1) <= 0
+ t = @waiting_list.shift
+ t.wakeup if t
+ end
+ rescue ThreadError
+ retry
+ end
+ self
+ ensure
+ Thread.critical = false
end
end
########################################################################
-# Class RbProcess
+# Class Process
########################################################################
-class RbProcess < Thread
+class MsgProcess < Thread
@@nextProcessId = 0
+
# Attributes
- attr_accessor :bind, :id, :properties, :name,
- :pargs, :schedBegin, :schedEnd, :mutex, :cv
+ attr_reader :bind, :id, :pargs # Read only
+ attr_accessor :name, :properties # R/W
# Initialize : Used from ApplicationHandler to fill it in
def initialize(*args)
+ @schedBegin = Semaphore.new(0)
+ @schedEnd = Semaphore.new(0)
+ @properties = Hash.new()
+ @id = @@nextProcessId++
argc = args.size
@bind = 0
@name = ""
@pargs = Array.new()
- init_var()
start()
if $DEBUG
puts "Init Default Initializer...Nothing to do...Bye"
puts @name
end
@pargs = Array.new() # No Args[] Passed in Arguments
- @@nextProcessId += 1
- @id = @@nextProcessId
- init_var()
start()
createProcess(self,host)
if $DEBUG
type = args[2].type()
raise "Third argument should be an Array" if type != "Array"
@pargs = args[3]
- @@nextProcessId +=1
- @id = @@nextProcessId
- init_var()
createProcess(self,host)
if $DEBUG
puts "Initilize with 3 args"
- end
-
-# sleep #keep the thread running
+ end
}
else
raise "Bad number of argument: Expecting either 1, 2 or 3, but got "+argc
end
end
-
- # Init_var Called By Initialize
- def init_var()
- @proprieties = Hash.new()
- @mutex = Mutex.new
- @cv = ConditionVariable.new
- # Process Synchronization Tools
- @schedBegin = Semaphore.new(0)
- @schedEnd = Semaphore.new(0)
- end
- #main
- def msg_main(args)
- # To Be Implemented within The Process...
- # The Main Code of The Process to be Executed ...
+ # main
+ def msg_main(args)
+ # To be overriden by childs
+ raise("You must define a msg_main() function in your process, containing the code of this process")
end
- # Start : To keep the Process Alive and waitin' via semaphore
+ # Start : To keep the process alive and waiting via semaphore
def start()
- @schedBegin.acquire(@mutex,@cv)
- #execute The Main Code of The Process ( Example Master ; Slave ...)
+ @schedBegin.acquire
+ # execute the main code of the process
+ debug("Begin execution")
msg_main(@pargs)
- processExit(self) #Exite the Native Process
- @schedEnd.release(@mutex,@cv)
+ processExit(self) # Exit the Native Process
+ @schedEnd.release
end
-# NetxId
- def nextId ()
- @@nextProcessId +=1
- return @@nextProcessId
- end
-
- if $DEBUG
- #Process List
- def processList()
- Thread.list.each {|t| p t}
- end
+ def processList()
+ Thread.list.each {|t| p t}
end
#Get Own ID
return @id
end
- # set Id
- def setID(id)
- @id = id
- end
-
#Get a Process ID
def processID(process)
return process.id
@bind = bind
end
- def unschedule()
-
- @schedEnd.release(@mutex,@cv)
-# info("@schedEnd.release(@mutex,@cv)")
- @schedBegin.acquire(@mutex,@cv)
-# info("@schedBegin.acquire(@mutex,@cv)")
-
+ def unschedule()
+ @schedEnd.release
+ @schedBegin.acquire
end
def schedule()
- @schedBegin.release(@mutex,@cv)
- @schedEnd.acquire(@mutex,@cv)
+ @schedBegin.release
+ @schedEnd.acquire
end
- #C Simualateur Process Equivalent Management
+ #C Simualator Process Equivalent Management
# After Binding Ruby Process to C Process
# pause
# The Rest of Methods !!! To be Continued ...
end
-########################################################################
-# Class ProcessFactory
-########################################################################
-
-class ProcessFactory
-
-# Attributes
- attr_accessor :args, :proprieties, :hostName, :function
-# Initlialize
- def initialize()
-
- @args = Array.new
- @proprieties = Hash.new
- @hostName = nil
- @function = nil
-
- end
-
-# setProcessIdentity
- def setProcessIdentity(hostName,function)
- @hostName = hostName
- @function = function
-
- if !args.empty?
- args.clear
- end
-
- if !proprieties.empty?
- proprieties.clear
- end
-
- end
-
-# RegisterProcess
- def registerProcessArg(arg)
-
- @args.push(arg)
-
- end
-
-# CreateProcess
- def createProcess()
-
- process = rubyNewInstance(@function) # process = rubyNewInstanceArgs(@function,@args) #
- size = @args.size
- for i in 0..size-1
- process.pargs.push(@args[i])
- end
- process.name = @function
- process.id = process.nextId() # This increment Automaticaly The Static ProcessNextId for The Class RbProces
- host = RbHost.getByName(@hostName)
- processCreate(process,host)
- process.properties = @properties
- @proprieties = Hash.new
-
- end
-
-# SetProperty
- def setProperty(id,value)
- @proprieties[id] = value
- end
-end
-
#########################################################################
# Class ApplicationHandler
#########################################################################
class ApplicationHandler
- @processFactory
-# Initialize
def initialize()
- #Nothing todo
- end
-
- # onStartDocument
- def onStartDocument()
-
- @processFactory = ProcessFactory.new
- if ($DEBUG)
- puts "onStartDocument"
- end
-
+ @hostName = nil
+ @function = nil
end
-# onBeginProcess
def onBeginProcess(hostName,function)
+ @args = Array.new
+ @properties = Hash.new
- @processFactory.setProcessIdentity(hostName,function)
-
- if ($DEBUG)
- puts "onBeginProcess"
- end
-
- end
-
-# onProperty
- def onProperty(id,value)
-
- @processFactory.setProperty(id,value)
-
- if ($DEBUG)
- puts "onProperty"
- end
-
- end
-
-# RegisterProcessArg
- def onProcessArg(arg)
-
- @processFactory.registerProcessArg(arg)
+ @hostName = hostName
+ @function = function
- if ($DEBUG)
- puts "onProcessArg"
- end
-
+ debug("onBeginProcess("+hostName+","+function+")")
end
-# OnEndProcess
- def onEndProcess()
-
- @processFactory.createProcess()
-
- if ($DEBUG)
- puts "onEndProcess"
- end
-
- end
-
- # onEndDocument
- def onEndDocument()
-# Erm... Actually nothing to do !!
-
- if($DEBUG)
- puts "onEndDocument"
- end
- end
-
- # End Class
- end
-
-#########################
-# Class RbHost
-#########################
-
-class RbHost < Host
-# Attributes
- attr_accessor :bind, :data
-
-# Initialize
- def initialize()
- super()
- @bind = 0
- @data = nil
+ def onProperty(id,value)
+ @properties[id] = value
end
-end
-
-#########################
-# Class RbTask
-#########################
-class RbTask < Task
- attr_accessor :bind
-
- def initialize(name,comp_size,comm_size)
- super(name,comp_size,comm_size)
+ def onProcessArg(arg)
+ @args.push(arg)
end
+ def onEndProcess()
+ process = rubyNewInstance(@function)
+ size = @args.size
+ for i in 0..size-1
+ process.pargs.push(@args[i])
+ end
+ process.name = @function
+ host = Host.getByName(@hostName)
+ processCreate(process,host)
+ process.properties = @properties
+ end
end
#########################