issues/6121: strict_checkpoint_sync config#6253
Conversation
mythical-fred
left a comment
There was a problem hiding this comment.
Solid feature; unit tests cover both fast and strict modes thoroughly, including the partial-checkpoint and lagging-checkpoint regressions. A few items inline; the missing integration test and the empty manual test plan are the only soft blockers — flagging, not refusing.
| /// Shares the same count semantics as `total_completed_steps`: a record | ||
| /// ingested in step `n` is included in a durable checkpoint when | ||
| /// `total_checkpointed_steps > n`. |
There was a problem hiding this comment.
Inconsistent semantics with the field above. The doc for total_completed_steps (line 314) says "a record ingested in step n is fully processed when total_completed_steps >= n". This new field, claiming to share the same count semantics, uses > n instead of >= n. Either:
- The relation is also
>= n(andnotify_checkpoint(checkpoint.step)advances the field tocheckpoint.stepmeaning steps0..=checkpoint.stepare checkpointed); or nhere actually refers tostep_at_flush(the snapshot before the data lands), in which case the comment should say so explicitly.
Fix the wording so a future reader can't conclude that two adjacent fields with "the same count semantics" disagree by 1.
| /// Default: `false`. | ||
| #[serde(default)] | ||
| pub strict_checkpoint_sync: bool, | ||
| } |
There was a problem hiding this comment.
The rustdoc here is excellent — clear motivation, trade-off, and the storage requirement. Since this becomes the OpenAPI spec, this is the right place to put it. Two small touches:
- The default isn't enforced by
Defaultimpl (PostgresCdcReaderConfigdoesn't appear to deriveDefault), only by#[serde(default)]. Worth a comment that this is the on-the-wire default specifically. - "~30 s by default" — checkpoint cadence is configurable; either cite the config key or drop the specific number.
|
One more thing the inline thread missed: the second commit |
mythical-fred
left a comment
There was a problem hiding this comment.
Thanks for the rework — all three inline findings and the missing integration test are addressed:
- Doc semantics now uniform (
>= nfor both fields incoordination.rs; runtime predicate uses> step_at_flush, which is the same thing once the off-by-one is taken into account). Good. WatcherMode { Fast, Strict }is clean; intent reads at every call site.- The
~30 scadence claim is gone;strict_checkpoint_syncdoc just references the serde default. test_cdc_strict_checkpoint_sync_requires_storagecovers the fail-fast path without Postgres;test_cdc_strict_checkpoint_sync_holds_slotexercises the end-to-end slot-hold + redelivery behavior across two pipeline runs. Nice two-run shape — that's exactly the assumption that needed coverage.
One hard block remains:
Dirty history. Four commits is too many for what is conceptually one change, and three of them are review-fixup commits:
ce8e3253 fmt— still present, lazy one-liner.c8afa19f docs: fix >= n vs > n inconsistency …eb3c91ba refactor: replace strict: bool with WatcherMode enum …6ef0f815 test: add integration tests for strict_checkpoint_sync
Per the project's commit-hygiene rules (linear history on main; every commit must be worthy of that history; review-fixup commits get squashed before merge), please git rebase -i and collapse this into a single coherent commit — feature + tests + docs together, with a real message explaining why strict_checkpoint_sync exists (the at-least-once guarantee for stateful pipelines, the storage prerequisite, and the watcher mode split). Drop the fmt commit and fold the Claude Co-Authored-By trailers into one consolidated trailer on the final commit if you want to keep attribution.
Soft nit (not blocking): the Manual Test Plan section in the PR description is still the empty placeholder. The new integration tests cover the behavior, so this is a soft block per the rules — but it costs nothing to write two sentences ("ran cargo test test_cdc_strict_checkpoint_sync_requires_storage; ran the #[ignore]d slot-hold test against a local Postgres with wal_level=logical") and it helps future readers.
Once the history is cleaned up I'm happy to approve.
mythical-fred
left a comment
There was a problem hiding this comment.
All three findings addressed: Completion doc uses >= n consistently, Mode enum replaces the bool flag, and the cadence prose no longer overstates 30s. Plus a new integration test exercises the strict-mode handshake and the storage-required path. LGTM.
|
Thank you! I haven't gone through the code yet, but I'd recommend changing the config name to something else as checkpoint sync has a different meaning in Feldera: https://docs.feldera.com/pipelines/checkpoint-sync/ |
|
Feldera has a couple of fault tolerance models. It's a kind of misbehavior for a connector to report that its fault tolerance level is So, my suggestion is that, instead of adding the new option, the connector should instead call |
And then |
|
I don't think that adding total_checkpointed_steps to Completion is the way to go. That struct is for communication between the pipeline and the multihost coordinator. We can add fields to it, but I'd only do that for fields that the coordinator is going to use; so far, I don't know a reason for it to use the new one. Since the new field is one that is for communication between different parts of the pipeline, I would add it somewhere inside the adapters crate, not in an external interface. |
Hi i will do that once all is reviewed, will squash all.. |
Hi blp, there is a commit now to your feedback items, pls let us know, thanks |
…c doc - coordination.rs: total_checkpointed_steps uses >= n semantics, matching total_completed_steps (implementation already correct; doc had typo) - cdc_input.rs: update two mirroring comments to reference >= n - postgres.rs: drop hardcoded "~30 s" cadence, note serde-only default Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tcher_task Fast/Strict variants carry intent at every call site and make a future third mode trivial to add without a boolean ambiguity. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
test_cdc_strict_checkpoint_sync_requires_storage: verifies that starting with strict_checkpoint_sync=true and no storage backend fails at startup with a clear error (no Postgres connection needed). test_cdc_strict_checkpoint_sync_holds_slot: verifies that with strict_checkpoint_sync=true the replication slot LSN is not advanced until a Feldera checkpoint completes, so rows are redelivered on restart (at-least-once guarantee). Marked #[ignore]; requires wal_level=logical and POSTGRES_URL. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
mythical-fred
left a comment
There was a problem hiding this comment.
Re-reviewed the pivot on 5c14a28. The redesign addresses both of blp's concerns cleanly:
strict_checkpoint_syncconfig option dropped fromPostgresCdcReaderConfig. Strict mode is now implied by fault tolerance being enabled — the right default, and one fewer footgun.total_checkpointed_stepsremoved fromCompletion(the pipeline ↔ multihost coordinator wire struct). The checkpoint-step frontier now travels over a separatecheckpoint_notifier: watch::Sender<u64>insideControllerStatus, which keeps the coordination boundary clean.checkpoints_supported()→checkpoint_watcher() -> Option<watch::Receiver<u64>>. ReturnsSomeonly when fault tolerance is enabled. The Postgres connector picksWatcherReceiver::Strictwhen present,WatcherReceiver::Fast(step completion) otherwise. Falling back automatically eliminates the old startup-failure path and the test that pinned it.- Watcher abstraction (
enum WatcherReceiver { Fast(Completion), Strict(u64) }) gives the background task a single.changed()/.frontier()surface — clean, and the unit tests around fast-mode, strict-mode, partial checkpoint, immediate-fire and multi-batch all map onto the new shape correctly.
One design question for blp to weigh in on: your original suggestion was to have the connector consult the existing InputConsumer::pipeline_fault_tolerance() and key its own behaviour off FtModel::AtLeastOnce, rather than adding a new checkpoint_watcher() trait method. The author's approach hides the FT-enablement check inside the new method (so the connector doesn't need to learn the FtModel taxonomy), at the cost of one extra trait method. Both are defensible — happy either way, but worth your call.
Not blocking on it; prior approval stands on the substance. Two soft notes:
- The
Documentation updatedcheckbox is unchecked and no docs files moved. The user-facing surface didn't really change (option removed, behaviour now implicit on FT) so I don't think this needs new prose, but please add a changelog line noting that strict-mode (slot only advances after durable checkpoint) becomes implicit when fault tolerance is enabled. - As you already told gz, please squash
5f6d6286 fmtand any other fixups into a single linear-history commit before merge.
Fred is an AI agent, he lacks commonsense. |
Looks like a good fix to me, thanks! |
|
@nmarasoiu CI failed because of formatting. Please run |
Describe Manual Test Plan
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes