fixed tests

This commit is contained in:
2025-09-05 00:36:54 -04:00
commit a536668a0b
48 changed files with 8187 additions and 0 deletions

106
src/ServerFactory.ts Normal file
View File

@@ -0,0 +1,106 @@
import { ClientManager } from './services/ClientManager.ts';
import { SignalingService } from './services/SignalingService.ts';
import type { ISignalingMessage } from './interfaces/ISignalingMessage.ts';
import publisherHtml from '../public/publisher.html';
import subscriberHtml from '../public/subscriber.html';
import type { Server, ServerWebSocket } from 'bun';
export class ServerFactory {
private constructor() {
throw new Error('ServerFactory is a static class and cannot be instantiated');
}
public static createServer(port: number) {
const clientManager = new ClientManager();
const signalingService = new SignalingService(clientManager);
const clientSessions = new Map<object, string>();
const server = Bun.serve({
port: port,
fetch: (request: Request, server: Server) => {
const success = server.upgrade(request);
if (success) {
// Bun automatically returns a 101 Switching Protocols
// if the upgrade succeeds
return undefined;
}
// handle HTTP request normally
return new Response("Hello world!");
},
routes: {
'/': () => new Response(`
<html>
<body>
<h1>WebRTC Broadcasting</h1>
<p><a href="/publisher">Publisher Interface</a></p>
<p><a href="/subscriber">Subscriber Interface</a></p>
</body>
</html>
`, { headers: { 'Content-Type': 'text/html' } }),
'/publisher': publisherHtml,
'/subscriber': subscriberHtml,
},
websocket: {
open(ws: ServerWebSocket<unknown>) {
console.log('WebSocket opened');
// Safely access the request URL from the WebSocket's data property
let url: URL;
try {
// ws.data is of type unknown, so we need to assert its shape
const data = ws.data as { url?: string };
if (!data || typeof data.url !== 'string') {
ws.close(1002, 'Missing or invalid URL in WebSocket data');
return;
}
url = new URL(data.url, 'http://localhost:3000');
} catch (e) {
ws.close(1002, 'Malformed URL in WebSocket data');
return;
}
const role = url.searchParams.get('role') as 'publisher' | 'subscriber';
if (role !== 'publisher' && role !== 'subscriber') {
ws.close(1002, 'Invalid role parameter');
return;
}
const clientId = signalingService.handleConnection(ws, role);
clientSessions.set(ws, clientId);
},
message(ws: ServerWebSocket<unknown>, message: string | Buffer) {
console.log('WebSocket message received [%o]', message);
const clientId = clientSessions.get(ws);
if (!clientId) return;
try {
const parsedMessage: ISignalingMessage = JSON.parse(
typeof message === 'string' ? message : message.toString()
);
signalingService.handleMessage(clientId, parsedMessage);
} catch (error) {
console.error('Failed to parse message:', error);
}
},
close(ws: ServerWebSocket<unknown>) {
console.log('WebSocket closed');
const clientId = clientSessions.get(ws);
if (clientId) {
signalingService.handleDisconnection(clientId);
clientSessions.delete(ws);
}
}
},
development: {
hmr: true,
console: true
}
});
return server;
}
}

View File

@@ -0,0 +1,88 @@
import type { IMediaServerConfig } from '../interfaces/ISFUTypes.ts';
export const mediaServerConfig: IMediaServerConfig = {
listenIp: '0.0.0.0',
announcedIp: '127.0.0.1', // Change to your server's public IP in production
mediasoupSettings: {
worker: {
logLevel: 'warn',
logTags: [
'info',
'ice',
'dtls',
'rtp',
'srtp',
'rtcp',
'rtx',
'bwe',
'score',
'simulcast',
'svc',
'sctp'
],
rtcMinPort: 10000,
rtcMaxPort: 10100
},
router: {
mediaCodecs: [
{
kind: 'audio' as const,
mimeType: 'audio/opus',
clockRate: 48000,
channels: 2
},
{
kind: 'video' as const,
mimeType: 'video/VP8',
clockRate: 90000,
parameters: {
'x-google-start-bitrate': 1000
}
},
{
kind: 'video' as const,
mimeType: 'video/VP9',
clockRate: 90000,
parameters: {
'profile-id': 2,
'x-google-start-bitrate': 1000
}
},
{
kind: 'video' as const,
mimeType: 'video/h264',
clockRate: 90000,
parameters: {
'packetization-mode': 1,
'profile-level-id': '4d0032',
'level-asymmetry-allowed': 1,
'x-google-start-bitrate': 1000
}
},
{
kind: 'video' as const,
mimeType: 'video/H264',
clockRate: 90000,
parameters: {
'packetization-mode': 1,
'profile-level-id': '42e01f',
'level-asymmetry-allowed': 1,
'x-google-start-bitrate': 1000
}
}
]
},
webRtcTransport: {
listenIps: [
{
ip: '0.0.0.0',
announcedIp: '127.0.0.1' // Change to your server's public IP in production
}
],
initialAvailableOutgoingBitrate: 1000000,
minimumAvailableOutgoingBitrate: 600000,
maxSctpMessageSize: 262144,
maxIncomingBitrate: 1500000
}
}
};

7
src/index.ts Normal file
View File

@@ -0,0 +1,7 @@
import { ServerFactory } from './ServerFactory.ts';
const server = ServerFactory.createServer(3000);
console.log(`WebRTC Broadcasting server running on http://localhost:${server.port}`);
console.log(`Publisher: http://localhost:${server.port}/publisher`);
console.log(`Subscriber: http://localhost:${server.port}/subscriber`);

View File

