Skip to content

[FLINK-38918][python][kafka connector] support per cluster offset for DynamicKafkaSource in pyflink#28512

Merged
bowenli86 merged 5 commits into
apache:masterfrom
bowenli86:dev/bowenli/codex/flink-38918-pyflink-per-cluster-offset
Jun 24, 2026
Merged

[FLINK-38918][python][kafka connector] support per cluster offset for DynamicKafkaSource in pyflink#28512
bowenli86 merged 5 commits into
apache:masterfrom
bowenli86:dev/bowenli/codex/flink-38918-pyflink-per-cluster-offset

Conversation

@bowenli86

@bowenli86 bowenli86 commented Jun 22, 2026

Copy link
Copy Markdown
Member

What is the purpose of the change

This PR addresses FLINK-38918 by adding PyFlink support for configuring per-cluster starting and stopping offsets for DynamicKafkaSource.

The Java dynamic Kafka connector already supports offset initializers on ClusterMetadata and SingleClusterTopicMetadataService; this change exposes that capability through the PyFlink wrappers.

Brief change log

  • Added a PyFlink ClusterMetadata wrapper that forwards topics, properties, and optional starting/stopping KafkaOffsetsInitializers to the Java ClusterMetadata.
  • Extended SingleClusterTopicMetadataService with optional per-cluster starting and stopping offsets while preserving the existing constructor behavior when offsets are omitted.
  • Exported ClusterMetadata through pyflink.datastream.connectors.
  • Updated the flink-python test Kafka SQL connector dependency to 5.0.0-2.2.
  • Added Python unit tests for full, default, starting-only, and stopping-only per-cluster offset configurations.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests in flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py for SingleClusterTopicMetadataService offset forwarding.
  • Added unit tests in flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py for ClusterMetadata offset forwarding.
  • Added coverage for backward-compatible default behavior when no per-cluster offsets are provided.
  • Added coverage for partial configurations where only starting offsets or only stopping offsets are provided.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes, upgrades a flink-python test-scoped Kafka SQL connector dependency
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes, exposes PyFlink connector API additions
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Python API docstrings

Was generative AI tooling used to co-author this PR?
  • Yes (Codex GPT-5)

Generated-by: Codex GPT-5

@flinkbot

flinkbot commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Comment thread flink-python/pyflink/datastream/connectors/dynamic_kafka.py
Comment thread flink-python/pyflink/datastream/connectors/dynamic_kafka.py Outdated
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jun 23, 2026
@bowenli86

Copy link
Copy Markdown
Member Author

thank you everyone for review, the pr is good to go per offline sync
merging

@bowenli86 bowenli86 merged commit b383d4b into apache:master Jun 24, 2026
@bowenli86 bowenli86 deleted the dev/bowenli/codex/flink-38918-pyflink-per-cluster-offset branch June 24, 2026 18:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants