Ruby Queue Pop with Timeout, v.2 (Correctly Handling Spurious Wakeups)

In one of my previous posts, I wrote a queue implementation that would let you provide a timeout to the pop method. Unfortunately, it has a bug caused by spurious wakeups. Here is the fix along with an explanation of how you can avoid this sort of problem in the future.

The Problem: Spurious Wakeups

I found out about this bug from a commenter on my previous post. The bug is particularly interesting because the Ruby documentation doesn’t mention the possibility of spurious wakeups at all. If anything, it implies that you shouldn’t have to worry about them.

However, according to the POSIX threads specification (on which Ruby’s Mutex and ConditionVariable are built), pthread_cond_wait() and pthread_cond_timedwait() are allowed to return even though no thread signaled the condition variable. This is called a spurious wakeup. So when Ruby’s wait method returns, it could be for any of the following reasons:

  • Your thread was signaled.
  • The wait timed out (if a timeout was provided).
  • There was a spurious wakeup.

In my original code, I wasn’t checking for the spurious wakeup case:

@received.wait(@mutex, timeout) if timeout != 0
# if we're still empty after the timeout, raise an exception
raise ThreadError, "queue empty" if @queue.empty?

If wait returned for a reason other than a timeout or a signal, my code would erroneously raise a “queue empty” exception. I couldn’t trigger this condition with only one consumer (which is probably why I had never run into it before), but with several consumers I hit it almost immediately.

Here’s the code I used to reproduce the issue:

queue = QueueWithTimeout.new

Thread.abort_on_exception = true

20.times do |i|
  Thread.new do 
    loop do
      queue.pop_with_timeout(100000.0) # a timeout long enough that we'll never reach it
      puts i
    end
  end
end

loop do
  queue << "random stuff"
  puts "pushed"
end

But why does wait have spurious wakeups at all? The short answer is: performance. For a longer answer, see this Stack Overflow post.

The Fix: Always Wrap wait in a Loop

The intended way to use wait is to always wrap it in a loop that checks whether the condition you’re waiting for actually holds, or whether the timeout has expired. If neither is true, you go back to waiting. You can almost think of it as a busy loop, except that it’s thread-safe and far more efficient with CPU cycles, because the thread sleeps between checks.
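Here’s a minimal sketch of that idiom pulled out into a helper. The name `wait_for` is my own invention for illustration; it isn’t part of Ruby’s standard library. It assumes the caller already holds the mutex, and it simply re-checks the block’s condition every time `wait` returns, whatever caused that return:

```ruby
# Hypothetical helper (not part of Ruby's standard library).
# The caller must already hold `mutex`. Each time `wait` returns, whether
# because of a signal or a spurious wakeup, the predicate is re-checked
# and the thread goes back to sleep if it still isn't true.
def wait_for(cv, mutex)
  cv.wait(mutex) until yield
end

mutex = Mutex.new
ready = ConditionVariable.new
items = []

consumer = Thread.new do
  mutex.synchronize do
    # Returns immediately if an item is already there; otherwise sleeps.
    wait_for(ready, mutex) { !items.empty? }
    items.shift
  end
end

mutex.synchronize do
  items << :job
  ready.signal
end

result = consumer.value # => :job
```

It doesn’t matter whether the producer or the consumer gets the mutex first: if the item is already queued, the consumer never waits at all.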

It’s a little trickier with a timeout, because you have to recompute the remaining timeout to account for the time that elapsed before the spurious wakeup. (The POSIX API avoids this problem by taking an absolute deadline instead of a relative timeout.)
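To make the timeout case concrete, here’s a sketch of a timed version of the same idea. Again, `wait_until` is a hypothetical name, not a standard library method. It fixes an absolute deadline up front (the same trick the POSIX API uses) and hands `wait` only the time that’s left. I’ve used `Process.clock_gettime(Process::CLOCK_MONOTONIC)` here rather than `Time.now`, on the assumption that you don’t want adjustments to the system clock to stretch or shorten the wait:

```ruby
# Hypothetical helper (not part of Ruby's standard library).
# The caller must already hold `mutex`. Waits until the block returns a
# truthy value or `timeout` seconds have passed, and returns the last
# value of the block (falsy means we timed out).
def wait_until(cv, mutex, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until (done = yield)
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0
    # However long we were asleep before an early wakeup, only wait for
    # the time left until the deadline, never the full timeout again.
    cv.wait(mutex, remaining)
  end
  done
end

mutex = Mutex.new
cv = ConditionVariable.new
items = []

# Nobody ever signals, so this should give up after roughly 0.2 seconds.
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
got_item = mutex.synchronize { wait_until(cv, mutex, 0.2) { !items.empty? } }
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# With an item already queued, the predicate is true before any waiting.
items << :job
already_there = mutex.synchronize { wait_until(cv, mutex, 5) { !items.empty? } }
```

Even if `wait` returns early for some reason, the loop goes back to sleep for the remaining time, so the helper can’t give up before the deadline.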

Here is my fixed implementation:

class QueueWithTimeout
  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
  end
 
  def <<(x)
    @mutex.synchronize do
      @queue << x
      @received.signal
    end
  end
 
  def pop(non_block = false)
    pop_with_timeout(non_block ? 0 : nil)
  end
 
  def pop_with_timeout(timeout = nil)
    @mutex.synchronize do
      if timeout.nil?
        # wait indefinitely until there is an element in the queue
        while @queue.empty?
          @received.wait(@mutex)
        end
      elsif @queue.empty? && timeout != 0
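        # wait for an element or the timeout; a monotonic clock keeps
        # system clock changes from stretching or cutting short the wait
        timeout_time = timeout + Process.clock_gettime(Process::CLOCK_MONOTONIC)
        while @queue.empty? && (remaining_time = timeout_time - Process.clock_gettime(Process::CLOCK_MONOTONIC)) > 0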
          @received.wait(@mutex, remaining_time)
        end
      end
      # if we're still empty after the timeout, raise an exception
      raise ThreadError, "queue empty" if @queue.empty?
      @queue.shift
    end
  end
end

My new, improved QueueWithTimeout implementation should now work reliably with any number of producers and consumers.

Conversation
  • Bruno Sutic says:

    Hi Job,
    the claim about condition variable “spurious wakeups” caught my eye, so I wanted to reproduce that behavior.

    I took your code and slightly modified it in 2 ways:
    – added a thread-local variable to identify threads more easily
    – added a couple of ‘puts’ statements to see the control flow

    Here’s the code with example program output:
    https://pastebin.com/wnib7RKb

    My findings are:

    – I couldn’t reproduce “spurious wakeup” behavior.
    – I was able to find and reproduce a bug related to an interesting `ConditionVariable#signal` behavior: it doesn’t transfer execution to the waiting thread immediately!
    – I do agree using `while` loop with a condition variable predicate solves the problem.

    • Job Vranish says:

      Hi Bruno,

      Thanks for the detailed analysis!

      So the issue I was seeing was probably not a spurious wakeup at all, but just a consequence of a thread that hadn’t called `wait` yet pulling an element out of the queue after the signal, but before any of the sleeping threads had woken up. Basically, a classic race condition.
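      Here’s a small, deterministic sketch of that race. It uses a bare Array, Mutex and ConditionVariable rather than QueueWithTimeout, and the main thread plays both the producer and a “barging” consumer that takes the item without ever calling `wait`. Because `signal` only makes the waiter runnable, and the waiter still has to reacquire the mutex, it wakes up to an empty queue. With an `if` around `wait` it would have raised “queue empty” at that point; the `while` loop just goes back to sleep until the next push:

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
queue = []
empty_wakeups = 0 # times the waiter woke up and found nothing to take

waiter = Thread.new do
  mutex.synchronize do
    while queue.empty?
      cv.wait(mutex)
      empty_wakeups += 1 if queue.empty?
    end
    queue.shift
  end
end

# Let the waiter get parked inside cv.wait.
Thread.pass until waiter.status == "sleep"

# The producer pushes and signals, but before the mutex is released a
# consumer that never waited (here, the same thread) takes the item.
mutex.synchronize do
  queue << :first
  cv.signal
  queue.shift
end

# Wait until the waiter has seen the empty queue and gone back to sleep.
Thread.pass until mutex.synchronize { empty_wakeups } >= 1 && waiter.status == "sleep"

# A second push is what finally satisfies the waiter.
mutex.synchronize do
  queue << :second
  cv.signal
end

result = waiter.value # => :second
```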

      I think I’m going to stick with the while-loop idiom from now on; it prevents many potential issues. There’s a lot of old-school POSIX wisdom baked into it :)

  • Ben Thomas says:

    Hey. Thanks for the post! I’ve been doing the following. Is there a reason you think this wouldn’t work?

    require 'thread'
    Timeout::timeout(10) {
      item = queue.pop
    }

    • Ben Thomas says:

      Oops.

      require 'thread'
      require 'timeout'
      queue = Queue.new
      item = Timeout.timeout(10) { queue.pop }

      • Job Vranish says:

        The main risk I see with this approach is a race condition: the thread running `queue.pop` could receive the timeout exception just after successfully pulling an item from the queue, but before it has had a chance to return it. That would “lose” an item from the queue: it was removed, but none of the receivers got it.
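        For what it’s worth, if you’re on Ruby 3.2 or later, the built-in `Thread::Queue#pop` accepts a `timeout:` keyword and returns `nil` when the timeout expires. Because the timeout is handled inside `pop` rather than by injecting an exception into the thread, it shouldn’t be able to drop an item the way `Timeout.timeout` can. A minimal sketch, assuming Ruby 3.2+:

```ruby
# Assumes Ruby 3.2+, where Thread::Queue#pop takes a timeout: keyword.
queue = Queue.new

# Nothing has been pushed, so this gives up after ~0.1 s and returns nil
# instead of raising.
missing = queue.pop(timeout: 0.1)

# With an item available, pop returns it right away.
queue << "random stuff"
found = queue.pop(timeout: 0.1)
```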
