Chapter 9: Threading: Concurrency for I/O-Bound Tasks
As we established in the previous chapter, threading is the ideal tool for managing I/O-bound tasks. Its purpose is not to make your code run faster by using more CPU cores—the Global Interpreter Lock (GIL) prevents that for Python code—but to make your application more responsive. By running I/O operations in separate threads, the main thread can remain unblocked, continuing to execute other tasks or respond to user input.
While the concept is straightforward, writing correct multi-threaded code requires a disciplined approach. The primary challenge lies in managing access to shared resources. When multiple threads attempt to read from and write to the same data simultaneously, you can encounter bizarre and hard-to-debug issues.
This chapter covers:
Using the modern concurrent.futures.ThreadPoolExecutor.
Understanding and identifying race conditions.
Using threading.Lock to protect shared state.
Leveraging the thread-safe queue.Queue for robust data exchange.
Modern Threading: ThreadPoolExecutor
While you can manually instantiate and manage threading.Thread objects, the modern, high-level approach is to use concurrent.futures.ThreadPoolExecutor. This abstraction manages a pool of worker threads for you, simplifying the process of submitting tasks and retrieving their results.
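Before the full example, here is a minimal sketch of the executor's two core operations: submit() schedules a call and immediately returns a Future object, and Future.result() blocks until the value is ready. (The slow_square function here is just a hypothetical stand-in for an I/O-bound call.)

import concurrent.futures

def slow_square(x):
    # Hypothetical placeholder for an I/O-bound call
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_square, 7)  # schedule the call; returns a Future
    print(future.result())                    # block until the result is ready: 49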
Let's consider a practical example: downloading the content of several web pages.
import concurrent.futures
import time

import requests  # third-party: pip install requests

URLS = [
    'https://www.python.org/',
    'https://www.djangoproject.com/',
    'https://flask.palletsprojects.com/',
    'https://fastapi.tiangolo.com/',
    'https://numpy.org/',
]

def download_url(url):
    """Downloads a single URL and returns its content length."""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes
        return url, len(response.content)
    except requests.RequestException as e:
        return url, f"Error: {e}"

start_time = time.time()

# The executor manages thread creation, execution, and shutdown.
# The 'with' statement ensures threads are cleaned up properly.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map() applies the function to each item in the iterable and
    # returns the results in input order.
    results = executor.map(download_url, URLS)
    for url, length in results:
        print(f"{url}: {length} bytes")

print(f"Completed in {time.time() - start_time:.2f} seconds")
If you run this, the total time will be roughly the time of the slowest download, not the sum of all of them. The ThreadPoolExecutor handles running the download_url function for each URL in a separate thread. While one thread is waiting for the network, another can be actively downloading.
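Note that executor.map() yields results in input order, so one slow first URL can delay the printing of results that have already finished. If you would rather handle each result as soon as it arrives, the usual pattern is submit() combined with concurrent.futures.as_completed(). A sketch, reusing download_url and URLS from above:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Schedule every download up front; each submit() returns a Future.
    futures = [executor.submit(download_url, url) for url in URLS]
    # as_completed() yields each future as soon as it finishes,
    # regardless of submission order.
    for future in concurrent.futures.as_completed(futures):
        url, length = future.result()
        print(f"{url}: {length} bytes")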
The Danger: Race Conditions
A race condition occurs when the behavior of a system depends on the unpredictable timing of concurrent operations. It's a bug that happens when multiple threads access a shared resource (like a variable or object) and at least one of them modifies it.
Consider this classic example of a shared counter. (The increment uses an explicit temporary variable and a short sleep purely to widen the window in which a thread switch can occur; the bug exists without them, just less reliably.)

import threading
import time

class UnsafeCounter:
    def __init__(self):
        self.count = 0

    def increment(self):
        # This read-modify-write operation is not atomic!
        current_count = self.count
        # Simulate a small delay, giving another thread a chance to run
        time.sleep(0.001)
        self.count = current_count + 1

counter = UnsafeCounter()

def worker():
    for _ in range(100):
        counter.increment()

threads = []
for _ in range(10):  # 10 threads, each incrementing 100 times
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final count: {counter.count}")  # Expected: 1000

When you run this code, the final count will almost never be 1000; it will be a different, smaller number on each run. Why? Because an increment like self.count = self.count + 1 is not a single, atomic step. It's three steps:
Read the value of self.count.
Add 1 to that value.
Write the new value back to self.count.
A race condition happens when Thread A reads the count (e.g., 5), but before it can write back 6, the OS switches to Thread B. Thread B also reads the count (still 5), adds 1, and writes back 6. Then, the OS switches back to Thread A, which finally writes back its calculated value of 6. Both threads did work, but the counter only increased by one.
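You can make these three steps visible with the standard library's dis module. The exact opcode names vary across CPython versions, but the load/add/store sequence is always there:

import dis

class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count = self.count + 1

dis.dis(Counter.increment)
# Among the output you will see something like (names vary by version):
#   LOAD_ATTR  count    <- step 1: read the current value
#   BINARY_OP  (+)      <- step 2: add 1
#   STORE_ATTR count    <- step 3: write the new value back

A thread switch can occur between any two of these instructions, which is exactly the window the race exploits.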
Solution 1: threading.Lock
To prevent race conditions, we must ensure that the read-modify-write operation is atomic—that it cannot be interrupted. We can achieve this using a lock. A lock (or mutex) is a synchronization primitive that allows only one thread to enter a specific block of code, known as the critical section, at a time.

class SafeCounter:
    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def increment(self):
        # The 'with' statement ensures the lock is always released,
        # even if an error occurs inside the block.
        with self._lock:
            # This is now the critical section.
            current_count = self.count
            time.sleep(0.001)
            self.count = current_count + 1

# ... (running the same worker test with SafeCounter will now correctly print 1000 every time) ...

By wrapping the critical section in with self._lock:, we guarantee that only one thread can execute those three lines at once, eliminating the race condition.
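For reference, the with statement is shorthand for calling the lock's acquire() and release() methods in a try/finally block:

import threading

lock = threading.Lock()

lock.acquire()      # blocks until the lock is available
try:
    ...             # critical section goes here
finally:
    lock.release()  # always released, even if the section raises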
Solution 2: The queue Module
Explicitly managing locks can be tricky. A more robust and often simpler pattern for concurrent programming is to avoid sharing state directly. Instead, use thread-safe data structures to pass messages between threads.
The queue module provides the Queue class, a thread-safe First-In-First-Out (FIFO) data structure. It handles all the internal locking for you. This is perfect for the producer-consumer pattern, where some threads ("producers") add work to a queue, and others ("consumers") pull work from it.

import queue
import threading
import time

# A thread-safe queue to hold tasks
work_queue = queue.Queue()

# Producer: adds items to the queue
def producer():
    for i in range(10):
        print(f"Producer: Adding item {i} to the queue")
        work_queue.put(i)
        time.sleep(0.5)

# Consumer: processes items from the queue
def consumer():
    while True:
        try:
            # The 'get' call blocks until an item is available.
            # A timeout prevents it from blocking forever once the
            # producer has finished.
            item = work_queue.get(timeout=3)
            print(f"Consumer: Processing item {item}")
            # Simulate work
            time.sleep(1)
            # Mark the task as done so work_queue.join() can return
            work_queue.task_done()
        except queue.Empty:
            print("Consumer: Queue is empty, exiting.")
            break

# Start the threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()

# Wait for the producer to finish
producer_thread.join()
# Wait until every enqueued item has been marked done
work_queue.join()
print("All work completed.")
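The timeout-based exit above is simple, but it is fragile: if the producer ever pauses for longer than the timeout, the consumer quits early. A more deterministic alternative is to have the producer enqueue a sentinel value when it is done. A sketch, under the assumption that None never appears as real work (consumer_with_sentinel is a hypothetical replacement for the consumer above):

SENTINEL = None  # arbitrary marker; any unique object works

def consumer_with_sentinel():
    while True:
        item = work_queue.get()  # block indefinitely; no timeout needed
        if item is SENTINEL:
            work_queue.task_done()
            break                # the producer has signalled shutdown
        print(f"Consumer: Processing item {item}")
        work_queue.task_done()

# The producer signals completion by enqueueing the sentinel last:
#     work_queue.put(SENTINEL)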
Summary
threading is a powerful tool for building responsive applications that handle I/O-bound work. While ThreadPoolExecutor provides a convenient high-level interface, it does not absolve you of the responsibility of managing shared state. You must be vigilant in identifying critical sections and protecting them with locks or, preferably, designing your architecture around thread-safe structures like queue.Queue to minimize the need for explicit locking.