diff --git a/acceptance/bundle/dms/release-lock-error/databricks.yml b/acceptance/bundle/dms/release-lock-error/databricks.yml new file mode 100644 index 00000000000..94323b84d93 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: dms-release-lock-error + +targets: + fail-complete: + default: true + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/release-lock-error/out.test.toml b/acceptance/bundle/dms/release-lock-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt new file mode 100644 index 00000000000..476ba422ef5 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -0,0 +1,38 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... +Deploying resources... +Updating deployment state... +Deployment complete! +Warn: Failed to release deployment lock: simulated complete version failure + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "fail-complete" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "fail-complete", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/release-lock-error/script b/acceptance/bundle/dms/release-lock-error/script new file mode 100755 index 00000000000..86220e4be0a --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/script @@ -0,0 +1,8 @@ +# Deploy with the deployment metadata service enabled. +# The target name "fail-complete" instructs the fake DMS server to fail the +# CompleteVersion call so the CLI exercises the "lock release failed" branch. +trace $CLI bundle deploy + +# Print the DMS requests to verify the lock release was attempted. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/release-lock-error/test.toml b/acceptance/bundle/dms/release-lock-error/test.toml new file mode 100644 index 00000000000..58ed9f5172c --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/test.toml @@ -0,0 +1,4 @@ +Ignore = [".databricks"] + +# Override target to "fail-complete" which makes the fake DMS server's +# CompleteVersion endpoint return an error, simulating a release failure. diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..1b258d2613a --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,7 @@ +Badness = "Uses local test server; enable on cloud once the deployment metadata service is in production" +Local = true +Cloud = false +RecordRequests = true + +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..afb91ef5f79 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,302 @@ +package lock + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/google/uuid" +) + +// defaultHeartbeatInterval is how often the background heartbeat goroutine +// renews the DMS-side lock lease while a deployment is in progress. +const defaultHeartbeatInterval = 30 * time.Second + +// managedServiceFileName is the workspace state file where the lock package +// persists the DMS deployment_id across CLI invocations. It is intentionally +// scoped to this package for now; once the state-from-DMS path lands the +// file (and accompanying struct) will move to bundle/statemgmt so both the +// lock and state managers can share it. +const managedServiceFileName = "managed_service.json" + +// managedServiceJSON is the on-disk shape of managedServiceFileName. +type managedServiceJSON struct { + DeploymentID string `json:"deployment_id"` +} + +// metadataServiceLock implements DeploymentLock against the bundle deployment +// metadata service (DMS). The lock is acquired by creating a new Version +// under the deployment; a background goroutine renews the lock lease via +// Heartbeat calls; the lock is released by CompleteVersion. +type metadataServiceLock struct { + b *bundle.Bundle + versionType sdkbundle.VersionType + + svc sdkbundle.BundleInterface + deploymentID string + versionID string + + stopHeartbeat context.CancelFunc +} + +func newMetadataServiceLock(b *bundle.Bundle, goal Goal) (*metadataServiceLock, error) { + versionType, err := goalToVersionType(goal) + if err != nil { + return nil, err + } + return &metadataServiceLock{b: b, versionType: versionType}, nil +} + +// goalToVersionType maps a deployment Goal onto the DMS VersionType enum. +// Bind and Unbind are not yet supported under DMS — they will gain dedicated +// DMS operations in a later change. +func goalToVersionType(goal Goal) (sdkbundle.VersionType, error) { + switch goal { + case GoalDeploy: + return sdkbundle.VersionTypeVersionTypeDeploy, nil + case GoalDestroy: + return sdkbundle.VersionTypeVersionTypeDestroy, nil + case GoalBind, GoalUnbind: + return "", fmt.Errorf("%s is not supported with the deployment metadata service", goal) + default: + return "", fmt.Errorf("unknown deployment goal: %s", goal) + } +} + +func (l *metadataServiceLock) Acquire(ctx context.Context) error { + if l.b.Config.Bundle.Deployment.Lock.Force { + return errors.New("force lock is not supported with the deployment metadata service") + } + + l.svc = l.b.WorkspaceClient(ctx).Bundle + + deploymentID, versionID, err := acquireLock(ctx, l.b, l.svc, l.versionType) + if err != nil { + return err + } + + l.deploymentID = deploymentID + l.versionID = versionID + l.stopHeartbeat = startHeartbeat(ctx, l.svc, deploymentID, versionID) + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, versionID) + return nil +} + +func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStatus) error { + // Stop the heartbeat first so its in-flight request doesn't race with + // CompleteVersion below. + if l.stopHeartbeat != nil { + l.stopHeartbeat() + } + + // If Acquire failed before reaching CreateVersion there is nothing to release. + if l.svc == nil || l.deploymentID == "" || l.versionID == "" { + return nil + } + + reason := sdkbundle.VersionCompleteVersionCompleteSuccess + if status == DeploymentFailure { + reason = sdkbundle.VersionCompleteVersionCompleteFailure + } + + versionName := fmt.Sprintf("deployments/%s/versions/%s", l.deploymentID, l.versionID) + if _, err := l.svc.CompleteVersion(ctx, sdkbundle.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }); err != nil { + return err + } + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%s", + l.deploymentID, l.versionID, reason) + + // On successful destroy, delete the deployment record. Surface failures + // to the caller — they are deploy-correctness issues, not best-effort + // cleanup. + if status == DeploymentSuccess && l.versionType == sdkbundle.VersionTypeVersionTypeDestroy { + if err := l.svc.DeleteDeployment(ctx, sdkbundle.DeleteDeploymentRequest{ + Name: "deployments/" + l.deploymentID, + }); err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + return nil +} + +// acquireLock implements the lock acquisition protocol: +// 1. Resolve the deployment ID from managed_service.json (or generate a new one). +// 2. CreateDeployment for fresh IDs; GetDeployment otherwise to learn the +// next version number. +// 3. CreateVersion to acquire the lock. +func acquireLock(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, versionType sdkbundle.VersionType) (deploymentID, versionID string, err error) { + deploymentID, isNew, err := resolveDeploymentID(ctx, b) + if err != nil { + return "", "", err + } + + if isNew { + // Fresh deployment: create the record at version 1. + _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ + DeploymentId: deploymentID, + Deployment: sdkbundle.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", "", fmt.Errorf("failed to create deployment: %w", createErr) + } + // Persist the deployment ID only after the server-side record exists, + // so a failed CreateDeployment doesn't leave a dangling ID on disk. + if err := writeDeploymentID(ctx, b, deploymentID); err != nil { + return "", "", err + } + versionID = "1" + } else { + // Existing deployment: ask the server for the last version ID. + dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + deploymentID, + }) + if getErr != nil { + return "", "", fmt.Errorf("failed to get deployment: %w", getErr) + } + next, parseErr := nextVersionID(dep.LastVersionId) + if parseErr != nil { + return "", "", parseErr + } + versionID = next + } + + if _, err := svc.CreateVersion(ctx, sdkbundle.CreateVersionRequest{ + Parent: "deployments/" + deploymentID, + VersionId: versionID, + Version: sdkbundle.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }); err != nil { + return "", "", fmt.Errorf("failed to acquire deployment lock: %w", err) + } + + return deploymentID, versionID, nil +} + +// nextVersionID returns the next monotonic version ID following lastVersionID. +// An empty lastVersionID means "no prior versions" so the next ID is "1". +func nextVersionID(lastVersionID string) (string, error) { + if lastVersionID == "" { + return "1", nil + } + n, err := strconv.ParseInt(lastVersionID, 10, 64) + if err != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", lastVersionID, err) + } + return strconv.FormatInt(n+1, 10), nil +} + +// resolveDeploymentID returns the deployment ID for this bundle. If +// managed_service.json exists in the workspace state directory and contains a +// deployment ID, it is reused. Otherwise a new UUID is generated and the +// caller must write it to disk after CreateDeployment succeeds. +func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, bool, error) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return "", false, fmt.Errorf("failed to create state filer: %w", err) + } + + reader, readErr := f.Read(ctx, managedServiceFileName) + if readErr == nil { + defer reader.Close() + data, err := io.ReadAll(reader) + if err != nil { + return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, err) + } + var sj managedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + return "", false, fmt.Errorf("failed to parse %s: %w", managedServiceFileName, err) + } + if sj.DeploymentID != "" { + return sj.DeploymentID, false, nil + } + // File exists but has no deployment_id — treat as fresh. + } else if !errors.Is(readErr, fs.ErrNotExist) { + return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, readErr) + } + + return uuid.New().String(), true, nil +} + +func writeDeploymentID(ctx context.Context, b *bundle.Bundle, deploymentID string) error { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return fmt.Errorf("failed to create state filer: %w", err) + } + data, err := json.Marshal(managedServiceJSON{DeploymentID: deploymentID}) + if err != nil { + return fmt.Errorf("failed to marshal %s: %w", managedServiceFileName, err) + } + if err := f.Write(ctx, managedServiceFileName, bytes.NewReader(data), + filer.CreateParentDirectories, filer.OverwriteIfExists); err != nil { + return fmt.Errorf("failed to write %s: %w", managedServiceFileName, err) + } + return nil +} + +// startHeartbeat spawns a goroutine that renews the DMS lock lease at +// defaultHeartbeatInterval. The returned cancel func stops the goroutine. +// Heartbeat errors that indicate the version was already completed (HTTP 409 +// ABORTED) are treated as benign termination; all other errors are logged +// and the goroutine continues so a transient network blip doesn't tear down +// the deploy. +func startHeartbeat(parent context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(parent) + versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, err := svc.Heartbeat(ctx, sdkbundle.HeartbeatRequest{Name: versionName}); err != nil { + if isAborted(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + continue + } + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", + deploymentID, versionID) + } + } + }() + + return cancel +} + +// isAborted reports whether err is the DMS-specific "409 ABORTED" response +// the server emits when the heartbeat target version is no longer active. +func isAborted(err error) bool { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" { + return true + } + return false +} diff --git a/bundle/deploy/lock/deployment_metadata_service_test.go b/bundle/deploy/lock/deployment_metadata_service_test.go new file mode 100644 index 00000000000..76b09d6532c --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service_test.go @@ -0,0 +1,58 @@ +package lock + +import ( + "testing" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" +) + +func TestGoalToVersionType(t *testing.T) { + tests := []struct { + goal Goal + want sdkbundle.VersionType + wantErr bool + }{ + {goal: GoalDeploy, want: sdkbundle.VersionTypeVersionTypeDeploy}, + {goal: GoalDestroy, want: sdkbundle.VersionTypeVersionTypeDestroy}, + {goal: GoalBind, wantErr: true}, + {goal: GoalUnbind, wantErr: true}, + {goal: Goal("garbage"), wantErr: true}, + } + for _, tt := range tests { + t.Run(string(tt.goal), func(t *testing.T) { + got, err := goalToVersionType(tt.goal) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestNextVersionID(t *testing.T) { + tests := []struct { + name string + last string + want string + wantErr bool + }{ + {name: "empty starts at 1", last: "", want: "1"}, + {name: "increments numeric", last: "1", want: "2"}, + {name: "increments larger numeric", last: "42", want: "43"}, + {name: "rejects non-numeric", last: "v1", wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := nextVersionID(tt.last) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go index 6e3339d6fdb..58162aba946 100644 --- a/bundle/deploy/lock/lock.go +++ b/bundle/deploy/lock/lock.go @@ -4,6 +4,7 @@ import ( "context" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/env" ) // Goal describes the purpose of a deployment operation. @@ -33,9 +34,17 @@ type DeploymentLock interface { Release(ctx context.Context, status DeploymentStatus) error } -// NewDeploymentLock returns a DeploymentLock backed by the workspace -// filesystem. This factory exists so a future change can swap in alternative -// lock implementations without touching callers. -func NewDeploymentLock(b *bundle.Bundle, goal Goal) DeploymentLock { - return newWorkspaceFilesystemLock(b, goal) +// NewDeploymentLock returns a DeploymentLock implementation chosen based on +// the DATABRICKS_BUNDLE_MANAGED_STATE environment variable: when set to a +// truthy value the deployment metadata service backs the lock, otherwise the +// historical workspace-filesystem lock is used. +// +// Note: today the env var alone gates the DMS path. Once the broader managed- +// state feature lands the gate will move behind a richer predicate (e.g. +// statemgmt.IsDmsActive) that also checks server-side opt-in. +func NewDeploymentLock(ctx context.Context, b *bundle.Bundle, goal Goal) (DeploymentLock, error) { + if env.IsManagedState(ctx) { + return newMetadataServiceLock(b, goal) + } + return newWorkspaceFilesystemLock(b, goal), nil } diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 00000000000..004f22c495b --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,30 @@ +package env + +import ( + "context" + + envlib "github.com/databricks/cli/libs/env" +) + +// managedStateVariable names the environment variable that controls whether +// server-managed state (via the deployment metadata service) is used for +// locking and, in a future change, resource state management. +// +// The variable is treated as a boolean and accepts the usual spellings: +// "true"/"false", "1"/"0", "yes"/"no", "on"/"off" (case-insensitive). An +// empty or absent value falls back to the historical filesystem-based +// behavior. +const managedStateVariable = "DATABRICKS_BUNDLE_MANAGED_STATE" + +// ManagedState returns the raw value of DATABRICKS_BUNDLE_MANAGED_STATE if +// set. Callers that only need a bool should use IsManagedState. +func ManagedState(ctx context.Context) (string, bool) { + return get(ctx, []string{managedStateVariable}) +} + +// IsManagedState reports whether the DATABRICKS_BUNDLE_MANAGED_STATE +// environment variable is set to a truthy value. +func IsManagedState(ctx context.Context) bool { + v, ok := envlib.GetBool(ctx, managedStateVariable) + return ok && v +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index 7b3ce12df64..943d4902cd1 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -23,7 +23,11 @@ import ( func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions, engine engine.EngineType) { log.Info(ctx, "Phase: bind") - dl := lock.NewDeploymentLock(b, lock.GoalBind) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalBind) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return @@ -126,7 +130,11 @@ func jsonDump(ctx context.Context, v any, field string) string { func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, resourceKey string, engine engine.EngineType) { log.Info(ctx, "Phase: unbind") - dl := lock.NewDeploymentLock(b, lock.GoalUnbind) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalUnbind) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index d23b9ac2c91..f0cceda3c92 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -131,7 +131,11 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - dl := lock.NewDeploymentLock(b, lock.GoalDeploy) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDeploy) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 741a30c99c0..27ddc2cb725 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -120,7 +120,11 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } - dl := lock.NewDeploymentLock(b, lock.GoalDestroy) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDestroy) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 00000000000..277dd1e112e --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,405 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + sdktime "github.com/databricks/databricks-sdk-go/common/types/time" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// deploymentMetadata holds in-memory state for the deployment metadata +// service. One instance lives inside each FakeWorkspace so tests can drive +// CRUD against the DMS routes the same way they drive jobs/apps/etc. +type deploymentMetadata struct { + // deployments keyed by deployment_id. + deployments map[string]sdkbundle.Deployment + + // versions keyed by "deploymentId/versionId". + versions map[string]sdkbundle.Version + + // operations keyed by "deploymentId/versionId/resourceKey". + operations map[string]sdkbundle.Operation + + // resources keyed by "deploymentId/resourceKey". + resources map[string]sdkbundle.Resource + + // lockHolder maps deploymentId -> the full version name that holds the + // lock (e.g. "deployments/{id}/versions/{vid}"). Absent when no lock is + // held. + lockHolder map[string]string + + // lockExpiry maps deploymentId -> when the lock expires; checked against + // time.Now() on every lock-acquiring or lock-respecting call. + lockExpiry map[string]time.Time +} + +func newDeploymentMetadata() *deploymentMetadata { + return &deploymentMetadata{ + deployments: map[string]sdkbundle.Deployment{}, + versions: map[string]sdkbundle.Version{}, + operations: map[string]sdkbundle.Operation{}, + resources: map[string]sdkbundle.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +// lockDuration matches the real service's default lease so heartbeat-renewal +// tests have a comfortable margin. +const lockDuration = 2 * time.Minute + +func nowPtr() *sdktime.Time { + return sdktime.New(time.Now().UTC()) +} + +func toSDKTime(t time.Time) *sdktime.Time { + return sdktime.New(t.UTC()) +} + +func badRequest(msg string) Response { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": msg}, + } +} + +func notFound(msg string) Response { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": msg}, + } +} + +func aborted(msg string) Response { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": msg}, + } +} + +// DeploymentMetadataCreateDeployment is mounted at +// POST /api/2.0/bundle/deployments. The SDK sends the inner Deployment as the +// body and passes deployment_id as a query parameter. +func (s *FakeWorkspace) DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + deploymentID := req.URL.Query().Get("deployment_id") + if deploymentID == "" { + return badRequest("deployment_id is required") + } + + var body sdkbundle.Deployment + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ALREADY_EXISTS", + "message": fmt.Sprintf("deployment %s already exists", deploymentID), + }, + } + } + + now := nowPtr() + dep := sdkbundle.Deployment{ + Name: "deployments/" + deploymentID, + DisplayName: deploymentID, + TargetName: body.TargetName, + Status: sdkbundle.DeploymentStatusDeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: now, + UpdateTime: now, + } + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataGetDeployment is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.deploymentMetadata.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + return Response{Body: dep} +} + +// DeploymentMetadataDeleteDeployment is mounted at +// DELETE /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + now := nowPtr() + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusDeleted + dep.DestroyTime = now + dep.DestroyedBy = s.CurrentUser().UserName + dep.UpdateTime = now + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataCreateVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions. Body = Version, +// query = version_id. Validates monotonic version IDs and enforces the +// deployment-level lock. +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + versionID := req.URL.Query().Get("version_id") + if versionID == "" { + return badRequest("version_id is required") + } + + var body sdkbundle.Version + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + // Enforce monotonic versions: version_id must equal last_version_id + 1. + expected := "1" + if dep.LastVersionId != "" { + n, err := strconv.ParseInt(dep.LastVersionId, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "stored last_version_id is not a valid number: " + dep.LastVersionId, + }, + } + } + expected = strconv.FormatInt(n+1, 10) + } + if versionID != expected { + return aborted(fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", + expected, versionID)) + } + + // Enforce lock: if a lock is held and not expired, reject. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if exp, ok := state.lockExpiry[deploymentID]; ok && exp.After(now) { + return aborted(fmt.Sprintf("deployment is locked by %s until %s", + holder, exp.Format(time.RFC3339))) + } + } + + versionKey := deploymentID + "/" + versionID + createTime := toSDKTime(now) + version := sdkbundle.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionId: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: createTime, + Status: sdkbundle.VersionStatusVersionStatusInProgress, + CliVersion: body.CliVersion, + VersionType: body.VersionType, + TargetName: body.TargetName, + } + state.versions[versionKey] = version + + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = now.Add(lockDuration) + + dep.LastVersionId = versionID + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusInProgress + dep.UpdateTime = createTime + state.deployments[deploymentID] = dep + + return Response{Body: version} +} + +// DeploymentMetadataGetVersion is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}. +func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response { + defer s.LockUnlock()() + + versionKey := deploymentID + "/" + versionID + version, ok := s.deploymentMetadata.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + return Response{Body: version} +} + +// DeploymentMetadataHeartbeat is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat. +// Validates the version is in-progress and holds the lock, then resets the +// lock expiry. +func (s *FakeWorkspace) DeploymentMetadataHeartbeat(_ Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is no longer in progress") + } + + expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + if state.lockHolder[deploymentID] != expectedHolder { + return aborted("lock is not held by this version") + } + + now := time.Now().UTC() + expiry := now.Add(lockDuration) + state.lockExpiry[deploymentID] = expiry + return Response{Body: sdkbundle.HeartbeatResponse{ExpireTime: toSDKTime(expiry)}} +} + +// DeploymentMetadataCompleteVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete. +// Tests can inject a simulated failure by setting the deployment's target_name +// to "fail-complete": the endpoint returns a 500 so the caller exercises its +// "lock release failed" path. +func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + if dep, ok := state.deployments[deploymentID]; ok && dep.TargetName == "fail-complete" { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "simulated complete version failure", + }, + } + } + + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is already completed") + } + + var body sdkbundle.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + + now := nowPtr() + version.Status = sdkbundle.VersionStatusVersionStatusCompleted + version.CompleteTime = now + version.CompletionReason = body.CompletionReason + version.CompletedBy = s.CurrentUser().UserName + state.versions[versionKey] = version + + delete(state.lockHolder, deploymentID) + delete(state.lockExpiry, deploymentID) + + if dep, ok := state.deployments[deploymentID]; ok { + switch body.CompletionReason { + case sdkbundle.VersionCompleteVersionCompleteSuccess: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusActive + case sdkbundle.VersionCompleteVersionCompleteFailure, + sdkbundle.VersionCompleteVersionCompleteForceAbort, + sdkbundle.VersionCompleteVersionCompleteLeaseExpired: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusFailed + } + dep.UpdateTime = now + state.deployments[deploymentID] = dep + } + + return Response{Body: version} +} + +// DeploymentMetadataCreateOperation is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations. +// Records the operation and upserts the deployment-level Resource so a +// follow-up ListResources sees the merged view. +func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + resourceKey := req.URL.Query().Get("resource_key") + if resourceKey == "" { + return badRequest("resource_key is required") + } + + var body sdkbundle.Operation + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + now := nowPtr() + opKey := deploymentID + "/" + versionID + "/" + resourceKey + op := sdkbundle.Operation{ + Name: fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey), + ResourceKey: resourceKey, + CreateTime: now, + ActionType: body.ActionType, + State: body.State, + ResourceId: body.ResourceId, + Status: body.Status, + ErrorMessage: body.ErrorMessage, + } + state.operations[opKey] = op + + resKey := deploymentID + "/" + resourceKey + state.resources[resKey] = sdkbundle.Resource{ + Name: fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey), + ResourceKey: resourceKey, + State: body.State, + ResourceId: body.ResourceId, + LastActionType: body.ActionType, + LastVersionId: versionID, + } + + return Response{Body: op} +} + +// DeploymentMetadataListResources is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/resources. +func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + prefix := deploymentID + "/" + var resources []sdkbundle.Resource + for key, r := range state.resources { + if strings.HasPrefix(key, prefix) { + resources = append(resources, r) + } + } + if resources == nil { + resources = []sdkbundle.Resource{} + } + return Response{Body: sdkbundle.ListResourcesResponse{Resources: resources}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index 4870e25e07c..07876994219 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -185,6 +185,10 @@ type FakeWorkspace struct { // clusterVenvs caches Python venvs per existing cluster ID, // matching cloud behavior where libraries are cached on running clusters. clusterVenvs map[string]*clusterEnv + + // deploymentMetadata is the in-memory bundle deployment metadata service + // state. Accessed via the DeploymentMetadata* methods. + deploymentMetadata *deploymentMetadata } func (s *FakeWorkspace) LockUnlock() func() { @@ -314,6 +318,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadata(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index ed405470f20..0225f47311e 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -993,4 +993,33 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Bundle deployment metadata service. + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) }