This week, in stumping Ross for a couple of hours...

At Snap Tech, we've been breaking some of our large-file-parsing components into smaller units (for testability, for reusability, and so they can be chained together with simple Unix pipes). That means they've been getting some unit-test loving as well. One of these components runs some lengthy tasks over a small thread pool, using the typical Python threading + queue pattern:

import queue
import threading
import time

q = queue.Queue()
threads = []
num_threads = 5

def do_some_lengthy_task(item):
    # Hypothetical stand-in for the real work
    time.sleep(0.01)

def worker():
    while True:
        item = q.get()
        if item is None:
            break

        do_some_lengthy_task(item)

        q.task_done()

# Start some threads
for _ in range(num_threads):
    thread = threading.Thread(target=worker)
    thread.start()
    threads.append(thread)

# Plonk items into the queue
for j in range(num_threads * 10):
    q.put(j)

q.join()  # Block until every item put() so far has been marked done via task_done()

# Send each thread a None sentinel so it breaks out of its loop, then wait for all of them to exit
for _ in range(num_threads):
    q.put(None)
for t in threads:
    t.join()

This worked fine until we tried to run it under pytest, where it hung. The only hint I could get out of it was that, when I interrupted it, the KeyboardInterrupt was raised at the q.join() line.

It turned out that the if item is None: break branch also needs to signal that its task is done, because q.join() blocks until every item that has been put() on the queue has been acknowledged. Internally, q.put() increments a counter of unfinished tasks, q.task_done() decrements it, and q.join() waits for that counter to reach zero.

if item is None:
    q.task_done()  # Acknowledge the sentinel too, keeping the counter balanced
    break
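To see the counter bookkeeping described above in isolation, here is a minimal sketch that pokes at a bare queue.Queue with no threads involved. It reads q.unfinished_tasks, which is an undocumented CPython implementation detail rather than public API (an assumption of this sketch), so treat it as a debugging aid only. The ValueError raised by an extra task_done() call, on the other hand, is documented behaviour.

```python
import queue

# Hypothetical demo of the counter that q.join() waits on.
# NOTE: `unfinished_tasks` is a CPython implementation detail of
# queue.Queue, not documented API; use it for poking around only.
q = queue.Queue()

for item in range(3):
    q.put(item)
print(q.unfinished_tasks)  # 3: each put() adds one

q.get()
print(q.unfinished_tasks)  # 3: get() alone does not count as done
q.task_done()
print(q.unfinished_tasks)  # 2: task_done() removes one

# A None sentinel is just another item, so it also needs a task_done().
q.put(None)
print(q.unfinished_tasks)  # 3

# Drain the queue, acknowledging every item, sentinel included.
while not q.empty():
    q.get()
    q.task_done()
print(q.unfinished_tasks)  # 0
q.join()  # returns immediately because the counter is zero

# Calling task_done() more times than put() raises ValueError.
try:
    q.task_done()
except ValueError as exc:
    print("ValueError:", exc)
```

In other words, an unacknowledged sentinel leaves the counter one above zero for good, and any q.join() issued after it is queued will wait forever.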

I'm still not entirely sure why this only seems to happen when running under pytest, though...
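A related hazard that the sentinel fix doesn't cover: if do_some_lengthy_task raises, the worker thread dies before it reaches q.task_done(), the unfinished-task counter never returns to zero, and q.join() blocks forever. One possibility worth ruling out (an assumption, since the investigation above doesn't confirm it either way) is that the task behaves differently under test, for example because of fixtures or mocks, while pytest's default output capturing hides the dying thread's traceback. Running pytest with -s turns capturing off. The sketch below makes the worker defensive by calling task_done() in a finally block, so successes, failures and sentinels are all acknowledged. NUM_THREADS, results, the deliberately failing stub do_some_lengthy_task, and passing q into worker as an argument are illustrative choices, not the original component.

```python
import queue
import threading

NUM_THREADS = 5
results = []
results_lock = threading.Lock()


def do_some_lengthy_task(item):
    # Hypothetical stand-in for the real work; fails on item 7 on purpose
    # to show that a crashing task no longer wedges q.join().
    if item == 7:
        raise RuntimeError(f"could not process {item}")
    with results_lock:
        results.append(item * 2)


def worker(q):
    while True:
        item = q.get()
        try:
            if item is None:
                return  # sentinel: the finally block still acknowledges it
            do_some_lengthy_task(item)
        except Exception as exc:
            # Report and keep going; a real component might collect failures.
            print(f"task {item!r} failed: {exc}")
        finally:
            # Runs on success, on failure and on the sentinel path alike,
            # so every get() is matched by exactly one task_done().
            q.task_done()


q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q,)) for _ in range(NUM_THREADS)]
for t in threads:
    t.start()

for j in range(NUM_THREADS * 10):
    q.put(j)
q.join()  # returns even though item 7 raised

for _ in range(NUM_THREADS):
    q.put(None)
q.join()  # also returns, because the sentinels are acknowledged too
for t in threads:
    t.join()
```

Because every get() is paired with exactly one task_done(), q.join() is safe to call both before and after the sentinels are queued, and a single failing task shows up as a printed message rather than a silent hang.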