While writing some Ruby scripts to handle asynchronous messages to and from some external hardware, I ran into a problem. I wanted to wait on a queue of responses until I either got a response or a timeout expired. Unfortunately, this turned out to be a bit harder than I expected. The two most common answers when I search for “ruby queue pop with timeout” or “ruby queue timeout” are either to use some variant of “run it in a separate thread and toss in an exception when you want to stop” (such as Ruby’s Timeout), or to use the non-blocking pop, with something like:
    def pop_with_timeout(q, timeout)
      start_time = Time.now
      t = start_time
      loop do
        begin
          return q.pop(true)
        rescue ThreadError => e
          if t < start_time + timeout
            sleep 0.01
            t = Time.now
          else
            raise e
          end
        end
      end
    end
which actually works, but is ridiculous.
The Usual Solutions Are Terrible
Both of these options are terrible.
The problem with the first option, even assuming that asynchronous exceptions thrown into threads are well behaved (which seems to be in question), is that there is a possible race condition: the thread can receive the timeout exception just after successfully pulling an item from the queue, but before it has had a chance to return it. This could cause you to "lose" an item from the queue: it was removed, but none of the receivers got it. Not a good option.
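To make the first option concrete, here is a minimal sketch of the Timeout-based approach. The helper name `pop_with_timeout_exception` is hypothetical and only for illustration. The sketch shows the normal paths (item available, and timeout on an empty queue); it does not try to reproduce the race itself, which depends on thread scheduling.

```ruby
require 'timeout'

# Hypothetical helper (not from the original post): wraps a blocking pop
# in Ruby's Timeout so that it gives up after `timeout` seconds.
def pop_with_timeout_exception(q, timeout)
  Timeout.timeout(timeout) do
    # Timeout::Error is raised asynchronously into this block. If it arrives
    # after q.pop has removed an item but before the value is returned,
    # that item is gone and no receiver ever sees it.
    q.pop
  end
end

q = Queue.new
q << :ready

# An item is available, so this returns it without waiting.
first = pop_with_timeout_exception(q, 0.1)

# The queue is now empty, so the blocking pop is interrupted by Timeout::Error.
timed_out = begin
  pop_with_timeout_exception(q, 0.1)
  false
rescue Timeout::Error
  true
end
```

The danger is in the window between `q.pop` completing and the block returning: nothing in this code can tell whether the exception landed before or after the item was removed.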
The second option — using the non-blocking pop — will actually work, but I have to poll and make a trade-off between latency and how much CPU I want to waste. This is not a trade-off I should have to make in 2014.
Fortunately I don't have to! Ruby's ConditionVariable has a wait() that takes a timeout, and it's actually fairly straightforward to use a Mutex and a ConditionVariable with just a regular array to build a thread-safe queue with a pop-with-timeout.
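As a small illustration of the primitive involved, the sketch below exercises `ConditionVariable#wait(mutex, timeout)` on its own, first with nobody signalling and then with a signalling thread. The variable names (`ready`, `signaller`, `deadline`) and the specific durations are assumptions chosen for the example.

```ruby
mutex = Mutex.new
cond  = ConditionVariable.new
ready = false

# Case 1: nobody signals, so wait gives up once the timeout elapses
# instead of blocking forever.
started = Time.now
mutex.synchronize do
  cond.wait(mutex, 0.2) unless ready
end
elapsed = Time.now - started

# Case 2: another thread sets the flag and signals before the deadline.
signaller = Thread.new do
  sleep 0.05
  mutex.synchronize do
    ready = true
    cond.signal
  end
end

deadline = Time.now + 2
mutex.synchronize do
  # Re-check the condition in a loop: a wakeup does not by itself
  # guarantee that the flag was set.
  while !ready && (remaining = deadline - Time.now) > 0
    cond.wait(mutex, remaining)
  end
end
signaller.join
```

The wait releases the mutex while sleeping and reacquires it before returning, which is what lets the signalling thread enter its own `synchronize` block in the meantime.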
Note: I've updated the following code since my original posting. The original code had a bug in it which is discussed here.
    class QueueWithTimeout
      def initialize
        @mutex = Mutex.new
        @queue = []
        @received = ConditionVariable.new
      end

      def <<(x)
        @mutex.synchronize do
          @queue << x
          @received.signal
        end
      end

      def pop(non_block = false)
        pop_with_timeout(non_block ? 0 : nil)
      end

      def pop_with_timeout(timeout = nil)
        @mutex.synchronize do
          if timeout.nil?
            # wait indefinitely until there is an element in the queue
            while @queue.empty?
              @received.wait(@mutex)
            end
          elsif @queue.empty? && timeout != 0
            # wait for element or timeout
            timeout_time = timeout + Time.now.to_f
            while @queue.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
              @received.wait(@mutex, remaining_time)
            end
          end
          # if we're still empty after the timeout, raise exception
          raise ThreadError, "queue empty" if @queue.empty?
          @queue.shift
        end
      end
    end
I have no idea why Ruby's built-in Queue class doesn't do something similar.