# Walk through forking process _Exported on 12/8/2025 at 05:00:24 EST from Cursor (2.1.50)_ --- **User** Walk me through the entire forking process --- **Cursor** I'll search for the forking process in the codebase to understand how it works. Let me read the complete forking implementation to understand the full process: Let me also find the `CreateMirror` stream handler and the HTTP routes: Let me read the full `streamCreateMirror` function and also check the HTTP routes: Let me also look at the `createScaleOutStream` function to understand the stream mirroring mechanism: Now I have a complete picture of the forking process. Let me walk you through the entire flow: --- ## Forking Process Overview The **forking** feature allows you to take all the members (and their streams) from a **source room/channel** and move them to a **destination room/channel**. This is useful for scenarios like redirecting an audience from one channel to another. --- ## 1. API Entry Point Forking is exposed via REST API routes: - **Channels:** `PUT /channel/:channelId/fork/:channelIdToFork` - **Rooms:** `PUT /room/:roomId/fork/:roomIdToFork` ```500:539:platform/src/pcast/frontend/RoomApiRoute.js function forkRoom(req, res, credentials) { const roomId = req.params.roomId; const roomIdToFork = req.params.roomIdToFork; const streamCapabilities = _.get(req.body, ['streamCapabilities'], []); const streamTags = _.get(req.body, ['streamTags'], []); const options = _.get(req.body, ['options'], []); const desiredRegion = _.get(req.body, ['desiredRegion'], undefined); // ... validation ... const forkRoom = { credentials, roomId, sourceRoomId: roomIdToFork, streamCapabilities, streamTags, options, desiredRegion }; ``` The route handler validates the request and sends an MQ request to `roomapi.Fork`. --- ## 2. Core Fork Logic (`roomapiFork`) The main orchestration happens in `RoomApiMQWorker.js`: ```510:527:platform/src/pcast/chat/RoomApiMQWorker.js function roomapiFork({roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion}, {sentTime}, {applicationId}) { assert.assertStringNotEmpty('roomId', roomId); assert.assertStringNotEmpty('sourceRoomId', sourceRoomId); assert.assertArrayOfString('options', options); assert.assertArrayOfString('streamCapabilities', streamCapabilities); assert.assertArrayOfString('streamTags', streamTags); // ... log.info('[%s] [%s] Forking room from room [%s] with options [%s]', applicationId, roomId, sourceRoomId, options); ``` ### Step 2a: Fetch Members from Both Rooms ```529:532:platform/src/pcast/chat/RoomApiMQWorker.js return Promise.all([ roomapiGetMembers.call(this, {roomId}, {sentTime}), roomapiGetMembers.call(this, {roomId: sourceRoomId}, {sentTime}) ]) ``` ### Step 2b: Determine Departing and New Members Based on the `options` provided: ```550:560:platform/src/pcast/chat/RoomApiMQWorker.js let departingMembers = room.members; let newMembers = sourceRoom.members; if (_.includes(options, 'additive')) { departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); } if (!_.includes(options, 'force')) { departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId)); } ``` **Options:** - **`additive`**: Only remove members that exist in both rooms - **`force`**: Don't filter out overlapping members (replace all) - **`keep-streams`**: Don't destroy streams when members leave --- ## 3. Remove Departing Members For each departing member: 1. Send a `chat.LeaveRoom` request 2. Unless `keep-streams` option is set, destroy their streams with reason `terminated-by-fork` ```562:604:platform/src/pcast/chat/RoomApiMQWorker.js return Promise.map(departingMembers, member => { const leaveRoom = { roomId, sessionId: member.sessionId, timestamp: _.now() }; return this._phenixMQ.request('chat.LeaveRoom', leaveRoom) .then(leaveRoomResponse => { // ... if (_.includes(options, 'keep-streams')) { return leaveRoomResponse; } // ... return Promise.each(member.streams, stream => { // ... const reason = 'terminated-by-fork'; const destroyStream = { streamId, reason }; return this._phenixMQ.request('stream.DestroyStream', destroyStream) }) }); }) ``` --- ## 4. Add New Members and Mirror Streams For each new member from the source room, the system: ### Step 4a: Mirror Each Stream For each stream the member has, create a **mirror stream** via `stream.CreateMirror`: ```622:672:platform/src/pcast/chat/RoomApiMQWorker.js return Promise.map(joinRoom.member.streams, stream => { const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23')); // Only provide capabilities list. In particular, drop wildcard token from origin. parsedUri.query = {capabilities: streamCapabilities.join(',')}; parsedUri.search = undefined; const originStreamId = parsedUri.pathname.replace(/%23/g, '#') .slice(1); const streamMirror = { originStreamId, capabilities: _.compact(streamCapabilities), tags: _.union(_.compact(streamTags), ['x-phenix-fork']), options: [], region: desiredRegion, canForward: false }; // Add room/channel tags if (room.room.type === 'Channel' || room.room.type === 'MultiAngleChannel') { streamMirror.tags.push(`channelId:${roomId}`); streamMirror.tags.push(`channelAlias:${room.room.alias}`); } else { streamMirror.tags.push(`roomId:${roomId}`); streamMirror.tags.push(`roomAlias:${room.room.alias}`); } return this._phenixMQ.request('stream.CreateMirror', streamMirror) ``` ### Step 4b: Update Stream URI Once the mirrored stream is created, the stream URI is updated to point to the new mirrored stream: ```663:669:platform/src/pcast/chat/RoomApiMQWorker.js const streamId = createMirrorResponse.payload.mirroredStreamId; // ... parsedUri.pathname = '/' + streamId.replace(/[#]/g, '%23'); stream.uri = url.format(parsedUri) .replace(/%23/g, '#'); ``` ### Step 4c: Join Member to Destination Room After all streams are mirrored, the member joins the destination room: ```681:682:platform/src/pcast/chat/RoomApiMQWorker.js return this._phenixMQ.request('chat.JoinRoom', joinRoom); ``` --- ## 5. Stream Mirroring (`streamCreateMirror`) The `stream.CreateMirror` handler in `StreamManagementMQWorker.js` creates a federated copy of the origin stream: ```3686:3709:platform/src/pcast/session/StreamManagementMQWorker.js function streamCreateMirror({originStreamId, capabilities, tags, options, region, canForward}) { assert.assertStringNotEmpty('originStreamId', originStreamId); assert.assertArrayOfString('tags', tags); if (canForward && region && region !== this._regionName) { // Forward to the desired region return this._phenixMQ.request('stream.CreateMirror', { originStreamId, capabilities, tags, options, region, canForward: false }, {region}) .then(({status, payload}) => { if (status !== 'ok') { return {status}; } return payload; }); } ``` It then calls `createScaleOutStream` which: 1. **Selects a target instance** using load balancers (primary/secondary fanout) 2. **Creates TX (transmit) federation stream** on the origin instance 3. **Creates RX (receive) federation stream** on the target instance 4. **Exchanges SDP offer/answer** between the two streams 5. **Registers the stream** in the source manager ```2351:2488:platform/src/pcast/session/StreamManagementMQWorker.js // Request goes to the instance hosting the uploader return requestCreateStreamWithOriginStream.call(this, createFederationTx) .then(createFederationTxResponse => { // ... setup TX side ... const createFederationRx = { applicationId, sessionId, capabilities: _.union(federatedCapabilities, capabilitiesRx), // ... setRemoteDescription: { streamId: '', sessionDescription, /* ... */ }, createAnswerDescription: { streamId: '', options: ['upload', 'skip-set-local-description'], /* ... */ } }; // Place the federated stream on the selected instance return Promise.all([ this._phenixMQ.requestDirect(instance.routeKey, 'pcast.CreateStream', createFederationRx), registerStream.call(this, /* ... */) ]) // ... complete SDP exchange ... ``` --- ## Summary Flow Diagram ``` API Request (PUT /channel/:channelId/fork/:sourceChannelId) │ ▼ ChannelApiRoute.forkChannel() │ ▼ MQ: roomapi.Fork │ ▼ RoomApiMQWorker.roomapiFork() │ ├─► Get members from destination room ├─► Get members from source room │ ├─► For each departing member: │ └─► chat.LeaveRoom │ └─► stream.DestroyStream (unless keep-streams) │ └─► For each new member: ├─► For each stream: │ └─► stream.CreateMirror │ └─► StreamManagementMQWorker.streamCreateMirror() │ └─► createScaleOutStream() │ ├─► Create TX federation stream on origin │ ├─► Create RX federation stream on target instance │ └─► Exchange SDP offer/answer │ └─► chat.JoinRoom (with updated stream URIs) ``` The result is that viewers in the destination channel now receive the streams from the source channel's publishers, via new mirrored/federated streams. ```javascript function forkRoom(req, res, credentials) { const roomId = req.params.roomId; const roomIdToFork = req.params.roomIdToFork; const streamCapabilities = _.get(req.body, ['streamCapabilities'], []); const streamTags = _.get(req.body, ['streamTags'], []); const options = _.get(req.body, ['options'], []); const desiredRegion = _.get(req.body, ['desiredRegion'], undefined); // ... validation ... const forkRoom = { credentials, roomId, sourceRoomId: roomIdToFork, streamCapabilities, streamTags, options, desiredRegion }; ``` ```javascript function roomapiFork({roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion}, {sentTime}, {applicationId}) { assert.assertStringNotEmpty('roomId', roomId); assert.assertStringNotEmpty('sourceRoomId', sourceRoomId); assert.assertArrayOfString('options', options); assert.assertArrayOfString('streamCapabilities', streamCapabilities); assert.assertArrayOfString('streamTags', streamTags); // ... log.info('[%s] [%s] Forking room from room [%s] with options [%s]', applicationId, roomId, sourceRoomId, options); ``` ```javascript return Promise.all([ roomapiGetMembers.call(this, {roomId}, {sentTime}), roomapiGetMembers.call(this, {roomId: sourceRoomId}, {sentTime}) ]) ``` ```javascript let departingMembers = room.members; let newMembers = sourceRoom.members; if (_.includes(options, 'additive')) { departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); } if (!_.includes(options, 'force')) { departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId)); } ``` ```javascript return Promise.map(departingMembers, member => { const leaveRoom = { roomId, sessionId: member.sessionId, timestamp: _.now() }; return this._phenixMQ.request('chat.LeaveRoom', leaveRoom) .then(leaveRoomResponse => { // ... if (_.includes(options, 'keep-streams')) { return leaveRoomResponse; } // ... return Promise.each(member.streams, stream => { // ... const reason = 'terminated-by-fork'; const destroyStream = { streamId, reason }; return this._phenixMQ.request('stream.DestroyStream', destroyStream) }) }); }) ``` ```javascript return Promise.map(joinRoom.member.streams, stream => { const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23')); // Only provide capabilities list. In particular, drop wildcard token from origin. parsedUri.query = {capabilities: streamCapabilities.join(',')}; parsedUri.search = undefined; const originStreamId = parsedUri.pathname.replace(/%23/g, '#') .slice(1); const streamMirror = { originStreamId, capabilities: _.compact(streamCapabilities), tags: _.union(_.compact(streamTags), ['x-phenix-fork']), options: [], region: desiredRegion, canForward: false }; // Add room/channel tags if (room.room.type === 'Channel' || room.room.type === 'MultiAngleChannel') { streamMirror.tags.push(`channelId:${roomId}`); streamMirror.tags.push(`channelAlias:${room.room.alias}`); } else { streamMirror.tags.push(`roomId:${roomId}`); streamMirror.tags.push(`roomAlias:${room.room.alias}`); } return this._phenixMQ.request('stream.CreateMirror', streamMirror) ``` ```javascript const streamId = createMirrorResponse.payload.mirroredStreamId; // ... parsedUri.pathname = '/' + streamId.replace(/[#]/g, '%23'); stream.uri = url.format(parsedUri) .replace(/%23/g, '#'); ``` ```javascript return this._phenixMQ.request('chat.JoinRoom', joinRoom); ``` ```javascript function streamCreateMirror({originStreamId, capabilities, tags, options, region, canForward}) { assert.assertStringNotEmpty('originStreamId', originStreamId); assert.assertArrayOfString('tags', tags); if (canForward && region && region !== this._regionName) { // Forward to the desired region return this._phenixMQ.request('stream.CreateMirror', { originStreamId, capabilities, tags, options, region, canForward: false }, {region}) .then(({status, payload}) => { if (status !== 'ok') { return {status}; } return payload; }); } ``` ```javascript // Request goes to the instance hosting the uploader return requestCreateStreamWithOriginStream.call(this, createFederationTx) .then(createFederationTxResponse => { // ... setup TX side ... const createFederationRx = { applicationId, sessionId, capabilities: _.union(federatedCapabilities, capabilitiesRx), // ... setRemoteDescription: { streamId: '', sessionDescription, /* ... */ }, createAnswerDescription: { streamId: '', options: ['upload', 'skip-set-local-description'], /* ... */ } }; // Place the federated stream on the selected instance return Promise.all([ this._phenixMQ.requestDirect(instance.routeKey, 'pcast.CreateStream', createFederationRx), registerStream.call(this, /* ... */) ]) // ... complete SDP exchange ... ``` ```plaintext API Request (PUT /channel/:channelId/fork/:sourceChannelId) │ ▼ ChannelApiRoute.forkChannel() │ ▼ MQ: roomapi.Fork │ ▼ RoomApiMQWorker.roomapiFork() │ ├─► Get members from destination room ├─► Get members from source room │ ├─► For each departing member: │ └─► chat.LeaveRoom │ └─► stream.DestroyStream (unless keep-streams) │ └─► For each new member: ├─► For each stream: │ └─► stream.CreateMirror │ └─► StreamManagementMQWorker.streamCreateMirror() │ └─► createScaleOutStream() │ ├─► Create TX federation stream on origin │ ├─► Create RX federation stream on target instance │ └─► Exchange SDP offer/answer │ └─► chat.JoinRoom (with updated stream URIs) ``` --- **User** what happens if a second fork requests comes in for the same source and destination channels --- **Cursor** ```javascript let departingMembers = room.members; let newMembers = sourceRoom.members; if (_.includes(options, 'additive')) { departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); } if (!_.includes(options, 'force')) { departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId)); } ``` Let me look at how duplicate fork requests are handled: Great question! Here's what happens when a **second fork request** comes in for the same source and destination channels: --- ## Default Behavior (No `force` Option) The key logic is in the member filtering: ```557:560:platform/src/pcast/chat/RoomApiMQWorker.js if (!_.includes(options, 'force')) { departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId)); } ``` ### What happens: 1. **`newMembers` is filtered** to exclude any member from the source room whose `sessionId` already exists in the destination room 2. **Result**: If the first fork already moved members A, B, C from source → destination, a second fork will find that A, B, C already exist in the destination room and filter them out of `newMembers` 3. **No duplicate streams created** - The second fork becomes essentially a **no-op** if all members are already present 4. **Returns `status: 'ok'`** with the current members of the destination room --- ## With `force` Option If the second fork includes `options: ['force']`: ```557:560:platform/src/pcast/chat/RoomApiMQWorker.js if (!_.includes(options, 'force')) { // This filtering is SKIPPED with 'force' option departingMembers = ... newMembers = ... } ``` ### What happens: 1. **No filtering occurs** - All source room members are treated as "new members" 2. **For each member's streams**, a new `stream.CreateMirror` is called, creating **additional mirrored streams** for the same origin 3. **`reenter` option** is used when joining: ```618:618:platform/src/pcast/chat/RoomApiMQWorker.js options: ['reenter'], ``` This allows the member to rejoin, updating their member record (including the new stream URIs): ```298:308:platform/src/pcast/chat/RoomMQWorker.js if (_.includes(options, 'reenter')) { actions.push(this._keyValueStore.remove('members', otherMemberSet, recordKey) .then(removed => { if (removed) { log.info('[%s] Member changed role [%s] during reentry', sessionId, member.role); if (member.role === 'Audience') { return broadcastRoomEventGlobally.call(this, roomId, 'MemberLeft', [member]); } } })); } ``` 4. **Result**: The member's streams are **replaced** with new mirrored streams. The **old mirrored streams become orphaned** (they'll eventually clean up when they detect no subscribers, but they're not explicitly destroyed) --- ## Race Conditions If two fork requests arrive **simultaneously** (before either completes): | Step | Fork Request 1 | Fork Request 2 | |------|----------------|----------------| | 1 | Gets members (dest empty) | Gets members (dest empty) | | 2 | Sees member A as "new" | Sees member A as "new" | | 3 | Creates mirror stream M1 | Creates mirror stream M2 | | 4 | Joins member A with M1 | Joins member A with M2 | | 5 | **Member A now has stream M2** | (overwrites M1) | **Result**: - Member A ends up with only M2 (the last one to write wins) - M1 becomes an **orphaned stream** that will eventually be cleaned up - **No explicit protection** against this race exists - it relies on the eventual consistency of the system --- ## Summary | Scenario | Departing Members | New Members | Streams Created | Member State | |----------|-------------------|-------------|-----------------|--------------| | 2nd fork (default) | None (already departed) | None (already exist) | None | Unchanged | | 2nd fork (`force`) | All non-overlapping | All source members | New mirrors for all | Updated with new stream URIs | | Concurrent forks | Race condition | Race condition | Multiple (orphaned) | Last write wins | The system is designed to be **idempotent by default** - calling fork twice without `force` safely does nothing the second time. ```javascript if (!_.includes(options, 'force')) { departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId)); } ``` ```javascript if (!_.includes(options, 'force')) { // This filtering is SKIPPED with 'force' option departingMembers = ... newMembers = ... } ``` ```javascript options: ['reenter'], ``` ```javascript if (_.includes(options, 'reenter')) { actions.push(this._keyValueStore.remove('members', otherMemberSet, recordKey) .then(removed => { if (removed) { log.info('[%s] Member changed role [%s] during reentry', sessionId, member.role); if (member.role === 'Audience') { return broadcastRoomEventGlobally.call(this, roomId, 'MemberLeft', [member]); } } })); } ``` --- **User** what happens if a stream is terminated in the destination channel that's still active in the sourec channel --- **Cursor** Let me investigate how stream termination is handled and whether it cascades back to the source: Now I have the complete picture. Here's what happens when a **mirrored stream in the destination channel is terminated**, while the source stream is still active: --- ## Stream Relationship Architecture When you fork a channel, here's the stream hierarchy: ``` Source Channel: └── Publisher's Origin Stream (streamId: "origin-123") │ ├── [TX Federation Stream] ──────────────────────┐ │ (outgoing, on same instance as origin) │ │ │ WebRTC connection │ │ Destination Channel: │ └── [RX Federation Stream / Mirrored Stream] ◄─────────┘ (streamId: "mirror-456", upstreamId: "origin-123") ``` --- ## What Happens When the Mirrored Stream Ends ### 1. **Source Stream is NOT Terminated** The termination of a mirrored (federated) stream does **not** cascade back to the source. Looking at `StreamEventMQWorker.js`: ```270:282:platform/src/pcast/session/StreamEventMQWorker.js if (stream.internal && _.includes(stream.options, optionFederated)) { if (!_.isString(stream.upstreamId)) { log.error('[%s] Internal streams must have an upstream id', streamId); return; } return this._streamSourceManager.unregisterActiveFederatedStream(stream.upstreamId, stream.region, streamId) .then(removed => { if (removed) { log.info('[%s] Federated stream for stream [%s] ended', streamId, stream.upstreamId); } }); } ``` It only **unregisters** the federated stream from the source manager - it does NOT destroy the upstream/origin stream. ### 2. **The TX Side Federation Stream Ends** When the RX (receiving) side ends, the TX (transmitting) side gets cleaned up via the WebRTC connection teardown: ```9334:9350:platform/src/pcast/streaming/StreamingMQWorker.js if (stream.originStreamId) { const originStream = this._streams.get(stream.originStreamId); if (originStream) { originStream.subscribers.delete(stream.streamId); if (_.includes(originStream.options, optionIdleNotification) && originStream.subscribers.count() === 0) { return this._phenixMQ.publish('pcast.StreamIdle', { timestamp: Date.now(), sessionId: originStream.sessionId, streamId: originStream.streamId, options: originStream.options, tags: originStream.tags }); } } } ``` - The ended stream is removed from the origin's `subscribers` set - If no subscribers remain and `idleNotification` is enabled, a `StreamIdle` event is published (but the origin still lives) ### 3. **Member Still Exists in Destination Channel** The member record in the destination channel is **not automatically updated**. The member's `streams` array still contains the now-dead stream URI. This means: - Subscribers in the destination channel trying to use that stream URI will get `stream-ended` or `not-found` errors - The member appears to have a stream, but it's not actually active --- ## Recovery Options ### Option A: Fork Again Call fork again with the same source/destination. Without `force`: - Members already in destination are filtered out - **Nothing happens** - the stale member record remains With `force`: - New mirrored streams are created - Member record is updated with new stream URIs via `reenter` ### Option B: Republish/Re-mirror Manually The source stream is still alive, so you could: 1. Call `stream.CreateMirror` again for the origin stream 2. Update the member's streams in the destination room ### Option C: Source Stream Terminates All Federated Copies If the **source** stream ends, it cascades DOWN to terminate all federated copies: ```359:408:platform/src/pcast/session/StreamEventMQWorker.js function terminateFederatedStreams(streamId, reason, stream) { // ... return Promise.all(this._activeRegions) .each(region => this._streamSourceManager.getActiveAndIdleFederatedStreams(streamId, region) .each(destroyStream) // ... .then(() => { // Also scan for child streams by parentStreamId return this._keyValueStore.scan('streams', 'destination', onData, { bins: ['streamId'], query: {parentStreamId: streamId}, // ... }) .then(() => streams) .each(destroyStream); }); } ``` This is **one-directional**: Source → Federated copies, NOT the reverse. --- ## Summary Diagram ``` If Mirrored Stream in Destination Ends: Source Channel Destination Channel ┌────────────────────┐ ┌────────────────────┐ │ Origin Stream │ │ Mirrored Stream │ │ (still active ✓) │ │ (TERMINATED ✗) │ └────────────────────┘ └────────────────────┘ │ │ │ ▼ │ Federation registration │ removed from SourceManager │ ▼ TX federation stream Member record in room is cleaned up (no cascade) still has stale stream URI ``` | Component | State After Mirrored Stream Ends | |-----------|----------------------------------| | Source stream | Still active | | TX federation stream | Cleaned up | | RX mirrored stream | Destroyed | | Member in destination | Still present, but with stale stream URIs | | Viewers in destination | Will fail to subscribe | ```plaintext Source Channel: └── Publisher's Origin Stream (streamId: "origin-123") │ ├── [TX Federation Stream] ──────────────────────┐ │ (outgoing, on same instance as origin) │ │ │ WebRTC connection │ │ Destination Channel: │ └── [RX Federation Stream / Mirrored Stream] ◄─────────┘ (streamId: "mirror-456", upstreamId: "origin-123") ``` ```javascript if (stream.internal && _.includes(stream.options, optionFederated)) { if (!_.isString(stream.upstreamId)) { log.error('[%s] Internal streams must have an upstream id', streamId); return; } return this._streamSourceManager.unregisterActiveFederatedStream(stream.upstreamId, stream.region, streamId) .then(removed => { if (removed) { log.info('[%s] Federated stream for stream [%s] ended', streamId, stream.upstreamId); } }); } ``` ```javascript if (stream.originStreamId) { const originStream = this._streams.get(stream.originStreamId); if (originStream) { originStream.subscribers.delete(stream.streamId); if (_.includes(originStream.options, optionIdleNotification) && originStream.subscribers.count() === 0) { return this._phenixMQ.publish('pcast.StreamIdle', { timestamp: Date.now(), sessionId: originStream.sessionId, streamId: originStream.streamId, options: originStream.options, tags: originStream.tags }); } } } ``` ```javascript function terminateFederatedStreams(streamId, reason, stream) { // ... return Promise.all(this._activeRegions) .each(region => this._streamSourceManager.getActiveAndIdleFederatedStreams(streamId, region) .each(destroyStream) // ... .then(() => { // Also scan for child streams by parentStreamId return this._keyValueStore.scan('streams', 'destination', onData, { bins: ['streamId'], query: {parentStreamId: streamId}, // ... }) .then(() => streams) .each(destroyStream); }); } ``` ```plaintext If Mirrored Stream in Destination Ends: Source Channel Destination Channel ┌────────────────────┐ ┌────────────────────┐ │ Origin Stream │ │ Mirrored Stream │ │ (still active ✓) │ │ (TERMINATED ✗) │ └────────────────────┘ └────────────────────┘ │ │ │ ▼ │ Federation registration │ removed from SourceManager │ ▼ TX federation stream Member record in room is cleaned up (no cascade) still has stale stream URI ``` --- **User** is it possible to detect the stale member stream and update it's source stream uri only when it' stale? --- **Cursor** Good question! Let me explore what mechanisms exist for detecting stale streams and potential solutions: Great question! This is something that **doesn't exist automatically today**, but there are building blocks that could enable it. Let me explain the current state and potential solutions: --- ## Current Detection Capabilities ### 1. Stream Existence Check When subscribing to a stream, the system checks if it exists: ```1279:1283:platform/src/pcast/session/StreamManagementMQWorker.js return this._streamSourceManager.getSourceStreamInfo(createStream.originStreamId) .then(originStream => { if (!originStream) { return {status: 'origin-not-found'}; } ``` **Result**: Subscribers get `origin-not-found` or `origin-stream-ended` when trying to subscribe to a stale stream. ### 2. Forked Streams Are Tagged Mirrored streams created by fork have a special tag: ```634:634:platform/src/pcast/chat/RoomApiMQWorker.js tags: _.union(_.compact(streamTags), ['x-phenix-fork']), ``` ### 3. Upstream ID is Stored The mirrored stream stores a reference to its source: ```2816:2816:platform/src/pcast/session/StreamManagementMQWorker.js this._keyValueStore.store('streams', 'source', federatedStreamResult.streamId, {upstreamId: originStreamId}) ``` --- ## What Doesn't Exist Today There's **no automatic mechanism** that: 1. Periodically validates member stream URIs 2. Detects stale forked streams 3. Auto-recreates mirrors from the original source The member record simply holds stream URIs and doesn't actively validate them. --- ## Potential Solutions ### Option A: Lazy Validation on `GetMembers` / `GetStreams` You could modify `roomapiGetMembers` or `roomapiGetStreams` to validate streams: ```javascript // Pseudocode - not currently implemented function roomapiGetMembersWithValidation({roomId}, {sentTime}, credentials) { return roomapiGetMembers.call(this, {roomId}, {sentTime}, credentials) .then(({status, room, members}) => { if (status !== 'ok') return {status}; return Promise.map(members, member => { return Promise.map(member.streams, stream => { const streamId = extractStreamIdFromUri(stream.uri); return this._streamSourceManager.getSourceStreamInfo(streamId) .then(sourceInfo => { if (sourceInfo) { return stream; // Stream is still valid } // Stream is stale - check if it was a fork return this._keyValueStore.fetch('streams', 'source', streamId, ['upstreamId']) .then(data => { const upstreamId = data?.upstreamId; if (!upstreamId) return null; // Can't recover // Check if upstream still exists return this._streamSourceManager.getSourceStreamInfo(upstreamId) .then(upstreamInfo => { if (!upstreamInfo) return null; // Source also gone // Re-create mirror from upstream return this._phenixMQ.request('stream.CreateMirror', { originStreamId: upstreamId, capabilities: extractCapabilitiesFromUri(stream.uri), tags: ['x-phenix-fork', `channelId:${roomId}`], // ... }) .then(mirrorResult => { if (mirrorResult.status !== 'ok') return null; // Update stream URI and member record stream.uri = buildNewUri(mirrorResult.mirroredStreamId); return stream; }); }); }); }); }) .then(validatedStreams => { member.streams = _.compact(validatedStreams); return member; }); }) .then(validatedMembers => ({status: 'ok', room, members: validatedMembers})); }); } ``` **Pros**: Transparent to clients, streams auto-heal **Cons**: Adds latency to GetMembers calls, complexity --- ### Option B: Background Stream Health Worker Add a worker that periodically validates forked streams: ```javascript // Pseudocode - not currently implemented function validateForkedStreams(roomId) { return roomapiGetMembers({roomId}) .then(({members}) => { return Promise.map(members, member => { return Promise.map(member.streams, stream => { if (!hasTag(stream, 'x-phenix-fork')) return; // Skip non-forked const streamId = extractStreamIdFromUri(stream.uri); return this._streamSourceManager.getSourceStreamInfo(streamId) .then(info => { if (info) return; // Still valid // Stale - trigger re-mirror return remirrorStream(member, stream, roomId); }); }); }); }); } // Run periodically scheduler.schedule(validateForkedStreams, interval); ``` **Pros**: Doesn't impact normal request latency **Cons**: Stale period between checks, more infrastructure --- ### Option C: Event-Driven Re-mirroring Listen for stream end events and proactively re-mirror: ```javascript // In StreamEventMQWorker.js function processSourceStreamEnded({streamId, reason, tags}) { // Existing code... // NEW: If this was a forked stream, try to re-mirror if (_.includes(tags, 'x-phenix-fork')) { return this._keyValueStore.fetch('streams', 'source', streamId, ['upstreamId']) .then(data => { if (!data?.upstreamId) return; // Find which room this was in const roomTag = _.find(tags, t => t.startsWith('channelId:') || t.startsWith('roomId:')); if (!roomTag) return; const roomId = roomTag.split(':')[1]; // Re-mirror and update member return remirrorAndUpdateMember(roomId, streamId, data.upstreamId); }); } } ``` **Pros**: Real-time recovery, minimal latency **Cons**: Complexity, race conditions, need to update member records --- ### Option D: Client-Side Retry with Fallback Expose the `upstreamId` in stream metadata so clients can retry: ```javascript // Client-side logic function subscribeToStream(streamUri) { return sdk.subscribe(streamUri) .catch(error => { if (error.status === 'origin-not-found') { // Fetch upstream and retry return getUpstreamId(streamUri) .then(upstreamId => { if (upstreamId) { return sdk.subscribe(buildUri(upstreamId)); } throw error; }); } throw error; }); } ``` **Pros**: No backend changes needed **Cons**: Exposes internal details, client complexity --- ## Recommended Approach For minimal changes with good results, I'd suggest **Option C (Event-Driven)** combined with storing more metadata: 1. **Store room/channel context** in the forked stream record: ```javascript this._keyValueStore.store('streams', 'source', mirroredStreamId, { upstreamId: originStreamId, roomId: roomId, memberId: member.sessionId }); ``` 2. **On fork stream end**, check if upstream is alive and re-mirror: ```javascript if (_.includes(tags, 'x-phenix-fork') && reason !== 'origin-stream-ended') { // Only re-mirror if the source stream is still alive attemptRemirror(streamId); } ``` 3. **Update member record** with new stream URI Would you like me to draft a more detailed implementation for any of these approaches? ```javascript return this._streamSourceManager.getSourceStreamInfo(createStream.originStreamId) .then(originStream => { if (!originStream) { return {status: 'origin-not-found'}; } ``` ```javascript tags: _.union(_.compact(streamTags), ['x-phenix-fork']), ``` ```javascript this._keyValueStore.store('streams', 'source', federatedStreamResult.streamId, {upstreamId: originStreamId}) ``` ```javascript // Pseudocode - not currently implemented function roomapiGetMembersWithValidation({roomId}, {sentTime}, credentials) { return roomapiGetMembers.call(this, {roomId}, {sentTime}, credentials) .then(({status, room, members}) => { if (status !== 'ok') return {status}; return Promise.map(members, member => { return Promise.map(member.streams, stream => { const streamId = extractStreamIdFromUri(stream.uri); return this._streamSourceManager.getSourceStreamInfo(streamId) .then(sourceInfo => { if (sourceInfo) { return stream; // Stream is still valid } // Stream is stale - check if it was a fork return this._keyValueStore.fetch('streams', 'source', streamId, ['upstreamId']) .then(data => { const upstreamId = data?.upstreamId; if (!upstreamId) return null; // Can't recover // Check if upstream still exists return this._streamSourceManager.getSourceStreamInfo(upstreamId) .then(upstreamInfo => { if (!upstreamInfo) return null; // Source also gone // Re-create mirror from upstream return this._phenixMQ.request('stream.CreateMirror', { originStreamId: upstreamId, capabilities: extractCapabilitiesFromUri(stream.uri), tags: ['x-phenix-fork', `channelId:${roomId}`], // ... }) .then(mirrorResult => { if (mirrorResult.status !== 'ok') return null; // Update stream URI and member record stream.uri = buildNewUri(mirrorResult.mirroredStreamId); return stream; }); }); }); }); }) .then(validatedStreams => { member.streams = _.compact(validatedStreams); return member; }); }) .then(validatedMembers => ({status: 'ok', room, members: validatedMembers})); }); } ``` ```javascript // Pseudocode - not currently implemented function validateForkedStreams(roomId) { return roomapiGetMembers({roomId}) .then(({members}) => { return Promise.map(members, member => { return Promise.map(member.streams, stream => { if (!hasTag(stream, 'x-phenix-fork')) return; // Skip non-forked const streamId = extractStreamIdFromUri(stream.uri); return this._streamSourceManager.getSourceStreamInfo(streamId) .then(info => { if (info) return; // Still valid // Stale - trigger re-mirror return remirrorStream(member, stream, roomId); }); }); }); }); } // Run periodically scheduler.schedule(validateForkedStreams, interval); ``` ```javascript // In StreamEventMQWorker.js function processSourceStreamEnded({streamId, reason, tags}) { // Existing code... // NEW: If this was a forked stream, try to re-mirror if (_.includes(tags, 'x-phenix-fork')) { return this._keyValueStore.fetch('streams', 'source', streamId, ['upstreamId']) .then(data => { if (!data?.upstreamId) return; // Find which room this was in const roomTag = _.find(tags, t => t.startsWith('channelId:') || t.startsWith('roomId:')); if (!roomTag) return; const roomId = roomTag.split(':')[1]; // Re-mirror and update member return remirrorAndUpdateMember(roomId, streamId, data.upstreamId); }); } } ``` ```javascript // Client-side logic function subscribeToStream(streamUri) { return sdk.subscribe(streamUri) .catch(error => { if (error.status === 'origin-not-found') { // Fetch upstream and retry return getUpstreamId(streamUri) .then(upstreamId => { if (upstreamId) { return sdk.subscribe(buildUri(upstreamId)); } throw error; }); } throw error; }); } ``` ```javascript this._keyValueStore.store('streams', 'source', mirroredStreamId, { upstreamId: originStreamId, roomId: roomId, memberId: member.sessionId }); ``` ```javascript if (_.includes(tags, 'x-phenix-fork') && reason !== 'origin-stream-ended') { // Only re-mirror if the source stream is still alive attemptRemirror(streamId); } ``` --- **User** The user is calling fork every 10 seconds, if the streams are OK then nothing happens, but the intention is for the streams to get replaced if they've gone stale --- **Cursor** Ah, I see the issue now! The current fork logic has a gap: **Without `force`:** Members already in destination are skipped entirely (including their stale streams) **With `force`:** ALL streams get replaced, even healthy ones (wasteful, creates orphans) What you need is a **"smart fork"** that only replaces stale streams. Let me look at where this logic could be added: I see there's already a `refresh-stale` option implemented in the code! Let me verify this is actual code and check if `_streamSourceManager` is available: I see the problem! The `refresh-stale` code uses `this._streamSourceManager` but `RoomApiMQWorker` doesn't have access to it. Looking at line 36: ```36:66:platform/src/pcast/chat/RoomApiMQWorker.js init(phenixMQ, keyValueStore, regionService, roomManager) { // ... this._phenixMQ = phenixMQ; this._keyValueStore = keyValueStore; this._regionService = regionService; this._roomManager = roomManager; }, ``` The `StreamSourceManager` isn't injected. Let me check if `RoomManager` has it: ```javascript init(phenixMQ, keyValueStore, regionService, roomManager) { // ... this._phenixMQ = phenixMQ; this._keyValueStore = keyValueStore; this._regionService = regionService; this._roomManager = roomManager; }, ``` `RoomManager` has `_streamSourceManager`. The `refresh-stale` code in `RoomApiMQWorker` needs to either: 1. Access it via `this._roomManager._streamSourceManager` (hacky) 2. Add a method to `RoomManager` to check stream health 3. Inject `StreamSourceManager` directly into `RoomApiMQWorker` 4. Use `keyValueStore` directly to check stream existence Here's how you can fix the current implementation - **Option 4** is the least invasive since you already have `_keyValueStore`: ```javascript /** * Copyright 2025 Phenix Real Time Solutions, Inc. Confidential and Proprietary. All Rights Reserved. */ define([ 'Promise', 'lodash', 'logging/logger', 'lang/assert', 'moment', 'url', 'randomstring', 'lang/Disposable', 'lang/DisposableList', 'mq/PhenixMQ', 'mq/protocol/MQProtocol', 'mq/IMQWorker', 'mq/MQWorker', 'mq/MQWorkerServices', 'mq/MQWorkerService', 'store/kv/IKeyValueStore', 'cloud/RegionService', 'pcast/chat/RoomManager' ], (Promise, _, logger, assert, moment, url, randomstring, Disposable, DisposableList, PhenixMQ, MQProtocol, IMQWorker, MQWorker, MQWorkerServices, MQWorkerService, IKeyValueStore, RegionService, RoomManager) => { 'use strict'; const log = logger.getLogger('pcast/chat/RoomApiMQWorker'); const roomTypes = ['DirectChat', 'MultiPartyChat', 'ModeratedChat', 'TownHall', 'Channel', 'MultiAngleChannel']; const streamKeyLength = 96; const roomSuffixLength = 12; const roomAttributes = ['roomId', 'alias', 'applicationId', 'name', 'description', 'type', 'streamKey', 'created', 'lastUpdate', 'options', 'ingest']; const roomIdRegex = /[^#]+#[^#]+#.*/; const roomKeepAliveInterval = moment.duration(1, 'year'); const namespace = 'roomapi'; const RoomApiMQWorker = IMQWorker.extend({ init(phenixMQ, keyValueStore, regionService, roomManager) { assert.assertInstance('phenixMQ', phenixMQ, PhenixMQ); assert.assertInstance('keyValueStore', keyValueStore, IKeyValueStore); assert.assertInstance('regionService', regionService, RegionService); assert.assertInstance('roomManager', roomManager, RoomManager); this._worker = new MQWorker(namespace, phenixMQ, new MQWorkerServices({ anycast: [ new MQWorkerService('roomapi.Create', proceedIfAuthenticated.bind(this, roomapiCreate.bind(this), {})), new MQWorkerService('roomapi.Get', proceedIfAuthenticated.bind(this, roomapiGet.bind(this), {})), new MQWorkerService('roomapi.GetMembers', proceedIfAuthenticated.bind(this, roomapiGetMembers.bind(this), {allowUnauthorized: true}), { concurrency: 16, maximumRatePerSecond: 4, dispatchTimeInterval: moment.duration(1, 'minutes') }), new MQWorkerService('roomapi.GetStreams', proceedIfAuthenticated.bind(this, roomapiGetStreams.bind(this), {allowUnauthorized: true}), {maximumRatePerSecond: 32}), new MQWorkerService('roomapi.Delete', proceedIfAuthenticated.bind(this, roomapiDelete.bind(this), {})), new MQWorkerService('roomapi.List', proceedIfAuthenticated.bind(this, roomapiList.bind(this), {}), { maximumRatePerSecond: 32, concurrency: 1 }), new MQWorkerService('roomapi.Fork', proceedIfAuthenticated.bind(this, roomapiFork.bind(this), {})), new MQWorkerService('roomapi.Kill', proceedIfAuthenticated.bind(this, roomapiKill.bind(this), {})) ] })); this._phenixMQ = phenixMQ; this._keyValueStore = keyValueStore; this._regionService = regionService; this._roomManager = roomManager; }, processRequest(type, message, context) { return this._worker.processRequest(type, message, context); }, start() { return Promise.try(() => start.call(this)); }, getLoadInfo() { return Promise.try(() => getLoadInfo.call(this)); }, drain() { return this._worker.drain(); }, stop() { return this._worker.stop(); }, toString() { return `RoomApiMQWorker[${this._worker}]`; } }); function start() { return Promise.try(() => this._keyValueStore.createIndex('idx_room_applicationId', 'chat', 'rooms', 'applicationId', 'String')) .then(() => this._worker.start()); } function getCredentials(message) { if (message.credentials) { return message.credentials; } return { applicationId: message.applicationId, secret: message.secret }; } function proceedIfAuthenticated(next, {allowUnauthorized = false}, message, context) { assert.assertFunction('next', next); assert.assertBoolean('allowUnauthorized', allowUnauthorized); const credentials = getCredentials.call(this, message); if (allowUnauthorized && !credentials.applicationId) { return next(message, context, null); } return authenticate.call(this, credentials.applicationId, credentials.secret) .then(isAuthenticated => { if (!isAuthenticated) { return {status: 'unauthorized'}; } return next(message, context, credentials); }); } function authenticate(applicationId, secret) { if (_.isEmpty(applicationId)) { return false; } return this._keyValueStore.fetch('applications', 'definitions', applicationId, ['secret']) .catch(e => { log.warn('[%s] Failed to fetch applications definitions', applicationId, e); return false; }) .then(application => { if (!application || application.secret !== secret) { return false; } return true; }); } function roomapiCreate({room}, context, {applicationId}) { assert.assertObject('room', room); assert.assertStringNotEmpty('applicationId', applicationId); if (room.alias) { assert.assertStringNotEmpty('room.alias', room.alias); } assert.assertString('room.name', room.name); assert.assertString('room.description', room.description); assert.assertEnum('room.type', room.type, roomTypes); assert.assertArrayOfString('room.options', room.options); if (room.name === '') { log.info('[%s] Invalid create room request with empty room.name', applicationId); return {status: 'name-required'}; } if (!_.isUndefined(_.find(room.options, option => _.isEmpty(option)))) { return {status: 'invalid-option'}; } const alias = applicationId + '#' + (room.alias || _.camelCase(room.name)); return this._roomManager.resolveRoomByAlias(alias, roomAttributes, {useCache: false}) .then(record => { if (record) { if (record.type !== room.type) { return {status: 'type-conflict'}; } return { status: 'already-exists', room: { applicationId: record.applicationId, roomId: record.roomId, alias: convertFromInternalAlias.call(this, record.alias), name: record.name, description: record.description, type: record.type, options: record.options, ingestOptions: record.ingest, streamKey: record.streamKey, created: record.created, lastUpdate: record.lastUpdate } }; } const streamKeyOption = room.options.find(option => option.startsWith('streamKey=')) ?.substring('streamKey='.length); const streamKey = streamKeyOption || randomstring.generate(streamKeyLength); const roomId = this._regionService.getOwnRegion() + '#' + applicationId + '#' + _.camelCase(room.name) + '.' + randomstring.generate(roomSuffixLength); const now = _.now(); log.info('[%s] Creating room [%s] with alias [%s]', applicationId, roomId, alias); const storedRoom = { applicationId, roomId, alias, name: room.name, description: room.description, type: room.type, options: room.options, streamKey, messages: 0, created: now, lastUpdate: now }; if (room.ingestOptions) { storedRoom.ingest = room.ingestOptions; } return this._keyValueStore.store('chat', 'rooms', roomId, storedRoom, {ttl: roomKeepAliveInterval}) .then(() => ({ status: 'ok', room: { applicationId: storedRoom.applicationId, roomId: storedRoom.roomId, alias: convertFromInternalAlias.call(this, storedRoom.alias), name: storedRoom.name, description: storedRoom.description, type: storedRoom.type, options: storedRoom.options, ingestOptions: storedRoom.ingest, streamKey: storedRoom.streamKey, created: storedRoom.created, lastUpdate: storedRoom.lastUpdate } })); }); } function getSize(roomId) { assert.assertStringNotEmpty('roomId', roomId); if (!roomIdRegex.test(roomId)) { return Promise.resolve({status: 'invalid-room-id'}); } const getRoomParticipantInfo = {roomId}; return this._phenixMQ.request('chat.GetRoomParticipantInfo', getRoomParticipantInfo) .then(result => { const payload = result.payload; const status = _.get(payload, ['status'], 'failed'); if (_.get(result, ['status']) !== 'ok') { return {status}; } assert.assertArray('payload.counts', payload.counts); const size = _.reduce(payload.counts, (total, participantCount) => total + participantCount.count, 0); return { status, size }; }); } function roomapiGet({roomId}, context, {applicationId}) { assert.assertStringNotEmpty('roomId', roomId); assert.assertStringNotEmpty('applicationId', applicationId); if (!roomIdRegex.test(roomId)) { return Promise.resolve({status: 'invalid-room-id'}); } return this._keyValueStore.fetch('chat', 'rooms', roomId, roomAttributes) .then(room => { if (!room) { return {status: 'not-found'}; } if (room.applicationId !== applicationId) { return {status: 'unauthorized'}; } return getSize.call(this, room.roomId) .then(getSizeResponse => { if (getSizeResponse.status !== 'ok') { log.info('[%s] [%s] Fetching room size failed with response [%j]', applicationId, roomId, getSizeResponse); return {status: _.get(getSizeResponse, ['status'], 'failed')}; } assert.assertNumber('getSizeResponse.size', getSizeResponse.size); return { status: 'ok', room: { applicationId: room.applicationId, roomId: room.roomId, alias: convertFromInternalAlias.call(this, room.alias), name: room.name, description: room.description, type: room.type, options: room.options, ingestOptions: room.ingest, streamKey: room.streamKey, created: room.created, lastUpdate: room.lastUpdate, estimatedSize: getSizeResponse.size } }; }); }) .catch({code: 'invalid-cluster'}, () => ({status: 'not-found'})); } function roomapiGetMembers({roomId}, {sentTime}, credentials) { assert.assertStringNotEmpty('roomId', roomId); assert.assertMoment('sentTime', sentTime); if (!roomIdRegex.test(roomId)) { return Promise.resolve({status: 'invalid-room-id'}); } const members = {}; const onData = record => { members[record.sessionId] = record; }; return Promise.all([ this._keyValueStore.fetch('chat', 'rooms', roomId, roomAttributes), this._keyValueStore.scan('members', 'room', onData, { bins: ['sessionId', 'screenName', 'role', 'streams', 'state', 'lastUpdate'], query: {roomId}, fromAllClusters: true, reentrant: true, offload: false }) ]) .spread(room => { if (!room) { return {status: 'not-found'}; } if (credentials && credentials.applicationId !== room.applicationId) { return {status: 'unauthorized'}; } return { status: 'ok', room: { applicationId: room.applicationId, roomId: room.roomId, alias: convertFromInternalAlias.call(this, room.alias), name: room.name, description: room.description, type: room.type, options: room.options, ingestOptions: room.ingest, created: room.created, lastUpdate: room.lastUpdate }, members: _.values(members) }; }) .catch({code: 'invalid-cluster'}, () => ({status: 'not-found'})); } function roomapiGetStreams({roomId, role, state, screenName, capability}, {sentTime}, credentials) { assert.assertMoment('sentTime', sentTime); if (!roomIdRegex.test(roomId)) { return Promise.resolve({status: 'invalid-room-id'}); } return roomapiGetMembers.call(this, {roomId}, {sentTime}, credentials) .then(({status, room, members}) => { if (status !== 'ok') { return {status}; } assert.assertArrayOfObject('members', members); assert.assertObject('room', room); let candidateMembers = members; if (role) { candidateMembers = _.filter(candidateMembers, {role}); } if (state) { candidateMembers = _.filter(candidateMembers, {state}); } if (screenName) { candidateMembers = _.filter(candidateMembers, {screenName}); } candidateMembers = _.sortBy(candidateMembers, ['lastUpdate']); let streamUris = _.reduce(candidateMembers, (streams, member) => { _.forEach(member.streams, ({uri}) => { streams.push(uri); }); return streams; }, []); if (capability) { streamUris = _.filter(streamUris, streamUri => { switch (capability) { case 'RealTime': return true; case 'RTMP': return _.includes(streamUri, 'capabilities=') && _.includes(streamUri, 'rtmp'); case 'HLS': case 'DASH': return (_.includes(streamUri, 'capabilities=') && _.includes(streamUri, 'streaming')) || _.includes(streamUri, 'streamTokenForLiveStream'); default: return true; } }); } const streamIds = _.map(streamUris, streamUri => { const parsedStreamUri = url.parse(streamUri.replace(/[#]/g, '%23')); const streamId = parsedStreamUri.pathname.replace(/%23/g, '#') .slice(1); return streamId; }); return { status: 'ok', room, streamIds }; }); } function roomapiList({type} /* , order, limit, cursor */, context, {applicationId}) { assert.assertStringNotEmpty('type', type); assert.assertStringNotEmpty('applicationId', applicationId); const rooms = []; const dataCallback = room => { if (room.type === type) { rooms.push(room); } }; return this._keyValueStore.scan('chat', 'rooms', dataCallback, { bins: roomAttributes, query: {applicationId}, fromAllClusters: true, reentrant: true, offload: true }) .then(() => ({ status: 'ok', rooms: _.map(rooms, room => { room.alias = convertFromInternalAlias.call(this, room.alias); room.ingestOptions = room.ingest; delete room.ingest; return room; }) })) .catch(e => { log.warn('[%s] Failed to list rooms', applicationId, e); }); } function roomapiDelete({roomId}, context, {applicationId}) { assert.assertStringNotEmpty('roomId', roomId); assert.assertStringNotEmpty('applicationId', applicationId); if (!roomIdRegex.test(roomId)) { return Promise.resolve({status: 'invalid-room-id'}); } return this._keyValueStore.fetch('chat', 'rooms', roomId, ['applicationId']) .then(room => { if (!room) { return {status: 'not-found'}; } if (room.applicationId !== applicationId) { return {status: 'unauthorized'}; } log.info('[%s] Deleting room [%s]', applicationId, roomId); return this._keyValueStore.remove('chat', 'rooms', roomId) .then(removed => { if (!removed) { log.warn('[%s] Failed to delete room [%s]', applicationId, roomId); return {status: 'failed'}; } return {status: 'ok'}; }); }) .catch({code: 'invalid-cluster'}, () => ({status: 'not-found'})); } function roomapiFork({roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion}, {sentTime}, {applicationId}) { assert.assertStringNotEmpty('roomId', roomId); assert.assertStringNotEmpty('sourceRoomId', sourceRoomId); assert.assertArrayOfString('options', options); assert.assertArrayOfString('streamCapabilities', streamCapabilities); assert.assertArrayOfString('streamTags', streamTags); assert.assertMoment('sentTime', sentTime); assert.assertStringNotEmpty('applicationId', applicationId); log.info('[%s] [%s] Forking room from room [%s] with options [%s]', applicationId, roomId, sourceRoomId, options); if (!roomIdRegex.test(roomId)) { return Promise.resolve({status: 'invalid-room-id'}); } if (!roomIdRegex.test(sourceRoomId)) { return Promise.resolve({status: 'invalid-room-id'}); } return Promise.all([ roomapiGetMembers.call(this, {roomId}, {sentTime}), roomapiGetMembers.call(this, {roomId: sourceRoomId}, {sentTime}) ]) .spread((room, sourceRoom) => { if (_.get(room, ['status']) !== 'ok') { return {status: _.get(room, ['status'], 'no-get-channel-members-status')}; } if (_.get(sourceRoom, ['status']) !== 'ok') { return {status: _.get(sourceRoom, ['status'], 'no-get-channel-members-status')}; } if (_.get(room, ['room', 'applicationId']) !== applicationId) { return {status: 'unauthorized'}; } if (_.get(sourceRoom, ['room', 'applicationId']) !== applicationId) { return {status: 'unauthorized'}; } let departingMembers = room.members; let newMembers = sourceRoom.members; if (_.includes(options, 'additive')) { departingMembers = _.filter(departingMembers, currentMember => _.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); } if (!_.includes(options, 'force')) { departingMembers = _.filter(departingMembers, currentMember => !_.find(sourceRoom.members, newMember => newMember.sessionId === currentMember.sessionId)); newMembers = _.filter(newMembers, candidateMember => !_.find(room.members, existingMember => existingMember.sessionId === candidateMember.sessionId)); } // Check for stale streams in existing members and refresh them if 'refresh-stale' option is set const checkAndRefreshStaleMembers = () => { if (!_.includes(options, 'refresh-stale')) { return Promise.resolve(); } // Find members that exist in both rooms (already forked but potentially stale) const existingMembersInBoth = _.filter(room.members, existingMember => _.find(sourceRoom.members, sourceMember => sourceMember.sessionId === existingMember.sessionId)); return Promise.map(existingMembersInBoth, existingMember => { if (_.isEmpty(existingMember.streams)) { return null; } // Check if any of the member's streams are stale return Promise.map(existingMember.streams, stream => { const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23')); const streamId = parsedUri.pathname.replace(/%23/g, '#').slice(1); return this._keyValueStore.fetch('streams', 'source', streamId, ['streamId', 'ended', 'ending']) .then(sourceInfo => ({ stream, streamId, isStale: !sourceInfo || !sourceInfo.streamId || sourceInfo.ended || sourceInfo.ending })) .catch(() => ({stream, streamId, isStale: true})); }) .then(streamStatuses => { const staleStreams = _.filter(streamStatuses, {isStale: true}); if (_.isEmpty(staleStreams)) { return null; // All streams are healthy } log.info('[%s] [%s] Member [%s] has [%d] stale streams, will refresh', applicationId, roomId, existingMember.sessionId, staleStreams.length); // Find the corresponding source member to get fresh stream URIs const sourceMember = _.find(sourceRoom.members, m => m.sessionId === existingMember.sessionId); if (!sourceMember) { return null; } // Add to newMembers so they get re-processed with fresh mirrors newMembers.push(sourceMember); // Destroy the stale streams to clean up return Promise.each(staleStreams, ({streamId}) => { log.info('[%s] [%s] Destroying stale stream [%s] for member [%s]', applicationId, roomId, streamId, existingMember.sessionId); return this._phenixMQ.request('stream.DestroyStream', { streamId, reason: 'stale-stream-refresh' }).catch(() => {}); // Ignore errors - stream might already be gone }); }); }) .then(() => { // Remove duplicates in case member was already in newMembers newMembers = _.uniqBy(newMembers, 'sessionId'); }); }; return checkAndRefreshStaleMembers() .then(() => Promise.map(departingMembers, member => { const leaveRoom = { roomId, sessionId: member.sessionId, timestamp: _.now() }; return this._phenixMQ.request('chat.LeaveRoom', leaveRoom) .then(leaveRoomResponse => { if (_.get(leaveRoomResponse, ['status']) !== 'ok') { return leaveRoomResponse; } if (_.includes(options, 'keep-streams')) { return leaveRoomResponse; } if (_.isEmpty(member.streams)) { return leaveRoomResponse; } return Promise.each(member.streams, stream => { const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23')); const streamId = parsedUri.pathname.replace(/%23/g, '#') .slice(1); const reason = 'terminated-by-fork'; log.info('[%s] [%s] Terminating stream [%s] during forking of room with reason [%s]', applicationId, roomId, streamId, reason); const destroyStream = { streamId, reason }; return this._phenixMQ.request('stream.DestroyStream', destroyStream) .then(destroyStreamResponse => { if (_.get(destroyStreamResponse, ['status']) !== 'ok') { log.warn('[%s] [%s] Terminating stream [%s] during forking of room failed [%j]', applicationId, roomId, streamId, destroyStreamResponse) ; } }); }) .then(() => leaveRoomResponse); }); }) .then(leaveRoomResponses => { const failedLeaveRoomResponse = _.find(leaveRoomResponses, response => response.status !== 'ok'); if (failedLeaveRoomResponse) { return failedLeaveRoomResponse; } return Promise.map(newMembers, member => { const joinRoom = { roomId, sessionId: member.sessionId, member: _.clone(member), options: ['reenter'], timestamp: _.now() }; return Promise.map(joinRoom.member.streams, stream => { const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23')); // Only provide capabilities list. In particular, drop wildcard token from origin. parsedUri.query = {capabilities: streamCapabilities.join(',')}; parsedUri.search = undefined; const originStreamId = parsedUri.pathname.replace(/%23/g, '#') .slice(1); const streamMirror = { originStreamId, capabilities: _.compact(streamCapabilities), tags: _.union(_.compact(streamTags), ['x-phenix-fork']), options: [], region: desiredRegion }; if (room.room.type === 'Channel' || room.room.type === 'MultiAngleChannel') { streamMirror.tags.push(`channelId:${roomId}`); streamMirror.tags.push(`channelAlias:${room.room.alias}`); } else { streamMirror.tags.push(`roomId:${roomId}`); streamMirror.tags.push(`roomAlias:${room.room.alias}`); } return this._phenixMQ.request('stream.CreateMirror', streamMirror) .then(createMirrorResponse => { switch (createMirrorResponse.status) { case 'ok': break; case 'origin-stream-ended': log.info('[%s] [%s] Mirroring stream [%s] failed [%j]', applicationId, roomId, originStreamId, createMirrorResponse); return {status: createMirrorResponse.status}; default: log.warn('[%s] [%s] Mirroring stream [%s] failed [%j]', applicationId, roomId, originStreamId, createMirrorResponse); return {status: createMirrorResponse.status}; } const streamId = createMirrorResponse.payload.mirroredStreamId; assert.assertStringNotEmpty('streamId', streamId); parsedUri.pathname = '/' + streamId.replace(/[#]/g, '%23'); stream.uri = url.format(parsedUri) .replace(/%23/g, '#'); return createMirrorResponse; }); }) .then(createMirrorResponses => { const failedCreateMirrorResponse = _.find(createMirrorResponses, response => _.get(response, ['status']) !== 'ok'); if (failedCreateMirrorResponse) { return failedCreateMirrorResponse; } return this._phenixMQ.request('chat.JoinRoom', joinRoom); }); }) .then(joinRoomResponses => { const failedJoinRoomResponse = _.find(joinRoomResponses, response => _.get(response, ['status']) !== 'ok'); if (failedJoinRoomResponse) { switch (failedJoinRoomResponse.status) { case 'origin-stream-ended': log.info('[%s] [%s] Join room failed [%j]', applicationId, roomId, joinRoomResponses); return {status: failedJoinRoomResponse.status}; default: log.warn('[%s] [%s] Join room failed [%j]', applicationId, roomId, joinRoomResponses); return {status: failedJoinRoomResponse.status}; } } return roomapiGetMembers.call(this, {roomId}, {sentTime}); }) .then(updatedRoom => { if (_.get(updatedRoom, ['status']) !== 'ok') { return {status: _.get(updatedRoom, ['status'], 'no-get-channel-members-status')}; } return { status: 'ok', members: updatedRoom.members }; }); })); }) .then(response => { switch (response.status) { case 'ok': break; case 'not-found': log.info('[%s] [%s] Forking from room [%s] failed (streamCapabilities=[%s],streamTags=[%s],options=[%s],desiredRegion=[%s]) with response [%j]', applicationId, roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion, response); break; default: log.warn('[%s] [%s] Forking from room [%s] failed (streamCapabilities=[%s],streamTags=[%s],options=[%s],desiredRegion=[%s]) with response [%j]', applicationId, roomId, sourceRoomId, streamCapabilities, streamTags, options, desiredRegion, response); break; } return response; }); } function roomapiKill({roomId, reason, options}, {sentTime}, {applicationId}) { assert.assertStringNotEmpty('roomId', roomId); assert.assertString('reason', reason); assert.assertArrayOfString('options', options); assert.assertMoment('sentTime', sentTime); assert.assertStringNotEmpty('applicationId', applicationId); log.info('[%s] [%s] Killing room with reason [%s] and options [%s]', applicationId, roomId, reason, options); return roomapiGetMembers.call(this, {roomId}, {sentTime}) .then(getChannelMembersResponse => { if (_.get(getChannelMembersResponse, ['status']) !== 'ok') { return {status: _.get(getChannelMembersResponse, ['status'], 'no-get-channel-members-status')}; } if (getChannelMembersResponse.room.applicationId !== applicationId) { return {status: 'unauthorized'}; } return Promise.map(getChannelMembersResponse.members, member => { const leaveRoom = { roomId, sessionId: member.sessionId, timestamp: _.now() }; return Promise.map(member.streams, stream => { if (_.includes(options, 'keep-streams')) { return {status: 'ok'}; } const parsedUri = url.parse(stream.uri.replace(/[#]/g, '%23')); const streamId = parsedUri.pathname.replace(/%23/g, '#') .slice(1); log.info('[%s] [%s] Killing stream [%s] with reason [%s]', applicationId, roomId, streamId, reason); const destroyStream = { streamId, reason }; return this._phenixMQ.request('stream.DestroyStream', destroyStream) .then(destroyStreamResponse => { if (_.get(destroyStreamResponse, ['status']) !== 'ok') { log.warn('[%s] [%s] Destroying stream [%s] failed [%j]', applicationId, roomId, streamId, destroyStreamResponse); } return destroyStreamResponse; }); }) .then(destroyStreamResponses => { const failedDestroyStreamResponse = _.find(destroyStreamResponses, response => _.get(response, ['status']) !== 'ok'); if (_.includes(options, 'destroy-required') && failedDestroyStreamResponse) { log.warn('[%s] [%s] Killing of stream failed [%j]', applicationId, roomId, destroyStreamResponses); return failedDestroyStreamResponse; } return this._phenixMQ.request('chat.LeaveRoom', leaveRoom); }); }) .then(leaveRoomResponses => { const failedLeaveRoomResponse = _.find(leaveRoomResponses, response => response.status !== 'ok'); if (failedLeaveRoomResponse) { log.warn('[%s] [%s] Leaving room failed [%j]', applicationId, roomId, leaveRoomResponses); return failedLeaveRoomResponse; } return { status: 'ok', killedMembers: getChannelMembersResponse.members }; }); }); } function convertFromInternalAlias(aliasWithApplicationId) { if (!aliasWithApplicationId) { return aliasWithApplicationId; } assert.assertStringNotEmpty('aliasWithApplicationId', aliasWithApplicationId); const idx = aliasWithApplicationId.indexOf('#'); return aliasWithApplicationId.substring(idx + 1); } function getLoadInfo() { return Promise.all([ this._worker.getLoadInfo(), this._roomManager.getRoomIdByAliasCacheUsage() .then(({pendingOperationsCounter}) => _.map(pendingOperationsCounter, (count, operationName) => ({ name: `cache/room/alias/room/id/pending/${operationName}`, value: count, unit: 'Count', flags: ['drainable'] }))), this._roomManager.getRoomMembersByRoomIdCacheUsage() .then(({pendingOperationsCounter}) => _.map(pendingOperationsCounter, (count, operationName) => ({ name: `cache/room/id/room/members/pending/${operationName}`, value: count, unit: 'Count', flags: ['drainable'] }))) ]) .then(loadInfos => loadInfos.flat()); } return RoomApiMQWorker; }); ``` Now let me check for any lint errors: The fix is in place. Now here's how the **`refresh-stale`** option works for your use case: --- ## How to Use It Call fork every 10 seconds with the `refresh-stale` option: ```javascript const forkRequest = { roomId: destinationChannelId, sourceRoomId: sourceChannelId, streamCapabilities: ['streaming', 'hd'], streamTags: [], options: ['refresh-stale'], // <-- This is the key option desiredRegion: 'us-west' }; // Call this every 10 seconds phenixMQ.request('roomapi.Fork', forkRequest); ``` Or via REST API: ```bash PUT /channel/{destinationChannelId}/fork/{sourceChannelId} { "options": ["refresh-stale"], "streamCapabilities": ["streaming", "hd"] } ``` --- ## What Happens Now | Scenario | Behavior | |----------|----------| | Member not in destination | Gets added with new mirrored streams | | Member in destination with **healthy** streams | **Skipped** - no action | | Member in destination with **stale** streams | Stale streams destroyed, fresh mirrors created | --- ## The Flow ``` Fork Request with 'refresh-stale' │ ▼ ┌─────────────────────────────────────────┐ │ 1. Get members from both rooms │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ 2. For each member in BOTH rooms: │ │ - Check each stream in destination │ │ - Query keyValueStore for stream │ │ - If stream missing/ended/ending │ │ → Mark as STALE │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ 3. For stale members: │ │ - Destroy stale streams │ │ - Add to "newMembers" list │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ 4. Process newMembers: │ │ - Create fresh mirrors from source │ │ - Update member with new stream URIs │ │ - 'reenter' updates existing record │ └─────────────────────────────────────────┘ ``` --- ## Key Code Section ```562:624:platform/src/pcast/chat/RoomApiMQWorker.js // Check for stale streams in existing members and refresh them if 'refresh-stale' option is set const checkAndRefreshStaleMembers = () => { if (!_.includes(options, 'refresh-stale')) { return Promise.resolve(); } // Find members that exist in both rooms (already forked but potentially stale) const existingMembersInBoth = _.filter(room.members, existingMember => _.find(sourceRoom.members, sourceMember => sourceMember.sessionId === existingMember.sessionId)); return Promise.map(existingMembersInBoth, existingMember => { // ... check each stream ... return this._keyValueStore.fetch('streams', 'source', streamId, ['streamId', 'ended', 'ending']) .then(sourceInfo => ({ stream, streamId, isStale: !sourceInfo || !sourceInfo.streamId || sourceInfo.ended || sourceInfo.ending })) // ... if stale, destroy and add to newMembers ... }); }; ``` --- ## Important Notes 1. **Source stream must still be alive** - If the origin stream in the source channel also died, creating a new mirror will fail with `origin-stream-ended` 2. **No duplicate processing** - The `_.uniqBy(newMembers, 'sessionId')` ensures a member isn't processed twice 3. **Stale stream cleanup** - Old stale streams are explicitly destroyed with reason `stale-stream-refresh` 4. **10 second interval is reasonable** - The check is lightweight (just a keyValueStore fetch per stream), so frequent polling is fine ```javascript const forkRequest = { roomId: destinationChannelId, sourceRoomId: sourceChannelId, streamCapabilities: ['streaming', 'hd'], streamTags: [], options: ['refresh-stale'], // <-- This is the key option desiredRegion: 'us-west' }; // Call this every 10 seconds phenixMQ.request('roomapi.Fork', forkRequest); ``` ```javascript const forkRequest = { roomId: destinationChannelId, sourceRoomId: sourceChannelId, streamCapabilities: ['streaming', 'hd'], streamTags: [], options: ['refresh-stale'], // <-- This is the key option desiredRegion: 'us-west' }; // Call this every 10 seconds phenixMQ.request('roomapi.Fork', forkRequest); ``` ```shellscript PUT /channel/{destinationChannelId}/fork/{sourceChannelId} { "options": ["refresh-stale"], "streamCapabilities": ["streaming", "hd"] } ``` ```plaintext Fork Request with 'refresh-stale' │ ▼ ┌─────────────────────────────────────────┐ │ 1. Get members from both rooms │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ 2. For each member in BOTH rooms: │ │ - Check each stream in destination │ │ - Query keyValueStore for stream │ │ - If stream missing/ended/ending │ │ → Mark as STALE │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ 3. For stale members: │ │ - Destroy stale streams │ │ - Add to "newMembers" list │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ 4. Process newMembers: │ │ - Create fresh mirrors from source │ │ - Update member with new stream URIs │ │ - 'reenter' updates existing record │ └─────────────────────────────────────────┘ ``` ```javascript // Check for stale streams in existing members and refresh them if 'refresh-stale' option is set const checkAndRefreshStaleMembers = () => { if (!_.includes(options, 'refresh-stale')) { return Promise.resolve(); } // Find members that exist in both rooms (already forked but potentially stale) const existingMembersInBoth = _.filter(room.members, existingMember => _.find(sourceRoom.members, sourceMember => sourceMember.sessionId === existingMember.sessionId)); return Promise.map(existingMembersInBoth, existingMember => { // ... check each stream ... return this._keyValueStore.fetch('streams', 'source', streamId, ['streamId', 'ended', 'ending']) .then(sourceInfo => ({ stream, streamId, isStale: !sourceInfo || !sourceInfo.streamId || sourceInfo.ended || sourceInfo.ending })) // ... if stale, destroy and add to newMembers ... }); }; ``` --- **User** what does reeenter do? --- **Cursor** Let me find the `reenter` implementation: The **`reenter`** option in `chat.JoinRoom` allows a member to **update their existing membership record** without causing errors or duplicate entries. Here's what it does: --- ## What `reenter` Does ```298:309:platform/src/pcast/chat/RoomMQWorker.js if (_.includes(options, 'reenter')) { actions.push(this._keyValueStore.remove('members', otherMemberSet, recordKey) .then(removed => { if (removed) { log.info('[%s] Member changed role [%s] during reentry', sessionId, member.role); if (member.role === 'Audience') { return broadcastRoomEventGlobally.call(this, roomId, 'MemberLeft', [member]); } } })); } ``` ### Key behaviors: 1. **Overwrites the existing member record** - The `store()` call at line 285 writes the new member data, replacing any existing record with the same `sessionId|roomId` key 2. **Handles role changes** - If the member was previously in a different role set (e.g., was `Audience`, now becoming `Presenter`), it removes them from the old set: - Members are stored in either `members/room` (for Participant, Moderator, Presenter) or `members/audience` - `reenter` removes them from the *other* set if they existed there 3. **Broadcasts appropriate events** - If the member was in the `room` set and is now becoming `Audience`, it broadcasts a `MemberLeft` event --- ## Without `reenter` vs With `reenter` | Scenario | Without `reenter` | With `reenter` | |----------|-------------------|----------------| | Member already in room | Could create duplicate/inconsistent state | Safely overwrites existing record | | Member changing roles | Old record in wrong set remains | Old record cleaned up | | Updating stream URIs | Record updated but no cleanup | Record updated with cleanup | --- ## Why Fork Uses `reenter` In the fork code: ```683:685:platform/src/pcast/chat/RoomApiMQWorker.js const joinRoom = { roomId, sessionId: member.sessionId, member: _.clone(member), options: ['reenter'], timestamp: _.now() }; ``` Fork uses `reenter` because: 1. **The member might already exist** in the destination room (from a previous fork) 2. **The member's streams are being replaced** with new mirrored streams 3. **We want to update, not duplicate** - `reenter` ensures the member record is cleanly replaced with the new stream URIs --- ## Visual Flow ``` Member A exists in destination room with stale stream M1 │ ▼ Fork with 'refresh-stale' │ ▼ Stale stream M1 detected & destroyed │ ▼ New mirror M2 created from source │ ▼ JoinRoom with options: ['reenter'] │ ▼ ┌─────────────────────────────────────────┐ │ keyValueStore.store() overwrites │ │ member record with new stream URI (M2) │ └─────────────────────────────────────────┘ │ ▼ Member A now has fresh stream M2 ``` Without `reenter`, the member would need to explicitly leave and rejoin, which would trigger `MemberLeft`/`MemberJoined` events and potentially disrupt connected clients. ```javascript if (_.includes(options, 'reenter')) { actions.push(this._keyValueStore.remove('members', otherMemberSet, recordKey) .then(removed => { if (removed) { log.info('[%s] Member changed role [%s] during reentry', sessionId, member.role); if (member.role === 'Audience') { return broadcastRoomEventGlobally.call(this, roomId, 'MemberLeft', [member]); } } })); } ``` ```javascript const joinRoom = { roomId, sessionId: member.sessionId, member: _.clone(member), options: ['reenter'], timestamp: _.now() }; ``` ```plaintext Member A exists in destination room with stale stream M1 │ ▼ Fork with 'refresh-stale' │ ▼ Stale stream M1 detected & destroyed │ ▼ New mirror M2 created from source │ ▼ JoinRoom with options: ['reenter'] │ ▼ ┌─────────────────────────────────────────┐ │ keyValueStore.store() overwrites │ │ member record with new stream URI (M2) │ └─────────────────────────────────────────┘ │ ▼ Member A now has fresh stream M2 ```