Concurrency

Python provides multiple concurrency models. Choose based on your workload: threading for I/O-bound tasks, multiprocessing for CPU-bound tasks.

Threading vs Multiprocessing

Feature | Threading | Multiprocessing
Memory | Shared (same process) | Separate (each process)
GIL | Limited by GIL | Bypasses GIL
CPU Usage | Single core | Multiple cores
Best For | I/O-bound (network, disk) | CPU-bound (computation)
Startup Time | Fast | Slower

Threading Basics

import threading
import time

def task(name, delay):
    """Announce the start, block for a while, then announce completion.

    Args:
        name: Label that prefixes both printed messages.
        delay: Number of seconds handed to time.sleep().
    """
    print(f"{name} started")
    # Stand-in for blocking work
    time.sleep(delay)
    print(f"{name} finished")

# Build both worker threads; each one runs task() with its own label and delay
t1, t2 = (
    threading.Thread(target=task, args=(label, seconds))
    for label, seconds in (("Thread-1", 2), ("Thread-2", 1))
)

# Launch every thread before waiting on any of them
for thread in (t1, t2):
    thread.start()

# Block until both threads have returned
for thread in (t1, t2):
    thread.join()

print("All threads done")

Thread Subclass

class MyThread(threading.Thread):
    """Thread subclass whose run() sleeps for a configurable delay."""

    def __init__(self, name, delay):
        """Forward the thread name to threading.Thread and remember the delay.

        Args:
            name: Thread name, passed to the base-class constructor.
            delay: Seconds that run() hands to time.sleep().
        """
        super().__init__(name=name)
        self.delay = delay

    def run(self):
        """Thread body: print a start line, sleep for self.delay, print a finish line."""
        print(f"Starting {self.name}")
        time.sleep(self.delay)
        print(f"Finished {self.name}")

# Run a single MyThread to completion: start() launches it, join() waits for it
t = MyThread(name="MyThread", delay=1)
t.start()
t.join()

Threading Locks

Synchronize access to shared resources:

import threading

# Shared state: every call to increment() updates `counter` while holding `lock`
counter = 0
lock = threading.Lock()

def increment():
    """Add 1 to the module-level counter 100000 times, taking the lock for each update."""
    global counter
    remaining = 100000
    while remaining:
        # The read-modify-write below happens only while the lock is held
        with lock:
            counter = counter + 1
        remaining -= 1

# Five threads all run increment() against the same counter
threads = [threading.Thread(target=increment) for _ in range(5)]

# Start them all first, then wait for each one to finish
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Exactly 500000

# RLock - reentrant lock: the thread that already holds it can acquire it again
rlock = threading.RLock()
with rlock, rlock:  # Second acquisition is made by the same thread
    print("Nested lock")

Condition Variables

Wait for specific conditions:

import threading
import time

# Shared state: `stock` is the item count; `condition` guards it and signals changes
stock = 0
condition = threading.Condition()

def producer():
    """Add one unit of stock five times, notifying one waiting thread after each addition."""
    global stock
    produced = 0
    while produced < 5:
        with condition:
            stock = stock + 1
            print(f"Produced: {stock}")
            condition.notify()  # Notify one waiting thread
        # The pause happens after the condition's lock has been released
        time.sleep(0.1)
        produced += 1

def consumer():
    """Remove one unit of stock five times, waiting on the condition while stock is 0."""
    global stock
    taken = 0
    while taken < 5:
        with condition:
            # Re-check after every wake-up; only proceed once stock is non-zero
            while not stock:
                condition.wait()  # Wait for stock
            stock = stock - 1
            print(f"Consumed: {stock}")
        taken += 1

# Start the producer thread first, then the consumer thread
for role in (producer, consumer):
    threading.Thread(target=role).start()

Thread-safe Queue

Use Queue for producer-consumer patterns:

import threading
import queue

# Queue shared between the producer and the consumer threads
q = queue.Queue()

def producer():
    """Enqueue the integers 0 through 9 followed by a None end marker."""
    # The trailing None is the "Signal end" marker
    for entry in [*range(10), None]:
        q.put(entry)

def consumer():
    """Print items taken from the shared queue until the None end marker arrives.

    The end marker is put back on the queue before returning, so every other
    consumer reading the same queue also receives it and exits. A single marker
    would otherwise stop only one consumer and leave the rest blocked in
    q.get() forever. task_done() is called once for every get(), including
    the one that returned the marker.
    """
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            q.put(None)  # Pass the end marker on to the next consumer
            break
        print(f"Got: {item}")
        q.task_done()

# Spin up three consumer threads, then fill the queue from the main thread
for _ in range(3):
    consumer_thread = threading.Thread(target=consumer)
    consumer_thread.start()

producer()

concurrent.futures

Higher-level interface for concurrency:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def task(n):
    """Block for one second, then return n multiplied by itself."""
    time.sleep(1)
    squared = n * n
    return squared

# ThreadPoolExecutor - for I/O-bound tasks
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns a Future straight away, so all four calls are queued before any result is read
    futures = [executor.submit(task, i) for i in range(4)]
    # result() blocks until that future is done; reading in submission order keeps the output ordered
    results = [f.result() for f in futures]
    print(results)  # [0, 1, 4, 9] - ~1 second total

