From 14df06f2f13e1ce6ab6c43a2a41735266594ce1c Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Sat, 30 May 2026 00:23:42 -0400 Subject: [PATCH] chipingress: batch partial delivery support Add proto types and client wiring for chipingress batch publishing with per-event partial-delivery semantics. All wire-format changes are additive and backwards-compatible with origin/main. Proto (pkg/chipingress/pb/chip_ingress.proto): - Add PublishOptions{optional bool transaction_enabled = 1}. Unset or false => partial delivery (server default): valid events are produced, per-event errors reported in PublishResponse.results. Explicit true => all-or-nothing: any per-event failure fails the batch. - Add CloudEventBatch.options = 2 (PublishOptions). - Add PublishResult.error = 2 (PublishError). - Add PublishError{PublishErrorCode error_code, string reason}. - Add PublishErrorCode enum: PUBLISH_ERROR_CODE_UNKNOWN = 0, PUBLISH_ERROR_CODE_VALIDATION_FAILED = 1, PUBLISH_ERROR_CODE_SCHEMA_MISSING = 2, PUBLISH_ERROR_CODE_ENCODE_ERROR = 3, PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION = 4. - Rename PublishResult.eventId => event_id (same tag 1, same wire type, same generated Go field 'EventId'). Single-shot client (pkg/chipingress/client.go): - BatchOpt + WithTransactionEnabled(bool) helper. - EventsToBatchWithOpts(events, opts...) variadic constructor. - EventsToBatch and EventsToBatchWithOpts default-populate Options{TransactionEnabled: false} so client intent is always explicit on the wire (defensive against future server-default drift). - Type aliases in types.go: PublishOptions, PublishError, PublishErrorCode. Batch client (pkg/chipingress/batch): - New batching client (NewBatchClient) that accumulates messages and flushes by batch size, byte budget, or interval. - transactionEnabled flag (default false = partial delivery); Opt: WithTransactionEnabled(bool). 
- newBatchRequest always emits PublishOptions explicitly (both true and false) to match the single-shot client's contract. - splitMessagesByRequestSize splits oversized batches by gRPC max request size; a single event that exceeds the limit on its own is not dropped by the splitter but isolated in a one-event batch (sendBatch then fails any batch whose serialized size exceeds the configured max gRPC request size). - Per-event callbacks: completeBatchCallbacksFromResults dispatches PublishResult outcomes; partial-delivery path returns *PublishError per failed event; mismatch detection logs and counts any results/messages length difference and emits a synthetic ErrCodeResultsMismatch for each message left without a result when a non-empty results list is shorter than the message list (extra results are ignored; an empty results list is treated as all-success); positional event_id mismatch is logged and counted via resultsMismatchTotal. - PublishError{Code chipingress.PublishErrorCode, Reason string} with re-exported ErrCode* constants. - OTel metrics: sendRequestsTotal, requestSizeMessages, requestSizeBytes, requestLatencyMS, configInfo (carries transaction_enabled bool attribute), batchSplitsTotal, resultsMismatchTotal. Tests: - Proto roundtrip (PublishResult with PublishError; PublishOptions.GetTransactionEnabled edge cases). - Single-shot helper covers default, WithTransactionEnabled(true), WithTransactionEnabled(false). - Batch client: partial-delivery callbacks, RPC error fallback, WithTransactionEnabled(true) wire matcher, results-mismatch synthetic error, split-on-size with explicit-Options sizing. 
--- pkg/chipingress/batch/client.go | 207 +++++++--- pkg/chipingress/batch/client_test.go | 415 ++++++++++++++++++++- pkg/chipingress/client.go | 37 +- pkg/chipingress/client_test.go | 107 ++++++ pkg/chipingress/pb/chip_ingress.pb.go | 352 +++++++++++++---- pkg/chipingress/pb/chip_ingress.proto | 78 +++- pkg/chipingress/pb/chip_ingress_grpc.pb.go | 38 +- pkg/chipingress/types.go | 3 + 8 files changed, 1079 insertions(+), 158 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index 93c9efd049..42f9c437d5 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -29,6 +29,28 @@ type seqnumKey struct { eventType string } +// PublishError is an error returned per-event when partial delivery is enabled +// and an individual event fails validation/production. +type PublishError struct { + Code chipingress.PublishErrorCode + Reason string +} + +func (e *PublishError) Error() string { + return fmt.Sprintf("%s: %s", e.Code.String(), e.Reason) +} + +// Error codes returned by the server in PublishError.Code. +// Re-exported from the proto package for convenience. +const ( + ErrCodeUnknown = chipingress.PublishErrorCode(0) // PUBLISH_ERROR_CODE_UNKNOWN + ErrCodeValidationFailed = chipingress.PublishErrorCode(1) // PUBLISH_ERROR_CODE_VALIDATION_FAILED + ErrCodeSchemaMissing = chipingress.PublishErrorCode(2) // PUBLISH_ERROR_CODE_SCHEMA_MISSING + ErrCodeEncodeError = chipingress.PublishErrorCode(3) // PUBLISH_ERROR_CODE_ENCODE_ERROR + ErrCodeDomainMisconfiguration = chipingress.PublishErrorCode(4) // PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION + ErrCodeResultsMismatch = chipingress.PublishErrorCode(-1) // client-side synthetic code +) + // Client is a batching client that accumulates messages and sends them in batches. 
type Client struct { client chipingress.Client @@ -36,33 +58,36 @@ type Client struct { maxGRPCRequestSize int // configured max, used for metrics/error reporting effectiveMaxRequestSize int // maxGRPCRequestSize minus grpcFramingOverhead, used for splitting cloneEvent bool - maxConcurrentSends chan struct{} - batchInterval time.Duration - maxPublishTimeout time.Duration - messageBuffer chan *messageWithCallback - stopCh stopCh - log *zap.SugaredLogger - callbackWg sync.WaitGroup - shutdownTimeout time.Duration - shutdownOnce sync.Once - batcherDone chan struct{} - started bool - counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() + maxConcurrentSends chan struct{} + batchInterval time.Duration + maxPublishTimeout time.Duration + messageBuffer chan *messageWithCallback + stopCh stopCh + log *zap.SugaredLogger + callbackWg sync.WaitGroup + shutdownTimeout time.Duration + shutdownOnce sync.Once + batcherDone chan struct{} + started bool + counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() metrics batchClientMetrics + + transactionEnabled bool } type batchClientMetrics struct { - sendRequestsTotal otelmetric.Int64Counter - requestSizeMessages otelmetric.Int64Histogram - requestSizeBytes otelmetric.Int64Histogram - requestLatencyMS otelmetric.Float64Histogram - configInfo otelmetric.Int64Gauge - batchSplitsTotal otelmetric.Int64Counter - batchSizeAttr otelmetric.MeasurementOption - maxGRPCReqSizeAttr otelmetric.MeasurementOption - successStatusAttr otelmetric.MeasurementOption - failureStatusAttr otelmetric.MeasurementOption + sendRequestsTotal otelmetric.Int64Counter + requestSizeMessages otelmetric.Int64Histogram + requestSizeBytes otelmetric.Int64Histogram + requestLatencyMS otelmetric.Float64Histogram + configInfo otelmetric.Int64Gauge + batchSplitsTotal otelmetric.Int64Counter + resultsMismatchTotal otelmetric.Int64Counter + batchSizeAttr otelmetric.MeasurementOption + 
maxGRPCReqSizeAttr otelmetric.MeasurementOption + successStatusAttr otelmetric.MeasurementOption + failureStatusAttr otelmetric.MeasurementOption } // Opt is a functional option for configuring the batch Client. @@ -77,14 +102,15 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { maxGRPCRequestSize: 10 * 1024 * 1024, effectiveMaxRequestSize: 10*1024*1024 - grpcFramingOverhead, cloneEvent: true, - maxConcurrentSends: make(chan struct{}, 1), - messageBuffer: make(chan *messageWithCallback, 200), - batchInterval: 100 * time.Millisecond, - maxPublishTimeout: 5 * time.Second, - stopCh: make(chan struct{}), - callbackWg: sync.WaitGroup{}, - shutdownTimeout: 5 * time.Second, - batcherDone: make(chan struct{}), + transactionEnabled: false, + maxConcurrentSends: make(chan struct{}, 1), + messageBuffer: make(chan *messageWithCallback, 200), + batchInterval: 100 * time.Millisecond, + maxPublishTimeout: 5 * time.Second, + stopCh: make(chan struct{}), + callbackWg: sync.WaitGroup{}, + shutdownTimeout: 5 * time.Second, + batcherDone: make(chan struct{}), } for _, opt := range opts { @@ -263,12 +289,12 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) go func() { defer func() { <-b.maxConcurrentSends }() - splitBatches := splitMessagesByRequestSize(messages, b.effectiveMaxRequestSize) + splitBatches := splitMessagesByRequestSize(messages, b.effectiveMaxRequestSize, b.transactionEnabled) if len(splitBatches) > 1 { b.metrics.batchSplitsTotal.Add(ctx, 1) } for _, batchMessages := range splitBatches { - batchReq, batchBytes := newBatchRequest(batchMessages) + batchReq, batchBytes := newBatchRequest(batchMessages, b.transactionEnabled) if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize { err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize) b.metrics.recordSend(ctx, len(batchMessages), batchBytes, 0, false) @@ -280,14 +306,18 @@ func (b 
*Client) sendBatch(ctx context.Context, messages []*messageWithCallback) // this is specifically to prevent long running network calls ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) startedAt := time.Now() - _, err := b.client.PublishBatch(ctxTimeout, batchReq) + resp, err := b.client.PublishBatch(ctxTimeout, batchReq) cancel() b.metrics.recordSend(ctx, len(batchMessages), batchBytes, time.Since(startedAt), err == nil) if err != nil { b.log.Errorw("failed to publish batch", "error", err) + b.completeBatchCallbacks(batchMessages, err) + } else if !b.transactionEnabled && resp != nil && len(resp.Results) > 0 { + b.completeBatchCallbacksFromResults(batchMessages, resp.Results) + } else { + b.completeBatchCallbacks(batchMessages, nil) } - b.completeBatchCallbacks(batchMessages, err) } }() } @@ -305,6 +335,62 @@ func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err err }) } +// completeBatchCallbacksFromResults dispatches per-event callbacks using the server's +// PublishResult slice. Results are matched to messages by index (server contract guarantees +// positional correspondence). If a result has a non-nil Error, the callback receives a +// PublishError; otherwise it receives nil. +// +// Defensive behaviour: +// - If len(results) < len(messages): remaining callbacks get a synthetic RESULTS_MISMATCH error. +// - If len(results) > len(messages): extras are ignored. +// - If results[i].EventId != messages[i].event.Id: a warning is logged. 
+func (b *Client) completeBatchCallbacksFromResults(messages []*messageWithCallback, results []*chipingress.PublishResult) { + if len(results) != len(messages) { + b.log.Warnw("publish results length mismatch", + "results", len(results), + "messages", len(messages), + ) + b.metrics.resultsMismatchTotal.Add(context.Background(), 1) + } + + b.callbackWg.Go(func() { + for i, msg := range messages { + if msg.callback == nil { + continue + } + if i >= len(results) { + msg.callback(&PublishError{ + Code: ErrCodeResultsMismatch, + Reason: fmt.Sprintf("server returned %d results for %d events", len(results), len(messages)), + }) + continue + } + result := results[i] + if result == nil { + msg.callback(nil) + continue + } + // Defensive: warn on ID mismatch but still dispatch by index. + if result.EventId != "" && msg.event.Id != "" && result.EventId != msg.event.Id { + b.log.Warnw("publish result event_id mismatch at index", + "index", i, + "expected", msg.event.Id, + "got", result.EventId, + ) + b.metrics.resultsMismatchTotal.Add(context.Background(), 1) + } + if result.Error != nil { + msg.callback(&PublishError{ + Code: result.Error.ErrorCode, + Reason: result.Error.Reason, + }) + } else { + msg.callback(nil) + } + } + }) +} + // grpcFramingOverhead accounts for gRPC framing, HTTP/2 headers, auth tokens, // tracing metadata, and other per-request overhead not captured by proto.Size. const grpcFramingOverhead = 10 * 1024 // 10 KiB @@ -314,7 +400,7 @@ const grpcFramingOverhead = 10 * 1024 // 10 KiB // reservation remains meaningful. 
const minMaxGRPCRequestSize = 1024 * 1024 // 1 MiB -func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback { +func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int, transactionEnabled bool) [][]*messageWithCallback { if len(messages) == 0 { return nil } @@ -326,7 +412,7 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize current := make([]*messageWithCallback, 0, len(messages)) for _, msg := range messages { candidate := append(current, msg) - _, candidateBytes := newBatchRequest(candidate) + _, candidateBytes := newBatchRequest(candidate, transactionEnabled) if len(current) > 0 && candidateBytes > maxRequestSize { batches = append(batches, current) current = []*messageWithCallback{msg} @@ -340,12 +426,20 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize return batches } -func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) { +func newBatchRequest(messages []*messageWithCallback, transactionEnabled bool) (*chipingress.CloudEventBatch, int) { events := make([]*chipingress.CloudEventPb, len(messages)) for i, msg := range messages { events[i] = msg.event } - batchReq := &chipingress.CloudEventBatch{Events: events} + // Always emit PublishOptions so the wire form unambiguously reflects + // client intent. The server treats unset and explicit false identically, + // but explicit-false is defensive against any future server-default drift + // and makes traces/logs self-describing. + te := transactionEnabled + batchReq := &chipingress.CloudEventBatch{ + Events: events, + Options: &chipingress.PublishOptions{TransactionEnabled: &te}, + } return batchReq, proto.Size(batchReq) } @@ -417,6 +511,20 @@ func WithLogger(log *zap.SugaredLogger) Opt { } } +// WithTransactionEnabled sets PublishOptions.transaction_enabled on every +// batch request. 
The option is always emitted on the wire so client intent +// is explicit in traces/logs; the server treats unset and explicit false +// identically (partial delivery). +// - false (the default for NewBatchClient): partial delivery. Valid events +// are produced and per-event errors are returned for invalid ones rather +// than failing the entire batch. +// - true: all-or-nothing. Any per-event failure fails the entire batch. +func WithTransactionEnabled(transactionEnabled bool) Opt { + return func(c *Client) { + c.transactionEnabled = transactionEnabled + } +} + func newBatchClientMetrics() (batchClientMetrics, error) { meter := otel.Meter("chipingress/batch_client") sendRequestsTotal, err := meter.Int64Counter( @@ -467,15 +575,23 @@ func newBatchClientMetrics() (batchClientMetrics, error) { if err != nil { return batchClientMetrics{}, err } + resultsMismatchTotal, err := meter.Int64Counter( + "chip_ingress.batch.results_mismatch_total", + otelmetric.WithDescription("Total publish responses where result count or event IDs did not match the request"), + otelmetric.WithUnit("{mismatch}"), + ) + if err != nil { + return batchClientMetrics{}, err + } return batchClientMetrics{ - sendRequestsTotal: sendRequestsTotal, - - requestSizeMessages: requestSizeMessages, - requestSizeBytes: requestSizeBytes, - requestLatencyMS: requestLatencyMS, - configInfo: configInfo, - batchSplitsTotal: batchSplitsTotal, + sendRequestsTotal: sendRequestsTotal, + requestSizeMessages: requestSizeMessages, + requestSizeBytes: requestSizeBytes, + requestLatencyMS: requestLatencyMS, + configInfo: configInfo, + batchSplitsTotal: batchSplitsTotal, + resultsMismatchTotal: resultsMismatchTotal, successStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet( attribute.String("status", "success"), )), @@ -500,6 +616,7 @@ func (m *batchClientMetrics) recordConfig(ctx context.Context, c *Client) { attribute.Int64("max_publish_timeout_ms", c.maxPublishTimeout.Milliseconds()), 
attribute.Int64("shutdown_timeout_ms", c.shutdownTimeout.Milliseconds()), attribute.Bool("clone_event", c.cloneEvent), + attribute.Bool("transaction_enabled", c.transactionEnabled), attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize), )) } diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index e698e75c61..ce29dd0c0e 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -2,6 +2,7 @@ package batch import ( "context" + "errors" "sort" "strconv" "strings" @@ -309,9 +310,21 @@ func TestSendBatch(t *testing.T) { largeTestEvent("test-id-4"), largeTestEvent("test-id-5"), } - maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]}) - require.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:1]}), maxRequestSize) - require.Greater(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:3]}), maxRequestSize) + // Default NewBatchClient is partial delivery but always emits + // PublishOptions{TransactionEnabled: false} to make intent explicit + // on the wire. Size via newBatchRequest so we match what the + // production path actually emits. + sizeOf := func(evs []*chipingress.CloudEventPb) int { + msgs := make([]*messageWithCallback, len(evs)) + for i, e := range evs { + msgs[i] = &messageWithCallback{event: e} + } + _, n := newBatchRequest(msgs, false) + return n + } + maxRequestSize := sizeOf(events[:2]) + require.LessOrEqual(t, sizeOf(events[:1]), maxRequestSize) + require.Greater(t, sizeOf(events[:3]), maxRequestSize) mockClient := mocks.NewClient(t) done := make(chan struct{}) @@ -389,8 +402,11 @@ func TestSendBatch(t *testing.T) { largeTestEvent("split-metric-2"), largeTestEvent("split-metric-3"), } - // Set maxRequestSize so that 2 events fit but 3 do not, forcing a split. 
- maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]}) + // Set maxRequestSize so that 2 events fit but 3 do not, forcing a + // split. Size via newBatchRequest to include the always-emitted + // PublishOptions, matching the production wire form. + msgs2 := []*messageWithCallback{{event: events[0]}, {event: events[1]}} + _, maxRequestSize := newBatchRequest(msgs2, false) mockClient := mocks.NewClient(t) done := make(chan struct{}) @@ -439,7 +455,8 @@ func TestSendBatch(t *testing.T) { mockClient := mocks.NewClient(t) callbackDone := make(chan error, 1) event := largeTestEvent("oversized-id") - maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{event}}) - 1 + _, oneEventSize := newBatchRequest([]*messageWithCallback{{event: event}}, false) + maxRequestSize := oneEventSize - 1 client, err := NewBatchClient(mockClient) require.NoError(t, err) @@ -1551,7 +1568,7 @@ func TestBatchClient_Metrics(t *testing.T) { func TestSplitMessagesByRequestSize(t *testing.T) { t.Run("empty messages returns nil", func(t *testing.T) { - result := splitMessagesByRequestSize(nil, 1024) + result := splitMessagesByRequestSize(nil, 1024, false) assert.Nil(t, result) }) @@ -1560,7 +1577,7 @@ func TestSplitMessagesByRequestSize(t *testing.T) { {event: largeTestEvent("a")}, {event: largeTestEvent("b")}, } - result := splitMessagesByRequestSize(msgs, 0) + result := splitMessagesByRequestSize(msgs, 0, false) require.Len(t, result, 1) assert.Len(t, result[0], 2) }) @@ -1569,7 +1586,7 @@ func TestSplitMessagesByRequestSize(t *testing.T) { msgs := []*messageWithCallback{ {event: largeTestEvent("a")}, } - result := splitMessagesByRequestSize(msgs, -1) + result := splitMessagesByRequestSize(msgs, -1, false) require.Len(t, result, 1) assert.Len(t, result[0], 1) }) @@ -1580,7 +1597,7 @@ func TestSplitMessagesByRequestSize(t *testing.T) { {event: largeTestEvent("b")}, } allBatch := &chipingress.CloudEventBatch{Events: 
[]*chipingress.CloudEventPb{msgs[0].event, msgs[1].event}} - result := splitMessagesByRequestSize(msgs, proto.Size(allBatch)+100) + result := splitMessagesByRequestSize(msgs, proto.Size(allBatch)+100, false) require.Len(t, result, 1) assert.Len(t, result[0], 2) }) @@ -1593,7 +1610,7 @@ func TestSplitMessagesByRequestSize(t *testing.T) { } singleBatch := &chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{msgs[0].event}} // Set limit to exactly fit one event but not two. - result := splitMessagesByRequestSize(msgs, proto.Size(singleBatch)) + result := splitMessagesByRequestSize(msgs, proto.Size(singleBatch), false) require.Len(t, result, 3) for _, batch := range result { assert.Len(t, batch, 1) @@ -1774,3 +1791,379 @@ func hasIntAttr(set attribute.Set, key string, want int) bool { } return false } + +func TestTransactionEnabledCallbacks(t *testing.T) { + t.Run("per-event error dispatched to callback", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1"}, + {EventId: "e2", Error: &chipingress.PublishError{ErrorCode: chipingress.PublishErrorCode(2), Reason: "no schema"}}, + {EventId: "e3"}, + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + var mu sync.Mutex + callbackResults := make(map[string]error) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); callbackResults["e1"] = err; mu.Unlock() }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); callbackResults["e2"] = err; mu.Unlock() }}, + {event: &chipingress.CloudEventPb{Id: "e3", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); callbackResults["e3"] = err; mu.Unlock() }}, + } + + client.sendBatch(t.Context(), messages) + // Wait for callbacks + time.Sleep(50 * time.Millisecond) + client.callbackWg.Wait() + + mu.Lock() + defer mu.Unlock() + require.NoError(t, callbackResults["e1"]) + require.Error(t, callbackResults["e2"]) + assert.Equal(t, "PUBLISH_ERROR_CODE_SCHEMA_MISSING: no schema", callbackResults["e2"].Error()) + var re *PublishError + require.ErrorAs(t, callbackResults["e2"], &re) + assert.Equal(t, ErrCodeSchemaMissing, re.Code) + assert.NoError(t, callbackResults["e3"]) + }) + + t.Run("all success results yield nil callbacks", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1"}, + {EventId: "e2"}, + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 2) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + for range 2 { + assert.NoError(t, <-errs) + } + }) + + t.Run("RPC error still fails all callbacks when partial delivery enabled", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, errors.New("connection refused")) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 2) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + for range 2 { + err := <-errs + assert.EqualError(t, err, "connection refused") + } + }) + + t.Run("WithTransactionEnabled(true) emits PublishOptions{TransactionEnabled: true}", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. 
+ On("PublishBatch", mock.Anything, mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return batch.Options != nil && batch.Options.GetTransactionEnabled() + })). + Return(&chipingress.PublishResponse{}, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(true)) + require.NoError(t, err) + + errs := make(chan error, 1) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + require.NoError(t, <-errs) + mockClient.AssertExpectations(t) + }) +} + +func TestTransactionEnabledEdgeCases(t *testing.T) { + t.Run("fewer results than messages fails remaining with RESULTS_MISMATCH", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1"}, + // only 1 result for 3 messages + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + var mu sync.Mutex + results := make(map[string]error) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e1"] = err; mu.Unlock() }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e2"] = err; mu.Unlock() }}, + {event: &chipingress.CloudEventPb{Id: "e3", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { mu.Lock(); results["e3"] = err; mu.Unlock() }}, + } + + client.sendBatch(t.Context(), messages) + // Wait for send goroutine to finish (acquire+release semaphore slot). 
+ client.maxConcurrentSends <- struct{}{} + <-client.maxConcurrentSends + client.callbackWg.Wait() + + mu.Lock() + defer mu.Unlock() + require.NoError(t, results["e1"]) + require.Error(t, results["e2"]) + assert.Contains(t, results["e2"].Error(), "server returned 1 results for 3 events") + require.Error(t, results["e3"]) + assert.Contains(t, results["e3"].Error(), "server returned 1 results for 3 events") + }) + + t.Run("more results than messages does not panic", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1"}, + {EventId: "e2"}, + {EventId: "extra"}, + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 2) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + for range 2 { + assert.NoError(t, <-errs) + } + }) + + t.Run("nil result entry treated as success", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + nil, + {EventId: "e2"}, + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 2) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + for range 2 { + assert.NoError(t, <-errs) + } + }) + + t.Run("all events fail individually", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1", Error: &chipingress.PublishError{ErrorCode: chipingress.PublishErrorCode(1), Reason: "bad1"}}, + {EventId: "e2", Error: &chipingress.PublishError{ErrorCode: chipingress.PublishErrorCode(2), Reason: "bad2"}}, + }, + }, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 2) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + err1 := <-errs + err2 := <-errs + require.Error(t, err1) + require.Error(t, err2) + // Both should be PublishError + var re1, re2 *PublishError + require.ErrorAs(t, err1, &re1) + require.ErrorAs(t, err2, &re2) + }) + + t.Run("nil response with 
partial delivery enabled treats as all success", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return((*chipingress.PublishResponse)(nil), nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 2) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + for range 2 { + assert.NoError(t, <-errs) + } + }) + + t.Run("empty results with partial delivery enabled treats as all success", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{Results: []*chipingress.PublishResult{}}, nil) + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + + errs := make(chan error, 1) + messages := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs <- err }}, + } + + client.sendBatch(t.Context(), messages) + client.callbackWg.Wait() + + assert.NoError(t, <-errs) + }) + + t.Run("PublishError with unknown code", func(t *testing.T) { + re := &PublishError{Code: 0, Reason: ""} + assert.Equal(t, "PUBLISH_ERROR_CODE_UNKNOWN: ", re.Error()) + }) + + t.Run("split batch with mixed partial and RPC failure", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + + callCount := 0 + var mu sync.Mutex + mockClient. 
+ On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Run(func(args mock.Arguments) { + mu.Lock() + callCount++ + count := callCount + mu.Unlock() + // We can't easily control per-call return values with Run, + // so we use a different approach below. + _ = count + }) + + // Use a more targeted approach: mock returns different values per call + mockClient.ExpectedCalls = nil // reset + mockClient.EXPECT().Close().Return(nil).Maybe() + + // First call: partial success + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{ + Results: []*chipingress.PublishResult{ + {EventId: "e1"}, + {EventId: "e2", Error: &chipingress.PublishError{ErrorCode: chipingress.PublishErrorCode(2), Reason: "no schema"}}, + }, + }, nil).Once() + // Second call: RPC error + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, errors.New("kafka unavailable")).Once() + + client, err := NewBatchClient(mockClient, WithTransactionEnabled(false)) + require.NoError(t, err) + // Set very small effective size so 2 events split into 2 batches of 1 each... + // Actually we need 2 events per batch. Let's use direct sendBatch calls instead. 
+ + // Batch 1: gets partial response + errs1 := make(chan error, 2) + batch1 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e1", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs1 <- err }}, + {event: &chipingress.CloudEventPb{Id: "e2", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs1 <- err }}, + } + client.sendBatch(t.Context(), batch1) + + // Batch 2: gets RPC error + errs2 := make(chan error, 1) + batch2 := []*messageWithCallback{ + {event: &chipingress.CloudEventPb{Id: "e3", Source: "s", SpecVersion: "1.0", Type: "t"}, callback: func(err error) { errs2 <- err }}, + } + client.sendBatch(t.Context(), batch2) + + // Wait for semaphore release + time.Sleep(50 * time.Millisecond) + client.callbackWg.Wait() + + // Batch 1: e1 success, e2 partial error + require.NoError(t, <-errs1) + err2 := <-errs1 + require.Error(t, err2) + assert.Contains(t, err2.Error(), "SCHEMA_MISSING") + + // Batch 2: RPC failure + err3 := <-errs2 + require.Error(t, err3) + assert.EqualError(t, err3, "kafka unavailable") + }) +} diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index 2c760ab9d7..61a84265f3 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -327,9 +327,47 @@ func ProtoToEvent(eventPb *CloudEventPb) (CloudEvent, error) { return *event, nil } +// BatchOpt configures optional fields on a CloudEventBatch. +type BatchOpt func(*CloudEventBatch) + +// WithTransactionEnabled sets PublishOptions.transaction_enabled on a single +// batch. The option is always emitted on the wire (both true and false) so the +// client's intent is explicit; the server treats unset and explicit false +// identically (partial delivery). +// - true: all-or-nothing; any per-event failure fails the entire batch. +// - false: partial delivery; valid events are produced and per-event errors +// are returned for invalid ones. 
+// +// Omitting this option leaves the default applied by EventsToBatchWithOpts in +// place: PublishOptions{TransactionEnabled: false} (explicit partial delivery). +func WithTransactionEnabled(enabled bool) BatchOpt { + return func(b *CloudEventBatch) { + if b.Options == nil { + b.Options = &pb.PublishOptions{} + } + e := enabled + b.Options.TransactionEnabled = &e + } +} + +// EventsToBatch builds a CloudEventBatch from events using the default +// PublishOptions{TransactionEnabled: false} (partial delivery). It is +// equivalent to calling EventsToBatchWithOpts with no options. func EventsToBatch(events []CloudEvent) (*CloudEventBatch, error) { + return EventsToBatchWithOpts(events) +} + +// EventsToBatchWithOpts builds a CloudEventBatch from events, converting each +// with EventToProto, then applies opts in order on top of the default +// PublishOptions{TransactionEnabled: false}. Nil options are skipped. +func EventsToBatchWithOpts(events []CloudEvent, opts ...BatchOpt) (*CloudEventBatch, error) { + // Default to explicit transaction_enabled=false (partial delivery) so the + // wire form unambiguously reflects client intent. Options remain mutable + // via BatchOpts below. + defaultFalse := false batch := &CloudEventBatch{ - Events: make([]*CloudEventPb, 0, len(events)), + Events: make([]*CloudEventPb, 0, len(events)), + Options: &pb.PublishOptions{TransactionEnabled: &defaultFalse}, } for _, event := range events { eventPb, err := EventToProto(event) @@ -338,6 +376,13 @@ func EventsToBatch(events []CloudEvent) (*CloudEventBatch, error) { } batch.Events = append(batch.Events, eventPb) } + for _, opt := range opts { + // A nil BatchOpt would panic when called; skip it instead. 
+ if opt == nil { + continue + } + opt(batch) + } return batch, nil } diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 12d567d0d1..68b8831d2f 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" ce "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -730,3 +731,109 @@ func TestClient_RegisterSchemas(t *testing.T) { assert.EqualError(t, err, "failed to register schema: registration failed") }) } + +func TestTransactionEnabledProtoTypes(t *testing.T) { + t.Run("PublishResult roundtrip with Error", func(t *testing.T) { + original := &pb.PublishResult{ + EventId: "evt-123", + 
Error: &pb.PublishError{ + ErrorCode: pb.PublishErrorCode_PUBLISH_ERROR_CODE_SCHEMA_MISSING, + Reason: "subject test-domain-test.Entity not found", + }, + } + data, err := proto.Marshal(original) + require.NoError(t, err) + + decoded := &pb.PublishResult{} + err = proto.Unmarshal(data, decoded) + require.NoError(t, err) + + assert.Equal(t, "evt-123", decoded.EventId) + require.NotNil(t, decoded.Error) + assert.Equal(t, pb.PublishErrorCode_PUBLISH_ERROR_CODE_SCHEMA_MISSING, decoded.Error.ErrorCode) + assert.Equal(t, "subject test-domain-test.Entity not found", decoded.Error.Reason) + }) + + t.Run("PublishResult success has nil Error", func(t *testing.T) { + original := &pb.PublishResult{EventId: "evt-ok"} + data, err := proto.Marshal(original) + require.NoError(t, err) + + decoded := &pb.PublishResult{} + err = proto.Unmarshal(data, decoded) + require.NoError(t, err) + + assert.Equal(t, "evt-ok", decoded.EventId) + assert.Nil(t, decoded.Error) + }) + + t.Run("CloudEventBatch with PublishOptions roundtrip", func(t *testing.T) { + txn := true + batch := &pb.CloudEventBatch{ + Events: []*cepb.CloudEvent{{Id: "e1", Source: "src", SpecVersion: "1.0", Type: "t"}}, + Options: &pb.PublishOptions{TransactionEnabled: &txn}, + } + data, err := proto.Marshal(batch) + require.NoError(t, err) + + decoded := &pb.CloudEventBatch{} + err = proto.Unmarshal(data, decoded) + require.NoError(t, err) + + require.NotNil(t, decoded.Options) + assert.True(t, decoded.Options.GetTransactionEnabled()) + assert.Len(t, decoded.Events, 1) + assert.Equal(t, "e1", decoded.Events[0].Id) + }) + + t.Run("GetTransactionEnabled edge cases", func(t *testing.T) { + // nil PublishOptions + var nilOpts *pb.PublishOptions + assert.False(t, nilOpts.GetTransactionEnabled()) + + // TransactionEnabled unset (nil pointer) + emptyOpts := &pb.PublishOptions{} + assert.False(t, emptyOpts.GetTransactionEnabled()) + + // Explicitly true + trueVal := true + assert.True(t, (&pb.PublishOptions{TransactionEnabled: 
&trueVal}).GetTransactionEnabled()) + + // Explicitly false + falseVal := false + assert.False(t, (&pb.PublishOptions{TransactionEnabled: &falseVal}).GetTransactionEnabled()) + }) + + t.Run("EventsToBatchWithOpts with WithTransactionEnabled(false) emits PublishOptions{TransactionEnabled: false}", func(t *testing.T) { + event, err := NewEvent("domain", "entity", []byte("data"), nil) + require.NoError(t, err) + + batch, err := EventsToBatchWithOpts([]CloudEvent{event}, WithTransactionEnabled(false)) + require.NoError(t, err) + require.NotNil(t, batch.Options, "client always emits PublishOptions to make intent explicit on the wire") + require.NotNil(t, batch.Options.TransactionEnabled) + assert.False(t, batch.Options.GetTransactionEnabled()) + }) + + t.Run("EventsToBatchWithOpts with WithTransactionEnabled(true) emits PublishOptions{TransactionEnabled: true}", func(t *testing.T) { + event, err := NewEvent("domain", "entity", []byte("data"), nil) + require.NoError(t, err) + + batch, err := EventsToBatchWithOpts([]CloudEvent{event}, WithTransactionEnabled(true)) + require.NoError(t, err) + require.NotNil(t, batch.Options) + require.NotNil(t, batch.Options.TransactionEnabled) + assert.True(t, batch.Options.GetTransactionEnabled()) + }) + + t.Run("EventsToBatch without options defaults to PublishOptions{TransactionEnabled: false}", func(t *testing.T) { + event, err := NewEvent("domain", "entity", []byte("data"), nil) + require.NoError(t, err) + + batch, err := EventsToBatch([]CloudEvent{event}) + require.NoError(t, err) + require.NotNil(t, batch.Options, "client always emits PublishOptions to make intent explicit on the wire") + require.NotNil(t, batch.Options.TransactionEnabled) + assert.False(t, batch.Options.GetTransactionEnabled()) + }) +} diff --git a/pkg/chipingress/pb/chip_ingress.pb.go b/pkg/chipingress/pb/chip_ingress.pb.go index 7117f287c6..04f11d943a 100644 --- a/pkg/chipingress/pb/chip_ingress.pb.go +++ b/pkg/chipingress/pb/chip_ingress.pb.go @@ -22,17 +22,190 
@@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type PublishErrorCode int32 + +const ( + // Default value. + PublishErrorCode_PUBLISH_ERROR_CODE_UNKNOWN PublishErrorCode = 0 + // The CloudEvent structure is invalid. Common causes: missing required fields + // (id, source, type, specversion), a nil attribute value, or an unsupported + // attribute type in the extensions map. Fix the CloudEvent and retry. + PublishErrorCode_PUBLISH_ERROR_CODE_VALIDATION_FAILED PublishErrorCode = 1 + // No schema was found in the Schema Registry for the subject derived from + // this event's source and type. The schema must be registered before + // publishing events of this type. Note: registry connectivity failures are + // returned as a gRPC INTERNAL error, not this code. + PublishErrorCode_PUBLISH_ERROR_CODE_SCHEMA_MISSING PublishErrorCode = 2 + // The event payload could not be encoded against its registered schema. + // Common causes: the data bytes are not valid protobuf (e.g. JSON was used + // instead of proto.Marshal), or the event type does not match any message + // descriptor in the registered schema. + PublishErrorCode_PUBLISH_ERROR_CODE_ENCODE_ERROR PublishErrorCode = 3 + // The event source does not map to a recognized domain. Either the source + // field is set to an unknown value or is missing entirely. Verify the source + // field matches a domain configured in chip-ingress. + PublishErrorCode_PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION PublishErrorCode = 4 +) + +// Enum value maps for PublishErrorCode. 
+var ( + PublishErrorCode_name = map[int32]string{ + 0: "PUBLISH_ERROR_CODE_UNKNOWN", + 1: "PUBLISH_ERROR_CODE_VALIDATION_FAILED", + 2: "PUBLISH_ERROR_CODE_SCHEMA_MISSING", + 3: "PUBLISH_ERROR_CODE_ENCODE_ERROR", + 4: "PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION", + } + PublishErrorCode_value = map[string]int32{ + "PUBLISH_ERROR_CODE_UNKNOWN": 0, + "PUBLISH_ERROR_CODE_VALIDATION_FAILED": 1, + "PUBLISH_ERROR_CODE_SCHEMA_MISSING": 2, + "PUBLISH_ERROR_CODE_ENCODE_ERROR": 3, + "PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION": 4, + } +) + +func (x PublishErrorCode) Enum() *PublishErrorCode { + p := new(PublishErrorCode) + *p = x + return p +} + +func (x PublishErrorCode) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (PublishErrorCode) Descriptor() protoreflect.EnumDescriptor { + return file_pb_chip_ingress_proto_enumTypes[0].Descriptor() +} + +func (PublishErrorCode) Type() protoreflect.EnumType { + return &file_pb_chip_ingress_proto_enumTypes[0] +} + +func (x PublishErrorCode) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use PublishErrorCode.Descriptor instead. +func (PublishErrorCode) EnumDescriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} +} + +type PublishOptions struct { + state protoimpl.MessageState `protogen:"open.v1"` + // When unset or false, valid events are produced and invalid ones are + // skipped; per-event outcomes are reported in PublishResponse.results + // (partial delivery — the server default). + // When explicitly true, any per-event failure fails the entire batch + // (all-or-nothing). 
+ TransactionEnabled *bool `protobuf:"varint,1,opt,name=transaction_enabled,json=transactionEnabled,proto3,oneof" json:"transaction_enabled,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishOptions) Reset() { + *x = PublishOptions{} + mi := &file_pb_chip_ingress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishOptions) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishOptions) ProtoMessage() {} + +func (x *PublishOptions) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishOptions.ProtoReflect.Descriptor instead. +func (*PublishOptions) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} +} + +func (x *PublishOptions) GetTransactionEnabled() bool { + if x != nil && x.TransactionEnabled != nil { + return *x.TransactionEnabled + } + return false +} + +// PublishError carries machine- and human-readable detail for a non-produced event. 
+type PublishError struct { + state protoimpl.MessageState `protogen:"open.v1"` + ErrorCode PublishErrorCode `protobuf:"varint,1,opt,name=error_code,json=errorCode,proto3,enum=chipingress.pb.PublishErrorCode" json:"error_code,omitempty"` + Reason string `protobuf:"bytes,2,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishError) Reset() { + *x = PublishError{} + mi := &file_pb_chip_ingress_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishError) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishError) ProtoMessage() {} + +func (x *PublishError) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishError.ProtoReflect.Descriptor instead. 
+func (*PublishError) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} +} + +func (x *PublishError) GetErrorCode() PublishErrorCode { + if x != nil { + return x.ErrorCode + } + return PublishErrorCode_PUBLISH_ERROR_CODE_UNKNOWN +} + +func (x *PublishError) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + // CloudEventBatch is used to send many ChipIngress type CloudEventBatch struct { state protoimpl.MessageState `protogen:"open.v1"` Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + Options *PublishOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *CloudEventBatch) Reset() { *x = CloudEventBatch{} - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -44,7 +217,7 @@ func (x *CloudEventBatch) String() string { func (*CloudEventBatch) ProtoMessage() {} func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -57,7 +230,7 @@ func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use CloudEventBatch.ProtoReflect.Descriptor instead. 
func (*CloudEventBatch) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} } func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { @@ -67,6 +240,13 @@ func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { return nil } +func (x *CloudEventBatch) GetOptions() *PublishOptions { + if x != nil { + return x.Options + } + return nil +} + type PublishResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Results []*PublishResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` @@ -76,7 +256,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -88,7 +268,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -101,7 +281,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} } func (x *PublishResponse) GetResults() []*PublishResult { @@ -111,16 +291,19 @@ func (x *PublishResponse) GetResults() []*PublishResult { return nil } +// PublishResult reports the outcome for one input event. +// Invariant: error is set if and only if the event was not produced to Kafka. 
type PublishResult struct { state protoimpl.MessageState `protogen:"open.v1"` - EventId string `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"` + EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + Error *PublishError `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *PublishResult) Reset() { *x = PublishResult{} - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -132,7 +315,7 @@ func (x *PublishResult) String() string { func (*PublishResult) ProtoMessage() {} func (x *PublishResult) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -145,7 +328,7 @@ func (x *PublishResult) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResult.ProtoReflect.Descriptor instead. 
func (*PublishResult) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} } func (x *PublishResult) GetEventId() string { @@ -155,6 +338,13 @@ func (x *PublishResult) GetEventId() string { return "" } +func (x *PublishResult) GetError() *PublishError { + if x != nil { + return x.Error + } + return nil +} + // EmptyRequest is just an empty request type EmptyRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -164,7 +354,7 @@ type EmptyRequest struct { func (x *EmptyRequest) Reset() { *x = EmptyRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -176,7 +366,7 @@ func (x *EmptyRequest) String() string { func (*EmptyRequest) ProtoMessage() {} func (x *EmptyRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -189,7 +379,7 @@ func (x *EmptyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use EmptyRequest.ProtoReflect.Descriptor instead. 
func (*EmptyRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} } // PingResponse responds to pings @@ -202,7 +392,7 @@ type PingResponse struct { func (x *PingResponse) Reset() { *x = PingResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -214,7 +404,7 @@ func (x *PingResponse) String() string { func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -227,7 +417,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} } func (x *PingResponse) GetMessage() string { @@ -247,7 +437,7 @@ type StreamEventsRequest struct { func (x *StreamEventsRequest) Reset() { *x = StreamEventsRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -259,7 +449,7 @@ func (x *StreamEventsRequest) String() string { func (*StreamEventsRequest) ProtoMessage() {} func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -272,7 +462,7 @@ func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use 
StreamEventsRequest.ProtoReflect.Descriptor instead. func (*StreamEventsRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} } func (x *StreamEventsRequest) GetEvent() *pb.CloudEvent { @@ -292,7 +482,7 @@ type StreamEventsResponse struct { func (x *StreamEventsResponse) Reset() { *x = StreamEventsResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -304,7 +494,7 @@ func (x *StreamEventsResponse) String() string { func (*StreamEventsResponse) ProtoMessage() {} func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -317,7 +507,7 @@ func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsResponse.ProtoReflect.Descriptor instead. 
func (*StreamEventsResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} } func (x *StreamEventsResponse) GetEventId() string { @@ -344,7 +534,7 @@ type RegisterSchemaRequest struct { func (x *RegisterSchemaRequest) Reset() { *x = RegisterSchemaRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -356,7 +546,7 @@ func (x *RegisterSchemaRequest) String() string { func (*RegisterSchemaRequest) ProtoMessage() {} func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -369,7 +559,7 @@ func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaRequest.ProtoReflect.Descriptor instead. 
func (*RegisterSchemaRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{9} } func (x *RegisterSchemaRequest) GetSchemas() []*Schema { @@ -389,7 +579,7 @@ type RegisterSchemaResponse struct { func (x *RegisterSchemaResponse) Reset() { *x = RegisterSchemaResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -401,7 +591,7 @@ func (x *RegisterSchemaResponse) String() string { func (*RegisterSchemaResponse) ProtoMessage() {} func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -414,7 +604,7 @@ func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaResponse.ProtoReflect.Descriptor instead. 
func (*RegisterSchemaResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{10} } func (x *RegisterSchemaResponse) GetRegistered() []*RegisteredSchema { @@ -428,13 +618,22 @@ var File_pb_chip_ingress_proto protoreflect.FileDescriptor const file_pb_chip_ingress_proto_rawDesc = "" + "\n" + - "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\"H\n" + + "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\"^\n" + + "\x0ePublishOptions\x124\n" + + "\x13transaction_enabled\x18\x01 \x01(\bH\x00R\x12transactionEnabled\x88\x01\x01B\x16\n" + + "\x14_transaction_enabled\"g\n" + + "\fPublishError\x12?\n" + + "\n" + + "error_code\x18\x01 \x01(\x0e2 .chipingress.pb.PublishErrorCodeR\terrorCode\x12\x16\n" + + "\x06reason\x18\x02 \x01(\tR\x06reason\"\x82\x01\n" + "\x0fCloudEventBatch\x125\n" + - "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\"J\n" + + "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\x128\n" + + "\aoptions\x18\x02 \x01(\v2\x1e.chipingress.pb.PublishOptionsR\aoptions\"J\n" + "\x0fPublishResponse\x127\n" + - "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\")\n" + - "\rPublishResult\x12\x18\n" + - "\aeventId\x18\x01 \x01(\tR\aeventId\"\x0e\n" + + "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\"^\n" + + "\rPublishResult\x12\x19\n" + + "\bevent_id\x18\x01 \x01(\tR\aeventId\x122\n" + + "\x05error\x18\x02 \x01(\v2\x1c.chipingress.pb.PublishErrorR\x05error\"\x0e\n" + "\fEmptyRequest\"(\n" + "\fPingResponse\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"J\n" + @@ -448,7 +647,13 @@ const file_pb_chip_ingress_proto_rawDesc = "" + "\x16RegisterSchemaResponse\x12=\n" + 
"\n" + "registered\x18\x01 \x03(\v2\x1d.chip_common.RegisteredSchemaR\n" + - "registered2\xb6\x03\n" + + "registered*\xd8\x01\n" + + "\x10PublishErrorCode\x12\x1e\n" + + "\x1aPUBLISH_ERROR_CODE_UNKNOWN\x10\x00\x12(\n" + + "$PUBLISH_ERROR_CODE_VALIDATION_FAILED\x10\x01\x12%\n" + + "!PUBLISH_ERROR_CODE_SCHEMA_MISSING\x10\x02\x12#\n" + + "\x1fPUBLISH_ERROR_CODE_ENCODE_ERROR\x10\x03\x12.\n" + + "*PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION\x10\x042\xb6\x03\n" + "\vChipIngress\x12K\n" + "\aPublish\x12\x1d.io.cloudevents.v1.CloudEvent\x1a\x1f.chipingress.pb.PublishResponse\"\x00\x12R\n" + "\fPublishBatch\x12\x1f.chipingress.pb.CloudEventBatch\x1a\x1f.chipingress.pb.PublishResponse\"\x00\x12B\n" + @@ -468,42 +673,49 @@ func file_pb_chip_ingress_proto_rawDescGZIP() []byte { return file_pb_chip_ingress_proto_rawDescData } -var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pb_chip_ingress_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_pb_chip_ingress_proto_goTypes = []any{ - (*CloudEventBatch)(nil), // 0: chipingress.pb.CloudEventBatch - (*PublishResponse)(nil), // 1: chipingress.pb.PublishResponse - (*PublishResult)(nil), // 2: chipingress.pb.PublishResult - (*EmptyRequest)(nil), // 3: chipingress.pb.EmptyRequest - (*PingResponse)(nil), // 4: chipingress.pb.PingResponse - (*StreamEventsRequest)(nil), // 5: chipingress.pb.StreamEventsRequest - (*StreamEventsResponse)(nil), // 6: chipingress.pb.StreamEventsResponse - (*RegisterSchemaRequest)(nil), // 7: chipingress.pb.RegisterSchemaRequest - (*RegisterSchemaResponse)(nil), // 8: chipingress.pb.RegisterSchemaResponse - (*pb.CloudEvent)(nil), // 9: io.cloudevents.v1.CloudEvent - (*Schema)(nil), // 10: chip_common.Schema - (*RegisteredSchema)(nil), // 11: chip_common.RegisteredSchema + (PublishErrorCode)(0), // 0: chipingress.pb.PublishErrorCode + (*PublishOptions)(nil), // 1: chipingress.pb.PublishOptions + 
(*PublishError)(nil), // 2: chipingress.pb.PublishError + (*CloudEventBatch)(nil), // 3: chipingress.pb.CloudEventBatch + (*PublishResponse)(nil), // 4: chipingress.pb.PublishResponse + (*PublishResult)(nil), // 5: chipingress.pb.PublishResult + (*EmptyRequest)(nil), // 6: chipingress.pb.EmptyRequest + (*PingResponse)(nil), // 7: chipingress.pb.PingResponse + (*StreamEventsRequest)(nil), // 8: chipingress.pb.StreamEventsRequest + (*StreamEventsResponse)(nil), // 9: chipingress.pb.StreamEventsResponse + (*RegisterSchemaRequest)(nil), // 10: chipingress.pb.RegisterSchemaRequest + (*RegisterSchemaResponse)(nil), // 11: chipingress.pb.RegisterSchemaResponse + (*pb.CloudEvent)(nil), // 12: io.cloudevents.v1.CloudEvent + (*Schema)(nil), // 13: chip_common.Schema + (*RegisteredSchema)(nil), // 14: chip_common.RegisteredSchema } var file_pb_chip_ingress_proto_depIdxs = []int32{ - 9, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent - 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult - 9, // 2: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent - 10, // 3: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema - 11, // 4: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema - 9, // 5: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent - 0, // 6: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch - 3, // 7: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest - 5, // 8: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest - 7, // 9: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest - 1, // 10: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse - 1, // 11: chipingress.pb.ChipIngress.PublishBatch:output_type -> 
chipingress.pb.PublishResponse - 4, // 12: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse - 6, // 13: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse - 8, // 14: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse - 10, // [10:15] is the sub-list for method output_type - 5, // [5:10] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 0, // 0: chipingress.pb.PublishError.error_code:type_name -> chipingress.pb.PublishErrorCode + 12, // 1: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent + 1, // 2: chipingress.pb.CloudEventBatch.options:type_name -> chipingress.pb.PublishOptions + 5, // 3: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult + 2, // 4: chipingress.pb.PublishResult.error:type_name -> chipingress.pb.PublishError + 12, // 5: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 13, // 6: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema + 14, // 7: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema + 12, // 8: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent + 3, // 9: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch + 6, // 10: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest + 8, // 11: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest + 10, // 12: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest + 4, // 13: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse + 4, // 14: chipingress.pb.ChipIngress.PublishBatch:output_type -> 
chipingress.pb.PublishResponse + 7, // 15: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse + 9, // 16: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse + 11, // 17: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse + 13, // [13:18] is the sub-list for method output_type + 8, // [8:13] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_pb_chip_ingress_proto_init() } @@ -512,18 +724,20 @@ func file_pb_chip_ingress_proto_init() { return } file_pb_chip_common_proto_init() + file_pb_chip_ingress_proto_msgTypes[0].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_chip_ingress_proto_rawDesc), len(file_pb_chip_ingress_proto_rawDesc)), - NumEnums: 0, - NumMessages: 9, + NumEnums: 1, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, GoTypes: file_pb_chip_ingress_proto_goTypes, DependencyIndexes: file_pb_chip_ingress_proto_depIdxs, + EnumInfos: file_pb_chip_ingress_proto_enumTypes, MessageInfos: file_pb_chip_ingress_proto_msgTypes, }.Build() File_pb_chip_ingress_proto = out.File diff --git a/pkg/chipingress/pb/chip_ingress.proto b/pkg/chipingress/pb/chip_ingress.proto index 1675f5d9c6..6ff0c09607 100644 --- a/pkg/chipingress/pb/chip_ingress.proto +++ b/pkg/chipingress/pb/chip_ingress.proto @@ -11,34 +11,82 @@ option go_package = "./pb"; service ChipIngress { // Publish sends a single CloudEvent to the ChipIngress service. rpc Publish (io.cloudevents.v1.CloudEvent) returns (PublishResponse) {} -// PublishBatch sends a batch of CloudEvents to the ChipIngress service. 
-// This method is atomic, meaning it will either succeed or fail for the entire batch. -// When the server receives a batch of events, it will open a kafka transaction, and begin producing each event sequentially in order received. -// If any 1 message fails, all previous messages in the same batch that were already produced won't be marked as committed, -// and the server will respond with an error. -// Consumers can set isolation.level=read_committed to only read committed records + // PublishBatch sends a batch of CloudEvents to the ChipIngress service. + // Default behavior is partial delivery (the default for both server and client): + // when transaction_enabled is unset (PublishOptions absent or the field omitted) + // or explicitly false, the server produces valid events and returns per-event + // errors in PublishResult for invalid ones. + // When transaction_enabled is explicitly true, any per-event failure fails the + // entire batch (all-or-nothing). + // Results in PublishResponse are positionally ordered: results[i] corresponds to events[i]. rpc PublishBatch (CloudEventBatch) returns (PublishResponse) {} -// Ping sends a request to the ChipIngress service to check if it is alive. + // Ping sends a request to the ChipIngress service to check if it is alive. rpc Ping(EmptyRequest) returns (PingResponse); -// StreamEvents; EXPERIMENTAL ~ allows clients to stream CloudEvents to the server. -// This API is experimental and may change in the future. + // StreamEvents; EXPERIMENTAL ~ allows clients to stream CloudEvents to the server. + // This API is experimental and may change in the future. rpc StreamEvents (stream StreamEventsRequest) returns (stream StreamEventsResponse) {} // New streaming endpoint -// RegisterSchema allows registering one or more schemas that define the structure of CloudEvent data. + // RegisterSchema allows registering one or more schemas that define the structure of CloudEvent data. 
rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse) {} } +message PublishOptions { + // When unset or false, valid events are produced and invalid ones are + // skipped; per-event outcomes are reported in PublishResponse.results + // (partial delivery — the server default). + // When explicitly true, any per-event failure fails the entire batch + // (all-or-nothing). + optional bool transaction_enabled = 1; +} + +enum PublishErrorCode { + // Default value. + PUBLISH_ERROR_CODE_UNKNOWN = 0; + + // The CloudEvent structure is invalid. Common causes: missing required fields + // (id, source, type, specversion), a nil attribute value, or an unsupported + // attribute type in the extensions map. Fix the CloudEvent and retry. + PUBLISH_ERROR_CODE_VALIDATION_FAILED = 1; + + // No schema was found in the Schema Registry for the subject derived from + // this event's source and type. The schema must be registered before + // publishing events of this type. Note: registry connectivity failures are + // returned as a gRPC INTERNAL error, not this code. + PUBLISH_ERROR_CODE_SCHEMA_MISSING = 2; + + // The event payload could not be encoded against its registered schema. + // Common causes: the data bytes are not valid protobuf (e.g. JSON was used + // instead of proto.Marshal), or the event type does not match any message + // descriptor in the registered schema. + PUBLISH_ERROR_CODE_ENCODE_ERROR = 3; + + // The event source does not map to a recognized domain. Either the source + // field is set to an unknown value or is missing entirely. Verify the source + // field matches a domain configured in chip-ingress. + PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION = 4; +} + +// PublishError carries machine- and human-readable detail for a non-produced event. 
+message PublishError { + PublishErrorCode error_code = 1; + string reason = 2; +} + // CloudEventBatch is used to send many ChipIngress -message CloudEventBatch{ +message CloudEventBatch { repeated io.cloudevents.v1.CloudEvent events = 1; + PublishOptions options = 2; } message PublishResponse { - repeated PublishResult results = 1; + repeated PublishResult results = 1; } +// PublishResult reports the outcome for one input event. +// Invariant: error is set if and only if the event was not produced to Kafka. message PublishResult { - string eventId = 1; + string event_id = 1; + PublishError error = 2; } // EmptyRequest is just an empty request @@ -46,7 +94,7 @@ message EmptyRequest{} // PingResponse responds to pings message PingResponse { - string message = 1; + string message = 1; } // Define request and response messages for the new streaming endpoint @@ -67,4 +115,4 @@ message RegisterSchemaRequest { // RegisterSchema response message message RegisterSchemaResponse { repeated chip_common.RegisteredSchema registered = 1; // List of registered schema subjects -} \ No newline at end of file +} diff --git a/pkg/chipingress/pb/chip_ingress_grpc.pb.go b/pkg/chipingress/pb/chip_ingress_grpc.pb.go index 7b68869f16..de36ceba46 100644 --- a/pkg/chipingress/pb/chip_ingress_grpc.pb.go +++ b/pkg/chipingress/pb/chip_ingress_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.2 // - protoc v5.29.3 // source: pb/chip_ingress.proto @@ -36,11 +36,13 @@ type ChipIngressClient interface { // Publish sends a single CloudEvent to the ChipIngress service. Publish(ctx context.Context, in *pb.CloudEvent, opts ...grpc.CallOption) (*PublishResponse, error) // PublishBatch sends a batch of CloudEvents to the ChipIngress service. - // This method is atomic, meaning it will either succeed or fail for the entire batch. 
- // When the server receives a batch of events, it will open a kafka transaction, and begin producing each event sequentially in order received. - // If any 1 message fails, all previous messages in the same batch that were already produced won't be marked as committed, - // and the server will respond with an error. - // Consumers can set isolation.level=read_committed to only read committed records + // Default behavior is partial delivery (the default for both server and client): + // when transaction_enabled is unset (PublishOptions absent or the field omitted) + // or explicitly false, the server produces valid events and returns per-event + // errors in PublishResult for invalid ones. + // When transaction_enabled is explicitly true, any per-event failure fails the + // entire batch (all-or-nothing). + // Results in PublishResponse are positionally ordered: results[i] corresponds to events[i]. PublishBatch(ctx context.Context, in *CloudEventBatch, opts ...grpc.CallOption) (*PublishResponse, error) // Ping sends a request to the ChipIngress service to check if it is alive. Ping(ctx context.Context, in *EmptyRequest, opts ...grpc.CallOption) (*PingResponse, error) @@ -121,11 +123,13 @@ type ChipIngressServer interface { // Publish sends a single CloudEvent to the ChipIngress service. Publish(context.Context, *pb.CloudEvent) (*PublishResponse, error) // PublishBatch sends a batch of CloudEvents to the ChipIngress service. - // This method is atomic, meaning it will either succeed or fail for the entire batch. - // When the server receives a batch of events, it will open a kafka transaction, and begin producing each event sequentially in order received. - // If any 1 message fails, all previous messages in the same batch that were already produced won't be marked as committed, - // and the server will respond with an error. 
- // Consumers can set isolation.level=read_committed to only read committed records + // Default behavior is partial delivery (the default for both server and client): + // when transaction_enabled is unset (PublishOptions absent or the field omitted) + // or explicitly false, the server produces valid events and returns per-event + // errors in PublishResult for invalid ones. + // When transaction_enabled is explicitly true, any per-event failure fails the + // entire batch (all-or-nothing). + // Results in PublishResponse are positionally ordered: results[i] corresponds to events[i]. PublishBatch(context.Context, *CloudEventBatch) (*PublishResponse, error) // Ping sends a request to the ChipIngress service to check if it is alive. Ping(context.Context, *EmptyRequest) (*PingResponse, error) @@ -145,19 +149,19 @@ type ChipIngressServer interface { type UnimplementedChipIngressServer struct{} func (UnimplementedChipIngressServer) Publish(context.Context, *pb.CloudEvent) (*PublishResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented") + return nil, status.Error(codes.Unimplemented, "method Publish not implemented") } func (UnimplementedChipIngressServer) PublishBatch(context.Context, *CloudEventBatch) (*PublishResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method PublishBatch not implemented") + return nil, status.Error(codes.Unimplemented, "method PublishBatch not implemented") } func (UnimplementedChipIngressServer) Ping(context.Context, *EmptyRequest) (*PingResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") + return nil, status.Error(codes.Unimplemented, "method Ping not implemented") } func (UnimplementedChipIngressServer) StreamEvents(grpc.BidiStreamingServer[StreamEventsRequest, StreamEventsResponse]) error { - return status.Errorf(codes.Unimplemented, "method StreamEvents not implemented") + return status.Error(codes.Unimplemented, "method 
StreamEvents not implemented") } func (UnimplementedChipIngressServer) RegisterSchema(context.Context, *RegisterSchemaRequest) (*RegisterSchemaResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method RegisterSchema not implemented") + return nil, status.Error(codes.Unimplemented, "method RegisterSchema not implemented") } func (UnimplementedChipIngressServer) mustEmbedUnimplementedChipIngressServer() {} func (UnimplementedChipIngressServer) testEmbeddedByValue() {} @@ -170,7 +174,7 @@ type UnsafeChipIngressServer interface { } func RegisterChipIngressServer(s grpc.ServiceRegistrar, srv ChipIngressServer) { - // If the following call pancis, it indicates UnimplementedChipIngressServer was + // If the following call panics, it indicates UnimplementedChipIngressServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pkg/chipingress/types.go b/pkg/chipingress/types.go index 15f94c04a3..b21f279d42 100644 --- a/pkg/chipingress/types.go +++ b/pkg/chipingress/types.go @@ -19,9 +19,12 @@ type ( // Message types CloudEventBatch = pb.CloudEventBatch EmptyRequest = pb.EmptyRequest + PublishErrorCode = pb.PublishErrorCode PingResponse = pb.PingResponse + PublishOptions = pb.PublishOptions PublishResponse = pb.PublishResponse PublishResult = pb.PublishResult + PublishError = pb.PublishError StreamEventsRequest = pb.StreamEventsRequest StreamEventsResponse = pb.StreamEventsResponse )