Distributed testing with Test::Unit and Rinda

Hi all,

I have been working on some code to assist with distributed unit testing
using Test::Unit and Rinda. I thought I'd post it here assuming that
someone else might find it interesting or useful. It's a bit raw, and
I'm still working out some bugs with unclean shutdowns of the test
servers. Also, it's not documented (yet), but it's only about 240 lines
of code.

There are easier ways of doing this, of course, but I had a few
requirements that caused me to write it this way:

1) Distribute tests to the test servers on an individual test method
basis
2) Avoid (as much as possible) having to rewrite any of the Test::Unit
code via method aliasing.

You'll have to run a ring server - see ringserver.rb from Eric Hodel's
site at http://segment7.net/projects/ruby/drb/rinda/ringserver.html.
Also, I did not provide the 'attribute_accessors' file, since that is
just like the one in the rails support package (except that is modified
to be used in a Module instead of Class). The rest of the files are
included inline below. Here is an explanation of what to do with each:

service.rb -

  This file continas definitions for producer/consumer classes for the
distributed test service, which is shared via a tuple space.

distributed.rb -

  This file contains mixins for Test::Unit::TestCase and
Test::Unit::TestSuite that enable them to use the distributed service.

server.rb -

  Run this on every machine that will be given unit tests to run.

tests.rb -

  This is a sample unit test file

test.rb -

  This is a sample master script, which is run as 'ruby test.rb -d
tests.rb.' If you run 'ruby test.rb tests.rb,' the tests are run
locally.

Regards,
Joe Hosteny
jhosteny at gmail dot com

--service.rb--
require 'rinda/ring'
require 'rinda/tuplespace'
require 'rinda/rinda'

def log *args
  $stdout.write "(#{Thread.current}) "
  puts *args
  $stdout.flush
end

module Rinda
  class RingFinger
    # Change this to your local network broadcast netmask
    @@broadcast_list.push("192.168.1.255")
  end
end

module Service
  class Base
    def initialize(name)
      @name = name
      DRb.start_service
      log "Started DRb on URI #{DRb.uri}"
      Rinda::RingFinger.primary
    end

    def consumer?
      respond_to? :consume
    end

    def method_missing(meth, *args)
      ts = Thread.current[:tuplespace][2]
      ts = Rinda::TupleSpaceProxy.new(ts) if consumer?
      ts.send(meth, *args)
    end
  end

  class Producer < Base
    def initialize(name)
      super
      ts = Rinda::TupleSpace.new
      name = "#{@name}:#{DRb.uri}"
      tuple = Rinda::RingProvider.new(@name.to_sym, ts, name).provide
      Thread.current[:tuplespace] =
Rinda::RingFinger.primary.read(tuple)
      trap("EXIT") do
        Rinda::RingFinger.primary.take(Thread.current[:tuplespace])
      end
    end
  end

  class Consumer < Base
    def consume
      tuple = [:name, @name.to_sym, nil, nil]
      Thread.current[:tuplespace] =
Rinda::RingFinger.primary.take(tuple)
      log "Got tuplespace from URI:
#{Thread.current[:tuplespace][2].__drburi}"
      begin
        yield self
      ensure
        Rinda::RingFinger.primary.write(Thread.current[:tuplespace])
      end
    end
  end
end

--distributed.rb--
require 'test/unit'
require 'test/unit/testresult'
require 'attribute_accessors'
require 'service'

