mythical-fred
left a comment
Draft: two design questions, see inline.
datagen
original is v0.281.0 and the roaring branch is the current platform runtime on 92e4a95. enable_roaring=true was set on all runs.
Bench Summary
Filter Hit/Miss Comparison:
Takeaways:
postgres
original is v0.281.0 and the roaring branch is the platform runtime on 92e4a95. I loaded all four Postgres tables to 200,000,000 rows each. It's very slow.
Bench Summary
Filter Read
Takeaways:
deltalake
original is v0.281.0 and the roaring branch is the platform runtime on the current manager. These were 300-second fda bench runs, and all four runs had enable_roaring=true.
Bench Summary
For fixed 300s runs, absolute storage at stop time is influenced by how much work got processed in that window.
Filter Miss Table
Miss rate is misses / (hits + misses), so higher is better.
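The miss-rate definition above can be written out as a small helper. This is only a sketch: the function name and the sample counters are hypothetical and are not taken from the benchmark runs.

```rust
/// Miss rate as defined above: misses / (hits + misses).
/// Returns `None` when there were no lookups at all, so callers
/// never divide by zero.
fn miss_rate(hits: u64, misses: u64) -> Option<f64> {
    let total = hits.checked_add(misses)?;
    if total == 0 {
        return None;
    }
    Some(misses as f64 / total as f64)
}
```

For example, 250 hits and 750 misses (made-up numbers) give a miss rate of 0.75.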
Read
kafka
Bench Summary
Filter Miss Table
Main read:
a1e6733 to 84819c1
This is currently configured as an enabled-by-default feature with an option to disable it in dev-tweaks. Thoughts?
mythical-fred
left a comment
Three blockers. The new dev_tweaks knob needs docs in docs.feldera.com, the roaring path should start default-off rather than enabled-by-default with a kill switch, and the [ci] apply automatic fixes commit needs to be squashed because dirty history is still a hard no.
pub fn enable_roaring(&self) -> bool {
    // Roaring is enabled by default, but `enable_roaring = false` remains
    // available as a kill switch while the feature is still being tuned.
    self.enable_roaring.unwrap_or(true)
On the feature-flag question from the PR thread: this should start default-off. An incompatible storage-format feature that is still being tuned wants a real rollout gate, not an opt-out kill switch after it is already on for everyone. enable_roaring = true should selectively enable it; the default should stay false until we have more production mileage.
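A minimal sketch of the default-off variant requested here. The `DevTweaks` struct below is a hypothetical stand-in that models only this one field; the real struct in the PR has more context around it.

```rust
/// Hypothetical stand-in for the dev-tweaks struct; only one field is modeled.
#[derive(Default)]
struct DevTweaks {
    /// `None` means the user did not set the knob at all.
    enable_roaring: Option<bool>,
}

impl DevTweaks {
    /// Roaring filters stay off unless `enable_roaring = true` is set
    /// explicitly, so the flag acts as a rollout gate, not a kill switch.
    pub fn enable_roaring(&self) -> bool {
        self.enable_roaring.unwrap_or(false)
    }
}
```

With this shape, an unset knob and `enable_roaring = false` behave identically, and only an explicit `true` opts a pipeline in.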
73d0445 to 0db137f
1f2dec2 to 04e01a1
Thanks @blp, great feedback. I addressed everything except for the upsampling constant of 100. I will try to run the benchmark with lower values and see if it has an impact.
ryzhyk
left a comment
The implementation looks nice and modular, but it's a lot of complexity. Most benchmarks are mixed. Is the main justification for this that it significantly speeds up ingest into tables whose PKs arrive in-order?
    K: DataTrait + ?Sized,
{
    fn sample_count_for_filter_plan(num_keys: usize) -> usize {
        let scaled = ((num_keys as f64) * (FILTER_PLAN_SAMPLE_PERCENT / 100.0)).ceil() as usize;
What does the theory (and/or benchmarks) say here? Sampling 0.1% of a large batch is a lot of IO. Sampling 1024 keys in a small batch can be relatively expensive too.
Is there a constant ceiling on the number of keys that need to be sampled to achieve good probabilistic precision? E.g., is 100 or 1000 sufficient?
There is an implicit upper bound from the fact that we don't attempt to build a bitmap filter if the batch range does not fit in u32::MAX.
E.g., 0.1% of u32::MAX is 4,294,967 keys, or about 34 MB (roughly 33 MiB) if we assume 8-byte keys. But I'll see what happens to the prediction if we make it lower.
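To make the numbers in this exchange concrete, here is a sketch of percentage-based sampling with an optional constant ceiling, as the review question suggests. The constant values are assumptions read off the thread (0.1% and a 1024-key floor), and the `ceiling` parameter is hypothetical; none of this is the PR's actual code.

```rust
/// Assumed value: the thread talks about sampling 0.1% of a batch.
const FILTER_PLAN_SAMPLE_PERCENT: f64 = 0.1;
/// Assumed floor: the thread mentions sampling 1024 keys in small batches.
const MIN_SAMPLES: usize = 1024;

/// Number of keys to sample from a batch of `num_keys` keys.
/// `ceiling` is a hypothetical constant cap (e.g. `Some(1000)`).
fn sample_count(num_keys: usize, ceiling: Option<usize>) -> usize {
    let scaled = ((num_keys as f64) * (FILTER_PLAN_SAMPLE_PERCENT / 100.0)).ceil() as usize;
    // Never sample fewer than the floor, and never more keys than exist.
    let wanted = scaled.max(MIN_SAMPLES).min(num_keys);
    match ceiling {
        Some(cap) => wanted.min(cap),
        None => wanted,
    }
}

/// Bytes read if every sampled key is 8 bytes wide (an assumption).
fn sample_bytes(samples: usize) -> usize {
    samples * 8
}
```

Without a ceiling, a batch of `u32::MAX` keys yields about 4.29 million samples (4,294,968 here because the sketch rounds up), which is roughly 34 MB of 8-byte keys. With `Some(1000)` the same batch samples 1000 keys.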
As long as the connector can max out the circuit, it seems to be 10-20% better with regard to throughput (datagen + delta) and uses less memory (-28% datagen, -20% delta) for things that arrive mostly ordered. One question I can't answer and haven't looked at is whether it can also benefit transaction commits, if a commit has to build a spine and can do it in such a way that new entries appear mostly sequential. Do you think it's plausible?
I missed the numbers for memory. But yes, this is a significant improvement, if the input pattern is favorable. I don't know how frequently this happens in practice.
This will be the case if the output of an operator is indexed in the same way as the input; then you can expect outputs to be ordered. I'm also not sure how common this is. Normally outputs need to be reindexed.
Have you tested it on some customer workloads? If you feel you've done enough testing, it may be ok to enable by default.
51ca6f7 to 3793d66
#deltalake-catalog-unity = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
#deltalake = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
deltalake-catalog-unity = { git = "https://github.com/gz/delta-rs.git", rev = "c37cf6e788d653db4149b205542559cf25674002" }
deltalake = { git = "https://github.com/gz/delta-rs.git", rev = "c37cf6e788d653db4149b205542559cf25674002" }
Two concerns on this dep bump:
- Personal-fork dependency on main. This switches deltalake / deltalake-catalog-unity from ryzhyk/delta-rs (the team's vetted fork) to gz/delta-rs at a single SHA. Personal forks under one engineer's account are fine for spike branches, but landing them on main makes the build depend on an account/repo with no team write access: if gz ever force-pushes, deletes the branch, or rotates ownership, the workspace stops building. Please push these patches into ryzhyk/delta-rs (or a feldera-org fork) and bump to that SHA before merging.
- Round-robin pre-split is back, just relocated. The commit message says this points at the "round-robin pre-split branch". That's the same design we agreed to withdraw on 2026-04-30; moving it down into the delta-rs fork doesn't make the design questions go away (lex ordering assumptions, per-file reader scheduling, behavior under concurrent writers / OPTIMIZE rewrites, retry semantics). If we want to land it via the fork, please link the delta-rs PR/diff in this PR description and add tests on the feldera side that exercise the new path; otherwise drop the fork bump from this PR and keep it for a separate, dedicated change.
        .map(|n| n as usize)
        .filter(|n| *n > 0)
    })
    .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS);
This is presented as opt-in via DELTA_DF_TARGET_PARTITIONS, but the default path now pins datafusion.execution.target_partitions = 6 (or max_concurrent_readers) for every Delta connector. Before this change target_partitions was unset, so DataFusion used its own default (num_cpus). On a typical multi-core worker that's a regression in snapshot parallelism — e.g. a 32-core host drops from 32 to 6 partitions on upgrade with no config change.
If the goal is just to expose a knob, leave target_partitions unset when neither the env var nor max_concurrent_readers is provided (i.e. only set_usize it when one of those was actually specified). Otherwise this needs to be called out as an intentional default change with a benchmark, because it silently re-tunes every existing Delta source.
This makes sure the merger overhead does not become too high once batches grow close to u32::MAX and sampling means reading millions of keys (lots of random lookups/reads). One way to fix this would have been to cap sampling at something like 1000 keys. This works, but it turns out it's probably good enough to just look at min/max/count and make the predictor slightly more pessimistic (so it picks bloom when the decision is close, because that will not cause perf regressions in existing deployments). The new predictor comes very close for this. It gets it mostly right; when it gets it wrong it mostly suggests bloom, and when it wrongly suggests roaring either the performance difference is negligible or the distribution is artificially pathological.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
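As an illustration of a min/max/count predictor that leans toward bloom, consider the sketch below. It is not the PR's heuristic: the density threshold, the `u64` key type, and all names are assumptions chosen only to show the shape of the decision.

```rust
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FilterKind {
    Bloom,
    Roaring,
}

/// Hypothetical density cutoff; the commit message does not state the real one.
const MIN_ROARING_DENSITY: f64 = 0.05;

/// Choose a filter from batch statistics only, without reading any keys.
/// Anything uncertain falls back to bloom, the pre-existing behavior.
fn predict_filter(min: u64, max: u64, count: u64) -> FilterKind {
    if count == 0 || max < min {
        return FilterKind::Bloom;
    }
    let span = max - min;
    // The thread states that no bitmap filter is attempted when the
    // batch range does not fit in u32::MAX.
    if span > u32::MAX as u64 {
        return FilterKind::Bloom;
    }
    // Keys per slot in the inclusive range [min, max].
    let density = count as f64 / (span as f64 + 1.0);
    if density >= MIN_ROARING_DENSITY {
        FilterKind::Roaring
    } else {
        FilterKind::Bloom
    }
}
```

A dense batch such as 1000 keys in the range 0..=999 selects roaring; a sparse or very wide batch selects bloom.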
Track the exact number of 16-bit windows touched by each simulated input batch using a temporary 8 KiB bitset, store the final count in a compact 16-bit encoding, and feed those counts into the merge-time roaring lookup heuristic alongside min/max span overlap. This fixes the wide-but-holey benchmark cases without sampling keys and updates the CSV/report output to show the new touched-window estimates.
This turned out to be a cheaper predictor than sampling because it doesn't require reads over the file.

Also make some perf tweaks to avoid regressions:
- Avoid building a filter for in-memory batches (unlikely to be useful at that size anyway)

Reduce runtime overhead when the feature is disabled:
- inline(always) for the per-key code path

Reduce runtime overhead when the feature is enabled (and ingest is random):
- Don't build a filter for in-memory batches

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
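The touched-window count described in this commit message can be sketched as follows. Assumptions, all labeled in the code: keys are handled as `u32` offsets within the batch range, a window is identified by the high 16 bits of the offset, and the compact encoding stores `count - 1`. The type and method names are hypothetical.

```rust
/// One window per distinct high-16-bit prefix of a u32 offset.
const WINDOWS: usize = 1 << 16;
/// 65,536 bits packed into u64 words: 1024 words = 8 KiB.
const WORDS: usize = WINDOWS / 64;

/// Temporary bitset that counts how many 16-bit windows a batch touches.
struct WindowTracker {
    bits: [u64; WORDS],
    touched: u32,
}

impl WindowTracker {
    fn new() -> Self {
        Self { bits: [0; WORDS], touched: 0 }
    }

    /// Record one key, given as an offset from the batch minimum (assumption).
    fn insert(&mut self, offset: u32) {
        let window = (offset >> 16) as usize;
        let mask = 1u64 << (window % 64);
        let word = &mut self.bits[window / 64];
        if *word & mask == 0 {
            *word |= mask;
            self.touched += 1;
        }
    }

    /// Exact number of distinct windows seen so far (0..=65536).
    fn touched(&self) -> u32 {
        self.touched
    }

    /// Assumed compact encoding: store `count - 1` so that 1..=65536
    /// fits in a u16; an empty batch has no encoding.
    fn encode(&self) -> Option<u16> {
        self.touched.checked_sub(1).map(|c| c as u16)
    }
}
```

Offsets 0, 1, and 65,535 all fall in window 0, while 65,536 starts window 1. This is how a wide but holey batch shows a low touched-window count even when its min/max span is large.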
Bumps the delta-rs deps to the round-robin pre-split branch
(gz/delta-rs.git#c37cf6e7), and adds three env-var knobs to the Delta
input connector that the patched delta-rs branch plays with:
DELTA_DF_TARGET_PARTITIONS - number of parallel scan partitions.
Falls back to `max_concurrent_readers`
from the connector config.
DELTA_DF_BATCH_SIZE - rows per parquet batch. Defaults to
DataFusion's 8192 when unset.
DELTA_SNAPSHOT_EXPLAIN - one-shot dump of the snapshot query's
physical plan at INFO level, useful
when sanity-checking parallelism /
ordering decisions.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
pub fn enable_roaring(&self) -> bool {
    // Roaring is enabled by default, but `enable_roaring = false` remains
    // available as a kill switch while the feature is still being tuned.
    self.enable_roaring.unwrap_or(true)
Still unwrap_or(true). The new INCOMPATIBLE_FEATURE_ROARING_FILTERS gating in 53da209 is a nice cleanup (avoids a v6 bump and keeps Bloom-only files readable) but it does not address the on-disk forward-compat concern: with this default, every roaring-eligible batch written by an upgraded binary still sets the incompatible bit, and the PR body itself notes downgrade requires clearing storage. While the comment literally says "still being tuned", the default should be false for the first release. Flip it; opt in selectively until tuning is complete.
deltalake = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
#deltalake-catalog-unity = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
#deltalake = { git = "https://github.com/ryzhyk/delta-rs.git", rev = "b24e5aaec322db0f084449e5c57f97ff4526bbdc" }
deltalake-catalog-unity = { git = "https://github.com/gz/delta-rs.git", rev = "c37cf6e788d653db4149b205542559cf25674002" }
gz/delta-rs#c37cf6e7 is still a personal fork — single-engineer maintenance/supply-chain risk and, per the commit subject, the SHA is the "round-robin pre-split" branch, i.e. the snapshot round-robin design we agreed to withdraw on 2026-04-30 has just moved into the fork. Two things needed before this can land:
- Upstream the delta-rs change to delta-io/delta-rs (or at minimum to ryzhyk/delta-rs) and re-pin here.
- Link the delta-rs diff in the PR description and add feldera-side coverage for the round-robin behavior. The prior round-robin review raised lex-ordering assumptions, retry semantics under concurrent writers / OPTIMIZE, and behavior on restart, none of which are exercised by anything in this PR.
        .map(|n| n as usize)
        .filter(|n| *n > 0)
    })
    .unwrap_or(DEFAULT_MAX_CONCURRENT_READERS);
.unwrap_or(DEFAULT_MAX_CONCURRENT_READERS) is unconditional: when neither DELTA_DF_TARGET_PARTITIONS nor max_concurrent_readers is set, this pins datafusion.execution.target_partitions = 6 for every Delta connector. Previously the option was unset and DataFusion picked num_cpus. On any host with > 6 cores this is a silent snapshot-parallelism regression on upgrade.
Only call set_usize("datafusion.execution.target_partitions", …) when the env var or max_concurrent_readers is explicitly provided; otherwise leave DataFusion's default in place.
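One way the requested behavior could look, as a sketch. The helper names are hypothetical, and a plain `HashMap` stands in for the DataFusion session options so the example stays self-contained; in the connector, the raw value would presumably come from `std::env::var("DELTA_DF_TARGET_PARTITIONS").ok()`.

```rust
use std::collections::HashMap;

/// Parse a strictly positive integer; anything else counts as "not set".
fn parse_positive(raw: Option<&str>) -> Option<usize> {
    raw.and_then(|s| s.trim().parse::<usize>().ok())
        .filter(|n| *n > 0)
}

/// Resolve the partition count from the env value first, then from
/// `max_concurrent_readers`. Returns `None` when neither was provided,
/// instead of falling back to a hard-coded default.
fn resolve_target_partitions(
    env_value: Option<&str>,
    max_concurrent_readers: Option<u32>,
) -> Option<usize> {
    parse_positive(env_value).or_else(|| {
        max_concurrent_readers
            .map(|n| n as usize)
            .filter(|n| *n > 0)
    })
}

/// Stand-in for the session config: only write the option when a value
/// was explicitly specified, so the engine's own default survives.
fn apply_target_partitions(options: &mut HashMap<String, String>, target: Option<usize>) {
    if let Some(n) = target {
        options.insert(
            "datafusion.execution.target_partitions".to_string(),
            n.to_string(),
        );
    }
}
```

With neither input set, `resolve_target_partitions(None, None)` returns `None` and the options map stays untouched, which matches the "leave DataFusion's default in place" request.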
This PR adds three things:
1st commit: A bitmap filter based on the roaring crate with
2nd commit: A benchmark that validates the predictor function we use to choose between roaring and bloom filters (2k lines, this code can be mostly ignored for the PR review)
3rd commit: Code that adjusts storage files by setting an incompatible feature flag
Describe Manual Test Plan
Ran many pipelines
Checklist
Breaking Changes?
Ideally no
Describe Incompatible Changes
The file format has a new incompatible feature; old versions will refuse to read files with a roaring filter.
Users should not downgrade once they go beyond this version without clearing storage.
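The downgrade behavior described above follows from how incompatible feature bits usually work: a reader rejects any file that carries a bit it does not know. The sketch below illustrates that idea only. The bit position, the function names, and the error text are assumptions; just the flag name `INCOMPATIBLE_FEATURE_ROARING_FILTERS` appears in the review thread.

```rust
/// Bit position is hypothetical; only the flag name comes from the review.
const INCOMPATIBLE_FEATURE_ROARING_FILTERS: u64 = 1 << 0;

/// Writer side: set the bit only for files that actually contain a
/// roaring filter, so Bloom-only files remain readable by old versions.
fn header_features(has_roaring_filter: bool) -> u64 {
    if has_roaring_filter {
        INCOMPATIBLE_FEATURE_ROARING_FILTERS
    } else {
        0
    }
}

/// Reader side: refuse any file that uses an incompatible feature
/// this build does not support.
fn check_readable(file_features: u64, supported: u64) -> Result<(), String> {
    let unknown = file_features & !supported;
    if unknown == 0 {
        Ok(())
    } else {
        Err(format!(
            "file uses unsupported incompatible features: {:#x}",
            unknown
        ))
    }
}
```

An older reader corresponds to `supported = 0` here: it accepts Bloom-only files and rejects files with the roaring bit set, which is why storage must be cleared before downgrading.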