Przejdź do treści
Technologie

Redis Pub/Sub - Powiadomienia w czasie rzeczywistym

Opublikowano:
·7 min czytania·Autor: MDS Software Solutions Group

Redis Pub/Sub Powiadomienia

technologie

Redis Pub/Sub - Powiadomienia w czasie rzeczywistym

Współczesne aplikacje internetowe wymagają natychmiastowego dostarczania informacji do użytkowników. Czat, powiadomienia o nowych wiadomościach, aktualizacje cen na giełdzie, wyniki sportowe na żywo - wszystkie te funkcje łączy jedno: potrzeba komunikacji w czasie rzeczywistym. Redis Pub/Sub to wzorzec, który umożliwia budowanie takich systemów w prosty, wydajny i skalowalny sposób.

W tym artykule omówimy dogłębnie mechanizm Publish/Subscribe w Redis, pokażemy jak zintegrować go z WebSocketami, porównamy z Redis Streams oraz alternatywnymi rozwiązaniami, takimi jak RabbitMQ i Apache Kafka, a na koniec zbudujemy kompletny system powiadomień w Node.js.

Czym jest wzorzec Pub/Sub?#

Wzorzec Publish/Subscribe (Pub/Sub) to model komunikacji asynchronicznej, w którym nadawcy wiadomości (publishers) nie wysyłają wiadomości bezpośrednio do konkretnych odbiorców (subscribers). Zamiast tego, wiadomości są publikowane na kanałach (channels), a odbiorcy subskrybują te kanały, aby otrzymywać wiadomości, które ich interesują.

Kluczowe cechy wzorca Pub/Sub:

  • Luźne powiązanie - nadawca nie musi wiedzieć, kto odbiera wiadomość
  • Komunikacja jeden-do-wielu - jedna wiadomość trafia do wielu odbiorców
  • Asynchroniczność - nadawca nie czeka na odpowiedź odbiorcy
  • Dynamiczność - odbiorcy mogą dołączać i odłączać się w dowolnym momencie

Redis Pub/Sub - jak to działa?#

Redis implementuje wzorzec Pub/Sub jako natywną funkcjonalność. Serwer Redis działa jako broker wiadomości, zarządzając kanałami i dostarczając wiadomości do wszystkich aktywnych subskrybentów.

Kanały (Channels)#

Kanały w Redis to nazwane strumienie wiadomości. Nie trzeba ich tworzyć - powstają automatycznie, gdy pierwszy subskrybent się podłączy lub gdy pierwsza wiadomość zostanie opublikowana. Nazwy kanałów to dowolne ciągi znaków, ale dobrą praktyką jest stosowanie konwencji z separatorem:

notifications:user:123
chat:room:general
orders:status:updated
system:alerts

Publishers (nadawcy)#

Publisher to klient Redis, który wysyła wiadomości na określony kanał za pomocą komendy PUBLISH:

PUBLISH notifications:user:123 '{"type":"new_message","from":"Jan","text":"Cześć!"}'

Komenda zwraca liczbę subskrybentów, którzy otrzymali wiadomość. Jeśli nikt nie nasłuchuje - wiadomość jest bezpowrotnie tracona. To kluczowa różnica w stosunku do kolejek wiadomości.

Subscribers (odbiorcy)#

Subscriber to klient Redis, który nasłuchuje na jednym lub wielu kanałach:

SUBSCRIBE notifications:user:123 chat:room:general

Redis wspiera również subskrypcję z użyciem wzorców (pattern matching):

PSUBSCRIBE notifications:user:* chat:room:*

Dzięki temu jeden subskrybent może nasłuchiwać na wielu kanałach pasujących do wzorca, co jest niezwykle przydatne w systemach powiadomień.

Podstawowy przykład w Node.js#

Zacznijmy od prostego przykładu z biblioteką ioredis:

import Redis from 'ioredis';

