I found these useful for testing. I don't know why `aws_skip_signature` is needed sometimes but not others. Web searches were unenlightening. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now the node identifiers for operator invocations have been formatted as `[123.456]`. This is fine except that the profiles show them as `n123_456` and search only works if they're in the latter form, so this commit changes them to use that form. Signed-off-by: Ben Pfaff <blp@feldera.com>
…cation. With this change, one can use `GlobalNodeId::node_identifier()` in a format string without an extra allocation to make a String and then copy it. This seems like a good idea because each operator evaluation calls this (if profile capture is running). Signed-off-by: Ben Pfaff <blp@feldera.com>
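A minimal sketch of the idea in the two commits above, assuming a hypothetical `NodePath` type as a stand-in for the real `GlobalNodeId` (whose actual fields and signatures are not shown in this conversation). It returns a borrowing `Display` wrapper, so the `n123_456` form is written straight into the caller's formatter without first building a `String`:

```rust
use std::fmt;

/// Hypothetical stand-in for a global node id: a path of per-level indexes.
pub struct NodePath(pub Vec<usize>);

/// Borrowing wrapper that renders a path as `n123_456` on demand.
pub struct NodeIdentifier<'a>(&'a [usize]);

impl NodePath {
    /// Returns a `Display` value rather than a `String`, so the caller can
    /// embed it in a format string without an intermediate allocation.
    pub fn node_identifier(&self) -> NodeIdentifier<'_> {
        NodeIdentifier(&self.0)
    }
}

impl fmt::Display for NodeIdentifier<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Leading `n`, then the indexes joined by underscores.
        f.write_str("n")?;
        for (position, index) in self.0.iter().enumerate() {
            if position > 0 {
                f.write_str("_")?;
            }
            write!(f, "{index}")?;
        }
        Ok(())
    }
}
```

A caller could then write something like `write!(out, "span for {}", id.node_identifier())`, and the only buffer involved is the one behind `out`.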
Until now, each iteration of the spine merge loop has done three steps per level like this:

1. Run a merge step, if there's a merger for this level.
2. If the merger is done, update the spine and drop the merger.
3. Start a new merger, if there's no merger for this level.

This meant that there was always a time gap between creating a merger (in step 3) and running a merge step for it (in step 1), because all the other mergers and levels would get a chance to run in between. This made it hard to interpret the elapsed time of a merge for level-0 merges, which always complete in a single merge step.

This commit changes the pattern to:

1. Start a new merger, if there's no merger for this level.
2. Run a merge step, if there's a merger for this level.
3. If the merger is done, update the spine and drop the merger.

This means that elapsed time for level-0 merges is now accurate.

In addition, the previous implementation took the shared state lock on every iteration, even though it was technically needed only to start a new merge. This commit fixes that. It also fixes how the previous implementation sometimes took the lock twice successively. I don't know whether lock contention was a serious issue here. Signed-off-by: Ben Pfaff <blp@feldera.com>
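The new ordering can be illustrated with a toy model. Everything here (`Merger`, `Level`, `merge_iteration`, the rule that two pending batches start a merge, and the step counts) is a hypothetical simplification and not the actual spine code; it only shows why a level-0 merge that needs a single step now starts and finishes inside the same iteration:

```rust
/// Hypothetical merger that finishes after a fixed number of merge steps.
pub struct Merger {
    pub remaining_steps: u32,
}

/// Hypothetical per-level state in the spine.
pub struct Level {
    pub pending_batches: usize,
    pub merger: Option<Merger>,
    pub completed_merges: usize,
}

/// One pass over all levels using the start, step, finish ordering.
pub fn merge_iteration(levels: &mut [Level]) {
    for (index, level) in levels.iter_mut().enumerate() {
        // 1. Start a new merger, if there's no merger for this level.
        if level.merger.is_none() && level.pending_batches >= 2 {
            level.pending_batches -= 2;
            // Assumption for the sketch: level 0 needs one step, others three.
            let remaining_steps = if index == 0 { 1 } else { 3 };
            level.merger = Some(Merger { remaining_steps });
        }
        // 2. Run a merge step, if there's a merger for this level.
        if let Some(merger) = level.merger.as_mut() {
            merger.remaining_steps -= 1;
        }
        // 3. If the merger is done, update the spine and drop the merger.
        if matches!(&level.merger, Some(m) if m.remaining_steps == 0) {
            level.merger = None;
            level.completed_merges += 1;
        }
    }
}
```

With the old ordering (step, finish, start), the same level-0 merger would be created at the end of one iteration and only stepped in the next, after every other level had run, which is the gap the commit message describes.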
ShardedAccumulatorReceiver did not wait for any kind of backpressure in the spine that it created. This meant that in some cases the spine could end up with thousands of loose batches, which could start exhausting memory and lead to memory pressure elsewhere, causing an overall slowdown. This commit fixes the problem by making the receiver operator wait until the number of batches in the spine reaches an acceptable level. Signed-off-by: Ben Pfaff <blp@feldera.com>
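As a rough illustration of this kind of backpressure, here is a synchronous, standard-library-only sketch. `BatchGate`, its threshold, and its methods are all hypothetical; the method name `insert_without_blocking` is only modeled on the name that appears in the diff under review, and the real operator is described in the review thread as using notifiers rather than a `Condvar`:

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical gate tracking how many loose batches a spine holds.
pub struct BatchGate {
    loose_batches: Mutex<usize>,
    drained: Condvar,
    max_batches: usize,
}

impl BatchGate {
    pub fn new(max_batches: usize) -> Self {
        Self {
            loose_batches: Mutex::new(0),
            drained: Condvar::new(),
            max_batches,
        }
    }

    /// Records one inserted batch without waiting. Returns `true` if the
    /// caller should wait for merges before inserting more.
    pub fn insert_without_blocking(&self) -> bool {
        let mut count = self.loose_batches.lock().unwrap();
        *count += 1;
        *count > self.max_batches
    }

    /// Called when a merge has folded `merged` batches into one.
    pub fn batches_merged(&self, merged: usize) {
        let mut count = self.loose_batches.lock().unwrap();
        *count = count.saturating_sub(merged.saturating_sub(1));
        self.drained.notify_all();
    }

    /// Blocks until the batch count is back at or below the threshold.
    pub fn wait_for_capacity(&self) {
        let guard = self.loose_batches.lock().unwrap();
        let _guard = self
            .drained
            .wait_while(guard, |count| *count > self.max_batches)
            .unwrap();
    }
}
```

Separating the non-blocking insert from the wait lets a receiver insert into several spines first and only then wait, which is the design choice discussed in the review comments on this change.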
This made the problem with too many batches in the receiver's spine evident from a circuit profile. Signed-off-by: Ben Pfaff <blp@feldera.com>
ShardedExchangeSender was sending data to one remote worker, then waiting for the channel to no longer be backed up, then sending it to another remote worker, and so on. This commit switches to sending all the data to remote workers, then waiting for them all together, reducing the amount of serialization. Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, exchange profile spans have included exchange IDs, which allow one to distinguish them from one another in the profiles but not to associate them with particular operators. This commit enables the latter by adding operators' global node ids for spans associated with exchanges and sharded accumulators. Signed-off-by: Ben Pfaff <blp@feldera.com>
The sharded accumulator adds a larger number of smaller batches to its spine than the exchange operator does. We can get faster merges by merging more of these batches at a time. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
mythical-fred
left a comment
PR description says "Not yet ready for review" but the PR isn't in draft state — could you flip it to draft until it's ready, or update the description? In the meantime, drive-by notes below.
No new tests in the diff for several behavior changes (spine MERGE_COUNTS bump, new ShardedAccumulatorReceiver backpressure path, exchange tx-buffer-drain reordering, level-0 single-merge-step pattern). Some are inherently perf-only, but at least the backpressure path looks testable. When you mark this ready, please describe the manual testing you did and the benchmark numbers that motivated the level-0 range and the receiver backpressure changes — that's load-bearing context for reviewers.
  let entry = &mut self.spines[index];
- entry.spine.insert_without_blocking(batch);
+ let should_block = entry.spine.insert_without_blocking(batch);
Would it be worse to insert with backpressure here and avoid the complexity of dealing with notifiers? If there are multiple receivers approaching the backpressure threshold, they are guaranteed to all be busy merging, so it's not like they will remain idle if you don't push an additional batch to them.
I don't see any bugs in this implementation, just wondering if it could be simpler.
It would definitely be easier to insert with backpressure.
The reason that I went with the more complex implementation is to avoid adding the latencies of multiple waits. This should wait for at most the longest latency of the waits across the receivers rather than the sum of the latencies.
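The latency argument can be made concrete with a small arithmetic sketch. The function names and the example latencies are hypothetical; the point is only that waiting on each receiver in turn costs the sum of the individual waits, while waiting on all of them together costs roughly the longest one:

```rust
use std::time::Duration;

/// Total wait if each receiver's backpressure wait happens one after another.
pub fn sequential_wait(latencies: &[Duration]) -> Duration {
    latencies.iter().sum()
}

/// Total wait if all receivers are waited on together: bounded by the
/// slowest one (zero if there is nothing to wait for).
pub fn combined_wait(latencies: &[Duration]) -> Duration {
    latencies.iter().copied().max().unwrap_or(Duration::ZERO)
}
```

For example, with three receivers that need 30 ms, 10 ms, and 50 ms to drain, the sequential approach waits 90 ms in total while the combined approach waits 50 ms.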
Signed-off-by: Ben Pfaff <blp@feldera.com>
The other names of this kind were prefixed by the operator name. Signed-off-by: Ben Pfaff <blp@feldera.com>
The global variable implementation didn't always work because some tests don't run via run_cli(), which was what set it. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
…nds. A job failed after 10 seconds because only 8797 out of 10000 records showed up on time: https://github.com/feldera/feldera/actions/runs/26249800789/job/77261331405 I think it's best to give more time if we're going to expect so many records. Signed-off-by: Ben Pfaff <blp@feldera.com>
The same kind of test has failed a couple of times, and I was able to reproduce it with the full series locally, so I'm going to try to bisect it to a particular commit locally.
Haven't been able to make it fail with
Not yet ready for review.