Архитектурный паттерн
📡
🛒
💳
📦
📧

Event-Driven Architecture

Событийная архитектура для микросервисов. Слабая связанность, высокая масштабируемость, реактивность в реальном времени.

Kafka RabbitMQ Redis Streams Pub/Sub

📡 Что такое Event-Driven Architecture

Event-Driven Architecture (EDA) — архитектурный паттерн, где компоненты системы общаются через события. Вместо прямых вызовов (запрос → ответ) сервисы публикуют события о том, что произошло, а заинтересованные сервисы на них реагируют.

🎯 Простая аналогия

Синхронный подход (REST): Вы звоните в пиццерию, ждёте на линии, пока примут заказ, подтвердят оплату, назначат курьера. Вы "заблокированы" всё это время.

Событийный подход (EDA): Вы отправляете заказ в приложении и идёте по своим делам. Вам приходят уведомления: "Заказ принят", "Готовится", "Курьер выехал". Каждое уведомление — это событие.

Synchronous vs Asynchronous

🔗
Синхронный (REST/RPC)
  • Клиент ждёт ответа
  • 🔒 Сильная связанность сервисов
  • 📉 Сбой одного → сбой цепочки
  • 📊 Сложнее масштабировать
  • Простая отладка
  • Немедленный результат
📡
Асинхронный (Events)
  • Fire-and-forget
  • 🔓 Слабая связанность
  • 📈 Отказоустойчивость
  • 🚀 Легко масштабируется
  • ⚠️ Сложнее отладка
  • ⚠️ Eventual consistency

🤔 Зачем нужна EDA

В монолите всё просто: один сервис вызывает метод другого напрямую. Но когда у вас 10, 50, 100 микросервисов — прямые вызовы превращаются в кошмар:

❌ Проблема: Спагетти из REST-вызовов
🛒
Orders
💳
Payments
📦
Inventory
📧
Notifications

Каждый сервис знает о каждом. Orders вызывает Payments, Inventory, Notifications напрямую.
Если Notifications упал — Orders ждёт таймаут. Добавить новый сервис = менять все остальные.

✅ Решение: Event-Driven Architecture
🛒
Orders
publish
📡
Event Broker
subscribe
💳
Payments
📦
Inventory
📧
Notifications

Преимущества EDA

  • Слабая связанность — сервисы не знают друг о друге, только о событиях
  • Масштабируемость — добавляйте consumer'ов без изменения producer'а
  • Отказоустойчивость — если consumer упал, события ждут в очереди
  • Расширяемость — новый сервис просто подписывается на нужные события
  • Реальное время — мгновенная реакция на изменения
  • Аудит — история всех событий хранится в брокере

🧩 Компоненты системы

Поток события в системе
1
Producer
Создаёт и публикует событие
2
Event
Факт о том, что произошло
3
Broker
Хранит и маршрутизирует
4
Consumer
Получает и обрабатывает

Producer (Издатель)

Сервис, который создаёт события. Producer не знает, кто будет обрабатывать событие — он просто публикует факт: OrderCreated, UserRegistered, PaymentCompleted.

Event (Событие)

Неизменяемая запись о факте, который произошёл в системе. Событие содержит всю необходимую информацию для его обработки.

ORDER OrderCreated
{
"event_id": "evt_abc123",
"event_type": "OrderCreated",
"timestamp": "2024-01-15T10:30:00Z",
"source": "order-service",
"data": {
    "order_id": "ord_456",
    "user_id": "usr_789",
    "items": [...],
    "total": 99.99
},
"metadata": {
    "correlation_id": "req_xyz",
    "version": 1
}
}

Broker (Брокер сообщений)

Посредник, который принимает события от producer'ов, хранит их и доставляет consumer'ам. Примеры: Apache Kafka, RabbitMQ, Redis Streams.

Consumer (Подписчик)

Сервис, который подписан на определённые типы событий и реагирует на них. Один consumer может слушать несколько типов событий, и наоборот — на одно событие могут быть подписаны несколько consumer'ов.

🎨 Паттерны EDA

📢
Pub/Sub
Publisher публикует событие в топик. Все подписчики получают копию. Один-ко-многим.
OrderService → [OrderCreated]
  ↳ PaymentService ✓
  ↳ InventoryService ✓
  ↳ EmailService ✓
📬
Message Queue
Сообщение обрабатывается только одним consumer'ом. Для распределения нагрузки. Point-to-point.
Tasks Queue → [Task1, Task2, Task3]
  ↳ Worker1 берёт Task1
  ↳ Worker2 берёт Task2
  ↳ Worker3 берёт Task3
