[ANN] forkoff - parallel processing for ruby enumerables

NAME

   forkoff

SYNOPSIS

   brain-dead simple parallel processing for ruby

URI

   http://rubyforge.org/projects/codeforpeople

INSTALL

   gem install forkoff

DESCRIPTION

   forkoff works with any enumerable object, running the supplied block for each
   element in a child process and collecting the results. forkoff limits the
   number of concurrent child processes, which is 8 by default.

SAMPLES

   <========< samples/a.rb >========>

   ~ > cat samples/a.rb

···

     #
     # forkoff makes it trivial to do parallel processing with ruby, the following
     # prints out each word in a separate process
     #

       require 'forkoff'

       %w( hey you ).forkoff!{|word| puts "#{ word } from #{ Process.pid }"}

   ~ > ruby samples/a.rb

     hey from 3239
     you from 3240

   <========< samples/b.rb >========>

   ~ > cat samples/b.rb

     #
     # for example, this takes only 1 second or so to complete
     #

       require 'forkoff'

       a = Time.now.to_f

       results =
         (0..7).forkoff do |i|

           sleep 1

           i ** 2

         end

       b = Time.now.to_f

       elapsed = b - a

       puts "elapsed: #{ elapsed }"
       puts "results: #{ results.inspect }"

   ~ > ruby samples/b.rb

     elapsed: 1.07044386863708
     results: [0, 1, 4, 9, 16, 25, 36, 49]

   <========< samples/c.rb >========>

   ~ > cat samples/c.rb

     #
     # forkoff does *NOT* spawn processes in batches, waiting for each batch to
     # complete. rather, it keeps a certain number of processes busy until all
     # results have been gathered. in other words the following will ensure that 2
     # processes are running at all times, until the list is complete. note that
     # the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
     #

     require 'forkoff'

     pid = Process.pid

     a = Time.now.to_f

     pstrees =
       %w( a b c d ).forkoff! :processes => 2 do |letter|
         sleep 1
         { letter => ` pstree -l 2 #{ pid } ` }
       end

     b = Time.now.to_f

     puts
     puts "pid: #{ pid }"
     puts "elapsed: #{ b - a }"
     puts

     require 'yaml'

     pstrees.each do |pstree|
       y pstree
     end

   ~ > ruby samples/c.rb

     pid: 3254
     elapsed: 2.12998485565186

     ---
     a: |
       -+- 03254 ahoward ruby -Ilib samples/c.rb
        >-+- 03255 ahoward ruby -Ilib samples/c.rb
        \-+- 03256 ahoward ruby -Ilib samples/c.rb

     ---
     b: |
       -+- 03254 ahoward ruby -Ilib samples/c.rb
        >-+- 03255 ahoward ruby -Ilib samples/c.rb
        \-+- 03256 ahoward ruby -Ilib samples/c.rb

     ---
     c: |
       -+- 03254 ahoward ruby -Ilib samples/c.rb
        >-+- 03261 ahoward (ruby)
        \-+- 03262 ahoward ruby -Ilib samples/c.rb

     ---
     d: |
       -+- 03254 ahoward ruby -Ilib samples/c.rb
        >-+- 03261 ahoward ruby -Ilib samples/c.rb
        \-+- 03262 ahoward ruby -Ilib samples/c.rb

a @ http://codeforpeople.com/
--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

NAME

forkoff

Nice. Great idea.

   # forkoff does *NOT* spawn processes in batches, waiting for each batch to
   # complete. rather, it keeps a certain number of processes busy until all
   # results have been gathered. in other words the following will ensure that 2
   # processes are running at all times, until the list is complete. note that
   # the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
   #

I assume then that at most 2 processes are forked, and each keeps working?

ara howard wrote:

NAME

  forkoff

SYNOPSIS

  brain-dead simple parallel processing for ruby

URI

  http://rubyforge.org/projects/codeforpeople

INSTALL

  gem install forkoff

DESCRIPTION

  forkoff works for any enumerable object, iterating a code block to run in a
  child process and collecting the results. forkoff can limit the number of
  child processes which is, by default, 8.

So, the tool that captures runaway processes and terminates them will
be called 'sodoff', I wager? :stuck_out_tongue:

SCNR

--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

You thought I was taking your woman away from you. You're jealous.
You tried to kill me with your bare hands. Would a Kelvan do that?
Would he have to? You're reacting with the emotions of a human.
You are human.
~ -- Kirk, "By Any Other Name," stardate 4657.5

Very neat indeed!

martin

···

On Thu, Apr 17, 2008 at 6:43 PM, ara howard <ara.t.howard@gmail.com> wrote:

DESCRIPTION

  forkoff works for any enumerable object, iterating a code block to run in a
  child process and collecting the results. forkoff can limit the number of
  child processes which is, by default, 8.

I've once implemented Enumerable#fork myself. It doesn't use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

gegroet,
Erik V. - http://www.erikveen.dds.nl/

···

----------------------------------------------------------------

Here's my code:

----------------------------------------------------------------

module Enumerable
   def fork(max_number_of_threads=nil, &block)
      thread_limiter = EV::ThreadLimiter.new(max_number_of_threads)

     collect do |x|
       thread_limiter.fork do
         Thread.current.abort_on_exception = true

         r, w = IO.pipe

         if pid = Process.fork
           w.close
           Process.wait(pid)
           data = r.read
           r.close
           Marshal.load(data)
         else
           r.close
           Marshal.dump(block.call(x), w)
           w.close
           exit
         end
       end
     end.collect do |t|
       t.value
     end
   end
end

----------------------------------------------------------------

module EV
   class ThreadLimiter
     def initialize(max_number_of_threads)
       @number_of_threads = 0
       @max_number_of_threads = max_number_of_threads

       yield(self) if block_given?
     end

     def fork(*args, &block)
        Thread.pass while @max_number_of_threads and
                          @max_number_of_threads > 0 and
                          @number_of_threads > @max_number_of_threads

        # If this method is called from several threads, then
        # @number_of_threads might get bigger than @max_number_of_threads.
        # This usually a) isn't the case and b) doesn't really matter (to me...).
        # I'm willing to accept this "risk", because a) Thread.exclusive is
        # much, much faster than Mutex#synchronize and b) we can't run into
        # deadlocks.

       Thread.exclusive{@number_of_threads += 1}

       Thread.fork do
         begin
           res = block.call(*args)
         ensure
           Thread.exclusive{@number_of_threads -= 1}
         end

         res
       end
     end
   end
end

----------------------------------------------------------------

Here's a benchmark:

require "benchmark"

Benchmark.bm(15) do |bm|
   rc = nil
   r2 = nil
   r4 = nil
   rx = nil

   data = 1..10
   test = lambda{|x| 1_000_000.times{7+8}; [x, Process.pid]}

   bm.report(" collect "){rc = data.collect(&test)}
   bm.report(" 2 processes"){r2 = data.fork(2, &test)}
   bm.report(" 4 processes"){r4 = data.fork(4, &test)}
   bm.report("inf processes"){rx = data.fork(-1, &test)}

   p rc
   p r2
   p r4
   p rx
end

It produces these results on a dual core machine:

                     user     system      total        real
  collect         4.530000   0.000000   4.530000 (  4.527982)
  2 processes     0.030000   0.050000   3.170000 (  1.733209)
  4 processes     0.160000   0.370000   3.610000 (  1.927826)
inf processes     0.000000   0.000000   3.080000 (  1.691932)
[[1, 18732], [2, 18732], [3, 18732], [4, 18732], [5, 18732], [6, 18732], [7, 18732], [8, 18732], [9, 18732], [10, 18732]]
[[1, 18733], [2, 18734], [3, 18735], [4, 18736], [5, 18737], [6, 18738], [7, 18739], [8, 18740], [9, 18741], [10, 18742]]
[[1, 18743], [2, 18744], [3, 18745], [4, 18746], [5, 18747], [6, 18748], [7, 18749], [8, 18750], [9, 18751], [10, 18752]]
[[1, 18753], [2, 18754], [3, 18755], [4, 18756], [5, 18757], [6, 18758], [7, 18759], [8, 18760], [9, 18761], [10, 18762]]

----------------------------------------------------------------

Just a word of warning: The construction "Thread.new(i){|i|" is
useless, by definition. Just like "i=i" is useless too.

If i isn't defined outside the loop, you don't have to pass i to
the thread, so "Thread.new{" will do. However, if i is defined
outside the loop (which it isn't, in your code...),
"Thread.new(i){|i|" won't work (see below): It's better to use
"Thread.new(i1){|i2|" instead.

gegroet,
Erik V. - http://www.erikveen.dds.nl/

···

----------------------------------------------------------------

a = (1..10).to_a
a1 = a.map{|i|  Thread.new     {     sleep 0.01 ; i  }}.map{|t| t.value}
a2 = a.map{|i|  Thread.new(i)  {|i|  sleep 0.01 ; i  }}.map{|t| t.value}  # Will do.
i = nil
a3 = a.map{|i|  Thread.new(i)  {|i|  sleep 0.01 ; i  }}.map{|t| t.value}  # Won't do!
a4 = a.map{|i1| Thread.new(i1) {|i2| sleep 0.01 ; i2 }}.map{|t| t.value}

p a1 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a2 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a3 # ==> [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
p a4 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

----------------------------------------------------------------

ara howard:

forkoff works for any enumerable object, iterating a code
block to run in a child process and collecting the results.

Thanks a lot for this great gem – it’s a lifesaver for my
PhD, which does some heavy computing in a parallelizable way.

A quick question, if I may. Assuming (which is not far from the truth)
that I have not much knowledge of threads and processes, is there any
way for the block under iteration to communicate with the outside world
other than by the returned result?

My example code:

results = []
run = 0
runs = 2 ** fsm_inputs.size
(0...runs).forkoff do |vector|
  results[vector] = by_input_sets vector
  # snip some run-based stats
  run += 1
end

This, obviously, doesn’t work (i.e., results is an empty array at the
end and run is 0 in every iteration). I can get the results by making
the block return the by_input_sets call’s result, but I still lose the
run-based stats.

It seems a singleton-based approach would work (I’d create a singleton
object outside of the loop and have the results array and run counter
be its properties), but maybe there is an easier way?
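
One route that may be simpler than a singleton (a sketch only, reusing the
fsm_inputs and by_input_sets names from the snippet above; the :took stat is
invented for illustration): a forked child can only hand data back through its
return value, so return the result and the per-run stats together and unpack
them in the parent afterwards.

require 'forkoff'

runs = 2 ** fsm_inputs.size

pairs =
  (0...runs).forkoff do |vector|
    started = Time.now
    result  = by_input_sets vector
    [vector, result, { :took => Time.now - started }]   # everything the parent needs
  end

results = []
stats   = []

pairs.each do |vector, result, stat|
  results[vector] = result
  stats[vector]   = stat
end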

-- Shot

···

--
Added mysterious, undocumented --scanflags and
--fuzzy options. -- nmap 3.0 announcement

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn't have to remember that 'processes' keyword.
Anyhow, it is a great piece of code.

/Fredrik

right now it's 8 - 2 is more reasonable. at this point this code is fully proof of concept - i'll take that as a suggestion (that i agree with)

cheers.

a @ http://codeforpeople.com/

···

On Apr 17, 2008, at 7:51 PM, Roger Pack wrote:

I assume then that at most 2 processes are forked, and each keeps working?

--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

oh yeah, that's good - taken!

a @ http://codeforpeople.com/

···

On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:

So, the tool that captures runaway processes and terminates them will
be called 'sodoff', I wager? :stuck_out_tongue:

--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

this is precisely how forkoff manages the number of processes, only it's via Queues and a fixed number of threads consuming from those queues

the latest impl from svn:

http://p.ramaze.net/1141
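
roughly, the shape of that approach (a from-scratch sketch for illustration
only, not forkoff's actual source - see the link above for that): a fixed pool
of worker threads pulls items off a queue, each worker forks one child per item
and reads the marshalled result back through a pipe, so at most n children are
alive at any moment. the forkoff_sketch name is made up here.

require 'thread'

def forkoff_sketch(enum, n = 2, &block)
  jobs    = Queue.new
  results = Queue.new

  enum.each_with_index{|item, index| jobs.push([item, index])}
  n.times{ jobs.push(nil) }                  # one stop sentinel per worker

  workers = Array.new(n) do
    Thread.new do
      while job = jobs.pop
        item, index = job
        r, w = IO.pipe
        if pid = Process.fork
          w.close
          value = Marshal.load(r.read)       # blocks until the child finishes
          r.close
          Process.wait(pid)
          results.push([index, value])
        else
          r.close
          w.write(Marshal.dump(block.call(item)))
          w.close
          exit!
        end
      end
    end
  end

  workers.each{|t| t.join}

  ordered = Array.new(results.size)
  until results.empty?
    index, value = results.pop
    ordered[index] = value
  end
  ordered
end

p forkoff_sketch(%w( a b c d ), 2){|letter| [letter, Process.pid]}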

regards.

a @ http://codeforpeople.com/

···

On Apr 18, 2008, at 12:19 PM, Erik Veenstra wrote:

I've once implemented Enumerable#fork myself. It doesn't use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

indeed - left over from a previous iteration.

a @ http://codeforpeople.com/

···

On Apr 18, 2008, at 12:20 PM, Erik Veenstra wrote:

If i isn't defined outside the loop, you don't have to pass i to
the thread, so "Thread.new{" will do. However, if i is defined
outside the loop (which it isn't, in your code...),
"Thread.new(i){|i|" won't work (see below): It's better to use
"Thread.new(i1){|i2|" instead.

--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

the next release supports either a hash (options) or a numeric argument - so either will work - may release tonight...
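
for the curious, one way such an argument might be normalized (a sketch with a
made-up helper name, not necessarily how the release does it):

def normalize_forkoff_options(arg, default = 8)
  options = arg.is_a?(Hash) ? arg : { :processes => arg }
  Integer(options[:processes] || options['processes'] || default)
end

p normalize_forkoff_options(2)                  # => 2
p normalize_forkoff_options(:processes => 4)    # => 4
p normalize_forkoff_options({})                 # => 8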

cheers.

a @ http://codeforpeople.com/

···

On Apr 20, 2008, at 7:20 PM, Fredrik wrote:

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn't have to remember that 'processes' keyword.
Anyhow, it is a great piece of code.

--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

to do this you'll want to combine forkoff with my slave lib, which sets up an object fronted by drb, and which can indeed be a singleton - note that this object is, itself, running in a child process, but you can ignore this for the most part. a simple example:

cfp:~ > cat a.rb
require 'rubygems'
require 'slave'
require 'forkoff'

slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object

( 0 .. 4 ).each do |i|
   process_global[i] = i ** 2
end

process_global.each do |k,v|
   p k => v
end

cfp:~ > ruby a.rb
{0=>0}
{1=>1}
{2=>4}
{3=>9}
{4=>16}

even with these abstractions you have to consider deeply what's happening with threads/processes etc - but yes, it's definitely possible with little code.

cheers.

a @ http://codeforpeople.com/

···

On Apr 20, 2008, at 12:32 PM, Shot (Piotr Szotkowski) wrote:

My example code:

results = []
run = 0
runs = 2 ** fsm_inputs.size
(0...runs).forkoff do |vector|
  results[vector] = by_input_sets vector
  # snip some run-based stats
  run += 1
end

This, obviously, doesn’t work (i.e., results is an empty array at the
end and run is 0 in every iteration). I can get the results by making
the block return the by_input_sets call’s result, but I still lose the
run-based stats.

It seems a singleton-based approach would work (I’d create a singleton
object outside of the loop and have the results array and run counter
be its properties), but maybe there is an easier way?

--
we can deny everything, except that we have the possibility of being better. simply reflect on that.
h.h. the 14th dalai lama

ara.t.howard wrote:

So, the tool that captures runaway processes and terminates them will
be called 'sodoff', I wager? :stuck_out_tongue:

oh yeah, that's good - taken!

I want credit. Dollars aren't worth a dime. :stuck_out_tongue:

--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

"You speak truth," said Themistocles; "I should never have been famous
if I had been of Seriphus"
~ -- Plutarch (46-120 AD)
~ -- Life of Themistocles

···

On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:

Thanks a lot for your reply, Ara, and for your time!

ara.t.howard:

to do this you'll want to combine forkoff with my slave lib, which
sets up an object fronted by drb, and which can indeed be
a singleton - note that this object is, itself, running in a child
process, but you can ignore this for the most part.

Ahhh, a DRb – another thing on my ‘to-read’ list. :slight_smile:

a simple example:

Ah, thanks. I’m a bit lost there, though:

slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object

( 0 .. 4 ).each do |i|
  process_global[i] = i ** 2
end

This doesn’t seem to be run concurrently, and
if I switch each to forkoff here it dies with

/home/shot/opt/ruby-1.8.6-p114/lib/ruby/1.8/drb/drb.rb:736:in `open': drbunix:///tmp/slave_hash_-605329008_13239_13240_0_0.32803043357426 - #<Errno::ENOENT: No such file or directory - ///tmp/slave_hash_-605329008_13239_13240_0_0.32803043357426> (DRb::DRbConnError)

even with these abstractions you have to consider deeply what's happening
with threads/processes etc - but yes, it's definitely possible with little
code.

Is there a good place (other than the relevant docs) to read a tutorial
on DRb? I feel really stupid to ask about stuff without knowing what I’m
writing about. :expressionless:

In the meantime, I tried the ‘simple’ forkoff approach
with just handling the returned values, but I end up with

/home/shot/opt/ruby-1.8.6-p114/lib/ruby/gems/1.8/gems/forkoff-0.0.1/lib/forkoff.rb:53:in `dump': singleton can't be dumped (TypeError)

Which object, exactly, is being marshalled by forkoff? The one returned,
or the one that contains the forkoff call? (I tried removing any
singleton references from both, but with no success so far.)
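
(Judging by the `dump' frame in the backtrace, and by Erik's Enumerable#fork
earlier in the thread, which does the same thing, it's the block's return value
that gets marshalled on its way out of the child; Marshal refuses any object
that carries its own singleton methods. A minimal reproduction of that error
class, separate from the PhD code:)

o = Object.new
def o.shout; 'hi'; end    # gives o a singleton method

Marshal.dump(o)           # => TypeError: singleton can't be dumped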

Also, a humble patch:

--- slave-1.2.1.rb.orig 2008-04-24 21:01:38.000000000 +0200
+++ slave-1.2.1.rb 2008-04-24 21:02:54.000000000 +0200
@@ -11,8 +11,8 @@

···

#
# the Slave class encapsulates the work of setting up a drb server in another
# process running on localhost via unix domain sockets. the slave process is
-# attached to it's parent via a LifeLine which is designed such that the slave
-# cannot out-live it's parent and become a zombie, even if the parent dies and
+# attached to its parent via a LifeLine which is designed such that the slave
+# cannot out-live its parent and become a zombie, even if the parent dies an
# early death, such as by 'kill -9'. the concept and purpose of the Slave
# class is to be able to setup any server object in another process so easily
# that using a multi-process, drb/ipc, based design is as easy, or easier,

-- Shot, loving the ‘slave cannot out-live its parent and become
   a zombie, even if the parent dies an early death’ quote.
--
With some people's code, your best debugging
tool is 'rm'. -- Jesper Lauridsen

I think this is a great idea!
Kudos

···

On Fri, Apr 18, 2008 at 9:09 AM, Phillip Gawlowski < cmdjackryan@googlemail.com> wrote:


ara.t.howard wrote:
>
> On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:
>>
>> So, the tool that captures runaway processes and terminates them will
>> be called 'sodoff', I wager? :stuck_out_tongue:
>
> oh yeah, that's good - taken!

I want credit. Dollars aren't worth a dime. :stuck_out_tongue:

--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

"You speak truth," said Themistocles; "I should never have been famous
if I had been of Seriphus"
~ -- Plutarch (46-120 AD)
~ -- Life of Themistocles

Checking out the source I only see pid = fork (this is a call to
Thread.new, isn't it?), I don't see that a real (kernel) fork is used... or
am I wrong?

···


Since it's using Kernel#fork(), does this mean it is using OS threads?

Abdul-rahman Advany wrote:

Checking out the source I only see pid = fork (this is a call to
Thread.new, isn't it?), I don't see that a real (kernel) fork is used... or
am I wrong?

Sorry, just figured out that calling fork runs the work in a child process...
just need to find out how to set a timeout on it... any
suggestions?
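
One possibility (a sketch only; whether a timed-out slot should come back as
nil or re-raise is up to you): bound each item's work with the stdlib timeout
library inside the block, so every forked child limits its own run time.

require 'forkoff'
require 'timeout'

results =
  %w( fast slow ).forkoff! do |job|
    begin
      Timeout.timeout(2) do
        sleep(job == 'slow' ? 10 : 0)    # stand-in for the real work
        job.upcase
      end
    rescue Timeout::Error
      nil
    end
  end

p results    # expect ["FAST", nil]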

···
