Distributed EventBus
для многопользовательских игр
Создаём распределённую шину событий на TypeScript. Синхронизация клиента и сервера через WebSockets.
Архитектура системы
Как работает распределённая шина событий
🎯 Цель проекта
Создать систему, где события могут прозрачно передаваться между клиентом и сервером через единый API EventBus. Разработчик пишет код один раз, а система сама решает — обработать событие локально или отправить по сети.
+ Bridge для отправки на сервер
UI компоненты
Передача →
Десериализация
+ Bridge для рассылки клиентам
State Management
📁 Структура проекта
🔄 Поток данных
Клиент вызывает eventBus.emit('player:move', { x, y })
Если событие помечено как remote — сериализуем и отправляем
JSON → бинарный формат → передача → JSON
Десериализация, валидация, помещение в серверный EventBus
Сервер обрабатывает и рассылает обновления всем клиентам
Серверная часть
WebSocket сервер и Server Bridge
🔌 WebSocketServer.ts
import { WebSocketServer as WSServer, WebSocket } from 'ws';
import { IncomingMessage } from 'http';
/**
* Информация о подключённом клиенте
*/
export interface ClientInfo {
id: string;
socket: WebSocket;
connectedAt: number;
lastPing: number;
metadata: Record<string, unknown>;
}
/**
* События WebSocket сервера
*/
export interface ServerEvents {
onConnection: (client: ClientInfo) => void;
onDisconnect: (clientId: string, reason: string) => void;
onMessage: (clientId: string, data: string) => void;
onError: (clientId: string, error: Error) => void;
}
/**
* WebSocket Server обёртка
*/
export class GameWebSocketServer {
private wss: WSServer;
private clients: Map<string, ClientInfo> = new Map();
private events: Partial<ServerEvents> = {};
private pingInterval?: NodeJS.Timeout;
constructor(port: number) {
this.wss = new WSServer({ port });
this.setupServer();
this.startPingInterval();
console.log(`🚀 WebSocket Server started on port ${port}`);
}
/**
* Настройка обработчиков сервера
*/
private setupServer(): void {
this.wss.on('connection', (socket: WebSocket, request: IncomingMessage) => {
const clientId = this.generateClientId();
const client: ClientInfo = {
id: clientId,
socket,
connectedAt: Date.now(),
lastPing: Date.now(),
metadata: {},
};
this.clients.set(clientId, client);
console.log(`✅ Client connected: ${clientId}`);
// Отправляем клиенту его ID
this.sendToClient(clientId, JSON.stringify({
type: '__system:connected',
clientId,
}));
// Вызываем callback
this.events.onConnection?.(client);
// Обработчик сообщений
socket.on('message', (data: Buffer) => {
try {
const message = data.toString();
this.events.onMessage?.(clientId, message);
} catch (error) {
console.error(`Error processing message from ${clientId}:`, error);
}
});
// Обработчик закрытия
socket.on('close', (code: number, reason: Buffer) => {
this.clients.delete(clientId);
console.log(`❌ Client disconnected: ${clientId}`);
this.events.onDisconnect?.(clientId, reason.toString() || 'unknown');
});
// Обработчик ошибок
socket.on('error', (error: Error) => {
console.error(`Socket error for ${clientId}:`, error);
this.events.onError?.(clientId, error);
});
// Pong обработчик
socket.on('pong', () => {
const c = this.clients.get(clientId);
if (c) c.lastPing = Date.now();
});
});
}
/**
* Ping интервал для проверки живых соединений
*/
private startPingInterval(): void {
this.pingInterval = setInterval(() => {
const now = Date.now();
this.clients.forEach((client, id) => {
// Отключаем если не было pong более 30 секунд
if (now - client.lastPing > 30000) {
console.log(`⏰ Client ${id} timed out`);
client.socket.terminate();
return;
}
// Отправляем ping
if (client.socket.readyState === WebSocket.OPEN) {
client.socket.ping();
}
});
}, 10000);
}
/**
* Генерация уникального ID клиента
*/
private generateClientId(): string {
return 'client_' + Math.random().toString(36).substring(2, 11);
}
/**
* Отправка сообщения конкретному клиенту
*/
sendToClient(clientId: string, data: string): boolean {
const client = this.clients.get(clientId);
if (!client || client.socket.readyState !== WebSocket.OPEN) {
return false;
}
client.socket.send(data);
return true;
}
/**
* Отправка всем клиентам
*/
broadcast(data: string, excludeClientId?: string): void {
this.clients.forEach((client, id) => {
if (id !== excludeClientId && client.socket.readyState === WebSocket.OPEN) {
client.socket.send(data);
}
});
}
/**
* Регистрация обработчиков событий
*/
on<K extends keyof ServerEvents>(event: K, handler: ServerEvents[K]): void {
this.events[event] = handler;
}
/**
* Получение списка клиентов
*/
getClients(): Map<string, ClientInfo> {
return this.clients;
}
/**
* Получение клиента по ID
*/
getClient(clientId: string): ClientInfo | undefined {
return this.clients.get(clientId);
}
/**
* Остановка сервера
*/
close(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
}
this.wss.close();
console.log('🛑 WebSocket Server stopped');
}
}
🌉 ServerBridge.ts — Мост между EventBus и WebSocket
import { EventBus } from '../shared/EventBus';
import { GameWebSocketServer } from './WebSocketServer';
import { Serializer } from '../shared/serialization';
import {
IEvent,
EventMeta,
EventDirection,
NetworkMessage,
} from '../shared/types';
import { GameEvents } from '../shared/events';
/**
* ServerBridge — связывает серверный EventBus с WebSocket
*
* Функции:
* - Принимает события от клиентов и помещает в EventBus
* - Перехватывает события из EventBus и рассылает клиентам
*/
export class ServerBridge {
private eventBus: EventBus;
private wsServer: GameWebSocketServer;
private removeInterceptor?: () => void;
constructor(eventBus: EventBus, wsServer: GameWebSocketServer) {
this.eventBus = eventBus;
this.wsServer = wsServer;
this.setupIncomingMessages();
this.setupOutgoingMessages();
this.setupConnectionEvents();
console.log('🌉 ServerBridge initialized');
}
/**
* Обработка входящих сообщений от клиентов
*/
private setupIncomingMessages(): void {
this.wsServer.on('onMessage', (clientId, data) => {
try {
const message = Serializer.deserialize(data);
// Добавляем ID отправителя
message.event.senderId = clientId;
console.log(`📨 Received from ${clientId}: ${message.event.type}`);
// Помещаем в EventBus для обработки
this.eventBus.dispatch(message.event, {
...message.meta,
direction: EventDirection.LOCAL, // Локальная обработка на сервере
});
} catch (error) {
console.error(`Failed to process message from ${clientId}:`, error);
}
});
}
/**
* Перехват исходящих событий для отправки клиентам
*/
private setupOutgoingMessages(): void {
this.removeInterceptor = this.eventBus.intercept((event, meta) => {
switch (meta.direction) {
case EventDirection.TO_ALL_CLIENTS:
this.sendToAllClients(event, meta);
break;
case EventDirection.TO_CLIENT:
if (meta.targetClientId) {
this.sendToClient(meta.targetClientId, event, meta);
}
break;
case EventDirection.BROADCAST:
// Отправляем всем кроме отправителя
this.broadcast(event, meta, event.senderId);
break;
}
});
}
/**
* Обработка событий подключения/отключения
*/
private setupConnectionEvents(): void {
this.wsServer.on('onConnection', (client) => {
// Генерируем событие подключения игрока
this.eventBus.emit(GameEvents.PLAYER_CONNECTED, {
playerId: client.id,
username: `Player_${client.id.slice(-4)}`,
position: { x: 0, y: 0 },
});
});
this.wsServer.on('onDisconnect', (clientId, reason) => {
this.eventBus.emit(GameEvents.PLAYER_DISCONNECTED, {
playerId: clientId,
reason,
});
});
}
/**
* Отправка события конкретному клиенту
*/
private sendToClient(clientId: string, event: IEvent, meta: EventMeta): void {
const message = Serializer.createMessage(event, meta);
const data = Serializer.serialize(message);
this.wsServer.sendToClient(clientId, data);
console.log(`📤 Sent to ${clientId}: ${event.type}`);
}
/**
* Отправка события всем клиентам
*/
private sendToAllClients(event: IEvent, meta: EventMeta): void {
const message = Serializer.createMessage(event, meta);
const data = Serializer.serialize(message);
this.wsServer.broadcast(data);
console.log(`📢 Broadcast: ${event.type}`);
}
/**
* Broadcast с исключением
*/
private broadcast(event: IEvent, meta: EventMeta, excludeId?: string): void {
const message = Serializer.createMessage(event, meta);
const data = Serializer.serialize(message);
this.wsServer.broadcast(data, excludeId);
console.log(`📢 Broadcast (exclude ${excludeId}): ${event.type}`);
}
/**
* Уничтожение bridge
*/
destroy(): void {
this.removeInterceptor?.();
}
}
🎮 GameServer.ts — Игровой сервер
import { EventBus } from '../shared/EventBus';
import {
GameEvents,
PlayerConnectedPayload,
PlayerDisconnectedPayload,
PlayerInputPayload,
PlayerMovePayload,
GameStateSyncPayload,
ChatMessagePayload,
} from '../shared/events';
import { EventDirection } from '../shared/types';
/**
* Состояние игрока на сервере
*/
interface PlayerState {
id: string;
username: string;
position: { x: number; y: number };
velocity: { x: number; y: number };
health: number;
score: number;
lastInput: PlayerInputPayload | null;
}
/**
* GameServer — игровая логика на сервере
*/
export class GameServer {
private eventBus: EventBus;
private players: Map<string, PlayerState> = new Map();
private tickRate: number = 60; // тиков в секунду
private tick: number = 0;
private gameLoop?: NodeJS.Timeout;
private syncInterval?: NodeJS.Timeout;
private readonly PLAYER_SPEED = 200; // пикселей в секунду
private readonly WORLD_BOUNDS = { width: 1920, height: 1080 };
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
this.setupEventHandlers();
console.log('🎮 GameServer initialized');
}
/**
* Подписка на игровые события
*/
private setupEventHandlers(): void {
// Подключение игрока
this.eventBus.on(GameEvents.PLAYER_CONNECTED, (payload) => {
this.handlePlayerConnected(payload);
});
// Отключение игрока
this.eventBus.on(GameEvents.PLAYER_DISCONNECTED, (payload) => {
this.handlePlayerDisconnected(payload);
});
// Ввод игрока
this.eventBus.on(GameEvents.PLAYER_INPUT, (payload) => {
this.handlePlayerInput(payload);
});
// Чат
this.eventBus.on(GameEvents.CHAT_MESSAGE, (payload) => {
this.handleChatMessage(payload);
});
}
/**
* Обработка подключения игрока
*/
private handlePlayerConnected(payload: PlayerConnectedPayload): void {
// Создаём состояние нового игрока
const player: PlayerState = {
id: payload.playerId,
username: payload.username,
position: {
x: Math.random() * this.WORLD_BOUNDS.width,
y: Math.random() * this.WORLD_BOUNDS.height,
},
velocity: { x: 0, y: 0 },
health: 100,
score: 0,
lastInput: null,
};
this.players.set(player.id, player);
console.log(`👤 Player joined: ${player.username} (${player.id})`);
// Отправляем новому игроку текущее состояние
this.sendGameStateToPlayer(player.id);
}
/**
* Обработка отключения игрока
*/
private handlePlayerDisconnected(payload: PlayerDisconnectedPayload): void {
const player = this.players.get(payload.playerId);
if (player) {
console.log(`👤 Player left: ${player.username}`);
this.players.delete(payload.playerId);
}
}
/**
* Обработка ввода игрока
*/
private handlePlayerInput(payload: PlayerInputPayload): void {
const player = this.players.get(payload.playerId);
if (!player) return;
// Сохраняем последний ввод
player.lastInput = payload;
// Вычисляем velocity на основе ввода
player.velocity = {
x: (payload.keys.right ? 1 : 0) - (payload.keys.left ? 1 : 0),
y: (payload.keys.down ? 1 : 0) - (payload.keys.up ? 1 : 0),
};
}
/**
* Обработка чат-сообщения
*/
private handleChatMessage(payload: ChatMessagePayload): void {
console.log(`💬 [${payload.senderName}]: ${payload.message}`);
// Рассылаем сообщение всем игрокам
this.eventBus.emit(GameEvents.CHAT_MESSAGE, payload, {
direction: EventDirection.TO_ALL_CLIENTS,
});
}
/**
* Запуск игрового цикла
*/
start(): void {
const tickInterval = 1000 / this.tickRate;
// Основной игровой цикл
this.gameLoop = setInterval(() => {
this.update(tickInterval / 1000);
this.tick++;
}, tickInterval);
// Синхронизация состояния (20 раз в секунду)
this.syncInterval = setInterval(() => {
this.broadcastGameState();
}, 50);
console.log(`🎮 Game loop started (${this.tickRate} tick/s)`);
}
/**
* Обновление игрового состояния
*/
private update(deltaTime: number): void {
this.players.forEach(player => {
// Обновляем позицию
player.position.x += player.velocity.x * this.PLAYER_SPEED * deltaTime;
player.position.y += player.velocity.y * this.PLAYER_SPEED * deltaTime;
// Ограничиваем границами мира
player.position.x = Math.max(0, Math.min(this.WORLD_BOUNDS.width, player.position.x));
player.position.y = Math.max(0, Math.min(this.WORLD_BOUNDS.height, player.position.y));
});
}
/**
* Отправка состояния конкретному игроку
*/
private sendGameStateToPlayer(playerId: string): void {
const statePayload = this.createGameStatePayload();
this.eventBus.emit(GameEvents.GAME_STATE_SYNC, statePayload, {
direction: EventDirection.TO_CLIENT,
targetClientId: playerId,
});
}
/**
* Рассылка состояния всем игрокам
*/
private broadcastGameState(): void {
if (this.players.size === 0) return;
const statePayload = this.createGameStatePayload();
this.eventBus.emit(GameEvents.GAME_STATE_SYNC, statePayload, {
direction: EventDirection.TO_ALL_CLIENTS,
});
}
/**
* Создание payload состояния игры
*/
private createGameStatePayload(): GameStateSyncPayload {
return {
players: Array.from(this.players.values()).map(p => ({
id: p.id,
position: { ...p.position },
health: p.health,
score: p.score,
})),
serverTime: Date.now(),
tick: this.tick,
};
}
/**
* Остановка сервера
*/
stop(): void {
if (this.gameLoop) clearInterval(this.gameLoop);
if (this.syncInterval) clearInterval(this.syncInterval);
console.log('🛑 Game loop stopped');
}
}
🚀 server/index.ts — Точка входа
import { EventBus } from '../shared/EventBus';
import { GameWebSocketServer } from './WebSocketServer';
import { ServerBridge } from './ServerBridge';
import { GameServer } from './GameServer';
const PORT = 8080;
// Создаём компоненты
const eventBus = new EventBus('server');
eventBus.debug = true;
const wsServer = new GameWebSocketServer(PORT);
const bridge = new ServerBridge(eventBus, wsServer);
const gameServer = new GameServer(eventBus);
// Запускаем игровой цикл
gameServer.start();
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\n🛑 Shutting down...');
gameServer.stop();
bridge.destroy();
wsServer.close();
process.exit(0);
});
console.log(`
╔════════════════════════════════════════════╗
║ 🎮 Game Server Running ║
║ 📡 WebSocket: ws://localhost:${PORT} ║
║ Press Ctrl+C to stop ║
╚════════════════════════════════════════════╝
`);
Клиентская часть
WebSocket клиент и Client Bridge
🔌 WebSocketClient.ts
/**
* Состояние соединения
*/
export enum ConnectionState {
DISCONNECTED = 'disconnected',
CONNECTING = 'connecting',
CONNECTED = 'connected',
RECONNECTING = 'reconnecting',
}
/**
* События клиента
*/
export interface ClientEvents {
onConnect: (clientId: string) => void;
onDisconnect: (reason: string) => void;
onMessage: (data: string) => void;
onError: (error: Error) => void;
onStateChange: (state: ConnectionState) => void;
}
/**
* WebSocket клиент с автореконнектом
*/
export class GameWebSocketClient {
private socket: WebSocket | null = null;
private url: string;
private events: Partial<ClientEvents> = {};
private state: ConnectionState = ConnectionState.DISCONNECTED;
private reconnectAttempts: number = 0;
private maxReconnectAttempts: number = 5;
private reconnectDelay: number = 1000;
private clientId: string | null = null;
private messageQueue: string[] = [];
constructor(url: string) {
this.url = url;
}
/**
* Подключение к серверу
*/
connect(): Promise<string> {
return new Promise((resolve, reject) => {
if (this.state === ConnectionState.CONNECTED) {
resolve(this.clientId!);
return;
}
this.setState(ConnectionState.CONNECTING);
try {
this.socket = new WebSocket(this.url);
this.socket.onopen = () => {
console.log('🔌 WebSocket connected');
this.reconnectAttempts = 0;
// Ждём сообщение с clientId
};
this.socket.onmessage = (event) => {
const data = event.data as string;
// Проверяем системное сообщение о подключении
try {
const parsed = JSON.parse(data);
if (parsed.type === '__system:connected') {
this.clientId = parsed.clientId;
this.setState(ConnectionState.CONNECTED);
this.events.onConnect?.(this.clientId);
this.flushMessageQueue();
resolve(this.clientId);
return;
}
} catch {}
// Обычное сообщение
this.events.onMessage?.(data);
};
this.socket.onclose = (event) => {
console.log(`❌ WebSocket closed: ${event.reason || 'unknown'}`);
this.events.onDisconnect?.(event.reason || 'Connection closed');
this.handleDisconnect();
};
this.socket.onerror = (error) => {
console.error('WebSocket error:', error);
this.events.onError?.(new Error('WebSocket error'));
reject(error);
};
} catch (error) {
this.setState(ConnectionState.DISCONNECTED);
reject(error);
}
});
}
/**
* Обработка отключения
*/
private handleDisconnect(): void {
this.socket = null;
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.setState(ConnectionState.RECONNECTING);
this.reconnectAttempts++;
const delay = this.reconnectDelay * this.reconnectAttempts;
console.log(`🔄 Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => this.connect(), delay);
} else {
this.setState(ConnectionState.DISCONNECTED);
console.log('❌ Max reconnect attempts reached');
}
}
/**
* Отправка сообщения
*/
send(data: string): boolean {
if (this.state !== ConnectionState.CONNECTED || !this.socket) {
// Добавляем в очередь если не подключены
this.messageQueue.push(data);
return false;
}
this.socket.send(data);
return true;
}
/**
* Отправка очереди сообщений
*/
private flushMessageQueue(): void {
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift()!;
this.send(msg);
}
}
/**
* Изменение состояния
*/
private setState(state: ConnectionState): void {
this.state = state;
this.events.onStateChange?.(state);
}
/**
* Регистрация обработчиков
*/
on<K extends keyof ClientEvents>(event: K, handler: ClientEvents[K]): void {
this.events[event] = handler;
}
/**
* Геттеры
*/
getClientId(): string | null { return this.clientId; }
getState(): ConnectionState { return this.state; }
isConnected(): boolean { return this.state === ConnectionState.CONNECTED; }
/**
* Отключение
*/
disconnect(): void {
this.maxReconnectAttempts = 0; // Отключаем автореконнект
this.socket?.close();
}
}
🌉 ClientBridge.ts
import { EventBus } from '../shared/EventBus';
import { GameWebSocketClient } from './WebSocketClient';
import { Serializer } from '../shared/serialization';
import { IEvent, EventMeta, EventDirection } from '../shared/types';
/**
* ClientBridge — связывает клиентский EventBus с WebSocket
*/
export class ClientBridge {
private eventBus: EventBus;
private wsClient: GameWebSocketClient;
private removeInterceptor?: () => void;
constructor(eventBus: EventBus, wsClient: GameWebSocketClient) {
this.eventBus = eventBus;
this.wsClient = wsClient;
this.setupIncomingMessages();
this.setupOutgoingMessages();
console.log('🌉 ClientBridge initialized');
}
/**
* Обработка входящих сообщений от сервера
*/
private setupIncomingMessages(): void {
this.wsClient.on('onMessage', (data) => {
try {
const message = Serializer.deserialize(data);
console.log(`📨 Received: ${message.event.type}`);
// Помещаем в локальный EventBus
this.eventBus.dispatch(message.event, {
...message.meta,
direction: EventDirection.LOCAL,
});
} catch (error) {
console.error('Failed to process server message:', error);
}
});
}
/**
* Перехват исходящих событий для отправки на сервер
*/
private setupOutgoingMessages(): void {
this.removeInterceptor = this.eventBus.intercept((event, meta) => {
// Отправляем только события с направлением TO_SERVER
if (meta.direction === EventDirection.TO_SERVER) {
this.sendToServer(event, meta);
}
});
}
/**
* Отправка события на сервер
*/
private sendToServer(event: IEvent, meta: EventMeta): void {
// Добавляем clientId
event.senderId = this.wsClient.getClientId() ?? undefined;
const message = Serializer.createMessage(event, meta);
const data = Serializer.serialize(message);
this.wsClient.send(data);
console.log(`📤 Sent to server: ${event.type}`);
}
/**
* Уничтожение bridge
*/
destroy(): void {
this.removeInterceptor?.();
}
}
🎮 GameClient.ts — Игровой клиент
import { EventBus } from '../shared/EventBus';
import {
GameEvents,
PlayerInputPayload,
GameStateSyncPayload,
PlayerConnectedPayload,
PlayerDisconnectedPayload,
ChatMessagePayload,
} from '../shared/events';
import { EventDirection } from '../shared/types';
/**
* Состояние ввода
*/
interface InputState {
up: boolean;
down: boolean;
left: boolean;
right: boolean;
action: boolean;
}
/**
* Локальное представление игрока
*/
interface LocalPlayer {
id: string;
position: { x: number; y: number };
health: number;
score: number;
}
/**
* GameClient — клиентская игровая логика
*/
export class GameClient {
private eventBus: EventBus;
private playerId: string | null = null;
private players: Map<string, LocalPlayer> = new Map();
private inputState: InputState = {
up: false, down: false, left: false, right: false, action: false
};
private inputSequence: number = 0;
private inputInterval?: number;
private lastInputState: string = '';
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
this.setupEventHandlers();
console.log('🎮 GameClient initialized');
}
/**
* Подписка на события
*/
private setupEventHandlers(): void {
// Синхронизация состояния игры
this.eventBus.on(GameEvents.GAME_STATE_SYNC, (payload) => {
this.handleGameStateSync(payload);
});
// Подключение другого игрока
this.eventBus.on(GameEvents.PLAYER_CONNECTED, (payload) => {
this.handlePlayerConnected(payload);
});
// Отключение игрока
this.eventBus.on(GameEvents.PLAYER_DISCONNECTED, (payload) => {
this.handlePlayerDisconnected(payload);
});
// Чат
this.eventBus.on(GameEvents.CHAT_MESSAGE, (payload) => {
this.handleChatMessage(payload);
});
}
/**
* Установка ID игрока (вызывается после подключения)
*/
setPlayerId(id: string): void {
this.playerId = id;
console.log(`👤 Player ID set: ${id}`);
}
/**
* Обработка синхронизации состояния
*/
private handleGameStateSync(payload: GameStateSyncPayload): void {
// Обновляем состояние всех игроков
payload.players.forEach(playerData => {
this.players.set(playerData.id, {
id: playerData.id,
position: { ...playerData.position },
health: playerData.health,
score: playerData.score,
});
});
// Удаляем игроков, которых нет в обновлении
const serverPlayerIds = new Set(payload.players.map(p => p.id));
this.players.forEach((_, id) => {
if (!serverPlayerIds.has(id)) {
this.players.delete(id);
}
});
}
/**
* Обработка подключения другого игрока
*/
private handlePlayerConnected(payload: PlayerConnectedPayload): void {
console.log(`👤 Player joined: ${payload.username}`);
this.players.set(payload.playerId, {
id: payload.playerId,
position: payload.position,
health: 100,
score: 0,
});
}
/**
* Обработка отключения игрока
*/
private handlePlayerDisconnected(payload: PlayerDisconnectedPayload): void {
console.log(`👤 Player left: ${payload.playerId}`);
this.players.delete(payload.playerId);
}
/**
* Обработка чат-сообщения
*/
private handleChatMessage(payload: ChatMessagePayload): void {
console.log(`💬 [${payload.senderName}]: ${payload.message}`);
// Здесь можно обновить UI чата
}
/**
* Запуск обработки ввода
*/
startInputHandling(): void {
// Слушаем клавиши
document.addEventListener('keydown', this.handleKeyDown.bind(this));
document.addEventListener('keyup', this.handleKeyUp.bind(this));
// Отправляем состояние ввода 30 раз в секунду
this.inputInterval = window.setInterval(() => {
this.sendInputIfChanged();
}, 33);
console.log('🎮 Input handling started (WASD + Space)');
}
/**
* Обработка нажатия клавиши
*/
private handleKeyDown(e: KeyboardEvent): void {
switch (e.code) {
case 'KeyW': case 'ArrowUp': this.inputState.up = true; break;
case 'KeyS': case 'ArrowDown': this.inputState.down = true; break;
case 'KeyA': case 'ArrowLeft': this.inputState.left = true; break;
case 'KeyD': case 'ArrowRight': this.inputState.right = true; break;
case 'Space': this.inputState.action = true; break;
}
}
/**
* Обработка отпускания клавиши
*/
private handleKeyUp(e: KeyboardEvent): void {
switch (e.code) {
case 'KeyW': case 'ArrowUp': this.inputState.up = false; break;
case 'KeyS': case 'ArrowDown': this.inputState.down = false; break;
case 'KeyA': case 'ArrowLeft': this.inputState.left = false; break;
case 'KeyD': case 'ArrowRight': this.inputState.right = false; break;
case 'Space': this.inputState.action = false; break;
}
}
/**
* Отправка ввода если изменился
*/
private sendInputIfChanged(): void {
if (!this.playerId) return;
const currentState = JSON.stringify(this.inputState);
// Отправляем только если состояние изменилось
if (currentState !== this.lastInputState) {
this.lastInputState = currentState;
this.inputSequence++;
const payload: PlayerInputPayload = {
playerId: this.playerId,
keys: { ...this.inputState },
sequenceNumber: this.inputSequence,
};
this.eventBus.emit(GameEvents.PLAYER_INPUT, payload, {
direction: EventDirection.TO_SERVER,
});
}
}
/**
* Отправка сообщения в чат
*/
sendChatMessage(message: string): void {
if (!this.playerId) return;
const payload: ChatMessagePayload = {
senderId: this.playerId,
senderName: `Player_${this.playerId.slice(-4)}`,
message,
channel: 'global',
};
this.eventBus.emit(GameEvents.CHAT_MESSAGE, payload, {
direction: EventDirection.TO_SERVER,
});
}
/**
* Получение списка игроков
*/
getPlayers(): Map<string, LocalPlayer> {
return this.players;
}
/**
* Получение своего игрока
*/
getLocalPlayer(): LocalPlayer | undefined {
return this.playerId ? this.players.get(this.playerId) : undefined;
}
/**
* Остановка
*/
stop(): void {
if (this.inputInterval) {
clearInterval(this.inputInterval);
}
document.removeEventListener('keydown', this.handleKeyDown);
document.removeEventListener('keyup', this.handleKeyUp);
}
}
🚀 client/index.ts — Точка входа клиента
import { EventBus } from '../shared/EventBus';
import { GameWebSocketClient } from './WebSocketClient';
import { ClientBridge } from './ClientBridge';
import { GameClient } from './GameClient';
const SERVER_URL = 'ws://localhost:8080';
/**
* Инициализация клиента
*/
async function init(): Promise<void> {
console.log('🎮 Initializing game client...');
// Создаём компоненты
const eventBus = new EventBus('client');
eventBus.debug = true;
const wsClient = new GameWebSocketClient(SERVER_URL);
const bridge = new ClientBridge(eventBus, wsClient);
const gameClient = new GameClient(eventBus);
// Обработка изменения состояния соединения
wsClient.on('onStateChange', (state) => {
updateConnectionStatus(state);
});
try {
// Подключаемся к серверу
const clientId = await wsClient.connect();
console.log(`✅ Connected as ${clientId}`);
// Устанавливаем ID игрока
gameClient.setPlayerId(clientId);
// Запускаем обработку ввода
gameClient.startInputHandling();
// Запускаем игровой цикл рендеринга
startRenderLoop(gameClient);
// Экспортируем для отладки
(window as any).game = { eventBus, wsClient, gameClient };
} catch (error) {
console.error('❌ Failed to connect:', error);
}
}
/**
* Обновление статуса соединения в UI
*/
function updateConnectionStatus(state: string): void {
const statusEl = document.getElementById('connection-status');
if (statusEl) {
statusEl.textContent = state;
statusEl.className = `status status--${state}`;
}
}
/**
* Игровой цикл рендеринга
*/
function startRenderLoop(gameClient: GameClient): void {
const canvas = document.getElementById('game-canvas') as HTMLCanvasElement;
if (!canvas) return;
const ctx = canvas.getContext('2d')!;
const PLAYER_SIZE = 30;
function render(): void {
// Очищаем canvas
ctx.fillStyle = '#1a1a2e';
ctx.fillRect(0, 0, canvas.width, canvas.height);
// Рендерим сетку
ctx.strokeStyle = '#2a2a4e';
ctx.lineWidth = 1;
for (let x = 0; x < canvas.width; x += 50) {
ctx.beginPath();
ctx.moveTo(x, 0);
ctx.lineTo(x, canvas.height);
ctx.stroke();
}
for (let y = 0; y < canvas.height; y += 50) {
ctx.beginPath();
ctx.moveTo(0, y);
ctx.lineTo(canvas.width, y);
ctx.stroke();
}
// Рендерим игроков
const localPlayer = gameClient.getLocalPlayer();
gameClient.getPlayers().forEach(player => {
const isLocal = player.id === localPlayer?.id;
// Тело игрока
ctx.fillStyle = isLocal ? '#4ade80' : '#60a5fa';
ctx.beginPath();
ctx.arc(
player.position.x,
player.position.y,
PLAYER_SIZE / 2,
0,
Math.PI * 2
);
ctx.fill();
// ID игрока
ctx.fillStyle = '#ffffff';
ctx.font = '12px monospace';
ctx.textAlign = 'center';
ctx.fillText(
player.id.slice(-4),
player.position.x,
player.position.y - PLAYER_SIZE
);
// Полоска здоровья
const healthWidth = 40;
const healthHeight = 4;
ctx.fillStyle = '#333';
ctx.fillRect(
player.position.x - healthWidth / 2,
player.position.y + PLAYER_SIZE,
healthWidth,
healthHeight
);
ctx.fillStyle = '#4ade80';
ctx.fillRect(
player.position.x - healthWidth / 2,
player.position.y + PLAYER_SIZE,
(player.health / 100) * healthWidth,
healthHeight
);
});
// Следующий кадр
requestAnimationFrame(render);
}
render();
console.log('🖼️ Render loop started');
}
// Запуск при загрузке
document.addEventListener('DOMContentLoaded', init);
Примеры использования
Запуск проекта и API шины событий
📦 package.json
{
"name": "distributed-eventbus",
"version": "1.0.0",
"type": "module",
"scripts": {
"build": "tsc",
"server": "tsx server/index.ts",
"client": "vite",
"dev": "concurrently \"npm run server\" \"npm run client\""
},
"dependencies": {
"ws": "^8.16.0"
},
"devDependencies": {
"@types/node": "^20.11.0",
"@types/ws": "^8.5.10",
"concurrently": "^8.2.2",
"tsx": "^4.7.0",
"typescript": "^5.3.3",
"vite": "^5.0.12"
}
}
⚙️ tsconfig.json
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"outDir": "./dist",
"rootDir": "./",
"declaration": true
},
"include": ["shared/**/*", "server/**/*", "client/**/*"],
"exclude": ["node_modules", "dist"]
}
🌐 index.html — HTML для клиента
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Distributed EventBus Demo</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: 'Segoe UI', sans-serif;
background: #0f0f1a;
color: #fff;
min-height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
padding: 20px;
}
h1 { margin-bottom: 10px; }
.status {
padding: 8px 16px;
border-radius: 20px;
font-size: 14px;
margin-bottom: 20px;
}
.status--connected { background: #22c55e; }
.status--connecting { background: #f59e0b; }
.status--disconnected { background: #ef4444; }
.status--reconnecting { background: #8b5cf6; }
#game-canvas {
border: 2px solid #333;
border-radius: 8px;
}
.controls {
margin-top: 20px;
color: #888;
text-align: center;
}
</style>
</head>
<body>
<h1>🎮 Distributed EventBus Demo</h1>
<div id="connection-status" class="status status--connecting">
connecting
</div>
<canvas id="game-canvas" width="800" height="600"></canvas>
<div class="controls">
<p>🕹️ Use <strong>WASD</strong> or <strong>Arrow Keys</strong> to move</p>
<p>Open multiple tabs to see multiplayer in action!</p>
</div>
<script type="module" src="/client/index.ts"></script>
</body>
</html>
🚀 Запуск проекта
npm install — установка зависимостей
npm run server — запуск сервера (порт 8080)
npm run client — запуск клиента (Vite dev server)
npm run dev — запуск сервера и клиента одновременно
📡 API EventBus
// ═══════════════════════════════════════════════════════════
// Создание EventBus
// ═══════════════════════════════════════════════════════════
const eventBus = new EventBus('my-bus');
eventBus.debug = true; // Включить логирование
// ═══════════════════════════════════════════════════════════
// Подписка на события (с полной типизацией!)
// ═══════════════════════════════════════════════════════════
// Обычная подписка
const sub = eventBus.on(GameEvents.PLAYER_MOVE, (payload) => {
// payload типизирован как PlayerMovePayload
console.log(payload.position.x, payload.position.y);
});
// Подписка с опциями
eventBus.on(GameEvents.CHAT_MESSAGE, (payload) => {
console.log(payload.message);
}, {
priority: 10, // Высокий приоритет
once: false, // Многоразовая подписка
filter: (e) => e.payload.channel === 'global', // Фильтр
});
// Подписка на один раз
eventBus.once(GameEvents.PLAYER_CONNECTED, (payload) => {
console.log('First player connected!');
});
// Подписка на ВСЕ события
eventBus.onAny((payload, event) => {
console.log(`Event: ${event.type}`);
});
// ═══════════════════════════════════════════════════════════
// Отправка событий
// ═══════════════════════════════════════════════════════════
// Локальное событие (обработается только локально)
eventBus.emit(GameEvents.PLAYER_MOVE, {
playerId: 'player_123',
position: { x: 100, y: 200 },
velocity: { x: 1, y: 0 },
timestamp: Date.now(),
});
// Событие на сервер
eventBus.emit(GameEvents.PLAYER_INPUT, {
playerId: 'player_123',
keys: { up: true, down: false, left: false, right: false, action: false },
sequenceNumber: 42,
}, {
direction: EventDirection.TO_SERVER,
});
// Событие всем клиентам (с сервера)
eventBus.emit(GameEvents.GAME_STATE_SYNC, {
players: [...],
serverTime: Date.now(),
tick: 1234,
}, {
direction: EventDirection.TO_ALL_CLIENTS,
});
// ═══════════════════════════════════════════════════════════
// Отписка
// ═══════════════════════════════════════════════════════════
// Через возвращённый объект
sub.unsubscribe();
// Или напрямую
eventBus.off(GameEvents.PLAYER_MOVE, sub.id);
// ═══════════════════════════════════════════════════════════
// Middleware (обработка перед dispatch)
// ═══════════════════════════════════════════════════════════
eventBus.use((event, meta) => {
// Логирование всех событий
console.log(`[Middleware] ${event.type}`);
// Можно модифицировать событие
event.timestamp = Date.now();
// Вернуть null чтобы заблокировать событие
if (event.type === 'blocked:event') {
return null;
}
return event;
});
// ═══════════════════════════════════════════════════════════
// Статистика
// ═══════════════════════════════════════════════════════════
const stats = eventBus.getStats();
console.log(stats.eventTypes); // Количество типов событий
console.log(stats.totalSubscriptions); // Общее количество подписок
Теперь у вас есть полностью рабочая распределённая шина событий. Откройте несколько вкладок браузера и увидите, как игроки синхронизируются в реальном времени через WebSocket.
Заключение
Итоги и дальнейшее развитие
📝 Что мы создали
Мы построили полноценную архитектуру распределённой шины событий, которая позволяет:
- ✅ Единый API — один и тот же EventBus на клиенте и сервере
- ✅ Типобезопасность — полная типизация событий и payload в TypeScript
- ✅ Прозрачная сериализация — автоматическое преобразование событий для передачи по сети
- ✅ Гибкая маршрутизация — события могут направляться локально, на сервер или всем клиентам
- ✅ Автореконнект — клиент автоматически переподключается при потере связи
- ✅ Middleware — возможность перехватывать и модифицировать события
- ✅ Масштабируемость — легко добавлять новые типы событий
🚀 Идеи для развития
JWT токены, проверка прав доступа к событиям
Отправка только изменений состояния
Предсказание движения для плавности
Изоляция событий по игровым комнатам
MessagePack или Protocol Buffers вместо JSON
Ограничение частоты событий от клиентов
🔧 Пример расширения: Комнаты
/**
* Расширение EventMeta для поддержки комнат
*/
export interface RoomEventMeta extends EventMeta {
/** ID комнаты */
roomId?: string;
/** Отправить только в комнату */
roomOnly?: boolean;
}
/**
* RoomManager — управление игровыми комнатами
*/
export class RoomManager {
private rooms: Map<string, Set<string>> = new Map();
/**
* Создание комнаты
*/
createRoom(roomId: string): void {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
console.log(`🚪 Room created: ${roomId}`);
}
}
/**
* Присоединение клиента к комнате
*/
joinRoom(roomId: string, clientId: string): void {
this.createRoom(roomId);
this.rooms.get(roomId)!.add(clientId);
console.log(`👤 ${clientId} joined room ${roomId}`);
}
/**
* Выход клиента из комнаты
*/
leaveRoom(roomId: string, clientId: string): void {
const room = this.rooms.get(roomId);
if (room) {
room.delete(clientId);
if (room.size === 0) {
this.rooms.delete(roomId);
console.log(`🚪 Room deleted: ${roomId}`);
}
}
}
/**
* Получение клиентов в комнате
*/
getRoomClients(roomId: string): string[] {
return Array.from(this.rooms.get(roomId) ?? []);
}
/**
* Получение комнат клиента
*/
getClientRooms(clientId: string): string[] {
const clientRooms: string[] = [];
this.rooms.forEach((clients, roomId) => {
if (clients.has(clientId)) {
clientRooms.push(roomId);
}
});
return clientRooms;
}
}
// Использование в ServerBridge
class ServerBridgeWithRooms extends ServerBridge {
private roomManager = new RoomManager();
/**
* Отправка события в комнату
*/
sendToRoom(roomId: string, event: IEvent, meta: EventMeta): void {
const clients = this.roomManager.getRoomClients(roomId);
const message = Serializer.createMessage(event, meta);
const data = Serializer.serialize(message);
clients.forEach(clientId => {
this.wsServer.sendToClient(clientId, data);
});
console.log(`📢 Sent to room ${roomId}: ${event.type}`);
}
}
🔧 Пример расширения: Rate Limiting
/**
* Конфигурация rate limiting
*/
interface RateLimitConfig {
/** Максимум событий */
maxEvents: number;
/** За период (мс) */
windowMs: number;
}
/**
* Rate Limiter для EventBus
*/
export class RateLimiter {
private events: Map<string, number[]> = new Map();
private config: Map<string, RateLimitConfig> = new Map();
/**
* Установка лимита для типа события
*/
setLimit(eventType: string, maxEvents: number, windowMs: number): void {
this.config.set(eventType, { maxEvents, windowMs });
}
/**
* Проверка возможности отправки события
*/
canEmit(senderId: string, eventType: string): boolean {
const limit = this.config.get(eventType);
if (!limit) return true; // Нет лимита
const key = `${senderId}:${eventType}`;
const now = Date.now();
const timestamps = this.events.get(key) ?? [];
// Удаляем устаревшие записи
const validTimestamps = timestamps.filter(
t => now - t < limit.windowMs
);
if (validTimestamps.length >= limit.maxEvents) {
console.warn(`⚠️ Rate limit exceeded: ${senderId} - ${eventType}`);
return false;
}
validTimestamps.push(now);
this.events.set(key, validTimestamps);
return true;
}
/**
* Создание middleware для EventBus
*/
createMiddleware(): (event: IEvent, meta: EventMeta) => IEvent | null {
return (event, meta) => {
if (!event.senderId) return event;
if (!this.canEmit(event.senderId, event.type)) {
return null; // Блокируем событие
}
return event;
};
}
}
// Использование
const rateLimiter = new RateLimiter();
// Максимум 30 input событий в секунду от одного клиента
rateLimiter.setLimit(GameEvents.PLAYER_INPUT, 30, 1000);
// Максимум 5 сообщений в чат за 10 секунд
rateLimiter.setLimit(GameEvents.CHAT_MESSAGE, 5, 10000);
// Подключаем к EventBus
eventBus.use(rateLimiter.createMiddleware());
В production-окружении добавьте:
• SSL/TLS — используйте wss:// вместо ws://
• Аутентификацию — проверяйте токены при подключении
• Валидацию — проверяйте payload событий на сервере
• Логирование — записывайте события для отладки
• Мониторинг — отслеживайте количество подключений и нагрузку
📚 Полезные ресурсы
- MDN WebSockets API
- TypeScript Documentation
- ws — Node.js WebSocket library
- Game Networking by Glenn Fiedler
- Fast-Paced Multiplayer
Спасибо за чтение! 🎮
Если статья была полезной — поделитесь с коллегами