Przejdź do treści
Technologien

Redis Pub/Sub - Echtzeit-Benachrichtigungen

Veröffentlicht am:
·7 Min. Lesezeit·Autor: MDS Software Solutions Group

Redis Pub/Sub Echtzeit-Benachrichtigungen

technologie

Redis Pub/Sub - Echtzeit-Benachrichtigungen

Moderne Webanwendungen erfordern die sofortige Zustellung von Informationen an Benutzer. Chat-Systeme, Nachrichtenbenachrichtigungen, Live-Aktienkurse, Echtzeit-Sportergebnisse - all diese Funktionen teilen eine gemeinsame Anforderung: Echtzeitkommunikation. Redis Pub/Sub ist ein Muster, das den Aufbau solcher Systeme auf einfache, performante und skalierbare Weise ermoeglicht.

In diesem Artikel werden wir den Publish/Subscribe-Mechanismus in Redis eingehend untersuchen, die Integration mit WebSockets demonstrieren, ihn mit Redis Streams und alternativen Loesungen wie RabbitMQ und Apache Kafka vergleichen und schliesslich ein vollstaendiges Benachrichtigungssystem in Node.js aufbauen.

Was ist das Pub/Sub-Muster?#

Das Publish/Subscribe (Pub/Sub)-Muster ist ein asynchrones Kommunikationsmodell, bei dem Nachrichtensender (Publisher) Nachrichten nicht direkt an bestimmte Empfaenger (Subscriber) senden. Stattdessen werden Nachrichten in Kanaelen veroeffentlicht, und Empfaenger abonnieren diese Kanaele, um die fuer sie relevanten Nachrichten zu erhalten.

Hauptmerkmale des Pub/Sub-Musters:

  • Lose Kopplung - der Sender muss nicht wissen, wer die Nachricht empfaengt
  • Eins-zu-viele-Kommunikation - eine einzelne Nachricht erreicht mehrere Empfaenger
  • Asynchron - der Sender wartet nicht auf eine Antwort des Empfaengers
  • Dynamisch - Empfaenger koennen jederzeit beitreten und verlassen

Wie Redis Pub/Sub funktioniert#

Redis implementiert das Pub/Sub-Muster als native Funktion. Der Redis-Server fungiert als Message Broker, verwaltet Kanaele und liefert Nachrichten an alle aktiven Subscriber.

Kanaele#

Kanaele in Redis sind benannte Nachrichtenstroeme. Sie muessen nicht explizit erstellt werden - sie entstehen automatisch, wenn sich der erste Subscriber verbindet oder die erste Nachricht veroeffentlicht wird. Kanalnamen sind beliebige Strings, aber eine gute Praxis ist die Verwendung einer hierarchischen Namenskonvention mit Trennzeichen:

notifications:user:123
chat:room:general
orders:status:updated
system:alerts

Publisher#

Ein Publisher ist ein Redis-Client, der Nachrichten an einen bestimmten Kanal mit dem PUBLISH-Befehl sendet:

PUBLISH notifications:user:123 '{"type":"new_message","from":"John","text":"Hello!"}'

Der Befehl gibt die Anzahl der Subscriber zurueck, die die Nachricht erhalten haben. Wenn niemand zuhoert, geht die Nachricht fuer immer verloren. Dies ist ein entscheidender Unterschied zu Message Queues.

Subscriber#

Ein Subscriber ist ein Redis-Client, der auf einem oder mehreren Kanaelen lauscht:

SUBSCRIBE notifications:user:123 chat:room:general

Redis unterstuetzt auch musterbasierte Subskriptionen:

PSUBSCRIBE notifications:user:* chat:room:*

Dies ermoeglicht es einem einzelnen Subscriber, auf mehreren Kanaelen zu lauschen, die einem Muster entsprechen, was in Benachrichtigungssystemen aeusserst nuetzlich ist.

Einfaches Node.js-Beispiel#

Beginnen wir mit einem einfachen Beispiel unter Verwendung der ioredis-Bibliothek:

import Redis from 'ioredis';

