Trouble with mkfifo and forks

# In a nutshell:
1) Create a pipe
2) Fork twice
3) Each fork sends 10 strings over the pipe
4) See what comes out the other side

So, why isn't what I send coming out?

class WaitListTest < Test::Unit::TestCase
  NUMBER_OF_ORDERS = 2
  PIPE = "test/wl_pipe"

  def setup
    `rm #{PIPE}`
    `mkfifo #{PIPE}`
  end

  def test_registering_last_race
    term_count = 0
    responses = []
    pids = []

    NUMBER_OF_ORDERS.times do |i|
      pids << Process.fork do
        sleep 1 + (0.1 * i)
        10.times do |a|
          File.open("#{RAILS_ROOT}/#{PIPE}", "w+") do |pipe|
            sleep 0.1
            pipe.puts "Fork #{i} - #{a}"
          end
        end
        File.open("#{RAILS_ROOT}/#{PIPE}", "a") do |pipe|
          sleep 0.1
          pipe.puts "DONE"
        end
      end
    end

    while term_count < NUMBER_OF_ORDERS
      File.open("#{RAILS_ROOT}/#{PIPE}", "r+") do |pipe|
        val = pipe.gets
        if val.match(/DONE/)
          term_count += 1
        else
          responses << val
        end
      end
    end
    puts responses.sort
    puts responses.count
  end
end

# output
Started
Fork 0 - 0
Fork 0 - 1
Fork 0 - 2
Fork 0 - 3
Fork 0 - 4
Fork 0 - 5
Fork 0 - 7
Fork 0 - 9
Fork 1 - 0
Fork 1 - 1
Fork 1 - 2
Fork 1 - 3
Fork 1 - 4
Fork 1 - 5
Fork 1 - 7
Fork 1 - 8
Fork 1 - 9
17

Why isn't this 20?

I'm probably naive about something, but FIFOs almost never work right for what I want to do, and I usually end up using Unix domain sockets instead (which also makes it easier to move to TCP sockets if you need to distribute across hosts).

Anyway, in your code, it seems that the repeated closing of the *read* end of the pipe causes some data to be lost. If you can modify your read loop to keep using the same IO object, it seems to work (see below). Also, I found I needed to watch for #gets returning nil (though that doesn't happen reliably).

Perhaps someone who knows more about pipes will "pipe up" on this thread.

class WaitListTest
   NUMBER_OF_ORDERS = 2
   PIPE = File.expand_path("~/tmp/wl_pipe")

   def setup
     `rm -f #{PIPE}`
     `mkfifo #{PIPE}`
   end

   def test_registering_last_race
     term_count = 0
     responses = []
     pids = []

     NUMBER_OF_ORDERS.times do |i|
       pids << Process.fork do
         sleep 0.1 + (0.1 * i)
         10.times do |a|
           File.open(PIPE, "w") do |pipe|
             sleep 0.1
             pipe.puts "Fork #{i} - #{a}"
           end
         end
         File.open(PIPE, "w") do |pipe|
           sleep 0.1
           pipe.puts "DONE"
         end
       end
     end

     pipe = File.open(PIPE, "r")
     while term_count < NUMBER_OF_ORDERS
       # File.open(PIPE, "r") do |pipe|
         val = pipe.gets
         case val
         when /DONE/
           term_count += 1
         when nil
           puts "pipe closed" # need this too
         else
           responses << val
         end
       # end
     end

     pids.each do |pid|
       Process.waitpid(pid)
     end

     puts responses.sort
     puts responses.count
   end
end

wlt = WaitListTest.new
wlt.setup
wlt.test_registering_last_race

On 09/06/2010 01:10 PM, Josh wrote:

# In a nutshell:
1) Create a pipe
2) Fork twice
3) Each fork sends 10 strings over the pipe
4) See what comes out the other side

In the receiver, you are constantly opening the pipe for read and then
closing it. I expect that if someone writes to the pipe while there's no
reader, the data is lost.

The program works for me if I swap the open and while around:

    File.open("#{RAILS_ROOT}/#{PIPE}", "r") do |pipe|
      while term_count < NUMBER_OF_ORDERS
        val = pipe.gets
        if val.match(/DONE/)
          .. etc

But it is much better form to create an anonymous pipe in the parent
(rd, wr = IO.pipe) and then fork. In the child, close the rd end; in the
parent, close the wr end.
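
A minimal sketch of the anonymous-pipe approach, under a few assumptions: the helper name collect_from_forks is made up for illustration, each message is sent with a single write so that short lines from different children should not interleave, and the children leave via exit! so that any at_exit hooks inherited from the parent (a test runner's, for example) do not run in them. It is not a drop-in replacement for the test above.

```ruby
# Hypothetical helper: fork `fork_count` children, have each send
# `lines_per_fork` lines over one anonymous pipe, and return what arrives.
def collect_from_forks(fork_count, lines_per_fork)
  rd, wr = IO.pipe

  pids = fork_count.times.map do |i|
    Process.fork do
      rd.close                          # the child only writes
      lines_per_fork.times do |a|
        wr.write("Fork #{i} - #{a}\n")  # one write call per message
      end
      wr.close
      exit!(0)                          # skip at_exit hooks inherited from the parent
    end
  end

  wr.close                   # the parent only reads; without this, EOF never arrives
  responses = rd.readlines   # returns once every child has closed its write end
  rd.close
  pids.each { |pid| Process.waitpid(pid) }
  responses
end

responses = collect_from_forks(2, 10)
puts responses.sort
puts responses.count
```

Because the read end stays open for the whole run and end-of-file only shows up once every writer has closed, there is no need for a "DONE" marker or a termination counter here.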

Use Socket.pair if you want a bidirectional channel.
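
A rough sketch of the bidirectional case, assuming a Unix-like system where Socket.pair can create a connected pair of Unix stream sockets. The helper name ask_child and the "echo: " reply format are invented for illustration only.

```ruby
require "socket"

# Hypothetical helper: send one line to a forked child and read its reply
# back over the same socket pair.
def ask_child(question)
  parent_sock, child_sock = Socket.pair(:UNIX, :STREAM, 0)

  pid = Process.fork do
    parent_sock.close                      # the child keeps only its own end
    request = child_sock.gets              # read the parent's line, newline included
    child_sock.write("echo: #{request}")   # answer on the same socket
    child_sock.close
    exit!(0)                               # skip at_exit hooks inherited from the parent
  end

  child_sock.close                         # the parent keeps only its own end
  parent_sock.write("#{question}\n")
  reply = parent_sock.gets
  parent_sock.close
  Process.waitpid(pid)
  reply
end

reply = ask_child("ping")
puts reply
```

Each process closes the end it does not use, the same way as with IO.pipe, but here both remaining ends can be read from and written to.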

--
Posted via http://www.ruby-forum.com/.