RabbitMQ — Полное руководство по брокеру сообщений
Что это, как работает, когда использовать. Примеры кода, сравнение с Kafka, плюсы и минусы — всё, что нужно для принятия решений.
🐰 Что такое RabbitMQ
RabbitMQ — это брокер сообщений (message broker), который позволяет приложениям обмениваться данными асинхронно. Думайте о нём как о почтовом отделении: вы отправляете письмо, оно хранится до тех пор, пока получатель не заберёт его.
Написан на Erlang — языке, созданном для телекоммуникаций, что даёт RabbitMQ отличную отказоустойчивость и способность обрабатывать тысячи одновременных соединений.
Развязать отправителя и получателя сообщений. Отправитель не знает, кто получит сообщение и когда. Получатель не знает, кто отправил. Это даёт гибкость, масштабируемость и отказоустойчивость.
Как это работает (простыми словами)
Представьте ресторан:
- Официант (Producer) — принимает заказ и кладёт его на стойку
- Стойка заказов (Queue) — место, где заказы ждут обработки
- Повар (Consumer) — берёт заказы со стойки и готовит
- RabbitMQ — это сама стойка + система распределения заказов
Официант не стоит и не ждёт, пока повар приготовит блюдо — он сразу идёт к следующему столику. Повар работает в своём темпе. Если поваров несколько — заказы распределяются между ними.
Кто использует RabbitMQ
Крупные компании: Reddit, 9GAG, Bloomberg, VMware, Mozilla
В России: Сбер, МТС, Авито, и многие другие
🧱 Ключевые концепции
Прежде чем писать код, важно понять базовые компоненты RabbitMQ:
Архитектура: как всё связано
Producer никогда не отправляет напрямую в Queue. Всегда через Exchange. Даже "простая" отправка использует default exchange (безымянный), который направляет по имени очереди.
🔀 Типы Exchange
Exchange — сердце маршрутизации в RabbitMQ. От типа Exchange зависит, как сообщения распределяются по очередям. Есть 4 основных типа:
payment.success
→ только в очередь с таким же binding
* (одно слово),
# (ноль или более слов).
order.*.success матчит
order.payment.success, order.shipping.success
format=pdf +
type=report → PDF-генератор
Какой Exchange выбрать?
Direct: Одно сообщение → один конкретный получатель
Fanout: Одно сообщение → всем подписчикам (pub/sub)
Topic: Сложная маршрутизация по категориям/иерархии
Headers: Когда routing key недостаточно гибкий
🚀 Быстрый старт
Запустим RabbitMQ локально за 2 минуты с помощью Docker:
# 15672 - веб-интерфейс управления
Docker Compose (рекомендуется)
version: '3.8' services: rabbitmq: image: rabbitmq:3-management container_name: rabbitmq ports: - "5672:5672" # AMQP - "15672:15672" # Management UI environment: - RABBITMQ_DEFAULT_USER=admin - RABBITMQ_DEFAULT_PASS=secret volumes: - rabbitmq_data:/var/lib/rabbitmq volumes: rabbitmq_data:
rabbitmq | Server startup complete
Откройте http://localhost:15672 — это веб-интерфейс для управления RabbitMQ. Здесь можно создавать очереди, смотреть статистику, отправлять тестовые сообщения.
💻 Примеры кода
Python (pika)
import pika import json # Подключение к RabbitMQ connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() # Создаём очередь (если не существует) channel.queue_declare(queue='orders', durable=True) # Отправляем сообщение order = { 'order_id': 12345, 'product': 'iPhone 15', 'quantity': 2, 'user_id': 67890 } channel.basic_publish( exchange='', # default exchange routing_key='orders', body=json.dumps(order), properties=pika.BasicProperties( delivery_mode=2 # persistent ) ) print(f"✅ Sent order: {order['order_id']}") connection.close()
import pika import json connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.queue_declare(queue='orders', durable=True) def process_order(ch, method, properties, body): order = json.loads(body) print(f"📦 Processing order: {order['order_id']}") print(f" Product: {order['product']}") print(f" Quantity: {order['quantity']}") # Имитация обработки import time time.sleep(1) # Подтверждаем обработку ch.basic_ack(delivery_tag=method.delivery_tag) print(f"✅ Order {order['order_id']} processed!") # Получаем по одному сообщению за раз channel.basic_qos(prefetch_count=1) # Подписываемся на очередь channel.basic_consume( queue='orders', on_message_callback=process_order ) print('🐰 Waiting for orders. Press CTRL+C to exit') channel.start_consuming()
Node.js (amqplib)
const amqp = require('amqplib'); async function sendOrder() { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); const queue = 'orders'; // Создаём очередь await channel.assertQueue(queue, { durable: true }); const order = { order_id: 12345, product: 'MacBook Pro', quantity: 1, user_id: 67890 }; // Отправляем channel.sendToQueue( queue, Buffer.from(JSON.stringify(order)), { persistent: true } ); console.log(`✅ Sent: ${order.order_id}`); await channel.close(); await connection.close(); } sendOrder().catch(console.error);
const amqp = require('amqplib'); async function consumeOrders() { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); const queue = 'orders'; await channel.assertQueue(queue, { durable: true }); // Получаем по одному сообщению channel.prefetch(1); console.log('🐰 Waiting for orders...'); channel.consume(queue, async (msg) => { if (msg !== null) { const order = JSON.parse(msg.content.toString()); console.log(`📦 Processing: ${order.order_id}`); console.log(` Product: ${order.product}`); // Имитация обработки await new Promise(r => setTimeout(r, 1000)); // Подтверждаем channel.ack(msg); console.log(`✅ Done: ${order.order_id}`); } }); } consumeOrders().catch(console.error);
Go (amqp091-go)
package main import ( "encoding/json" "log" amqp "github.com/rabbitmq/amqp091-go" ) type Order struct { OrderID int `json:"order_id"` Product string `json:"product"` Quantity int `json:"quantity"` UserID int `json:"user_id"` } func main() { conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/") defer conn.Close() ch, _ := conn.Channel() defer ch.Close() // Объявляем очередь q, _ := ch.QueueDeclare( "orders", // name true, // durable false, // auto-delete false, // exclusive false, // no-wait nil, // args ) order := Order{ OrderID: 12345, Product: "AirPods Pro", Quantity: 3, UserID: 67890, } body, _ := json.Marshal(order) ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: body, }, ) log.Printf("✅ Sent order: %d", order.OrderID) }
Пример с Topic Exchange
Более продвинутый пример — маршрутизация по категориям с использованием Topic Exchange:
import pika import json connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() # Создаём Topic Exchange channel.exchange_declare( exchange='events', exchange_type='topic', durable=True ) # Отправляем разные события events = [ ('order.created.premium', {'order_id': 1, 'type': 'premium'}), ('order.created.standard', {'order_id': 2, 'type': 'standard'}), ('order.shipped.premium', {'order_id': 1, 'status': 'shipped'}), ('payment.completed.premium', {'order_id': 1, 'amount': 999}), ('user.registered.standard', {'user_id': 42}), ] for routing_key, event in events: channel.basic_publish( exchange='events', routing_key=routing_key, body=json.dumps(event) ) print(f"📤 Sent [{routing_key}]: {event}") connection.close()
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.exchange_declare( exchange='events', exchange_type='topic', durable=True ) # Создаём временную очередь result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # Паттерны подписки из аргументов # Примеры: # python topic_consumer.py "order.*.*" - все события заказов # python topic_consumer.py "*.*.premium" - все premium события # python topic_consumer.py "#" - все события binding_keys = sys.argv[1:] or ['#'] for binding_key in binding_keys: channel.queue_bind( exchange='events', queue=queue_name, routing_key=binding_key ) print(f"🔗 Bound to: {binding_key}") def callback(ch, method, properties, body): print(f"📥 [{method.routing_key}] {body.decode()}") channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True ) print('🐰 Waiting for events...') channel.start_consuming()
* — заменяет ровно одно слово
# — заменяет ноль или более слов
Примеры:
order.*.premium → order.created.premium, order.shipped.premium
order.# → order.created, order.shipped.premium, order.a.b.c
#.premium → любое событие, заканчивающееся на .premium
⚖️ Плюсы и минусы RabbitMQ
RabbitMQ — отличный инструмент, но не серебряная пуля. Вот честный разбор сильных и слабых сторон:
- Зрелость и стабильность — 15+ лет в продакшене, проверен тысячами компаний
- Гибкая маршрутизация — 4 типа exchange покрывают почти любой сценарий
- Гарантии доставки — acknowledgments, persistent messages, publisher confirms
- Простота старта — запустил Docker, подключился, работает
- Management UI — отличный веб-интерфейс из коробки
- Множество протоколов — AMQP, MQTT, STOMP, HTTP
- Кластеризация — встроенная поддержка высокой доступности
- Плагины — расширяемая архитектура, много готовых плагинов
- Не для больших данных — при миллионах сообщений в секунду лучше Kafka
- Сообщения удаляются — после обработки сообщение пропадает (нет replay)
- Erlang — для отладки и тюнинга нужно понимать Erlang/OTP
- Память — при большом количестве сообщений в очередях потребляет много RAM
- Сложность кластера — настройка HA-кластера требует опыта
- Нет ordering гарантий — между разными очередями порядок не гарантирован
- Масштабирование — горизонтальное масштабирование сложнее, чем в Kafka
- Нет встроенного stream processing — для этого нужны внешние инструменты
Очереди могут переполниться. Если consumer не успевает
обрабатывать сообщения, очередь растёт → память заканчивается →
RabbitMQ начинает отклонять новые сообщения или падает.
Решение: Настройте x-max-length и
x-overflow для очередей, мониторьте размер очередей,
масштабируйте consumer'ов.
🥊 RabbitMQ vs Kafka
Самый частый вопрос: "Что выбрать — RabbitMQ или Kafka?" Короткий ответ: это разные инструменты для разных задач.
| Критерий | RabbitMQ | Kafka |
|---|---|---|
| Модель | Message Broker (Smart broker, dumb consumer) | Event Log (Dumb broker, smart consumer) |
| Хранение | Удаляет после доставки | Хранит по времени/размеру (replay) |
| Пропускная способность | ~50K msg/sec | ~1M msg/sec |
| Маршрутизация | Гибкая (exchanges, bindings) | Простая (topics, partitions) |
| Гарантии порядка | В пределах одной очереди | В пределах partition (строгий) |
| Consumer groups | Ручная реализация | Встроенная поддержка |
| Сложность | Простой старт | Требует ZooKeeper/KRaft |
| Протоколы | AMQP, MQTT, STOMP, HTTP | Свой бинарный протокол |
| Приоритеты сообщений | Поддерживаются | Нет |
| Dead Letter Queue | Встроенная | Ручная реализация |
| Stream Processing | Нет (нужен внешний) | Kafka Streams встроен |
Когда выбрать RabbitMQ
• Нужна сложная маршрутизация сообщений
• Важны приоритеты сообщений
• Нужна гарантия доставки каждого сообщения
• Требуется поддержка разных протоколов (MQTT для IoT)
• Нагрузка до 50K msg/sec
• Команда не имеет опыта с Kafka
• Микросервисы с request/reply паттерном
Когда выбрать Kafka
• Нужен event sourcing или replay событий
• Пропускная способность 100K+ msg/sec
• Нужен stream processing (Kafka Streams, ksqlDB)
• Строгий порядок событий критичен
• Данные нужно хранить долго (дни/недели)
• Много consumer'ов читают одни и те же данные
• Big Data и аналитика в реальном времени
RabbitMQ: "Отправь сообщение и забудь" —
традиционная очередь задач, уведомления, интеграции.
Kafka: "Запиши событие навсегда" —
event sourcing, аналитика, data pipelines, аудит.
🎯 Когда использовать RabbitMQ
Конкретные сценарии, где RabbitMQ — правильный выбор, и где лучше посмотреть на альтернативы:
✅ Идеальные сценарии
❌ Когда НЕ использовать RabbitMQ
Реальные примеры архитектуры
┌─────────────────────────────────────────────────────────────────┐ │ E-Commerce с RabbitMQ │ └─────────────────────────────────────────────────────────────────┘ [Web API] ──publish──► orders.created │ ┌─────────┴─────────┐ ▼ ▼ [Payment Service] [Inventory Service] │ │ ▼ ▼ orders.paid orders.reserved │ │ └─────────┬─────────┘ ▼ [Shipping Service] │ ▼ orders.shipped │ ┌───────────────┼───────────────┐ ▼ ▼ ▼ [Email Service] [SMS Service] [Analytics] Exchanges: • orders (topic) - все события заказов • notifications (fanout) - рассылка во все каналы Queues: • payment.process (binding: orders.created) • inventory.reserve (binding: orders.created) • shipping.prepare (binding: orders.paid AND orders.reserved) • email.send (binding: orders.*) • analytics.track (binding: #)
❓ Частые вопросы
redelivered=True.Если auto_ack=True, сообщение будет потеряно сразу после доставки (до обработки).
1. Durable queue:
queue_declare(queue='orders', durable=True)2. Persistent messages:
delivery_mode=2 в properties3. Publisher confirms: подтверждение, что RabbitMQ записал на диск
Все три вместе дают максимальные гарантии (но снижают производительность).
Важно: установите
prefetch_count (например, 1 или 10), чтобы
один consumer не захватил все сообщения. Это называется Fair Dispatch.
• Consumer явно отклонил его (reject/nack с requeue=false)
• Истёк TTL сообщения
• Очередь переполнена
DLQ настраивается через аргументы очереди:
x-dead-letter-exchange
и x-dead-letter-routing-key.
max_message_size).Рекомендация: Не отправляйте большие файлы через RabbitMQ. Лучше сохраните файл в S3/MinIO и отправьте только ссылку. Оптимальный размер сообщения — до 1 МБ.
• Fire-and-forget — если subscriber offline, сообщение потеряно
• Нет persistence, нет acknowledgments
• Очень быстрый, но ненадёжный
RabbitMQ:
• Сообщения хранятся в очереди до обработки
• Гарантии доставки, persistence, acknowledgments
• Медленнее, но надёжнее
Используйте Redis для real-time событий, где потеря допустима. RabbitMQ — когда каждое сообщение важно.
• Management UI (порт 15672)
• HTTP API для метрик
•
rabbitmqctl CLIProduction-решения:
• Prometheus + Grafana (плагин rabbitmq_prometheus)
• Datadog, New Relic, Dynatrace
• ELK Stack для логов
Ключевые метрики: размер очередей, rate publish/consume, memory usage, connection count, unacked messages.
1. Запустите RabbitMQ локально через Docker
2. Откройте Management UI и создайте очередь вручную
3. Напишите простой producer/consumer на своём языке
4. Поэкспериментируйте с разными типами Exchange
5. Добавьте в проект — начните с отправки email/уведомлений
RabbitMQ — это инструмент, который должен быть в арсенале каждого backend-разработчика.
Не потому что он нужен везде, а потому что когда он нужен — альтернативы часто хуже.