module DistributedTestCase
  module ClassMethods
    @@service = nil
    mattr_accessor :service

    @@file = nil
    mattr_accessor :file

    module Run
    end

    def start_client
      @@service = Service::Consumer.new('DistributedTest')
    end
    def start_server
      @@service = Service::Producer.new('DistributedTest')
      loop do
        log "Waiting to take test"
        file, name, meth, oid = *(@@service.take([:test, nil]).last)
        log "Loading #{name}::#{meth} in file #{file}"
        load(file)
        klass = nil
        i = 0
        ObjectSpace.each_object do |obj|
          if (obj.class == Class and obj.to_s == name)
            klass = obj
            break
          end
          i += 1
        end
        log "Checked #{i} objects"
        begin
          test = klass.new(meth)
          log "Running #{name}::#{meth} in file #{file})"
          test.run(Test::Unit::TestResultProxy.new(@@service, oid))
          log "Finished running #{name}::#{meth} in file #{file}"
        rescue => e
          @@service.write([:result, oid, :exception, e])
        end
      end
    end

    def inherited(base)
      caller[0] =~ /(.+?):.*/
      @@file = File.expand_path($1)
    end
  end

  class << self
    def included(base)
      base.extend(ClassMethods)
      base.class_eval do
        alias_method :run_original, :run
        alias_method :run, :run_distributed
      end
    end
  end

  def run_distributed(result)
    if ClassMethods.service.consumer?
      th = Thread.new do
        log "New thread"
        ClassMethods.service.consume do |srv|
          oid = method(method_name).object_id
          log "Dispatching test #{self.class.to_s}::#{method_name}
(#{oid})"
          srv.write [:test, [ClassMethods.file, self.class.to_s,
method_name, oid]]
          log "Waiting for result from
#{self.class.to_s}::#{method_name}"
          loop do
            tuple = [:result, oid, nil, nil]
            tuple = srv.take(tuple)
            args, method = tuple.pop, tuple.pop
            log "Test #{self.class.to_s}::#{method_name} called
#{method}"
            if method == :exception
              raise args.class, "#{args.message}\n\t(remote)
#{args.backtrace.join("\n\t(remote) ")}\n"
            end
            if %W(add_failure add_error).include? method.to_s
              klass = Test::Unit::Error
              klass = Test::Unit::Failure if method.to_s =~ /failure/
              result.send(method, klass.new(*args))
            else
              result.send(method)
            end
            break if method == :add_run
          end
        end
        log "Thread exiting"
      end
      callcc do |cc|
        throw :new_thread, [th, cc]
      end
    else
      run_original(result) do |s,n| end
    end
  end
end

module DistributedTestSuite
  class << self
    def included(base)
      base.class_eval do
        alias_method :run_original, :run
        alias_method :run, :run_distributed
      end
    end
  end

  def run_distributed(result, &block)
    threads = []
    th, cc = *catch(:new_thread) do
      run_original(result, &block)
      nil
    end
    if th
      threads << th
      cc.call
    end
    threads.each { |th| th.join }
  end
end

module Test
  module Unit
    class TestSuite
      include DistributedTestSuite
    end
    class TestCase
      include DistributedTestCase
    end
    class TestResultProxy
      def initialize(server, oid)
        @server = server
        @oid = oid
      end

      def method_missing(name, *args)
        name = name.id2name
        if name =~ /add_(.*)/
          if %W(failure error).include? $1
            args = args[0]
            if $1 =~ /failure/
              args = [args.test_name, args.location, args.message]
            else
              args = [args.test_name, args.exception]
            end
          end
          @server.write([:result, @oid, name.to_sym, args])
        end
      end
    end
  end
end

--server.rb--
#!/bin/env ruby
require 'optparse'
require 'distributed'

Test::Unit::TestCase.start_server

--tests.rb--
require 'test/unit'

class TC_MyTest < Test::Unit::TestCase
  def setup
    puts "in setup"
  end

  def teardown
    puts "in teardown"
  end

  def test_it
    assert(false, 'Assertion was false.')
  end

  def test_pass
    assert(true, 'Assertion was true.')
  end
end

--test.rb--
#!/bin/env ruby
require 'optparse'
require 'distributed'
Test::Unit::TestCase.start_client
require ARGV.shift

···

--
Posted via http://www.ruby-forum.com/.

Hi Joe,
This looks really cool. I will have to see how it can integrate with
TESTify, my test management system. It seems like a cool way to do things,
espically in the corporate or many person open-source project situations
that TESTify is being created for.

···

--
Chris

On 10/9/06, Joe Hosteny iv <jhosteny@gmail.com> wrote:

Hi all,

I have been working on some code to assist with distributed unit testing
using Test::Unit and Rinda. I thought I'd post it here assuming that
someone else might find it interesting or useful. It's a bit raw, and
I'm still working out some bugs with unclean shutdowns of the test
servers. Also, it's not documented (yet), but it's only about 240 lines
of code.

There are easier ways of doing this, of course, but I had a few
requirements that caused me to write it this way:

1) Distribute tests to the test servers on an individual test method
basis
2) Avoid (as much as possible) having to rewrite any of the Test::Unit
code via method aliasing.

