
[web-console] Reconcile high-throughput stream parsing behavior#6273

Open
Karakatiza666 wants to merge 1 commit into main from refactor-stream

Conversation

@Karakatiza666
Contributor

Treat metrics stream values as simple numbers

Testing: manually verify that the metrics graphs, the Change Stream, and the logs behave as expected; in particular, the Change Stream should stay responsive when displaying a high-throughput stream.

This change removes the previous stream parsing implementation. Logs and Change Stream now use the same load-shedding mechanism; metrics parsing uses a simpler, off-the-shelf stream parser without load shedding and no longer needs a BigNumber implementation to process the metrics values. Total code size and complexity are reduced.

Treat metrics stream values as simple numbers

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
@Karakatiza666 Karakatiza666 requested a review from mihaibudiu May 19, 2026 12:55
@mihaibudiu mihaibudiu changed the title [web-console] Reconsile high-throughput stream parsing behavior [web-console] Reconcile high-throughput stream parsing behavior May 19, 2026
Contributor

@mihaibudiu mihaibudiu left a comment


Please fix the spelling of the commit message; I changed the PR.

}
// `JSONParser` from `@streamparser/json-whatwg` handles tokenizing and
// document framing. Metric values parse as JS numbers — `Number.MAX_SAFE_INTEGER`
// is well outside the range of realistic timestamps, record counts, and byte sizes.
Contributor


why is the comment about SAFE_INTEGER relevant? I don't see it used anywhere.
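
For context on the quoted code comment, here is a minimal sketch of where plain JS numbers stop representing integers exactly. It is an illustration only, not code from this PR; the sample literals are assumptions chosen to show the boundary.

```typescript
// Number.MAX_SAFE_INTEGER is 2^53 - 1; integers beyond it may not round-trip.
const maxSafe: number = Number.MAX_SAFE_INTEGER

// A JSON integer just above the limit silently loses precision when parsed.
const parsedBeyondLimit = JSON.parse('9007199254740993') as number
const roundTripsExactly = String(parsedBeyondLimit) === '9007199254740993'

// Illustrative metric-like magnitudes (assumed, not taken from the PR):
const timestampMs = Date.now() // milliseconds since the Unix epoch
const onePebibyte = 2 ** 50 // a very large byte count

const timestampIsSafe = Number.isSafeInteger(timestampMs)
const byteCountIsSafe = Number.isSafeInteger(onePebibyte)
```

If a metric could ever exceed that limit, a BigInt or string representation would be needed; the code comment in the diff argues that realistic values do not.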

*
* - **Network chunk:** whatever bytes one `reader.read()` call returns.
* Sized by the network, unpredictable, can be very large.
* - **Parser chunk:** a line-aligned slice the decoder hands to
Contributor


what happens if you have a very large string in a column?

* `parser.write` — bounded by `PARSER_CHUNK_TARGET_BYTES` so a single
* synchronous parse can't monopolize the main thread.
*
* After each read, the accumulated buffer (prior leftover + the new network
Contributor


I recall reading these comments in a prior PR.
How is this code related? Is it a new implementation?


@mythical-fred mythical-fred left a comment


Two blockers, see inline. The refactor itself is well-designed — the decoder/orchestrator split is a good separation of concerns. But the new newlineTextDecoder ships with zero tests, and TabPerformance opts out of the abstraction this PR introduces.

*/
function splitByNewlineWithOverflowShedding(
maxBufferedBytes: number,
export const newlineTextDecoder = (opts?: {

Hard block: newlineTextDecoder has zero test coverage.

newlineJsonDecoder is exercised by the existing parseStream describe block, but newlineTextDecoder — a brand-new ~100-line decoder with its own forward-scan chunking strategy (different from newlineJsonDecoder's backward lastIndexOf approach) — has no tests at all. It's used by TabLogs.svelte for the log viewer.

At minimum, please add a describe('newlineTextDecoder') block covering:

  1. Lines split correctly across chunk boundaries
  2. \r\n preserved on each emitted line (the docstring promises this)
  3. Load shedding fires onBytesSkipped when budget is exceeded
  4. Runaway record (no newline within MAX_LINE_SIZE) is purged

Vitest is already wired — this is straightforward.
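
As a rough illustration of the behaviors listed above, here is a self-contained sketch. `createLineSplitter` is a hypothetical stand-in, not the PR's `newlineTextDecoder`; its signature, the `onSkipped` callback, and counting characters rather than bytes are all assumptions made to keep the example small. The same checks could be ported into a `describe('newlineTextDecoder')` block against the real decoder.

```typescript
// Hypothetical stand-in for a newline-delimited text decoder (NOT the PR's code).
// Emits complete lines with their original line ending, carries partial lines
// over to the next chunk, and purges a line that grows past maxLineSize.
function createLineSplitter(maxLineSize: number, onSkipped: (chars: number) => void) {
  let leftover = '' // partial line carried over from the previous chunk
  let skipping = false // true while discarding the rest of a purged line

  return (chunk: string): string[] => {
    const text = leftover + chunk
    leftover = ''
    const lines: string[] = []
    let start = 0

    if (skipping) {
      // Discard everything up to and including the next newline.
      const nl = text.indexOf('\n')
      if (nl === -1) {
        onSkipped(text.length)
        return lines
      }
      onSkipped(nl + 1)
      start = nl + 1
      skipping = false
    }

    // Forward scan: emit each complete line, keeping "\n" or "\r\n" attached.
    for (let nl = text.indexOf('\n', start); nl !== -1; nl = text.indexOf('\n', start)) {
      lines.push(text.slice(start, nl + 1))
      start = nl + 1
    }

    leftover = text.slice(start)
    if (leftover.length > maxLineSize) {
      // Runaway record: no newline within the limit, so drop what is buffered.
      onSkipped(leftover.length)
      leftover = ''
      skipping = true
    }
    return lines
  }
}

// Usage: feed chunks whose boundaries fall in the middle of lines.
let skippedChars = 0
const split = createLineSplitter(8, (n) => (skippedChars += n))
const first = split('ab\r\ncd')
const second = split('ef\nxxxxxxxxxx')
const third = split('yy\nok\n')
```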

Comment on lines +90 to +122
// `JSONParser` from `@streamparser/json-whatwg` handles tokenizing and
// document framing. Metric values parse as JS numbers — `Number.MAX_SAFE_INTEGER`
// is well outside the range of realistic timestamps, record counts, and byte sizes.
const reader = result.stream
  .pipeThrough(new JSONParser({ paths: ['$'], separator: '' }))
  .getReader()
let cancelled = false
const appendRow = pushAsCircularBuffer(
  () => timeSeries,
  63,
  (v: TimeSeriesEntry) => v
)
cancelStream = () => {
  cancelled = true
  reader.cancel().catch(() => {})
  result.cancel()
}
;(async () => {
  try {
    while (!cancelled) {
      const { value, done } = await reader.read()
      if (done) break
      appendRow([value.value as TimeSeriesEntry])
    }
  } catch {
    /* fall through to restart logic */
  }
  if (cancelled || !metricsAvailable || !cancelStream) return
  endMetricsStream()
  if (pipelineName === targetPipelineName) {
    startMetricsStream(api, targetPipelineName)
  }
})()

This PR introduces StreamFormatDecoder + parseStream as the way to consume byte streams, then immediately sidesteps it here by piping through JSONParser from @streamparser/json-whatwg with a hand-rolled read loop. That means metrics streaming has:

  • No load shedding (the other two consumers have it)
  • No shared error/cancel lifecycle
  • A bespoke restart pattern instead of onParseEnded
  • A silent catch {} at line 114 that swallows all errors, not just network failures

Consider writing a thin decoder (e.g. nativeJsonDecoder) that wraps JSONParser behind the StreamFormatDecoder interface and plugs into parseStream like the other two call sites. That way the abstraction this refactor introduces actually applies uniformly.
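
On the silent `catch {}` point specifically, one possible shape for narrowing it is sketched below. `classifyStreamError` and `StreamEndReason` are hypothetical names, not part of this PR or of `parseStream`. The sketch assumes that network failures on a fetch body surface as `TypeError` and that aborts surface as an error named `AbortError`, which matches the Fetch specification but should be checked against the actual call site.

```typescript
// Hypothetical helper (not from the PR): decide why a read loop ended so that
// only expected failures trigger a restart and everything else gets reported.
type StreamEndReason = 'cancelled' | 'network' | 'unexpected'

function errorName(error: unknown): string | undefined {
  return typeof error === 'object' && error !== null && 'name' in error
    ? String((error as { name: unknown }).name)
    : undefined
}

function classifyStreamError(error: unknown, cancelled: boolean): StreamEndReason {
  // The caller cancelled on purpose: not an error worth surfacing.
  if (cancelled || errorName(error) === 'AbortError') return 'cancelled'
  // Assumption: a dropped connection errors the body stream with a TypeError.
  if (error instanceof TypeError) return 'network'
  // Anything else (for example a parser failure) should not be swallowed.
  return 'unexpected'
}

// Usage inside a catch block might look like:
//   catch (e) {
//     const reason = classifyStreamError(e, cancelled)
//     if (reason === 'unexpected') console.error('metrics stream failed', e)
//   }
const abortLike = Object.assign(new Error('aborted'), { name: 'AbortError' })
const reasons = [
  classifyStreamError(new TypeError('network error'), false),
  classifyStreamError(new SyntaxError('bad JSON'), false),
  classifyStreamError(abortLike, false),
  classifyStreamError(new TypeError('network error'), true)
]
```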

options
{ flushIntervalMs: options?.flushIntervalMs }
)
})

The test helper runParseStream only exercises newlineJsonDecoder. There is no equivalent for newlineTextDecoder, which has a substantially different chunking algorithm (forward indexOf scan collecting line endings vs. backward lastIndexOf snap). The two decoders could diverge on edge cases (e.g. a parser chunk that lands exactly on a \n, or a \r\n split across network chunks) without any test catching it.
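
To make the two strategies concrete, here is a minimal sketch of a forward `indexOf` scan and a backward `lastIndexOf` snap, each choosing where to cut a line-aligned parser chunk. Both functions are hypothetical illustrations written for this comment, not the decoders' actual code; `target` stands in for a limit such as `PARSER_CHUNK_TARGET_BYTES`, is counted in characters here, and is assumed to be at least 1.

```typescript
// Backward snap: look at the first `target` characters and cut right after
// the last '\n' inside that window. Returns 0 when the window has no newline.
function cutBackward(buffer: string, target: number): number {
  return buffer.lastIndexOf('\n', target - 1) + 1
}

// Forward scan: walk line endings from the start and keep the last one whose
// cut position still fits within `target`.
function cutForward(buffer: string, target: number): number {
  let cut = 0
  for (;;) {
    const nl = buffer.indexOf('\n', cut)
    if (nl === -1 || nl + 1 > target) return cut
    cut = nl + 1
  }
}

// Edge cases named in the review: a target landing exactly on '\n',
// and "\r\n" line endings.
const sample = 'a\r\nbb\nccc\ndd'
const disagreements: number[] = []
for (let target = 1; target <= sample.length; target++) {
  if (cutBackward(sample, target) !== cutForward(sample, target)) {
    disagreements.push(target)
  }
}
```

In this simplified form the two agree for every target on the sample input; a shared table of such inputs run against both real decoders would catch the point where their behavior diverges.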


3 participants