RabbitMQ — Полное руководство по брокеру сообщений

Что это, как работает, когда использовать. Примеры кода, сравнение с Kafka, плюсы и минусы — всё, что нужно для принятия решений.

Message Broker AMQP Protocol Open Source Erlang
2007
Год создания
35K+
GitHub Stars
100K+
msg/sec
#1
Среди брокеров

🐰 Что такое RabbitMQ

RabbitMQ — это брокер сообщений (message broker), который позволяет приложениям обмениваться данными асинхронно. Думайте о нём как о почтовом отделении: вы отправляете письмо, оно хранится до тех пор, пока получатель не заберёт его.

Написан на Erlang — языке, созданном для телекоммуникаций, что даёт RabbitMQ отличную отказоустойчивость и способность обрабатывать тысячи одновременных соединений.

🎯 Главная задача RabbitMQ

Развязать отправителя и получателя сообщений. Отправитель не знает, кто получит сообщение и когда. Получатель не знает, кто отправил. Это даёт гибкость, масштабируемость и отказоустойчивость.

Как это работает (простыми словами)

Представьте ресторан:

  • Официант (Producer) — принимает заказ и кладёт его на стойку
  • Стойка заказов (Queue) — место, где заказы ждут обработки
  • Повар (Consumer) — берёт заказы со стойки и готовит
  • RabbitMQ — это сама стойка + система распределения заказов

Официант не стоит и не ждёт, пока повар приготовит блюдо — он сразу идёт к следующему столику. Повар работает в своём темпе. Если поваров несколько — заказы распределяются между ними.

Кто использует RabbitMQ

Крупные компании: Reddit, 9GAG, Bloomberg, VMware, Mozilla
В России: Сбер, МТС, Авито, и многие другие

🧱 Ключевые концепции

Прежде чем писать код, важно понять базовые компоненты RabbitMQ:

📤
Producer
Приложение, которое отправляет сообщения. Producer не отправляет напрямую в очередь — он отправляет в Exchange.
🔀
Exchange
"Почтовое отделение" — получает сообщения от Producer и решает, в какие очереди их направить на основе правил (bindings).
📬
Queue
Буфер, который хранит сообщения. Сообщения ждут здесь, пока Consumer не заберёт их. Может быть durable (переживает рестарт).
📥
Consumer
Приложение, которое получает и обрабатывает сообщения из очереди. После обработки отправляет acknowledgment (подтверждение).
🔗
Binding
Связь между Exchange и Queue. Определяет правила маршрутизации: какие сообщения куда направлять (routing key).
🏷️
Routing Key
"Адрес" сообщения. Exchange использует его для определения, в какую очередь отправить сообщение.
Acknowledgment
Подтверждение от Consumer, что сообщение обработано. Без ack сообщение вернётся в очередь (для другого consumer).
🖥️
Virtual Host
Логическая изоляция внутри RabbitMQ. Как отдельные "базы данных" — у каждой свои очереди, exchanges, пользователи.

Архитектура: как всё связано

Поток сообщений в RabbitMQ
Producer
Отправляет
Exchange
Маршрутизирует
binding
Queue
Хранит
Consumer
Обрабатывает
💡 Важно понять

Producer никогда не отправляет напрямую в Queue. Всегда через Exchange. Даже "простая" отправка использует default exchange (безымянный), который направляет по имени очереди.

🔀 Типы Exchange

Exchange — сердце маршрутизации в RabbitMQ. От типа Exchange зависит, как сообщения распределяются по очередям. Есть 4 основных типа:

🎯 Direct Exchange
Точное совпадение routing key. Сообщение идёт в очередь, где binding key = routing key сообщения.
Пример: routing_key=payment.success → только в очередь с таким же binding
📢 Fanout Exchange
Broadcast — сообщение идёт во ВСЕ привязанные очереди. Routing key игнорируется.
Пример: Уведомление о новом заказе → email-сервис, SMS-сервис, аналитика
🌳 Topic Exchange
Паттерн-matching с wildcards: * (одно слово), # (ноль или более слов).
Пример: order.*.success матчит order.payment.success, order.shipping.success
📋 Headers Exchange
Маршрутизация по заголовкам сообщения, а не routing key. Редко используется, но гибко.
Пример: header format=pdf + type=report → PDF-генератор

