Try my nifty MultiThread class. It creates a pool of N worker threads
(there is no point in creating many more threads than you have CPU cores
to do the work anyway).
# Presumably loaded for Queue and SizedQueue, which MultiThread uses below.
require 'thread'
# Global setting: an unhandled exception in any thread aborts the whole
# process instead of only ending that thread.
Thread.abort_on_exception = true
# Raised by MultiThread#run and MultiThread#join once at least one job has
# failed. It carries the container of captured failures so the caller can
# inspect them.
#
# Subclasses StandardError (not Exception) so that an ordinary bare `rescue`
# in calling code can handle it; `rescue MultiFail` keeps working unchanged.
class MultiFail < StandardError
  # The object handed to the constructor; MultiThread passes its queue of
  # captured exceptions.
  attr_reader :queue

  # _queue:: container holding the exceptions raised by failed jobs.
  def initialize(_queue)
    # Give the exception a meaningful message instead of the bare class name.
    super('one or more MultiThread jobs failed')
    @queue = _queue
  end
end
# A fixed-size pool of worker threads fed from a bounded job queue.
#
#   pool = MultiThread.new(4)
#   pool.run { |worker_index| do_some_work(worker_index) }
#   pool.join
#
# Exceptions raised by jobs are captured rather than propagated from the
# worker; they are reported to the caller as a MultiFail from #run or #join.
class MultiThread
  # Spawns a pool of _jobs worker threads.
  #
  # _jobs:: number of worker threads; must be greater than zero.
  #
  # Raises RuntimeError when _jobs is zero or negative.
  def initialize(_jobs = 1)
    raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0

    # Both queues are created before any worker is started, because the
    # workers read @jobs and write @failed.
    @jobs = SizedQueue.new(2 * _jobs)
    @failed = Queue.new
    @threads = Array.new(_jobs) do |index|
      Thread.new do
        Thread.current[:index] = index
        do_stuff
      end
    end
  end

  # Run block in one of the threads. The block is called with the index of
  # the worker thread that executes it. The call blocks while the bounded
  # job queue is full.
  #
  # Raises ArgumentError when no block is given (a nil job is the internal
  # stop marker and would silently shut down a worker).
  # Raises MultiFail when an earlier job has already failed.
  def run(&block)
    raise ArgumentError, 'MultiThread#run requires a block' unless block
    raise MultiFail.new(@failed) unless @failed.empty?

    @jobs.enq(block)
  end

  # Wait until all threads are finished doing whatever they're doing.
  #
  # Sends one stop marker per worker that is still alive, so calling join
  # again on an already finished pool does not fill the queue and block.
  #
  # Raises MultiFail when any job failed.
  def join
    @threads.count(&:alive?).times { @jobs.enq(nil) }
    @threads.each(&:join)
    raise MultiFail.new(@failed) unless @failed.empty?
  end

  private

  # Worker loop: take jobs off the queue until the nil stop marker arrives.
  def do_stuff
    while (job = @jobs.deq)
      begin
        job.call(Thread.current[:index])
      rescue Exception => failure
        # Exception (not just StandardError) is rescued on purpose so that
        # every kind of job failure is recorded for the caller.
        # The rescue sits inside the loop so the worker survives a failed
        # job: a dead worker would leave queued jobs unconsumed and could
        # make join block forever on the full queue.
        @failed << failure
      end
    end
  end
end
# Self-test: only runs when this file is executed directly.
if $0 == __FILE__
  require 'test/unit'

  # Exercises MultiThread with various job/thread counts and checks that job
  # failures are reported as MultiFail.
  class TC_MultiThread < Test::Unit::TestCase
    def initialize(test)
      super(test)
      # Number of characters printed on the current progress line.
      @c = 0
    end

    # Account for s printed characters; start a new line after 70 columns.
    def wrap(s)
      @c += s
      if @c > 70
        puts
        @c = 0
      end
    end

    # Print a "job started" marker for worker index c (in hex).
    def dot(c)
      s = sprintf('%x< ', c)
      print s
      wrap s.size
    end

    # Print a "job finished" marker for worker index c (in hex).
    def undot(c)
      s = sprintf('>%x ', c)
      print s
      wrap s.size
    end

    # Runs `loops` one-second jobs on a pool of `threads` workers and checks:
    # * never more than `threads` jobs are in flight at once,
    # * every job ran (k == loops) and none is still in flight at the end,
    # * some jobs overlapped whenever both loops and threads exceed one.
    def try_for(loops, threads)
      puts "Trying [#{loops},#{threads}]"
      i = 0   # jobs currently in flight
      k = 0   # jobs completed
      max = 0 # highest in-flight count observed
      mutex = Mutex.new
      multi_thread = MultiThread.new(threads)
      loops.times do |j|
        multi_thread.run do |t|
          dot(t)
          mutex.synchronize do
            i += 1
          end
          # Hold the job open long enough for other jobs to overlap with it.
          sleep 1
          mutex.synchronize do
            assert(i <= threads)
            k += 1
            max = i if i > max
          end
          mutex.synchronize do
            i -= 1
          end
          undot(t)
        end
      end
      multi_thread.join
      assert_equal(0, i)
      assert(((threads <= 1) || (loops <= 1)) || max > 1)
      assert_equal(loops, k)
    end

    def test_multi
      # A pool with zero threads must be rejected by the constructor.
      assert_raises(RuntimeError) { try_for(0, 0) }
      try_for(0, 1)
      try_for(0, 2)
      try_for(1, 1)
      try_for(2, 1)
      try_for(2, 2)
      try_for(2, 100)
      try_for(3, 1)
      try_for(3, 2)
      try_for(3, 3)
      try_for(3, 100)
      try_for(100, 100)
    end

    def test_fail
      multi_thread = MultiThread.new(3)
      multi_thread.run do
        sleep 2
      end
      multi_thread.run do
        raise "This thread failed for test purposes"
      end
      # Give a worker time to pick up the failing job and record the failure;
      # otherwise the next run may be checked before the failure exists.
      sleep 0.1
      assert_raises(MultiFail) do
        multi_thread.run do
          sleep 2
        end
      end
      # join must raise too; asserting it (instead of a begin/rescue) keeps
      # the test from passing silently when no MultiFail is raised.
      multi_fail = assert_raises(MultiFail) { multi_thread.join }
      assert_equal(RuntimeError, multi_fail.queue.pop.class)
    end
  end
end
John Carter Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : john.carter@tait.co.nz
New Zealand
···
On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:
I tried to use Threads, but somehow managing a pool of threads
doesn't work (they get stuck while doing stuff and don't die). I tried
to use the code below, but somehow the pool fills up and I keep waiting
for new threads to become available...
pool = ThreadPool.new(10) # up to 10 threads
pool.process do
timeout(4) do
fetch pages, parse stuff, enc...
end
end