You'll have to run a ring server - see ringserver.rb from Eric Hodel's
site at http://segment7.net/projects/ruby/drb/rinda/ringserver.html\.
Also, I did not provide the 'attribute_accessors' file, since that is
just like the one in the rails support package (except that is modified
to be used in a Module instead of Class). The rest of the files are
included inline below. Here is an explanation of what to do with each:

service.rb -

This file continas definitions for producer/consumer classes for the
distributed test service, which is shared via a tuple space.

distributed.rb -

This file contains mixins for Test::Unit::TestCase and
Test::Unit::TestSuite that enable them to use the distributed service.

server.rb -

Run this on every machine that will be given unit tests to run.

tests.rb -

This is a sample unit test file

test.rb -

This is a sample master script, which is run as 'ruby test.rb -d
tests.rb.' If you run 'ruby test.rb tests.rb,' the tests are run
locally.

Regards,
Joe Hosteny
jhosteny at gmail dot com

--service.rb--
require 'rinda/ring'
require 'rinda/tuplespace'
require 'rinda/rinda'

def log *args
$stdout.write "(#{Thread.current}) "
puts *args
$stdout.flush
end

module Rinda
class RingFinger
   # Change this to your local network broadcast netmask
   @@broadcast_list.push("192.168.1.255")
end

module Service
class Base
   def initialize(name)
     @name = name
     DRb.start_service
     log "Started DRb on URI #{DRb.uri}"
     Rinda::RingFinger.primary
   end

   def consumer?
     respond_to? :consume
   end

   def method_missing(meth, *args)
     ts = Thread.current[:tuplespace][2]
     ts = Rinda::TupleSpaceProxy.new(ts) if consumer?
     ts.send(meth, *args)
   end
end

class Producer < Base
   def initialize(name)
     super
     ts = Rinda::TupleSpace.new
     name = "#{@name}:#{DRb.uri}"
     tuple = Rinda::RingProvider.new(@name.to_sym, ts, name).provide
     Thread.current[:tuplespace] =
Rinda::RingFinger.primary.read(tuple)
     trap("EXIT") do
       Rinda::RingFinger.primary.take(Thread.current[:tuplespace])
     end
   end
end

class Consumer < Base
   def consume
     tuple = [:name, @name.to_sym, nil, nil]
     Thread.current[:tuplespace] =
Rinda::RingFinger.primary.take(tuple)
     log "Got tuplespace from URI:
#{Thread.current[:tuplespace][2].__drburi}"
     begin
       yield self
     ensure
       Rinda::RingFinger.primary.write(Thread.current[:tuplespace])
     end
   end
end

--distributed.rb--
require 'test/unit'
require 'test/unit/testresult'
require 'attribute_accessors'
require 'service'