Какой Exchange выбрать?

📌 Простое правило выбора

Direct: Одно сообщение → один конкретный получатель
Fanout: Одно сообщение → всем подписчикам (pub/sub)
Topic: Сложная маршрутизация по категориям/иерархии
Headers: Когда routing key недостаточно гибкий

🚀 Быстрый старт

Запустим RabbitMQ локально за 2 минуты с помощью Docker:

Terminal
$ docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# 5672 - порт AMQP (для приложений)
# 15672 - веб-интерфейс управления
$ open http://localhost:15672
# Login: guest / Password: guest

Docker Compose (рекомендуется)

YAML docker-compose.yml
version: '3.8'

services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
  - "5672:5672"    # AMQP
  - "15672:15672"  # Management UI
environment:
  - RABBITMQ_DEFAULT_USER=admin
  - RABBITMQ_DEFAULT_PASS=secret
volumes:
  - rabbitmq_data:/var/lib/rabbitmq

volumes:
rabbitmq_data:
Terminal
$ docker-compose up -d
Creating rabbitmq ... done
$ docker-compose logs -f rabbitmq
rabbitmq | Starting RabbitMQ 3.12.0 on Erlang 25.3
rabbitmq | Server startup complete
🌐 Management UI

Откройте http://localhost:15672 — это веб-интерфейс для управления RabbitMQ. Здесь можно создавать очереди, смотреть статистику, отправлять тестовые сообщения.

💻 Примеры кода

Python (pika)

Python producer.py
import pika
import json

# Подключение к RabbitMQ
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Создаём очередь (если не существует)
channel.queue_declare(queue='orders', durable=True)

# Отправляем сообщение
order = {
'order_id': 12345,
'product': 'iPhone 15',
'quantity': 2,
'user_id': 67890
}

channel.basic_publish(
exchange='',  # default exchange
routing_key='orders',
body=json.dumps(order),
properties=pika.BasicProperties(
    delivery_mode=2  # persistent
)
)

print(f"✅ Sent order: {order['order_id']}")
connection.close()
Python consumer.py
import pika
import json

connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.queue_declare(queue='orders', durable=True)

def process_order(ch, method, properties, body):
order = json.loads(body)
print(f"📦 Processing order: {order['order_id']}")
print(f"   Product: {order['product']}")
print(f"   Quantity: {order['quantity']}")

# Имитация обработки
import time
time.sleep(1)

# Подтверждаем обработку
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"✅ Order {order['order_id']} processed!")

# Получаем по одному сообщению за раз
channel.basic_qos(prefetch_count=1)

# Подписываемся на очередь
channel.basic_consume(
queue='orders',
on_message_callback=process_order
)

print('🐰 Waiting for orders. Press CTRL+C to exit')
channel.start_consuming()

Node.js (amqplib)

JavaScript producer.js
const amqp = require('amqplib');

async function sendOrder() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

const queue = 'orders';

// Создаём очередь
await channel.assertQueue(queue, { durable: true });

const order = {
    order_id: 12345,
    product: 'MacBook Pro',
    quantity: 1,
    user_id: 67890
};

// Отправляем
channel.sendToQueue(
    queue,
    Buffer.from(JSON.stringify(order)),
    { persistent: true }
);

console.log(`✅ Sent: ${order.order_id}`);

await channel.close();
await connection.close();
}

sendOrder().catch(console.error);
JavaScript consumer.js
const amqp = require('amqplib');

async function consumeOrders() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'orders';

    await channel.assertQueue(queue, { durable: true });

    // Получаем по одному сообщению
    channel.prefetch(1);

    console.log('🐰 Waiting for orders...');

    channel.consume(queue, async (msg) => {
        if (msg !== null) {
            const order = JSON.parse(msg.content.toString());

            console.log(`📦 Processing: ${order.order_id}`);
            console.log(`   Product: ${order.product}`);

            // Имитация обработки
            await new Promise(r => setTimeout(r, 1000));

            // Подтверждаем
            channel.ack(msg);
            console.log(`✅ Done: ${order.order_id}`);
        }
    });
}

