Hi,
I'm trying to build a message queue that can receive messages, blocks
until messages arrive and has the ability to only block for a limited
amount of time. Unfortunately, the build-in Queue doesn't support
timeout behaviour, so I went about implementing a queue myself. I
missed java's Object.wait method, i.e. using a plain object to notify
the blocking thread to resume .
I ended up using `thread.join(timeout)` which blocks the current
thread until the thread that `join` was called on finishes (or until
timeout is reached). My queue doesn't need to join any particular
thread, though, it just needs to be notified when a new message is
added to the queue. So I needed to create a `dummy` thread solely to
provide notification. What I came up with is this:
require 'thread'
class MessageQueue
def initialize
@lock = Mutex.new
@t = Thread.new{Thread.stop} # this thread exists solely for notification
@queue = []
end
def enqueue msg
@lock.synchronize {
@queue.push msg
t=@t
@t=Thread.new{Thread.stop}
t.kill# notifiy waiting thread
}
end
def dequeue timeout=nil
msg = nil
stop = Time.now+timeout if timeout
while msg == nil
@lock.synchronize {msg = @queue.shift}
break if timeout && Time.new >= stop
@t.join(timeout) unless msg # wait for new message
end #while
msg
end
end
While this works fine, it seems like a bit of a kludge because I have
to create/misuses threads solely to provided notification to the
blocking thread.
Any thought? Am I missing an easier/build-in method to block a thread
only for a limited about of time?
Thanks,
-tim
Use 'timeout':
require 'timeout'
require 'thread'
q = Queue.new
Thread.new do
q.push "thing"
sleep 10
end
begin
Timeout::timeout(1) do
q.pop
end
rescue Timeout::Error
puts "timed out!"
end
···
On 5/12/07, Tim Becker <a2800276@gmail.com> wrote:
I'm trying to build a message queue that can receive messages, blocks
until messages arrive and has the ability to only block for a limited
amount of time. Unfortunately, the build-in Queue doesn't support
timeout behaviour, so I went about implementing a queue myself. I
missed java's Object.wait method, i.e. using a plain object to notify
the blocking thread to resume .
--
Avdi
Unless/until the built-in classes support timeouts, using a thread to
track the timeout is sadly your best option.
However, there are a number of problems with the specific approach
you've used here (e.g. not all accesses to @t are protected by the
mutex)... here's an alternate implementation...
require 'thread'
begin
require 'fastthread'
rescue LoadError
end
class MessageQueue
def initialize
@lock = Mutex.new
@messages =
@readers =
end
def enqueue msg
@lock.synchronize do
unless @readers.empty?
@readers.pop << msg
else
@messages.push msg
end
end
end
def dequeue timeout=nil
timeout_thread = nil
begin
reader = nil
@lock.synchronize do
unless @messages.empty?
# fast path
return @messages.shift
else
reader = Queue.new
@readers.push reader
if timeout
timeout_thread = Thread.new do
sleep timeout
@lock.synchronize do
@readers.delete reader
reader << nil
end
end
end
end
end
# either timeout or writer will send to us
reader.shift
ensure
# (try to) clean up timeout thread
timeout_thread.run if timeout_thread
end
end
end
Queue may seem sort of heavyweight to use for "callbacks" this way, but
if you use fastthread they are pretty inexpensive, and queues take care
of a _lot_ of the bookkeeping you would need to do to make this safe
otherwise.
One remaining issue is that it's possible for the timeout thread to
sleep _after_ the call to timeout_thread.run, so the cleanup isn't
necessarily that effective. However, Thread#kill (or Thread#raise)
isn't an option, since those can leave things in an inconsistent state.
A better solution would be to maintain a dedicated thread for tracking
timeouts which you don't have to worry about cleaning up -- I'll be
releasing a library to do that soon, but for now the above solution is
the best I can do.
-mental
···
On Sat, 2007-05-12 at 22:39 +0900, Tim Becker wrote:
Any thought? Am I missing an easier/build-in method to block a thread
only for a limited about of time?
I'd be cautious of using timeout, since not all of stdlib is safe with
respect to asynchronous exceptions (it's sometimes very difficult to do
properly). I think it's okay with the old thread.rb implementation of
Queue specifically, but I'm not 100% sure about the current C
implementation or other Ruby implementations like JRuby.
-mental
···
On Sun, 2007-05-13 at 04:17 +0900, Avdi Grimm wrote:
Use 'timeout':