bfcache: server - Implement WebSocket server functionality
* Added WebSocketServer class with configurable options * Introduced WebSocketServerFactory for creating WebSocket relay servers * Updated index.ts to set up a server with WebSocket support and handle various signals for graceful shutdown
This commit is contained in:
98
Web/bfcache/server/src/net/websockets/WebSocketServer.ts
Normal file
98
Web/bfcache/server/src/net/websockets/WebSocketServer.ts
Normal file
@@ -0,0 +1,98 @@
|
||||
import type {ServerWebSocket, WebSocketCompressor} from 'bun';
|
||||
import type {Seconds, Bytes} from '../../types/Units';
|
||||
|
||||
export type PerMessageDeflate =
|
||||
| boolean
|
||||
| {
|
||||
compress?: boolean | WebSocketCompressor;
|
||||
decomporess?: boolean | WebSocketCompressor;
|
||||
};
|
||||
|
||||
export type WebSocketServerOptions = {
|
||||
maxPayloadLength?: Bytes;
|
||||
idleTimeout?: Seconds;
|
||||
backPressureLimit?: Bytes;
|
||||
closeOnBackPressureLimit?: boolean;
|
||||
sendPings?: boolean;
|
||||
publishToSelf?: boolean;
|
||||
perMessageDeflate?: PerMessageDeflate;
|
||||
onSocketError?: (client: ServerWebSocket, error: Error) => void;
|
||||
onSocketOpen?: (client: ServerWebSocket) => void;
|
||||
onSocketMessage?: (client: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
|
||||
onSocketDrain?: (client: ServerWebSocket) => void;
|
||||
onSocketClose?: (client: ServerWebSocket) => void;
|
||||
};
|
||||
|
||||
const webSocketServerDefaults: WebSocketServerOptions = {
|
||||
maxPayloadLength: 16 * 1024 * 1024, // 16 MB
|
||||
idleTimeout: 120, // 2 minutes
|
||||
backPressureLimit: 1 * 1024 * 1024, // 1 MB
|
||||
closeOnBackPressureLimit: false,
|
||||
sendPings: true,
|
||||
publishToSelf: true,
|
||||
perMessageDeflate: true,
|
||||
onSocketError: undefined,
|
||||
onSocketOpen: undefined,
|
||||
onSocketMessage: undefined,
|
||||
onSocketDrain: undefined,
|
||||
onSocketClose: undefined
|
||||
};
|
||||
|
||||
export default class WebSocketServer {
|
||||
private readonly _maxPayloadLength: Bytes;
|
||||
private readonly _idleTimeout: Seconds;
|
||||
private readonly _backPressureLimit: Bytes;
|
||||
private readonly _closeOnBackPressureLimit: boolean;
|
||||
private readonly _sendPings: boolean;
|
||||
private readonly _publishToSelf: boolean;
|
||||
private readonly _perMessageDeflate: PerMessageDeflate;
|
||||
|
||||
public readonly error?: (client: ServerWebSocket, error: Error) => void;
|
||||
public readonly open?: (client: ServerWebSocket) => void;
|
||||
public readonly message?: (client: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
|
||||
public readonly drain?: (client: ServerWebSocket) => void;
|
||||
public readonly close?: (client: ServerWebSocket) => void;
|
||||
|
||||
constructor(options?: WebSocketServerOptions) {
|
||||
this._maxPayloadLength = options?.maxPayloadLength ?? webSocketServerDefaults.maxPayloadLength!;
|
||||
this._idleTimeout = options?.idleTimeout ?? webSocketServerDefaults.idleTimeout!;
|
||||
this._backPressureLimit = options?.backPressureLimit ?? webSocketServerDefaults.backPressureLimit!;
|
||||
this._closeOnBackPressureLimit = options?.closeOnBackPressureLimit ?? webSocketServerDefaults.closeOnBackPressureLimit!;
|
||||
this._sendPings = options?.sendPings ?? webSocketServerDefaults.sendPings!;
|
||||
this._publishToSelf = options?.publishToSelf ?? webSocketServerDefaults.publishToSelf!;
|
||||
this._perMessageDeflate = options?.perMessageDeflate ?? webSocketServerDefaults.perMessageDeflate!;
|
||||
this.error = options?.onSocketError;
|
||||
this.open = options?.onSocketOpen;
|
||||
this.message = options?.onSocketMessage;
|
||||
this.drain = options?.onSocketDrain;
|
||||
this.close = options?.onSocketClose;
|
||||
}
|
||||
|
||||
get maxPayloadLength(): Bytes {
|
||||
return this._maxPayloadLength;
|
||||
}
|
||||
|
||||
get idleTimeout(): Seconds {
|
||||
return this._idleTimeout;
|
||||
}
|
||||
|
||||
get backPressureLimit(): Bytes {
|
||||
return this._backPressureLimit;
|
||||
}
|
||||
|
||||
get closeOnBackPressureLimit(): boolean {
|
||||
return this._closeOnBackPressureLimit;
|
||||
}
|
||||
|
||||
get sendPings(): boolean {
|
||||
return this._sendPings;
|
||||
}
|
||||
|
||||
get publishToSelf(): boolean {
|
||||
return this._publishToSelf;
|
||||
}
|
||||
|
||||
get perMessageDeflate(): PerMessageDeflate {
|
||||
return this._perMessageDeflate;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
import {LoggerFactory} from '@techniker-me/logger';
|
||||
import WebSocketServer, {WebSocketServerOptions} from './WebSocketServer';
|
||||
import {ServerWebSocket} from 'bun';
|
||||
|
||||
export default class WebSocketServerFactory {
|
||||
public static createWebSocketRelayServer(): WebSocketServer {
|
||||
const logger = LoggerFactory.getLogger('WebSocketRelayServer');
|
||||
const clients = new Set<ServerWebSocket>();
|
||||
const webSocketRelayServerOptions: WebSocketServerOptions = {
|
||||
onSocketError: (client, error) => logger.error(`Error: [%o] [${error.message}]`, client),
|
||||
onSocketOpen: client => {
|
||||
logger.debug('New WebSocketClient [%o]', client);
|
||||
|
||||
clients.add(client);
|
||||
},
|
||||
onSocketMessage: (fromClient, message) => {
|
||||
logger.debug(`Relaying message [%o]`, message);
|
||||
|
||||
for (const client of clients) {
|
||||
if (client === fromClient) {
|
||||
continue;
|
||||
}
|
||||
|
||||
client.send(message);
|
||||
}
|
||||
},
|
||||
onSocketClose: client => clients.delete(client),
|
||||
onSocketDrain: client => logger.debug('Client drain [%o]', client),
|
||||
publishToSelf: false
|
||||
};
|
||||
|
||||
return new WebSocketServer(webSocketRelayServerOptions);
|
||||
}
|
||||
private constructor() {
|
||||
throw new Error('WebSocketServerFactory is a static class that may not be instantiated');
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user