Event-Driven Architecture
Событийная архитектура для микросервисов. Слабая связанность, высокая масштабируемость, реактивность в реальном времени.
📡 Что такое Event-Driven Architecture
Event-Driven Architecture (EDA) — архитектурный паттерн, где компоненты системы общаются через события. Вместо прямых вызовов (запрос → ответ) сервисы публикуют события о том, что произошло, а заинтересованные сервисы на них реагируют.
Синхронный подход (REST): Вы звоните в пиццерию, ждёте на линии,
пока примут заказ, подтвердят оплату, назначат курьера. Вы "заблокированы" всё это время.
Событийный подход (EDA): Вы отправляете заказ в приложении и идёте по своим делам.
Вам приходят уведомления: "Заказ принят", "Готовится", "Курьер выехал".
Каждое уведомление — это событие.
Synchronous vs Asynchronous
- ⏳ Клиент ждёт ответа
- 🔒 Сильная связанность сервисов
- 📉 Сбой одного → сбой цепочки
- 📊 Сложнее масштабировать
- ✅ Простая отладка
- ✅ Немедленный результат
- ⚡ Fire-and-forget
- 🔓 Слабая связанность
- 📈 Отказоустойчивость
- 🚀 Легко масштабируется
- ⚠️ Сложнее отладка
- ⚠️ Eventual consistency
🤔 Зачем нужна EDA
В монолите всё просто: один сервис вызывает метод другого напрямую. Но когда у вас 10, 50, 100 микросервисов — прямые вызовы превращаются в кошмар:
Каждый сервис знает о каждом. Orders вызывает Payments, Inventory, Notifications напрямую.
Если Notifications упал — Orders ждёт таймаут. Добавить новый сервис = менять все остальные.
Преимущества EDA
- Слабая связанность — сервисы не знают друг о друге, только о событиях
- Масштабируемость — добавляйте consumer'ов без изменения producer'а
- Отказоустойчивость — если consumer упал, события ждут в очереди
- Расширяемость — новый сервис просто подписывается на нужные события
- Реальное время — мгновенная реакция на изменения
- Аудит — история всех событий хранится в брокере
🧩 Компоненты системы
Producer (Издатель)
Сервис, который создаёт события. Producer не знает, кто будет обрабатывать событие —
он просто публикует факт: OrderCreated, UserRegistered, PaymentCompleted.
Event (Событие)
Неизменяемая запись о факте, который произошёл в системе. Событие содержит всю необходимую информацию для его обработки.
{
"event_id": "evt_abc123",
"event_type": "OrderCreated",
"timestamp": "2024-01-15T10:30:00Z",
"source": "order-service",
"data": {
"order_id": "ord_456",
"user_id": "usr_789",
"items": [...],
"total": 99.99
},
"metadata": {
"correlation_id": "req_xyz",
"version": 1
}
}
Broker (Брокер сообщений)
Посредник, который принимает события от producer'ов, хранит их и доставляет consumer'ам. Примеры: Apache Kafka, RabbitMQ, Redis Streams.
Consumer (Подписчик)
Сервис, который подписан на определённые типы событий и реагирует на них. Один consumer может слушать несколько типов событий, и наоборот — на одно событие могут быть подписаны несколько consumer'ов.
🎨 Паттерны EDA
↳ PaymentService ✓
↳ InventoryService ✓
↳ EmailService ✓
↳ Worker1 берёт Task1
↳ Worker2 берёт Task2
↳ Worker3 берёт Task3
[Created: $0]
[Deposited: +$100]
[Withdrawn: -$30]
→ Current Balance: $70
Commands: CreateOrder, UpdateOrder
Queries: GetOrders, GetOrderById
1. ReserveInventory
2. ProcessPayment
3. ShipOrder
❌ → RollbackPayment, ReleaseInventory
Consumer:
GET /orders/123 → full details
🔧 Брокеры сообщений
Брокер — сердце событийной архитектуры. Выбор брокера зависит от требований: пропускная способность, гарантии доставки, сложность настройки.
- Огромная пропускная способность
- Хранение на диске (retention)
- Replay событий
- Consumer groups
- Отлично для Big Data
- Гибкая маршрутизация
- Подтверждения доставки (ACK)
- Dead Letter Queues
- Простая настройка
- Множество плагинов
- Минимальная задержка
- Consumer groups
- Уже есть Redis? Бонус!
- Простое API
- Идеально для real-time
Когда что выбрать?
Apache Kafka — Big Data, аналитика, event sourcing, нужен replay событий, высокая нагрузка.
RabbitMQ — традиционные очереди задач, сложная маршрутизация, гарантии доставки, интеграции.
Redis Streams — уже используете Redis, нужна минимальная задержка, простые сценарии, real-time.
✏️ Дизайн событий
Правильный дизайн событий — основа успешной EDA. Плохо спроектированные события создадут больше проблем, чем решат.
Правила именования
# ✅ Хорошо: прошедшее время (факт уже произошёл) OrderCreated PaymentCompleted UserRegistered InventoryReserved EmailSent # ❌ Плохо: императив (команда, а не событие) CreateOrder # Это команда, не событие ProcessPayment # Команда SendEmail # Команда # ❌ Плохо: слишком общие OrderUpdated # Что именно обновилось? DataChanged # Какие данные? # ✅ Лучше: конкретные события OrderShipped OrderCancelled OrderItemAdded OrderAddressChanged
Структура события
{
// === МЕТАДАННЫЕ (обязательные) ===
"event_id": "evt_a1b2c3d4", // Уникальный ID (UUID)
"event_type": "OrderCreated", // Тип события
"event_version": 1, // Версия схемы события
"timestamp": "2024-01-15T10:30:00.000Z", // ISO 8601
"source": "order-service", // Кто создал событие
// === ТРАССИРОВКА ===
"correlation_id": "req_xyz789", // Связь с исходным запросом
"causation_id": "evt_prev123", // ID события-причины
// === ПОЛЕЗНАЯ НАГРУЗКА ===
"data": {
"order_id": "ord_456",
"user_id": "usr_789",
"items": [
{
"product_id": "prod_111",
"quantity": 2,
"price": 29.99
}
],
"total": 59.98,
"currency": "USD",
"shipping_address": {
"city": "Moscow",
"street": "Tverskaya 1"
}
}
}
Fat Events vs Thin Events
- ✅ Вся информация в событии
- ✅ Consumer автономен
- ✅ Нет доп. запросов
- ⚠️ Больший размер
- ⚠️ Дублирование данных
- ✅ Минимальный размер
- ✅ Актуальные данные
- ⚠️ Доп. запросы к API
- ⚠️ Связанность
- ⚠️ Данные могут измениться
Используйте Fat Events для важных бизнес-событий.
Consumer должен иметь всё необходимое для обработки без дополнительных запросов.
Thin Events подходят для уведомлений, где актуальность данных важнее автономности.
💻 Практический пример
Создадим простую событийную систему для интернет-магазина: при создании заказа отправляем событие, которое обрабатывают несколько сервисов.
Шаг 1: Event Bus (Шина событий)
import json import redis from datetime import datetime from uuid import uuid4 from typing import Callable, Dict, Any from pydantic import BaseModel, Field from abc import ABC, abstractmethod # ═══════════════════════════════════════════════════════════════ # БАЗОВЫЙ КЛАСС СОБЫТИЯ # ═══════════════════════════════════════════════════════════════ class Event(BaseModel): """Базовый класс для всех событий""" event_id: str = Field(default_factory=lambda: str(uuid4())) event_type: str = "" timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat() + "Z") source: str = "unknown" correlation_id: str | None = None data: Dict[str, Any] = {} def __init__(self, **kwargs): super().__init__(**kwargs) if not self.event_type: self.event_type = self.__class__.__name__ # ═══════════════════════════════════════════════════════════════ # EVENT BUS (REDIS) # ═══════════════════════════════════════════════════════════════ class EventBus: """Шина событий на Redis Streams""" def __init__(self, redis_url: str = "redis://localhost:6379"): self.redis = redis.from_url(redis_url, decode_responses=True) self.handlers: Dict[str, list[Callable]] = {} def publish(self, event: Event, stream: str = "events") -> str: """Опубликовать событие в stream""" event_data = event.model_dump_json() # Добавляем в Redis Stream message_id = self.redis.xadd(stream, { "event_type": event.event_type, "data": event_data }) print(f"📤 Published: {event.event_type} [{message_id}]") return message_id def subscribe(self, event_type: str): """Декоратор для подписки на события""" def decorator(handler: Callable): if event_type not in self.handlers: self.handlers[event_type] = [] self.handlers[event_type].append(handler) print(f"📥 Subscribed: {handler.__name__} → {event_type}") return handler return decorator def consume(self, stream: str = "events", group: str = "default", consumer: str = "consumer-1"): """Слушать события из stream (Consumer Group)""" # Создаём consumer group если не существует try: self.redis.xgroup_create(stream, group, id="0", mkstream=True) except redis.ResponseError: pass # Группа уже существует print(f"🎧 Listening on '{stream}' as '{consumer}'...") while True: # Читаем новые сообщения messages = self.redis.xreadgroup( group, consumer, {stream: ">"}, count=10, block=5000 ) for stream_name, stream_messages in messages: for message_id, data in stream_messages: event_type = data.get("event_type") event_data = json.loads(data.get("data", "{}")) # Вызываем обработчики if event_type in self.handlers: for handler in self.handlers[event_type]: try: handler(event_data) # Подтверждаем обработку self.redis.xack(stream, group, message_id) except Exception as e: print(f"❌ Error: {e}") # Глобальный экземпляр event_bus = EventBus()
Шаг 2: Определяем события
from event_bus import Event from typing import List from pydantic import BaseModel # ═══════════════════════════════════════════════════════════════ # ВСПОМОГАТЕЛЬНЫЕ МОДЕЛИ # ═══════════════════════════════════════════════════════════════ class OrderItem(BaseModel): product_id: str product_name: str quantity: int price: float class Address(BaseModel): city: str street: str postal_code: str # ═══════════════════════════════════════════════════════════════ # СОБЫТИЯ # ═══════════════════════════════════════════════════════════════ class OrderCreated(Event): """Заказ создан""" source: str = "order-service" def __init__( self, order_id: str, user_id: str, user_email: str, items: List[dict], total: float, shipping_address: dict, **kwargs ): super().__init__( data={ "order_id": order_id, "user_id": user_id, "user_email": user_email, "items": items, "total": total, "shipping_address": shipping_address }, **kwargs ) class PaymentProcessed(Event): """Оплата обработана""" source: str = "payment-service" def __init__(self, order_id: str, amount: float, status: str, **kwargs): super().__init__( data={"order_id": order_id, "amount": amount, "status": status}, **kwargs ) class InventoryReserved(Event): """Товар зарезервирован""" source: str = "inventory-service" def __init__(self, order_id: str, items: List[dict], **kwargs): super().__init__( data={"order_id": order_id, "reserved_items": items}, **kwargs ) class NotificationSent(Event): """Уведомление отправлено""" source: str = "notification-service" def __init__(self, recipient: str, channel: str, template: str, **kwargs): super().__init__( data={"recipient": recipient, "channel": channel, "template": template}, **kwargs )
Шаг 3: Producer (Order Service)
from event_bus import event_bus from events import OrderCreated from uuid import uuid4 class OrderService: """Сервис заказов — Producer событий""" def create_order(self, user_id: str, user_email: str, items: list, address: dict) -> str: """Создать заказ и опубликовать событие""" # 1. Создаём заказ (обычно сохраняем в БД) order_id = f"ord_{uuid4().hex[:8]}" total = sum(item["price"] * item["quantity"] for item in items) print(f"🛒 Order created: {order_id} | Total: ${total:.2f}") # 2. Публикуем событие event = OrderCreated( order_id=order_id, user_id=user_id, user_email=user_email, items=items, total=total, shipping_address=address ) event_bus.publish(event) return order_id # ═══════════════════════════════════════════════════════════════ # ДЕМОНСТРАЦИЯ # ═══════════════════════════════════════════════════════════════ if __name__ == "__main__": service = OrderService() # Создаём тестовый заказ order_id = service.create_order( user_id="usr_123", user_email="customer@example.com", items=[ {"product_id": "prod_1", "product_name": "Laptop", "quantity": 1, "price": 999.99}, {"product_id": "prod_2", "product_name": "Mouse", "quantity": 2, "price": 29.99} ], address={ "city": "Moscow", "street": "Tverskaya 1", "postal_code": "125009" } ) print(f"✅ Order {order_id} published!")
Шаг 4: Consumers (Подписчики)
from event_bus import event_bus import time # ═══════════════════════════════════════════════════════════════ # PAYMENT SERVICE # ═══════════════════════════════════════════════════════════════ @event_bus.subscribe("OrderCreated") def process_payment(event_data: dict): """Обработка оплаты при создании заказа""" data = event_data.get("data", {}) order_id = data.get("order_id") total = data.get("total") print(f"💳 [PaymentService] Processing payment for {order_id}...") time.sleep(0.5) # Имитация обработки print(f"💳 [PaymentService] ✅ Payment ${total:.2f} completed!") # ═══════════════════════════════════════════════════════════════ # INVENTORY SERVICE # ═══════════════════════════════════════════════════════════════ @event_bus.subscribe("OrderCreated") def reserve_inventory(event_data: dict): """Резервирование товара на складе""" data = event_data.get("data", {}) order_id = data.get("order_id") items = data.get("items", []) print(f"📦 [InventoryService] Reserving {len(items)} items for {order_id}...") for item in items: print(f" → {item['product_name']} x{item['quantity']}") time.sleep(0.3) print(f"📦 [InventoryService] ✅ Inventory reserved!") # ═══════════════════════════════════════════════════════════════ # NOTIFICATION SERVICE # ═══════════════════════════════════════════════════════════════ @event_bus.subscribe("OrderCreated") def send_confirmation_email(event_data: dict): """Отправка email подтверждения""" data = event_data.get("data", {}) order_id = data.get("order_id") email = data.get("user_email") print(f"📧 [NotificationService] Sending email to {email}...") time.sleep(0.2) print(f"📧 [NotificationService] ✅ Email sent: 'Order {order_id} confirmed!'") # ═══════════════════════════════════════════════════════════════ # ANALYTICS SERVICE # ═══════════════════════════════════════════════════════════════ @event_bus.subscribe("OrderCreated") def track_order_analytics(event_data: dict): """Сбор аналитики""" data = event_data.get("data", {}) total = data.get("total") items = data.get("items", []) print(f"📊 [AnalyticsService] Recording: ${total:.2f}, {len(items)} items") # ═══════════════════════════════════════════════════════════════ # ЗАПУСК CONSUMER # ═══════════════════════════════════════════════════════════════ if __name__ == "__main__": print("🚀 Starting event consumers...") print("=" * 50) # Запускаем прослушивание событий event_bus.consume(stream="events", group="order-processors", consumer="worker-1")
Запуск системы
🎯 Где применяется EDA
Реальные примеры
Netflix — обрабатывает триллионы событий в день для рекомендаций и аналитики.
Uber — real-time обновления позиции, ценообразование, матчинг водителей.
LinkedIn — Kafka был создан в LinkedIn для обработки активности пользователей.
Spotify — события прослушивания для рекомендаций и статистики.
Airbnb — бронирования, сообщения, платежи между гостями и хостами.
⚠️ Сложности и решения
• Оптимистичный UI
• Polling / WebSockets
• Saga для критичных операций
• Уникальный event_id
• Idempotency keys
• Проверка "уже обработано?"
• Партиции по ключу (Kafka)
• Sequence numbers
• Временны́е метки
• Correlation ID
• Distributed tracing (Jaeger)
• Централизованные логи
• Schema Registry
• Версионирование событий
• Backward compatibility
• DLQ для "мёртвых" событий
• Retry с backoff
• Алерты и мониторинг
• Простые CRUD-приложения — overkill, REST проще
• Нужен немедленный ответ — синхронный запрос быстрее
• Строгая консистентность — банковские транзакции требуют ACID
• Маленькая команда — сложность инфраструктуры может не окупиться
• Монолит — события внутри одного процесса не имеют смысла
❓ FAQ
• Сообщение обрабатывается одним consumer'ом
• Point-to-point коммуникация
• Фокус на доставке задач
Event-Driven Architecture:
• Событие получают все подписчики
• Pub/Sub модель
• Фокус на реакции на изменения
EDA — это архитектурный паттерн, который может использовать очереди как один из механизмов.
• Нужен replay событий (event sourcing)
• Высокая пропускная способность (100K+ msg/sec)
• Big Data, аналитика, стриминг
• Долгосрочное хранение событий
Выбирайте RabbitMQ, если:
• Классические очереди задач
• Сложная маршрутизация
• Нужны гарантии доставки (ACK)
• Проще настройка, меньше ресурсов
1. Идемпотентность:
Сделайте обработчик идемпотентным — повторный вызов не меняет результат.
2. Deduplication:
Храните обработанные event_id. Пропускайте дубликаты.
3. Transactional outbox:
Сохраняйте событие в БД в той же транзакции, что и бизнес-данные.
4. Kafka transactions:
Kafka поддерживает exactly-once между producer и consumer.
1. Версия в имени:
OrderCreatedV1, OrderCreatedV22. Поле version:
{"event_type": "OrderCreated", "version": 2}3. Schema Registry:
Confluent Schema Registry хранит версии схем.
Правила совместимости:
• Добавление полей — OK (backward compatible)
• Удаление полей — осторожно (forward compatible)
• Изменение типа — новая версия события
1. Unit-тесты:
Мокайте event bus. Проверяйте, что событие публикуется с правильными данными.
2. Integration-тесты:
Используйте Testcontainers для поднятия реального брокера.
3. Contract-тесты:
Pact или Spring Cloud Contract для проверки контрактов событий.
4. E2E-тесты:
Полный flow от producer до consumer. Используйте таймауты и polling.
1. Начните с одного use case:
Выберите простой сценарий — например, отправка email после регистрации.
2. Выберите простой брокер:
Redis Streams или RabbitMQ. Kafka — позже.
3. Определите события:
Event Storming с командой. Что происходит в системе?
4. Реализуйте producer и consumer:
Один сервис публикует, другой слушает.
5. Добавьте observability:
Логи, метрики, трейсинг. Без них отладка невозможна.
6. Масштабируйте:
Добавляйте новые события и сервисы постепенно.
📐 Полная архитектура EDA
╔═══════════════════════════════════════════════════════════════════════════════╗ ║ EVENT-DRIVEN MICROSERVICES ARCHITECTURE ║ ╚═══════════════════════════════════════════════════════════════════════════════╝ ┌─────────────┐ │ Client │ │ (Web/App) │ └──────┬──────┘ │ ▼ ┌─────────────┐ │ API Gateway │ └──────┬──────┘ │ ┌────────────────────────────┼────────────────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Order │ │ User │ │ Product │ │ Service │ │ Service │ │ Service │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ publish │ publish │ publish ▼ ▼ ▼ ═══════════════════════════════════════════════════════════════════════════════ ║ ║ ║ 📡 EVENT BROKER 📡 ║ ║ ║ ║ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ║ ║ │ orders.created │ │ users.created │ │ products.updated│ ║ ║ │ Topic │ │ Topic │ │ Topic │ ║ ║ └─────────────────┘ └─────────────────┘ └─────────────────┘ ║ ║ ║ ═══════════════════════════════════════════════════════════════════════════════ │ │ │ │ subscribe │ subscribe │ subscribe ▼ ▼ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Payment │ │Notification │ │ Analytics │ │ Service │ │ Service │ │ Service │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Payment DB │ │ Email/Push │ │ ClickHouse│ └─────────────┘ └─────────────┘ └─────────────┘ ┌─────────────────────────────────────────────────────────────────────────────┐ │ LEGEND │ ├─────────────────────────────────────────────────────────────────────────────┤ │ █ Producer - сервисы, публикующие события │ │ █ Consumer - сервисы, обрабатывающие события │ │ █ Broker - Kafka / RabbitMQ / Redis Streams │ │ █ Storage - базы данных, внешние системы │ │ █ Gateway - точка входа (Kong, Nginx, etc.) │ └─────────────────────────────────────────────────────────────────────────────┘
🔥 Пример с Apache Kafka
version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" kafka-ui: image: provectuslabs/kafka-ui:latest ports: - "8080:8080" environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
from confluent_kafka import Producer import json from uuid import uuid4 from datetime import datetime class KafkaEventProducer: """Producer событий для Kafka""" def __init__(self, bootstrap_servers: str = "localhost:9092"): self.producer = Producer({ 'bootstrap.servers': bootstrap_servers, 'client.id': 'order-service' }) def _delivery_report(self, err, msg): """Callback при доставке сообщения""" if err: print(f'❌ Delivery failed: {err}') else: print(f'✅ Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}') def publish(self, topic: str, event_type: str, data: dict, key: str = None): """Опубликовать событие в Kafka""" event = { "event_id": str(uuid4()), "event_type": event_type, "timestamp": datetime.utcnow().isoformat() + "Z", "source": "order-service", "data": data } # Key определяет партицию (важно для порядка) message_key = key or event["event_id"] self.producer.produce( topic=topic, key=message_key.encode('utf-8'), value=json.dumps(event).encode('utf-8'), callback=self._delivery_report ) # Flush для гарантии отправки self.producer.flush() return event # ═══════════════════════════════════════════════════════════════ # ИСПОЛЬЗОВАНИЕ # ═══════════════════════════════════════════════════════════════ if __name__ == "__main__": producer = KafkaEventProducer() # Публикуем событие создания заказа event = producer.publish( topic="orders", event_type="OrderCreated", data={ "order_id": "ord_12345", "user_id": "usr_789", "items": [ {"product_id": "p1", "quantity": 2, "price": 29.99} ], "total": 59.98 }, key="ord_12345" # Все события заказа в одной партиции ) print(f"📤 Published: {event['event_type']}")
from confluent_kafka import Consumer, KafkaError import json class KafkaEventConsumer: """Consumer событий из Kafka""" def __init__( self, bootstrap_servers: str = "localhost:9092", group_id: str = "payment-service" ): self.consumer = Consumer({ 'bootstrap.servers': bootstrap_servers, 'group.id': group_id, 'auto.offset.reset': 'earliest', # Читать с начала 'enable.auto.commit': False # Ручной commit }) self.handlers = {} def subscribe(self, event_type: str): """Декоратор для подписки на тип события""" def decorator(handler): self.handlers[event_type] = handler return handler return decorator def start(self, topics: list): """Запустить прослушивание топиков""" self.consumer.subscribe(topics) print(f"🎧 Listening to topics: {topics}") try: while True: msg = self.consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue print(f"❌ Error: {msg.error()}") continue # Парсим событие event = json.loads(msg.value().decode('utf-8')) event_type = event.get("event_type") print(f"📥 Received: {event_type} from {msg.topic()}") # Вызываем обработчик if event_type in self.handlers: try: self.handlers[event_type](event) # Подтверждаем обработку self.consumer.commit(asynchronous=False) except Exception as e: print(f"❌ Handler error: {e}") # Здесь можно отправить в DLQ else: print(f"⚠️ No handler for {event_type}") self.consumer.commit(asynchronous=False) except KeyboardInterrupt: print("👋 Shutting down...") finally: self.consumer.close() # ═══════════════════════════════════════════════════════════════ # PAYMENT SERVICE # ═══════════════════════════════════════════════════════════════ consumer = KafkaEventConsumer(group_id="payment-service") @consumer.subscribe("OrderCreated") def handle_order_created(event: dict): """Обработка нового заказа — инициация оплаты""" data = event.get("data", {}) order_id = data.get("order_id") total = data.get("total") print(f"💳 Processing payment for order {order_id}...") print(f"💳 Amount: ${total}") # Здесь логика оплаты... print(f"💳 ✅ Payment successful!") if __name__ == "__main__": consumer.start(topics=["orders"])
📊 Observability для EDA
Метрики:
• Количество событий в секунду (throughput)
• Consumer lag (отставание обработки)
• Время обработки события
• Количество ошибок / retries
• Размер очереди
Логи:
• event_id, correlation_id в каждом логе
• Структурированные логи (JSON)
• Централизованное хранение (ELK, Loki)
Трейсинг:
• Jaeger, Zipkin для distributed tracing
• Propagate trace context через события
• Визуализация пути события через систему
import logging import json from uuid import uuid4 from datetime import datetime from contextvars import ContextVar # Context для трейсинга correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='') class StructuredLogger: """Структурированный логгер с correlation_id""" def __init__(self, name: str): self.logger = logging.getLogger(name) self.service = name def _log(self, level: str, message: str, **extra): log_entry = { "timestamp": datetime.utcnow().isoformat() + "Z", "level": level, "service": self.service, "correlation_id": correlation_id_var.get(), "message": message, **extra } print(json.dumps(log_entry)) def info(self, message: str, **extra): self._log("INFO", message, **extra) def error(self, message: str, **extra): self._log("ERROR", message, **extra) def event_received(self, event: dict): """Логирование полученного события""" # Устанавливаем correlation_id из события corr_id = event.get("correlation_id") or event.get("event_id") correlation_id_var.set(corr_id) self._log("INFO", "Event received", event_type=event.get("event_type"), event_id=event.get("event_id") ) def event_processed(self, event: dict, duration_ms: float): """Логирование обработанного события""" self._log("INFO", "Event processed", event_type=event.get("event_type"), event_id=event.get("event_id"), duration_ms=duration_ms ) # Использование logger = StructuredLogger("payment-service") # При получении события # logger.event_received(event) # ... обработка ... # logger.event_processed(event, duration_ms=45.2)
📋 Шпаргалка по EDA
╔═══════════════════════════════════════════════════════════════════════════╗ ║ EVENT-DRIVEN ARCHITECTURE CHEAT SHEET ║ ╚═══════════════════════════════════════════════════════════════════════════╝ ┌─────────────────────────────────────────────────────────────────────────────┐ │ КОМПОНЕНТЫ │ ├─────────────────────────────────────────────────────────────────────────────┤ │ Producer → Создаёт и публикует события │ │ Event → Неизменяемый факт о произошедшем │ │ Broker → Хранит и маршрутизирует события (Kafka, RabbitMQ) │ │ Consumer → Подписывается и обрабатывает события │ │ Topic/Queue → Канал для определённого типа событий │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────────┐ │ ПАТТЕРНЫ │ ├─────────────────────────────────────────────────────────────────────────────┤ │ Pub/Sub → Один-ко-многим, все подписчики получают копию │ │ Message Queue → Point-to-point, сообщение = один consumer │ │ Event Sourcing → Состояние = сумма всех событий │ │ CQRS → Разделение записи и чтения │ │ Saga → Распределённая транзакция через события │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────────┐ │ СТРУКТУРА СОБЫТИЯ │ ├─────────────────────────────────────────────────────────────────────────────┤ │ { │ │ "event_id": "uuid", // Уникальный ID │ │ "event_type": "OrderCreated", // Тип (прошедшее время!) │ │ "timestamp": "ISO8601", // Когда произошло │ │ "source": "order-service", // Кто создал │ │ "correlation_id": "req_id", // Для трейсинга │ │ "data": { ... } // Полезная нагрузка │ │ } │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────────┐ │ ВЫБОР БРОКЕРА │ ├─────────────────────────────────────────────────────────────────────────────┤ │ Kafka → High throughput, replay, event sourcing, Big Data │ │ RabbitMQ → Сложная маршрутизация, ACK, традиционные очереди │ │ Redis → Простота, low latency, уже используете Redis │ │ AWS SQS/SNS → Serverless, managed, AWS экосистема │ │ Google Pub/Sub → GCP экосистема, global, managed │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────────┐ │ ГАРАНТИИ ДОСТАВКИ │ ├─────────────────────────────────────────────────────────────────────────────┤ │ At-most-once → Fire-and-forget, может потеряться │ │ At-least-once → Гарантия доставки, возможны дубликаты │ │ Exactly-once → Идеально, но сложно (idempotency + deduplication) │ └─────────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────────┐ │ BEST PRACTICES │ ├─────────────────────────────────────────────────────────────────────────────┤ │ ✅ Именуйте события в прошедшем времени (OrderCreated, NOT CreateOrder) │ │ ✅ Включайте correlation_id для трейсинга │ │ ✅ Делайте consumer'ы идемпотентными │ │ ✅ Версионируйте схемы событий │ │ ✅ Используйте Dead Letter Queue для ошибок │ │ ✅ Мониторьте consumer lag │ │ ❌ Не кладите огромные payload'ы в события │ │ ❌ Не полагайтесь на порядок событий между партициями │ └─────────────────────────────────────────────────────────────────────────────┘
1. Начните с простого — Redis Streams или RabbitMQ для первого проекта
2. Определите события — проведите Event Storming с командой
3. Реализуйте один flow — например, Order → Payment → Notification
4. Добавьте observability — логи, метрики, трейсинг
5. Обработайте edge cases — дубликаты, ошибки, DLQ
6. Масштабируйте — переходите на Kafka при росте нагрузки
Event-Driven Architecture — мощный паттерн для построения масштабируемых,
отказоустойчивых микросервисных систем. Но помните:
не каждой системе нужна EDA. Начинайте просто!