Resolving pipe deadlock

This is a cross post from SO:

I am trying to emulate UNIX command line pipes in a Ruby-only solution
that uses multiple cores. Eventually, the records piped from command to
command will be Ruby objects marshaled using msgpack. Unfortunately, the
code hangs after the first dump command. I am really trying to figure
out what causes this deadlock and how to resolve it.

Any hints appreciated.

Cheers,

Martin

···


This seems to work, without the parallel gem (sorry, not familiar with it):

#!/usr/bin/env ruby

require 'msgpack'
require 'pp'

class Pipe
   def initialize
     @commands = []
   end

   def add(command, options = {})
     @commands << Command.new(command, options)

     self
   end

   def run
     writers = {}
     @commands.each_cons(2) do |c_in, c_out|
       reader, writer = IO.pipe

       c_out.input = MessagePack::Unpacker.new(reader)
       c_in.output = MessagePack::Packer.new(writer)
       writers[c_in] = writer
     end

     @commands.map do |command|
       fork do
         command.run
       end
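       # Close the parent's copy of the write end so it does not keep the
       # downstream reader from ever seeing EOF.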
       writers[command].close if writers[command]
     end

     Process.waitall
   end

   class Command
     attr_accessor :input, :output

     def initialize(command, options)
       @command = command
       @options = options
       @input = nil
       @output = nil
     end

     def run
       send @command
     end

     def cat
       @input.each { |record| @output.write(record).flush } if @input

       File.open(@options[:input]) do |ios|
         ios.each { |record| @output.write(record).flush } if @output
       end
     end

     def dump
       @input.each do |record|
         puts record
         @output.write(record).flush if @output
       end
     end
   end
end

p = Pipe.new
p.add(:cat, input: "foo.tab").add(:dump).add(:cat, input: "table.txt").add(:dump)
p.run

···

On 01/17/2014 12:19 AM, Martin Hansen wrote:

This is a cross post from SO:
How to resolve pipe dead-lock in Ruby? - Stack Overflow

I am trying to emulate UNIX command line pipes in a Ruby-only solution
that uses multiple cores. Eventually, the records piped from command to
command will be Ruby objects marshaled using msgpack. Unfortunately, the
code hangs after the first dump command. I am really trying to figure
out what causes this deadlock and how to resolve it.

If you feed it more than a couple of hundred lines of data the deadlock
happens.

Martin

···


Thanks Robert,

Your explanation fits more or less exactly what I was fearing.

I think it would be really nice, from an understanding/debugging point
of view, if I could get this working in a single thread (parallel can be
disabled by setting in_processes: 0). However, there are several issues.

First: with a simple pipe, p.add(:cat, input: "foo.tab").add(:dump),
where "foo.tab" only contains a few lines, the script hangs after :dump,
and I suspect it is because of the lack of a "termination signal" like
EOF.
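
A bare IO.pipe test (nothing to do with the gist, just me poking at the
EOF behaviour) seems to back this up: the reading loop only returns once
every copy of the write end has been closed:

rd, wr = IO.pipe

wr.puts "foo"
wr.puts "bar"

# Without this close the each_line below never returns - the reader
# keeps waiting because a write end is still open somewhere.
wr.close

rd.each_line { |line| print line }   # prints foo and bar, then hits EOF
rd.close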

Second, if "foo.tab" contains more than a couple of thousand lines then
it blocks in the :cat step - and I suspect that the reader buffer is
full and requires unloading to resolve the block.

Third, I was hoping that with multiple processes the next process would
drain the buffer from the preceding step in a timely fashion. However,
it is clear that this isn't the case - some synchronization is required
between the processes. If only Ruby (MRI) threads supported multiple
processors it could be handled with a mutex (darn GVL!). One could dig
into EventMachine to see if that would work, but I am scared by the
complexity of it.

Thus, I fear my design is flawed. One thing I can think of is to change
the design slightly so a single record at a time is passed from command
to command.

Cheers,

Martin

···


Hey, I'm not familiar with either parallel or msgpack, but can you try
using strace (or a similar syscall tracer for your OS) on the spawned
processes? You'll probably find one (or more) of the processes stuck in
the read/write/select syscalls, and you'll see which FDs they're stuck
on.

If you're on Linux, perhaps check out dtas-graph, a Perl script I wrote
for visualizing the pipelines and dataflows within dtas-player[1]. I
combined it with strace for debugging, because it was sometimes
confusing to associate the FDs shown in strace output with the actual
pipes/connected process on the other end(s).

dtas-graph should work on any Linux processes using pipes, not just
dtas-player.

  git clone git://80x24.org/dtas dtas

  # You'll need the Graph::Easy perl module,
  # "apt-get install libgraph-easy-perl" if you're on Debian,
  # I would've written it in Ruby but I couldn't find a graphing
  # library in Ruby with ASCII/UTF-8 text output.
  perl dtas/perl/dtas-graph $PID_OF_MAIN_PROCESS

Sample output: http://dtas.80x24.org/dtas-graph-sample.txt

Arrows denote the direction of data flow; the numbers on the lines are
the file descriptor numbers associated with the pipe for each process.
Boxes represent either processes (identified via PIDs) or pipe objects
(identified via a PIPEID:PIPE_INO mapping).

[1] - duct tape audio suite - http://dtas.80x24.org/

···

Martin Hansen <lists@ruby-forum.com> wrote:

I am trying to emulate UNIX command line pipes in a Ruby-only solution
that uses multiple cores. Eventually, the records piped from command to
command will be Ruby objects marshaled using msgpack. Unfortunately, the
code hangs after the first dump command. I am really trying to figure
out what causes this deadlock and how to resolve it.

I get this with strace, but I don't really know what it means:

maasha@mao:~/scratch$ strace -cp 18264
Process 18264 attached - interrupt to quit
Process 18264 detached
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
  -nan    0.000000           0         6           read
  -nan    0.000000           0        23         1 write
  -nan    0.000000           0         1           open
  -nan    0.000000           0         4           close
  -nan    0.000000           0         1           fstat
  -nan    0.000000           0         1           mmap
  -nan    0.000000           0         2           munmap
  -nan    0.000000           0         3           rt_sigaction
  -nan    0.000000           0         2         1 rt_sigreturn
  -nan    0.000000           0         1           getrlimit
  -nan    0.000000           0         1           sched_getaffinity
  -nan    0.000000           0         1           tgkill
------ ----------- ----------- --------- --------- ----------------
100.00    0.000000                    46         2 total

···


Right (cryptic output IMHO):

maasha@mao:~/scratch$ strace -p 18272
Process 18272 attached - interrupt to quit
write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS
(To be restarted)
--- SIGINT (Interrupt) @ 0 (0) ---
write(4, "!", 1) = 1
--- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
rt_sigreturn(0x1a) = 1
rt_sigreturn(0x2) = -1 EINTR (Interrupted system
call)
close(9) = 0
open("/proc/self/maps", O_RDONLY) = 9
getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024, rlim_max=RLIM_INFINITY}) =
0
fstat(9, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0)
= 0x7fd49ea35000
read(9, "7fd49d0d0000-7fd49d0df000 r-xp 0"..., 1024) = 1024
read(9, " /usr/local/lib/ruby/2"..., 1024) = 1024
read(9, "0000 08:01 260629 "..., 1024) = 1024
read(9, "400000 rw-p 00003000 08:01 26062"..., 1024) = 1024
read(9, ":00 0 \n7fd49e824000-7fd49e844000"..., 1024) = 1024
read(9, "fff64280000 rw-p 00000000 00:00 "..., 1024) = 231
close(9) = 0
munmap(0x7fd49ea35000, 4096) = 0
sched_getaffinity(18272, 32, {ffff, 0, 0, 0}) = 32
write(2, "./pipes.rb:50:in `write'", 24) = 24
write(2, ": ", 2) = 2
write(2, "Interrupt", 9) = 9
write(2, "\n", 1) = 1
write(2, "\tfrom ./pipes.rb:50:in `flush'\n", 31) = 31
write(2, "\tfrom ./pipes.rb:50:in `block (2"..., 49) = 49
write(2, "\tfrom ./pipes.rb:50:in `each'\n", 30) = 30
write(2, "\tfrom ./pipes.rb:50:in `block in"..., 38) = 38
write(2, "\tfrom ./pipes.rb:49:in `open'\n", 30) = 30
write(2, "\tfrom ./pipes.rb:49:in `cat'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:43:in `run'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:27:in `block in"..., 38) = 38
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 103) = 103
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 87) = 87
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 98) = 98
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 94) = 94
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
write(2, "\tfrom ./pipes.rb:27:in `run'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:67:in `<main>'\n", 32) = 32
rt_sigaction(SIGINT, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO,
0x7fd49e617030}, {0x7fd49eb2d6f0, [], SA_RESTORER|SA_SIGINFO,
0x7fd49e617030}, 8) = 0
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO,
0x7fd49e617030}, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030},
8) = 0
close(7) = 0
close(8) = 0
write(4, "!", 1) = 1
munmap(0x7fd49e926000, 1052672) = 0
rt_sigaction(SIGINT, {SIG_DFL, [INT], SA_RESTORER|SA_RESTART,
0x7fd49d9eb4f0}, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030},
8) = 0
tgkill(18272, 18272, SIGINT) = 0
--- SIGINT (Interrupt) @ 0 (0) ---
Process 18272 detached

maasha@mao:~/scratch$ strace -fp 18277
Process 18277 attached with 2 threads - interrupt to quit
[pid 18278] restart_syscall(<... resuming interrupted call ...>
<unfinished ...>
[pid 18277] write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ?
ERESTARTSYS (To be restarted)
[pid 18277] --- SIGINT (Interrupt) @ 0 (0) ---
[pid 18277] write(4, "!", 1 <unfinished ...>
[pid 18278] <... restart_syscall resumed> ) = 1
[pid 18277] <... write resumed> ) = 1
[pid 18278] read(3, <unfinished ...>
[pid 18277] rt_sigreturn(0x2 <unfinished ...>
[pid 18278] <... read resumed> "!", 1024) = 1
[pid 18277] <... rt_sigreturn resumed> ) = -1 EINTR (Interrupted system
call)
[pid 18278] read(3, <unfinished ...>
[pid 18277] write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31
<unfinished ...>
[pid 18278] <... read resumed> 0x7f2fc6d28020, 1024) = -1 EAGAIN
(Resource temporarily unavailable)
[pid 18278] read(5, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
[pid 18278] tgkill(18277, 18277, SIGVTALRM) = 0
[pid 18278] poll([{fd=3, events=POLLIN}], 1, 100 <unfinished ...>
[pid 18277] <... write resumed> ) = ? ERESTARTSYS (To be
restarted)
[pid 18277] --- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
[pid 18277] rt_sigreturn(0x1a) = -1 EINTR (Interrupted system
call)
[pid 18277] close(9) = 0
[pid 18277] open("/proc/self/maps", O_RDONLY) = 9
[pid 18277] getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024,
rlim_max=RLIM_INFINITY}) = 0
[pid 18277] fstat(9, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
[pid 18277] mmap(NULL, 4096, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f2fc687f000
[pid 18277] read(9, "7f2fc4f1a000-7f2fc4f29000 r-xp 0"..., 1024) = 1024
[pid 18277] read(9, " /usr/local/lib/ruby/2"..., 1024) = 1024
[pid 18277] read(9, "0000 08:01 260629 "..., 1024) = 1024
[pid 18277] read(9, "24a000 rw-p 00003000 08:01 26062"..., 1024) = 1024
[pid 18277] read(9, ":00 0 \n7f2fc666e000-7f2fc668e000"..., 1024) = 1024
[pid 18277] read(9, "fff1d5fb000 rw-p 00000000 00:00 "..., 1024) = 231
[pid 18277] close(9) = 0
[pid 18277] munmap(0x7f2fc687f000, 4096) = 0
[pid 18277] sched_getaffinity(18277, 32, {ffff, 0, 0, 0}) = 32
[pid 18277] write(2, "./pipes.rb:50:in `write'", 24) = 24
[pid 18277] write(2, ": ", 2) = 2
[pid 18277] write(2, "Interrupt", 9) = 9
[pid 18277] write(2, "\n", 1) = 1
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `flush'\n", 31) = 31
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `block (2"..., 49) = 49
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `each'\n", 30) = 30
[pid 18277] write(2, "\tfrom ./pipes.rb:50:in `block in"..., 38) = 38
[pid 18277] write(2, "\tfrom ./pipes.rb:49:in `open'\n", 30) = 30
[pid 18277] write(2, "\tfrom ./pipes.rb:49:in `cat'\n", 29) = 29
[pid 18277] write(2, "\tfrom ./pipes.rb:43:in `run'\n", 29) = 29
[pid 18277] write(2, "\tfrom ./pipes.rb:27:in `block in"..., 38) = 38
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 103) = 103
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 87) = 87
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 98) = 98
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 94) = 94
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
[pid 18277] write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
[pid 18277] write(2, "\tfrom ./pipes.rb:27:in `run'\n", 29) = 29
[pid 18277] write(2, "\tfrom ./pipes.rb:67:in `<main>'\n", 32) = 32
[pid 18277] rt_sigaction(SIGINT, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO,
0x7f2fc6461030}, {0x7f2fc69776f0, [], SA_RESTORER|SA_SIGINFO,
0x7f2fc6461030}, 8) = 0
[pid 18277] rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO,
0x7f2fc6461030}, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO, 0x7f2fc6461030},
8) = 0
[pid 18277] close(7) = 0
[pid 18277] close(8) = 0
[pid 18277] write(4, "!", 1 <unfinished ...>
[pid 18278] <... poll resumed> ) = 1 ([{fd=3, revents=POLLIN}])
[pid 18277] <... write resumed> ) = 1
[pid 18278] read(3, <unfinished ...>
[pid 18277] futex(0x7f2fc688a9d0, FUTEX_WAIT, 18278, NULL <unfinished
...>
[pid 18278] <... read resumed> "!", 1024) = 1
[pid 18278] read(3, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
[pid 18278] read(5, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource
temporarily unavailable)
[pid 18278] _exit(0) = ?
Process 18278 detached
<... futex resumed> ) = 0
munmap(0x7f2fc6770000, 1052672) = 0
rt_sigaction(SIGINT, {SIG_DFL, [INT], SA_RESTORER|SA_RESTART,
0x7f2fc58354f0}, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO, 0x7f2fc6461030},
8) = 0
tgkill(18277, 18277, SIGINT) = 0
--- SIGINT (Interrupt) @ 0 (0) ---
Process 18277 detached

···


OK, the output of strace -o trace -ff -ttt ./pipes.rb produces two
files:

ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

I have yet to figure out what they mean.

Cheers,

Martin

···


Thanks Robert,

As usual your assistance and insight are awesome.

I did let the strace run for a while before, and tested a couple of new
ones that were run for a minute or so, but since the traces contain the
same number of lines, I don't think run time was an issue.

I don't understand why 3 pipes (pipe pairs?) are reported, when IO.pipe
is called only once and fork is disabled (also disabling msgpack doesn't
change the number of pipes).

"ulimit -p" on my linux system yields 8 and that indicates a 64K buffer.
On my Mac it yields 1?
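
One way to check the actual capacity directly from Ruby (a quick sketch
of mine, not part of the gist) is to fill a pipe with non-blocking
writes until it would block; on Linux this typically reports 65536:

rd, wr = IO.pipe

bytes = 0
begin
  # Keep writing until the kernel buffer is full and the write would block.
  loop { bytes += wr.write_nonblock("x" * 4096) }
rescue IO::WaitWritable
  puts "pipe capacity: #{bytes} bytes"
end

wr.close
rd.close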

Anyways, I did some more tests/straces with Parallel enabled:

Since this version uses forks it means that a separate process is
reading from the process that is writing (that's my idea at least). This
works to the degree that all of the data is being dumped, but then the
script hangs - I suspect that the pipe is not closed and freed
correctly. I uploaded the traces from this test to trace_2step.tar to
ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

Moreover, adding a further step in line 67 in the above gist:

p.add(:cat, input: "big.tab").add(:cat, input: "table.txt").add(:dump)

and it hangs after dumping big.tab. Again, I suspect bad pipe handling,
and I uploaded the traces from that one as well, as trace_3step.tar. In
fact I uploaded code, test files and traces.

Cheers,

Martin

···


@Robert, that is of course something I need to look into. I did notice
in the docs that IO.pipe has a child/parent relationship, but some
clever guy on IRC #ruby said it would probably be OK to use. I was
actually using named pipes in an earlier version - and those have no
child/parent bindings - but the script would still hang for reasons
unknown.

For now I will do some forking manually to avoid Parallel.

Cheers,

Martin

···


This Parallel-less version more or less follows the example from the
IO.pipe docs; however, even if I can connect the commands in the
pipeline, I fail to see how I can then run the commands:

  def run
    @commands.each_cons(2) do |parent, child|
      reader, writer = IO.pipe

      if fork
        writer.close
        child.input = MessagePack::Unpacker.new(reader)
        reader.close
        Process.wait
      else
        reader.close
        parent.output = MessagePack::Packer.new(writer)
        writer.close
      end
    end

    self
  end
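
Maybe the shape I am after is more like the following untested sketch
(raw IO ends, no msgpack for now): fork one child per command, have each
child close every pipe end it does not use itself, and have the parent
close all of its copies before waiting:

  def run
    @commands.each_cons(2) do |producer, consumer|
      reader, writer  = IO.pipe
      producer.output = writer
      consumer.input  = reader
    end

    pids = @commands.map do |command|
      fork do
        # Close every pipe end this command does not use itself; otherwise
        # a reader may never see EOF because it holds an inherited write end.
        (@commands - [command]).each do |other|
          other.input.close  if other.input
          other.output.close if other.output
        end

        command.run

        command.output.close if command.output
      end
    end

    # The parent holds copies of all the ends too - close them all.
    @commands.each do |command|
      command.input.close  if command.input
      command.output.close if command.output
    end

    pids.each { |pid| Process.waitpid(pid) }

    self
  end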

I better sleep on it :o/

Martin

···


Robert, I appreciate your effort. I still have a hard time wrapping my
head around fork and closing dangling file handles - just as you pointed
out. Also, lambdas, smart as they may be, are less readable to me, and I
would think also to my target audience (biologists). I have decided to
stick to my proposed syntax for setting up and executing pipes, which is
the most flexible and easiest to grasp (and will work excellently in irb):

require 'pipe'

p1 = Pipe.new.add(:cat, input: "test_in").add(:grep, pattern: "foo")
p2 = Pipe.new.add(:save, output: "test_out" )

(p1 + p2).add(:dump).run
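
Since Pipe just keeps an array of commands internally, I imagine + can
be something as simple as this (a sketch, assuming @commands is the
array from the earlier class plus a reader for it):

class Pipe
  attr_reader :commands

  def +(other)
    # A new Pipe whose command list is this pipe's commands followed by
    # the other pipe's commands; neither operand is modified.
    result = Pipe.new
    result.commands.concat(@commands + other.commands)
    result
  end
end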

Also, I am aiming for some 100+ commands here, some of which will be
quite advanced - and I am afraid of lambdas for this. In fact, I am
experimenting to come up with the next generation of Biopieces
(www.biopieces.org).

Here is a version with named pipes that works (though still with
dangling file handles):

Named pipes don't have the parent/child binding of IO.pipe, so they work
with the parallel gem. However, the stream terminator "\0" I use is
quite a hack. I could possibly keep track of the number of records
passed around instead. Or get those dangling file handles sorted?

Finally, I wonder about the performance of IO.pipe versus named pipes -
I want to do a benchmark. Actually, I am concerned about overall
performance; this is of course not C, or even well-written Ruby code
optimized for a specific task, but rather a general-purpose way of
setting up pipes and executing them as simply as possible. I figure that
spending 30 minutes writing a script that runs for 1 minute is often
less appealing than spending 1 minute on a script that runs for 30
minutes.

Cheers,

Martin

···


Benchmark of IO.pipe vs named pipe:

maasha@mao:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'

system("mkfifo my_pipe") unless File.exists? "my_pipe"

rd, wt = IO.pipe
nrd = File.open("my_pipe", File::RDONLY | File::NONBLOCK)
nwt = File.open("my_pipe", File::WRONLY | File::NONBLOCK)

n = 1_000_000

Benchmark.bm() do |x|
  x.report("IO.pipe")    { n.times { wt.puts "foo"; wt.flush; rd.gets } }
  x.report("named pipe") { n.times { nwt.puts "foo"; nwt.flush; nrd.gets } }
end

maasha@mao:~$ ./benchmark.rb
                 user     system      total        real
IO.pipe      1.360000   0.750000   2.110000 (  2.118993)
named pipe   1.280000   0.610000   1.890000 (  1.894353)

···


Robert, this is indeed marvelous. I guess my problem with lambda, proc
and closures is lack of experience - there is a bit of a hurdle to get
over since abstraction is, well, abstract... Anyways, you make a strong
case, so I will stick to your suggestions!

Now, I totally believe in "separation of concerns", and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack) through all commands in the
pipeline. Certain commands will respond to records containing
appropriate data, act upon them, and sometimes emit the incoming records
as well as any newly produced ones to the next command in the pipeline.
Therefore I need a command like CAT below, which emits all incoming
records plus any new records from the specified files. So I tried the
below with your code, but it hangs :o( ?

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line}

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}

p.execute_processes

···


Serializing and deserializing is a bottleneck and msgpack is fast:

maasha@mel:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'
require 'msgpack'

n = 100_000

h = {
  zero: 0,
  one: 1,
  two: 2,
  three: 3,
  four: 4,
  five: 5,
  six: 6
}

Benchmark.bm() do |x|
  x.report("Marshal") { n.times { m = Marshal.dump h; u = Marshal.load m } }
  x.report("MsgPack") { n.times { m = h.to_msgpack; u = MessagePack.unpack m } }
end

maasha@mel:~$ ./benchmark.rb
              user     system      total        real
Marshal   1.770000   0.000000   1.770000 (  1.774334)
MsgPack   0.900000   0.020000   0.920000 (  0.921232)

Now concerning CAT: This is a placeholder for commands that will read in
data in particular formats from specified files. I have found time and
time again that it is extremely useful (using Biopieces 1.0) to be able
to process data like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["file2]
p.execute_processes
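
GREP here is just another placeholder in the same style as CAT; a sketch
of it could look like this (taking a Regexp rather than a string):

GREP = lambda do |pattern|
  lambda do |io_in, io_out|
    # Pass through only the incoming records that match the pattern.
    io_in.each_line { |line| io_out.puts line if line =~ pattern }
  end
end

# p << GREP[/foobar/]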

I know that this is not the way UNIX cat works, and that BASH can do
stuff like this neatly. But this is not UNIX, and this is not UNIX cat.
I really would like to let ALL commands read any incoming records and
emit them again (depending on the given options) along with any newly
created records.

Cheers,

Martin

···


OK, the use of "-" inside the CAT lambda I could perhaps tolerate. But
in the invocation I think it looks silly. Now, I tried something crufty
instead that happens to work!?!:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line} unless io_in.inspect =~ /STDIN/

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}
p.execute_processes

One could perhaps patch class IO and add a stdin? method?
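
Something like this, perhaps (a sketch that simply compares file
descriptor numbers):

class IO
  # True if this IO is the process's standard input.
  def stdin?
    fileno == STDIN.fileno
  end
end

# io_in.each_line {|line| line.chomp!; io_out.puts line} unless io_in.stdin?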

Cheers,

Martin

···


So after a busy week, I have at last some time to look more into this.
So far it looks very promising, but then I wanted to use the
(de-)serializer method of msgpack and it hangs once again?

line 49-50

The File.pipe? test of course needs to be changed, but there is more to
it ...

Cheers,

Martin

···


Got it. Gist updated.

Martin

···


No knowledge of the "parallel" gem here either, but this phenomenon
also happens if one does not properly parallelize IO. It may be an
explanation. If it is what I am suggesting, then everything works as
long as the data size is less than some buffer size, because then an IO
operation can finish even if there is no live reader. If you exceed the
limit, then one IO operation blocks waiting for another to read data
from the buffer so it can write more data. If that other operation is
somehow blocked by the first one (e.g. via some synchronization
mechanism, or simply because it is invoked from the same thread
afterwards), then the deadlock happens: the writer cannot finish and the
reader never gets a chance to read.
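
A trivial single-threaded illustration (nothing to do with your setup,
just the mechanism): the write blocks as soon as the kernel pipe buffer
fills up, and the read that would drain it never gets a chance to run:

rd, wr = IO.pipe

# The pipe buffer is typically 64 KiB on Linux, so this write cannot
# complete in one go. With nobody reading from rd, it blocks forever -
# the writer cannot finish and the reader below never runs.
wr.write("x" * 1_000_000)

rd.read  # never reached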

Kind regards

robert

···

On Sat, Jan 18, 2014 at 10:34 AM, Martin Hansen <lists@ruby-forum.com> wrote:

If you feed it more than a couple of hundred lines of data the deadlock
happens.

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

Just use strace -p, no need for -c unless you're doing performance
stuff. Try -f to follow forks, too.

···

Martin Hansen <lists@ruby-forum.com> wrote:

I get this with strace, but I don't really know what it means:

maasha@mao:~/scratch$ strace -cp 18264