Skip to content

chipingress: add batching client with per-event partial delivery#2085

Open
pkcll wants to merge 1 commit into
mainfrom
chipingress-batch-partial-delivery
Open

chipingress: add batching client with per-event partial delivery#2085
pkcll wants to merge 1 commit into
mainfrom
chipingress-batch-partial-delivery

Conversation

@pkcll
Copy link
Copy Markdown
Contributor

@pkcll pkcll commented May 23, 2026

Summary

Adds proto types and client wiring for chipingress batch publishing with per-event partial-delivery semantics.

All wire-format changes are additive and backwards-compatible with origin/main.

Proto (pkg/chipingress/pb/chip_ingress.proto)

  • PublishOptions { optional bool transaction_enabled = 1 }
    • Unset or false ⇒ partial delivery (server default): valid events are produced, per-event errors are reported in PublishResponse.results.
    • Explicit true ⇒ all-or-nothing: any per-event failure fails the entire batch.
  • CloudEventBatch.options = 2 (PublishOptions).
  • PublishResult.error = 2 (PublishError).
  • PublishError { PublishErrorCode error_code, string reason }.
  • PublishErrorCode enum:
    • PUBLISH_ERROR_CODE_UNKNOWN = 0
    • PUBLISH_ERROR_CODE_VALIDATION_FAILED = 1
    • PUBLISH_ERROR_CODE_SCHEMA_MISSING = 2
    • PUBLISH_ERROR_CODE_ENCODE_ERROR = 3
    • PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION = 4
  • PublishResult.eventIdevent_id (same tag 1, same wire type, generated Go field EventId unchanged).

Single-shot client (pkg/chipingress/client.go)

  • BatchOpt + WithTransactionEnabled(bool) helper.
  • EventsToBatchWithOpts(events, opts...) variadic constructor.
  • EventsToBatch and EventsToBatchWithOpts default-populate Options{TransactionEnabled: false} so client intent is always explicit on the wire (defensive against any future server-default drift; server treats unset and explicit-false identically).
  • Type aliases in types.go: PublishOptions, PublishError, PublishErrorCode.

Batch client (pkg/chipingress/batch) — extend existing package

  • Extends the existing batching client (client.go only; batcher.go and stop_chan.go untouched).
  • New transactionEnabled field (default false = partial delivery); Opt: WithTransactionEnabled(bool).
  • newBatchRequest always emits PublishOptions explicitly (both true and false) matching the single-shot client contract.
  • splitMessagesByRequestSize now sizes requests with Options populated so the byte budget matches the wire form.
  • Per-event callback dispatch via new completeBatchCallbacksFromResults:
    • Partial-delivery path returns *PublishError per failed event.
    • Mismatch detection emits a synthetic ErrCodeResultsMismatch when len(results) != len(messages).
    • Positional event_id mismatch is logged and counted via the resultsMismatchTotal metric.
  • New PublishError{Code chipingress.PublishErrorCode, Reason string} with re-exported ErrCode* constants.

Observability (OTel metrics)

  • configInfo gauge gains a transaction_enabled bool attribute (reflects the effective client flag, false by default).
  • New resultsMismatchTotal counter (incremented on positional event_id mismatch).
  • Existing sendRequestsTotal, requestSizeMessages, requestSizeBytes, requestLatencyMS, batchSplitsTotal are unchanged.

Tests

  • Proto roundtrip: PublishResult with PublishError; PublishOptions.GetTransactionEnabled edge cases.
  • Single-shot helper: default, WithTransactionEnabled(true), WithTransactionEnabled(false).
  • Batch client: partial-delivery callbacks, RPC error fallback, WithTransactionEnabled(true) wire matcher, results-mismatch synthetic error, split-on-size with explicit-Options sizing.

Compatibility vs origin/main

  • Public Go API: strictly additive (no removed or renamed symbols).
  • Proto wire: additive (new fields, same tag numbers; eventId → event_id is a name-only change, generated Go field EventId and binary wire unchanged).
  • Behavior: EventsToBatch now returns a batch with Options{TransactionEnabled: false} populated where it was previously nil. Server treats both identically.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 23, 2026

✅ API Diff Results - github.com/smartcontractkit/chainlink-common/pkg/chipingress

✅ Compatible Changes (28)

./ (6)
  • BatchOpt — ➕ Added

  • EventsToBatchWithOpts — ➕ Added

  • PublishError — ➕ Added

  • PublishErrorCode — ➕ Added

  • PublishOptions — ➕ Added

  • WithTransactionEnabled — ➕ Added

batch (8)
  • ErrCodeDomainMisconfiguration — ➕ Added

  • ErrCodeEncodeError — ➕ Added

  • ErrCodeResultsMismatch — ➕ Added

  • ErrCodeSchemaMissing — ➕ Added

  • ErrCodeUnknown — ➕ Added

  • ErrCodeValidationFailed — ➕ Added

  • PublishError — ➕ Added

  • WithTransactionEnabled — ➕ Added

