you win the golf for sure - here's something similar to what i've used in production code:
···
On Oct 13, 2007, at 4:35 AM, Robert Klemme wrote:
Here's my version with all the remarks incorporated.
require 'thread'
MAX_IN_QUEUE = 1024
NUM_THREADS = 5
queue = SizedQueue.new MAX_IN_QUEUE
threads = (1..NUM_THREADS).map do
# we use the mechanism to pass the queue through
# the constructor to avoid nasty effects of
# variable "queue" changing
Thread.new queue do |q|
# we use the queue itself as terminator
until q == (item = q.deq)
begin
# whatever processing
rescue Exception => e
# whatever error handling
end
end
end
end
# read from files on the command line
ARGF.each do |line|
queue.enq line
end
threads.each do |th|
# send the terminator and wait
queue.enq queue
th.join
end
Have fun!
robert
#
# a lines producer feeds chunks of lines to consuming threads. the producer
# itself does not slurp the potentially huge log file into memory at once,
# rather, it reads only 'bufsize' lines at a time. consumers process
# 'bufsize' lines of the file at a time where 'bufsize' means that the number
# of lines yielded to the block with be that big *at most*: near the end of a
# file it's possible that consumers will be given less that 'bufsize' lines to
# process
#
Lines::Producer.new :path => __FILE__, :bufsize => 10 do
consumer :bufsize => 2 do |lines|
lines.each{|line| puts line}
end
consumer :bufsize => 3 do |lines|
lines.each{|line| puts line}
end
end
#
# Lines module and Producer/Consumer classes
#
BEGIN do
require 'thread'
module Lines
class Error < ::StandardError
class Starvation < Error; end
end
class Producer
%w[ path bufsize ].each{|a| attr a}
def initialize options = {}, &block
@path = String options[:path]
@bufsize = Integer options[:bufsize] || 1
produce &block if block
end
def produce &block
setup
configure &block
[ new_buffered_reader, new_buffered_writer ].each{|t| t.join}
teardown
end
def setup
@consumers =
@sq = SizedQueue.new @bufsize
end
def configure &block
instance_eval &block
end
def new_buffered_reader
Thread.new do
Thread.current.abort_on_exception = true
open(@path){|fd| fd.each{|line| @sq.push line}}
@sq.push(:eof)
end
end
def new_buffered_writer
Thread.new do
Thread.current.abort_on_exception = true
catch :eof do
loop do
@consumers.each do |consumer|
chunk =
consumer.bufsize.times do
line = @sq.pop
throw :eof if line == :eof
chunk << line
end
consumer << chunk
end
end
end
notify_all :eof
end
end
def notify_all msg = :eof
@consumers.each{|consumer| consumer << msg}
end
def teardown
@consumers.map{|consumer| consumer.wait}
end
def consumer options = {}, &block
@consumers << Consumer.new(self, options, &block)
end
class Consumer
attr 'bufsize'
def initialize producer, options = {}, &block
@bufsize = Integer options[:bufsize]
@producer = producer
raise Error::Starvation unless @bufsize < @producer.bufsize
@block = block
@q = Queue.new
@block = block
@thread = new_thread
end
def << data
@q.push data
end
def new_chunk
Array.new bufsize
end
def new_thread
Thread.new do
Thread.current.abort_on_exception = true
loop do
data = @q.pop
break if data == :eof
@block.call data
end
end
end
def wait
@thread.value
end
end
end
end
end
a @ http://codeforpeople.com/
--
share your knowledge. it's a way to achieve immortality.
h.h. the 14th dalai lama