// Subscriber - nasłuchuje na kanale
const subscriber = new Redis({ host: '127.0.0.1', port: 6379 });

subscriber.subscribe('notifications:global', (err, count) => {
  if (err) {
    console.error('Błąd subskrypcji:', err);
    return;
  }
  console.log(`Subskrybowano ${count} kanałów`);
});

subscriber.on('message', (channel, message) => {
  const data = JSON.parse(message);
  console.log(`[${channel}] Otrzymano:`, data);
});

// Publisher - wysyła wiadomości
const publisher = new Redis({ host: '127.0.0.1', port: 6379 });

async function sendNotification(userId, notification) {
  const message = JSON.stringify({
    id: crypto.randomUUID(),
    userId,
    ...notification,
    timestamp: Date.now(),
  });

  const receivers = await publisher.publish(
    `notifications:user:${userId}`,
    message
  );
  console.log(`Wiadomość dostarczona do ${receivers} subskrybentów`);
}

// Wysłanie przykładowej notyfikacji
sendNotification('123', {
  type: 'order_update',
  title: 'Zamówienie wysłane',
  body: 'Twoje zamówienie #4567 zostało wysłane.',
});

Ważne: Klient Redis w trybie subskrybenta nie może wykonywać innych komend (GET, SET itp.). Dlatego zawsze potrzebujesz oddzielnych połączeń dla publishera i subscribera.

Integracja z WebSocket - architektura real-time#

Aby dostarczyć powiadomienia z Redis do przeglądarki użytkownika, potrzebujemy warstwy transportowej. WebSocket to idealny wybór - zapewnia dwukierunkową, trwałą komunikację między serwerem a klientem.

Architektura systemu#

[Przeglądarka] <--WebSocket--> [Serwer Node.js] <--Redis Pub/Sub--> [Redis]
                                                                        ^
[Przeglądarka] <--WebSocket--> [Serwer Node.js] <--Redis Pub/Sub--> ---|
                                                                        |
                                [Mikroserwis A] ----PUBLISH-----------> |
                                [Mikroserwis B] ----PUBLISH-----------> |

Każdy serwer Node.js subskrybuje kanały Redis i przekazuje wiadomości do podłączonych klientów WebSocket. Mikroserwisy publikują zdarzenia na kanałach Redis, a te trafiają do wszystkich serwerów, które z kolei dostarczają je do odpowiednich przeglądarek.

Kompletna implementacja serwera#

import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import Redis from 'ioredis';
import express from 'express';

const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });

// Redis - oddzielne połączenia dla sub i pub
const redisSub = new Redis({ host: '127.0.0.1', port: 6379 });
const redisPub = new Redis({ host: '127.0.0.1', port: 6379 });

// Mapa: userId -> Set<WebSocket>
const userConnections = new Map();

// Zarządzanie połączeniami WebSocket
wss.on('connection', (ws, req) => {
  const userId = authenticateUser(req); // JWT / session
  if (!userId) {
    ws.close(4001, 'Unauthorized');
    return;
  }

  // Dodaj połączenie do mapy
  if (!userConnections.has(userId)) {
    userConnections.set(userId, new Set());
    // Subskrybuj kanał użytkownika tylko przy pierwszym połączeniu
    redisSub.subscribe(`notifications:user:${userId}`);
  }
  userConnections.get(userId).add(ws);

  console.log(`Użytkownik ${userId} połączony. Aktywne połączenia: ${userConnections.get(userId).size}`);

  // Obsługa wiadomości od klienta
  ws.on('message', async (data) => {
    try {
      const msg = JSON.parse(data.toString());
      await handleClientMessage(userId, msg);
    } catch (err) {
      ws.send(JSON.stringify({ error: 'Nieprawidłowa wiadomość' }));
    }
  });

  // Czyszczenie po rozłączeniu
  ws.on('close', () => {
    const connections = userConnections.get(userId);
    if (connections) {
      connections.delete(ws);
      if (connections.size === 0) {
        userConnections.delete(userId);
        redisSub.unsubscribe(`notifications:user:${userId}`);
      }
    }
  });

  // Wyślij potwierdzenie połączenia
  ws.send(JSON.stringify({ type: 'connected', userId }));
});

