From 0e943f2ad5d8126e3094da491d959bbe1fed454b Mon Sep 17 00:00:00 2001 From: Pratyay Date: Wed, 24 Jun 2026 18:40:18 +0530 Subject: [PATCH] Add mcp-resilience4j module with transport-level resilience MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new optional mcp-resilience4j module that wraps any McpClientTransport with configurable Resilience4j policies, making MCP tool calls resilient to transient failures, slow servers, and traffic spikes. ResilientMcpClientTransport implements McpClientTransport and applies up to five policies in the standard recommended order: Retry -> CircuitBreaker -> RateLimiter -> TimeLimiter -> Bulkhead All policies are optional. sendMessage() applies all five. connect() applies only CircuitBreaker and Retry — session establishment is not throttled or timed out. McpResilienceConfig provides a high-level fluent facade over the builder for the common configuration case. Includes fail-fast null validation on all setters and a WARN log when a registry name collision would silently discard a supplied config. Circuit breaker state transitions and retry events are logged automatically via Resilience4j event publishers at construction time. Also includes 13 unit tests and a README covering usage, policy ordering rationale, registry guidance, and observability. Bumps project version to 2.1.1-SNAPSHOT. 
--- .../client-jdk-http-client/pom.xml | 4 +- .../client-spring-http-client/pom.xml | 2 +- conformance-tests/pom.xml | 2 +- conformance-tests/server-servlet/pom.xml | 4 +- mcp-bom/pom.xml | 9 +- mcp-core/pom.xml | 2 +- mcp-json-jackson2/pom.xml | 4 +- mcp-json-jackson3/pom.xml | 4 +- mcp-resilience4j/README.md | 239 +++++++ mcp-resilience4j/pom.xml | 193 ++++++ .../resilience/McpResilienceConfig.java | 335 ++++++++++ .../ResilientMcpClientTransport.java | 607 ++++++++++++++++++ .../ResilientMcpClientTransportTests.java | 213 ++++++ mcp-test/pom.xml | 8 +- mcp/pom.xml | 6 +- pom.xml | 3 +- 16 files changed, 1615 insertions(+), 20 deletions(-) create mode 100644 mcp-resilience4j/README.md create mode 100644 mcp-resilience4j/pom.xml create mode 100644 mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java create mode 100644 mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java create mode 100644 mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java diff --git a/conformance-tests/client-jdk-http-client/pom.xml b/conformance-tests/client-jdk-http-client/pom.xml index e09d565c5..3e5cce5a8 100644 --- a/conformance-tests/client-jdk-http-client/pom.xml +++ b/conformance-tests/client-jdk-http-client/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk conformance-tests - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT client-jdk-http-client jar @@ -28,7 +28,7 @@ io.modelcontextprotocol.sdk mcp - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT diff --git a/conformance-tests/client-spring-http-client/pom.xml b/conformance-tests/client-spring-http-client/pom.xml index cbf0d1970..a3bd41d08 100644 --- a/conformance-tests/client-spring-http-client/pom.xml +++ b/conformance-tests/client-spring-http-client/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk conformance-tests - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT client-spring-http-client jar diff --git a/conformance-tests/pom.xml 
b/conformance-tests/pom.xml index 9512ddd34..d9d090c3c 100644 --- a/conformance-tests/pom.xml +++ b/conformance-tests/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT conformance-tests pom diff --git a/conformance-tests/server-servlet/pom.xml b/conformance-tests/server-servlet/pom.xml index 17b38542b..7ca06bbd9 100644 --- a/conformance-tests/server-servlet/pom.xml +++ b/conformance-tests/server-servlet/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk conformance-tests - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT server-servlet jar @@ -28,7 +28,7 @@ io.modelcontextprotocol.sdk mcp - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml index 180dde0ef..32158fec0 100644 --- a/mcp-bom/pom.xml +++ b/mcp-bom/pom.xml @@ -7,7 +7,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-bom @@ -61,6 +61,13 @@ ${project.version} + + + io.modelcontextprotocol.sdk + mcp-resilience4j + ${project.version} + + diff --git a/mcp-core/pom.xml b/mcp-core/pom.xml index 4eabb8ec2..5a3c66d61 100644 --- a/mcp-core/pom.xml +++ b/mcp-core/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-core jar diff --git a/mcp-json-jackson2/pom.xml b/mcp-json-jackson2/pom.xml index 4ecbf98b4..89294e3c8 100644 --- a/mcp-json-jackson2/pom.xml +++ b/mcp-json-jackson2/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-json-jackson2 jar @@ -74,7 +74,7 @@ io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT com.networknt diff --git a/mcp-json-jackson3/pom.xml b/mcp-json-jackson3/pom.xml index 4f4c9ad1f..dae101cf7 100644 --- a/mcp-json-jackson3/pom.xml +++ b/mcp-json-jackson3/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-json-jackson3 jar @@ -68,7 +68,7 @@ io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT tools.jackson.core diff 
--git a/mcp-resilience4j/README.md b/mcp-resilience4j/README.md new file mode 100644 index 000000000..81e48d2a1 --- /dev/null +++ b/mcp-resilience4j/README.md @@ -0,0 +1,239 @@ +# mcp-resilience4j + +Resilience4j integration for the Java MCP SDK. Wraps any `McpClientTransport` with configurable circuit breaking, retry, rate limiting, time limiting, and bulkhead policies to make MCP tool calls resilient to transient failures, slow servers, and traffic spikes. + +## Overview + +MCP tool calls cross a network. Without resilience: + +- A slow server blocks a thread indefinitely +- A flaky server causes cascading failures upstream +- A burst of parallel agent calls can overwhelm a rate-limited endpoint +- One failing server keeps being called even though it cannot recover by itself + +`mcp-resilience4j` addresses all of these at the **transport level** — the single integration point exposed by the MCP SDK and frameworks like Google ADK. Because one transport wraps one MCP server connection, the policies are effectively per-server and composable across multiple clients. 
+ +## Maven Dependency + +```xml + + io.modelcontextprotocol.sdk + mcp-resilience4j + 2.1.1-SNAPSHOT + +``` + +Or via the BOM: + +```xml + + + + io.modelcontextprotocol.sdk + mcp-bom + 2.1.1-SNAPSHOT + pom + import + + + +``` + +## Quick Start + +### High-level facade: `McpResilienceConfig` + +```java +McpResilienceConfig config = McpResilienceConfig.builder() + .transportCircuitBreaker(CircuitBreakerConfig.custom() + .slidingWindowSize(10) + .failureRateThreshold(50) + .waitDurationInOpenState(Duration.ofSeconds(30)) + .build()) + .transportRetry(RetryConfig.custom() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(500)) + .build()) + .transportTimeLimiter(TimeLimiterConfig.custom() + .timeoutDuration(Duration.ofSeconds(8)) + .build()) + .build(); + +McpClientTransport resilientTransport = config.wrapTransport(rawTransport); +McpAsyncClient client = McpClient.async(resilientTransport).build(); +``` + +### Direct builder: `ResilientMcpClientTransport` + +For full control over all five policies: + +```java +McpClientTransport resilientTransport = ResilientMcpClientTransport.builder(rawTransport) + .circuitBreakerConfig(CircuitBreakerConfig.custom() + .slidingWindowSize(10) + .failureRateThreshold(50) + .waitDurationInOpenState(Duration.ofSeconds(30)) + .build()) + .retryConfig(RetryConfig.custom() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(500)) + .build()) + .timeLimiterConfig(TimeLimiterConfig.custom() + .timeoutDuration(Duration.ofSeconds(8)) + .build()) + .rateLimiterConfig(RateLimiterConfig.custom() + .limitForPeriod(20) + .limitRefreshPeriod(Duration.ofSeconds(1)) + .build()) + .bulkheadConfig(BulkheadConfig.custom() + .maxConcurrentCalls(10) + .build()) + .build(); +``` + +Policies are optional — configure only what you need. 
+ +## Policy Reference + +| Policy | Guards against | Exception thrown | Applied on | +|---|---|---|---| +| **CircuitBreaker** | Persistent server failures | `CallNotPermittedException` | `sendMessage`, `connect` | +| **Retry** | Transient failures | Last exception / `MaxRetriesExceededException` | `sendMessage`, `connect` | +| **TimeLimiter** | Slow servers exceeding deadline | `TimeoutException` | `sendMessage` only | +| **RateLimiter** | Request rate exceeding a threshold | `RequestNotPermitted` | `sendMessage` only | +| **Bulkhead** | Too many concurrent in-flight requests | `BulkheadFullException` | `sendMessage` only | + +`connect()` uses only CircuitBreaker and Retry — session establishment is not throttled or timed out, as it can legitimately take longer than a single request. + +## Policy Ordering + +Policies are applied in the following order (outermost to innermost): + +``` +Retry → CircuitBreaker → RateLimiter → TimeLimiter → Bulkhead → MCP Server +``` + +This is the standard Resilience4j recommended hierarchy. Each position is deliberate: + +- **Retry is outermost** so it orchestrates the entire inner chain per attempt. If Retry were inside CircuitBreaker, the CB would only see the outcome of the entire retry loop — delaying its failure detection by `maxAttempts × timeout`. +- **Bulkhead is innermost** so concurrency slots are held only during actual execution, not during Retry's backoff sleep. If Bulkhead were outermost, a failing request would clog a slot for the full backoff duration, blocking healthy concurrent callers. +- **RateLimiter is inside Retry** so each retry attempt consumes a rate token. This ensures your local token count matches the actual number of requests the server receives. +- **TimeLimiter is per-attempt** so each retry gets a fresh timeout window rather than sharing one budget across all attempts. 
+ +## Using Shared Registries + +Register instances by name to observe policy state via Micrometer or the Resilience4j admin endpoints: + +```java +CircuitBreakerRegistry registry = CircuitBreakerRegistry.ofDefaults(); + +// Each transport gets a unique name — essential when sharing a registry +McpClientTransport weatherTransport = ResilientMcpClientTransport.builder(rawWeatherTransport) + .circuitBreakerName("mcp-weather") + .circuitBreakerRegistry(registry) + .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) + .build(); + +McpClientTransport searchTransport = ResilientMcpClientTransport.builder(rawSearchTransport) + .circuitBreakerName("mcp-search") + .circuitBreakerRegistry(registry) + .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) + .build(); +``` + +> **Note:** Resilience4j registries cache instances by name. If you supply a registry and a config but the name already exists in the registry, the existing instance is returned and your config is **silently ignored** by the registry. `ResilientMcpClientTransport` logs a `WARN` when this happens. Always use unique names when creating multiple transports from a shared registry. + +## Observability + +Circuit breaker state transitions and retry events are logged automatically as they occur — the transport registers its logging listeners at construction time, so no metrics system is required. 
+ +| Event | Level | Example | +|---|---|---| +| Circuit breaker state change | `INFO` | `MCP circuit breaker 'mcp-weather': CLOSED -> OPEN` | +| Call rejected (circuit open) | `WARN` | `MCP circuit breaker 'mcp-weather' is OPEN, call rejected` | +| Retry attempt | `DEBUG` | `MCP retry 'mcp-weather': attempt #2` | +| Retry exhausted | `WARN` | `MCP retry 'mcp-weather' exhausted after 3 attempt(s)` | + +For Micrometer-based metrics (Prometheus, Datadog, etc.), add `resilience4j-micrometer` to your dependencies and bind the registry to your `MeterRegistry`: + +```java +TaggedCircuitBreakerMetrics.ofCircuitBreakerRegistry(registry) + .bindTo(meterRegistry); +``` + +## Integration with Google ADK + +When using [Google ADK](https://google.github.io/adk-docs/), the `McpTransportBuilder` interface is the only injection point for custom transport behaviour. Implement it to wrap the raw transport transparently: + +```java +public class ResilientMcpTransportBuilder implements McpTransportBuilder { + + private final McpTransportBuilder delegate; + private final CircuitBreakerRegistry cbRegistry; + + @Override + public McpClientTransport build(Object serverParameters) { + McpClientTransport raw = delegate.build(serverParameters); + return ResilientMcpClientTransport.builder(raw) + .circuitBreakerName("mcp-" + endpointName) + .circuitBreakerRegistry(cbRegistry) + .circuitBreakerConfig(CircuitBreakerConfig.custom() + .slidingWindowSize(10) + .failureRateThreshold(50) + .waitDurationInOpenState(Duration.ofSeconds(30)) + .build()) + .retryConfig(RetryConfig.custom() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(500)) + .build()) + .timeLimiterConfig(TimeLimiterConfig.custom() + .timeoutDuration(Duration.ofSeconds(8)) + .build()) + .build(); + } +} +``` + +Pass `ResilientMcpTransportBuilder` wherever ADK expects an `McpTransportBuilder`. `McpSessionManager` calls `build()` lazily on first use, so each server connection gets its own named policy instance. 
+ +## Sample Request Flow + +A normal `sendMessage()` call with all five policies configured: + +``` +caller + └─ Retry (attempt 1) + └─ CircuitBreaker [CLOSED — passes through, records attempt] + └─ RateLimiter [token available — acquires, passes through] + └─ TimeLimiter [8s countdown starts] + └─ Bulkhead [slot available — acquires] + └─ MCP Server ──► response in 200ms + Bulkhead slot released + TimeLimiter cancelled + CircuitBreaker records SUCCESS + Retry: no failure, done +caller receives response +``` + +On a transient failure with retry: + +``` +Retry (attempt 1) → server fails → CB records failure #1 +Retry waits 500ms (Bulkhead slot already released) +Retry (attempt 2) → server succeeds → CB records success #1 +caller receives response +``` + +On persistent failures, the CircuitBreaker opens after the sliding window fills with failures and subsequent calls are rejected immediately without a network round-trip. + +## Building from Source + +```bash +./mvnw clean install -pl mcp-resilience4j -am -DskipTests +``` + +To run the module's tests: + +```bash +./mvnw test -pl mcp-resilience4j +``` diff --git a/mcp-resilience4j/pom.xml b/mcp-resilience4j/pom.xml new file mode 100644 index 000000000..889c3d469 --- /dev/null +++ b/mcp-resilience4j/pom.xml @@ -0,0 +1,193 @@ + + + 4.0.0 + + io.modelcontextprotocol.sdk + mcp-parent + 2.1.1-SNAPSHOT + + mcp-resilience4j + jar + Java MCP SDK Resilience4j Integration + Resilience4j integration for the Java MCP SDK providing circuit breaker, retry, and time limiter support for MCP transport + https://github.com/modelcontextprotocol/java-sdk + + + https://github.com/modelcontextprotocol/java-sdk + scm:git:git://github.com/modelcontextprotocol/java-sdk.git + scm:git:ssh://git@github.com/modelcontextprotocol/java-sdk.git + + + + 2.2.0 + + + + + io.modelcontextprotocol.sdk + mcp-core + 2.1.1-SNAPSHOT + provided + + + + io.projectreactor + reactor-core + + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + + + 
io.github.resilience4j + resilience4j-circuitbreaker + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-retry + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-bulkhead + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-ratelimiter + ${resilience4j.version} + + + + io.github.resilience4j + resilience4j-timelimiter + ${resilience4j.version} + + + + + io.github.resilience4j + resilience4j-reactor + ${resilience4j.version} + + + + + io.github.resilience4j + resilience4j-micrometer + ${resilience4j.version} + true + + + + io.micrometer + micrometer-core + 1.13.0 + true + + + + + jakarta.servlet + jakarta.servlet-api + ${jakarta.servlet.version} + true + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + org.junit.jupiter + junit-jupiter-params + ${junit.version} + test + + + + org.mockito + mockito-core + ${mockito.version} + test + + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + + org.assertj + assertj-core + ${assert4j.version} + test + + + + io.projectreactor + reactor-test + test + + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + + net.bytebuddy + byte-buddy + ${byte-buddy.version} + test + + + + + + + jackson3 + + true + + + + io.modelcontextprotocol.sdk + mcp-json-jackson3 + 2.1.1-SNAPSHOT + test + + + + + jackson2 + + + io.modelcontextprotocol.sdk + mcp-json-jackson2 + 2.1.1-SNAPSHOT + test + + + + + + diff --git a/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java new file mode 100644 index 000000000..c26a7cbdf --- /dev/null +++ b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/McpResilienceConfig.java @@ -0,0 +1,335 @@ +/* + * Copyright 2024-2026 the original author or authors. 
+ */ + +package io.modelcontextprotocol.resilience; + +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadRegistry; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.ratelimiter.RateLimiterRegistry; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.retry.RetryRegistry; +import io.github.resilience4j.timelimiter.TimeLimiterConfig; +import io.github.resilience4j.timelimiter.TimeLimiterRegistry; +import io.modelcontextprotocol.spec.McpClientTransport; + +/** + * High-level configuration facade that wires transport-level resilience for MCP clients. + * + *

+ * {@code McpResilienceConfig} provides a fluent DSL for configuring + * {@link ResilientMcpClientTransport} — applying circuit breaking, retry, rate limiting, + * time limiting, and bulkhead isolation at the wire level to all outbound MCP messages. + * + *

+ * Usage example: + * + *

{@code
+ * McpResilienceConfig config = McpResilienceConfig.builder()
+ * 		.transportCircuitBreaker(CircuitBreakerConfig.custom()
+ * 				.slidingWindowSize(10)
+ * 				.failureRateThreshold(50)
+ * 				.waitDurationInOpenState(Duration.ofSeconds(30))
+ * 				.build())
+ * 		.transportRetry(RetryConfig.custom()
+ * 				.maxAttempts(3)
+ * 				.waitDuration(Duration.ofMillis(500))
+ * 				.build())
+ * 		.transportTimeLimiter(TimeLimiterConfig.custom()
+ * 				.timeoutDuration(Duration.ofSeconds(8))
+ * 				.build())
+ * 		.build();
+ *
+ * McpClientTransport resilientTransport = config.wrapTransport(rawTransport);
+ * McpAsyncClient client = McpClient.async(resilientTransport).build();
+ * }
+ * + * @author Pratyay Pandey + * @see ResilientMcpClientTransport + */ +public final class McpResilienceConfig { + + private final CircuitBreakerConfig transportCircuitBreakerConfig; + + private final String transportCircuitBreakerName; + + private final CircuitBreakerRegistry transportCircuitBreakerRegistry; + + private final RetryConfig transportRetryConfig; + + private final String transportRetryName; + + private final RetryRegistry transportRetryRegistry; + + private final TimeLimiterConfig transportTimeLimiterConfig; + + private final String transportTimeLimiterName; + + private final TimeLimiterRegistry transportTimeLimiterRegistry; + + private final RateLimiterConfig transportRateLimiterConfig; + + private final String transportRateLimiterName; + + private final RateLimiterRegistry transportRateLimiterRegistry; + + private final BulkheadConfig transportBulkheadConfig; + + private final String transportBulkheadName; + + private final BulkheadRegistry transportBulkheadRegistry; + + private McpResilienceConfig(Builder builder) { + this.transportCircuitBreakerConfig = builder.transportCircuitBreakerConfig; + this.transportCircuitBreakerName = builder.transportCircuitBreakerName; + this.transportCircuitBreakerRegistry = builder.transportCircuitBreakerRegistry; + this.transportRetryConfig = builder.transportRetryConfig; + this.transportRetryName = builder.transportRetryName; + this.transportRetryRegistry = builder.transportRetryRegistry; + this.transportTimeLimiterConfig = builder.transportTimeLimiterConfig; + this.transportTimeLimiterName = builder.transportTimeLimiterName; + this.transportTimeLimiterRegistry = builder.transportTimeLimiterRegistry; + this.transportRateLimiterConfig = builder.transportRateLimiterConfig; + this.transportRateLimiterName = builder.transportRateLimiterName; + this.transportRateLimiterRegistry = builder.transportRateLimiterRegistry; + this.transportBulkheadConfig = builder.transportBulkheadConfig; + this.transportBulkheadName = 
builder.transportBulkheadName; + this.transportBulkheadRegistry = builder.transportBulkheadRegistry; + } + + /** + * Wraps the given transport with the configured transport-level resilience policies. + * @param transport the raw transport to wrap + * @return a {@link ResilientMcpClientTransport} with circuit breaker, retry, and time + * limiter applied + */ + public McpClientTransport wrapTransport(McpClientTransport transport) { + ResilientMcpClientTransport.Builder builder = ResilientMcpClientTransport.builder(transport); + // Always forward the name whenever either config or registry is set, so that + // a registry-only caller's custom name is not silently dropped. + if (this.transportCircuitBreakerConfig != null || this.transportCircuitBreakerRegistry != null) { + builder.circuitBreakerName(this.transportCircuitBreakerName); + } + if (this.transportCircuitBreakerConfig != null) { + builder.circuitBreakerConfig(this.transportCircuitBreakerConfig); + } + if (this.transportCircuitBreakerRegistry != null) { + builder.circuitBreakerRegistry(this.transportCircuitBreakerRegistry); + } + if (this.transportRetryConfig != null || this.transportRetryRegistry != null) { + builder.retryName(this.transportRetryName); + } + if (this.transportRetryConfig != null) { + builder.retryConfig(this.transportRetryConfig); + } + if (this.transportRetryRegistry != null) { + builder.retryRegistry(this.transportRetryRegistry); + } + if (this.transportTimeLimiterConfig != null || this.transportTimeLimiterRegistry != null) { + builder.timeLimiterName(this.transportTimeLimiterName); + } + if (this.transportTimeLimiterConfig != null) { + builder.timeLimiterConfig(this.transportTimeLimiterConfig); + } + if (this.transportTimeLimiterRegistry != null) { + builder.timeLimiterRegistry(this.transportTimeLimiterRegistry); + } + if (this.transportRateLimiterConfig != null || this.transportRateLimiterRegistry != null) { + builder.rateLimiterName(this.transportRateLimiterName); + } + if 
(this.transportRateLimiterConfig != null) { + builder.rateLimiterConfig(this.transportRateLimiterConfig); + } + if (this.transportRateLimiterRegistry != null) { + builder.rateLimiterRegistry(this.transportRateLimiterRegistry); + } + if (this.transportBulkheadConfig != null || this.transportBulkheadRegistry != null) { + builder.bulkheadName(this.transportBulkheadName); + } + if (this.transportBulkheadConfig != null) { + builder.bulkheadConfig(this.transportBulkheadConfig); + } + if (this.transportBulkheadRegistry != null) { + builder.bulkheadRegistry(this.transportBulkheadRegistry); + } + return builder.build(); + } + + /** + * Creates a new builder for {@link McpResilienceConfig}. + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link McpResilienceConfig}. + */ + public static final class Builder { + + private static final String DEFAULT_TRANSPORT_NAME = "mcp-transport"; + + private CircuitBreakerConfig transportCircuitBreakerConfig; + + private String transportCircuitBreakerName = DEFAULT_TRANSPORT_NAME; + + private CircuitBreakerRegistry transportCircuitBreakerRegistry; + + private RetryConfig transportRetryConfig; + + private String transportRetryName = DEFAULT_TRANSPORT_NAME; + + private RetryRegistry transportRetryRegistry; + + private TimeLimiterConfig transportTimeLimiterConfig; + + private String transportTimeLimiterName = DEFAULT_TRANSPORT_NAME; + + private TimeLimiterRegistry transportTimeLimiterRegistry; + + private RateLimiterConfig transportRateLimiterConfig; + + private String transportRateLimiterName = DEFAULT_TRANSPORT_NAME; + + private RateLimiterRegistry transportRateLimiterRegistry; + + private BulkheadConfig transportBulkheadConfig; + + private String transportBulkheadName = DEFAULT_TRANSPORT_NAME; + + private BulkheadRegistry transportBulkheadRegistry; + + private Builder() { + } + + public Builder transportCircuitBreaker(CircuitBreakerConfig config) { + if (config 
== null) { + throw new IllegalArgumentException("CircuitBreakerConfig must not be null"); + } + this.transportCircuitBreakerConfig = config; + return this; + } + + public Builder transportCircuitBreakerName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Circuit breaker name must not be blank"); + } + this.transportCircuitBreakerName = name; + return this; + } + + public Builder transportCircuitBreakerRegistry(CircuitBreakerRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("CircuitBreakerRegistry must not be null"); + } + this.transportCircuitBreakerRegistry = registry; + return this; + } + + public Builder transportRetry(RetryConfig config) { + if (config == null) { + throw new IllegalArgumentException("RetryConfig must not be null"); + } + this.transportRetryConfig = config; + return this; + } + + public Builder transportRetryName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Retry name must not be blank"); + } + this.transportRetryName = name; + return this; + } + + public Builder transportRetryRegistry(RetryRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("RetryRegistry must not be null"); + } + this.transportRetryRegistry = registry; + return this; + } + + public Builder transportTimeLimiter(TimeLimiterConfig config) { + if (config == null) { + throw new IllegalArgumentException("TimeLimiterConfig must not be null"); + } + this.transportTimeLimiterConfig = config; + return this; + } + + public Builder transportTimeLimiterName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("TimeLimiter name must not be blank"); + } + this.transportTimeLimiterName = name; + return this; + } + + public Builder transportTimeLimiterRegistry(TimeLimiterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("TimeLimiterRegistry must not be null"); + } + 
this.transportTimeLimiterRegistry = registry; + return this; + } + + public Builder transportRateLimiter(RateLimiterConfig config) { + if (config == null) { + throw new IllegalArgumentException("RateLimiterConfig must not be null"); + } + this.transportRateLimiterConfig = config; + return this; + } + + public Builder transportRateLimiterName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("RateLimiter name must not be blank"); + } + this.transportRateLimiterName = name; + return this; + } + + public Builder transportRateLimiterRegistry(RateLimiterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("RateLimiterRegistry must not be null"); + } + this.transportRateLimiterRegistry = registry; + return this; + } + + public Builder transportBulkhead(BulkheadConfig config) { + if (config == null) { + throw new IllegalArgumentException("BulkheadConfig must not be null"); + } + this.transportBulkheadConfig = config; + return this; + } + + public Builder transportBulkheadName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Bulkhead name must not be blank"); + } + this.transportBulkheadName = name; + return this; + } + + public Builder transportBulkheadRegistry(BulkheadRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("BulkheadRegistry must not be null"); + } + this.transportBulkheadRegistry = registry; + return this; + } + + public McpResilienceConfig build() { + return new McpResilienceConfig(this); + } + + } + +} diff --git a/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java new file mode 100644 index 000000000..907def3a6 --- /dev/null +++ b/mcp-resilience4j/src/main/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransport.java @@ -0,0 +1,607 @@ +/* + * Copyright 
2024-2026 the original author or authors. + */ + +package io.modelcontextprotocol.resilience; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadRegistry; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.ratelimiter.RateLimiter; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.ratelimiter.RateLimiterRegistry; +import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator; +import io.github.resilience4j.reactor.retry.RetryOperator; +import io.github.resilience4j.reactor.timelimiter.TimeLimiterOperator; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.retry.RetryRegistry; +import io.github.resilience4j.timelimiter.TimeLimiter; +import io.github.resilience4j.timelimiter.TimeLimiterConfig; +import io.github.resilience4j.timelimiter.TimeLimiterRegistry; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * A decorator for {@link McpClientTransport} that applies Resilience4j policies to all + * outbound MCP messages and connection attempts. + * + *

