Ruby Queue Pop with Timeout

While writing some Ruby scripts to handle asynchronous messages to and from some external hardware, I ran into a problem. I wanted to wait on a queue of responses until I either got a response or a timeout expired. Unfortunately, this turned out to be a bit harder than I expected. The two most common answers when I search for “ruby queue pop with timeout” or “ruby queue timeout” are to either use some variant of “run it in a separate thread and toss in an exception when you want to stop” (such as Ruby’s Timeout), or to use the non-blocking pop, with something like:


def pop_with_timeout(q, timeout)
  start_time = Time.now
  t = start_time
  loop do
    begin
      return q.pop(true) 
    rescue ThreadError => e
      if t < start_time + timeout
        sleep 0.01
        t = Time.now
      else
        raise e
      end
    end
  end
end

which actually works, but is ridiculous.

The Usual Solutions Are Terrible

Both of these options are terrible.

The problem with the first option, even assuming that asynchronous exceptions thrown into threads are well behaved (which seems to be in question), is that there is a possible race condition: the thread can receive the timeout exception just after successfully pulling an item from the queue, but before it has had a chance to return it. This could cause you to “lose” an item from the queue: it was removed, but none of the receivers got it. Not a good option.
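
For reference, the first option usually looks something like the sketch below. It assumes Ruby's standard timeout library, and the helper name pop_or_timeout is made up for illustration. It only shows the shape of the approach: the race sits between q.pop returning and the block exiting, and this snippet does not try to reproduce it.

```ruby
require "timeout"

# Hypothetical helper (not from the original post): a blocking pop wrapped in Timeout.
# Timeout.timeout raises Timeout::Error in the calling thread when time runs out.
def pop_or_timeout(q, seconds)
  Timeout.timeout(seconds) do
    q.pop # if the exception lands right after this returns, the item is lost
  end
end

q = Queue.new
q << :response
puts pop_or_timeout(q, 0.1).inspect # an item is waiting, so this returns right away

begin
  pop_or_timeout(q, 0.1) # the queue is now empty, so this times out
rescue Timeout::Error
  puts "timed out"
end
```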

The second option — using the non-blocking pop — will actually work, but I have to poll and make a trade-off between latency and how much CPU I want to waste. This is not a trade-off I should have to make in 2014.

Fortunately I don’t have to! Ruby’s ConditionVariable has a wait() that takes a mutex and an optional timeout, and it’s actually fairly straightforward to use a Mutex and a ConditionVariable with just a regular array to build a thread-safe queue with a pop-with-timeout.
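
As a minimal sketch of that primitive on its own: the helper below (wait_until is a hypothetical name, not part of Ruby) waits on a ConditionVariable until a condition holds or a deadline passes. It re-checks the condition after every wakeup, and it uses the monotonic clock for the deadline, which is my choice here rather than something the queue code requires.

```ruby
# Hypothetical helper: block until the given block returns true or `timeout`
# seconds pass. The caller must already hold `mutex`.
# Returns true if the condition became true, false on timeout.
def wait_until(mutex, cond, timeout)
  # Monotonic clock, so wall-clock adjustments do not move the deadline.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0
    cond.wait(mutex, remaining) # releases the mutex while sleeping, reacquires on wake
  end
  true
end

mutex = Mutex.new
cond  = ConditionVariable.new
items = []

producer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    items << :hello
    cond.signal
  end
end

got_item = mutex.synchronize { wait_until(mutex, cond, 1.0) { !items.empty? } }
producer.join
puts got_item
```

Looping on the condition, rather than trusting a single wait, is what keeps an early or stray wakeup from being mistaken for a timeout.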

The Solution

Note: I’ve updated the following code since my original posting. The original code had a bug in it, which is discussed here: https://spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/

class QueueWithTimeout
  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
  end
 
  def <<(x)
    @mutex.synchronize do
      @queue << x
      @received.signal
    end
  end

  def pop(non_block = false)
    pop_with_timeout(non_block ? 0 : nil)
  end

  def pop_with_timeout(timeout = nil)
    @mutex.synchronize do
      if timeout.nil?
        # wait indefinitely until there is an element in the queue
        while @queue.empty?
          @received.wait(@mutex)
        end
      elsif @queue.empty? && timeout != 0
        # wait for element or timeout
        timeout_time = timeout + Time.now.to_f
        while @queue.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
          @received.wait(@mutex, remaining_time)
        end
      end
      #if we're still empty after the timeout, raise exception
      raise ThreadError, "queue empty" if @queue.empty?
      @queue.shift
    end
  end
end

I have no idea why Ruby’s built-in Queue class doesn’t do something similar.
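
Newer Rubies do offer something similar: as of Ruby 3.2, the built-in Queue#pop accepts a timeout: keyword and returns nil when the timeout expires. The sketch below assumes Ruby 3.2 or later and skips the demo on older versions. The constant and method names are made up for illustration.

```ruby
# Assumption: the timeout: keyword on Queue#pop needs Ruby 3.2 or newer.
major, minor = RUBY_VERSION.split(".").first(2).map(&:to_i)
HAS_TIMED_POP = ([major, minor] <=> [3, 2]) >= 0

# Hypothetical wrapper: returns the next item, or nil if nothing arrives in time.
def builtin_timed_pop(queue, seconds)
  raise NotImplementedError, "Queue#pop(timeout:) needs Ruby 3.2+" unless HAS_TIMED_POP
  queue.pop(timeout: seconds)
end

if HAS_TIMED_POP
  q = Queue.new
  q << :response
  puts builtin_timed_pop(q, 0.1).inspect # item already queued
  puts builtin_timed_pop(q, 0.1).inspect # empty queue: nil after the timeout
end
```

One difference to keep in mind: a nil return on timeout can't be told apart from a nil that was actually queued, whereas the class above raises ThreadError.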

Conversation
  • Mike Perham says:

    Yeah, it’s a common need.

    My connection_pool gem has a Stack implementation with a timed pop and has been heavily tested under load:

    https://github.com/mperham/connection_pool/blob/master/lib/connection_pool/timed_stack.rb#L71

  • For a while I’ve been wanting to (or hoping someone else would) port all/most/some of java.util.concurrent to ruby. java.util.concurrent are some pretty solid building blocks for concurrency, and probably wouldn’t be too hard to port using the lower level threading primitives ruby does have, which generally match Java’s low-level threading primitives. (with some ‘gotchas’).

  • Jens Peter says:

    When using @queue << to put data on the array, you need to use @queue.shift to take them out again (since @queue is a normal array). The above implementation is a LIFO (stack) not a FIFO (queue).

  • Jens Peter says:

    When using the << operator to add items to the queue, you need to use 'shift' to take them out again, not 'pop', since @queue is a normal Ruby array. The above implementation is a LIFO (stack) not a FIFO (queue).

    • Job Vranish says:

      Doh! Thanks for the catch. It looks like I corrected this in the implementation I’m actually using but it didn’t make it into the blog post. Sorry about that :/

  • Michael says:

    I thought when using Queues I don’t have to use a mutex. I have iterated Queue over 10k times and didn’t run into any issues, or am I missing something?

    • Job Vranish says:

      The issue isn’t that queues aren’t thread-safe (they are), it’s that queues don’t have a way to time out a blocking read if the queue never gets any new input.

  • Dan Sandberg says:

    Received is spelled as “recieved”.

  • What license are you publishing the code above with?

    • Job Vranish says:

      I place it in the public domain.

      But in cases where that is not enough and you need an explicit license, you may use:

      ISC License

      Copyright (c) 2016, Job Vranish

      Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.

      THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

  • Jonas Nicklas says:

    Just ran across this post, and I was about to use it, when I realized something: isn’t this incorrect when encountering spurious signals in the ConditionVariable? If `@recieved` [sic ;)] spuriously signals, an error will be raised, even though the timeout may not actually have elapsed?

    • Job Vranish says:

      I think you are correct. Ruby’s documentation doesn’t mention anything about the potential for spurious wakeups, but it was probably naive of me to assume that ruby’s implementation is anything more than a thin wrapper around the pthreads primitives (where it would certainly be a problem).

      In fact, after looking into this more carefully, it looks like ruby _does_ have code to avoid the spurious wakeups, but they are explicitly not using it, see here:
      https://github.com/ruby/ruby/blob/b15f8473d7bf26a1b46381ba2ac0d89701e364e6/thread_sync.c#L429
      (if that last parameter was a 1, I think this wouldn’t be a problem)

      I’ll see if I can come up with a fix for this (and update the code in the post), but no guarantees as to when :)

      Thanks for pointing this out!

    • Job Vranish says:

      I did some experimentation and was able to cause some spurious wake-ups (and the resulting errors) if I had many consumers. I’ve implemented a fix and updated the code in the post. I’ve also written up another blog post discussing the problem here: https://spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/
