Non-blocking communication between Ruby processes

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes.
I need the following:

- When a worker processes an HTTP request it must send some data to another
independent Ruby process XXX (different from Unicorn).

- This communication must be non-blocking, that is, the Unicorn worker process
sends the notification and doesn't wait for a response from process XXX, so
the Unicorn worker can immediately generate the HTTP response, send it back
to the client, and become free to handle new HTTP requests.

- The Ruby process XXX should use some kind of queue system to store
notifications and handle them. In fact, it should take them periodically and
send them via TCP (but not HTTP) to another server.

What is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any other
alternative or suggestion?

Thanks a lot.

···

--
Iñaki Baz Castillo <ibc@aliax.net>

I would probably first try a simple setup: make process XXX publish a Queue via DRb on a well-known port and have one or more threads fetching from the queue and processing data. If you fear resource exhaustion, you can limit the queue size. E.g.:

x.rb server
c.rb client

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI="druby://localhost:8787"

QUEUE = SizedQueue.new QUEUE_SIZE

threads = (1..THREAD_COUNT).map do
   Thread.new do
     while msg = QUEUE.deq
       p msg
     end
   end
end

DRb.start_service(URI, QUEUE)
DRb.thread.join

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

QUEUE = DRbObject.new_with_uri(SERVER_URI)

10.times do |i|
   puts Benchmark.times do
     QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
   end
end
robert@fussel:~$

Of course you can also use a named pipe for the communication. But then demarcation of message boundaries might be more difficult, etc.
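
For instance, a minimal, purely illustrative sketch of length-prefixing messages on the pipe so the reader knows where each one ends:

def write_msg(io, msg)
  # 4-byte big-endian length header, then the payload
  io.write([msg.bytesize].pack("N") + msg)
end

def read_msg(io)
  header = io.read(4) or return nil   # nil at EOF
  io.read(header.unpack("N").first)
end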

Kind regards

  robert

···

On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes. I need the following:

- When a worker processes an HTTP request it must send some data to another independent Ruby process XXX (different from Unicorn).

- This communication must be non-blocking, that is, the Unicorn worker process sends the notification and doesn't wait for a response from process XXX, so the Unicorn worker can immediately generate the HTTP response, send it back to the client, and become free to handle new HTTP requests.

- The Ruby process XXX should use some kind of queue system to store notifications and handle them. In fact, it should take them periodically and send them via TCP (but not HTTP) to another server.

What is the best approach to design such communication? Perhaps using something like EventMachine for the XXX process and Unix/TCP socket communication between the Unicorn processes and the XXX process? Any other alternative or suggestion?

Thanks a lot.

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes.
I need the following:

- When a worker processes an HTTP request it must send some data to another
independent Ruby process XXX (different from Unicorn).

- This communication must be non-blocking, that is, the Unicorn worker process
sends the notification and doesn't wait for a response from process XXX, so
the Unicorn worker can immediately generate the HTTP response, send it back
to the client, and become free to handle new HTTP requests.

If stressed enough, everything has to block/reject or run your systems
out of memory/disk space :)

- The Ruby process XXX should use some kind of queue system to store
notifications and handle them. In fact, it should take them periodically and
send them via TCP (but not HTTP) to another server.

What is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any other
alternative or suggestion?

If you only talk between processes on one machine (since you're trying
FIFOs), you can also check out the "posix_mq" gem/library I started
recently:

  posix_mq - POSIX message queues for Ruby

It's less portable than FIFOs but if you're running a modern GNU/Linux or
FreeBSD, it should work. The default queue sizes on Linux are small:
8192 bytes per message, and 10 messages in the queue. You'll need
root to increase them.

But then FIFOs are hard-coded to 65536 bytes total under Linux and a
4096 byte PIPE_BUF (POSIX only requires a 512 byte PIPE_BUF).
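
A minimal sketch (assuming Linux with /proc mounted) to inspect those limits;
raising them needs root, e.g. "sysctl fs.mqueue.msg_max=...":

puts File.read("/proc/sys/fs/mqueue/msg_max")     # max messages per queue
puts File.read("/proc/sys/fs/mqueue/msgsize_max") # max bytes per message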

···

Iñaki Baz Castillo <ibc@aliax.net> wrote:

--
Eric Wong

Really, thanks a lot.
Just a question: is DRb good enough for performance?

···

On Thursday, January 7, 2010, Robert Klemme wrote:

On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:
> Hi, I run Unicorn, which is a Rack HTTP server using N forked worker
> processes. I need the following:
>
> - When a worker processes an HTTP request it must send some data to
> another independent Ruby process XXX (different from Unicorn).
>
> - This communication must be non-blocking, that is, the Unicorn worker
> process sends the notification and doesn't wait for a response from
> process XXX, so the Unicorn worker can immediately generate the HTTP
> response, send it back to the client, and become free to handle new HTTP
> requests.
>
> - The Ruby process XXX should use some kind of queue system to store
> notifications and handle them. In fact, it should take them periodically
> and send them via TCP (but not HTTP) to another server.
>
>
> What is the best approach to design such communication? Perhaps using
> something like EventMachine for the XXX process and Unix/TCP socket
> communication between the Unicorn processes and the XXX process? Any
> other alternative or suggestion?
>
> Thanks a lot.

I would probably first try a simple setup: make process XXX publish a
Queue via DRb on a well-known port and have one or more threads fetching
from the queue and processing data. If you fear resource exhaustion,
you can limit the queue size. E.g.:

x.rb server
c.rb client

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI="druby://localhost:8787"

QUEUE = SizedQueue.new QUEUE_SIZE

threads = (1..THREAD_COUNT).map do
   Thread.new do
     while msg = QUEUE.deq
       p msg
     end
   end
end

DRb.start_service(URI, QUEUE)
DRb.thread.join

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

QUEUE = DRbObject.new_with_uri(SERVER_URI)

10.times do |i|
   puts Benchmark.times do
     QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
   end
end
robert@fussel:~$

Of course you can also use a named pipe for the communication. But
then demarcation of message boundaries might be more difficult, etc.

--
Iñaki Baz Castillo <ibc@aliax.net>

Really interesting. Is it safe to have several processes (Unicorn workers)
writing to a single posix_mq? Or will the data be "mixed"? Is there any way
to perform an "atomic" write operation on this queue?

Thanks.

···

On Thursday, January 7, 2010, Eric Wong wrote:

If you only talk between processes on one machine (since you're trying
FIFOs), you can also check out the "posix_mq" gem/library I started
recently:

        posix_mq - POSIX message queues for Ruby

--
Iñaki Baz Castillo <ibc@aliax.net>

I don't know about your requirements. Just try it out - you can start multiple clients and vary the number of threads and the queue size in the server at will. To me it seemed pretty fast. I did

$ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done

and messages came really fast. Also note that each client prints timings so you can see how fast it is on your machine.

If you need more performance then I'm sure you'll find a Ruby binding to any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd start with the simple DRb-based solution. It's easily done; you have everything you need and do not need to install extra software, not even gems.

I just noticed there was a bug in my code: I used Benchmark.times, which prints timings of the current process. What I meant was Benchmark.measure. I have changed the code a bit so you can easily experiment with queue sizes, thread counts and message counts (see below).

With this command line

t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq 1 $t`; do wait; done; cat cl-*

I get pretty good timings of 7.6ms / msg with unlimited Queue size and default thread count (5) for this unrealistic test where the queue is hammered.

Kind regards

  robert

Modified code:

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

THREAD_COUNT = (ARGV.shift || 5).to_i
QUEUE_SIZE = ARGV.shift

printf "%4d threads, queue size=%p\n", THREAD_COUNT, QUEUE_SIZE

URI="druby://localhost:8787"

Thread.abort_on_exception = true

QUEUE = QUEUE_SIZE ? SizedQueue.new(QUEUE_SIZE.to_i) : Queue.new
# QUEUE.extend DRb::DRbUndumped

threads = (1..THREAD_COUNT).map do |i|
   Thread.new i do |id|
     while msg = QUEUE.deq
       printf "thread %2d: %p\n", id, msg
     end
   end
end

DRb.start_service(URI, QUEUE)
puts 'Started'
DRb.thread.join
puts 'Returned'
threads.each {|th| th.join rescue nil}
puts 'Done'

robert@fussel:~$

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

rep = (ARGV.shift || 20).to_i

QUEUE = DRb::DRbObject.new_with_uri(SERVER_URI)

QUEUE.enq "Started client"

Benchmark.bm 20 do |b|
   b.report "client %4d" % $$ do
     rep.times do |i|
       QUEUE.enq(sprintf("client %4d msg %4d at %-20s", $$, i, Time.now))
     end
   end
end

QUEUE.enq "Stopped client"

robert@fussel:~$

···

On 01/07/2010 03:07 PM, Iñaki Baz Castillo wrote:

On Thursday, January 7, 2010, Robert Klemme wrote:

On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:

Hi, I run Unicorn, which is a Rack HTTP server using N forked worker
processes. I need the following:

- When a worker processes an HTTP request it must send some data to
another independent Ruby process XXX (different from Unicorn).

- This communication must be non-blocking, that is, the Unicorn worker
process sends the notification and doesn't wait for a response from
process XXX, so the Unicorn worker can immediately generate the HTTP
response, send it back to the client, and become free to handle new HTTP
requests.

- The Ruby process XXX should use some kind of queue system to store
notifications and handle them. In fact, it should take them periodically
and send them via TCP (but not HTTP) to another server.

What is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any
other alternative or suggestion?

Thanks a lot.

I would probably first try a simple setup: make process XXX publish a
Queue via DRb on a well-known port and have one or more threads fetching
from the queue and processing data. If you fear resource exhaustion,
you can limit the queue size. E.g.:

x.rb server
c.rb client

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI="druby://localhost:8787"

QUEUE = SizedQueue.new QUEUE_SIZE

threads = (1..THREAD_COUNT).map do
   Thread.new do
     while msg = QUEUE.deq
       p msg
     end
   end
end

DRb.start_service(URI, QUEUE)
DRb.thread.join

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

QUEUE = DRbObject.new_with_uri(SERVER_URI)

10.times do |i|
   puts Benchmark.times do
     QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
   end
end
robert@fussel:~$

Of course you can also use a named pipe for the communication. But
then demarcation of message boundaries might be more difficult, etc.

Really, thanks a lot.
Just a question: is DRb good enough for performance?

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

These queues are completely atomic at the message level and descriptors
can be safely shared between processes/threads. SysV message queues
weren't thread-safe, but POSIX ones are.
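
A minimal sketch of that, reusing the posix_mq calls shown later in this
thread (the queue name "/demo_mq" is just for the demo):

require "posix_mq"

wq = POSIX_MQ.new("/demo_mq", IO::WRONLY | IO::CREAT)

# several forked writers share the same queue
pids = 3.times.map do |n|
  fork { wq << "message from writer #{n} (pid #{$$})" }
end
pids.each { |pid| Process.wait(pid) }

rq = POSIX_MQ.new("/demo_mq", IO::RDONLY)
3.times { puts rq.receive.first }  # each message arrives whole, never interleaved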

···

Iñaki Baz Castillo <ibc@aliax.net> wrote:

On Thursday, January 7, 2010, Eric Wong wrote:
> If you only talk between processes on one machine (since you're trying
> FIFOs), you can also check out the "posix_mq" gem/library I started
> recently:
>
> posix_mq - POSIX message queues for Ruby

Really interesting. Is it safe to have several processes (Unicorn workers)
writing to a single posix_mq? Or will the data be "mixed"? Is there any way
to perform an "atomic" write operation on this queue?

--
Eric Wong

I don't know about your requirements. Just try it out - you can start
multiple clients and vary the number of threads and the queue size in
the server at will. To me it seemed pretty fast. I did

$ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done

and messages came really fast. Also note that each client prints timings
so you can see how fast it is on your machine.

If you need more performance then I'm sure you'll find a Ruby binding to
any of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd
start with the simple DRb-based solution. It's easily done; you have
everything you need and do not need to install extra software, not even
gems.

Thanks a lot. I've tried code similar to this one:
  http://www.idle-hacking.com/2007/11/iopipe-for-interprocess-communication/

It uses a pipe file (of course there is no queue at all).

Well, sending 100000 strings (with a loop), it takes 2-3 seconds to receive
and print all the received data.
However, using the DRb solution it just didn't finish (I had to interrupt the
process after 30 seconds due to CPU usage).

I'd like a simple solution. Using DRb could be nice. However, using a pipe file
seems simpler and faster. The doubt I have now is how safe a pipe is.
Could it leak memory if some process dies or the reader process is not fast
enough to handle the received data?

I just noticed there was a bug in my code: I used Benchmark.times, which
prints timings of the current process. What I meant was
Benchmark.measure. I have changed the code a bit so you can easily
experiment with queue sizes, thread counts and message counts (see below).

With this command line

t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq
1 $t`; do wait; done; cat cl-*

I get pretty good timings of 7.6ms / msg with unlimited Queue size and
default thread count (5) for this unrealistic test where the queue is
hammered.

Really, thanks a lot. I'll try it.

···

On Thursday, January 7, 2010, Robert Klemme wrote:

--
Iñaki Baz Castillo <ibc@aliax.net>

Great!

···

On Thursday, January 7, 2010, Eric Wong wrote:

Iñaki Baz Castillo <ibc@aliax.net> wrote:
> On Thursday, January 7, 2010, Eric Wong wrote:
> > If you only talk between processes on one machine (since you're trying
> > FIFOs), you can also check out the "posix_mq" gem/library I started
> > recently:
> >
> > posix_mq - POSIX message queues for Ruby
>
> Really interesting. Is it safe to have several processes (Unicorn
> workers) writing to a single posix_mq? Or will the data be "mixed"? Is
> there any way to perform an "atomic" write operation on this queue?

These queues are completely atomic at the message level and descriptors
can be safely shared between processes/threads. SysV message queues
weren't thread-safe, but POSIX ones are.

--
Iñaki Baz Castillo <ibc@aliax.net>

Hummm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into the
pipe and the data is stored (in the filesystem?).

So there is the leaking problem... I must investigate it a bit more...

Thanks a lot.

···

On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:

The doubt I have now is how safe a pipe is.
Could it leak memory if some process dies or the reader process is not fast
enough to handle the received data?

--
Iñaki Baz Castillo <ibc@aliax.net>

I've already tested it :)

I've also realized that if two processes perform "receive" on the same
mq then the received messages are distributed 50/50 (one message for each
receiver). :)

···

On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:

On Thursday, January 7, 2010, Eric Wong wrote:
> Iñaki Baz Castillo <ibc@aliax.net> wrote:
> > On Thursday, January 7, 2010, Eric Wong wrote:
> > > If you only talk between processes on one machine (since you're
> > > trying FIFOs), you can also check out the "posix_mq" gem/library I
> > > started recently:
> > >
> > > posix_mq - POSIX message queues for Ruby
> >
> > Really interesting. Is it safe to have several processes (Unicorn
> > workers) writing to a single posix_mq? Or will the data be "mixed"? Is
> > there any way to perform an "atomic" write operation on this queue?
>
> These queues are completely atomic at the message level and descriptors
> can be safely shared between processes/threads. SysV message queues
> weren't thread-safe, but POSIX ones are.

Great!

--
Iñaki Baz Castillo <ibc@aliax.net>

pipe.write unless pipe.full?

i.e. check if your pipe hits a set limit on disk, and generate an exception if the pipe_file reaches (or is close to reaching) the limit.

You could then buffer the data to be written until an additional (or new) reading thread has started.

···

On 07.01.2010 18:58, Iñaki Baz Castillo wrote:

Hummm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into the
pipe and the data is stored (in the filesystem?).

So there is the leaking problem... I must investigate it a bit more...

--
Phillip Gawlowski

The doubt I have now is how safe a pipe is. Could it leak memory if some process dies or the reader process is not fast
enough to handle the received data?

Hummm, I have a reader process and a writer process.

I thought you had multiple writers. Didn't you mention multiple forked Rack handlers?

The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into the pipe and the data is stored (in the filesystem?).

So there is the leaking problem...

Not exactly: the writer is blocked. You can try this out:

robert@fussel:~$ mkfifo ff
robert@fussel:~$ ls -lF ff
prw-r--r-- 1 robert robert 0 2010-01-07 19:25 ff|
robert@fussel:~$ ruby19 -e 'puts("+"*10_000)' > ff
^Z
[1]+ Stopped ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ wc ff &
[2] 14036
robert@fussel:~$ %1
ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ 1 1 10001 ff

[2]+ Done wc ff
robert@fussel:~$ jobs
robert@fussel:~$

At the point where I pressed Ctrl-Z the writer hung because the pipe was full. (The size of a pipe is usually the memory page size of the OS IIRC, this would be 4k in case of Linux 32 bit).

I must investigate it a bit more...

I'd personally prefer to use the DRb approach because then you can actually send typed messages, i.e. whatever information you need. Also, it was fun to play around with those small test programs. ;-) And you can have the reader run on any machine in the network.
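
For instance, a minimal sketch (reusing the druby://localhost:8787 queue from
c.rb above; the hash fields are only illustrative):

require 'drb/drb'

queue = DRb::DRbObject.new_with_uri("druby://localhost:8787")
# a structured message instead of a pre-formatted string
queue.enq(:event => "http_request", :worker_pid => $$, :at => Time.now)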

Whatever you do, you have to decide how to handle the situation when the reader goes away - for whatever reason. You could write your messages to a file and use an approach like "tail -f" uses to read them. But this has the nasty effect of clobbering the file system, plus if the reader goes away the file might grow arbitrarily large. And you have locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is preferable IMHO. Then you can still decide in the client what to do if you cannot get rid of the message.

Thanks a lot.

You're welcome.

Kind regards

  robert

···

On 01/07/2010 06:58 PM, Iñaki Baz Castillo wrote:

On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

OK, the FIFO keeps working at the OS level, so it can receive messages until
some OS buffer capacity is filled. Then the writer process blocks when trying
to "flush" the data.
Fortunately it just blocks at the Ruby thread level, so other threads can work.
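
A minimal sketch of the non-blocking alternative (the FIFO path is
hypothetical): open the FIFO with O_NONBLOCK and handle a full pipe buffer
explicitly instead of blocking. Note that opening write-only + non-blocking
raises Errno::ENXIO if no reader has the FIFO open yet.

fifo = File.open("/tmp/notifications.fifo", File::WRONLY | File::NONBLOCK)
begin
  fifo.write_nonblock("some notification\n")
rescue Errno::EAGAIN
  # pipe buffer full: drop the message, buffer it in memory, or retry later
end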

···

On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:

On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:
> The doubt I have now is how safe a pipe is.
> Could it leak memory if some process dies or the reader process is not fast
> enough to handle the received data?

Hummm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process then the writer process keeps writing into
the pipe and the data is stored (in the filesystem?).

So there is the leaking problem... I must investigate it a bit more...

--
Iñaki Baz Castillo <ibc@aliax.net>

>> The doubt I have now is how safe a pipe is.
>> Could it leak memory if some process dies or the reader process is not fast
>> enough to handle the received data?
>
> Hummm, I have a reader process and a writer process.

I thought you had multiple writers. Didn't you mention multiple forked
Rack handlers?

Yes, that's true. Surely I'll get into problems when writing into the FIFO
from several clients at the same time :)
But for that I could create as many FIFOs as there are Rack workers...
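
A minimal sketch of that idea (the path and global variable are hypothetical),
using Unicorn's after_fork hook in the config file so each worker gets its own
FIFO:

after_fork do |server, worker|
  path = "/tmp/notifications.#{worker.nr}.fifo"
  system("mkfifo", path) unless File.exist?(path)
  # raises Errno::ENXIO if process XXX isn't reading this FIFO yet
  $notify_fifo = File.open(path, File::WRONLY | File::NONBLOCK)
end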

> The writer process writes into the pipe file.
> If I kill the reader process then the writer process keeps writing into
> the pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem...

Not exactly: the writer is blocked. You can try this out:

robert@fussel:~$ mkfifo ff
robert@fussel:~$ ls -lF ff
prw-r--r-- 1 robert robert 0 2010-01-07 19:25 ff|
robert@fussel:~$ ruby19 -e 'puts("+"*10_000)' > ff
^Z
[1]+ Stopped ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ wc ff &
[2] 14036
robert@fussel:~$ %1
ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ 1 1 10001 ff

[2]+ Done wc ff
robert@fussel:~$ jobs
robert@fussel:~$

At the point where I pressed Ctrl-Z the writer hung because the pipe was
full. (The size of a pipe is usually the memory page size of the OS
IIRC, this would be 4k in case of Linux 32 bit).

> I must investigate it a bit more...

I'd personally prefer to use the DRb approach because then you can
actually send typed messages, i.e. whatever information you need. Also,
it was fun to play around with those small test programs. ;-) And you
can have the reader run on any machine in the network.

Whatever you do, you have to decide how to handle the situation when
the reader goes away - for whatever reason.

It's real-time info, so if the reader dies it's not so important to recover
that information when starting again. Well, it would be nice to recover the
last 5-10 minutes of it, but no more.

You could write your
messages to a file and use an approach like "tail -f" uses to read them.
But this has the nasty effect of clobbering the file system, plus if
the reader goes away the file might grow arbitrarily large. And you have
locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is
preferable IMHO. Then you can still decide in the client what to do if
you cannot get rid of the message.

Yes, I must think a bit about it :)

Thanks a lot for your help.

···

On Thursday, January 7, 2010, Robert Klemme wrote:

On 01/07/2010 06:58 PM, Iñaki Baz Castillo wrote:
> On Thursday, January 7, 2010, Iñaki Baz Castillo wrote:

--
Iñaki Baz Castillo <ibc@aliax.net>

Unfortunately #full? is not a method of File :(
Note that I'm using a FIFO file (created with "mkfifo file") so it is not
"stored" in the filesystem. Instead it's just communication between two
processes at the OS level via the OS's buffers.

···

On Thursday, January 7, 2010, Phillip Gawlowski wrote:

On 07.01.2010 18:58, Iñaki Baz Castillo wrote:
> Hummm, I have a reader process and a writer process.
> The writer process writes into the pipe file.
> If I kill the reader process then the writer process keeps writing into
> the pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem... I must investigate it a bit more...

pipe.write unless pipe.full?

--
Iñaki Baz Castillo <ibc@aliax.net>

Hi Robert, I'd like to thank you for the help you gave me in this and other
threads. Finally I've decided to use POSIX message queues [*] under Ruby.

The reason is that it safely allows multiple processes or threads to write
messages (atomic strings) to the same mqueue, and also multiple processes
reading from the same mqueue, which means load-balancing out of the
box :)

The queue size is configurable and the writer/reader can write/read the
mqueue in a blocking or non-blocking way.

Also, mqueues allow setting a priority on the messages, so messages with
higher priority are fetched first when reading the mqueue.
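
For example (assuming POSIX_MQ#send accepts an optional priority argument,
mirroring mq_send(3), and that IO::RDWR is accepted like the other IO flags):

require "posix_mq"

mq = POSIX_MQ.new("/my_mq", IO::RDWR | IO::CREAT)
mq.send("routine notification", 0)
mq.send("urgent notification", 9)

msg, prio = mq.receive   # the urgent message (priority 9) is delivered first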

POSIX message queues are just 20-40% slower than pipes in my benchmarks (but
pipes are not multiprocess/thread-safe).

I would like to share a working example:

---- posix_mq_reader.rb ------------------------------
require "posix_mq"

# Parameters:
# - queue name (must start with "/")
# - flags:
#   - IO::RDONLY => Just to read from the queue
#   - IO::CREAT => Create if it doesn't exist
MQ = POSIX_MQ.new "/my_mq", IO::RDONLY | IO::CREAT

loop do
  # Blocking wait:
  msg = MQ.receive.first # It returns an array [message, priority]
  puts "message received: #{msg}"
end

------------------------------------------------------

---- posix_mq_writer.rb ------------------------------
require "posix_mq"

# Open with these options:
# - IO::WRONLY => Just to write into the queue.
# - IO::CREAT => Create if it doesn't exist.
# - IO::NONBLOCK => Don't block when writing (instead raise Errno::EAGAIN).
MQ = POSIX_MQ.new("/my_mq", IO::WRONLY | IO::CREAT | IO::NONBLOCK)

def send(msg)
  begin
    MQ << msg
  rescue Errno::EAGAIN
    puts "Errno::EAGAIN received, the queue is full!"
  end
end
------------------------------------------------------

Now the reader and writer can be opened multiple times sharing the same mqueue
:)

I also tested your suggested solution with DRb, which is really nice, but I
don't need all the features DRb provides (I just need to pass a simple string
to other process(es) from multiple workers).

Again thanks a lot to all the people who contributed in this thread, I've
learnt a lot.

Best regards.

[*] posix_mq - POSIX message queues for Ruby

···

On Thursday, January 7, 2010, Robert Klemme wrote:

I'd personally prefer to use the DRb approach because then you can
actually send typed messages, i.e. whatever information you need. Also,
it was fun to play around with those small test programs. ;-) And you
can have the reader run on any machine in the network.

--
Iñaki Baz Castillo <ibc@aliax.net>

pipe.write unless pipe.full?

Unfortunately #full? is not a method of File :(

Well, yes, you'd have to implement the method (or something like it) yourself. ;-)

Note that I'm using a FIFO file (created with "mkfifo file") so it is not
"stored" in the filesystem. Instead it's just communication between two
processes at the OS level via the OS's buffers.

Yeah, I gathered that from your other posts. The general point, though, still applies: check the pipe's size, and if it grows too large, spin off a new reading thread.

···

On 07.01.2010 19:50, Iñaki Baz Castillo wrote:

--
Phillip Gawlowski

I'd personally prefer to use the DRb approach because then you can
actually send typed messages, i.e. whatever information you need. Also,
it was fun to play around with those small test programs. ;-) And you
can have the reader run on any machine in the network.

Hi Robert, I'd like to thank you for the help you gave me in this and other
threads. Finally I've decided to use POSIX message queues [*] under Ruby.

You're welcome!

The reason is that it safely allows multiple processes or threads to write
messages (atomic strings) to the same mqueue, and also multiple processes
reading from the same mqueue, which means load-balancing out of the
box :)

The queue size is configurable and the writer/reader can write/read the
mqueue in a blocking or non-blocking way.

Also, mqueues allow setting a priority on the messages, so messages with
higher priority are fetched first when reading the mqueue.

POSIX message queues are just 20-40% slower than pipes in my benchmarks (but
pipes are not multiprocess/thread-safe).

That sounds good! I have never worked with POSIX MQ so I definitely
learned something new as well.

I would like to share a working example:

Thank you for the summary and the code! That way other readers will
benefit as well.

I also tested your suggested solution with DRb, which is really nice, but I
don't need all the features DRb provides (I just need to pass a simple string
to other process(es) from multiple workers).

Well, you don't have to use them. :) But POSIX MQ looks equally simple to use.

Kind regards

robert

···

2010/1/9 Iñaki Baz Castillo <ibc@aliax.net>:

On Thursday, January 7, 2010, Robert Klemme wrote:

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

I believe pipes can be used concurrently if reads and writes are less than
or equal to PIPE_BUF bytes. Is the size limitation the problem you were
hinting at or something else?
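
A minimal sketch of that guarantee (the message format is just for
illustration): writes of at most PIPE_BUF bytes are atomic, so lines from
forked writers never interleave mid-message.

rd, wr = IO.pipe

pids = 3.times.map do |n|
  fork do
    rd.close
    wr.sync = true   # one write(2) per message
    100.times { wr.write("writer #{n} pid #{$$} says hello\n") }  # well under PIPE_BUF
  end
end
wr.close

rd.each_line { |line| print line }   # every line arrives intact
pids.each { |pid| Process.wait(pid) }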

Gary Wright

···

On Jan 9, 2010, at 12:54 PM, Iñaki Baz Castillo wrote:

POSIX message queues are just 20-40% slower than pipes in my benchmarks (but
pipes are not multiprocess/thread-safe).