[web-console] Reconcile high-throughput stream parsing behavior#6273
Karakatiza666 wants to merge 1 commit into
Conversation
Treat metrics stream values as simple numbers
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
mihaibudiu
left a comment
Please fix the spelling of the commit message; I changed the PR.
| } | ||
| // `JSONParser` from `@streamparser/json-whatwg` handles tokenizing and | ||
| // document framing. Metric values parse as JS numbers — `Number.MAX_SAFE_INTEGER` | ||
| // is well outside the range of realistic timestamps, record counts, and byte sizes. |
why is the comment about SAFE_INTEGER relevant? I don't see it used anywhere.
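For context on the limit the code comment mentions, here is a minimal sketch of what `Number.MAX_SAFE_INTEGER` guarantees. The sample magnitudes (`timestampMs`, `onePebibyte`) are illustrative assumptions, not values taken from the metrics stream.

```typescript
// Largest integer a JS number represents exactly: 2^53 - 1.
const maxSafe: number = Number.MAX_SAFE_INTEGER // 9007199254740991

// Illustrative magnitudes (assumptions, not values from the metrics stream).
const timestampMs = 1700000000000 // a millisecond Unix timestamp from late 2023
const onePebibyte = 2 ** 50 // a byte count of one pebibyte

const timestampIsSafe = Number.isSafeInteger(timestampMs) // true
const pebibyteIsSafe = Number.isSafeInteger(onePebibyte) // true

// Above the limit, JSON.parse rounds silently to the nearest representable double:
// 2^53 + 1 comes back as 2^53.
const parsed = JSON.parse('{"n": 9007199254740993}') as { n: number }
const roundedText = String(parsed.n) // "9007199254740992"
const parsedIsSafe = Number.isSafeInteger(parsed.n) // false
```

Values in the ranges above survive plain number parsing unchanged, which is presumably the reason the comment gives for dropping the BigNumber path; the constant itself does not have to appear in the code for that argument to hold.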
| * | ||
| * - **Network chunk:** whatever bytes one `reader.read()` call returns. | ||
| * Sized by the network, unpredictable, can be very large. | ||
| * - **Parser chunk:** a line-aligned slice the decoder hands to |
what happens if you have a very large string in a column?
| * `parser.write` — bounded by `PARSER_CHUNK_TARGET_BYTES` so a single | ||
| * synchronous parse can't monopolize the main thread. | ||
| * | ||
| * After each read, the accumulated buffer (prior leftover + the new network |
I recall reading these comments in a prior PR.
How is this code related? Is it a new implementation?
mythical-fred
left a comment
Two blockers, see inline. The refactor itself is well-designed — the decoder/orchestrator split is a good separation of concerns. But the new newlineTextDecoder ships with zero tests, and TabPerformance opts out of the abstraction this PR introduces.
| */ | ||
| function splitByNewlineWithOverflowShedding( | ||
| maxBufferedBytes: number, | ||
| export const newlineTextDecoder = (opts?: { |
Hard block: newlineTextDecoder has zero test coverage.
newlineJsonDecoder is exercised by the existing parseStream describe block, but newlineTextDecoder — a brand-new ~100-line decoder with its own forward-scan chunking strategy (different from newlineJsonDecoder's backward lastIndexOf approach) — has no tests at all. It's used by TabLogs.svelte for the log viewer.
At minimum, please add a describe('newlineTextDecoder') block covering:
- Lines split correctly across chunk boundaries
- \r\n preserved on each emitted line (the docstring promises this)
- Load shedding fires onBytesSkipped when budget is exceeded
- Runaway record (no newline within MAX_LINE_SIZE) is purged
Vitest is already wired — this is straightforward.
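To make the requested cases concrete, here is a minimal sketch of a forward-scan line splitter and the behaviors such tests would pin down. Everything in it is hypothetical: `createLineSplitter`, `maxLineLength`, `onLine`, and `onSkipped` are stand-ins and do not reflect the actual signature of `newlineTextDecoder`, and lengths are counted in UTF-16 code units rather than bytes.

```typescript
// Hypothetical stand-in for illustration; NOT the PR's newlineTextDecoder.
// Lengths are UTF-16 code units, not bytes, to keep the sketch short.
interface LineSplitterOptions {
  maxLineLength: number
  onLine: (line: string) => void
  onSkipped: (length: number) => void
}

function createLineSplitter(opts: LineSplitterOptions) {
  let leftover = '' // trailing partial line carried into the next chunk
  let discarding = false // true while inside a runaway line whose head was dropped

  return {
    push(chunk: string): void {
      const text = leftover + chunk
      let start = 0
      // Forward scan: handle every complete line, line ending included.
      for (let nl = text.indexOf('\n', start); nl !== -1; nl = text.indexOf('\n', start)) {
        const line = text.slice(start, nl + 1)
        if (discarding || line.length > opts.maxLineLength) {
          opts.onSkipped(line.length) // tail of a runaway line, or an oversized line
          discarding = false
        } else {
          opts.onLine(line)
        }
        start = nl + 1
      }
      leftover = text.slice(start)
      // Runaway record: no newline within the limit, so purge what is buffered.
      if (leftover.length > opts.maxLineLength) {
        opts.onSkipped(leftover.length)
        leftover = ''
        discarding = true
      }
    }
  }
}

const lines: string[] = []
let skipped = 0
const splitter = createLineSplitter({
  maxLineLength: 8,
  onLine: (line) => { lines.push(line) },
  onSkipped: (n) => { skipped += n }
})
splitter.push('ab\r') // '\r\n' split across two chunks
splitter.push('\ncd') // a line split across two chunks
splitter.push('e\n')
splitter.push('xxxxxxxxxx') // runaway: 10 units with no newline
splitter.push('yy\nok\n') // tail of the runaway line, then a normal line
```

In a Vitest suite each `push` sequence would become its own `it(...)` case asserting on the collected `lines` and the `skipped` total.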
| // `JSONParser` from `@streamparser/json-whatwg` handles tokenizing and | ||
| // document framing. Metric values parse as JS numbers — `Number.MAX_SAFE_INTEGER` | ||
| // is well outside the range of realistic timestamps, record counts, and byte sizes. | ||
| const reader = result.stream | ||
| .pipeThrough(new JSONParser({ paths: ['$'], separator: '' })) | ||
| .getReader() | ||
| let cancelled = false | ||
| const appendRow = pushAsCircularBuffer( | ||
| () => timeSeries, | ||
| 63, | ||
| (v: TimeSeriesEntry) => v | ||
| ) | ||
| cancelStream = () => { | ||
| cancelled = true | ||
| reader.cancel().catch(() => {}) | ||
| result.cancel() | ||
| } | ||
| ;(async () => { | ||
| try { | ||
| while (!cancelled) { | ||
| const { value, done } = await reader.read() | ||
| if (done) break | ||
| appendRow([value.value as TimeSeriesEntry]) | ||
| } | ||
| }, | ||
| { | ||
| bufferSize: 8 * 1024 * 1024 | ||
| } catch { | ||
| /* fall through to restart logic */ | ||
| } | ||
| ) | ||
| cancelStream = cancel | ||
| if (cancelled || !metricsAvailable || !cancelStream) return | ||
| endMetricsStream() | ||
| if (pipelineName === targetPipelineName) { | ||
| startMetricsStream(api, targetPipelineName) | ||
| } | ||
| })() |
This PR introduces StreamFormatDecoder + parseStream as the way to consume byte streams, then immediately sidesteps it here by piping through JSONParser from @streamparser/json-whatwg with a hand-rolled read loop. That means metrics streaming has:
- No load shedding (the other two consumers have it)
- No shared error/cancel lifecycle
- A bespoke restart pattern instead of onParseEnded
- A silent catch {} at line 114 that swallows all errors, not just network failures
Consider writing a thin decoder (e.g. nativeJsonDecoder) that wraps JSONParser behind the StreamFormatDecoder interface and plugs into parseStream like the other two call sites. That way the abstraction this refactor introduces actually applies uniformly.
| options | ||
| { flushIntervalMs: options?.flushIntervalMs } | ||
| ) | ||
| }) |
The test helper runParseStream only exercises newlineJsonDecoder. There is no equivalent for newlineTextDecoder, which has a substantially different chunking algorithm (forward indexOf scan collecting line endings vs. backward lastIndexOf snap). The two decoders could diverge on edge cases (e.g. a parser chunk that lands exactly on a \n, or a \r\n split across network chunks) without any test catching it.
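As an illustration of the backward `lastIndexOf` snap described above, here is a minimal sketch with the exact-boundary edge case spelled out. `snapToNewline` and `sliceLineAligned` are hypothetical names, the target is measured in UTF-16 code units rather than bytes, and none of this is taken from the PR's `newlineJsonDecoder`.

```typescript
// Hypothetical helpers for illustration; not the PR's implementation.

// Returns the exclusive end index of the next line-aligned chunk, or -1 if the
// buffer holds no complete line. Prefers the last '\n' within the first `target`
// units; if the first line is longer than `target`, falls forward to its end.
function snapToNewline(buffer: string, target: number): number {
  const back = buffer.lastIndexOf('\n', target - 1)
  if (back !== -1) return back + 1
  const forward = buffer.indexOf('\n', target)
  return forward === -1 ? -1 : forward + 1
}

// Cuts the buffer into line-aligned chunks of at most `target` units where
// possible; the trailing partial line is returned as leftover.
function sliceLineAligned(buffer: string, target: number): { chunks: string[]; leftover: string } {
  const chunks: string[] = []
  let rest = buffer
  for (let end = snapToNewline(rest, target); end !== -1; end = snapToNewline(rest, target)) {
    chunks.push(rest.slice(0, end))
    rest = rest.slice(end)
  }
  return { chunks, leftover: rest }
}

const input = 'a\nbb\nccc\ndd'
// With target = 5, the first cut lands exactly on a '\n' (index 4).
const exact = sliceLineAligned(input, 5)
// With target = 1, no newline fits, so each cut falls forward to the line end.
const tiny = sliceLineAligned(input, 1)
// Invariant a shared test could assert for either chunking strategy:
const roundTrip = exact.chunks.join('') + exact.leftover
```

Because a forward-scan decoder should satisfy the same round-trip invariant, one option is a single parameterized test that feeds identical inputs to both decoders and compares the concatenated output.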
Treat metrics stream values as simple numbers
Testing: manually verify that the metrics graphs, Change Stream and Logs behave as expected; in particular, Change Stream should stay responsive when displaying a high-throughput stream.
The change removes a previous implementation of stream parsing. Logs and Change Stream now use the same load-shedding mechanism. Metrics parsing uses a simpler, off-the-shelf stream parser without load shedding and no longer carries the unnecessary complication of a BigNumber implementation to process the metrics values. The total code size and complexity are reduced.