Suggestions for a distributed job queue

I'm looking at replacing our homebrew job queue with something better, and
I'm wondering what's out there. If there's something that meets our needs
I'd like to use it; otherwise I may end up building our next gen job queue
by assembling various components that exist already. I've seen a couple
dozen solutions for this in the Ruby world, but haven't scrutinized many of
them in depth and am unsure if there are any that fully meet our needs.

We have many different kinds of jobs, but the main ones I'm worried
about are rather CPU intensive, take a long time to execute, and operate on
large amounts of input and output data.

- Distributed: we certainly have too many jobs to run on a single computer.
We need to distribute them across a number of workers. Beyond that we'd
like to automatically provision more workers when the backlog of jobs gets
too large, and shut down workers if too many of them are idle.

- Fault Tolerant: as with any distributed system, fault tolerance is
important. An evil monkey should be able to futz with any part of the
system, crashing computers willy nilly, and it should continue to Just
Work. If a job breaks down and is somehow lost anywhere along the way, the
system should eventually detect this and retry the job. From a design
perspective, I would like as much of the system as possible to be
stateless. The agents executing the jobs and message queues should be
stateless. Ideally the entirety of the state of the system is kept in the
database, and that's the only part of the system whose state we need to
recover after a crash. If a worker or message queue goes down
we shouldn't need to worry about recovering jobs-in-flight; they should
simply be retried.

- Idempotent: going along with a stateless approach to fault tolerance, if
the system does misdetect a failed job and ends up executing it twice, the
system should detect this and discard the redundant results. Having the
same job accidentally complete multiple times should not screw up the state
of the system.

- Support for Temporary Failures: sometimes a job fails in such a way that
it should be retried (e.g. external resources needed to complete a job are
temporarily unavailable). The system should support retrying these jobs in
a sensible way, and it'd be nice to specify on a job-by-job basis how many
attempts should be made before the system should give up and consider it a
permanent failure.

- Support for Live Upgrades: we'd like to be able to add new types of jobs
to the system. So along with this, agents should know what types of jobs
they support, and not request unsupported jobs from the message queue.
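
To make the requirements above concrete, here is a minimal in-memory sketch of the job bookkeeping they imply. It is not taken from any existing queue: the JobStore class, its method names, the lease length and the job states are all assumptions made for illustration, and a real system would keep this state in the database rather than in a Hash.

```ruby
# Hypothetical in-memory stand-in for the jobs table; all names are illustrative.
Job = Struct.new(:id, :type, :state, :attempts, :max_attempts, :lease_expires_at, :result)

class JobStore
  def initialize(lease_seconds: 60)
    @jobs = {}
    @lease_seconds = lease_seconds
  end

  def add(id, type, max_attempts: 3)
    @jobs[id] = Job.new(id, type, :pending, 0, max_attempts, nil, nil)
  end

  def [](id)
    @jobs[id]
  end

  # Live upgrades: only hand out a pending job whose type the agent supports,
  # and start a lease so a lost job can be detected later.
  def claim(supported_types, now: Time.now)
    job = @jobs.values.find { |j| j.state == :pending && supported_types.include?(j.type) }
    return nil unless job
    job.state = :running
    job.attempts += 1
    job.lease_expires_at = now + @lease_seconds
    job
  end

  # Idempotence: the first result wins; a second completion is discarded.
  def complete(id, result)
    job = @jobs.fetch(id)
    return false if job.state == :done
    job.state = :done
    job.result = result
    true
  end

  # Temporary failure: retry until the per-job attempt limit is reached.
  def fail_temporarily(id)
    job = @jobs.fetch(id)
    return if job.state == :done
    job.state = job.attempts < job.max_attempts ? :pending : :failed
  end

  # Fault tolerance: a running job whose lease expired is presumed lost
  # and handled like a temporary failure.
  def reap_expired(now: Time.now)
    expired = @jobs.values.select { |j| j.state == :running && j.lease_expires_at <= now }
    expired.each { |j| fail_temporarily(j.id) }
    expired.map(&:id)
  end
end
```

With this shape, workers and message queues hold no state worth recovering: if either disappears, the lease simply runs out and the job becomes pending again.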

I suppose some of my comments are presupposing the architecture: a list of
jobs stored in the database, command and control processes which load jobs
into and read results from the message queues, and agents which pull jobs
from and report jobs to the message queues, and the message queues
themselves. I'd be open to other designs, so long as they have the above
properties.
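
In an architecture like this, a command and control process would also be a natural place for the provisioning decision from the "Distributed" requirement. A rough sketch follows; the function name and every threshold (jobs_per_worker, max_idle, min_workers) are made-up tuning knobs, not values from any real deployment.

```ruby
# Returns how many workers to add (positive) or shut down (negative).
# All thresholds are hypothetical defaults chosen for illustration.
def scaling_delta(backlog:, idle_workers:, total_workers:,
                  jobs_per_worker: 10, max_idle: 2, min_workers: 1)
  if backlog > jobs_per_worker * total_workers
    # Backlog exceeds what the current pool should absorb:
    # add enough workers to cover it.
    (backlog.to_f / jobs_per_worker).ceil - total_workers
  elsif idle_workers > max_idle
    # Too many idle workers: shed the surplus, but keep a minimum pool.
    removable = [idle_workers - max_idle, total_workers - min_workers].min
    -[removable, 0].max
  else
    0
  end
end
```

For example, scaling_delta(backlog: 95, idle_workers: 0, total_workers: 4) asks for 6 more workers, while scaling_delta(backlog: 0, idle_workers: 5, total_workers: 6) asks to shut down 3.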

Given that, is there an existing job queue that meets my needs that I should
be checking out?

···

--
Tony Arcieri
Medioh! A Kudelski Brand

I don't think this exactly meets everything you are after, but it gets close in a lot of ways:

  http://github.com/blog/542-introducing-resque

James Edward Gray II

···

On Dec 22, 2009, at 1:23 PM, Tony Arcieri wrote:

Given that, is there an existing job queue that meets my needs that I should be checking out?

I don't know off the top of my head if it does _all_ of that, but
I'd look at Ruby Queue as a distributed work queue.
http://raa.ruby-lang.org/project/rq/

···

Hi,

I am using an AMQP compliant queue for this. With its permanent queuing and routing mechanisms it can be made to meet many if not all of your requirements, I believe.

As job items I am using Thrift RPC method calls, which is very convenient on both sides (server, client). The library that allows you to do this is here: http://github.com/kschiess/thrift_amqp_transport (currently being redesigned).

There are many solutions in this space. Most recently, people have been using Resque (on GitHub, I believe) and AMQP queues. Other solutions can be found using the database or the filesystem, as you already know.

my 2 cents
kaspar

I'll +1 resque. It's a pretty fantastic system.

Jason

···

On Tue, Dec 22, 2009 at 2:35 PM, James Edward Gray II <james@graysoftinc.com> wrote:

On Dec 22, 2009, at 1:23 PM, Tony Arcieri wrote:

> Given that, is there an existing job queue that meets my needs that I
> should be checking out?

I don't think this exactly meets everything you are after, but it gets
close in a lot of ways:

Introducing Resque - The GitHub Blog

James Edward Gray II

Resque looks very interesting. Thanks for the heads up.

···

On Tue, Dec 22, 2009 at 12:35 PM, James Edward Gray II <james@graysoftinc.com> wrote:

I don't think this exactly meets everything you are after, but it gets
close in a lot of ways:

Introducing Resque - The GitHub Blog

--
Tony Arcieri
Medioh! A Kudelski Brand

I've looked at RubyQueue in the past and it is rather interesting, however
it has a number of issues which would prevent us from using it.

For starters, it uses NFS as the distribution protocol, and using NFS isn't
really practical in our environment.

···

On Tue, Dec 22, 2009 at 12:40 PM, Walton Hoops <walton@vyper.hopto.org> wrote:

I don't know off the top of my head if it does _all_ of that, but
I'd look at Ruby Queue as a distributed work queue.
http://raa.ruby-lang.org/project/rq/

--
Tony Arcieri
Medioh! A Kudelski Brand

Not that I doubt it, but I'm curious what the limitation is. Is it
scalability?

···

On Tuesday 22 December 2009 03:39:15 pm Tony Arcieri wrote:

For starters, it uses NFS as the distribution protocol, and using NFS isn't
really practical in our environment.

A couple of other options:

(1) AMQP, e.g. RabbitMQ. I believe it comes with Ruby bindings. Can be
made as fault-tolerant as you like :)

(2) Depending on your needs, you could consider rolling your own with
DRb. This means at least you know the system inside-out and can easily
customise it - although avoiding the queue server itself being a SPOF is
awkward.

Here are a couple of working proofs-of-concept.

In-RAM queue
============
---- server ----
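require 'drb'
require 'thread'
# Any Queue can be the shared front object; SizedQueue bounds the backlog.
q = Queue.new # or SizedQueue.new(1000)
# Expose the queue on this address, then block until the server stops.
DRb.start_service("druby://127.0.0.1:9911", q)
DRb.thread.join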

---- client ----
require 'drb'
DRb.start_service
# Proxy for the server's queue; use the same address the server bound to.
q = DRbObject.new(nil, "druby://127.0.0.1:9911")
q.push "abc"
puts q.pop
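
As a variation on the in-RAM queue above, the sketch below shows a worker loop consuming jobs through DRb. It is only an illustration: server, producer and worker all run in one process for brevity, port 0 is used so the OS picks a free port, and the :stop sentinel and the upcase "work" are invented for the example.

```ruby
require 'drb'

# Server side: expose an in-RAM queue. Port 0 asks the OS for a free port.
jobs = Queue.new
DRb.start_service("druby://127.0.0.1:0", jobs)
uri = DRb.uri

# Client side: producer and worker share a remote reference to the queue.
remote = DRbObject.new_with_uri(uri)
results = []
worker = Thread.new do
  # pop blocks until a job arrives; :stop is a made-up shutdown sentinel.
  while (job = remote.pop) != :stop
    results << job.upcase # stand-in for the real work
  end
end

%w[abc def].each { |j| remote.push(j) }
remote.push(:stop)
worker.join
DRb.stop_service
p results
```

Note that pop removes the job from the queue for good, so a worker that crashes after popping takes the job with it. Detecting and retrying such jobs would have to happen somewhere else, for example in the database that tracks them.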

On-disk queue using Madeleine
=============================

---- server ----
require 'rubygems'
require 'madeleine'

class MadQueue
  def initialize(madeleine)
    @madeleine = madeleine
  end

  # Read operations don't need to go via command objects (if you don't
  # care about synchronization)
  def length
    @madeleine.system.length
  end

  class Pusher
    def initialize(data)
      @data = data
    end
    def execute(system)
      system.push(@data)
    end
  end

  class Popper
    def execute(system)
      system.shift
    end
  end

  def push(data)
    @madeleine.execute_command(Pusher.new(data))
  end

  def pop
    @madeleine.execute_command(Popper.new)
  end
end

require 'drb'
madeleine = SnapshotMadeleine.new("madqueue.dir") { [] }

Thread.new(madeleine) {
  puts "Taking snapshot every 30 seconds."
  while true
    sleep(30)
    madeleine.take_snapshot
  end
}

DRb.start_service("druby://127.0.0.1:9911", MadQueue.new(madeleine))
DRb.thread.join

---- client ----
Same as above
--
Posted via http://www.ruby-forum.com/.

More like security. This is running partially in our datacenter and
partially on EC2. While I'm sure it "can be done", it really doesn't seem
like the ideal solution.

···

On Wed, Dec 23, 2009 at 12:10 AM, David Masover <ninja@slaphack.com> wrote:

On Tuesday 22 December 2009 03:39:15 pm Tony Arcieri wrote:
> For starters, it uses NFS as the distribution protocol, and using NFS
> isn't really practical in our environment.

Not that I doubt it, but I'm curious what the limitation is. Is it
scalability?

--
Tony Arcieri
Medioh! A Kudelski Brand

Worth mentioning: EC2's internal IPs can be pretty much completely firewalled
off, and VPNs are easy enough to set up. Probably easily doable.

But you're right, probably not the ideal solution.

···

On Wednesday 23 December 2009 11:04:18 am Tony Arcieri wrote:

On Wed, Dec 23, 2009 at 12:10 AM, David Masover <ninja@slaphack.com> wrote:
> On Tuesday 22 December 2009 03:39:15 pm Tony Arcieri wrote:
> > For starters, it uses NFS as the distribution protocol, and using NFS
> > isn't really practical in our environment.
>
> Not that I doubt it, but I'm curious what the limitation is. Is it
> scalability?

More like security. This is running partially in our datacenter and
partially on EC2. While I'm sure it "can be done", it really doesn't seem
like the ideal solution.

Then what happens when the VPN goes down and the leaky abstraction that is
NFS's synchronous API grinds your message queue to a halt?

···

On Wed, Dec 23, 2009 at 11:57 AM, David Masover <ninja@slaphack.com> wrote:

Worth mentioning: EC2's internal IPs can be pretty much completely firewalled
off, and VPNs are easy enough to set up. Probably easily doable.

--
Tony Arcieri
Medioh! A Kudelski Brand

Same thing as when you have an EC2 outage: You save state, and resume processing ASAP.

···

On 24.12.2009 03:08, Tony Arcieri wrote:

Then what happens when the VPN goes down and the leaky abstraction that is
NFS's synchronous API grinds your message queue to a halt?

--
Phillip Gawlowski

Oof. Well for one, due to the nature of the synchronous filesystem API,
it's hard for processes in userspace to detect when things are amiss in the
underlying NFS layers.

Also, if you read my OP, saving state (aside from the state of "what jobs
have not been run yet") and recovering jobs in flight is something I want to
avoid. If the system fails I'd rather it simply fail and restore it to a
clean state. That way, you can have only one stateful part of the system,
and that's the only part you need to worry about recovering state from after
a failure.

···

On Wed, Dec 23, 2009 at 7:17 PM, Phillip Gawlowski <pg@thimian.com> wrote:

Same thing as when you have an EC2 outage: You save state, and resume
processing ASAP.

--
Tony Arcieri
Medioh! A Kudelski Brand

>> Same thing as when you have an EC2 outage: You save state, and resume
>> processing ASAP.
>
> Oof. Well for one, due to the nature of the synchronous filesystem API,
> it's hard for processes in userspace to detect when things are amiss in the
> underlying NFS layers.

And implementing some sort of keep-alive/heartbeat system is too much work, as well.

> Also, if you read my OP, saving state (aside from the state of "what jobs
> have not been run yet") and recovering jobs in flight is something I want to
> avoid.

Well, I didn't *mean* that you implement a whole synchronization framework (what it comes down to). Alas, I implied it.

> If the system fails I'd rather it simply fail and restore it to a
> clean state. That way, you can have only one stateful part of the system,
> and that's the only part you need to worry about recovering state from after
> a failure.

And this would probably be best done on "your" end of the network, too. That way you could ignore the EC2 nodes for the time being, in case of some form of network outage (assuming I understood you correctly, in that only part of your nodes are in Amazon's cloud).

Not having had a look at RQueue's implementation details, maybe you could, without too much effort, port to the Devil From Redmond's SMB system, via Samba.

Or roll your own, as has been suggested, with DRb. :)

···

On 24.12.2009 03:36, Tony Arcieri wrote:

On Wed, Dec 23, 2009 at 7:17 PM, Phillip Gawlowski <pg@thimian.com> wrote:

--
Phillip Gawlowski
Wishing everyone a merry Christmas & happy holidays. :D