Threading and Deadlock

I'm making the back end for a feed reader. I plan to have a daemon
which periodically checks feeds for updates. The daemon will have one
refill thread responsible for maintaining a list of feeds to be updated.
It will also have n worker threads who pop feeds from the list and
process them. When the list is empty, the refill thread will sleep for
some amount of time (no need to update the feeds constantly), refill the
list of feeds, and signal to the workers to start up again.

Here's the Crawler class as I have it so far. I've removed some of the
irrelevant parts.

class Crawler
  def initialize(minutes_sleep)
    @minutes_sleep = minutes_sleep
    @first_run = true

    @feeds = Array.new

    @feeds_lock = Mutex.new
    @empty = ConditionVariable.new
    @filled = ConditionVariable.new
  end

  def fill
    @feeds_lock.synchronize do
      @empty.wait(@feeds_lock)
      if @first_run
        @first_run = false
      else
        sleep(60 * @minutes_sleep)
      end
      @feeds = Feed.find(:all, :conditions=>'active = 1')
      @filled.broadcast(@feeds_lock)
    end
  end

  def crawl(worker_number)
    @feeds_lock.synchronize do
      if @feeds.size == 0
        @empty.broadcast(@feeds_lock)
        @filled.wait(@feeds_lock)
      end

      feed = @feeds.pop
      # Do some processing
    end
  end
end

Then I kick off with another script that looks like this:

crawler = Crawler.new(minutes_sleep)
filler = Thread.new {crawler.fill}
workers = (1..num_workers).map do |i|
  Thread.new {crawler.crawl(i)}
end

filler.join
workers.each{|thread| thread.join}

This code results in a deadlock. It seems like the refill thread is not
waking up, doing its work, and signaling to the workers. But I have no
idea - I am stumped. Any pointers?

I did try to use the Queue class, but it seemed a little magical and I
couldn't quite figure out how to use it for my needs.


--
Posted via http://www.ruby-forum.com/.

Hello

> I did try to use the Queue class, but it seemed a little magical and I
> couldn't quite figure out how to use it for my needs.

  Unfortunately, I can't really tell where your deadlock comes from. But the
Queue class is really simple:

  Just push elements onto the Queue in the filler class, and pop them
from your workers... That's exactly what you need:

class Filler

  def refill
     ary = [pull the feeds]
     for element in ary
        queue << element
     end
     sleep...
  end
end

class Worker
  def work
    while(a = queue.pop)
      get a...
    end
  end
end

  And that's all. To finish, just push false into the queue once for
every worker thread, to make sure they exit.
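
  For reference, here is a minimal runnable sketch of that scheme. The
crawl_once helper, the second results queue, and the "processed ..."
strings are made up for illustration; only Queue#<<, Queue#pop and the
false sentinel come from the description above.

```ruby
require 'thread'  # needed on older Rubies; harmless on newer ones

# Hypothetical helper: run `num_workers` threads over `feeds` and
# return whatever they produced.
def crawl_once(feeds, num_workers)
  queue   = Queue.new
  results = Queue.new          # thread-safe collector, for the demo only

  workers = Array.new(num_workers) do
    Thread.new do
      # Queue#pop blocks while the queue is empty; the loop ends
      # when this worker pops the `false` sentinel.
      while (feed = queue.pop)
        results << "processed #{feed}"   # stand-in for the real work
      end
    end
  end

  feeds.each { |feed| queue << feed }     # the "filler" side
  num_workers.times { queue << false }    # one sentinel per worker
  workers.each(&:join)

  Array.new(results.size) { results.pop }
end

puts crawl_once(%w[a b c d e], 3).sort
```

  The order in which workers finish is not deterministic, hence the sort
before printing.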

  Vince

Have you thought through why this script needs to be threaded in the first
place? Is there an external latency you need to capture?


I'm seeing multiple problems in this code. #fill and #crawl don't appear to
have loops. #fill contains a statement that can sleep for multiple minutes
while holding a mutex lock, but fortunately it appears not to be reachable. Try
running with Thread.abort_on_exception set to true to see if you are missing a
fault. Again, why does this program need to be threaded, since you've coded
it such that it will sleep holding a lock?
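
To illustrate the kind of hidden fault that can go unnoticed: in current
Ruby versions ConditionVariable#broadcast takes no arguments, so a call
shaped like the one in the posted code raises inside the thread, and
nothing shows up until that thread is joined. The thread_failure helper
below is a made-up name for this sketch, not part of any library.

```ruby
require 'thread'

# Hypothetical helper: run the block in a thread and return the
# exception that killed it, or nil if it finished normally.
def thread_failure(&block)
  worker = Thread.new do
    # Keep the demo quiet on Rubies that print thread errors by default.
    if Thread.current.respond_to?(:report_on_exception=)
      Thread.current.report_on_exception = false
    end
    block.call
  end
  worker.join          # join re-raises whatever killed the thread
  nil
rescue StandardError => e
  e
end

lock = Mutex.new
cond = ConditionVariable.new

# Same call shape as @filled.broadcast(@feeds_lock) in the original post.
failure = thread_failure { lock.synchronize { cond.broadcast(lock) } }
puts failure.class

# With Thread.abort_on_exception = true set at startup, the same error
# would stop the whole program right away instead of staying hidden.
```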


harp:~ > cat a.rb
require 'thread'

class Crawler
   DEFAULT = {
     'n_consumers' => 4  # key must match the option read in initialize
   }

   def initialize opts = {}
     getopt = getopts opts
     @n_consumers = getopt['n_consumers']
     @q = Queue.new
     @producer = new_producer
     @consumers = Array.new(@n_consumers){ new_consumer }
   end

   def new_producer
     new_thread do
       loop do
         feeds = get_feeds
         feeds.each{|feed| @q.push feed}
       end
     end
   end

   def new_consumer
     new_thread do
       loop do
         feed = @q.pop
         process_feed feed
       end
     end
   end

   def get_feeds # placeholder
     %w( a b c )
   end

   def process_feed feed # placeholder
     trace "processing <#{ feed }>"
     sleep 1
   end

   def trace msg
     tid = Thread.current.object_id
     STDERR.puts "#{ msg } (#{ tid })"
   end

   def getopts opts
     lambda{|o| opts[o.to_s] || opts[o.to_s.intern] || DEFAULT[o.to_s]}
   end

   def new_thread &b
     Thread.new do
       Thread.current.abort_on_exception = true
       b.call
     end
   end
end

Crawler.new :n_consumers => 4

STDIN.gets

harp:~ > ruby a.rb
processing <a> (-609386786)
processing <b> (-609491016)
processing <c> (-609529486)
processing <a> (-609324242)
processing <b> (-609386786)
processing <c> (-609491016)
processing <a> (-609324242)
processing <b> (-609529486)
processing <c> (-609386786)
processing <a> (-609491016)
processing <b> (-609324242)
processing <c> (-609529486)

hth.

-a


--
in order to be effective truth must penetrate like an arrow - and that is
likely to hurt. -- wei wu wei

Francis Cianfrocca wrote:

> Have you thought through why this script needs to be threaded in the
> first place? Is there an external latency you need to capture?

Processing a feed entails an http request to retrieve it, parsing, and
updating the database, so it seems natural to use threads. This daemon
needs to be able to handle thousands of feeds.


Vincent Fourmond wrote:

> class Filler
>
>   def refill
>      ary = [pull the feeds]
>      for element in ary
>         queue << element
>      end
>      sleep...
>   end
> end

I don't think this is quite what I need. The worker threads should
never exit - they should just sleep until the queue is refilled. It
seems like this configuration could lead to the queue being filled up
before it's completely depleted by workers.


Francis Cianfrocca wrote:

> I'm seeing multiple problems in this code. #fill and #crawl don't appear
> to have loops.

Ah, the loop, of course! The fill and crawl should each be wrapped in
loop do ... end, correct?

> #fill contains a statement that can sleep for multiple minutes
> holding a mutex lock, but fortunately appears not to be reachable.

This is by design - in order to not be constantly retrieving feeds (that
seems a little excessive for a feed reader) I want to have a break
between refills.


As coded, your design waits for multiple minutes, then (presumably) fires
off a few thousand HTTP requests, then goes right back to sleep, *holding
the mutex that your worker threads need to run.* Don't do that.

Now of course when you have a loop in crawl, I assume you'll have it grab
the lock, then run until the queue is consumed, which will block the fill
thread at the Mutex#synchronize call (assuming you're lucky enough for one
of the workers to get scheduled before the fill thread tries to go back to
sleep). If that's what you want, you really don't need threads in this app.

If you really think you need to do this with threads, then you need to
re-analyze your synchronization sets, and possibly define a few more
mutexes. Also, you mentioned a capturable external latency in the fill
operation, but not in the crawl. Is there one?
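
One possible way to rework the synchronization, offered as a sketch rather
than a drop-in fix: keep the single mutex, but sleep, fetch and process
outside of it, and re-check the list in a loop around every wait so that a
signal sent before the other side was waiting cannot be lost. The class
name SketchCrawler and the fetch block (standing in for Feed.find) are
assumptions for illustration.

```ruby
require 'thread'

class SketchCrawler
  def initialize(seconds_sleep, &fetch_feeds)
    @seconds_sleep = seconds_sleep
    @fetch_feeds   = fetch_feeds      # hypothetical stand-in for Feed.find
    @feeds  = []
    @lock   = Mutex.new
    @empty  = ConditionVariable.new
    @filled = ConditionVariable.new
  end

  def fill
    loop do
      fresh = @fetch_feeds.call       # slow work happens outside the lock
      @lock.synchronize do
        @feeds.concat(fresh)
        @filled.broadcast             # note: broadcast takes no arguments
        # wait releases the lock; re-check the list after every wakeup
        @empty.wait(@lock) until @feeds.empty?
      end
      sleep(@seconds_sleep)           # pause between refills, lock released
    end
  end

  def crawl
    loop do
      feed = @lock.synchronize do
        while @feeds.empty?
          @empty.signal               # ask the filler for more
          @filled.wait(@lock)
        end
        @feeds.pop
      end
      yield feed                      # process outside the lock
    end
  end
end

# Small demo with made-up data: two batches, three workers.
batches = [%w[a b c], %w[d e]]
crawler = SketchCrawler.new(0.01) { batches.shift || [] }
done    = Queue.new
threads = [Thread.new { crawler.fill }]
threads += Array.new(3) { Thread.new { crawler.crawl { |feed| done << feed } } }
processed = Array.new(5) { done.pop } # wait until all five feeds are handled
threads.each(&:kill)
puts processed.sort.join(' ')
```

This is essentially a hand-rolled queue, so it mostly serves to show where
the waits and the sleep belong relative to the lock.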

On 9/27/06, Jordan McKible <jmckible@gmail.com> wrote:

> > #fill contains a statement that can sleep for multiple minutes
> > holding a mutex lock, but fortunately appears not to be reachable.
>
> This is by design - in order to not be constantly retrieving feeds (that
> seems a little excessive for a feed reader) I want to have a break
> between refills

even if true - consider that you are effectively re-writing Queue to
accomplish your task. using the built-in Queue class will greatly simplify
your goal.

fyi.

-a


Jordan McKible wrote:

> Vincent Fourmond wrote:
>
> > class Filler
> >
> >   def refill
> >      ary = [pull the feeds]
> >      for element in ary
> >         queue << element
> >      end
> >      sleep...
> >   end
> > end
>
> I don't think this is quite what I need. The worker threads should
> never exit - they should just sleep until the queue is refilled.

  This is exactly what this scheme is doing. I was just mentioning the
exit so you give all the worker threads a chance to exit if the program
ever stops. I bet it will stop, one day, won't it? The queue.pop is
*blocking*, which means it will wait until the queue has some elements:

-------------------------------------------------------------- Queue#pop
     pop(non_block=false)
------------------------------------------------------------------------
     Retrieves data from the queue. If the queue is empty, the calling
     thread is suspended until data is pushed onto the queue. If
     non_block is true, the thread isn't suspended, and an exception is
     raised.
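
  A short sketch of both behaviours described in that documentation. The
pop_demo helper and the :feed symbol are made up for the example; the
exception raised by a non-blocking pop on an empty queue is ThreadError.

```ruby
require 'thread'

# Hypothetical helper: returns the error class from a non-blocking pop
# on an empty queue, and the value a blocked consumer eventually gets.
def pop_demo
  queue = Queue.new

  # Non-blocking pop: raises instead of waiting when nothing is queued.
  error = begin
    queue.pop(true)
    nil
  rescue ThreadError => e
    e
  end

  # Blocking pop: the consumer is suspended until something is pushed.
  consumer = Thread.new { queue.pop }
  queue << :feed

  [error.class, consumer.value]   # Thread#value joins and returns the result
end

p pop_demo
```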

> It seems like this configuration could lead to the queue being filled up
> before it's completely depleted by workers.

  You can't fill up the queue (unless you mean running out of memory?). And
you can use SizedQueue if you're worried about it: you set a limit on
the number of elements in the queue. Then, when that number is reached, the

queue << elements

blocks. It wakes up again when workers have depleted it enough.
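
  A small sketch of that, assuming a limit of 2 and made-up integer
"feeds"; bounded_demo is a hypothetical helper name. The producer thread
blocks inside << whenever the queue already holds the limit, so the size
observed by the consumer should never exceed it.

```ruby
require 'thread'

# Hypothetical helper: push `items` through a SizedQueue of size `limit`
# and report what came out plus the largest queue size ever observed.
def bounded_demo(limit, items)
  queue    = SizedQueue.new(limit)
  producer = Thread.new { items.each { |item| queue << item } }  # blocks when full

  seen_sizes = []
  popped = items.map do
    seen_sizes << queue.size   # never more than `limit`
    queue.pop                  # frees a slot, letting the producer continue
  end

  producer.join
  [popped, seen_sizes.max]
end

popped, max_size = bounded_demo(2, [1, 2, 3, 4, 5])
puts "popped #{popped.inspect}, largest size seen: #{max_size}"
```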

  I know this scheme works, as I use it for a massive parallel download
program: I build a queue of elements to download, and I use several
threads to pull them. Works perfectly. This really is exactly what you
need: no need to bother about synchronisation whatsoever...

  Cheers !

  Vince