consumeOrders().catch(console.error);

Go (amqp091-go)

Go producer.go
package main

import (
    "encoding/json"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

type Order struct {
    OrderID  int    `json:"order_id"`
    Product  string `json:"product"`
    Quantity int    `json:"quantity"`
    UserID   int    `json:"user_id"`
}

func main() {
    conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()

    ch, _ := conn.Channel()
    defer ch.Close()

    // Объявляем очередь
    q, _ := ch.QueueDeclare(
        "orders", // name
        true,     // durable
        false,    // auto-delete
        false,    // exclusive
        false,    // no-wait
        nil,      // args
    )

    order := Order{
        OrderID:  12345,
        Product:  "AirPods Pro",
        Quantity: 3,
        UserID:   67890,
    }

    body, _ := json.Marshal(order)

    ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         body,
        },
    )

    log.Printf("✅ Sent order: %d", order.OrderID)
}

Пример с Topic Exchange

Более продвинутый пример — маршрутизация по категориям с использованием Topic Exchange:

Python topic_producer.py
import pika
import json

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Создаём Topic Exchange
channel.exchange_declare(
    exchange='events',
    exchange_type='topic',
    durable=True
)

# Отправляем разные события
events = [
    ('order.created.premium', {'order_id': 1, 'type': 'premium'}),
    ('order.created.standard', {'order_id': 2, 'type': 'standard'}),
    ('order.shipped.premium', {'order_id': 1, 'status': 'shipped'}),
    ('payment.completed.premium', {'order_id': 1, 'amount': 999}),
    ('user.registered.standard', {'user_id': 42}),
]

for routing_key, event in events:
    channel.basic_publish(
        exchange='events',
        routing_key=routing_key,
        body=json.dumps(event)
    )
    print(f"📤 Sent [{routing_key}]: {event}")

connection.close()
Python topic_consumer.py
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.exchange_declare(
    exchange='events',
    exchange_type='topic',
    durable=True
)

# Создаём временную очередь
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Паттерны подписки из аргументов
# Примеры:
# python topic_consumer.py "order.*.*"       - все события заказов
# python topic_consumer.py "*.*.premium"     - все premium события
# python topic_consumer.py "#"               - все события

binding_keys = sys.argv[1:] or ['#']

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='events',
        queue=queue_name,
        routing_key=binding_key
    )
    print(f"🔗 Bound to: {binding_key}")

