rename websocket directory to websocket-chat
This commit is contained in:
@@ -0,0 +1,379 @@
|
||||
import {Nullable} from '../../types/optional';
|
||||
import Assert from '../../lang/Assert';
|
||||
import moment from 'moment';
|
||||
import type {Server} from 'node:http';
|
||||
import http from 'node:http';
|
||||
import {EventEmitter} from 'node:events';
|
||||
import type {ILogger} from '@techniker-me/logger';
|
||||
import {LoggerFactory} from '@techniker-me/logger';
|
||||
import IRoutes from './IRoutes';
|
||||
import {Subject} from '@techniker-me/tools';
|
||||
import express, {RequestHandler} from 'express';
|
||||
import morgan from 'morgan';
|
||||
import favicon from 'serve-favicon';
|
||||
import bodyParser from 'body-parser';
|
||||
import multer from 'multer';
|
||||
import {Kilobytes} from '../../types/Units';
|
||||
import responseTime from 'response-time';
|
||||
import onHeaders from 'on-headers';
|
||||
import {LRUCache} from 'lru-cache';
|
||||
|
||||
const requestSizeLimit: Kilobytes = 10240;
|
||||
const defaultTcpSocketTimeout = moment.duration(720, 'seconds'); // Google HTTPS load balancer expects at least 600 seconds
|
||||
const defaultKeepAliveTimeout = moment.duration(660, 'seconds'); // Google HTTPS load balancer expects at least 600 seconds
|
||||
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Max-Age#:~:text=The%20Access%2DControl%2DMax%2D,Headers%20headers)%20can%20be%20cached.
|
||||
const corsAccessControlMaxAge = moment.duration(24, 'hours');
|
||||
const shortTermCaching = 'public, max-age=20, s-maxage=20';
|
||||
const tlsSessionTimeout = moment.duration(5, 'minutes');
|
||||
const maxCachedTlsSessions = 1000;
|
||||
|
||||
export default class HttpServer {
|
||||
private readonly _logger: ILogger = LoggerFactory.getLogger('HttpServer');
|
||||
private readonly _eventEmitter: EventEmitter;
|
||||
private readonly _protocol: 'http' | 'https';
|
||||
private readonly _port: number;
|
||||
private readonly _routes: IRoutes;
|
||||
// @ts-expect-error - unused parameter for future functionality
|
||||
private readonly _viewsPath: object;
|
||||
// @ts-expect-error - unused parameter for future functionality
|
||||
private readonly _viewParameters: string;
|
||||
private readonly _resourcesPaths: string[];
|
||||
private readonly _favicon: string;
|
||||
private readonly _cors: object;
|
||||
private readonly _app: Subject<Nullable<express.Application>>;
|
||||
private readonly _server: Subject<Nullable<Server>>;
|
||||
private readonly _tlsSessionCache = new LRUCache({
|
||||
ttl: tlsSessionTimeout.asMilliseconds(),
|
||||
max: maxCachedTlsSessions
|
||||
});
|
||||
private _jsonHandler: Nullable<express.RequestHandler>;
|
||||
|
||||
constructor(
|
||||
protocol: 'https' | 'http',
|
||||
port: number,
|
||||
routes: IRoutes,
|
||||
viewsPath: object,
|
||||
viewParameters: string,
|
||||
resourcesPaths: string[],
|
||||
favicon: string,
|
||||
cors: object
|
||||
) {
|
||||
Assert.isString('protocol', protocol);
|
||||
Assert.isNumber('port', port);
|
||||
Assert.satisfiesInterface('routes', routes, ['getGETRoutes', 'getPOSTRoutes', 'getPUTRoutes', 'getPATCHRoutes', 'getDELETERoutes']);
|
||||
// Assert.isObjectOf<string>('viewsPath', viewsPath);
|
||||
Assert.isString('viewParameters', viewParameters);
|
||||
Assert.isArrayOf<string>('resourcesPaths', 'string', resourcesPaths);
|
||||
Assert.isString('favicon', favicon);
|
||||
// Assert.isObjectOf<string>('cors', cors, 'string');
|
||||
|
||||
this._protocol = protocol;
|
||||
this._port = port;
|
||||
this._routes = routes;
|
||||
this._viewsPath = viewsPath;
|
||||
this._viewParameters = viewParameters;
|
||||
this._resourcesPaths = resourcesPaths;
|
||||
this._favicon = favicon;
|
||||
this._cors = cors;
|
||||
this._app = new Subject<Nullable<express.Application>>(null);
|
||||
this._eventEmitter = new EventEmitter();
|
||||
this._server = new Subject<Nullable<Server>>(null);
|
||||
this._jsonHandler = null;
|
||||
}
|
||||
|
||||
public start() {
|
||||
return new Promise((resolve, reject) => {
|
||||
const app = (this._app.value = express());
|
||||
this._jsonHandler = bodyParser.json({limit: requestSizeLimit});
|
||||
|
||||
this.configureListener();
|
||||
this.configureMiddleware();
|
||||
this.configureResources();
|
||||
this.configureRoutes();
|
||||
|
||||
app.set('x-powered-by', false);
|
||||
|
||||
const server = (this._server.value = http.createServer(app));
|
||||
|
||||
const onListen = () => {
|
||||
this._logger.info('HTTP Server listening on %s://*:%s', this._protocol, this._port);
|
||||
|
||||
server.removeListener('error', onError);
|
||||
|
||||
resolve(this);
|
||||
};
|
||||
|
||||
const onError = (err: unknown) => {
|
||||
server.removeListener('listening', onListen);
|
||||
|
||||
reject(err);
|
||||
};
|
||||
|
||||
server.keepAliveTimeout = defaultKeepAliveTimeout.milliseconds();
|
||||
server.timeout = defaultTcpSocketTimeout.asMilliseconds();
|
||||
server.setTimeout(defaultTcpSocketTimeout.asMilliseconds());
|
||||
server.once('error', onError);
|
||||
server.once('listening', onListen);
|
||||
|
||||
server.on('request', (req, res) => {
|
||||
this._eventEmitter.emit('request', req.method, req.url, res.statusCode, req.headers);
|
||||
});
|
||||
|
||||
server.on('newSession', (sessionId, sessionData, callback) => {
|
||||
const cacheId = sessionId.toString('hex');
|
||||
|
||||
this._tlsSessionCache.set(cacheId, sessionData);
|
||||
this._logger.debug('Created new TLS session [%s]', cacheId);
|
||||
|
||||
callback();
|
||||
});
|
||||
|
||||
server.on('resumeSession', (sessionId, callback) => {
|
||||
const cacheId = sessionId.toString('hex');
|
||||
const sessionData = this._tlsSessionCache.get(cacheId);
|
||||
|
||||
callback(null, sessionData);
|
||||
|
||||
if (sessionData) {
|
||||
this._logger.debug('Resumed TLS session [%s]', cacheId);
|
||||
} else {
|
||||
this._logger.debug('TLS session [%s] not found', cacheId);
|
||||
}
|
||||
});
|
||||
|
||||
server.listen({
|
||||
port: this._port,
|
||||
backlog: 16 * 1024
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public on(event: string, handler: (...args: unknown[]) => void): void {
|
||||
Assert.isNonEmptyString('event', event);
|
||||
Assert.isFunction('handler', handler);
|
||||
|
||||
this._eventEmitter.on(event, handler);
|
||||
}
|
||||
|
||||
public getServer(): Nullable<Server> {
|
||||
return this._server.value;
|
||||
}
|
||||
|
||||
private configureListener() {
|
||||
if (!this._app.value) {
|
||||
throw new Error('Unable to configure listener, no app instance found');
|
||||
}
|
||||
|
||||
const app = this._app.value;
|
||||
|
||||
app.use((req, res, next) => {
|
||||
req.on('finish', () => {
|
||||
this._eventEmitter.emit('request', req.method, req.url, res.statusCode, req.headers);
|
||||
});
|
||||
|
||||
next();
|
||||
});
|
||||
}
|
||||
|
||||
private configureMiddleware() {
|
||||
if (!this._app.value) {
|
||||
throw new Error('Unable to configure middleware, no app instance found');
|
||||
}
|
||||
|
||||
const app = this._app.value;
|
||||
const logger = this._logger;
|
||||
|
||||
app.use(morgan('common'));
|
||||
app.enable('trust proxy');
|
||||
app.set('env', 'development'); // setting env to test prevents logging to the console
|
||||
app.use(favicon(this._favicon));
|
||||
app.use(
|
||||
morgan(
|
||||
':remote-addr - :remote-user [:date[clf]] ":method :url HTTP/:http-version" :status :res[content-length] ":referrer" ":user-agent" :response-time',
|
||||
{
|
||||
stream: {
|
||||
write(line) {
|
||||
logger.info(line.trim());
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
app.use(
|
||||
bodyParser.text({
|
||||
type: 'application/sdp',
|
||||
limit: requestSizeLimit
|
||||
})
|
||||
);
|
||||
app.use(
|
||||
bodyParser.text({
|
||||
type: 'application/trickle-ice-sdpfrag',
|
||||
limit: requestSizeLimit
|
||||
})
|
||||
);
|
||||
app.use(
|
||||
bodyParser.urlencoded({
|
||||
extended: true,
|
||||
limit: requestSizeLimit
|
||||
})
|
||||
);
|
||||
app.use(
|
||||
multer({
|
||||
limits: {
|
||||
fields: 1,
|
||||
fieldNameSize: 100,
|
||||
fieldSize: requestSizeLimit,
|
||||
files: 0,
|
||||
parts: 1,
|
||||
headerPairs: 1
|
||||
}
|
||||
}).none()
|
||||
);
|
||||
app.use((req, _res, next) => {
|
||||
const contentType = req?.headers?.['content-type'] || '';
|
||||
|
||||
if (contentType.startsWith('multipart/form-data;')) {
|
||||
if (req?.body?.jsonBody) {
|
||||
req.body = JSON.parse(req.body.jsonBody);
|
||||
}
|
||||
}
|
||||
|
||||
next();
|
||||
});
|
||||
app.use(responseTime());
|
||||
app.use((req, res, next) => {
|
||||
res.set('x-origination', 'Platform');
|
||||
|
||||
if (req.secure) {
|
||||
res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains; preload');
|
||||
}
|
||||
|
||||
next();
|
||||
});
|
||||
|
||||
if (this._cors) {
|
||||
this._logger.info('Enable CORS on %s://*:%s', this._protocol, this._port);
|
||||
|
||||
const cachedCorsAllowOrigins: Record<string, string> = {};
|
||||
const getCorsAllowOrigin = (url: string) => {
|
||||
let corsAllowOrigin = cachedCorsAllowOrigins[url];
|
||||
|
||||
if (!Object.hasOwn(cachedCorsAllowOrigins, url)) {
|
||||
Object.entries(this._cors).forEach(([key, value]) => {
|
||||
if (url.startsWith(key)) {
|
||||
corsAllowOrigin = value;
|
||||
}
|
||||
});
|
||||
|
||||
cachedCorsAllowOrigins[url] = corsAllowOrigin ?? '';
|
||||
}
|
||||
|
||||
return corsAllowOrigin;
|
||||
};
|
||||
|
||||
app.use((req, res, next) => {
|
||||
const corsAllowOrigin = getCorsAllowOrigin(req.url);
|
||||
|
||||
if (corsAllowOrigin) {
|
||||
res.header('Access-Control-Allow-Origin', corsAllowOrigin);
|
||||
res.header(
|
||||
'Access-Control-Allow-Headers',
|
||||
'Authorization, Origin, Range, X-Requested-With, If-Modified-Since, Accept, Keep-Alive, Cache-Control, Content-Type, DNT'
|
||||
);
|
||||
res.header('Access-Control-Allow-Methods', 'POST, GET, HEAD, OPTIONS, PUT, PATCH, DELETE');
|
||||
res.header('Access-Control-Expose-Headers', 'Server, Range, Date, Content-Disposition, X-Timer, ETag, Link, Location');
|
||||
res.header('Access-Control-Max-Age', corsAccessControlMaxAge.asSeconds().toString());
|
||||
|
||||
if (req.method === 'OPTIONS') {
|
||||
res.header('Cache-Control', shortTermCaching);
|
||||
}
|
||||
}
|
||||
|
||||
next();
|
||||
});
|
||||
}
|
||||
app.use((_req, res, next) => {
|
||||
const startTimeSeconds = Date.now() / 1000;
|
||||
const startTimeNanoseconds = process.hrtime.bigint();
|
||||
|
||||
onHeaders(res, () => {
|
||||
const durationNanoseconds = process.hrtime.bigint() - startTimeNanoseconds;
|
||||
const durationMilliseconds = durationNanoseconds / 1000000n;
|
||||
|
||||
// https://developer.fastly.com/reference/http/http-headers/X-Timer/
|
||||
// S{unixStartTimeSeconds},VS0,VE{durationMilliseconds}
|
||||
res.setHeader('X-Timer', `S${startTimeSeconds},VS0,VE${durationMilliseconds}`);
|
||||
});
|
||||
|
||||
next();
|
||||
});
|
||||
}
|
||||
|
||||
private configureResources() {
|
||||
if (!this._app.value) {
|
||||
throw new Error('Unable to configure resources, no app instance found');
|
||||
}
|
||||
|
||||
const app = this._app.value;
|
||||
|
||||
for (const resourcePath of this._resourcesPaths) {
|
||||
app.use(express.static(resourcePath));
|
||||
}
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
private errorHandler(err: Error, _req: express.Request, res: express.Response, _next: express.NextFunction) {
|
||||
this._logger.error(err.message);
|
||||
|
||||
res.status(500).send({status: 'error'});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
private genericNotFoundHandler(_req: express.Request, res: express.Response) {
|
||||
console.log('Generic not found handler');
|
||||
|
||||
res.status(404).send({status: 'not-found'});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
private configureRoutes() {
|
||||
if (!this._app.value) {
|
||||
throw new Error('Unable to configure routes, no app instance found');
|
||||
}
|
||||
|
||||
const app = this._app.value;
|
||||
|
||||
let catchAllHandler: Nullable<RequestHandler> = null;
|
||||
|
||||
const registerRoutes = (method: string, routes: Record<string, RequestHandler>): void => {
|
||||
for (const route of Object.entries(routes)) {
|
||||
if (!route) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const [name, handler] = route;
|
||||
|
||||
if (name === '*') {
|
||||
if (catchAllHandler) {
|
||||
throw new Error(`Only one catch-all handler can ber registered per server, ignoring conflicting catch-all`);
|
||||
}
|
||||
|
||||
catchAllHandler = handler;
|
||||
continue;
|
||||
}
|
||||
|
||||
this._logger.debug(`Registering [${method}] route [${name}] handler`);
|
||||
app[method.toLowerCase() as 'get' | 'post' | 'put' | 'patch' | 'delete'](name, handler);
|
||||
}
|
||||
};
|
||||
|
||||
registerRoutes('GET', this._routes.getGETRoutes());
|
||||
registerRoutes('POST', this._routes.getPOSTRoutes());
|
||||
registerRoutes('PUT', this._routes.getPUTRoutes());
|
||||
registerRoutes('PATCH', this._routes.getPATCHRoutes());
|
||||
registerRoutes('DELETE', this._routes.getDELETERoutes());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
import type {RequestHandler} from 'express';
|
||||
|
||||
export default interface IRoutes {
|
||||
getGETRoutes(): Record<string, RequestHandler>;
|
||||
getPOSTRoutes(): Record<string, RequestHandler>;
|
||||
getPUTRoutes(): Record<string, RequestHandler>;
|
||||
getPATCHRoutes(): Record<string, RequestHandler>;
|
||||
getDELETERoutes(): Record<string, RequestHandler>;
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
import {LoggerFactory} from '@techniker-me/logger';
|
||||
import {Server as HttpServer} from 'node:http';
|
||||
import {IncomingMessage} from 'node:http';
|
||||
import ws, {WebSocketServer as WSServer, WebSocket} from 'ws';
|
||||
import Assert from '../../lang/Assert';
|
||||
import Strings from '../../lang/Strings';
|
||||
import WebsocketExtensions from 'websocket-extensions';
|
||||
|
||||
export interface ExtendedWebSocket extends WebSocket {
|
||||
id: string;
|
||||
remoteAddress: string;
|
||||
isOpen(): boolean;
|
||||
isClosed(): boolean;
|
||||
}
|
||||
|
||||
function deepClone<T>(obj: T): T {
|
||||
if (obj === null || typeof obj !== 'object') {
|
||||
return obj;
|
||||
}
|
||||
|
||||
if (obj instanceof Date) {
|
||||
return new Date(obj.getTime()) as T;
|
||||
}
|
||||
|
||||
if (obj instanceof Array) {
|
||||
return obj.map(item => deepClone(item)) as T;
|
||||
}
|
||||
|
||||
if (obj instanceof Object) {
|
||||
const copy = {} as T;
|
||||
Object.keys(obj).forEach(key => {
|
||||
copy[key as keyof T] = deepClone(obj[key as keyof T]);
|
||||
});
|
||||
return copy;
|
||||
}
|
||||
|
||||
return obj;
|
||||
}
|
||||
|
||||
function getRemoteAddress(connection: ExtendedWebSocket, req: IncomingMessage): string {
|
||||
// WebSocket doesn't expose socket directly, so we need to use the underlying socket
|
||||
const socket = (connection as WebSocket & {_socket?: {remoteAddress?: string}})._socket;
|
||||
const remoteAddress = socket?.remoteAddress || '::';
|
||||
const xForwardedFor = req.headers['x-forwarded-for'];
|
||||
|
||||
let forwardingRemoteAddressChain: string[] = [];
|
||||
if (typeof xForwardedFor === 'string') {
|
||||
forwardingRemoteAddressChain = xForwardedFor.split(', ').filter(Boolean);
|
||||
} else if (Array.isArray(xForwardedFor)) {
|
||||
forwardingRemoteAddressChain = xForwardedFor.flatMap(addr => addr.split(', ')).filter(Boolean);
|
||||
}
|
||||
|
||||
forwardingRemoteAddressChain.push(remoteAddress);
|
||||
|
||||
// For now, just return the first address (most direct)
|
||||
// TODO: Implement proper proxy trust checking
|
||||
return forwardingRemoteAddressChain[forwardingRemoteAddressChain.length - 1] || '::';
|
||||
}
|
||||
|
||||
export type WebSovketServerOptions = {
|
||||
path?: string;
|
||||
};
|
||||
|
||||
const connectionIdLength = 32;
|
||||
|
||||
export default class WebSocketServer {
|
||||
private readonly _logger = LoggerFactory.getLogger('WebSocketServer');
|
||||
private readonly _httpServer: HttpServer;
|
||||
private readonly _parameters: Record<string, string>;
|
||||
private _server?: WSServer;
|
||||
private _extensions?: WebsocketExtensions;
|
||||
|
||||
constructor(httpServer: HttpServer, parameters: WebSovketServerOptions) {
|
||||
this._httpServer = httpServer;
|
||||
this._parameters = parameters;
|
||||
}
|
||||
|
||||
public start(
|
||||
connectDelegate: (connection: ExtendedWebSocket, req: IncomingMessage) => void,
|
||||
requestDelegate: (connection: ExtendedWebSocket, message: Buffer) => void,
|
||||
disconnectDelegate: (connection: ExtendedWebSocket, reasonCode: number, description: string) => void,
|
||||
pongDelegate: (connection: ExtendedWebSocket, message: Buffer) => void
|
||||
): void {
|
||||
Assert.isFunction('connectDelegate', connectDelegate);
|
||||
Assert.isFunction('requestDelegate', requestDelegate);
|
||||
Assert.isFunction('disconnectDelegate', disconnectDelegate);
|
||||
Assert.isFunction('pongDelegate', pongDelegate);
|
||||
|
||||
const serverOptions = deepClone(this._parameters);
|
||||
|
||||
const address = this._httpServer.address();
|
||||
const port = typeof address === 'string' ? address : address?.port?.toString() || 'unknown';
|
||||
const path = serverOptions['path'] || '/';
|
||||
|
||||
this._logger.info(`Listening on port [${port}] and bound to [${path}]`);
|
||||
|
||||
(serverOptions as Record<string, unknown>)['noServer'] = true;
|
||||
|
||||
this._server = new WSServer(serverOptions);
|
||||
this._extensions = new WebsocketExtensions();
|
||||
|
||||
// this._extensions.add(deflate());
|
||||
(this._server as WSServer & {_server?: HttpServer})._server = this._httpServer;
|
||||
this._httpServer.on('error', this._server.emit.bind(this._server, 'error'));
|
||||
this._httpServer.on('listening', this._server.emit.bind(this._server, 'listening'));
|
||||
this._httpServer.on('upgrade', (req, socket, head) => {
|
||||
this._logger.debug(`[HttpServer] Upgrade to WebSocket: ${req.method} ${req.url}`);
|
||||
if (!req.url?.startsWith(path)) {
|
||||
this._logger.debug(`Skipping upgrade of http request due to incorrect path [${req.url}]`);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this._server!.handleUpgrade(req, socket, head, (ws: WebSocket) => {
|
||||
this._server!.emit('connection', ws, req as IncomingMessage);
|
||||
});
|
||||
});
|
||||
this._httpServer.on('request', (req, res) => {
|
||||
this._logger.debug(`[HttpServer] Request: ${req.method} ${req.url} -> ${res.statusCode}`);
|
||||
});
|
||||
|
||||
this._server.on('error', err => this._logger.error('An error occurred with WebSocket', err));
|
||||
this._server.on('connection', (connection: WebSocket, req: IncomingMessage) => {
|
||||
let closed = false;
|
||||
|
||||
try {
|
||||
const extendedConnection = connection as ExtendedWebSocket;
|
||||
extendedConnection.id = Strings.randomString(connectionIdLength);
|
||||
extendedConnection.remoteAddress = getRemoteAddress(extendedConnection, req);
|
||||
extendedConnection.isOpen = () => connection.readyState === ws.OPEN;
|
||||
extendedConnection.isClosed = () => connection.readyState === ws.CLOSED;
|
||||
|
||||
connection.on('error', (e: Error) => {
|
||||
this._logger.error('An error occurred on websocket', e);
|
||||
});
|
||||
|
||||
connection.on('message', (message: Buffer) => {
|
||||
try {
|
||||
requestDelegate(extendedConnection, message);
|
||||
} catch (e) {
|
||||
this._logger.error('Request handler failed for message [%s]', message, e);
|
||||
}
|
||||
});
|
||||
|
||||
connection.on('close', (reasonCode: number, description: string) => {
|
||||
if (closed) {
|
||||
this._logger.warn('[%s] Multiple close events [%s] [%s] [%s]', extendedConnection.id, extendedConnection.remoteAddress, reasonCode, description);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
closed = true;
|
||||
|
||||
try {
|
||||
disconnectDelegate(extendedConnection, reasonCode, description);
|
||||
} catch (e) {
|
||||
this._logger.error('Disconnect handler failed', e);
|
||||
}
|
||||
});
|
||||
|
||||
connection.on('pong', (message: Buffer) => {
|
||||
try {
|
||||
pongDelegate(extendedConnection, message);
|
||||
} catch (e) {
|
||||
this._logger.error('Pong handler failed', e);
|
||||
}
|
||||
});
|
||||
|
||||
connectDelegate(extendedConnection, req);
|
||||
} catch (e) {
|
||||
this._logger.error('Accept/connect handler failed', e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public stop() {}
|
||||
}
|
||||
Reference in New Issue
Block a user