Multithreading in Ruby

Hi, I'm relatively new to Ruby and to threading in general. I'm trying to
get the following code to work. Essentially, the program scrapes data
from a site which has a list of urls on the first page (and more list
pages can be accessed by hitting next) and each url in the list needs to
be followed as well.

So what I would like to do is create 5 concurrent threads: Thread 1
would download the list page as a Mechanize page object and queue it,
hit next and download the next page as a Mechanize page object and queue
it, etc. Thread 2 would take the queue from Thread 1 and start to
extract the required data, i.e. the urls and queue them into a new
queue. Thread 3 would take the queue from Thread 2 and download the page
each url points to and save it as a Mechanize page object and queue it
into another queue. Thread 4 would take the queue from Thread 3 and
extract the necessary data, format it and queue it into yet another
queue. And finally, Thread 5 will take the queue from Thread 4 and write
the data to a file.

At least, that is in theory... So I wrote the following program, however
only the first Thread seems to be queueing and the rest don't work.
Please let me know if I'm missing something.

  rank_pages_queue = Queue.new
  items_queue = Queue.new
  item_pages_queue = Queue.new
  finished_items_queue = Queue.new

  mech_page = get_page(url)
  rank_page_download = Thread.new do
    while mech_page
      rank_pages_queue << mech_page
      mech_page = hit_next(mech_page)
    end
  end

  rank_page_extract = Thread.new do
    while rank_pages_download.alive?
      rank_page = rank_pages_queue.pop
      items_queue << get_rank_name_url(rank_page)
    end
  end

  item_page_download = Thread.new do
    while rank_page_extract.alive?
      items = items_queue.pop
      items.each do |item_arr|
        item_pages_queue << [item_arr, get_page(item_arr[2])]
      end
    end

Thanks in advance. And I'm running Ruby 1.8.7 on Snow Leopard.

···

--
Posted via http://www.ruby-forum.com/.

All threads will be started at the same time, so when this runs for
the first time, rank_pages_queue will still be empty and the next line
will probably raise an exception - which will silently disappear, as
exceptions in threads do by default unless the thread is joined. When
debugging a threaded program, always set
Thread.abort_on_exception = true - this will generate the stack trace
and barf every time a thread raises an exception.
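
To make the difference concrete, here is a minimal sketch of both behaviours. The failing_worker helper is made up for illustration; only Thread.abort_on_exception, join and sleep are real API. Newer Rubies also print a report to stderr when a thread dies, which 1.8.7 does not.

```ruby
require 'thread'

# Hypothetical worker that fails straight away, much as a typo'd name would.
def failing_worker
  Thread.new { raise "boom" }
end

# Default behaviour: the worker dies on its own and the main thread carries
# on. The exception only resurfaces if the worker is joined.
Thread.abort_on_exception = false
worker = failing_worker
join_error = begin
  worker.join
  nil
rescue RuntimeError => e
  e
end
puts "join re-raised: #{join_error.message}"

# With the flag set, the exception is re-raised in the main thread as soon
# as the worker dies, even though nobody joined it.
Thread.abort_on_exception = true
main_error = begin
  failing_worker
  sleep 2 # stand-in for whatever the main thread was busy doing
  nil
rescue RuntimeError => e
  e
ensure
  Thread.abort_on_exception = false
end
puts "main thread interrupted by: #{main_error.message}"
```

In a real script you would not rescue the second case; letting it crash with a stack trace is the whole point while debugging.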

···

2011/8/22 Nabs Kahn <nabusman@gmail.com>:

  rank_page_extract = Thread.new do
    while rank_pages_download.alive?
      rank_page = rank_pages_queue.pop
      items_queue << get_rank_name_url(rank_page)
    end
  end

I'm not really sure what your problem is, but I see one issue and one
thing that I'd do differently. If the program ends without having done
anything, it's because you need to wait for the threads to complete.
You can do that with the join method, or with the value method if you
need a return value from the thread. Also, I think there's something
wrong with your logic of calling alive?. The producer thread can still
be alive even though it has already enqueued its last item. If the
consumer checks alive? at that moment, it will then block in the call
to queue.pop forever. A better alternative is to queue a special
object which signals to the consumer that it has to finish.
Something like:

require 'thread'

def process
  sleep(rand)
end

queue = Queue.new

puts "creating first"
first = Thread.new do
  ["work", "more work", "yet some more", :finish].each do |work|
    puts "first thread is processing..."
    process
    puts "queuing #{work}"
    queue << work
  end
end
puts "creating second"
second = Thread.new do
  while (work = queue.pop) != :finish
    puts "got #{work} to do. processing it..."
    process
  end
end

first.join
second.join
puts "finished"

$ ruby queues.rb
creating first
first thread is processing...
creating second
queuing work
got work to do. processing it...
first thread is processing...
queuing more work
got more work to do. processing it...
first thread is processing...
queuing yet some more
first thread is processing...
queuing finish
got yet some more to do. processing it...
finished

In this little example, I used the symbol :finish to signal that the
other thread should finish and not wait for anything else in the
queue. You can use anything else you want. For example, if the
consumer can receive any arbitrary object, I've seen people enqueue
the actual queue object to signal the end. In my example a symbol was
enough and kept things simple. Also note how I joined the two threads
at the end, to keep the main thread from exiting the program before
the threads are finished. I also added a little process method that
sleeps a random amount of time, to simulate the actual work.
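
For the case where the consumer can receive any arbitrary object, here is a minimal sketch of the same idea with a marker that cannot collide with real work. The name done is made up for illustration; it is just a fresh Object compared by identity with equal?, so even :finish or nil can travel through the queue as ordinary items.

```ruby
require 'thread'

queue   = Queue.new
done    = Object.new # hypothetical end-of-work marker, unique by identity
results = []

producer = Thread.new do
  # These items include values that a symbol or nil check would trip over.
  [:finish, "work", nil].each { |item| queue << item }
  queue << done
end

consumer = Thread.new do
  loop do
    item = queue.pop
    # equal? compares object identity, so only the marker itself matches.
    break if item.equal?(done)
    results << item
  end
end

producer.join
consumer.join
p results
```

Enqueuing the queue object itself works the same way: it is simply another object that no producer would ever send as real work.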

Hope this helps,

Jesus.

···

Thanks Bartosz and Jesús, I implemented both of your suggestions and got
it working. For anyone else, this is what the end result looks like:

  require 'thread' # Queue lives here on Ruby 1.8.7

  Thread.abort_on_exception = true
  rank_pages_queue = Queue.new
  items_queue = Queue.new
  item_pages_queue = Queue.new
  finished_items_queue = Queue.new

  mech_page = get_page(url)
  rank_page_download = Thread.new do
    while mech_page != :finish
      rank_pages_queue << mech_page
      mech_page = hit_next(mech_page)
    end
    rank_pages_queue << :finish
  end

  rank_page_extract = Thread.new do
    while (rank_page = rank_pages_queue.pop) != :finish
      items_queue << get_rank_data(rank_page)
    end
    items_queue << :finish
  end

  item_page_download = Thread.new do
    while (items = items_queue.pop) != :finish
      items.each do |item_arr|
        item_pages_queue << [item_arr, get_page(item_arr[3])]
      end
    end
    item_pages_queue << :finish
  end

  item_pages_extract = Thread.new do
    while true
      item_arr, item_page = item_pages_queue.pop
      break if item_arr == :finish
      finished_items_queue << item_arr.concat(get_item_data(item_page))
    end
    finished_items_queue << :finish
  end

  write_to_file = Thread.new do
    while (item_arr = finished_items_queue.pop) != :finish
      writeToTxt(item_arr, item_arr[0] + ' - ' + filename)
    end
  end

  rank_page_download.join
  rank_page_extract.join
  item_page_download.join
  item_pages_extract.join
  write_to_file.join
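
Since every middle stage above has the same shape (pop until :finish, push a result, pass :finish along), the pattern can be sketched once as a helper. This is only an illustration under assumptions: stage is a made-up name, and the number-squaring blocks stand in for the real download and extract calls.

```ruby
require 'thread'

# Hypothetical helper: runs the given block on every item popped from input,
# pushes each result to output, and forwards :finish to the next stage.
def stage(input, output)
  Thread.new do
    while (item = input.pop) != :finish
      output << yield(item)
    end
    output << :finish
  end
end

numbers = Queue.new
squares = Queue.new
labels  = Queue.new

threads = [
  stage(numbers, squares) { |n| n * n },
  stage(squares, labels)  { |n| "square: #{n}" }
]

# Feed the first queue, then signal the end of the work.
[1, 2, 3].each { |n| numbers << n }
numbers << :finish
threads.each { |t| t.join }

# Drain the last queue in the main thread.
output = []
while (label = labels.pop) != :finish
  output << label
end
p output
```

With one thread per stage, each queue preserves the order of the items, so the results come out in the order the work went in.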
