Skip to content

pipeline-manager: pipeline tracking#6088

Draft
snkas wants to merge 3 commits into
mainfrom
pipeline-tracking
Draft

pipeline-manager: pipeline tracking#6088
snkas wants to merge 3 commits into
mainfrom
pipeline-tracking

Conversation

@snkas
Copy link
Copy Markdown
Contributor

@snkas snkas commented Apr 21, 2026

This makes the following two changes.

  1. (New) A new endpoint is added to track changes of a pipeline: GET /v0/pipelines/<name>/track. It returns a stream of NDJSON, with each line containing a dictionary of top-level field changes. The API server polls the database every second for any changes, and pushes the changes to the client. If there are no changes, it still pushes an empty dictionary to keep the connection alive. This endpoint is added in order to avoid having to add new selector= in the future.

  2. (New) Python SDK functions pipeline.wait_for_compilation() and pipeline.track() are added. The former was needed to properly test the latter. It is also useful standalone, for instance if the user edits a compilation-relevant field and wants to wait for compilation to finish successfully.

PR information

  • Manual test plan: open the track endpoint in one browser/terminal, and then interact with the pipeline to see changes appear in the tracking
  • Python integration tests are added
  • Fuzzing database tests
  • Changelog is updated
  • No breaking changes
  • Unit tests for the new descriptors in pipeline-manager/src/db/operations/pipeline_parsing.rs
  • Update documentation (if applicable)

Not yet finished; still a draft.

This makes the following two changes.

1. (New) A new endpoint is added to track changes of a pipeline:
   `GET /v0/pipelines/<name>/track`. It returns a stream of NDJSON, with
   each line containing a dictionary of top-level field changes. The API
   server polls the database every second for any changes, and pushes
   the changes to the client. If there are no changes, it still pushes
   an empty dictionary to keep the connection alive. This endpoint is
   added in order to avoid having to add new `selector=` in the future.

2. (New) Python SDK functions `pipeline.wait_for_compilation()` and
   `pipeline.track()` are added. The former was needed to properly test
   the latter. It is also useful standalone, for instance if the user
   edits a compilation-relevant field and wants to wait for compilation
   to finish successfully.

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Draft review — design feedback only.

The version-based change tracking approach is well thought out: partitioning pipeline state into four independently-versioned domains and only fetching the groups that actually changed is the right call for a polling-based stream. The determine_json_delta function with its unit tests is a clean way to compute the NDJSON deltas.

Two items worth fixing before this leaves draft (see inline):

  • test_pipeline_waiting.py imports from build.lib (build artifact)
  • NDJSON content type should be application/x-ndjson

Comment thread python/tests/platform/test_pipeline_waiting.py Outdated
Comment thread crates/pipeline-manager/src/api/endpoints/pipeline_management.rs Outdated
Comment thread crates/pipeline-manager/src/db/storage_postgres.rs
snkas added 2 commits April 21, 2026 13:52
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants