๐ Welcome to Asynchronous Programming!
Master the power of async/await to build high-performance applications that handle thousands of concurrent operations efficiently. Learn how to write non-blocking code, manage concurrent tasks, and build real-time systems that scale. This module will transform how you think about I/O-bound operations and concurrent execution.
9.1 Synchronous vs Asynchronous
- Blocking vs non-blocking operations - what makes code wait?
- When async makes sense: I/O-bound vs CPU-bound tasks
- The problem async solves: waiting efficiently instead of idling
- Real-world analogy: Coffee shop order processing
- Understanding throughput vs latency trade-offs
๐ Key Concept: Blocking vs Non-Blocking
Synchronous (Blocking): Your code waits for each operation to complete before moving to the next. Like standing in line at a single checkout counter.
Asynchronous (Non-Blocking): Your code starts an operation and continues with other work while waiting. Like ordering coffee and sitting down while it's being made.
9.2 Concurrency Models
- Threading: Parallel execution with shared memory (GIL limitations)
- Multiprocessing: True parallelism with separate memory spaces
- Async: Cooperative multitasking on a single thread
- The Global Interpreter Lock (GIL) - why it matters for Python
- Choosing the right concurrency model for your use case
- Combining approaches: when to use threads, processes, and async together
๐ก When to Use What?
- Async: I/O-bound tasks (API calls, database queries, file operations)
- Threading: I/O-bound tasks with libraries that don't support async
- Multiprocessing: CPU-bound tasks (data processing, image manipulation)
9.3 Event Loop Fundamentals
- What is the event loop and how does it work?
- How async/await works under the hood with coroutines
- Cooperative multitasking: yielding control voluntarily
- Understanding the execution model: single-threaded concurrency
- Event loop scheduling and task prioritization
- Common misconceptions about async Python
๐๏ธ Lab Exercise: Compare Sync vs Async API Calls
Task: Compare the performance of synchronous vs asynchronous API calls to understand the efficiency gains.
Requirements:
- Create a synchronous function that fetches data from 10 different URLs sequentially
- Create an asynchronous function that fetches the same URLs concurrently
- Use
time.time()to measure execution time for both approaches - Print timing results and calculate the speedup factor
- Use
httpxoraiohttpfor async requests - Handle errors gracefully in both implementations
Hint: Your solution structure should look like:
fetch_sync(): Userequests.get()in a loopfetch_async(): Useaiohttp.ClientSession()withasyncio.gather()- Test URLs: Use JSONPlaceholder, httpbin.org, or any public API
- Expected result: Async should be 5-10x faster for 10 URLs
- Don't forget:
await session.get(url)andawait response.text()
10.1 Core Syntax
async deffunctions - defining coroutinesawaitkeyword - pausing execution to wait for results- Running async functions with
asyncio.run() - Understanding coroutine objects and their lifecycle
- Syntax rules: when you can and cannot use
await
import asyncioasync def greet(name):
# await only works inside async functions
await asyncio.sleep(1) # Non-blocking sleep
return f"Hello, {name}!"# Run the async function
result = asyncio.run(greet("Python"))
print(result) # Output: Hello, Python!
10.2 Creating Coroutines
- Writing your first async function from scratch
asyncio.sleep()vstime.sleep()- the critical difference- Common mistakes: forgetting
await, using blocking code - Debugging async functions: tools and techniques
- Coroutine states: created, running, suspended, completed
โ ๏ธ Common Pitfall: Blocking the Event Loop
Never use time.sleep() in async functions! It blocks the entire event loop.
โ Bad: time.sleep(1)
โ
Good: await asyncio.sleep(1)
10.3 Awaitable Objects
- Understanding Coroutines, Tasks, and Futures
- When to await and when not to await
- Converting between coroutines and tasks
- The awaitable protocol in Python
- Creating custom awaitable objects
๐๏ธ Lab Exercise: Build Async URL Fetcher
Task: Create async functions that fetch and process data from multiple URLs concurrently.
Requirements:
- Create an
async def fetch_url(url)function usingaiohttp - Create an
async def fetch_all(urls)that fetches multiple URLs concurrently - Return a list of response status codes and content lengths
- Add error handling for failed requests (timeouts, network errors)
- Print progress as each URL completes
- Test with at least 5 different URLs
Hint: Your solution structure should look like:
- Use
async with aiohttp.ClientSession() as session: - In
fetch_url:async with session.get(url) as response: - Use
asyncio.gather(*tasks)to run all fetches concurrently - Wrap in try/except to handle
aiohttp.ClientError - Return tuple:
(url, status, length)or(url, error)
11.1 Task Creation and Execution
asyncio.create_task()- scheduling coroutines for execution- Running multiple tasks concurrently without waiting
asyncio.gather()- collecting results from multiple tasksasyncio.wait()- advanced task coordination- Differences between
gather()andwait() - Task cancellation and cleanup
async def main():
# Create tasks (start running immediately)
task1 = asyncio.create_task(fetch_data("api1"))
task2 = asyncio.create_task(fetch_data("api2"))
task3 = asyncio.create_task(fetch_data("api3"))
# Wait for all tasks to complete
results = await asyncio.gather(task1, task2, task3)
return results
11.2 Task Coordination
- Waiting for first completion:
asyncio.wait_for() - Setting timeouts to prevent hanging operations
asyncio.as_completed()- processing results as they arrive- Cancelling tasks gracefully with
task.cancel() - Handling
CancelledErrorexceptions - Task groups and structured concurrency (Python 3.11+)
๐ gather() vs wait() vs as_completed()
- gather(): Wait for all tasks, return results in order, fail fast on errors
- wait(): More control, get done/pending sets, handle partial completion
- as_completed(): Process results as soon as each task finishes (unordered)
11.3 Error Handling in Async Code
- Try/except blocks in async functions - same as sync but with await
- Handling exceptions from multiple concurrent tasks
return_exceptions=Trueingather()- Graceful shutdown patterns and cleanup
- Timeout handling with
asyncio.wait_for() - Best practices for robust async error handling
๐๏ธ Lab Exercise: Concurrent Web Scraper with Timeouts
Task: Build a web scraper that fetches multiple pages concurrently with proper timeout and error handling.
Requirements:
- Create a list of 10-15 URLs to scrape
- Implement
async def scrape_page(url, timeout=10)with timeout handling - Use
asyncio.gather()withreturn_exceptions=True - Extract and return page titles from successful requests
- Log errors for failed requests (timeouts, 404s, network errors)
- Print summary: total URLs, successful, failed, total time
- Use
asyncio.as_completed()to show progress
Hint: Your solution structure should look like:
- Wrap
session.get()inasyncio.wait_for(coro, timeout=10) - Use
BeautifulSoupto extract<title>tags - Handle
asyncio.TimeoutError,aiohttp.ClientError - Create tasks:
[scrape_page(url) for url in urls] - Use
for coro in asyncio.as_completed(tasks):for progress
12.1 Async Context Managers
async withstatement for resource management- Implementing
__aenter__and__aexit__methods - Managing async resources: database connections, file handles, HTTP sessions
- Proper cleanup in async context managers
- Real examples:
aiohttp.ClientSession,asyncpg.connect()
class AsyncResource:
async def __aenter__(self):
# Acquire resource asynchronously
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Release resource asynchronously
await self.disconnect()async with AsyncResource() as resource:
await resource.do_something()
12.2 Async Iterators and Generators
async forloops for asynchronous iteration- Implementing
__aiter__and__anext__methods async defgenerators withyield- Streaming data processing with async generators
- Use cases: paginated API results, database cursors, file streaming
- Combining async iteration with task management
๐ก When to Use Async Iterators
Perfect for streaming data from sources that produce results over time:
- Paginated API endpoints (fetch pages as needed)
- Database result cursors (stream rows without loading all)
- Real-time data feeds (WebSockets, SSE)
- Large file processing (read chunks asynchronously)
12.3 Synchronization Primitives
asyncio.Lock- mutual exclusion for async codeasyncio.Event- signaling between tasksasyncio.Semaphore- limiting concurrent operations- Preventing race conditions in async code
- Rate limiting with semaphores (max N concurrent requests)
- Queue-based task distribution:
asyncio.Queue
# Rate limiting with Semaphore
semaphore = asyncio.Semaphore(5) # Max 5 concurrentasync def limited_fetch(url):
async with semaphore:
return await fetch(url)
๐๏ธ Lab Exercise: Async Queue Processing System
Task: Build an async worker pool that processes jobs from a queue with rate limiting.
Requirements:
- Create an
asyncio.Queueto hold jobs (URLs to fetch) - Create 5 worker coroutines that consume jobs from the queue
- Use
asyncio.Semaphoreto limit to 3 concurrent requests across all workers - Each worker should process jobs until the queue is empty
- Track successful and failed job counts per worker
- Implement graceful shutdown when queue is empty
- Print statistics: jobs processed per worker, total time
Hint: Your solution structure should look like:
- Create queue:
queue = asyncio.Queue() - Add jobs:
await queue.put(url) - Worker loop:
while True: job = await queue.get(); process(job); queue.task_done() - Use semaphore inside worker:
async with semaphore: await fetch(url) - Wait for completion:
await queue.join() - Cancel workers after queue is empty
13.1 Async HTTP Requests
- Using
aiohttplibrary for async HTTP requests - Making parallel API calls efficiently (10x-100x faster)
- Session management and connection pooling
- Request headers, authentication, and cookies
- POST requests with JSON data
- Streaming large responses
import aiohttpasync def fetch_api_data():
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [await r.json() for r in responses]
13.2 Async Database Operations
- Using
asyncpgfor PostgreSQL (fastest Python PostgreSQL driver) - Connection pooling for database efficiency
- Transaction handling with async context managers
- Batch operations and bulk inserts
- Prepared statements and query optimization
- Other async database drivers:
aiomysql,motor(MongoDB)
๐ Why Async Databases?
Database operations are I/O-bound. While waiting for query results, async allows handling other requests:
- Handle 1000s of concurrent database queries
- Prevent blocking on slow queries
- Efficient connection pool utilization
- Perfect for web APIs and microservices
13.3 Async File I/O
- Using
aiofileslibrary for async file operations - When async file operations matter (many small files, network file systems)
- Reading and writing files without blocking
- Processing multiple files concurrently
- Streaming large files efficiently
13.4 Mixing Sync and Async Code
asyncio.to_thread()- running blocking code in threads (Python 3.9+)- Running sync code in executor pools
- Best practices for hybrid codebases (mixing async and sync)
- Bridging sync libraries in async applications
- When to refactor sync code vs wrap it
- Common pitfalls and how to avoid them
# Running blocking code in async function
async def process_data():
# This blocks the event loop - BAD!
result = blocking_function()
# This runs in thread pool - GOOD!
result = await asyncio.to_thread(blocking_function)
๐๏ธ Lab Exercise: Async Data Pipeline
Task: Build a data pipeline that fetches, processes, and stores data asynchronously.
Requirements:
- Stage 1: Fetch JSON data from 5-10 API endpoints concurrently
- Stage 2: Process/transform the data (extract specific fields, calculate values)
- Stage 3: Store results to JSON files using
aiofiles - Use
asyncio.Queueto pass data between stages - Implement 3 worker coroutines for each stage
- Add proper error handling and logging at each stage
- Track and display pipeline statistics (throughput, errors)
Hint: Your solution structure should look like:
- Create two queues:
fetch_queueandprocess_queue - Fetcher workers: get from fetch_queue, fetch data, put in process_queue
- Processor workers: get from process_queue, transform, write to file
- Use
asyncio.gather()to run all workers concurrently - Signal completion with sentinel values in queues
- Test APIs: JSONPlaceholder, CoinGecko, OpenWeather
14.1 Async Frameworks
- FastAPI: modern async web framework with automatic API docs
- Building REST APIs with async route handlers
- aiohttp server for HTTP servers and WebSocket support
- Comparing async frameworks: FastAPI vs Sanic vs Quart
- When async frameworks outperform sync (Flask, Django)
- Middleware and dependency injection in async frameworks
๐ก FastAPI Example
from fastapi import FastAPIapp = FastAPI()@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await db.fetch_user(user_id)
return user
14.2 Testing Async Code
- Using
pytest-asyncioplugin for async tests - Writing async test functions with
@pytest.mark.asyncio - Mocking async functions with
AsyncMock - Testing concurrent behavior and race conditions
- Using fixtures for async resources (databases, HTTP clients)
- Best practices for async test organization
import pytest# Async test function
@pytest.mark.asyncio
async def test_fetch_data():
result = await fetch_data("https://api.example.com")
assert result["status"] == "success"
14.3 Performance Optimization
- Profiling async applications with
asynciodebug mode - Identifying common bottlenecks in async code
- Connection pooling and reusing sessions
- Optimizing task scheduling and prioritization
- Memory management in long-running async applications
- Async best practices checklist for production
โก Async Performance Tips
- Reuse
ClientSessionobjects (don't create per request) - Use connection pooling for databases
- Set appropriate timeouts (prevent hanging tasks)
- Limit concurrency with
Semaphore(prevent overwhelming servers) - Profile with
PYTHONASYNCIODEBUG=1 - Close resources properly (avoid resource leaks)
๐๏ธ Lab Exercise: Add Tests to Async Scraper
Task: Write comprehensive tests for the async web scraper built in previous exercises.
Requirements:
- Install and configure
pytest-asyncio - Create test fixtures for mock HTTP responses using
aioresponses - Write tests for successful URL fetching
- Write tests for timeout handling
- Write tests for error handling (404, network errors)
- Test concurrent execution with multiple URLs
- Verify that semaphore limits work correctly
- Achieve >80% code coverage
Hint: Your solution structure should look like:
- Install:
pip install pytest-asyncio aioresponses - Fixture:
@pytest.fixture def mock_aiohttp(aioresponses): ... - Mock response:
aioresponses.get(url, status=200, body='...') - Test timeout:
aioresponses.get(url, exception=asyncio.TimeoutError()) - Use
@pytest.mark.asyncioon all async test functions - Assert on results, error counts, and execution time
๐ Part 3 Capstone Project: Real-Time Dashboard System
Build a real-time data aggregation and streaming dashboard that demonstrates async mastery!
Project Requirements:
- Multi-Source Data Fetching
- Fetch data from at least 5 different APIs concurrently
- Support multiple data sources: REST APIs, WebSocket streams, RSS feeds
- Implement retry logic with exponential backoff for failed requests
- Use connection pooling for efficient HTTP connections
- Handle rate limiting from external APIs
- Async Data Processing Pipeline
- Use
asyncio.Queuefor data flow between stages - Stage 1: Fetch - concurrent API calls with rate limiting
- Stage 2: Transform - process and aggregate data
- Stage 3: Store - persist to database using
asyncpg - Implement worker pools for each stage (3-5 workers per stage)
- Use
- WebSocket Streaming Server
- Build WebSocket server using
aiohttporwebsockets - Stream real-time updates to connected clients
- Support multiple concurrent client connections (target: 100+)
- Broadcast updates to all clients when new data arrives
- Handle client connect/disconnect gracefully
- Build WebSocket server using
- Error Handling & Monitoring
- Comprehensive error handling for all async operations
- Graceful degradation when data sources are unavailable
- Health check endpoint for monitoring system status
- Metrics tracking: requests/second, error rates, latency
- Logging with proper async-safe logging configuration
- Performance & Scalability
- Use semaphores to limit concurrent requests to external APIs
- Implement caching for frequently requested data
- Database connection pooling (10-20 connections)
- Optimize memory usage for long-running processes
- Target: Handle 1000+ concurrent WebSocket connections
Example Architecture:
# Main Application Structureclass DashboardSystem:
def __init__(self):
self.fetch_queue = asyncio.Queue()
self.process_queue = asyncio.Queue()
self.clients = set() # WebSocket clients
self.db_pool = None
async def start(self):
# Initialize database pool
self.db_pool = await asyncpg.create_pool(DATABASE_URL)
# Start worker pools
fetchers = [self.fetch_worker() for _ in range(5)]
processors = [self.process_worker() for _ in range(3)]
# Start WebSocket server
ws_server = self.start_websocket_server()
# Run all concurrently
await asyncio.gather(*fetchers, *processors, ws_server)
async def fetch_worker(self):
# Fetch data from APIs
pass
async def process_worker(self):
# Process and store data
pass
async def broadcast(self, data):
# Send updates to all WebSocket clients
if self.clients:
await asyncio.gather(*[
client.send_json(data) for client in self.clients
])
Suggested Data Sources:
- ๐ Stock Prices: Alpha Vantage, IEX Cloud, or Yahoo Finance API
- ๐ฆ๏ธ Weather Data: OpenWeatherMap API
- ๐ฐ News Headlines: NewsAPI or RSS feeds
- ๐ฑ Cryptocurrency: CoinGecko or Binance API
- ๐ฆ Social Media: Twitter API or Reddit API
Evaluation Criteria:
- โ Proper use of async/await throughout the application
- โ Efficient concurrent task execution with appropriate coordination
- โ Robust error handling and graceful degradation
- โ Clean code with async best practices (context managers, generators)
- โ
Comprehensive testing with
pytest-asyncio - โ Performance metrics and monitoring
- โ Documentation and type hints
- โ Can handle 100+ concurrent WebSocket connections
Bonus Challenges:
- ๐ Add authentication for WebSocket connections
- ๐ Implement data visualization dashboard (HTML/JavaScript frontend)
- ๐ Add database query caching with TTL
- ๐ Implement graceful shutdown with cleanup
- ๐ Add admin API for system control (start/stop data sources)
- ๐ Deploy to cloud with Docker and load testing results
๐ Part 3 Summary
What You've Learned:
- Module 9: Async concepts, concurrency models, and event loop fundamentals
- Module 10: Async/await syntax, coroutines, and awaitable objects
- Module 11: Task management, coordination, and error handling
- Module 12: Async patterns including context managers, iterators, and synchronization
- Module 13: Real-world applications with HTTP, databases, and file I/O
- Module 14: Advanced topics including frameworks, testing, and optimization
Key Takeaways:
- Async is for I/O-bound tasks - don't use it for CPU-intensive work
- Event loop is single-threaded - never block it with sync operations
- await is mandatory - always await coroutines or they won't execute
- gather() for parallel execution - run multiple tasks concurrently
- Semaphores limit concurrency - prevent overwhelming external services
- Context managers for resources - use async with for cleanup
- Testing requires pytest-asyncio - mark tests with @pytest.mark.asyncio
- Performance comes from concurrency - not speed of individual operations
๐ฏ You're Now Ready For:
- Building high-performance async web applications with FastAPI
- Creating concurrent data processing pipelines
- Developing real-time systems with WebSockets
- Optimizing I/O-bound applications for maximum throughput
- Contributing to async Python projects and libraries
Next Steps:
- Complete the Final Integration Project combining all three parts
- Explore advanced async libraries:
trio,anyio - Study async implementations in popular frameworks (FastAPI, aiohttp)
- Build your own async library or contribute to open source
๐ Final Integration Project (Optional Bonus)
Distributed Task Processing System
Combine everything you've learned from all three parts into one comprehensive project!
System Overview:
Build a distributed task processing system that accepts jobs, distributes them to workers, executes them asynchronously, and provides real-time monitoring. This project integrates OOP design (Part 1), decorators (Part 2), and async programming (Part 3).
Core Components:
1. Task System (OOP - Part 1)
- Base
Taskclass with common functionality - Specialized task types:
HTTPTask,DataProcessingTask,EmailTask - Use polymorphism for different task execution strategies
- Abstract base class for worker types
- Task queue implementations using composition
2. Decorator-Based Features (Decorators - Part 2)
@taskdecorator for task registration@retry(max_attempts=3, backoff=2)for automatic retries@timeout(seconds=30)for task timeouts@priority(level=5)for task prioritization@log_executionfor automatic logging@measure_performancefor metrics collection
3. Async Execution Engine (Async - Part 3)
- Async task queue using
asyncio.PriorityQueue - Worker pool with configurable concurrency
- Async task execution with proper error handling
- Real-time WebSocket updates for task status
- Concurrent result aggregation and storage
Example Usage:
# Define custom task with decorators
@task("api_fetch")
@retry(max_attempts=3, backoff=2)
@timeout(seconds=30)
@log_execution
async def fetch_api_data(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()# Create and run task system
system = TaskProcessingSystem(max_workers=10)
await system.start()# Submit tasks
task1 = fetch_api_data("https://api.example.com/data")
task2 = process_data(input_file="data.csv")results = await system.submit_tasks([task1, task2])
print(results)
Advanced Features:
- ๐ Monitoring Dashboard: Real-time WebSocket dashboard showing active tasks, completion rates, errors
- ๐ Task Dependencies: Tasks that wait for other tasks to complete
- โ๏ธ Load Balancing: Distribute tasks across multiple worker processes
- ๐พ Persistent Queue: Store tasks in database for crash recovery
- ๐ Notifications: Async email/Slack notifications on task completion/failure
- ๐ Metrics & Analytics: Track execution times, success rates, resource usage
Technical Requirements:
- โ Use classes and inheritance for task types and workers (Part 1 OOP)
- โ Implement at least 5 custom decorators for task management (Part 2)
- โ Async execution engine with concurrent worker pools (Part 3)
- โ Priority queue implementation with async operations
- โ Comprehensive error handling and retry logic
- โ WebSocket server for real-time monitoring
- โ
Database integration with
asyncpg - โ Unit tests covering all three parts (OOP, decorators, async)
- โ Type hints throughout the codebase
- โ Complete documentation and README
๐ Congratulations! By completing this integration project, you've demonstrated mastery of advanced Python programming across three critical domains: Object-Oriented Programming, Decorators, and Asynchronous Programming. You're now equipped to build sophisticated, production-ready Python applications!

