Redis Pub/Sub - Powiadomienia w czasie rzeczywistym
Redis Pub/Sub Powiadomienia
technologieRedis Pub/Sub - Powiadomienia w czasie rzeczywistym
Współczesne aplikacje internetowe wymagają natychmiastowego dostarczania informacji do użytkowników. Czat, powiadomienia o nowych wiadomościach, aktualizacje cen na giełdzie, wyniki sportowe na żywo - wszystkie te funkcje łączy jedno: potrzeba komunikacji w czasie rzeczywistym. Redis Pub/Sub to wzorzec, który umożliwia budowanie takich systemów w prosty, wydajny i skalowalny sposób.
W tym artykule omówimy dogłębnie mechanizm Publish/Subscribe w Redis, pokażemy jak zintegrować go z WebSocketami, porównamy z Redis Streams oraz alternatywnymi rozwiązaniami, takimi jak RabbitMQ i Apache Kafka, a na koniec zbudujemy kompletny system powiadomień w Node.js.
Czym jest wzorzec Pub/Sub?#
Wzorzec Publish/Subscribe (Pub/Sub) to model komunikacji asynchronicznej, w którym nadawcy wiadomości (publishers) nie wysyłają wiadomości bezpośrednio do konkretnych odbiorców (subscribers). Zamiast tego, wiadomości są publikowane na kanałach (channels), a odbiorcy subskrybują te kanały, aby otrzymywać wiadomości, które ich interesują.
Kluczowe cechy wzorca Pub/Sub:
- Luźne powiązanie - nadawca nie musi wiedzieć, kto odbiera wiadomość
- Komunikacja jeden-do-wielu - jedna wiadomość trafia do wielu odbiorców
- Asynchroniczność - nadawca nie czeka na odpowiedź odbiorcy
- Dynamiczność - odbiorcy mogą dołączać i odłączać się w dowolnym momencie
Redis Pub/Sub - jak to działa?#
Redis implementuje wzorzec Pub/Sub jako natywną funkcjonalność. Serwer Redis działa jako broker wiadomości, zarządzając kanałami i dostarczając wiadomości do wszystkich aktywnych subskrybentów.
Kanały (Channels)#
Kanały w Redis to nazwane strumienie wiadomości. Nie trzeba ich tworzyć - powstają automatycznie, gdy pierwszy subskrybent się podłączy lub gdy pierwsza wiadomość zostanie opublikowana. Nazwy kanałów to dowolne ciągi znaków, ale dobrą praktyką jest stosowanie konwencji z separatorem:
notifications:user:123
chat:room:general
orders:status:updated
system:alerts
Publishers (nadawcy)#
Publisher to klient Redis, który wysyła wiadomości na określony kanał za pomocą komendy PUBLISH:
PUBLISH notifications:user:123 '{"type":"new_message","from":"Jan","text":"Cześć!"}'
Komenda zwraca liczbę subskrybentów, którzy otrzymali wiadomość. Jeśli nikt nie nasłuchuje - wiadomość jest bezpowrotnie tracona. To kluczowa różnica w stosunku do kolejek wiadomości.
Subscribers (odbiorcy)#
Subscriber to klient Redis, który nasłuchuje na jednym lub wielu kanałach:
SUBSCRIBE notifications:user:123 chat:room:general
Redis wspiera również subskrypcję z użyciem wzorców (pattern matching):
PSUBSCRIBE notifications:user:* chat:room:*
Dzięki temu jeden subskrybent może nasłuchiwać na wielu kanałach pasujących do wzorca, co jest niezwykle przydatne w systemach powiadomień.
Podstawowy przykład w Node.js#
Zacznijmy od prostego przykładu z biblioteką ioredis:
import Redis from 'ioredis';
// Subscriber - nasłuchuje na kanale
const subscriber = new Redis({ host: '127.0.0.1', port: 6379 });
subscriber.subscribe('notifications:global', (err, count) => {
if (err) {
console.error('Błąd subskrypcji:', err);
return;
}
console.log(`Subskrybowano ${count} kanałów`);
});
subscriber.on('message', (channel, message) => {
const data = JSON.parse(message);
console.log(`[${channel}] Otrzymano:`, data);
});
// Publisher - wysyła wiadomości
const publisher = new Redis({ host: '127.0.0.1', port: 6379 });
async function sendNotification(userId, notification) {
const message = JSON.stringify({
id: crypto.randomUUID(),
userId,
...notification,
timestamp: Date.now(),
});
const receivers = await publisher.publish(
`notifications:user:${userId}`,
message
);
console.log(`Wiadomość dostarczona do ${receivers} subskrybentów`);
}
// Wysłanie przykładowej notyfikacji
sendNotification('123', {
type: 'order_update',
title: 'Zamówienie wysłane',
body: 'Twoje zamówienie #4567 zostało wysłane.',
});
Ważne: Klient Redis w trybie subskrybenta nie może wykonywać innych komend (GET, SET itp.). Dlatego zawsze potrzebujesz oddzielnych połączeń dla publishera i subscribera.
Integracja z WebSocket - architektura real-time#
Aby dostarczyć powiadomienia z Redis do przeglądarki użytkownika, potrzebujemy warstwy transportowej. WebSocket to idealny wybór - zapewnia dwukierunkową, trwałą komunikację między serwerem a klientem.
Architektura systemu#
[Przeglądarka] <--WebSocket--> [Serwer Node.js] <--Redis Pub/Sub--> [Redis]
^
[Przeglądarka] <--WebSocket--> [Serwer Node.js] <--Redis Pub/Sub--> ---|
|
[Mikroserwis A] ----PUBLISH-----------> |
[Mikroserwis B] ----PUBLISH-----------> |
Każdy serwer Node.js subskrybuje kanały Redis i przekazuje wiadomości do podłączonych klientów WebSocket. Mikroserwisy publikują zdarzenia na kanałach Redis, a te trafiają do wszystkich serwerów, które z kolei dostarczają je do odpowiednich przeglądarek.
Kompletna implementacja serwera#
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import Redis from 'ioredis';
import express from 'express';
const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });
// Redis - oddzielne połączenia dla sub i pub
const redisSub = new Redis({ host: '127.0.0.1', port: 6379 });
const redisPub = new Redis({ host: '127.0.0.1', port: 6379 });
// Mapa: userId -> Set<WebSocket>
const userConnections = new Map();
// Zarządzanie połączeniami WebSocket
wss.on('connection', (ws, req) => {
const userId = authenticateUser(req); // JWT / session
if (!userId) {
ws.close(4001, 'Unauthorized');
return;
}
// Dodaj połączenie do mapy
if (!userConnections.has(userId)) {
userConnections.set(userId, new Set());
// Subskrybuj kanał użytkownika tylko przy pierwszym połączeniu
redisSub.subscribe(`notifications:user:${userId}`);
}
userConnections.get(userId).add(ws);
console.log(`Użytkownik ${userId} połączony. Aktywne połączenia: ${userConnections.get(userId).size}`);
// Obsługa wiadomości od klienta
ws.on('message', async (data) => {
try {
const msg = JSON.parse(data.toString());
await handleClientMessage(userId, msg);
} catch (err) {
ws.send(JSON.stringify({ error: 'Nieprawidłowa wiadomość' }));
}
});
// Czyszczenie po rozłączeniu
ws.on('close', () => {
const connections = userConnections.get(userId);
if (connections) {
connections.delete(ws);
if (connections.size === 0) {
userConnections.delete(userId);
redisSub.unsubscribe(`notifications:user:${userId}`);
}
}
});
// Wyślij potwierdzenie połączenia
ws.send(JSON.stringify({ type: 'connected', userId }));
});
// Obsługa wiadomości z Redis
redisSub.on('message', (channel, message) => {
// Wyciągnij userId z nazwy kanału
const match = channel.match(/^notifications:user:(.+)$/);
if (!match) return;
const userId = match[1];
const connections = userConnections.get(userId);
if (connections) {
connections.forEach((ws) => {
if (ws.readyState === ws.OPEN) {
ws.send(message);
}
});
}
});
// Subskrybuj kanały globalne
redisSub.subscribe('notifications:global');
redisSub.on('message', (channel, message) => {
if (channel === 'notifications:global') {
// Broadcast do wszystkich połączonych użytkowników
wss.clients.forEach((ws) => {
if (ws.readyState === ws.OPEN) {
ws.send(message);
}
});
}
});
// API do wysyłania powiadomień
app.post('/api/notify', express.json(), async (req, res) => {
const { userId, type, title, body } = req.body;
const notification = JSON.stringify({
id: crypto.randomUUID(),
type,
title,
body,
timestamp: Date.now(),
});
const channel = userId
? `notifications:user:${userId}`
: 'notifications:global';
const receivers = await redisPub.publish(channel, notification);
res.json({ delivered: receivers });
});
async function handleClientMessage(userId, msg) {
if (msg.type === 'mark_read') {
await redisPub.publish(
`notifications:user:${userId}`,
JSON.stringify({ type: 'notification_read', id: msg.notificationId })
);
}
}
function authenticateUser(req) {
// Implementacja autentykacji - JWT, cookie session itp.
const url = new URL(req.url, `http://${req.headers.host}`);
return url.searchParams.get('userId');
}
server.listen(3000, () => {
console.log('Serwer uruchomiony na porcie 3000');
});
Klient WebSocket (przeglądarka)#
class NotificationClient {
constructor(userId) {
this.userId = userId;
this.listeners = new Map();
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.connect();
}
connect() {
this.ws = new WebSocket(`ws://localhost:3000?userId=${this.userId}`);
this.ws.onopen = () => {
console.log('Połączono z serwerem powiadomień');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
const notification = JSON.parse(event.data);
this.emit(notification.type, notification);
};
this.ws.onclose = () => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
this.reconnectAttempts++;
setTimeout(() => this.connect(), delay);
}
};
}
on(type, callback) {
if (!this.listeners.has(type)) {
this.listeners.set(type, new Set());
}
this.listeners.get(type).add(callback);
}
emit(type, data) {
const callbacks = this.listeners.get(type);
if (callbacks) {
callbacks.forEach((cb) => cb(data));
}
}
markAsRead(notificationId) {
this.ws.send(JSON.stringify({
type: 'mark_read',
notificationId,
}));
}
}
// Użycie
const client = new NotificationClient('123');
client.on('order_update', (notification) => {
showToast(notification.title, notification.body);
});
client.on('new_message', (notification) => {
updateChatBadge(notification);
});
Redis Streams vs Pub/Sub - co wybrać?#
Redis oferuje dwa mechanizmy komunikacji w czasie rzeczywistym: Pub/Sub i Streams. Choć mogą wydawać się podobne, różnią się fundamentalnie.
Redis Pub/Sub#
- Fire-and-forget - wiadomości nie są przechowywane
- Brak historii - nowy subskrybent nie otrzyma starych wiadomości
- Brak potwierdzenia - brak mechanizmu ACK
- Prostota - minimalny narzut, maksymalna wydajność
- Idealny dla: powiadomienia, czat, live updates
Redis Streams#
- Trwałość - wiadomości są zapisywane w strukturze danych
- Consumer Groups - przetwarzanie równoległe z potwierdzeniami
- Historia - możliwość odczytania starych wiadomości
- Dokładnie raz (at-least-once) - gwarancja dostarczenia
- Idealny dla: kolejki zadań, event sourcing, audyt
// Redis Streams - przykład
// Dodanie wiadomości do streamu
await redis.xadd('orders:events', '*',
'action', 'created',
'orderId', '4567',
'userId', '123'
);
// Odczyt z consumer group
await redis.xreadgroup(
'GROUP', 'order-processors', 'worker-1',
'COUNT', 10,
'BLOCK', 5000,
'STREAMS', 'orders:events', '>'
);
// Potwierdzenie przetworzenia
await redis.xack('orders:events', 'order-processors', messageId);
Kiedy wybrać co?
| Scenariusz | Pub/Sub | Streams | |---|---|---| | Powiadomienia push | ✅ | ❌ | | Czat na żywo | ✅ | ✅ (z historią) | | Kolejka zadań | ❌ | ✅ | | Event sourcing | ❌ | ✅ | | Live dashboard | ✅ | ❌ | | Przetwarzanie logów | ❌ | ✅ |
W praktyce najlepszym rozwiązaniem jest często kombinacja obu. Pub/Sub do natychmiastowego dostarczania, Streams do persystencji i odtwarzania stanu.
Skalowanie z Redis Cluster#
Kiedy pojedynczy serwer Redis nie wystarczy, Redis Cluster pozwala na dystrybucję obciążenia. Jednak Pub/Sub w klastrze działa inaczej niż w trybie standalone.
Sharded Pub/Sub (Redis 7.0+)#
Od wersji 7.0 Redis wprowadził Sharded Pub/Sub, który rozwiązuje problem skalowalności:
import Redis from 'ioredis';
const cluster = new Redis.Cluster([
{ host: '10.0.0.1', port: 6379 },
{ host: '10.0.0.2', port: 6379 },
{ host: '10.0.0.3', port: 6379 },
]);
// Sharded publish - wiadomość trafia tylko do węzła odpowiedzialnego za kanał
await cluster.spublish('notifications:user:123', JSON.stringify(payload));
// Sharded subscribe
cluster.ssubscribe('notifications:user:123', (err) => {
if (err) console.error(err);
});
cluster.on('smessage', (channel, message) => {
console.log(`Sharded message on ${channel}:`, message);
});
W klasycznym Pub/Sub każda wiadomość jest broadcastowana do wszystkich węzłów klastra. Sharded Pub/Sub kieruje wiadomości tylko do węzła, który zarządza danym slotem hash, co drastycznie redukuje ruch sieciowy.
Strategia skalowania#
[Load Balancer]
├── [Node.js Server 1] ──┐
├── [Node.js Server 2] ──┤── Redis Cluster
└── [Node.js Server 3] ──┘ ├── Node A (slots 0-5460)
├── Node B (slots 5461-10922)
└── Node C (slots 10923-16383)
Każdy serwer Node.js utrzymuje połączenie z klastrem Redis. Użytkownicy mogą być podłączeni do dowolnego serwera - Pub/Sub zapewnia, że wiadomość dotrze do właściwego miejsca.
Przypadki użycia#
1. Czat w czasie rzeczywistym#
Redis Pub/Sub idealnie nadaje się do systemów czatowych. Każdy pokój czatu to osobny kanał:
// Dołączenie do pokoju
async function joinRoom(userId, roomId) {
await redisSub.subscribe(`chat:room:${roomId}`);
await redisPub.publish(`chat:room:${roomId}`, JSON.stringify({
type: 'user_joined',
userId,
timestamp: Date.now(),
}));
}
// Wysłanie wiadomości
async function sendMessage(userId, roomId, text) {
const message = {
id: crypto.randomUUID(),
type: 'message',
userId,
roomId,
text,
timestamp: Date.now(),
};
// Zapisz w Redis Streams (persystencja + historia)
await redis.xadd(`chat:history:${roomId}`, '*',
'data', JSON.stringify(message)
);
// Wyślij przez Pub/Sub (real-time delivery)
await redisPub.publish(`chat:room:${roomId}`, JSON.stringify(message));
}
2. Live updates (dashboard, monitoring)#
Aktualizacje na żywo dla dashboardów biznesowych, systemów monitoringu lub tabel wyników:
// Mikroserwis metryczny publikuje dane co sekundę
setInterval(async () => {
const metrics = {
cpu: os.loadavg()[0],
memory: process.memoryUsage().heapUsed,
activeUsers: await redis.scard('active:users'),
requestsPerSec: await redis.get('metrics:rps'),
};
await redisPub.publish('dashboard:metrics', JSON.stringify(metrics));
}, 1000);
3. Gaming - multiplayer i leaderboardy#
W grach multiplayer Redis Pub/Sub obsługuje synchronizację stanu gry:
// Aktualizacja pozycji gracza
async function updatePlayerPosition(gameId, playerId, position) {
await redisPub.publish(`game:${gameId}:state`, JSON.stringify({
type: 'player_move',
playerId,
position,
timestamp: Date.now(),
}));
}
// Aktualizacja tabeli wyników
async function updateLeaderboard(gameId, playerId, score) {
await redis.zadd(`leaderboard:${gameId}`, score, playerId);
const topPlayers = await redis.zrevrange(`leaderboard:${gameId}`, 0, 9, 'WITHSCORES');
await redisPub.publish(`game:${gameId}:leaderboard`, JSON.stringify({
type: 'leaderboard_update',
topPlayers,
}));
}
Porównanie z RabbitMQ i Apache Kafka#
Redis Pub/Sub nie jest jedynym rozwiązaniem do komunikacji asynchronicznej. Porównajmy go z dwoma najpopularniejszymi alternatywami.
Redis Pub/Sub vs RabbitMQ#
| Cecha | Redis Pub/Sub | RabbitMQ | |---|---|---| | Model | Pub/Sub (fire-and-forget) | Queue + Exchange (z potwierdzeniami) | | Trwałość | Brak | Tak (durable queues) | | Routing | Prosty (kanały, wzorce) | Zaawansowany (direct, topic, fanout, headers) | | Protokół | RESP (Redis protocol) | AMQP 0.9.1 | | Latencja | Ultra-niska (~0.1ms) | Niska (~1ms) | | Throughput | ~1M msg/s | ~50K msg/s | | Gwarancja dostarczenia | At-most-once | At-least-once / Exactly-once | | Złożoność operacyjna | Niska | Średnia |
Wybierz Redis Pub/Sub gdy: potrzebujesz minimalnej latencji, akceptujesz utratę wiadomości, masz już Redis w stacku.
Wybierz RabbitMQ gdy: potrzebujesz gwarancji dostarczenia, zaawansowanego routingu, kolejek priorytetowych, dead-letter queues.
Redis Pub/Sub vs Apache Kafka#
| Cecha | Redis Pub/Sub | Apache Kafka | |---|---|---| | Przechowywanie | Brak | Trwałe (log retention) | | Throughput | ~1M msg/s | ~1M+ msg/s (partycjonowanie) | | Latencja | ~0.1ms | ~5-10ms | | Replay | Niemożliwy | Pełny (offset-based) | | Consumer Groups | Brak | Natywne | | Skalowanie | Redis Cluster | Partycje + brokerzy | | Złożoność | Minimalna | Wysoka (ZooKeeper/KRaft) | | Use case | Real-time notifications | Event streaming, data pipelines |
Wybierz Redis Pub/Sub gdy: budujesz system powiadomień, czat, live updates - gdzie liczy się prostota i latencja.
Wybierz Kafkę gdy: budujesz pipeline danych, event sourcing, potrzebujesz replay i trwałości, przetwarzasz miliony zdarzeń na minutę.
Kiedy użyć Redis Streams zamiast obu?#
Redis Streams to kompromis - oferuje trwałość i consumer groups jak Kafka, ale z prostotą Redis. Dla wielu zastosowań jest to wystarczające rozwiązanie, eliminujące potrzebę dodatkowej infrastruktury.
Wzorce projektowe i best practices#
1. Konwencja nazewnictwa kanałów#
{domena}:{encja}:{identyfikator}:{zdarzenie}
notifications:user:123:new_message
orders:status:4567:updated
chat:room:general:message
system:health:server-1:alert
2. Struktura wiadomości#
const messageSchema = {
id: 'uuid-v4', // Unikalne ID wiadomości
type: 'string', // Typ zdarzenia
source: 'string', // Źródło (mikroserwis)
timestamp: 'number', // Unix timestamp
version: 'number', // Wersja schematu
data: {}, // Payload
metadata: {}, // Dodatkowe informacje
};
3. Obsługa błędów i reconnect#
const redisSub = new Redis({
host: '127.0.0.1',
port: 6379,
retryStrategy(times) {
const delay = Math.min(times * 100, 3000);
return delay;
},
maxRetriesPerRequest: null,
enableReadyCheck: true,
});
redisSub.on('error', (err) => {
console.error('Redis subscriber error:', err);
});
redisSub.on('reconnecting', (delay) => {
console.log(`Reconnecting to Redis in ${delay}ms...`);
});
4. Monitoring i metryki#
// Monitoruj liczbę subskrybentów
async function getChannelStats() {
const info = await redisPub.pubsub('NUMSUB',
'notifications:global',
'chat:room:general'
);
// info = ['notifications:global', 5, 'chat:room:general', 12]
return info;
}
// Monitoruj liczbę aktywnych kanałów
async function getActiveChannels() {
const channels = await redisPub.pubsub('CHANNELS', 'notifications:*');
return channels.length;
}
Podsumowanie#
Redis Pub/Sub to potężne, a jednocześnie proste narzędzie do budowy systemów komunikacji w czasie rzeczywistym. Kluczowe wnioski:
- Pub/Sub jest fire-and-forget - idealny do powiadomień, czatu, live updates
- Streams uzupełniają Pub/Sub - gdy potrzebujesz trwałości i historii
- WebSocket + Redis Pub/Sub - to standard w architekturze real-time
- Sharded Pub/Sub (Redis 7.0+) - rozwiązuje problemy skalowania
- Redis nie zastępuje Kafki - to różne narzędzia do różnych problemów
Redis Pub/Sub sprawdza się najlepiej jako warstwa dostarczania w architekturze mikroserwisowej, gdzie potrzebujesz minimalnej latencji i prostoty wdrożenia.
Potrzebujesz systemu powiadomień real-time?#
W MDS Software Solutions Group projektujemy i wdrażamy skalowalne systemy komunikacji w czasie rzeczywistym. Od czatów, przez systemy alertów, po dashboardy live - wykorzystujemy Redis Pub/Sub, WebSocket i sprawdzone wzorce architektoniczne, aby dostarczyć rozwiązania, które działają niezawodnie pod dużym obciążeniem.
Skontaktuj się z nami - pomożemy Ci wybrać odpowiednią technologię i zbudować system dopasowany do Twoich potrzeb.
Zespół ekspertów programistycznych specjalizujących się w nowoczesnych technologiach webowych.