Insightsโ†’Advance Python : Part 3
Python

Advance Python : Part 3

Gaurav ChopraGaurav ChopraยทNovember 15, 2025
PART 3 OF 3

Advanced Python

Asynchronous Programming

๐Ÿš€ Welcome to Asynchronous Programming!

Master the power of async/await to build high-performance applications that handle thousands of concurrent operations efficiently. Learn how to write non-blocking code, manage concurrent tasks, and build real-time systems that scale. This module will transform how you think about I/O-bound operations and concurrent execution.

MODULE 9
Understanding Asynchronous Concepts
โ–ผ

9.1 Synchronous vs Asynchronous

  • Blocking vs non-blocking operations - what makes code wait?
  • When async makes sense: I/O-bound vs CPU-bound tasks
  • The problem async solves: waiting efficiently instead of idling
  • Real-world analogy: Coffee shop order processing
  • Understanding throughput vs latency trade-offs

๐Ÿ”‘ Key Concept: Blocking vs Non-Blocking

Synchronous (Blocking): Your code waits for each operation to complete before moving to the next. Like standing in line at a single checkout counter.

Asynchronous (Non-Blocking): Your code starts an operation and continues with other work while waiting. Like ordering coffee and sitting down while it's being made.

9.2 Concurrency Models

  • Threading: Parallel execution with shared memory (GIL limitations)
  • Multiprocessing: True parallelism with separate memory spaces
  • Async: Cooperative multitasking on a single thread
  • The Global Interpreter Lock (GIL) - why it matters for Python
  • Choosing the right concurrency model for your use case
  • Combining approaches: when to use threads, processes, and async together

๐Ÿ’ก When to Use What?

  • Async: I/O-bound tasks (API calls, database queries, file operations)
  • Threading: I/O-bound tasks with libraries that don't support async
  • Multiprocessing: CPU-bound tasks (data processing, image manipulation)

9.3 Event Loop Fundamentals

  • What is the event loop and how does it work?
  • How async/await works under the hood with coroutines
  • Cooperative multitasking: yielding control voluntarily
  • Understanding the execution model: single-threaded concurrency
  • Event loop scheduling and task prioritization
  • Common misconceptions about async Python

๐Ÿ‹๏ธ Lab Exercise: Compare Sync vs Async API Calls

Task: Compare the performance of synchronous vs asynchronous API calls to understand the efficiency gains.

Requirements:

  • Create a synchronous function that fetches data from 10 different URLs sequentially
  • Create an asynchronous function that fetches the same URLs concurrently
  • Use time.time() to measure execution time for both approaches
  • Print timing results and calculate the speedup factor
  • Use httpx or aiohttp for async requests
  • Handle errors gracefully in both implementations

Hint: Your solution structure should look like:

  • fetch_sync(): Use requests.get() in a loop
  • fetch_async(): Use aiohttp.ClientSession() with asyncio.gather()
  • Test URLs: Use JSONPlaceholder, httpbin.org, or any public API
  • Expected result: Async should be 5-10x faster for 10 URLs
  • Don't forget: await session.get(url) and await response.text()
MODULE 10
Async/Await Syntax
โ–ผ

10.1 Core Syntax

  • async def functions - defining coroutines
  • await keyword - pausing execution to wait for results
  • Running async functions with asyncio.run()
  • Understanding coroutine objects and their lifecycle
  • Syntax rules: when you can and cannot use await
import asyncioasync def greet(name):
# await only works inside async functions
await asyncio.sleep(1) # Non-blocking sleep
return f"Hello, {name}!"# Run the async function
result = asyncio.run(greet("Python"))
print(result) # Output: Hello, Python!

10.2 Creating Coroutines

  • Writing your first async function from scratch
  • asyncio.sleep() vs time.sleep() - the critical difference
  • Common mistakes: forgetting await, using blocking code
  • Debugging async functions: tools and techniques
  • Coroutine states: created, running, suspended, completed

โš ๏ธ Common Pitfall: Blocking the Event Loop

Never use time.sleep() in async functions! It blocks the entire event loop.

โŒ Bad: time.sleep(1)

