Event based model - best way to implement?

I am currently using perl's POE to communicate with the asterisk manager
interface, but I would like a ruby implementation. A server will connect to
asterisk and read events as they become available. The webserver will
connect periodically to retrieve any available events and to optionally
send more requests.

My first thought was to use a simple select loop in the server combined with
observers, and have the webserver communicate via drb. I could use http for
the server also to make it easy for ajax calls to communicate directly to
the server (proxied via apache).

The asterisk manager interface sends events to the connected client (my
server) pretty much ad hoc whenever asterisk has something to report. In
other words requests and responses are not necessarily sequential.

Does a simple select loop sound like the way to go here?

Chris

snacktime wrote:

> I am currently using perl's POE to communicate with the asterisk
> manager interface, but I would like a ruby implementation.

Judging from a quick glance at POE, everything it does is already built
into Ruby (Thread, Queue, synchronization).

> A server
> will connect to asterisk and read events as they become available.
> The webserver will connect periodically to retrieve any available
> events and to optionally send more requests.
>
> My first thought was to use a simple select loop in the server
> combined with observers, and have the webserver communicate via drb.
> I could use http for the server also to make it easy for ajax calls
> to communicate directly to the server (proxied via apache).
>
> The asterisk manager interface sends events to the connected client
> (my server) pretty much ad hoc whenever asterisk has something to
> report. In other words requests and responses are not necessarily
> sequential.
>
> Does a simple select loop sound like the way to go here?

Your architecture is not fully clear to me, but given the small number of
sources and sinks I'd use a thread per source and either do the processing
directly in that thread or push tasks down a queue. IMHO this is far
easier in this case than writing a select loop that does the multiplexing.
Does that sound reasonable to you?
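
A minimal sketch of the thread-per-source idea, for illustration only. The
source names and event strings are made up, and plain arrays stand in for the
sockets a real server would read from. Each source gets its own reader thread,
and a single worker drains the shared Queue.

```ruby
require 'thread' # needed on old Rubies, harmless on current ones

# Hypothetical sources: arrays stand in for sockets in this sketch.
sources = {
  "asterisk" => ["Newchannel", "Hangup"],
  "web"      => ["Originate"]
}

tasks = Queue.new

# One reader thread per source; each pushes what it reads onto the queue.
readers = sources.map do |name, events|
  Thread.new do
    events.each { |event| tasks << [name, event] }
  end
end

# A single worker pops tasks; nil is used here as a stop marker.
processed = []
worker = Thread.new do
  while (task = tasks.pop)
    processed << task
  end
end

readers.each(&:join)
tasks << nil
worker.join

processed.each { |name, event| puts "#{name}: #{event}" }
```

The worker never touches the sources directly, so all the locking lives
inside Queue.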

Kind regards

    robert

are you on windows?

-a


--

email :: ara [dot] t [dot] howard [at] noaa [dot] gov
phone :: 303.497.6469
Your life dwells among the causes of death
Like a lamp standing in a strong breeze. --Nagarjuna

===============================================================================

Have you considered using the reactor pattern?

http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf
http://en.wikipedia.org/wiki/Reactor_Pattern

A couple of ruby implementations:

http://www.deveiate.org/projects/IO-Reactor/
http://rubystuff.org/treasures/RubyTreasures-0.5/lib/reactor/

Paul

Just some general observations that may help....

   * Threads, select, poll, reactor, state machines are all isomorphic, i.e.
     you can translate any solution written using any of those mechanisms
     into any other. The trick is to choose whichever is easiest to code
     for your particular problem.

   * Threads in Ruby are implemented as a stonking great big select deep in
     the C implementation of Ruby, i.e. no matter what you do, at the OS
     level you are doing a "select" :-)

   * On some systems "select" is implemented as a call to "poll" :-))

You may have observed it is _very_ easy to write Ruby internal iterators (the each / yield paradigm), but harder (sometimes lots harder) to write a C++'ish external iterator (the .more?, .at, .next paradigm).

Hint: You can _always_ change any solution involving an external iterator into one involving only internal iterators, a Thread and a Queue.

In the last biggish chunk of code I wrote doing this sort of thing, I found I could delete a _lot_ of code and make everything lucidly straightforward and simple by dropping the external iterator and spinning up a Thread communicating via a Queue.

Hint: By making the Queue a Queue of buffers (Arrays), the locking performance hit is vanishingly small.

Hint: By using communicating processes (i.e. pipe & fork instead of Thread.new), lots of obscure threading race conditions go away.
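
To make the first hint concrete, here is a rough sketch (not code from the project mentioned above). The class name QueueIterator and the DONE sentinel are invented for illustration. The producer only uses an internal iterator (each), while the consumer gets the .more? / .next style it wants by popping from a Queue.

```ruby
# Illustrative only: QueueIterator is a made-up name, not a library class.
class QueueIterator
  DONE = Object.new # sentinel pushed after the last item

  def initialize(enumerable)
    @queue = Queue.new
    # The producer thread walks the collection with a plain #each.
    @producer = Thread.new do
      enumerable.each { |item| @queue << item }
      @queue << DONE
    end
    @current = @queue.pop # look one item ahead so more? can answer
  end

  def more?
    !@current.equal?(DONE)
  end

  def next
    raise StopIteration, "no more items" unless more?
    item = @current
    @current = @queue.pop
    item
  end
end

it = QueueIterator.new(1..3)
collected = []
collected << it.next while it.more?
p collected # prints [1, 2, 3]
```

For the "Queue of buffers" hint, the producer could push whole Arrays (for example via each_slice) so the queue is locked once per batch rather than once per item.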

John Carter Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : john.carter@tait.co.nz
New Zealand

Carter's Clarification of Murphy's Law.

"Things only ever go right so that they may go more spectacularly wrong later."

From this principle, all of life and physics may be deduced.

> Your architecture is not fully clear to me, but given the small number of
> sources and sinks I'd use a thread per source and either do the processing
> directly in that thread or push tasks down a queue. IMHO this is far
> easier in this case than writing a select loop that does the multiplexing.
> Does that sound reasonable to you?

It was late and my brain was tired. My question really revolved around the
best way to constantly read from a socket in ruby and at the same time be
able to detect when a client connects with data that needs to be written to
the socket.

After looking around some more it looks like in the thread that holds the
connection to asterisk a simple loop combined with io/wait would work. Any
reason to use io/wait over select in a case like this?

> are you on windows?

No, FreeBSD.

Your options are open then. select works, so does io/wait.
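
A small sketch of both calls side by side, for comparison only. A pipe stands
in for the asterisk socket and the event lines are invented. Both calls take a
timeout in seconds and return nil when nothing becomes readable in time.

```ruby
require 'io/wait'

# A pipe stands in for the socket in this sketch.
reader, writer = IO.pipe

writer.puts "Event: Hangup"

# IO.select takes arrays of IOs and returns nil on timeout, otherwise
# three arrays: [readable, writable, errored].
ready = IO.select([reader], nil, nil, 1)
line_via_select = ready ? ready[0][0].gets : nil

writer.puts "Event: Newchannel"

# io/wait: wait_readable returns a truthy value once data is available,
# or nil when the timeout expires first.
line_via_wait = reader.wait_readable(1) ? reader.gets : nil

puts line_via_select
puts line_via_wait
```

With a single descriptor the two are much the same; select becomes the
natural choice once several descriptors have to be watched in one call.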

-a

On Wed, 12 Oct 2005, snacktime wrote:

> > are you on windows?
>
> No, FreeBSD.

> > Your architecture is not fully clear to me, but given the small number of
> > sources and sinks I'd use a thread per source and either do the processing
> > directly in that thread or push tasks down a queue. IMHO this is far
> > easier in this case than writing a select loop that does the multiplexing.
> > Does that sound reasonable to you?
>
> It was late and my brain was tired. My question really revolved around the
> best way to constantly read from a socket in ruby and at the same time be
> able to detect when a client connects with data that needs to be written to
> the socket.

Here's a typical example which uses a reader thread per client:

  require 'socket'

  port = (ARGV[0] || 80).to_i
  server = TCPServer.new('localhost', port)
  while (session = server.accept)
    # Handle each client connection in its own thread.
    Thread.new(session) do |sess|
      puts "Request: #{sess.gets}"
      sess.print "HTTP/1.1 200 OK\r\nContent-type: text/html\r\n\r\n"
      sess.print "<html><body><h1>#{Time.now}</h1></body></html>\r\n"
      # other stuff
      sess.close
    end
  end

Derived from
http://www.ruby-doc.org/docs/ProgrammingRuby/html/lib_network.html

> After looking around some more it looks like in the thread that holds the
> connection to asterisk a simple loop combined with io/wait would work. Any
> reason to use io/wait over select in a case like this?

As I said, IMHO threads are simpler and cleaner. Otherwise you'll be
writing a multiplexing loop with select in Ruby, which duplicates what
the interpreter already does in C for its threads. Using select yourself
only seems appropriate to me if the number of descriptors becomes very large.

Kind regards

robert

2005/10/11, snacktime <snacktime@gmail.com>:

You can use gserver for this, too:

  require 'gserver'

  class MyServer < GServer
    # GServer calls serve in a separate thread for each client connection.
    def serve(io)
      puts "Request: #{io.gets}"
      io.print "HTTP/1.1 200 OK\r\nContent-type: text/html\r\n\r\n"
      io.print "<html><body><h1>#{Time.now}</h1></body></html>\r\n"
      # other stuff
      io.close
    end
  end

  port = (ARGV[0] || 80).to_i
  server = MyServer.new(port)
  server.start
  server.join

Paul

On Wed, Oct 12, 2005 at 04:26:13AM +0900, Robert Klemme wrote:

Here's a typical example which uses a reader thread per client

I've gotten a ways on this project and now have another hurdle to cross.

Right now I have a server that connects to the asterisk manager interface (a
simple tcp line based protocol) and stays connected, acting as a kind of
proxy for connecting clients instead of making a new connection to asterisk
for each request.

One thread constantly reads events from asterisk. Each event is stuck into a
hash and the hash is pushed onto an array. Using the array like this can
change, it just happens to be what I've been using so far.

Clients connect to the server via drb with a request which is sent to
asterisk. The client then waits until a response is available, or until a
timeout is reached. Each client request is tagged with a unique id when it
is sent to asterisk, and asterisk returns that unique id in the response.
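
As a rough illustration of the tagging (not the actual server code), the
sketch below assumes the wire format is "Key: Value" lines ending in CRLF,
with an empty line closing each message. The helper names and the sample
reply text are invented for the example.

```ruby
# Assumed format: "Key: Value\r\n" lines, terminated by an empty line.
def encode_message(hash)
  hash.map { |key, value| "#{key}: #{value}\r\n" }.join + "\r\n"
end

# Turns one message back into a hash, skipping the empty terminator line.
def decode_message(text)
  text.each_line.with_object({}) do |line, hash|
    key, value = line.chomp.split(": ", 2)
    hash[key] = value if key && value
  end
end

next_id = 0
request = { "Action" => "Ping", "ActionID" => (next_id += 1).to_s }
wire = encode_message(request)

# Made-up reply text carrying the same id back.
reply = decode_message("Response: Success\r\nActionID: 1\r\n\r\n")
matches = reply["ActionID"] == request["ActionID"]
```

Messages without a matching id would be treated as unsolicited events.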

So basically the (abbreviated) code structure is like this, with
some_request_method being the method that is called from the drb client.
What I'm not sure about is how some_request_method will be able to know when
the response is available, or actually what would be the right way to do
this. some_request_method should block until a response is available.

require 'monitor'

class Asterisk
  def initialize
    @events = []
    @events.extend(MonitorMixin)
    @events_pending = @events.new_cond
  end

  def run
    @reader = Thread.new do
      loop do
        # Reading from socket. Each event goes into a hash which is
        # pushed onto @events.
      end
    end
  end

  def writesock(hash)
    # Synchronized write to the socket connected to asterisk.
  end

  def some_request_method(hash)
    writesock(hash)
    # Wait for a response, which will be a hash.
    response = nil # placeholder: how to block here is the open question
    response
  end
end

snacktime wrote:

> I've gotten a ways on this project and now have another hurdle to
> cross.
>
> Right now I have a server that connects to the asterisk manager
> interface (a simple tcp line based protocol) and stays connected,
> acting as a kind of proxy for connecting clients instead of making a
> new connection to asterisk for each request.
>
> One thread constantly reads events from asterisk. Each event is stuck
> into a hash and the hash is pushed onto an array. Using the array
> like this can change, it just happens to be what I've been using so
> far.

Usually one would use a Queue instead of the array because the Queue is
thread safe. You'll find one shipped in "thread". However, in this case
I'd probably use something hash-like, since each response has to be
looked up by its unique id.

> Clients connect to the server via drb with a request which is sent to
> asterisk. The client then waits until a response is available, or
> until a timeout is reached. Each client request is tagged with a
> unique id when it is sent to asterisk, and asterisk returns that
> unique id in the response.
>
> So basically the (abbreviated) code structure is like this, with
> some_request_method being the method that is called from the drb
> client. What I'm not sure about is how some_request_method will be
> able to know when the response is available, or actually what would
> be the right way to do this. some_request_method should block until a
> response is available.

Yep. You can achieve this by using a ConditionVariable. You can see
sample usage in my home-grown sample queue implementation here

I'd encapsulate this in a class. Something along the lines of this:

require 'thread'

class ResultRepository
  def initialize
    @results = {}
    @cond = ConditionVariable.new
    @mutex = Mutex.new
  end

  def put_result(id, result)
    @mutex.synchronize do
      @results[id] = result
      # Wake all waiters; each one rechecks whether its own id arrived.
      @cond.broadcast
    end
  end

  # Blocks until a result for id is available, then removes and returns it.
  def get_result(id)
    @mutex.synchronize do
      @cond.wait(@mutex) until @results.key?(id)
      @results.delete(id)
    end
  end
end
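
Since the clients are also supposed to give up after a timeout, here is one
way that could be sketched. TimedResultRepository, the request ids and the
sample response hash are invented for the example. It relies on
ConditionVariable#wait accepting an optional timeout in seconds.

```ruby
# Illustrative variant with a timeout; the names here are made up.
class TimedResultRepository
  def initialize
    @results = {}
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def put_result(id, result)
    @mutex.synchronize do
      @results[id] = result
      @cond.broadcast # every waiter rechecks for its own id
    end
  end

  # Returns the result for id, or nil if none arrives within timeout seconds.
  def get_result(id, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @results.key?(id)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @results.delete(id)
    end
  end
end

repo = TimedResultRepository.new

# Stands in for the reader thread delivering a response a little later.
Thread.new do
  sleep 0.1
  repo.put_result("req-1", { "Response" => "Success" })
end

answer  = repo.get_result("req-1", 2)   # arrives well before the timeout
missing = repo.get_result("req-2", 0.2) # nobody answers this one
```

The loop recomputes the remaining time after every wakeup, so a broadcast
meant for another id does not extend the wait.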

Kind regards

    robert