Never been to DZone Snippets before?

Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world

Ruby Thread Pool (See related posts)

This is useful for a variety of scenarios. You create a thread pool, give it a maximum size, and pass a block to it everytime you need something processed. If all threads are busy, you block until a thread becomes free.

For example:

pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
  pool.process {send_mail_to addr}
end


require 'thread'

class ThreadPool
  class Worker
    def initialize
      @mutex = Mutex.new
      @thread = Thread.new do
        while true
          sleep 0.001
          block = get_block
          if block
            block.call
            reset_block
          end
        end
      end
    end
    
    def get_block
      @mutex.synchronize {@block}
    end
    
    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
      end
    end
    
    def reset_block
      @mutex.synchronize {@block = nil}
    end
    
    def busy?
      @mutex.synchronize {!@block.nil?}
    end
  end
  
  attr_accessor :max_size
  attr_reader :workers

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
  end
  
  def size
    @mutex.synchronize {@workers.size}
  end
  
  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end
  
  def join
    sleep 0.01 while busy?
  end
  
  def process(&block)
    wait_for_worker.set_block(block)
  end
  
  def wait_for_worker
    while true
      worker = find_available_worker
      return worker if worker
      sleep 0.01
    end
  end
  
  def find_available_worker
    @mutex.synchronize {free_worker || create_worker}
  end
  
  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end
  
  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new
    @workers << worker
    worker
  end
end

Comments on this post

synfinatic posts on Jan 26, 2007 at 21:38
There seems to be a race condition when executing a block- two jobs may try to grab the same thread. Changing find_available_worker() and wait_for_worker() like this solves the problem:
    def process(&block)
        while true
            @mutex.synchronize do
                worker = find_available_worker 
                if worker
                    return worker.set_block(block)
                end
            end
            sleep 0.01
        end
    end

    def find_available_worker
        free_worker || create_worker
    end
andrewlmurray posts on Mar 20, 2007 at 14:01
One other thing is that the example of usage provided
pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
  pool.process {send_mail_to addr}
end


Should really read
pool = ThreadPool.new(10) # up to 10 threads
email_addresses.each do |addr|
  pool.process {send_mail_to addr}
end 
pool.join()


Otherwise your program will terminate before your pool has finished running

You need to create an account or log in to post comments to this site.


Click here to browse all 5140 code snippets

Related Posts