+ * Wraps any {@link McpClientTransport} and applies circuit breaking, retry with backoff, + * rate limiting, time limiting, and bulkheading at the wire level, before requests reach + * the server. This is the lowest-level resilience point in the MCP call graph: + * + *

+ * McpAsyncClient.callTool()
+ *   └─> McpClientSession.sendRequest()
+ *        └─> [ResilientMcpClientTransport] ← policies applied here
+ *             └─> HttpClientStreamableHttpTransport / StdioClientTransport
+ * 
+ * + *

+ * The standard policy hierarchy for {@code sendMessage} (outermost to innermost) is: + * Retry → CircuitBreaker → RateLimiter → TimeLimiter → Bulkhead. Bulkhead is innermost so + * a slot is only held during active execution, not during retry backoff sleeps. + * RateLimiter is inside Retry so every attempt — including retries — consumes a token, + * accurately reflecting the load the downstream service actually sees. + * + *

+ * Only CircuitBreaker and Retry are applied to {@code connect()} — session establishment + * is intentionally neither throttled nor timed out. + * + *

+ * Usage example: + * + *

{@code
+ * McpClientTransport transport = ResilientMcpClientTransport.builder(rawTransport)
+ * 		.circuitBreakerConfig(CircuitBreakerConfig.custom()
+ * 				.slidingWindowSize(10)
+ * 				.failureRateThreshold(50)
+ * 				.waitDurationInOpenState(Duration.ofSeconds(30))
+ * 				.build())
+ * 		.retryConfig(RetryConfig.custom()
+ * 				.maxAttempts(3)
+ * 				.waitDuration(Duration.ofMillis(500))
+ * 				.build())
+ * 		.timeLimiterConfig(TimeLimiterConfig.custom()
+ * 				.timeoutDuration(Duration.ofSeconds(8))
+ * 				.build())
+ * 		.rateLimiterConfig(RateLimiterConfig.custom()
+ * 				.limitForPeriod(10)
+ * 				.limitRefreshPeriod(Duration.ofSeconds(1))
+ * 				.build())
+ * 		.bulkheadConfig(BulkheadConfig.custom()
+ * 				.maxConcurrentCalls(5)
+ * 				.build())
+ * 		.build();
+ *
+ * McpAsyncClient client = McpClient.async(transport).build();
+ * }
+ * + * @author Pratyay Pandey + * @see McpResilienceConfig + */ +public final class ResilientMcpClientTransport implements McpClientTransport { + + private static final Logger logger = LoggerFactory.getLogger(ResilientMcpClientTransport.class); + + private static final String DEFAULT_NAME = "mcp-transport"; + + private final McpClientTransport delegate; + + private final CircuitBreaker circuitBreaker; + + private final Retry retry; + + private final TimeLimiter timeLimiter; + + private final RateLimiter rateLimiter; + + private final Bulkhead bulkhead; + + private ResilientMcpClientTransport(McpClientTransport delegate, CircuitBreaker circuitBreaker, Retry retry, + TimeLimiter timeLimiter, RateLimiter rateLimiter, Bulkhead bulkhead) { + this.delegate = delegate; + this.circuitBreaker = circuitBreaker; + this.retry = retry; + this.timeLimiter = timeLimiter; + this.rateLimiter = rateLimiter; + this.bulkhead = bulkhead; + if (circuitBreaker != null) { + circuitBreaker.getEventPublisher() + .onStateTransition(e -> logger.info("MCP circuit breaker '{}': {} -> {}", circuitBreaker.getName(), + e.getStateTransition().getFromState(), e.getStateTransition().getToState())) + .onCallNotPermitted( + e -> logger.warn("MCP circuit breaker '{}' is OPEN, call rejected", circuitBreaker.getName())); + } + if (retry != null) { + retry.getEventPublisher() + .onRetry( + e -> logger.debug("MCP retry '{}': attempt #{}", retry.getName(), e.getNumberOfRetryAttempts())) + .onError(e -> logger.warn("MCP retry '{}' exhausted after {} attempt(s)", retry.getName(), + e.getNumberOfRetryAttempts())); + } + } + + /** + * Sends a message through the delegate transport, applying configured resilience + * policies. The standard hierarchy (outermost to innermost) is: Retry → + * CircuitBreaker → RateLimiter → TimeLimiter → Bulkhead. + * + *

+ * Bulkhead is innermost so a slot is held only during active execution — during a + * retry backoff sleep the slot is released back to the pool. RateLimiter is inside + * Retry so every retry attempt consumes a token, accurately reflecting the request + * rate the downstream service actually sees. + * @param message the JSON-RPC message to send + * @return a {@link Mono} that completes when the message is sent or fails after + * exhausting resilience policies + */ + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + // Mono.defer ensures delegate.sendMessage() is called on each retry attempt, + // not just once at assembly time. + // Operators are applied innermost-first: each transformDeferred wraps the + // previous, + // so the last one applied becomes the outermost subscriber. + Mono op = Mono.defer(() -> this.delegate.sendMessage(message)); + if (this.bulkhead != null) { + op = op.transformDeferred(BulkheadOperator.of(this.bulkhead)); + } + if (this.timeLimiter != null) { + op = op.transformDeferred(TimeLimiterOperator.of(this.timeLimiter)); + } + if (this.rateLimiter != null) { + op = op.transformDeferred(RateLimiterOperator.of(this.rateLimiter)); + } + if (this.circuitBreaker != null) { + op = op.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker)); + } + if (this.retry != null) { + op = op.transformDeferred(RetryOperator.of(this.retry)); + } + return op; + } + + /** + * Connects to the MCP server, applying circuit breaker and retry policies to the + * connection attempt. The time limiter is not applied here as connection setup may + * legitimately take longer than a single request timeout. 
+ * @param handler the incoming message handler + * @return a {@link Mono} that completes when the transport is ready + */ + @Override + public Mono connect(Function, Mono> handler) { + Mono connectOp = Mono.defer(() -> this.delegate.connect(handler)); + if (this.circuitBreaker != null) { + connectOp = connectOp.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker)); + } + if (this.retry != null) { + connectOp = connectOp.transformDeferred(RetryOperator.of(this.retry)); + } + return connectOp; + } + + @Override + public void setExceptionHandler(Consumer handler) { + this.delegate.setExceptionHandler(handler); + } + + @Override + public Mono closeGracefully() { + return this.delegate.closeGracefully(); + } + + @Override + public T unmarshalFrom(Object data, TypeRef typeRef) { + return this.delegate.unmarshalFrom(data, typeRef); + } + + @Override + public List protocolVersions() { + return this.delegate.protocolVersions(); + } + + /** + * Returns the underlying {@link CircuitBreaker} for monitoring or metric binding. + * @return the circuit breaker, or {@code null} if not configured + */ + public CircuitBreaker getCircuitBreaker() { + return this.circuitBreaker; + } + + /** + * Returns the underlying {@link Retry} policy. + * @return the retry policy, or {@code null} if not configured + */ + public Retry getRetry() { + return this.retry; + } + + /** + * Returns the underlying {@link TimeLimiter} policy. + * @return the time limiter, or {@code null} if not configured + */ + public TimeLimiter getTimeLimiter() { + return this.timeLimiter; + } + + /** + * Returns the underlying {@link RateLimiter} policy. + * @return the rate limiter, or {@code null} if not configured + */ + public RateLimiter getRateLimiter() { + return this.rateLimiter; + } + + /** + * Returns the underlying {@link Bulkhead} policy. 
+ * @return the bulkhead, or {@code null} if not configured + */ + public Bulkhead getBulkhead() { + return this.bulkhead; + } + + /** + * Creates a new builder for {@link ResilientMcpClientTransport}. + * @param delegate the transport to wrap + * @return a new builder instance + */ + public static Builder builder(McpClientTransport delegate) { + return new Builder(delegate); + } + + /** + * Builder for {@link ResilientMcpClientTransport}. + */ + public static final class Builder { + + private final McpClientTransport delegate; + + private CircuitBreakerConfig circuitBreakerConfig; + + private String circuitBreakerName = DEFAULT_NAME; + + private CircuitBreakerRegistry circuitBreakerRegistry; + + private RetryConfig retryConfig; + + private String retryName = DEFAULT_NAME; + + private RetryRegistry retryRegistry; + + private TimeLimiterConfig timeLimiterConfig; + + private String timeLimiterName = DEFAULT_NAME; + + private TimeLimiterRegistry timeLimiterRegistry; + + private RateLimiterConfig rateLimiterConfig; + + private String rateLimiterName = DEFAULT_NAME; + + private RateLimiterRegistry rateLimiterRegistry; + + private BulkheadConfig bulkheadConfig; + + private String bulkheadName = DEFAULT_NAME; + + private BulkheadRegistry bulkheadRegistry; + + private Builder(McpClientTransport delegate) { + if (delegate == null) { + throw new IllegalArgumentException("Delegate transport must not be null"); + } + this.delegate = delegate; + } + + /** + * Configures a circuit breaker with the given configuration. A circuit breaker + * registry will be created internally using this configuration. 
+ * @param config the circuit breaker configuration + * @return this builder + */ + public Builder circuitBreakerConfig(CircuitBreakerConfig config) { + if (config == null) { + throw new IllegalArgumentException("CircuitBreakerConfig must not be null"); + } + this.circuitBreakerConfig = config; + return this; + } + + /** + * Sets the name used to register the circuit breaker in its registry. Useful when + * exposing the registry to a Hystrix-compatible metrics stream. + * @param name the circuit breaker name + * @return this builder + */ + public Builder circuitBreakerName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Circuit breaker name must not be blank"); + } + this.circuitBreakerName = name; + return this; + } + + /** + * Provides an existing {@link CircuitBreakerRegistry} from which the circuit + * breaker will be retrieved. When provided, the registry-level default config is + * used unless {@link #circuitBreakerConfig} is also set. + * @param registry the circuit breaker registry + * @return this builder + */ + public Builder circuitBreakerRegistry(CircuitBreakerRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("CircuitBreakerRegistry must not be null"); + } + this.circuitBreakerRegistry = registry; + return this; + } + + /** + * Configures a retry policy. + * @param config the retry configuration + * @return this builder + */ + public Builder retryConfig(RetryConfig config) { + if (config == null) { + throw new IllegalArgumentException("RetryConfig must not be null"); + } + this.retryConfig = config; + return this; + } + + /** + * Sets the name used to register the retry policy in its registry. 
+ * @param name the retry name + * @return this builder + */ + public Builder retryName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Retry name must not be blank"); + } + this.retryName = name; + return this; + } + + /** + * Provides an existing {@link RetryRegistry}. + * @param registry the retry registry + * @return this builder + */ + public Builder retryRegistry(RetryRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("RetryRegistry must not be null"); + } + this.retryRegistry = registry; + return this; + } + + /** + * Configures a time limiter applied to each individual {@code sendMessage} call. + * @param config the time limiter configuration + * @return this builder + */ + public Builder timeLimiterConfig(TimeLimiterConfig config) { + if (config == null) { + throw new IllegalArgumentException("TimeLimiterConfig must not be null"); + } + this.timeLimiterConfig = config; + return this; + } + + /** + * Sets the name for the time limiter in its registry. + * @param name the time limiter name + * @return this builder + */ + public Builder timeLimiterName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("TimeLimiter name must not be blank"); + } + this.timeLimiterName = name; + return this; + } + + /** + * Provides an existing {@link TimeLimiterRegistry}. + * @param registry the time limiter registry + * @return this builder + */ + public Builder timeLimiterRegistry(TimeLimiterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("TimeLimiterRegistry must not be null"); + } + this.timeLimiterRegistry = registry; + return this; + } + + /** + * Configures a rate limiter applied to each {@code sendMessage} call. 
+ * @param config the rate limiter configuration + * @return this builder + */ + public Builder rateLimiterConfig(RateLimiterConfig config) { + if (config == null) { + throw new IllegalArgumentException("RateLimiterConfig must not be null"); + } + this.rateLimiterConfig = config; + return this; + } + + /** + * Sets the name for the rate limiter in its registry. + * @param name the rate limiter name + * @return this builder + */ + public Builder rateLimiterName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("RateLimiter name must not be blank"); + } + this.rateLimiterName = name; + return this; + } + + /** + * Provides an existing {@link RateLimiterRegistry}. + * @param registry the rate limiter registry + * @return this builder + */ + public Builder rateLimiterRegistry(RateLimiterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("RateLimiterRegistry must not be null"); + } + this.rateLimiterRegistry = registry; + return this; + } + + /** + * Configures a bulkhead limiting concurrent in-flight {@code sendMessage} calls. + * @param config the bulkhead configuration + * @return this builder + */ + public Builder bulkheadConfig(BulkheadConfig config) { + if (config == null) { + throw new IllegalArgumentException("BulkheadConfig must not be null"); + } + this.bulkheadConfig = config; + return this; + } + + /** + * Sets the name for the bulkhead in its registry. + * @param name the bulkhead name + * @return this builder + */ + public Builder bulkheadName(String name) { + if (name == null || name.isBlank()) { + throw new IllegalArgumentException("Bulkhead name must not be blank"); + } + this.bulkheadName = name; + return this; + } + + /** + * Provides an existing {@link BulkheadRegistry}. 
+ * @param registry the bulkhead registry + * @return this builder + */ + public Builder bulkheadRegistry(BulkheadRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("BulkheadRegistry must not be null"); + } + this.bulkheadRegistry = registry; + return this; + } + + /** + * Builds the {@link ResilientMcpClientTransport}. + * @return a new instance wrapping the delegate with the configured policies + */ + public ResilientMcpClientTransport build() { + CircuitBreaker cb = resolveCircuitBreaker(); + Retry retry = resolveRetry(); + TimeLimiter timeLimiter = resolveTimeLimiter(); + RateLimiter rateLimiter = resolveRateLimiter(); + Bulkhead bulkhead = resolveBulkhead(); + return new ResilientMcpClientTransport(this.delegate, cb, retry, timeLimiter, rateLimiter, bulkhead); + } + + private CircuitBreaker resolveCircuitBreaker() { + if (this.circuitBreakerConfig == null && this.circuitBreakerRegistry == null) { + return null; + } + CircuitBreakerRegistry registry = (this.circuitBreakerRegistry != null) ? this.circuitBreakerRegistry + : CircuitBreakerRegistry.of(this.circuitBreakerConfig); + if (this.circuitBreakerConfig != null && this.circuitBreakerRegistry != null + && registry.find(this.circuitBreakerName).isPresent()) { + logger.warn( + "Circuit breaker '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.", + this.circuitBreakerName); + } + CircuitBreakerConfig config = (this.circuitBreakerConfig != null) ? this.circuitBreakerConfig + : CircuitBreakerConfig.ofDefaults(); + return registry.circuitBreaker(this.circuitBreakerName, config); + } + + private Retry resolveRetry() { + if (this.retryConfig == null && this.retryRegistry == null) { + return null; + } + RetryRegistry registry = (this.retryRegistry != null) ? 
this.retryRegistry + : RetryRegistry.of(this.retryConfig); + if (this.retryConfig != null && this.retryRegistry != null && registry.find(this.retryName).isPresent()) { + logger.warn( + "Retry '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.", + this.retryName); + } + RetryConfig config = (this.retryConfig != null) ? this.retryConfig : RetryConfig.ofDefaults(); + return registry.retry(this.retryName, config); + } + + private TimeLimiter resolveTimeLimiter() { + if (this.timeLimiterConfig == null && this.timeLimiterRegistry == null) { + return null; + } + TimeLimiterRegistry registry = (this.timeLimiterRegistry != null) ? this.timeLimiterRegistry + : TimeLimiterRegistry.of(this.timeLimiterConfig); + if (this.timeLimiterConfig != null && this.timeLimiterRegistry != null + && registry.find(this.timeLimiterName).isPresent()) { + logger.warn( + "TimeLimiter '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.", + this.timeLimiterName); + } + TimeLimiterConfig config = (this.timeLimiterConfig != null) ? this.timeLimiterConfig + : TimeLimiterConfig.ofDefaults(); + return registry.timeLimiter(this.timeLimiterName, config); + } + + private RateLimiter resolveRateLimiter() { + if (this.rateLimiterConfig == null && this.rateLimiterRegistry == null) { + return null; + } + RateLimiterRegistry registry = (this.rateLimiterRegistry != null) ? this.rateLimiterRegistry + : RateLimiterRegistry.of(this.rateLimiterConfig); + if (this.rateLimiterConfig != null && this.rateLimiterRegistry != null + && registry.find(this.rateLimiterName).isPresent()) { + logger.warn( + "RateLimiter '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.", + this.rateLimiterName); + } + RateLimiterConfig config = (this.rateLimiterConfig != null) ? 
this.rateLimiterConfig + : RateLimiterConfig.ofDefaults(); + return registry.rateLimiter(this.rateLimiterName, config); + } + + private Bulkhead resolveBulkhead() { + if (this.bulkheadConfig == null && this.bulkheadRegistry == null) { + return null; + } + BulkheadRegistry registry = (this.bulkheadRegistry != null) ? this.bulkheadRegistry + : BulkheadRegistry.of(this.bulkheadConfig); + if (this.bulkheadConfig != null && this.bulkheadRegistry != null + && registry.find(this.bulkheadName).isPresent()) { + logger.warn( + "Bulkhead '{}' already exists in the provided registry; the supplied config is ignored. Use a unique name to register a distinct instance.", + this.bulkheadName); + } + BulkheadConfig config = (this.bulkheadConfig != null) ? this.bulkheadConfig : BulkheadConfig.ofDefaults(); + return registry.bulkhead(this.bulkheadName, config); + } + + } + +} diff --git a/mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java b/mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java new file mode 100644 index 000000000..f21a5d7b8 --- /dev/null +++ b/mcp-resilience4j/src/test/java/io/modelcontextprotocol/resilience/ResilientMcpClientTransportTests.java @@ -0,0 +1,213 @@ +/* + * Copyright 2024-2026 the original author or authors. 
+ */ + +package io.modelcontextprotocol.resilience; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.retry.RetryConfig; +import io.github.resilience4j.timelimiter.TimeLimiterConfig; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ResilientMcpClientTransport}. 
+ */ +@ExtendWith(MockitoExtension.class) +class ResilientMcpClientTransportTests { + + @Mock + private McpClientTransport delegateTransport; + + private McpSchema.JSONRPCMessage testMessage; + + @BeforeEach + void setUp() { + this.testMessage = new McpSchema.JSONRPCRequest("tools/call", "1", null); + } + + @Test + void builderRequiresNonNullDelegate() { + assertThatThrownBy(() -> ResilientMcpClientTransport.builder(null).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Delegate transport must not be null"); + } + + @Test + void sendMessageDelegatesToUnderlyingTransportWhenNoPolicies() { + when(this.delegateTransport.sendMessage(any())).thenReturn(Mono.empty()); + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + + StepVerifier.create(transport.sendMessage(this.testMessage)).verifyComplete(); + + verify(this.delegateTransport).sendMessage(this.testMessage); + } + + @Test + void sendMessageAppliesRetryOnTransientFailure() { + AtomicInteger callCount = new AtomicInteger(0); + when(this.delegateTransport.sendMessage(any())).thenAnswer(inv -> { + int count = callCount.incrementAndGet(); + if (count < 3) { + return Mono.error(new RuntimeException("transient error")); + } + return Mono.empty(); + }); + + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport) + .retryConfig(RetryConfig.custom().maxAttempts(3).waitDuration(Duration.ZERO).build()) + .build(); + + StepVerifier.create(transport.sendMessage(this.testMessage)).verifyComplete(); + + assertThat(callCount.get()).isEqualTo(3); + } + + @Test + void sendMessageFailsAfterMaxRetryAttempts() { + when(this.delegateTransport.sendMessage(any())) + .thenReturn(Mono.error(new RuntimeException("persistent error"))); + + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport) + 
.retryConfig(RetryConfig.custom().maxAttempts(2).waitDuration(Duration.ZERO).build()) + .build(); + + StepVerifier.create(transport.sendMessage(this.testMessage)).verifyError(RuntimeException.class); + + verify(this.delegateTransport, times(2)).sendMessage(this.testMessage); + } + + @Test + void sendMessageOpensCircuitBreakerAfterThresholdExceeded() { + when(this.delegateTransport.sendMessage(any())).thenReturn(Mono.error(new RuntimeException("server error"))); + + CircuitBreakerConfig cbConfig = CircuitBreakerConfig.custom() + .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) + .slidingWindowSize(4) + .minimumNumberOfCalls(4) + .failureRateThreshold(100) + .build(); + + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport) + .circuitBreakerConfig(cbConfig) + .build(); + + // Force the circuit breaker open by exhausting the sliding window + for (int i = 0; i < 4; i++) { + StepVerifier.create(transport.sendMessage(this.testMessage)).verifyError(RuntimeException.class); + } + + // Circuit should now be OPEN — next call must short-circuit + assertThat(transport.getCircuitBreaker().getState()).isEqualTo(CircuitBreaker.State.OPEN); + + StepVerifier.create(transport.sendMessage(this.testMessage)).verifyError(CallNotPermittedException.class); + } + + @Test + void sendMessageTimeLimiterCancelsSlowOperations() { + // thenAnswer defers Mono.delay creation to call time so it is constructed inside + // the withVirtualTime supplier and uses the virtual scheduler, not real time. 
+ when(this.delegateTransport.sendMessage(any())).thenAnswer(inv -> Mono.delay(Duration.ofSeconds(10)).then()); + + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport) + .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(100)).build()) + .build(); + + StepVerifier.withVirtualTime(() -> transport.sendMessage(this.testMessage)) + .thenAwait(Duration.ofSeconds(1)) + .verifyError(java.util.concurrent.TimeoutException.class); + } + + @Test + void connectDelegatesToUnderlyingTransport() { + when(this.delegateTransport.connect(any())).thenReturn(Mono.empty()); + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + Function, Mono> handler = m -> m; + + StepVerifier.create(transport.connect(handler)).verifyComplete(); + + verify(this.delegateTransport).connect(handler); + } + + @Test + void closeGracefullyDelegatesToUnderlyingTransport() { + when(this.delegateTransport.closeGracefully()).thenReturn(Mono.empty()); + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + verify(this.delegateTransport).closeGracefully(); + } + + @Test + void setExceptionHandlerDelegatesToUnderlyingTransport() { + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + Consumer handler = ex -> { + }; + + transport.setExceptionHandler(handler); + + verify(this.delegateTransport).setExceptionHandler(handler); + } + + @Test + void unmarshalFromDelegatesToUnderlyingTransport() { + TypeRef typeRef = new TypeRef<>() { + }; + when(this.delegateTransport.unmarshalFrom(any(), any())).thenReturn("result"); + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + + String result = transport.unmarshalFrom("data", typeRef); + + 
assertThat(result).isEqualTo("result"); + verify(this.delegateTransport).unmarshalFrom("data", typeRef); + } + + @Test + void protocolVersionsDelegatesToUnderlyingTransport() { + when(this.delegateTransport.protocolVersions()).thenReturn(List.of("2025-03-26")); + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + + List versions = transport.protocolVersions(); + + assertThat(versions).containsExactly("2025-03-26"); + } + + @Test + void getCircuitBreakerReturnsNullWhenNotConfigured() { + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport).build(); + assertThat(transport.getCircuitBreaker()).isNull(); + } + + @Test + void getCircuitBreakerReturnsInstanceWhenConfigured() { + ResilientMcpClientTransport transport = ResilientMcpClientTransport.builder(this.delegateTransport) + .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) + .build(); + assertThat(transport.getCircuitBreaker()).isNotNull(); + } + +} diff --git a/mcp-test/pom.xml b/mcp-test/pom.xml index 40cf42d36..4ef5043bc 100644 --- a/mcp-test/pom.xml +++ b/mcp-test/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp-test jar @@ -24,7 +24,7 @@ io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT @@ -159,7 +159,7 @@ io.modelcontextprotocol.sdk mcp-json-jackson3 - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT test @@ -170,7 +170,7 @@ io.modelcontextprotocol.sdk mcp-json-jackson2 - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT test diff --git a/mcp/pom.xml b/mcp/pom.xml index 8749bb0d2..63b5b9f93 100644 --- a/mcp/pom.xml +++ b/mcp/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT mcp jar @@ -25,13 +25,13 @@ io.modelcontextprotocol.sdk mcp-json-jackson3 - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT io.modelcontextprotocol.sdk mcp-core - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT diff --git a/pom.xml b/pom.xml index f60f3918b..0e7f63fce 100644 
--- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ io.modelcontextprotocol.sdk mcp-parent - 2.0.1-SNAPSHOT + 2.1.1-SNAPSHOT pom https://github.com/modelcontextprotocol/java-sdk @@ -109,6 +109,7 @@ mcp-json-jackson2 mcp-json-jackson3 mcp-test + mcp-resilience4j conformance-tests