// Subscriber - lauscht auf einem Kanal
const subscriber = new Redis({ host: '127.0.0.1', port: 6379 });

subscriber.subscribe('notifications:global', (err, count) => {
  if (err) {
    console.error('Subscription error:', err);
    return;
  }
  console.log(`Subscribed to ${count} channels`);
});

subscriber.on('message', (channel, message) => {
  const data = JSON.parse(message);
  console.log(`[${channel}] Received:`, data);
});

// Publisher - sendet Nachrichten
const publisher = new Redis({ host: '127.0.0.1', port: 6379 });

async function sendNotification(userId, notification) {
  const message = JSON.stringify({
    id: crypto.randomUUID(),
    userId,
    ...notification,
    timestamp: Date.now(),
  });

  const receivers = await publisher.publish(
    `notifications:user:${userId}`,
    message
  );
  console.log(`Message delivered to ${receivers} subscribers`);
}

// Eine Beispielbenachrichtigung senden
sendNotification('123', {
  type: 'order_update',
  title: 'Order Shipped',
  body: 'Your order #4567 has been shipped.',
});

Wichtig: Ein Redis-Client im Subscriber-Modus kann keine anderen Befehle ausfuehren (GET, SET, usw.). Deshalb benoetigen Sie immer separate Verbindungen fuer Publisher und Subscriber.

WebSocket-Integration - Echtzeit-Architektur#

Um Benachrichtigungen von Redis an den Browser des Benutzers zu liefern, benoetigen wir eine Transportschicht. WebSocket ist die ideale Wahl - es bietet bidirektionale, persistente Kommunikation zwischen Server und Client.

Systemarchitektur#

[Browser]     <--WebSocket--> [Node.js Server] <--Redis Pub/Sub--> [Redis]
                                                                      ^
[Browser]     <--WebSocket--> [Node.js Server] <--Redis Pub/Sub--> ---|
                                                                      |
                              [Microservice A] ----PUBLISH----------> |
                              [Microservice B] ----PUBLISH----------> |

Jeder Node.js-Server abonniert Redis-Kanaele und leitet Nachrichten an verbundene WebSocket-Clients weiter. Microservices veroeffentlichen Events in Redis-Kanaelen, die alle Server erreichen, welche sie wiederum an die entsprechenden Browser liefern.

Vollstaendige Server-Implementierung#

import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import Redis from 'ioredis';
import express from 'express';

const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });

// Redis - separate Verbindungen fuer Sub und Pub
const redisSub = new Redis({ host: '127.0.0.1', port: 6379 });
const redisPub = new Redis({ host: '127.0.0.1', port: 6379 });

// Map: userId -> Set<WebSocket>
const userConnections = new Map();

// WebSocket-Verbindungsverwaltung
wss.on('connection', (ws, req) => {
  const userId = authenticateUser(req); // JWT / session
  if (!userId) {
    ws.close(4001, 'Unauthorized');
    return;
  }

  // Verbindung zur Map hinzufuegen
  if (!userConnections.has(userId)) {
    userConnections.set(userId, new Set());
    // Benutzerkanal nur bei erster Verbindung abonnieren
    redisSub.subscribe(`notifications:user:${userId}`);
  }
  userConnections.get(userId).add(ws);

  console.log(`User ${userId} connected. Active connections: ${userConnections.get(userId).size}`);

  // Nachrichten vom Client verarbeiten
  ws.on('message', async (data) => {
    try {
      const msg = JSON.parse(data.toString());
      await handleClientMessage(userId, msg);
    } catch (err) {
      ws.send(JSON.stringify({ error: 'Invalid message' }));
    }
  });

  // Bereinigung bei Trennung
  ws.on('close', () => {
    const connections = userConnections.get(userId);
    if (connections) {
      connections.delete(ws);
      if (connections.size === 0) {
        userConnections.delete(userId);
        redisSub.unsubscribe(`notifications:user:${userId}`);
      }
    }
  });

  // Verbindungsbestaetigung senden
  ws.send(JSON.stringify({ type: 'connected', userId }));
});