pb (10)
  • PublishError — ➕ Added

  • PublishErrorCode — ➕ Added

  • PublishErrorCode_name — ➕ Added

  • PublishErrorCode_PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION — ➕ Added

  • PublishErrorCode_PUBLISH_ERROR_CODE_ENCODE_ERROR — ➕ Added

  • PublishErrorCode_PUBLISH_ERROR_CODE_SCHEMA_MISSING — ➕ Added

  • PublishErrorCode_PUBLISH_ERROR_CODE_UNKNOWN — ➕ Added

  • PublishErrorCode_PUBLISH_ERROR_CODE_VALIDATION_FAILED — ➕ Added

  • PublishErrorCode_value — ➕ Added

  • PublishOptions — ➕ Added

pb.(*CloudEventBatch) (1)
  • GetOptions — ➕ Added
pb.(*PublishResult) (1)
  • GetError — ➕ Added
pb.CloudEventBatch (1)
  • Options — ➕ Added
pb.PublishResult (1)
  • Error — ➕ Added

📄 View full apidiff report

@pkcll pkcll force-pushed the chipingress-batch-partial-delivery branch from 53854ab to 1071bc9 Compare May 27, 2026 02:55
Comment thread pkg/chipingress/pb/chip_ingress.proto Outdated
// Invariant: error is set if and only if the event was not produced to Kafka.
message PublishResult {
string eventId = 1;
string event_id = 1;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a breaking change.

  • Binary format: Protobuf serializes by field number, not name. Since the field number remains 1, existing binary-encoded messages are fully compatible.

  • JSON format: Protobuf's JSON mapping uses the lowerCamelCase of the field name by default. event_id automatically maps to "eventId" in JSON — the same as the old eventId field name did.

  • generated-code

    • Go: Both eventId and event_id produce the same generated accessor (EventId), so no breakage.

@pkcll pkcll force-pushed the chipingress-batch-partial-delivery branch from 6b830f6 to 56d73e9 Compare May 30, 2026 04:24
@pkcll pkcll changed the title chipingress: batch partial delivery support chipingress: add batching client with per-event partial delivery May 30, 2026
Add proto types and client wiring for chipingress batch publishing with
per-event partial-delivery semantics. All wire-format changes are
additive and backwards-compatible with origin/main.

Proto (pkg/chipingress/pb/chip_ingress.proto):
- Add PublishOptions{optional bool transaction_enabled = 1}.
  Unset or false => partial delivery (server default): valid events are
  produced, per-event errors reported in PublishResponse.results.
  Explicit true => all-or-nothing: any per-event failure fails the batch.
- Add CloudEventBatch.options = 2 (PublishOptions).
- Add PublishResult.error = 2 (PublishError).
- Add PublishError{PublishErrorCode error_code, string reason}.
- Add PublishErrorCode enum:
    PUBLISH_ERROR_CODE_UNKNOWN = 0,
    PUBLISH_ERROR_CODE_VALIDATION_FAILED = 1,
    PUBLISH_ERROR_CODE_SCHEMA_MISSING = 2,
    PUBLISH_ERROR_CODE_ENCODE_ERROR = 3,
    PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION = 4.
- Rename PublishResult.eventId => event_id (same tag 1, same wire type,
  same generated Go field 'EventId').

Single-shot client (pkg/chipingress/client.go):
- BatchOpt + WithTransactionEnabled(bool) helper.
- EventsToBatchWithOpts(events, opts...) variadic constructor.
- EventsToBatch and EventsToBatchWithOpts default-populate
  Options{TransactionEnabled: false} so client intent is always
  explicit on the wire (defensive against future server-default drift).
- Type aliases in types.go: PublishOptions, PublishError,
  PublishErrorCode.

Batch client (pkg/chipingress/batch):
- New batching client (NewBatchClient) that accumulates messages and
  flushes by batch size, byte budget, or interval.
- transactionEnabled flag (default false = partial delivery);
  Opt: WithTransactionEnabled(bool).
- newBatchRequest always emits PublishOptions explicitly (both true and
  false) to match the single-shot client's contract.
- splitMessagesByRequestSize splits oversized batches by gRPC max
  request size, dropping any single event that exceeds the limit.
- Per-event callbacks: completeBatchCallbacksFromResults dispatches
  PublishResult outcomes; partial-delivery path returns *PublishError
  per failed event; mismatch detection emits a synthetic
  ErrCodeResultsMismatch when results length != messages length;
  positional event_id mismatch is logged and counted via
  resultsMismatchTotal.
- PublishError{Code chipingress.PublishErrorCode, Reason string} with
  re-exported ErrCode* constants.
- OTel metrics: sendRequestsTotal, requestSizeMessages,
  requestSizeBytes, requestLatencyMS, configInfo (carries
  transaction_enabled bool attribute), batchSplitsTotal,
  resultsMismatchTotal.

Tests:
- Proto roundtrip (PublishResult with PublishError;
  PublishOptions.GetTransactionEnabled edge cases).
- Single-shot helper covers default, WithTransactionEnabled(true),
  WithTransactionEnabled(false).
- Batch client: partial-delivery callbacks, RPC error fallback,
  WithTransactionEnabled(true) wire matcher, results-mismatch
  synthetic error, split-on-size with explicit-Options sizing.
@pkcll pkcll force-pushed the chipingress-batch-partial-delivery branch from 56d73e9 to 14df06f Compare May 30, 2026 04:35
@pkcll pkcll marked this pull request as ready for review May 30, 2026 04:49
@pkcll pkcll requested a review from a team as a code owner May 30, 2026 04:49
Copilot AI review requested due to automatic review settings May 30, 2026 04:49
@pkcll pkcll requested a review from patrickhuie19 May 30, 2026 04:49
@pkcll pkcll requested review from jmank88 and removed request for 4of9, engnke, fouadkada, kirqz23 and thomaska May 30, 2026 04:49
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds additive proto/API support for PublishBatch options and per-event partial-delivery reporting, and wires those semantics through both the single-shot helper and the existing batching client (including callback dispatch and OTel metrics), with accompanying tests.

Changes:

  • Extend chip_ingress.proto with PublishOptions, PublishError{code,reason}, and PublishResult.error, plus CloudEventBatch.options.
  • Add single-shot batch construction helpers (BatchOpt, WithTransactionEnabled, EventsToBatchWithOpts) that always emit explicit PublishOptions on the wire.
  • Extend the batch client to size/split requests including options, dispatch per-event callback errors from PublishResponse.results, and add mismatch observability.

Reviewed changes

Copilot reviewed 6 out of 8 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
pkg/chipingress/types.go Re-exports new proto types (options/error/code) through the chipingress package.
pkg/chipingress/pb/chip_ingress.proto Adds publish options + per-event error/result schema and documents partial vs transactional semantics.
pkg/chipingress/pb/chip_ingress.pb.go Regenerated Go bindings for the updated proto (including PublishResult.error).
pkg/chipingress/pb/chip_ingress_grpc.pb.go Regenerated gRPC stubs + updated service docstrings.
pkg/chipingress/client.go Adds batch construction options and defaults to explicit transaction_enabled=false.
pkg/chipingress/client_test.go Adds proto/type roundtrip and helper defaulting tests.
pkg/chipingress/batch/client.go Adds transaction option wiring, per-event callback dispatch, mismatch metric, and request sizing with options.
pkg/chipingress/batch/client_test.go Adds coverage for partial-delivery callbacks, mismatch behavior, wire option emission, and sizing.
Files not reviewed (2)
  • pkg/chipingress/pb/chip_ingress.pb.go: Language not supported
  • pkg/chipingress/pb/chip_ingress_grpc.pb.go: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +46 to +50
ErrCodeUnknown = chipingress.PublishErrorCode(0) // PUBLISH_ERROR_CODE_UNKNOWN
ErrCodeValidationFailed = chipingress.PublishErrorCode(1) // PUBLISH_ERROR_CODE_VALIDATION_FAILED
ErrCodeSchemaMissing = chipingress.PublishErrorCode(2) // PUBLISH_ERROR_CODE_SCHEMA_MISSING
ErrCodeEncodeError = chipingress.PublishErrorCode(3) // PUBLISH_ERROR_CODE_ENCODE_ERROR
ErrCodeDomainMisconfiguration = chipingress.PublishErrorCode(4) // PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION
Comment on lines 314 to +318
b.log.Errorw("failed to publish batch", "error", err)
b.completeBatchCallbacks(batchMessages, err)
} else if !b.transactionEnabled && resp != nil && len(resp.Results) > 0 {
b.completeBatchCallbacksFromResults(batchMessages, resp.Results)
} else {
Comment on lines 297 to 300
state protoimpl.MessageState `protogen:"open.v1"`
EventId string `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"`
EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
Error *PublishError `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
unknownFields protoimpl.UnknownFields
Comment on lines 54 to 55
// Client is a batching client that accumulates messages and sends them in batches.
type Client struct {
Comment on lines +66 to +68
log *zap.SugaredLogger
callbackWg sync.WaitGroup
shutdownTimeout time.Duration
Comment on lines +108 to +112
batchInterval: 100 * time.Millisecond,
maxPublishTimeout: 5 * time.Second,
stopCh: make(chan struct{}),
callbackWg: sync.WaitGroup{},
shutdownTimeout: 5 * time.Second,
@pkcll pkcll closed this May 30, 2026
@pkcll pkcll reopened this May 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants