Concurrency
Python provides multiple concurrency models. Choose based on your workload: threading for I/O-bound tasks, multiprocessing for CPU-bound tasks.
Threading vs Multiprocessing
| Feature | Threading | Multiprocessing |
|---|---|---|
| Memory | Shared (same process) | Separate (each process) |
| GIL | Limited by GIL | Bypasses GIL |
| CPU Usage | Single core | Multiple cores |
| Best For | I/O-bound (network, disk) | CPU-bound (computation) |
| Startup Time | Fast | Slower |
Threading Basics
import threading
import time


def task(name, delay):
    """Announce start, sleep for *delay* seconds, then announce completion."""
    print(f"{name} started")
    time.sleep(delay)
    print(f"{name} finished")


# Create the threads up front, then start both and wait for both.
workers = [
    threading.Thread(target=task, args=("Thread-1", 2)),
    threading.Thread(target=task, args=("Thread-2", 1)),
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
print("All threads done")
Thread Subclass
class MyThread(threading.Thread):
    """Thread subclass whose run() sleeps for a configurable delay."""

    def __init__(self, name, delay):
        super().__init__(name=name)
        self.delay = delay

    def run(self):
        # run() is invoked by start(); it should not be called directly.
        print(f"Starting {self.name}")
        time.sleep(self.delay)
        print(f"Finished {self.name}")


worker = MyThread("MyThread", 1)
worker.start()
worker.join()
Threading Locks
Synchronize access to shared resources:
import threading

counter = 0
lock = threading.Lock()


def increment():
    """Add 1 to the shared counter 100000 times, one locked step per add."""
    global counter
    for _ in range(100000):
        # The lock makes the read-modify-write on counter atomic.
        with lock:
            counter += 1


workers = [threading.Thread(target=increment) for _ in range(5)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
print(counter)  # Exactly 500000

# RLock - reentrant lock (same thread can acquire multiple times)
rlock = threading.RLock()
with rlock:
    with rlock:  # Same thread can re-acquire
        print("Nested lock")
Condition Variables
Wait for specific conditions:
import threading
import time

stock = 0
condition = threading.Condition()


def producer():
    """Add one unit of stock five times, waking a waiting consumer each time."""
    global stock
    for _ in range(5):
        with condition:
            stock += 1
            print(f"Produced: {stock}")
            condition.notify()  # Notify one waiting thread
        time.sleep(0.1)


def consumer():
    """Take five units of stock, blocking until each one exists."""
    global stock
    for _ in range(5):
        with condition:
            # Re-check the predicate on every wakeup (wakeups can be spurious).
            while stock == 0:
                condition.wait()  # Wait for stock
            stock -= 1
            print(f"Consumed: {stock}")


threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
Thread-safe Queue
Use Queue for producer-consumer patterns:
import threading
import queue

q = queue.Queue()

# BUG FIX: the original started 3 consumers but enqueued only ONE None
# sentinel, so each consumer that did not receive it blocked in q.get()
# forever and, being non-daemon, kept the process from exiting.
# One sentinel per consumer is required.
NUM_CONSUMERS = 3


def producer():
    """Enqueue ten work items, then one None sentinel per consumer."""
    for i in range(10):
        q.put(i)
    for _ in range(NUM_CONSUMERS):
        q.put(None)  # Signal end


def consumer():
    """Drain items from the queue until a None sentinel is received."""
    while True:
        item = q.get()
        if item is None:
            q.task_done()  # Account for the sentinel too, so q.join() works.
            break
        print(f"Got: {item}")
        q.task_done()


consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()
producer()
# Wait for all consumers to finish (the original never joined them).
for t in consumers:
    t.join()
concurrent.futures
Higher-level interface for concurrency:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def task(n):
    """Sleep one second, then return n squared."""
    time.sleep(1)
    return n * n


def main():
    """Run task() on a thread pool, then on a process pool."""
    # ThreadPoolExecutor - for I/O-bound tasks
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, i) for i in range(4)]
        # BUG FIX: original read "[f.result() for f futures]" - the missing
        # "in" made this line a SyntaxError.
        results = [f.result() for f in futures]
        print(results)  # [0, 1, 4, 9] - ~1 second total

    # ProcessPoolExecutor - for CPU-bound tasks
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(task, range(4)))
        print(results)  # [0, 1, 4, 9]


# The guard is required for ProcessPoolExecutor: with the "spawn" start
# method each worker re-imports this module, and an unguarded pool would
# be created recursively.
if __name__ == "__main__":
    main()
Callbacks and Futures
from concurrent.futures import ThreadPoolExecutor
import time


def task(n):
    """Sleep one second, then return *n* unchanged."""
    time.sleep(1)
    return n


def on_complete(future):
    """Done-callback: print the finished future's result."""
    print(f"Result: {future.result()}")


with ThreadPoolExecutor(max_workers=2) as executor:
    fut = executor.submit(task, 42)
    fut.add_done_callback(on_complete)
    fut.result()  # Wait for completion
Multiprocessing Basics
import multiprocessing
import time


def worker(name):
    """Print start and done messages around a one-second sleep."""
    print(f"Worker {name} starting")
    time.sleep(1)
    print(f"Worker {name} done")


if __name__ == "__main__":
    # Launch four worker processes, then wait for every one of them.
    procs = [
        multiprocessing.Process(target=worker, args=(i,)) for i in range(4)
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
Process Pools
import multiprocessing
import os


def compute(n):
    """Return (n squared, pid of the process that did the work)."""
    return n * n, os.getpid()


# BUG FIX: pool creation must live under a __main__ guard. With the default
# "spawn" start method (Windows, and macOS since 3.8) each worker process
# re-imports this module; unguarded Pool(...) at import time then raises
# RuntimeError or recurses.
if __name__ == "__main__":
    # Pool of workers
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(compute, range(10))
        print(results)
        # [(0, pid1), (1, pid2), (4, pid1), ...]

    # Apply async
    with multiprocessing.Pool(2) as pool:
        async_result = pool.apply_async(compute, (5,))
        print(async_result.get())
Process Queues
import multiprocessing


def producer(queue):
    """Send the integers 0-4 followed by a None end-of-stream sentinel."""
    for item in [*range(5), None]:
        queue.put(item)


def consumer(queue):
    """Print every received item; a None sentinel ends the loop."""
    while True:
        item = queue.get()
        if item is None:
            return
        print(f"Got: {item}")


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    for target in (producer, consumer):
        multiprocessing.Process(target=target, args=(queue,)).start()
The Global Interpreter Lock (GIL)
Python's GIL limits threads to one CPU core for CPU-bound work:
# CPU-bound task - threads won't help due to GIL
import threading
import time
from concurrent.futures import ProcessPoolExecutor


def cpu_bound(limit=10 ** 7):
    """Sum the integers below *limit* - pure-Python CPU-bound work.

    BUG FIX: the original took no arguments, yet was later called via
    executor.map(cpu_bound, range(4)), which passes one argument and
    raised TypeError in every worker. The default keeps the original
    workload (10**7) for zero-argument callers.
    """
    total = 0
    for i in range(limit):
        total += i
    return total


# The __main__ guard is required: spawn-based ProcessPoolExecutor workers
# re-import this module, and unguarded pool creation would recurse.
if __name__ == "__main__":
    # This won't run faster with threads!
    start = time.time()
    threads = [threading.Thread(target=cpu_bound) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Time: {time.time() - start:.2f}s")

    # Use multiprocessing instead for CPU-bound work. Give each worker the
    # same full workload as the threads above for a fair comparison.
    start = time.time()
    with ProcessPoolExecutor(4) as executor:
        list(executor.map(cpu_bound, [10 ** 7] * 4))
    print(f"Time: {time.time() - start:.2f}s")
When to Use What
| Scenario | Recommended Approach |
|---|---|
| Network requests, HTTP | Async/await or ThreadPoolExecutor |
| File I/O | ThreadPoolExecutor |
| CPU computation | ProcessPoolExecutor |
| Parallel data processing | multiprocessing.Pool |
| Simple background task | threading.Thread |
| Web scraping (many URLs) | Async + aiohttp |