Migrate stream gRPC protos to LightProto #4783

Merged
merlimat merged 3 commits into apache:master from merlimat:lightproto-stream-grpc
May 8, 2026

@merlimat
Contributor

@merlimat merlimat commented May 8, 2026

Summary

Migrates the remaining stream protos (the gRPC service ones plus everything they import) from protobuf-java to LightProto. Combined with #4779, #4780 and #4781, this leaves the entire bookkeeper repo running without com.google.protobuf:protobuf-java.

The lightproto-maven-plugin produces both messages and gRPC service stubs natively, so protobuf-maven-plugin and protoc-gen-grpc-java are removed — same pattern oxia-java and pulsar-functions use.

Proto changes

  • stream/proto/src/main/proto/: cluster, common, kv, kv_rpc, kv_store, storage, stream — all generated by lightproto.
  • stream/tests-common/src/main/proto/rpc.proto: same.
  • stream/tests-common/src/main/proto/proto2_coder_test_messages.proto: deleted (proto2 `extensions`, unsupported by lightproto, dead code with zero Java references).

API translations

Standard lightproto patterns applied across ~100 Java files in `stream/{api, statelib, storage/api, storage/impl, clients/java/{base, kv, all}, server, common, tests-common}`, `tools/stream`, and `tests/integration/cluster`:

  • `Foo.newBuilder().setX(v).build()` → `new Foo().setX(v)` (no `.build()`)
  • `Foo.newBuilder(other)` → `new Foo().copyFrom(other)`
  • nested message setters: `outer.setInner(inner)` → `outer.setInner().copyFrom(inner)`
  • repeated message setters: `addRequests(req)` → `addRequest().copyFrom(req)`
  • bytes: `UnsafeByteOperations.unsafeWrap(b)` → just `b`; `getX().toByteArray()` → just `getX()`
  • parsing: static `Foo.parseFrom(byte[])` → `Foo f = new Foo(); f.parseFrom(b)`
  • getter renames (`getRoEndpointCount` → `getRoEndpointsCount`, `getRequests(i)` → `getRequestAt(i)`)
  • primitive boolean accessors `getX()` → `isX()`
  • `InvalidProtocolBufferException` / `CodedOutputStream` removed
  • enum `UNRECOGNIZED` checks dropped
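
The translations above can be sketched with a pair of hand-written stand-in classes. These are not LightProto output: `Endpoint`, `Batch`, and their fields are hypothetical names chosen only to show the shape of the mutable-message style (chained setters without `.build()`, `copyFrom`, a no-arg nested setter, a singular `add…()` that returns the new element, `get…At(i)`, and `is…()` for booleans).

```java
import java.util.ArrayList;
import java.util.List;

// Hand-written stand-ins that mimic the mutable-message shape described above.
// They are NOT LightProto-generated code; all names here are hypothetical.
class Endpoint {
    private String host = "";
    private int port;

    Endpoint setHost(String host) { this.host = host; return this; }
    Endpoint setPort(int port) { this.port = port; return this; }
    String getHost() { return host; }
    int getPort() { return port; }

    // Plays the role of Foo.newBuilder(other): copy field values into this instance.
    Endpoint copyFrom(Endpoint other) {
        this.host = other.host;
        this.port = other.port;
        return this;
    }
}

class Batch {
    private final Endpoint target = new Endpoint();
    private final List<Endpoint> replicas = new ArrayList<>();
    private boolean readOnly;

    // No-arg nested setter: returns the nested instance so the caller mutates it in place.
    Endpoint setTarget() { return target; }
    Endpoint getTarget() { return target; }

    // Singular add: appends a fresh element and returns it for mutation.
    Endpoint addReplica() {
        Endpoint e = new Endpoint();
        replicas.add(e);
        return e;
    }
    int getReplicasCount() { return replicas.size(); }
    Endpoint getReplicaAt(int i) { return replicas.get(i); }

    Batch setReadOnly(boolean v) { this.readOnly = v; return this; }
    boolean isReadOnly() { return readOnly; }
}

class TranslationDemo {
    static Batch build(Endpoint existing) {
        Batch b = new Batch().setReadOnly(true);   // chained setters, no .build()
        b.setTarget().copyFrom(existing);          // nested message: copy into the owned instance
        b.addReplica().copyFrom(existing).setPort(existing.getPort() + 1);
        return b;
    }
}
```

Because `copyFrom` copies values rather than sharing the instance, mutating the replica's port in `build` leaves `existing` unchanged.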

FutureStub adapters

LightProto's gRPC generator emits async + blocking stubs but not `*FutureStub`. Where existing client code used `ListenableFuture`-based APIs, four hand-written adapters were added under `stream/clients/java/base/.../clients/grpc/`:

  • `RootRangeServiceFutureStub`
  • `MetaRangeServiceFutureStub`
  • `StorageContainerServiceFutureStub`
  • `TableServiceFutureStub`

Each extends `AbstractStub` and calls `ClientCalls.futureUnaryCall` against the lightproto-generated `MethodDescriptor`s.

Test plan

  • `mvn install -DskipTests -pl '!native-io'` succeeds end-to-end.
  • `mvn checkstyle:check`, `apache-rat:check`, `spotbugs:check`, `spotless:check` clean for stream + tools modules.
  • Unit tests pass for `stream/proto`, `stream/statelib`, `stream/storage/impl`, `stream/clients/java/{base,kv,all}`, `stream/server`.
  • Integration tests (CI).

Behavioural note

A handful of tests previously relied on protobuf-java's gRPC in-process transport passing message references by identity (`assertSame(req, receivedReq)` / `assertTrue(a == b)`). LightProto's gRPC marshaller always serializes/deserializes (matching oxia/pulsar), so those checks were switched to `.equals()` / `assertEquals`.
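
A minimal sketch of why the identity checks stop holding, assuming a toy message type and wire format (`PingRequest` and `MarshalDemo` are hypothetical and use no gRPC or LightProto classes): once a message is always turned into bytes and parsed back, the receiver sees a different object that is merely equal to the sender's.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical message with a toy wire format: the raw UTF-8 bytes of its key.
final class PingRequest {
    private final String key;

    PingRequest(String key) { this.key = key; }

    byte[] toBytes() { return key.getBytes(StandardCharsets.UTF_8); }

    static PingRequest fromBytes(byte[] b) {
        return new PingRequest(new String(b, StandardCharsets.UTF_8));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PingRequest && ((PingRequest) o).key.equals(key);
    }

    @Override
    public int hashCode() { return key.hashCode(); }
}

class MarshalDemo {
    // Pass-by-reference delivery: what the old identity assertions implicitly relied on.
    static PingRequest deliverByReference(PingRequest req) { return req; }

    // Always serialize then parse: the behaviour described for LightProto's marshaller.
    static PingRequest deliverViaBytes(PingRequest req) {
        return PingRequest.fromBytes(req.toBytes());
    }
}
```

With `deliverViaBytes`, `received == sent` is false while `received.equals(sent)` is true, which is why the tests moved to `assertEquals`.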

Migrates the remaining stream protos (the ones with gRPC services and
their imports) from `protobuf-java` to LightProto:

- `stream/proto/src/main/proto/`: cluster, common, kv, kv_rpc, kv_store,
  storage, stream — all switched to lightproto-only generation. The
  protobuf-maven-plugin (and protoc-gen-grpc-java) are removed; the
  lightproto plugin produces both message classes and gRPC service stubs
  natively (the same pattern oxia-java uses).
- `stream/tests-common/src/main/proto/rpc.proto`: same migration. The
  unused `proto2_coder_test_messages.proto` is deleted (it relied on
  proto2 `extensions` which lightproto doesn't support, and had no Java
  references).

Java sources and tests across stream/{api,statelib,storage/api,storage/impl,
clients/java/{base,kv,all},server,common,tests-common}, tools/stream and
tests/integration/cluster were updated to the lightproto API:

- `Foo.newBuilder().setX(v).build()` → `new Foo().setX(v)` (no .build())
- `Foo.newBuilder(other)` → `new Foo().copyFrom(other)`
- nested-message setters: `outer.setInner(inner)` → `outer.setInner().copyFrom(inner)`
- repeated-message setters: `addRequests(req)` → `addRequest().copyFrom(req)`
  (singular `addX()` returns the new instance to mutate)
- bytes: `UnsafeByteOperations.unsafeWrap(b)` → just `b`; `getX().toByteArray()`
  → just `getX()` (returns byte[] directly)
- parsing: static `Foo.parseFrom(byte[])` → `Foo f = new Foo(); f.parseFrom(b)`
- `getXMap()` / `getXList()` getter renames where lightproto uses different
  pluralisation (e.g. `getRoEndpointCount` → `getRoEndpointsCount`,
  `getRequests(i)` → `getRequestAt(i)`)
- boolean accessors `getX()` → `isX()` for primitive booleans
- enums: drop `UNRECOGNIZED` checks (lightproto enums don't have it)
- `InvalidProtocolBufferException` / `CodedOutputStream` removed; lightproto
  parses to/from `ByteBuf` directly and throws `RuntimeException` on bad input

LightProto's plugin only emits async + blocking stubs — it does not emit
`*FutureStub`. Where the existing client code relied on `ListenableFuture`
APIs, four hand-written `*FutureStub` adapters were added under
`stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/grpc/`
(`RootRangeServiceFutureStub`, `MetaRangeServiceFutureStub`,
`StorageContainerServiceFutureStub`, `TableServiceFutureStub`). They extend
`AbstractStub` and call `ClientCalls.futureUnaryCall` against the
lightproto-generated `MethodDescriptor`s, so the rest of the client code
needed only an import swap.

A handful of tests had `assertSame(req, receivedReq)` /
`assertTrue(a == b)` checks that previously worked because gRPC's
in-process transport happened to pass protobuf-java messages by reference.
LightProto's gRPC marshaller always serializes/deserializes (matching the
oxia/pulsar implementation), so those identity checks were switched to
`assertEquals` / `.equals(...)`.
Member

@lhotari lhotari left a comment

LGTM

merlimat added 2 commits May 8, 2026 12:56
`bkctl table put foo bar` and `Table#put(key, value)` route requests by
passing the key as both the partition key and the local key. The simple
table flow ends up storing the *same* ByteBuf reference in
`request.key` and `request.header.rKey`. LightProto's serializer for
`bytes` fields backed by ByteBuf calls `dst.writeBytes(src)`, which
advances `src`'s readerIndex; once the request body's `key` field has
been written, the same ByteBuf has zero readable bytes left, so when
the routing header's `rKey` field is serialized next it writes nothing.

The routing header therefore reaches the server with an empty rKey,
the put goes to the wrong storage container, and a cross-language
reader (e.g. the Python integration test) computing the correct
routing key against the actual key bytes finds no value.

Fix: pass `pKey.slice()` to `RoutingHeader#setRKey` in both
`PByteBufSimpleTableImpl` and `PByteBufTableRangeImpl` so the routing
header gets its own readerIndex independent of any other ByteBuf
fields aliasing the same buffer.
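
The aliasing problem can be reproduced in miniature with the JDK's `java.nio.ByteBuffer`, used here only as a stand-in for Netty's `ByteBuf`: a relative bulk `put(src)` advances the source's position in the same way the commit describes `writeBytes(src)` advancing `readerIndex`. The `serialize` helper is hypothetical and just records how many bytes each field contributed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class AliasedBufferDemo {
    // Mimics a serializer that copies each field with a relative bulk put,
    // which advances the source buffer's position (the analogue of readerIndex).
    // Returns {bytes written for key, bytes written for rKey}.
    static int[] serialize(ByteBuffer key, ByteBuffer rKey) {
        ByteBuffer dst = ByteBuffer.allocate(64);
        int start = dst.position();
        dst.put(key);
        int keyLen = dst.position() - start;
        dst.put(rKey);
        int rKeyLen = dst.position() - start - keyLen;
        return new int[] {keyLen, rKeyLen};
    }

    static ByteBuffer keyOf(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Same buffer object used for both fields: the second field sees nothing left to read.
    static int[] aliased() {
        ByteBuffer k = keyOf("foo");
        return serialize(k, k);
    }

    // The routing key gets its own view with an independent position.
    static int[] sliced() {
        ByteBuffer k = keyOf("foo");
        return serialize(k, k.slice());
    }
}
```

`aliased()` yields `{3, 0}` (an empty routing key), while `sliced()` yields `{3, 3}`.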

The previous fix only sliced pKey before setRKey. There's a second
manifestation of the same underlying issue: the integration test
TableClientTest reuses the same lKey/value ByteBuf across two
consecutive put() calls. After the first request is serialized,
lightproto's writeBytes(src) has consumed lKey's readerIndex, so the
second request's setKey(lKey) records _keyLen = 0, and the wire format
omits the key field entirely. The second put silently writes under an
empty key, prevKv comes back null, and the test fails.

Wrap every user-supplied ByteBuf with .slice() inside KvUtils before
handing it to a lightproto setter (newPutRequest, newRangeRequest,
newDeleteRequest, newIncrementRequest, populateProtoCompare,
populateProtoPutRequest, populateProtoDeleteRequest,
populateProtoRangeRequest). The slice gives lightproto its own
readerIndex over the same backing data, so mutating it during
serialization doesn't affect the caller's buffer or any other field
backed by the same ByteBuf.

The right long-term fix is in lightproto itself — setX(ByteBuf) should
either slice internally or use the non-mutating dst.writeBytes(src,
idx, len). Worth filing upstream.
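
As an illustration of the non-mutating alternative, here is a sketch using `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` (the helper names are hypothetical, and this is not LightProto's code). Reading the source by absolute index leaves its position untouched, so the same buffer can be reused across consecutive calls without slicing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class NonMutatingWriteDemo {
    // Relative bulk put: consumes src, so a later write of the same buffer copies nothing.
    static int writeConsuming(ByteBuffer dst, ByteBuffer src) {
        int before = dst.position();
        dst.put(src);
        return dst.position() - before;
    }

    // Absolute reads by index: src.position() is never changed.
    static int writeNonMutating(ByteBuffer dst, ByteBuffer src) {
        int before = dst.position();
        for (int i = src.position(); i < src.limit(); i++) {
            dst.put(src.get(i));
        }
        return dst.position() - before;
    }

    // Serializes the same key buffer in two consecutive "requests"
    // and returns the number of key bytes written by each.
    static int[] putTwice(boolean nonMutating) {
        ByteBuffer key = ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8));
        int[] lens = new int[2];
        for (int call = 0; call < 2; call++) {
            ByteBuffer dst = ByteBuffer.allocate(16);
            lens[call] = nonMutating ? writeNonMutating(dst, key) : writeConsuming(dst, key);
        }
        return lens;
    }
}
```

`putTwice(false)` returns `{3, 0}`, mirroring the second put that went out with an empty key; `putTwice(true)` returns `{3, 3}`.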
@merlimat merlimat merged commit 9633bb7 into apache:master May 8, 2026
22 checks passed