In one of my previous posts, I wrote a queue implementation that would let you provide a timeout to the pop method. Unfortunately, it has a bug caused by spurious wakeups. Here is the fix along with an explanation of how you can avoid this sort of problem in the future.
The Problem: Spurious Wakeups
I found out about this bug from a commenter on my previous post. The bug is particularly interesting because the Ruby documentation doesn’t mention anything about the potential for spurious wakeups. In fact, I think they imply that you shouldn’t have to worry about them at all.
However, according to the POSIX threads specification (on which Ruby’s Mutex and ConditionVariable are built), the wait() call is allowed to return prematurely for almost any reason. This is called a spurious wakeup. When the wait method returns, it could be due to one of the following reasons:
- Your thread was signaled.
- The wait timed out (if a timeout was provided).
- There was a spurious wakeup.
In my original code, I wasn’t checking for the spurious wakeup case:
@recieved.wait(@mutex, timeout) if timeout != 0
# if we're still empty after the timeout, raise exception
raise ThreadError, "queue empty" if @queue.empty?
If wait returned for a reason other than a timeout or a thread signal, my code would erroneously raise a “queue empty” exception. I wasn’t able to hit this condition with only one consumer (which is probably why I had never run into it before). But, sure enough, with several consumers, I hit the condition almost immediately.
Here’s the code I used to reproduce the issue:
queue = QueueWithTimeout.new
Thread.abort_on_exception = true

20.times do |i|
  Thread.new do
    loop do
      queue.pop_with_timeout(100000.0) # a long timeout, that we'll never reach
      puts i
    end
  end
end

loop do
  queue << "random stuff"
  puts "pushed"
end
But why does wait have spurious wakeups at all? The short answer is: performance. For a longer answer, see this Stack Overflow post.
The Fix: Always Wrap wait in a Loop
The intended way to use wait is to always wrap it in a loop that checks whether the condition you are waiting for actually holds, or whether the timeout has expired. If neither is the case, you go back to waiting. You can almost think of it as a busy loop, except that it’s thread-safe and far more efficient with CPU cycles, because the thread sleeps between checks.
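As a minimal sketch of that pattern without a timeout: the `Mailbox` class and its method names below are made up for illustration and are not part of the queue from this post. The only point is that the waiter re-checks its condition every time wait returns, rather than assuming a wakeup means an item is available.

```ruby
# Hypothetical example class; not part of QueueWithTimeout.
class Mailbox
  def initialize
    @mutex = Mutex.new
    @ready = ConditionVariable.new
    @items = []
  end

  def put(item)
    @mutex.synchronize do
      @items << item
      @ready.signal
    end
  end

  def take
    @mutex.synchronize do
      # `while`, not `if`: a wakeup only means "check again".
      @ready.wait(@mutex) while @items.empty?
      @items.shift
    end
  end
end
```

With an `if` in place of the `while`, a spurious wakeup (or another consumer grabbing the item first) would let `take` call `shift` on an empty array and return nil.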
It’s a little trickier with a timeout, because you have to recompute the remaining timeout to account for the time that elapsed before the spurious wakeup. (This isn’t an issue with the POSIX API: pthread_cond_timedwait takes an absolute deadline specifically to avoid it.)
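The deadline bookkeeping can be sketched as a small standalone helper. `wait_until` is a hypothetical name, not something Ruby provides, and it reads the monotonic clock through `Process.clock_gettime` instead of `Time.now`, which is my own substitution so that wall-clock adjustments don't affect the deadline.

```ruby
# Hypothetical helper (not a Ruby built-in). Call it with `mutex` already held.
# Waits on `cond` until the block returns true or `timeout` seconds have passed.
# Returns true if the condition was met, false if the deadline expired first.
def wait_until(mutex, cond, timeout)
  # Fix the deadline once, up front, as an absolute point in time.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield
    # Recompute what is left after every wakeup, spurious or not.
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0
    cond.wait(mutex, remaining)
  end
end

# Usage: a producer signals shortly after the consumer starts waiting.
mutex = Mutex.new
cond = ConditionVariable.new
items = []

producer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    items << :job
    cond.signal
  end
end

got_item = mutex.synchronize { wait_until(mutex, cond, 2.0) { !items.empty? } }
producer.join
```

Because the deadline is computed once, any number of early wakeups can only shorten the remaining wait, never extend the total beyond the requested timeout.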
Here is my fixed implementation:
class QueueWithTimeout
  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
  end

  def <<(x)
    @mutex.synchronize do
      @queue << x
      @received.signal
    end
  end

  def pop(non_block = false)
    pop_with_timeout(non_block ? 0 : nil)
  end

  def pop_with_timeout(timeout = nil)
    @mutex.synchronize do
      if timeout.nil?
        # wait indefinitely until there is an element in the queue
        while @queue.empty?
          @received.wait(@mutex)
        end
      elsif @queue.empty? && timeout != 0
        # wait for element or timeout
        timeout_time = timeout + Time.now.to_f
        while @queue.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
          @received.wait(@mutex, remaining_time)
        end
      end
      # if we're still empty after the timeout, raise exception
      raise ThreadError, "queue empty" if @queue.empty?
      @queue.shift
    end
  end
end
My new and improved QueueWithTimeout implementation should now work reliably with any number of producers and consumers.