Skip to content

issues/6121: strict_checkpoint_sync config#6253

Open
nmarasoiu wants to merge 6 commits into
feldera:mainfrom
nmarasoiu:strict_mode
Open

issues/6121: strict_checkpoint_sync config#6253
nmarasoiu wants to merge 6 commits into
feldera:mainfrom
nmarasoiu:strict_mode

Conversation

@nmarasoiu
Copy link
Copy Markdown

@nmarasoiu nmarasoiu commented May 16, 2026

Describe Manual Test Plan

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solid feature; unit tests cover both fast and strict modes thoroughly, including the partial-checkpoint and lagging-checkpoint regressions. A few items inline; the missing integration test and the empty manual test plan are the only soft blockers — flagging, not refusing.

Comment on lines +327 to +329
/// Shares the same count semantics as `total_completed_steps`: a record
/// ingested in step `n` is included in a durable checkpoint when
/// `total_checkpointed_steps > n`.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent semantics with the field above. The doc for total_completed_steps (line 314) says "a record ingested in step n is fully processed when total_completed_steps >= n". This new field, claiming to share the same count semantics, uses > n instead of >= n. Either:

  • The relation is also >= n (and notify_checkpoint(checkpoint.step) advances the field to checkpoint.step meaning steps 0..=checkpoint.step are checkpointed); or
  • n here actually refers to step_at_flush (the snapshot before the data lands), in which case the comment should say so explicitly.

Fix the wording so a future reader can't conclude that two adjacent fields with "the same count semantics" disagree by 1.

Comment thread crates/adapters/src/integrated/postgres/cdc_input.rs Outdated
/// Default: `false`.
#[serde(default)]
pub strict_checkpoint_sync: bool,
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rustdoc here is excellent — clear motivation, trade-off, and the storage requirement. Since this becomes the OpenAPI spec, this is the right place to put it. Two small touches:

  • The default isn't enforced by Default impl (PostgresCdcReaderConfig doesn't appear to derive Default), only by #[serde(default)]. Worth a comment that this is the on-the-wire default specifically.
  • "~30 s by default" — checkpoint cadence is configurable; either cite the config key or drop the specific number.

@mythical-fred
Copy link
Copy Markdown

One more thing the inline thread missed: the second commit ce8e3253 fmt is a fixup. Please git rebase -i and squash it into the previous commit before merge — linear-history hygiene.

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the rework — all three inline findings and the missing integration test are addressed:

  • Doc semantics now uniform (>= n for both fields in coordination.rs; runtime predicate uses > step_at_flush, which is the same thing once the off-by-one is taken into account). Good.
  • WatcherMode { Fast, Strict } is clean; intent reads at every call site.
  • The ~30 s cadence claim is gone; strict_checkpoint_sync doc just references the serde default.
  • test_cdc_strict_checkpoint_sync_requires_storage covers the fail-fast path without Postgres; test_cdc_strict_checkpoint_sync_holds_slot exercises the end-to-end slot-hold + redelivery behavior across two pipeline runs. Nice two-run shape — that's exactly the assumption that needed coverage.

One hard block remains:

Dirty history. Four commits is too many for what is conceptually one change, and three of them are review-fixup commits:

  • ce8e3253 fmt — still present, lazy one-liner.
  • c8afa19f docs: fix >= n vs > n inconsistency …
  • eb3c91ba refactor: replace strict: bool with WatcherMode enum …
  • 6ef0f815 test: add integration tests for strict_checkpoint_sync

Per the project's commit-hygiene rules (linear history on main; every commit must be worthy of that history; review-fixup commits get squashed before merge), please git rebase -i and collapse this into a single coherent commit — feature + tests + docs together, with a real message explaining why strict_checkpoint_sync exists (the at-least-once guarantee for stateful pipelines, the storage prerequisite, and the watcher mode split). Drop the fmt commit and fold the Claude Co-Authored-By trailers into one consolidated trailer on the final commit if you want to keep attribution.

Soft nit (not blocking): the Manual Test Plan section in the PR description is still the empty placeholder. The new integration tests cover the behavior, so this is a soft block per the rules — but it costs nothing to write two sentences ("ran cargo test test_cdc_strict_checkpoint_sync_requires_storage; ran the #[ignore]d slot-hold test against a local Postgres with wal_level=logical") and it helps future readers.

Once the history is cleaned up I'm happy to approve.

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three findings addressed: Completion doc uses >= n consistently, Mode enum replaces the bool flag, and the cadence prose no longer overstates 30s. Plus a new integration test exercises the strict-mode handshake and the storage-required path. LGTM.

@abhizer
Copy link
Copy Markdown
Contributor

abhizer commented May 18, 2026

Thank you!

I haven't gone through the code yet, but I'd recommend changing the config name to something else as checkpoint sync has a different meaning in Feldera: https://docs.feldera.com/pipelines/checkpoint-sync/

@mihaibudiu mihaibudiu requested review from abhizer, blp and swanandx May 19, 2026 03:42
@blp blp self-assigned this May 19, 2026
@blp
Copy link
Copy Markdown
Member

blp commented May 19, 2026

Feldera has a couple of fault tolerance models. FtModel::AtLeastOnce is the one that this PR seems to enable. The usual way for a connector to work is to report the highest fault-tolerance model that it can support via InputEndpoint::fault_tolerance() then, once it's instantiated, if InputConsumer::pipeline_fault_tolerance() reports a lower requirement, it can omit any of the requirements for that level.

It's a kind of misbehavior for a connector to report that its fault tolerance level is FtModel::AtLeastOnce and for the pipeline to report that the user requested FtModel::AtLeastOnce, but then for the connector to not implement that level of fault tolerance. But if I'm reading it right, the new option introduced here does just that: setting it to false (the default) disables fault tolerance.

