A Ruby Thread Pool
Submitted by mmorsi on Fri, 2010-09-10 06:32
Recently I implemented a simple thread pool in Ruby for a side project I'm working on and figured I'd share.
AFAIK its pretty solid, it does what it needs to do without any deadlocks or race conditions (feel free to correct me if you find one), though it probably could use a little simplification and 'Rubyification'. As an added bonus I've included an optional timeout feature, if a job being run by a thread in the pool takes longer than the specified time, it is killed and that thread is then free to pick up another task. Enjoy!
require 'thread' # Work item to be executed in thread pool class ThreadPoolJob attr_accessor :handler attr_accessor :params def initialize(*params, &block) @params = params @handler = block end end # Launches a specified number of threads on instantiation, # assigning work to them as it arrives class ThreadPool # Encapsulate each thread pool thread in object class ThreadPoolJobRunner attr_accessor :time_started def initialize(thread_pool) @thread_pool = thread_pool @timeout_lock = Mutex.new @thread_lock = Mutex.new end def run @thread_lock.synchronize { @thread = Thread.new { until @thread_pool.terminate @timeout_lock.synchronize { @time_started = nil } work = @thread_pool.next_job @timeout_lock.synchronize { @time_started = Time.now } work.handler.call *work.params unless work.nil? end } } end def check_timeout(timeout) @timeout_lock.synchronize { if !@time_started.nil? && Time.now - @time_started > timeout stop run end } end def stop @thread_lock.synchronize { if @thread.alive? @thread.kill @thread.join end } end end # Create a thread pool with a specified number of threads def initialize(num_threads, args = {}) @num_threads = num_threads @timeout = args[:timeout] @job_runners = [] @job_runners_lock = Mutex.new @terminate = false @terminate_lock = Mutex.new @work_queue = Queue.new 0.upto(@num_threads) { |i| runner = ThreadPoolJobRunner.new(self) @job_runners << runner runner.run } # optional timeout thread unless @timeout.nil? @timeout_thread = Thread.new { until terminate sleep @timeout @job_runners_lock.synchronize { @job_runners.each { |jr| jr.check_timeout(@timeout) } } end } end end # terminate reader def terminate @terminate_lock.synchronize { @terminate } end # terminate setter def terminate=(val) @terminate_lock.synchronize { @terminate = val } end # Add work to the pool def <<(work) @work_queue.push work end # Return the next job queued up def next_job @work_queue.pop end # Terminate the thread pool def stop terminate = true @timeout_thread.join unless @timout_thread.nil? @work_queue.clear @job_runners_lock.synchronize { @job_runners.each { |jr| jr.stop } } end end
- mmorsi's blog
- Login to post comments