โœ… Good: await asyncio.sleep(1)

10.3 Awaitable Objects

  • Understanding Coroutines, Tasks, and Futures
  • When to await and when not to await
  • Converting between coroutines and tasks
  • The awaitable protocol in Python
  • Creating custom awaitable objects

๐Ÿ‹๏ธ Lab Exercise: Build Async URL Fetcher

Task: Create async functions that fetch and process data from multiple URLs concurrently.

Requirements:

  • Create an async def fetch_url(url) function using aiohttp
  • Create an async def fetch_all(urls) that fetches multiple URLs concurrently
  • Return a list of response status codes and content lengths
  • Add error handling for failed requests (timeouts, network errors)
  • Print progress as each URL completes
  • Test with at least 5 different URLs

Hint: Your solution structure should look like:

  • Use async with aiohttp.ClientSession() as session:
  • In fetch_url: async with session.get(url) as response:
  • Use asyncio.gather(*tasks) to run all fetches concurrently
  • Wrap in try/except to handle aiohttp.ClientError
  • Return tuple: (url, status, length) or (url, error)
MODULE 11
Managing Concurrent Tasks
โ–ผ

11.1 Task Creation and Execution

  • asyncio.create_task() - scheduling coroutines for execution
  • Running multiple tasks concurrently without waiting
  • asyncio.gather() - collecting results from multiple tasks
  • asyncio.wait() - advanced task coordination
  • Differences between gather() and wait()
  • Task cancellation and cleanup
async def main():
# Create tasks (start running immediately)
task1 = asyncio.create_task(fetch_data("api1"))
task2 = asyncio.create_task(fetch_data("api2"))
task3 = asyncio.create_task(fetch_data("api3"))

# Wait for all tasks to complete
results = await asyncio.gather(task1, task2, task3)
return results

11.2 Task Coordination

  • Waiting for first completion: asyncio.wait_for()
  • Setting timeouts to prevent hanging operations
  • asyncio.as_completed() - processing results as they arrive
  • Cancelling tasks gracefully with task.cancel()
  • Handling CancelledError exceptions
  • Task groups and structured concurrency (Python 3.11+)

๐Ÿ”‘ gather() vs wait() vs as_completed()

  • gather(): Wait for all tasks, return results in order, fail fast on errors
  • wait(): More control, get done/pending sets, handle partial completion
  • as_completed(): Process results as soon as each task finishes (unordered)

11.3 Error Handling in Async Code

  • Try/except blocks in async functions - same as sync but with await
  • Handling exceptions from multiple concurrent tasks
  • return_exceptions=True in gather()
  • Graceful shutdown patterns and cleanup
  • Timeout handling with asyncio.wait_for()
  • Best practices for robust async error handling

๐Ÿ‹๏ธ Lab Exercise: Concurrent Web Scraper with Timeouts

Task: Build a web scraper that fetches multiple pages concurrently with proper timeout and error handling.

Requirements:

  • Create a list of 10-15 URLs to scrape
  • Implement async def scrape_page(url, timeout=10) with timeout handling
  • Use asyncio.gather() with return_exceptions=True
  • Extract and return page titles from successful requests
  • Log errors for failed requests (timeouts, 404s, network errors)
  • Print summary: total URLs, successful, failed, total time
  • Use asyncio.as_completed() to show progress

Hint: Your solution structure should look like:

  • Wrap session.get() in asyncio.wait_for(coro, timeout=10)
  • Use BeautifulSoup to extract <title> tags
  • Handle asyncio.TimeoutError, aiohttp.ClientError
  • Create tasks: [scrape_page(url) for url in urls]
  • Use for coro in asyncio.as_completed(tasks): for progress
MODULE 12
Async Patterns and Best Practices
โ–ผ

12.1 Async Context Managers

  • async with statement for resource management
  • Implementing __aenter__ and __aexit__ methods
  • Managing async resources: database connections, file handles, HTTP sessions
  • Proper cleanup in async context managers
  • Real examples: aiohttp.ClientSession, asyncpg.connect()
class AsyncResource:
async def __aenter__(self):
# Acquire resource asynchronously
await self.connect()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
# Release resource asynchronously
await self.disconnect()async with AsyncResource() as resource:
await resource.do_something()

