Tags

redhat
employment
ripple
interfaces
ncurses
ruby
refs
filesystems
retro gaming
raspberry pi
sinatra
3d printing
nethack
gcc
compiler
fedora
virtfs
project
gaming
vim
grep
sed
aikido
philosophy
splix
android
lvm
storage
bitcoin
projects
sig315
miq
db
polisher
meditation
hopex
conferences
omega
simulator
bundler_ext
rubygems
book review
google code in
isitfedoraruby
svn
gsoc
design patterns
jsonrpc
rjr
aeolus
ohiolinuxfest
rome
europe
travel
brno
gtk
python
puppet
conference
fudcon
snap
html5
tips
ssh
linux
hardware
libvirt
virtualization
engineering expo
cloud
rpm
yum
rake
redmine
plugins
screencasting
jruby
fosscon
pidgin
gnome-shell
distros
notacon
presentation
rails
deltacloud
apache
qmf
passenger
syrlug
hackerspace
music
massive attack
crypto
backups
vnc
xsd
rxsd
x3d
mercurial
ovirt
qpid
webdev
haikus
poetry
legalese
jquery
selenium
testing
xpath
git
sshfs
svg
ldap
autotools
pygtk
xmlrpc
slackware

Sep 15 2010 ruby

A Ruby Thread Pool

Recently I implemented a simple thread pool in Ruby for a side project I'm working on and figured I'd share.

AFAIK it's pretty solid: it does what it needs to do without any deadlocks or race conditions (feel free to correct me if you find one), though it probably could use a little simplification and 'Rubyification'. As an added bonus I've included an optional timeout feature: if a job being run by a thread in the pool takes longer than the specified time, it is killed and that thread is then free to pick up another task. Enjoy!

  require 'thread'

  # A unit of work for the thread pool: a block together with the
  # arguments it should be invoked with.
  class ThreadPoolJob
    # handler - the block captured at construction (nil when none was given)
    # params  - array of the positional arguments captured at construction
    attr_accessor :handler, :params

    # Stores every positional argument as the job's params and the
    # supplied block as its handler.
    def initialize(*params, &block)
      @handler, @params = block, params
    end
  end


  # Launches a specified number of threads on instantiation,
  # assigning work to them as it arrives.
  #
  # Jobs are any objects responding to #handler (something callable) and
  # #params (an array of arguments); they are queued with #<< and picked
  # up by the worker threads in FIFO order. When a :timeout is given, a
  # supervisor thread periodically kills and restarts any worker whose
  # current job has been running longer than that many seconds.
  class ThreadPool

    # Encapsulate each thread pool thread in object
    class ThreadPoolJobRunner
      # Time at which the current job was picked up, nil while the
      # worker is waiting for work
      attr_accessor :time_started

      # thread_pool - the owning pool; consulted for its terminate flag
      #               and for the next job to run
      def initialize(thread_pool)
        @thread_pool = thread_pool
        @timeout_lock = Mutex.new   # guards @time_started
        @thread_lock  = Mutex.new   # guards @thread
        @thread = nil
      end

      # Start the worker thread. It loops, pulling jobs from the pool and
      # invoking them, until the pool's terminate flag is set.
      def run
        @thread_lock.synchronize {
          @thread = Thread.new {
            until @thread_pool.terminate
              @timeout_lock.synchronize { @time_started = nil }
              work = @thread_pool.next_job   # blocks until a job is queued
              @timeout_lock.synchronize { @time_started = Time.now }
              work.handler.call(*work.params) unless work.nil?
            end
          }
        }
      end

      # Kill and restart the worker thread if its current job has been
      # running for more than +timeout+ seconds.
      def check_timeout(timeout)
        @timeout_lock.synchronize {
          if !@time_started.nil? && Time.now - @time_started > timeout
            stop
            run
          end
        }
      end

      # Kill the worker thread (if it was started and is still alive)
      # and wait for it to finish.
      def stop
        @thread_lock.synchronize {
          # @thread is nil when #run was never invoked
          if @thread && @thread.alive?
            @thread.kill
            @thread.join
          end
        }
      end
    end

    # Create a thread pool with a specified number of threads
    #
    # num_threads - number of worker threads to start
    # args        - options hash; :timeout (seconds) enables the
    #               supervisor thread that restarts workers stuck on a
    #               job for longer than that
    def initialize(num_threads, args = {})
      @num_threads = num_threads
      @timeout     = args[:timeout]
      @job_runners = []
      @job_runners_lock = Mutex.new
      @terminate = false
      @terminate_lock = Mutex.new

      @work_queue  = Queue.new

      # exactly num_threads workers (0.upto(n) would start n + 1)
      @num_threads.times {
        runner = ThreadPoolJobRunner.new(self)
        @job_runners << runner
        runner.run
      }

      # optional timeout thread
      unless @timeout.nil?
        @timeout_thread = Thread.new {
          until terminate
            sleep @timeout
            @job_runners_lock.synchronize {
              @job_runners.each { |jr|
                jr.check_timeout(@timeout)
              }
            }
          end
        }
      end
    end

    # terminate reader
    def terminate
      @terminate_lock.synchronize { @terminate }
    end

    # terminate setter
    def terminate=(val)
      @terminate_lock.synchronize { @terminate = val }
    end

    # Add work to the pool
    def <<(work)
      @work_queue.push work
    end

    # Return the next job queued up, blocking while the queue is empty
    def next_job
      @work_queue.pop
    end

    # Terminate the thread pool: set the terminate flag, wait for the
    # timeout thread (if any) to notice it, discard pending jobs and
    # kill the workers.
    #
    # The timeout thread only re-checks the flag after its current sleep,
    # so this call can block for up to the configured timeout.
    def stop
      # explicit receiver is required, a bare `terminate = true` would
      # only assign a local variable and never reach the setter
      self.terminate = true
      @timeout_thread.join unless @timeout_thread.nil?
      @work_queue.clear
      @job_runners_lock.synchronize { @job_runners.each { |jr| jr.stop } }
    end
  end