// Obsługa wiadomości z Redis
redisSub.on('message', (channel, message) => {
  // Wyciągnij userId z nazwy kanału
  const match = channel.match(/^notifications:user:(.+)$/);
  if (!match) return;

  const userId = match[1];
  const connections = userConnections.get(userId);

  if (connections) {
    connections.forEach((ws) => {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
      }
    });
  }
});

// Subskrybuj kanały globalne
redisSub.subscribe('notifications:global');
redisSub.on('message', (channel, message) => {
  if (channel === 'notifications:global') {
    // Broadcast do wszystkich połączonych użytkowników
    wss.clients.forEach((ws) => {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
      }
    });
  }
});

// API do wysyłania powiadomień
app.post('/api/notify', express.json(), async (req, res) => {
  const { userId, type, title, body } = req.body;

  const notification = JSON.stringify({
    id: crypto.randomUUID(),
    type,
    title,
    body,
    timestamp: Date.now(),
  });

  const channel = userId
    ? `notifications:user:${userId}`
    : 'notifications:global';

  const receivers = await redisPub.publish(channel, notification);
  res.json({ delivered: receivers });
});

async function handleClientMessage(userId, msg) {
  if (msg.type === 'mark_read') {
    await redisPub.publish(
      `notifications:user:${userId}`,
      JSON.stringify({ type: 'notification_read', id: msg.notificationId })
    );
  }
}

function authenticateUser(req) {
  // Implementacja autentykacji - JWT, cookie session itp.
  const url = new URL(req.url, `http://${req.headers.host}`);
  return url.searchParams.get('userId');
}

server.listen(3000, () => {
  console.log('Serwer uruchomiony na porcie 3000');
});

Klient WebSocket (przeglądarka)#

class NotificationClient {
  constructor(userId) {
    this.userId = userId;
    this.listeners = new Map();
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(`ws://localhost:3000?userId=${this.userId}`);

    this.ws.onopen = () => {
      console.log('Połączono z serwerem powiadomień');
      this.reconnectAttempts = 0;
    };

    this.ws.onmessage = (event) => {
      const notification = JSON.parse(event.data);
      this.emit(notification.type, notification);
    };

    this.ws.onclose = () => {
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts++;
        setTimeout(() => this.connect(), delay);
      }
    };
  }

  on(type, callback) {
    if (!this.listeners.has(type)) {
      this.listeners.set(type, new Set());
    }
    this.listeners.get(type).add(callback);
  }

  emit(type, data) {
    const callbacks = this.listeners.get(type);
    if (callbacks) {
      callbacks.forEach((cb) => cb(data));
    }
  }

  markAsRead(notificationId) {
    this.ws.send(JSON.stringify({
      type: 'mark_read',
      notificationId,
    }));
  }
}

// Użycie
const client = new NotificationClient('123');

client.on('order_update', (notification) => {
  showToast(notification.title, notification.body);
});

client.on('new_message', (notification) => {
  updateChatBadge(notification);
});

Redis Streams vs Pub/Sub - co wybrać?#

Redis oferuje dwa mechanizmy komunikacji w czasie rzeczywistym: Pub/Sub i Streams. Choć mogą wydawać się podobne, różnią się fundamentalnie.

Redis Pub/Sub#

  • Fire-and-forget - wiadomości nie są przechowywane
  • Brak historii - nowy subskrybent nie otrzyma starych wiadomości
  • Brak potwierdzenia - brak mechanizmu ACK
  • Prostota - minimalny narzut, maksymalna wydajność
  • Idealny dla: powiadomienia, czat, live updates

