Ruby Queue Pop with Timeout, v.2 (Correctly Handling Spurious Wakeups)

In one of my previous posts, I wrote a queue implementation that would let you provide a timeout to the pop method. Unfortunately, it has a bug caused by spurious wakeups. Here is the fix along with an explanation of how you can avoid this sort of problem in the future.

The Problem: Spurious Wakeups

I found out about this bug from a commenter on my previous post. The bug is particularly interesting because the Ruby documentation doesn’t mention anything about the potential for spurious wakeups. In fact, I think it implies that you shouldn’t have to worry about them at all.

However, according to the POSIX threads specification (on which Ruby’s Mutex and ConditionVariable are built), the wait() call is allowed to return prematurely for almost any reason. This is called a spurious wakeup. When the wait method returns, it could be due to one of the following reasons:

  • Your thread was signaled.
  • The wait timed out (if a timeout was provided).
  • There was a spurious wakeup.

In my original code, I wasn’t checking for the spurious wakeup case:

@recieved.wait(@mutex, timeout) if timeout != 0
#if we're still empty after the timeout, raise exception
raise ThreadError, "queue empty" if @queue.empty?

If wait returned for a reason other than a timeout or a thread signal, then my code would erroneously raise a “queue empty” exception. I wasn’t able to hit this condition if I only had one consumer (which is probably why I had never run into it before). But, sure enough, with several consumers, I hit the condition almost immediately.

Here’s the code I used to reproduce the issue:

queue = QueueWithTimeout.new

Thread.abort_on_exception = true

20.times do |i|
  Thread.new do 
    loop do
      queue.pop_with_timeout(100000.0) # a timeout so long that we'll never reach it
      puts i
    end
  end
end

loop do
  queue << "random stuff"
  puts "pushed"
end

But why does wait have spurious wakeups at all? The short answer is: performance. For a longer answer, see this Stack Overflow post.

The Fix: Always Wrap wait in a Loop

The intended way to use wait is to always wrap it in a loop. Each time wait returns, you re-check the condition you were actually waiting for (here, that the queue is no longer empty) and whether the timeout has expired. If neither is the case, you go back to waiting. You can almost think of it as a busy loop, except that it’s thread-safe and a lot more efficient with CPU cycles, because the thread sleeps between checks.

It’s a little trickier with a timeout, because after a spurious wakeup you have to recompute the remaining timeout to account for the time that has already elapsed. (This isn’t a problem with the POSIX API because it takes an absolute time, specifically to avoid this issue.)
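To make the recomputation concrete, here is a minimal sketch of the pattern on its own, separate from the queue. The helper name wait_until and the variables around it are hypothetical and only for illustration. It also reads Process::CLOCK_MONOTONIC rather than the wall clock, which is my own choice here, so that a system clock adjustment can't stretch or shrink the wait.

```ruby
# Hypothetical helper (illustration only): wait on `cond` until the block
# returns true or `timeout` seconds have passed. Call it with `mutex` held.
# Returns true if the condition became true, false if the wait timed out.
def wait_until(cond, mutex, timeout)
  # Fix the deadline once, up front, as an absolute point in time.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    # After every wakeup (signal, timeout, or spurious), recompute what is left.
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0
    cond.wait(mutex, remaining)
  end
  true
end

mutex = Mutex.new
cond  = ConditionVariable.new
items = []

# A producer that adds an item shortly after the consumer starts waiting.
producer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    items << :job
    cond.signal
  end
end

# Case 1: the item arrives well before the 2 second timeout.
got_item = mutex.synchronize { wait_until(cond, mutex, 2.0) { !items.empty? } }
producer.join

# Case 2: nothing is ever pushed, so the wait runs out.
items.clear
result = mutex.synchronize { wait_until(cond, mutex, 0.05) { !items.empty? } }
timed_out = !result
```

Because the loop only ever trusts the block and the deadline, it doesn't matter why wait returned. A spurious wakeup simply costs one extra trip around the loop.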

Here is my fixed implementation:

class QueueWithTimeout
  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
  end
 
  def <<(x)
    @mutex.synchronize do
      @queue << x
      @received.signal
    end
  end
 
  def pop(non_block = false)
    pop_with_timeout(non_block ? 0 : nil)
  end
 
  def pop_with_timeout(timeout = nil)
    @mutex.synchronize do
      if timeout.nil?
        # wait indefinitely until there is an element in the queue
        while @queue.empty?
          @received.wait(@mutex)
        end
      elsif @queue.empty? && timeout != 0
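        # wait for an element or the timeout, whichever comes first;
        # a monotonic clock keeps the deadline immune to system clock changes
        deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
        while @queue.empty? && (remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)) > 0
          @received.wait(@mutex, remaining_time)
        end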
      end
      # if we're still empty after the timeout, raise an exception
      raise ThreadError, "queue empty" if @queue.empty?
      @queue.shift
    end
  end
end

My new, improved QueueWithTimeout implementation should now work reliably with any number of producers and consumers.