Learn async processing and message queues: sync vs async trade-offs, background jobs, retry strategies, dead letter queues, event-driven architecture, and idempotency. Understand how to defer work to improve user experience and system scalability.
A user clicks "Submit Order" on an e-commerce site.
Synchronous approach:
1. Validate order (100ms)
2. Check inventory (200ms)
3. Process payment (500ms)
4. Update database (100ms)
5. Send confirmation email (300ms)
6. Generate invoice PDF (400ms)
7. Notify warehouse (200ms)
8. Update analytics (100ms)
───────────────────────────────────
Total: 1900ms (almost 2 seconds!)
The user waits 2 seconds before seeing "Order Successful". A poor experience.
Asynchronous approach:
1. Validate order (100ms)
2. Check inventory (200ms)
3. Process payment (500ms)
4. Update database (100ms)
5. Queue remaining tasks (10ms)
───────────────────────────────────
Return "Order Successful": 910ms
Background (async):
- Send email
- Generate PDF
- Notify warehouse
- Update analytics
The user waits only ~1 second. A 2x better experience.
This is the power of async processing.
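The arithmetic above can be checked with a quick sketch; the step names and the ~10ms enqueue cost come straight from the example:

```python
# Per-step latencies (ms) from the example above
steps = {
    'validate': 100, 'inventory': 200, 'payment': 500, 'db_update': 100,
    'email': 300, 'invoice_pdf': 400, 'warehouse': 200, 'analytics': 100,
}
# Only these steps sit on the critical path the user must wait for
critical = ['validate', 'inventory', 'payment', 'db_update']

sync_latency = sum(steps.values())                    # user waits for everything
async_latency = sum(steps[s] for s in critical) + 10  # critical path + ~10ms enqueue

print(sync_latency)   # 1900
print(async_latency)  # 910
```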
This lesson teaches you how.
Request → Process → Response. The user waits for everything.
sequenceDiagram
    participant User
    participant API
    participant DB
    participant Email
    User->>API: Submit order
    API->>DB: Save order
    DB-->>API: OK
    API->>Email: Send confirmation
    Email-->>API: Sent
    API-->>User: Success (1500ms)
Characteristics:
Pros: simple to reason about, immediate error feedback, strong consistency.
Cons: the user waits for every step; one slow dependency slows the whole request.
Request → Quick response → Process in background.
sequenceDiagram
    participant User
    participant API
    participant Queue
    participant Worker
    participant Email
    User->>API: Submit order
    API->>Queue: Enqueue email task
    API-->>User: Success (100ms)
    Note over Worker: Process async
    Worker->>Queue: Dequeue task
    Worker->>Email: Send email
Characteristics:
Pros: fast responses, workers scale independently, resilient to slow downstream services.
Cons: more moving parts, eventual consistency, harder debugging.
Use SYNC when:
1. User needs immediate result
# Login - must know if success/fail
def login(username, password):
    user = authenticate(username, password)  # Must wait
    return user
2. Business logic requires
# Payment - must confirm before proceeding
def checkout(cart):
    payment_result = charge_card(cart.total)  # Must wait
    if payment_result.success:
        create_order()
3. Simple, fast operations
# Get user profile - fast query
def get_profile(user_id):
    return db.get_user(user_id)  # 10ms, no need for async
Use ASYNC when:
1. Non-critical operations
# Send email - user doesn't need to wait
def register(email):
    user = create_user(email)
    send_welcome_email.delay(email)  # Async
    return user
2. Heavy/slow operations
# Video processing - can take minutes
def upload_video(file):
    video = save_video(file)
    process_video.delay(video.id)  # Async
    return video
3. External service calls
# 3rd party APIs - unpredictable latency
def create_order(order):
    save_order(order)
    notify_warehouse.delay(order.id)  # Async
    update_analytics.delay(order)  # Async
    return order
4. Fan-out operations
# Notify many users - can be slow
def post_update(post):
    save_post(post)
    notify_followers.delay(post.id)  # Async (may be 10k users)
    return post
A message queue = a buffer between producer and consumer.
flowchart LR
    subgraph Producers
        API1[API Server 1]
        API2[API Server 2]
    end
    Queue[(Message Queue)]
    subgraph Consumers
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker 3]
    end
    API1 -->|Enqueue| Queue
    API2 -->|Enqueue| Queue
    Queue -->|Dequeue| W1
    Queue -->|Dequeue| W2
    Queue -->|Dequeue| W3
Producer: creates messages and pushes them onto the queue (e.g., your API servers).
Queue: buffers messages until a consumer is ready; absorbs traffic spikes.
Consumer/Worker: pulls messages off the queue and does the actual work.
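A minimal in-process sketch of these three roles, using Python's stdlib `queue` as a stand-in for a real broker like Redis or RabbitMQ:

```python
import queue
import threading

q = queue.Queue()   # the buffer between producer and consumer
results = []

def worker():
    while True:
        task = q.get()      # Dequeue (blocks until a message arrives)
        if task is None:    # Sentinel value: shut down
            break
        results.append(f"processed {task}")
        q.task_done()

t = threading.Thread(target=worker)
t.start()

# Producer: enqueue and return immediately, without waiting for processing
for i in range(3):
    q.put(f"email-{i}")

q.put(None)   # stop the worker
t.join()
print(results)  # ['processed email-0', 'processed email-1', 'processed email-2']
```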
RabbitMQ: feature-rich broker with flexible routing; a solid default for task queues.
Redis (with Sidekiq/Bull): very fast and simple; great for straightforward background jobs.
AWS SQS: fully managed, minimal ops; a natural fit for AWS apps.
Kafka: extreme throughput with a persistent log; built for event streams.
Comparison:
| Feature | RabbitMQ | Redis | SQS | Kafka |
|---|---|---|---|---|
| Throughput | High | Very High | High | Extreme |
| Latency | Low | Very Low | Medium | Low |
| Persistence | Yes | Optional | Yes | Yes |
| Ordering | Yes | Limited | FIFO queue | Yes |
| Complexity | Medium | Low | Low | High |
| Use Case | Task queues | Simple jobs | AWS apps | Event streams |
Enqueue (Producer):
# Python with Celery + Redis
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def send_email(to, subject, body):
    # Email sending logic
    smtp.send(to, subject, body)

# Enqueue task
send_email.delay('user@example.com', 'Welcome', 'Hello!')
Dequeue (Consumer):
# Worker processes tasks automatically
# Start worker:
# celery -A tasks worker --loglevel=info
# Celery handles:
# - Dequeue messages
# - Call task function
# - Retry on failure
# - Acknowledge on success
Manual dequeue (Redis example):
import redis

r = redis.Redis()
while True:
    # Blocking pop (wait for messages)
    task = r.brpop('email_queue', timeout=5)
    if task:
        queue_name, message = task
        process_email(message)
Execute a task after a delay.
# Send reminder email after 24 hours
send_reminder_email.apply_async(
    args=[user.id],
    countdown=86400  # 24 hours in seconds
)
Use cases: reminder emails, abandoned-cart follow-ups, trial-expiry warnings.
Execute tasks on a recurring schedule.
# Celery beat - periodic tasks
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-old-data': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'send-daily-report': {
        'task': 'tasks.send_daily_report',
        'schedule': crontab(hour=9, minute=0, day_of_week=1),  # Monday 9 AM
    },
}
Use cases: nightly cleanups, scheduled reports, periodic backups and data syncs.
Process many items at once.
# Process batch of emails
@app.task
def send_batch_emails(email_ids):
    emails = Email.objects.filter(id__in=email_ids)
    for email in emails:
        send_email(email)

# Split into batches
email_ids = [1, 2, 3, ..., 10000]
batch_size = 100
for i in range(0, len(email_ids), batch_size):
    batch = email_ids[i:i+batch_size]
    send_batch_emails.delay(batch)
Benefits: fewer messages on the queue, amortized per-task overhead, controllable throughput.
Tasks execute in sequence.
from celery import chain

# Task chain
workflow = chain(
    resize_image.s(image_id),
    apply_watermark.s(),
    upload_to_cdn.s(),
    update_database.s()
)
workflow.apply_async()
Each task output = next task input.
Parallel execution, then aggregate.
from celery import group, chord

# Process multiple images in parallel
job = chord(
    group(process_image.s(img_id) for img_id in image_ids),
    aggregate_results.s()  # Called after all finish
)
job.apply_async()
Use cases: bulk media processing, map-reduce style aggregation, parallel report generation.
Failures happen. Network timeout, service down, rate limits.
@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60  # 60 seconds
)
def send_email(self, to, subject, body):
    try:
        smtp.send(to, subject, body)
    except SMTPException as exc:
        # Retry with exponential backoff (retries starts at 0, so the first wait is 2s)
        raise self.retry(exc=exc, countdown=2 ** (self.request.retries + 1))
Exponential backoff:
Retry 1: wait 2^1 = 2 seconds
Retry 2: wait 2^2 = 4 seconds
Retry 3: wait 2^3 = 8 seconds
Prevents overwhelming failing service.
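The backoff schedule above can be sketched as a standalone function; the name `backoff_delay` and the cap/jitter parameters are illustrative, not part of Celery:

```python
import random

def backoff_delay(retry_count, base=2, cap=600, jitter=False):
    """Delay before the next retry: base^retry_count, capped, optionally jittered."""
    delay = min(base ** retry_count, cap)
    if jitter:
        # Full jitter: randomize so many failing clients don't retry in lockstep
        delay = random.uniform(0, delay)
    return delay

print(backoff_delay(1))   # 2
print(backoff_delay(2))   # 4
print(backoff_delay(3))   # 8
print(backoff_delay(20))  # 600 (capped at 10 minutes)
```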
Retry only on specific errors.
import requests

@app.task(bind=True, max_retries=5)
def call_external_api(self, url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.Timeout as exc:
        # Retry on timeout
        raise self.retry(exc=exc, countdown=30)
    except requests.HTTPError as exc:
        if exc.response.status_code == 429:  # Rate limited
            raise self.retry(exc=exc, countdown=120)
        elif exc.response.status_code >= 500:  # Server error
            raise self.retry(exc=exc)
        else:
            # 4xx errors - don't retry
            raise
Logic: timeouts, rate limits (429), and server errors (5xx) are transient, so retry; other 4xx errors are bugs in the request, and retrying won't help.
Don't retry forever.
@app.task(
    max_retries=10,
    retry_backoff=True,
    retry_backoff_max=600,  # Max 10 minutes
    retry_jitter=True  # Add randomness
)
def risky_task():
    # Task logic
    pass
After max retries → task fails → moves to DLQ.
DLQ = queue for messages that failed permanently.
flowchart TD
    Producer[Producer] -->|Enqueue| MainQueue[Main Queue]
    MainQueue --> Worker[Worker]
    Worker -->|Success| Done[Complete]
    Worker -->|Retry| MainQueue
    Worker -->|Max retries exceeded| DLQ[Dead Letter Queue]
    DLQ --> Manual[Manual Investigation]
    DLQ --> Replay[Replay to Main Queue]
1. Don't lose messages
Failed tasks don't disappear. They're stored in the DLQ for investigation.
2. Prevent queue blocking
A failing message doesn't block the queue. Other messages process normally.
3. Enable debugging
Analyze failed messages, understand patterns, fix bugs.
4. Manual intervention
Admins can replay messages after fixing the issue.
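The retry-then-DLQ flow can be sketched with plain lists standing in for the queues; a real broker would handle the redelivery and DLQ move itself:

```python
MAX_ATTEMPTS = 3

main_queue = [
    {'body': 'ok-task', 'attempts': 0},
    {'body': 'always-fails', 'attempts': 0},
]
dead_letter_queue = []
completed = []

def handle(body):
    # Hypothetical handler: one message succeeds, one fails permanently
    if body == 'always-fails':
        raise RuntimeError('permanent failure')
    completed.append(body)

while main_queue:
    msg = main_queue.pop(0)
    try:
        handle(msg['body'])
    except RuntimeError:
        msg['attempts'] += 1
        if msg['attempts'] >= MAX_ATTEMPTS:
            dead_letter_queue.append(msg)  # park for manual investigation
        else:
            main_queue.append(msg)         # requeue for retry

print(completed)                # ['ok-task']
print(len(dead_letter_queue))   # 1
```

Note how the failing message never blocks `ok-task`, and after three attempts it is parked rather than lost.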
Celery + RabbitMQ:
from celery import Celery
from kombu import Exchange, Queue

app = Celery('tasks')
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True

# Dead-lettering is a RabbitMQ queue property, declared via kombu queue arguments
app.conf.task_queues = [
    Queue('main', Exchange('main'), routing_key='main',
          queue_arguments={
              'x-dead-letter-exchange': 'dlx',
              'x-dead-letter-routing-key': 'dlq',
          }),
]

@app.task(bind=True, max_retries=3)
def problematic_task(self):
    # Rejected or expired messages route to the DLQ
    pass
AWS SQS:
import json
import boto3

sqs = boto3.client('sqs')

# Create main queue with DLQ
response = sqs.create_queue(
    QueueName='main-queue',
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '3'  # After 3 attempts → DLQ
        })
    }
)
Track metrics:
- DLQ size (messages count)
- DLQ message age
- Failure reasons distribution
- Failure rate trends
Alert when: DLQ size exceeds a threshold, messages sit in the DLQ too long, or the failure rate spikes.
Periodic review:
def review_dlq():
    messages = dlq.get_messages(limit=100)
    for msg in messages:
        # Analyze failure reason
        if is_fixable(msg):
            fix_and_replay(msg)
        else:
            log_permanent_failure(msg)
Idempotent = executing multiple times produces the same result as executing once.
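A quick illustration of the definition: setting a value is idempotent, appending is not. The `order` dict and operations are made up for illustration.

```python
order = {'status': 'pending', 'items': []}

def mark_completed(o):
    o['status'] = 'completed'  # idempotent: same result however often it runs

def add_item(o, item):
    o['items'].append(item)    # NOT idempotent: every run changes the result

mark_completed(order)
mark_completed(order)          # second run is harmless
add_item(order, 'book')
add_item(order, 'book')        # second run duplicates the item

print(order['status'])  # 'completed'
print(order['items'])   # ['book', 'book']
```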
Scenario:
1. User submits order
2. API enqueues "charge_payment" task
3. Worker processes task, charges card
4. Network timeout before ack
5. Queue redelivers task
6. Worker processes again → charges card TWICE! 💸
User charged 2x. Very bad.
Method 1: Idempotency Keys
@app.task
def charge_payment(order_id, amount, idempotency_key):
    # Check if already processed
    if Payment.objects.filter(idempotency_key=idempotency_key).exists():
        print("Already processed, skipping")
        return

    # Process payment
    result = stripe.charge(amount, idempotency_key=idempotency_key)

    # Save with idempotency key
    Payment.objects.create(
        order_id=order_id,
        amount=amount,
        idempotency_key=idempotency_key,
        transaction_id=result.id
    )
The Stripe API also supports idempotency keys, giving double protection.
Method 2: Database Constraints
# Unique constraint on database
class Payment(models.Model):
    order_id = models.IntegerField()
    transaction_id = models.CharField(max_length=100, unique=True)

@app.task
def charge_payment(order_id):
    try:
        result = stripe.charge(...)
        Payment.objects.create(
            order_id=order_id,
            transaction_id=result.id  # Unique - prevents duplicates
        )
    except IntegrityError:
        # Duplicate transaction - already processed
        print("Already processed")
Method 3: Atomic Operations
# Use database atomic operations
@app.task
def increment_view_count(post_id):
    # Atomic - safe under concurrent workers; note a redelivered task
    # still double-counts, so pair with dedup if exact counts matter
    Post.objects.filter(id=post_id).update(
        view_count=F('view_count') + 1
    )
Method 4: State Machines
class Order(models.Model):
    status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('processing', 'Processing'),
            ('completed', 'Completed'),
        ]
    )

@app.task
def process_order(order_id):
    order = Order.objects.get(id=order_id)

    # Only process if pending
    if order.status != 'pending':
        print("Already processed")
        return

    # Update status atomically
    updated = Order.objects.filter(
        id=order_id,
        status='pending'
    ).update(status='processing')
    if updated == 0:
        print("Another worker already processing")
        return

    # Process order...
    order.status = 'completed'
    order.save()
State transitions enforce idempotency.
1. Always use idempotency keys for financial operations
import uuid
idempotency_key = str(uuid.uuid4())
charge_payment.delay(order_id, amount, idempotency_key)
2. Generate idempotency key at producer, not consumer
Producer ensures same key for retries.
3. Store processed keys with TTL
# Redis
redis.setex(f"processed:{idempotency_key}", 86400, "1") # 24h TTL
4. Make all external API calls idempotent
Stripe, payment gateways, email services all support idempotency keys.
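Practices 1 and 3 combine into a simple guard. Here an in-memory set stands in for Redis (in production, `SET key NX EX ttl` would be the check-and-set); `charge_once` and the `charges` list are illustrative names:

```python
import uuid

processed_keys = set()  # stand-in for Redis keys with a TTL

def charge_once(idempotency_key, amount, charges):
    """Perform the charge only if this key hasn't been seen before."""
    if idempotency_key in processed_keys:
        return 'skipped'        # already processed: redelivery is safe
    processed_keys.add(idempotency_key)
    charges.append(amount)      # the side effect we must not repeat
    return 'charged'

charges = []
key = str(uuid.uuid4())         # generated at the producer, reused on retries
print(charge_once(key, 99.99, charges))  # 'charged'
print(charge_once(key, 99.99, charges))  # 'skipped'
print(charges)                           # [99.99]
```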
Async processing naturally leads to event-driven thinking.
Request-driven (traditional):
def create_order(order_data):
    order = save_order(order_data)
    charge_payment(order)
    send_confirmation(order)
    notify_warehouse(order)
    update_analytics(order)
    return order
Tight coupling. Order creation knows about all downstream systems.
Event-driven:
def create_order(order_data):
    order = save_order(order_data)
    # Publish event
    publish_event('order.created', {
        'order_id': order.id,
        'user_id': order.user_id,
        'total': order.total
    })
    return order

# Separate subscribers
@subscribe('order.created')
def handle_payment(event):
    charge_payment(event['order_id'])

@subscribe('order.created')
def handle_email(event):
    send_confirmation(event['order_id'])

@subscribe('order.created')
def handle_warehouse(event):
    notify_warehouse(event['order_id'])
Benefits:
1. Decoupling
The order service doesn't know about the email service or the warehouse service.
2. Scalability
Each subscriber scales independently.
3. Flexibility
Add new subscribers without changing order service.
4. Resilience
One failing subscriber doesn't affect the others.
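The `subscribe`/`publish_event` helpers used above aren't from a specific library; a minimal in-process version might look like this:

```python
from collections import defaultdict

_subscribers = defaultdict(list)

def subscribe(event_type):
    """Register the decorated function as a handler for event_type."""
    def decorator(fn):
        _subscribers[event_type].append(fn)
        return fn
    return decorator

def publish_event(event_type, data):
    """Call every handler; one failing subscriber doesn't stop the others."""
    for handler in _subscribers[event_type]:
        try:
            handler(data)
        except Exception:
            pass  # in production: log it and route the event to a DLQ

received = []

@subscribe('order.created')
def handle_email(event):
    received.append(('email', event['order_id']))

@subscribe('order.created')
def handle_warehouse(event):
    received.append(('warehouse', event['order_id']))

publish_event('order.created', {'order_id': 42})
print(received)  # [('email', 42), ('warehouse', 42)]
```

A real system would put a broker between `publish_event` and the handlers so each subscriber runs in its own worker.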
{
  "event_id": "evt_abc123",
  "event_type": "order.created",
  "timestamp": "2025-02-15T10:00:00Z",
  "version": "1.0",
  "data": {
    "order_id": 12345,
    "user_id": 789,
    "total": 99.99
  }
}
Include: a unique event_id (for deduplication), the event_type, a timestamp, a schema version, and the payload under data.
1. Event Notification
Announces that something happened. Subscribers fetch details if needed.
publish_event('user.registered', {'user_id': 123})
2. Event-Carried State Transfer
The event carries all the data subscribers need.
publish_event('user.registered', {
    'user_id': 123,
    'email': 'user@example.com',
    'name': 'John Doe',
    'signup_date': '2025-02-15'
})
3. Event Sourcing
Store all events as source of truth. Current state = replay events.
events = [
    {'type': 'account.created', 'balance': 1000},
    {'type': 'transaction.deposit', 'amount': 500},
    {'type': 'transaction.withdraw', 'amount': 200},
]
# Current balance = 1000 + 500 - 200 = 1300
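Replaying the events above to derive the current state can be sketched as a simple fold over the log (`replay` is an illustrative name):

```python
def replay(events):
    """Fold the event log into the current balance."""
    balance = 0
    for e in events:
        if e['type'] == 'account.created':
            balance = e['balance']
        elif e['type'] == 'transaction.deposit':
            balance += e['amount']
        elif e['type'] == 'transaction.withdraw':
            balance -= e['amount']
    return balance

events = [
    {'type': 'account.created', 'balance': 1000},
    {'type': 'transaction.deposit', 'amount': 500},
    {'type': 'transaction.withdraw', 'amount': 200},
]
print(replay(events))  # 1300
```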
Don't over-engineer. Begin with basic background jobs.
# Simple Celery task
@app.task
def send_email(to, subject, body):
smtp.send(to, subject, body)
# Usage
send_email.delay('user@example.com', 'Welcome', 'Hello!')
Add complexity only when needed: retries when you see transient failures, DLQs when retries aren't enough, chains and chords when real workflows emerge.
Track metrics:
# Worker stats (queue depth itself needs broker-level inspection)
stats = app.control.inspect().stats()

# Task success/failure rate via the event stream
from celery.events import EventReceiver

def monitor_events():
    def on_event(event):
        if event['type'] == 'task-succeeded':
            metrics.increment('task.success')
        elif event['type'] == 'task-failed':
            metrics.increment('task.failure')

    with app.connection() as connection:
        receiver = EventReceiver(connection, handlers={'*': on_event}, app=app)
        receiver.capture()
Alert when: queue depth keeps growing, the failure rate rises, or processing time degrades.
Assume tasks will fail. Plan accordingly.
@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60
)
def resilient_task(self, data):
    try:
        # Task logic with timeout
        result = process_with_timeout(data, timeout=30)
        return result
    except TimeoutError as exc:
        # Retry on timeout
        raise self.retry(exc=exc)
    except Exception as exc:
        # Log error for investigation
        logger.error(f"Task failed: {exc}", extra={'data': data})
        raise
Testing async code is harder than testing sync code.
# Test with eager mode (synchronous for testing)
app.conf.task_always_eager = True

def test_order_creation():
    order = create_order(order_data)
    # Tasks execute synchronously in tests
    assert email_was_sent(order.user.email)
    assert payment_was_charged(order.id)
Integration tests:
def test_queue_integration():
    # Send task to real queue
    result = send_email.delay('test@example.com', 'Test', 'Body')
    # Wait for completion
    assert result.get(timeout=10) == 'sent'
A queue outage shouldn't crash the whole system.
def create_order(order_data):
    order = save_order(order_data)
    try:
        send_confirmation.delay(order.id)
    except ConnectionError:
        # Queue unavailable - fallback to sync
        logger.warning("Queue unavailable, sending email synchronously")
        send_confirmation_sync(order.id)
    return order
Core insight:
Users don't need to wait for non-critical work. Defer it to improve the experience.
Sync mindset:
Complete everything before responding
User experience = slow
Async mindset:
Complete critical work only
Defer rest to background
User experience = fast
Decision tree:
Does user need result immediately?
YES → Sync (e.g., login validation)
NO → Can defer?
YES → Async (e.g., send email)
NO → Sync (e.g., payment validation)
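The decision tree reduces to two questions, which can be encoded directly (function and parameter names are illustrative):

```python
def execution_mode(needs_immediate_result, can_defer):
    """Encode the sync/async decision tree above."""
    if needs_immediate_result:
        return 'sync'   # e.g., login validation
    return 'async' if can_defer else 'sync'

print(execution_mode(True, False))   # 'sync'  (login)
print(execution_mode(False, True))   # 'async' (send email)
print(execution_mode(False, False))  # 'sync'  (payment validation)
```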
Architect thinking:
Great systems respond fast, work happens later.
1. Async processing reduces user-facing latency
User gets response fast, work happens in background.
2. Message queues decouple producers from consumers
Scale independently, handle spikes, improve resilience.
3. Choose queue system based on requirements
Redis = simple/fast, RabbitMQ = feature-rich, SQS = managed, Kafka = streaming.
4. Retry strategies handle transient failures
Exponential backoff, conditional retry, max retries.
5. DLQ captures permanent failures
Don't lose messages, enable investigation, allow replay.
6. Idempotency critical for reliability
Use idempotency keys, database constraints, atomic operations.
7. Event-driven architecture = extreme decoupling
Services communicate via events, not direct calls.
8. Monitor queue health continuously
Queue depth, task success rate, processing time.
9. Design for failure
Tasks will fail. Have retry logic, DLQ, fallbacks.
10. Test async flows thoroughly
Use eager mode for unit tests, real queue for integration.
Remember: The best user experience is fast response. Defer non-critical work to background. Users don't need to wait for emails to be sent or analytics to be updated - they need to see "Order Successful" immediately.