-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Add list_all_* helpers that drain pagination on the client #3021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| from typing import Any | ||
|
|
||
| from mcp_types import ListResourcesResult, PaginatedRequestParams, Resource | ||
|
|
||
| from mcp import Client | ||
| from mcp.server import Server, ServerRequestContext | ||
|
|
||
| BOOKS = [f"book-{n}" for n in range(1, 101)] | ||
|
|
||
| PAGE_SIZE = 10 | ||
|
|
||
|
|
||
| async def list_books(ctx: ServerRequestContext[Any], params: PaginatedRequestParams | None) -> ListResourcesResult: | ||
| start = 0 if params is None or params.cursor is None else int(params.cursor) | ||
| end = start + PAGE_SIZE | ||
| page = [Resource(uri=f"books://catalog/{name}", name=name) for name in BOOKS[start:end]] | ||
| next_cursor = str(end) if end < len(BOOKS) else None | ||
| return ListResourcesResult(resources=page, next_cursor=next_cursor) | ||
|
|
||
|
|
||
| server = Server("Bookshop", on_list_resources=list_books) | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| async with Client(server) as client: | ||
| # Every page, stitched into one list. | ||
| resources = await client.list_all_resources() | ||
| print(f"{len(resources)} resources") | ||
|
|
||
| # Or stream them, and stop as soon as you have what you need. | ||
| async for resource in client.iter_all_resources(): | ||
| print(f"first: {resource.name}") | ||
| break |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ | |
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Track all previously seen pagination cursors, not only the immediately previous one. Alternating/cyclic cursors can still make Prompt for AI agents |
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Awaitable, Callable, Mapping | ||
| from collections.abc import AsyncIterator, Awaitable, Callable, Mapping | ||
| from contextlib import AsyncExitStack | ||
| from dataclasses import KW_ONLY, dataclass, field | ||
| from typing import Any, Literal, TypeVar | ||
|
|
@@ -26,11 +26,15 @@ | |
| ListToolsResult, | ||
| LoggingLevel, | ||
| PaginatedRequestParams, | ||
| Prompt, | ||
| PromptReference, | ||
| ReadResourceResult, | ||
| RequestParamsMeta, | ||
| Resource, | ||
| ResourceTemplate, | ||
| ResourceTemplateReference, | ||
| ServerCapabilities, | ||
| Tool, | ||
| ) | ||
| from mcp_types.version import HANDSHAKE_PROTOCOL_VERSIONS, MODERN_PROTOCOL_VERSIONS | ||
| from typing_extensions import deprecated | ||
|
|
@@ -367,7 +371,11 @@ async def list_resources( | |
| cursor: str | None = None, | ||
| meta: RequestParamsMeta | None = None, | ||
| ) -> ListResourcesResult: | ||
| """List available resources from the server.""" | ||
| """List a single page of available resources from the server. | ||
|
|
||
| Returns one page only. The result may include a `next_cursor` if more | ||
| pages are available. Use `list_all_resources` to drain every page. | ||
| """ | ||
| return await self.session.list_resources(params=PaginatedRequestParams(cursor=cursor, _meta=meta)) | ||
|
|
||
| async def list_resource_templates( | ||
|
|
@@ -376,7 +384,12 @@ async def list_resource_templates( | |
| cursor: str | None = None, | ||
| meta: RequestParamsMeta | None = None, | ||
| ) -> ListResourceTemplatesResult: | ||
| """List available resource templates from the server.""" | ||
| """List a single page of available resource templates from the server. | ||
|
|
||
| Returns one page only. The result may include a `next_cursor` if more | ||
| pages are available. Use `list_all_resource_templates` to drain every | ||
| page. | ||
| """ | ||
| return await self.session.list_resource_templates(params=PaginatedRequestParams(cursor=cursor, _meta=meta)) | ||
|
|
||
| async def read_resource( | ||
|
|
@@ -482,7 +495,11 @@ async def list_prompts( | |
| cursor: str | None = None, | ||
| meta: RequestParamsMeta | None = None, | ||
| ) -> ListPromptsResult: | ||
| """List available prompts from the server.""" | ||
| """List a single page of available prompts from the server. | ||
|
|
||
| Returns one page only. The result may include a `next_cursor` if more | ||
| pages are available. Use `list_all_prompts` to drain every page. | ||
| """ | ||
| return await self.session.list_prompts(params=PaginatedRequestParams(cursor=cursor, _meta=meta)) | ||
|
|
||
| async def get_prompt( | ||
|
|
@@ -566,9 +583,130 @@ async def complete( | |
| return await self.session.complete(ref=ref, argument=argument, context_arguments=context_arguments) | ||
|
|
||
| async def list_tools(self, *, cursor: str | None = None, meta: RequestParamsMeta | None = None) -> ListToolsResult: | ||
| """List available tools from the server.""" | ||
| """List a single page of available tools from the server. | ||
|
|
||
| Returns one page only. The result may include a `next_cursor` if more | ||
| pages are available. Use `list_all_tools` to drain every page. | ||
| """ | ||
| return await self.session.list_tools(params=PaginatedRequestParams(cursor=cursor, _meta=meta)) | ||
|
|
||
| async def iter_all_tools(self, *, meta: RequestParamsMeta | None = None) -> AsyncIterator[Tool]: | ||
| """Yield every tool from the server, paging through `next_cursor`. | ||
|
|
||
| Useful for streaming consumers that want to process tools without | ||
| materializing the full list in memory. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| cursor: str | None = None | ||
| while True: | ||
| result = await self.list_tools(cursor=cursor, meta=meta) | ||
| for tool in result.tools: | ||
| yield tool | ||
| if result.next_cursor is None: | ||
| return | ||
| if result.next_cursor == cursor: | ||
| raise RuntimeError( | ||
| "Server returned a pagination cursor that did not advance; refusing to page forever." | ||
| ) | ||
| cursor = result.next_cursor | ||
|
|
||
| async def list_all_tools(self, *, meta: RequestParamsMeta | None = None) -> list[Tool]: | ||
| """List every tool from the server, draining `next_cursor` across pages. | ||
|
|
||
| Unlike `list_tools`, which returns one page, this walks pagination | ||
| until the server reports no further pages and returns the combined | ||
| list. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| return [tool async for tool in self.iter_all_tools(meta=meta)] | ||
|
|
||
| async def iter_all_prompts(self, *, meta: RequestParamsMeta | None = None) -> AsyncIterator[Prompt]: | ||
| """Yield every prompt from the server, paging through `next_cursor`. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| cursor: str | None = None | ||
| while True: | ||
| result = await self.list_prompts(cursor=cursor, meta=meta) | ||
| for prompt in result.prompts: | ||
| yield prompt | ||
| if result.next_cursor is None: | ||
| return | ||
| if result.next_cursor == cursor: | ||
| raise RuntimeError( | ||
| "Server returned a pagination cursor that did not advance; refusing to page forever." | ||
| ) | ||
| cursor = result.next_cursor | ||
|
|
||
| async def list_all_prompts(self, *, meta: RequestParamsMeta | None = None) -> list[Prompt]: | ||
| """List every prompt from the server, draining `next_cursor` across pages. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| return [prompt async for prompt in self.iter_all_prompts(meta=meta)] | ||
|
|
||
| async def iter_all_resources(self, *, meta: RequestParamsMeta | None = None) -> AsyncIterator[Resource]: | ||
| """Yield every resource from the server, paging through `next_cursor`. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| cursor: str | None = None | ||
| while True: | ||
| result = await self.list_resources(cursor=cursor, meta=meta) | ||
| for resource in result.resources: | ||
| yield resource | ||
| if result.next_cursor is None: | ||
| return | ||
| if result.next_cursor == cursor: | ||
| raise RuntimeError( | ||
| "Server returned a pagination cursor that did not advance; refusing to page forever." | ||
| ) | ||
| cursor = result.next_cursor | ||
|
|
||
| async def list_all_resources(self, *, meta: RequestParamsMeta | None = None) -> list[Resource]: | ||
| """List every resource from the server, draining `next_cursor` across pages. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| return [resource async for resource in self.iter_all_resources(meta=meta)] | ||
|
|
||
| async def iter_all_resource_templates( | ||
| self, *, meta: RequestParamsMeta | None = None | ||
| ) -> AsyncIterator[ResourceTemplate]: | ||
| """Yield every resource template from the server, paging through `next_cursor`. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| cursor: str | None = None | ||
| while True: | ||
| result = await self.list_resource_templates(cursor=cursor, meta=meta) | ||
| for template in result.resource_templates: | ||
| yield template | ||
| if result.next_cursor is None: | ||
| return | ||
| if result.next_cursor == cursor: | ||
| raise RuntimeError( | ||
| "Server returned a pagination cursor that did not advance; refusing to page forever." | ||
| ) | ||
| cursor = result.next_cursor | ||
|
|
||
| async def list_all_resource_templates(self, *, meta: RequestParamsMeta | None = None) -> list[ResourceTemplate]: | ||
| """List every resource template from the server, draining `next_cursor` across pages. | ||
|
|
||
| Raises: | ||
| RuntimeError: The server returned a pagination cursor that did not advance. | ||
| """ | ||
| return [template async for template in self.iter_all_resource_templates(meta=meta)] | ||
|
|
||
| @deprecated("The roots capability is deprecated as of 2026-07-28 (SEP-2577).", category=MCPDeprecationWarning) | ||
| async def send_roots_list_changed(self) -> None: | ||
| """Send a notification that the roots list has changed.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,7 +8,7 @@ | |||||||||||||
|
|
||||||||||||||
| import contextlib | ||||||||||||||
| import logging | ||||||||||||||
| from collections.abc import Callable | ||||||||||||||
| from collections.abc import Awaitable, Callable | ||||||||||||||
| from dataclasses import dataclass | ||||||||||||||
| from types import TracebackType | ||||||||||||||
| from typing import Any, Literal, TypeAlias, overload | ||||||||||||||
|
|
@@ -67,6 +67,28 @@ class StreamableHttpParameters(BaseModel): | |||||||||||||
| ServerParameters: TypeAlias = StdioServerParameters | SseServerParameters | StreamableHttpParameters | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| async def _drain_paginated( | ||||||||||||||
| fetch_page: Callable[..., Awaitable[Any]], | ||||||||||||||
| attribute: str, | ||||||||||||||
| ) -> list[Any]: | ||||||||||||||
| """Drain a paginated `session.list_*` call across `next_cursor` pages. | ||||||||||||||
|
|
||||||||||||||
| `fetch_page` is one of the ClientSession `list_*` methods that takes a | ||||||||||||||
| `params=PaginatedRequestParams(...)` keyword. `attribute` is the name of | ||||||||||||||
| the list attribute on the result (e.g. `"tools"`, `"prompts"`). | ||||||||||||||
| """ | ||||||||||||||
| items: list[Any] = [] | ||||||||||||||
| cursor: str | None = None | ||||||||||||||
| while True: | ||||||||||||||
| params = types.PaginatedRequestParams(cursor=cursor) if cursor is not None else None | ||||||||||||||
| result = await fetch_page(params=params) | ||||||||||||||
| items.extend(getattr(result, attribute)) | ||||||||||||||
| next_cursor = getattr(result, "next_cursor", None) | ||||||||||||||
| if next_cursor is None: | ||||||||||||||
| return items | ||||||||||||||
| cursor = next_cursor | ||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: Pagination draining in Prompt for AI agents
Suggested change
|
||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| # Use dataclass instead of Pydantic BaseModel | ||||||||||||||
| # because Pydantic BaseModel cannot handle Protocol fields. | ||||||||||||||
| @dataclass | ||||||||||||||
|
|
@@ -383,9 +405,11 @@ async def _aggregate_components(self, server_info: types.Implementation, session | |||||||||||||
| tools_temp: dict[str, types.Tool] = {} | ||||||||||||||
| tool_to_session_temp: dict[str, mcp.ClientSession] = {} | ||||||||||||||
|
|
||||||||||||||
| # Query the server for its prompts and aggregate to list. | ||||||||||||||
| # Query the server for its prompts and aggregate to list. Drain | ||||||||||||||
| # pagination so we don't drop later pages on servers that split | ||||||||||||||
| # results across multiple `next_cursor` responses. | ||||||||||||||
| try: | ||||||||||||||
| prompts = (await session.list_prompts()).prompts | ||||||||||||||
| prompts = await _drain_paginated(session.list_prompts, "prompts") | ||||||||||||||
| for prompt in prompts: | ||||||||||||||
| name = self._component_name(prompt.name, server_info) | ||||||||||||||
| prompts_temp[name] = prompt | ||||||||||||||
|
|
@@ -395,7 +419,7 @@ async def _aggregate_components(self, server_info: types.Implementation, session | |||||||||||||
|
|
||||||||||||||
| # Query the server for its resources and aggregate to list. | ||||||||||||||
| try: | ||||||||||||||
| resources = (await session.list_resources()).resources | ||||||||||||||
| resources = await _drain_paginated(session.list_resources, "resources") | ||||||||||||||
| for resource in resources: | ||||||||||||||
| name = self._component_name(resource.name, server_info) | ||||||||||||||
| resources_temp[name] = resource | ||||||||||||||
|
|
@@ -405,7 +429,7 @@ async def _aggregate_components(self, server_info: types.Implementation, session | |||||||||||||
|
|
||||||||||||||
| # Query the server for its tools and aggregate to list. | ||||||||||||||
| try: | ||||||||||||||
| tools = (await session.list_tools()).tools | ||||||||||||||
| tools = await _drain_paginated(session.list_tools, "tools") | ||||||||||||||
| for tool in tools: | ||||||||||||||
| name = self._component_name(tool.name, server_info) | ||||||||||||||
| tools_temp[name] = tool | ||||||||||||||
|
|
||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Warning overstates the non-advancing cursor guard:
ClientSessionGrouppagination drains do not raise and can still loop forever. Scope the warning toClientlist/iter helpers or add the same guard to the group implementation.Prompt for AI agents