Thread-safe priority queue?

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf's
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db98931e4a74f)
which doesn't work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

Cheers,
Sean

Sean I seem to fail to understand why that change should have any
impact on Joël's work, can you elaborate please?

Cheers
Robert

···

--
http://ruby-smalltalk.blogspot.com/

---
AALST (n.) One who changes his name to be further to the front
D.Adams; The Meaning of LIFF

Hmm... I'm not sure if Facets implementation is thread safe. It's may
be worth a look. If I recall correctly, Olivier Renaud was the last to
work on it, so he may know more. If it isn't thread safe, btw, it
would make a nice patch.

T.

···

On Jul 9, 9:08 am, "Sean O'Halpin" <sean.ohal...@gmail.com> wrote:

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf's
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d\.\.\.\)
which doesn't work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

Sean O'Halpin wrote:

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel Vanderwerf's
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db98931e4a74f\)
which doesn't work with more recent versions of ruby (because Queue
implementation changed from Ruby to C).

It's pretty easy to work around, I think. Try the following code. It's based on something I'm using in live code and it seems to pass the test referenced in the above link.

Btw, it's great that RBTree is a gem now. Thanks to whoever did that.

require 'thread'
require 'rbtree'

class PriorityQueue
   def size
     @tree.size
   end

   def initialize(*)
     super
     @tree = MultiRBTree.new
     @que = Queue.new
     @mutex = Mutex.new
   end

   # Push +obj+ with priority equal to +pri+ if given or, otherwise,
   # the result of sending #queue_priority to +obj+. Objects are
   # dequeued in priority order, and first-in-first-out among objects
   # with equal priorities.
   def push(obj, pri = obj.queue_priority)
     @mutex.synchronize do
       if @que.num_waiting > 0
         @que << obj
       else
         @tree.store(pri, obj)
       end
     end
   end

   def pop(non_block=false)
     @mutex.synchronize do
       if (last=@tree.last)
         return @tree.delete(last[0]) # highest key, oldest first
       end

       if non_block
         raise ThreadError, "priority queue empty"
       end
     end
     @que.pop # wait
   end
end

···

--
       vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

I didn't explain that very well, did I? Joel's version inherits from
Queue and directly references an instance variable (@waiting) which
isn't there in the C version.

···

On Wed, Jul 9, 2008 at 6:34 PM, Robert Dober <robert.dober@gmail.com> wrote:

Sean I seem to fail to understand why that change should have any
impact on Joël's work, can you elaborate please?

Cheers
Robert

Thanks for the pointer but the Facets version isn't thread-safe.
Still searching... :slight_smile:

···

On Wed, Jul 9, 2008 at 6:50 PM, Trans <transfire@gmail.com> wrote:

Hmm... I'm not sure if Facets implementation is thread safe. It's may
be worth a look. If I recall correctly, Olivier Renaud was the last to
work on it, so he may know more. If it isn't thread safe, btw, it
would make a nice patch.

Ooh, seconded! Stand up and be thanked :slight_smile:

martin

···

On Wed, Jul 9, 2008 at 11:17 PM, Joel VanderWerf <vjoel@path.berkeley.edu> wrote:

Btw, it's great that RBTree is a gem now. Thanks to whoever did that.

Looks like a race condition in that...

Joel VanderWerf wrote:

require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = Queue.new
    @mutex = Mutex.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      if @que.num_waiting > 0
        @que << obj
      else
        @tree.store(pri, obj)
      end
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last=@tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end
    end

       ### Race happens here: if someone else calls #push, then
       ### this thread will wait even though data is available.

    @que.pop # wait
  end
end

Will try to fix....

···

--
       vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

Sean O'halpin wrote:

···

On Wed, Jul 9, 2008 at 6:50 PM, Trans <transfire@gmail.com> wrote:

Hmm... I'm not sure if Facets implementation is thread safe. It's may
be worth a look. If I recall correctly, Olivier Renaud was the last to
work on it, so he may know more. If it isn't thread safe, btw, it
would make a nice patch.

Thanks for the pointer but the Facets version isn't thread-safe.
Still searching... :slight_smile:

Could throw a mutex around the facets version.
--
Posted via http://www.ruby-forum.com/\.

Joel VanderWerf wrote:

Looks like a race condition in that...

Proposed fix, using a condition var... still needs some eyeballing and some tests:

require 'thread'
require 'rbtree'

