I've been trying to find better ways to express one-off
pipelines as concisely as I can in Bourne shells. In other
words, being able to tie completely unrelated programs together
in any *nix-like system like this:
curl $URL | tee keep.gz | zcat | git fast-import
Obviously, I could use system() from Ruby to spawn a shell, and
be done with it:
system("curl #{url} | tee keep.gz | zcat | git fast-import")
But there's no error-checking, above, so I would need bash
(or ksh) to enable "pipefail" (and have curl use "-f"):
system(*%W(bash -o pipefail -c),
       "curl -f #{url} | tee keep.gz | zcat | git fast-import")
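To make the missing error-checking concrete: without pipefail, a POSIX
shell reports only the exit status of the *last* command in a pipe, so
an upstream failure is invisible to system(). A quick demonstration
(assuming a POSIX sh is on PATH):

```ruby
# sh without pipefail only propagates the last command's status:
system("sh", "-c", "false | cat")   # => true  (upstream failure hidden)

# only a failure in the final stage is visible:
system("sh", "-c", "true | false")  # => false
```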
But yeah, that's still an extra process and I don't want to
depend on bash or ksh being available, either. (I normally use
dash instead of bash since it's a little bit faster).
There are other downsides to using shell, too, like quoting
arguments properly. I don't even use weird filenames, but stuff
like the "mcompand" effect in sox forces spaces and quoting on
me, so I'd have to remember to use "shellwords" from the
stdlib...
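For reference, here's roughly what that shellwords dance looks like;
the mcompand parameter values below are made up, but the space in the
middle is the kind of thing that forces quoting:

```ruby
require "shellwords"

# "mcompand" in sox takes a parameter string containing a space
# (hypothetical values here):
effect = "0.3,1 6:-70,-60,-20"
argv = ["sox", "in.wav", "out.wav", "mcompand", effect]

# Array#shelljoin escapes each word so the space survives the shell,
# and Shellwords.split round-trips it back to the original argv:
cmd = argv.shelljoin
Shellwords.split(cmd) == argv # => true
```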
So, I wonder if some syntax in pure Ruby (without spawning any
*sh) can be close to the conciseness of Bourne shell.
So far, this is what I can come up with:
run_pipeline([
  %W(curl -f #{url}),
  %w(tee keep.gz),
  %w(zcat),
  %w(git fast-import)
])
*shrug* %W is useful, here. Thoughts?
Implementation below (and it also supports throwing Proc objects
into the pipeline, each one running as a forked process).
----------------------8<------------------------
module MyPipeline
  # Process.spawn wrapper which supports running Proc-like objects in
  # a separate process, not just external commands.
  # Returns the pid of the spawned process
  def pspawn(cmd, rdr = {})
    case cmd
    when Array
      Process.spawn(*cmd, rdr)
    else # support running Proc-like objects, too:
      fork do
        # set up stdin/stdout/stderr redirects
        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
          dst = rdr[fd] and io.reopen(dst)
        end
        # close all other pipes, since we can't rely on FD_CLOEXEC
        # (as we do not exec, here)
        rdr.each do |k, v|
          k.close if v == :close
        end
        cmd.call
      end
    end
  end

  # +pipeline+ is an Array of (Arrays or Procs), at least two stages
  def run_pipeline(pipeline)
    pids = {} # pid => pipeline index
    work = pipeline.dup
    last = work.pop
    nr = work.size
    rdr = {} # redirect mapping for Process.spawn

    # we need to make sure pipes are closed in any forked processes
    # (they are redirected to stdin or stdout, first)
    pipes = nr.times.map { IO.pipe.each { |io| rdr[io] = :close } }

    # start the first and last commands first, they only have one pipe, each
    last_pid = pspawn(last, rdr.merge(0 => pipes[-1][0]))
    pids[last_pid] = nr
    first = work.shift
    first_pid = pspawn(first, rdr.merge(1 => pipes[0][1]))
    pids[first_pid] = 0

    # start the middle commands, they each have two pipes:
    work.each_with_index do |cmd, i|
      pid = pspawn(cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i + 1][1]))
      pids[pid] = i + 1
    end

    # all pipes are handed off to children; close our copies so they see EOF
    pipes.flatten.each(&:close)

    # wait for all the children to finish
    fails = []
    until pids.empty?
      pid, status = Process.waitpid2(-1)
      idx = pids.delete(pid)
      status.success? or
        fails << "#{idx} #{pipeline[idx].inspect} #{status.inspect}"
    end

    # detect errors like "set -o pipefail" in bash
    raise RuntimeError, "child failures:\n#{fails.join("\n")}" unless fails.empty?
  end
end
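The rdr hash above leans on Process.spawn's redirect conventions: an
integer key maps a child fd to an IO, and an IO key mapped to :close
gets closed in the child. A tiny standalone illustration:

```ruby
r, w = IO.pipe
# in the child: stdout goes to the pipe's write end, and the read end
# (which the child doesn't need) is explicitly closed
pid = Process.spawn("echo", "hello", 1 => w, r => :close)
w.close # parent closes its copy so the read below sees EOF
Process.wait(pid)
out = r.read # => "hello\n"
```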
# Example usage:
include MyPipeline
# roughly: git log | tr a-z A-Z | <reverse filter> | <downcase filter>
run_pipeline([
  %w(git log), # anything which generates something to stdout
  %w(tr a-z A-Z), # upcase
  # these lambdas each run inside their own process
  lambda do
    $stdin.each_line do |l|
      $stdout.write("#{l.chomp.reverse}\n")
    end
  end,
  lambda do
    $stdin.each_line do |l|
      $stdout.write("#{l.chomp.downcase.reverse}\n")
    end
  end,
])
I suppose one could make proper objects here and allow
subshells/chainability, somehow, too...
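On the chainability idea, one could overload | on a small wrapper so
the Ruby source reads almost like sh. A rough sketch; Cmd and its
method names are made up here, and #run assumes run_pipeline is in
scope (e.g. via "include MyPipeline" as above):

```ruby
# hypothetical chainable wrapper: | accumulates stages,
# #run hands the accumulated Array off to run_pipeline
class Cmd
  attr_reader :stages

  def initialize(*argv)
    @stages = [argv]
  end

  # accept another Cmd, a raw argv Array, or a Proc-like stage
  def |(other)
    @stages.concat(other.is_a?(Cmd) ? other.stages : [other])
    self
  end

  def run
    run_pipeline(@stages) # assumes MyPipeline is included
  end
end

# (Cmd.new("git", "log") | Cmd.new("tr", "a-z", "A-Z")).run
```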
The code samples above are CC0 or 2-clause BSD (your choice) in
case you care to reuse them, no warranty of course. I don't even
want credit, so I prefer CC0.