📖
Event Sourcing
Состояние восстанавливается из последовательности событий. Полная история изменений.
Account Events:
  [Created: $0]
  [Deposited: +$100]
  [Withdrawn: -$30]
→ Current Balance: $70
🔀
CQRS
Command Query Responsibility Segregation. Разделение записи (commands) и чтения (queries).
Write Model → Events → Read Model

Commands: CreateOrder, UpdateOrder
Queries: GetOrders, GetOrderById
🎭
Saga Pattern
Распределённая транзакция через цепочку событий. Компенсирующие действия при ошибках.
Order Saga:
  1. ReserveInventory
  2. ProcessPayment
  3. ShipOrder
❌ → RollbackPayment, ReleaseInventory
🔔
Event Notification
Минимальное событие — только факт и ID. Получатель сам запрашивает детали.
Event: {type: "OrderCreated", id: "123"}

Consumer:
  GET /orders/123 → full details

🔧 Брокеры сообщений

Брокер — сердце событийной архитектуры. Выбор брокера зависит от требований: пропускная способность, гарантии доставки, сложность настройки.

Apache Kafka
Distributed streaming platform
1M+
msg/sec
Days
retention
  • Огромная пропускная способность
  • Хранение на диске (retention)
  • Replay событий
  • Consumer groups
  • Отлично для Big Data
RabbitMQ
Message broker
50K+
msg/sec
AMQP
protocol
  • Гибкая маршрутизация
  • Подтверждения доставки (ACK)
  • Dead Letter Queues
  • Простая настройка
  • Множество плагинов
Redis Streams
In-memory data store
100K+
msg/sec
RAM
storage
  • Минимальная задержка
  • Consumer groups
  • Уже есть Redis? Бонус!
  • Простое API
  • Идеально для real-time

Когда что выбрать?

🎯 Рекомендации

Apache Kafka — Big Data, аналитика, event sourcing, нужен replay событий, высокая нагрузка.

RabbitMQ — традиционные очереди задач, сложная маршрутизация, гарантии доставки, интеграции.

Redis Streams — уже используете Redis, нужна минимальная задержка, простые сценарии, real-time.

✏️ Дизайн событий

Правильный дизайн событий — основа успешной EDA. Плохо спроектированные события создадут больше проблем, чем решат.

Правила именования

Events Naming conventions
# ✅ Хорошо: прошедшее время (факт уже произошёл)
OrderCreated
PaymentCompleted
UserRegistered
InventoryReserved
EmailSent

# ❌ Плохо: императив (команда, а не событие)
CreateOrder        # Это команда, не событие
ProcessPayment     # Команда
SendEmail          # Команда

# ❌ Плохо: слишком общие
OrderUpdated       # Что именно обновилось?
DataChanged        # Какие данные?

# ✅ Лучше: конкретные события
OrderShipped
OrderCancelled
OrderItemAdded
OrderAddressChanged

Структура события

JSON event_schema.json
{
    // === МЕТАДАННЫЕ (обязательные) ===
    "event_id": "evt_a1b2c3d4",          // Уникальный ID (UUID)
    "event_type": "OrderCreated",       // Тип события
    "event_version": 1,                 // Версия схемы события
    "timestamp": "2024-01-15T10:30:00.000Z", // ISO 8601
    "source": "order-service",         // Кто создал событие

    // === ТРАССИРОВКА ===
    "correlation_id": "req_xyz789",    // Связь с исходным запросом
    "causation_id": "evt_prev123",    // ID события-причины

    // === ПОЛЕЗНАЯ НАГРУЗКА ===
    "data": {
        "order_id": "ord_456",
        "user_id": "usr_789",
        "items": [
            {
                "product_id": "prod_111",
                "quantity": 2,
                "price": 29.99
            }
        ],
        "total": 59.98,
        "currency": "USD",
        "shipping_address": {
            "city": "Moscow",
            "street": "Tverskaya 1"
        }
    }
}

Fat Events vs Thin Events

📦
Fat Events (Полные)
  • Вся информация в событии
  • Consumer автономен
  • Нет доп. запросов
  • ⚠️ Больший размер
  • ⚠️ Дублирование данных
📄
Thin Events (Уведомления)
  • Минимальный размер
  • Актуальные данные
  • ⚠️ Доп. запросы к API
  • ⚠️ Связанность
  • ⚠️ Данные могут измениться
💡 Рекомендация

Используйте Fat Events для важных бизнес-событий. Consumer должен иметь всё необходимое для обработки без дополнительных запросов.

Thin Events подходят для уведомлений, где актуальность данных важнее автономности.

💻 Практический пример

Создадим простую событийную систему для интернет-магазина: при создании заказа отправляем событие, которое обрабатывают несколько сервисов.

