This week, in stumping Ross for a couple of hours...
At Snap Tech, we've been breaking up some of our large-file-parsing components into smaller units (for testability, reusability, and to be able to join them up nicely with simple Unix pipes). This means they've been getting some unit-test loving as well. One component does some lengthy tasks over a small thread pool. Here's the typical threading + queues Python example:
import queue import threading q = queue.Queue() threads =  num_threads = 5 def worker(): while True: item = q.get() if item is None: break do_some_lengthy_task(item) q.task_done() # Start some threads for i in range(0, num_threads): thread = threading.Thread(target=worker) thread.start() threads.append(thread) # Plonk items into the queue for j in range(0, num_threads * 10): q.put(j) q.join() # Block until all threads are waiting on an empty queue # Signal all threads to break and join the main thread for i in range(0, num_threads): q.put(None) for t in threads: t.join()
This works fine, until we attempted to run it under pytest. The only hint I could get out was that my KeyboardInterrupt happened at the
It turned out that the
if item is None: break bit also needs to signal that the task is done, because
q.join() essentially blocks until all tasks have been acknowledged. Internally,
q.put() increments a counter and
q.task_done() decrements it.
if item is None: q.task_done() break
Still not entirely sure as to why this only seems to happen when running under pytest though...