chipingress: add batching client with per-event partial delivery #2085
pkcll wants to merge 1 commit
      // Invariant: error is set if and only if the event was not produced to Kafka.
      message PublishResult {
    -   string eventId = 1;
    +   string event_id = 1;
This is not a breaking change.
- Binary format: Protobuf serializes by field number, not name. Since the field number remains 1, existing binary-encoded messages are fully compatible.
- JSON format: Protobuf's JSON mapping uses the lowerCamelCase of the field name by default. event_id automatically maps to "eventId" in JSON, the same as the old eventId field name did.
- Generated code (Go): Both eventId and event_id produce the same generated accessor (EventId), so no breakage.
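The two format claims above can be checked with a small standalone sketch. It does not use the protobuf library: `encodeStringField` and `jsonName` are hypothetical helpers written for illustration, the first hand-encoding a length-delimited field and the second applying the default JSON-name rule (drop each underscore, upper-case the next character).

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// encodeStringField hand-encodes a protobuf string field: one tag byte
// (field number << 3 | wire type 2), one length byte, then the raw bytes.
// Assumption: fieldNum < 16 and len(value) < 128, so the tag and the
// length each fit in a single varint byte.
func encodeStringField(fieldNum int, value string) []byte {
	out := []byte{byte(fieldNum<<3 | 2), byte(len(value))}
	return append(out, value...)
}

// jsonName applies the default protobuf JSON-name rule: remove each
// underscore and upper-case the character that follows it.
func jsonName(protoName string) string {
	var b strings.Builder
	upperNext := false
	for _, r := range protoName {
		if r == '_' {
			upperNext = true
			continue
		}
		if upperNext {
			r = unicode.ToUpper(r)
			upperNext = false
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	// The field name is never an input to the encoder, so renaming the
	// field cannot change the bytes on the wire.
	fmt.Printf("% x\n", encodeStringField(1, "evt-123")) // 0a 07 65 76 74 2d 31 32 33

	// Both spellings resolve to the same default JSON key.
	fmt.Println(jsonName("eventId"), jsonName("event_id")) // eventId eventId
}
```

This covers the protobuf JSON mapping only. The regenerated struct in this PR also changes the Go struct tag from `json:"eventId,omitempty"` to `json:"event_id,omitempty"`, so any caller that marshals the generated struct with `encoding/json` directly would see a different key.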
Add proto types and client wiring for chipingress batch publishing with
per-event partial-delivery semantics. All wire-format changes are
additive and backwards-compatible with origin/main.
Proto (pkg/chipingress/pb/chip_ingress.proto):
- Add PublishOptions{optional bool transaction_enabled = 1}.
Unset or false => partial delivery (server default): valid events are
produced, per-event errors reported in PublishResponse.results.
Explicit true => all-or-nothing: any per-event failure fails the batch.
- Add CloudEventBatch.options = 2 (PublishOptions).
- Add PublishResult.error = 2 (PublishError).
- Add PublishError{PublishErrorCode error_code, string reason}.
- Add PublishErrorCode enum:
PUBLISH_ERROR_CODE_UNKNOWN = 0,
PUBLISH_ERROR_CODE_VALIDATION_FAILED = 1,
PUBLISH_ERROR_CODE_SCHEMA_MISSING = 2,
PUBLISH_ERROR_CODE_ENCODE_ERROR = 3,
PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION = 4.
- Rename PublishResult.eventId => event_id (same tag 1, same wire type,
same generated Go field 'EventId').
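The difference between the two delivery modes can be modelled in a few lines. This is a hypothetical sketch of the semantics described above, not the server implementation: `event`, `result`, `publishBatch`, and the `Valid` flag are invented stand-ins for the real validation, schema, and encoding checks.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a stand-in for a CloudEvent; Valid replaces the real
// validation/schema/encoding checks (assumption for illustration).
type event struct {
	ID    string
	Valid bool
}

// result mirrors the PublishResult invariant: Err is non-nil if and
// only if the event was not produced.
type result struct {
	EventID string
	Err     error
}

var errValidation = errors.New("validation failed")

// publishBatch models both modes. With transactionEnabled=false, valid
// events are produced and failures are reported per event. With
// transactionEnabled=true, the first failure rejects the whole batch.
func publishBatch(events []event, transactionEnabled bool) (produced []string, results []result, err error) {
	for _, e := range events {
		if !e.Valid {
			if transactionEnabled {
				return nil, nil, fmt.Errorf("batch rejected: event %s: %w", e.ID, errValidation)
			}
			results = append(results, result{EventID: e.ID, Err: errValidation})
			continue
		}
		produced = append(produced, e.ID)
		results = append(results, result{EventID: e.ID})
	}
	return produced, results, nil
}

func main() {
	events := []event{{"a", true}, {"b", false}, {"c", true}}

	produced, results, err := publishBatch(events, false)
	fmt.Println(produced, len(results), err) // [a c] 3 <nil>

	_, _, err = publishBatch(events, true)
	fmt.Println(err) // batch rejected: event b: validation failed
}
```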
Single-shot client (pkg/chipingress/client.go):
- BatchOpt + WithTransactionEnabled(bool) helper.
- EventsToBatchWithOpts(events, opts...) variadic constructor.
- EventsToBatch and EventsToBatchWithOpts default-populate
Options{TransactionEnabled: false} so client intent is always
explicit on the wire (defensive against future server-default drift).
- Type aliases in types.go: PublishOptions, PublishError,
PublishErrorCode.
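A minimal sketch of how the option helpers listed above might fit together, using the functional-options pattern. The type definitions are local stand-ins for the generated proto types, and the function bodies are assumptions; only the names `BatchOpt`, `WithTransactionEnabled`, `EventsToBatch`, and `EventsToBatchWithOpts` come from this PR.

```go
package main

import "fmt"

// Local stand-ins for the generated proto types (assumption).
type PublishOptions struct{ TransactionEnabled *bool }
type CloudEvent struct{ ID string }
type CloudEventBatch struct {
	Events  []*CloudEvent
	Options *PublishOptions
}

// BatchOpt mutates a batch under construction.
type BatchOpt func(*CloudEventBatch)

// WithTransactionEnabled sets the flag explicitly, for true and false alike.
func WithTransactionEnabled(enabled bool) BatchOpt {
	return func(b *CloudEventBatch) {
		b.Options.TransactionEnabled = &enabled
	}
}

// EventsToBatchWithOpts populates Options with an explicit false before
// applying opts, so the client's intent is always present on the wire.
func EventsToBatchWithOpts(events []*CloudEvent, opts ...BatchOpt) *CloudEventBatch {
	def := false
	b := &CloudEventBatch{
		Events:  events,
		Options: &PublishOptions{TransactionEnabled: &def},
	}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

// EventsToBatch is the zero-option form.
func EventsToBatch(events []*CloudEvent) *CloudEventBatch {
	return EventsToBatchWithOpts(events)
}

func main() {
	evts := []*CloudEvent{{ID: "a"}}
	fmt.Println(*EventsToBatch(evts).Options.TransactionEnabled)                                       // false
	fmt.Println(*EventsToBatchWithOpts(evts, WithTransactionEnabled(true)).Options.TransactionEnabled) // true
}
```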
Batch client (pkg/chipingress/batch):
- New batching client (NewBatchClient) that accumulates messages and
flushes by batch size, byte budget, or interval.
- transactionEnabled flag (default false = partial delivery);
Opt: WithTransactionEnabled(bool).
- newBatchRequest always emits PublishOptions explicitly (both true and
false) to match the single-shot client's contract.
- splitMessagesByRequestSize splits oversized batches by gRPC max
request size, dropping any single event that exceeds the limit.
- Per-event callbacks: completeBatchCallbacksFromResults dispatches
PublishResult outcomes; partial-delivery path returns *PublishError
per failed event; mismatch detection emits a synthetic
ErrCodeResultsMismatch when results length != messages length;
positional event_id mismatch is logged and counted via
resultsMismatchTotal.
- PublishError{Code chipingress.PublishErrorCode, Reason string} with
re-exported ErrCode* constants.
- OTel metrics: sendRequestsTotal, requestSizeMessages,
requestSizeBytes, requestLatencyMS, configInfo (carries
transaction_enabled bool attribute), batchSplitsTotal,
resultsMismatchTotal.
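The size-based split described for splitMessagesByRequestSize can be illustrated with a greedy packer. This is a sketch under stated assumptions rather than the code in this PR: `message`, `splitBySize`, and the precomputed `Size` field are hypothetical, `overhead` stands in for the encoded size of the always-present PublishOptions, and per-message framing bytes are ignored.

```go
package main

import "fmt"

// message carries a precomputed encoded size (assumption: the real
// client would measure the proto encoding instead).
type message struct {
	ID   string
	Size int
}

// splitBySize greedily packs messages into chunks so that
// overhead + sum(sizes) never exceeds maxBytes. A message that cannot
// fit even in a chunk of its own is dropped and returned separately.
func splitBySize(msgs []message, overhead, maxBytes int) (chunks [][]message, dropped []message) {
	var cur []message
	curSize := overhead
	for _, m := range msgs {
		if overhead+m.Size > maxBytes {
			dropped = append(dropped, m)
			continue
		}
		if curSize+m.Size > maxBytes {
			// Current chunk is full: flush it and start a new one.
			chunks = append(chunks, cur)
			cur, curSize = nil, overhead
		}
		cur = append(cur, m)
		curSize += m.Size
	}
	if len(cur) > 0 {
		chunks = append(chunks, cur)
	}
	return chunks, dropped
}

func main() {
	msgs := []message{{"a", 40}, {"b", 40}, {"c", 200}, {"d", 40}}
	chunks, dropped := splitBySize(msgs, 10, 100)
	fmt.Println(len(chunks), len(dropped)) // 2 1
}
```

Counting the options overhead in every chunk is what keeps the byte budget aligned with the request that is actually sent.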
Tests:
- Proto roundtrip (PublishResult with PublishError;
PublishOptions.GetTransactionEnabled edge cases).
- Single-shot helper covers default, WithTransactionEnabled(true),
WithTransactionEnabled(false).
- Batch client: partial-delivery callbacks, RPC error fallback,
WithTransactionEnabled(true) wire matcher, results-mismatch
synthetic error, split-on-size with explicit-Options sizing.
Pull request overview
This PR adds additive proto/API support for PublishBatch options and per-event partial-delivery reporting, and wires those semantics through both the single-shot helper and the existing batching client (including callback dispatch and OTel metrics), with accompanying tests.
Changes:
- Extend chip_ingress.proto with PublishOptions, PublishError{code,reason}, and PublishResult.error, plus CloudEventBatch.options.
- Add single-shot batch construction helpers (BatchOpt, WithTransactionEnabled, EventsToBatchWithOpts) that always emit explicit PublishOptions on the wire.
- Extend the batch client to size/split requests including options, dispatch per-event callback errors from PublishResponse.results, and add mismatch observability.
Reviewed changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| pkg/chipingress/types.go | Re-exports new proto types (options/error/code) through the chipingress package. |
| pkg/chipingress/pb/chip_ingress.proto | Adds publish options + per-event error/result schema and documents partial vs transactional semantics. |
| pkg/chipingress/pb/chip_ingress.pb.go | Regenerated Go bindings for the updated proto (including PublishResult.error). |
| pkg/chipingress/pb/chip_ingress_grpc.pb.go | Regenerated gRPC stubs + updated service docstrings. |
| pkg/chipingress/client.go | Adds batch construction options and defaults to explicit transaction_enabled=false. |
| pkg/chipingress/client_test.go | Adds proto/type roundtrip and helper defaulting tests. |
| pkg/chipingress/batch/client.go | Adds transaction option wiring, per-event callback dispatch, mismatch metric, and request sizing with options. |
| pkg/chipingress/batch/client_test.go | Adds coverage for partial-delivery callbacks, mismatch behavior, wire option emission, and sizing. |
Files not reviewed (2)
- pkg/chipingress/pb/chip_ingress.pb.go: Language not supported
- pkg/chipingress/pb/chip_ingress_grpc.pb.go: Language not supported
    ErrCodeUnknown                = chipingress.PublishErrorCode(0) // PUBLISH_ERROR_CODE_UNKNOWN
    ErrCodeValidationFailed       = chipingress.PublishErrorCode(1) // PUBLISH_ERROR_CODE_VALIDATION_FAILED
    ErrCodeSchemaMissing          = chipingress.PublishErrorCode(2) // PUBLISH_ERROR_CODE_SCHEMA_MISSING
    ErrCodeEncodeError            = chipingress.PublishErrorCode(3) // PUBLISH_ERROR_CODE_ENCODE_ERROR
    ErrCodeDomainMisconfiguration = chipingress.PublishErrorCode(4) // PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION

        b.log.Errorw("failed to publish batch", "error", err)
        b.completeBatchCallbacks(batchMessages, err)
    } else if !b.transactionEnabled && resp != nil && len(resp.Results) > 0 {
        b.completeBatchCallbacksFromResults(batchMessages, resp.Results)
    } else {

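The partial-delivery branch in the hunk above hands the response to completeBatchCallbacksFromResults. A minimal sketch of that kind of positional dispatch follows. All types and the `dispatch` function are hypothetical stand-ins, the value chosen for `errCodeResultsMismatch` is invented, and continuing to dispatch positionally after an event_id mismatch is an assumption (the PR only states that such mismatches are logged and counted).

```go
package main

import "fmt"

type PublishErrorCode int32

// errCodeResultsMismatch is a hypothetical client-side sentinel; the
// real constant's value is not shown in this PR.
const errCodeResultsMismatch PublishErrorCode = -1

type PublishError struct {
	Code   PublishErrorCode
	Reason string
}

func (e *PublishError) Error() string {
	return fmt.Sprintf("publish error %d: %s", e.Code, e.Reason)
}

type publishResult struct {
	EventID string
	Err     *PublishError
}

type pendingMessage struct {
	EventID  string
	Callback func(error)
}

// dispatch completes one callback per message and returns how many
// results carried an event ID different from the message at that index.
func dispatch(msgs []pendingMessage, results []publishResult) (idMismatches int) {
	if len(results) != len(msgs) {
		// Results cannot be attributed safely: fail every callback.
		err := &PublishError{
			Code:   errCodeResultsMismatch,
			Reason: fmt.Sprintf("got %d results for %d messages", len(results), len(msgs)),
		}
		for _, m := range msgs {
			m.Callback(err)
		}
		return 0
	}
	for i, m := range msgs {
		if results[i].EventID != m.EventID {
			idMismatches++ // the real client logs and increments a metric here
		}
		if results[i].Err != nil {
			m.Callback(results[i].Err)
			continue
		}
		// Pass an untyped nil: a nil *PublishError stored in an error
		// interface would compare as non-nil in the callback.
		m.Callback(nil)
	}
	return idMismatches
}

func main() {
	var got []error
	record := func(err error) { got = append(got, err) }
	msgs := []pendingMessage{{"a", record}, {"b", record}}
	results := []publishResult{{EventID: "a"}, {EventID: "b", Err: &PublishError{Code: 1, Reason: "bad payload"}}}
	fmt.Println(dispatch(msgs, results), got[0] == nil, got[1] != nil) // 0 true true
}
```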
      state         protoimpl.MessageState `protogen:"open.v1"`
    - EventId       string                 `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"`
    + EventId       string                 `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
      Error         *PublishError          `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
      unknownFields protoimpl.UnknownFields

    // Client is a batching client that accumulates messages and sends them in batches.
    type Client struct {

        log             *zap.SugaredLogger
        callbackWg      sync.WaitGroup
        shutdownTimeout time.Duration

        batchInterval:     100 * time.Millisecond,
        maxPublishTimeout: 5 * time.Second,
        stopCh:            make(chan struct{}),
        callbackWg:        sync.WaitGroup{},
        shutdownTimeout:   5 * time.Second,

Summary
Adds proto types and client wiring for chipingress batch publishing with per-event partial-delivery semantics.
All wire-format changes are additive and backwards-compatible with origin/main.

Proto (pkg/chipingress/pb/chip_ingress.proto):
- Add PublishOptions { optional bool transaction_enabled = 1 }.
  Unset or false ⇒ partial delivery (server default): valid events are produced, per-event errors are reported in PublishResponse.results.
  Explicit true ⇒ all-or-nothing: any per-event failure fails the entire batch.
- Add CloudEventBatch.options = 2 (PublishOptions).
- Add PublishResult.error = 2 (PublishError).
- Add PublishError { PublishErrorCode error_code, string reason }.
- Add PublishErrorCode enum:
  PUBLISH_ERROR_CODE_UNKNOWN = 0
  PUBLISH_ERROR_CODE_VALIDATION_FAILED = 1
  PUBLISH_ERROR_CODE_SCHEMA_MISSING = 2
  PUBLISH_ERROR_CODE_ENCODE_ERROR = 3
  PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION = 4
- Rename PublishResult.eventId → event_id (same tag 1, same wire type, generated Go field EventId unchanged).

Single-shot client (pkg/chipingress/client.go):
- BatchOpt + WithTransactionEnabled(bool) helper.
- EventsToBatchWithOpts(events, opts...) variadic constructor.
- EventsToBatch and EventsToBatchWithOpts default-populate Options{TransactionEnabled: false} so client intent is always explicit on the wire (defensive against any future server-default drift; server treats unset and explicit-false identically).
- Type aliases in types.go: PublishOptions, PublishError, PublishErrorCode.

Batch client (pkg/chipingress/batch), extending the existing package (client.go only; batcher.go and stop_chan.go untouched):
- transactionEnabled field (default false = partial delivery); Opt: WithTransactionEnabled(bool).
- newBatchRequest always emits PublishOptions explicitly (both true and false), matching the single-shot client contract.
- splitMessagesByRequestSize now sizes requests with Options populated so the byte budget matches the wire form.
- Per-event callbacks via completeBatchCallbacksFromResults:
  - Partial-delivery path returns *PublishError per failed event.
  - Mismatch detection emits a synthetic ErrCodeResultsMismatch when len(results) != len(messages).
  - Positional event_id mismatch is logged and counted via the resultsMismatchTotal metric.
- PublishError{Code chipingress.PublishErrorCode, Reason string} with re-exported ErrCode* constants.

Observability (OTel metrics):
- configInfo gauge gains a transaction_enabled bool attribute (reflects the effective client flag, false by default).
- resultsMismatchTotal counter (incremented on positional event_id mismatch).
- sendRequestsTotal, requestSizeMessages, requestSizeBytes, requestLatencyMS, batchSplitsTotal are unchanged.

Tests:
- Proto roundtrip: PublishResult with PublishError; PublishOptions.GetTransactionEnabled edge cases.
- Single-shot helper: default, WithTransactionEnabled(true), WithTransactionEnabled(false).
- Batch client: partial-delivery callbacks, RPC error fallback, WithTransactionEnabled(true) wire matcher, results-mismatch synthetic error, split-on-size with explicit-Options sizing.

Compatibility vs origin/main:
- Wire format: additive only (eventId → event_id is a name-only change; generated Go field EventId and binary wire unchanged).
- EventsToBatch now returns a batch with Options{TransactionEnabled: false} populated where it was previously nil. Server treats both identically.
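The claim that nil options and an explicit false read the same follows from how Go protobuf code exposes `optional` scalars: the field is a pointer and the generated getter is nil-safe on both the message and the field. The sketch below hand-writes a struct in that shape rather than importing the generated package, so the type here is a stand-in.

```go
package main

import "fmt"

// PublishOptions is a hand-written stand-in shaped like the generated
// type: an `optional bool` becomes a *bool field.
type PublishOptions struct {
	TransactionEnabled *bool
}

// GetTransactionEnabled follows the generated-getter pattern: it is safe
// to call on a nil receiver and returns the zero value when unset.
func (o *PublishOptions) GetTransactionEnabled() bool {
	if o != nil && o.TransactionEnabled != nil {
		return *o.TransactionEnabled
	}
	return false
}

func main() {
	f, t := false, true
	var nilOpts *PublishOptions

	fmt.Println(nilOpts.GetTransactionEnabled())                                    // false (Options absent)
	fmt.Println((&PublishOptions{}).GetTransactionEnabled())                        // false (field unset)
	fmt.Println((&PublishOptions{TransactionEnabled: &f}).GetTransactionEnabled())  // false (explicit)
	fmt.Println((&PublishOptions{TransactionEnabled: &t}).GetTransactionEnabled())  // true
}
```

A reader that only calls the getter cannot distinguish the first three cases, which is why populating Options by default is behavior-neutral for such a server.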