So, my suggestion is that, instead of adding the new option, the connector should instead call InputConsumer::pipeline_fault_tolerance(). If it returns None, it is free to do whatever is most efficient (presumably this is what false would do); if it returns Some(FtModel::AtLeastOnce), then it acts like true.

@blp
Copy link
Copy Markdown
Member

blp commented May 19, 2026

So, my suggestion is that, instead of adding the new option, the connector should instead call InputConsumer::pipeline_fault_tolerance(). If it returns None, it is free to do whatever is most efficient (presumably this is what false would do); if it returns Some(FtModel::AtLeastOnce), then it acts like true.

And then checkpoints_supported should not be needed, because the controller won't allow fault tolerance without storage anyhow.

@blp
Copy link
Copy Markdown
Member

blp commented May 19, 2026

I don't think that adding total_checkpointed_steps to Completion is the way to go. That struct is for communication between the pipeline and the multihost coordinator. We can add fields to it, but I'd only do that for fields that the coordinator is going to use; so far, I don't know a reason for it to use the new one. Since the new field is one that is for communication between different parts of the pipeline, I would add it somewhere inside the adapters crate, not in an external interface.

@nmarasoiu
Copy link
Copy Markdown
Author

One more thing the inline thread missed: the second commit ce8e3253 fmt is a fixup. Please git rebase -i and squash it into the previous commit before merge — linear-history hygiene.

Hi i will do that once all is reviewed, will squash all..

@swanandx swanandx removed their request for review May 20, 2026 11:25
@nmarasoiu
Copy link
Copy Markdown
Author

I don't think that adding total_checkpointed_steps to Completion is the way to go. That struct is for communication between the pipeline and the multihost coordinator. We can add fields to it, but I'd only do that for fields that the coordinator is going to use; so far, I don't know a reason for it to use the new one. Since the new field is one that is for communication between different parts of the pipeline, I would add it somewhere inside the adapters crate, not in an external interface.

Hi blp, there is a commit now to your feedback items, pls let us know, thanks

nmarasoiu and others added 6 commits May 20, 2026 16:02
…c doc

- coordination.rs: total_checkpointed_steps uses >= n semantics,
  matching total_completed_steps (implementation already correct; doc had typo)
- cdc_input.rs: update two mirroring comments to reference >= n
- postgres.rs: drop hardcoded "~30 s" cadence, note serde-only default

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tcher_task

Fast/Strict variants carry intent at every call site and make a future
third mode trivial to add without a boolean ambiguity.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
test_cdc_strict_checkpoint_sync_requires_storage: verifies that
starting with strict_checkpoint_sync=true and no storage backend
fails at startup with a clear error (no Postgres connection needed).

test_cdc_strict_checkpoint_sync_holds_slot: verifies that with
strict_checkpoint_sync=true the replication slot LSN is not advanced
until a Feldera checkpoint completes, so rows are redelivered on
restart (at-least-once guarantee). Marked #[ignore]; requires
wal_level=logical and POSTGRES_URL.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-reviewed the pivot on 5c14a28. The redesign addresses both of blp's concerns cleanly:

  • strict_checkpoint_sync config option dropped from PostgresCdcReaderConfig. Strict mode is now implied by fault tolerance being enabled — the right default, and one fewer footgun.
  • total_checkpointed_steps removed from Completion (the pipeline ↔ multihost coordinator wire struct). The checkpoint-step frontier now travels over a separate checkpoint_notifier: watch::Sender<u64> inside ControllerStatus, which keeps the coordination boundary clean.
  • checkpoints_supported()checkpoint_watcher() -> Option<watch::Receiver<u64>>. Returns Some only when fault tolerance is enabled. The Postgres connector picks WatcherReceiver::Strict when present, WatcherReceiver::Fast (step completion) otherwise. Falling back automatically eliminates the old startup-failure path and the test that pinned it.
  • Watcher abstraction (enum WatcherReceiver { Fast(Completion), Strict(u64) }) gives the background task a single .changed() / .frontier() surface — clean, and the unit tests around fast-mode, strict-mode, partial checkpoint, immediate-fire and multi-batch all map onto the new shape correctly.

One design question for blp to weigh in on: your original suggestion was to have the connector consult the existing InputConsumer::pipeline_fault_tolerance() and key its own behaviour off FtModel::AtLeastOnce, rather than adding a new checkpoint_watcher() trait method. The author's approach hides the FT-enablement check inside the new method (so the connector doesn't need to learn the FtModel taxonomy), at the cost of one extra trait method. Both are defensible — happy either way, but worth your call.

Not blocking on it; prior approval stands on the substance. Two soft notes:

  1. The Documentation updated checkbox is unchecked and no docs files moved. The user-facing surface didn't really change (option removed, behaviour now implicit on FT) so I don't think this needs new prose, but please add a changelog line noting that strict-mode (slot only advances after durable checkpoint) becomes implicit when fault tolerance is enabled.
  2. As you already told gz, please squash 5f6d6286 fmt and any other fixups into a single linear-history commit before merge.

@mihaibudiu
Copy link
Copy Markdown
Contributor

One more thing the inline thread missed: the second commit ce8e3253 fmt is a fixup. Please git rebase -i and squash it into the previous commit before merge — linear-history hygiene.

Hi i will do that once all is reviewed, will squash all..

Fred is an AI agent, he lacks commonsense.

@blp
Copy link
Copy Markdown
Member

blp commented May 20, 2026

Hi blp, there is a commit now to your feedback items, pls let us know, thanks

Looks like a good fix to me, thanks!

@blp
Copy link
Copy Markdown
Member

blp commented May 21, 2026

@nmarasoiu CI failed because of formatting. Please run cargo fmt. (Ordinarily CI would just fix this itself but it can't push to your repo of course.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants