Is there a standard pattern for threaded access to a file?

I'm pretty new to ruby and this is one of those areas where I can't
quite seem to turn my head inside out as the language requires :)

I have a log file that I want to process in parts, with multiple threads
working from the same file, each one getting a line at a time and doing
something with it.

I'd like something like

1. Open the file
2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

What I have looks like

open (ARGV.flags.log) do |logfile|
  logfile.each do |line|

             blah blah blah...

         end
end

but that's inside out! How do I rubify this code?

Thanks,

Jon


--
Posted via http://www.ruby-forum.com/.

cfp:~ > cat a.rb
require 'thread'

q = Queue.new

threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }

open(__FILE__){|fd| fd.each{|line| q.push line} }

threads.map{|t| t.join}

cfp:~ > ruby a.rb
require 'thread'

q = Queue.new

threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }

a @ http://codeforpeople.com/


On Oct 12, 2007, at 5:51 PM, Jon Handler wrote:

1. Open the file
2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

--
it is not enough to be compassionate. you must act.
h.h. the 14th dalai lama

Why are you doing this in the first place? Do you have a computer with five
processors and five memory buses?


On 10/12/07, Jon Handler <jhandler@shopping.com> wrote:

I'm pretty new to ruby and this is one of those areas where I can't
quite seem to turn my head inside out as the language requires :)

I have a log file that I want to process in parts, with multiple threads
working from the same file, each one getting a line at a time and doing
something with it.

I'd like something like

1. Open the file
2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

*finally* i remembered where this has been abstracted. in my own lib : alib ;)

cfp:~ > cat a.rb
require 'alib' ### gem install alib

alib.util.threadify IO.readlines(__FILE__), n_threads=5 do |line, lineno|

   puts "#{ lineno }:#{ line }"

end

cfp:~ > ruby a.rb
0:require 'alib'
1:
2:alib.util.threadify IO.readlines(__FILE__), n_threads=5 do |line, lineno|
3:
4: puts "#{ lineno }:#{ line }"
5:
6:end

this works for *any* enumerable thing you want to process with 'n' backend threads.

the current (0.5.0) alib version will blow up if you give it an IO object though, as it uses #size to calculate the return value. i'll tweak it and release 0.5.1 today.
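For readers without alib installed, the same idea can be sketched with the standard library alone. This is not alib's implementation: the helper name `each_in_threads` and its arguments are made up for illustration. It feeds any object that responds to `each` through a Queue to a fixed number of worker threads.

```ruby
require 'thread'

# Hypothetical helper (not part of alib): call the block for every item of
# +enumerable+ on one of +n_threads+ worker threads.
def each_in_threads(enumerable, n_threads = 5, &block)
  queue = Queue.new
  done  = Object.new   # unique end marker, can never be confused with an item

  workers = Array.new(n_threads) do
    Thread.new do
      loop do
        item, index = queue.pop
        break if done.equal?(item)
        block.call(item, index)
      end
    end
  end

  index = 0
  enumerable.each do |item|
    queue.push [item, index]
    index += 1
  end

  n_threads.times { queue.push [done, nil] }   # one end marker per worker
  workers.each { |t| t.join }                  # join re-raises worker errors
end

# Usage: collect "lineno:line" strings from three worker threads.
seen = Queue.new
each_in_threads(%w[a b c d e f g], 3) { |line, lineno| seen.push "#{lineno}:#{line}" }

result = []
result << seen.pop until seen.empty?
puts result.sort
```

Because the sketch only calls `each`, an open File should work as well as an Array, though the order in which workers finish their items is not guaranteed.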

cheers.

a @ http://codeforpeople.com/


Wow!

Thanks everyone for the detailed and insightful help!

Cheers,

Jon


ara.t.howard wrote:

threads.map{|t| t.join}

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}


Francis Cianfrocca wrote:

1. Open the file
2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

Why are you doing this in the first place? Do you have a computer with
five processors and five memory buses?

According to pickaxe2, p. 135, your question is irrelevant:

"Finally, if your machine has more than one processor, Ruby threads
won't take advantage of that fact--because they run in a single process,
and in a single native thread, they are constrained to run on one
processor at a time."

Perhaps a better question for the op is: does your processing result in
any pauses in the code? For instance, do you use the information in the
log file to send requests to websites where you are waiting for a
response?

Threads do not actually allow any code to run at the same time. What
really happens is that execution rapidly shifts from one thread to
another, which gives the appearance that the threads are executing at
the same time.

If you have five methods that each take 2 seconds to execute, and you
run those five methods one after another, your program will take 10
seconds to execute. On the other hand, if you use five threads to
execute those methods, your program will still take 10 seconds to
execute. For example, suppose each thread gets 1 second to execute
before execution shifts to another thread, something like this will
occur:

thread1: 1 sec
    |
    V
thread2: 1 sec
    |
    V
thread3: 1 sec
    |
    V
thread4: 1 sec
    |
    V
thread5: 1 sec
    |
    V
thread1: 1 sec
    |
    V
thread2: 1 sec
    |
    V
thread3: 1 sec
    |
    V
thread4: 1 sec
    |
    V
thread5: 1 sec

If you total up the time, it still takes 10 seconds for your program to
execute when using five threads. The only way threads help speed up
execution is if there are pauses in your code where nothing is
happening. During those pauses, threads allow execution to shift to
other code that is ready to execute.
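A rough way to see this effect, with `sleep` standing in for time spent waiting on something like a network response. The 0.2 second wait is an arbitrary figure chosen to keep the demonstration short.

```ruby
require 'benchmark'

WAIT = 0.2   # seconds; arbitrary stand-in for waiting on I/O

# Five waits, one after another.
sequential = Benchmark.realtime do
  5.times { sleep WAIT }
end

# The same five waits, each in its own thread.
threaded = Benchmark.realtime do
  Array.new(5) { Thread.new { sleep WAIT } }.each { |t| t.join }
end

puts "sequential: %.2fs" % sequential
puts "threaded:   %.2fs" % threaded
```

The sequential run should take roughly five times the wait and the threaded run roughly one wait, because a sleeping thread lets the others proceed. On the standard interpreter, swapping `sleep` for a CPU-bound loop should not show the same gain.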


ara.t.howard wrote:

[...]
threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }
[...]

just a question of style:

while the above is quite clever, is there any (hidden) reason to use it over
such primitive constructs like

threads = Array.new(5){ Thread.new{ puts q.pop } }

or

threads = (1..5).map{ Thread.new{ puts q.pop } }

?

cheers

Simon

It depends upon the operation being parallelized. If your file is filled with IP addresses to resolve you will use very little CPU. I've used resolv.rb and hundreds of threads to rapidly resolve IP addresses streamed from an HTTP access log.


On Oct 13, 2007, at 07:29 , Francis Cianfrocca wrote:


Why are you doing this in the first place? Do you have a computer with five processors and five memory buses?

--
Poor workers blame their tools. Good workers build better tools. The
best workers get their tools to do the work for them. -- Syndicate Wars

Francis Cianfrocca wrote:


On 10/12/07, Jon Handler <jhandler@shopping.com> wrote:

1. Open the file
2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

Why are you doing this in the first place? Do you have a computer with
five processors and five memory buses?

I'm stress-testing my http server... Since most of the time is spent
waiting for requests to go and come back, multiple threads on the
sending end allows for greater throughput (to a point anyway). The
number 5 was just an example of n where n > 1.

Jon

Actually, the example provided won't even work in your case. You have
to do some extra things.

I'm pretty new to ruby

A Queue is a first in first out container, which means the items you
push() into one end of the Queue are the first items that pop() out the
other end. A Queue is also thread safe, which means that only one
thread can access it at the same time.

Therefore, you can push() the lines from your file into one end of the
Queue, and you can have each thread pop() a line off the other end of
the Queue.

If there is nothing in the Queue, then a thread that tries to pop() a
line from the Queue will block until more data becomes available. As a
result, even after all the lines have been read from the Queue, each
thread will come back to the Queue and try to pop() another line, but
since there won't be any more lines left, the threads will block and
wait for more data. That means the threads will never end. To make
your threads stop trying to read more lines from the Queue once it's
exhausted, you will need to send each thread a string that acts as a
termination message.

You could first push() all the lines from your file into the Queue, and
then start the threads, but you might as well get the threads working on
the first lines while you are pushing the rest of the lines into the
Queue. So, start the threads and let them block, then start pushing
the lines from the file into the Queue.

require 'thread'

#Create some data:
File.open("data.txt", "w") do |file|
  (1..100).each do |num|
    file.puts("line #{num}")
  end
end

#Read data with 5 threads:
q = Queue.new

my_threads = (1..5).collect do |i|
  Thread.new do #returns a thread
    loop do
      line = q.pop

      if line == "END_OF_DATA"
        break
      end

      #process line:
      puts line.capitalize
    end
  end
end

#Threads are blocking while they
#await data. Give them some data:
IO.foreach("data.txt") do |line|
  q.push(line)
end

#Send each thread a signal that
#terminates the thread:
5.times {q.push("END_OF_DATA")}

#Wait for all the threads to finish
#executing:
my_threads.each {|t| t.join}


ara.t.howard wrote:

[...]
threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }
[...]

just a question of style:

while the above is quite clever, is there any (hidden) reason to use it over
such primitive constructs like

not really. mostly i just like the '5' to be more prominent in the code

threads = Array.new(5){ Thread.new{ puts q.pop } }

this works

or

threads = (1..5).map{ Thread.new{ puts q.pop } }

seems kinda heavyweight, but yeah

cheers.

a @ http://codeforpeople.com/


On Oct 13, 2007, at 9:50 AM, Simon Kröger wrote:

unless you use

   Thread.current.abort_on_exception = true

you should *always* use 'map' and check the return value. otherwise you have no idea if your threads have completed successfully and you simply exit whether threads did their job or not.
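A small sketch of what checking the result can look like, using only core Thread methods and invented example values. `Thread#value` waits for the thread and returns the value of its block, `join` and `value` both re-raise an exception that killed the thread, and `join` with a time limit returns nil when the thread has not finished in time.

```ruby
# Thread#value waits for the thread and hands back what its block returned.
threads = (1..3).map { |n| Thread.new { n * n } }
results = threads.map { |t| t.value }
puts results.inspect

# A thread that dies with an exception: join re-raises it in the caller.
failing = Thread.new do
  # Silence the automatic stderr report on Rubies that have this setting.
  Thread.current.report_on_exception = false if Thread.current.respond_to?(:report_on_exception=)
  raise ArgumentError, "bad line"
end

error = begin
  failing.join
  nil
rescue ArgumentError => e
  e
end
puts "worker failed: #{error.message}"

# join with a time limit returns nil if the thread is still running.
slow = Thread.new { sleep 0.3 }
timed_out = slow.join(0.05).nil?
slow.join
puts "timed out: #{timed_out}"
```

So `threads.map { |t| t.join(limit) }` can contain nil entries, which is one case where the mapped array differs from the original threads array.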

regards.

a @ http://codeforpeople.com/


On Oct 12, 2007, at 8:35 PM, 7stud -- wrote:

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}


Eric, are you reading/posting on comp.lang.ruby ? I don't see Francis'
post, but both you and 7stud quoted him, so I'm wondering if it was
aggregated from somewhere else.


On Oct 13, 1:32 pm, Eric Hodel <drbr...@segment7.net> wrote:

On Oct 13, 2007, at 07:29 , Francis Cianfrocca wrote:

While this is all true and well I have some additional remarks.

Actually, the example provided won't even work in your case. You have to do some extra things.

I'm pretty new to ruby

A Queue is a first in first out container, which means the items you push() into one end of the Queue are the first items that pop() out the other end. A Queue is also thread safe, which means that only one thread can access it at the same time.

Therefore, you can push() the lines from your file into one end of the Queue, and you can have each thread pop() a line off the other end of the Queue.

If there is nothing in the Queue, then a thread that tries to pop() a line from the Queue will block until more data becomes available. As a result, even after all the lines have been read from the Queue, each thread will come back to the Queue and try to pop() another line, but since there won't be any more lines left, the threads will block and wait for more data. That means the threads will never end. To make your threads stop trying to read more lines from the Queue once it's exhausted, you will need to send each thread a string that acts as a termination message.

There is a better option: rather send something down the queue that is *not a String* - otherwise processing would suddenly stop if the file contained the terminating line.

You could first push() all the lines from your file into the Queue, and then start the threads,

That's a rather bad idea given that a file can be huge and you do not need all lines in memory for line wise processing.

That's the same reason why it's a good idea to use a bounded queue: if processing is slower than reading, an unbounded queue will eventually fill up with the complete file contents. If processing is faster than reading then threads will have to wait either way.
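To illustrate the bound, here is a minimal sketch with arbitrary sizes: once a SizedQueue holds its limit of items, `push` blocks until a consumer takes one off, so the reader can never get further ahead than the limit.

```ruby
require 'thread'

LIMIT = 3                       # arbitrary small bound for the demonstration
queue = SizedQueue.new(LIMIT)
max_seen = 0

# A deliberately slow consumer.
consumer = Thread.new do
  until queue.pop == :done
    sleep 0.01                  # pretend to process the line
  end
end

# A fast producer: push blocks whenever LIMIT items are already waiting.
20.times do |i|
  queue.push "line #{i}"
  max_seen = [max_seen, queue.size].max
end
queue.push :done
consumer.join

puts "largest backlog: #{max_seen} (limit #{LIMIT})"
```

With a plain Queue in place of the SizedQueue, the producer would finish almost immediately and the backlog could grow to all 20 lines.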

but you might as well get the threads working on the first lines while you are pushing the rest of the lines into the Queue. So, start the threads and let them block, then start pushing the lines from the file into the Queue.


Here's my version with all the remarks incorporated.

require 'thread'

MAX_IN_QUEUE = 1024
NUM_THREADS = 5

queue = SizedQueue.new MAX_IN_QUEUE

threads = (1..NUM_THREADS).map do
   # we use the mechanism to pass the queue through
   # the constructor to avoid nasty effects of
   # variable "queue" changing
   Thread.new queue do |q|
     # we use the queue itself as terminator
     until q == (item = q.deq)
       begin
         # whatever processing
       rescue Exception => e
         # whatever error handling
       end
     end
   end
end

# read from files on the command line
ARGF.each do |line|
   queue.enq line
end

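# send one terminator per thread first, then wait for all of them;
# joining right after each enq could hang if a different thread
# picked up that terminator
NUM_THREADS.times { queue.enq queue }
threads.each { |th| th.join }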

Have fun!

  robert


On 13.10.2007 05:59, 7stud -- wrote:

ara.t.howard wrote:


On Oct 12, 2007, at 8:35 PM, 7stud -- wrote:

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}

unless you use

   Thread.current.abort_on_exception = true

you should *always* use 'map' and check the return value.

1) Where are you checking a return value:

threads.map{|t| t.join}

In fact, you discard map's return value.

2) How is map's return value ever going to be different than your
threads array?


I use the one, true ruby-talk, the ruby-talk@ruby-lang.org mailing list.


On Oct 13, 2007, at 13:15 , Brian Adkins wrote:

On Oct 13, 1:32 pm, Eric Hodel <drbr...@segment7.net> wrote:

On Oct 13, 2007, at 07:29 , Francis Cianfrocca wrote:

Eric, are you reading/posting on comp.lang.ruby ? I don't see Francis'
post, but both you and 7stud quoted him, so I'm wondering if it was
aggregated from somewhere else.


i'm using the ml and do see francis' post.

a @ http://codeforpeople.com/


On Oct 13, 2007, at 2:15 PM, Brian Adkins wrote:

Eric, are you reading/posting on comp.lang.ruby ? I don't see Francis'
post, but both you and 7stud quoted him, so I'm wondering if it was
aggregated from somewhere else.


7stud -- wrote:

Actually, the example provided won't even work in your case. You have
to do some extra things.

I'm pretty new to ruby

A Queue is a first in first out container, which means the items you
push() into one end of the Queue are the first items that pop() out the
other end. A Queue is also thread safe, which means that only one
thread can access it at the same time.

Slapping forehead... of course! Producer/Consumer = q.

Thanks!


That's a very nice solution. It demonstrates a lot of accumulated
wisdom. I think I'd use a symbol in the queue, such as :end_of_data,
rather than the queue itself to mark the end of the data, if only to
avoid a "huh?" moment from those who read the code down the line.
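A minimal sketch of the symbol variant, with made-up input lines and a made-up processing step (`upcase`). Lines read from a file are always Strings, so a line that happens to contain the text END_OF_DATA is processed like any other.

```ruby
require 'thread'

NUM_THREADS = 3
queue   = SizedQueue.new(10)
results = Queue.new

workers = Array.new(NUM_THREADS) do
  Thread.new do
    # A Symbol never compares equal to a String, so no line can stop a worker.
    until (line = queue.pop) == :end_of_data
      results.push line.upcase
    end
  end
end

["alpha\n", "END_OF_DATA\n", "beta\n"].each { |line| queue.push line }

NUM_THREADS.times { queue.push :end_of_data }   # one marker per worker
workers.each { |t| t.join }

processed = []
processed << results.pop until results.empty?
puts processed.sort
```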

Eric


On Oct 13, 6:33 am, Robert Klemme <shortcut...@googlemail.com> wrote:

There is a better option: rather send something down the queue that is
*not a String* - otherwise processing would suddenly stop if the file
contained the terminating line.

----

On-site, hands-on Ruby training is available from http://LearnRuby.com
!