Switch ad-hoc queries to arrow_ipc format#4240
Conversation
56b7606 to
9c0a916
Compare
|
It doesn't look like it answers queries (I tried fraud-detection), revision 9c0a916 |
|
I think you encountered this one: #4239 |
9c0a916 to
166c097
Compare
gz
left a comment
There was a problem hiding this comment.
lgtm make sure to do
some testing before merging (different pipelines with different types (variant map etc.) and queries)
|
@Karakatiza666 can we merge this? |
|
Haven't had the chance to do a few tests with complex types, hopefully tomorrow |
94e2e00 to
726dedf
Compare
|
Blocked by #4287 |
mythical-fred
left a comment
There was a problem hiding this comment.
Web-console behavioral changes without tests. Blocked by #4287 per author — but tests are also needed before this lands.
| import BigNumber from 'bignumber.js' | ||
| import Dayjs from 'dayjs' | ||
|
|
||
| const arrowIpcValueToJS = <T extends DataType<Type, any>>(arrowType: Field<T>, value: any) => { |
There was a problem hiding this comment.
New Arrow IPC value conversion logic covers a lot of type cases (int64, timestamps, structs, maps, lists). This is exactly the kind of logic that deserves unit tests — each type case can go wrong independently. Per Gerd's directive (2026-03-04): behavioral changes require tests. Setup: npm install -D vitest @testing-library/svelte jsdom. The pure conversion functions here are ideal for unit testing without any DOM or component setup.
|
@abhizer can we close this? |
|
This is for the UI, cc @Karakatiza666 |
revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: #3923 #3792 #4287 #4226 #5814 #4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: #3923 #3792 #4287 #4226 #5814 #4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: #3923 #3792 #4287 #4226 #5814 #4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
|
@Karakatiza666 we shoudl be able to fix/merge this now |
…eldera#4226 feldera#5814 revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: feldera#3923 feldera#3792 feldera#4287 feldera#4226 feldera#5814 feldera#4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
fada2e5 to
8255106
Compare
8255106 to
79d2f14
Compare
| assert_eq!(expected_buffer, buffer_copy); | ||
| } | ||
|
|
||
| /// `relation_to_arrow_fields` picks the array layout based on the consumer: |
There was a problem hiding this comment.
technically it's not "consumer", but the "delta_lake" boolean field, which happens to be correlated with the consumer.
| if delta_lake { | ||
| DataType::LargeList(element) | ||
| } else { | ||
| DataType::List(element) |
There was a problem hiding this comment.
Add a TODO here to remove this once that issue is resolved instead of the comment above.
| adhocQueries[tenantName][pipelineName].queries[i].result.columns.length === 0 && | ||
| isDataRow(input[0]) | ||
| ) { | ||
| // Limit result size behavior - ignore all but first bufferSize rows |
There was a problem hiding this comment.
I don't understand the comment before -.
| enumValue(Type.Float16), | ||
| enumValue(Type.Float32), | ||
| enumValue(Type.Float64), | ||
| enumValue(Type.DenseUnion), |
There was a problem hiding this comment.
what are these? will they even work?
There was a problem hiding this comment.
These values? All arrow types recognized by this library
| const field = batch.schema.fields[j] | ||
| const column = batch.getChildAt(j) | ||
|
|
||
| if (column && column.isValid(i)) { |
There was a problem hiding this comment.
what does it mean not being valid?
is this what is supposed to happen?
There was a problem hiding this comment.
This is arrow's semantics, just field not being NULL. Slightly adjusted the implementation
| ), | ||
| list_col: buildVector(new List(new Field('item', new Int32(), true)), [10, 20, 30]), | ||
| map_col: buildVector( | ||
| new Map_( |
There was a problem hiding this comment.
We support maps with (almost) arbitrary key types (and value types), so maybe you should add a test for a case with integer keys too.
| expect(nullValue).toBeNull() | ||
| expect((struct as { toJSON(): unknown }).toJSON()).toEqual({ a: 7, b: 'foo' }) | ||
| expect((list as { toJSON(): unknown }).toJSON()).toEqual([10, 20, 30]) | ||
| expect((map as { toJSON(): unknown }).toJSON()).toEqual({ k1: 42 }) |
There was a problem hiding this comment.
for other maps this kind of test won't work
| adhocQueries[tenantName][pipelineName].queries[i].result | ||
| .rows() | ||
| .push(...rows.slice(0, bufferSize - previousLength)) | ||
| reclosureKey(adhocQueries[tenantName][pipelineName].queries[i].result, 'rows') |
There was a problem hiding this comment.
I can't guess from this function name what it does
(I know it's not added in this PR)
There was a problem hiding this comment.
Ugh yes, that's one of a pair of helpers for advanced state reactivity management in Svelte 5
79d2f14 to
f2cc20d
Compare
Make ARRAYs serialize as arrow List for ad-hoc queries Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
f2cc20d to
513e6e3
Compare
|
Switching from LargeList to List for ad-hoc queries causes invalidation of all checksums we use in QA. I'll try to add support for LargeList to the upstream JS package first |
|
Tracked in apache/arrow-js#438 |
Changes:
Testing: manual, added unit test for formatting
Part of #4219: Deprecate the JSON format for ad-hoc queries