Redis — Полное руководство
In-memory база данных для кеширования, очередей сообщений, real-time аналитики. Redis Queue, Celery интеграция, Redis Streams — всё с примерами на Python.
🔴 Что такое Redis
Redis (Remote Dictionary Server) — это высокопроизводительная in-memory база данных, хранящая данные в оперативной памяти. Это делает её невероятно быстрой — миллионы операций в секунду.
RAM vs Disk: Чтение из RAM ~100 наносекунд, с диска ~10 миллисекунд.
Разница в 100 000 раз!
Однопоточность: Нет накладных расходов на блокировки и синхронизацию.
Простые структуры: Оптимизированные алгоритмы для каждого типа данных.
Ключевые особенности
- In-Memory — данные в RAM, скорость <1ms
- Персистентность — опционально сохраняет на диск (RDB, AOF)
- Репликация — Master-Replica для отказоустойчивости
- Кластеризация — Redis Cluster для горизонтального масштабирования
- Pub/Sub — встроенная система публикации/подписки
- Lua scripting — атомарные операции через скрипты
- TTL — автоматическое удаление по времени жизни
Redis хранит данные в RAM — это дорого и ограничено.
Используйте Redis как дополнение к PostgreSQL/MySQL:
• Основные данные → PostgreSQL (надёжность)
• Кеш, сессии, очереди → Redis (скорость)
📊 Типы данных Redis
Redis — не просто key-value хранилище. Он поддерживает богатые структуры данных, каждая оптимизирована для своих задач:
GET user:1:name → "John"
INCR page:views → 1, 2, 3...
RPOP queue → "task1"
LRANGE list 0 9 → последние 10
SISMEMBER tags "python" → 1
SINTER tags1 tags2 → пересечение
ZRANK leaderboard "player1" → 0
ZRANGE leaderboard 0 9 → топ-10
HGET user:1 name → "John"
HGETALL user:1 → все поля
XREAD STREAMS events 0
XRANGE events - +
GEODIST locations "Berlin" "Paris"
GEORADIUS locations 13 52 100 km
PFCOUNT visitors → ~2
# 12KB на миллиарды элементов!
🎯 Где использовать Redis
🚀 Быстрый старт
Установка через Docker (рекомендуется)
Базовые команды Redis CLI
🐍 Redis + Python
import redis import json # Подключение к Redis r = redis.Redis( host='localhost', port=6379, db=0, decode_responses=True # Автоматически декодировать в str ) # ============ Strings ============ # Простое значение r.set('greeting', 'Hello, Redis!') print(r.get('greeting')) # "Hello, Redis!" # С временем жизни (TTL) r.setex('session:abc123', 3600, 'user:1') # 1 час print(r.ttl('session:abc123')) # 3599 # Счётчики r.set('views', 0) r.incr('views') # 1 r.incrby('views', 10) # 11 # ============ Hash (объекты) ============ # Сохранить объект пользователя r.hset('user:1', mapping={ 'name': 'John Doe', 'email': 'john@example.com', 'age': '30' }) # Получить все поля user = r.hgetall('user:1') print(user) # {'name': 'John Doe', 'email': '...', 'age': '30'} # Одно поле print(r.hget('user:1', 'name')) # "John Doe" # ============ List (очередь) ============ # Добавить в очередь r.lpush('tasks', 'task1', 'task2', 'task3') # Взять из очереди (FIFO) task = r.rpop('tasks') print(task) # "task1" # Блокирующее ожидание (для воркеров) # task = r.brpop('tasks', timeout=30) # Ждать до 30 сек # ============ Set (уникальные значения) ============ r.sadd('tags:post:1', 'python', 'redis', 'tutorial') r.sadd('tags:post:2', 'python', 'django') # Пересечение тегов common = r.sinter('tags:post:1', 'tags:post:2') print(common) # {'python'} # ============ Sorted Set (рейтинг) ============ # Добавить игроков с очками r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150}) # Топ-3 (по убыванию) top3 = r.zrevrange('leaderboard', 0, 2, withscores=True) print(top3) # [('player2', 200.0), ('player3', 150.0), ('player1', 100.0)] # Ранг игрока rank = r.zrevrank('leaderboard', 'player1') print(f"Player1 rank: {rank + 1}") # 3
Паттерн: Кеширование с Redis
import redis import json from functools import wraps r = redis.Redis(host='localhost', decode_responses=True) # ============ Декоратор для кеширования ============ def cache(ttl=300): """Кеширует результат функции на ttl секунд""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Генерируем ключ из имени функции и аргументов key = f"cache:{func.__name__}:{args}:{kwargs}" # Пробуем получить из кеша cached = r.get(key) if cached: print(f"✓ Cache HIT: {key}") return json.loads(cached) # Нет в кеше — вызываем функцию print(f"✗ Cache MISS: {key}") result = func(*args, **kwargs) # Сохраняем в кеш r.setex(key, ttl, json.dumps(result)) return result return wrapper return decorator # ============ Использование ============ @cache(ttl=60) def get_user_from_db(user_id): """Имитация тяжёлого запроса к БД""" import time time.sleep(2) # Типа долгий запрос return { 'id': user_id, 'name': f'User {user_id}', 'email': f'user{user_id}@example.com' } # Первый вызов — 2 секунды (запрос к "БД") user = get_user_from_db(1) # ✗ Cache MISS # Второй вызов — мгновенно (из кеша) user = get_user_from_db(1) # ✓ Cache HIT # ============ Cache-Aside Pattern ============ def get_product(product_id): """Классический паттерн Cache-Aside""" cache_key = f"product:{product_id}" # 1. Проверяем кеш cached = r.get(cache_key) if cached: return json.loads(cached) # 2. Запрос к БД product = db.query("SELECT * FROM products WHERE id = %s", product_id) if product: # 3. Сохраняем в кеш r.setex(cache_key, 3600, json.dumps(product)) return product def update_product(product_id, data): """При обновлении — инвалидируем кеш""" # 1. Обновляем в БД db.query("UPDATE products SET ... WHERE id = %s", product_id) # 2. Удаляем из кеша r.delete(f"product:{product_id}")
📬 Redis Queue (RQ)
RQ (Redis Queue) — простая библиотека для фоновых задач на Python. Легче Celery, отлично подходит для небольших и средних проектов.
Queue
Шаг 1: Определяем задачи
import time import requests def send_email(to, subject, body): """Отправка email (тяжёлая операция)""" print(f"📧 Sending email to {to}...") time.sleep(3) # Имитация отправки print(f"✅ Email sent to {to}") return {'status': 'sent', 'to': to} def process_image(image_url): """Обработка изображения""" print(f"🖼️ Processing {image_url}...") time.sleep(5) # Тяжёлая обработка return {'status': 'processed', 'url': image_url} def generate_report(user_id, date_range): """Генерация PDF-отчёта""" print(f"📊 Generating report for user {user_id}...") time.sleep(10) return {'status': 'ready', 'file': f'/reports/{user_id}.pdf'}
Шаг 2: Добавляем задачи в очередь
from redis import Redis from rq import Queue from tasks import send_email, process_image, generate_report # Подключение к Redis redis_conn = Redis(host='localhost', port=6379) # Создаём очереди default_queue = Queue(connection=redis_conn) high_priority = Queue('high', connection=redis_conn) low_priority = Queue('low', connection=redis_conn) # ============ Добавление задач ============ # Простое добавление job = default_queue.enqueue(send_email, 'user@example.com', 'Welcome!', 'Hello from our app' ) print(f"Job ID: {job.id}") # С приоритетом urgent_job = high_priority.enqueue(send_email, 'vip@example.com', 'Urgent!', 'Important message' ) # Отложенная задача (через 60 секунд) from datetime import timedelta delayed_job = default_queue.enqueue_in( timedelta(seconds=60), send_email, 'user@example.com', 'Reminder', 'Don\'t forget!' ) # С таймаутом выполнения long_job = low_priority.enqueue( generate_report, 'user123', '2024-01-01:2024-12-31', job_timeout=600 # 10 минут максимум ) # ============ Проверка статуса ============ print(f"Job status: {job.get_status()}") # queued, started, finished, failed print(f"Job result: {job.result}") # None пока не выполнено # Дождаться результата (блокирующий вызов) result = job.result if result is None: job.refresh() # Обновить статус if job.is_finished: result = job.result
Шаг 3: Запускаем Worker
Веб-интерфейс для мониторинга очередей:
pip install rq-dashboard
rq-dashboard
Откройте http://localhost:9181 — увидите все очереди, задачи, воркеры.
🥬 Redis + Celery
Celery — мощная система распределённых очередей задач. Сложнее RQ, но больше возможностей: периодические задачи, цепочки, группы, retry, мониторинг.
- RQ Простые проекты, быстрый старт
- RQ Только Python
- Celery Сложная логика, цепочки задач
- Celery Периодические задачи (cron)
- Celery Несколько брокеров (Redis, RabbitMQ)
- ✓ Асинхронные задачи
- ✓ Периодические задачи (Celery Beat)
- ✓ Retry с exponential backoff
- ✓ Цепочки, группы, хорды
- ✓ Мониторинг (Flower)
Конфигурация Celery
from celery import Celery # Создаём приложение Celery app = Celery( 'myapp', broker='redis://localhost:6379/0', # Брокер (очередь задач) backend='redis://localhost:6379/1', # Backend (результаты) include=['tasks'] # Модули с задачами ) # Конфигурация app.conf.update( task_serializer='json', result_serializer='json', accept_content=['json'], timezone='UTC', enable_utc=True, # Retry настройки task_acks_late=True, task_reject_on_worker_lost=True, # Таймауты task_soft_time_limit=300, # 5 минут soft limit task_time_limit=600, # 10 минут hard limit ) # Периодические задачи (Celery Beat) app.conf.beat_schedule = { 'cleanup-every-hour': { 'task': 'tasks.cleanup_old_data', 'schedule': 3600.0, # каждый час }, 'daily-report': { 'task': 'tasks.generate_daily_report', 'schedule': crontab(hour=9, minute=0), # каждый день в 9:00 }, }
Определение задач
from celery_app import app import time # ============ Базовая задача ============ @app.task def send_email(to, subject, body): """Простая задача отправки email""" print(f"📧 Sending to {to}") time.sleep(2) return {'status': 'sent', 'to': to} # ============ Задача с retry ============ @app.task( bind=True, autoretry_for=(Exception,), retry_backoff=True, # Exponential backoff retry_backoff_max=600, # Max 10 минут между retry max_retries=5 ) def call_external_api(self, url): """Задача с автоматическим retry при ошибках""" try: response = requests.get(url, timeout=10) response.raise_for_status() return response.json() except requests.RequestException as exc: print(f"❌ API error: {exc}, retry {self.request.retries}/{self.max_retries}") raise # autoretry сработает # ============ Задача с ручным retry ============ @app.task(bind=True, max_retries=3) def process_payment(self, payment_id): """Обработка платежа с контролем retry""" try: result = payment_gateway.process(payment_id) return result except PaymentError as exc: # Retry через 60 секунд raise self.retry(exc=exc, countdown=60) # ============ Периодические задачи ============ @app.task def cleanup_old_data(): """Очистка старых данных (запускается по расписанию)""" deleted = db.execute("DELETE FROM logs WHERE created_at < NOW() - INTERVAL '30 days'") return {'deleted_rows': deleted} @app.task def generate_daily_report(): """Ежедневный отчёт""" # ... генерация отчёта return {'status': 'generated'}
Вызов задач
from tasks import send_email, call_external_api # ============ Асинхронный вызов ============ # Отправить в очередь (не ждём результат) result = send_email.delay('user@example.com', 'Hello', 'Welcome!') print(f"Task ID: {result.id}") # Альтернативный синтаксис result = send_email.apply_async( args=['user@example.com', 'Hello', 'Welcome!'], countdown=60, # Выполнить через 60 сек expires=3600, # Задача истекает через час queue='high' # Конкретная очередь ) # ============ Проверка статуса ============ print(result.status) # PENDING, STARTED, SUCCESS, FAILURE print(result.ready()) # True если завершено print(result.successful())# True если успешно # Получить результат (блокирующий вызов) try: data = result.get(timeout=10) # Ждать макс 10 сек print(f"Result: {data}") except TimeoutError: print("Task still running...") # ============ Цепочки задач ============ from celery import chain, group, chord # Последовательное выполнение workflow = chain( download_file.s('http://example.com/data.csv'), parse_csv.s(), save_to_db.s() ) result = workflow.apply_async() # Параллельное выполнение emails = group( send_email.s('user1@example.com', 'Hi', 'Body'), send_email.s('user2@example.com', 'Hi', 'Body'), send_email.s('user3@example.com', 'Hi', 'Body'), ) result = emails.apply_async()
Запуск Celery
🌊 Redis Streams
Redis Streams — структура данных для event streaming и обработки сообщений. Как Apache Kafka, но встроенный в Redis. Идеально для event-driven архитектуры.
Pub/Sub: Fire-and-forget. Если подписчика нет — сообщение потеряно.
List: Простые очереди. Один consumer = одно сообщение.
Streams: Персистентный лог. Consumer Groups, acknowledgments,
история сообщений. Надёжность + масштабируемость.
Базовые операции
Consumer Groups
Consumer Groups позволяют нескольким воркерам обрабатывать один stream, при этом каждое сообщение получает только один воркер:
"events"
"analytics"
"notifications"
Python + Redis Streams
import redis import json from datetime import datetime r = redis.Redis(host='localhost', decode_responses=True) # ============ Producer: отправка событий ============ def publish_event(stream_name, event_type, data): """Публикация события в stream""" event = { 'type': event_type, 'timestamp': datetime.utcnow().isoformat(), 'data': json.dumps(data) } # XADD добавляет в stream, * = автогенерация ID message_id = r.xadd(stream_name, event) print(f"✅ Published: {message_id}") return message_id # Примеры событий publish_event('user-events', 'user.registered', { 'user_id': 123, 'email': 'john@example.com' }) publish_event('user-events', 'user.login', { 'user_id': 123, 'ip': '192.168.1.1' }) publish_event('order-events', 'order.created', { 'order_id': 456, 'user_id': 123, 'total': 99.99 })
import redis import json r = redis.Redis(host='localhost', decode_responses=True) STREAM = 'user-events' GROUP = 'analytics-group' CONSUMER = 'consumer-1' # ============ Создание Consumer Group ============ def create_consumer_group(): """Создать группу (если не существует)""" try: # $ = читать только новые сообщения # 0 = читать все с начала r.xgroup_create(STREAM, GROUP, id='0', mkstream=True) print(f"✅ Group '{GROUP}' created") except redis.exceptions.ResponseError as e: if 'BUSYGROUP' in str(e): print(f"ℹ️ Group '{GROUP}' already exists") else: raise # ============ Consumer: обработка событий ============ def process_events(): """Основной цикл обработки событий""" print(f"🚀 Consumer '{CONSUMER}' started, listening to '{STREAM}'...") while True: # Читаем сообщения из группы # > = только новые (не прочитанные этой группой) messages = r.xreadgroup( groupname=GROUP, consumername=CONSUMER, streams={STREAM: '>'}, count=10, # Максимум 10 за раз block=5000 # Ждать 5 сек если нет сообщений ) if not messages: continue for stream, events in messages: for message_id, data in events: try: # Обработка события process_single_event(message_id, data) # Подтверждаем обработку (ACK) r.xack(STREAM, GROUP, message_id) print(f"✅ ACK: {message_id}") except Exception as e: print(f"❌ Error processing {message_id}: {e}") # Не делаем ACK — сообщение останется в pending def process_single_event(message_id, data): """Обработка одного события""" event_type = data.get('type') payload = json.loads(data.get('data', '{}')) print(f"📨 Processing: {event_type}") print(f" Data: {payload}") if event_type == 'user.registered': # Отправить welcome email send_welcome_email(payload['email']) elif event_type == 'user.login': # Записать в аналитику track_login(payload['user_id'], payload['ip']) if __name__ == '__main__': create_consumer_group() process_events()
• Event Sourcing — лог всех изменений в системе
• Activity Feed — лента активности пользователей
• IoT данные — поток данных с датчиков
• Audit Log — журнал аудита действий
• Микросервисы — асинхронная коммуникация между сервисами
✅ Best Practices
- ✓ user:123:profile
- ✓ cache:products:456
- ✓ session:abc123
- ✓ queue:emails:high
- ✓ Сессии: 24 часа
- ✓ Кеш: 5-60 минут
- ✓ Rate limit: 1 минута
- ✓ Временные данные: по задаче
- JSON Читаемость, совместимость
- MessagePack Компактность, скорость
- Pickle Python-only, осторожно!
- ✓ Пароль (requirepass)
- ✓ Bind только localhost
- ✓ Firewall правила
- ✓ TLS для продакшена
Чего избегать
❌ KEYS * — блокирует Redis, используйте SCAN
❌ Большие значения — >100KB замедляют Redis
❌ Без TTL — память заполнится, Redis упадёт
❌ Один ключ для всего — разбивайте данные
❌ Хранить всё в Redis — это кеш, не основная БД
Memory Management
# Максимальная память maxmemory 2gb # Политика вытеснения при нехватке памяти # volatile-lru — удалять ключи с TTL (LRU) # allkeys-lru — удалять любые ключи (LRU) # volatile-ttl — удалять ключи с наименьшим TTL # noeviction — не удалять, возвращать ошибку maxmemory-policy allkeys-lru # Персистентность appendonly yes appendfsync everysec
❓ FAQ
RDB (Snapshot): Снимки данных каждые N минут. Быстрый рестарт, но можно потерять последние данные.
AOF (Append Only File): Логирует каждую операцию. Надёжнее, но больше диск и медленнее рестарт.
RDB + AOF: Рекомендуется для продакшена — надёжность AOF + скорость RDB.
• До 25GB — один инстанс Redis
• 25-100GB — Redis Cluster (шардинг)
• >100GB — подумайте, нужен ли вам Redis для этих данных
Redis использует ~1.5-2x от размера данных (overhead на структуры).
• Богатые структуры данных (списки, множества, хеши...)
• Персистентность
• Pub/Sub, Streams
• Lua scripting
Memcached:
• Проще (только key-value)
• Многопоточный (лучше использует многоядерные CPU)
• Меньше памяти на элемент
Вывод: В 95% случаев — Redis. Memcached — только для простого кеширования с огромным объёмом данных.
Запись: Redis Cluster — шардинг данных по нескольким мастерам. Автоматическое распределение ключей.
Managed: AWS ElastiCache, Redis Cloud — масштабирование одной кнопкой.
• Простой API, быстрый старт
• Только Redis + Python
• Для небольших проектов
Celery:
• Мощный, много возможностей
• Периодические задачи (Beat)
• Цепочки, группы, хорды
• Несколько брокеров
Начните с RQ, переходите на Celery когда понадобятся его возможности.
• Проще настройка, уже есть Redis
• До ~100K msg/sec
• Ограничен памятью
Apache Kafka:
• Миллионы msg/sec
• Хранение на диске (терабайты)
• Сложнее настройка и поддержка
• Лучшая экосистема для Big Data
Вывод: Redis Streams для малых-средних нагрузок. Kafka — когда нужны огромные объёмы и гарантии доставки enterprise-уровня.
INFO — статистика сервераINFO memory — использование памятиSLOWLOG GET 10 — медленные запросыCLIENT LIST — подключённые клиентыИнструменты:
• Redis Insight — официальный GUI
• Prometheus + Grafana — метрики и дашборды
• RedisCommander — веб-интерфейс
1. Запустите Redis — docker run -d -p 6379:6379 redis:alpine
2. Попробуйте redis-cli — поиграйте с командами
3. Добавьте кеширование — в свой Python-проект
4. Настройте фоновые задачи — RQ или Celery
5. Изучите Streams — для event-driven архитектуры
Redis — один из самых полезных инструментов.
Знание Redis выделяет разработчика и открывает множество архитектурных возможностей.
# ═══════════════════════════════════════════════════════════════ # REDIS QUICK REFERENCE # ═══════════════════════════════════════════════════════════════ # ─────────────── STRINGS ─────────────── SET key value # Установить GET key # Получить SETEX key 3600 value # Установить с TTL (секунды) INCR counter # Увеличить на 1 INCRBY counter 10 # Увеличить на N MSET k1 v1 k2 v2 # Установить несколько MGET k1 k2 k3 # Получить несколько # ─────────────── HASHES ─────────────── HSET user:1 name "John" # Установить поле HGET user:1 name # Получить поле HGETALL user:1 # Все поля HMSET user:1 a 1 b 2 # Несколько полей HINCRBY user:1 views 1 # Инкремент поля # ─────────────── LISTS ─────────────── LPUSH queue task1 # Добавить в начало RPUSH queue task1 # Добавить в конец LPOP queue # Взять из начала RPOP queue # Взять из конца BRPOP queue 30 # Блокирующий pop (30 сек) LRANGE queue 0 -1 # Все элементы LLEN queue # Длина # ─────────────── SETS ─────────────── SADD tags python redis # Добавить SMEMBERS tags # Все элементы SISMEMBER tags python # Проверить наличие SINTER tags1 tags2 # Пересечение SUNION tags1 tags2 # Объединение # ─────────────── SORTED SETS ─────────────── ZADD board 100 player1 # Добавить с score ZRANGE board 0 9 # Топ 10 (по возрастанию) ZREVRANGE board 0 9 # Топ 10 (по убыванию) ZRANK board player1 # Ранг игрока ZINCRBY board 10 player1 # Увеличить score # ─────────────── STREAMS ─────────────── XADD events * k1 v1 # Добавить событие XREAD STREAMS events 0 # Читать с начала XRANGE events - + # Все события XLEN events # Количество XGROUP CREATE events grp 0 # Создать группу XREADGROUP GROUP grp c1 ... # Читать в группе XACK events grp id # Подтвердить # ─────────────── KEYS & EXPIRY ─────────────── KEYS user:* # Поиск (НЕ для prod!) SCAN 0 MATCH user:* COUNT 100# Безопасный поиск DEL key # Удалить EXISTS key # Проверить существование EXPIRE key 3600 # Установить TTL TTL key # Узнать TTL PERSIST key # Убрать TTL # ─────────────── PUB/SUB ─────────────── SUBSCRIBE channel # Подписаться PUBLISH channel message # Опубликовать PSUBSCRIBE news:* # Подписка по паттерну # ─────────────── ADMIN ─────────────── INFO # Статистика сервера INFO memory # Память DBSIZE # Количество ключей FLUSHDB # Очистить текущую БД SLOWLOG GET 10 # Медленные запросы CLIENT LIST # Подключения
Docker Compose для разработки
version: '3.8' services: # Redis Server redis: image: redis:7-alpine container_name: redis ports: - "6379:6379" volumes: - redis-data:/data command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 5s retries: 3 # Redis Insight — GUI для Redis redis-insight: image: redislabs/redisinsight:latest container_name: redis-insight ports: - "8001:8001" depends_on: - redis # Ваше приложение app: build: . ports: - "8000:8000" environment: - REDIS_URL=redis://redis:6379/0 depends_on: - redis # Celery Worker celery-worker: build: . command: celery -A celery_app worker --loglevel=info environment: - REDIS_URL=redis://redis:6379/0 depends_on: - redis # Celery Beat (периодические задачи) celery-beat: build: . command: celery -A celery_app beat --loglevel=info environment: - REDIS_URL=redis://redis:6379/0 depends_on: - redis # Flower — мониторинг Celery flower: build: . command: celery -A celery_app flower --port=5555 ports: - "5555:5555" environment: - REDIS_URL=redis://redis:6379/0 depends_on: - redis - celery-worker volumes: redis-data: