[QUIZ] Process Rings (#135)

The three rules of Ruby Quiz:

1. Please do not post any solutions or spoiler discussion for this quiz until
48 hours have passed from the time on this message.

2. Support Ruby Quiz by submitting ideas as often as you can:

http://www.rubyquiz.com/

3. Enjoy!

Suggestion: A [QUIZ] in the subject of emails about the problem helps everyone
on Ruby Talk follow the discussion. Please reply to the original quiz message,
if you can.

···

-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

I recently wrote about a challenge in the Programming Erlang book on my blog:

  http://blog.grayproductions.net/articles/2007/08/13/erlang-message-passing

Language comparison issues aside, just figuring out how to build a ring of
"processes" was quite the brain bender for me. That always makes for good Ruby
Quiz material, in my opinion.

The task is straightforward:

  1. Your program should take two command-line arguments: a number of
     processes and a number of cycles.
  2. Begin by creating the requested number of processes, in a ring.
     For example, when three processes are requested, process one
     creates and sends messages to process two, which creates and sends
     messages to process three. The third process then sends its
     messages back to process one.
  3. Pass a message around your ring of processes a number of times
     equal to the requested cycles. Print timing results for how
     long this takes.

The message you pass doesn't much matter. A simple String is fine. You may
also wish to pass a counter with it, to verify the correct number of sends.
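
As a rough illustration of the three steps above, here is a minimal sketch that uses threads and queues as the "processes". The helper name run_ring and the fixed sizes are assumptions made for this example, not part of the quiz; a real entry would read both numbers from ARGV. The counter travels with the message so the number of sends can be checked at the end.

```ruby
require "thread" # Queue (already loaded on modern Rubies)

# Hypothetical helper: node 0 is the calling thread, nodes 1..size-1 are
# worker threads. Node i reads from queues[i] and forwards to the next queue.
def run_ring(size, cycles, text = "Ring message")
  queues = Array.new(size) { Queue.new }

  workers = (1...size).map do |i|
    Thread.new do
      while (message = queues[i].deq) != :stop
        sends, body = message
        queues[(i + 1) % size].enq [sends + 1, body]
      end
    end
  end

  sends = 0
  cycles.times do
    queues[1 % size].enq [sends + 1, text] # node 0 sends to its neighbour
    sends, _body = queues[0].deq           # and waits for the lap to finish
  end

  (1...size).each { |i| queues[i].enq :stop }
  workers.each(&:join)
  sends
end

start_time = Time.now
total = run_ring(3, 2)
puts "#{total} sends in #{Time.now - start_time} seconds"
```

With 3 nodes and 2 cycles the counter should come back as 6, one increment per hop.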

I'll leave the definition of "processes" intentionally vague. Ruby doesn't have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.

Ok.... this is my first attempt at using callcc in ruby. The code
is not what I would call beautiful, but should be readable...(??).
Suggestions are most welcome!

#!/usr/bin/env ruby
# vim:et:ts=4:sw=4

$n = 0 if $DEBUG

class RLWP
    def initalize
        @nxt = nil
    end
    attr_accessor :nxt
    def makecont
        passesleft, message = callcc { |cont|
            return cont
        }
        if passesleft <= 0
            puts $n if $DEBUG
            exit 0
        end
        $n += 1 if $DEBUG
        @nxt.call(passesleft - 1, message)
    end
end

def run(n, cycles, msg)
    process = Array.new(n) { RLWP.new }
    cont = process.collect { |p| p.makecont }
    process.each_with_index { |p,i| p.nxt = cont[(i+1) % n] }
    cont[0].call(n * cycles, msg)
end

run ARGV[0].to_i, ARGV[1].to_i, "xyzzy"

tom@molly:~/src/quiz-135% time ruby -d quiz-135-callcc.rb 1000 1000
1000000
ruby -d quiz-135-callcc.rb 1000 1000 10.781 user 0.037 system 99% cpu 10.868 total
tom@molly:~/src/quiz-135%

At somewhere around 12000 processes ruby started crashing on my system,
maybe a memory issue?

tom@molly:~/src/quiz-135% time ruby quiz-135-callcc.rb 12000 3
[2] 30579 illegal hardware instruction (core dumped) ruby quiz-135-callcc.rb 12000 3
ruby quiz-135-callcc.rb 12000 3 1.695 user 0.713 system 32% cpu 7.418 total
tom@molly:~/src/quiz-135%

regards,
-tom

I've been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

My solution has 4 parts:
- a pair of programs in an as yet unnamed language. Each program
sends a message to the next process. One program implements a counter
to stop the loop at the end.

- A Compiler for this language. To keep the compiler simple, the
parser has no look-ahead. One result of this is that all operators
have left associativity, so
'n=n+1', 'n=1+n', and 'n=(n+1)' all set n to different values, and
only the last one is unsurprising. The compiler returns an array
containing 'assembly', which is essentially bytecode, except the
codes are ruby symbols, not bytes.

- The InstructionSet, which contains a method for each VM instruction

- The virtual CPU. Schedules and runs a set of Processes. A Process
executes assembly by sending each symbol to the instruction set.
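
To make the left-associativity remark above concrete, here is a small stand-alone model of strictly left-to-right evaluation. It is only a sketch under assumptions: the function names and the token-array input are invented for this example, and it does not reuse the Compiler or InstructionSet from the solution. Each operator is applied to the running value and the single term that follows it, and a nested array stands in for a parenthesised group.

```ruby
# Value of a single term: an Integer, a variable name, or a nested group.
def term_value(token, vars)
  case token
  when Integer then token
  when Array   then evaluate_left_to_right(token, vars)
  else vars.fetch(token, 0)
  end
end

# Hypothetical evaluator: no precedence, no look-ahead.
# The first token doubles as the assignment target.
def evaluate_left_to_right(tokens, vars)
  tokens = tokens.dup
  target = tokens.shift
  last = term_value(target, vars)
  until tokens.empty?
    op = tokens.shift
    rhs = term_value(tokens.shift, vars)
    case op
    when "=" then last = vars[target] = rhs
    when "+" then last += rhs
    when "-" then last -= rhs
    end
  end
  last
end

results = [
  ["n", "=", "n", "+", 1],   # behaves like (n = n) + 1
  ["n", "=", 1, "+", "n"],   # behaves like (n = 1) + n
  ["n", "=", ["n", "+", 1]]  # the grouped form
].map do |statement|
  vars = { "n" => 5 }
  evaluate_left_to_right(statement, vars)
  vars["n"]
end

p results
```

Starting from n = 5, the three statements leave n at 5, 1 and 6 respectively, which is why only the grouped form increments the variable.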

This solution may be the slowest one submitted, but it was interesting to write.
-Adam

---BEGIN SOLUTION---
#processring.rb
# for Ruby Quiz #135
# Adam Shelly

#Implements a virtual machine and a compiler for a simple language
# language definition:
# types: ints, strings
# variables don't need to be declared in advance (works like ruby)
# only 3 keywords: 'exit', 'if' and 'while',
# the latter two take the form 'keyword (condition) { body }'.
# the parens and brackets are required
# only 2 operators: '+' and '-'.
# 4 builtin functions: '_peek' returns true if any messages waiting for this process
# '_get' returns first pending message.
# '_send(id, message)' sends message to process with given id
# '_puts(message)' writes message to stdout
# '%' before a name indicates process variable.
# process variables include: %id = current process id
# %last = value of last expression
# strictly left associative, use parentheses to group.
# be careful with assignments: 'n = 1+1' == '(n=1)+1'
# you usually want to do 'n = (1+1)'

# Here are the two programs we will execute.
# this one just forwards any message to the next process
prog1 = <<PROG
while (0) { _get }
while (1) {
  if (_peek) {
    msg =_get
    _send ((%id+1),msg)
  }
}
PROG

# This one generates a message and sends it to process 0, n times.
# It will be the last process so we can close the ring.
prog2 = <<PROG
n = _get
_send (0,"chunky bacon")
while (n ) {
  if (_peek) {
    msg = _get
    _send (0,msg)
    n = ( n - 1)
    _puts ( n )
  }
}
_puts ( "done" )
exit
PROG

# The Compiler turns program text into "assembly"
class Compiler
    @@symbols = {}
    #register keywords
    %w{while if end exit}.each{|kw| @@symbols[kw]=kw.to_sym}
    #register builtins
    %w{_peek _get _send _puts}.each{|bi| @@symbols[bi] = :builtin}

  def self.compile code
    asm = []
    text = code
    text = text.dup #don't destroy original code
    token,name = parse text
    while (token)
      p token if $DEBUG
      case token
        when :while,:if,:exit,:end,:add,:subtract,:assign,:comma
          asm << token
        when :localvar,:procvar,:builtin,:num,:string
          asm << token
          asm << name
        when :startgroup,:startblock
          asm << token
          asm << 0 #placeholder for size of group/block
        when :endgroup
          startgroup = asm.rindex(:startgroup)
          asm[startgroup] = :group
          asm[startgroup+1] = asm.size-startgroup-2 #store groupsize
        when :endblock
          startblock = asm.rindex(:startblock)
          asm[startblock] = :block
          asm[startblock+1] = asm.size-startblock #store blocksize
          asm << :endblock
          asm << asm.size+1 #placeholder for looptarget (default is next inst.)
      end
      token,name = parse text
    end
    return asm
  end

private
  def self.parse text, vartype = :localvar
    pt = 0;
    p "parse: #{text}" if $DEBUG
    while (true)
      case (c = text[pt,1])
      when '' #EOF
        return nil

      when /\s/ #skip whitespace
        pt+=1
        next

      when /\d/ #integers
        v = text[pt..-1].to_i
        text.slice!(0..pt+v.to_s.length-1) #remove number
        return :num,v

      when /\w/ #identifiers
        name = /\w*/.match(text[pt..-1])[0]
        text.slice!(0..pt+name.length-1) #remove name
        sym = @@symbols[name]
        sym = register_var(name,vartype) if !sym #unknown identifier is variable
        return sym,name

      when '"' #strings
        name = /".*?[^\\]"/m.match(text[pt..-1])[0]
        text.slice!(0..pt+name.length-1) #remove name
        return :string, name

      when '%' #processes variables
        text.slice!(0..pt)
        token,name = parse text,:procvar
        raise "invalid process variable" if token!= :procvar
        return token,name

      when '=' #punctuation
        text.slice!(0..pt)
        return :assign, c
      when ','
        text.slice!(0..pt)
        return :comma,c
      when '+'
          text.slice!(0..pt)
          return :add,'+'
      when '-'
          text.slice!(0..pt)
          return :subtract,'-'
      when '('
          text.slice!(0..pt)
          return :startgroup, c
      when ')'
          text.slice!(0..pt)
          return :endgroup, c
      when '{'
          text.slice!(0..pt)
          return :startblock, c
      when '}'
          text.slice!(0..pt)
          return :endblock, c
      end #case
    end #while
  end

  def self.register_var name,type
    @@symbols[name] = type
  end
end

#The cpu instruction set.
#each instruction is the equivalent of a VM bytecode.
class InstructionSet
  def initialize cpu
    @cpu = cpu
  end

  def exit proc #halt the cpu
    @cpu.halt
  end
  def end proc #end the current process
    @cpu.end_process proc.id
  end

  def while proc
    loopp = proc.pc-1
    test = proc.exec
    blocksize = proc.exec
    if test && test != 0
      #if we are going to loop, store the loop start address at the end of the block
      proc.pm[proc.pc+blocksize-1] = loopp
    else
      proc.pc += blocksize
    end
  end
  def if proc
    test = proc.exec
    blocksize = proc.exec
    if !test || test == 0
      proc.pc += blocksize
    end
  end

  def block proc
    blocksize = proc.pop
  end
  def endblock proc
    jumptarg = proc.pop #after block, maybe jump somewhere
    proc.pc = jumptarg
  end

  def group proc
    groupsize = proc.pop
    endgroup = proc.pc+groupsize
    while (proc.pc < endgroup)
      val = proc.exec
    end
    return val
  end

  def num proc
    proc.pop
  end
  def string proc
    proc.pop
  end
  def builtin proc
    inst = proc.pop
    @cpu.send(inst,proc)
  end
  def localvar proc
    varname = proc.pop
    proc.getvar varname
  end
  def procvar proc
    varname = proc.pop
    proc.send varname
  end
  def assign proc
    proc.setvar(proc.exec)
  end
  def comma proc
    return :comma
  end
  def add proc
    return proc.last + proc.exec
  end
  def subtract proc
    return proc.last - proc.exec
  end

  #returns elements of group as array
  #used to evaluate arguments for function call
  def ungroup proc
    args = []
    proc.pop #ignore :group
    groupsize = proc.pop
    endgroup = proc.pc+groupsize
    while (proc.pc < endgroup)
      arg = proc.exec
      args << arg unless arg == :comma
    end
    return args
  end
end

#the CPU
# acts as process scheduler
# processes run for TIMESLICE instructions, or until they send or get a message.
# in the latter case, control switches to the process with a message pending for the longest time
class CPU
  TIMESLICE = 10

  # CProcess is a process on our virtual machine
  # don't create directly, use CPU#add_process
  class CProcess
    attr_accessor :pm,:pc,:id,:last
    def initialize id, code, vm
      @id = id
      @pm = code #program memory
      @pc = 0 #program counter
      @vars = {}
      @curvar = nil
      @vm = vm
    end

    #executes a VM instruction, advances program counter
    def exec
      inst = @pm[@pc]
      p to_s if $DEBUG
      @pc+=1
      @last = @vm.send(inst,self)
    end
    def pop
      @pc+=1
      @pm[@pc-1]
    end

    def getvar name
      @curvar = name
      @vars[name]||=0
    end
    def setvar value
      @vars[@curvar] = value
    end

    def to_s
      "#{@id}@#{@pc}: #{@pm[@pc]} (#{@pm[@pc+1]})"
    end
  end #class Process

  def initialize
    @processes = []
    @messages = []
    @i = InstructionSet.new self
    @queue = [[], []] #scheduling queues
  end

  def add_process code
    asm = code.dup
    asm << :end
    id = @processes.size
    @processes << CProcess.new(id, asm,@i)
    @messages[id] = []
    @queue[0] << id
    @cur_proc_id = id
  end

  #stop processes by swapping it out if it is running, and removing it from queues.
  def end_process id
    taskswap 0 if @cur_proc_id == id
    @processes -= [id]
    @queue[0] -= [id]
    @queue[1] -= [id]
  end

  def start
    @running = true
    run
  end
  def halt
    @running = false
  end

  #inject a message into the system
  def send_msg proc_id,msg
    @messages[proc_id]<< msg
    @queue[1]<<proc_id
  end

private
  #run the scheduler
  def run
    @timeslice = 0
    while (@running)
      @processes[@cur_proc_id].exec
      @timeslice+=1
      if (@timeslice > TIMESLICE)
        taskswap 0
      end
    end
  end

  #switch to the next process waiting at this priority level
  def taskswap priority
    @cur_proc_id = @queue[priority].shift||@cur_proc_id
    (@queue[priority] << @cur_proc_id) if priority == 0
    @timeslice = 0
  end

  ## built-in messaging functions
  def _peek proc
    @messages[proc.id][0]
  end
  def _get proc
    retval = @messages[proc.id].shift
    taskswap 1
    return retval
  end
  def _send proc
    #send puts the target process on the high priority queue
    args = @i.ungroup proc
    @messages[args[0]] << args[1]
    @queue[1]<<args[0]
    taskswap 1
    args[1]
  end

  def _puts proc
    args = @i.ungroup proc
    puts args
  end
end

if __FILE__ == $0
puts "usage: #{$0} processes cycles" or exit if ARGV.size < 2
processes, cycles = ARGV.map { |n| n.to_i }

puts "Timer started."
start_time = Time.now
puts "Creating #{processes} processes"

code1 = Compiler.compile prog1
code2 = Compiler.compile prog2
cpu = CPU.new
(processes-1).times { cpu.add_process code1 }
last_proc = cpu.add_process code2

puts "Sending a message around the ring #{cycles} times..."
cpu.send_msg last_proc,cycles
cpu.start
puts "Time in seconds: #{(Time.now - start_time)}"
end
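
The scheduling policy described in the CPU comments (round-robin time slices, with priority for whichever process has just been sent a message) can be modelled in a few lines. This is a simplified sketch, not the CPU class above: the class and method names are hypothetical, and it folds the two taskswap paths into one "urgent first, otherwise rotate" rule.

```ruby
# Hypothetical model of a two-level scheduling queue.
class TwoLevelQueue
  def initialize(ids)
    @round_robin = ids.dup # every runnable process, rotated in order
    @urgent = []           # processes that were just sent a message
  end

  # Record that a message is pending for the given process id.
  def message_for(id)
    @urgent << id
  end

  # Pick the next process: pending-message processes first, else rotate.
  def next_id
    return @urgent.shift unless @urgent.empty?
    id = @round_robin.shift
    @round_robin << id
    id
  end
end

sched = TwoLevelQueue.new([0, 1, 2])
order = [sched.next_id, sched.next_id] # plain rotation
sched.message_for(0)                   # process 0 now has mail
order << sched.next_id                 # so it jumps the queue
order << sched.next_id                 # then rotation resumes
p order
```

Serving the message queue first is what keeps a ring moving: the process that can make progress runs next instead of waiting for its turn in the rotation.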

···

On 8/17/07, Ruby Quiz <james@grayproductions.net> wrote:

...
I'll leave the definition of "processes" intentionally vague. Ruby doesn't have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.

Neat idea.

Here's my own version using fork():

#!/usr/bin/env ruby -wKU

unless ARGV.size == 2
   abort "Usage: #{File.basename($PROGRAM_NAME)} PROCESSES CYCLES"
end
processes, cycles = ARGV.map { |n| n.to_i }

parent, child = true, false
parent_reader, parent_writer = IO.pipe
reader, writer = IO.pipe
my_reader = parent_reader

puts "Creating #{processes} processes..."
processes.times do |process|
   if fork
     break
   else
     parent_reader.close unless parent_reader.closed?
     writer.close

     parent = false
     my_reader = reader
     reader, writer = IO.pipe
   end
   child = true if process == processes - 1
end
if child
   puts "Done."
   my_writer = parent_writer
else
   parent_writer.close
   my_writer = writer
end

if parent
   puts "Timer started."
   start_time = Time.now
   puts "Sending a message around the ring #{cycles} times..."
   cycles.times do
     my_writer.puts "0 Ring message"
     my_writer.flush
     raise "Failure" unless my_reader.gets =~ /\A#{processes} Ring message\Z/
   end
   puts "Done: success."
   puts "Time in seconds: #{(Time.now - start_time).to_i}"
else
   my_reader.each do |message|
     if message =~ /\A(\d+)\s+(.+)/
       my_writer.puts "#{$1.to_i + 1} #{$2}"
       my_writer.flush
     end
   end
end

__END__
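
For readers new to fork() and IO.pipe, the building block of the solution above is a single parent/child round trip. The sketch below shows just that one hop with invented variable names; it assumes a platform where fork is available (not Windows) and is not a replacement for the full ring.

```ruby
to_child_r, to_child_w = IO.pipe   # parent writes, child reads
to_parent_r, to_parent_w = IO.pipe # child writes, parent reads

$stdout.flush # avoid duplicating buffered output in the child

pid = fork do
  # The child keeps only the ends it uses.
  to_child_w.close
  to_parent_r.close
  to_child_r.each_line do |line|
    if line =~ /\A(\d+)\s+(.+)/
      to_parent_w.puts "#{$1.to_i + 1} #{$2}" # bump the hop counter
      to_parent_w.flush
    end
  end
  exit!(0) # leave without running the parent's at_exit handlers
end

# The parent closes the ends that belong to the child.
to_child_r.close
to_parent_w.close

to_child_w.puts "0 Ring message"
to_child_w.flush
reply = to_parent_r.gets

to_child_w.close # EOF ends the child's read loop
Process.wait(pid)
puts reply
```

Closing the unused pipe ends matters: the child's each_line loop only sees end-of-file once every copy of the write end has been closed.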

And here's a threaded attempt:

#!/usr/bin/env ruby -wKU

begin
   require "fastthread"
   puts "Using the fastthread library."
rescue LoadError
   require "thread"
   puts "Using the standard Ruby thread library."
end

module MRing
   class Forward
     def initialize(count, parent)
       @child = count.zero? ? parent : Forward.new(count - 1, parent)
       @queue = Queue.new

       run
     end

     def send_message(message)
       @queue.enq message
     end

     private

     def run
       Thread.new do
         loop do
           message = @queue.deq
           if message =~ /\A(\d+)\s+(.+)/
             @child.send_message "#{$1.to_i + 1} #{$2}"
           end
         end
       end
     end
   end

   class Parent < Forward
     def initialize(processes, cycles)
       @processes = processes
       @cycles = cycles

       puts "Creating #{processes} processes..."
       super(processes, self)
     end

     private

     def run
       puts "Timer started."
       start_time = Time.now
       puts "Sending a message around the ring #{@cycles} times..."
       @cycles.times do
         @child.send_message "0 Ring message"
         raise "Failure" unless @queue.deq =~ /\A#{@processes} Ring message\Z/
       end
       puts "Done: success."
       puts "Time in seconds: #{(Time.now - start_time).to_i}"
     end
   end
end

if __FILE__ == $PROGRAM_NAME
   unless ARGV.size == 2
     abort "Usage: #{File.basename($PROGRAM_NAME)} PROCESSES CYCLES"
   end
   processes, cycles = ARGV.map { |n| n.to_i }

   MRing::Parent.new(processes, cycles)
end

__END__

James Edward Gray II

···

On Aug 19, 2007, at 6:49 PM, Tom Danielsen wrote:

Ok.... this is my first attempt at using callcc in ruby.

    def initalize
        @nxt = nil
    end

oops, a slight copy/paste error there..... run() will call
nxt= before calling the continuations so the code still runs,
but I like to set my instance vars in the constructor anyway :)
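
A short sketch of why the misspelling is harmless here, using a made-up Node class rather than RLWP itself: Ruby never calls a method named initalize, so the instance variable simply stays unset (and reads as nil) until the accessor assigns it.

```ruby
class Node
  def initalize # misspelled on purpose: Node.new never calls this
    @nxt = :set_in_constructor
  end
  attr_accessor :nxt
end

node = Node.new
before = node.nxt     # the misspelled constructor did not run
node.nxt = :assigned  # the accessor still works, as run() relies on
after = node.nxt

p before
p after
```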

regards,
tom

That gets my vote for the craziest Ruby Quiz solution ever. Wow!

You have no idea how much I regret that I wrote the quiz summary earlier today. :(

James Edward Gray II

···

On Aug 22, 2007, at 1:45 PM, Adam Shelly wrote:

On 8/17/07, Ruby Quiz <james@grayproductions.net> wrote:

...
I'll leave the definition of "processes" intentionally vague. Ruby doesn't have
an equivalent to Erlang processes so we will just say that each process should
represent a node where we could run some instructions concurrently. Be
creative.

I've been thinking about virtual machines recently, so I decided to
implement one in Ruby for this quiz.

I'll resubmit my entry,
  - fixed typo "initialize"
  - cleaner exit from message passing code

regards,
-tom

#!/usr/bin/env ruby
# vim:et:ts=4:sw=4

$n = 0 if $DEBUG

class RLWP
    def initialize
        @nxt = nil
    end
    attr_accessor :nxt
    def makecont
        passesleft, message = callcc { |cont|
            return cont
        }
        if passesleft <= 0
            puts $n if $DEBUG
            throw :DONE
        end
        $n += 1 if $DEBUG
        @nxt.call(passesleft - 1, message)
    end
end

def run(n, cycles, msg)
    catch(:DONE) {
        process = Array.new(n) { RLWP.new }
        cont = process.collect { |p| p.makecont }
        process.each_with_index { |p,i| p.nxt = cont[(i+1) % n] }
        cont[0].call(n * cycles, msg)
    }
end

run ARGV[0].to_i, ARGV[1].to_i, "xyzzy"
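
The "cleaner exit" in this resubmission relies on catch and throw, which unwind the stack back to the matching catch block instead of ending the whole interpreter the way exit does. A minimal sketch, with a hypothetical pass method standing in for the chain of continuations:

```ruby
# Hypothetical stand-in for the message-passing chain.
def pass(hops_left, message)
  throw :DONE, message if hops_left <= 0 # unwind straight to the catch
  pass(hops_left - 1, message)
end

result = catch(:DONE) do
  pass(1_000, "xyzzy")
  :not_reached # skipped, because throw leaves the block early
end

# Unlike exit, the program carries on after the catch block.
puts "ring finished with #{result.inspect}"
```

The value given to throw becomes the return value of catch, so the caller can still report timing or results afterwards.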

You have no idea how much I regret that I had a bunch of real work to
do and didn't have time to finish it up sooner. :)

I also wish I had time and a valid reason to keep working on extending
the VM. The first thing I'd do is find a good name for the
language...

-Adam

···

On 8/22/07, James Edward Gray II <james@grayproductions.net> wrote:

On Aug 22, 2007, at 1:45 PM, Adam Shelly wrote:
> I've been thinking about virtual machines recently, so I decided to
> implement one in Ruby for this quiz.

That gets my vote for the craziest Ruby Quiz solution ever. Wow!

You have no idea how much I regret that I wrote the quiz summary
earlier today. :(

It is too rare that I find time to solve the Quiz, but this time I
just had to find that time to make it back, and I enjoyed it as ever
:)
Here goes my shot:

require 'labrador/enum/map'
require 'labrador/exp/open-proto'
require 'thread'

processes, cycles = ARGV.map.to_i

timer = new_proto{
  def init
    obj_variable :stopped, nil
    obj_variable :started, Time.now
    obj_variable :ellapsed, nil
  end
  def read
    self.ellapsed ||=
      stopped.tv_sec - started.tv_sec + (
          stopped.tv_usec - started.tv_usec
          ) / 1_000_000.0
  end
  def reset
    self.stopped = nil
    self.started = Time.now
  end
  def stop
    self.stopped = Time.now
  end
}
ring_element = new_proto(Prototype::OpenProto){
  define_method :init do |params|
    super
    obj_variable :thread, Thread.new{
    cycles.times do |i|
        m = lhs_queue.deq
        rhs_queue.enq "thread=#{count}::count=#{i}"
      end
    }
  end
}

startup_timer = timer.new
lqueue = Queue.new
all_processes = (2..processes).map{ |count|
  ring_element.new :count => count,
                   :lhs_queue => lqueue,
                   :rhs_queue => ( lqueue = Queue.new )
}
all_processes << ring_element.new(
    :count => 1,
    :lhs_queue => lqueue,
    :rhs_queue => all_processes.first.lhs_queue
    )
startup_timer.stop
run_timer = timer.new
all_processes.last.lhs_queue.enq "Can you please start"
all_processes.map.thread.map.join
run_timer.stop
puts "Startup time for #{processes} processes: %3.6fs" % startup_timer.read
puts "Runtime for #{processes} processes and #{cycles} cycles: %3.6fs" %
  run_timer.read

···

-----------------------------------------------------------------------
Robert
--
I'm an atheist and that's it. I believe there's nothing we can know
except that we should be kind to each other and do what we can for
other people.
-- Katharine Hepburn

Hey guys... here's my attempt. I'm a bit of n00b, so feedback
welcome!

I attacked this with DRb (distributed ruby), since it hadn't been
touched yet. It meant taking a serious hit in performance, but it was
at least interesting. It's threaded, but could be forked (haven't
tried that just yet).

Thanks!

---BEGIN SOLUTION---
# Num. 135
# process_rings.rb
require 'drb'

BasePort = 7654

class RingParent
  def initialize(processes = 3, cycles = 5)
    @processes = processes
    @cycles = cycles
    @message = "Message from parent\n"
  end

  def start
    spawn_processes
    connect_ring
    send_messages
  end

  def spawn_processes
    t = []
    for i in 0...@processes-1
      t << Thread.new do
        RingMember.new(BasePort+i, BasePort+i+1, self)
      end
    end
    t << Thread.new do
      RingMember.new(BasePort+@processes-1, BasePort)
    end
  end

  def connect_ring
    DRb.start_service
    @ring = DRbObject.new(nil, "druby://127.0.0.1:#{BasePort}")
  end

  def send_messages
    @start = Time.now
    @cycles.times do
      @ring.parent_receive("Hi ring!")
    end
  end

  def return_message(message)
    puts "Parent: Got message back- circulation time: #{Time.now - @start}"
  end
end

class RingMember
  def initialize(port, next_port, parent = nil)
    @port = port
    @parent = parent
    @current_message = ""
    @next_member = connect_next(next_port)
    DRBService.new(self, @port)
  end

  def connect_next(port)
    DRb.start_service
    DRbObject.new(nil, "druby://127.0.0.1:#{port}")
  end

  def parent_receive(message)
    @current_message = message
    forward_message(@current_message)
  end

  def receive_message(message)
    begin
      message == @current_message ?
        (@parent.return_message(message);(@current_message = "")) :
        forward_message(message)
    rescue
      puts "#{@port}: Received duplicate message, couldn't talk to parent: #{$!}"
    end
  end

  def forward_message(message)
    @next_member.receive_message(message)
  end

  def test(message)
    return "#{@port}: Got message #{message}"
  end
end

class DRBService
  def initialize(process, port)
    DRb.start_service("druby://:#{port}", process)
    DRb.thread.join
  end
end

processes = ARGV[0].to_i
cycles = ARGV[1].to_i
parent = RingParent.new(processes, cycles)

···

On Aug 22, 3:00 pm, "Adam Shelly" <adam.she...@gmail.com> wrote:

On 8/22/07, James Edward Gray II <ja...@grayproductions.net> wrote:> On Aug 22, 2007, at 1:45 PM, Adam Shelly wrote:
> > I've been thinking about virtual machines recently, so I decided to
> > implement one in Ruby for this quiz.

> That gets my vote for the craziest Ruby Quiz solution ever. Wow!

> You have no idea how much I regret that I wrote the quiz summary
> earlier today. :(

You have no idea how much I regret that I had a bunch of real work to
do and didn't have time to finish it up sooner. :)

I also wish I had time and a valid reason to keep working on extending
the VM. The first thing I'd do is find a good name for the
language...

-Adam