TCP Socket read and write

Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

Second: Is there a possibility to use push rather than poll for reading
the socket? (If it is already thread-safe, then there is nothing to do
here, but if not, I'd have to peek at the TCPSocket and send afterwards if
nothing was in the pipe.) Otherwise I could just have a reading and a
sending thread.

Thank you,

Brian

Brian Schröder <ruby@brian-schroeder.de> writes:

Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

Second: Is there a possibility to use push rather than poll for reading
the socket? (If it is already thread-safe, then there is nothing to do
here, but if not, I'd have to peek at the TCPSocket and send afterwards if
nothing was in the pipe.) Otherwise I could just have a reading and a
sending thread.

Thank you,

Brian

First: Yes, they are thread safe. However, I think you still want to
synchronise access to the socket; otherwise it is like two people
talking at once: confusion. Then again, this depends on the nature of
the protocol you are using (app-level protocol).

Second: you can use IO.select() to determine the readability (and
writability) of a socket. Or you can just use reading and sending
threads. Your choice.
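For example, a minimal sketch (the host, port and 5-second timeout are
made up for illustration):

require 'socket'

sock = TCPSocket.new('localhost', 9876)

loop do
  # Wait up to 5 seconds for the socket to become readable.
  readable, = IO.select([sock], nil, nil, 5)
  if readable
    line = sock.gets          # note: may still block until a full line arrives
    break if line.nil?        # nil means the peer closed the connection
    puts "got: #{line}"
  else
    puts "nothing arrived within 5 seconds"
  end
end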

YS.

Not unless you have two threads which are talking on the same TCP socket at
once (which would be unusual). The generic TCP server skeleton I use looks
like this:

···

On Sun, Sep 19, 2004 at 09:24:41PM +0900, Brian Schröder wrote:

Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

-------------------------------------------------------------------------
require 'socket'
module MyServer
  def run
    print "Hello, world!\r\n"
    line = gets
    print "You said #{line}\r\n"
  end
end

port = ARGV[0] || 9876
bind = ARGV[1] || '0.0.0.0'
server = TCPServer.new(bind, port)

while s = server.accept
  s.extend MyServer
  Thread.new(s) do |session|
    begin
      session.run
    rescue Exception => e
      $stderr.puts "Caught exception: #{e}\n\t#{e.backtrace.join("\n\t")}"
    ensure
      session.close
    end
  end
end
-------------------------------------------------------------------------

A new thread is started for each session, and any local variables
instantiated in the 'run' method (like 'line' in this case) are separate for
each thread, so you don't have to worry about them interfering with each
other.

The only thing to beware of is the local variable 's' in the main loop. As
soon as the main loop goes back up to the top and a new connection is
accepted, 's' changes. That's why we pass 's' in as a parameter to
Thread.new, where it is copied into the block-local variable 'session' for
use later when we close the connection.
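In other words, a contrived sketch just to show the difference:

# Racy: by the time the block runs, 's' may already refer to the next client.
while s = server.accept
  Thread.new { s.puts "hello"; s.close }
end

# Safe: 's' is copied into the block-local 'session' when Thread.new is called.
while s = server.accept
  Thread.new(s) { |session| session.puts "hello"; session.close }
end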

If you don't like the way I add a 'run' method to TCPSocket's singleton
class, you can always put the code in-line:

...
while s = server.accept
  Thread.new(s) do |session|
    begin
      session.print "Hello, world!\r\n"
      line = session.gets
      session.print "You said #{line}\r\n"
    rescue Exception => e
...

Second: Is there a possibility to use push rather than poll for reading
the socket? (If it is already thread-safe, then there is nothing to do
here, but if not, I'd have to peek at the TCPSocket and send afterwards if
nothing was in the pipe.) Otherwise I could just have a reading and a
sending thread.

I'm not really sure what you mean here. You read the socket by calling
TCPSocket#read or TCPSocket#gets. If you want to check whether there's data
available, then you can call select first - although that only guarantees
there's one byte available, so you'd have to do read(1) to guarantee that
you never blocked.
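For instance, a sketch of that check-then-read pattern; here I've used
IO#readpartial (which returns whatever is already available, up to the
given number of bytes) instead of looping on read(1), and the 0.5-second
timeout and 4096-byte limit are arbitrary:

# Returns the next chunk of data, or nil if nothing arrives within 'timeout'
# seconds or the peer has closed the connection.
def read_available(sock, timeout = 0.5)
  ready, = IO.select([sock], nil, nil, timeout)
  return nil unless ready
  sock.readpartial(4096)      # select promised at least one byte, so this
                              # returns 1..4096 bytes without blocking further
rescue EOFError
  nil
end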

If you want to stream data out at the same time as streaming data in (rather
than lock-step command-response-command-response), then yes you'd use two
threads, one reading and one writing. As long as one only does 'reads' and
one only does 'writes', then you don't need to mutex them. But you may need
to signal between them so that if one side detects the socket has been
closed, the other terminates as well.
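A rough sketch of that two-thread arrangement for the chat client (the
server address is invented, and the shutdown signalling here is the
crudest possible: the reader simply kills the writer when the server goes
away):

require 'socket'

sock = TCPSocket.new('localhost', 9876)

# Writer thread: only ever writes to the socket.
writer = Thread.new do
  while line = $stdin.gets
    sock.puts line
  end
  sock.close_write            # user hit EOF; tell the server we're done sending
end

# The main thread is the reader: it only ever reads from the socket.
while line = sock.gets
  puts line
end

# sock.gets returned nil, so the server closed the connection;
# make the writer terminate as well.
writer.kill
sock.close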

Regards,

Another Brian.

Hello Yohanes,

in the program only one thread at a time is reading, and only one at a
time is writing. I wanted to know if I could read and write to the same
socket from two different threads simultaneously. And I think this is a
subset of the answer, so I take it as a "yes".

Thanks for the answer!

regards,

Brian

···

On Mon, 20 Sep 2004 16:31:53 +0900, Yohanes Santoso wrote:

Brian Schröder <ruby@brian-schroeder.de> writes:

Hello Group,

I'm writing a simple chat client-server as an introductory example, and
now I'm wondering about some things.

First: Do I have to make TCPSocket#puts and TCPSocket#gets thread safe by
using mutexes?

Second: Is there a possibility to use push rather than poll for reading
the socket? (If it is already thread-safe, then there is nothing to do
here, but if not, I'd have to peek at the TCPSocket and send afterwards if
nothing was in the pipe.) Otherwise I could just have a reading and a
sending thread.

Thank you,

Brian

First: Yes, they are thread safe. However, I think you still want to
synchronise access to the socket; otherwise it is like two people
talking at once: confusion. Then again, this depends on the nature of
the protocol you are using (app-level protocol).

Second: you can use IO.select() to determine the readability (and
writability) of a socket. Or you can just use reading and sending
threads. Your choice.

YS.

Hi,

in the program only one thread at a time is reading, and only one at a
time is writing. I wanted to know if I could read and write to the same
socket from two different threads simultaneously. And I think this is a
subset of the answer, so I take it as a "yes".

I didn't know if two threads simultaneously accessing the same
socket was legal (I think I asked about this here a year or
two ago), so I'm doing a $sock.dup, with the read thread using one
and the write thread using the other. I don't know if the
#dup is necessary or beneficial, actually. But for what it's
worth, here's how I'm using it:

http://bwk.homeip.net/ftp/dorkbuster/wallfly/buffered-io.rb

The above is an IO class whose read thread just slurps data
as fast as it can in the background. And whose write thread
writes data out, similarly, when it can. So the code interfacing
with this class gets a non-blocking read & write, with infinite
buffer size. (Up to available RAM of course.) For my purposes
it has been convenient... dunno if it would be useful to anyone
else. If so I could put it on RAA (?)
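In outline, the #dup arrangement described above looks something like this
(a bare sketch of the idea, not the code at the URL; the Queue-based
hand-off and the names are mine):

require 'socket'
require 'thread'

sock = TCPSocket.new('localhost', 9876)   # hypothetical peer
wr_sock = sock.dup                        # the writer uses a dup'd descriptor
outgoing = Queue.new

# Read thread slurps data from the original socket in the background.
reader = Thread.new do
  while data = sock.gets
    puts "received: #{data}"
  end
end

# Write thread drains the queue to the dup'd socket as fast as it can.
writer = Thread.new do
  while data = outgoing.pop               # a nil in the queue stops the writer
    wr_sock.write(data)
  end
end

outgoing.push("hello\n")
outgoing.push(nil)
writer.join

The real buffered-io.rb adds the buffering and data-ready signalling on top
of this basic shape.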

It's been stress tested with as many variations of reads, writes,
random & non random timing delays as I could think of to devise,
running for days under these stress conditions without a hiccup.
I realize that doesn't prove anything, but I have a good feeling
about its reliability at this point.

For what it's worth :-)

Regards,

Bill

···

From: "Brian Schröder" <ruby@brian-schroeder.de>

Hi,

From: "Brian Schr?der" <ruby@brian-schroeder.de>

> in the program only one thread at a time is reading, and only one at a
> time is writing. I wanted to know if I could read and write to the same
> socket from two different threads simultaneously. And I think this is a
> subset of the answer, so I take it for "yes".

I didn't know if two threads simultaneously accessing the same
socket was legal

I believe it is, if you think about how Ruby implements threads internally.
It doesn't use any O/S threading at all (it even works under MS-DOS, not
that I've tried it myself :-)

Rather, the Ruby interpreter chugs along the annotated syntax tree, changes
to another Thread, chugs along another bit of syntax tree, and so on. For
threads which are blocked on I/O, it uses select() to determine when they
are ready to be scheduled again.

http://bwk.homeip.net/ftp/dorkbuster/wallfly/buffered-io.rb

The above is an IO class whose read thread just slurps data
as fast as it can in the background. And whose write thread
writes data out, similarly, when it can. So the code interfacing
with this class gets a non-blocking read & write, with infinite
buffer size. (Up to available RAM of course.) For my purposes
it has been convenient... dunno if it would be useful to anyone
else. If so I could put it on RAA (?)

You stuff data down one socket and read it back from the same socket, like a
loopback? If so I think it could be written much more simply; for example it
is superfluous to write

        if select([@sock_rd], nil, nil, nil)
          dat = @sock_rd.recv(65536)

when you can just do

          dat = @sock_rd.recv(65536)

because Ruby handles the select() behind the scenes to prevent one thread
blocking another, as outlined above; recv deschedules the thread until at
least one byte is available.

In fact I think the core could be re-written as something like this:

···

On Tue, Sep 21, 2004 at 02:38:55AM +0900, Bill Kelly wrote:
-----------------------------------------------------------------------
require 'thread'

class BufferedIO
  def initialize(sock)
    @sock = sock
    @queue = Queue.new
    @rd_thread = Thread.new { background_read }
    @wr_thread = Thread.new { background_write }
  end

  def background_read
    while true
      dat = @sock.recv(65536)
      break if dat.nil? or dat.empty?
      @queue.push(dat)
    end
    @queue.push(nil) # EOF indication
  end

  def background_write
    while dat = @queue.pop
      @sock.write(dat)
    end
  end

  def close
    @sock.close
    @rd_thread.join
    @wr_thread.join
  end
end
-----------------------------------------------------------------------

which looks a lot less like C and a lot more like Ruby :-)

However that doesn't include your 'signal' functionality, nor do I have
access to your timed-wait.rb, so I can't prove it against your Unit tests.

Not sure how useful such a thing would be for RAA though. There is already
the "Queue" class in thread.rb, which is a queue of objects rather than a
queue of bytes, as I've used above. I think that's a more generic and useful
pattern. There is also a SizedQueue which limits the maximum number of
objects it contains.

Queue and SizedQueue are thread-safe, which is why there are no Mutexes in
the code above, although if paranoid you might want one to ensure that
@sock.read, @sock.write and @sock.close are mutually exclusive. (But then,
if @sock.write blocked, that would prevent @sock.read from running, which I
don't think is what you want)
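For instance, a small sketch of the SizedQueue variant (the sizes and the
sleep are made up); a slow consumer automatically throttles the producer,
because push blocks once the queue is full:

require 'thread'

queue = SizedQueue.new(8)            # at most 8 chunks buffered at any time

producer = Thread.new do
  16.times do |i|
    queue.push("chunk #{i}")         # blocks whenever 8 chunks are already queued
  end
  queue.push(nil)                    # EOF marker, as in the BufferedIO example
end

consumer = Thread.new do
  while chunk = queue.pop            # pop blocks until something is available
    sleep 0.1                        # stand-in for a slow socket write
    puts "wrote #{chunk}"
  end
end

producer.join
consumer.join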

It's been stress tested with as many variations of reads, writes,
random & non random timing delays as I could think of to devise,
running for days under these stress conditions without a hiccup.
I realize that doesn't prove anything, but I have a good feeling
about its reliability at this point.

That's definitely good :-)

Regards,

Brian.

Hi Brian,

> I didn't know if two threads simultaneously accessing the same
> socket was legal

I believe it is, if you think about how Ruby implements threads internally.
It doesn't use any O/S threading at all (it even works under MS-DOS, not
that I've tried it myself :-)

Ahh.. That does make sense...

> http://bwk.homeip.net/ftp/dorkbuster/wallfly/buffered-io.rb
>
> The above is an IO class whose read thread just slurps data
> as fast as it can in the background. And whose write thread
> writes data out, similarly, when it can. So the code interfacing
> with this class gets a non-blocking read & write, with infinite
> buffer size. (Up to available RAM of course.) For my purposes
> it has been convenient... dunno if it would be useful to anyone
> else. If so I could put it on RAA (?)

You stuff data down one socket and read it back from the same socket, like a
loopback?

Oh.. hehe... No, it's intended to be connected to a remote
socket. (Or at least, to a socket whose other end is
connected to a different process.)

I suppose in practice it's a convenience mechanism. It lets
my main thread blast an arbitrarily large chunk of data at
BufferedIO#send_nonblock without having to wait. The main
thread can go about its business, knowing that data will be
sent by BufferedIO to the remote host as fast as it can be.
And similarly, in the reverse direction, BufferedIO is
continually retrieving any data sent by the remote host
into a buffer that will be available via BufferedIO#recv_nonblock
whenever the main thread gets around to checking for it.

If so I think it could be written much more simply; for example it
is superfluous to write

        if select([@sock_rd], nil, nil, nil)
          dat = @sock_rd.recv(65536)

when you can just do

          dat = @sock_rd.recv(65536)

because Ruby handles the select() behind the scenes to prevent one thread
blocking another, as outlined above; recv deschedules the thread until at
least one byte is available.

Good point, thanks !

In fact I think the core could be re-written as something like this:
-----------------------------------------------------------------------
require 'thread'

class BufferedIO
  def initialize(sock)
    @sock = sock
    @queue = Queue.new
    @rd_thread = Thread.new { background_read }
    @wr_thread = Thread.new { background_write }
  end

  def background_read
    while true
      dat = @sock.recv(65536)
      break if dat.nil? or dat.empty?
      @queue.push(dat)
    end
    @queue.push(nil) # EOF indication
  end

  def background_write
    while dat = @queue.pop
      @sock.write(dat)
    end
  end

  def close
    @sock.close
    @rd_thread.join
    @wr_thread.join
  end
end
-----------------------------------------------------------------------

which looks a lot less like C and a lot more like Ruby :-)

Hmm... I like!

However that doesn't include your 'signal' functionality, nor do I have
access to your timed-wait.rb, so I can't prove it against your Unit tests.

Sorry, timed-wait.rb is there now.

Not sure how useful such a thing would be for RAA though. There is already
the "Queue" class in thread.rb, which is a queue of objects rather than a
queue of bytes, as I've used above. I think that's a more generic and useful
pattern. There is also a SizedQueue which limits the maximum number of
objects it contains.

Queue and SizedQueue are thread-safe, which is why there are no Mutexes in
the code above, although if paranoid you might want one to ensure that
@sock.read, @sock.write and @sock.close are mutually exclusive. (But then,
if @sock.write blocked, that would prevent @sock.read from running, which I
don't think is what you want)

Thanks!

Yes, I guess BufferedIO is somewhat like a Queue (of bytes) between
processes connected by sockets. Ultimately it's an experiment,
I'm interested to see whether it has a simplifying effect on my
"main thread" code, or not. Previously, my application was single-
threaded, and was a typical select() dispatch serving many clients.

I'm sort of injecting BufferedIO into this application to give me
the effect of (as though) arbitrarily large send/recv buffers in
the kernel. So my application is still in an equivalent of its
select() dispatch, only now waiting on this global-signal instead,
and able to toss arbitrarily large chunks of data at a client
without getting hung up on the transmit.

I don't know if in the end it will be beneficial, or if it
would be better to just rewrite the app to be completely
multi-threaded, and get rid of this main thread dispatch
notion entirely. I have always had a love-hate relationship
with both multithreading, and single-threaded select() dispatch
loops. <grin>

Thanks for your feedback & insights,

Regards,

Bill

···

From: "Brian Candler" <B.Candler@pobox.com>

Oh.. hehe... No, it's intended to be connected to a remote
socket. (Or at least, to a socket whose other end is
connected to a different process.)

I suppose in practice it's a convenience mechanism. It lets
my main thread blast an arbitrarily large chunk of data at
the BufferedIO#send_nonblock without having to wait. The main
thread can go about its business, knowing that data will be
sent by BufferedIO to the remote host as fast as it can be.

Oh I see now.

Well, I can make my code have an API like yours, but it seems very unnatural
to me. For example, #recv_nonblock always returns "" if no data is available
- but you cannot do a select() on a BufferedIO object, so you have to waste
time polling it.

My code fails your unit tests with some deadlock problem. I don't understand
what timed-wait is doing, so I can't really fix it. I've never found I've
had to use Thread.stop and the like; Ruby's primitives like Mutex, Queue,
SizedQueue, and Timeout always seem to do the job.

I think what I'm saying is: there's a more natural way to do it in Ruby.

I don't know if in the end it will be beneficial, or if it
would be better to just rewrite the app to be completely
multi-threaded, and get rid of this main thread dispatch
notion entirely. I have always had a love-hate relationship
with both multithreading, and single-threaded select() dispatch
loops. <grin>

For me, it's now a love-love relationship. I've seen programs which consist
of pages and pages of C, implementing select() across arrays of
filedescriptors and an array of state machines for each socket; then I've
rewritten it into a couple of screens of Ruby using Threads. The Ruby code
works far better, as it's less buggy :-)

Regards,

Brian.

buffered-io2.rb (3.32 KB)

···

On Wed, Sep 22, 2004 at 03:50:53AM +0900, Bill Kelly wrote:

Hi Brian,

> Oh.. hehe... No, it's intended to be connected to a remote
> socket. (Or at least, to a socket whose other end is
> connected to a different process.)
>
> I suppose in practice it's a convenience mechanism. It lets
> my main thread blast an arbitrarily large chunk of data at
> the BufferedIO#send_nonblock without having to wait. The main
> thread can go about its business, knowing that data will be
> sent by BufferedIO to the remote host as fast as it can be.

Oh I see now.

Well, I can make my code have an API like yours, but it seems very unnatural
to me. For example, #recv_nonblock always returns "" if no data is available
- but you cannot do a select() on a BufferedIO object, so you have to waste
time polling it.

The data_ready_signal passed into BufferedIO provides a means to
wait rather than poll. My application, which is currently structured
around a select() dispatch, uses global-signal instead, as in effect
a drop-in replacement for select().

So my main loop, instead of using select, like:

    nready = select([@tcp_clients], nil, nil, timeout)

Has a semantically-similar, if you will, timed wait like:

    begin
      @global_signal.timed_wait(timeout)
    rescue Timeout::Error
      # the timeout expired with nothing ready
    end

My code fails your unit tests with some deadlock problem. I don't understand
what timed-wait is doing, so I can't really fix it. I've never found I've
had to use Thread.stop and the like; Ruby's primitives like Mutex, Queue,
SizedQueue, and Timeout always seem to do the job.

I wasn't happy with how complex timed_wait was when I wrote it.
It was the culmination of a week-long learning experience, bug
hunt, and wild goose chase, coming to an understanding of why
ruby's timeout is hazardous to my health. Timeout will interrupt
your thread anywhere, even inside an ensure block. (This makes
it difficult if not impossible to use a timeout block on code
that internally uses ensure to release some critical resource.)
It all started when I tried to do a timeout around a condition
variable wait.
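A minimal demonstration of the hazard (the numbers are made up; the second
sleep stands in for slow cleanup in an ensure block):

require 'timeout'

begin
  Timeout.timeout(1) do
    begin
      sleep 0.5                  # the "real work" finishes within the timeout
    ensure
      sleep 1                    # slow cleanup: the one-second timer fires here,
                                 # interrupting the ensure block itself
      puts "cleanup finished"    # never reached
    end
  end
rescue Timeout::Error
  puts "interrupted in mid-cleanup"
end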

My quest started here:
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/110306

And ended here with the timed_wait you see now:
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/110799

I think what I'm saying is: there's a more natural way to do it in Ruby.

Yes, from the complexity of some of the resultant code, I seem
to be fighting the language to make this drop-in replacement for
select-with-timeout.

It seemed like a simple idea. I had a single-threaded app that
uses select() with a timeout. I thought my life would be simpler
if I could keep that structure, but just increase the buffer
size to infinity on my IO objects behind the scenes. . . . .
However, by this point, I could have long since rewritten the
app to be totally multithreaded. ;-/

However - should you ever need a ConditionVariable#wait with a
timeout--as my app does as it's *currently* structured--I haven't
yet found a simpler way to do it safely than that ugly timed_wait.

> I have always had a love-hate relationship
> with both multithreading, and single-threaded select() dispatch
> loops. <grin>

For me, it's now a love-love relationship. I've seen programs which consist
of pages and pages of C, implementing select() across arrays of
filedescriptors and an array of state machines for each socket; then I've
rewritten it into a couple of screens of Ruby using Threads. The Ruby code
works far better, as it's less buggy :-)

I'm contemplating going ahead and thoroughly restructuring my
app around Ruby threads and primitives like Queue, and see
how it turns out. I will avoid Timeout, however, unless I'm
certain the code within the timeout block makes no use of
ensure or otherwise has no need to manage any critical resources
or maintain class invariants, etc...

Thanks again for your thoughts,

Regards,

Bill

···

From: "Brian Candler" <B.Candler@pobox.com>

On Wed, Sep 22, 2004 at 03:50:53AM +0900, Bill Kelly wrote:

Subject: How safe is 'timeout' ?
Subject: Re: Request For Comments: exception safe ConditionVariable#wait

> Well, I can make my code have an API like yours, but it seems very unnatural
> to me. For example, #recv_nonblock always returns "" if no data is available
> - but you cannot do a select() on a BufferedIO object, so you have to waste
> time polling it.

The data_ready_signal passed into BufferedIO provides a means to
wait rather than poll. My application, which is currently structured
around a select() dispatch, uses global-signal instead, as in effect
a drop-in replacement for select().

So my main loop, instead of using select, like:

    nready = select([@tcp_clients], nil, nil, timeout)

Has a semantically-similar, if you will, timed wait like:

    begin
      @global_signal.timed_wait(timeout)
    rescue Timeout::Error

Ah, I see now.

Perhaps what would be nice here is a version of Queue#pop with a timeout.
Then you don't need two separate objects, one to signal "data ready" and one
to carry the data itself.

Timeout will interrupt
your thread anywhere, even inside an ensure block. (This makes
it difficult if not impossible to use a timeout block on code
that internally uses ensure to release some critical resource.)

Yes, I see the problem. It's hard to see a general-purpose solution to this;
after all, a timeout has no way of knowing whether you're in a short-lived
"ensure" block, or something which has gone wrong and really does need to be
aborted (such as an infinite loop within your "ensure" block).

However, specific solutions should be doable. Here's a first stab at a
Queue#pop_timeout, see if this looks reasonable to you:

require 'thread'
require 'timeout'
class Queue
  # Note: pokes at thread.rb's Queue internals (@que, @waiting) and uses
  # Thread.critical, so this is specific to Ruby 1.8's green threads.
  def pop_timeout(secs, e=Timeout::Error)
    me = Thread.current
    timedout = false
    timer = Thread.new { sleep(secs); timedout=true; me.wakeup }
    while (Thread.critical = true; @que.empty?)
      raise e if timedout
      @waiting.push Thread.current
      Thread.stop
    end
    @que.shift
  ensure
    timer.kill if timer and timer.alive?
    Thread.critical=false
  end
end

if __FILE__ == $0
  queue = Queue.new
  Thread.new { queue.push("hello"); sleep(1); queue.push("world") }
  while true
    a = queue.pop_timeout(3)
    p a
  end
end

It all started when I tried to do a timeout around a condition
variable wait.

...

However - should you ever need a ConditionVariable#wait with a
timeout--as my app does as it's *currently* structured--I haven't
yet found a simpler way to do it safely than that ugly timed_wait.

Perhaps I've missed something, but I don't see why
ConditionVariable#wait_timeout can't be implemented safely. Unlike the
'timeout' module, you don't need to have the timeout thread itself raise an
exception; it can just set a flag.

This code is untested but is just to sketch a solution:

class ConditionVariable
  def wait_timeout(mutex, timeout=nil, e=Timeout::Error)
    timedout = false
    if timeout
      me = Thread.current
      timer = Thread.new { sleep(timeout); timedout=true; me.wakeup }
    end
    mutex.exclusive_unlock do
      @waiters.push(Thread.current)
      Thread.stop
    end
    raise e if timedout
    mutex.lock
  ensure
    timer.kill if timer and timer.alive?
  end
end

It's basically just ConditionVariable#wait copied from thread.rb, with an
extra "raise e if timedout" at the end. Comments?

Cheers,

Brian.

···

On Thu, Sep 23, 2004 at 04:25:12AM +0900, Bill Kelly wrote: