I don't know about your requirements. Just try it out - you can start multiple clients and vary the number of threads and the queue size in the server at will. To me it seemed pretty fast. I did
$ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done
and messages came back really fast. Also note that each client prints timings so you can see how fast it is on your machine.
If you need more performance then I'm sure you'll find Ruby bindings for any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd start with the simple DRb based solution. It's easily done, you have everything you need, and you do not have to install extra software, not even gems.
I just noticed there was a bug in my code: I used Benchmark.times, which prints timings of the current process. What I meant was Benchmark.measure. I have changed the code a bit so you can easily experiment with queue sizes, thread counts and message counts (see below).
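For reference, here is a minimal sketch of the distinction: Benchmark.times just reports the process's accumulated CPU times and ignores the block, while Benchmark.measure actually times the block and returns a Benchmark::Tms with user, system and wall-clock figures.

```ruby
require 'benchmark'

# Benchmark.measure runs the block once and returns a Benchmark::Tms;
# tms.real is the wall-clock time, which is what you want for a
# client/server round trip (CPU time would miss the waiting).
tms = Benchmark.measure do
  100_000.times { |i| i.to_s }
end
puts tms.real  # wall-clock seconds spent in the block
```
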
With this command line
t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq 1 $t`; do wait; done; cat cl-*
I get pretty good timings of 7.6ms / msg with an unlimited queue size and the default thread count (5) for this unrealistic test in which the queue is hammered.
Kind regards
robert
Modified code:
robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19
require 'thread'
require 'drb'
THREAD_COUNT = (ARGV.shift || 5).to_i
QUEUE_SIZE = ARGV.shift
printf "%4d threads, queue size=%p\n", THREAD_COUNT, QUEUE_SIZE
URI="druby://localhost:8787"
Thread.abort_on_exception = true
QUEUE = QUEUE_SIZE ? SizedQueue.new(QUEUE_SIZE.to_i) : Queue.new
# QUEUE.extend DRb::DRbUndumped
threads = (1..THREAD_COUNT).map do |i|
  Thread.new(i) do |id|
    while msg = QUEUE.deq
      printf "thread %2d: %p\n", id, msg
    end
  end
end
DRb.start_service(URI, QUEUE)
puts 'Started'
DRb.thread.join
puts 'Returned'
threads.each {|th| th.join rescue nil}
puts 'Done'
robert@fussel:~$
robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19
require 'drb/drb'
require 'benchmark'
SERVER_URI="druby://localhost:8787"
rep = (ARGV.shift || 20).to_i
QUEUE = DRb::DRbObject.new_with_uri(SERVER_URI)
QUEUE.enq "Started client"
Benchmark.bm 20 do |b|
  b.report("client %4d" % $$) do
    rep.times do |i|
      QUEUE.enq(sprintf("client %4d msg %4d at %-20s", $$, i, Time.now))
    end
  end
end
QUEUE.enq "Stopped client"
robert@fussel:~$
···
On 01/07/2010 03:07 PM, Iñaki Baz Castillo wrote:
On Thursday, 7 January 2010, Robert Klemme wrote:
On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:
Hi, I run Unicorn, which is a Rack HTTP server using N forked worker
processes. I need the following:
- When a worker processes an HTTP request it must send some data to
another independent Ruby process XXX (different from Unicorn).
- This communication must be non-blocking, that is, the Unicorn worker
process sends the notification and doesn't wait for a response from
process XXX, so the Unicorn worker can immediately generate the HTTP
response, send it back to the client, and get free to handle new HTTP
requests.
- The Ruby process XXX should use some kind of queue system to store
notifications and handle them. In fact, it should take them periodically
and send them via TCP (but not HTTP) to another server.
What is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any
other alternative or suggestion?
Thanks a lot.
I would probably first try a simple setup: make process XXX publish a
Queue via DRb on a well-known port and have one or more threads fetching
from the queue and processing data. If you fear resource exhaustion,
you can limit the queue size. E.g.:
x.rb server
c.rb client
robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19
require 'thread'
require 'drb'
QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI="druby://localhost:8787"
QUEUE = SizedQueue.new QUEUE_SIZE
threads = (1..THREAD_COUNT).map do
  Thread.new do
    while msg = QUEUE.deq
      p msg
    end
  end
end
DRb.start_service(URI, QUEUE)
DRb.thread.join
robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19
require 'drb/drb'
require 'benchmark'
SERVER_URI="druby://localhost:8787"
QUEUE = DRbObject.new_with_uri(SERVER_URI)
10.times do |i|
  puts Benchmark.times do
    QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
  end
end
robert@fussel:~$
Of course you could also use a named pipe for the communication, but
then demarcating message boundaries might be more difficult, etc.
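A minimal sketch of the named-pipe variant, using newline-delimited messages to mark the boundaries (the path and framing here are assumptions for illustration, not something from this thread; File.mkfifo needs Ruby >= 2.3, otherwise shell out to mkfifo):

```ruby
require 'tmpdir'

Dir.mktmpdir do |dir|
  pipe = File.join(dir, "queue.fifo")   # hypothetical pipe location
  File.mkfifo(pipe)                      # Ruby >= 2.3; else: system("mkfifo", pipe)

  writer = fork do                       # stands in for a Unicorn worker
    File.open(pipe, "w") do |f|
      3.times { |i| f.puts "msg #{i}" }  # "\n" marks each message boundary
    end
  end

  # Reader side (process XXX): each_line restores the message boundaries
  # that a raw byte pipe does not give you.
  File.open(pipe, "r") do |f|
    f.each_line { |line| p line.chomp }
  end
  Process.wait(writer)
end
```
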
Thanks a lot, really.
Just a question: is DRb good enough performance-wise?
--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/