Managing a fork pool to handle tasks

Hey guys,

I am doing some background processing in Ruby, and I would like to use
forks, but I can't figure out how to manage them.

- I want to create a pool of child processes (forks) running in the
background, limited to a number I specify
- They are created from a parent task, but creation of new child processes
should block until one of the child processes in the pool is done
- I want to limit the time a child process can run (so it should quit
after x seconds)

I tried to use threads, but somehow managing a pool of threads
doesn't work (they get stuck while doing stuff and don't die). I tried
the code below, but somehow the pool fills up and I keep waiting
for new threads to become available...

pool = ThreadPool.new(10) # up to 10 threads
pool.process do
  timeout(4) do
    # fetch pages, parse stuff, etc...
  end
end

--
Posted via http://www.ruby-forum.com/.

google ruby forkoff [?]

On Tue, Apr 29, 2008 at 3:01 PM, Abdul-rahman Advany <abdulrahman@advany.com> wrote:

[original post quoted in full]

Try my nifty MultiThread class. It creates a pool of N worker threads
(there is no point in creating many more than you have CPU cores to do
the work anyway).

require 'thread'
Thread.abort_on_exception = true

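# Raised in the parent thread; carries the queue of exceptions
# that were caught in the worker threads.
class MultiFail < Exception
   attr_reader :queue

   def initialize( _queue)
     @queue = _queue
   end
end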

class MultiThread
   private

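   # Worker loop: run jobs until a nil job (the shutdown signal) arrives.
   # Any exception is recorded in @failed, which also ends this worker.
   def do_stuff
     job = @jobs.deq
     while job
       job.call(Thread.current[:index])
       job = @jobs.deq
     end
   rescue Exception => failure
     @failed << failure
   end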

   public

   # Spawns a pool of _jobs worker threads
   def initialize( _jobs = 1)
     raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0
     @jobs = SizedQueue.new( 2 * _jobs)
     @threads = Array.new(_jobs){|i| Thread.new{Thread.current[:index]=i;do_stuff}}
     @failed = Queue.new
   end

   # Run block in one of the threads
   def run(&block)
     raise MultiFail.new(@failed) if @failed.size > 0
     @jobs.enq( block)
   end

   # Wait until all threads are finished doing whatever they're doing.
   def join
     @threads.each{|t| @jobs.enq nil}
     @threads.each{|t| t.join}
     raise MultiFail.new(@failed) if @failed.size > 0
   end
end

if $0 == __FILE__ then
   require 'test/unit'

   class TC_MultiThread < Test::Unit::TestCase
     def initialize(test)
       super(test)
       @c = 0
     end

     def wrap(s)
       @c += s
       if @c > 70
         puts
         @c = 0
       end
     end

     def dot(c)
       s = sprintf( '%x< ',c)
       print s
       wrap s.size
     end

     def undot(c)
       s = sprintf( '>%x ',c )
       print s
       wrap s.size
     end

     def try_for(loops,threads)
       puts "Trying [#{loops},#{threads}]"
       i = 0
       k = 0
       max = 0
       mutex = Mutex.new
       multi_thread = MultiThread.new(threads)

       loops.times do |j|
         multi_thread.run do |t|
           dot(t)
           mutex.synchronize do
             i += 1
           end
           sleep 1
           mutex.synchronize do
             assert( i <= threads)
             k +=1
             max = i if i > max
           end
           mutex.synchronize do
             i -= 1
           end
           undot(t)
         end
       end
       multi_thread.join
       assert_equal(0, i)
       assert( ((threads <= 1) || (loops <= 1)) || max > 1)
       assert_equal( loops, k)
     end

     def test_multi
       assert_raises(RuntimeError){ try_for(0,0)}
       try_for(0,1)
       try_for(0,2)
       try_for(1,1)
       try_for(2,1)
       try_for(2,2)
       try_for(2,100)
       try_for(3,1)
       try_for(3,2)
       try_for(3,3)
       try_for(3,100)
       try_for(100,100)
     end

     def test_fail
       multi_thread = MultiThread.new(3)

       multi_thread.run do
         sleep 2
       end

       multi_thread.run do
         raise "This thread failed for test purposes"
       end

       assert_raises( MultiFail) do
         multi_thread.run do
           sleep 2
         end
       end

       begin
         multi_thread.join
       rescue MultiFail => multi_fail
         assert_equal( RuntimeError, multi_fail.queue.pop.class)
       end
     end
   end

end

John Carter Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : john.carter@tait.co.nz
New Zealand

On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:

[thread pool paragraph and code from the original post]

Roger Pack wrote:

google ruby forkoff [?]

I don't have a fixed queue (I use memcache to fill it and use other
processes to get values from memcache). I don't think forkoff will work,
and I have a hard time understanding how it works. I only see one call
to fork. Does that fork the thread?
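
On the question of what fork does to threads: a minimal sketch, assuming a Unix-like system where Kernel#fork is available. fork copies the whole process, but only the thread that called fork keeps running in the child. The variable names here are made up for illustration.

```ruby
# A second thread that just sleeps in the parent.
background = Thread.new { sleep }

reader, writer = IO.pipe

pid = fork do
  reader.close
  # Count the threads that survived the fork inside the child.
  alive = Thread.list.count(&:alive?)
  writer.puts "#{Process.pid} #{alive}"
  writer.close
  exit!(0)   # leave the child without running the parent's at_exit hooks
end

writer.close
child_pid, child_threads = reader.gets.split.map(&:to_i)
reader.close
Process.wait(pid)

# The parent still has its main thread plus the background thread.
parent_threads = Thread.list.count(&:alive?)
background.kill

puts "child #{child_pid} had #{child_threads} thread(s), parent had #{parent_threads}"
```

So a single call to fork gives a separate child process running a copy of the calling thread, not a new thread inside the same process.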

Your multithread class doesn't catch failures...
http://ruby-rails.pl/true-ruby-thread-pool

Abdul-rahman Advany wrote:

I only see one call to fork. Does that fork the thread?

Sorry, I didn't know that calling fork makes the thread become a child
process
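
For the three requirements in the original post (a bounded pool, blocking while the pool is full, and a time limit per child), here is a rough sketch using only fork, Process.wait2 and Process.kill. It is not forkoff and not anyone's posted solution. The helper name run_in_fork_pool and its parameters are hypothetical, and it assumes a Unix-like system. The parent enforces the time limit with SIGKILL, so a stuck child cannot block the pool.

```ruby
# Hypothetical helper: runs each callable in its own forked child.
# At most max_children run at once; a child still running after
# time_limit seconds is killed. Returns { pid => Process::Status }.
def run_in_fork_pool(jobs, max_children:, time_limit:, poll_interval: 0.05)
  clock    = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  running  = {}   # pid => deadline on the monotonic clock
  statuses = {}   # pid => Process::Status

  # Reap finished children and kill overdue ones until at most
  # `limit` children are still running.
  wait_for_room = lambda do |limit|
    while running.size > limit
      running.keys.each do |pid|
        _, status = Process.wait2(pid, Process::WNOHANG)
        if status.nil? && clock.call >= running[pid]
          Process.kill('KILL', pid)
          _, status = Process.wait2(pid)
        end
        next unless status
        statuses[pid] = status
        running.delete(pid)
      end
      sleep poll_interval if running.size > limit
    end
  end

  jobs.each do |job|
    wait_for_room.call(max_children - 1)   # block until a slot is free
    pid = fork do
      begin
        job.call
        exit!(0)
      rescue StandardError
        exit!(1)
      end
    end
    running[pid] = clock.call + time_limit
  end

  wait_for_room.call(0)   # wait for the remaining children
  statuses
end

jobs = [
  -> { sleep 0.1 },
  -> { sleep 0.1 },
  -> { sleep 60 },   # overruns the limit and gets killed
]
statuses = run_in_fork_pool(jobs, max_children: 2, time_limit: 0.5)
statuses.each do |child, status|
  puts "#{child}: #{status.success? ? 'ok' : 'failed or killed'}"
end
```

Polling with WNOHANG keeps the sketch short; waiting only on the pool's own pids means it does not reap unrelated children of the same parent.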

Contrariwise.

It does.

Of course it's a bit debatable what you want to do with a failure once
you have caught it.

Having an exception bubble up the call frames to the top level of a
generic pool worker thread is not very helpful.

Having all the tasks complete before you act on a failure is not what
I wanted either.

The gotcha is two or more failures can happen before you start
handling them in the parent thread.

So what I do is catch failures, and drop them in a list which I check
before every run / join.

If there have been any failures, I raise them all as a bundle in the
parent thread.

That may not be what you want, but it makes sense to me.
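
The approach described above can be sketched in a few lines. This is a cut-down illustration with hypothetical names (TinyPool, BundledFailure), not the MultiThread class itself. One deliberate difference: here a worker keeps running after a failed job, whereas in MultiThread the worker ends once it has recorded a failure.

```ruby
# Carries every failure collected from the workers.
class BundledFailure < StandardError
  attr_reader :failures

  def initialize(failures)
    @failures = failures
    super("#{failures.size} job(s) failed")
  end
end

class TinyPool
  def initialize(size)
    @jobs   = Queue.new
    @failed = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop)        # nil is the shutdown signal
          begin
            job.call
          rescue StandardError => e
            @failed << e               # record it for the parent thread
          end
        end
      end
    end
  end

  # Queue a job, unless earlier jobs have already failed.
  def run(&block)
    raise_failures
    @jobs << block
  end

  # Stop the workers, wait for them, then report any failures.
  def join
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
    raise_failures
  end

  private

  # Only the parent thread pops from @failed, so this never blocks.
  def raise_failures
    return if @failed.empty?
    failures = []
    failures << @failed.pop until @failed.empty?
    raise BundledFailure.new(failures)
  end
end

# Two jobs that both fail before the parent looks at the failure list.
gate = Queue.new
pool = TinyPool.new(2)
pool.run { gate.pop; raise ArgumentError, 'first' }
pool.run { gate.pop; raise 'second' }
2.times { gate << :go }

messages = []
begin
  pool.join
rescue BundledFailure => bundle
  messages = bundle.failures.map(&:message).sort
end
puts messages.inspect
```

The gate queue only exists to make the example repeatable: it holds both jobs back until they are queued, so both failures land in the list before join checks it.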
