Conversation
mythical-fred
left a comment
There was a problem hiding this comment.
Two hard blocks (manual test plan + missing tests for the actual behavior change) and a couple of design points worth a look. Findings inline.
On the PR description: ### Describe Manual Test Plan literally says TODO: verify if we're actually spilling to disk. The whole point of this PR is to make Delta/Iceberg connectors spill so they stop OOMing on #6153 — please actually verify spilling happens (e.g. run a Delta CDC scan with a tight memory_mb_max against data large enough to force a spill, point at a storage path, observe files appearing under <storage>/delta-tmp-*/) and document those steps. Without that, neither you nor a reviewer can claim this fixes the bug.
mythical-fred
left a comment
There was a problem hiding this comment.
Architectural fix is in good shape — single shared RuntimeEnv, no more unwrap, sanitize_path_component retired, scratch dir centralized. One soft follow-up inline.
mythical-fred
left a comment
There was a problem hiding this comment.
LGTM — only change since my prior approval (a93909b) is the regenerated openapi.json. Trivial.
caf694c to
7cd8572
Compare
|
Seems to work as expected, needs feedback / input on: 1. Delta CDC target_partitions: if max_concurrent_readers isn't set, Delta input connector falls bach to DEFAULT_MAX_CONCURRENT_READERS=6.
if user specifies max_concurrent_readers, then it's upto them to increase datafusion memory as well if we don't cap. 2. Logs spam: When reading from spill, logs get spammed with: The issue is open at datafusion, one way would be to set |
|
On the two open questions: 1. I'd avoid silently expanding partitions past what the budget can sustain — the existing
The first is the principled fix. Floor-bump-only papers over the symptom. If you cap and the resulting partition count differs from Don't drop 2. Spill-log spam. Target-scoped filter is the right call —
I'd take option (a) — single line of code, dies when upstream fixes the accounting. Global Both are nice-to-have follow-ups, not blockers on this PR — my approval still stands. |
|
re 2: spill-log spam level was changed from and to use datafusion 53, we would need to update iceberg & delta-rs ( which we need to do anyways for fixing dependabot security alerts, but waiting for iceberg next release ) |
|
re 1: changed it to use worker same as ad-hoc query, but with improved error message: |
…ctors
Delta Lake and Iceberg connectors built bare SessionContexts with no
memory pool or spill path, so large scans (e.g. ORDER BY in Delta CDC)
could OOM the pipeline.
Build one Arc<RuntimeEnv> per pipeline with a FairSpillPool and a
spill directory at {storage.path}/datafusion-tmp/, shared by the ad-hoc
engine and every DataFusion-using connector. Add
RuntimeConfig.datafusion_memory_mb (default: 5% of the effective
memory budget, capped at 2 GB) and subtract it from the
DBSP circuit's RSS limit so the two no longer double-book RAM.
Rename ADHOC_TEMP_DIR -> DATAFUSION_TEMP_DIR ("datafusion-tmp") so
checkpointer::gc_startup keeps the new directory. Stale adhoc-tmp/
from prior releases is removed by the existing GC sweep.
Expose create_runtime_env / create_session_context[_with] in
feldera_adapterlib::utils::datafusion; the _with variant preserves
Delta's schema_force_view_types override.
Emit a startup `warn!` when the pool can't satisfy the ad-hoc sort
reservation (`workers * 64 MiB`), so an under-sized pool no longer
fails ORDER BY silently.
Fix #6153
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
mythical-fred
left a comment
There was a problem hiding this comment.
Re-review on 6ca60eedb7 — APPROVE stands. Solid pass on both open questions:
1. target_partitions vs budget. You went with neither cap-from-below nor floor-bump, and instead made the failure mode loud and actionable. That's a reasonable call:
warn_if_pool_too_small_for_adhoc_sortfires at startup whenpool_mb ≤ workers × 64 MB, with the exact arithmetic in the message.format_datafusion_errorappends the'datafusion_memory_mb'/'io_workers'/DELTA_DF_TARGET_PARTITIONS=1hint when the root cause isResourcesExhausted, walking pastContext(...)wrappers viafind_rootso it survives nested errors. The log you pasted reads exactly the way a user-actionable error should.min_pool_mb_for_adhoc_sort_matches_reservation_times_workerspins the threshold toSORT_SPILL_RESERVATION_BYTES, so the warning stays correct if the constant ever moves.
What I'd still gently push for as a follow-up: a real cap, not just a warning. A user who blows past the threshold sees the WARN at startup, ignores it (or it's drowned in pod logs), then gets a per-query failure later. Capping target_partitions = min(io_workers ∨ workers, max(1, pool_mb / 64)) would make the failure impossible instead of merely actionable. Tracking issue or follow-up PR is fine.
2. Spill-log spam. Agreed — apache/datafusion#19885 already demoted to debug! in v53.0.0 and the iceberg / delta-rs bump is the right cleanup path. Nothing to do here.
Small things, all non-blocking:
clean_stale_scratch_entriesis per-process best-effort. Two pipelines sharing the same storage path would race here (and would already have other problems — just worth a doc note that the scratch dir is single-tenant).format_datafusion_erroris delta-only; iceberginput.rswill produce the same error class on the same memory pool but won't get the hint. Cheap to share viafeldera_adapterlib::utils::datafusion.create_session_context_withis a nice generalization, but the only remaining caller-customised setting isschema_force_view_types = falseforserde_arrow. A// TODO: remove when serde_arrow supports Utf8Viewnext to it would prevent it from becoming load-bearing.
Thanks for the careful turnaround.
| /// the pipeline's effective memory budget, which would leave no memory | ||
| /// for the DBSP circuit. | ||
| DatafusionMemoryExceedsBudget { | ||
| datafusion_memory_mb: u64, |
There was a problem hiding this comment.
rather than throwing an error, you should just do min(datafusion_memory_mb, max_rss_mb) and log a warning?
| use tracing::warn; | ||
|
|
||
| /// In-memory sort threshold; above this, sorts spill to disk. 64 MB. | ||
| const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1_000_000; |
There was a problem hiding this comment.
any reason we can't we do MiB here
I get it k8s decided to be stupid and use MB, but computers and os allocators ultimately deal with pages and ranges that are in powers of two (4KiB, 64KiB, 2MiB, for 1 GiB) and a good way is to just think in base 2 for this if you want things that naturally fit in a range the OS or CPU deals with anyways
| /// typically a handful of bytes over a 4 KB tolerance -- upstream | ||
| /// accounting drift, tracked at | ||
| /// <https://github.com/apache/datafusion/issues/17340> Not a query failure | ||
| const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1_000_000; |
There was a problem hiding this comment.
same.. and you can write it cooler: 1<<26
| /// from the pool *before* sorting any rows. Both the pool and the | ||
| /// reservation are decimal MB, so this is straight multiplication. | ||
| fn min_pool_mb_for_adhoc_sort(workers: u64) -> u64 { | ||
| let reservation_mb = (SORT_SPILL_RESERVATION_BYTES as u64) / 1_000_000; |
There was a problem hiding this comment.
another nice thing about using powers of two is that divisions are very quick for computers.
| /// process is gone by the time we get here, so anything still in the dir is | ||
| /// orphaned. Spill files are per-query and never need to survive a restart. | ||
| /// Errors only logged: a stuck file should not block startup. | ||
| fn clean_stale_scratch_entries(scratch_dir: &Path) { |
There was a problem hiding this comment.
it would be nice to have a safe guard here such that Path can't be / (and/or has to have osme well formed pattern like startswith(datafusion) and rejected otherwise
| /// Upper bound on the default DataFusion pool size, in MB. | ||
| /// | ||
| /// Spill-to-disk handles overflow; reserving more starves the circuit. | ||
| pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048; |
| pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048; | ||
|
|
||
| /// Default DataFusion pool sizing fraction, as the divisor `effective / N`. | ||
| pub const DEFAULT_DATAFUSION_MEMORY_FRACTION_DIVISOR: u64 = 20; |
see commit
Fix #6153
Describe Manual Test Plan
Created big ( ~15M rows ) CDC Delta table and verified that delta connector cdc mode spillds to disk.
on main branch it uses over 8GB memory where as with this fix we use only about 2GB ( / around what we configure )
Verified there are datafusion session files on disk, if we configure pipeline with low memory we get:
though this was same behavior prev ig, but now we log warning if allocated memory is too low like:
pipeline sql:
run
select * from seeds order by id;.for delta cdc:
Checklist
Breaking Changes?
not a breaking change