From 204b40a14bd2b5c7dfbcb68afa85241dfcca6fe8 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 26 May 2026 11:07:12 +0000 Subject: [PATCH] bundle: add DATABRICKS_BUNDLE_MANAGED_STATE gate + DMS-backed deployment lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires up a new env-var gate (DATABRICKS_BUNDLE_MANAGED_STATE) that switches the deployment lock from the workspace filesystem to the bundle deployment metadata service (DMS). The env var accepts the usual boolean spellings (true/false, 1/0, yes/no, on/off); the historical filesystem lock is the default. The DMS-backed lock uses the SDK's databricks-sdk-go/service/bundle client (merged via #5311, available since v0.135.0) — no hand-rolled DMS client. Acquire calls CreateDeployment / CreateVersion and starts a background heartbeat goroutine; Release stops the heartbeat and calls CompleteVersion (plus DeleteDeployment on successful destroy). Bind and Unbind are not yet supported under DMS and return an error at lock construction. Deployment ID persistence lives inside the lock package for now (managed_service.json in the workspace state dir). In step 4 of the DMS split it will move to bundle/statemgmt so it can be shared with the state-from-DMS path. Co-authored-by: Isaac --- .../dms/release-lock-error/databricks.yml | 11 + .../dms/release-lock-error/out.test.toml | 4 + .../bundle/dms/release-lock-error/output.txt | 38 ++ .../bundle/dms/release-lock-error/script | 8 + .../bundle/dms/release-lock-error/test.toml | 4 + acceptance/bundle/dms/test.toml | 7 + .../lock/deployment_metadata_service.go | 302 +++++++++++++ .../lock/deployment_metadata_service_test.go | 58 +++ bundle/deploy/lock/lock.go | 19 +- bundle/env/deployment_metadata.go | 30 ++ bundle/phases/bind.go | 12 +- bundle/phases/deploy.go | 6 +- bundle/phases/destroy.go | 6 +- libs/testserver/deployment_metadata.go | 405 ++++++++++++++++++ libs/testserver/fake_workspace.go | 5 + libs/testserver/handlers.go | 29 ++ 16 files changed, 935 insertions(+), 9 deletions(-) create mode 100644 acceptance/bundle/dms/release-lock-error/databricks.yml create mode 100644 acceptance/bundle/dms/release-lock-error/out.test.toml create mode 100644 acceptance/bundle/dms/release-lock-error/output.txt create mode 100755 acceptance/bundle/dms/release-lock-error/script create mode 100644 acceptance/bundle/dms/release-lock-error/test.toml create mode 100644 acceptance/bundle/dms/test.toml create mode 100644 bundle/deploy/lock/deployment_metadata_service.go create mode 100644 bundle/deploy/lock/deployment_metadata_service_test.go create mode 100644 bundle/env/deployment_metadata.go create mode 100644 libs/testserver/deployment_metadata.go diff --git a/acceptance/bundle/dms/release-lock-error/databricks.yml b/acceptance/bundle/dms/release-lock-error/databricks.yml new file mode 100644 index 00000000000..94323b84d93 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: dms-release-lock-error + +targets: + fail-complete: + default: true + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/release-lock-error/out.test.toml b/acceptance/bundle/dms/release-lock-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt new file mode 100644 index 00000000000..476ba422ef5 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -0,0 +1,38 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... +Deploying resources... +Updating deployment state... +Deployment complete! +Warn: Failed to release deployment lock: simulated complete version failure + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "fail-complete" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "fail-complete", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/release-lock-error/script b/acceptance/bundle/dms/release-lock-error/script new file mode 100755 index 00000000000..86220e4be0a --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/script @@ -0,0 +1,8 @@ +# Deploy with the deployment metadata service enabled. +# The target name "fail-complete" instructs the fake DMS server to fail the +# CompleteVersion call so the CLI exercises the "lock release failed" branch. +trace $CLI bundle deploy + +# Print the DMS requests to verify the lock release was attempted. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/release-lock-error/test.toml b/acceptance/bundle/dms/release-lock-error/test.toml new file mode 100644 index 00000000000..58ed9f5172c --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/test.toml @@ -0,0 +1,4 @@ +Ignore = [".databricks"] + +# Override target to "fail-complete" which makes the fake DMS server's +# CompleteVersion endpoint return an error, simulating a release failure. diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..1b258d2613a --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,7 @@ +Badness = "Uses local test server; enable on cloud once the deployment metadata service is in production" +Local = true +Cloud = false +RecordRequests = true + +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..afb91ef5f79 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,302 @@ +package lock + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/google/uuid" +) + +// defaultHeartbeatInterval is how often the background heartbeat goroutine +// renews the DMS-side lock lease while a deployment is in progress. +const defaultHeartbeatInterval = 30 * time.Second + +// managedServiceFileName is the workspace state file where the lock package +// persists the DMS deployment_id across CLI invocations. It is intentionally +// scoped to this package for now; once the state-from-DMS path lands the +// file (and accompanying struct) will move to bundle/statemgmt so both the +// lock and state managers can share it. +const managedServiceFileName = "managed_service.json" + +// managedServiceJSON is the on-disk shape of managedServiceFileName. +type managedServiceJSON struct { + DeploymentID string `json:"deployment_id"` +} + +// metadataServiceLock implements DeploymentLock against the bundle deployment +// metadata service (DMS). The lock is acquired by creating a new Version +// under the deployment; a background goroutine renews the lock lease via +// Heartbeat calls; the lock is released by CompleteVersion. +type metadataServiceLock struct { + b *bundle.Bundle + versionType sdkbundle.VersionType + + svc sdkbundle.BundleInterface + deploymentID string + versionID string + + stopHeartbeat context.CancelFunc +} + +func newMetadataServiceLock(b *bundle.Bundle, goal Goal) (*metadataServiceLock, error) { + versionType, err := goalToVersionType(goal) + if err != nil { + return nil, err + } + return &metadataServiceLock{b: b, versionType: versionType}, nil +} + +// goalToVersionType maps a deployment Goal onto the DMS VersionType enum. +// Bind and Unbind are not yet supported under DMS — they will gain dedicated +// DMS operations in a later change. +func goalToVersionType(goal Goal) (sdkbundle.VersionType, error) { + switch goal { + case GoalDeploy: + return sdkbundle.VersionTypeVersionTypeDeploy, nil + case GoalDestroy: + return sdkbundle.VersionTypeVersionTypeDestroy, nil + case GoalBind, GoalUnbind: + return "", fmt.Errorf("%s is not supported with the deployment metadata service", goal) + default: + return "", fmt.Errorf("unknown deployment goal: %s", goal) + } +} + +func (l *metadataServiceLock) Acquire(ctx context.Context) error { + if l.b.Config.Bundle.Deployment.Lock.Force { + return errors.New("force lock is not supported with the deployment metadata service") + } + + l.svc = l.b.WorkspaceClient(ctx).Bundle + + deploymentID, versionID, err := acquireLock(ctx, l.b, l.svc, l.versionType) + if err != nil { + return err + } + + l.deploymentID = deploymentID + l.versionID = versionID + l.stopHeartbeat = startHeartbeat(ctx, l.svc, deploymentID, versionID) + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, versionID) + return nil +} + +func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStatus) error { + // Stop the heartbeat first so its in-flight request doesn't race with + // CompleteVersion below. + if l.stopHeartbeat != nil { + l.stopHeartbeat() + } + + // If Acquire failed before reaching CreateVersion there is nothing to release. + if l.svc == nil || l.deploymentID == "" || l.versionID == "" { + return nil + } + + reason := sdkbundle.VersionCompleteVersionCompleteSuccess + if status == DeploymentFailure { + reason = sdkbundle.VersionCompleteVersionCompleteFailure + } + + versionName := fmt.Sprintf("deployments/%s/versions/%s", l.deploymentID, l.versionID) + if _, err := l.svc.CompleteVersion(ctx, sdkbundle.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }); err != nil { + return err + } + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%s", + l.deploymentID, l.versionID, reason) + + // On successful destroy, delete the deployment record. Surface failures + // to the caller — they are deploy-correctness issues, not best-effort + // cleanup. + if status == DeploymentSuccess && l.versionType == sdkbundle.VersionTypeVersionTypeDestroy { + if err := l.svc.DeleteDeployment(ctx, sdkbundle.DeleteDeploymentRequest{ + Name: "deployments/" + l.deploymentID, + }); err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + return nil +} + +// acquireLock implements the lock acquisition protocol: +// 1. Resolve the deployment ID from managed_service.json (or generate a new one). +// 2. CreateDeployment for fresh IDs; GetDeployment otherwise to learn the +// next version number. +// 3. CreateVersion to acquire the lock. +func acquireLock(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, versionType sdkbundle.VersionType) (deploymentID, versionID string, err error) { + deploymentID, isNew, err := resolveDeploymentID(ctx, b) + if err != nil { + return "", "", err + } + + if isNew { + // Fresh deployment: create the record at version 1. + _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ + DeploymentId: deploymentID, + Deployment: sdkbundle.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", "", fmt.Errorf("failed to create deployment: %w", createErr) + } + // Persist the deployment ID only after the server-side record exists, + // so a failed CreateDeployment doesn't leave a dangling ID on disk. + if err := writeDeploymentID(ctx, b, deploymentID); err != nil { + return "", "", err + } + versionID = "1" + } else { + // Existing deployment: ask the server for the last version ID. + dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + deploymentID, + }) + if getErr != nil { + return "", "", fmt.Errorf("failed to get deployment: %w", getErr) + } + next, parseErr := nextVersionID(dep.LastVersionId) + if parseErr != nil { + return "", "", parseErr + } + versionID = next + } + + if _, err := svc.CreateVersion(ctx, sdkbundle.CreateVersionRequest{ + Parent: "deployments/" + deploymentID, + VersionId: versionID, + Version: sdkbundle.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }); err != nil { + return "", "", fmt.Errorf("failed to acquire deployment lock: %w", err) + } + + return deploymentID, versionID, nil +} + +// nextVersionID returns the next monotonic version ID following lastVersionID. +// An empty lastVersionID means "no prior versions" so the next ID is "1". +func nextVersionID(lastVersionID string) (string, error) { + if lastVersionID == "" { + return "1", nil + } + n, err := strconv.ParseInt(lastVersionID, 10, 64) + if err != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", lastVersionID, err) + } + return strconv.FormatInt(n+1, 10), nil +} + +// resolveDeploymentID returns the deployment ID for this bundle. If +// managed_service.json exists in the workspace state directory and contains a +// deployment ID, it is reused. Otherwise a new UUID is generated and the +// caller must write it to disk after CreateDeployment succeeds. +func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, bool, error) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return "", false, fmt.Errorf("failed to create state filer: %w", err) + } + + reader, readErr := f.Read(ctx, managedServiceFileName) + if readErr == nil { + defer reader.Close() + data, err := io.ReadAll(reader) + if err != nil { + return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, err) + } + var sj managedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + return "", false, fmt.Errorf("failed to parse %s: %w", managedServiceFileName, err) + } + if sj.DeploymentID != "" { + return sj.DeploymentID, false, nil + } + // File exists but has no deployment_id — treat as fresh. + } else if !errors.Is(readErr, fs.ErrNotExist) { + return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, readErr) + } + + return uuid.New().String(), true, nil +} + +func writeDeploymentID(ctx context.Context, b *bundle.Bundle, deploymentID string) error { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return fmt.Errorf("failed to create state filer: %w", err) + } + data, err := json.Marshal(managedServiceJSON{DeploymentID: deploymentID}) + if err != nil { + return fmt.Errorf("failed to marshal %s: %w", managedServiceFileName, err) + } + if err := f.Write(ctx, managedServiceFileName, bytes.NewReader(data), + filer.CreateParentDirectories, filer.OverwriteIfExists); err != nil { + return fmt.Errorf("failed to write %s: %w", managedServiceFileName, err) + } + return nil +} + +// startHeartbeat spawns a goroutine that renews the DMS lock lease at +// defaultHeartbeatInterval. The returned cancel func stops the goroutine. +// Heartbeat errors that indicate the version was already completed (HTTP 409 +// ABORTED) are treated as benign termination; all other errors are logged +// and the goroutine continues so a transient network blip doesn't tear down +// the deploy. +func startHeartbeat(parent context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(parent) + versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, err := svc.Heartbeat(ctx, sdkbundle.HeartbeatRequest{Name: versionName}); err != nil { + if isAborted(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + continue + } + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", + deploymentID, versionID) + } + } + }() + + return cancel +} + +// isAborted reports whether err is the DMS-specific "409 ABORTED" response +// the server emits when the heartbeat target version is no longer active. +func isAborted(err error) bool { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" { + return true + } + return false +} diff --git a/bundle/deploy/lock/deployment_metadata_service_test.go b/bundle/deploy/lock/deployment_metadata_service_test.go new file mode 100644 index 00000000000..76b09d6532c --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service_test.go @@ -0,0 +1,58 @@ +package lock + +import ( + "testing" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" +) + +func TestGoalToVersionType(t *testing.T) { + tests := []struct { + goal Goal + want sdkbundle.VersionType + wantErr bool + }{ + {goal: GoalDeploy, want: sdkbundle.VersionTypeVersionTypeDeploy}, + {goal: GoalDestroy, want: sdkbundle.VersionTypeVersionTypeDestroy}, + {goal: GoalBind, wantErr: true}, + {goal: GoalUnbind, wantErr: true}, + {goal: Goal("garbage"), wantErr: true}, + } + for _, tt := range tests { + t.Run(string(tt.goal), func(t *testing.T) { + got, err := goalToVersionType(tt.goal) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestNextVersionID(t *testing.T) { + tests := []struct { + name string + last string + want string + wantErr bool + }{ + {name: "empty starts at 1", last: "", want: "1"}, + {name: "increments numeric", last: "1", want: "2"}, + {name: "increments larger numeric", last: "42", want: "43"}, + {name: "rejects non-numeric", last: "v1", wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := nextVersionID(tt.last) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go index 6e3339d6fdb..58162aba946 100644 --- a/bundle/deploy/lock/lock.go +++ b/bundle/deploy/lock/lock.go @@ -4,6 +4,7 @@ import ( "context" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/env" ) // Goal describes the purpose of a deployment operation. @@ -33,9 +34,17 @@ type DeploymentLock interface { Release(ctx context.Context, status DeploymentStatus) error } -// NewDeploymentLock returns a DeploymentLock backed by the workspace -// filesystem. This factory exists so a future change can swap in alternative -// lock implementations without touching callers. -func NewDeploymentLock(b *bundle.Bundle, goal Goal) DeploymentLock { - return newWorkspaceFilesystemLock(b, goal) +// NewDeploymentLock returns a DeploymentLock implementation chosen based on +// the DATABRICKS_BUNDLE_MANAGED_STATE environment variable: when set to a +// truthy value the deployment metadata service backs the lock, otherwise the +// historical workspace-filesystem lock is used. +// +// Note: today the env var alone gates the DMS path. Once the broader managed- +// state feature lands the gate will move behind a richer predicate (e.g. +// statemgmt.IsDmsActive) that also checks server-side opt-in. +func NewDeploymentLock(ctx context.Context, b *bundle.Bundle, goal Goal) (DeploymentLock, error) { + if env.IsManagedState(ctx) { + return newMetadataServiceLock(b, goal) + } + return newWorkspaceFilesystemLock(b, goal), nil } diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 00000000000..004f22c495b --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,30 @@ +package env + +import ( + "context" + + envlib "github.com/databricks/cli/libs/env" +) + +// managedStateVariable names the environment variable that controls whether +// server-managed state (via the deployment metadata service) is used for +// locking and, in a future change, resource state management. +// +// The variable is treated as a boolean and accepts the usual spellings: +// "true"/"false", "1"/"0", "yes"/"no", "on"/"off" (case-insensitive). An +// empty or absent value falls back to the historical filesystem-based +// behavior. +const managedStateVariable = "DATABRICKS_BUNDLE_MANAGED_STATE" + +// ManagedState returns the raw value of DATABRICKS_BUNDLE_MANAGED_STATE if +// set. Callers that only need a bool should use IsManagedState. +func ManagedState(ctx context.Context) (string, bool) { + return get(ctx, []string{managedStateVariable}) +} + +// IsManagedState reports whether the DATABRICKS_BUNDLE_MANAGED_STATE +// environment variable is set to a truthy value. +func IsManagedState(ctx context.Context) bool { + v, ok := envlib.GetBool(ctx, managedStateVariable) + return ok && v +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index 7b3ce12df64..943d4902cd1 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -23,7 +23,11 @@ import ( func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions, engine engine.EngineType) { log.Info(ctx, "Phase: bind") - dl := lock.NewDeploymentLock(b, lock.GoalBind) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalBind) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return @@ -126,7 +130,11 @@ func jsonDump(ctx context.Context, v any, field string) string { func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, resourceKey string, engine engine.EngineType) { log.Info(ctx, "Phase: unbind") - dl := lock.NewDeploymentLock(b, lock.GoalUnbind) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalUnbind) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index d23b9ac2c91..f0cceda3c92 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -131,7 +131,11 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - dl := lock.NewDeploymentLock(b, lock.GoalDeploy) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDeploy) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 741a30c99c0..27ddc2cb725 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -120,7 +120,11 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } - dl := lock.NewDeploymentLock(b, lock.GoalDestroy) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDestroy) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 00000000000..277dd1e112e --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,405 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + sdktime "github.com/databricks/databricks-sdk-go/common/types/time" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// deploymentMetadata holds in-memory state for the deployment metadata +// service. One instance lives inside each FakeWorkspace so tests can drive +// CRUD against the DMS routes the same way they drive jobs/apps/etc. +type deploymentMetadata struct { + // deployments keyed by deployment_id. + deployments map[string]sdkbundle.Deployment + + // versions keyed by "deploymentId/versionId". + versions map[string]sdkbundle.Version + + // operations keyed by "deploymentId/versionId/resourceKey". + operations map[string]sdkbundle.Operation + + // resources keyed by "deploymentId/resourceKey". + resources map[string]sdkbundle.Resource + + // lockHolder maps deploymentId -> the full version name that holds the + // lock (e.g. "deployments/{id}/versions/{vid}"). Absent when no lock is + // held. + lockHolder map[string]string + + // lockExpiry maps deploymentId -> when the lock expires; checked against + // time.Now() on every lock-acquiring or lock-respecting call. + lockExpiry map[string]time.Time +} + +func newDeploymentMetadata() *deploymentMetadata { + return &deploymentMetadata{ + deployments: map[string]sdkbundle.Deployment{}, + versions: map[string]sdkbundle.Version{}, + operations: map[string]sdkbundle.Operation{}, + resources: map[string]sdkbundle.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +// lockDuration matches the real service's default lease so heartbeat-renewal +// tests have a comfortable margin. +const lockDuration = 2 * time.Minute + +func nowPtr() *sdktime.Time { + return sdktime.New(time.Now().UTC()) +} + +func toSDKTime(t time.Time) *sdktime.Time { + return sdktime.New(t.UTC()) +} + +func badRequest(msg string) Response { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": msg}, + } +} + +func notFound(msg string) Response { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": msg}, + } +} + +func aborted(msg string) Response { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": msg}, + } +} + +// DeploymentMetadataCreateDeployment is mounted at +// POST /api/2.0/bundle/deployments. The SDK sends the inner Deployment as the +// body and passes deployment_id as a query parameter. +func (s *FakeWorkspace) DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + deploymentID := req.URL.Query().Get("deployment_id") + if deploymentID == "" { + return badRequest("deployment_id is required") + } + + var body sdkbundle.Deployment + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ALREADY_EXISTS", + "message": fmt.Sprintf("deployment %s already exists", deploymentID), + }, + } + } + + now := nowPtr() + dep := sdkbundle.Deployment{ + Name: "deployments/" + deploymentID, + DisplayName: deploymentID, + TargetName: body.TargetName, + Status: sdkbundle.DeploymentStatusDeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: now, + UpdateTime: now, + } + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataGetDeployment is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.deploymentMetadata.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + return Response{Body: dep} +} + +// DeploymentMetadataDeleteDeployment is mounted at +// DELETE /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + now := nowPtr() + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusDeleted + dep.DestroyTime = now + dep.DestroyedBy = s.CurrentUser().UserName + dep.UpdateTime = now + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataCreateVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions. Body = Version, +// query = version_id. Validates monotonic version IDs and enforces the +// deployment-level lock. +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + versionID := req.URL.Query().Get("version_id") + if versionID == "" { + return badRequest("version_id is required") + } + + var body sdkbundle.Version + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + // Enforce monotonic versions: version_id must equal last_version_id + 1. + expected := "1" + if dep.LastVersionId != "" { + n, err := strconv.ParseInt(dep.LastVersionId, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "stored last_version_id is not a valid number: " + dep.LastVersionId, + }, + } + } + expected = strconv.FormatInt(n+1, 10) + } + if versionID != expected { + return aborted(fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", + expected, versionID)) + } + + // Enforce lock: if a lock is held and not expired, reject. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if exp, ok := state.lockExpiry[deploymentID]; ok && exp.After(now) { + return aborted(fmt.Sprintf("deployment is locked by %s until %s", + holder, exp.Format(time.RFC3339))) + } + } + + versionKey := deploymentID + "/" + versionID + createTime := toSDKTime(now) + version := sdkbundle.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionId: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: createTime, + Status: sdkbundle.VersionStatusVersionStatusInProgress, + CliVersion: body.CliVersion, + VersionType: body.VersionType, + TargetName: body.TargetName, + } + state.versions[versionKey] = version + + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = now.Add(lockDuration) + + dep.LastVersionId = versionID + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusInProgress + dep.UpdateTime = createTime + state.deployments[deploymentID] = dep + + return Response{Body: version} +} + +// DeploymentMetadataGetVersion is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}. +func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response { + defer s.LockUnlock()() + + versionKey := deploymentID + "/" + versionID + version, ok := s.deploymentMetadata.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + return Response{Body: version} +} + +// DeploymentMetadataHeartbeat is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat. +// Validates the version is in-progress and holds the lock, then resets the +// lock expiry. +func (s *FakeWorkspace) DeploymentMetadataHeartbeat(_ Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is no longer in progress") + } + + expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + if state.lockHolder[deploymentID] != expectedHolder { + return aborted("lock is not held by this version") + } + + now := time.Now().UTC() + expiry := now.Add(lockDuration) + state.lockExpiry[deploymentID] = expiry + return Response{Body: sdkbundle.HeartbeatResponse{ExpireTime: toSDKTime(expiry)}} +} + +// DeploymentMetadataCompleteVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete. +// Tests can inject a simulated failure by setting the deployment's target_name +// to "fail-complete": the endpoint returns a 500 so the caller exercises its +// "lock release failed" path. +func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + if dep, ok := state.deployments[deploymentID]; ok && dep.TargetName == "fail-complete" { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "simulated complete version failure", + }, + } + } + + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is already completed") + } + + var body sdkbundle.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + + now := nowPtr() + version.Status = sdkbundle.VersionStatusVersionStatusCompleted + version.CompleteTime = now + version.CompletionReason = body.CompletionReason + version.CompletedBy = s.CurrentUser().UserName + state.versions[versionKey] = version + + delete(state.lockHolder, deploymentID) + delete(state.lockExpiry, deploymentID) + + if dep, ok := state.deployments[deploymentID]; ok { + switch body.CompletionReason { + case sdkbundle.VersionCompleteVersionCompleteSuccess: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusActive + case sdkbundle.VersionCompleteVersionCompleteFailure, + sdkbundle.VersionCompleteVersionCompleteForceAbort, + sdkbundle.VersionCompleteVersionCompleteLeaseExpired: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusFailed + } + dep.UpdateTime = now + state.deployments[deploymentID] = dep + } + + return Response{Body: version} +} + +// DeploymentMetadataCreateOperation is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations. +// Records the operation and upserts the deployment-level Resource so a +// follow-up ListResources sees the merged view. +func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + resourceKey := req.URL.Query().Get("resource_key") + if resourceKey == "" { + return badRequest("resource_key is required") + } + + var body sdkbundle.Operation + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + now := nowPtr() + opKey := deploymentID + "/" + versionID + "/" + resourceKey + op := sdkbundle.Operation{ + Name: fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey), + ResourceKey: resourceKey, + CreateTime: now, + ActionType: body.ActionType, + State: body.State, + ResourceId: body.ResourceId, + Status: body.Status, + ErrorMessage: body.ErrorMessage, + } + state.operations[opKey] = op + + resKey := deploymentID + "/" + resourceKey + state.resources[resKey] = sdkbundle.Resource{ + Name: fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey), + ResourceKey: resourceKey, + State: body.State, + ResourceId: body.ResourceId, + LastActionType: body.ActionType, + LastVersionId: versionID, + } + + return Response{Body: op} +} + +// DeploymentMetadataListResources is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/resources. +func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + prefix := deploymentID + "/" + var resources []sdkbundle.Resource + for key, r := range state.resources { + if strings.HasPrefix(key, prefix) { + resources = append(resources, r) + } + } + if resources == nil { + resources = []sdkbundle.Resource{} + } + return Response{Body: sdkbundle.ListResourcesResponse{Resources: resources}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index 4870e25e07c..07876994219 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -185,6 +185,10 @@ type FakeWorkspace struct { // clusterVenvs caches Python venvs per existing cluster ID, // matching cloud behavior where libraries are cached on running clusters. clusterVenvs map[string]*clusterEnv + + // deploymentMetadata is the in-memory bundle deployment metadata service + // state. Accessed via the DeploymentMetadata* methods. + deploymentMetadata *deploymentMetadata } func (s *FakeWorkspace) LockUnlock() func() { @@ -314,6 +318,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadata(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index ed405470f20..0225f47311e 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -993,4 +993,33 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Bundle deployment metadata service. + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) }