From ac6c5431b07a0e11142aae5c2b3904bcd2927341 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 24 Jun 2026 23:22:43 -0700 Subject: [PATCH] refactor(runway): collapse orchestrator into single-service domain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? Runway only ever had a single service, so the per-domain `gateway/` + `orchestrator/` split wrapped it in an empty service layer. Making the runway domain *be* the service removes that indirection and matches how runway actually works — a consumer-only landing service with no gateway. ### What? Collapses the `orchestrator` service segment out of the runway domain: - `runway/orchestrator/controller/*` → `runway/controller/*` - `api/runway/orchestrator/{proto,protopb}` → `api/runway/{proto,protopb}` - `example/runway/orchestrator/{client,server}` → `example/runway/{client,server}` The proto package becomes `uber.runway` (was `uber.runway.orchestrator`) and the RPC service `RunwayOrchestrator` becomes `Runway`. Server/client identifiers, metric scope, and the `ServiceName` ping field follow suit. Build tooling (`tool/proto`, `Makefile` targets, `docker-compose.yml`, `Dockerfile`) and docs (`CLAUDE.md`, `runway/README.md`, runway workflow RFC) are updated, documenting the single-service-domain pattern. ## Test Plan - ✅ `make build` - ✅ `make proto` + `make gazelle` (idempotent) - ✅ runway tests: `//runway/...`, `//api/runway/...`, `//example/runway/...` Co-authored-by: Cursor --- BUILD.bazel | 2 +- CLAUDE.md | 18 +- Makefile | 50 ++-- .../protopb/orchestrator.pb.yarpc.go | 255 ------------------ .../{orchestrator => }/proto/BUILD.bazel | 20 +- .../orchestrator.proto => proto/runway.proto} | 12 +- .../{orchestrator => }/protopb/BUILD.bazel | 8 +- .../runway.pb.go} | 74 ++--- api/runway/protopb/runway.pb.yarpc.go | 254 +++++++++++++++++ .../runway_grpc.pb.go} | 74 ++--- doc/rfc/runway/workflow.md | 6 +- .../{orchestrator => }/client/BUILD.bazel | 10 +- .../runway/{orchestrator => }/client/main.go | 8 +- .../{orchestrator => }/server/BUILD.bazel | 16 +- .../{orchestrator => }/server/Dockerfile | 6 +- .../server/docker-compose.yml | 14 +- .../runway/{orchestrator => }/server/main.go | 34 +-- runway/README.md | 9 +- .../{orchestrator => }/controller/BUILD.bazel | 6 +- .../controller/merge/BUILD.bazel | 2 +- .../controller/merge/merge.go | 2 +- .../controller/merge/merge_test.go | 0 .../controller/mergeconflictcheck/BUILD.bazel | 2 +- .../mergeconflictcheck/mergeconflictcheck.go | 2 +- .../mergeconflictcheck_test.go | 0 runway/{orchestrator => }/controller/ping.go | 8 +- .../controller/ping_test.go | 4 +- runway/orchestrator/README.md | 9 - tool/proto/BUILD.bazel | 8 +- 29 files changed, 456 insertions(+), 457 deletions(-) delete mode 100644 api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go rename api/runway/{orchestrator => }/proto/BUILD.bazel (54%) rename api/runway/{orchestrator/proto/orchestrator.proto => proto/runway.proto} (79%) rename api/runway/{orchestrator => }/protopb/BUILD.bazel (80%) rename api/runway/{orchestrator/protopb/orchestrator.pb.go => protopb/runway.pb.go} (68%) create mode 100644 api/runway/protopb/runway.pb.yarpc.go rename api/runway/{orchestrator/protopb/orchestrator_grpc.pb.go => protopb/runway_grpc.pb.go} (54%) rename example/runway/{orchestrator => }/client/BUILD.bazel (58%) rename example/runway/{orchestrator => }/client/main.go (88%) rename example/runway/{orchestrator => }/server/BUILD.bazel (63%) rename example/runway/{orchestrator => }/server/Dockerfile (53%) rename example/runway/{orchestrator => }/server/docker-compose.yml (80%) rename example/runway/{orchestrator => }/server/main.go (85%) rename runway/{orchestrator => }/controller/BUILD.bazel (77%) rename runway/{orchestrator => }/controller/merge/BUILD.bazel (90%) rename runway/{orchestrator => }/controller/merge/merge.go (97%) rename runway/{orchestrator => }/controller/merge/merge_test.go (100%) rename runway/{orchestrator => }/controller/mergeconflictcheck/BUILD.bazel (89%) rename runway/{orchestrator => }/controller/mergeconflictcheck/mergeconflictcheck.go (99%) rename runway/{orchestrator => }/controller/mergeconflictcheck/mergeconflictcheck_test.go (100%) rename runway/{orchestrator => }/controller/ping.go (86%) rename runway/{orchestrator => }/controller/ping_test.go (93%) delete mode 100644 runway/orchestrator/README.md diff --git a/BUILD.bazel b/BUILD.bazel index 2774adc9..918d25e3 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -10,7 +10,7 @@ load("@gazelle//:def.bzl", "gazelle") # gazelle:resolve go github.com/uber/submitqueue/api/base/change/protopb //api/base/change/protopb # gazelle:resolve go github.com/uber/submitqueue/api/base/mergestrategy/protopb //api/base/mergestrategy/protopb # gazelle:resolve go github.com/uber/submitqueue/api/runway/messagequeue/protopb //api/runway/messagequeue/protopb -# gazelle:resolve go github.com/uber/submitqueue/api/runway/orchestrator/protopb //api/runway/orchestrator/protopb +# gazelle:resolve go github.com/uber/submitqueue/api/runway/protopb //api/runway/protopb # gazelle:resolve go github.com/uber/submitqueue/api/submitqueue/gateway/protopb //api/submitqueue/gateway/protopb # gazelle:resolve go github.com/uber/submitqueue/api/submitqueue/orchestrator/protopb //api/submitqueue/orchestrator/protopb # gazelle:resolve go github.com/uber/submitqueue/api/stovepipe/gateway/protopb //api/stovepipe/gateway/protopb diff --git a/CLAUDE.md b/CLAUDE.md index 11a3b070..6a6ecbd2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -39,6 +39,7 @@ submitqueue/ # repo root (Go module github.com/uber/submi ├── api/ # Published wire contracts (cross-domain/external) │ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto) │ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/ +│ ├── runway/{proto,protopb}/ # RPC (proto) — single-service domain, no service segment │ └── runway/messagequeue/ # external queue contracts (proto + protojson) ├── platform/ # SHARED cross-domain packages — no domain deps │ ├── errs/, metrics/, consumer/, http/ @@ -56,10 +57,13 @@ submitqueue/ # repo root (Go module github.com/uber/submi │ ├── entity/ # Stovepipe-specific domain entities │ ├── extension/ # Stovepipe-specific extension impls │ └── core/ # Stovepipe-internal shared infra (placeholder; mirrors submitqueue/core) +├── runway/ # Runway domain (single service — the domain *is* the service) +│ └── controller/ # Runway service controllers (consumes the merge queues; no gateway/orchestrator split) ├── tool/ # Development and CI tooling ├── example/ │ ├── submitqueue/ # Runnable SubmitQueue servers/clients + Docker Compose -│ └── stovepipe/ # Runnable Stovepipe servers/clients +│ ├── stovepipe/ # Runnable Stovepipe servers/clients +│ └── runway/ # Runnable Runway server/client + Docker Compose ├── test/ │ ├── e2e/submitqueue/ # End-to-end tests (full stack) │ ├── integration/ # Integration tests (platform/, submitqueue/, stovepipe/, …) @@ -67,9 +71,9 @@ submitqueue/ # repo root (Go module github.com/uber/submi └── doc/ # Documentation ``` -The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services. +The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). A multi-service **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services. A **single-service domain** collapses that split — the domain *is* the service, so its controllers live directly under the domain root (e.g. `runway/controller/`) with no `gateway/`/`orchestrator/` segment. `runway` is the reference example: a consumer-only landing service with no gateway. -The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); a service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`. +The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); for a single-service domain the service segment is dropped, so the contract lives directly at `api/{domain}/` (e.g. `api/runway/{proto,protopb}/`). A service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`. ### Platform notes @@ -90,7 +94,7 @@ Each service follows the same layout: └── {step}_test.go ``` -Wire contracts for a service live separately under `api/{domain}/{service}/` (see Project Layout): `proto/` holds `.proto` sources and `protopb/` holds the committed generated stubs. +Wire contracts for a service live separately under `api/{domain}/{service}/` (see Project Layout): `proto/` holds `.proto` sources and `protopb/` holds the committed generated stubs. For a single-service domain the service root *is* the domain root (e.g. `runway/controller/`), and its wire contract lives at `api/{domain}/` (e.g. `api/runway/`) with no service segment. ### Controllers @@ -156,9 +160,9 @@ When in doubt, ask: *"If the next implementation were DynamoDB / Kafka / Bigtabl Paths follow the directory layout: shared packages live under `platform/` at the repo root; domain code nests under `submitqueue/`, `stovepipe/`, and other domain folders. -- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`) -- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}` -- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb` +- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`; single-service domains drop the `{service}` segment, e.g. `.../runway/controller`) +- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}` (single-service: `.../runway/controller/{step}`, e.g. `.../runway/controller/merge`) +- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb` (single-service: `.../api/{domain}/protopb`, e.g. `.../api/runway/protopb`) - Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue` - Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`) - Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`) diff --git a/Makefile b/Makefile index 00a6655c..0453ee9e 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ STOVEPIPE_STACK_COMPOSE_FILE = example/stovepipe/docker-compose.yml STOVEPIPE_LOCAL_PROJECT = stovepipe # Runway compose files -RUNWAY_ORCHESTRATOR_COMPOSE_FILE = example/runway/orchestrator/server/docker-compose.yml +RUNWAY_COMPOSE_FILE = example/runway/server/docker-compose.yml # Fixed project name for local manual testing (tests use unique random names) RUNWAY_LOCAL_PROJECT = runway @@ -37,7 +37,7 @@ GOIMPORTS_VERSION ?= v0.33.0 # (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A # package may hold multiple .proto files (e.g. an RPC contract plus messagequeue # contracts); all generated stubs land in the same protopb/ dir. -PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway/orchestrator api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator +PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/runway api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) @@ -53,7 +53,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-runway-orchestrator-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-orchestrator-start local-runway-orchestrator-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway-orchestrator run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-runway-linux build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-submitqueue-consumer integration-test-extensions integration-test-submitqueue-gateway integration-test-submitqueue-orchestrator license-fix lint lint-fmt lint-license local-init-runway-queue-schema local-runway-start local-runway-stop local-submitqueue-clean local-submitqueue-gateway-start local-submitqueue-gateway-stop local-init-submitqueue-schemas local-init-stovepipe-queue-schema local-submitqueue-logs local-submitqueue-orchestrator-start local-submitqueue-orchestrator-stop local-submitqueue-ps local-submitqueue-restart local-submitqueue-start local-stop local-stovepipe-gateway-start local-stovepipe-orchestrator-start local-stovepipe-start mocks proto query-deps query-targets run-client-runway run-client-submitqueue-gateway run-client-submitqueue-orchestrator run-client-stovepipe-gateway run-client-stovepipe-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -62,16 +62,16 @@ build: ## Build all services and examples @echo "Build complete!" # Build Linux binaries required for Docker containers -build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux build-runway-orchestrator-linux ## Build all Linux binaries for Docker +build-all-linux: build-submitqueue-gateway-linux build-submitqueue-orchestrator-linux build-stovepipe-gateway-linux build-stovepipe-orchestrator-linux build-runway-linux ## Build all Linux binaries for Docker @echo "All Linux binaries ready for Docker" -build-runway-orchestrator-linux: ## Build Runway orchestrator Linux binary for Docker - @echo "Building Runway orchestrator Linux binary for Docker..." - @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator +build-runway-linux: ## Build Runway Linux binary for Docker + @echo "Building Runway Linux binary for Docker..." + @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/server:runway @mkdir -p .docker-bin - @cp -f bazel-bin/example/runway/orchestrator/server/orchestrator_/orchestrator .docker-bin/runway-orchestrator 2>/dev/null || \ - cp -f bazel-bin/example/runway/orchestrator/server/orchestrator .docker-bin/runway-orchestrator - @echo "Runway orchestrator Linux binary ready at .docker-bin/runway-orchestrator" + @cp -f bazel-bin/example/runway/server/runway_/runway .docker-bin/runway 2>/dev/null || \ + cp -f bazel-bin/example/runway/server/runway .docker-bin/runway + @echo "Runway Linux binary ready at .docker-bin/runway" build-submitqueue-gateway-linux: ## Build Gateway Linux binary for Docker @echo "Building Gateway Linux binary for Docker..." @@ -239,23 +239,23 @@ local-init-runway-queue-schema: ## Apply queue schema only (mysql-queue) for Run done @echo "✅ Runway queue schema applied successfully" -local-runway-orchestrator-start: build-runway-orchestrator-linux ## Start Runway orchestrator locally (orchestrator + MySQL queue) - @echo "Starting Runway orchestrator with compose..." - @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait +local-runway-start: build-runway-linux ## Start Runway locally (runway + MySQL queue) + @echo "Starting Runway with compose..." + @$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) up -d --build --wait @echo "Applying queue schema to mysql-queue (no Runway app schema)..." @$(MAKE) -s local-init-runway-queue-schema @echo "" - @echo "✅ Runway orchestrator is running!" + @echo "✅ Runway is running!" @echo "" - @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) ps + @$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) ps @echo "" - @echo "Runway orchestrator gRPC port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-orchestrator-service-1 8080 2>/dev/null | cut -d: -f2 || echo 'unknown')" - @echo "MySQL Queue port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 3306 2>/dev/null | cut -d: -f2 || echo 'unknown')" + @echo "Runway gRPC port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-runway-service-1 8080 2>/dev/null | cut -d: -f2 || echo 'unknown')" + @echo "MySQL Queue port: $$(docker port $(RUNWAY_LOCAL_PROJECT)-mysql-queue-1 3306 2>/dev/null | cut -d: -f2 || echo 'unknown')" -local-runway-orchestrator-stop: ## Stop Runway orchestrator service - @echo "Stopping Runway orchestrator services..." - @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down - @echo "Runway orchestrator services stopped." +local-runway-stop: ## Stop Runway service + @echo "Stopping Runway services..." + @$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down + @echo "Runway services stopped." local-submitqueue-logs: ## View logs from all running services @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) logs -f @@ -322,7 +322,7 @@ local-stop: ## Stop all services (keep data) @echo "Stopping all services..." @$(COMPOSE) -f $(COMPOSE_FILE) -p $(SUBMITQUEUE_LOCAL_PROJECT) down @$(COMPOSE) -f $(STOVEPIPE_STACK_COMPOSE_FILE) -p $(STOVEPIPE_LOCAL_PROJECT) down - @$(COMPOSE) -f $(RUNWAY_ORCHESTRATOR_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down + @$(COMPOSE) -f $(RUNWAY_COMPOSE_FILE) -p $(RUNWAY_LOCAL_PROJECT) down @echo "Services stopped. Data volumes preserved." local-stovepipe-logs: ## View logs from all running Stovepipe services @@ -419,9 +419,9 @@ run-client-stovepipe-gateway: run-client-stovepipe-orchestrator: @$(BAZEL) run //example/stovepipe/orchestrator/client:orchestrator -- -addr $(or $(SERVER_ADDR),localhost:8084) -message "$(or $(MESSAGE),ping)" -# Run runway orchestrator client (connects to any running runway orchestrator service) -run-client-runway-orchestrator: - @$(BAZEL) run //example/runway/orchestrator/client:orchestrator -- -addr $(or $(SERVER_ADDR),localhost:8086) -message "$(or $(MESSAGE),ping)" +# Run runway client (connects to any running runway service) +run-client-runway: + @$(BAZEL) run //example/runway/client:runway -- -addr $(or $(SERVER_ADDR),localhost:8086) -message "$(or $(MESSAGE),ping)" run-queue-admin: ## Run queue-admin CLI (use ARGS to pass arguments, e.g. make run-queue-admin ARGS="list-topics") @$(BAZEL) run //platform/extension/messagequeue/mysql/ctl -- $(ARGS) diff --git a/api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go b/api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go deleted file mode 100644 index f2174e13..00000000 --- a/api/runway/orchestrator/protopb/orchestrator.pb.yarpc.go +++ /dev/null @@ -1,255 +0,0 @@ -// Code generated by protoc-gen-yarpc-go. DO NOT EDIT. -// source: orchestrator.proto - -package protopb - -import ( - "context" - "io/ioutil" - "reflect" - - "go.uber.org/fx" - "go.uber.org/yarpc" - "go.uber.org/yarpc/api/transport" - "go.uber.org/yarpc/api/x/restriction" - "go.uber.org/yarpc/encoding/protobuf/reflection" - v2 "go.uber.org/yarpc/encoding/protobuf/v2" - "google.golang.org/protobuf/proto" -) - -var _ = ioutil.NopCloser - -// RunwayOrchestratorYARPCClient is the YARPC client-side interface for the RunwayOrchestrator service. -type RunwayOrchestratorYARPCClient interface { - Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) -} - -func newRunwayOrchestratorYARPCClient(clientConfig transport.ClientConfig, anyResolver v2.AnyResolver, options ...v2.ClientOption) RunwayOrchestratorYARPCClient { - return &_RunwayOrchestratorYARPCCaller{v2.NewStreamClient( - v2.ClientParams{ - ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", - ClientConfig: clientConfig, - AnyResolver: anyResolver, - Options: options, - }, - )} -} - -// NewRunwayOrchestratorYARPCClient builds a new YARPC client for the RunwayOrchestrator service. -func NewRunwayOrchestratorYARPCClient(clientConfig transport.ClientConfig, options ...v2.ClientOption) RunwayOrchestratorYARPCClient { - return newRunwayOrchestratorYARPCClient(clientConfig, nil, options...) -} - -// RunwayOrchestratorYARPCServer is the YARPC server-side interface for the RunwayOrchestrator service. -type RunwayOrchestratorYARPCServer interface { - Ping(context.Context, *PingRequest) (*PingResponse, error) -} - -type buildRunwayOrchestratorYARPCProceduresParams struct { - Server RunwayOrchestratorYARPCServer - AnyResolver v2.AnyResolver -} - -func buildRunwayOrchestratorYARPCProcedures(params buildRunwayOrchestratorYARPCProceduresParams) []transport.Procedure { - handler := &_RunwayOrchestratorYARPCHandler{params.Server} - return v2.BuildProcedures( - v2.BuildProceduresParams{ - ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", - UnaryHandlerParams: []v2.BuildProceduresUnaryHandlerParams{ - { - MethodName: "Ping", - Handler: v2.NewUnaryHandler( - v2.UnaryHandlerParams{ - Handle: handler.Ping, - NewRequest: newRunwayOrchestratorServicePingYARPCRequest, - AnyResolver: params.AnyResolver, - }, - ), - }, - }, - OnewayHandlerParams: []v2.BuildProceduresOnewayHandlerParams{}, - StreamHandlerParams: []v2.BuildProceduresStreamHandlerParams{}, - }, - ) -} - -// BuildRunwayOrchestratorYARPCProcedures prepares an implementation of the RunwayOrchestrator service for YARPC registration. -func BuildRunwayOrchestratorYARPCProcedures(server RunwayOrchestratorYARPCServer) []transport.Procedure { - return buildRunwayOrchestratorYARPCProcedures(buildRunwayOrchestratorYARPCProceduresParams{Server: server}) -} - -// FxRunwayOrchestratorYARPCClientParams defines the input -// for NewFxRunwayOrchestratorYARPCClient. It provides the -// paramaters to get a RunwayOrchestratorYARPCClient in an -// Fx application. -type FxRunwayOrchestratorYARPCClientParams struct { - fx.In - - Provider yarpc.ClientConfig - AnyResolver v2.AnyResolver `name:"yarpcfx" optional:"true"` - Restriction restriction.Checker `optional:"true"` -} - -// FxRunwayOrchestratorYARPCClientResult defines the output -// of NewFxRunwayOrchestratorYARPCClient. It provides a -// RunwayOrchestratorYARPCClient to an Fx application. -type FxRunwayOrchestratorYARPCClientResult struct { - fx.Out - - Client RunwayOrchestratorYARPCClient - - // We are using an fx.Out struct here instead of just returning a client - // so that we can add more values or add named versions of the client in - // the future without breaking any existing code. -} - -// NewFxRunwayOrchestratorYARPCClient provides a RunwayOrchestratorYARPCClient -// to an Fx application using the given name for routing. -// -// fx.Provide( -// protopb.NewFxRunwayOrchestratorYARPCClient("service-name"), -// ... -// ) -func NewFxRunwayOrchestratorYARPCClient(name string, options ...v2.ClientOption) interface{} { - return func(params FxRunwayOrchestratorYARPCClientParams) FxRunwayOrchestratorYARPCClientResult { - cc := params.Provider.ClientConfig(name) - - if params.Restriction != nil { - if namer, ok := cc.GetUnaryOutbound().(transport.Namer); ok { - if err := params.Restriction.Check(v2.Encoding, namer.TransportName()); err != nil { - panic(err.Error()) - } - } - } - - return FxRunwayOrchestratorYARPCClientResult{ - Client: newRunwayOrchestratorYARPCClient(cc, params.AnyResolver, options...), - } - } -} - -// FxRunwayOrchestratorYARPCProceduresParams defines the input -// for NewFxRunwayOrchestratorYARPCProcedures. It provides the -// paramaters to get RunwayOrchestratorYARPCServer procedures in an -// Fx application. -type FxRunwayOrchestratorYARPCProceduresParams struct { - fx.In - - Server RunwayOrchestratorYARPCServer - AnyResolver v2.AnyResolver `name:"yarpcfx" optional:"true"` -} - -// FxRunwayOrchestratorYARPCProceduresResult defines the output -// of NewFxRunwayOrchestratorYARPCProcedures. It provides -// RunwayOrchestratorYARPCServer procedures to an Fx application. -// -// The procedures are provided to the "yarpcfx" value group. -// Dig 1.2 or newer must be used for this feature to work. -type FxRunwayOrchestratorYARPCProceduresResult struct { - fx.Out - - Procedures []transport.Procedure `group:"yarpcfx"` - ReflectionMeta reflection.ServerMeta `group:"yarpcfx"` -} - -// NewFxRunwayOrchestratorYARPCProcedures provides RunwayOrchestratorYARPCServer procedures to an Fx application. -// It expects a RunwayOrchestratorYARPCServer to be present in the container. -// -// fx.Provide( -// protopb.NewFxRunwayOrchestratorYARPCProcedures(), -// ... -// ) -func NewFxRunwayOrchestratorYARPCProcedures() interface{} { - return func(params FxRunwayOrchestratorYARPCProceduresParams) FxRunwayOrchestratorYARPCProceduresResult { - return FxRunwayOrchestratorYARPCProceduresResult{ - Procedures: buildRunwayOrchestratorYARPCProcedures(buildRunwayOrchestratorYARPCProceduresParams{ - Server: params.Server, - AnyResolver: params.AnyResolver, - }), - ReflectionMeta: reflection.ServerMeta{ - ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", - FileDescriptors: yarpcFileDescriptorClosure96b6e6782baaa298, - }, - } - } -} - -type _RunwayOrchestratorYARPCCaller struct { - streamClient v2.StreamClient -} - -func (c *_RunwayOrchestratorYARPCCaller) Ping(ctx context.Context, request *PingRequest, options ...yarpc.CallOption) (*PingResponse, error) { - responseMessage, err := c.streamClient.Call(ctx, "Ping", request, newRunwayOrchestratorServicePingYARPCResponse, options...) - if responseMessage == nil { - return nil, err - } - response, ok := responseMessage.(*PingResponse) - if !ok { - return nil, v2.CastError(emptyRunwayOrchestratorServicePingYARPCResponse, responseMessage) - } - return response, err -} - -type _RunwayOrchestratorYARPCHandler struct { - server RunwayOrchestratorYARPCServer -} - -func (h *_RunwayOrchestratorYARPCHandler) Ping(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { - var request *PingRequest - var ok bool - if requestMessage != nil { - request, ok = requestMessage.(*PingRequest) - if !ok { - return nil, v2.CastError(emptyRunwayOrchestratorServicePingYARPCRequest, requestMessage) - } - } - response, err := h.server.Ping(ctx, request) - if response == nil { - return nil, err - } - return response, err -} - -func newRunwayOrchestratorServicePingYARPCRequest() proto.Message { - return &PingRequest{} -} - -func newRunwayOrchestratorServicePingYARPCResponse() proto.Message { - return &PingResponse{} -} - -var ( - emptyRunwayOrchestratorServicePingYARPCRequest = &PingRequest{} - emptyRunwayOrchestratorServicePingYARPCResponse = &PingResponse{} -) - -var yarpcFileDescriptorClosure96b6e6782baaa298 = [][]byte{ - // orchestrator.proto - []byte{ - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0x3f, 0x4f, 0xc3, 0x30, - 0x10, 0xc5, 0x09, 0xad, 0x80, 0x5e, 0xbb, 0xe0, 0x29, 0xaa, 0x18, 0x4a, 0x24, 0x20, 0x93, 0x2d, - 0xc1, 0xc8, 0xd6, 0x0f, 0x00, 0x51, 0x16, 0x24, 0x16, 0xe4, 0x44, 0xa7, 0xc4, 0x83, 0x63, 0xd7, - 0x67, 0x83, 0x90, 0x58, 0xf9, 0xde, 0x28, 0x0e, 0x7f, 0x3c, 0x50, 0x75, 0xf3, 0xdd, 0xbd, 0xf7, - 0x74, 0x3f, 0x1f, 0x30, 0xe3, 0xda, 0x1e, 0xc9, 0x3b, 0xe9, 0x8d, 0xe3, 0xd6, 0x19, 0x6f, 0x58, - 0x1e, 0x1a, 0x74, 0xdc, 0x85, 0xe1, 0x4d, 0xbe, 0xf3, 0x74, 0x5e, 0xdc, 0xc0, 0xb2, 0x52, 0x43, - 0x57, 0xe3, 0x2e, 0x20, 0x79, 0x96, 0xc3, 0xa9, 0x46, 0x22, 0xd9, 0x61, 0x9e, 0x6d, 0xb2, 0x72, - 0x51, 0xff, 0x94, 0xc5, 0x67, 0x06, 0xab, 0x49, 0x49, 0xd6, 0x0c, 0x84, 0xfb, 0xa5, 0xec, 0x12, - 0x56, 0x84, 0xee, 0x55, 0xb5, 0xf8, 0x32, 0x48, 0x8d, 0xf9, 0x71, 0x1c, 0x2f, 0xbf, 0x7b, 0x0f, - 0x52, 0x23, 0xbb, 0x80, 0x85, 0x57, 0x1a, 0xc9, 0x4b, 0x6d, 0xf3, 0xd9, 0x26, 0x2b, 0x67, 0xf5, - 0x5f, 0x83, 0xad, 0xe1, 0xac, 0x37, 0xe4, 0xa3, 0x79, 0x1e, 0xcd, 0xbf, 0xf5, 0xad, 0x06, 0x56, - 0x47, 0x8e, 0xc7, 0x04, 0x83, 0x3d, 0xc1, 0x7c, 0x5c, 0x8e, 0x5d, 0xf1, 0x7d, 0xa4, 0x3c, 0xc1, - 0x5c, 0x5f, 0x1f, 0x92, 0x4d, 0x8c, 0xc5, 0xd1, 0xf6, 0x03, 0xca, 0xd6, 0xe8, 0x49, 0x4e, 0xa1, - 0xd1, 0xca, 0xef, 0x02, 0x06, 0xfc, 0xcf, 0xba, 0x3d, 0x4f, 0x57, 0xaa, 0xc6, 0x8f, 0xaf, 0xb2, - 0xe7, 0xfb, 0x4e, 0xf9, 0x3e, 0x34, 0xbc, 0x35, 0x5a, 0x8c, 0x29, 0x22, 0x49, 0x11, 0xd2, 0x2a, - 0x31, 0x25, 0x89, 0x34, 0x49, 0xc4, 0xab, 0xd9, 0xa6, 0x39, 0x89, 0x8f, 0xbb, 0xaf, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x54, 0xc6, 0xb6, 0x89, 0xd4, 0x01, 0x00, 0x00, - }, -} - -func init() { - yarpc.RegisterClientBuilder( - func(clientConfig transport.ClientConfig, structField reflect.StructField) RunwayOrchestratorYARPCClient { - return NewRunwayOrchestratorYARPCClient(clientConfig, v2.ClientBuilderOptions(clientConfig, structField)...) - }, - ) -} diff --git a/api/runway/orchestrator/proto/BUILD.bazel b/api/runway/proto/BUILD.bazel similarity index 54% rename from api/runway/orchestrator/proto/BUILD.bazel rename to api/runway/proto/BUILD.bazel index a22fa9f4..4fd905cf 100644 --- a/api/runway/orchestrator/proto/BUILD.bazel +++ b/api/runway/proto/BUILD.bazel @@ -3,38 +3,38 @@ load("@rules_go//proto:def.bzl", "go_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") exports_files( - ["orchestrator.proto"], + ["runway.proto"], visibility = ["//tool/proto:__pkg__"], ) proto_library( - name = "orchestratorpb_proto", - srcs = ["orchestrator.proto"], + name = "runwaypb_proto", + srcs = ["runway.proto"], visibility = ["//visibility:public"], ) # keep go_proto_library( - name = "orchestratorpb_go_proto", + name = "runwaypb_go_proto", compilers = [ "@rules_go//proto:go_proto", "@rules_go//proto:go_grpc_v2", ], - importpath = "github.com/uber/submitqueue/api/runway/orchestrator/proto", - proto = ":orchestratorpb_proto", + importpath = "github.com/uber/submitqueue/api/runway/proto", + proto = ":runwaypb_proto", visibility = ["//visibility:public"], ) go_library( name = "proto", - embed = [":orchestratorpb_go_proto"], - importpath = "github.com/uber/submitqueue/api/runway/orchestrator/proto", + embed = [":runwaypb_go_proto"], + importpath = "github.com/uber/submitqueue/api/runway/proto", visibility = ["//visibility:public"], ) go_library( name = "protopb", - embed = [":orchestratorpb_go_proto"], - importpath = "github.com/uber/submitqueue/api/runway/orchestrator/protopb", + embed = [":runwaypb_go_proto"], + importpath = "github.com/uber/submitqueue/api/runway/protopb", visibility = ["//visibility:public"], ) diff --git a/api/runway/orchestrator/proto/orchestrator.proto b/api/runway/proto/runway.proto similarity index 79% rename from api/runway/orchestrator/proto/orchestrator.proto rename to api/runway/proto/runway.proto index 28301b75..99ac7222 100644 --- a/api/runway/orchestrator/proto/orchestrator.proto +++ b/api/runway/proto/runway.proto @@ -14,12 +14,12 @@ syntax = "proto3"; -package uber.runway.orchestrator; +package uber.runway; -option go_package = "github.com/uber/submitqueue/api/runway/orchestrator/protopb"; +option go_package = "github.com/uber/submitqueue/api/runway/protopb"; option java_multiple_files = true; -option java_outer_classname = "OrchestratorProto"; -option java_package = "com.uber.submitqueue.runway.orchestrator"; +option java_outer_classname = "RunwayProto"; +option java_package = "com.uber.submitqueue.runway"; // PingRequest is the request for the Ping method message PingRequest { @@ -39,8 +39,8 @@ message PingResponse { string hostname = 4; } -// RunwayOrchestrator provides the Runway orchestrator API. -service RunwayOrchestrator { +// Runway provides the Runway service API. +service Runway { // Ping returns a response indicating the service is alive rpc Ping(PingRequest) returns (PingResponse) {} } diff --git a/api/runway/orchestrator/protopb/BUILD.bazel b/api/runway/protopb/BUILD.bazel similarity index 80% rename from api/runway/orchestrator/protopb/BUILD.bazel rename to api/runway/protopb/BUILD.bazel index b6e7b6d2..4efca46a 100644 --- a/api/runway/orchestrator/protopb/BUILD.bazel +++ b/api/runway/protopb/BUILD.bazel @@ -3,11 +3,11 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "protopb", srcs = [ - "orchestrator.pb.go", - "orchestrator.pb.yarpc.go", - "orchestrator_grpc.pb.go", + "runway.pb.go", + "runway.pb.yarpc.go", + "runway_grpc.pb.go", ], - importpath = "github.com/uber/submitqueue/api/runway/orchestrator/protopb", + importpath = "github.com/uber/submitqueue/api/runway/protopb", visibility = ["//visibility:public"], deps = [ "@org_golang_google_grpc//:grpc", diff --git a/api/runway/orchestrator/protopb/orchestrator.pb.go b/api/runway/protopb/runway.pb.go similarity index 68% rename from api/runway/orchestrator/protopb/orchestrator.pb.go rename to api/runway/protopb/runway.pb.go index 52492096..2ecaba19 100644 --- a/api/runway/orchestrator/protopb/orchestrator.pb.go +++ b/api/runway/protopb/runway.pb.go @@ -16,7 +16,7 @@ // versions: // protoc-gen-go v1.36.10 // protoc v5.29.3 -// source: orchestrator.proto +// source: runway.proto package protopb @@ -47,7 +47,7 @@ type PingRequest struct { func (x *PingRequest) Reset() { *x = PingRequest{} - mi := &file_orchestrator_proto_msgTypes[0] + mi := &file_runway_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -59,7 +59,7 @@ func (x *PingRequest) String() string { func (*PingRequest) ProtoMessage() {} func (x *PingRequest) ProtoReflect() protoreflect.Message { - mi := &file_orchestrator_proto_msgTypes[0] + mi := &file_runway_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -72,7 +72,7 @@ func (x *PingRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. func (*PingRequest) Descriptor() ([]byte, []int) { - return file_orchestrator_proto_rawDescGZIP(), []int{0} + return file_runway_proto_rawDescGZIP(), []int{0} } func (x *PingRequest) GetMessage() string { @@ -99,7 +99,7 @@ type PingResponse struct { func (x *PingResponse) Reset() { *x = PingResponse{} - mi := &file_orchestrator_proto_msgTypes[1] + mi := &file_runway_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -111,7 +111,7 @@ func (x *PingResponse) String() string { func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_orchestrator_proto_msgTypes[1] + mi := &file_runway_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -124,7 +124,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { - return file_orchestrator_proto_rawDescGZIP(), []int{1} + return file_runway_proto_rawDescGZIP(), []int{1} } func (x *PingResponse) GetMessage() string { @@ -155,42 +155,42 @@ func (x *PingResponse) GetHostname() string { return "" } -var File_orchestrator_proto protoreflect.FileDescriptor +var File_runway_proto protoreflect.FileDescriptor -const file_orchestrator_proto_rawDesc = "" + +const file_runway_proto_rawDesc = "" + "\n" + - "\x12orchestrator.proto\x12\x18uber.runway.orchestrator\"'\n" + + "\frunway.proto\x12\vuber.runway\"'\n" + "\vPingRequest\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"\x85\x01\n" + "\fPingResponse\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\x12!\n" + "\fservice_name\x18\x02 \x01(\tR\vserviceName\x12\x1c\n" + "\ttimestamp\x18\x03 \x01(\x03R\ttimestamp\x12\x1a\n" + - "\bhostname\x18\x04 \x01(\tR\bhostname2m\n" + - "\x12RunwayOrchestrator\x12W\n" + - "\x04Ping\x12%.uber.runway.orchestrator.PingRequest\x1a&.uber.runway.orchestrator.PingResponse\"\x00B|\n" + - "(com.uber.submitqueue.runway.orchestratorB\x11OrchestratorProtoP\x01Z;github.com/uber/submitqueue/api/runway/orchestrator/protopbb\x06proto3" + "\bhostname\x18\x04 \x01(\tR\bhostname2G\n" + + "\x06Runway\x12=\n" + + "\x04Ping\x12\x18.uber.runway.PingRequest\x1a\x19.uber.runway.PingResponse\"\x00B\\\n" + + "\x1bcom.uber.submitqueue.runwayB\vRunwayProtoP\x01Z.github.com/uber/submitqueue/api/runway/protopbb\x06proto3" var ( - file_orchestrator_proto_rawDescOnce sync.Once - file_orchestrator_proto_rawDescData []byte + file_runway_proto_rawDescOnce sync.Once + file_runway_proto_rawDescData []byte ) -func file_orchestrator_proto_rawDescGZIP() []byte { - file_orchestrator_proto_rawDescOnce.Do(func() { - file_orchestrator_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_orchestrator_proto_rawDesc), len(file_orchestrator_proto_rawDesc))) +func file_runway_proto_rawDescGZIP() []byte { + file_runway_proto_rawDescOnce.Do(func() { + file_runway_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_runway_proto_rawDesc), len(file_runway_proto_rawDesc))) }) - return file_orchestrator_proto_rawDescData + return file_runway_proto_rawDescData } -var file_orchestrator_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_orchestrator_proto_goTypes = []any{ - (*PingRequest)(nil), // 0: uber.runway.orchestrator.PingRequest - (*PingResponse)(nil), // 1: uber.runway.orchestrator.PingResponse +var file_runway_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_runway_proto_goTypes = []any{ + (*PingRequest)(nil), // 0: uber.runway.PingRequest + (*PingResponse)(nil), // 1: uber.runway.PingResponse } -var file_orchestrator_proto_depIdxs = []int32{ - 0, // 0: uber.runway.orchestrator.RunwayOrchestrator.Ping:input_type -> uber.runway.orchestrator.PingRequest - 1, // 1: uber.runway.orchestrator.RunwayOrchestrator.Ping:output_type -> uber.runway.orchestrator.PingResponse +var file_runway_proto_depIdxs = []int32{ + 0, // 0: uber.runway.Runway.Ping:input_type -> uber.runway.PingRequest + 1, // 1: uber.runway.Runway.Ping:output_type -> uber.runway.PingResponse 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -198,26 +198,26 @@ var file_orchestrator_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_orchestrator_proto_init() } -func file_orchestrator_proto_init() { - if File_orchestrator_proto != nil { +func init() { file_runway_proto_init() } +func file_runway_proto_init() { + if File_runway_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_orchestrator_proto_rawDesc), len(file_orchestrator_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_runway_proto_rawDesc), len(file_runway_proto_rawDesc)), NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_orchestrator_proto_goTypes, - DependencyIndexes: file_orchestrator_proto_depIdxs, - MessageInfos: file_orchestrator_proto_msgTypes, + GoTypes: file_runway_proto_goTypes, + DependencyIndexes: file_runway_proto_depIdxs, + MessageInfos: file_runway_proto_msgTypes, }.Build() - File_orchestrator_proto = out.File - file_orchestrator_proto_goTypes = nil - file_orchestrator_proto_depIdxs = nil + File_runway_proto = out.File + file_runway_proto_goTypes = nil + file_runway_proto_depIdxs = nil } diff --git a/api/runway/protopb/runway.pb.yarpc.go b/api/runway/protopb/runway.pb.yarpc.go new file mode 100644 index 00000000..f5d6d74d --- /dev/null +++ b/api/runway/protopb/runway.pb.yarpc.go @@ -0,0 +1,254 @@ +// Code generated by protoc-gen-yarpc-go. DO NOT EDIT. +// source: runway.proto + +package protopb + +import ( + "context" + "io/ioutil" + "reflect" + + "go.uber.org/fx" + "go.uber.org/yarpc" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/api/x/restriction" + "go.uber.org/yarpc/encoding/protobuf/reflection" + v2 "go.uber.org/yarpc/encoding/protobuf/v2" + "google.golang.org/protobuf/proto" +) + +var _ = ioutil.NopCloser + +// RunwayYARPCClient is the YARPC client-side interface for the Runway service. +type RunwayYARPCClient interface { + Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) +} + +func newRunwayYARPCClient(clientConfig transport.ClientConfig, anyResolver v2.AnyResolver, options ...v2.ClientOption) RunwayYARPCClient { + return &_RunwayYARPCCaller{v2.NewStreamClient( + v2.ClientParams{ + ServiceName: "uber.runway.Runway", + ClientConfig: clientConfig, + AnyResolver: anyResolver, + Options: options, + }, + )} +} + +// NewRunwayYARPCClient builds a new YARPC client for the Runway service. +func NewRunwayYARPCClient(clientConfig transport.ClientConfig, options ...v2.ClientOption) RunwayYARPCClient { + return newRunwayYARPCClient(clientConfig, nil, options...) +} + +// RunwayYARPCServer is the YARPC server-side interface for the Runway service. +type RunwayYARPCServer interface { + Ping(context.Context, *PingRequest) (*PingResponse, error) +} + +type buildRunwayYARPCProceduresParams struct { + Server RunwayYARPCServer + AnyResolver v2.AnyResolver +} + +func buildRunwayYARPCProcedures(params buildRunwayYARPCProceduresParams) []transport.Procedure { + handler := &_RunwayYARPCHandler{params.Server} + return v2.BuildProcedures( + v2.BuildProceduresParams{ + ServiceName: "uber.runway.Runway", + UnaryHandlerParams: []v2.BuildProceduresUnaryHandlerParams{ + { + MethodName: "Ping", + Handler: v2.NewUnaryHandler( + v2.UnaryHandlerParams{ + Handle: handler.Ping, + NewRequest: newRunwayServicePingYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, + }, + OnewayHandlerParams: []v2.BuildProceduresOnewayHandlerParams{}, + StreamHandlerParams: []v2.BuildProceduresStreamHandlerParams{}, + }, + ) +} + +// BuildRunwayYARPCProcedures prepares an implementation of the Runway service for YARPC registration. +func BuildRunwayYARPCProcedures(server RunwayYARPCServer) []transport.Procedure { + return buildRunwayYARPCProcedures(buildRunwayYARPCProceduresParams{Server: server}) +} + +// FxRunwayYARPCClientParams defines the input +// for NewFxRunwayYARPCClient. It provides the +// paramaters to get a RunwayYARPCClient in an +// Fx application. +type FxRunwayYARPCClientParams struct { + fx.In + + Provider yarpc.ClientConfig + AnyResolver v2.AnyResolver `name:"yarpcfx" optional:"true"` + Restriction restriction.Checker `optional:"true"` +} + +// FxRunwayYARPCClientResult defines the output +// of NewFxRunwayYARPCClient. It provides a +// RunwayYARPCClient to an Fx application. +type FxRunwayYARPCClientResult struct { + fx.Out + + Client RunwayYARPCClient + + // We are using an fx.Out struct here instead of just returning a client + // so that we can add more values or add named versions of the client in + // the future without breaking any existing code. +} + +// NewFxRunwayYARPCClient provides a RunwayYARPCClient +// to an Fx application using the given name for routing. +// +// fx.Provide( +// protopb.NewFxRunwayYARPCClient("service-name"), +// ... +// ) +func NewFxRunwayYARPCClient(name string, options ...v2.ClientOption) interface{} { + return func(params FxRunwayYARPCClientParams) FxRunwayYARPCClientResult { + cc := params.Provider.ClientConfig(name) + + if params.Restriction != nil { + if namer, ok := cc.GetUnaryOutbound().(transport.Namer); ok { + if err := params.Restriction.Check(v2.Encoding, namer.TransportName()); err != nil { + panic(err.Error()) + } + } + } + + return FxRunwayYARPCClientResult{ + Client: newRunwayYARPCClient(cc, params.AnyResolver, options...), + } + } +} + +// FxRunwayYARPCProceduresParams defines the input +// for NewFxRunwayYARPCProcedures. It provides the +// paramaters to get RunwayYARPCServer procedures in an +// Fx application. +type FxRunwayYARPCProceduresParams struct { + fx.In + + Server RunwayYARPCServer + AnyResolver v2.AnyResolver `name:"yarpcfx" optional:"true"` +} + +// FxRunwayYARPCProceduresResult defines the output +// of NewFxRunwayYARPCProcedures. It provides +// RunwayYARPCServer procedures to an Fx application. +// +// The procedures are provided to the "yarpcfx" value group. +// Dig 1.2 or newer must be used for this feature to work. +type FxRunwayYARPCProceduresResult struct { + fx.Out + + Procedures []transport.Procedure `group:"yarpcfx"` + ReflectionMeta reflection.ServerMeta `group:"yarpcfx"` +} + +// NewFxRunwayYARPCProcedures provides RunwayYARPCServer procedures to an Fx application. +// It expects a RunwayYARPCServer to be present in the container. +// +// fx.Provide( +// protopb.NewFxRunwayYARPCProcedures(), +// ... +// ) +func NewFxRunwayYARPCProcedures() interface{} { + return func(params FxRunwayYARPCProceduresParams) FxRunwayYARPCProceduresResult { + return FxRunwayYARPCProceduresResult{ + Procedures: buildRunwayYARPCProcedures(buildRunwayYARPCProceduresParams{ + Server: params.Server, + AnyResolver: params.AnyResolver, + }), + ReflectionMeta: reflection.ServerMeta{ + ServiceName: "uber.runway.Runway", + FileDescriptors: yarpcFileDescriptorClosure156de782f65f3a00, + }, + } + } +} + +type _RunwayYARPCCaller struct { + streamClient v2.StreamClient +} + +func (c *_RunwayYARPCCaller) Ping(ctx context.Context, request *PingRequest, options ...yarpc.CallOption) (*PingResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "Ping", request, newRunwayServicePingYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*PingResponse) + if !ok { + return nil, v2.CastError(emptyRunwayServicePingYARPCResponse, responseMessage) + } + return response, err +} + +type _RunwayYARPCHandler struct { + server RunwayYARPCServer +} + +func (h *_RunwayYARPCHandler) Ping(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *PingRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*PingRequest) + if !ok { + return nil, v2.CastError(emptyRunwayServicePingYARPCRequest, requestMessage) + } + } + response, err := h.server.Ping(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + +func newRunwayServicePingYARPCRequest() proto.Message { + return &PingRequest{} +} + +func newRunwayServicePingYARPCResponse() proto.Message { + return &PingResponse{} +} + +var ( + emptyRunwayServicePingYARPCRequest = &PingRequest{} + emptyRunwayServicePingYARPCResponse = &PingResponse{} +) + +var yarpcFileDescriptorClosure156de782f65f3a00 = [][]byte{ + // runway.proto + []byte{ + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0x31, 0x4f, 0xc3, 0x30, + 0x10, 0x85, 0x31, 0xad, 0x0a, 0xbd, 0x64, 0xf2, 0x64, 0x0a, 0x43, 0xc9, 0x42, 0x27, 0x5b, 0x82, + 0x99, 0xa5, 0x0b, 0x1b, 0x8a, 0x32, 0x22, 0x24, 0x64, 0x47, 0xa7, 0xd4, 0x83, 0x63, 0x37, 0xb6, + 0x41, 0xfc, 0x00, 0xfe, 0x37, 0x8a, 0x1d, 0x20, 0x03, 0xdd, 0x7c, 0xef, 0xde, 0x27, 0xbf, 0x7b, + 0x50, 0x0e, 0xb1, 0xff, 0x90, 0x9f, 0xdc, 0x0d, 0x36, 0x58, 0x5a, 0x44, 0x85, 0x03, 0xcf, 0x52, + 0x75, 0x07, 0x45, 0xad, 0xfb, 0xae, 0xc1, 0x63, 0x44, 0x1f, 0x28, 0x83, 0x0b, 0x83, 0xde, 0xcb, + 0x0e, 0x19, 0xd9, 0x92, 0xdd, 0xba, 0xf9, 0x19, 0xab, 0x2f, 0x02, 0x65, 0x76, 0x7a, 0x67, 0x7b, + 0x8f, 0xa7, 0xad, 0xf4, 0x16, 0x4a, 0x8f, 0xc3, 0xbb, 0x6e, 0xf1, 0xad, 0x97, 0x06, 0xd9, 0x79, + 0x5a, 0x17, 0x93, 0xf6, 0x2c, 0x0d, 0xd2, 0x1b, 0x58, 0x07, 0x6d, 0xd0, 0x07, 0x69, 0x1c, 0x5b, + 0x6c, 0xc9, 0x6e, 0xd1, 0xfc, 0x09, 0x74, 0x03, 0x97, 0x07, 0xeb, 0x43, 0x82, 0x97, 0x09, 0xfe, + 0x9d, 0xef, 0x9f, 0x60, 0xd5, 0xa4, 0xe8, 0xf4, 0x11, 0x96, 0x63, 0x20, 0xca, 0xf8, 0xec, 0x20, + 0x3e, 0xbb, 0x66, 0x73, 0xf5, 0xcf, 0x26, 0xa7, 0xaf, 0xce, 0xf6, 0xaf, 0x70, 0xdd, 0x5a, 0x93, + 0x1d, 0x3e, 0x2a, 0xa3, 0xc3, 0x31, 0x62, 0xc4, 0xc9, 0xbd, 0x2f, 0xf2, 0x2f, 0xf5, 0x58, 0x59, + 0x4d, 0x5e, 0x78, 0xa7, 0xc3, 0x21, 0x2a, 0xde, 0x5a, 0x23, 0x46, 0x44, 0xcc, 0x10, 0x21, 0x9d, + 0x16, 0x19, 0x13, 0xa9, 0x62, 0xa7, 0xd4, 0x2a, 0x3d, 0x1e, 0xbe, 0x03, 0x00, 0x00, 0xff, 0xff, + 0xdf, 0x6e, 0x26, 0x49, 0x7b, 0x01, 0x00, 0x00, + }, +} + +func init() { + yarpc.RegisterClientBuilder( + func(clientConfig transport.ClientConfig, structField reflect.StructField) RunwayYARPCClient { + return NewRunwayYARPCClient(clientConfig, v2.ClientBuilderOptions(clientConfig, structField)...) + }, + ) +} diff --git a/api/runway/orchestrator/protopb/orchestrator_grpc.pb.go b/api/runway/protopb/runway_grpc.pb.go similarity index 54% rename from api/runway/orchestrator/protopb/orchestrator_grpc.pb.go rename to api/runway/protopb/runway_grpc.pb.go index bb6c5fe5..818b2944 100644 --- a/api/runway/orchestrator/protopb/orchestrator_grpc.pb.go +++ b/api/runway/protopb/runway_grpc.pb.go @@ -16,7 +16,7 @@ // versions: // - protoc-gen-go-grpc v1.5.1 // - protoc v5.29.3 -// source: orchestrator.proto +// source: runway.proto package protopb @@ -34,109 +34,109 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - RunwayOrchestrator_Ping_FullMethodName = "/uber.runway.orchestrator.RunwayOrchestrator/Ping" + Runway_Ping_FullMethodName = "/uber.runway.Runway/Ping" ) -// RunwayOrchestratorClient is the client API for RunwayOrchestrator service. +// RunwayClient is the client API for Runway service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. // -// RunwayOrchestrator provides the Runway orchestrator API. -type RunwayOrchestratorClient interface { +// Runway provides the Runway service API. +type RunwayClient interface { // Ping returns a response indicating the service is alive Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) } -type runwayOrchestratorClient struct { +type runwayClient struct { cc grpc.ClientConnInterface } -func NewRunwayOrchestratorClient(cc grpc.ClientConnInterface) RunwayOrchestratorClient { - return &runwayOrchestratorClient{cc} +func NewRunwayClient(cc grpc.ClientConnInterface) RunwayClient { + return &runwayClient{cc} } -func (c *runwayOrchestratorClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { +func (c *runwayClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(PingResponse) - err := c.cc.Invoke(ctx, RunwayOrchestrator_Ping_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, Runway_Ping_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } return out, nil } -// RunwayOrchestratorServer is the server API for RunwayOrchestrator service. -// All implementations must embed UnimplementedRunwayOrchestratorServer +// RunwayServer is the server API for Runway service. +// All implementations must embed UnimplementedRunwayServer // for forward compatibility. // -// RunwayOrchestrator provides the Runway orchestrator API. -type RunwayOrchestratorServer interface { +// Runway provides the Runway service API. +type RunwayServer interface { // Ping returns a response indicating the service is alive Ping(context.Context, *PingRequest) (*PingResponse, error) - mustEmbedUnimplementedRunwayOrchestratorServer() + mustEmbedUnimplementedRunwayServer() } -// UnimplementedRunwayOrchestratorServer must be embedded to have +// UnimplementedRunwayServer must be embedded to have // forward compatible implementations. // // NOTE: this should be embedded by value instead of pointer to avoid a nil // pointer dereference when methods are called. -type UnimplementedRunwayOrchestratorServer struct{} +type UnimplementedRunwayServer struct{} -func (UnimplementedRunwayOrchestratorServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { +func (UnimplementedRunwayServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") } -func (UnimplementedRunwayOrchestratorServer) mustEmbedUnimplementedRunwayOrchestratorServer() {} -func (UnimplementedRunwayOrchestratorServer) testEmbeddedByValue() {} +func (UnimplementedRunwayServer) mustEmbedUnimplementedRunwayServer() {} +func (UnimplementedRunwayServer) testEmbeddedByValue() {} -// UnsafeRunwayOrchestratorServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to RunwayOrchestratorServer will +// UnsafeRunwayServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RunwayServer will // result in compilation errors. -type UnsafeRunwayOrchestratorServer interface { - mustEmbedUnimplementedRunwayOrchestratorServer() +type UnsafeRunwayServer interface { + mustEmbedUnimplementedRunwayServer() } -func RegisterRunwayOrchestratorServer(s grpc.ServiceRegistrar, srv RunwayOrchestratorServer) { - // If the following call pancis, it indicates UnimplementedRunwayOrchestratorServer was +func RegisterRunwayServer(s grpc.ServiceRegistrar, srv RunwayServer) { + // If the following call pancis, it indicates UnimplementedRunwayServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { t.testEmbeddedByValue() } - s.RegisterService(&RunwayOrchestrator_ServiceDesc, srv) + s.RegisterService(&Runway_ServiceDesc, srv) } -func _RunwayOrchestrator_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Runway_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PingRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(RunwayOrchestratorServer).Ping(ctx, in) + return srv.(RunwayServer).Ping(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: RunwayOrchestrator_Ping_FullMethodName, + FullMethod: Runway_Ping_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(RunwayOrchestratorServer).Ping(ctx, req.(*PingRequest)) + return srv.(RunwayServer).Ping(ctx, req.(*PingRequest)) } return interceptor(ctx, in, info, handler) } -// RunwayOrchestrator_ServiceDesc is the grpc.ServiceDesc for RunwayOrchestrator service. +// Runway_ServiceDesc is the grpc.ServiceDesc for Runway service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) -var RunwayOrchestrator_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "uber.runway.orchestrator.RunwayOrchestrator", - HandlerType: (*RunwayOrchestratorServer)(nil), +var Runway_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "uber.runway.Runway", + HandlerType: (*RunwayServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Ping", - Handler: _RunwayOrchestrator_Ping_Handler, + Handler: _Runway_Ping_Handler, }, }, Streams: []grpc.StreamDesc{}, - Metadata: "orchestrator.proto", + Metadata: "runway.proto", } diff --git a/doc/rfc/runway/workflow.md b/doc/rfc/runway/workflow.md index 1f3588c2..8a180110 100644 --- a/doc/rfc/runway/workflow.md +++ b/doc/rfc/runway/workflow.md @@ -1,6 +1,6 @@ # Runway Workflow -Runway is the landing service: it owns VCS operations — mergeability checking and landing — on behalf of SubmitQueue. The orchestrator subscribes to two inbound topics (`merge-conflict-checker`, `merger`) and publishes results to two outbound topics (`merge-conflict-checker-signal`, `merger-signal`). It is a consumer-only service with no gateway; work arrives via topic queues and results leave via topic queues. +Runway is the landing service: it owns VCS operations — mergeability checking and landing — on behalf of SubmitQueue. Runway is a single service (the domain *is* the service): it subscribes to two inbound topics (`merge-conflict-checker`, `merger`) and publishes results to two outbound topics (`merge-conflict-checker-signal`, `merger-signal`). It is a consumer-only service with no gateway; work arrives via topic queues and results leave via topic queues. ## Merge-conflict check and merge @@ -67,9 +67,9 @@ Runway has no persistent state — no request store, no job store, no database. ## Ownership by service -### Orchestrator +### Runway -The orchestrator is the only service. It subscribes to two inbound topics (`merge-conflict-checker`, `merger`), performs VCS operations through a pluggable extension, and publishes results to two outbound topics (`merge-conflict-checker-signal`, `merger-signal`). It owns no persistent data. +Runway is a single service. It subscribes to two inbound topics (`merge-conflict-checker`, `merger`), performs VCS operations through a pluggable extension, and publishes results to two outbound topics (`merge-conflict-checker-signal`, `merger-signal`). It owns no persistent data. ### Shared: the messaging queue diff --git a/example/runway/orchestrator/client/BUILD.bazel b/example/runway/client/BUILD.bazel similarity index 58% rename from example/runway/orchestrator/client/BUILD.bazel rename to example/runway/client/BUILD.bazel index 5973e7f8..12dad7a9 100644 --- a/example/runway/orchestrator/client/BUILD.bazel +++ b/example/runway/client/BUILD.bazel @@ -1,19 +1,19 @@ load("@rules_go//go:def.bzl", "go_binary", "go_library") go_library( - name = "orchestrator_lib", + name = "client_lib", srcs = ["main.go"], - importpath = "github.com/uber/submitqueue/example/runway/orchestrator/client", + importpath = "github.com/uber/submitqueue/example/runway/client", visibility = ["//visibility:private"], deps = [ - "//api/runway/orchestrator/protopb", + "//api/runway/protopb", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//credentials/insecure", ], ) go_binary( - name = "orchestrator", - embed = [":orchestrator_lib"], + name = "runway", + embed = [":client_lib"], visibility = ["//visibility:public"], ) diff --git a/example/runway/orchestrator/client/main.go b/example/runway/client/main.go similarity index 88% rename from example/runway/orchestrator/client/main.go rename to example/runway/client/main.go index 730e642f..859eabfb 100644 --- a/example/runway/orchestrator/client/main.go +++ b/example/runway/client/main.go @@ -21,13 +21,13 @@ import ( "os" "time" - pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + pb "github.com/uber/submitqueue/api/runway/protopb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) func main() { - addr := flag.String("addr", "localhost:8086", "orchestrator server address") + addr := flag.String("addr", "localhost:8086", "runway server address") message := flag.String("message", "", "message to send in ping request") timeout := flag.Duration("timeout", 5*time.Second, "request timeout") flag.Parse() @@ -50,7 +50,7 @@ func run(addr, message string, timeout time.Duration) error { defer conn.Close() // Create a client - client := pb.NewRunwayOrchestratorClient(conn) + client := pb.NewRunwayClient(conn) // Create context with timeout ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -61,7 +61,7 @@ func run(addr, message string, timeout time.Duration) error { Message: message, } - fmt.Printf("Sending ping to orchestrator at %s...\n", addr) + fmt.Printf("Sending ping to runway at %s...\n", addr) resp, err := client.Ping(ctx, req) if err != nil { return fmt.Errorf("ping failed: %w", err) diff --git a/example/runway/orchestrator/server/BUILD.bazel b/example/runway/server/BUILD.bazel similarity index 63% rename from example/runway/orchestrator/server/BUILD.bazel rename to example/runway/server/BUILD.bazel index a87f97cd..e8bb1200 100644 --- a/example/runway/orchestrator/server/BUILD.bazel +++ b/example/runway/server/BUILD.bazel @@ -1,22 +1,22 @@ load("@rules_go//go:def.bzl", "go_binary", "go_library") go_library( - name = "orchestrator_server_lib", + name = "server_lib", srcs = ["main.go"], - importpath = "github.com/uber/submitqueue/example/runway/orchestrator/server", + importpath = "github.com/uber/submitqueue/example/runway/server", visibility = ["//visibility:private"], deps = [ "//api/runway/messagequeue", - "//api/runway/orchestrator/protopb", + "//api/runway/protopb", "//platform/consumer", "//platform/errs", "//platform/errs/generic", "//platform/errs/mysql", "//platform/extension/messagequeue", "//platform/extension/messagequeue/mysql", - "//runway/orchestrator/controller", - "//runway/orchestrator/controller/merge", - "//runway/orchestrator/controller/mergeconflictcheck", + "//runway/controller", + "//runway/controller/merge", + "//runway/controller/mergeconflictcheck", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", @@ -26,7 +26,7 @@ go_library( ) go_binary( - name = "orchestrator", - embed = [":orchestrator_server_lib"], + name = "runway", + embed = [":server_lib"], visibility = ["//visibility:public"], ) diff --git a/example/runway/orchestrator/server/Dockerfile b/example/runway/server/Dockerfile similarity index 53% rename from example/runway/orchestrator/server/Dockerfile rename to example/runway/server/Dockerfile index dd982f9e..e70ebd77 100644 --- a/example/runway/orchestrator/server/Dockerfile +++ b/example/runway/server/Dockerfile @@ -3,9 +3,9 @@ FROM debian:bookworm-slim RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* WORKDIR /root/ -# Built via: make build-runway-orchestrator-linux -COPY .docker-bin/runway-orchestrator ./orchestrator +# Built via: make build-runway-linux +COPY .docker-bin/runway ./runway EXPOSE 8080 -CMD ["./orchestrator"] +CMD ["./runway"] diff --git a/example/runway/orchestrator/server/docker-compose.yml b/example/runway/server/docker-compose.yml similarity index 80% rename from example/runway/orchestrator/server/docker-compose.yml rename to example/runway/server/docker-compose.yml index 3864d95b..e900cb06 100644 --- a/example/runway/orchestrator/server/docker-compose.yml +++ b/example/runway/server/docker-compose.yml @@ -1,13 +1,13 @@ -# Docker Compose for Runway orchestrator manual testing +# Docker Compose for Runway manual testing # # # IMPORTANT: Before running compose, build the Linux binary: -# make build-runway-orchestrator-linux +# make build-runway-linux # OR -# bazel build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/orchestrator/server:orchestrator +# bazel build --platforms=@rules_go//go/toolchain:linux_amd64 //example/runway/server:runway # # Quick start: -# make local-runway-orchestrator-start +# make local-runway-start # # After `up`, only the queue schema is applied (`local-init-runway-queue-schema`). @@ -30,17 +30,17 @@ services: timeout: 5s retries: 10 - orchestrator-service: + runway-service: build: context: ${REPO_ROOT} - dockerfile: example/runway/orchestrator/server/Dockerfile + dockerfile: example/runway/server/Dockerfile ports: - "8080" # Random ephemeral port to avoid conflicts environment: - PORT=:8080 # Queue infrastructure connection - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true - - HOSTNAME=runway-orchestrator-dev + - HOSTNAME=runway-dev depends_on: mysql-queue: condition: service_healthy diff --git a/example/runway/orchestrator/server/main.go b/example/runway/server/main.go similarity index 85% rename from example/runway/orchestrator/server/main.go rename to example/runway/server/main.go index db3d0798..e5b4d485 100644 --- a/example/runway/orchestrator/server/main.go +++ b/example/runway/server/main.go @@ -29,29 +29,29 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" - pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + pb "github.com/uber/submitqueue/api/runway/protopb" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/errs" genericerrs "github.com/uber/submitqueue/platform/errs/generic" mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql" extqueue "github.com/uber/submitqueue/platform/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql" - "github.com/uber/submitqueue/runway/orchestrator/controller" - "github.com/uber/submitqueue/runway/orchestrator/controller/merge" - "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck" + "github.com/uber/submitqueue/runway/controller" + "github.com/uber/submitqueue/runway/controller/merge" + "github.com/uber/submitqueue/runway/controller/mergeconflictcheck" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) -// OrchestratorServer wraps the controller and implements the gRPC service interface. -type OrchestratorServer struct { - pb.UnimplementedRunwayOrchestratorServer +// RunwayServer wraps the controller and implements the gRPC service interface. +type RunwayServer struct { + pb.UnimplementedRunwayServer pingController *controller.PingController } // Ping delegates to the controller. -func (s *OrchestratorServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { +func (s *RunwayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { return s.pingController.Ping(ctx, req) } @@ -59,13 +59,13 @@ func main() { code := 0 if err := run(); err != nil { if errors.Is(err, context.Canceled) { - fmt.Println("Runway orchestrator server stopped by signal") + fmt.Println("Runway server stopped by signal") // Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and 128+SIGTERM for SIGTERM, // but it will require a special processing not yet available in the standard library. code = 128 + int(syscall.SIGTERM) } else { - fmt.Fprintf(os.Stderr, "Runway orchestrator server failure: %v\n", err) + fmt.Fprintf(os.Stderr, "Runway server failure: %v\n", err) // TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything code = 1 } @@ -83,7 +83,7 @@ func run() error { } defer logger.Sync() - scope := tally.NewTestScope("runway_orchestrator", nil) + scope := tally.NewTestScope("runway", nil) metricsStopCh := make(chan interface{}, 1) metricsWgDone := sync.WaitGroup{} metricsWgDone.Add(1) @@ -137,7 +137,7 @@ func run() error { subscriberName := os.Getenv("HOSTNAME") if subscriberName == "" { - subscriberName = fmt.Sprintf("runway-orchestrator-%d", time.Now().Unix()) + subscriberName = fmt.Sprintf("runway-%d", time.Now().Unix()) } registry, err := newTopicRegistry(mysqlQueue, subscriberName) @@ -181,10 +181,10 @@ func run() error { grpcServer := grpc.NewServer() pingController := controller.NewPingController(logger, scope) - srv := &OrchestratorServer{ + srv := &RunwayServer{ pingController: pingController, } - pb.RegisterRunwayOrchestratorServer(grpcServer, srv) + pb.RegisterRunwayServer(grpcServer, srv) reflection.Register(grpcServer) @@ -197,7 +197,7 @@ func run() error { return fmt.Errorf("failed to listen on port %s: %w", port, err) } - fmt.Printf("Runway orchestrator gRPC server is running on %s\n", port) + fmt.Printf("Runway gRPC server is running on %s\n", port) fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.") serverErrCh := make(chan error, 1) @@ -208,14 +208,14 @@ func run() error { var serverErr error select { case <-ctx.Done(): - fmt.Println("Shutting down runway orchestrator server due to interruption signal...") + fmt.Println("Shutting down runway server due to interruption signal...") err = ctx.Err() grpcServer.GracefulStop() serverErr = <-serverErrCh case serverErr = <-serverErrCh: - fmt.Println("Shutting down runway orchestrator server due to critical GRPC server error...") + fmt.Println("Shutting down runway server due to critical GRPC server error...") cancel() } diff --git a/runway/README.md b/runway/README.md index 294c8585..ed4882b7 100644 --- a/runway/README.md +++ b/runway/README.md @@ -5,6 +5,11 @@ Runway owns the merge queues defined by the external contract in requests, performs the work, and (eventually) publishes the result to the corresponding signal queue. SubmitQueue is a client of these queues. -Runway service layout: +Runway is a single service (the domain *is* the service); its controllers live directly under +[`controller/`](controller). It consumes Runway's merge queues: -- `orchestrator/` — Orchestrator service: consumes the merge-conflict-check and merge queues. +- `merge-conflict-check` — dry-run check that an ordered sequence of merge steps applies cleanly, without committing. +- `merge` — committing merge: apply and commit the ordered steps. + +Both controllers currently deserialize the `MergeRequest` off the queue and log it; performing the +merge and publishing a `MergeResult` to the corresponding signal queue is not wired yet. diff --git a/runway/orchestrator/controller/BUILD.bazel b/runway/controller/BUILD.bazel similarity index 77% rename from runway/orchestrator/controller/BUILD.bazel rename to runway/controller/BUILD.bazel index 4824beb7..640097c8 100644 --- a/runway/orchestrator/controller/BUILD.bazel +++ b/runway/controller/BUILD.bazel @@ -3,10 +3,10 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "controller", srcs = ["ping.go"], - importpath = "github.com/uber/submitqueue/runway/orchestrator/controller", + importpath = "github.com/uber/submitqueue/runway/controller", visibility = ["//visibility:public"], deps = [ - "//api/runway/orchestrator/protopb", + "//api/runway/protopb", "//platform/metrics", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", @@ -18,7 +18,7 @@ go_test( srcs = ["ping_test.go"], embed = [":controller"], deps = [ - "//api/runway/orchestrator/protopb", + "//api/runway/protopb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/runway/orchestrator/controller/merge/BUILD.bazel b/runway/controller/merge/BUILD.bazel similarity index 90% rename from runway/orchestrator/controller/merge/BUILD.bazel rename to runway/controller/merge/BUILD.bazel index 96558ac2..3053e8f7 100644 --- a/runway/orchestrator/controller/merge/BUILD.bazel +++ b/runway/controller/merge/BUILD.bazel @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "merge", srcs = ["merge.go"], - importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/merge", + importpath = "github.com/uber/submitqueue/runway/controller/merge", visibility = ["//visibility:public"], deps = [ "//api/runway/messagequeue", diff --git a/runway/orchestrator/controller/merge/merge.go b/runway/controller/merge/merge.go similarity index 97% rename from runway/orchestrator/controller/merge/merge.go rename to runway/controller/merge/merge.go index 9de51680..d3f889ae 100644 --- a/runway/orchestrator/controller/merge/merge.go +++ b/runway/controller/merge/merge.go @@ -53,7 +53,7 @@ type Params struct { Logger *zap.SugaredLogger } -// NewController creates a new merge controller for the orchestrator. +// NewController creates a new merge controller for the runway service. func NewController(p Params) *Controller { return &Controller{ logger: p.Logger.Named("merge_controller"), diff --git a/runway/orchestrator/controller/merge/merge_test.go b/runway/controller/merge/merge_test.go similarity index 100% rename from runway/orchestrator/controller/merge/merge_test.go rename to runway/controller/merge/merge_test.go diff --git a/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel b/runway/controller/mergeconflictcheck/BUILD.bazel similarity index 89% rename from runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel rename to runway/controller/mergeconflictcheck/BUILD.bazel index 3e810ab6..6fa3c304 100644 --- a/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel +++ b/runway/controller/mergeconflictcheck/BUILD.bazel @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "mergeconflictcheck", srcs = ["mergeconflictcheck.go"], - importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck", + importpath = "github.com/uber/submitqueue/runway/controller/mergeconflictcheck", visibility = ["//visibility:public"], deps = [ "//api/runway/messagequeue", diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go b/runway/controller/mergeconflictcheck/mergeconflictcheck.go similarity index 99% rename from runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go rename to runway/controller/mergeconflictcheck/mergeconflictcheck.go index 73ec4736..689a64e9 100644 --- a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go +++ b/runway/controller/mergeconflictcheck/mergeconflictcheck.go @@ -52,7 +52,7 @@ type Params struct { Logger *zap.SugaredLogger } -// NewController creates a new merge-conflict-check controller for the orchestrator. +// NewController creates a new merge-conflict-check controller for the runway service. func NewController(p Params) *Controller { return &Controller{ logger: p.Logger.Named("mergeconflictcheck_controller"), diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go b/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go similarity index 100% rename from runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go rename to runway/controller/mergeconflictcheck/mergeconflictcheck_test.go diff --git a/runway/orchestrator/controller/ping.go b/runway/controller/ping.go similarity index 86% rename from runway/orchestrator/controller/ping.go rename to runway/controller/ping.go index 8a913f19..b7b59f5e 100644 --- a/runway/orchestrator/controller/ping.go +++ b/runway/controller/ping.go @@ -20,18 +20,18 @@ import ( "time" "github.com/uber-go/tally" - pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + pb "github.com/uber/submitqueue/api/runway/protopb" "github.com/uber/submitqueue/platform/metrics" "go.uber.org/zap" ) -// PingController handles ping business logic for the Runway orchestrator. +// PingController handles ping business logic for the Runway service. type PingController struct { logger *zap.Logger metricsScope tally.Scope } -// NewPingController creates a new instance of the Runway orchestrator ping controller. +// NewPingController creates a new instance of the Runway ping controller. func NewPingController(logger *zap.Logger, scope tally.Scope) *PingController { return &PingController{ logger: logger, @@ -64,7 +64,7 @@ func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (resp *p return &pb.PingResponse{ Message: message, - ServiceName: "runway-orchestrator", + ServiceName: "runway", Timestamp: time.Now().Unix(), Hostname: hostname, }, nil diff --git a/runway/orchestrator/controller/ping_test.go b/runway/controller/ping_test.go similarity index 93% rename from runway/orchestrator/controller/ping_test.go rename to runway/controller/ping_test.go index e2950804..50fe79f6 100644 --- a/runway/orchestrator/controller/ping_test.go +++ b/runway/controller/ping_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" - pb "github.com/uber/submitqueue/api/runway/orchestrator/protopb" + pb "github.com/uber/submitqueue/api/runway/protopb" "go.uber.org/zap" ) @@ -50,7 +50,7 @@ func TestPing_ServiceName(t *testing.T) { resp, err := ctrl.Ping(ctx, req) require.NoError(t, err) - assert.Equal(t, "runway-orchestrator", resp.ServiceName) + assert.Equal(t, "runway", resp.ServiceName) } func TestPing_Timestamp(t *testing.T) { diff --git a/runway/orchestrator/README.md b/runway/orchestrator/README.md deleted file mode 100644 index f85fb180..00000000 --- a/runway/orchestrator/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Runway Orchestrator - -Consumes Runway's merge queues (defined in [`api/runway/messagequeue`](../../api/runway/messagequeue)): - -- `merge-conflict-check` — dry-run check that an ordered sequence of merge steps applies cleanly, without committing. -- `merge` — committing merge: apply and commit the ordered steps. - -Both controllers currently deserialize the `MergeRequest` off the queue and log it; performing the -merge and publishing a `MergeResult` to the corresponding signal queue is not wired yet. diff --git a/tool/proto/BUILD.bazel b/tool/proto/BUILD.bazel index 8ef56571..96d3fd25 100644 --- a/tool/proto/BUILD.bazel +++ b/tool/proto/BUILD.bazel @@ -36,9 +36,9 @@ go_proto_generated_files( ) go_proto_generated_files( - name = "api_runway_orchestrator", - srcs = ["//api/runway/orchestrator/proto:orchestrator.proto"], - out_dir = "api_runway_orchestrator", + name = "api_runway", + srcs = ["//api/runway/proto:runway.proto"], + out_dir = "api_runway", ) go_proto_generated_files( @@ -76,8 +76,8 @@ filegroup( ":api_base_change", ":api_base_mergestrategy", ":api_base_messagequeue", + ":api_runway", ":api_runway_messagequeue", - ":api_runway_orchestrator", ":api_stovepipe_gateway", ":api_stovepipe_orchestrator", ":api_submitqueue_gateway",