Multi-threading lesson wanted


(Tim Bates) #1

Hi all,
I’ve not done a lot of multi-threaded programming before, and I now find
myself in the middle of writing a multi-threaded application. I’d
appreciate someone who knows more about thread-safety than me to do a
review of the code below and tell me where it falls short in that
respect (as I’m sure it does). I can’t immediately see anywhere in the
code which is likely to cause problems and I’ve tried to write it so
that it doesn’t have problems, but I haven’t used Mutex or any other
library traditionally associated with thread-safety so I’m feeling a bit
nervous about it. I learn best by getting my hands dirty, so I’ve given
it a shot and I’d like someone to point out my mistakes.

Some background: the application (known as SAMS) is essentially no more
than a database-backed DRb server. DRb takes care of most of the
threading, by starting a new thread for each connection for me. Rather
than starting a separate database connection for each thread, or trying
to share one connection between all threads, I’ve tried to write a
connection pooling class. The other application code generally requests
a connection from the pool immediately before performing its queries and
releases it immediately after, so unless a query runs for a long time,
no thread will hold a connection for more than a second or so.

Below is the code I have written. The idea is that other application
code calls SAMS::Database.get_handle{ |handle| handle.execute… } and
this module takes care of the rest. Also, the application’s shutdown
code will call SAMS::Database.destroy to cleanly disconnect all the handles.

require 'dbi'

module SAMS
  module Database
    # These values will eventually be taken from a configuration file
    MAX_FREE_HANDLES = 10
    MAX_HANDLES = 20

    @handles = []
    @free_handles = []

    @stopped = false

    class << self
      def new_handle
        # The DB connection parameters will eventually be read from a
        # configuration file somewhere
        h = DBI.connect('DBI:pg:sams', 'samsd', 'foo')
        h['AutoCommit'] = false
        @handles << h
        h
      end

      def destroy_handle
        h = @free_handles.shift
        if h
          h.disconnect
          @handles.delete h
        end
      end

      def allocate_handle
        if @free_handles.empty?
          if @handles.length < MAX_HANDLES
            return new_handle
          else
            while @free_handles.empty?
              sleep(1)
            end
            return allocate_handle
          end
        else
          return @free_handles.shift
        end
      end

      private :new_handle, :destroy_handle, :allocate_handle

      def get_handle
        return nil if @stopped
        begin
          h = allocate_handle
          yield h
        ensure # Make sure the handle gets put back in the list
          @free_handles << h
        end
        while @free_handles.length > MAX_FREE_HANDLES
          destroy_handle
        end
      end

      def destroy
        # Don't allocate any more handles
        @stopped = true
        # Destroy all handles, waiting for them to be released first
        while @handles.length > 0
          if @free_handles.length > 0
            destroy_handle
          else
            # Wait for a handle to be released
            sleep(1)
          end
        end
      end
    end

  end
end



Tim Bates
tim@bates.id.au


(Joel VanderWerf) #2

Tim Bates wrote:

Hi all,
I’ve not done a lot of multi-threaded programming before, and I now find
myself in the middle of writing a multi-threaded application. I’d

You have some excitement ahead of you!

Here’s one potential problem:

  def allocate_handle
    if @free_handles.empty?

    else

At this point we’ve just decided that @free_handles is not empty.

      return @free_handles.shift

But before that #shift actually runs, another thread can execute and
steal the last handle. The #shift then returns nil.

In the empty case, there is a less serious problem:

  def allocate_handle
    if @free_handles.empty?
      if @handles.length < MAX_HANDLES

Two threads can get to this point at the same time. Then they both
create a new handle, even though that might result in MAX_HANDLES+1
handles. So, very gradually, the size of the pool grows.

        return new_handle

By the same token, this code:

    while @free_handles.length > MAX_FREE_HANDLES
      destroy_handle

could cause too many handles to be deleted: it’s possible for N threads
to be scheduled to check the condition, and then after all that checking
is done, each calls destroy_handle. So you end up with too few free
handles left.

There is also a performance problem with code like:

        while @free_handles.empty?
          sleep(1)
        end

Rather than waking up every second to check for an available handle, the
thread would be better off going to sleep indefinitely and being woken
when a handle becomes available. This would save context switches at a
rate of two per waiting thread per second, and it would reduce the delay
from an average of 0.5 seconds to (probably) milliseconds.

On the positive side, this code won’t ever give the same handle to two
threads, because #shift is atomic as far as Ruby threads are concerned.

The construct that would probably apply best here is the Queue in
thread.rb. It would replace @free_handles (just create 20 handles and
put them on the queue, use #pop and #push to wait for and to release
handles). It would solve the performance problem as described (a thread
goes to sleep waiting for a handle, and is woken by another thread when
there is a handle available in the queue). It would also avoid the race
condition inherent in checking empty? and then shifting.
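
The Queue approach described above could look roughly like the following. This is only a sketch under stated assumptions: FakeHandle is a hypothetical stand-in for a DBI handle so the example runs without a database, and POOL_SIZE and with_handle are illustrative names, not part of the original code. On current Ruby versions Queue is built in, so the require is only needed on old interpreters.

```ruby
require 'thread' # only needed on old Rubies; harmless otherwise

# Hypothetical stand-in for a DBI handle, so the sketch needs no database.
FakeHandle = Struct.new(:id)

POOL_SIZE = 3 # plays the role of MAX_HANDLES

# Create every handle up front and park them all on the queue.
FREE_HANDLES = Queue.new
POOL_SIZE.times { |i| FREE_HANDLES.push(FakeHandle.new(i)) }

# Borrow a handle for the duration of the block. Queue#pop puts the calling
# thread to sleep while the queue is empty, so there is no empty?-then-shift
# race and no polling with sleep(1).
def with_handle
  handle = FREE_HANDLES.pop
  begin
    yield handle
  ensure
    FREE_HANDLES.push(handle) # always give the handle back
  end
end

# Ten threads compete for three handles; track how many are held at once.
in_use = 0
peak = 0
counter_lock = Mutex.new

workers = Array.new(10) do
  Thread.new do
    with_handle do |_handle|
      counter_lock.synchronize do
        in_use += 1
        peak = in_use if in_use > peak
      end
      sleep 0.01 # pretend to run a query
      counter_lock.synchronize { in_use -= 1 }
    end
  end
end
workers.each(&:join)

puts "peak handles in use: #{peak}, free afterwards: #{FREE_HANDLES.size}"
```

Because the handle is popped before the begin block, a failure while waiting never pushes nil back onto the queue.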

However, it won’t help with the MAX_FREE_HANDLES logic that you’ve
designed. You would probably have to use Thread.critical for that, so
that you can check the size of the queue and be certain that it isn’t
changing while you decide to destroy a handle or not.
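
Thread.critical was removed in Ruby 1.9, so on a current interpreter the same guarantee would have to come from a Mutex. The sketch below swaps in a Mutex for that reason. It assumes a plain Array of free handles rather than a Queue, and FakeHandle, trim and all_handles are hypothetical names used only for illustration. The point it shows is that the length check and the removal happen under one lock, so several threads trimming at once cannot remove too many handles.

```ruby
# Hypothetical stand-in for a DBI handle; records whether it was disconnected.
class FakeHandle
  attr_reader :disconnected

  def initialize
    @disconnected = false
  end

  def disconnect
    @disconnected = true
  end
end

MAX_FREE_HANDLES = 2

lock = Mutex.new
free_handles = Array.new(6) { FakeHandle.new }
all_handles = free_handles.dup # kept only so the result can be inspected

# Destroy surplus free handles. The length check and the shift are done
# inside one synchronize block, so no other thread can change the list
# between the decision and the removal.
trim = lambda do
  loop do
    victim = lock.synchronize do
      free_handles.shift if free_handles.length > MAX_FREE_HANDLES
    end
    break unless victim
    victim.disconnect # potentially slow, so done outside the lock
  end
end

# Several threads trim at the same time, as they would after get_handle.
Array.new(4) { Thread.new { trim.call } }.each(&:join)

puts "free handles left: #{free_handles.length}"
```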


(Robert) #3

“Tim Bates” tim@bates.id.au wrote in message
news:4025CA70.9090202@bates.id.au

Below is the code I have written. The idea is that other application
code calls SAMS::Database.get_handle{ |handle| handle.execute… } and
this module takes care of the rest. Also, the application’s shutdown
code will call SAMS::Database.destroy to cleanly disconnect all the
handles.

You definitely need some kind of synchronization mechanism that makes
accesses to the shared connection pool thread safe. In your case a
ConditionVariable will help you with the max connection logic.

See section “Logging from multiple threads” for example usage of a
ConditionVariable at
http://www.rubygarden.org/ruby?MultiThreading

I’d implement a Semaphore using a ConditionVariable and a Mutex; that way
you can initialize the semaphore with the max connection value and count
the semaphore down when you take a connection from the pool and increment
it when you put it back. If the semaphore is zero, the next thread trying
to decrease the semaphore is put to sleep and will wake up when another
thread increments the semaphore.
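
A counting semaphore along these lines might be sketched as follows. This is not the RAA implementation mentioned in this thread. CountingSemaphore and its method names (down, up, synchronize) are hypothetical and chosen only for illustration. The usage part replaces database work with a short sleep.

```ruby
require 'thread' # only needed on old Rubies; harmless otherwise

# A minimal counting semaphore built from a Mutex and a ConditionVariable.
class CountingSemaphore
  def initialize(count)
    @count = count
    @lock = Mutex.new
    @nonzero = ConditionVariable.new
  end

  # Take one permit, sleeping while none are available.
  def down
    @lock.synchronize do
      # Re-check in a loop: another thread may have taken the permit
      # between the signal and this thread waking up.
      @nonzero.wait(@lock) while @count.zero?
      @count -= 1
    end
  end

  # Return one permit and wake one waiting thread, if any.
  def up
    @lock.synchronize do
      @count += 1
      @nonzero.signal
    end
  end

  # Hold a permit for the duration of the block.
  def synchronize
    down
    begin
      yield
    ensure
      up
    end
  end
end

MAX_HANDLES = 2
permits = CountingSemaphore.new(MAX_HANDLES)

stats_lock = Mutex.new
active = 0
peak = 0

threads = Array.new(8) do
  Thread.new do
    permits.synchronize do
      stats_lock.synchronize do
        active += 1
        peak = active if active > peak
      end
      sleep 0.01 # pretend to use a connection
      stats_lock.synchronize { active -= 1 }
    end
  end
end
threads.each(&:join)

puts "peak concurrent holders: #{peak}"
```

The semaphore only limits how many threads hold a connection at once. The list of connections itself would still need its own protection, for example a Mutex or a Queue.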

You can as well use the semaphore implementation in the RAA:
http://raa.ruby-lang.org/list.rhtml?name=semaphore

As an additional note: I would not use module methods for the pool
handling. Instead I’d instantiate an instance of the pool. This is IMHO
better since the pool is not necessarily a singleton: Just think of an
application that needs to access more than one database and hence use more
than one connection configuration.
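
As a rough illustration of the instance-based design, each pool below is an object with its own configuration. HandlePool, its factory block and the 'reports' database are hypothetical. In the original code each factory would presumably call DBI.connect with its own parameters, but plain hashes stand in for handles here so the sketch runs without a database.

```ruby
require 'thread' # only needed on old Rubies; harmless otherwise

# One pool object per database, instead of module-level state.
class HandlePool
  # The block is a factory that builds one handle for this pool.
  def initialize(size, &factory)
    @free = Queue.new
    size.times { @free.push(factory.call) }
  end

  # Borrow a handle for the duration of the block.
  def with_handle
    handle = @free.pop # sleeps until a handle is free
    begin
      yield handle
    ensure
      @free.push(handle)
    end
  end
end

# Two independent pools with different (made-up) configurations.
sams_pool    = HandlePool.new(2) { { :database => 'sams' } }
reports_pool = HandlePool.new(1) { { :database => 'reports' } }

names = []
sams_pool.with_handle    { |h| names << h[:database] }
reports_pool.with_handle { |h| names << h[:database] }

puts names.inspect
```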

Kind regards

robert

(Charles Comstock) #4

Robert Klemme wrote:

You can as well use the semaphore implementation in the RAA:
http://raa.ruby-lang.org/list.rhtml?name=semaphore


Why doesn’t the counting semaphore library come standard with Ruby? I
haven’t done anything multithreaded in a little while, but I remember
being annoyed I didn’t have a counting semaphore. Is there a particular
reason it’s not part of the standard library? Obviously not everything
should come standard but this seems to be the sort of thing that should.

Charles Comstock

(Robert) #5

“Charles Comstock” cc1@cec.wustl.edu wrote in message
news:c07jo8$l5f$1@newsreader.wustl.edu

Why doesn’t the counting semaphore library come standard with Ruby?

Dunno.

I haven’t done anything multithreaded in a little while, but I remember
being annoyed I didn’t have a counting semaphore. Is there a particular
reason it’s not part of the standard library? Obviously not everything
should come standard but this seems to be the sort of thing that should.

Yeah, that’s true. +1 for including Semaphore in “Thread”. Matz?

Regards

robert