# ProcessPoolExecutor - for CPU-bound tasks
# Worker processes may import this module (spawn/forkserver start methods), so the
# pool is created only when the file runs as the main script.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(task, range(4)))
        print(results)  # [0, 1, 4, 9]

Callbacks and Futures

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """Block for one second, then hand n back unchanged."""
    time.sleep(1)
    result = n
    return result

def on_complete(future):
    """Done-callback that prints the result of the given future.

    Args:
        future: Future whose result() is read and printed.
    """
    print(f"Result: {future.result()}")

# Submit one task, register on_complete as its done-callback, then wait for it
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(task, 42)
    future.add_done_callback(on_complete)
    future.result()  # Wait for completion

Multiprocessing Basics

import multiprocessing
import time

def worker(name):
    """Print a start line, block for one second, then print a done line.

    Args:
        name: Identifier included in both printed messages.
    """
    print(f"Worker {name} starting")
    # Stand-in for one second of work
    time.sleep(1)
    print(f"Worker {name} done")

if __name__ == "__main__":
    # One process per worker id 0..3
    processes = [
        multiprocessing.Process(target=worker, args=(i,))
        for i in range(4)
    ]
    for p in processes:
        p.start()

    # Wait for every worker process to exit
    for p in processes:
        p.join()

Process Pools

import multiprocessing
import os

def compute(n):
    """Return a pair: n squared and the PID of the process that computed it."""
    square = n * n
    pid = os.getpid()
    return square, pid

# Creating a Pool starts child processes. Under the spawn/forkserver start methods
# the children import this module, so pool creation sits behind the main guard.
if __name__ == "__main__":
    # Pool of workers
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(compute, range(10))

    print(results)
    # [(0, pid1), (1, pid2), (4, pid1), ...]

    # Apply async
    with multiprocessing.Pool(2) as pool:
        async_result = pool.apply_async(compute, (5,))
        print(async_result.get())

Shared Memory

Share data between processes:

import multiprocessing

# The shared objects are created only when the file runs as the main script:
# multiprocessing.Manager() starts a server process, which must not happen
# again when a child process imports this module.
if __name__ == "__main__":
    # Shared value
    value = multiprocessing.Value('d', 0.0)  # 'd' = double
    value.value = 3.14

    # Shared array
    arr = multiprocessing.Array('i', [1, 2, 3])

    # Manager - shared objects
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    shared_list = manager.list()

def worker(val, arr, d, lst):
    """Modify each of the four objects once.

    Args:
        val: Object with a numeric .value attribute; the value is doubled.
        arr: Indexable container; element 0 is set to 10.
        d: Mapping; "key" is set to "value".
        lst: List-like object; 42 is appended.
    """
    # The four updates touch different objects and do not depend on each other
    lst.append(42)
    d.update({"key": "value"})
    arr[0] = 10
    val.value = val.value * 2

if __name__ == "__main__":
    # Every shared object is handed to the child explicitly through args
    shared_args = (value, arr, shared_dict, shared_list)
    p = multiprocessing.Process(target=worker, args=shared_args)
    p.start()
    p.join()
    # Read everything back in the parent once the child has exited
    print(value.value, arr[:], dict(shared_dict), list(shared_list))

Process Queues

import multiprocessing

def producer(queue):
    """Put the integers 0 through 4 on the queue, then None as the end marker.

    Args:
        queue: Object with a put() method that receives each entry.
    """
    for entry in [*range(5), None]:
        queue.put(entry)

def consumer(queue):
    """Print every item read from the queue until a None end marker is received.

    Args:
        queue: Object with a blocking get() method that yields the items.
    """
    # Fetch and test in one step; the loop ends on the first None
    while (item := queue.get()) is not None:
        print(f"Got: {item}")

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    # Start the producer process first, then the consumer; both receive the same queue
    for role in (producer, consumer):
        multiprocessing.Process(target=role, args=(queue,)).start()

The Global Interpreter Lock (GIL)

CPython's GIL allows only one thread to execute Python bytecode at a time, so threads cannot spread CPU-bound work across multiple cores:

# CPU-bound task - threads won't help due to GIL
import threading
import time

def cpu_bound():
    """Add up every integer from 0 up to (but not including) 10**7 and return the total."""
    return sum(i for i in range(10 ** 7))

# This won't run faster with threads!
start = time.time()
threads = [threading.Thread(target=cpu_bound) for _ in range(4)]

# Launch all four, then wait for all four, and report the wall-clock time
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Time: {time.time() - start:.2f}s")

# Use multiprocessing instead for CPU-bound work
from concurrent.futures import ProcessPoolExecutor

# The pool is created only when the file runs as the main script, because
# worker processes may import this module (spawn/forkserver start methods).
if __name__ == "__main__":
    start = time.time()
    with ProcessPoolExecutor(4) as executor:
        # cpu_bound() takes no arguments, so it is submitted four times instead of
        # being mapped over range(4), which would pass one argument to each call.
        futures = [executor.submit(cpu_bound) for _ in range(4)]
        for future in futures:
            future.result()  # Wait for the call; re-raises if the worker failed
    print(f"Time: {time.time() - start:.2f}s")

When to Use What

Scenario | Recommended Approach
Network requests, HTTP | Async/await or ThreadPoolExecutor
File I/O | ThreadPoolExecutor
CPU computation | ProcessPoolExecutor
Parallel data processing | multiprocessing.Pool
Simple background task | threading.Thread
Web scraping (many URLs) | Async + aiohttp