Ext_proc filter#12792
Conversation
…estsAreForwardedImmediately to use in-process server
…roc response but only sent a mode override. As per envoy ext-processing ext-proc cannot omit request_headers (even if empty) in response to a request_headers processing request.
… track close already called for handling immediate response.
…lizingExecutor to use real dataPlaneChannel
…ubHasCorrectDeadline to use real dataPlaneChannel
…xtProcStreamSendsMetadata to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel and ServerInterceptor
…entToExtProc to use real dataPlaneChannel
…henMessagesAreDiscarded to use real dataPlaneChannel
…ExtProcAndSuperHalfCloseIsDeferred to use real dataPlaneChannel
…nSuperHalfCloseIsCalled to use real dataPlaneChannel
…False to use real dataPlaneChannel
…sTrue to use real dataPlaneChannel
…OnReady to use real dataPlaneChannel
…Ready to use real dataPlaneChannel
…stsAreForwardedImmediately to use real dataPlaneChannel
…sHeadersAndCallIsBuffered to use real dataPlaneChannel
…henMutationsAreAppliedAndCallIsActivated to use real dataPlaneChannel
…sActivatedImmediately to use real dataPlaneChannel
…entToExtProc to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
…eCallFailsOpen to use real dataPlaneChannel
…lizingExecutor to use real dataPlaneChannel
…henMutatedBodyIsForwardedToDataPlane to use real dataPlaneChannel
Implemented a FIFO queue in ExternalProcessorFilter to ensure that responses from the external processor server arrive in the same order as the events sent by the filter, as required by gRFC A93. Added unit tests to verify that out-of-order responses correctly trigger a protocol error and fail the stream.
- Added rejection of CONTINUE_AND_REPLACE status in HeadersResponse for both request and response headers, treating it as a stream failure. - Fixed potential hangs by ensuring proceedWithClose() is called upon stream failure, especially in fail-open scenarios. - Added explicit sidecar notification via requestStream.onError() upon detecting protocol errors to ensure robust stream termination. - Added new unit test categories 17 in ExternalProcessorFilterTest to verify status enforcement.
Updated ExternalProcessorFilter to include the `protocol_config` field in the very first `ProcessingRequest` sent to the sidecar (RequestHeaders). The configuration includes the `request_body_mode` and `response_body_mode` derived from the filter's processing mode, as required by gRFC A93. Added a unit test in Category 4 to verify that `protocol_config` is correctly populated on the first message and omitted from all subsequent messages on the stream. Add tests for trailers HeaderSendMode default to SKIP for DEFAULT. nit: Renumbered out of order test categories.
…ment rather than a granular merge of its fields.
# Conflicts: # xds/src/test/java/io/grpc/xds/internal/headermutations/HeaderMutationsTest.java
# Conflicts: # xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
Fix style and warning as errors.
|
Reviewed approximately ~500 LOC of source(not tests), which currently mostly only covers the config part of things. Sending comments incrementally to kick start progress, since it seems that I'll need probably about 3 days to review all of source. |
| MethodDescriptor<ReqT, RespT> method, | ||
| CallOptions callOptions, | ||
| Channel next) { | ||
| SerializingExecutor serializingExecutor = new SerializingExecutor(callOptions.getExecutor()); |
There was a problem hiding this comment.
We have a potential null pointer crash here. getExecutor is nullable and SerializingExecutor expects null.
Side question: Do we have some static analysis that can catch these things?
There was a problem hiding this comment.
No, this was discussed at length offline in the context of go/safeguarding-channel-executor-for-interceptor. That design was about providing a wrapped executor for any interceptor, xds or not. Then we decided to focus on only xds interceptors for the current needs, and for xds calls, ConfigSelectingClientCall is involved and it always populates the executor in the call options.
| cachedChannelManager.getChannel(filterConfig.grpcServiceConfig)) | ||
| .withExecutor(serializingExecutor); | ||
|
|
||
| if (filterConfig.grpcServiceConfig.timeout() != null |
There was a problem hiding this comment.
I couldn't find an explicit style guide around it (please add if you find one) , but I believe java prefers data members to be private and be accessed via getters.
| cachedChannelManager.getChannel(filterConfig.grpcServiceConfig)) | ||
| .withExecutor(serializingExecutor); | ||
|
|
||
| if (filterConfig.grpcServiceConfig.timeout() != null |
There was a problem hiding this comment.
IIRC GrpcServiceConfig is an autovalue which has non nullable as precondition unless specified otherwise. So, this check might not be necessary.
Additionally, nullable optionals should be avoided go/java-practices/optional#nullable
There was a problem hiding this comment.
Auto value initializes Duration to Optional.empty() so it is not an Optional that can hold nullable. The null check here is redundant, removed that.
|
|
||
| if (filterConfig.grpcServiceConfig.timeout() != null | ||
| && filterConfig.grpcServiceConfig.timeout().isPresent()) { | ||
| long timeoutNanos = filterConfig.grpcServiceConfig.timeout().get().toNanos(); |
There was a problem hiding this comment.
nit: Might be worth moving to a variable
There was a problem hiding this comment.
Not feeling strongly to to do it, creating a variable for use in such a small scope block.
| } | ||
|
|
||
| ImmutableList<HeaderValue> initialMetadata = filterConfig.grpcServiceConfig.initialMetadata(); | ||
| extProcStub = extProcStub.withInterceptors(new ClientInterceptor() { |
There was a problem hiding this comment.
So, maybe we can use MetadataUtils.newAttachHeadersInterceptor(parsedMetadata).
This may have a few advantages:
- Reduce code bloat
- Reduce computation in datapath. We create a new Metadata object whenever we create an interceptor moving the expensive operation to control path.
There was a problem hiding this comment.
Good point, didn't know about this until method.
| io.envoyproxy.envoy.config.core.v3.HeaderValue headerValue = | ||
| io.envoyproxy.envoy.config.core.v3.HeaderValue.newBuilder() | ||
| .setKey(key.toLowerCase(Locale.ROOT)) | ||
| .setValue(encoded) |
There was a problem hiding this comment.
I vaguely recall that we were supposed to use rawValue for binary data and value for ascii.
I recall that we still had some confusion around that . I don't know what the latest here is?
There was a problem hiding this comment.
You are right, fixed it now.
| Metadata.Key<byte[]> key; | ||
| try { | ||
| key = Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER); | ||
| } catch (IllegalArgumentException e) { |
There was a problem hiding this comment.
Should we be catching this ?
- Most usages in codebase currently don't catch this.
- If we need it for validation to catch and ignore valid ones, we might be better off validating headers ourselves according to spec before calling this. There may still be a gap where our validation is a subset of Metadata key validation and we may crash on bad input, which may be acceptable and we need to update either our validation utilitity or the ext proc header validation spec.
There was a problem hiding this comment.
Removed it, I don't think we need to validate application set headers for violations.
| } | ||
| java.util.List<String> encoded = new ArrayList<>(); | ||
| for (byte[] v : values) { | ||
| encoded.add(BaseEncoding.base64().omitPadding().encode(v)); |
There was a problem hiding this comment.
There's a bit of inconcistency here, we use omitPadding here but don't use it in ToHeaderdmap
There was a problem hiding this comment.
I have refactored toHeaderMap to use setRawValue(ByteString) for binary headers, which sends raw bytes instead of a base64 string.
| for (byte[] v : values) { | ||
| encoded.add(BaseEncoding.base64().omitPadding().encode(v)); | ||
| } | ||
| return com.google.common.base.Joiner.on(",").join(encoded); |
There was a problem hiding this comment.
nit: Maybe worth adding an import for this and for the java util List
|
Reviewed about another 300 LOC, which covers static utilities and the interceptors. Speed was slower than expected, but will try to catch up. |
|
|
||
| private final ExternalProcessor externalProcessor; | ||
| private final GrpcServiceConfig grpcServiceConfig; | ||
| private final Optional<HeaderMutationRulesConfig> mutationRulesConfig; |
There was a problem hiding this comment.
We should initialise it to empty by default optional nullables are usually considered a bad practice.
There was a problem hiding this comment.
It is initialized to Optional.empty() when there are no header mutation rules. See caller site passing it as Optional.ofNullable(mutationRulesConfig)
c75ba11 to
64010fd
Compare
Implements ext_proc filter from A93 (internal design doc)
Metrics aren't implemented yet.
Includes commits from unmerged Filter API enhancements and channel caching PRs.
Only the ExternaProcessingFilter.java, ExternaProcessingFilterTest.java and the envoy xds proto import and generated code need to be reviewed.
Rebasing commit history caused all received and merged commits to show my name as the committer, ignore all commits for which I'm not shown as the author.