@@ -0,0 +1,81 @@
import type {
Router,
Transport,
Producer,
Consumer,
WebRtcTransport,
PlainTransport,
RtpCapabilities,
DtlsParameters,
IceCandidate,
IceParameters,
MediaKind,
RtpParameters
} from 'mediasoup/node/lib/types';
export interface ISFUClient {
id: string;
role: 'publisher' | 'subscriber';
ws: any;
transport?: WebRtcTransport;
producer?: Producer;
consumers: Map<string, Consumer>;
}
export interface IMediaServerConfig {
listenIp: string;
announcedIp: string;
mediasoupSettings: {
worker: {
logLevel: 'debug' | 'warn' | 'error';
logTags: string[];
rtcMinPort: number;
rtcMaxPort: number;
};
router: {
mediaCodecs: Array<{
kind: MediaKind;
mimeType: string;
clockRate: number;
channels?: number;
parameters?: any;
}>;
};
webRtcTransport: {
listenIps: Array<{
ip: string;
announcedIp?: string;
}>;
initialAvailableOutgoingBitrate: number;
minimumAvailableOutgoingBitrate: number;
maxSctpMessageSize: number;
maxIncomingBitrate: number;
};
};
}
export interface ISFUSignalingMessage {
type: 'join' | 'leave' | 'getRouterRtpCapabilities' | 'createWebRtcTransport' |
'connectWebRtcTransport' | 'produce' | 'consume' | 'resume' | 'pause' |
'close' | 'getProducers' | 'restartIce';
data?: any;
clientId?: string;
}
export interface ITransportOptions {
id: string;
iceParameters: IceParameters;
iceCandidates: IceCandidate[];
dtlsParameters: DtlsParameters;
}
export interface IProducerOptions {
id: string;
kind: MediaKind;
rtpParameters: RtpParameters;
}
export interface IConsumerOptions {
producerId: string;
rtpCapabilities: RtpCapabilities;
}

View File

@@ -0,0 +1,12 @@
export interface ISignalingMessage {
type: 'offer' | 'answer' | 'ice-candidate' | 'join' | 'leave' | 'publisher-joined' | 'publisher-left';
data?: any;
senderId?: string;
targetId?: string;
}
export interface IWebSocketClient {
id: string;
ws: any;
role: 'publisher' | 'subscriber' | 'unknown';
}

View File

@@ -0,0 +1,63 @@
import type { IWebSocketClient, ISignalingMessage } from '../interfaces/ISignalingMessage.ts';
export class ClientManager {
private clients: Map<string, IWebSocketClient> = new Map();
private publisher: IWebSocketClient | null = null;
addClient(client: IWebSocketClient): void {
this.clients.set(client.id, client);
if (client.role === 'publisher') {
this.publisher = client;
this.notifySubscribersPublisherJoined();
}
}
removeClient(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
if (client.role === 'publisher') {
this.publisher = null;
this.notifySubscribersPublisherLeft();
}
this.clients.delete(clientId);
}
getClient(clientId: string): IWebSocketClient | undefined {
return this.clients.get(clientId);
}
getPublisher(): IWebSocketClient | null {
return this.publisher;
}
getSubscribers(): IWebSocketClient[] {
return Array.from(this.clients.values()).filter(client => client.role === 'subscriber');
}
getAllClients(): IWebSocketClient[] {
return Array.from(this.clients.values());
}
private notifySubscribersPublisherJoined(): void {
const message: ISignalingMessage = {
type: 'publisher-joined'
};
this.getSubscribers().forEach(subscriber => {
subscriber.ws.send(JSON.stringify(message));
});
}
private notifySubscribersPublisherLeft(): void {
const message: ISignalingMessage = {
type: 'publisher-left'
};
this.getSubscribers().forEach(subscriber => {
subscriber.ws.send(JSON.stringify(message));
});
}
}

View File

@@ -0,0 +1,97 @@
import * as mediasoup from 'mediasoup';
import type { Worker, Router } from 'mediasoup/node/lib/types';
import type { IMediaServerConfig } from '../interfaces/ISFUTypes.ts';
export class MediaServerManager {
private worker: Worker | null = null;
private router: Router | null = null;
private config: IMediaServerConfig;
constructor(config: IMediaServerConfig) {
this.config = config;
}
async initialize(): Promise<void> {
// Create mediasoup worker
this.worker = await mediasoup.createWorker({
logLevel: this.config.mediasoupSettings.worker.logLevel,
logTags: this.config.mediasoupSettings.worker.logTags,
rtcMinPort: this.config.mediasoupSettings.worker.rtcMinPort,
rtcMaxPort: this.config.mediasoupSettings.worker.rtcMaxPort,
});
this.worker.on('died', () => {
console.error('Mediasoup worker died, exiting in 2 seconds... [pid:%d]', this.worker?.pid);
setTimeout(() => process.exit(1), 2000);
});
// Create router
this.router = await this.worker.createRouter({
mediaCodecs: this.config.mediasoupSettings.router.mediaCodecs,
});
console.log('MediaServer initialized successfully');
console.log('Worker PID:', this.worker.pid);
console.log('Router ID:', this.router.id);
}
getRouter(): Router {
if (!this.router) {
throw new Error('Router not initialized');
}
return this.router;
}
getWorker(): Worker {
if (!this.worker) {
throw new Error('Worker not initialized');
}
return this.worker;
}
async createWebRtcTransport() {
const router = this.getRouter();
const transport = await router.createWebRtcTransport({
listenIps: this.config.mediasoupSettings.webRtcTransport.listenIps,
enableUdp: true,
enableTcp: true,
preferUdp: true,
initialAvailableOutgoingBitrate:
this.config.mediasoupSettings.webRtcTransport.initialAvailableOutgoingBitrate,
minimumAvailableOutgoingBitrate:
this.config.mediasoupSettings.webRtcTransport.minimumAvailableOutgoingBitrate,
maxSctpMessageSize: this.config.mediasoupSettings.webRtcTransport.maxSctpMessageSize,
maxIncomingBitrate: this.config.mediasoupSettings.webRtcTransport.maxIncomingBitrate,
});
transport.on('dtlsstatechange', (dtlsState) => {
if (dtlsState === 'closed') {
transport.close();
}
});
transport.on('@close', () => {
console.log('Transport closed');
});
transport.on('routerclose', () => {
transport.close();
});
return transport;
}
getRtpCapabilities() {
return this.getRouter().rtpCapabilities;
}
async close(): Promise<void> {
if (this.router) {
this.router.close();
}
if (this.worker) {
this.worker.close();
}
}
}