module DistributedTestCase
module ClassMethods
   @@service = nil
   mattr_accessor :service

   @@file = nil
   mattr_accessor :file

   module Run
   end

   def start_client
     @@service = Service::Consumer.new('DistributedTest')
   end
   def start_server
     @@service = Service::Producer.new('DistributedTest')
     loop do
       log "Waiting to take test"
       file, name, meth, oid = *(@@service.take([:test, nil]).last)
       log "Loading #{name}::#{meth} in file #{file}"
       load(file)
       klass = nil
       i = 0
       ObjectSpace.each_object do |obj|
         if (obj.class == Class and obj.to_s == name)
           klass = obj
           break
         end
         i += 1
       end
       log "Checked #{i} objects"
       begin
         test = klass.new(meth)
         log "Running #{name}::#{meth} in file #{file})"
         test.run(Test::Unit::TestResultProxy.new(@@service, oid))
         log "Finished running #{name}::#{meth} in file #{file}"
       rescue => e
         @@service.write([:result, oid, :exception, e])
       end
     end
   end

   def inherited(base)
     caller[0] =~ /(.+?):.*/
     @@file = File.expand_path($1)
   end
end

class << self
   def included(base)
     base.extend(ClassMethods)
     base.class_eval do
       alias_method :run_original, :run
       alias_method :run, :run_distributed
     end
   end
end

def run_distributed(result)
   if ClassMethods.service.consumer?
     th = Thread.new do
       log "New thread"
       ClassMethods.service.consume do |srv|
         oid = method(method_name).object_id
         log "Dispatching test #{self.class.to_s}::#{method_name}
(#{oid})"
         srv.write [:test, [ClassMethods.file, self.class.to_s,
method_name, oid]]
         log "Waiting for result from
#{self.class.to_s}::#{method_name}"
         loop do
           tuple = [:result, oid, nil, nil]
           tuple = srv.take(tuple)
           args, method = tuple.pop, tuple.pop
           log "Test #{self.class.to_s}::#{method_name} called
#{method}"
           if method == :exception
             raise args.class, "#{args.message}\n\t(remote)
#{args.backtrace.join("\n\t(remote) ")}\n"
           end
           if %W(add_failure add_error).include? method.to_s
             klass = Test::Unit::Error
             klass = Test::Unit::Failure if method.to_s =~ /failure/
             result.send(method, klass.new(*args))
           else
             result.send(method)
           end
           break if method == :add_run
         end
       end
       log "Thread exiting"
     end
     callcc do |cc|
       throw :new_thread, [th, cc]
     end
   else
     run_original(result) do |s,n| end
   end
end

module DistributedTestSuite
class << self
   def included(base)
     base.class_eval do
       alias_method :run_original, :run
       alias_method :run, :run_distributed
     end
   end
end

def run_distributed(result, &block)
   threads =
   th, cc = *catch(:new_thread) do
     run_original(result, &block)
     nil
   end
   if th
     threads << th
     cc.call
   end
   threads.each { |th| th.join }
end

module Test
module Unit
   class TestSuite
     include DistributedTestSuite
   end
   class TestCase
     include DistributedTestCase
   end
   class TestResultProxy
     def initialize(server, oid)
       @server = server
       @oid = oid
     end

     def method_missing(name, *args)
       name = name.id2name
       if name =~ /add_(.*)/
         if %W(failure error).include? $1
           args = args[0]
           if $1 =~ /failure/
             args = [args.test_name, args.location, args.message]
           else
             args = [args.test_name, args.exception]
           end
         end
         @server.write([:result, @oid, name.to_sym, args])
       end
     end
   end
end

--server.rb--
#!/bin/env ruby
require 'optparse'
require 'distributed'

Test::Unit::TestCase.start_server

--tests.rb--
require 'test/unit'

class TC_MyTest < Test::Unit::TestCase
def setup
   puts "in setup"
end

def teardown
   puts "in teardown"
end

def test_it
   assert(false, 'Assertion was false.')
end

def test_pass
   assert(true, 'Assertion was true.')
end

--test.rb--
#!/bin/env ruby
require 'optparse'
require 'distributed'
Test::Unit::TestCase.start_client
require ARGV.shift

--
Posted via http://www.ruby-forum.com/\.

--
Chris Carter
concentrationstudios.com
brynmawrcs.com