12.2 Async Iterators and Generators

  • async for loops for asynchronous iteration
  • Implementing __aiter__ and __anext__ methods
  • async def generators with yield
  • Streaming data processing with async generators
  • Use cases: paginated API results, database cursors, file streaming
  • Combining async iteration with task management

๐Ÿ’ก When to Use Async Iterators

Perfect for streaming data from sources that produce results over time:

  • Paginated API endpoints (fetch pages as needed)
  • Database result cursors (stream rows without loading all)
  • Real-time data feeds (WebSockets, SSE)
  • Large file processing (read chunks asynchronously)

12.3 Synchronization Primitives

  • asyncio.Lock - mutual exclusion for async code
  • asyncio.Event - signaling between tasks
  • asyncio.Semaphore - limiting concurrent operations
  • Preventing race conditions in async code
  • Rate limiting with semaphores (max N concurrent requests)
  • Queue-based task distribution: asyncio.Queue
# Rate limiting with Semaphore
semaphore = asyncio.Semaphore(5) # Max 5 concurrentasync def limited_fetch(url):
async with semaphore:
return await fetch(url)

๐Ÿ‹๏ธ Lab Exercise: Async Queue Processing System

Task: Build an async worker pool that processes jobs from a queue with rate limiting.

Requirements:

  • Create an asyncio.Queue to hold jobs (URLs to fetch)
  • Create 5 worker coroutines that consume jobs from the queue
  • Use asyncio.Semaphore to limit to 3 concurrent requests across all workers
  • Each worker should process jobs until the queue is empty
  • Track successful and failed job counts per worker
  • Implement graceful shutdown when queue is empty
  • Print statistics: jobs processed per worker, total time

Hint: Your solution structure should look like:

  • Create queue: queue = asyncio.Queue()
  • Add jobs: await queue.put(url)
  • Worker loop: while True: job = await queue.get(); process(job); queue.task_done()
  • Use semaphore inside worker: async with semaphore: await fetch(url)
  • Wait for completion: await queue.join()
  • Cancel workers after queue is empty
MODULE 13
Real-World Async Applications
โ–ผ

13.1 Async HTTP Requests

  • Using aiohttp library for async HTTP requests
  • Making parallel API calls efficiently (10x-100x faster)
  • Session management and connection pooling
  • Request headers, authentication, and cookies
  • POST requests with JSON data
  • Streaming large responses
import aiohttpasync def fetch_api_data():
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [await r.json() for r in responses]

13.2 Async Database Operations

  • Using asyncpg for PostgreSQL (fastest Python PostgreSQL driver)
  • Connection pooling for database efficiency
  • Transaction handling with async context managers
  • Batch operations and bulk inserts
  • Prepared statements and query optimization
  • Other async database drivers: aiomysql, motor (MongoDB)

๐Ÿ”‘ Why Async Databases?

Database operations are I/O-bound. While waiting for query results, async allows handling other requests:

  • Handle 1000s of concurrent database queries
  • Prevent blocking on slow queries
  • Efficient connection pool utilization
  • Perfect for web APIs and microservices

13.3 Async File I/O

  • Using aiofiles library for async file operations
  • When async file operations matter (many small files, network file systems)
  • Reading and writing files without blocking
  • Processing multiple files concurrently
  • Streaming large files efficiently

13.4 Mixing Sync and Async Code

  • asyncio.to_thread() - running blocking code in threads (Python 3.9+)
  • Running sync code in executor pools
  • Best practices for hybrid codebases (mixing async and sync)
  • Bridging sync libraries in async applications
  • When to refactor sync code vs wrap it
  • Common pitfalls and how to avoid them
# Running blocking code in async function
async def process_data():
# This blocks the event loop - BAD!
result = blocking_function()

# This runs in thread pool - GOOD!
result = await asyncio.to_thread(blocking_function)

๐Ÿ‹๏ธ Lab Exercise: Async Data Pipeline

Task: Build a data pipeline that fetches, processes, and stores data asynchronously.