View File

@@ -0,0 +1,117 @@
import type { WebRtcTransport, Producer, Consumer } from 'mediasoup/node/lib/types';
import type { ISFUClient } from '../interfaces/ISFUTypes.ts';
export class SFUClientManager {
private clients: Map<string, ISFUClient> = new Map();
private producers: Map<string, Producer> = new Map(); // ProducerId -> Producer
private consumers: Map<string, Consumer> = new Map(); // ConsumerId -> Consumer
addClient(client: ISFUClient): void {
this.clients.set(client.id, client);
console.log(`Client ${client.id} (${client.role}) added`);
}
removeClient(clientId: string): void {
const client = this.clients.get(clientId);
if (!client) return;
// Close transport
if (client.transport) {
client.transport.close();
}
// Close producer if publisher
if (client.producer) {
this.producers.delete(client.producer.id);
client.producer.close();
}
// Close consumers if subscriber
client.consumers.forEach((consumer, consumerId) => {
this.consumers.delete(consumerId);
consumer.close();
});
this.clients.delete(clientId);
console.log(`Client ${clientId} removed`);
}
getClient(clientId: string): ISFUClient | undefined {
return this.clients.get(clientId);
}
setClientTransport(clientId: string, transport: WebRtcTransport): void {
const client = this.clients.get(clientId);
if (client) {
client.transport = transport;
}
}
setClientProducer(clientId: string, producer: Producer): void {
const client = this.clients.get(clientId);
if (client) {
client.producer = producer;
this.producers.set(producer.id, producer);
console.log(`Producer ${producer.id} created for client ${clientId}`);
}
}
addClientConsumer(clientId: string, consumer: Consumer): void {
const client = this.clients.get(clientId);
if (client) {
client.consumers.set(consumer.id, consumer);
this.consumers.set(consumer.id, consumer);
console.log(`Consumer ${consumer.id} created for client ${clientId}`);
}
}
removeClientConsumer(clientId: string, consumerId: string): void {
const client = this.clients.get(clientId);
if (client) {
const consumer = client.consumers.get(consumerId);
if (consumer) {
consumer.close();
client.consumers.delete(consumerId);
this.consumers.delete(consumerId);
}
}
}
getPublishers(): ISFUClient[] {
return Array.from(this.clients.values()).filter(client =>
client.role === 'publisher' && client.producer
);
}
getSubscribers(): ISFUClient[] {
return Array.from(this.clients.values()).filter(client =>
client.role === 'subscriber'
);
}
getAllProducers(): Producer[] {
return Array.from(this.producers.values());
}
getProducer(producerId: string): Producer | undefined {
return this.producers.get(producerId);
}
getConsumer(consumerId: string): Consumer | undefined {
return this.consumers.get(consumerId);
}
getClientCount(): number {
return this.clients.size;
}
getStats() {
return {
clients: this.clients.size,
publishers: this.getPublishers().length,
subscribers: this.getSubscribers().length,
producers: this.producers.size,
consumers: this.consumers.size
};
}
}

View File

