Managing your threads with Queue

Backstage

One of my favourite utilities in the Ruby standard library is the Queue class. Queue provides you with a delightfully simple way to synchronise communication between threads. Typically, Queue is used to manage the work assigned to a thread pool. We'll look at some of the ways it assists us in this endeavour.

Queue has two main methods in its API, push and pop. These two methods do exactly what you'd expect, push adds an item to the queue, and pop retrieves an item. By default, if you call pop in your thread and the queue is empty, it will block until an item becomes available. Multiple threads will be able to pop items off of the queue in a safe manner. Let's have a look in a simple example:

queue = Queue.new
threads = []

# Start 2 threads to pop work off of the queue and print it, then sleep 3 seconds for effect
2.times do
  threads << Thread.new do
    loop do
      queue_item = queue.pop
      puts queue_item.inspect
      sleep 3
    end
  end
end

# Add 5 items to the queue to be processed by the threads above
5.times do |n|
  queue.push "Item #{n}"
end

Running this example you'll see that each thread picks up an item off of the queue, prints it out and then sleeps for 3 seconds before picking up the next item from the queue. Once the queue is empty, they'll block on queue.pop until a new item becomes available for processing.

Queue also comes with a couple of extra handy methods to help you manage the workload for your threads. #length tells you how many items are currently queued, #num_waiting lets you know how many threads are currently blocking on pop, waiting for input and #clear allows you to remove any items from the queue currently waiting to be processed. In particular, you can use the last 2 methods to build a clean shutdown into your script, so that all of your threads exit gracefully once they've finished their current work.

trap "INT" do
  # Remove any queued jobs from the list
  queue.clear

  # Wait until any currently running threads have finished their current work and returned to queue.pop
  while queue.num_waiting < threads.count
    pending_threads = threads.count - queue.num_waiting
    puts "Waiting for #{pending_threads} threads to finish"
    sleep 1
  end

  # Kill off each thread now that they're idle and exit
  threads.each(&:exit)
  Process.exit(0)
end

We utilise the Queue class in a couple of places throughout our apps, we use it in Deploy to send files to multiple servers simultaneously and it's used in some internal scripts where parallel processing is advantageous.

A little bit about the author

My name is Dan. My primary role at aTech is to develop and maintain our hosted applications, such as Codebase and Deploy. I'm often the person who responds to helpdesk tickets where further investigation is needed. My preferences include; beer over lager, dogs over cats, coffee over tea and cars from the 80s.