Redis Streams#

  • Trwałość - wiadomości są zapisywane w strukturze danych
  • Consumer Groups - przetwarzanie równoległe z potwierdzeniami
  • Historia - możliwość odczytania starych wiadomości
  • Dokładnie raz (at-least-once) - gwarancja dostarczenia
  • Idealny dla: kolejki zadań, event sourcing, audyt
// Redis Streams - przykład
// Dodanie wiadomości do streamu
await redis.xadd('orders:events', '*',
  'action', 'created',
  'orderId', '4567',
  'userId', '123'
);

// Odczyt z consumer group
await redis.xreadgroup(
  'GROUP', 'order-processors', 'worker-1',
  'COUNT', 10,
  'BLOCK', 5000,
  'STREAMS', 'orders:events', '>'
);

// Potwierdzenie przetworzenia
await redis.xack('orders:events', 'order-processors', messageId);

Kiedy wybrać co?

| Scenariusz | Pub/Sub | Streams | |---|---|---| | Powiadomienia push | ✅ | ❌ | | Czat na żywo | ✅ | ✅ (z historią) | | Kolejka zadań | ❌ | ✅ | | Event sourcing | ❌ | ✅ | | Live dashboard | ✅ | ❌ | | Przetwarzanie logów | ❌ | ✅ |

W praktyce najlepszym rozwiązaniem jest często kombinacja obu. Pub/Sub do natychmiastowego dostarczania, Streams do persystencji i odtwarzania stanu.

Skalowanie z Redis Cluster#

Kiedy pojedynczy serwer Redis nie wystarczy, Redis Cluster pozwala na dystrybucję obciążenia. Jednak Pub/Sub w klastrze działa inaczej niż w trybie standalone.

Sharded Pub/Sub (Redis 7.0+)#

Od wersji 7.0 Redis wprowadził Sharded Pub/Sub, który rozwiązuje problem skalowalności:

import Redis from 'ioredis';

const cluster = new Redis.Cluster([
  { host: '10.0.0.1', port: 6379 },
  { host: '10.0.0.2', port: 6379 },
  { host: '10.0.0.3', port: 6379 },
]);

// Sharded publish - wiadomość trafia tylko do węzła odpowiedzialnego za kanał
await cluster.spublish('notifications:user:123', JSON.stringify(payload));

// Sharded subscribe
cluster.ssubscribe('notifications:user:123', (err) => {
  if (err) console.error(err);
});

cluster.on('smessage', (channel, message) => {
  console.log(`Sharded message on ${channel}:`, message);
});

W klasycznym Pub/Sub każda wiadomość jest broadcastowana do wszystkich węzłów klastra. Sharded Pub/Sub kieruje wiadomości tylko do węzła, który zarządza danym slotem hash, co drastycznie redukuje ruch sieciowy.

Strategia skalowania#

[Load Balancer]
    ├── [Node.js Server 1] ──┐
    ├── [Node.js Server 2] ──┤── Redis Cluster
    └── [Node.js Server 3] ──┘    ├── Node A (slots 0-5460)
                                   ├── Node B (slots 5461-10922)
                                   └── Node C (slots 10923-16383)

Każdy serwer Node.js utrzymuje połączenie z klastrem Redis. Użytkownicy mogą być podłączeni do dowolnego serwera - Pub/Sub zapewnia, że wiadomość dotrze do właściwego miejsca.

Przypadki użycia#

1. Czat w czasie rzeczywistym#

Redis Pub/Sub idealnie nadaje się do systemów czatowych. Każdy pokój czatu to osobny kanał:

// Dołączenie do pokoju
async function joinRoom(userId, roomId) {
  await redisSub.subscribe(`chat:room:${roomId}`);
  await redisPub.publish(`chat:room:${roomId}`, JSON.stringify({
    type: 'user_joined',
    userId,
    timestamp: Date.now(),
  }));
}