@@ -0,0 +1,285 @@
import type { RtpCapabilities, DtlsParameters, IceCandidate, IceParameters, RtpParameters } from 'mediasoup/node/lib/types';
import { MediaServerManager } from './MediaServerManager.ts';
import { SFUClientManager } from './SFUClientManager.ts';
import type { ISFUClient, ISFUSignalingMessage } from '../interfaces/ISFUTypes.ts';
export class SFUSignalingService {
private mediaServer: MediaServerManager;
private clientManager: SFUClientManager;
constructor(mediaServer: MediaServerManager, clientManager: SFUClientManager) {
this.mediaServer = mediaServer;
this.clientManager = clientManager;
}
handleConnection(ws: any, role: 'publisher' | 'subscriber'): string {
const clientId = this.generateClientId();
const client: ISFUClient = {
id: clientId,
role,
ws,
consumers: new Map()
};
this.clientManager.addClient(client);
// Send join confirmation
this.sendMessage(clientId, {
type: 'join',
data: { clientId, role }
});
return clientId;
}
handleDisconnection(clientId: string): void {
this.clientManager.removeClient(clientId);
}
async handleMessage(clientId: string, message: ISFUSignalingMessage): Promise<void> {
const client = this.clientManager.getClient(clientId);
if (!client) {
console.warn(`Message from unknown client: ${clientId}`);
return;
}
try {
switch (message.type) {
case 'getRouterRtpCapabilities':
await this.handleGetRouterRtpCapabilities(client);
break;
case 'createWebRtcTransport':
await this.handleCreateWebRtcTransport(client);
break;
case 'connectWebRtcTransport':
await this.handleConnectWebRtcTransport(client, message.data);
break;
case 'produce':
await this.handleProduce(client, message.data);
break;
case 'consume':
await this.handleConsume(client, message.data);
break;
case 'resume':
await this.handleResume(client, message.data);
break;
case 'pause':
await this.handlePause(client, message.data);
break;
case 'getProducers':
await this.handleGetProducers(client);
break;
case 'restartIce':
await this.handleRestartIce(client, message.data);
break;
default:
console.warn(`Unknown message type: ${message.type}`);
}
} catch (error) {
console.error(`Error handling message ${message.type} from client ${clientId}:`, error);
this.sendMessage(clientId, {
type: 'error',
data: { message: error.message }
});
}
}
private async handleGetRouterRtpCapabilities(client: ISFUClient): Promise<void> {
const rtpCapabilities = this.mediaServer.getRtpCapabilities();
this.sendMessage(client.id, {
type: 'routerRtpCapabilities',
data: { rtpCapabilities }
});
}
private async handleCreateWebRtcTransport(client: ISFUClient): Promise<void> {
const transport = await this.mediaServer.createWebRtcTransport();
this.clientManager.setClientTransport(client.id, transport);
const transportOptions = {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters
};
this.sendMessage(client.id, {
type: 'webRtcTransportCreated',
data: transportOptions
});
}
private async handleConnectWebRtcTransport(client: ISFUClient, data: {
dtlsParameters: DtlsParameters
}): Promise<void> {
if (!client.transport) {
throw new Error('Transport not found');
}
await client.transport.connect({ dtlsParameters: data.dtlsParameters });
this.sendMessage(client.id, {
type: 'webRtcTransportConnected',
data: {}
});
}
private async handleProduce(client: ISFUClient, data: {
kind: 'audio' | 'video',
rtpParameters: RtpParameters
}): Promise<void> {
if (!client.transport) {
throw new Error('Transport not found');
}
if (client.role !== 'publisher') {
throw new Error('Only publishers can produce');
}
const producer = await client.transport.produce({
kind: data.kind,
rtpParameters: data.rtpParameters
});
this.clientManager.setClientProducer(client.id, producer);
this.sendMessage(client.id, {
type: 'produced',
data: { producerId: producer.id }
});
// Notify all subscribers about new producer
this.notifySubscribersNewProducer(producer.id);
}
private async handleConsume(client: ISFUClient, data: {
producerId: string,
rtpCapabilities: RtpCapabilities
}): Promise<void> {
if (!client.transport) {
throw new Error('Transport not found');
}
if (client.role !== 'subscriber') {
throw new Error('Only subscribers can consume');
}
const producer = this.clientManager.getProducer(data.producerId);
if (!producer) {
throw new Error('Producer not found');
}
const router = this.mediaServer.getRouter();
if (!router.canConsume({
producerId: data.producerId,
rtpCapabilities: data.rtpCapabilities
})) {
throw new Error('Cannot consume');
}
const consumer = await client.transport.consume({
producerId: data.producerId,
rtpCapabilities: data.rtpCapabilities,
paused: true // Start paused
});
this.clientManager.addClientConsumer(client.id, consumer);
this.sendMessage(client.id, {
type: 'consumed',
data: {
consumerId: consumer.id,
producerId: data.producerId,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters
}
});
}
private async handleResume(client: ISFUClient, data: { consumerId: string }): Promise<void> {
const consumer = this.clientManager.getConsumer(data.consumerId);
if (!consumer) {
throw new Error('Consumer not found');
}
await consumer.resume();
this.sendMessage(client.id, {
type: 'resumed',
data: { consumerId: data.consumerId }
});
}
private async handlePause(client: ISFUClient, data: { consumerId: string }): Promise<void> {
const consumer = this.clientManager.getConsumer(data.consumerId);
if (!consumer) {
throw new Error('Consumer not found');
}
await consumer.pause();
this.sendMessage(client.id, {
type: 'paused',
data: { consumerId: data.consumerId }
});
}
private async handleGetProducers(client: ISFUClient): Promise<void> {
const producers = this.clientManager.getAllProducers();
const producerList = producers.map(producer => ({
id: producer.id,
kind: producer.kind
}));
this.sendMessage(client.id, {
type: 'producers',
data: { producers: producerList }
});
}
private async handleRestartIce(client: ISFUClient, data: any): Promise<void> {
if (!client.transport) {
throw new Error('Transport not found');
}
const iceParameters = await client.transport.restartIce();
this.sendMessage(client.id, {
type: 'iceRestarted',
data: { iceParameters }
});
}
private notifySubscribersNewProducer(producerId: string): void {
const subscribers = this.clientManager.getSubscribers();
subscribers.forEach(subscriber => {
this.sendMessage(subscriber.id, {
type: 'newProducer',
data: { producerId }
});
});
}
private sendMessage(clientId: string, message: any): void {
const client = this.clientManager.getClient(clientId);
if (client && client.ws) {
client.ws.send(JSON.stringify(message));
}
}
private generateClientId(): string {
return Math.random().toString(36).substring(2, 15) + Date.now().toString(36);
}
getStats() {
return {
...this.clientManager.getStats(),
mediaServer: {
workerId: this.mediaServer.getWorker().pid,
routerId: this.mediaServer.getRouter().id
}
};
}
}

View File