class PriorityQueue
   def size
     @tree.size
   end

   def initialize(*)
     super
     @tree = MultiRBTree.new
     @que = # should never have more than one entry
     @num_waiting = 0
     @mutex = Mutex.new
     @cond = ConditionVariable.new
   end

   # Push +obj+ with priority equal to +pri+ if given or, otherwise,
   # the result of sending #queue_priority to +obj+. Objects are
   # dequeued in priority order, and first-in-first-out among objects
   # with equal priorities.
   def push(obj, pri = obj.queue_priority)
     @mutex.synchronize do
       if @num_waiting > 0
         @que << obj
         @cond.signal
       else
         @tree.store(pri, obj)
       end
     end
   end

   def pop(non_block=false)
     @mutex.synchronize do
       if (last=@tree.last)
         return @tree.delete(last[0]) # highest key, oldest first
       end

       if non_block
         raise ThreadError, "priority queue empty"
       end

       @num_waiting += 1
       @cond.wait(@mutex)
       @num_waiting -= 1
       @que.pop
     end
   end
end

···

--
       vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

Joel VanderWerf wrote:

Joel VanderWerf wrote:

Looks like a race condition in that...

Proposed fix, using a condition var... still needs some eyeballing and some tests:

That was not quite right either (because cond.signal only wakes the waiter, and doesn't schedule it). The following seems to complete without deadlocks or starvation.

require 'thread'
require 'rbtree'

class PriorityQueue
   def size
     @tree.size
   end

   def initialize(*)
     super
     @tree = MultiRBTree.new
     @mutex = Mutex.new
     @cond = ConditionVariable.new
   end

   # Push +obj+ with priority equal to +pri+ if given or, otherwise,
   # the result of sending #queue_priority to +obj+. Objects are
   # dequeued in priority order, and first-in-first-out among objects
   # with equal priorities.
   def push(obj, pri = obj.queue_priority)
     @mutex.synchronize do
       @tree.store(pri, obj)
       @cond.signal
     end
   end

   def pop(non_block=false)
     @mutex.synchronize do
       if (last=@tree.last)
         return @tree.delete(last[0]) # highest key, oldest first
       end

       if non_block
         raise ThreadError, "priority queue empty"
       end

       loop do
         @cond.wait(@mutex)
         if (last=@tree.last)
           return @tree.delete(last[0])
         end
       end
     end
   end
end

if __FILE__ == $0

   Thread.abort_on_exception = true

   pq = PriorityQueue.new

   n_items_per_thread = 1000
   n_writers = 10
   n_readers = 10

   writers = (0...n_writers).map do |i_thr|
     Thread.new do
       n_items_per_thread.times do |i|
         pri = rand(10)
         pq.push([pri, i, i_thr], pri)
         Thread.pass if rand(5) == 0
       end
     end
   end

   sleep 0.1 until pq.size > 100 # a little head start populating the tree

   results = Array.new(n_readers, 0)

   readers = (0...n_readers).map do |i|
     Thread.new do
       loop do
         pq.pop
         results[i] += 1
       end
     end
   end

   writers.each do |wr|
     wr.join
   end

   p results
   until pq.size == 0
     sleep 0.1
     p results
   end

   raise unless results.inject {|s,x|s+x} == n_items_per_thread * n_writers

end

···

--
       vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

Thanks for taking the time to do this Joel. You can see why I was
hoping it had already been done... :wink:

I resurrected the Queue code from an old copy of 1.8.4 I had lying
around and went with that + your original version of the
PriorityQueue. Still putting it through its paces. I'll give your new
version a whirl too.

@Roger - just putting a mutex around all the access methods isn't
sufficient unfortunately (see Joel's code for evidence). I want the
calling thread to block if there's nothing in the queue (like the
standard lib Queue behaves). And once I start putting that machinery
in, I might as well write the whole thing myself. I was hoping to
avoid that (trying to be virtuous :slight_smile:

Regards,
Sean

···

On Thu, Jul 10, 2008 at 8:03 PM, Joel VanderWerf <vjoel@path.berkeley.edu> wrote:

Joel VanderWerf wrote:

Joel VanderWerf wrote:

Looks like a race condition in that...

Proposed fix, using a condition var... still needs some eyeballing and
some tests:

That was not quite right either (because cond.signal only wakes the waiter,
and doesn't schedule it). The following seems to complete without deadlocks
or starvation.

Sean O'Halpin wrote:

Thanks for taking the time to do this Joel. You can see why I was
hoping it had already been done... :wink:

And as you can see from my other thread, this code breaks on 1.8.6. :frowning:

···

--
       vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407