Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.101
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip/chains/evm v0.0.0-20260506144252-c100eabfda74
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260527110243-883689d933be
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260528143427-0daeb12345d7
github.com/smartcontractkit/chainlink-common/keystore v1.1.0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
Expand Down Expand Up @@ -598,12 +598,12 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
go.yaml.in/yaml/v4 v4.0.0-rc.4 // indirect
golang.org/x/arch v0.11.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/crypto v0.52.0 // indirect
golang.org/x/exp v0.0.0-20260508232706-74f9aab9d74a // indirect
golang.org/x/mod v0.36.0 // indirect
golang.org/x/net v0.54.0 // indirect
golang.org/x/net v0.55.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
golang.org/x/sys v0.44.0 // indirect
golang.org/x/sys v0.45.0 // indirect
golang.org/x/term v0.43.0 // indirect
golang.org/x/time v0.15.0 // indirect
golang.org/x/tools v0.45.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions core/scripts/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

206 changes: 0 additions & 206 deletions core/services/beholder/durable_event_store_orm.go

This file was deleted.

64 changes: 18 additions & 46 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/durableemitter"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
nodeauthjwt "github.com/smartcontractkit/chainlink-common/pkg/nodeauth/jwt"
commonsrv "github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -375,13 +377,26 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err
jwtGenerator := nodeauthjwt.NewNodeJWTGenerator(csaSigner, csaPubKey)

// Wire DurableEmitter for persistent chip ingress delivery when enabled.
/* TODO: CRE-4422 Re-enable after refactor
if cfg.Telemetry().DurableEmitterEnabled() && cfg.Telemetry().ChipIngressEndpoint() != "" {
if err = setupDurableEmitter(ctx, opts.DS, globalLogger, cfg.Telemetry()); err != nil {
// beholderAuthHeaders holds the node's CSA-key-signed credential required
// by the Chip Ingress service — used by all chip clients, not just beholder.
var auth chipingress.HeaderProvider
if len(beholderAuthHeaders) > 0 {
auth = beholder.NewStaticAuth(beholderAuthHeaders, !cfg.Telemetry().ChipIngressInsecureConnection())
}
Comment on lines +383 to +386
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var auth chipingress.HeaderProvider
if len(beholderAuthHeaders) > 0 {
auth = beholder.NewStaticAuth(beholderAuthHeaders, !cfg.Telemetry().ChipIngressInsecureConnection())
}
auth, err := chipingress.NewHeaderProvider(chipingress.HeaderProviderConfig{
AuthHeaders: beholderAuthHeaders,
AuthHeadersTTL: cfg.Telemetry().AuthHeadersTTL(),
AuthPublicKeyHex: csaPubKeyHex,
AuthKeySigner: csaKeystore, // already constructed at line 285
InsecureConnection: cfg.Telemetry().ChipIngressInsecureConnection(),
})
if err != nil {
return nil, fmt.Errorf("failed to build chip ingress auth: %w", err)
}

durableCfg := durableemitter.SetupConfig{
Endpoint: cfg.Telemetry().ChipIngressEndpoint(),
InsecureConnection: cfg.Telemetry().ChipIngressInsecureConnection(),
Auth: auth,
RetransmitEnabled: true, // host process owns retransmit
}
pgStore := durableemitter.NewPgDurableEventStore(opts.DS)
durableEmitter, err := durableemitter.Setup(pgStore, durableCfg, globalLogger)
if err != nil {
return nil, fmt.Errorf("failed to set up chip durable emitter: %w", err)
}
srvcs = append(srvcs, durableEmitter)
}
*/

creServices, err := cre.NewServices(
globalLogger,
Expand Down Expand Up @@ -1257,46 +1272,3 @@ func (app *ChainlinkApplication) DeleteLogPollerDataAfter(ctx context.Context, c

return nil
}

// setupDurableEmitter replaces the global beholder emitter with a DurableEmitter
// backed by Postgres. Events are persisted before async gRPC delivery, surviving
// node restarts and chip ingress outages.
/* TODO: CRE-4422 Re-enable after refactor
func setupDurableEmitter(ctx context.Context, ds sqlutil.DataSource, lggr logger.SugaredLogger, _ config.Telemetry) error {
client := beholder.GetClient()
if client == nil {
return errors.New("beholder client not initialized")
}

chipClient := client.Chip
if chipClient == nil || isNoopChipClient(chipClient) {
return errors.New("chip ingress client not available")
}

pgStore := beholdersvc.NewPgDurableEventStore(ds)
durableCfg := durableemitter.DefaultDurableEmitterConfig()
durableEmitter, err := durableemitter.NewDurableEmitter(pgStore, chipClient, true, durableCfg, lggr)
if err != nil {
return fmt.Errorf("failed to create durable emitter: %w", err)
}

// Build a new DualSourceEmitter: durable chip + OTLP.
messageLogger := client.MessageLoggerProvider.Logger("durable-emitter")
otlpEmitter := beholder.NewMessageEmitter(messageLogger)
dualEmitter, err := beholder.NewDualSourceEmitter(durableEmitter, otlpEmitter)
if err != nil {
return fmt.Errorf("failed to create dual source emitter: %w", err)
}

durableEmitter.Start(ctx)
client.Emitter = dualEmitter

lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue")
return nil
}

func isNoopChipClient(c chipingress.Client) bool {
_, ok := c.(*chipingress.NoopClient)
return ok
}
*/
Loading
Loading