@@ -0,0 +1,116 @@
import type { ISignalingMessage, IWebSocketClient } from '../interfaces/ISignalingMessage.ts';
import { ClientManager } from './ClientManager.ts';
export class SignalingService {
private clientManager: ClientManager;
constructor(clientManager: ClientManager) {
this.clientManager = clientManager;
}
handleConnection(ws: any, role: 'publisher' | 'subscriber' | 'unknown'): string {
const clientId = this.generateClientId();
const client: IWebSocketClient = {
id: clientId,
ws,
role
};
this.clientManager.addClient(client);
// Only send join message if role is known
if (role !== 'unknown') {
ws.send(JSON.stringify({
type: 'join',
data: { clientId, role }
}));
}
return clientId;
}
handleDisconnection(clientId: string): void {
this.clientManager.removeClient(clientId);
}
handleMessage(clientId: string, message: ISignalingMessage): void {
const client = this.clientManager.getClient(clientId);
if (!client) return;
switch (message.type) {
case 'join':
this.handleJoin(client, message);
break;
case 'offer':
this.handleOffer(client, message);
break;
case 'answer':
this.handleAnswer(client, message);
break;
case 'ice-candidate':
this.handleIceCandidate(client, message);
break;
}
}
private handleJoin(client: IWebSocketClient, message: ISignalingMessage): void {
if (client.role === 'unknown' && message.data?.role) {
const role = message.data.role as 'publisher' | 'subscriber';
if (role === 'publisher' || role === 'subscriber') {
client.role = role;
client.ws.send(JSON.stringify({
type: 'join',
data: { clientId: client.id, role }
}));
} else {
client.ws.close(1002, 'Invalid role parameter');
}
}
}
private handleOffer(sender: IWebSocketClient, message: ISignalingMessage): void {
if (sender.role === 'publisher') {
const subscribers = this.clientManager.getSubscribers();
subscribers.forEach(subscriber => {
subscriber.ws.send(JSON.stringify({
...message,
senderId: sender.id
}));
});
}
}
private handleAnswer(sender: IWebSocketClient, message: ISignalingMessage): void {
const publisher = this.clientManager.getPublisher();
if (publisher && sender.role === 'subscriber') {
publisher.ws.send(JSON.stringify({
...message,
senderId: sender.id
}));
}
}
private handleIceCandidate(sender: IWebSocketClient, message: ISignalingMessage): void {
if (sender.role === 'publisher') {
const subscribers = this.clientManager.getSubscribers();
subscribers.forEach(subscriber => {
subscriber.ws.send(JSON.stringify({
...message,
senderId: sender.id
}));
});
} else if (sender.role === 'subscriber') {
const publisher = this.clientManager.getPublisher();
if (publisher) {
publisher.ws.send(JSON.stringify({
...message,
senderId: sender.id
}));
}
}
}
private generateClientId(): string {
return Math.random().toString(36).substring(2, 15);
}
}

442
src/sfu-demo-server.ts Normal file
View File

