Performance optimizations#6266

Open
blp wants to merge 16 commits into main from perf-opt

Member

@blp blp commented May 18, 2026

Not yet ready for review.

@blp blp self-assigned this May 18, 2026
@blp blp added the ft (Fault tolerant, distributed, and scale-out implementation), performance, storage (Persistence for internal state in DBSP operators), and rust (Pull requests that update Rust code) labels May 18, 2026
blp added 10 commits May 20, 2026 16:22
I found these useful for testing.

I don't know why `aws_skip_signature` is needed sometimes but not others.
Web searches were unenlightening.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now the node identifiers for operator invocations have been formatted
as `[123.456]`.  This is fine except that the profiles show them as
`n123_456` and search only works if they're in the latter form, so this
commit changes them to use that form.

Signed-off-by: Ben Pfaff <blp@feldera.com>
…cation.

With this change, one can use `GlobalNodeId::node_identifier()` in a format
string without an extra allocation to make a String and then copy it.

This seems like a good idea because each operator evaluation calls this
(if profile capture is running).

Signed-off-by: Ben Pfaff <blp@feldera.com>
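
For illustration, the allocation-free pattern described in the commit message above can be sketched with a borrowing wrapper that implements `Display`. This is a minimal sketch and not the code in this PR: the `GlobalNodeId` and `NodeIdentifier` types below are simplified stand-ins, and the `n123_456` output form is taken from the earlier commit message about node identifiers.

```rust
use std::fmt;

/// Hypothetical stand-in for a global node id: a path of node indexes.
pub struct GlobalNodeId(pub Vec<u64>);

/// Borrowing wrapper that formats the id only when it is written out.
pub struct NodeIdentifier<'a>(&'a [u64]);

impl GlobalNodeId {
    /// Returns a value usable directly in `format!` or `write!`,
    /// without first building an intermediate `String`.
    pub fn node_identifier(&self) -> NodeIdentifier<'_> {
        NodeIdentifier(&self.0)
    }
}

impl fmt::Display for NodeIdentifier<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Assumed output form: `n` followed by the indexes joined by `_`.
        f.write_str("n")?;
        for (i, part) in self.0.iter().enumerate() {
            if i > 0 {
                f.write_str("_")?;
            }
            write!(f, "{part}")?;
        }
        Ok(())
    }
}
```

With a wrapper like this, a caller can write `format!("span {}", id.node_identifier())` and the identifier is written straight into the destination buffer.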
Until now, each iteration of the spine merge loop has done three steps per
level like this:

1. Run a merge step, if there's a merger for this level.
2. If the merger is done, update the spine and drop the merger.
3. Start a new merger, if there's no merger for this level.

This meant that there was always a time gap between creating a merger (in
step 3) and running a merge step for it (in step 1), because all the other
mergers and levels would get a chance to run in between.  This made it hard
to interpret the elapsed time of a merge for level-0 merges, which always
complete in a single merge step.

This commit changes the pattern to:

1. Start a new merger, if there's no merger for this level.
2. Run a merge step, if there's a merger for this level.
3. If the merger is done, update the spine and drop the merger.

This means that elapsed time for level-0 merges is now accurate.

In addition, the previous implementation took the shared state lock on
every iteration, even though it was technically needed only to start a new
merge.  This commit fixes that.  It also fixes how the previous
implementation sometimes took the lock twice successively.  I don't know
whether lock contention was a serious issue here.

Signed-off-by: Ben Pfaff <blp@feldera.com>
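
The reordered per-level pattern from the commit message above can be sketched as follows. This is a simplified model under stated assumptions, not the spine implementation: `Level`, `Merger`, and `merge_iteration` are hypothetical names, a merge is assumed to consume two loose batches, and locking is left out entirely.

```rust
/// Hypothetical in-progress merge that needs `steps_left` more merge steps.
pub struct Merger {
    pub steps_left: u32,
}

/// Hypothetical per-level state for the sketch.
pub struct Level {
    pub loose_batches: usize,
    pub steps_per_merge: u32,
    pub merger: Option<Merger>,
    pub completed_merges: usize,
}

/// One pass over all levels using the new start / step / finish order.
pub fn merge_iteration(levels: &mut [Level]) {
    for level in levels.iter_mut() {
        // 1. Start a new merger, if there's no merger for this level.
        if level.merger.is_none() && level.loose_batches >= 2 {
            level.loose_batches -= 2;
            level.merger = Some(Merger {
                steps_left: level.steps_per_merge,
            });
        }
        // 2. Run a merge step, if there's a merger for this level.
        if let Some(merger) = level.merger.as_mut() {
            merger.steps_left = merger.steps_left.saturating_sub(1);
        }
        // 3. If the merger is done, record the result and drop the merger.
        if matches!(level.merger, Some(Merger { steps_left: 0 })) {
            level.merger = None;
            level.completed_merges += 1;
        }
    }
}
```

In this model a merge that needs a single step is created, stepped, and finished within the same visit to its level, so no other level runs between its start and its end.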
ShardedAccumulatorReceiver did not wait for any kind of backpressure in
the spine that it created.  This meant that in some cases the spine could
end up with thousands of loose batches, which could start exhausting
memory and lead to memory pressure elsewhere, causing an overall slowdown.
This commit fixes the problem by making the receiver operator wait until
the number of batches in the spine reaches an acceptable level.

Signed-off-by: Ben Pfaff <blp@feldera.com>
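
As a rough illustration of the backpressure idea in the commit message above, the sketch below tracks a batch count and lets an inserter wait until merging brings the count back under a limit. Everything here is an assumption for illustration: `BatchGauge`, its methods, and the `limit` parameter are hypothetical, and the PR itself uses notifiers in the receiver rather than this `Condvar`-based stand-in.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical stand-in for the number of loose batches in a spine.
pub struct BatchGauge {
    count: Mutex<usize>,
    has_capacity: Condvar,
    limit: usize,
}

impl BatchGauge {
    pub fn new(limit: usize) -> Self {
        Self {
            count: Mutex::new(0),
            has_capacity: Condvar::new(),
            limit,
        }
    }

    /// Records one inserted batch and reports whether the caller should
    /// wait for merging to catch up before inserting more.
    pub fn insert_without_blocking(&self) -> bool {
        let mut count = self.count.lock().unwrap();
        *count += 1;
        *count > self.limit
    }

    /// Called when a merge has combined `inputs` batches into one.
    pub fn merged(&self, inputs: usize) {
        let mut count = self.count.lock().unwrap();
        *count = (*count).saturating_sub(inputs.saturating_sub(1));
        if *count <= self.limit {
            self.has_capacity.notify_all();
        }
    }

    /// Blocks until the batch count is back within the limit.
    pub fn wait_for_capacity(&self) {
        let mut count = self.count.lock().unwrap();
        while *count > self.limit {
            count = self.has_capacity.wait(count).unwrap();
        }
    }

    pub fn batch_count(&self) -> usize {
        *self.count.lock().unwrap()
    }
}
```

An inserter would call `wait_for_capacity()` only when `insert_without_blocking()` returns `true`, so the common case stays free of blocking.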
This made the problem with too many batches in the receiver's spine
evident from a circuit profile.

Signed-off-by: Ben Pfaff <blp@feldera.com>
ShardedExchangeSender was sending data to one remote worker, then waiting
for the channel to no longer be backed up, then sending it to another
remote worker, and so on.  This commit switches to sending all the data
to remote workers, then waiting for them all together, reducing the amount
of serialization.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, exchange profile spans have included exchange IDs, which
distinguish them from one another in the profiles but do not associate
them with particular operators.  This commit enables the latter by adding
operators' global node ids to spans associated with exchanges and sharded
accumulators.

Signed-off-by: Ben Pfaff <blp@feldera.com>
The sharded accumulator adds a larger number of smaller batches to its spine
than the exchange operator.  We can get faster merges by merging more of
these batches at a time.

Signed-off-by: Ben Pfaff <blp@feldera.com>
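
A back-of-the-envelope way to see one effect of merging more batches at a time: each merge of `fan_in` batches reduces the batch count by `fan_in - 1`, so a higher fan-in needs fewer merges to collapse the same number of batches. The helper below is a hypothetical illustration of that arithmetic only. It is not a benchmark, and it says nothing about the actual merge counts chosen in this PR.

```rust
/// Number of merges needed to reduce `batches` batches to a single batch
/// when each merge combines up to `fan_in` batches (hypothetical helper).
pub fn merges_needed(batches: usize, fan_in: usize) -> usize {
    assert!(fan_in >= 2, "a merge must combine at least two batches");
    if batches <= 1 {
        return 0;
    }
    // Ceiling of (batches - 1) / (fan_in - 1).
    (batches - 1 + fan_in - 2) / (fan_in - 1)
}
```

For 1000 loose batches, this gives 999 merges at a fan-in of 2 and 143 merges at a fan-in of 8.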
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@blp blp marked this pull request as ready for review May 21, 2026 00:21
@mythical-fred mythical-fred left a comment

PR description says "Not yet ready for review" but the PR isn't in draft state — could you flip it to draft until it's ready, or update the description? In the meantime, drive-by notes below.

No new tests in the diff for several behavior changes (spine MERGE_COUNTS bump, new ShardedAccumulatorReceiver backpressure path, exchange tx-buffer-drain reordering, level-0 single-merge-step pattern). Some are inherently perf-only, but at least the backpressure path looks testable. When you mark this ready, please describe the manual testing you did and the benchmark numbers that motivated the level-0 range and the receiver backpressure changes — that's load-bearing context for reviewers.

Comment thread crates/dbsp/src/operator/communication/exchange.rs
Comment thread crates/dbsp/src/operator/dynamic/sharded_accumulator.rs
Comment thread crates/dbsp/src/operator/dynamic/sharded_accumulator.rs Outdated
Comment thread crates/dbsp/src/operator/communication/exchange.rs
Comment thread crates/dbsp/src/trace/spine_async.rs
Comment thread crates/dbsp/src/operator/dynamic/sharded_accumulator.rs
Comment thread python/tests/workloads/test_tpch.py Outdated
Comment thread python/tests/workloads/test_tpch.py
Comment thread python/tests/workloads/test_tpch.py Outdated
 let entry = &mut self.spines[index];

-entry.spine.insert_without_blocking(batch);
+let should_block = entry.spine.insert_without_blocking(batch);
Contributor

Would it be worse to insert with backpressure here and avoid the complexity of dealing with notifiers? If there are multiple receivers approaching the backpressure threshold, they are guaranteed to all be busy merging, so it's not like they will remain idle if you don't push an additional batch to them.

I don't see any bugs in this implementation, just wondering if it could be simpler.

Member Author

It would definitely be easier to insert with backpressure.

The reason that I went with the more complex implementation is to avoid adding the latencies of multiple waits. This should wait for at most the longest latency of the waits across the receivers rather than the sum of the latencies.
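
The difference between the two approaches can be illustrated with a small calculation. This is only a model with hypothetical function names and made-up latencies, not a measurement: waiting on each receiver in turn costs the sum of the individual waits, while waiting on all of them together costs about as much as the slowest one.

```rust
use std::time::Duration;

/// Total wait if each receiver is waited on one after another.
pub fn sequential_wait(latencies: &[Duration]) -> Duration {
    latencies.iter().sum()
}

/// Total wait if all receivers are waited on together: bounded by the
/// slowest receiver (zero if there are no receivers).
pub fn concurrent_wait(latencies: &[Duration]) -> Duration {
    latencies.iter().copied().max().unwrap_or_default()
}
```

With illustrative waits of 30 ms, 50 ms, and 20 ms, the sequential total is 100 ms and the concurrent total is 50 ms.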

Contributor

That makes sense.

Comment thread crates/dbsp/src/operator/dynamic/sharded_accumulator.rs Outdated
blp added 2 commits May 20, 2026 21:42
Signed-off-by: Ben Pfaff <blp@feldera.com>
The other names of this kind were prefixed by the operator name.

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp added this pull request to the merge queue May 21, 2026
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks May 21, 2026
The global variable implementation didn't always work because some tests
don't run via run_cli(), which was what set it.

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp enabled auto-merge May 21, 2026 17:42
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@blp blp added this pull request to the merge queue May 21, 2026
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks May 21, 2026
…nds.

A job failed after 10 seconds because only 8797 out of 10000 records
showed up on time:
https://github.com/feldera/feldera/actions/runs/26249800789/job/77261331405

I think it's best to give more time if we're going to expect so many
records.

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp enabled auto-merge May 21, 2026 20:49
@blp blp added this pull request to the merge queue May 21, 2026
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks May 21, 2026
Member Author

blp commented May 21, 2026

The same kind of test has failed a couple of times, and I was able to reproduce it with the full series locally, so I'm going to try to bisect it to a particular commit locally.

Member Author

blp commented May 21, 2026

> The same kind of test has failed a couple of times, and I was able to reproduce it with the full series locally, so I'm going to try to bisect it to a particular commit locally.

I haven't yet been able to make it fail with the commit "[dbsp] Make ShardedAccumulatorReceiver operator wait for backpressure."


4 participants