Chapter 9: Threading: Concurrency for I/O-Bound Tasks

As we established in the previous chapter, threading is the ideal tool for managing I/O-bound tasks. Its purpose is not to make your code run faster by using more CPU cores—the Global Interpreter Lock (GIL) prevents that for Python code—but to make your application more responsive. By running I/O operations in separate threads, the main thread can remain unblocked, continuing to execute other tasks or respond to user input.

While the concept is straightforward, writing correct multi-threaded code requires a disciplined approach. The primary challenge lies in managing access to shared resources. When multiple threads attempt to read from and write to the same data simultaneously, you can encounter bizarre and hard-to-debug issues.

This chapter covers:

  1. Using the modern concurrent.futures.ThreadPoolExecutor.

  2. Understanding and identifying race conditions.

  3. Using threading.Lock to protect shared state.

  4. Leveraging the thread-safe queue.Queue for robust data exchange.

Modern Threading: ThreadPoolExecutor

While you can manually instantiate and manage threading.Thread objects, the modern, high-level approach is to use concurrent.futures.ThreadPoolExecutor. This abstraction manages a pool of worker threads for you, simplifying the process of submitting tasks and retrieving their results.

Let's consider a practical example: downloading the content of several web pages.

import concurrent.futures
import requests
import time

URLS = [
    'https://www.python.org/',
    'https://www.djangoproject.com/',
    'https://flask.palletsprojects.com/',
    'https://fastapi.tiangolo.com/',
    'https://numpy.org/',
]

def download_url(url):
    """Downloads a single URL and returns its content length."""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status() # Raise an exception for bad status codes
        return url, len(response.content)
    except requests.RequestException as e:
        return url, f"Error: {e}"

start_time = time.time()
# The executor manages thread creation, execution, and shutdown.
# The 'with' statement ensures threads are cleaned up properly.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map() applies the function to each item in the iterable and returns results in order.
    results = executor.map(download_url, URLS)

    for url, result in results:
        if isinstance(result, int):
            print(f"{url}: {result} bytes")
        else:
            print(f"{url}: {result}")

print(f"Completed in {time.time() - start_time:.2f} seconds")

If you run this, the total time will be roughly the time of the slowest download, not the sum of all of them. The ThreadPoolExecutor handles running the download_url function for each URL in a separate thread. While one thread is waiting for the network, another can be actively downloading.

The Danger: Race Conditions

A race condition occurs when the behavior of a system depends on the unpredictable timing of concurrent operations. It's a bug that happens when multiple threads access a shared resource (like a variable or object) and at least one of them modifies it.

Consider this classic example of a shared counter:
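Below is a minimal sketch of that example. The Counter class and the thread counts are illustrative, and the time.sleep(0) call is added deliberately: it yields to another thread between the read and the write, making the lost updates easy to reproduce. Without it the bug still exists, but CPython's scheduling often hides it at small iteration counts. The increment method spells out the three steps that self.count = self.count + 1 performs implicitly.

import threading
import time

class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        current = self.count    # 1. Read the value of self.count.
        time.sleep(0)           # Yield to another thread (widens the race window for demonstration).
        current = current + 1   # 2. Add 1 to that value.
        self.count = current    # 3. Write the new value back.

counter = Counter()

def worker():
    for _ in range(100):
        counter.increment()

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected 1000, got {counter.count}")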

When you run this code, the final count will almost never be 1000. It will be a different, smaller number each time. Why? The operation self.count = self.count + 1 is not a single, atomic step. It's three steps:

  1. Read the value of self.count.

  2. Add 1 to that value.

  3. Write the new value back to self.count.

A race condition happens when Thread A reads the count (e.g., 5), but before it can write back 6, the OS switches to Thread B. Thread B also reads the count (still 5), adds 1, and writes back 6. Then, the OS switches back to Thread A, which finally writes back its calculated value of 6. Both threads did work, but the counter only increased by one.

Solution 1: threading.Lock

To prevent race conditions, we must ensure that the read-modify-write operation is atomic—that it cannot be interrupted. We can achieve this using a lock. A lock (or mutex) is a synchronization primitive that allows only one thread to enter a specific block of code, known as the critical section, at a time.
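Here is the counter from the previous example, rewritten with a lock. This is a sketch; the class name is again illustrative.

import threading

class SafeCounter:
    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time can hold the lock, so the
        # read-modify-write sequence below can never be interleaved.
        with self._lock:
            current = self.count    # 1. Read the value.
            current = current + 1   # 2. Add 1 to that value.
            self.count = current    # 3. Write the new value back.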

By wrapping the critical section in with self._lock:, we guarantee that only one thread can execute those three lines at once, eliminating the race condition.

Solution 2: The queue Module

Explicitly managing locks can be tricky. A more robust and often simpler pattern for concurrent programming is to avoid sharing state directly. Instead, use thread-safe data structures to pass messages between threads.

The queue module provides the Queue class, a thread-safe First-In-First-Out (FIFO) data structure. It handles all the internal locking for you. This is perfect for the producer-consumer pattern, where some threads ("producers") add work to a queue, and others ("consumers") pull work from it.
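Here is a minimal sketch of the producer-consumer pattern, assuming a single producer and a single consumer; the function names, item payloads, and the None sentinel are illustrative choices.

import queue
import threading

def producer(q):
    for item in range(5):
        q.put(item)        # put() does its own internal locking; no explicit Lock needed.
    q.put(None)            # A sentinel value tells the consumer there is no more work.

def consumer(q):
    while True:
        item = q.get()     # get() blocks until an item is available.
        if item is None:
            break
        print(f"Consumed {item}")

q = queue.Queue()
threads = [
    threading.Thread(target=producer, args=(q,)),
    threading.Thread(target=consumer, args=(q,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Because each item is handed off through the queue, no two threads ever touch the same mutable object at the same time. The locking lives inside Queue, where it has already been written and debugged for you.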

Summary

threading is a powerful tool for building responsive applications that handle I/O-bound work. While ThreadPoolExecutor provides a convenient high-level interface, it does not absolve you of the responsibility of managing shared state. You must identify critical sections and protect them with locks, or, preferably, design your architecture around thread-safe structures like queue.Queue so that explicit locking is rarely needed.