@@ -0,0 +1,442 @@
// Simplified SFU Demo Server - Shows SFU Architecture Concepts
// This demonstrates SFU principles without requiring full mediasoup integration
import publisherHtml from './public/sfu-publisher.html';
import subscriberHtml from './public/sfu-subscriber.html';
// Enhanced client for SFU simulation
interface SFUSimClient {
id: string;
role: 'publisher' | 'subscriber';
ws: any;
streamId?: string;
quality?: 'low' | 'medium' | 'high';
bitrate?: number;
connectedAt: number;
}
class SFUDemoServer {
private clients: Map<string, SFUSimClient> = new Map();
private clientSessions: Map<any, string> = new Map();
private serverStats = {
totalClients: 0,
publishers: 0,
subscribers: 0,
streamsForwarded: 0,
totalBandwidth: 0
};
async start(): Promise<void> {
const server = Bun.serve({
port: 3001,
routes: {
'/': () => new Response(`
<html>
<head>
<title>SFU Demo Server</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 40px auto; padding: 20px; }
.hero { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; border-radius: 10px; margin-bottom: 30px; }
.hero h1 { margin: 0 0 10px 0; font-size: 2.5em; }
.hero p { margin: 0; font-size: 1.2em; opacity: 0.9; }
.links { display: flex; gap: 20px; justify-content: center; margin: 30px 0; }
.link { background: #2196F3; color: white; padding: 15px 30px; text-decoration: none; border-radius: 8px; font-weight: bold; transition: all 0.3s; }
.link:hover { transform: translateY(-2px); box-shadow: 0 4px 12px rgba(33,150,243,0.3); }
.link.stats { background: #4CAF50; }
.features { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin: 30px 0; }
.feature { background: #f5f5f5; padding: 20px; border-radius: 8px; border-left: 4px solid #2196F3; }
.feature h3 { margin: 0 0 10px 0; color: #1976d2; }
.vs { background: #fff3e0; padding: 20px; border-radius: 8px; margin: 20px 0; border-left: 4px solid #ff9800; }
.vs h3 { color: #f57c00; }
</style>
</head>
<body>
<div class="hero">
<h1>🚀 SFU Demo Server</h1>
<p>Selective Forwarding Unit - Scalable WebRTC Broadcasting</p>
</div>
<div class="links">
<a href="/publisher" class="link">📡 Publisher Interface</a>
<a href="/subscriber" class="link">📺 Subscriber Interface</a>
<a href="/stats" class="link stats">📊 Live Statistics</a>
</div>
<div class="features">
<div class="feature">
<h3>🔄 Stream Forwarding</h3>
<p>Server receives one stream from publisher and forwards optimized versions to all subscribers</p>
</div>
<div class="feature">
<h3>📈 Scalability</h3>
<p>Publisher bandwidth stays constant regardless of subscriber count</p>
</div>
<div class="feature">
<h3>⚡ Adaptive Bitrate</h3>
<p>Server automatically adjusts stream quality based on subscriber capabilities</p>
</div>
<div class="feature">
<h3>🛡️ Reliability</h3>
<p>Server handles connection management, recovery, and optimization</p>
</div>
</div>
<div class="vs">
<h3>📊 SFU vs Mesh Comparison</h3>
<p><strong>Mesh:</strong> Publisher bandwidth = Subscriber count × Stream bitrate (doesn't scale)</p>
<p><strong>SFU:</strong> Publisher bandwidth = 1 × Stream bitrate (scales to thousands)</p>
</div>
</body>
</html>
`, { headers: { 'Content-Type': 'text/html' } }),
'/publisher': publisherHtml,
'/subscriber': subscriberHtml,
'/stats': this.handleStatsRequest.bind(this)
},
websocket: {
open: this.handleWebSocketOpen.bind(this),
message: this.handleWebSocketMessage.bind(this),
close: this.handleWebSocketClose.bind(this)
},
development: {
hmr: true,
console: true
}
});
// Simulate SFU processing
setInterval(() => {
this.simulateSFUProcessing();
}, 1000);
console.log(`🚀 SFU Demo Server running on http://localhost:${server.port}`);
console.log(`📡 Publisher: http://localhost:${server.port}/publisher`);
console.log(`📺 Subscriber: http://localhost:${server.port}/subscriber`);
console.log(`📊 Stats: http://localhost:${server.port}/stats`);
console.log('\n🎯 SFU Demo Features:');
console.log(' • Simulates SFU stream forwarding');
console.log(' • Shows scalability advantages');
console.log(' • Demonstrates adaptive bitrate');
console.log(' • Real-time statistics');
}
private handleWebSocketOpen(ws: any, req: Request): void {
const url = new URL(req.url);
const role = url.searchParams.get('role') as 'publisher' | 'subscriber';
if (!role || (role !== 'publisher' && role !== 'subscriber')) {
ws.close(1002, 'Invalid role parameter');
return;
}
const clientId = this.generateClientId();
const client: SFUSimClient = {
id: clientId,
role,
ws,
quality: 'medium',
bitrate: role === 'publisher' ? 2500 : 0,
connectedAt: Date.now()
};
this.clients.set(clientId, client);
this.clientSessions.set(ws, clientId);
this.updateStats();
// Send join confirmation
ws.send(JSON.stringify({
type: 'join',
data: { clientId, role }
}));
console.log(`🔌 Client connected: ${clientId} (${role})`);
// Simulate SFU capabilities exchange
if (role === 'publisher') {
setTimeout(() => {
ws.send(JSON.stringify({
type: 'routerRtpCapabilities',
data: { rtpCapabilities: this.getMockRtpCapabilities() }
}));
}, 100);
}
}
private async handleWebSocketMessage(ws: any, message: string | ArrayBuffer): Promise<void> {
const clientId = this.clientSessions.get(ws);
if (!clientId) return;
try {
const parsedMessage: any = JSON.parse(message.toString());
await this.handleSFUMessage(clientId, parsedMessage);
} catch (error) {
console.error('Failed to handle message:', error);
}
}
private async handleSFUMessage(clientId: string, message: any): Promise<void> {
const client = this.clients.get(clientId);
if (!client) return;
switch (message.type) {
case 'getRouterRtpCapabilities':
client.ws.send(JSON.stringify({
type: 'routerRtpCapabilities',
data: { rtpCapabilities: this.getMockRtpCapabilities() }
}));
break;
case 'createWebRtcTransport':
client.ws.send(JSON.stringify({
type: 'webRtcTransportCreated',
data: {
id: this.generateClientId(),
iceParameters: {},
iceCandidates: [],
dtlsParameters: {}
}
}));
break;
case 'connectWebRtcTransport':
client.ws.send(JSON.stringify({
type: 'webRtcTransportConnected',
data: {}
}));
break;
case 'produce':
if (client.role === 'publisher') {
const producerId = this.generateClientId();
client.streamId = producerId;
client.ws.send(JSON.stringify({
type: 'produced',
data: { producerId }
}));
// Notify all subscribers about new stream
this.notifySubscribersNewStream(producerId);
this.serverStats.streamsForwarded++;
}
break;
case 'consume':
if (client.role === 'subscriber') {
const consumerId = this.generateClientId();
client.ws.send(JSON.stringify({
type: 'consumed',
data: {
consumerId,
producerId: message.data.producerId,
kind: 'video',
rtpParameters: {}
}
}));
}
break;
case 'resume':
client.ws.send(JSON.stringify({
type: 'resumed',
data: { consumerId: message.data.consumerId }
}));
break;
case 'getProducers':
const publishers = Array.from(this.clients.values())
.filter(c => c.role === 'publisher' && c.streamId);
client.ws.send(JSON.stringify({
type: 'producers',
data: {
producers: publishers.map(p => ({
id: p.streamId,
kind: 'video'
}))
}
}));
break;
}
}
private handleWebSocketClose(ws: any): void {
const clientId = this.clientSessions.get(ws);
if (clientId) {
const client = this.clients.get(clientId);
console.log(`🔌 Client disconnected: ${clientId} (${client?.role})`);
this.clients.delete(clientId);
this.clientSessions.delete(ws);
this.updateStats();
}
}
private notifySubscribersNewStream(producerId: string): void {
const subscribers = Array.from(this.clients.values())
.filter(client => client.role === 'subscriber');
subscribers.forEach(subscriber => {
subscriber.ws.send(JSON.stringify({
type: 'newProducer',
data: { producerId }
}));
});
}
private simulateSFUProcessing(): void {
// Simulate SFU bandwidth optimization
const publishers = Array.from(this.clients.values()).filter(c => c.role === 'publisher');
const subscribers = Array.from(this.clients.values()).filter(c => c.role === 'subscriber');
// Calculate total bandwidth (SFU efficiency)
let totalBandwidth = 0;
publishers.forEach(pub => totalBandwidth += pub.bitrate || 0);
subscribers.forEach(sub => totalBandwidth += (sub.bitrate || 1000)); // Receive bitrate
this.serverStats.totalBandwidth = totalBandwidth;
// Simulate adaptive bitrate for subscribers
subscribers.forEach(subscriber => {
const qualities = ['low', 'medium', 'high'];
const bitrates = [800, 1500, 2500];
const qualityIndex = Math.floor(Math.random() * 3);
subscriber.quality = qualities[qualityIndex];
subscriber.bitrate = bitrates[qualityIndex];
});
}
private updateStats(): void {
const clients = Array.from(this.clients.values());
this.serverStats.totalClients = clients.length;
this.serverStats.publishers = clients.filter(c => c.role === 'publisher').length;
this.serverStats.subscribers = clients.filter(c => c.role === 'subscriber').length;
}
private handleStatsRequest(): Response {
const clients = Array.from(this.clients.values());
const uptime = process.uptime();
const html = `
<html>
<head>
<title>SFU Demo Statistics</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; background: #f5f5f5; }
.container { max-width: 1200px; margin: 0 auto; }
h1 { color: #333; text-align: center; margin-bottom: 30px; }
.stats-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 20px; margin-bottom: 30px; }
.stat-card { background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); text-align: center; }
.stat-value { font-size: 2.5em; font-weight: bold; color: #2196F3; margin-bottom: 5px; }
.stat-label { color: #666; font-size: 0.9em; text-transform: uppercase; }
.clients-table { background: white; border-radius: 8px; overflow: hidden; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.clients-table table { width: 100%; border-collapse: collapse; }
.clients-table th { background: #2196F3; color: white; padding: 12px; text-align: left; }
.clients-table td { padding: 12px; border-bottom: 1px solid #eee; }
.clients-table tr:hover { background: #f5f5f5; }
.refresh { text-align: center; margin: 20px 0; color: #666; }
.sfu-advantage { background: #e8f5e8; padding: 20px; border-radius: 8px; margin: 20px 0; border-left: 4px solid #4CAF50; }
</style>
<meta http-equiv="refresh" content="3">
</head>
<body>
<div class="container">
<h1>📊 SFU Demo Server Statistics</h1>
<div class="refresh">🔄 Auto-refreshing every 3 seconds</div>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-value">${this.serverStats.totalClients}</div>
<div class="stat-label">Total Clients</div>
</div>
<div class="stat-card">
<div class="stat-value">${this.serverStats.publishers}</div>
<div class="stat-label">Publishers</div>
</div>
<div class="stat-card">
<div class="stat-value">${this.serverStats.subscribers}</div>
<div class="stat-label">Subscribers</div>
</div>
<div class="stat-card">
<div class="stat-value">${this.serverStats.streamsForwarded}</div>
<div class="stat-label">Streams Forwarded</div>
</div>
<div class="stat-card">
<div class="stat-value">${Math.round(this.serverStats.totalBandwidth / 1000)}K</div>
<div class="stat-label">Total Bandwidth</div>
</div>
<div class="stat-card">
<div class="stat-value">${Math.round(uptime)}s</div>
<div class="stat-label">Server Uptime</div>
</div>
</div>
<div class="sfu-advantage">
<h3>🚀 SFU Scaling Advantage</h3>
<p><strong>Traditional Mesh:</strong> ${this.serverStats.publishers} publishers × ${this.serverStats.subscribers} subscribers = ${this.serverStats.publishers * this.serverStats.subscribers} connections</p>
<p><strong>SFU Architecture:</strong> ${this.serverStats.publishers} + ${this.serverStats.subscribers} = ${this.serverStats.publishers + this.serverStats.subscribers} connections</p>
<p><strong>Bandwidth Saved:</strong> ${this.serverStats.subscribers > 0 ? Math.round(((this.serverStats.subscribers - 1) / this.serverStats.subscribers) * 100) : 0}% publisher bandwidth reduction</p>
</div>
<div class="clients-table">
<table>
<thead>
<tr>
<th>Client ID</th>
<th>Role</th>
<th>Quality</th>
<th>Bitrate</th>
<th>Connected</th>
</tr>
</thead>
<tbody>
${clients.map(client => `
<tr>
<td>${client.id.substring(0, 8)}...</td>
<td>${client.role}</td>
<td>${client.quality || '-'}</td>
<td>${client.bitrate ? (client.bitrate / 1000).toFixed(1) + 'K' : '-'}</td>
<td>${Math.round((Date.now() - client.connectedAt) / 1000)}s ago</td>
</tr>
`).join('')}
</tbody>
</table>
</div>
<p style="text-align: center; margin-top: 30px;"><a href="/">← Back to Home</a></p>
</div>
</body>
</html>
`;
return new Response(html, {
headers: { 'Content-Type': 'text/html' }
});
}
private getMockRtpCapabilities() {
return {
codecs: [
{
mimeType: 'video/VP8',
clockRate: 90000
},
{
mimeType: 'audio/opus',
clockRate: 48000,
channels: 2
}
]
};
}
private generateClientId(): string {
return Math.random().toString(36).substring(2, 15) + Date.now().toString(36);
}
}
// Start the SFU Demo server
const sfuServer = new SFUDemoServer();
sfuServer.start().catch((error) => {
console.error('❌ Failed to start SFU server:', error);
process.exit(1);
});