// Wysłanie wiadomości
async function sendMessage(userId, roomId, text) {
  const message = {
    id: crypto.randomUUID(),
    type: 'message',
    userId,
    roomId,
    text,
    timestamp: Date.now(),
  };

  // Zapisz w Redis Streams (persystencja + historia)
  await redis.xadd(`chat:history:${roomId}`, '*',
    'data', JSON.stringify(message)
  );

  // Wyślij przez Pub/Sub (real-time delivery)
  await redisPub.publish(`chat:room:${roomId}`, JSON.stringify(message));
}

2. Live updates (dashboard, monitoring)#

Aktualizacje na żywo dla dashboardów biznesowych, systemów monitoringu lub tabel wyników:

// Mikroserwis metryczny publikuje dane co sekundę
setInterval(async () => {
  const metrics = {
    cpu: os.loadavg()[0],
    memory: process.memoryUsage().heapUsed,
    activeUsers: await redis.scard('active:users'),
    requestsPerSec: await redis.get('metrics:rps'),
  };

  await redisPub.publish('dashboard:metrics', JSON.stringify(metrics));
}, 1000);

3. Gaming - multiplayer i leaderboardy#

W grach multiplayer Redis Pub/Sub obsługuje synchronizację stanu gry:

// Aktualizacja pozycji gracza
async function updatePlayerPosition(gameId, playerId, position) {
  await redisPub.publish(`game:${gameId}:state`, JSON.stringify({
    type: 'player_move',
    playerId,
    position,
    timestamp: Date.now(),
  }));
}

// Aktualizacja tabeli wyników
async function updateLeaderboard(gameId, playerId, score) {
  await redis.zadd(`leaderboard:${gameId}`, score, playerId);
  const topPlayers = await redis.zrevrange(`leaderboard:${gameId}`, 0, 9, 'WITHSCORES');

  await redisPub.publish(`game:${gameId}:leaderboard`, JSON.stringify({
    type: 'leaderboard_update',
    topPlayers,
  }));
}

Porównanie z RabbitMQ i Apache Kafka#

Redis Pub/Sub nie jest jedynym rozwiązaniem do komunikacji asynchronicznej. Porównajmy go z dwoma najpopularniejszymi alternatywami.

Redis Pub/Sub vs RabbitMQ#

| Cecha | Redis Pub/Sub | RabbitMQ | |---|---|---| | Model | Pub/Sub (fire-and-forget) | Queue + Exchange (z potwierdzeniami) | | Trwałość | Brak | Tak (durable queues) | | Routing | Prosty (kanały, wzorce) | Zaawansowany (direct, topic, fanout, headers) | | Protokół | RESP (Redis protocol) | AMQP 0.9.1 | | Latencja | Ultra-niska (~0.1ms) | Niska (~1ms) | | Throughput | ~1M msg/s | ~50K msg/s | | Gwarancja dostarczenia | At-most-once | At-least-once / Exactly-once | | Złożoność operacyjna | Niska | Średnia |

Wybierz Redis Pub/Sub gdy: potrzebujesz minimalnej latencji, akceptujesz utratę wiadomości, masz już Redis w stacku.

Wybierz RabbitMQ gdy: potrzebujesz gwarancji dostarczenia, zaawansowanego routingu, kolejek priorytetowych, dead-letter queues.

Redis Pub/Sub vs Apache Kafka#

| Cecha | Redis Pub/Sub | Apache Kafka | |---|---|---| | Przechowywanie | Brak | Trwałe (log retention) | | Throughput | ~1M msg/s | ~1M+ msg/s (partycjonowanie) | | Latencja | ~0.1ms | ~5-10ms | | Replay | Niemożliwy | Pełny (offset-based) | | Consumer Groups | Brak | Natywne | | Skalowanie | Redis Cluster | Partycje + brokerzy | | Złożoność | Minimalna | Wysoka (ZooKeeper/KRaft) | | Use case | Real-time notifications | Event streaming, data pipelines |

Wybierz Redis Pub/Sub gdy: budujesz system powiadomień, czat, live updates - gdzie liczy się prostota i latencja.

