Resolving pipe deadlock

You'll have to figure that all out and what it means.

I'm just pointing you towards some of the tools available. Leave the
processes running while you strace; run strace on each process in
multiple terminals, look at "ps axf" output, etc.

···

Martin Hansen <lists@ruby-forum.com> wrote:

Right (cryptic output IMHO):

maasha@mao:~/scratch$ strace -p 18272
Process 18272 attached - interrupt to quit
write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS (To be restarted)

A lot is missing before that. To find out what object a file
descriptor points to it's usually better to start the process under
"strace -f" rather than attach later. If you attach later, the calls
that opened the descriptors are usually not in the trace. I would try
something like

$ strace -o trace -ff -ttt your/ruby/process with args

This will write one trace file per process called "trace.<PID>" and
follow forks (-o trace and -ff) and print timestamps with microsecond
resolution (-ttt). I suggest the latter over -r because then you can
align events from different processes if need be.

It seems the trace mostly contains reaction to you killing the process
with SIGINT (similar for the other trace):

--- SIGINT (Interrupt) @ 0 (0) ---
write(4, "!", 1) = 1
--- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
rt_sigreturn(0x1a) = 1
rt_sigreturn(0x2) = -1 EINTR (Interrupted system call)
close(9) = 0
open("/proc/self/maps", O_RDONLY) = 9
getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024, rlim_max=RLIM_INFINITY}) = 0
fstat(9, {st_mode=S_IFREG|0444, st_size=0, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7fd49ea35000
read(9, "7fd49d0d0000-7fd49d0df000 r-xp 0"..., 1024) = 1024
read(9, " /usr/local/lib/ruby/2"..., 1024) = 1024
read(9, "0000 08:01 260629 "..., 1024) = 1024
read(9, "400000 rw-p 00003000 08:01 26062"..., 1024) = 1024
read(9, ":00 0 \n7fd49e824000-7fd49e844000"..., 1024) = 1024
read(9, "fff64280000 rw-p 00000000 00:00 "..., 1024) = 231
close(9) = 0
munmap(0x7fd49ea35000, 4096) = 0
sched_getaffinity(18272, 32, {ffff, 0, 0, 0}) = 32
write(2, "./pipes.rb:50:in `write'", 24) = 24
write(2, ": ", 2) = 2
write(2, "Interrupt", 9) = 9
write(2, "\n", 1) = 1
write(2, "\tfrom ./pipes.rb:50:in `flush'\n", 31) = 31
write(2, "\tfrom ./pipes.rb:50:in `block (2"..., 49) = 49
write(2, "\tfrom ./pipes.rb:50:in `each'\n", 30) = 30
write(2, "\tfrom ./pipes.rb:50:in `block in"..., 38) = 38
write(2, "\tfrom ./pipes.rb:49:in `open'\n", 30) = 30
write(2, "\tfrom ./pipes.rb:49:in `cat'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:43:in `run'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:27:in `block in"..., 38) = 38
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 103) = 103
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 87) = 87
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 98) = 98
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 94) = 94
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
write(2, "\tfrom /usr/local/lib/ruby/gems/2"..., 86) = 86
write(2, "\tfrom ./pipes.rb:27:in `run'\n", 29) = 29
write(2, "\tfrom ./pipes.rb:67:in `<main>'\n", 32) = 32
rt_sigaction(SIGINT, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030}, {0x7fd49eb2d6f0, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030}, 8) = 0
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030}, {SIG_IGN, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030}, 8) = 0
close(7) = 0
close(8) = 0
write(4, "!", 1) = 1
munmap(0x7fd49e926000, 1052672) = 0
rt_sigaction(SIGINT, {SIG_DFL, [INT], SA_RESTORER|SA_RESTART, 0x7fd49d9eb4f0}, {SIG_DFL, [], SA_RESTORER|SA_SIGINFO, 0x7fd49e617030}, 8) = 0
tgkill(18272, 18272, SIGINT) = 0
--- SIGINT (Interrupt) @ 0 (0) ---
Process 18272 detached

[...]
[pid 18277] write(4, "!", 1 <unfinished ...>
[pid 18278] <... poll resumed> ) = 1 ([{fd=3, revents=POLLIN}])
[pid 18277] <... write resumed> ) = 1
[pid 18278] read(3, <unfinished ...>
[pid 18277] futex(0x7f2fc688a9d0, FUTEX_WAIT, 18278, NULL <unfinished ...>
[pid 18278] <... read resumed> "!", 1024) = 1
[pid 18278] read(3, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource temporarily unavailable)
[pid 18278] read(5, 0x7f2fc6d28020, 1024) = -1 EAGAIN (Resource temporarily unavailable)

Maybe lines above give an indication that the reader is actually
blocked because there is no data. (Ruby MRI internally uses
nonblocking IO to multiplex threads.)

Kind regards

robert

···

On Sun, Jan 19, 2014 at 11:06 AM, Martin Hansen <lists@ruby-forum.com> wrote:

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

OK, the output of strace -o trace -ff -ttt ./pipes.rb produces two
files:

ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

Thank you!

I have yet to figure out what they mean.

I can share a few observations:

The parent creates three pipes:
$ fgrep pipe2 trace.1*
trace.18386:1390135219.046131 pipe2([3, 4], O_CLOEXEC) = 0
trace.18386:1390135219.046295 pipe2([5, 6], O_CLOEXEC) = 0
trace.18386:1390135219.112926 pipe2([7, 8], O_CLOEXEC) = 0

The client reads only from FD 3 and 5:

$ fgrep read trace.18387
1390135375.259220 read(3, "!", 1024) = 1
1390135375.259410 read(3, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource temporarily unavailable)
1390135375.259663 read(5, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource temporarily unavailable)
1390135375.261880 read(3, "!", 1024) = 1
1390135375.261910 read(3, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource temporarily unavailable)
1390135375.261932 read(5, 0x7fb63c823020, 1024) = -1 EAGAIN (Resource temporarily unavailable)

The parent writes to 2 (stderr) and 4 and 8
$ egrep -o 'write\([0-9]+' trace.18386 | sort -u
write(2
write(4
write(8

But to 4 only at the end
$ fgrep 'write(4' trace.18386
1390135375.258855 write(4, "!", 1) = 1
1390135375.261804 write(4, "!", 1) = 1

Main output from file "big.tab" goes to 8:

$ fgrep -c 'write(8' trace.18386
1924

Now, as you can see 8 is connected to 7 but since the child never
reads from 7 no data is transferred to it.

If you look at the ordering of pipe2 and clone you see that the client
is created before the last pipe is opened:

$ egrep '^[0-9]+\.[0-9]+ (pipe|clone)' trace.18386
1390135219.046131 pipe2([3, 4], O_CLOEXEC) = 0
1390135219.046295 pipe2([5, 6], O_CLOEXEC) = 0
1390135219.046513 clone(child_stack=0x7fb63c384ff0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fb63c3859d0, tls=0x7fb63c385700, child_tidptr=0x7fb63c3859d0) = 18387
1390135219.112926 pipe2([7, 8], O_CLOEXEC) = 0

So maybe it cannot read from FD 7 which it would need to do to read
what the parent writes to FD 8. But see at the end.

The strange thing is that almost all write operations are successful
because as many bytes are written as indicated:

$ egrep -c 'write\(8,.* ([0-9]+)\) = \1' trace.18386
1922
$ egrep -c 'write\(8,.* ([0-9]+)\) =' trace.18386
1924

$ diff -U5 <(egrep 'write\(8,.* ([0-9]+)\) =' trace.18386) <(egrep 'write\(8,.* ([0-9]+)\) = \1' trace.18386)
--- /dev/fd/63 2014-01-19 14:33:25.426820557 +0100
+++ /dev/fd/62 2014-01-19 14:33:25.426820557 +0100
@@ -1918,7 +1918,5 @@
1390135219.160905 write(8, "\332\0 4559\t4602\t3_cpVRyxwXsN1/2\t592"..., 35) = 35
1390135219.160929 write(8, "\332\0 4563\t4597\t5_X4035ywXsN1/1\t833"..., 35) = 35
1390135219.160954 write(8, "\332\0 4565\t4599\t1_bWkzlxwXsN1/1\t442"..., 35) = 35
1390135219.160980 write(8, "\332\0 4568\t4611\t1_TQZWsxwXsN1/2\t540"..., 35) = 35
1390135219.161004 write(8, "\332\0 4570\t4613\t3_gQoPtxwXsN1/2\t196"..., 35) = 35
-1390135219.161029 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS (To be restarted)
-1390135375.259425 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS (To be restarted)

The last two ones happen because of the signal I believe:

1390135219.161029 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS (To be restarted)
1390135375.258745 --- SIGINT (Interrupt) @ 0 (0) ---
1390135375.258855 write(4, "!", 1) = 1
1390135375.259234 rt_sigreturn(0x2) = -1 EINTR (Interrupted system call)
1390135375.259425 write(8, "\2764571\t4605\t5_GQbZ3ywXsN1/1\t7\t-\n", 31) = ? ERESTARTSYS (To be restarted)
1390135375.259784 --- SIGVTALRM (Virtual timer expired) @ 0 (0) ---
1390135375.259836 rt_sigreturn(0x1a) = -1 EINTR (Interrupted system call)

All in all 65266 bytes are written:

$ awk '/write\(8,.* = [0-9]+/ {sum+=$NF} END{print sum}' trace.18386
65266

This is 270 bytes short of 64k. I have no idea whether you happened to
kill the parent before it got into the stuck state. You could check
your pipe buffer size with "ulimit -p". On Linux this gives the buffer
size in 512-byte blocks. On my machine it returns 8, which is 4k and
also happens to be the memory page size, but of course it may be
different on your system.
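
Rather than guessing from "ulimit -p", the capacity can also be measured directly. The following is a minimal sketch, not code from the gist: the helper name pipe_capacity is made up for illustration, and it assumes a Ruby recent enough to accept "exception: false" on IO#write_nonblock. It fills a pipe that nobody reads and counts the bytes accepted before a blocking write would stall. According to the pipe(7) manual page the default capacity on Linux has been 65,536 bytes since kernel 2.6.11, which would fit the roughly 64k seen in the trace, but the value may differ on other systems.

```ruby
# Sketch: count how many bytes a pipe accepts before a writer would block.
# pipe_capacity is a hypothetical helper name, not part of pipes.rb.
def pipe_capacity(chunk_size = 1024)
  reader, writer = IO.pipe
  chunk = "x" * chunk_size
  total = 0

  loop do
    # With exception: false, a full pipe yields :wait_writable instead of
    # raising IO::WaitWritable.
    written = writer.write_nonblock(chunk, exception: false)
    break if written == :wait_writable
    total += written
  end

  total
ensure
  reader.close if reader
  writer.close if writer
end

puts "pipe capacity: #{pipe_capacity} bytes"
```

If the number printed is close to the 65266 bytes counted above, the parent was most likely already blocked on a full pipe when it was interrupted.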

Maybe you repeat the test and wait a bit longer to see whether the
parent gets into some kind of blocking state.

I looked a bit closer at flags passed to clone() and it seems the
child is a thread only:

1390135219.046513 clone(child_stack=0x7fb63c384ff0,
flags=
  CLONE_VM (same memory space)
  CLONE_FS (same filesystem information)
  CLONE_FILES (share the same file descriptor table)
  CLONE_SIGHAND (share the same table of signal handlers)
  CLONE_THREAD (same thread group as the calling process)
  CLONE_SYSVSEM (share a single list of System V semaphore undo values)
  CLONE_SETTLS (the newtls argument is the new TLS (Thread Local Storage) descriptor)
  CLONE_PARENT_SETTID (store child thread ID at location ptid in parent and child memory)
  CLONE_CHILD_CLEARTID (erase child thread ID at location ctid in child memory when the child exits)

So, since both share the same FD table the client _could_ read from FD
7 but apparently nobody does. So apparently you have a pipe which is
written to but nobody reads from it. At some point in time the writer
then must block since buffers are limited - always.
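
To illustrate the point about a writer needing a reader, here is a small sketch (the method name copy_through_pipe is invented for this example). A thread in the same process drains the read end while the main thread writes far more than a pipe buffer can hold. Without the consumer thread the write would block once the buffer is full, just like the write to FD 8 in the trace.

```ruby
# Sketch: a writer only makes progress past the pipe buffer size if
# somebody reads from the other end. copy_through_pipe is a made-up name.
def copy_through_pipe(payload)
  reader, writer = IO.pipe
  received = 0

  # The consumer shares the file descriptor table with the writer,
  # so it can read from the pipe without any extra setup.
  consumer = Thread.new do
    while (chunk = reader.read(4096))
      received += chunk.bytesize
    end
  end

  writer.write(payload)   # blocks whenever the buffer is full
  writer.close            # signals EOF to the consumer
  consumer.join
  received
ensure
  reader.close if reader && !reader.closed?
  writer.close if writer && !writer.closed?
end

puts copy_through_pipe("x" * 200_000)
```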

Kind regards

robert

···

On Sun, Jan 19, 2014 at 1:55 PM, Martin Hansen <lists@ruby-forum.com> wrote:



Thanks Robert,

As usual your assistance and insight is awesome.

You're welcome!

I did let the strace run for a while before, and tested a couple of new
ones that were run for a minute or so, but since the traces contain the
same number of lines, I don't think run time was an issue.

I don't understand why 3 pipes (pipe pairs?) are reported, when IO.pipe
is called only once and fork is disabled (also disabling msgpack doesn't
change the number of pipes).

"ulimit -p" on my linux system yields 8 and that indicates a 64K buffer.
On my Mac it yields 1?

Does 8 really mean 64k? I would have assumed that all flavors of
Linux use 512 byte blocks which would mean 4k - same as on my box.

Anyways, I did some more tests/straces with Parallel enabled:

pipes.rb · GitHub

Since this version uses forks it means that a separate process is
reading from the process that is writing (that's my idea at least). This
works to the degree that all of the data is being dumped, but then the
script hangs - I suspect that the pipe is not closed and freed
correctly. I uploaded the traces from this test to trace_2step.tar to
ftp://ftp_20140119_17561:eLNy4r+fAN27@ftp.dna.ku.dk

Moreover, adding a further step in line 67 in the above gist:

p.add(:cat, input: "big.tab").add(:cat, input: "table.txt").add(:dump)

and it hangs after dumping big.tab. Again, I suspect bad pipe handling
and I uploaded the traces from that one as well as trace_3step.tar. In
fact I uploaded code, test files and traces.

I don't have time right now to dive into yet another trace but I did
look at documentation of parallel. As far as I can tell from brief
inspection this is implementing a farmer worker scenario, i.e. you
have a queue of work items and processors (thread or process) pick
them up one after the other and execute. But if I understand what you
are doing properly this is not what you want. Basically you want all
tasks to run in parallel and the work items are taken from somewhere
else (i.e. the task at the head of the FIFO pipeline reads them from a
file) and piped through. In other words: it seems you are not using
the proper framework for your problem at hand. :slight_smile:

Kind regards

robert

···

On Sun, Jan 19, 2014 at 6:47 PM, Martin Hansen <lists@ruby-forum.com> wrote:


One more potential source of issues: you open all the pipes from the
main process. However, child processes will inherit those pipe
objects and if they are not closed there they will keep the pipeline
alive. That might be the issue you face.

Cheers

robert

···

On Sun, Jan 19, 2014 at 6:47 PM, Martin Hansen <lists@ruby-forum.com> wrote:

and it hangs after dumping big.tab. Again, I suspect bad pipe handling
and I uploaded the traces from that one as well as trace_3step.tar. In
fact I uploaded code, test files and traces.


This Parallel-less version is more or less following the example from
the IO.pipe docs, however, even if I can connect the commands in the
pipeline I fail to see how I can then run the commands:

I have a solution which connects commands but it still suffers from
your original issue (i.e. not terminating properly).

I better sleep on it :o/

Good plan!

Kind regards

robert

···

On Mon, Jan 20, 2014 at 10:24 PM, Martin Hansen <lists@ruby-forum.com> wrote:


Robert, I appreciate your effort. I still have a hard time wrapping my
head around fork and closing dangling file handles - just as you pointed
out.

Did you look at the code I provided (the gist)? Basically you need to
close all file handles that you do not need in a process which
includes all previously opened pipes and the write end of the current
pipe. I also did the optimization that the head of the pipeline is
executed in the current process.

Also, lambdas, smart as they may be, are less readable to me, and I
would think also my target audience (biologists). I have decided to
stick to my proposed syntax for setting up and executing pipes, which is
most flexible and easy to grasp (and will work excellently in irb):

I'll try to convince you to reconsider. First, with the approach you
have taken, you need to define a method inside the Pipe class for
*every* functionality that you want to have. By that you make the
Pipe class multi purpose (executing pipelines in multiple processes
AND a lot individual functionality). As a consequence the provider of
the Pipe class (you) ought to provide all the algorithms in form of
methods. Of course users of the class can open it again and add
methods (e.g. their own version of "cat" or whatnot). But that opens
the door for name conflicts, issues with instance variables etc.

In software engineering it is usually considered good practice to only
condense one purpose in a class. For class Pipe (which should
probably rather be called "Pipeline") it is executing multiple
operations in various processes (or even threads as my class offers).
By using lambda as abstraction for an anonymous method you have
maximum flexibility to provide any functionality. The only contract
is that the lambda will be invoked with two IOs for reading and
writing.

Even if you dislike lambdas, it is not too hard to come up with a
syntax that resembles yours! And that is probably the best news for
you. Here are some examples (for "cat" only) and how they look like
when creating the pipeline:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(CAT["test_in"]).add(GREP[/foo.*bar/])
pipe << CAT["test_in"] << GREP[/foo.*bar/]
pipe | CAT["test_in"] | GREP[/foo.*bar/]

(Note, I added operator "|".)
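
The examples refer to GREP without showing it. A minimal sketch of what such a lambda might look like, following the same closure pattern as CAT, is given below. This is an assumption for illustration and not the definition from the gist. StringIO objects stand in for the pipe ends so the stage can be tried in isolation.

```ruby
require 'stringio'

# Assumed counterpart to CAT: the outer lambda captures the pattern,
# the inner lambda is the stage that gets the two IOs.
GREP = lambda do |pattern|
  lambda do |io_in, io_out|
    io_in.each_line { |line| io_out.puts line if line =~ pattern }
  end
end

# Try the stage on its own, with StringIO in place of real pipes.
source = StringIO.new("foo and bar\nnothing here\nfoo then bar again\n")
sink = StringIO.new
GREP[/foo.*bar/].call(source, sink)
puts sink.string
```

Because a stage only sees two IO objects, it can be tested like this without forking anything.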

Other forms to create those lambdas - basically always following the
same approach to use a closure to capture arguments for later
execution:

def cat(*args)
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(cat("test_in")).add(grep(/foo.*bar/))
pipe << cat("test_in") << grep(/foo.*bar/)
pipe | cat("test_in") | grep(/foo.*bar/)

def Cat(*args)
  lambda do |io_in, io_out|
    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

pipe.add(Cat("test_in")).add(Grep(/foo.*bar/))
pipe << Cat("test_in") << Grep(/foo.*bar/)
pipe | Cat("test_in") | Grep(/foo.*bar/)

require 'pipe'

p1 = Pipe.new.add(:cat, input: "test_in").add(:grep, pattern: "foo")
p2 = Pipe.new.add(:save, output: "test_out" )

(p1 + p2).add(:dump).run

Easily done.

Also, I am aiming for some +100 commands here, where some will be quite
advanced - and I am afraid of lambdas for this.

Why? Your lambda can even be a simple adapter if you want to make use
of multiple other classes.

In fact, I am
experimenting to come up with the next generation of Biopieces
(www.biopieces.org).

Here is a version with named pipes that works (though still with
dangling file handles):

named_pipes2.rb · GitHub

I have updated my version but there is the opposite problem: one of
the IOs is closed too early. :slight_smile: You can see current state of affairs
in the gist. I have to go to bed now.

Named pipes don't have the parent/child binding of IO.pipe, so they work
with the parallel gem. However, the stream terminator "\0" I use is
quite hacky. I could possibly keep track of the number of records
passed around instead. Or get those dangling file handles sorted?

Better that.

Finally, I wonder about performance of IO.pipe versus named pipes - I
want to do a benchmark. Actually, I am concerned about overall
performance; this is of course not C or even well written Ruby code
optimized to a specific task, but rather a general purpose way of
setting up pipes and executing them as simply as possible. I figure that
30 minutes writing a script that runs for 1 minute is more often less
appealing than a 1 minute script that runs for 30 minutes.

:slight_smile:

There is another disadvantage of named pipes: you need to make sure
that names do not collide. Also, there might be security
implications. The approach with nameless pipes is certainly more robust.

Kind regards

robert

···

On Tue, Jan 21, 2014 at 9:08 PM, Martin Hansen <lists@ruby-forum.com> wrote:


Robert, this is indeed marvelous. I guess my problem with lambda, proc
and closures is lack of experience - there is a bit of a hurdle to get
over since abstraction is, well, abstract... Anyways, you make a strong
case, so I will stick to your suggestions!

:slight_smile:

Now, I totally believe in "separation of concerns" and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack)

Btw. why do you use msgpack? Is there any advantage over Marshal? If
you want to use individual messages for exchange instead of the raw
binary stream you could maybe add another layer which will handle
serialization and deserialization so individual lambdas receive only
one argument (an object) and the returned value will be sent off to
the next stage.

through all commands in
the pipeline. Certain commands will respond to records with appropriate
data and act upon this and sometimes emit incoming records as well as
any newly produced to the next command in the pipeline. Therefore I will
need a command like the below CAT that emits all incoming records and
any new records from the specified file. So I tried the below with your
code, but it hangs :o( ?

That's because your first CAT tries to read from $stdin. If you want
to append multiple files you can remove the first line from the
definition and use

p << CAT["table.txt", "foo.tab"]

CAT = lambda do |*args|
  lambda do |io_in, io_out|

blocks in this line:

    io_in.each_line {|line| line.chomp!; io_out.puts line}

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}

Kind regards

robert

···

On Wed, Jan 22, 2014 at 12:43 PM, Martin Hansen <lists@ruby-forum.com> wrote:


Serializing and deserializing is a bottleneck and msgpack is fast:

maasha@mel:~$ cat benchmark.rb
#!/usr/bin/env ruby

require 'benchmark'
require 'msgpack'

n = 100_000

h = {
  zero: 0,
  one: 1,
  two: 2,
  three: 3,
  four: 4,
  five: 5,
  six: 6
}

Benchmark.bm() do |x|
  x.report("Marshal") { n.times { m = Marshal.dump h; u = Marshal.load m
} }
  x.report("MsgPack") { n.times { m = h.to_msgpack; u =
MessagePack.unpack m } }
end

maasha@mel:~$ ./benchmark.rb
       user system total real
Marshal 1.770000 0.000000 1.770000 ( 1.774334)
MsgPack 0.900000 0.020000 0.920000 ( 0.921232)

Impressive. Although it does not look as if Marshal was really slow.

Now concerning CAT: This is a placeholder for commands that will read in
data in particular formats from specified files. I have found time and
time again that it is extremely useful (using Biopieces 1.0) to be able
to process data like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["file2"]
p.execute_processes

I know that this is not the way UNIX cat works, and that BASH can do
stuff like this neatly. But this is not UNIX and UNIX cat. I really
would like to let ALL commands read any incoming records and emit them
again (pending given options) along with any newly created records.

Well, but then you need a way to decide whether you want to read from
the pipe or not because the first CAT will always block on $stdin. I
suggest to do it like the UNIX cat and use "-" as identifier for
stdin:

CAT = lambda do |*args|
  args << '-' if args.empty?

  lambda do |io_in, io_out|
    args.each do |file|
      case file
      when "-"
        io_in.each_line {|line| line.chomp!; io_out.puts line}
      else
        File.foreach(file) {|line| io_out.puts line}
      end
    end
  end
end

Now your example above looks like this:

p = Pipe.new
p << CAT["file1"]
p << GREP["/foobar/"]
p << CAT["-", "file2"]
p.execute_processes

:slight_smile:

For using messages to pass through pipes I suggest to write a wrapper
class around Pipe which uses lambdas with a single argument and pass
the return value through the chain.
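
A rough sketch of such a layer follows. The name message_stage is invented here, and Marshal is used only because it ships with Ruby; MessagePack could presumably be swapped in at the two marked lines. The adapter turns a one-argument block into the two-IO lambda that Pipe expects, so individual commands never touch serialization.

```ruby
require 'stringio'

# Hypothetical adapter: wraps a block that maps one record to another
# into a stage with the (io_in, io_out) contract.
def message_stage(&block)
  lambda do |io_in, io_out|
    until io_in.eof?
      record = Marshal.load(io_in)                    # deserialize one message
      result = block.call(record)
      Marshal.dump(result, io_out) unless result.nil? # nil drops the record
    end
  end
end

# Try it with binary StringIO objects in place of pipe ends.
source = StringIO.new(String.new)
[{ "n" => 1 }, { "n" => 2 }].each { |record| Marshal.dump(record, source) }
source.rewind

sink = StringIO.new(String.new)
double = message_stage { |record| record.merge("double" => record["n"] * 2) }
double.call(source, sink)

sink.rewind
puts Marshal.load(sink).inspect until sink.eof?
```

The head of a pipeline has no upstream IO, so it would still need a plain two-IO lambda (or a variant of the adapter) that produces records from a file.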

Kind regards

robert

···

On Wed, Jan 22, 2014 at 2:27 PM, Martin Hansen <lists@ruby-forum.com> wrote:


OK, the use of "-" inside the CAT lambda I could perhaps tolerate. But
in the invocation I think it looks silly.

Why that? You could also replace this with something else, e.g. symbol :input.

Now, I tried something crufty
instead that happens to work!?!:

CAT = lambda do |*args|
  lambda do |io_in, io_out|
    io_in.each_line {|line| line.chomp!; io_out.puts line} unless
io_in.inspect =~ /STDIN/

IMHO it does not make sense to hard code this decision inside the
lambda. This is too inflexible because the head of the pipeline could
be some other IO object OR you really want to read from stdin. Plus,
if you make the decision in the argument list of the call you can
determine the position where you phase the input in.

    args.each do |file|
      File.foreach(file) {|line| io_out.puts line}
    end
  end
end

p = Pipe.new
p << CAT["table.txt"]
p << CAT["foo.tab"]
p << lambda {|r,w| r.each {|x| puts x}}
p.execute_processes

I don't understand why you insist on wasting another process if you
read input files sequentially anyway.

One could perhaps patch class IO and add a stdin? method?

That is not necessary - see above.

Btw. in terms of efficiency it may be reasonable to implement CAT with
fixed buffer instead of line based:

CAT = lambda do |*args|
  args << :input if args.empty?

  lambda do |io_in, io_out|
    buffer = ''

    args.each do |file|
      case file
      when :input
        while (buffer = io_in.read(4_096, buffer))
          io_out.write(buffer)
        end
      else
        File.open(file) do |io|
          while (buffer = io.read(4_096, buffer))
            io_out.write(buffer)
          end
        end
      end
    end
  end
end

Kind regards

robert

···

On Wed, Jan 22, 2014 at 8:15 PM, Martin Hansen <lists@ruby-forum.com> wrote:


I don't think line 143 will work as IO#write usually returns the
number of bytes written. Also, throughput wise it's probably not a
good idea to flush every message.

Also, as I've said before I'd rather stack the msg handling pipeline
class on top of Pipe. That way you keep concerns nicely separated and
can use class Pipe for other purposes as well. The way you did it you
created a class that can only handle MessagePack messages.

Kind regards

robert

···

On Thu, Jan 30, 2014 at 9:31 PM, Martin Hansen <lists@ruby-forum.com> wrote:

Got it. Gist updated.


Sure, I'd try it!

···

Ryan Davis <ryand-ruby@zenspider.com> wrote:

On Jan 19, 2014, at 1:17, Eric Wong <normalperson@yhbt.net> wrote:
> # I would've written it in Ruby but I couldn't find a graphing
> # library in Ruby with ASCII/UTF-8 text output.

If I'm allowed to use aalib, I can have one for you in about 10 more
lines of code. :stuck_out_tongue:

Here's a demo of what I mean:

#!/usr/bin/ruby

r, w = IO.pipe

p2 = fork {
  w.close
  r.each do |line|
    line.chomp!
    puts "Got: '#{line}'"
  end
}

p1 = fork {
  r.close
  10.times {|i| w.puts i}
  w.close
}

r.close
# w.close # uncomment to make hang go away

puts "waiting for #{p2}..."
Process.waitpid p2
puts "finished"

The situation becomes worse with multiple processes and pipes being
generated because they are all inherited by all the processes.
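
For the multi-process case, the rule "close everything you do not use" can be sketched as below. This is an illustration under assumptions, not the code from the gist: run_stages is a made-up helper, each stage is a lambda taking (io_in, io_out), and the head stage receives nil as its input. It needs a platform with fork.

```ruby
# Sketch: one process per stage, each closing all inherited pipe ends
# it does not use. run_stages is a hypothetical helper name.
def run_stages(stages, sink)
  # pipes[i] carries data from stage i to stage i + 1.
  pipes = Array.new(stages.size - 1) { IO.pipe }

  pids = stages.each_with_index.map do |stage, i|
    fork do
      io_in  = i.zero? ? nil : pipes[i - 1][0]
      io_out = i == stages.size - 1 ? sink : pipes[i][1]

      # Any write end left open here would keep a downstream reader
      # waiting for EOF forever.
      pipes.flatten.each do |io|
        io.close unless io.equal?(io_in) || io.equal?(io_out)
      end

      stage.call(io_in, io_out)
      io_out.close   # flush and signal EOF to the next stage
      exit!(0)       # skip at_exit handlers inherited from the parent
    end
  end

  # The parent uses none of the connecting pipes.
  pipes.flatten.each(&:close)
  pids.each { |pid| Process.wait(pid) }
end

result_reader, result_writer = IO.pipe
stages = [
  lambda { |_io_in, io_out| 3.times { |i| io_out.puts "line #{i}" } },
  lambda { |io_in, io_out| io_in.each_line { |line| io_out.puts line.upcase } }
]
run_stages(stages, result_writer)
result_writer.close
puts result_reader.read
result_reader.close
```

Note that the parent here waits for the children before reading the result, which is only safe while the final output fits into the pipe buffer. For larger outputs the sink should be a file or the parent should read while the children run.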

I created a solution that seems to work:

Main difference to your API: I use lambdas as abstraction of work to
do because this is more flexible than your approach with methods in
the Pipe class. Lambdas receive an IO to read from and one to write
to.

Kind regards

robert

···

On Mon, Jan 20, 2014 at 1:54 PM, Robert Klemme <shortcutter@googlemail.com> wrote:

On Sun, Jan 19, 2014 at 6:47 PM, Martin Hansen <lists@ruby-forum.com> wrote:

and it hangs after dumping big.tab. Again, I suspect bad pipe handling
and I uploaded the traces from that one as well as trace_3step.tar. In
fact I uploaded code, test files and traces.

One more potential source of issues: you open all the pipes from the
main process. However, child processes will inherit those pipe
objects and if they are not closed there they will keep the pipeline
alive. That might be the issue you face.


I fixed it:

My FD closing scheme was broken because the parent process had the FDs
open far too long. I have now changed it so that FDs are closed as early
as possible. I also moved the execution code into separate classes for
thread and process execution to better separate the code and reduce the
number of arguments of methods.

Kind regards

robert

···

On Tue, Jan 21, 2014 at 11:50 PM, Robert Klemme <shortcutter@googlemail.com> wrote:

I have updated my version but there is the opposite problem: one of
the IOs is closed too early. :slight_smile: You can see current state of affairs
in the gist. I have to go to bed now.


...

Now, I totally believe in "separation of concerns" and this is exactly
what I am trying to achieve with these pipelines. The idea is to pass
data records (hashes marshaled with msgpack)

Btw. why do you use msgpack? Is there any advantage over Marshal? If
you want to use individual messages for exchange instead of the raw
binary stream you could maybe add another layer which will handle
serialization and deserialization so individual lambdas receive only
one argument (an object) and the returned value will be sent off to
the next stage.

Some advantages to be aware of (not necessarily relevant to Martin's system):

1. Msgpack can use non-blocking io (IO#readpartial). Using Marshal, if a writer pauses while writing output, the reader must wait as well; the reader's state cannot be saved while the thread does other work.

But msgpack's Unpacker#feed and #each methods can be used to buffer incoming data and yield the fully parsed objects as they are available. If the writer pauses in the middle of an object, the reader can make progress on some other task, and then continue with the #feed and #each when more data is available (checking with IO#select). (This is probably more useful in a message broker design than in Martin's pipelines.) You can also do this with yajl / json, BTW.[1]

2. You can write processing code in languages other than ruby.

And of course there is the speed and compactness, as Martin mentioned.

···

On 01/22/2014 04:43 AM, Robert Klemme wrote:

On Wed, Jan 22, 2014 at 12:43 PM, Martin Hansen <lists@ruby-forum.com> wrote:

---

[1] I wrote a gem called object-stream that manages streams of objects with different serializers (msgpack, marshal, yaml, yajl) and also with different transports (File, Pipe, Socket, StringIO). Here's an example that shows the non-blocking advantage of msgpack (or yajl), compared to marshal and yaml:

Why that? You could also replace this with something else, e.g. symbol
:input.

I favor a minimalistic invocation and the "-" or a symbol appears
superfluous.

I don't understand why you insist on wasting another process if you
read input files sequentially anyway.

It is a common situation in bioinformatics that you need pieces of
information from several files in different format. One could be a table
and one could be something à la XML. I would create a READ_TABLE and
READ_XML command that read in AND parse these formats:

p = Pipe.new
p << READ_TABLE
p << READ_XML
p << DO_SCIENTIFIC_CALCULATION
p << WRITE_TABLE
p.execute_processes

I hope this explains the need for records to be passed along.

Btw. in terms of efficiency it may be reasonable to implement CAT with
fixed buffer instead of line based:

Eventually, there will not be a CAT command :o) - I have just used that
as an example. I will require commands for parsing nasty bioinformatic
formats like GenBank, GFF, SAM etc - and optimizing that is a whole
different story!

And Robert, I really truly value your help. It is most inspiring!

Cheers,

Martin

···

--
Posted via http://www.ruby-forum.com/.

That's good to know! Thank you, Joel.

Kind regards

robert

···

On Wed, Jan 22, 2014 at 9:07 PM, Joel VanderWerf <joelvanderwerf@gmail.com> wrote:

Some advantages to be aware of (not necessarily relevant to Martin's
system):

1. Msgpack can use non-blocking io (IO#readpartial). Using Marshal, if a
writer pauses while writing output, the reader must wait as well; the
reader's state cannot be saved while the thread does other work.

2. You can write processing code in languages other than ruby.


Also, as I've said before I'd rather stack the msg handling pipeline
class on top of Pipe. That way you keep concerns nicely separated and
can use class Pipe for other purposes as well. The way you did it you
created a class that can only handle MessagePack messages.

Again, I get it working one way and then there is some other superior
way :o) - I'll give it a shot.

Now, I want two additional features.

First, I want a to_s method in the Pipe class that returns a string with
the commands and options run, like: "Pipe.new.add(:cat, input:
"foo").add(:save, output: "bar").add(:dump).run". Strings like this I
would like to log in a history file for documentation of commands run
and easy rerunning (in irb). I think there might be a problem with this,
since the lambdas separate the Pipe from the commands (from
the Pipe class there is no way to see what is in the lambdas)?
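
One possible way around this, sketched here purely as an assumption and not taken from the gist, is to pass callable objects instead of bare lambdas. The hypothetical Command class below remembers the name and options it was built with, so a history line can be rendered without looking inside the work block. The helper pipeline_to_s is likewise invented for the example.

```ruby
# Hypothetical wrapper: behaves like a lambda (responds to call) but
# also knows how to describe itself.
class Command
  attr_reader :name, :options

  def initialize(name, options = {}, &work)
    @name = name
    @options = options
    @work = work
  end

  # Same contract as the lambdas, with the options passed along.
  def call(io_in, io_out)
    @work.call(io_in, io_out, @options)
  end

  def to_s
    args = [":#{@name}"] + @options.map { |key, value| "#{key}: #{value.inspect}" }
    "add(#{args.join(', ')})"
  end
end

# Renders a history line in the add(...) style described above.
def pipeline_to_s(commands)
  (["Pipe.new"] + commands.map(&:to_s) + ["run"]).join(".")
end

commands = [
  Command.new(:cat, input: "foo") do |_io_in, io_out, opts|
    File.foreach(opts[:input]) { |line| io_out.puts line }
  end,
  Command.new(:dump) { |io_in, _io_out, _opts| io_in.each_line { |line| puts line } }
]
puts pipeline_to_s(commands)
```

Since a Command responds to call, it could be added to the pipeline wherever a lambda is accepted.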

Second, I would like to keep track of some basic statistics from each
command: number of records in and out, runtime, and keys seen (I will be
using records consisting of simple hashes). Each command could write a
temporary file and then the stats could be compiled after execution.
Alternatively, the stats could be passed along the IO stream as the last
record, or each command could also have separate pipes for inter-process
communication of stats.

Cheers,

Martin

···


I'm just gonna throw two ideas out there:

class Pipe
  def << work
    # whatever you need to do w/ work
    self
  end
end

such that:

p = Pipe.new
p << READ_TABLE <<
  READ_XML <<
  DO_SCIENTIFIC_CALCULATION <<
  WRITE_TABLE
p.execute_processes

or:

class Pipe
  def | work
    # whatever you need to do w/ work
    self
  end
end

such that:

p = Pipe.new | READ_TABLE | READ_XML | DO_SCIENTIFIC_CALCULATION | WRITE_TABLE
p.execute_processes

···

On Jan 22, 2014, at 12:21, Martin Hansen <lists@ruby-forum.com> wrote:

p = Pipe.new
p << READ_TABLE
p << READ_XML
p << DO_SCIENTIFIC_CALCULATION
p << WRITE_TABLE
p.execute_processes