Requirements:

  • Stage 1: Fetch JSON data from 5-10 API endpoints concurrently
  • Stage 2: Process/transform the data (extract specific fields, calculate values)
  • Stage 3: Store results to JSON files using aiofiles
  • Use asyncio.Queue to pass data between stages
  • Implement 3 worker coroutines for each stage
  • Add proper error handling and logging at each stage
  • Track and display pipeline statistics (throughput, errors)

Hint: Your solution structure should look like:

  • Create two queues: fetch_queue and process_queue
  • Fetcher workers: get from fetch_queue, fetch data, put in process_queue
  • Processor workers: get from process_queue, transform, write to file
  • Use asyncio.gather() to run all workers concurrently
  • Signal completion with sentinel values in queues
  • Test APIs: JSONPlaceholder, CoinGecko, OpenWeather
MODULE 14
Advanced Async Topics
โ–ผ

14.1 Async Frameworks

  • FastAPI: modern async web framework with automatic API docs
  • Building REST APIs with async route handlers
  • aiohttp server for HTTP servers and WebSocket support
  • Comparing async frameworks: FastAPI vs Sanic vs Quart
  • When async frameworks outperform sync (Flask, Django)
  • Middleware and dependency injection in async frameworks

๐Ÿ’ก FastAPI Example

from fastapi import FastAPIapp = FastAPI()@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await db.fetch_user(user_id)
return user

14.2 Testing Async Code

  • Using pytest-asyncio plugin for async tests
  • Writing async test functions with @pytest.mark.asyncio
  • Mocking async functions with AsyncMock
  • Testing concurrent behavior and race conditions
  • Using fixtures for async resources (databases, HTTP clients)
  • Best practices for async test organization
import pytest# Async test function
@pytest.mark.asyncio
async def test_fetch_data():
result = await fetch_data("https://api.example.com")
assert result["status"] == "success"

14.3 Performance Optimization

  • Profiling async applications with asyncio debug mode
  • Identifying common bottlenecks in async code
  • Connection pooling and reusing sessions
  • Optimizing task scheduling and prioritization
  • Memory management in long-running async applications
  • Async best practices checklist for production