// Nachrichten von Redis verarbeiten
redisSub.on('message', (channel, message) => {
  // userId aus Kanalnamen extrahieren
  const match = channel.match(/^notifications:user:(.+)$/);
  if (!match) return;

  const userId = match[1];
  const connections = userConnections.get(userId);

  if (connections) {
    connections.forEach((ws) => {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
      }
    });
  }
});

// Globale Kanaele abonnieren
redisSub.subscribe('notifications:global');
redisSub.on('message', (channel, message) => {
  if (channel === 'notifications:global') {
    // An alle verbundenen Benutzer senden
    wss.clients.forEach((ws) => {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
      }
    });
  }
});

// API zum Senden von Benachrichtigungen
app.post('/api/notify', express.json(), async (req, res) => {
  const { userId, type, title, body } = req.body;

  const notification = JSON.stringify({
    id: crypto.randomUUID(),
    type,
    title,
    body,
    timestamp: Date.now(),
  });

  const channel = userId
    ? `notifications:user:${userId}`
    : 'notifications:global';

  const receivers = await redisPub.publish(channel, notification);
  res.json({ delivered: receivers });
});

async function handleClientMessage(userId, msg) {
  if (msg.type === 'mark_read') {
    await redisPub.publish(
      `notifications:user:${userId}`,
      JSON.stringify({ type: 'notification_read', id: msg.notificationId })
    );
  }
}

function authenticateUser(req) {
  // Authentifizierungsimplementierung - JWT, Cookie Session, usw.
  const url = new URL(req.url, `http://${req.headers.host}`);
  return url.searchParams.get('userId');
}

server.listen(3000, () => {
  console.log('Server running on port 3000');
});

Browser-WebSocket-Client#

class NotificationClient {
  constructor(userId) {
    this.userId = userId;
    this.listeners = new Map();
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(`ws://localhost:3000?userId=${this.userId}`);

    this.ws.onopen = () => {
      console.log('Connected to notification server');
      this.reconnectAttempts = 0;
    };

    this.ws.onmessage = (event) => {
      const notification = JSON.parse(event.data);
      this.emit(notification.type, notification);
    };

    this.ws.onclose = () => {
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts++;
        setTimeout(() => this.connect(), delay);
      }
    };
  }

  on(type, callback) {
    if (!this.listeners.has(type)) {
      this.listeners.set(type, new Set());
    }
    this.listeners.get(type).add(callback);
  }

  emit(type, data) {
    const callbacks = this.listeners.get(type);
    if (callbacks) {
      callbacks.forEach((cb) => cb(data));
    }
  }

  markAsRead(notificationId) {
    this.ws.send(JSON.stringify({
      type: 'mark_read',
      notificationId,
    }));
  }
}

// Verwendung
const client = new NotificationClient('123');

client.on('order_update', (notification) => {
  showToast(notification.title, notification.body);
});

client.on('new_message', (notification) => {
  updateChatBadge(notification);
});

Redis Streams vs Pub/Sub - Welches waehlen?#

Redis bietet zwei Mechanismen fuer Echtzeitkommunikation: Pub/Sub und Streams. Obwohl sie aehnlich erscheinen moegen, unterscheiden sie sich grundlegend.

Redis Pub/Sub#

  • Fire-and-forget - Nachrichten werden nicht gespeichert
  • Keine Historie - ein neuer Subscriber erhaelt keine alten Nachrichten
  • Keine Bestaetigung - kein ACK-Mechanismus
  • Einfachheit - minimaler Overhead, maximale Performance
  • Ideal fuer: Benachrichtigungen, Chat, Live-Updates

Redis Streams#

  • Persistenz - Nachrichten werden in einer Datenstruktur gespeichert
  • Consumer Groups - parallele Verarbeitung mit Bestaetigungen
  • Historie - Moeglichkeit, alte Nachrichten zu lesen
  • At-least-once - Zustellungsgarantie
  • Ideal fuer: Task Queues, Event Sourcing, Audit Trails
