2025-11-25
This commit is contained in:
BIN
PhenixRTS/.DS_Store
vendored
Normal file
BIN
PhenixRTS/.DS_Store
vendored
Normal file
Binary file not shown.
20
PhenixRTS/chat/Demo.md
Normal file
20
PhenixRTS/chat/Demo.md
Normal file
@@ -0,0 +1,20 @@
|
||||
|
||||
```sh
|
||||
curl https://pcast-stg.phenixrts.com/pcast/channel/us-central%23phenixrts.com-alex.zinn%23frontendWebsocket.LnQnpYeK26hH/message \
|
||||
-u "${APPLICATION_ID}:${SECRET_STG}" \
|
||||
-H "Accept: application/json" \
|
||||
-H "Content-Type: application/json" \
|
||||
-X PUT \
|
||||
-d '{
|
||||
"message": {
|
||||
"from": {
|
||||
"screenName": "Me",
|
||||
"role": "Moderator",
|
||||
"lastUpdate": 0
|
||||
},
|
||||
"mimeType": "text/plain",
|
||||
"message": "This is my chat message",
|
||||
"tags": ["my-tag", "my-other-tag"]
|
||||
}
|
||||
}'
|
||||
```
|
||||
227
PhenixRTS/frontend-draining/19 days draining.md
Normal file
227
PhenixRTS/frontend-draining/19 days draining.md
Normal file
@@ -0,0 +1,227 @@
|
||||
Hostname: `frontend-us-northeast-3-vm4w`
|
||||
InstanceId: `us-northeast#us-east4-c.Iqb8nNAA`
|
||||
|
||||
|
||||
```SQL
|
||||
DECLARE
|
||||
hostName STRING DEFAULT "frontend-us-northeast-3-vm4w";
|
||||
DECLARE
|
||||
lookbackDays INT64 DEFAULT 41;
|
||||
DECLARE
|
||||
start_time TIMESTAMP DEFAULT TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -lookbackDays DAY);
|
||||
|
||||
-----------------------------------------------------------------
|
||||
-- Step 1: Find the most recent log message indicating draining connections for a specific host
|
||||
WITH LatestDrainLog AS (
|
||||
SELECT Message
|
||||
FROM `phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE
|
||||
Timestamp > start_time
|
||||
AND Facility = 'platform'
|
||||
AND HostName = hostName
|
||||
AND Message LIKE 'Websocket connectionids preventing drain%'
|
||||
ORDER BY Timestamp DESC LIMIT 1
|
||||
-- Step 2: Extract all connection IDs from that single log message
|
||||
DrainingConnectionIds AS (
|
||||
SELECT connectionId
|
||||
FROM
|
||||
LatestDrainLog,
|
||||
UNNEST(REGEXP_EXTRACT_ALL(Message, r"'([^']*)'")) AS connectionId )
|
||||
),
|
||||
-- Step 3: Find all logs that associate sessions with connections
|
||||
SessionConnections AS (
|
||||
Timestamp
|
||||
SELECT
|
||||
REGEXP_EXTRACT(Message, r'\] \[(.*?)\] Session started with connection') AS sessionId,
|
||||
REGEXP_EXTRACT(Message, r'connection \[(.*?)\] and roles') AS connectionId,
|
||||
FROM `phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE Timestamp > start_time
|
||||
AND Facility = 'platform'
|
||||
AND Message LIKE '%Session started with connection%'
|
||||
AND REGEXP_EXTRACT(Message, r'\] \[(.*?)\] Session started with connection') IS NOT NULL
|
||||
AND REGEXP_EXTRACT(Message, r'connection \[(.*?)\] and roles') IS NOT NULL
|
||||
),
|
||||
-- Pattern 2: "Session [sessionId] has a new connection [connectionId], previously [oldConnectionId]"
|
||||
SessionNewConnections AS (
|
||||
SELECT
|
||||
Timestamp
|
||||
REGEXP_EXTRACT(Message, r'Session \[(.*?)\] has a new connection') AS sessionId,
|
||||
REGEXP_EXTRACT(Message, r'connection \[(.*?)\], previously') AS connectionId
|
||||
FROM `phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE
|
||||
Timestamp > start_time
|
||||
AND Facility = 'platform'
|
||||
AND Message LIKE '%Session%has a new connection%'
|
||||
AND REGEXP_EXTRACT(Message, r'Session \[(.*?)\] has a new connection') IS NOT NULL
|
||||
AND REGEXP_EXTRACT(Message, r'connection \[(.*?)\], previously') IS NOT NULL
|
||||
AllSessionConnections AS (
|
||||
SELECT Timestamp, sessionId, connectionId
|
||||
FROM SessionConnections
|
||||
UNION DISTINCT
|
||||
SELECT Timestamp, sessionId, connectionId
|
||||
FROM SessionNewConnections )
|
||||
)
|
||||
SELECT *
|
||||
FROM AllSessionConnections
|
||||
ORDER BY Timestmap
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
------
|
||||
|
||||
```SQL
|
||||
DECLARE TargetHostName STRING DEFAULT "frontend-us-northeast-3-vm4w";
|
||||
DECLARE TargetInstanceId STRING DEFAULT "us-northeast#us-east4-c.Iqb8nNAA";
|
||||
DECLARE TargetConnectionId STRING DEFAULT "us-central#ZQiUCdymrZHbmeF12NhUZQ8xZZXxviWD";
|
||||
------------
|
||||
With ConnectionIdsPreventingDrain AS (
|
||||
SELECT
|
||||
Message
|
||||
FROM
|
||||
`phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE
|
||||
Timestamp > TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -2 MINUTE)
|
||||
AND Facility = 'platform'
|
||||
AND HostName = hostName
|
||||
AND Message LIKE 'Websocket connectionids preventing drain%'
|
||||
ORDER BY
|
||||
Timestamp DESC
|
||||
LIMIT
|
||||
1
|
||||
)
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
```SQL
|
||||
DECLARE HostName STRING DEFAULT "frontend-us-northeast-3-vm4w";
|
||||
|
||||
CREATE TEMPORARY FUNCTION GET_METRIC_VALUE(statusJson STRING, metricName STRING) RETURNS FLOAT64 AS (
|
||||
COALESCE(
|
||||
(
|
||||
SELECT
|
||||
CAST(
|
||||
JSON_EXTRACT_SCALAR(metric, '$.value') AS FLOAT64
|
||||
)
|
||||
FROM
|
||||
UNNEST(
|
||||
JSON_EXTRACT_ARRAY(JSON_EXTRACT(statusJson, '$.load'))
|
||||
) AS metric
|
||||
WHERE
|
||||
JSON_EXTRACT_SCALAR(metric, '$.name') = metricName
|
||||
LIMIT
|
||||
1
|
||||
),
|
||||
0
|
||||
)
|
||||
);
|
||||
-- WITH LatestInstanceMetricForHost AS (
|
||||
SELECT
|
||||
Timestamp,
|
||||
Status,
|
||||
(GET_METRIC_VALUE(Status, 'uptime/os/seconds') / 3600 ) AS UptimeHours,
|
||||
(GET_METRIC_VALUE(Status, 'status/seconds') / 3600 ) AS DrainingHours
|
||||
FROM `phenix-pcast.pcast.InstanceMetrics`
|
||||
WHERE Timestamp > TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -1 MINUTE)
|
||||
AND Hostname = Hostname
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY InstanceId ORDER BY Timestamp DESC) = 1
|
||||
ORDER BY Timestamp DESC
|
||||
LIMIT 1
|
||||
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
```SQL
|
||||
DECLARE TargetInstanceId STRING DEFAULT "us-northeast#us-east4-c.Iqb8nNAA";
|
||||
|
||||
CREATE TEMPORARY FUNCTION GET_METRIC_VALUE(statusJson STRING, metricName STRING) RETURNS FLOAT64 AS (
|
||||
COALESCE(
|
||||
(
|
||||
SELECT
|
||||
CAST(
|
||||
JSON_EXTRACT_SCALAR(metric, '$.value') AS FLOAT64
|
||||
)
|
||||
FROM
|
||||
UNNEST(
|
||||
JSON_EXTRACT_ARRAY(JSON_EXTRACT(statusJson, '$.load'))
|
||||
) AS metric
|
||||
WHERE
|
||||
JSON_EXTRACT_SCALAR(metric, '$.name') = metricName
|
||||
LIMIT
|
||||
1
|
||||
),
|
||||
0
|
||||
|
||||
|
||||
)
|
||||
);
|
||||
|
||||
SELECT
|
||||
Timestamp,
|
||||
Status,
|
||||
InstanceId,
|
||||
HostName,
|
||||
Health,
|
||||
HealthAlert,
|
||||
FORMAT('%.2f', (GET_METRIC_VALUE(Status, 'uptime/os/seconds') / 3600 )) AS UptimeHours,
|
||||
FORMAT('%.2f', (GET_METRIC_VALUE(Status, 'status/seconds') / 3600 )) AS DrainingHours,
|
||||
GET_METRIC_VALUE(Status, 'connections/open') AS connectionsOpen,
|
||||
GET_METRIC_VALUE(Status, 'clients') AS clients,
|
||||
GET_METRIC_VALUE(Status, 'clients/subscriptions') AS clientsSubscriptions,
|
||||
GET_METRIC_VALUE(Status, 'clients/replay/events') AS clientsReplayEvents,
|
||||
GET_METRIC_VALUE(Status, 'mq/incoming/pending') AS mqIncomingPending,
|
||||
GET_METRIC_VALUE(Status, 'mq/outgoing/pending') AS mqOutgoingPending,
|
||||
GET_METRIC_VALUE(Status, 'mq/incoming/rate') AS mqIncomingRate,
|
||||
|
||||
FROM `phenix-pcast.pcast.InstanceMetrics`
|
||||
WHERE Timestamp > TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -1 MINUTE)
|
||||
AND InstanceId = TargetInstanceId
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY InstanceId ORDER BY Timestamp DESC) = 1
|
||||
ORDER BY Timestamp DESC
|
||||
LIMIT 1
|
||||
|
||||
```
|
||||
|
||||
|
||||
Using
|
||||
```SQL
|
||||
SELECT
|
||||
Timestamp,
|
||||
Category,
|
||||
Severity,
|
||||
Message,
|
||||
HostName,
|
||||
Region,
|
||||
Zone,
|
||||
FROM
|
||||
`phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE
|
||||
Timestamp > TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -90 DAY)
|
||||
|
||||
AND Facility = 'platform'
|
||||
AND Service = 'frontend'
|
||||
AND Message LIKE "%Drain instance%"
|
||||
AND HostName = 'frontend-us-northeast-3-vm4w'
|
||||
ORDER BY
|
||||
Timestamp
|
||||
|
||||
```
|
||||
|
||||
`HostName`: `frontend-us-northeast-3-vm4w`
|
||||
|
||||
Went into draining
|
||||
`2025-11-03 19:49:37.012998 UTC` - `[us-northeast#us-east4-c.Iqb8nNAA] Drain instance (undoable=[false])`
|
||||
|
||||
`Skipping ping as previous ping is still pending since [1760474289916]`
|
||||
1760474289916 --> `2025-10-14T20:38:09.916Z`
|
||||
|
||||
|
||||
37
PhenixRTS/frontend-draining/Potential causes.md
Normal file
37
PhenixRTS/frontend-draining/Potential causes.md
Normal file
@@ -0,0 +1,37 @@
|
||||
|
||||
- Close path can loop forever if peer never finishes closing. When `close()` is called we just invoke `socket.close()` and wait for a future `disconnectDelegate` If the FIN/ACK never arrives, we keep the entry and never fall back to terminate() (despite adding that method). Consider scheduling a timeout so that a close automatically escalates to terminate() and disconnectDelegate cleanup.
|
||||
|
||||
|
||||
```sql
|
||||
-- Check for send errors during drain
|
||||
SELECT
|
||||
timestamp,
|
||||
message
|
||||
FROM
|
||||
`phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE
|
||||
Timestamp > TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -10 DAY)
|
||||
AND Facility = 'platform'
|
||||
AND HostName = "frontend-australia-southeast-1-k3ms"
|
||||
AND (
|
||||
message LIKE '%Failed to send%'
|
||||
OR message LIKE '%not opened%'
|
||||
OR message LIKE '%not found%'
|
||||
)
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT 100;
|
||||
```
|
||||
|
||||
|
||||
| id | Timestamp | Message | Hostname | Thread |
|
||||
| :-- | :----------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :---------------------------------- | :----- |
|
||||
| 0 | 2025-11-22 14:09:32.453398 UTC | [australia-southeast#WOkAeFYOsT3gWMwVbTLpcHYqIEYttN9k] Rejecting message [chat.RoomEvent] for closed client: Client MQ adapter for australia-southeast#WOkAeFYOsT3gWMwVbTLpcHYqIEYttN9k not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 1 | 2025-11-22 14:04:56.692624 UTC | [australia-southeast#f0AOMhXqggfsFmog5FYfDOfHzl4guVTp] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#f0AOMhXqggfsFmog5FYfDOfHzl4guVTp not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 2 | 2025-11-22 14:04:55.976509 UTC | [australia-southeast#01let42yTTGJJViCmDNG5YkfPAhr1Fk5] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#01let42yTTGJJViCmDNG5YkfPAhr1Fk5 not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 3 | 2025-11-22 14:03:48.854516 UTC | [australia-southeast#x1o42d2VicP5Cfq9pYH1k9eW6sKi5djC] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#x1o42d2VicP5Cfq9pYH1k9eW6sKi5djC not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 4 | 2025-11-22 10:24:52.469294 UTC | [australia-southeast#TVhflpoNzOK6dSloQCFqoYcY1ttbiOiM] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#TVhflpoNzOK6dSloQCFqoYcY1ttbiOiM not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 5 | 2025-11-22 10:00:42.335501 UTC | [australia-southeast#Kb5C372wLecwyTihj1yO1jZT5t6ghNmU] Rejecting message [chat.RoomEvent] for closed client: Client MQ adapter for australia-southeast#Kb5C372wLecwyTihj1yO1jZT5t6ghNmU not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 6 | 2025-11-22 06:46:44.278841 UTC | [australia-southeast#f4cOuMd2wMGQ7v8DWLVYbCu6andAXeKO] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#f4cOuMd2wMGQ7v8DWLVYbCu6andAXeKO not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 7 | 2025-11-22 06:37:02.677132 UTC | [australia-southeast#vKYJCpFY1rXi4VT0Rn1A5ufFvbHuUNX4] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#vKYJCpFY1rXi4VT0Rn1A5ufFvbHuUNX4 not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
| 8 | 2025-11-22 06:36:11.151866 UTC | [australia-southeast#S8vqusQFBG4s7zK4IS1hlqIg5ADM4KwB] Rejecting message [pcast.StreamEnded] for closed client: Client MQ adapter for australia-southeast#S8vqusQFBG4s7zK4IS1hlqIg5ADM4KwB not found | frontend-australia-southeast-1-k3ms | PID:35 |
|
||||
|
||||
35
PhenixRTS/frontend-draining/THE QUERY.md
Normal file
35
PhenixRTS/frontend-draining/THE QUERY.md
Normal file
@@ -0,0 +1,35 @@
|
||||
|
||||
```SQL
|
||||
WITH DrainingConnectionIds AS (
|
||||
SELECT DISTINCT
|
||||
connectionId
|
||||
FROM
|
||||
`phenix-pcast.pcast_logs_us.syslog`,
|
||||
UNNEST(REGEXP_EXTRACT_ALL(Message, r"'([^']+)'")) AS connectionId
|
||||
WHERE
|
||||
Timestamp >= TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -2 MINUTE)
|
||||
AND Message LIKE "%Websocket connectionids preventing drain%"
|
||||
),
|
||||
SessionStartLogs AS (
|
||||
SELECT
|
||||
REGEXP_EXTRACT(Message, r"\[([^\]]+)\]") AS ApplicationId,
|
||||
REGEXP_EXTRACT(Message, r"connection \[([^\]]+)\]") AS ConnectionId,
|
||||
Message
|
||||
FROM
|
||||
`phenix-pcast.pcast_logs_us.syslog`
|
||||
WHERE
|
||||
Timestamp >= TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -90 DAY)
|
||||
AND Message LIKE '%Session started with connection%'
|
||||
)
|
||||
SELECT
|
||||
s.ApplicationId,
|
||||
s.ConnectionId,
|
||||
REGEXP_EXTRACT_ALL(s.Message, r"connection \[(\]]+)\]")[SAFE_OFFSET(1)] AS SessionId
|
||||
FROM
|
||||
SessionStartLogs AS s
|
||||
INNER JOIN
|
||||
DrainingConnectionIds AS d
|
||||
ON
|
||||
s.ConnectionId = d.connectionId
|
||||
|
||||
```
|
||||
@@ -0,0 +1,190 @@
|
||||
****
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Client
|
||||
participant HTTPServer
|
||||
participant WebSocketServer
|
||||
participant ClientMQ
|
||||
participant ClientMQWorker
|
||||
participant MQWorker
|
||||
participant PingInterval as Ping Interval<br/>(20s)
|
||||
|
||||
Note over Client,ClientMQWorker: === Connection Establishment ===
|
||||
Client->>HTTPServer: HTTP Upgrade Request
|
||||
HTTPServer->>WebSocketServer: upgrade event
|
||||
WebSocketServer->>WebSocketServer: handleUpgrade()
|
||||
WebSocketServer->>WebSocketServer: Generate connection.id (32 chars)
|
||||
WebSocketServer->>WebSocketServer: Get remoteAddress (from X-Forwarded-For)
|
||||
WebSocketServer->>WebSocketServer: Setup event handlers<br/>(message, close, pong, error)
|
||||
WebSocketServer->>ClientMQAdapter: connectDelegate(socket, headers)
|
||||
ClientMQAdapter->>ClientMQAdapter: Set __connectionStart<br/>Set __connectionId<br/>Set __meta {headers}
|
||||
ClientMQAdapter->>ClientMQAdapter: Register in _socketsById
|
||||
ClientMQAdapter->>ClientMQAdapter: Set __connected = true
|
||||
ClientMQAdapter->>ClientMQ: emit('connect', connectionId)
|
||||
ClientMQ->>ClientMQ: Register adapter in _adaptersById
|
||||
ClientMQ->>ClientMQWorker: emit('connect', connectionId)
|
||||
ClientMQWorker->>ClientMQWorker: Increment _connectionsTotal<br/>Increment _connectionsOpen
|
||||
alt Connection limit exceeded (>2000)
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 1013, 'concurrency-limit-exceeded')
|
||||
ClientMQ->>ClientMQAdapter: close(connectionId, code, reason)
|
||||
Note over ClientMQAdapter: See "Server-Initiated Close" below
|
||||
end
|
||||
|
||||
Note over Client,ClientMQWorker: === Normal Message Flow (Client Request) ===
|
||||
Client->>WebSocketServer: Message (binary or base64)
|
||||
WebSocketServer->>ClientMQAdapter: messageDelegate(socket, data)
|
||||
ClientMQAdapter->>ClientMQAdapter: Decode message (MQ protocol)
|
||||
alt Unsupported message format
|
||||
ClientMQAdapter->>ClientMQAdapter: Drop message, log warning
|
||||
else Socket not connected
|
||||
ClientMQAdapter->>ClientMQAdapter: Drop message, log warning
|
||||
else Message type = 'Request'
|
||||
ClientMQAdapter->>MQWorker: processRequest(connectionId, type, payload, meta)
|
||||
alt Request timeout (>15s)
|
||||
MQWorker-->>ClientMQAdapter: TimeoutError
|
||||
ClientMQAdapter->>ClientMQAdapter: Encode mq.Error {reason: 'timeout'}
|
||||
ClientMQAdapter->>Client: Response (mq.Error)
|
||||
else Request failed
|
||||
MQWorker-->>ClientMQAdapter: Error
|
||||
ClientMQAdapter->>ClientMQAdapter: Encode mq.Error {reason: 'failed'}
|
||||
ClientMQAdapter->>Client: Response (mq.Error)
|
||||
else Request succeeded
|
||||
MQWorker-->>ClientMQAdapter: {type, payload, wallTime}
|
||||
ClientMQAdapter->>ClientMQAdapter: Build Response message<br/>(messageType: 'Response', requestId, type, payload)
|
||||
alt Socket not open
|
||||
ClientMQAdapter->>ClientMQAdapter: Drop response, log debug
|
||||
else Socket open
|
||||
ClientMQAdapter->>Client: Response (binary or base64)
|
||||
ClientMQAdapter->>ClientMQ: emit('request', connectionId, data)
|
||||
ClientMQ->>ClientMQWorker: emit('request', connectionId, data)
|
||||
ClientMQWorker->>ClientMQWorker: Handle subscription updates<br/>(JoinRoom, LeaveRoom, etc.)
|
||||
end
|
||||
end
|
||||
else Message type = 'Response'
|
||||
alt Request handler not found
|
||||
ClientMQAdapter->>ClientMQAdapter: Log warning, ignore
|
||||
else Request handler exists
|
||||
ClientMQAdapter->>ClientMQAdapter: Resolve pending request promise
|
||||
end
|
||||
else Message type = 'Event'
|
||||
ClientMQAdapter->>ClientMQAdapter: Log warning (unsupported)
|
||||
end
|
||||
|
||||
Note over Client,ClientMQWorker: === Server-Initiated Request ===
|
||||
ClientMQWorker->>ClientMQ: sendRequest(connectionId, type, message)
|
||||
ClientMQ->>ClientMQAdapter: sendRequest(connectionId, type, message)
|
||||
ClientMQAdapter->>ClientMQAdapter: Build request (requestId: 'S' + counter)
|
||||
ClientMQAdapter->>ClientMQAdapter: Register in _requests map
|
||||
alt Socket not found
|
||||
ClientMQAdapter-->>ClientMQWorker: Error: 'Websocket connection not found'
|
||||
else Socket not connected
|
||||
ClientMQAdapter-->>ClientMQWorker: Error: 'Websocket connection not opened'
|
||||
else Socket valid
|
||||
ClientMQAdapter->>Client: Request (binary or base64)
|
||||
ClientMQAdapter->>ClientMQAdapter: Start timeout (15s)
|
||||
Client->>WebSocketServer: Response message
|
||||
WebSocketServer->>ClientMQAdapter: messageDelegate (Response)
|
||||
ClientMQAdapter->>ClientMQAdapter: Resolve request promise
|
||||
ClientMQAdapter-->>ClientMQWorker: Response message
|
||||
alt Response timeout
|
||||
ClientMQAdapter->>ClientMQAdapter: Log warning, cleanup after 2x timeout
|
||||
end
|
||||
end
|
||||
|
||||
Note over Client,ClientMQWorker: === Heartbeat/Ping Flow ===
|
||||
loop Every 20 seconds
|
||||
PingInterval->>ClientMQAdapter: Ping interval trigger
|
||||
ClientMQAdapter->>ClientMQAdapter: Iterate all sockets
|
||||
alt Previous ping still pending
|
||||
ClientMQAdapter->>ClientMQAdapter: Skip ping, log warning
|
||||
else Socket has ping() method
|
||||
alt No pong received (lastPong < lastPing)
|
||||
ClientMQAdapter->>ClientMQAdapter: Detect timeout
|
||||
ClientMQAdapter->>ClientMQ: emit('timeout', connectionId)
|
||||
ClientMQ->>ClientMQWorker: emit('timeout', connectionId)
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 3334, 'timeout')
|
||||
Note over ClientMQAdapter: See "Server-Initiated Close" below
|
||||
else Pong received
|
||||
ClientMQAdapter->>Client: ping()
|
||||
Client->>WebSocketServer: pong()
|
||||
WebSocketServer->>ClientMQAdapter: pongDelegate(socket)
|
||||
ClientMQAdapter->>ClientMQAdapter: Set __lastPong = Date.now()<br/>Calculate RTT
|
||||
ClientMQAdapter->>MQWorker: processRoundTripTimeMeasurement(...)
|
||||
end
|
||||
else Socket has no ping() method
|
||||
alt Last ping > 2x heartbeatInterval ago
|
||||
ClientMQAdapter->>ClientMQAdapter: Detect timeout
|
||||
ClientMQAdapter->>ClientMQ: emit('timeout', connectionId)
|
||||
Note over ClientMQAdapter: See timeout flow above
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Note over Client,ClientMQWorker: === Server-Initiated Close ===
|
||||
alt Close reason: timeout
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 3334, 'timeout')
|
||||
else Close reason: drain
|
||||
ClientMQWorker->>ClientMQ: stop(3333, 'drain')
|
||||
ClientMQ->>ClientMQAdapter: close(connectionId, code, reason) [for each]
|
||||
else Close reason: connection limit
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 1013, 'concurrency-limit-exceeded')
|
||||
else Close reason: heartbeat terminate
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 3335, 'unexpected-close')
|
||||
else Close reason: send error
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 3335, 'unexpected-close')
|
||||
end
|
||||
ClientMQ->>ClientMQAdapter: close(connectionId, code, reason)
|
||||
alt Socket not found
|
||||
ClientMQAdapter->>ClientMQAdapter: Log warning, return false
|
||||
else Connection already closed (>5s ago)
|
||||
ClientMQAdapter->>ClientMQAdapter: Force disconnect
|
||||
ClientMQAdapter->>ClientMQAdapter: Set __forcefullyClosed = true
|
||||
ClientMQAdapter->>ClientMQ: emit('disconnect-after-forcefull-close')
|
||||
else Connection valid
|
||||
ClientMQAdapter->>ClientMQAdapter: Set __connectionEnded = now()
|
||||
ClientMQAdapter->>Client: socket.close(code, reason)
|
||||
Client->>WebSocketServer: close event
|
||||
WebSocketServer->>ClientMQAdapter: disconnectDelegate(socket, code, description)
|
||||
alt Multiple close events
|
||||
ClientMQAdapter->>ClientMQAdapter: Log warning, return early
|
||||
else Socket already disconnected
|
||||
alt Previously forcefully closed
|
||||
ClientMQAdapter->>ClientMQ: emit('disconnect-after-forcefull-close')
|
||||
else Already disconnected
|
||||
ClientMQAdapter->>ClientMQ: emit('disconnect-already-disconnected')
|
||||
end
|
||||
else Normal disconnect
|
||||
ClientMQAdapter->>ClientMQAdapter: Remove from _socketsById
|
||||
ClientMQAdapter->>ClientMQAdapter: Set __connected = false
|
||||
ClientMQAdapter->>ClientMQAdapter: Calculate connection duration
|
||||
ClientMQAdapter->>ClientMQ: emit('disconnect', connectionId, code, description)
|
||||
ClientMQ->>ClientMQ: Remove from _adaptersById
|
||||
ClientMQ->>ClientMQWorker: emit('disconnect', connectionId, code, description)
|
||||
ClientMQWorker->>ClientMQWorker: Decrement _connectionsOpen
|
||||
ClientMQWorker->>ClientMQWorker: Clean up room subscriptions
|
||||
ClientMQWorker->>ClientMQWorker: Clean up conversation subscriptions
|
||||
ClientMQWorker->>MQWorker: publish('pcast.ConnectionDisconnected', ...)
|
||||
end
|
||||
end
|
||||
|
||||
Note over Client,ClientMQWorker: === Client-Initiated Close ===
|
||||
Client->>WebSocketServer: close connection
|
||||
WebSocketServer->>WebSocketServer: close event (reasonCode, description)
|
||||
WebSocketServer->>ClientMQAdapter: disconnectDelegate(socket, code, description)
|
||||
Note over ClientMQAdapter: Same disconnect flow as above
|
||||
|
||||
Note over Client,ClientMQWorker: === Error Handling ===
|
||||
alt Socket error
|
||||
Client->>WebSocketServer: error event
|
||||
WebSocketServer->>WebSocketServer: Log error
|
||||
else Handler error
|
||||
WebSocketServer->>WebSocketServer: Log error, continue
|
||||
else Send error (not opened)
|
||||
ClientMQWorker->>ClientMQ: close(connectionId, 3335, 'unexpected-close')
|
||||
ClientMQWorker->>MQWorker: request('pcast.ConnectionDisconnected', ...)
|
||||
Note over ClientMQAdapter: See "Server-Initiated Close" above
|
||||
else Send error (not found/closed)
|
||||
ClientMQWorker->>ClientMQWorker: Return {status: 'closed'}
|
||||
end
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user