Terminal — Установка
$ pip install redis pydantic
# Запуск Redis (Docker)
$ docker run -d -p 6379:6379 redis:alpine

Шаг 1: Event Bus (Шина событий)

Python event_bus.py
import json
import redis
from datetime import datetime
from uuid import uuid4
from typing import Callable, Dict, Any
from pydantic import BaseModel, Field
from abc import ABC, abstractmethod

# ═══════════════════════════════════════════════════════════════
#                      БАЗОВЫЙ КЛАСС СОБЫТИЯ
# ═══════════════════════════════════════════════════════════════

class Event(BaseModel):
    """Базовый класс для всех событий"""
    event_id: str = Field(default_factory=lambda: str(uuid4()))
    event_type: str = ""
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    source: str = "unknown"
    correlation_id: str | None = None
    data: Dict[str, Any] = {}

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if not self.event_type:
            self.event_type = self.__class__.__name__

# ═══════════════════════════════════════════════════════════════
#                       EVENT BUS (REDIS)
# ═══════════════════════════════════════════════════════════════

class EventBus:
    """Шина событий на Redis Streams"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.handlers: Dict[str, list[Callable]] = {}

    def publish(self, event: Event, stream: str = "events") -> str:
        """Опубликовать событие в stream"""
        event_data = event.model_dump_json()

        # Добавляем в Redis Stream
        message_id = self.redis.xadd(stream, {
            "event_type": event.event_type,
            "data": event_data
        })

        print(f"📤 Published: {event.event_type} [{message_id}]")
        return message_id

    def subscribe(self, event_type: str):
        """Декоратор для подписки на события"""
        def decorator(handler: Callable):
            if event_type not in self.handlers:
                self.handlers[event_type] = []
            self.handlers[event_type].append(handler)
            print(f"📥 Subscribed: {handler.__name__} → {event_type}")
            return handler
        return decorator

    def consume(self, stream: str = "events", group: str = "default", consumer: str = "consumer-1"):
        """Слушать события из stream (Consumer Group)"""
        # Создаём consumer group если не существует
        try:
            self.redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError:
            pass  # Группа уже существует

        print(f"🎧 Listening on '{stream}' as '{consumer}'...")

        while True:
            # Читаем новые сообщения
            messages = self.redis.xreadgroup(
                group, consumer, {stream: ">"}, count=10, block=5000
            )

            for stream_name, stream_messages in messages:
                for message_id, data in stream_messages:
                    event_type = data.get("event_type")
                    event_data = json.loads(data.get("data", "{}"))

                    # Вызываем обработчики
                    if event_type in self.handlers:
                        for handler in self.handlers[event_type]:
                            try:
                                handler(event_data)
                                # Подтверждаем обработку
                                self.redis.xack(stream, group, message_id)
                            except Exception as e:
                                print(f"❌ Error: {e}")

# Глобальный экземпляр
event_bus = EventBus()

Шаг 2: Определяем события

Python events.py
from event_bus import Event
from typing import List
from pydantic import BaseModel

# ═══════════════════════════════════════════════════════════════
#                    ВСПОМОГАТЕЛЬНЫЕ МОДЕЛИ
# ═══════════════════════════════════════════════════════════════

class OrderItem(BaseModel):
    product_id: str
    product_name: str
    quantity: int
    price: float

class Address(BaseModel):
    city: str
    street: str
    postal_code: str

# ═══════════════════════════════════════════════════════════════
#                         СОБЫТИЯ
# ═══════════════════════════════════════════════════════════════

class OrderCreated(Event):
    """Заказ создан"""
    source: str = "order-service"

    def __init__(
        self,
        order_id: str,
        user_id: str,
        user_email: str,
        items: List[dict],
        total: float,
        shipping_address: dict,
        **kwargs
    ):
        super().__init__(
            data={
                "order_id": order_id,
                "user_id": user_id,
                "user_email": user_email,
                "items": items,
                "total": total,
                "shipping_address": shipping_address
            },
            **kwargs
        )

class PaymentProcessed(Event):
    """Оплата обработана"""
    source: str = "payment-service"

    def __init__(self, order_id: str, amount: float, status: str, **kwargs):
        super().__init__(
            data={"order_id": order_id, "amount": amount, "status": status},
            **kwargs
        )

class InventoryReserved(Event):
    """Товар зарезервирован"""
    source: str = "inventory-service"

    def __init__(self, order_id: str, items: List[dict], **kwargs):
        super().__init__(
            data={"order_id": order_id, "reserved_items": items},
            **kwargs
        )

class NotificationSent(Event):
    """Уведомление отправлено"""
    source: str = "notification-service"

    def __init__(self, recipient: str, channel: str, template: str, **kwargs):
        super().__init__(
            data={"recipient": recipient, "channel": channel, "template": template},
            **kwargs
        )

Шаг 3: Producer (Order Service)

Python order_service.py
from event_bus import event_bus
from events import OrderCreated
from uuid import uuid4

class OrderService:
    """Сервис заказов — Producer событий"""

    def create_order(self, user_id: str, user_email: str, items: list, address: dict) -> str:
        """Создать заказ и опубликовать событие"""

        # 1. Создаём заказ (обычно сохраняем в БД)
        order_id = f"ord_{uuid4().hex[:8]}"
        total = sum(item["price"] * item["quantity"] for item in items)

        print(f"🛒 Order created: {order_id} | Total: ${total:.2f}")

        # 2. Публикуем событие
        event = OrderCreated(
            order_id=order_id,
            user_id=user_id,
            user_email=user_email,
            items=items,
            total=total,
            shipping_address=address
        )

        event_bus.publish(event)

        return order_id

# ═══════════════════════════════════════════════════════════════
#                         ДЕМОНСТРАЦИЯ
# ═══════════════════════════════════════════════════════════════

if __name__ == "__main__":
    service = OrderService()

    # Создаём тестовый заказ
    order_id = service.create_order(
        user_id="usr_123",
        user_email="customer@example.com",
        items=[
            {"product_id": "prod_1", "product_name": "Laptop", "quantity": 1, "price": 999.99},
            {"product_id": "prod_2", "product_name": "Mouse", "quantity": 2, "price": 29.99}
        ],
        address={
            "city": "Moscow",
            "street": "Tverskaya 1",
            "postal_code": "125009"
        }
    )

    print(f"✅ Order {order_id} published!")

Шаг 4: Consumers (Подписчики)

Python consumers.py
from event_bus import event_bus
import time

# ═══════════════════════════════════════════════════════════════
#                      PAYMENT SERVICE
# ═══════════════════════════════════════════════════════════════

@event_bus.subscribe("OrderCreated")
def process_payment(event_data: dict):
    """Обработка оплаты при создании заказа"""
    data = event_data.get("data", {})
    order_id = data.get("order_id")
    total = data.get("total")

    print(f"💳 [PaymentService] Processing payment for {order_id}...")
    time.sleep(0.5)  # Имитация обработки
    print(f"💳 [PaymentService] ✅ Payment ${total:.2f} completed!")

# ═══════════════════════════════════════════════════════════════
#                     INVENTORY SERVICE
# ═══════════════════════════════════════════════════════════════

@event_bus.subscribe("OrderCreated")
def reserve_inventory(event_data: dict):
    """Резервирование товара на складе"""
    data = event_data.get("data", {})
    order_id = data.get("order_id")
    items = data.get("items", [])

    print(f"📦 [InventoryService] Reserving {len(items)} items for {order_id}...")

    for item in items:
        print(f"   → {item['product_name']} x{item['quantity']}")

    time.sleep(0.3)
    print(f"📦 [InventoryService] ✅ Inventory reserved!")

# ═══════════════════════════════════════════════════════════════
#                   NOTIFICATION SERVICE
# ═══════════════════════════════════════════════════════════════

@event_bus.subscribe("OrderCreated")
def send_confirmation_email(event_data: dict):
    """Отправка email подтверждения"""
    data = event_data.get("data", {})
    order_id = data.get("order_id")
    email = data.get("user_email")

    print(f"📧 [NotificationService] Sending email to {email}...")
    time.sleep(0.2)
    print(f"📧 [NotificationService] ✅ Email sent: 'Order {order_id} confirmed!'")

# ═══════════════════════════════════════════════════════════════
#                    ANALYTICS SERVICE
# ═══════════════════════════════════════════════════════════════

@event_bus.subscribe("OrderCreated")
def track_order_analytics(event_data: dict):
    """Сбор аналитики"""
    data = event_data.get("data", {})
    total = data.get("total")
    items = data.get("items", [])

    print(f"📊 [AnalyticsService] Recording: ${total:.2f}, {len(items)} items")

# ═══════════════════════════════════════════════════════════════
#                      ЗАПУСК CONSUMER
# ═══════════════════════════════════════════════════════════════

if __name__ == "__main__":
    print("🚀 Starting event consumers...")
    print("=" * 50)

    # Запускаем прослушивание событий
    event_bus.consume(stream="events", group="order-processors", consumer="worker-1")

Запуск системы

Terminal 1 — Consumer
$ python consumers.py
🚀 Starting event consumers... ================================================== 📥 Subscribed: process_payment → OrderCreated 📥 Subscribed: reserve_inventory → OrderCreated 📥 Subscribed: send_confirmation_email → OrderCreated 📥 Subscribed: track_order_analytics → OrderCreated 🎧 Listening on 'events' as 'worker-1'...
Terminal 2 — Producer
$ python order_service.py
🛒 Order created: ord_a1b2c3d4 | Total: $1059.97 📤 Published: OrderCreated [1705312200000-0] ✅ Order ord_a1b2c3d4 published!
Terminal 1 — Consumer (обработка)
💳 [PaymentService] Processing payment for ord_a1b2c3d4... 💳 [PaymentService] ✅ Payment $1059.97 completed! 📦 [InventoryService] Reserving 2 items for ord_a1b2c3d4... → Laptop x1 → Mouse x2 📦 [InventoryService] ✅ Inventory reserved! 📧 [NotificationService] Sending email to customer@example.com... 📧 [NotificationService] ✅ Email sent: 'Order ord_a1b2c3d4 confirmed!' 📊 [AnalyticsService] Recording: $1059.97, 2 items

🎯 Где применяется EDA

🛒
E-Commerce
Заказы, оплаты, доставка, уведомления
🏦
FinTech
Транзакции, fraud detection, отчёты
🎮
Gaming
Игровые события, лидерборды, матчмейкинг
📱
Social Media
Лайки, комментарии, нотификации
🚗
IoT / Automotive
Телеметрия, датчики, алерты
📊
Analytics
Real-time dashboards, метрики
🏥
Healthcare
Мониторинг пациентов, алерты
✈️
Travel
Бронирования, изменения рейсов

Реальные примеры

🌐 Кто использует EDA

Netflix — обрабатывает триллионы событий в день для рекомендаций и аналитики.

Uber — real-time обновления позиции, ценообразование, матчинг водителей.

LinkedIn — Kafka был создан в LinkedIn для обработки активности пользователей.

Spotify — события прослушивания для рекомендаций и статистики.

Airbnb — бронирования, сообщения, платежи между гостями и хостами.

⚠️ Сложности и решения

🔄
Eventual Consistency
Данные синхронизируются не мгновенно. Пользователь может увидеть устаревшие данные.
Решения:
• Оптимистичный UI
• Polling / WebSockets
• Saga для критичных операций
📋
Дублирование событий
Брокер может доставить событие дважды. Consumer должен быть идемпотентным.
Решения:
• Уникальный event_id
• Idempotency keys
• Проверка "уже обработано?"
🔀
Порядок событий
События могут прийти не в том порядке, в котором были созданы.
Решения:
• Партиции по ключу (Kafka)
• Sequence numbers
• Временны́е метки
🐛
Сложная отладка
Трудно отследить путь запроса через асинхронные сервисы.
Решения:
• Correlation ID
• Distributed tracing (Jaeger)
• Централизованные логи
📜
Эволюция схемы
Изменение структуры события может сломать consumer'ов.
Решения:
• Schema Registry
• Версионирование событий
• Backward compatibility
💀
Dead Letter Queue
Что делать с событиями, которые не удалось обработать?
Решения:
• DLQ для "мёртвых" событий
• Retry с backoff
• Алерты и мониторинг
⚠️ Когда НЕ использовать EDA

Простые CRUD-приложения — overkill, REST проще
Нужен немедленный ответ — синхронный запрос быстрее
Строгая консистентность — банковские транзакции требуют ACID
Маленькая команда — сложность инфраструктуры может не окупиться
Монолит — события внутри одного процесса не имеют смысла

FAQ

Message Queue (Очередь сообщений):
• Сообщение обрабатывается одним consumer'ом
• Point-to-point коммуникация
• Фокус на доставке задач

Event-Driven Architecture:
• Событие получают все подписчики
• Pub/Sub модель
• Фокус на реакции на изменения

EDA — это архитектурный паттерн, который может использовать очереди как один из механизмов.
Выбирайте Kafka, если:
• Нужен replay событий (event sourcing)
• Высокая пропускная способность (100K+ msg/sec)
• Big Data, аналитика, стриминг
• Долгосрочное хранение событий

Выбирайте RabbitMQ, если:
• Классические очереди задач
• Сложная маршрутизация
• Нужны гарантии доставки (ACK)
• Проще настройка, меньше ресурсов
Exactly-once семантика сложна. Решения:

1. Идемпотентность:
Сделайте обработчик идемпотентным — повторный вызов не меняет результат.

2. Deduplication:
Храните обработанные event_id. Пропускайте дубликаты.

3. Transactional outbox:
Сохраняйте событие в БД в той же транзакции, что и бизнес-данные.

4. Kafka transactions:
Kafka поддерживает exactly-once между producer и consumer.
Стратегии версионирования:

1. Версия в имени:
OrderCreatedV1, OrderCreatedV2

2. Поле version:
{"event_type": "OrderCreated", "version": 2}

3. Schema Registry:
Confluent Schema Registry хранит версии схем.

Правила совместимости:
• Добавление полей — OK (backward compatible)
• Удаление полей — осторожно (forward compatible)
• Изменение типа — новая версия события
Уровни тестирования:

1. Unit-тесты:
Мокайте event bus. Проверяйте, что событие публикуется с правильными данными.

2. Integration-тесты:
Используйте Testcontainers для поднятия реального брокера.

3. Contract-тесты:
Pact или Spring Cloud Contract для проверки контрактов событий.

4. E2E-тесты:
Полный flow от producer до consumer. Используйте таймауты и polling.
Пошаговый план:

1. Начните с одного use case:
Выберите простой сценарий — например, отправка email после регистрации.

2. Выберите простой брокер:
Redis Streams или RabbitMQ. Kafka — позже.

3. Определите события:
Event Storming с командой. Что происходит в системе?

4. Реализуйте producer и consumer:
Один сервис публикует, другой слушает.

5. Добавьте observability:
Логи, метрики, трейсинг. Без них отладка невозможна.

6. Масштабируйте:
Добавляйте новые события и сервисы постепенно.
📡 Event-Driven Architecture — Итоги
🎯
Суть
Сервисы общаются через события, не напрямую
Преимущества
Слабая связанность, масштабируемость, отказоустойчивость
🔧
Брокеры
Kafka, RabbitMQ, Redis Streams
🎨
Паттерны
Pub/Sub, Event Sourcing, CQRS, Saga
⚠️
Сложности
Eventual consistency, отладка, дубликаты
🎯
Когда использовать
Микросервисы, real-time, высокая нагрузка

📐 Полная архитектура EDA

Diagram Event-Driven Microservices Architecture
╔═══════════════════════════════════════════════════════════════════════════════╗
║                    EVENT-DRIVEN MICROSERVICES ARCHITECTURE                     ║
╚═══════════════════════════════════════════════════════════════════════════════╝

                        ┌─────────────┐
                        │   Client    │
                        │  (Web/App)  │
                        └──────┬──────┘
                               │
                               ▼
                        ┌─────────────┐
                        │ API Gateway │
                        └──────┬──────┘
                               │
  ┌────────────────────────────┼────────────────────────────┐
  │                            │                            │
  ▼                            ▼                            ▼
┌─────────────┐          ┌─────────────┐          ┌─────────────┐
│   Order     │          │   User      │          │  Product    │
│  Service    │          │  Service    │          │  Service    │
└──────┬──────┘          └──────┬──────┘          └──────┬──────┘
  │                            │                            │
  │ publishpublishpublish
  ▼                            ▼                            ▼
═══════════════════════════════════════════════════════════════════════════════
║                                                                               ║
║                          📡  EVENT BROKER  📡                                 ║
║                                                                               ║
║    ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐             ║
║    │  orders.created │  │  users.created  │  │ products.updated│             ║
║    │     Topic       │  │     Topic       │  │     Topic       │             ║
║    └─────────────────┘  └─────────────────┘  └─────────────────┘             ║
║                                                                               ║
═══════════════════════════════════════════════════════════════════════════════
  │                            │                            │
  │ subscribesubscribesubscribe
  ▼                            ▼                            ▼
┌─────────────┐          ┌─────────────┐          ┌─────────────┐
│  Payment    │          │Notification │          │  Analytics  │
│  Service    │          │  Service    │          │  Service    │
└─────────────┘          └─────────────┘          └─────────────┘
  │                            │                            │
  ▼                            ▼                            ▼
┌─────────────┐          ┌─────────────┐          ┌─────────────┐
│  Payment DB │          │ Email/Push  │          │   ClickHouse│
└─────────────┘          └─────────────┘          └─────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                              LEGEND                                         │
├─────────────────────────────────────────────────────────────────────────────┤
  █ Producer  - сервисы, публикующие события                                
  █ Consumer  - сервисы, обрабатывающие события                              
  █ Broker    - Kafka / RabbitMQ / Redis Streams                            
  █ Storage   - базы данных, внешние системы                                
  █ Gateway   - точка входа (Kong, Nginx, etc.)                             
└─────────────────────────────────────────────────────────────────────────────┘

🔥 Пример с Apache Kafka

Terminal — Установка Kafka
# Docker Compose для Kafka
$ pip install confluent-kafka
YAML docker-compose.yml
version: '3.8'

services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
Python kafka_producer.py
from confluent_kafka import Producer
import json
from uuid import uuid4
from datetime import datetime

class KafkaEventProducer:
"""Producer событий для Kafka"""

def __init__(self, bootstrap_servers: str = "localhost:9092"):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'client.id': 'order-service'
})

def _delivery_report(self, err, msg):
"""Callback при доставке сообщения"""
if err:
print(f'❌ Delivery failed: {err}')
else:
print(f'✅ Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

def publish(self, topic: str, event_type: str, data: dict, key: str = None):
"""Опубликовать событие в Kafka"""
event = {
"event_id": str(uuid4()),
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat() + "Z",
"source": "order-service",
"data": data
}

# Key определяет партицию (важно для порядка)
message_key = key or event["event_id"]

self.producer.produce(
topic=topic,
key=message_key.encode('utf-8'),
value=json.dumps(event).encode('utf-8'),
callback=self._delivery_report
)

# Flush для гарантии отправки
self.producer.flush()

return event

# ═══════════════════════════════════════════════════════════════
#                         ИСПОЛЬЗОВАНИЕ
# ═══════════════════════════════════════════════════════════════

if __name__ == "__main__":
producer = KafkaEventProducer()

# Публикуем событие создания заказа
event = producer.publish(
topic="orders",
event_type="OrderCreated",
data={
"order_id": "ord_12345",
"user_id": "usr_789",
"items": [
    {"product_id": "p1", "quantity": 2, "price": 29.99}
],
"total": 59.98
},
key="ord_12345"  # Все события заказа в одной партиции
)

print(f"📤 Published: {event['event_type']}")
Python kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import json

class KafkaEventConsumer:
"""Consumer событий из Kafka"""

def __init__(
self, 
bootstrap_servers: str = "localhost:9092",
group_id: str = "payment-service"
):
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest',  # Читать с начала
'enable.auto.commit': False       # Ручной commit
})
self.handlers = {}

def subscribe(self, event_type: str):
"""Декоратор для подписки на тип события"""
def decorator(handler):
self.handlers[event_type] = handler
return handler
return decorator

def start(self, topics: list):
"""Запустить прослушивание топиков"""
self.consumer.subscribe(topics)
print(f"🎧 Listening to topics: {topics}")

try:
while True:
    msg = self.consumer.poll(timeout=1.0)

    if msg is None:
        continue

    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        print(f"❌ Error: {msg.error()}")
        continue

    # Парсим событие
    event = json.loads(msg.value().decode('utf-8'))
    event_type = event.get("event_type")

    print(f"📥 Received: {event_type} from {msg.topic()}")

    # Вызываем обработчик
    if event_type in self.handlers:
        try:
            self.handlers[event_type](event)
            # Подтверждаем обработку
            self.consumer.commit(asynchronous=False)
        except Exception as e:
            print(f"❌ Handler error: {e}")
            # Здесь можно отправить в DLQ
    else:
        print(f"⚠️ No handler for {event_type}")
        self.consumer.commit(asynchronous=False)

except KeyboardInterrupt:
print("👋 Shutting down...")
finally:
self.consumer.close()

# ═══════════════════════════════════════════════════════════════
#                      PAYMENT SERVICE
# ═══════════════════════════════════════════════════════════════

consumer = KafkaEventConsumer(group_id="payment-service")

@consumer.subscribe("OrderCreated")
def handle_order_created(event: dict):
"""Обработка нового заказа — инициация оплаты"""
data = event.get("data", {})
order_id = data.get("order_id")
total = data.get("total")

print(f"💳 Processing payment for order {order_id}...")
print(f"💳 Amount: ${total}")

# Здесь логика оплаты...

print(f"💳 ✅ Payment successful!")

if __name__ == "__main__":
consumer.start(topics=["orders"])

📊 Observability для EDA

🔍 Что мониторить

Метрики:
• Количество событий в секунду (throughput)
• Consumer lag (отставание обработки)
• Время обработки события
• Количество ошибок / retries
• Размер очереди

Логи:
• event_id, correlation_id в каждом логе
• Структурированные логи (JSON)
• Централизованное хранение (ELK, Loki)

Трейсинг:
• Jaeger, Zipkin для distributed tracing
• Propagate trace context через события
• Визуализация пути события через систему

Python observability.py — Добавление трейсинга
import logging
import json
from uuid import uuid4
from datetime import datetime
from contextvars import ContextVar

# Context для трейсинга
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')

class StructuredLogger:
"""Структурированный логгер с correlation_id"""

def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.service = name

def _log(self, level: str, message: str, **extra):
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": level,
"service": self.service,
"correlation_id": correlation_id_var.get(),
"message": message,
**extra
}
print(json.dumps(log_entry))

def info(self, message: str, **extra):
self._log("INFO", message, **extra)

def error(self, message: str, **extra):
self._log("ERROR", message, **extra)

def event_received(self, event: dict):
"""Логирование полученного события"""
# Устанавливаем correlation_id из события
corr_id = event.get("correlation_id") or event.get("event_id")
correlation_id_var.set(corr_id)

self._log("INFO", "Event received", 
event_type=event.get("event_type"),
event_id=event.get("event_id")
)

def event_processed(self, event: dict, duration_ms: float):
"""Логирование обработанного события"""
self._log("INFO", "Event processed",
event_type=event.get("event_type"),
event_id=event.get("event_id"),
duration_ms=duration_ms
)

# Использование
logger = StructuredLogger("payment-service")

# При получении события
# logger.event_received(event)
# ... обработка ...
# logger.event_processed(event, duration_ms=45.2)

📋 Шпаргалка по EDA

Cheat Sheet Event-Driven Architecture Quick Reference
╔═══════════════════════════════════════════════════════════════════════════╗
║                  EVENT-DRIVEN ARCHITECTURE CHEAT SHEET                     ║
╚═══════════════════════════════════════════════════════════════════════════╝

┌─────────────────────────────────────────────────────────────────────────────┐
│ КОМПОНЕНТЫ                                                                  │
├─────────────────────────────────────────────────────────────────────────────┤
 Producer      → Создаёт и публикует события                               
 Event         → Неизменяемый факт о произошедшем                          
 Broker        → Хранит и маршрутизирует события (Kafka, RabbitMQ)         
 Consumer      → Подписывается и обрабатывает события                      
 Topic/Queue   → Канал для определённого типа событий                      
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│ ПАТТЕРНЫ                                                                    │
├─────────────────────────────────────────────────────────────────────────────┤
 Pub/Sub           → Один-ко-многим, все подписчики получают копию        
 Message Queue     → Point-to-point, сообщение = один consumer             
 Event Sourcing    → Состояние = сумма всех событий                        
 CQRS              → Разделение записи и чтения                            
 Saga              → Распределённая транзакция через события               
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│ СТРУКТУРА СОБЫТИЯ                                                           │
├─────────────────────────────────────────────────────────────────────────────┤
  {                                                                          
    "event_id": "uuid",           // Уникальный ID                        
    "event_type": "OrderCreated", // Тип (прошедшее время!)               
    "timestamp": "ISO8601",       // Когда произошло                      
    "source": "order-service",    // Кто создал                           
    "correlation_id": "req_id",  // Для трейсинга                         
    "data": { ... }               // Полезная нагрузка                     
  }                                                                          
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│ ВЫБОР БРОКЕРА                                                               │
├─────────────────────────────────────────────────────────────────────────────┤
 Kafka       → High throughput, replay, event sourcing, Big Data          
 RabbitMQ    → Сложная маршрутизация, ACK, традиционные очереди           
 Redis       → Простота, low latency, уже используете Redis               
 AWS SQS/SNS → Serverless, managed, AWS экосистема                       
 Google Pub/Sub → GCP экосистема, global, managed                        
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│ ГАРАНТИИ ДОСТАВКИ                                                           │
├─────────────────────────────────────────────────────────────────────────────┤
 At-most-once   → Fire-and-forget, может потеряться                       
 At-least-once  → Гарантия доставки, возможны дубликаты                   
 Exactly-once   → Идеально, но сложно (idempotency + deduplication)       
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│ BEST PRACTICES                                                              │
├─────────────────────────────────────────────────────────────────────────────┤
 ✅ Именуйте события в прошедшем времени (OrderCreated, NOT CreateOrder)   
 ✅ Включайте correlation_id для трейсинга                                 
 ✅ Делайте consumer'ы идемпотентными                                      
 ✅ Версионируйте схемы событий                                            
 ✅ Используйте Dead Letter Queue для ошибок                               
 ✅ Мониторьте consumer lag                                                
 ❌ Не кладите огромные payload'ы в события                                
 ❌ Не полагайтесь на порядок событий между партициями                     
└─────────────────────────────────────────────────────────────────────────────┘
🚀 Следующие шаги

1. Начните с простого — Redis Streams или RabbitMQ для первого проекта
2. Определите события — проведите Event Storming с командой
3. Реализуйте один flow — например, Order → Payment → Notification
4. Добавьте observability — логи, метрики, трейсинг
5. Обработайте edge cases — дубликаты, ошибки, DLQ
6. Масштабируйте — переходите на Kafka при росте нагрузки

Event-Driven Architecture — мощный паттерн для построения масштабируемых, отказоустойчивых микросервисных систем. Но помните: не каждой системе нужна EDA. Начинайте просто!

© 2025 • Event-Driven Architecture Guide

Kafka • RabbitMQ • Redis Streams • Microservices • Pub/Sub