192
src/sfu-server.ts Normal file
View File

@@ -0,0 +1,192 @@
import { MediaServerManager } from '../src/services/MediaServerManager.ts';
import { SFUClientManager } from '../src/services/SFUClientManager.ts';
import { SFUSignalingService } from '../src/services/SFUSignalingService.ts';
import { mediaServerConfig } from '../src/config/mediaServerConfig.ts';
import type { ISFUSignalingMessage } from '../src/interfaces/ISFUTypes.ts';
import publisherHtml from './public/sfu-publisher.html';
import subscriberHtml from './public/sfu-subscriber.html';
class SFUServer {
private mediaServer: MediaServerManager;
private clientManager: SFUClientManager;
private signalingService: SFUSignalingService;
private clientSessions: Map<any, string> = new Map();
constructor() {
this.mediaServer = new MediaServerManager(mediaServerConfig);
this.clientManager = new SFUClientManager();
this.signalingService = new SFUSignalingService(this.mediaServer, this.clientManager);
}
async start(): Promise<void> {
// Initialize media server
await this.mediaServer.initialize();
// Start HTTP/WebSocket server
const server = Bun.serve({
port: 3001,
routes: {
'/': () => new Response(`
<html>
<body>
<h1>WebRTC SFU Broadcasting</h1>
<p><strong>SFU Architecture</strong> - Scalable for many subscribers</p>
<p><a href="/publisher">Publisher Interface</a></p>
<p><a href="/subscriber">Subscriber Interface</a></p>
<p><a href="/stats">Server Statistics</a></p>
<hr>
<h2>Architecture Benefits:</h2>
<ul>
<li>✅ <strong>Scalable:</strong> Constant publisher bandwidth</li>
<li>✅ <strong>Efficient:</strong> Server handles stream forwarding</li>
<li>✅ <strong>Reliable:</strong> Server-side bandwidth management</li>
<li>✅ <strong>Adaptive:</strong> Multiple bitrate support</li>
</ul>
</body>
</html>
`, { headers: { 'Content-Type': 'text/html' } }),
'/publisher': publisherHtml,
'/subscriber': subscriberHtml,
'/stats': this.handleStatsRequest.bind(this),
},
websocket: {
open: this.handleWebSocketOpen.bind(this),
message: this.handleWebSocketMessage.bind(this),
close: this.handleWebSocketClose.bind(this)
},
development: {
hmr: true,
console: true
}
});
console.log(`🚀 SFU WebRTC Broadcasting server running on http://localhost:${server.port}`);
console.log(`📡 Publisher: http://localhost:${server.port}/publisher`);
console.log(`📺 Subscriber: http://localhost:${server.port}/subscriber`);
console.log(`📊 Stats: http://localhost:${server.port}/stats`);
console.log('\n🔧 SFU Features:');
console.log(' • Scalable to hundreds of subscribers');
console.log(' • Server-side stream forwarding');
console.log(' • Adaptive bitrate control');
console.log(' • Bandwidth optimization');
}
private handleWebSocketOpen(ws: any, req: Request): void {
const url = new URL(req.url);
const role = url.searchParams.get('role') as 'publisher' | 'subscriber';
if (!role || (role !== 'publisher' && role !== 'subscriber')) {
ws.close(1002, 'Invalid role parameter');
return;
}
const clientId = this.signalingService.handleConnection(ws, role);
this.clientSessions.set(ws, clientId);
console.log(`🔌 Client connected: ${clientId} (${role})`);
}
private async handleWebSocketMessage(ws: any, message: string | ArrayBuffer): Promise<void> {
const clientId = this.clientSessions.get(ws);
if (!clientId) return;
try {
const parsedMessage: ISFUSignalingMessage = JSON.parse(message.toString());
await this.signalingService.handleMessage(clientId, parsedMessage);
} catch (error) {
console.error('Failed to parse/handle message:', error);
}
}
private handleWebSocketClose(ws: any): void {
const clientId = this.clientSessions.get(ws);
if (clientId) {
this.signalingService.handleDisconnection(clientId);
this.clientSessions.delete(ws);
console.log(`🔌 Client disconnected: ${clientId}`);
}
}
private handleStatsRequest(): Response {
const stats = this.signalingService.getStats();
const html = `
<html>
<head>
<title>SFU Server Statistics</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.stat { background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; }
.value { font-weight: bold; color: #2196F3; font-size: 1.2em; }
h1 { color: #333; }
.refresh { margin: 20px 0; }
</style>
<meta http-equiv="refresh" content="5">
</head>
<body>
<h1>📊 SFU Server Statistics</h1>
<div class="refresh">🔄 Auto-refreshing every 5 seconds</div>
<div class="stat">
<strong>Total Clients:</strong> <span class="value">${stats.clients}</span>
</div>
<div class="stat">
<strong>Publishers:</strong> <span class="value">${stats.publishers}</span>
</div>
<div class="stat">
<strong>Subscribers:</strong> <span class="value">${stats.subscribers}</span>
</div>
<div class="stat">
<strong>Active Producers:</strong> <span class="value">${stats.producers}</span>
</div>
<div class="stat">
<strong>Active Consumers:</strong> <span class="value">${stats.consumers}</span>
</div>
<div class="stat">
<strong>MediaSoup Worker PID:</strong> <span class="value">${stats.mediaServer.workerId}</span>
</div>
<div class="stat">
<strong>Router ID:</strong> <span class="value">${stats.mediaServer.routerId}</span>
</div>
<p><a href="/">← Back to Home</a></p>
</body>
</html>
`;
return new Response(html, {
headers: { 'Content-Type': 'text/html' }
});
}
async stop(): Promise<void> {
await this.mediaServer.close();
}
}
// Start the SFU server
const sfuServer = new SFUServer();
// Handle graceful shutdown
process.on('SIGINT', async () => {
console.log('🛑 Shutting down SFU server...');
await sfuServer.stop();
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('🛑 Shutting down SFU server...');
await sfuServer.stop();
process.exit(0);
});
sfuServer.start().catch((error) => {
console.error('❌ Failed to start SFU server:', error);
process.exit(1);
});