Expressing *nix process pipelines?

I've been trying to find better ways to express one-off
pipelines as concisely as I can in Bourne shells. In other
words, being able to tie completely unrelated programs together
in any *nix-like system like this:

  curl $URL | tee keep.gz | zcat | git fast-import

Obviously, I could use system() from Ruby to spawn a shell, and
be done with it:

  system("curl #{url} | tee keep.gz | zcat | git fast-import")

But there's no error-checking, above, so I would need bash
(or ksh) to enable "pipefail" (and have curl use "-f"):

  system(*%W(bash -o pipefail -c),
         "curl -f #{url} | tee keep.gz | zcat | git fast-import")

But yeah, that's still an extra process and I don't want to
depend on bash or ksh being available, either. (I normally use
dash instead of bash since it's a little bit faster).

There are other downsides to using the shell, too, like quoting
arguments properly. I don't even use weird filenames, but stuff
like the "mcompand" effect in sox forces spaces and quoting on
me, so I'd have to remember to use "shellwords" from the
stdlib...
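
For example, something like this (the mcompand argument here is
made up, just to show the shape of the problem):

  require 'shellwords'

  effect = '0.3,1 6:-70,-60,-20' # spaces force quoting
  system("sox in.wav out.wav mcompand #{Shellwords.escape(effect)}")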

So, I wonder if some syntax in pure Ruby (without using any *sh)
can come close to the conciseness of Bourne shell.

So far, this is what I can come up with:

  run_pipeline([
    %W(curl -f #{url}),
    %w(tee keep.gz),
    %w(zcat),
    %w(git fast-import)
  ])

*shrug* %W is useful, here. Thoughts?

Implementation below (it also supports throwing Proc objects
into the pipeline, each running as a forked process).

----------------------8<------------------------
module MyPipeline

  # Process.spawn wrapper which supports running Proc-like objects in
  # a separate process, not just external commands.
  # Returns the pid of the spawned process
  def pspawn(cmd, rdr = {})
    case cmd
    when Array
      Process.spawn(*cmd, rdr)
    else # support running Proc-like objects, too:
      fork do
        # setup redirects
        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
          dst = rdr[fd] and io.reopen(dst)
        end

        # close all other pipes, since we can't rely on FD_CLOEXEC
        # (as we do not exec, here)
        rdr.each do |k, v|
          k.close if v == :close
        end
        cmd.call
      end
    end
  end

  # +pipeline+ is an Array of (Arrays or Procs); it assumes at least
  # two elements, like a real shell pipeline
  def run_pipeline(pipeline)
    pids = {} # pid => pipeline index
    work = pipeline.dup
    last = work.pop
    nr = work.size
    rdr = {} # redirect mapping for Process.spawn

    # we need to make sure pipes are closed in any forked processes
    # (they are redirected to stdin or stdout, first)
    pipes = nr.times.map { IO.pipe.each { |io| rdr[io] = :close } }

    # start the first and last commands first, they only have one pipe, each
    last_pid = pspawn(last, rdr.merge(0 => pipes[-1][0]))
    pids[last_pid] = nr
    first = work.shift
    first_pid = pspawn(first, rdr.merge(1 => pipes[0][1]))
    pids[first_pid] = 0

    # start the middle commands; they each have two pipes:
    work.each_with_index do |cmd, i|
      pid = pspawn(cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
      pids[pid] = i + 1
    end

    # all pipes are handed off to children; close our copies so they
    # see EOF (flatten, not flatten!, which would return nil if empty)
    pipes.flatten.each(&:close)

    # wait for children to finish
    fails = []
    until pids.empty?
      pid, status = Process.waitpid2(-1)
      idx = pids.delete(pid) # avoid clobbering the outer `nr'
      status.success? or
        fails << "#{idx} #{pipeline[idx].inspect} #{status.inspect}"
    end
    # detect errors like "set -o pipefail" in bash
    raise RuntimeError, "child failures:\n#{fails.join("\n")}" if fails[0]
  end
end

# Example usage:

include MyPipeline

# git log | tr a-z A-Z | ruby -pe '...'
run_pipeline([
  %w(git log), # anything which generates something to stdout
  %w(tr a-z A-Z), # upcase
  # these lambdas each run inside their own process
  lambda do
    $stdin.each_line do |l|
      $stdout.write("#{l.chomp.reverse}\n")
    end
  end,
  lambda do
    $stdin.each_line do |l|
      $stdout.write("#{l.chomp.downcase.reverse}\n")
    end
  end,
])

I suppose one could make proper objects here and allow
subshells/chainability, somehow, too...

The above code samples are CC0 or 2-clause BSD (your choice) in case
you care to reuse them; no warranty, of course. I don't even want
credit, so I prefer CC0.

It's been a long time since I thought about this myself. Obviously,
using the | operator would be nice for this, but I have not yet found
an elegant solution. One part of the problem is that the operator
itself cannot start processes, because you do not know how long the
pipeline will be. So you would need some kind of final execute
operation, which makes it a bit ugly. Maybe something like this could
work:

pipeline {
  curl $URL | tee "keep.gz" | zcat | git 'fast-import'
}

The block would be executed via instance_eval on a BasicObject which
just collects method calls via #method_missing.
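
A minimal sketch of that idea (PipeCmd and PipeDSL are invented
names, and this is untested):

  class PipeCmd
    attr_reader :argvs

    def initialize(argv)
      @argvs = [argv]
    end

    # | just appends the other command's argv to our list
    def |(other)
      @argvs.concat(other.argvs)
      self
    end
  end

  class PipeDSL < BasicObject
    # every bare word in the block becomes a command
    # (::-prefix needed since BasicObject cuts off constant lookup)
    def method_missing(name, *args)
      ::PipeCmd.new([name.to_s, *args.map(&:to_s)])
    end
  end

  def pipeline(&blk)
    # instance_eval returns the last PipeCmd; feed its argv list to
    # something like run_pipeline from the earlier post
    run_pipeline(PipeDSL.new.instance_eval(&blk).argvs)
  end

  pipeline { curl('http://example.com/a.gz') | tee('keep.gz') | zcat }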

Kind regards

robert

···

On Thu, Jun 2, 2016 at 3:55 AM, Eric Wong <e@80x24.org> wrote:

--
[guy, jim, charlie].each {|him| remember.him do |as, often| as.you_can
- without end}
http://blog.rubybestpractices.com/

>> I've been trying to find better ways to express one-off
>> pipelines as concisely as I can in Bourne shells. In other
>> words, being able to tie completely unrelated programs together
>> in any *nix-like system like this:
>>
>>   curl $URL | tee keep.gz | zcat | git fast-import
>
> It's been a long time since I thought about this myself. Obviously,
> using the | operator would be nice for this, but I have not yet found
> an elegant solution. One part of the problem is that the operator
> itself cannot start processes, because you do not know how long the
> pipeline will be.

Yeah, '|' would be nice; but I forgot to mention I also want to
manipulate the pipeline programmatically; perhaps even being
able to modify a running pipeline in response to user input[1].

So I figured arrays are nice and natural for that.
User input for modifications might also happen over JSON/YAML,
so arrays would be easier to serialize/deserialize.
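
For instance, the array form round-trips trivially (Procs, of
course, would not survive this):

  require 'yaml'

  pipeline = [ %w(git log), %w(tr a-z A-Z) ]
  YAML.safe_load(pipeline.to_yaml) == pipeline # => true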

> So you would need some kind of final execute operation, which makes
> it a bit ugly. Maybe something like this could work:
>
>   pipeline {
>     curl $URL | tee "keep.gz" | zcat | git 'fast-import'
>   }
>
> The block would be executed via instance_eval on a BasicObject which
> just collects method calls via #method_missing.

I also worry about method_missing and the like causing conflicts and
introducing unnecessary subtleties.

[1] dtas-player already kinda does that, but it currently forces a
    split into two processes so you can edit either side of the
    main pipeline; it needs to restart the entire part contained
    within the ():

      (source|filter1|filter2|...) | (filter3|filter4|...|sink)

    Ideally, one could swap out/remove/add any part at will
    without losing or even noticeably delaying data:

      source | filter1 | filter2 | filter3 | filter4 | ... | sink

    And even tee off at arbitrary points to different sinks:

                                 | filter3a | filter4a | ... | sink_a
                                 |
      source | filter1 T filter2 T
                       | |
                       | | filter3b | filter4b | ... | sink_b
                       |
                       |
                       | filter3c | filter4c | ... | sink_c

    ref: https://dtas.80x24.org/

Now, how to best express tee-ing to processes? :)

  run_pipeline([ %W(curl #{url}), %w(zcat), [ :tee,
      # first process for pristine import
      %w(git --git-dir=orig.git fast-import),

      # second process for cleanup: cleanup | git .. fast-import
      [ %w(cleanup), %w(git --git-dir=clean.git fast-import) ],

      # potentially more processes could go here
    ]])
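
The copy loop at the heart of such a :tee node could itself be one
of the Proc pipeline members; a rough, untested sketch:

  branch_outs = [] # write ends of the per-branch pipes would go here
  tee_proc = lambda do
    buf = ''.b
    # fan out everything we read on stdin to each branch
    while $stdin.read(65536, buf)
      branch_outs.each { |io| io.write(buf) }
    end
    branch_outs.each(&:close) # branches see EOF
  end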

···

Robert Klemme <shortcutter@googlemail.com> wrote:

On Thu, Jun 2, 2016 at 3:55 AM, Eric Wong <e@80x24.org> wrote:

>> It's been a long time since I thought about this myself. Obviously,
>> using the | operator would be nice for this, but I have not yet found
>> an elegant solution. One part of the problem is that the operator
>> itself cannot start processes, because you do not know how long the
>> pipeline will be.
>
> Yeah, '|' would be nice; but I forgot to mention I also want to
> manipulate the pipeline programmatically; perhaps even being
> able to modify a running pipeline in response to user input[1].

Hmm... That would not be a requirement for me. Maybe that is a tad
ambitious - and also, it's not what the shell allows. My goal back
then was to find a syntax that allows setting up pipelines as
easily as the shell does, for cases where it's needed. Your solution
seems to be more geared towards complete control of other process
pipelines.

> So I figured arrays are nice and natural for that.
> User input for modifications might also happen over JSON/YAML,
> so arrays would be easier to serialize/deserialize.

Again, a requirement I never thought of. :)

>> So you would need some kind of final execute operation, which makes
>> it a bit ugly. Maybe something like this could work:
>>
>>   pipeline {
>>     curl $URL | tee "keep.gz" | zcat | git 'fast-import'
>>   }
>>
>> The block would be executed via instance_eval on a BasicObject which
>> just collects method calls via #method_missing.
>
> I also worry about method_missing and the like causing conflicts and
> introducing unnecessary subtleties.

Yeah, as I said, I have yet to find a satisfying solution.

> Now, how to best express tee-ing to processes? :)
>
>   run_pipeline([ %W(curl #{url}), %w(zcat), [ :tee,
>       # first process for pristine import
>       %w(git --git-dir=orig.git fast-import),
>
>       # second process for cleanup: cleanup | git .. fast-import
>       [ %w(cleanup), %w(git --git-dir=clean.git fast-import) ],
>
>       # potentially more processes could go here
>     ]])

Frankly, I find that pretty ugly. In that case I think I prefer a more
explicit API which uses verbs like "add_process" or
"connect_processes" by which you construct the pipeline or tree of
processes.
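
Something like this minimal sketch, perhaps (invented names; linear
pipelines only, sitting on top of your run_pipeline):

  class Pipeline
    include MyPipeline

    def initialize
      @cmds = []
    end

    # append a command; return self so calls can be chained
    def add_process(argv)
      @cmds << argv
      self
    end

    def run
      run_pipeline(@cmds)
    end
  end

  Pipeline.new.add_process(%w(git log)).add_process(%w(tr a-z A-Z)).run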

Kind regards

robert

···

On Tue, Jun 7, 2016 at 4:18 AM, Eric Wong <e@80x24.org> wrote:

>> Yeah, '|' would be nice; but I forgot to mention I also want to
>> manipulate the pipeline programmatically; perhaps even being
>> able to modify a running pipeline in response to user input[1].
>
> Hmm... That would not be a requirement for me. Maybe that is a tad
> ambitious - and also, it's not what the shell allows.

Actually, I use variables all the time to build commands in shell:

  cmd='echo unknown system'
  if test -d .git
  then
    cmd='git log'
  elif test -d .svn
  then
    cmd='svn-cat-log' # old caching version of "svn log"
  fi
  exec $cmd
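
(The array form translates pretty much directly; a sketch:)

  cmd = %w(echo unknown system)
  if File.directory?('.git')
    cmd = %w(git log)
  elsif File.directory?('.svn')
    cmd = %w(svn-cat-log)
  end
  exec(*cmd)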

> My goal back
> then was to find a syntax that allows setting up pipelines as
> easily as the shell does, for cases where it's needed. Your solution
> seems to be more geared towards complete control of other process
> pipelines.

Yes, but I was thinking of a way to do both, actually.

Being more interactive than the existing levels of interactivity
provided by the likes of bash/ksh/irb, at least :)

>> Now, how to best express tee-ing to processes? :)
>>
>>   run_pipeline([ %W(curl #{url}), %w(zcat), [ :tee,
>>       # first process for pristine import
>>       %w(git --git-dir=orig.git fast-import),
>>
>>       # second process for cleanup: cleanup | git .. fast-import
>>       [ %w(cleanup), %w(git --git-dir=clean.git fast-import) ],
>>
>>       # potentially more processes could go here
>>     ]])
>
> Frankly, I find that pretty ugly. In that case I think I prefer a more
> explicit API which uses verbs like "add_process" or
> "connect_processes" by which you construct the pipeline or tree of
> processes.

*shrug* The explicit API seems unnecessarily programmatic; but
my target audience is sysadmins and command-line users who are
familiar with editing config files and simple makefiles, not
necessarily programmers who are expected to learn an API.

The current approach taken by dtas-player is that the user edits a
YAML file with commands in their favorite $EDITOR, saves+closes
the file, and the updated command is sent to the shell or process
manager to replace running commands.

Maybe my approach is Lisp-ier? My apologies if I insult
a Lisper here. I've never taken the time to study any Lisp;
I see lots of parentheses and that confuses me :x

···

Robert Klemme <shortcutter@googlemail.com> wrote:


We built a gem called Caliph to provide substantially this
functionality. Notably, commands can also be run against a mock shell
instance and collected for testing instead of actually being run.
Overloading the | operator for pipelines was a definite feature.

Judson

···

On Tue, Jun 7, 2016 at 2:59 PM Eric Wong <e@80x24.org> wrote:

