Ruby Queue Pop with Timeout

While writing some Ruby scripts to handle asynchronous messages to and from some external hardware, I ran into a problem. I wanted to wait on a queue of responses until I either got a response or a timeout expired. Unfortunately, this turned out to be a bit harder than I expected. The two most common answers when I search for “ruby queue pop with timeout” or “ruby queue timeout” are either to use some variant of “run it in a separate thread and toss in an exception when you want to stop” (such as Ruby’s Timeout), or to use the non-blocking pop, with something like:


def pop_with_timeout(q, timeout)
  start_time = Time.now
  t = start_time
  loop do
    begin
      return q.pop(true) # non-blocking pop: raises ThreadError if the queue is empty
    rescue ThreadError => e
      if t < start_time + timeout
        sleep 0.01
        t = Time.now
      else
        raise e
      end
    end
  end
end

which actually works, but is ridiculous.

The Usual Solutions Are Terrible

Both of these options are terrible.

The problem with the first option, even assuming that asynchronous exceptions thrown into threads are well behaved (which seems to be in question), is that there is a possible race condition: the thread can receive the timeout exception just after successfully pulling an item from the queue, but before it has had a chance to return it. This could cause you to "lose" an item from the queue: it was removed, but none of the receivers got it. Not a good option.
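
For reference, here is a minimal sketch of what that first option tends to look like, assuming a plain built-in Queue and Ruby's standard Timeout module. The helper name pop_via_timeout is hypothetical, made up for illustration. The race described above sits between the moment queue.pop has removed an item and the moment the block returns it.

```ruby
require 'thread'
require 'timeout'

# Hypothetical helper: interrupt a blocking pop by letting Timeout raise
# an exception inside the block once `seconds` have elapsed.
def pop_via_timeout(queue, seconds)
  Timeout.timeout(seconds) do
    # If the timeout fires after the item has been removed here but before
    # the block returns, the item is gone and nobody receives it.
    queue.pop
  end
end

q = Queue.new
q << :reply
pop_via_timeout(q, 0.1) # an item is waiting, so this returns it right away

begin
  pop_via_timeout(q, 0.1) # the queue is empty now, so this blocks...
rescue Timeout::Error
  # ...until Timeout raises into the blocked call
end
```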

The second option, using the non-blocking pop, will actually work, but I have to poll and make a trade-off between latency and how much CPU I want to waste. This is not a trade-off I should have to make in 2014.

Fortunately I don't have to! Ruby's ConditionVariable has a wait() that takes an optional timeout, and it's actually fairly straightforward to combine a Mutex and a ConditionVariable with just a regular array to build a thread-safe queue with a pop-with-timeout.
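
As a quick illustration of the primitive involved, the sketch below calls ConditionVariable#wait with a timeout while holding a Mutex. The variable names are made up for the example, and nothing ever signals the condition, so the call should simply come back once the timeout lapses. Because a wait can also return early, real code should re-check its condition in a loop rather than trust a single call.

```ruby
require 'thread'

mutex = Mutex.new
ready = ConditionVariable.new # illustrative name; nothing signals it here

started = Time.now
mutex.synchronize do
  # wait releases the mutex while sleeping and re-acquires it before
  # returning. With no signal, it returns after roughly 0.1 seconds.
  ready.wait(mutex, 0.1)
end
elapsed = Time.now - started
```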

The Solution

Note: I've updated the following code since my original posting. The original code had a bug in it which is discussed here.

class QueueWithTimeout
  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
  end
 
  def <<(x)
    @mutex.synchronize do
      @queue << x
      @received.signal
    end
  end
 
  def pop(non_block = false)
    pop_with_timeout(non_block ? 0 : nil)
  end
 
  def pop_with_timeout(timeout = nil)
    @mutex.synchronize do
      if timeout.nil?
        # wait indefinitely until there is an element in the queue
        while @queue.empty?
          @received.wait(@mutex)
        end
      elsif @queue.empty? && timeout != 0
        # wait for element or timeout
        timeout_time = timeout + Time.now.to_f
        while @queue.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
          @received.wait(@mutex, remaining_time)
        end
      end
      # if we're still empty after the timeout, raise an exception
      raise ThreadError, "queue empty" if @queue.empty?
      @queue.shift
    end
  end
end
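
One way to use the class is sketched below. The helper drain_queue is hypothetical and not part of the class above: it keeps popping until a pop times out, relying only on pop_with_timeout raising ThreadError when the wait expires on an empty queue.

```ruby
# Hypothetical helper: collect items from any object that responds to
# pop_with_timeout, stopping once `idle_timeout` seconds pass with
# nothing new arriving.
def drain_queue(queue, idle_timeout)
  items = []
  loop do
    items << queue.pop_with_timeout(idle_timeout)
  end
rescue ThreadError
  # pop_with_timeout raises ThreadError when the queue is still empty
  # after the timeout, which is the signal to stop collecting.
  items
end

# Usage, assuming QueueWithTimeout from the listing above is loaded:
#   q = QueueWithTimeout.new
#   Thread.new { 3.times { |i| q << i; sleep 0.05 } }
#   responses = drain_queue(q, 0.5)
```

Turning the exception into a normal return value keeps the timeout handling in one place, so callers do not need their own rescue clauses.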

I have no idea why Ruby's built-in Queue class doesn't do something similar.