Walk through forking process
Exported on 12/8/2025 at 05:00:24 EST from Cursor (2.1.50)
User
Walk me through the entire forking process
Cursor
I'll search for the forking process in the codebase to understand how it works.
Let me read the complete forking implementation to understand the full process:
Let me also find the CreateMirror stream handler and the HTTP routes:
Let me read the full streamCreateMirror function and also check the HTTP routes:
Let me also look at the createScaleOutStream function to understand the stream mirroring mechanism:
Now I have a complete picture of the forking process. Let me walk you through the entire flow:
Forking Process Overview
The forking feature allows you to take all the members (and their streams) from a source room/channel and move them to a destination room/channel. This is useful for scenarios like redirecting an audience from one channel to another.
1. API Entry Point
Forking is exposed via REST API routes:
- Channels: PUT /channel/:channelId/fork/:channelIdToFork
- Rooms: PUT /room/:roomId/fork/:roomIdToFork
function forkRoom(req, res, credentials) {
const roomId = req.params.roomId;
const roomIdToFork = req.params.roomIdToFork;
const streamCapabilities = _.get(req.body, ['streamCapabilities'], []);
const streamTags = _.get(req.body, ['streamTags'], []);
const options = _.get(req.body, ['options'], []);
const desiredRegion = _.get(req.body, ['desiredRegion'], undefined);
// ... validation ...
const forkRoom = {
credentials,
roomId,
sourceRoomId: roomIdToFork,
streamCapabilities,
streamTags,
options,
desiredRegion
};
The route handler validates the request and sends an MQ request to roomapi.Fork.
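To make the request shape concrete, here is a minimal sketch of how a caller might assemble a fork request. The body field names (options, streamCapabilities, streamTags, desiredRegion) come from the handler above; the helper name buildForkRequest, the URL-encoding of IDs, and the sample capability value are my assumptions, and authentication and the host are left out entirely.

```javascript
// Hypothetical helper: builds (but does not send) a fork request.
// Only the route shape and body field names are taken from the handler above.
function buildForkRequest({kind, id, sourceId, options = [], streamCapabilities = [], streamTags = [], desiredRegion}) {
  if (kind !== 'channel' && kind !== 'room') {
    throw new Error(`Unsupported kind: ${kind}`);
  }

  const body = {options, streamCapabilities, streamTags};

  // desiredRegion defaults to undefined in the handler, so omit it when not given.
  if (desiredRegion !== undefined) {
    body.desiredRegion = desiredRegion;
  }

  return {
    method: 'PUT',
    // Assumption: IDs are URL-encoded when placed in the path.
    path: `/${kind}/${encodeURIComponent(id)}/fork/${encodeURIComponent(sourceId)}`,
    body
  };
}

const forkRequest = buildForkRequest({
  kind: 'channel',
  id: 'dest-1',
  sourceId: 'src-1',
  options: ['force'],
  streamCapabilities: ['streaming'] // illustrative value only
});

console.log(forkRequest.method, forkRequest.path, JSON.stringify(forkRequest.body));
```

The first path parameter is the destination and the second is the source being forked, which is easy to get backwards.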
2. Core Fork Logic (roomapiFork)
The main orchestration happens in RoomApiMQWorker.js:
function roomapiFork({roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion}, {sentTime}, {applicationId}) {
assert.assertStringNotEmpty('roomId', roomId);
assert.assertStringNotEmpty('sourceRoomId', sourceRoomId);
assert.assertArrayOfString('options', options);
assert.assertArrayOfString('streamCapabilities', streamCapabilities);
assert.assertArrayOfString('streamTags', streamTags);
// ...
log.info('[%s] [%s] Forking room from room [%s] with options [%s]', applicationId, roomId, sourceRoomId, options);
Step 2a: Fetch Members from Both Rooms
return Promise.all([
roomapiGetMembers.call(this, {roomId}, {sentTime}),
roomapiGetMembers.call(this, {roomId: sourceRoomId}, {sentTime})
])
Step 2b: Determine Departing and New Members
Based on the options provided:
let departingMembers = room.members;
let newMembers = sourceRoom.members;
if (_.includes(options, 'additive')) {
departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
}
if (!_.includes(options, 'force')) {
departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId));
}
Options:
- additive: Only remove members that exist in both rooms
- force: Don't filter out overlapping members (replace all)
- keep-streams: Don't destroy streams when members leave
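The way these options interact is easier to see with the filtering pulled out into a pure function. This is a sketch that mirrors the snippet above using plain arrays instead of lodash; selectMembers and the sample session IDs are hypothetical names for illustration.

```javascript
// Mirrors the departing/new member filtering shown above, without lodash.
function selectMembers(destMembers, sourceMembers, options = []) {
  const inSource = member => sourceMembers.some(other => other.sessionId === member.sessionId);
  const inDest = member => destMembers.some(other => other.sessionId === member.sessionId);

  let departing = destMembers;
  let incoming = sourceMembers;

  if (options.includes('additive')) {
    // Keep only destination members that also exist in the source room.
    departing = departing.filter(inSource);
  }

  if (!options.includes('force')) {
    // Overlapping members neither leave nor re-join.
    departing = departing.filter(member => !inSource(member));
    incoming = incoming.filter(member => !inDest(member));
  }

  return {
    departing: departing.map(member => member.sessionId),
    incoming: incoming.map(member => member.sessionId)
  };
}

const dest = [{sessionId: 'a'}, {sessionId: 'b'}];
const source = [{sessionId: 'b'}, {sessionId: 'c'}];

console.log(selectMembers(dest, source));                        // departing: a        incoming: c
console.log(selectMembers(dest, source, ['force']));             // departing: a, b     incoming: b, c
console.log(selectMembers(dest, source, ['additive']));          // departing: (none)   incoming: c
console.log(selectMembers(dest, source, ['additive', 'force'])); // departing: b        incoming: b, c
```

One consequence worth noting: as the snippet is written, additive without force leaves nobody departing, because the two filters cancel each other out.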
3. Remove Departing Members
For each departing member:
- Send a chat.LeaveRoom request
- Unless the keep-streams option is set, destroy their streams with reason terminated-by-fork
return Promise.map(departingMembers, member => {
const leaveRoom = {
roomId,
sessionId: member.sessionId,
timestamp: _.now()
};
return this._phenixMQ.request('chat.LeaveRoom', leaveRoom)
.then(leaveRoomResponse => {
// ...
if (_.includes(options, 'keep-streams')) {
return leaveRoomResponse;
}
// ...
return Promise.each(member.streams, stream => {
// ...
const reason = 'terminated-by-fork';
const destroyStream = { streamId, reason };
return this._phenixMQ.request('stream.DestroyStream', destroyStream)
})
});
})
4. Add New Members and Mirror Streams
For each new member from the source room, the system:
Step 4a: Mirror Each Stream
For each stream the member has, create a mirror stream via stream.CreateMirror:
return Promise.map(joinRoom.member.streams, stream => {
const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23'));
// Only provide capabilities list. In particular, drop wildcard token from origin.
parsedUri.query = {capabilities: streamCapabilities.join(',')};
parsedUri.search = undefined;
const originStreamId = parsedUri.pathname.replace(/%23/g, '#')
.slice(1);
const streamMirror = {
originStreamId,
capabilities: _.compact(streamCapabilities),
tags: _.union(_.compact(streamTags), ['x-phenix-fork']),
options: [],
region: desiredRegion,
canForward: false
};
// Add room/channel tags
if (room.room.type === 'Channel' || room.room.type === 'MultiAngleChannel') {
streamMirror.tags.push(`channelId:${roomId}`);
streamMirror.tags.push(`channelAlias:${room.room.alias}`);
} else {
streamMirror.tags.push(`roomId:${roomId}`);
streamMirror.tags.push(`roomAlias:${room.room.alias}`);
}
return this._phenixMQ.request('stream.CreateMirror', streamMirror)
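The tag handling in that snippet can be sketched in isolation. This version replaces _.union and _.compact with a Set and a filter; buildMirrorTags and the sample values are hypothetical, while the tag formats themselves come from the code above.

```javascript
// Builds the tag list for a mirrored stream, following the snippet above.
function buildMirrorTags(room, roomId, streamTags = []) {
  // Equivalent of _.union(_.compact(streamTags), ['x-phenix-fork']):
  // drop falsy entries, de-duplicate, and always include the fork marker.
  const tags = [...new Set([...streamTags.filter(Boolean), 'x-phenix-fork'])];

  const isChannel = room.type === 'Channel' || room.type === 'MultiAngleChannel';
  const prefix = isChannel ? 'channel' : 'room';

  tags.push(`${prefix}Id:${roomId}`);
  tags.push(`${prefix}Alias:${room.alias}`);

  return tags;
}

console.log(buildMirrorTags({type: 'Channel', alias: 'main'}, 'ch-1', ['vip', '', 'vip']));
console.log(buildMirrorTags({type: 'MultiPartyChat', alias: 'lobby'}, 'room-9'));
```

The room type 'MultiPartyChat' in the second call is only a placeholder for "anything that is not a channel type".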
Step 4b: Update Stream URI
Once the mirrored stream is created, the stream URI is updated to point to the new mirrored stream:
const streamId = createMirrorResponse.payload.mirroredStreamId;
// ...
parsedUri.pathname = '/' + streamId.replace(/[#]/g, '%23');
stream.uri = url.format(parsedUri)
.replace(/%23/g, '#');
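Here is a self-contained sketch of the URI rewrite from Steps 4a and 4b. To be clear about what is swapped in: it uses the WHATWG URL class rather than the url.parse/url.format calls in the code above, and the pcast://example.invalid/... URI shape, the stream IDs, and the capability names are assumptions for illustration only.

```javascript
// Stream IDs can contain '#', which a URL parser would treat as a fragment
// delimiter, so it is escaped before parsing and restored afterwards.
function escapeHash(value) {
  return value.replace(/#/g, '%23');
}

function unescapeHash(value) {
  return value.replace(/%23/g, '#');
}

// The origin stream ID is the path without its leading slash.
function extractOriginStreamId(streamUri) {
  const parsed = new URL(escapeHash(streamUri));
  return unescapeHash(parsed.pathname).slice(1);
}

// Points the URI at the mirrored stream and keeps only the capabilities
// parameter, which drops any token carried by the origin URI.
function rewriteStreamUri(streamUri, mirroredStreamId, capabilities) {
  const parsed = new URL(escapeHash(streamUri));

  parsed.search = '';
  parsed.searchParams.set('capabilities', capabilities.join(','));
  parsed.pathname = '/' + escapeHash(mirroredStreamId);

  return unescapeHash(parsed.toString());
}

const originUri = 'pcast://example.invalid/us-west#origin-123?token=abc';

console.log(extractOriginStreamId(originUri));
console.log(rewriteStreamUri(originUri, 'us-west#mirror-456', ['hd', 'streaming']));
```

The point of the round trip is that the member record ends up with a URI of the same shape as before, only with the mirrored stream ID in the path and the original query string discarded.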
Step 4c: Join Member to Destination Room
After all streams are mirrored, the member joins the destination room:
return this._phenixMQ.request('chat.JoinRoom', joinRoom);
5. Stream Mirroring (streamCreateMirror)
The stream.CreateMirror handler in StreamManagementMQWorker.js creates a federated copy of the origin stream:
function streamCreateMirror({originStreamId, capabilities, tags, options, region, canForward}) {
assert.assertStringNotEmpty('originStreamId', originStreamId);
assert.assertArrayOfString('tags', tags);
if (canForward && region && region !== this._regionName) {
// Forward to the desired region
return this._phenixMQ.request('stream.CreateMirror', {
originStreamId,
capabilities,
tags,
options,
region,
canForward: false
}, {region})
.then(({status, payload}) => {
if (status !== 'ok') {
return {status};
}
return payload;
});
}
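The forwarding branch above boils down to a small decision that can be sketched on its own. planCreateMirror and the region names are hypothetical; the condition and the canForward reset are taken from the handler.

```javascript
// Decides whether a CreateMirror request is handled here or forwarded once.
function planCreateMirror(request, localRegion) {
  const {canForward, region} = request;

  if (canForward && region && region !== localRegion) {
    // The forwarded copy clears canForward so the receiving region
    // cannot forward it again.
    return {
      action: 'forward',
      targetRegion: region,
      request: {...request, canForward: false}
    };
  }

  return {action: 'handle-locally', request};
}

const mirrorRequest = {originStreamId: 'origin-123', region: 'region-b', canForward: true};

console.log(planCreateMirror(mirrorRequest, 'region-a').action);                         // forward
console.log(planCreateMirror(mirrorRequest, 'region-b').action);                         // handle-locally
console.log(planCreateMirror({...mirrorRequest, canForward: false}, 'region-a').action); // handle-locally
```

Note that the fork code shown earlier builds its request with canForward: false, so in the fork path this branch would not be taken and the mirror is created wherever the request lands.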
It then calls createScaleOutStream which:
- Selects a target instance using load balancers (primary/secondary fanout)
- Creates TX (transmit) federation stream on the origin instance
- Creates RX (receive) federation stream on the target instance
- Exchanges SDP offer/answer between the two streams
- Registers the stream in the source manager
// Request goes to the instance hosting the uploader
return requestCreateStreamWithOriginStream.call(this, createFederationTx)
.then(createFederationTxResponse => {
// ... setup TX side ...
const createFederationRx = {
applicationId,
sessionId,
capabilities: _.union(federatedCapabilities, capabilitiesRx),
// ...
setRemoteDescription: { streamId: '', sessionDescription, /* ... */ },
createAnswerDescription: { streamId: '', options: ['upload', 'skip-set-local-description'], /* ... */ }
};
// Place the federated stream on the selected instance
return Promise.all([
this._phenixMQ.requestDirect(instance.routeKey, 'pcast.CreateStream', createFederationRx),
registerStream.call(this, /* ... */)
])
// ... complete SDP exchange ...
Summary Flow Diagram
API Request (PUT /channel/:channelId/fork/:sourceChannelId)
│
▼
ChannelApiRoute.forkChannel()
│
▼
MQ: roomapi.Fork
│
▼
RoomApiMQWorker.roomapiFork()
│
├─► Get members from destination room
├─► Get members from source room
│
├─► For each departing member:
│ └─► chat.LeaveRoom
│ └─► stream.DestroyStream (unless keep-streams)
│
└─► For each new member:
├─► For each stream:
│ └─► stream.CreateMirror
│ └─► StreamManagementMQWorker.streamCreateMirror()
│ └─► createScaleOutStream()
│ ├─► Create TX federation stream on origin
│ ├─► Create RX federation stream on target instance
│ └─► Exchange SDP offer/answer
│
└─► chat.JoinRoom (with updated stream URIs)
The result is that viewers in the destination channel now receive the streams from the source channel's publishers, via new mirrored/federated streams.
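To tie the flow together, here is a sketch that replays the message sequence against a recording stub instead of the real MQ. Everything here is a simplification under stated assumptions: forkSketch and createRecordingMq are hypothetical, members carry a streamId directly rather than a URI, and members are processed sequentially for a deterministic log, whereas the real code uses Promise.map.

```javascript
// Replays the fork message sequence against an MQ-like object.
async function forkSketch(mq, {roomId, departingMembers, newMembers, options = []}) {
  for (const member of departingMembers) {
    await mq.request('chat.LeaveRoom', {roomId, sessionId: member.sessionId, timestamp: Date.now()});

    if (!options.includes('keep-streams')) {
      for (const stream of member.streams) {
        await mq.request('stream.DestroyStream', {streamId: stream.streamId, reason: 'terminated-by-fork'});
      }
    }
  }

  for (const member of newMembers) {
    const streams = [];

    for (const stream of member.streams) {
      const response = await mq.request('stream.CreateMirror', {originStreamId: stream.streamId});
      streams.push({streamId: response.payload.mirroredStreamId});
    }

    // The member joins only after all of its streams have been mirrored.
    await mq.request('chat.JoinRoom', {roomId, member: {...member, streams}});
  }
}

// Stub that records request names and fabricates mirrored stream IDs.
function createRecordingMq() {
  const calls = [];
  let counter = 0;

  return {
    calls,
    async request(name) {
      calls.push(name);
      if (name === 'stream.CreateMirror') {
        counter += 1;
        return {status: 'ok', payload: {mirroredStreamId: `mirror-${counter}`}};
      }
      return {status: 'ok', payload: {}};
    }
  };
}

const recordingMq = createRecordingMq();
const sketchRun = forkSketch(recordingMq, {
  roomId: 'dest-1',
  departingMembers: [{sessionId: 'old', streams: [{streamId: 's-old'}]}],
  newMembers: [{sessionId: 'new', streams: [{streamId: 's-new'}]}]
}).then(() => {
  console.log(recordingMq.calls.join(' -> '));
  return recordingMq.calls;
});
```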
User
what happens if a second fork request comes in for the same source and destination channels
Cursor
let departingMembers = room.members;
let newMembers = sourceRoom.members;
if (_.includes(options, 'additive')) {
departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
}
if (!_.includes(options, 'force')) {
departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId));
}
Let me look at how duplicate fork requests are handled:
Great question! Here's what happens when a second fork request comes in for the same source and destination channels:
Default Behavior (No force Option)
The key logic is in the member filtering:
if (!_.includes(options, 'force')) {
departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId));
}
What happens:
- newMembers is filtered to exclude any member from the source room whose sessionId already exists in the destination room
- Result: If the first fork already moved members A, B, C from source → destination, a second fork will find that A, B, C already exist in the destination room and filter them out of newMembers
- No duplicate streams created - The second fork becomes essentially a no-op if all members are already present
- Returns status: 'ok' with the current members of the destination room
With force Option
If the second fork includes options: ['force']:
if (!_.includes(options, 'force')) {
// This filtering is SKIPPED with 'force' option
departingMembers = ...
newMembers = ...
}
What happens:
- No filtering occurs - All source room members are treated as "new members"
- For each member's streams, a new stream.CreateMirror is called, creating additional mirrored streams for the same origin
- The reenter option is used when joining:
options: ['reenter'],
This allows the member to rejoin, updating their member record (including the new stream URIs):
if (_.includes(options, 'reenter')) {
actions.push(this._keyValueStore.remove('members', otherMemberSet, recordKey)
.then(removed => {
if (removed) {
log.info('[%s] Member changed role [%s] during reentry', sessionId, member.role);
if (member.role === 'Audience') {
return broadcastRoomEventGlobally.call(this, roomId, 'MemberLeft', [member]);
}
}
}));
}
- Result: The member's streams are replaced with new mirrored streams. The old mirrored streams become orphaned (they'll eventually clean up when they detect no subscribers, but they're not explicitly destroyed)
Race Conditions
If two fork requests arrive simultaneously (before either completes):
| Step | Fork Request 1 | Fork Request 2 |
|---|---|---|
| 1 | Gets members (dest empty) | Gets members (dest empty) |
| 2 | Sees member A as "new" | Sees member A as "new" |
| 3 | Creates mirror stream M1 | Creates mirror stream M2 |
| 4 | Joins member A with M1 | Joins member A with M2 |
| 5 | Member A now has stream M2 | (overwrites M1) |
Result:
- Member A ends up with only M2 (the last one to write wins)
- M1 becomes an orphaned stream that will eventually be cleaned up
- No explicit protection against this race exists - it relies on the eventual consistency of the system
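The interleaving in the table can be reproduced with a tiny synchronous simulation. This is only a model of the ordering, not of the real system: simulateConcurrentForks is hypothetical, the destination room is a Map, and "creating a mirror" just records a label.

```javascript
// Models two fork requests that both read the destination before either writes.
function simulateConcurrentForks() {
  const destination = new Map(); // sessionId -> mirrored stream id
  const createdMirrors = [];

  const createMirror = label => {
    createdMirrors.push(label);
    return label;
  };

  // Steps 1-2: both requests snapshot an empty destination, so both see A as new.
  const snapshot1 = new Set(destination.keys());
  const snapshot2 = new Set(destination.keys());

  // Step 3: each request creates its own mirror of A's stream.
  const m1 = snapshot1.has('A') ? null : createMirror('M1');
  const m2 = snapshot2.has('A') ? null : createMirror('M2');

  // Steps 4-5: both join A; the second write replaces the first.
  if (m1) {
    destination.set('A', m1);
  }
  if (m2) {
    destination.set('A', m2);
  }

  const referenced = [...destination.values()];
  const orphaned = createdMirrors.filter(id => !referenced.includes(id));

  return {finalStream: destination.get('A'), orphaned};
}

console.log(simulateConcurrentForks()); // { finalStream: 'M2', orphaned: [ 'M1' ] }
```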
Summary
| Scenario | Departing Members | New Members | Streams Created | Member State |
|---|---|---|---|---|
| 2nd fork (default) | None (already departed) | None (already exist) | None | Unchanged |
| 2nd fork (force) | All non-overlapping | All source members | New mirrors for all | Updated with new stream URIs |
| Concurrent forks | Race condition | Race condition | Multiple (orphaned) | Last write wins |
The system is designed to be idempotent by default - calling fork twice without force safely does nothing the second time.
User
what happens if a stream is terminated in the destination channel that's still active in the source channel
Cursor
Let me investigate how stream termination is handled and whether it cascades back to the source:
Now I have the complete picture. Here's what happens when a mirrored stream in the destination channel is terminated, while the source stream is still active:
Stream Relationship Architecture
When you fork a channel, here's the stream hierarchy:
Source Channel:
└── Publisher's Origin Stream (streamId: "origin-123")
│
├── [TX Federation Stream] ──────────────────────┐
│ (outgoing, on same instance as origin) │
│ │ WebRTC connection
│ │
Destination Channel: │
└── [RX Federation Stream / Mirrored Stream] ◄─────────┘
(streamId: "mirror-456", upstreamId: "origin-123")
What Happens When the Mirrored Stream Ends
1. Source Stream is NOT Terminated
The termination of a mirrored (federated) stream does not cascade back to the source. Looking at StreamEventMQWorker.js:
if (stream.internal && _.includes(stream.options, optionFederated)) {
if (!_.isString(stream.upstreamId)) {
log.error('[%s] Internal streams must have an upstream id', streamId);
return;
}
return this._streamSourceManager.unregisterActiveFederatedStream(stream.upstreamId, stream.region, streamId)
.then(removed => {
if (removed) {
log.info('[%s] Federated stream for stream [%s] ended', streamId, stream.upstreamId);
}
});
}
It only unregisters the federated stream from the source manager - it does NOT destroy the upstream/origin stream.
2. The TX Side Federation Stream Ends
When the RX (receiving) side ends, the TX (transmitting) side gets cleaned up via the WebRTC connection teardown:
if (stream.originStreamId) {
const originStream = this._streams.get(stream.originStreamId);
if (originStream) {
originStream.subscribers.delete(stream.streamId);
if (_.includes(originStream.options, optionIdleNotification) && originStream.subscribers.count() === 0) {
return this._phenixMQ.publish('pcast.StreamIdle', {
timestamp: Date.now(),
sessionId: originStream.sessionId,
streamId: originStream.streamId,
options: originStream.options,
tags: originStream.tags
});
}
}
}
- The ended stream is removed from the origin's subscribers set
- If no subscribers remain and idleNotification is enabled, a StreamIdle event is published (but the origin still lives)
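A minimal in-memory sketch of that bookkeeping follows. It assumes a native Set for subscribers (the real code calls subscribers.count()), a literal 'idle-notification' option string standing in for the optionIdleNotification constant, and a publish callback in place of the MQ; handleStreamEnded is a hypothetical name.

```javascript
// Removes an ended stream from its origin's subscribers and reports idleness.
function handleStreamEnded(streams, endedStream, publish) {
  if (!endedStream.originStreamId) {
    return;
  }

  const origin = streams.get(endedStream.originStreamId);

  if (!origin) {
    return;
  }

  origin.subscribers.delete(endedStream.streamId);

  // Assumption: 'idle-notification' stands in for the optionIdleNotification constant.
  if (origin.options.includes('idle-notification') && origin.subscribers.size === 0) {
    publish('pcast.StreamIdle', {timestamp: Date.now(), streamId: origin.streamId});
  }
}

const streams = new Map([
  ['origin-123', {streamId: 'origin-123', options: ['idle-notification'], subscribers: new Set(['tx-1'])}]
]);
const published = [];

handleStreamEnded(streams, {streamId: 'tx-1', originStreamId: 'origin-123'}, (name, payload) => {
  published.push({name, payload});
});

console.log(published.map(event => event.name)); // [ 'pcast.StreamIdle' ]
console.log(streams.has('origin-123'));          // true: the origin is untouched
```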
3. Member Still Exists in Destination Channel
The member record in the destination channel is not automatically updated. The member's streams array still contains the now-dead stream URI.
This means:
- Subscribers in the destination channel trying to use that stream URI will get stream-ended or not-found errors
- The member appears to have a stream, but it's not actually active
Recovery Options
Option A: Fork Again
Call fork again with the same source/destination. Without force:
- Members already in destination are filtered out
- Nothing happens - the stale member record remains
With force:
- New mirrored streams are created
- Member record is updated with new stream URIs via reenter
Option B: Republish/Re-mirror Manually
The source stream is still alive, so you could:
- Call stream.CreateMirror again for the origin stream
- Update the member's streams in the destination room
Option C: Source Stream Terminates All Federated Copies
If the source stream ends, it cascades DOWN to terminate all federated copies:
function terminateFederatedStreams(streamId, reason, stream) {
// ...
return Promise.all(this._activeRegions)
.each(region => this._streamSourceManager.getActiveAndIdleFederatedStreams(streamId, region)
.each(destroyStream)
// ...
.then(() => {
// Also scan for child streams by parentStreamId
return this._keyValueStore.scan('streams', 'destination', onData, {
bins: ['streamId'],
query: {parentStreamId: streamId},
// ...
})
.then(() => streams)
.each(destroyStream);
});
}
This is one-directional: Source → Federated copies, NOT the reverse.
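The asymmetry can be modeled with a small registry where each mirror records its upstreamId. This is a toy model, not the real termination path: endStream and the registry shape are hypothetical, and the stream IDs reuse the ones from the diagram above.

```javascript
// Ends a stream and everything downstream of it; never touches its upstream.
function endStream(registry, streamId) {
  if (!registry.has(streamId)) {
    return [];
  }

  registry.delete(streamId);

  const ended = [streamId];

  // Collect children first, then recurse, so the registry is not
  // mutated while it is being scanned.
  const children = [...registry]
    .filter(([, candidate]) => candidate.upstreamId === streamId)
    .map(([id]) => id);

  for (const childId of children) {
    ended.push(...endStream(registry, childId));
  }

  return ended;
}

const registry = new Map([
  ['origin-123', {}],
  ['mirror-456', {upstreamId: 'origin-123'}],
  ['mirror-789', {upstreamId: 'origin-123'}]
]);

console.log(endStream(registry, 'mirror-456')); // [ 'mirror-456' ]
console.log(registry.has('origin-123'));        // true
console.log(endStream(registry, 'origin-123')); // [ 'origin-123', 'mirror-789' ]
```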
Summary Diagram
If Mirrored Stream in Destination Ends:
Source Channel Destination Channel
┌────────────────────┐ ┌────────────────────┐
│ Origin Stream │ │ Mirrored Stream │
│ (still active ✓) │ │ (TERMINATED ✗) │
└────────────────────┘ └────────────────────┘
│ │
│ ▼
│ Federation registration
│ removed from SourceManager
│
▼
TX federation stream Member record in room
is cleaned up (no cascade) still has stale stream URI
| Component | State After Mirrored Stream Ends |
|---|---|
| Source stream | Still active |
| TX federation stream | Cleaned up |
| RX mirrored stream | Destroyed |
| Member in destination | Still present, but with stale stream URIs |
| Viewers in destination | Will fail to subscribe |
User
is it possible to detect the stale member stream and update its source stream uri only when it's stale?
Cursor
Good question! Let me explore what mechanisms exist for detecting stale streams and potential solutions:
Great question! This is something that doesn't exist automatically today, but there are building blocks that could enable it. Let me explain the current state and potential solutions:
Current Detection Capabilities
1. Stream Existence Check
When subscribing to a stream, the system checks if it exists:
return this._streamSourceManager.getSourceStreamInfo(createStream.originStreamId)
.then(originStream => {
if (!originStream) {
return {status: 'origin-not-found'};
}
Result: Subscribers get origin-not-found or origin-stream-ended when trying to subscribe to a stale stream.
2. Forked Streams Are Tagged
Mirrored streams created by fork have a special tag:
tags: _.union(_.compact(streamTags), ['x-phenix-fork']),
3. Upstream ID is Stored
The mirrored stream stores a reference to its source:
this._keyValueStore.store('streams', 'source', federatedStreamResult.streamId, {upstreamId: originStreamId})
What Doesn't Exist Today
There's no automatic mechanism that:
- Periodically validates member stream URIs
- Detects stale forked streams
- Auto-recreates mirrors from the original source
The member record simply holds stream URIs and doesn't actively validate them.
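To show what such a check would have to do at minimum, here is a sketch that scans member records for stream URIs whose stream ID is no longer active. All names are hypothetical (findStaleStreams, streamIdFromUri), the URI shape is assumed, and the set of active IDs stands in for a lookup such as getSourceStreamInfo.

```javascript
// Assumption: the stream ID is the last path segment of the URI.
function streamIdFromUri(uri) {
  return uri.split('?')[0].split('/').pop();
}

// Returns one entry per member stream whose ID is not in the active set.
function findStaleStreams(members, activeStreamIds) {
  const stale = [];

  for (const member of members) {
    for (const stream of member.streams) {
      const streamId = streamIdFromUri(stream.uri);

      if (!activeStreamIds.has(streamId)) {
        stale.push({sessionId: member.sessionId, streamId});
      }
    }
  }

  return stale;
}

const members = [
  {sessionId: 'A', streams: [{uri: 'pcast://example.invalid/mirror-456?capabilities=streaming'}]},
  {sessionId: 'B', streams: [{uri: 'pcast://example.invalid/mirror-789?capabilities=streaming'}]}
];

console.log(findStaleStreams(members, new Set(['mirror-789'])));
// [ { sessionId: 'A', streamId: 'mirror-456' } ]
```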
Potential Solutions
Option A: Lazy Validation on GetMembers / GetStreams
You could modify roomapiGetMembers or roomapiGetStreams to validate streams:
// Pseudocode - not currently implemented
function roomapiGetMembersWithValidation({roomId}, {sentTime}, credentials) {
return roomapiGetMembers.call(this, {roomId}, {sentTime}, credentials)
.then(({status, room, members}) => {
if (status !== 'ok') return {status};
return Promise.map(members, member => {
return Promise.map(member.streams, stream => {
const streamId = extractStreamIdFromUri(stream.uri);
return this._streamSourceManager.getSourceStreamInfo(streamId)
.then(sourceInfo => {
if (sourceInfo) {
return stream; // Stream is still valid
}
// Stream is stale - check if it was a fork
return this._keyValueStore.fetch('streams', 'source', streamId, ['upstreamId'])
.then(data => {
const upstreamId = data?.upstreamId;
if (!upstreamId) return null; // Can't recover
// Check if upstream still exists
return this._streamSourceManager.getSourceStreamInfo(upstreamId)
.then(upstreamInfo => {
if (!upstreamInfo) return null; // Source also gone
// Re-create mirror from upstream
return this._phenixMQ.request('stream.CreateMirror', {
originStreamId: upstreamId,
capabilities: extractCapabilitiesFromUri(stream.uri),
tags: ['x-phenix-fork', `channelId:${roomId}`],
// ...
})
.then(mirrorResult => {
if (mirrorResult.status !== 'ok') return null;
// Update the stream URI (the member record would still need to be persisted)
stream.uri = buildNewUri(mirrorResult.payload.mirroredStreamId);
return stream;
});
});
});
});
})
.then(validatedStreams => {
member.streams = _.compact(validatedStreams);
return member;
});
})
.then(validatedMembers => ({status: 'ok', room, members: validatedMembers}));
});
}
Pros: Transparent to clients, streams auto-heal
Cons: Adds latency to GetMembers calls, complexity
Option B: Background Stream Health Worker
Add a worker that periodically validates forked streams:
// Pseudocode - not currently implemented
function validateForkedStreams(roomId) {
return roomapiGetMembers({roomId})
.then(({members}) => {
return Promise.map(members, member => {
return Promise.map(member.streams, stream => {
if (!hasTag(stream, 'x-phenix-fork')) return; // Skip non-forked
const streamId = extractStreamIdFromUri(stream.uri);
return this._streamSourceManager.getSourceStreamInfo(streamId)
.then(info => {
if (info) return; // Still valid
// Stale - trigger re-mirror
return remirrorStream(member, stream, roomId);
});
});
});
});
}
// Run periodically
scheduler.schedule(validateForkedStreams, interval);
Pros: Doesn't impact normal request latency
Cons: Stale period between checks, more infrastructure
Option C: Event-Driven Re-mirroring
Listen for stream end events and proactively re-mirror:
// In StreamEventMQWorker.js
function processSourceStreamEnded({streamId, reason, tags}) {
// Existing code...
// NEW: If this was a forked stream, try to re-mirror
if (_.includes(tags, 'x-phenix-fork')) {
return this._keyValueStore.fetch('streams', 'source', streamId, ['upstreamId'])
.then(data => {
if (!data?.upstreamId) return;
// Find which room this was in
const roomTag = _.find(tags, t => t.startsWith('channelId:') || t.startsWith('roomId:'));
if (!roomTag) return;
const roomId = roomTag.slice(roomTag.indexOf(':') + 1); // Keep any ':' inside the id itself
// Re-mirror and update member
return remirrorAndUpdateMember(roomId, streamId, data.upstreamId);
});
}
}
Pros: Real-time recovery, minimal latency
Cons: Complexity, race conditions, need to update member records
Option D: Client-Side Retry with Fallback
Expose the upstreamId in stream metadata so clients can retry:
// Client-side logic
function subscribeToStream(streamUri) {
return sdk.subscribe(streamUri)
.catch(error => {
if (error.status === 'origin-not-found') {
// Fetch upstream and retry
return getUpstreamId(streamUri)
.then(upstreamId => {
if (upstreamId) {
return sdk.subscribe(buildUri(upstreamId));
}
throw error;
});
}
throw error;
});
}
Pros: No backend changes needed
Cons: Exposes internal details, client complexity
Recommended Approach
For minimal changes with good results, I'd suggest Option C (Event-Driven) combined with storing more metadata:
- Store room/channel context in the forked stream record:
this._keyValueStore.store('streams', 'source', mirroredStreamId, {
upstreamId: originStreamId,
roomId: roomId,
memberId: member.sessionId
});
- On fork stream end, check if upstream is alive and re-mirror:
if (_.includes(tags, 'x-phenix-fork') && reason !== 'origin-stream-ended') {
// Only re-mirror if the source stream is still alive
attemptRemirror(streamId);
}
- Update member record with new stream URI
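As a starting point, the guard for that event-driven path could look like the sketch below. It is not existing code: parseRoomTag and shouldRemirror are hypothetical, and excluding the terminated-by-fork reason is my own suggestion, on the assumption that a stream deliberately destroyed by a fork should not be re-created.

```javascript
// Extracts the room or channel a forked stream belongs to from its tags.
function parseRoomTag(tags) {
  const tag = tags.find(candidate => candidate.startsWith('channelId:') || candidate.startsWith('roomId:'));

  if (!tag) {
    return null;
  }

  const separator = tag.indexOf(':');

  return {
    kind: tag.slice(0, separator) === 'channelId' ? 'channel' : 'room',
    // Slice instead of split so an id containing ':' stays intact.
    id: tag.slice(separator + 1)
  };
}

// Re-mirror only forked streams that ended while their source may still be alive.
function shouldRemirror({tags, reason}) {
  if (!tags.includes('x-phenix-fork')) {
    return false;
  }

  // Assumption: neither a dead source nor a deliberate fork teardown should trigger a re-mirror.
  if (reason === 'origin-stream-ended' || reason === 'terminated-by-fork') {
    return false;
  }

  return parseRoomTag(tags) !== null;
}

const forkTags = ['x-phenix-fork', 'channelId:ch-1', 'channelAlias:main'];

console.log(parseRoomTag(forkTags));                                          // { kind: 'channel', id: 'ch-1' }
console.log(shouldRemirror({tags: forkTags, reason: 'ended'}));               // true
console.log(shouldRemirror({tags: forkTags, reason: 'origin-stream-ended'})); // false
```

The reason value 'ended' in the usage lines is just a placeholder for "some reason other than the two excluded ones".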
Would you like me to draft a more detailed implementation for any of these approaches?
User
The user is calling fork every 10 seconds, if the streams are OK then nothing happens, but the intention is for the streams to get replaced if they've gone stale
Cursor
Ah, I see the issue now! The current fork logic has a gap:
Without force: Members already in destination are skipped entirely (including their stale streams)
With force: ALL streams get replaced, even healthy ones (wasteful, creates orphans)
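The gap can be reproduced with a simplified version of the member selection in roomapiFork. This is a minimal sketch on plain arrays; selectMembers is an illustrative name, and the real code works on the results of roomapiGetMembers:

```javascript
'use strict';

// Simplified sketch of how roomapiFork picks departing and joining members.
function selectMembers(destinationMembers, sourceMembers, options) {
  const inSource = new Set(sourceMembers.map(member => member.sessionId));
  const inDestination = new Set(destinationMembers.map(member => member.sessionId));

  let departing = destinationMembers;
  let joining = sourceMembers;

  if (options.includes('additive')) {
    departing = departing.filter(member => inSource.has(member.sessionId));
  }

  if (!options.includes('force')) {
    // Members present in both rooms are neither removed nor re-joined,
    // so their streams are never looked at again.
    departing = departing.filter(member => !inSource.has(member.sessionId));
    joining = joining.filter(member => !inDestination.has(member.sessionId));
  }

  return {departing, joining};
}
```

With destination members a and b and source members b and c, an empty options list leaves b untouched, while ['force'] removes and re-joins b whether or not its streams are healthy.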
What you need is a "smart fork" that only replaces stale streams. Let me look at where this logic could be added:
I see there's already a refresh-stale option implemented in the code! Let me verify this is actual code and check if _streamSourceManager is available:
I see the problem! The refresh-stale code uses this._streamSourceManager but RoomApiMQWorker doesn't have access to it. Looking at line 36:
init(phenixMQ, keyValueStore, regionService, roomManager) {
// ...
this._phenixMQ = phenixMQ;
this._keyValueStore = keyValueStore;
this._regionService = regionService;
this._roomManager = roomManager;
},
The StreamSourceManager isn't injected. Let me check if RoomManager has it:
RoomManager has _streamSourceManager. The refresh-stale code in RoomApiMQWorker needs to either:
1. Access it via this._roomManager._streamSourceManager (hacky)
2. Add a method to RoomManager to check stream health
3. Inject StreamSourceManager directly into RoomApiMQWorker
4. Use keyValueStore directly to check stream existence
Here's how you can fix the current implementation - Option 4 is the least invasive since you already have _keyValueStore:
/**
* Copyright 2025 Phenix Real Time Solutions, Inc. Confidential and Proprietary. All Rights Reserved.
*/
define([
'Promise',
'lodash',
'logging/logger',
'lang/assert',
'moment',
'url',
'randomstring',
'lang/Disposable',
'lang/DisposableList',
'mq/PhenixMQ',
'mq/protocol/MQProtocol',
'mq/IMQWorker',
'mq/MQWorker',
'mq/MQWorkerServices',
'mq/MQWorkerService',
'store/kv/IKeyValueStore',
'cloud/RegionService',
'pcast/chat/RoomManager'
], (Promise, _, logger, assert, moment, url, randomstring, Disposable, DisposableList, PhenixMQ, MQProtocol, IMQWorker, MQWorker, MQWorkerServices, MQWorkerService, IKeyValueStore, RegionService, RoomManager) => {
'use strict';
const log = logger.getLogger('pcast/chat/RoomApiMQWorker');
const roomTypes = ['DirectChat', 'MultiPartyChat', 'ModeratedChat', 'TownHall', 'Channel', 'MultiAngleChannel'];
const streamKeyLength = 96;
const roomSuffixLength = 12;
const roomAttributes = ['roomId', 'alias', 'applicationId', 'name', 'description', 'type', 'streamKey', 'created', 'lastUpdate', 'options', 'ingest'];
const roomIdRegex = /[^#]+#[^#]+#.*/;
const roomKeepAliveInterval = moment.duration(1, 'year');
const namespace = 'roomapi';
const RoomApiMQWorker = IMQWorker.extend({
init(phenixMQ, keyValueStore, regionService, roomManager) {
assert.assertInstance('phenixMQ', phenixMQ, PhenixMQ);
assert.assertInstance('keyValueStore', keyValueStore, IKeyValueStore);
assert.assertInstance('regionService', regionService, RegionService);
assert.assertInstance('roomManager', roomManager, RoomManager);
this._worker = new MQWorker(namespace, phenixMQ, new MQWorkerServices({
anycast: [
new MQWorkerService('roomapi.Create', proceedIfAuthenticated.bind(this, roomapiCreate.bind(this), {})),
new MQWorkerService('roomapi.Get', proceedIfAuthenticated.bind(this, roomapiGet.bind(this), {})),
new MQWorkerService('roomapi.GetMembers', proceedIfAuthenticated.bind(this, roomapiGetMembers.bind(this), {allowUnauthorized: true}), {
concurrency: 16,
maximumRatePerSecond: 4,
dispatchTimeInterval: moment.duration(1, 'minutes')
}),
new MQWorkerService('roomapi.GetStreams', proceedIfAuthenticated.bind(this, roomapiGetStreams.bind(this), {allowUnauthorized: true}), {maximumRatePerSecond: 32}),
new MQWorkerService('roomapi.Delete', proceedIfAuthenticated.bind(this, roomapiDelete.bind(this), {})),
new MQWorkerService('roomapi.List', proceedIfAuthenticated.bind(this, roomapiList.bind(this), {}), {
maximumRatePerSecond: 32,
concurrency: 1
}),
new MQWorkerService('roomapi.Fork', proceedIfAuthenticated.bind(this, roomapiFork.bind(this), {})),
new MQWorkerService('roomapi.Kill', proceedIfAuthenticated.bind(this, roomapiKill.bind(this), {}))
]
}));
this._phenixMQ = phenixMQ;
this._keyValueStore = keyValueStore;
this._regionService = regionService;
this._roomManager = roomManager;
},
processRequest(type, message, context) {
return this._worker.processRequest(type, message, context);
},
start() {
return Promise.try(() => start.call(this));
},
getLoadInfo() {
return Promise.try(() => getLoadInfo.call(this));
},
drain() {
return this._worker.drain();
},
stop() {
return this._worker.stop();
},
toString() {
return `RoomApiMQWorker[${this._worker}]`;
}
});
function start() {
return Promise.try(() => this._keyValueStore.createIndex('idx_room_applicationId', 'chat', 'rooms', 'applicationId', 'String'))
.then(() => this._worker.start());
}
function getCredentials(message) {
if (message.credentials) {
return message.credentials;
}
return {
applicationId: message.applicationId,
secret: message.secret
};
}
function proceedIfAuthenticated(next, {allowUnauthorized = false}, message, context) {
assert.assertFunction('next', next);
assert.assertBoolean('allowUnauthorized', allowUnauthorized);
const credentials = getCredentials.call(this, message);
if (allowUnauthorized && !credentials.applicationId) {
return next(message, context, null);
}
return authenticate.call(this, credentials.applicationId, credentials.secret)
.then(isAuthenticated => {
if (!isAuthenticated) {
return {status: 'unauthorized'};
}
return next(message, context, credentials);
});
}
function authenticate(applicationId, secret) {
if (_.isEmpty(applicationId)) {
return false;
}
return this._keyValueStore.fetch('applications', 'definitions', applicationId, ['secret'])
.catch(e => {
log.warn('[%s] Failed to fetch applications definitions', applicationId, e);
return false;
})
.then(application => {
if (!application || application.secret !== secret) {
return false;
}
return true;
});
}
function roomapiCreate({room}, context, {applicationId}) {
assert.assertObject('room', room);
assert.assertStringNotEmpty('applicationId', applicationId);
if (room.alias) {
assert.assertStringNotEmpty('room.alias', room.alias);
}
assert.assertString('room.name', room.name);
assert.assertString('room.description', room.description);
assert.assertEnum('room.type', room.type, roomTypes);
assert.assertArrayOfString('room.options', room.options);
if (room.name === '') {
log.info('[%s] Invalid create room request with empty room.name', applicationId);
return {status: 'name-required'};
}
if (!_.isUndefined(_.find(room.options, option => _.isEmpty(option)))) {
return {status: 'invalid-option'};
}
const alias = applicationId + '#' + (room.alias || _.camelCase(room.name));
return this._roomManager.resolveRoomByAlias(alias, roomAttributes, {useCache: false})
.then(record => {
if (record) {
if (record.type !== room.type) {
return {status: 'type-conflict'};
}
return {
status: 'already-exists',
room: {
applicationId: record.applicationId,
roomId: record.roomId,
alias: convertFromInternalAlias.call(this, record.alias),
name: record.name,
description: record.description,
type: record.type,
options: record.options,
ingestOptions: record.ingest,
streamKey: record.streamKey,
created: record.created,
lastUpdate: record.lastUpdate
}
};
}
const streamKeyOption = room.options.find(option => option.startsWith('streamKey='))
?.substring('streamKey='.length);
const streamKey = streamKeyOption || randomstring.generate(streamKeyLength);
const roomId = this._regionService.getOwnRegion() + '#' + applicationId + '#' + _.camelCase(room.name) + '.' + randomstring.generate(roomSuffixLength);
const now = _.now();
log.info('[%s] Creating room [%s] with alias [%s]', applicationId, roomId, alias);
const storedRoom = {
applicationId,
roomId,
alias,
name: room.name,
description: room.description,
type: room.type,
options: room.options,
streamKey,
messages: 0,
created: now,
lastUpdate: now
};
if (room.ingestOptions) {
storedRoom.ingest = room.ingestOptions;
}
return this._keyValueStore.store('chat', 'rooms', roomId, storedRoom, {ttl: roomKeepAliveInterval})
.then(() => ({
status: 'ok',
room: {
applicationId: storedRoom.applicationId,
roomId: storedRoom.roomId,
alias: convertFromInternalAlias.call(this, storedRoom.alias),
name: storedRoom.name,
description: storedRoom.description,
type: storedRoom.type,
options: storedRoom.options,
ingestOptions: storedRoom.ingest,
streamKey: storedRoom.streamKey,
created: storedRoom.created,
lastUpdate: storedRoom.lastUpdate
}
}));
});
}
function getSize(roomId) {
assert.assertStringNotEmpty('roomId', roomId);
if (!roomIdRegex.test(roomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
const getRoomParticipantInfo = {roomId};
return this._phenixMQ.request('chat.GetRoomParticipantInfo', getRoomParticipantInfo)
.then(result => {
const payload = result.payload;
const status = _.get(payload, ['status'], 'failed');
if (_.get(result, ['status']) !== 'ok') {
return {status};
}
assert.assertArray('payload.counts', payload.counts);
const size = _.reduce(payload.counts, (total, participantCount) => total + participantCount.count, 0);
return {
status,
size
};
});
}
function roomapiGet({roomId}, context, {applicationId}) {
assert.assertStringNotEmpty('roomId', roomId);
assert.assertStringNotEmpty('applicationId', applicationId);
if (!roomIdRegex.test(roomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
return this._keyValueStore.fetch('chat', 'rooms', roomId, roomAttributes)
.then(room => {
if (!room) {
return {status: 'not-found'};
}
if (room.applicationId !== applicationId) {
return {status: 'unauthorized'};
}
return getSize.call(this, room.roomId)
.then(getSizeResponse => {
if (getSizeResponse.status !== 'ok') {
log.info('[%s] [%s] Fetching room size failed with response [%j]', applicationId, roomId, getSizeResponse);
return {status: _.get(getSizeResponse, ['status'], 'failed')};
}
assert.assertNumber('getSizeResponse.size', getSizeResponse.size);
return {
status: 'ok',
room: {
applicationId: room.applicationId,
roomId: room.roomId,
alias: convertFromInternalAlias.call(this, room.alias),
name: room.name,
description: room.description,
type: room.type,
options: room.options,
ingestOptions: room.ingest,
streamKey: room.streamKey,
created: room.created,
lastUpdate: room.lastUpdate,
estimatedSize: getSizeResponse.size
}
};
});
})
.catch({code: 'invalid-cluster'}, () => ({status: 'not-found'}));
}
function roomapiGetMembers({roomId}, {sentTime}, credentials) {
assert.assertStringNotEmpty('roomId', roomId);
assert.assertMoment('sentTime', sentTime);
if (!roomIdRegex.test(roomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
const members = {};
const onData = record => {
members[record.sessionId] = record;
};
return Promise.all([
this._keyValueStore.fetch('chat', 'rooms', roomId, roomAttributes),
this._keyValueStore.scan('members', 'room', onData, {
bins: ['sessionId', 'screenName', 'role', 'streams', 'state', 'lastUpdate'],
query: {roomId},
fromAllClusters: true,
reentrant: true,
offload: false
})
])
.spread(room => {
if (!room) {
return {status: 'not-found'};
}
if (credentials && credentials.applicationId !== room.applicationId) {
return {status: 'unauthorized'};
}
return {
status: 'ok',
room: {
applicationId: room.applicationId,
roomId: room.roomId,
alias: convertFromInternalAlias.call(this, room.alias),
name: room.name,
description: room.description,
type: room.type,
options: room.options,
ingestOptions: room.ingest,
created: room.created,
lastUpdate: room.lastUpdate
},
members: _.values(members)
};
})
.catch({code: 'invalid-cluster'}, () => ({status: 'not-found'}));
}
function roomapiGetStreams({roomId, role, state, screenName, capability}, {sentTime}, credentials) {
assert.assertMoment('sentTime', sentTime);
if (!roomIdRegex.test(roomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
return roomapiGetMembers.call(this, {roomId}, {sentTime}, credentials)
.then(({status, room, members}) => {
if (status !== 'ok') {
return {status};
}
assert.assertArrayOfObject('members', members);
assert.assertObject('room', room);
let candidateMembers = members;
if (role) {
candidateMembers = _.filter(candidateMembers, {role});
}
if (state) {
candidateMembers = _.filter(candidateMembers, {state});
}
if (screenName) {
candidateMembers = _.filter(candidateMembers, {screenName});
}
candidateMembers = _.sortBy(candidateMembers, ['lastUpdate']);
let streamUris = _.reduce(candidateMembers, (streams, member) => {
_.forEach(member.streams, ({uri}) => {
streams.push(uri);
});
return streams;
}, []);
if (capability) {
streamUris = _.filter(streamUris, streamUri => {
switch (capability) {
case 'RealTime':
return true;
case 'RTMP':
return _.includes(streamUri, 'capabilities=') && _.includes(streamUri, 'rtmp');
case 'HLS':
case 'DASH':
return (_.includes(streamUri, 'capabilities=') && _.includes(streamUri, 'streaming')) || _.includes(streamUri, 'streamTokenForLiveStream');
default:
return true;
}
});
}
const streamIds = _.map(streamUris, streamUri => {
const parsedStreamUri = url.parse(streamUri.replace(/[#]/g, '%23'));
const streamId = parsedStreamUri.pathname.replace(/%23/g, '#')
.slice(1);
return streamId;
});
return {
status: 'ok',
room,
streamIds
};
});
}
function roomapiList({type} /* , order, limit, cursor */, context, {applicationId}) {
assert.assertStringNotEmpty('type', type);
assert.assertStringNotEmpty('applicationId', applicationId);
const rooms = [];
const dataCallback = room => {
if (room.type === type) {
rooms.push(room);
}
};
return this._keyValueStore.scan('chat', 'rooms', dataCallback, {
bins: roomAttributes,
query: {applicationId},
fromAllClusters: true,
reentrant: true,
offload: true
})
.then(() => ({
status: 'ok',
rooms: _.map(rooms, room => {
room.alias = convertFromInternalAlias.call(this, room.alias);
room.ingestOptions = room.ingest;
delete room.ingest;
return room;
})
}))
.catch(e => {
log.warn('[%s] Failed to list rooms', applicationId, e);
});
}
function roomapiDelete({roomId}, context, {applicationId}) {
assert.assertStringNotEmpty('roomId', roomId);
assert.assertStringNotEmpty('applicationId', applicationId);
if (!roomIdRegex.test(roomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
return this._keyValueStore.fetch('chat', 'rooms', roomId, ['applicationId'])
.then(room => {
if (!room) {
return {status: 'not-found'};
}
if (room.applicationId !== applicationId) {
return {status: 'unauthorized'};
}
log.info('[%s] Deleting room [%s]', applicationId, roomId);
return this._keyValueStore.remove('chat', 'rooms', roomId)
.then(removed => {
if (!removed) {
log.warn('[%s] Failed to delete room [%s]', applicationId, roomId);
return {status: 'failed'};
}
return {status: 'ok'};
});
})
.catch({code: 'invalid-cluster'}, () => ({status: 'not-found'}));
}
function roomapiFork({roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion}, {sentTime}, {applicationId}) {
assert.assertStringNotEmpty('roomId', roomId);
assert.assertStringNotEmpty('sourceRoomId', sourceRoomId);
assert.assertArrayOfString('options', options);
assert.assertArrayOfString('streamCapabilities', streamCapabilities);
assert.assertArrayOfString('streamTags', streamTags);
assert.assertMoment('sentTime', sentTime);
assert.assertStringNotEmpty('applicationId', applicationId);
log.info('[%s] [%s] Forking room from room [%s] with options [%s]', applicationId, roomId, sourceRoomId, options);
if (!roomIdRegex.test(roomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
if (!roomIdRegex.test(sourceRoomId)) {
return Promise.resolve({status: 'invalid-room-id'});
}
return Promise.all([
roomapiGetMembers.call(this, {roomId}, {sentTime}),
roomapiGetMembers.call(this, {roomId: sourceRoomId}, {sentTime})
])
.spread((room, sourceRoom) => {
if (_.get(room, ['status']) !== 'ok') {
return {status: _.get(room, ['status'], 'no-get-channel-members-status')};
}
if (_.get(sourceRoom, ['status']) !== 'ok') {
return {status: _.get(sourceRoom, ['status'], 'no-get-channel-members-status')};
}
if (_.get(room, ['room', 'applicationId']) !== applicationId) {
return {status: 'unauthorized'};
}
if (_.get(sourceRoom, ['room', 'applicationId']) !== applicationId) {
return {status: 'unauthorized'};
}
let departingMembers = room.members;
let newMembers = sourceRoom.members;
if (_.includes(options, 'additive')) {
departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
}
if (!_.includes(options, 'force')) {
departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId));
newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId));
}
// Check for stale streams in existing members and refresh them if 'refresh-stale' option is set
const checkAndRefreshStaleMembers = () => {
if (!_.includes(options, 'refresh-stale')) {
return Promise.resolve();
}
// Find members that exist in both rooms (already forked but potentially stale)
const existingMembersInBoth = _.filter(room.members, existingMember =>
_.find(sourceRoom.members, sourceMember => sourceMember.sessionId === existingMember.sessionId));
return Promise.map(existingMembersInBoth, existingMember => {
if (_.isEmpty(existingMember.streams)) {
return null;
}
// Check if any of the member's streams are stale
return Promise.map(existingMember.streams, stream => {
const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23'));
const streamId = parsedUri.pathname.replace(/%23/g, '#').slice(1);
return this._keyValueStore.fetch('streams', 'source', streamId, ['streamId', 'ended', 'ending'])
.then(sourceInfo => ({
stream,
streamId,
isStale: !sourceInfo || !sourceInfo.streamId || sourceInfo.ended || sourceInfo.ending
}))
.catch(() => ({stream, streamId, isStale: true}));
})
.then(streamStatuses => {
const staleStreams = _.filter(streamStatuses, {isStale: true});
if (_.isEmpty(staleStreams)) {
return null; // All streams are healthy
}
log.info('[%s] [%s] Member [%s] has [%d] stale streams, will refresh', applicationId, roomId, existingMember.sessionId, staleStreams.length);
// Find the corresponding source member to get fresh stream URIs
const sourceMember = _.find(sourceRoom.members, m => m.sessionId === existingMember.sessionId);
if (!sourceMember) {
return null;
}
// Add to newMembers so they get re-processed with fresh mirrors
newMembers.push(sourceMember);
// Destroy the stale streams to clean up
return Promise.each(staleStreams, ({streamId}) => {
log.info('[%s] [%s] Destroying stale stream [%s] for member [%s]', applicationId, roomId, streamId, existingMember.sessionId);
return this._phenixMQ.request('stream.DestroyStream', {
streamId,
reason: 'stale-stream-refresh'
}).catch(() => {}); // Ignore errors - stream might already be gone
});
});
})
.then(() => {
// Remove duplicates in case member was already in newMembers
newMembers = _.uniqBy(newMembers, 'sessionId');
});
};
return checkAndRefreshStaleMembers()
.then(() => Promise.map(departingMembers, member => {
const leaveRoom = {
roomId,
sessionId: member.sessionId,
timestamp: _.now()
};
return this._phenixMQ.request('chat.LeaveRoom', leaveRoom)
.then(leaveRoomResponse => {
if (_.get(leaveRoomResponse, ['status']) !== 'ok') {
return leaveRoomResponse;
}
if (_.includes(options, 'keep-streams')) {
return leaveRoomResponse;
}
if (_.isEmpty(member.streams)) {
return leaveRoomResponse;
}
return Promise.each(member.streams, stream => {
const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23'));
const streamId = parsedUri.pathname.replace(/%23/g, '#')
.slice(1);
const reason = 'terminated-by-fork';
log.info('[%s] [%s] Terminating stream [%s] during forking of room with reason [%s]', applicationId, roomId, streamId, reason);
const destroyStream = {
streamId,
reason
};
return this._phenixMQ.request('stream.DestroyStream', destroyStream)
.then(destroyStreamResponse => {
if (_.get(destroyStreamResponse, ['status']) !== 'ok') {
log.warn('[%s] [%s] Terminating stream [%s] during forking of room failed [%j]', applicationId, roomId, streamId, destroyStreamResponse);
}
});
})
.then(() => leaveRoomResponse);
});
})
.then(leaveRoomResponses => {
const failedLeaveRoomResponse = _.find(leaveRoomResponses, response => response.status !== 'ok');
if (failedLeaveRoomResponse) {
return failedLeaveRoomResponse;
}
return Promise.map(newMembers, member => {
const joinRoom = {
roomId,
sessionId: member.sessionId,
member: _.clone(member),
options: ['reenter'],
timestamp: _.now()
};
return Promise.map(joinRoom.member.streams, stream => {
const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23'));
// Only provide capabilities list. In particular, drop wildcard token from origin.
parsedUri.query = {capabilities: streamCapabilities.join(',')};
parsedUri.search = undefined;
const originStreamId = parsedUri.pathname.replace(/%23/g, '#')
.slice(1);
const streamMirror = {
originStreamId,
capabilities: _.compact(streamCapabilities),
tags: _.union(_.compact(streamTags), ['x-phenix-fork']),
options: [],
region: desiredRegion
};
if (room.room.type === 'Channel' || room.room.type === 'MultiAngleChannel') {
streamMirror.tags.push(`channelId:${roomId}`);
streamMirror.tags.push(`channelAlias:${room.room.alias}`);
} else {
streamMirror.tags.push(`roomId:${roomId}`);
streamMirror.tags.push(`roomAlias:${room.room.alias}`);
}
return this._phenixMQ.request('stream.CreateMirror', streamMirror)
.then(createMirrorResponse => {
switch (createMirrorResponse.status) {
case 'ok':
break;
case 'origin-stream-ended':
log.info('[%s] [%s] Mirroring stream [%s] failed [%j]', applicationId, roomId, originStreamId, createMirrorResponse);
return {status: createMirrorResponse.status};
default:
log.warn('[%s] [%s] Mirroring stream [%s] failed [%j]', applicationId, roomId, originStreamId, createMirrorResponse);
return {status: createMirrorResponse.status};
}
const streamId = createMirrorResponse.payload.mirroredStreamId;
assert.assertStringNotEmpty('streamId', streamId);
parsedUri.pathname = '/' + streamId.replace(/[#]/g, '%23');
stream.uri = url.format(parsedUri)
.replace(/%23/g, '#');
return createMirrorResponse;
});
})
.then(createMirrorResponses => {
const failedCreateMirrorResponse = _.find(createMirrorResponses, response => _.get(response, ['status']) !== 'ok');
if (failedCreateMirrorResponse) {
return failedCreateMirrorResponse;
}
return this._phenixMQ.request('chat.JoinRoom', joinRoom);
});
})
.then(joinRoomResponses => {
const failedJoinRoomResponse = _.find(joinRoomResponses, response => _.get(response, ['status']) !== 'ok');
if (failedJoinRoomResponse) {
switch (failedJoinRoomResponse.status) {
case 'origin-stream-ended':
log.info('[%s] [%s] Join room failed [%j]', applicationId, roomId, joinRoomResponses);
return {status: failedJoinRoomResponse.status};
default:
log.warn('[%s] [%s] Join room failed [%j]', applicationId, roomId, joinRoomResponses);
return {status: failedJoinRoomResponse.status};
}
}
return roomapiGetMembers.call(this, {roomId}, {sentTime});
})
.then(updatedRoom => {
if (_.get(updatedRoom, ['status']) !== 'ok') {
return {status: _.get(updatedRoom, ['status'], 'no-get-channel-members-status')};
}
return {
status: 'ok',
members: updatedRoom.members
};
});
}));
})
.then(response => {
switch (response.status) {
case 'ok':
break;
case 'not-found':
log.info('[%s] [%s] Forking from room [%s] failed (streamCapabilities=[%s],streamTags=[%s],options=[%s],desiredRegion=[%s]) with response [%j]', applicationId, roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion, response);
break;
default:
log.warn('[%s] [%s] Forking from room [%s] failed (streamCapabilities=[%s],streamTags=[%s],options=[%s],desiredRegion=[%s]) with response [%j]', applicationId, roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion, response);
break;
}
return response;
});
}
function roomapiKill({roomId, reason, options}, {sentTime}, {applicationId}) {
assert.assertStringNotEmpty('roomId', roomId);
assert.assertString('reason', reason);
assert.assertArrayOfString('options', options);
assert.assertMoment('sentTime', sentTime);
assert.assertStringNotEmpty('applicationId', applicationId);
log.info('[%s] [%s] Killing room with reason [%s] and options [%s]', applicationId, roomId, reason, options);
return roomapiGetMembers.call(this, {roomId}, {sentTime})
.then(getChannelMembersResponse => {
if (_.get(getChannelMembersResponse, ['status']) !== 'ok') {
return {status: _.get(getChannelMembersResponse, ['status'], 'no-get-channel-members-status')};
}
if (getChannelMembersResponse.room.applicationId !== applicationId) {
return {status: 'unauthorized'};
}
return Promise.map(getChannelMembersResponse.members, member => {
const leaveRoom = {
roomId,
sessionId: member.sessionId,
timestamp: _.now()
};
return Promise.map(member.streams, stream => {
if (_.includes(options, 'keep-streams')) {
return {status: 'ok'};
}
const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23'));
const streamId = parsedUri.pathname.replace(/%23/g, '#')
.slice(1);
log.info('[%s] [%s] Killing stream [%s] with reason [%s]', applicationId, roomId, streamId, reason);
const destroyStream = {
streamId,
reason
};
return this._phenixMQ.request('stream.DestroyStream', destroyStream)
.then(destroyStreamResponse => {
if (_.get(destroyStreamResponse, ['status']) !== 'ok') {
log.warn('[%s] [%s] Destroying stream [%s] failed [%j]', applicationId, roomId, streamId, destroyStreamResponse);
}
return destroyStreamResponse;
});
})
.then(destroyStreamResponses => {
const failedDestroyStreamResponse = _.find(destroyStreamResponses, response => _.get(response, ['status']) !== 'ok');
if (_.includes(options, 'destroy-required') && failedDestroyStreamResponse) {
log.warn('[%s] [%s] Killing of stream failed [%j]', applicationId, roomId, destroyStreamResponses);
return failedDestroyStreamResponse;
}
return this._phenixMQ.request('chat.LeaveRoom', leaveRoom);
});
})
.then(leaveRoomResponses => {
const failedLeaveRoomResponse = _.find(leaveRoomResponses, response => response.status !== 'ok');
if (failedLeaveRoomResponse) {
log.warn('[%s] [%s] Leaving room failed [%j]', applicationId, roomId, leaveRoomResponses);
return failedLeaveRoomResponse;
}
return {
status: 'ok',
killedMembers: getChannelMembersResponse.members
};
});
});
}
function convertFromInternalAlias(aliasWithApplicationId) {
if (!aliasWithApplicationId) {
return aliasWithApplicationId;
}
assert.assertStringNotEmpty('aliasWithApplicationId', aliasWithApplicationId);
const idx = aliasWithApplicationId.indexOf('#');
return aliasWithApplicationId.substring(idx + 1);
}
function getLoadInfo() {
return Promise.all([
this._worker.getLoadInfo(),
this._roomManager.getRoomIdByAliasCacheUsage()
.then(({pendingOperationsCounter}) => _.map(pendingOperationsCounter, (count, operationName) => ({
name: `cache/room/alias/room/id/pending/${operationName}`,
value: count,
unit: 'Count',
flags: ['drainable']
}))),
this._roomManager.getRoomMembersByRoomIdCacheUsage()
.then(({pendingOperationsCounter}) => _.map(pendingOperationsCounter, (count, operationName) => ({
name: `cache/room/id/room/members/pending/${operationName}`,
value: count,
unit: 'Count',
flags: ['drainable']
})))
])
.then(loadInfos => loadInfos.flat());
}
return RoomApiMQWorker;
});
Now let me check for any lint errors:
The fix is in place. Now here's how the refresh-stale option works for your use case:
How to Use It
Call fork every 10 seconds with the refresh-stale option:
const forkRequest = {
roomId: destinationChannelId,
sourceRoomId: sourceChannelId,
streamCapabilities: ['streaming', 'hd'],
streamTags: [],
options: ['refresh-stale'], // <-- This is the key option
desiredRegion: 'us-west'
};
// Call this every 10 seconds
phenixMQ.request('roomapi.Fork', forkRequest);
Or via REST API:
PUT /channel/{destinationChannelId}/fork/{sourceChannelId}
{
"options": ["refresh-stale"],
"streamCapabilities": ["streaming", "hd"]
}
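If a fork call ever takes longer than the polling interval, two forks could run against the same channel at once. One way to avoid that is to skip a tick while the previous call is still running. A minimal sketch, assuming a hypothetical forkOnce function that issues the request above and returns a promise:

```javascript
'use strict';

// Wraps a promise-returning task so overlapping invocations are skipped.
function createSingleFlight(task) {
  let inFlight = false;

  return function run() {
    if (inFlight) {
      return null; // Previous call still running, skip this tick
    }

    inFlight = true;

    return Promise.resolve()
      .then(task)
      .catch(error => console.warn('Fork request failed', error))
      .finally(() => {
        inFlight = false;
      });
  };
}

// Usage (forkOnce is hypothetical and not part of the codebase):
// const tick = createSingleFlight(forkOnce);
// setInterval(tick, 10000);
```

Errors are logged and swallowed so that one failed fork does not stop the polling loop.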
What Happens Now
| Scenario | Behavior |
|---|---|
| Member not in destination | Gets added with new mirrored streams |
| Member in destination with healthy streams | Skipped - no action |
| Member in destination with stale streams | Stale streams destroyed; the member is re-joined with fresh mirrors of all its source streams |
The Flow
Fork Request with 'refresh-stale'
│
▼
┌─────────────────────────────────────────┐
│ 1. Get members from both rooms │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 2. For each member in BOTH rooms: │
│ - Check each stream in destination │
│ - Query keyValueStore for stream │
│ - If stream missing/ended/ending │
│ → Mark as STALE │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 3. For stale members: │
│ - Destroy stale streams │
│ - Add to "newMembers" list │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 4. Process newMembers: │
│ - Create fresh mirrors from source │
│ - Update member with new stream URIs │
│ - 'reenter' updates existing record │
└─────────────────────────────────────────┘
Key Code Section
// Check for stale streams in existing members and refresh them if 'refresh-stale' option is set
const checkAndRefreshStaleMembers = () => {
if (!_.includes(options, 'refresh-stale')) {
return Promise.resolve();
}
// Find members that exist in both rooms (already forked but potentially stale)
const existingMembersInBoth = _.filter(room.members, existingMember =>
_.find(sourceRoom.members, sourceMember => sourceMember.sessionId === existingMember.sessionId));
return Promise.map(existingMembersInBoth, existingMember => {
// ... check each stream ...
return this._keyValueStore.fetch('streams', 'source', streamId, ['streamId', 'ended', 'ending'])
.then(sourceInfo => ({
stream,
streamId,
isStale: !sourceInfo || !sourceInfo.streamId || sourceInfo.ended || sourceInfo.ending
}))
// ... if stale, destroy and add to newMembers ...
});
};
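To make the staleness rule easier to reason about in isolation, here is a small sketch that extracts the same condition into a pure function and runs it against an in-memory stand-in for the key-value store. createFakeStore and findStaleStreamIds are hypothetical names; only the fetch call shape is taken from the snippet above.

```javascript
// Same staleness rule as in the snippet above, extracted as a pure function.
function isStaleStreamInfo(sourceInfo) {
  return !sourceInfo
    || !sourceInfo.streamId
    || Boolean(sourceInfo.ended)
    || Boolean(sourceInfo.ending);
}

// Hypothetical in-memory stand-in for the key-value store; only fetch() is modeled.
function createFakeStore(records) {
  return {
    fetch(table, family, key) {
      return Promise.resolve(records[key]);
    }
  };
}

// Resolves to the stream IDs whose record is missing, ended, or ending.
function findStaleStreamIds(store, streamIds) {
  return Promise.all(streamIds.map(streamId =>
    store.fetch('streams', 'source', streamId, ['streamId', 'ended', 'ending'])
      .then(sourceInfo => ({streamId, isStale: isStaleStreamInfo(sourceInfo)}))))
    .then(results => results.filter(r => r.isStale).map(r => r.streamId));
}

const fakeStore = createFakeStore({
  m1: {streamId: 'm1', ended: true},
  m2: {streamId: 'm2'}
});

// Logs the stale IDs: m1 (ended) and m3 (no record in the store)
findStaleStreamIds(fakeStore, ['m1', 'm2', 'm3']).then(ids => console.log(ids));
```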
Important Notes
- Source stream must still be alive - If the origin stream in the source channel also died, creating a new mirror will fail with origin-stream-ended
- No duplicate processing - The _.uniqBy(newMembers, 'sessionId') call ensures a member isn't processed twice
- Stale stream cleanup - Old stale streams are explicitly destroyed with reason stale-stream-refresh
- 10 second interval is reasonable - The check is lightweight (just a keyValueStore fetch per stream), so frequent polling is fine
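As an illustration of the duplicate-processing note, this is a plain-JavaScript sketch of what de-duplicating by sessionId does. uniqBySessionId is a hypothetical stand-in for _.uniqBy(newMembers, 'sessionId'), which likewise keeps the first entry seen for each key.

```javascript
// Plain-JavaScript equivalent of _.uniqBy(newMembers, 'sessionId'):
// keeps the first entry seen for each sessionId and drops later ones.
function uniqBySessionId(members) {
  const seen = new Set();

  return members.filter(member => {
    if (seen.has(member.sessionId)) {
      return false;
    }

    seen.add(member.sessionId);

    return true;
  });
}

const newMembers = [
  {sessionId: 's1', reason: 'not-in-destination'},
  {sessionId: 's2', reason: 'stale'},
  {sessionId: 's1', reason: 'stale'}
];

console.log(uniqBySessionId(newMembers).map(m => m.sessionId));
// prints [ 's1', 's2' ]
```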
User
what does reenter do?
Cursor
Let me find the reenter implementation:
The reenter option in chat.JoinRoom allows a member to update their existing membership record without causing errors or duplicate entries. Here's what it does:
What reenter Does
if (_.includes(options, 'reenter')) {
actions.push(this._keyValueStore.remove('members', otherMemberSet, recordKey)
.then(removed => {
if (removed) {
log.info('[%s] Member changed role [%s] during reentry', sessionId, member.role);
if (member.role === 'Audience') {
return broadcastRoomEventGlobally.call(this, roomId, 'MemberLeft', [member]);
}
}
}));
}
Key behaviors:
- Overwrites the existing member record - The store() call at line 285 writes the new member data, replacing any existing record with the same sessionId|roomId key
- Handles role changes - If the member was previously in a different role set (e.g., was Audience, now becoming Presenter), it removes them from the old set:
  - Members are stored in either members/room (for Participant, Moderator, Presenter) or members/audience
  - reenter removes them from the other set if they existed there
- Broadcasts appropriate events - If the member was in the room set and is now becoming Audience, it broadcasts a MemberLeft event
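The behaviors above can be modeled with a small in-memory sketch. This is not the real chat.JoinRoom handler: createMemberStore, the Map-backed sets, and the events array are hypothetical stand-ins for the key-value store and the broadcast call, and the record key format follows the sessionId|roomId description above.

```javascript
// Roles stored in the 'room' set; every other role goes to 'audience'.
const ROOM_ROLES = new Set(['Participant', 'Moderator', 'Presenter']);

// Hypothetical in-memory model of the two member sets and emitted events.
function createMemberStore() {
  const sets = {room: new Map(), audience: new Map()};
  const events = [];

  function joinRoom({roomId, sessionId, member, options = []}) {
    const recordKey = `${sessionId}|${roomId}`;
    const memberSet = ROOM_ROLES.has(member.role) ? 'room' : 'audience';
    const otherMemberSet = memberSet === 'room' ? 'audience' : 'room';

    // Overwrites any existing record with the same key
    sets[memberSet].set(recordKey, {...member});

    if (options.includes('reenter')) {
      // Clean up a leftover record from a previous role, if any
      const removed = sets[otherMemberSet].delete(recordKey);

      if (removed && member.role === 'Audience') {
        events.push({roomId, type: 'MemberLeft', sessionId});
      }
    }
  }

  return {sets, events, joinRoom};
}

const memberStore = createMemberStore();

memberStore.joinRoom({roomId: 'r1', sessionId: 's1', member: {role: 'Presenter', streams: ['M1']}});
memberStore.joinRoom({roomId: 'r1', sessionId: 's1', member: {role: 'Audience', streams: ['M2']}, options: ['reenter']});

console.log(memberStore.sets.room.size, memberStore.sets.audience.size, memberStore.events.length);
// prints 0 1 1
```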
Without reenter vs With reenter
| Scenario | Without reenter | With reenter |
|---|---|---|
| Member already in room | Could create duplicate/inconsistent state | Safely overwrites existing record |
| Member changing roles | Old record in wrong set remains | Old record cleaned up |
| Updating stream URIs | Record updated but no cleanup | Record updated with cleanup |
Why Fork Uses reenter
In the fork code:
const joinRoom = {
roomId,
sessionId: member.sessionId,
member: _.clone(member),
options: ['reenter'],
timestamp: _.now()
};
Fork uses reenter because:
- The member might already exist in the destination room (from a previous fork)
- The member's streams are being replaced with new mirrored streams
- We want to update, not duplicate - reenter ensures the member record is cleanly replaced with the new stream URIs
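A rough sketch of how the refreshed member record might be assembled before the JoinRoom call. buildReenterJoinRoom and mirrorUriByOldUri are hypothetical names, and the assumption that each entry in member.streams carries a uri field is mine, not confirmed by the code shown here.

```javascript
// Hypothetical helper: builds a JoinRoom payload whose member record points
// at freshly mirrored streams. Assumes each stream object has a 'uri' field.
function buildReenterJoinRoom(roomId, member, mirrorUriByOldUri, now = Date.now()) {
  const streams = (member.streams || []).map(stream => ({
    ...stream,
    // Swap in the new mirror URI when one exists, otherwise keep the old one
    uri: mirrorUriByOldUri[stream.uri] || stream.uri
  }));

  return {
    roomId,
    sessionId: member.sessionId,
    // Copy the member so the caller's object is not mutated
    member: {...member, streams},
    options: ['reenter'],
    timestamp: now
  };
}

const staleMember = {sessionId: 's1', role: 'Audience', streams: [{uri: 'M1', type: 'Presentation'}]};
const joinRoomRequest = buildReenterJoinRoom('destination-room', staleMember, {M1: 'M2'});

console.log(joinRoomRequest.member.streams[0].uri, staleMember.streams[0].uri);
// prints "M2 M1"
```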
Visual Flow
Member A exists in destination room with stale stream M1
│
▼
Fork with 'refresh-stale'
│
▼
Stale stream M1 detected & destroyed
│
▼
New mirror M2 created from source
│
▼
JoinRoom with options: ['reenter']
│
▼
┌─────────────────────────────────────────┐
│ keyValueStore.store() overwrites │
│ member record with new stream URI (M2) │
└─────────────────────────────────────────┘
│
▼
Member A now has fresh stream M2
Without reenter, the member would need to explicitly leave and rejoin, which would trigger MemberLeft/MemberJoined events and potentially disrupt connected clients.