Wybierz Kafkę gdy: budujesz pipeline danych, event sourcing, potrzebujesz replay i trwałości, przetwarzasz miliony zdarzeń na minutę.

Kiedy użyć Redis Streams zamiast obu?#

Redis Streams to kompromis - oferuje trwałość i consumer groups jak Kafka, ale z prostotą Redis. Dla wielu zastosowań jest to wystarczające rozwiązanie, eliminujące potrzebę dodatkowej infrastruktury.

Wzorce projektowe i best practices#

1. Konwencja nazewnictwa kanałów#

{domena}:{encja}:{identyfikator}:{zdarzenie}

notifications:user:123:new_message
orders:status:4567:updated
chat:room:general:message
system:health:server-1:alert

2. Struktura wiadomości#

const messageSchema = {
  id: 'uuid-v4',           // Unikalne ID wiadomości
  type: 'string',          // Typ zdarzenia
  source: 'string',        // Źródło (mikroserwis)
  timestamp: 'number',     // Unix timestamp
  version: 'number',       // Wersja schematu
  data: {},                // Payload
  metadata: {},            // Dodatkowe informacje
};

3. Obsługa błędów i reconnect#

const redisSub = new Redis({
  host: '127.0.0.1',
  port: 6379,
  retryStrategy(times) {
    const delay = Math.min(times * 100, 3000);
    return delay;
  },
  maxRetriesPerRequest: null,
  enableReadyCheck: true,
});

redisSub.on('error', (err) => {
  console.error('Redis subscriber error:', err);
});

redisSub.on('reconnecting', (delay) => {
  console.log(`Reconnecting to Redis in ${delay}ms...`);
});

4. Monitoring i metryki#

// Monitoruj liczbę subskrybentów
async function getChannelStats() {
  const info = await redisPub.pubsub('NUMSUB',
    'notifications:global',
    'chat:room:general'
  );

  // info = ['notifications:global', 5, 'chat:room:general', 12]
  return info;
}

// Monitoruj liczbę aktywnych kanałów
async function getActiveChannels() {
  const channels = await redisPub.pubsub('CHANNELS', 'notifications:*');
  return channels.length;
}

Podsumowanie#

Redis Pub/Sub to potężne, a jednocześnie proste narzędzie do budowy systemów komunikacji w czasie rzeczywistym. Kluczowe wnioski:

  1. Pub/Sub jest fire-and-forget - idealny do powiadomień, czatu, live updates
  2. Streams uzupełniają Pub/Sub - gdy potrzebujesz trwałości i historii
  3. WebSocket + Redis Pub/Sub - to standard w architekturze real-time
  4. Sharded Pub/Sub (Redis 7.0+) - rozwiązuje problemy skalowania
  5. Redis nie zastępuje Kafki - to różne narzędzia do różnych problemów

Redis Pub/Sub sprawdza się najlepiej jako warstwa dostarczania w architekturze mikroserwisowej, gdzie potrzebujesz minimalnej latencji i prostoty wdrożenia.


Potrzebujesz systemu powiadomień real-time?#

W MDS Software Solutions Group projektujemy i wdrażamy skalowalne systemy komunikacji w czasie rzeczywistym. Od czatów, przez systemy alertów, po dashboardy live - wykorzystujemy Redis Pub/Sub, WebSocket i sprawdzone wzorce architektoniczne, aby dostarczyć rozwiązania, które działają niezawodnie pod dużym obciążeniem.

Skontaktuj się z nami - pomożemy Ci wybrać odpowiednią technologię i zbudować system dopasowany do Twoich potrzeb.

Autor
MDS Software Solutions Group

Zespół ekspertów programistycznych specjalizujących się w nowoczesnych technologiach webowych.

Redis Pub/Sub - Powiadomienia w czasie rzeczywistym | MDS Software Solutions Group | MDS Software Solutions Group