// Redis Streams - Beispiel
// Nachricht zum Stream hinzufuegen
await redis.xadd('orders:events', '*',
  'action', 'created',
  'orderId', '4567',
  'userId', '123'
);

// Aus einer Consumer Group lesen
await redis.xreadgroup(
  'GROUP', 'order-processors', 'worker-1',
  'COUNT', 10,
  'BLOCK', 5000,
  'STREAMS', 'orders:events', '>'
);

// Verarbeitung bestaetigen
await redis.xack('orders:events', 'order-processors', messageId);

Wann was waehlen?

| Szenario | Pub/Sub | Streams | |---|---|---| | Push-Benachrichtigungen | Ja | Nein | | Live-Chat | Ja | Ja (mit Historie) | | Task Queue | Nein | Ja | | Event Sourcing | Nein | Ja | | Live Dashboard | Ja | Nein | | Log-Verarbeitung | Nein | Ja |

In der Praxis ist der beste Ansatz oft eine Kombination aus beidem. Pub/Sub fuer sofortige Zustellung, Streams fuer Persistenz und Zustandswiederherstellung.

Skalierung mit Redis Cluster#

Wenn ein einzelner Redis-Server nicht ausreicht, ermoeglicht Redis Cluster die Lastverteilung. Jedoch funktioniert Pub/Sub in einem Cluster anders als im Standalone-Modus.

Sharded Pub/Sub (Redis 7.0+)#

Seit Version 7.0 hat Redis Sharded Pub/Sub eingefuehrt, das das Skalierbarkeitsproblem loest:

import Redis from 'ioredis';

const cluster = new Redis.Cluster([
  { host: '10.0.0.1', port: 6379 },
  { host: '10.0.0.2', port: 6379 },
  { host: '10.0.0.3', port: 6379 },
]);

// Sharded Publish - Nachricht geht nur an den fuer den Kanal zustaendigen Node
await cluster.spublish('notifications:user:123', JSON.stringify(payload));

// Sharded Subscribe
cluster.ssubscribe('notifications:user:123', (err) => {
  if (err) console.error(err);
});

cluster.on('smessage', (channel, message) => {
  console.log(`Sharded message on ${channel}:`, message);
});

Im klassischen Pub/Sub wird jede Nachricht an alle Nodes im Cluster gesendet. Sharded Pub/Sub leitet Nachrichten nur an den Node weiter, der den relevanten Hash-Slot verwaltet, was den Netzwerkverkehr drastisch reduziert.

Skalierungsstrategie#

[Load Balancer]
    |-- [Node.js Server 1] --\
    |-- [Node.js Server 2] ---|-- Redis Cluster
    \-- [Node.js Server 3] --/    |-- Node A (slots 0-5460)
                                   |-- Node B (slots 5461-10922)
                                   \-- Node C (slots 10923-16383)

Jeder Node.js-Server haelt eine Verbindung zum Redis Cluster aufrecht. Benutzer koennen mit jedem Server verbunden sein - Pub/Sub stellt sicher, dass die Nachricht den richtigen Ort erreicht.

Anwendungsfaelle#

1. Echtzeit-Chat#

Redis Pub/Sub eignet sich perfekt fuer Chat-Systeme. Jeder Chat-Raum ist ein separater Kanal:

// Einem Raum beitreten
async function joinRoom(userId, roomId) {
  await redisSub.subscribe(`chat:room:${roomId}`);
  await redisPub.publish(`chat:room:${roomId}`, JSON.stringify({
    type: 'user_joined',
    userId,
    timestamp: Date.now(),
  }));
}

// Nachricht senden
async function sendMessage(userId, roomId, text) {
  const message = {
    id: crypto.randomUUID(),
    type: 'message',
    userId,
    roomId,
    text,
    timestamp: Date.now(),
  };

  // In Redis Streams speichern (Persistenz + Historie)
  await redis.xadd(`chat:history:${roomId}`, '*',
    'data', JSON.stringify(message)
  );

  // Ueber Pub/Sub senden (Echtzeit-Zustellung)
  await redisPub.publish(`chat:room:${roomId}`, JSON.stringify(message));
}