โšก Async Performance Tips

  1. Reuse ClientSession objects (don't create per request)
  2. Use connection pooling for databases
  3. Set appropriate timeouts (prevent hanging tasks)
  4. Limit concurrency with Semaphore (prevent overwhelming servers)
  5. Profile with PYTHONASYNCIODEBUG=1
  6. Close resources properly (avoid resource leaks)

๐Ÿ‹๏ธ Lab Exercise: Add Tests to Async Scraper

Task: Write comprehensive tests for the async web scraper built in previous exercises.

Requirements:

  • Install and configure pytest-asyncio
  • Create test fixtures for mock HTTP responses using aioresponses
  • Write tests for successful URL fetching
  • Write tests for timeout handling
  • Write tests for error handling (404, network errors)
  • Test concurrent execution with multiple URLs
  • Verify that semaphore limits work correctly
  • Achieve >80% code coverage

Hint: Your solution structure should look like:

  • Install: pip install pytest-asyncio aioresponses
  • Fixture: @pytest.fixture def mock_aiohttp(aioresponses): ...
  • Mock response: aioresponses.get(url, status=200, body='...')
  • Test timeout: aioresponses.get(url, exception=asyncio.TimeoutError())
  • Use @pytest.mark.asyncio on all async test functions
  • Assert on results, error counts, and execution time

๐ŸŽ“ Part 3 Capstone Project: Real-Time Dashboard System

Build a real-time data aggregation and streaming dashboard that demonstrates async mastery!

Project Requirements:

  1. Multi-Source Data Fetching
    • Fetch data from at least 5 different APIs concurrently
    • Support multiple data sources: REST APIs, WebSocket streams, RSS feeds
    • Implement retry logic with exponential backoff for failed requests
    • Use connection pooling for efficient HTTP connections
    • Handle rate limiting from external APIs
  2. Async Data Processing Pipeline
    • Use asyncio.Queue for data flow between stages
    • Stage 1: Fetch - concurrent API calls with rate limiting
    • Stage 2: Transform - process and aggregate data
    • Stage 3: Store - persist to database using asyncpg
    • Implement worker pools for each stage (3-5 workers per stage)
  3. WebSocket Streaming Server
    • Build WebSocket server using aiohttp or websockets
    • Stream real-time updates to connected clients
    • Support multiple concurrent client connections (target: 100+)
    • Broadcast updates to all clients when new data arrives
    • Handle client connect/disconnect gracefully
  4. Error Handling & Monitoring
    • Comprehensive error handling for all async operations
    • Graceful degradation when data sources are unavailable
    • Health check endpoint for monitoring system status
    • Metrics tracking: requests/second, error rates, latency
    • Logging with proper async-safe logging configuration
  5. Performance & Scalability
    • Use semaphores to limit concurrent requests to external APIs
    • Implement caching for frequently requested data
    • Database connection pooling (10-20 connections)
    • Optimize memory usage for long-running processes
    • Target: Handle 1000+ concurrent WebSocket connections

Example Architecture:

# Main Application Structureclass DashboardSystem:
def __init__(self):
self.fetch_queue = asyncio.Queue()
self.process_queue = asyncio.Queue()
self.clients = set() # WebSocket clients
self.db_pool = None

async def start(self):
# Initialize database pool
self.db_pool = await asyncpg.create_pool(DATABASE_URL)

# Start worker pools
fetchers = [self.fetch_worker() for _ in range(5)]
processors = [self.process_worker() for _ in range(3)]

# Start WebSocket server
ws_server = self.start_websocket_server()

# Run all concurrently
await asyncio.gather(*fetchers, *processors, ws_server)

async def fetch_worker(self):
# Fetch data from APIs
pass

async def process_worker(self):
# Process and store data
pass

async def broadcast(self, data):
# Send updates to all WebSocket clients
if self.clients:
await asyncio.gather(*[
client.send_json(data) for client in self.clients
])

Suggested Data Sources:

  • ๐Ÿ“ˆ Stock Prices: Alpha Vantage, IEX Cloud, or Yahoo Finance API
  • ๐ŸŒฆ๏ธ Weather Data: OpenWeatherMap API
  • ๐Ÿ“ฐ News Headlines: NewsAPI or RSS feeds
  • ๐Ÿ’ฑ Cryptocurrency: CoinGecko or Binance API
  • ๐Ÿฆ Social Media: Twitter API or Reddit API

Evaluation Criteria:

  • โœ… Proper use of async/await throughout the application
  • โœ… Efficient concurrent task execution with appropriate coordination
  • โœ… Robust error handling and graceful degradation
  • โœ… Clean code with async best practices (context managers, generators)
  • โœ… Comprehensive testing with pytest-asyncio
  • โœ… Performance metrics and monitoring
  • โœ… Documentation and type hints
  • โœ… Can handle 100+ concurrent WebSocket connections

Bonus Challenges:

  • ๐ŸŒŸ Add authentication for WebSocket connections
  • ๐ŸŒŸ Implement data visualization dashboard (HTML/JavaScript frontend)
  • ๐ŸŒŸ Add database query caching with TTL
  • ๐ŸŒŸ Implement graceful shutdown with cleanup
  • ๐ŸŒŸ Add admin API for system control (start/stop data sources)
  • ๐ŸŒŸ Deploy to cloud with Docker and load testing results

๐Ÿ“š Part 3 Summary

What You've Learned:

  • Module 9: Async concepts, concurrency models, and event loop fundamentals
  • Module 10: Async/await syntax, coroutines, and awaitable objects
  • Module 11: Task management, coordination, and error handling
  • Module 12: Async patterns including context managers, iterators, and synchronization
  • Module 13: Real-world applications with HTTP, databases, and file I/O
  • Module 14: Advanced topics including frameworks, testing, and optimization

Key Takeaways:

  1. Async is for I/O-bound tasks - don't use it for CPU-intensive work
  2. Event loop is single-threaded - never block it with sync operations
  3. await is mandatory - always await coroutines or they won't execute
  4. gather() for parallel execution - run multiple tasks concurrently
  5. Semaphores limit concurrency - prevent overwhelming external services
  6. Context managers for resources - use async with for cleanup
  7. Testing requires pytest-asyncio - mark tests with @pytest.mark.asyncio
  8. Performance comes from concurrency - not speed of individual operations

๐ŸŽฏ You're Now Ready For:

  • Building high-performance async web applications with FastAPI
  • Creating concurrent data processing pipelines
  • Developing real-time systems with WebSockets
  • Optimizing I/O-bound applications for maximum throughput
  • Contributing to async Python projects and libraries

Next Steps:

  • Complete the Final Integration Project combining all three parts
  • Explore advanced async libraries: trio, anyio
  • Study async implementations in popular frameworks (FastAPI, aiohttp)
  • Build your own async library or contribute to open source

๐Ÿš€ Final Integration Project (Optional Bonus)

Distributed Task Processing System

Combine everything you've learned from all three parts into one comprehensive project!

System Overview:

Build a distributed task processing system that accepts jobs, distributes them to workers, executes them asynchronously, and provides real-time monitoring. This project integrates OOP design (Part 1), decorators (Part 2), and async programming (Part 3).

Core Components:

1. Task System (OOP - Part 1)

  • Base Task class with common functionality
  • Specialized task types: HTTPTask, DataProcessingTask, EmailTask
  • Use polymorphism for different task execution strategies
  • Abstract base class for worker types
  • Task queue implementations using composition

2. Decorator-Based Features (Decorators - Part 2)

  • @task decorator for task registration
  • @retry(max_attempts=3, backoff=2) for automatic retries
  • @timeout(seconds=30) for task timeouts
  • @priority(level=5) for task prioritization
  • @log_execution for automatic logging
  • @measure_performance for metrics collection

3. Async Execution Engine (Async - Part 3)

  • Async task queue using asyncio.PriorityQueue
  • Worker pool with configurable concurrency
  • Async task execution with proper error handling
  • Real-time WebSocket updates for task status
  • Concurrent result aggregation and storage

Example Usage:

# Define custom task with decorators
@task("api_fetch")
@retry(max_attempts=3, backoff=2)
@timeout(seconds=30)
@log_execution
async def fetch_api_data(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()# Create and run task system
system = TaskProcessingSystem(max_workers=10)
await system.start()# Submit tasks
task1 = fetch_api_data("https://api.example.com/data")
task2 = process_data(input_file="data.csv")results = await system.submit_tasks([task1, task2])
print(results)

Advanced Features:

  • ๐Ÿ“Š Monitoring Dashboard: Real-time WebSocket dashboard showing active tasks, completion rates, errors
  • ๐Ÿ”„ Task Dependencies: Tasks that wait for other tasks to complete
  • โš–๏ธ Load Balancing: Distribute tasks across multiple worker processes
  • ๐Ÿ’พ Persistent Queue: Store tasks in database for crash recovery
  • ๐Ÿ”” Notifications: Async email/Slack notifications on task completion/failure
  • ๐Ÿ“ˆ Metrics & Analytics: Track execution times, success rates, resource usage

Technical Requirements:

  • โœ… Use classes and inheritance for task types and workers (Part 1 OOP)
  • โœ… Implement at least 5 custom decorators for task management (Part 2)
  • โœ… Async execution engine with concurrent worker pools (Part 3)
  • โœ… Priority queue implementation with async operations
  • โœ… Comprehensive error handling and retry logic
  • โœ… WebSocket server for real-time monitoring
  • โœ… Database integration with asyncpg
  • โœ… Unit tests covering all three parts (OOP, decorators, async)
  • โœ… Type hints throughout the codebase
  • โœ… Complete documentation and README

๐Ÿ† Congratulations! By completing this integration project, you've demonstrated mastery of advanced Python programming across three critical domains: Object-Oriented Programming, Decorators, and Asynchronous Programming. You're now equipped to build sophisticated, production-ready Python applications!

Gaurav Chopra
Gaurav Chopra

Gaurav is a Co-Founder of Eightgen AI

Work with us

Found this useful? Let's talk about your build.

We write about what we build. If any of this resonates with a challenge you're facing, book a free 30-minute call โ€” no prep needed.