I'm trying to make a simple message queue. DRb clients add messages to
the queue, and the main queue class runs in a separate thread, working
through the queue and trying to send each message out.
I figured I'd use the Queue class from the 'thread' library.
Here's the server:
#server
require 'thread'
require 'drb'

class Message
  @@num = 0

  def initialize message = "Time is now: #{Time.now}", &b
    @num = @@num
    @message = message
    if b
      @block = b
    else
      @block = lambda{rand > 0.5}
    end
    @@num += 1
  end

  def try_send
    @block.call
  end

  def to_s
    "#{self.class} number: #{@num} message: #{@message}"
  end
end

class MsgQueue
  include DRb::DRbUndumped
  attr_reader :thread

  def initialize sleeptime = rand
    @queue = Queue.new
    @sleeptime = sleeptime
  end

  def method_missing methid, *args, &b
    @queue.send(methid, *args, &b)
  end

  def start
    @thread = Thread.new {
      loop do
        if @queue.size != 0
          msg = @queue.shift
          if msg.try_send
            puts "sending of message: #{msg} succeeded!"
          else
            puts "sending failed...."
            @queue << msg #put it back in queue
          end
        else
          puts "...queue empty..."
        end
        sleep @sleeptime
      end
    }
  end

  def start_works
    @thread = Thread.new {
      loop do
        puts "queue size is now: #{@queue.size}"
        sleep 0.1
      end
    }
  end
end

if __FILE__ == $0
  mq = MsgQueue.new 0.5
  DRb.start_service('druby://localhost:9759', mq)
  mq.start
  mq.thread.join
  DRb.thread.join
end # server
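The pop/try/requeue logic in start can be exercised on its own, without DRb or sleeping. The following is only a sketch: FlakyMessage and drain are made-up names for illustration, and FlakyMessage replaces the random lambda with a fixed number of failures so the run is repeatable. It also uses the blocking Queue#pop instead of checking the queue size first.

```ruby
require 'thread'

# Hypothetical stand-in for Message: try_send fails a fixed number of
# times before succeeding, so the outcome does not depend on rand.
class FlakyMessage
  attr_reader :attempts

  def initialize(failures_before_success)
    @remaining_failures = failures_before_success
    @attempts = 0
  end

  def try_send
    @attempts += 1
    if @remaining_failures > 0
      @remaining_failures -= 1
      false
    else
      true
    end
  end
end

# Pops messages until `expected` of them have been sent, putting failed
# ones back on the queue. Queue#pop blocks while the queue is empty.
def drain(queue, expected)
  sent = []
  while sent.size < expected
    msg = queue.pop
    if msg.try_send
      sent << msg
    else
      queue << msg # put it back for another attempt
    end
  end
  sent
end

queue = Queue.new
queue << FlakyMessage.new(2) # fails twice, then succeeds
queue << FlakyMessage.new(0) # succeeds on the first attempt
worker = Thread.new { drain(queue, 2) }
sent = worker.value # joins the thread and returns the block's result
puts "sent #{sent.size} messages"
```

Blocking on pop means the worker wakes as soon as a message arrives, at the cost of losing the periodic "...queue empty..." report from the original loop.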
And here's the client:
#client:
require 'msgqueue'
require 'drb'

DRb.start_service()
mq = DRbObject.new(nil, 'druby://localhost:9759')
20.times do
  puts "send message..."
  mq << Message.new( "message from client...")
  sleep 0.1
end #client
When I run the server and then the client I usually end up with something like:
$> ruby msgqueue.rb
...queue empty...
sending failed....
sending of message: Message number: 1 message: message from client... succeeded!
sending of message: Message number: 2 message: message from client... succeeded!
sending failed....
/usr/local/lib/ruby/1.8/drb/drb.rb:733:in `open': druby://phpe-dev-10.hf.intel.com:33135 - #<Errno::ECONNREFUSED: Connection refused - connect(2)> (DRb::DRbConnError)
        from msgqueue.rb:73:in `join'
        from msgqueue.rb:73
So what's with the port number 33135? It's not a port I'm using.
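A possibly related detail: every Message holds a lambda in @block, and Ruby cannot marshal Proc objects. As far as I understand DRb, an argument that cannot be marshaled is passed by reference instead of by copy, which would have the server calling back into the client process. The sketch below checks only the marshaling part; Holder and marshalable? are made-up names, and whether this accounts for port 33135 is an assumption, not something the output above proves.

```ruby
# Hypothetical stand-in for Message: it keeps a block (or a default
# lambda) in an instance variable, as Message does with @block.
class Holder
  def initialize(&b)
    @block = b || lambda { rand > 0.5 }
  end
end

# Returns true if Marshal can serialize obj, false if it raises TypeError
# (which it does for Proc objects and for objects that contain one).
def marshalable?(obj)
  Marshal.dump(obj)
  true
rescue TypeError
  false
end

string_ok = marshalable?("message from client...")
holder_ok = marshalable?(Holder.new)
puts "plain string marshalable: #{string_ok}"
puts "object holding a lambda marshalable: #{holder_ok}"
```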
Also, is Queue (the one that comes with require 'thread') suitable for
this kind of use? It's advertised as safe for access from multiple
threads, but in this case multiple processes are involved as well.
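To make the thread-safety part of the question concrete, here is a minimal sketch of one Queue shared by several threads inside a single process. It assumes, as far as I understand DRb, that each remote call is run in a thread inside the server process, so pushes from clients would reach the Queue as ordinary threads there. collect_from_producers is a made-up helper for illustration.

```ruby
require 'thread'

# Starts producer_count threads that each push per_producer distinct
# integers onto a shared Queue, then pops every item on the calling thread.
def collect_from_producers(producer_count, per_producer)
  queue = Queue.new
  producers = (0...producer_count).map do |p|
    Thread.new do
      per_producer.times { |i| queue << (p * per_producer + i) }
    end
  end
  total = producer_count * per_producer
  received = Array.new(total) { queue.pop } # pop blocks until an item arrives
  producers.each { |t| t.join }
  received
end

items = collect_from_producers(4, 25)
puts "received #{items.size} items, #{items.uniq.size} distinct"
```

This shows nothing about sharing a Queue between processes directly; it only covers the case where all access happens from threads of one process.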
Phil