2. Live-Updates (Dashboards, Monitoring)#

Echtzeit-Updates fuer Business-Dashboards, Monitoring-Systeme oder Scoreboards:

// Metriken-Microservice veroeffentlicht Daten jede Sekunde
setInterval(async () => {
  const metrics = {
    cpu: os.loadavg()[0],
    memory: process.memoryUsage().heapUsed,
    activeUsers: await redis.scard('active:users'),
    requestsPerSec: await redis.get('metrics:rps'),
  };

  await redisPub.publish('dashboard:metrics', JSON.stringify(metrics));
}, 1000);

3. Gaming - Multiplayer und Bestenlisten#

In Multiplayer-Spielen uebernimmt Redis Pub/Sub die Synchronisation des Spielzustands:

// Spielerposition aktualisieren
async function updatePlayerPosition(gameId, playerId, position) {
  await redisPub.publish(`game:${gameId}:state`, JSON.stringify({
    type: 'player_move',
    playerId,
    position,
    timestamp: Date.now(),
  }));
}

// Bestenliste aktualisieren
async function updateLeaderboard(gameId, playerId, score) {
  await redis.zadd(`leaderboard:${gameId}`, score, playerId);
  const topPlayers = await redis.zrevrange(
    `leaderboard:${gameId}`, 0, 9, 'WITHSCORES'
  );

  await redisPub.publish(`game:${gameId}:leaderboard`, JSON.stringify({
    type: 'leaderboard_update',
    topPlayers,
  }));
}

Vergleich mit RabbitMQ und Apache Kafka#

Redis Pub/Sub ist nicht die einzige Loesung fuer asynchrone Kommunikation. Vergleichen wir es mit den zwei beliebtesten Alternativen.

Redis Pub/Sub vs RabbitMQ#

| Merkmal | Redis Pub/Sub | RabbitMQ | |---|---|---| | Modell | Pub/Sub (Fire-and-forget) | Queue + Exchange (mit Bestaetigungen) | | Persistenz | Keine | Ja (dauerhafte Queues) | | Routing | Einfach (Kanaele, Muster) | Erweitert (direct, topic, fanout, headers) | | Protokoll | RESP (Redis-Protokoll) | AMQP 0.9.1 | | Latenz | Ultra-niedrig (~0.1ms) | Niedrig (~1ms) | | Durchsatz | ~1M Nachr./s | ~50K Nachr./s | | Zustellungsgarantie | At-most-once | At-least-once / Exactly-once | | Betriebskomplexitaet | Niedrig | Mittel |

Waehlen Sie Redis Pub/Sub wenn: Sie minimale Latenz benoetigen, Nachrichtenverlust akzeptieren koennen und Redis bereits in Ihrem Stack haben.

Waehlen Sie RabbitMQ wenn: Sie Zustellungsgarantien, erweitertes Routing, Priority Queues oder Dead-Letter Queues benoetigen.

Redis Pub/Sub vs Apache Kafka#

| Merkmal | Redis Pub/Sub | Apache Kafka | |---|---|---| | Speicherung | Keine | Persistent (Log-Retention) | | Durchsatz | ~1M Nachr./s | ~1M+ Nachr./s (Partitionierung) | | Latenz | ~0.1ms | ~5-10ms | | Replay | Nicht moeglich | Vollstaendig (offset-basiert) | | Consumer Groups | Keine | Nativ | | Skalierung | Redis Cluster | Partitions + Brokers | | Komplexitaet | Minimal | Hoch (ZooKeeper/KRaft) | | Anwendungsfall | Echtzeit-Benachrichtigungen | Event Streaming, Datenpipelines |

Waehlen Sie Redis Pub/Sub wenn: Sie ein Benachrichtigungssystem, Chat oder Live-Updates bauen - wo Einfachheit und Latenz am wichtigsten sind.

