Thread.list confusion

Still in pursuit of trying to figure out what is going on with my
threads...

Have done the following:

class Thread
  alias __old_initialize initialize

  def initialize(*args, &block)
    __old_initialize(*args, &block)
    STDERR.puts "created thread: #{inspect}"
    STDERR.puts block.to_s
  end
end

This is above any require 'blah' statements so, this should ensure that
any threads created use my hacked version, right?

Maybe I *really* don't understand what's happening here (highly likely),
but my assumption is that I should have an equal number of threads going
through my code as appear in the Thread.list. However, based on my
current numbers, I have 13 calls to my initialize (one of these is from
drb, so it must be doing what I think). However, the Thread list
contains 33 entries (32 sleeping & 1 running).

I've tried various instrumentation stuff, including things like:

#set_trace_func proc { |event, file, line, id, binding, classname|
# if classname.to_s =~ /read/
# printf("%-10s %10s:%-2d %10s %8s\n", event, file, line, id, classname)
# STDOUT.flush
# end
#}

But, I really can't figure out what is happening. From what my code is
doing, I would expect there to be 13 threads created (counting the main
thread). This perfectly matches up with the number of calls to my
hacked Thread class. Anyone have any ideas?
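One other way to count thread creation, sketched here as a suggestion rather than anything used in this thread: Ruby 2.0 and later provide TracePoint, whose :thread_begin event fires as each new thread starts running, however it was created. The helper name threads_started_during is made up for illustration, and this will not run on the 1.8 interpreter discussed here.

```ruby
# Hypothetical helper: runs the block and returns the threads that began
# executing while it ran. Requires Ruby >= 2.0 (TracePoint).
def threads_started_during
  started = Queue.new # thread-safe collector, since the hook runs in other threads
  trace = TracePoint.new(:thread_begin) { started << Thread.current }
  trace.enable        # no block given, so the trace applies to all threads
  yield
  Array.new(started.size) { started.pop }
ensure
  trace&.disable
end

seen = threads_started_during do
  Thread.new { }.join   # created via Thread.new
  Thread.start { }.join # created via Thread.start
end

puts "#{seen.size} threads started"
```

Only threads that actually begin running before the block returns are counted, which is why both threads are joined inside it.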

Also, I've looked at this from a number of different angles, but there
appears to be no way to create a normal thread pool with Ruby. Is this
really correct? I found something about this on www.rubygarden.org, but
this isn't exactly what I want (but it is similar in spirit). From
experimentation, it seems that you get one shot for Thread.new. This
seems a bit expensive, but maybe that's just the way it is.

Thanks in advance,

ast

···

***************************************************************************************************
The information in this email is confidential and may be legally privileged. Access to this email by anyone other than the intended addressee is unauthorized. If you are not the intended recipient of this message, any review, disclosure, copying, distribution, retention, or any action taken or omitted to be taken in reliance on it is prohibited and may be unlawful. If you are not the intended recipient, please reply to or forward a copy of this message to the sender and delete the message, any attachments, and any copies thereof from your system.
***************************************************************************************************

Still in pursuit of trying to figure out what is going on with my
threads...

Have done the following:

class Thread
  alias __old_initialize initialize

  def initialize(*args, &block)
    __old_initialize(*args, &block)
    STDERR.puts "created thread: #{inspect}"
    STDERR.puts block.to_s
  end
end

This is above any require 'blah' statements so, this should ensure that
any threads created use my hacked version, right?

Yes

Maybe I *really* don't understand what's happening here (highly likely),
but my assumption is that I should have an equal number of threads going
through my code as appear in the Thread.list. However, based on my
current numbers, I have 13 calls to my initialize (one of these is from
drb, so it must be doing what I think). However, the Thread list
contains 33 entries (32 sleeping & 1 running).

ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each thread you spawn so you can see who is spawning the extra threads.

DRb spawns a thread per connection. To see DRb's threads, do:

drb = DRb.start_service
drb.instance_variable_get('@grp').list
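A minimal sketch of the ThreadGroup idea, assuming the documented rule that a new thread joins the group of the thread that created it. The group names and the spawn_in helper are hypothetical. Each subsystem gets its own group, so asking a group for its list shows which subsystem is responsible for how many threads.

```ruby
# Hypothetical groups, one per subsystem we want to attribute threads to.
net_group  = ThreadGroup.new
work_group = ThreadGroup.new

# Starts a parent thread inside +group+ that spawns +count+ children.
# The children are never added explicitly; they inherit the parent's group.
def spawn_in(group, count)
  ready = Queue.new
  Thread.new do
    group.add(Thread.current)            # move the parent into the group
    count.times { Thread.new { sleep } } # children land in the same group
    ready << true
    sleep                                # keep the parent alive as well
  end
  ready.pop                              # wait until the children exist
end

spawn_in(net_group, 2)
spawn_in(work_group, 3)

puts "net:   #{net_group.list.size} threads"
puts "work:  #{work_group.list.size} threads"
puts "other: #{ThreadGroup::Default.list.size} threads"
```

Anything that still shows up in ThreadGroup::Default was created by code that was not started inside one of the labelled groups.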

Also, I've looked at this from a number of different angles, but there
appears to be no way to create a normal thread pool with Ruby. Is this
really correct? I found something about this on www.rubygarden.org, but
this isn't exactly what I want (but it is similar in spirit). From
experimentation, it seems that you get one shot for Thread.new. This
seems a bit expensive, but maybe that's just the way it is.

What do you want?

···

On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:

--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04

Hi Eric

[snip]

ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
thread you spawn so you can see who is spawning the extra threads.

Hmmm... I think I might use a ThreadGroup, no? :wink:

> Also, I've looked at this from a number of different angles, but there
> appears to be no way to create a normal thread pool with Ruby.

Ok, this was just selective stupidity on my part. Apologies. After
digging out my Doug Lea Concurrent Java book, I saw what I was
forgetting...

What do you want?

Something like this (actually, it was straightforward enough once I
thought about it a little):

$ cat tpool.rb
require 'thread'

class ThreadPool
  def initialize(size)
    @work = Queue.new
    @workers = []
    @group = ThreadGroup.new
    @shutdown = false
    @sh_mutex = Mutex.new
    size.times do
      @workers << t = Thread.new { Thread.stop; thread_work };
      @group.add(t)
    end
    @monitor = Thread.new do
      Thread.stop
      loop do
        @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
        sleep(1)
      end
    end
  end

  def <<(runnable)
    @work << runnable
    self
  end

  def thread_work
    loop do
      @sh_mutex.synchronize do
        if @shutdown
          puts "#{Thread.current} stopping";
          Thread.current.terminate
        end
      end
      puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
      job = @work.deq
      begin
        job.run if job != nil
        Thread.pass
      rescue => e
        puts e
        next
      end
    end
  end

  def start
    @workers.each { |w| w.run }
    @monitor.run
  end

  def join
    @monitor.join
  end

  def shutdown(wait = true)
    @sh_mutex.synchronize { @shutdown = true }
    @workers.each { |w| w.join if w.alive? } if wait
  end

  attr_reader :group
end

class Runnable
  def initialize(*args, &block)
    @block = block
  end

  def run
    @block.call
  end
end

pool = ThreadPool.new(8)

pool.start
job1 = Runnable.new do
  3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
end

vagrant = Runnable.new { raise "broken" }

pool << job1 << vagrant << job1 << job1 << job1 << job1
pool << vagrant << job1 << job1 << vagrant << vagrant << job1

Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
pool.join
pool.shutdown

puts "Thread group"
pool.group.list.each { |w| puts w.inspect }

puts "Thread.list"
Thread.list.each { |w| puts w.inspect }
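For comparison, a smaller pool, offered as a sketch and not as a replacement for the code above. It swaps the shutdown flag, mutex and monitor thread for a different technique: one sentinel object per worker is pushed onto the queue, and a worker exits when it pops one. The names TinyPool and STOP are hypothetical, and jobs are anything that responds to call.

```ruby
class TinyPool
  STOP = Object.new # sentinel: a worker exits when it pops this

  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Block on the queue until a job or the sentinel arrives.
        until (job = @jobs.pop).equal?(STOP)
          begin
            job.call
          rescue => e
            warn "job failed: #{e.message}" # one bad job must not kill the worker
          end
        end
      end
    end
  end

  def <<(job)
    @jobs << job
    self
  end

  # Queues one sentinel per worker behind any pending jobs, then waits.
  def shutdown
    @workers.size.times { @jobs << STOP }
    @workers.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(3)
5.times { |i| pool << -> { results << i * i } }
pool << -> { raise "broken" }
pool.shutdown

puts "#{results.size} jobs completed"
```

Because the sentinels sit behind the queued jobs, shutdown drains pending work first. That is a design choice: a pool that must stop immediately would need a different signal.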

···

On Mon, 2005-08-29 at 19:09, Eric Hodel wrote:

On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:

Hi Eric

[snip]

ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
thread you spawn so you can see who is spawning the extra threads.

Hmmm... I think I might use a ThreadGroup, no? :wink:

Also, I've looked at this from a number of different angles, but there
appears to be no way to create a normal thread pool with Ruby.

Ok, this was just selective stupidity on my part. Apologies. After
digging out my Doug Lea Concurrent Java book, I saw what I was
forgetting...

What do you want?

Something like this (actually, it was straightforward enough once I
thought about it a little):

$ cat tpool.rb
require 'thread'

class ThreadPool
  def initialize(size)
    @work = Queue.new

# @workers = []

You don't need to keep threads in an Array because you don't keep track of their status or #value. (It's better to just push a value onto a Queue than to check #value because you can be sure you're always getting something that is valid.)

    @group = ThreadGroup.new
    @shutdown = false
    @sh_mutex = Mutex.new

I don't think this mutex protects anything. Assignment of a constant value will be atomic.

    size.times do

        Thread.new { @group.add Thread.current; Thread.stop; thread_work }

    end
    @monitor = Thread.new do
      Thread.stop
      loop do
        @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
        sleep(1)
      end
    end
  end

  def <<(runnable)
    @work << runnable
    self
  end

  def thread_work
    loop do
      @sh_mutex.synchronize do
        if @shutdown
          puts "#{Thread.current} stopping";
          Thread.current.terminate
        end
      end
      puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
      job = @work.deq
      begin
        job.run if job != nil
        Thread.pass
      rescue => e
        puts e
        next
      end
    end
  end

  def start

      @group.list.each { |w| w.run }

    @monitor.run
  end

  def join
    @monitor.join
  end

  def shutdown(wait = true)

      @group.enclose

    @sh_mutex.synchronize { @shutdown = true }

      @group.list.first.join until @group.list.empty? if wait

  end

  attr_reader :group
end

class Runnable
  def initialize(*args, &block)
    @block = block
  end

  def run
    @block.call
  end
end

pool = ThreadPool.new(8)

pool.start
job1 = Runnable.new do
  3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
end

vagrant = Runnable.new { raise "broken" }

pool << job1 << vagrant << job1 << job1 << job1 << job1
pool << vagrant << job1 << job1 << vagrant << vagrant << job1

Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
pool.join
pool.shutdown

puts "Thread group"
pool.group.list.each { |w| puts w.inspect }

The group should always be empty after shutdown if you waited. A ThreadGroup does not hold dead threads.
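A small check of that last point, as a sketch: the worker is held open on a queue so that it is certainly alive when it is added to the group, then released and joined. The variable names are arbitrary.

```ruby
group = ThreadGroup.new
gate  = Queue.new

worker = Thread.new { gate.pop } # blocks until something is pushed
group.add(worker)

alive_listed = group.list.include?(worker) # worker is still waiting on the gate

gate << :go
worker.join

dead_listed = group.list.include?(worker)  # worker has finished

puts "listed while alive: #{alive_listed}"
puts "listed after join:  #{dead_listed}"
```

So a pool that relies on the group for bookkeeping never has to prune finished workers itself.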

···

On 30 Aug 2005, at 04:25, Andrew S. Townley wrote:

On Mon, 2005-08-29 at 19:09, Eric Hodel wrote:

On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:

puts "Thread.list"
Thread.list.each { |w| puts w.inspect }

--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04

[lots of really useful feedback deleted]

Thanks for the feedback, Eric. I'd actually made some other changes to
it prior to getting your comments. I have to say, looking at the
difference, I can see some value in the ThreadGroup, but I don't really
get why you're so excited about it... this is for another day, I think.

In other news, I found the mystery multiplying threads and have a
problem that I'm not quite sure how to solve with the tools at hand...
Maybe someone out there can help, but maybe the answer is to just roll
my own something-or-other again.

The problem came from two seemingly benign blocks of code:

$ cat fu.rb
require 'thread'
require 'timeout'

@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

and this (/usr/lib/ruby/1.8/thread.rb):

    272 #
    273 # Retrieves data from the queue. If the queue is empty, the calling thread is
    274 # suspended until data is pushed onto the queue. If +non_block+ is true, the
    275 # thread isn't suspended, and an exception is raised.
    276 #
    277 def pop(non_block=false)
    278 while (Thread.critical = true; @que.empty?)
    279 raise ThreadError, "queue empty" if non_block
    280 @waiting.push Thread.current
    281 Thread.stop
    282 end
    283 @que.shift
    284 ensure
    285 Thread.critical = false
    286 end
    287 alias shift pop
    288 alias deq pop

because of this (/usr/lib/ruby/1.8/timeout.rb):

     32 module Timeout
     33 class Error<Interrupt
     34 end
     35
     36 def timeout(sec, exception=Error)
     37 return yield if sec == nil or sec.zero?
     38 begin
     39 x = Thread.current
     40 y = Thread.start {
     41 sleep sec
     42 x.raise exception, "execution expired" if x.alive?
     43 }
     44 yield sec
     45 # return true
     46 ensure
     47 y.kill if y and y.alive?
     48 end
     49 end
     50 module_function :timeout
     51 end

The key line is 280. The problem seems to come from @waiting being an
array which holds on to references to the threads created in line 40
(which is also why I couldn't find it because I was tracing for
Thread#new and not Thread#start doh!!). Even though the thread seems to
definitely be killed in line 47, the array still holds a reference to
it, so I'm guessing that like Java, this prevents garbage collection for
a while. Therefore, when I was looking at the list, the threads created
here were still in it.
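The Thread.new versus Thread.start difference can be seen in isolation. This sketch uses a subclass instead of reopening Thread, and relies on the documented behaviour that start (and its alias fork) on a subclass does not invoke that subclass's initialize. TracedThread is a made-up name.

```ruby
# Hypothetical subclass used only to count calls to #initialize.
class TracedThread < Thread
  @created = 0
  class << self
    attr_accessor :created
  end

  def initialize(*args, &block)
    TracedThread.created += 1 # unsynchronized; fine here, only main creates threads
    super
  end
end

TracedThread.new { }.join   # goes through #initialize
TracedThread.start { }.join # does not call #initialize
TracedThread.fork { }.join  # alias of start, same behaviour

puts "threads created: 3, initialize calls: #{TracedThread.created}"
```

An initialize hook therefore undercounts whenever a library creates its threads with Thread.start, which is what timeout.rb does on line 40.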

I'm a bit worried about this block in thread.rb, though:

    250 #
    251 # Pushes +obj+ to the queue.
    252 #
    253 def push(obj)
    254 Thread.critical = true
    255 @que.push obj
    256 begin
    257 t = @waiting.shift
    258 t.wakeup if t
    259 rescue ThreadError
    260 retry
    261 ensure
    262 Thread.critical = false
    263 end
    264 begin
    265 t.run if t
    266 rescue ThreadError
    267 end
    268 end
    269 alias << push
    270 alias enq push

because I really don't like what I've observed happening in #257/8
here. Based on what I'm doing, the waiting array will eventually get
huge... and I mean, HUGE. Also, based on further experiments, adding
items to the queue will reduce @waiting.length by n; however, there will
be a lot more attempted reads than there will be attempted writes to the
queue.

I'm open to suggestions, but I can't just do it without the timer
because I need to get control back every n seconds so I can do things
like graceful shutdown, etc.

Any help greatly appreciated. Thanks in advance,

ast

Here's the full test program (not out to win any style awards with the
calls to read, btw) :slight_smile:

$ cat fu.rb
require 'thread'
require 'timeout'

@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

read(1); read(1); read(1); read(1); read(1); read(1)
@queue << "one"
read(1); read(1); read(1); read(1); read(1); read(1)

puts Thread.list.join(", ") # only one by the end

···

On Tue, 2005-08-30 at 19:05, Eric Hodel wrote:

[lots of really useful feedback deleted]

Thanks for the feedback, Eric. I'd actually made some other changes to
it prior to getting your comments. I have to say, looking at the
difference, I can see some value in the ThreadGroup, but I don't really
get why you're so excited about it... this is for another day, I think.

In other news, I found the mystery multiplying threads and have a
problem that I'm not quite sure how to solve with the tools at hand...
Maybe someone out there can help, but maybe the answer is to just roll
my own something-or-other again.

The problem came from two seemingly benign blocks of code:

$ cat fu.rb
require 'thread'
require 'timeout'

@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT: #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

and this (/usr/lib/ruby/1.8/thread.rb):

    280 @waiting.push Thread.current

because of this (/usr/lib/ruby/1.8/timeout.rb):

     40 y = Thread.start {
     41 sleep sec
     42 x.raise exception, "execution expired" if x.alive?
     43 }
     46 ensure
     47 y.kill if y and y.alive?

The key line is 280. The problem seems to come from @waiting being an
array which holds on to references to the threads created in line 40
(which is also why I couldn't find it because I was tracing for
Thread#new and not Thread#start doh!!). Even though the thread seems to
definitely be killed in line 47, the array still holds a reference to
it, so I'm guessing that like Java, this prevents garbage collection for
a while. Therefore, when I was looking at the list, the threads created
here were still in it.

Yes. A ThreadGroup doesn't have the GC problem, but would not be suitable here. (A Thread can only belong to one ThreadGroup, and Queue shouldn't reorganize threads it didn't create.)

I'm a bit worried about this block in thread.rb, though:

    257 t = @waiting.shift
    258 t.wakeup if t

because I really don't like what I've observed happening in #257/8
here. Based on what I'm doing, the waiting array will eventually get
huge... and I mean, HUGE. Also, based on further experiments, adding
items to the queue will reduce @waiting.length by n; however, there will
be a lot more attempted reads than there will be attempted writes to the
queue.

More job runners than jobs?

I'm open to suggestions, but I can't just do it without the timer
because I need to get control back every n seconds so I can do things
like graceful shutdown, etc.

If you need safe concurrent access like a Queue and timeouts you may find rinda/tuplespace.rb useful. Somewhere around here I have a stream implementation for it (but it's not terribly difficult to write from scratch). I think it can be modified to have timeouts on pop.

(I have RDoc patches for rinda out for review.)

Here's the full test program (not out to win any style awards with the
calls to read, btw) :slight_smile:

Give me a bit and I think I can make your test program work with TupleSpace streams.

···

On 30 Aug 2005, at 16:40, Townley, Andrew wrote:

On Tue, 2005-08-30 at 19:05, Eric Hodel wrote:

--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04

With all the talk about Muds I was hoping the Subject was referring to a
Roguelike game in Ruby.. :slight_smile:

$ cat test.rb
require 'ts_stream'

ts = Rinda::TupleSpace.new 1
stream = Rinda::Stream.new ts, 1

def read(stream, timeout)
   puts "READ: #{stream.length} elements"
   puts "*** #{stream.pop timeout} ***"
rescue Rinda::Stream::ClosedError
   puts "CLOSED: #{stream.length} elements"
rescue Rinda::Stream::TimeoutError
   puts "TIMEOUT: #{stream.length} elements"
end

6.times { read stream, 1 }
stream.push "one"
6.times { read stream, 1 }

p Thread.list
$ ruby test.rb
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 1 elements
*** one ***
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
READ: 0 elements
TIMEOUT: 0 elements
[#<Thread:0x33898c sleep>, #<Thread:0x1c1740 run>]

ts_stream.rb (4.6 KB)

···

On 31 Aug 2005, at 14:51, Eric Hodel wrote:

On 30 Aug 2005, at 16:40, Townley, Andrew wrote:

I'm open to suggestions, but I can't just do it without the timer
because I need to get control back every n seconds so I can do things
like graceful shutdown, etc.

If you need safe concurrent access like a Queue and timeouts you may find rinda/tuplespace.rb useful. Somewhere around here I have a stream implementation for it (but it's not terribly difficult to write from scratch). I think it can be modified to have timeouts on pop.

Here's the full test program (not out to win any style awards with the
calls to read, btw) :slight_smile:

Give me a bit and I think I can make your test program work with TupleSpace streams.

Lyndon Samson wrote:

With all the talk about Muds I was hoping the Subject was referring to a
Roguelike game in Ruby.. :slight_smile:

I'm with you, brother! What say we write one; we can use dwemthy as a core... :wink:

Even better with renewer objects.

require 'ts_stream'

ts = Rinda::TupleSpace.new 1
stream = Rinda::Stream.new ts, 1

class Renewer
   attr_accessor :shutdown
   def initialize() @shutdown = false end
   def renew() return !@shutdown end
end

renewer = Renewer.new

def read(stream, timeout)
   puts "READ: #{stream.length} elements"
   puts "*** #{stream.pop timeout} ***"
rescue Rinda::Stream::ClosedError
   puts "CLOSED: #{stream.length} elements"
rescue Rinda::Stream::TimeoutError
   puts "TIMEOUT: #{stream.length} elements"
end

6.times { Thread.start do read stream, renewer end }
stream.push "one"

puts "#{Thread.list.length} live threads"

renewer.shutdown = true

sleep 2

puts "#{Thread.list.length} live threads"

···

--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04

Hi Eric,

Sorry for the late reply--haven't been able to keep up with the list
traffic.

I hadn't considered any of the Rinda stuff because what I'm trying to
model is queues. The solution that I have in place with the
TimedReadQueue and the ThreadPool (slightly modified) works.
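The TimedReadQueue itself is not shown in this thread. As a rough idea of what such a class could look like, here is a sketch that is an assumption throughout and not the poster's code: a Mutex plus a ConditionVariable with a timed wait, so no helper thread is created per read and nothing is left behind in a waiting list after a timeout.

```ruby
# Hypothetical TimedReadQueue: a blocking pop with a timeout, built on
# Mutex and ConditionVariable instead of Timeout::timeout.
class TimedReadQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @ready = ConditionVariable.new
  end

  def push(obj)
    @mutex.synchronize do
      @items << obj
      @ready.signal # wake one waiting reader, if any
    end
    self
  end
  alias << push

  # Returns the next item, or nil if nothing arrives within +timeout+ seconds.
  def pop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @ready.wait(@mutex, remaining) # may wake early, hence the loop
      end
      @items.shift
    end
  end
end

queue = TimedReadQueue.new
p queue.pop(0.1) # nothing queued yet
queue << "one"
p queue.pop(0.1)
```

Returning nil on timeout makes a queued nil indistinguishable from a timeout; raising an exception instead would avoid that. Ruby 3.2 and later also accept a timeout: keyword on the built-in Queue#pop, which may remove the need for a custom class.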

Not knowing much about the Rinda package, I assume the Renewer critter
is some sort of implicit interface used by Rinda to determine if it
should timeout or close the stream? Overall, I guess it just proves
that there's always more than one solution to a given problem. :slight_smile:

Thanks for all your help with this. I'm sure I'll have more questions
as I get further into Ruby.

Cheers,

ast

···

On Thu, 2005-09-01 at 08:08, Eric Hodel wrote:

Even better with renewer objects.

Not knowing much about the Rinda package, I assume the Renewer critter
is some sort of implicit interface used by Rinda to determine if it
should timeout or close the stream?

It's more low-level than that; it can be used to expire the Tuple it is attached to.

···

On 12 Sep 2005, at 03:12, Andrew S. Townley wrote:

Overall, I guess it just proves
that there's always more than one solution to a given problem. :slight_smile:

--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04