def callback(ch, method, properties, body):
    print(f"📥 [{method.routing_key}] {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print('🐰 Waiting for events...')
channel.start_consuming()
💡 Паттерны Topic Exchange

* — заменяет ровно одно слово
# — заменяет ноль или более слов

Примеры:
order.*.premiumorder.created.premium, order.shipped.premium
order.#order.created, order.shipped.premium, order.a.b.c
#.premium → любое событие, заканчивающееся на .premium

⚖️ Плюсы и минусы RabbitMQ

RabbitMQ — отличный инструмент, но не серебряная пуля. Вот честный разбор сильных и слабых сторон:

Плюсы
  • Зрелость и стабильность — 15+ лет в продакшене, проверен тысячами компаний
  • Гибкая маршрутизация — 4 типа exchange покрывают почти любой сценарий
  • Гарантии доставки — acknowledgments, persistent messages, publisher confirms
  • Простота старта — запустил Docker, подключился, работает
  • Management UI — отличный веб-интерфейс из коробки
  • Множество протоколов — AMQP, MQTT, STOMP, HTTP
  • Кластеризация — встроенная поддержка высокой доступности
  • Плагины — расширяемая архитектура, много готовых плагинов
Минусы
  • Не для больших данных — при миллионах сообщений в секунду лучше Kafka
  • Сообщения удаляются — после обработки сообщение пропадает (нет replay)
  • Erlang — для отладки и тюнинга нужно понимать Erlang/OTP
  • Память — при большом количестве сообщений в очередях потребляет много RAM
  • Сложность кластера — настройка HA-кластера требует опыта
  • Нет ordering гарантий — между разными очередями порядок не гарантирован
  • Масштабирование — горизонтальное масштабирование сложнее, чем в Kafka
  • Нет встроенного stream processing — для этого нужны внешние инструменты
⚠️ Главный подводный камень

Очереди могут переполниться. Если consumer не успевает обрабатывать сообщения, очередь растёт → память заканчивается → RabbitMQ начинает отклонять новые сообщения или падает.

Решение: Настройте x-max-length и x-overflow для очередей, мониторьте размер очередей, масштабируйте consumer'ов.

🥊 RabbitMQ vs Kafka

Самый частый вопрос: "Что выбрать — RabbitMQ или Kafka?" Короткий ответ: это разные инструменты для разных задач.

Критерий RabbitMQ Kafka
Модель Message Broker (Smart broker, dumb consumer) Event Log (Dumb broker, smart consumer)
Хранение Удаляет после доставки Хранит по времени/размеру (replay)
Пропускная способность ~50K msg/sec ~1M msg/sec
Маршрутизация Гибкая (exchanges, bindings) Простая (topics, partitions)
Гарантии порядка В пределах одной очереди В пределах partition (строгий)
Consumer groups Ручная реализация Встроенная поддержка
Сложность Простой старт Требует ZooKeeper/KRaft
Протоколы AMQP, MQTT, STOMP, HTTP Свой бинарный протокол
Приоритеты сообщений Поддерживаются Нет
Dead Letter Queue Встроенная Ручная реализация
Stream Processing Нет (нужен внешний) Kafka Streams встроен

Когда выбрать RabbitMQ

🐰 RabbitMQ — ваш выбор, если:

• Нужна сложная маршрутизация сообщений
• Важны приоритеты сообщений
• Нужна гарантия доставки каждого сообщения
• Требуется поддержка разных протоколов (MQTT для IoT)
• Нагрузка до 50K msg/sec
• Команда не имеет опыта с Kafka
• Микросервисы с request/reply паттерном

Когда выбрать Kafka

📊 Kafka — ваш выбор, если:

• Нужен event sourcing или replay событий
• Пропускная способность 100K+ msg/sec
• Нужен stream processing (Kafka Streams, ksqlDB)
• Строгий порядок событий критичен
• Данные нужно хранить долго (дни/недели)
• Много consumer'ов читают одни и те же данные
• Big Data и аналитика в реальном времени

💡 Практическое правило

RabbitMQ: "Отправь сообщение и забудь" — традиционная очередь задач, уведомления, интеграции.

Kafka: "Запиши событие навсегда" — event sourcing, аналитика, data pipelines, аудит.

🎯 Когда использовать RabbitMQ

Конкретные сценарии, где RabbitMQ — правильный выбор, и где лучше посмотреть на альтернативы:

✅ Идеальные сценарии

❌ Когда НЕ использовать RabbitMQ

Избегать
📊
Big Data Streaming
Миллионы событий в секунду, аналитика в реальном времени. Используйте Kafka или Apache Pulsar.
Избегать
🔁
Event Sourcing
Нужно хранить и переигрывать все события. RabbitMQ удаляет сообщения после обработки.
Избегать
💾
Долгое хранение
Сообщения нужно хранить дни/недели для аудита. RabbitMQ — не база данных.
Избегать
🔢
Строгий глобальный порядок
Нужен порядок между разными очередями/consumer'ами. Kafka с одной partition справится лучше.

Реальные примеры архитектуры

Architecture e-commerce-example.txt
┌─────────────────────────────────────────────────────────────────┐
│                    E-Commerce с RabbitMQ                        │
└─────────────────────────────────────────────────────────────────┘

[Web API] ──publish──► orders.created
                              │
                    ┌─────────┴─────────┐
                    ▼                   ▼
           [Payment Service]    [Inventory Service]
                    │                   │
                    ▼                   ▼
           orders.paid         orders.reserved
                    │                   │
                    └─────────┬─────────┘
                              ▼
                    [Shipping Service]
                              │
                              ▼
                    orders.shipped
                              │
              ┌───────────────┼───────────────┐
              ▼               ▼               ▼
    [Email Service]  [SMS Service]  [Analytics]

Exchanges:orders (topic) - все события заказов
  • notifications (fanout) - рассылка во все каналы

Queues:
  • payment.process (binding: orders.created)
  • inventory.reserve (binding: orders.created)  
  • shipping.prepare (binding: orders.paid AND orders.reserved)
  • email.send (binding: orders.*)
  • analytics.track (binding: #)

Частые вопросы

Если auto_ack=False (рекомендуется), сообщение вернётся в очередь и будет доставлено другому consumer'у. Это называется redelivery. Сообщение получит флаг redelivered=True.

Если auto_ack=True, сообщение будет потеряно сразу после доставки (до обработки).
Нужны три вещи:

1. Durable queue: queue_declare(queue='orders', durable=True)
2. Persistent messages: delivery_mode=2 в properties
3. Publisher confirms: подтверждение, что RabbitMQ записал на диск

Все три вместе дают максимальные гарантии (но снижают производительность).
Просто запустите несколько экземпляров consumer'а — RabbitMQ автоматически распределит сообщения между ними (round-robin).

Важно: установите prefetch_count (например, 1 или 10), чтобы один consumer не захватил все сообщения. Это называется Fair Dispatch.
Dead Letter Queue — очередь для "мёртвых" сообщений, которые не удалось обработать. Сообщение попадает в DLQ, если:

• Consumer явно отклонил его (reject/nack с requeue=false)
• Истёк TTL сообщения
• Очередь переполнена

DLQ настраивается через аргументы очереди: x-dead-letter-exchange и x-dead-letter-routing-key.
По умолчанию 128 МБ, но это можно изменить в конфигурации (max_message_size).

Рекомендация: Не отправляйте большие файлы через RabbitMQ. Лучше сохраните файл в S3/MinIO и отправьте только ссылку. Оптимальный размер сообщения — до 1 МБ.
Redis Pub/Sub:
• Fire-and-forget — если subscriber offline, сообщение потеряно
• Нет persistence, нет acknowledgments
• Очень быстрый, но ненадёжный

RabbitMQ:
• Сообщения хранятся в очереди до обработки
• Гарантии доставки, persistence, acknowledgments
• Медленнее, но надёжнее

Используйте Redis для real-time событий, где потеря допустима. RabbitMQ — когда каждое сообщение важно.
Встроенные инструменты:
• Management UI (порт 15672)
• HTTP API для метрик
rabbitmqctl CLI

Production-решения:
• Prometheus + Grafana (плагин rabbitmq_prometheus)
• Datadog, New Relic, Dynatrace
• ELK Stack для логов

Ключевые метрики: размер очередей, rate publish/consume, memory usage, connection count, unacked messages.
🐰 RabbitMQ — Итоги
🎯
Для чего
Message Broker, очереди задач, интеграции
Производительность
~50-100K msg/sec
Сильные стороны
Маршрутизация, гарантии, простота
Слабые стороны
Нет replay, не для Big Data
🆚
Альтернатива
Kafka (streaming), Redis (простые очереди)
📚
Документация
rabbitmq.com/documentation.html
🚀 Следующие шаги

1. Запустите RabbitMQ локально через Docker
2. Откройте Management UI и создайте очередь вручную
3. Напишите простой producer/consumer на своём языке
4. Поэкспериментируйте с разными типами Exchange
5. Добавьте в проект — начните с отправки email/уведомлений

RabbitMQ — это инструмент, который должен быть в арсенале каждого backend-разработчика. Не потому что он нужен везде, а потому что когда он нужен — альтернативы часто хуже.

© 2025 • Написано разработчиком для разработчиков

Версия RabbitMQ на момент написания: 3.12.x