Waehlen Sie Kafka wenn: Sie Datenpipelines bauen, Event Sourcing benoetigen, Replay und Dauerhaftigkeit brauchen oder Millionen von Events pro Minute verarbeiten.

Wann Redis Streams statt beidem verwenden?#

Redis Streams ist ein Mittelweg - es bietet Persistenz und Consumer Groups wie Kafka, aber mit der Einfachheit von Redis. Fuer viele Anwendungsfaelle ist es eine ausreichende Loesung, die den Bedarf an zusaetzlicher Infrastruktur eliminiert.

Entwurfsmuster und Best Practices#

1. Kanal-Namenskonvention#

{domain}:{entity}:{identifier}:{event}

notifications:user:123:new_message
orders:status:4567:updated
chat:room:general:message
system:health:server-1:alert

2. Nachrichtenstruktur#

const messageSchema = {
  id: 'uuid-v4',           // Eindeutige Nachrichten-ID
  type: 'string',          // Event-Typ
  source: 'string',        // Quelle (Microservice)
  timestamp: 'number',     // Unix-Zeitstempel
  version: 'number',       // Schema-Version
  data: {},                // Nutzdaten
  metadata: {},            // Zusaetzliche Informationen
};

3. Fehlerbehandlung und Wiederverbindung#

const redisSub = new Redis({
  host: '127.0.0.1',
  port: 6379,
  retryStrategy(times) {
    const delay = Math.min(times * 100, 3000);
    return delay;
  },
  maxRetriesPerRequest: null,
  enableReadyCheck: true,
});

redisSub.on('error', (err) => {
  console.error('Redis subscriber error:', err);
});

redisSub.on('reconnecting', (delay) => {
  console.log(`Reconnecting to Redis in ${delay}ms...`);
});

4. Monitoring und Metriken#

// Subscriber-Anzahl ueberwachen
async function getChannelStats() {
  const info = await redisPub.pubsub('NUMSUB',
    'notifications:global',
    'chat:room:general'
  );

  // info = ['notifications:global', 5, 'chat:room:general', 12]
  return info;
}

// Aktive Kanalanzahl ueberwachen
async function getActiveChannels() {
  const channels = await redisPub.pubsub('CHANNELS', 'notifications:*');
  return channels.length;
}

Fazit#

Redis Pub/Sub ist ein leistungsstarkes und dennoch einfaches Werkzeug fuer den Aufbau von Echtzeit-Kommunikationssystemen. Wichtigste Erkenntnisse:

  1. Pub/Sub ist Fire-and-forget - ideal fuer Benachrichtigungen, Chat, Live-Updates
  2. Streams ergaenzen Pub/Sub - wenn Sie Persistenz und Historie benoetigen
  3. WebSocket + Redis Pub/Sub - ist der Standard fuer Echtzeit-Architektur
  4. Sharded Pub/Sub (Redis 7.0+) - loest Skalierungsherausforderungen
  5. Redis ersetzt Kafka nicht - es sind verschiedene Werkzeuge fuer verschiedene Probleme

Redis Pub/Sub funktioniert am besten als Zustellungsschicht in einer Microservice-Architektur, wo minimale Latenz und einfache Bereitstellung gefragt sind.


Benoetigen Sie ein Echtzeit-Benachrichtigungssystem?#

Bei MDS Software Solutions Group entwerfen und implementieren wir skalierbare Echtzeit-Kommunikationssysteme. Von Chat-Anwendungen ueber Alarmsysteme bis hin zu Live-Dashboards - wir nutzen Redis Pub/Sub, WebSocket und bewaehrte Architekturmuster, um Loesungen zu liefern, die unter hoher Last zuverlaessig funktionieren.

Kontaktieren Sie uns - wir helfen Ihnen, die richtige Technologie zu waehlen und ein auf Ihre Beduerfnisse zugeschnittenes System zu bauen.

Autor
MDS Software Solutions Group

Team von Programmierexperten, die sich auf moderne Webtechnologien spezialisiert haben.

Redis Pub/Sub - Echtzeit-Benachrichtigungen | MDS Software Solutions Group | MDS Software Solutions Group