Add low-latency raw memory search #173

Open

strongkeep-debug wants to merge 5 commits into XortexAI:main from strongkeep-debug:codex/163-search-fast-path

Conversation

strongkeep-debug commented May 11, 2026

Addresses #163.

This PR turns memory search into a true low-latency path. Raw search now goes through RetrievalPipeline.search_raw and returns ranked profile, temporal, summary, snippet, and code annotation hits without retrieval-plan tool selection. answer=true synthesizes from those already-fetched hits when a caller wants a generated answer, and the root /search alias is wired for clients that need the shorter path.
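For orientation, here is a minimal request sketch for the endpoint described above. The field names beyond the domain list, the base URL, and the response handling are illustrative assumptions; the actual contract lives in src/api/routes/memory.py.

```python
import requests

# Hedged sketch of calling the low-latency search path described above.
# The exact field names ("query", "domains", "top_k", "answer") and the local
# base URL are assumptions; check src/api/routes/memory.py for the real contract.
payload = {
    "query": "where do we configure the retry budget?",
    "domains": ["profile", "snippet", "code"],  # "code" is newly accepted by this PR
    "top_k": 5,
    "answer": True,  # synthesize an answer from the already-fetched raw hits
}

response = requests.post("http://localhost:8000/v1/memory/search", json=payload, timeout=10)
response.raise_for_status()
print(response.json())
```

The root /search alias should accept the same shape for clients that want the shorter path.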

| Area | Change | Evidence |
| --- | --- | --- |
| Raw retrieval | Selected domains are searched directly and ranked by score, including the code annotation domain requested by the issue. | test_raw_search_returns_ranked_hits_without_tool_selection confirms no tool-selection call is made and verifies a code hit keeps file and symbol metadata. |
| API contract | /v1/memory/search accepts code in the domain list and includes it in the default raw search domain set. | test_memory_search_route_accepts_code_domain covers the request validator and serialized response shape. |
| Optional answer | answer=true synthesizes from collected raw hits without doing agentic retrieval planning first. | test_root_search_alias_can_synthesize_answer covers the alias and answer mode. |
| Caching and latency | Profile catalogs and retrieval plans are cached for the agentic path, and bounded p50/p95/p99 latency snapshots are recorded for raw, answer, and agentic modes (sketched below the table). | test_retrieval_pipeline_caches_catalog_and_retrieval_plan covers cache reuse. |
| Robustness | Raw search now normalizes missing/non-finite backend scores and keeps healthy domain results when another requested domain fails. | test_raw_search_skips_failed_domains_and_normalizes_scores and the API route regression cover both pipeline and serialization behavior. |
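To make the bounded latency snapshots in the caching row concrete, here is a small sketch of the idea; the class, constant, and method names are illustrative and not the actual helpers in src/pipelines/retrieval.py.

```python
import statistics
from collections import deque

_MAX_SAMPLES = 512  # illustrative bound; keeps per-mode samples from growing without limit

class LatencyTracker:
    """Sketch: record per-mode latencies and expose p50/p95/p99 snapshots."""

    def __init__(self) -> None:
        self._samples: dict[str, deque[float]] = {}

    def record(self, mode: str, elapsed_ms: float) -> None:
        # deque(maxlen=...) silently drops the oldest sample once the bound is hit.
        self._samples.setdefault(mode, deque(maxlen=_MAX_SAMPLES)).append(elapsed_ms)

    def snapshot(self, mode: str) -> dict[str, float]:
        samples = list(self._samples.get(mode, ()))
        if len(samples) < 2:
            return {}
        cuts = statistics.quantiles(samples, n=100)  # 99 cut points
        return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}
```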

Validation was run locally:

.\.venv\Scripts\python.exe -m pytest -q
50 passed

.\.venv\Scripts\python.exe -m pytest tests\integration\test_retrieval_pipeline.py tests\api\test_memory_search_routes.py -q
12 passed

.\.venv\Scripts\python.exe -m ruff check src\pipelines\retrieval.py src\api\routes\memory.py tests\integration\test_retrieval_pipeline.py tests\api\test_memory_search_routes.py
All checks passed!

git diff --check
passed


gemini-code-assist (Bot) left a comment

Code Review

This pull request introduces a low-latency raw search endpoint and enhances the existing search functionality with optional answer synthesis and latency tracking. Key changes include the implementation of search_raw and answer_from_sources in the retrieval pipeline, the addition of TTL-based caching for profile catalogs and retrieval plans, and the inclusion of detailed latency metrics in search responses. Feedback focuses on optimizing performance by parallelizing domain searches, ensuring the profile catalog retrieval is asynchronous to avoid blocking the event loop, and managing cache memory usage through bounded collections and hashed keys.

Comment thread on src/pipelines/retrieval.py (outdated), lines +280 to +287:
if "profile" in domain_set:
results.extend(await self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
results.extend(await self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
results.extend(await self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
results.extend(await self._search_snippet(query, user_id, top_k))

Severity: high

The current implementation of search_raw executes searches across different domains sequentially. To achieve true low-latency as intended by this PR, these searches should be executed in parallel using asyncio.gather.

Suggested change
-if "profile" in domain_set:
-    results.extend(await self._search_profile_raw(query, user_id, top_k))
-if "temporal" in domain_set:
-    results.extend(await self._search_temporal(query, user_id, top_k))
-if "summary" in domain_set:
-    results.extend(await self._search_summary(query, user_id, top_k))
-if "snippet" in domain_set:
-    results.extend(await self._search_snippet(query, user_id, top_k))
+domain_set = set(domains)
+tasks = []
+if "profile" in domain_set:
+    tasks.append(self._search_profile_raw(query, user_id, top_k))
+if "temporal" in domain_set:
+    tasks.append(self._search_temporal(query, user_id, top_k))
+if "summary" in domain_set:
+    tasks.append(self._search_summary(query, user_id, top_k))
+if "snippet" in domain_set:
+    tasks.append(self._search_snippet(query, user_id, top_k))
+task_results = await asyncio.gather(*tasks)
+results: List[SourceRecord] = [item for sublist in task_results for item in sublist]

strongkeep-debug (Author) replied:

Resolved in the current head: search_raw now builds per-domain tasks and awaits asyncio.gather(*tasks, return_exceptions=True), so the raw path runs requested domain searches concurrently while preserving healthy domain results if one domain fails. The same concurrent path now includes the code domain as well.
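To show the failure-tolerant gather shape in isolation, here is a self-contained sketch; the function and parameter names below are illustrative, not the actual code on the branch.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict, List

logger = logging.getLogger(__name__)

async def gather_domain_hits(
    searches: Dict[str, Callable[[], Awaitable[List[Any]]]],
) -> List[Any]:
    """Sketch: run per-domain searches concurrently; keep results from healthy domains."""
    domains = list(searches)
    outcomes = await asyncio.gather(
        *(searches[domain]() for domain in domains),
        return_exceptions=True,  # a failing domain yields its exception instead of raising
    )
    hits: List[Any] = []
    for domain, outcome in zip(domains, outcomes):
        if isinstance(outcome, BaseException):
            logger.warning("raw search failed for domain %r: %s", domain, outcome)
            continue
        hits.extend(outcome)
    return hits
```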

Comment thread on src/pipelines/retrieval.py (outdated), lines +572 to +584:
def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]

    catalog, results = self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results

Severity: high

The _get_profile_catalog method is synchronous and performs network I/O via _fetch_profile_catalog (which calls vector_store.search_by_metadata). Calling this from an async context like run or search_raw will block the entire event loop, significantly impacting performance and defeating the purpose of a low-latency path. This should be made asynchronous.

Suggested change
-def _get_profile_catalog(self, user_id: str):
-    cached = self._profile_catalog_cache.get(user_id)
-    now = time.monotonic()
-    if cached and cached[0] > now:
-        return cached[1], cached[2]
-    catalog, results = self._fetch_profile_catalog(user_id)
-    self._profile_catalog_cache[user_id] = (
-        now + _CACHE_TTL_SECONDS,
-        catalog,
-        results,
-    )
-    return catalog, results
+async def _get_profile_catalog(self, user_id: str):
+    cached = self._profile_catalog_cache.get(user_id)
+    now = time.monotonic()
+    if cached and cached[0] > now:
+        return cached[1], cached[2]
+    # Assuming search_by_metadata is made async or wrapped in an executor
+    catalog, results = await self._fetch_profile_catalog(user_id)
+    self._profile_catalog_cache[user_id] = (
+        now + _CACHE_TTL_SECONDS,
+        catalog,
+        results,
+    )
+    return catalog, results

strongkeep-debug (Author) replied:

Resolved in the current head: _get_profile_catalog is now async, and the blocking metadata fetch runs through asyncio.to_thread(self._fetch_profile_catalog, user_id), so the retrieval path no longer blocks the event loop while loading the profile catalog.
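A standalone sketch of that shape, with the blocking fetch passed in as a callable; the names and TTL value here are illustrative, not the branch's actual code.

```python
import asyncio
import time
from typing import Any, Callable, Dict, Tuple

_CACHE_TTL_SECONDS = 60.0  # illustrative; the real TTL constant lives in retrieval.py

async def get_profile_catalog(
    user_id: str,
    cache: Dict[str, Tuple[float, Any, Any]],
    fetch_profile_catalog: Callable[[str], Tuple[Any, Any]],  # blocking call, e.g. a vector-store metadata search
) -> Tuple[Any, Any]:
    """Sketch: TTL-cached catalog lookup that keeps the blocking fetch off the event loop."""
    cached = cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]
    # asyncio.to_thread runs the synchronous fetch in a worker thread,
    # so awaiting it does not stall other requests on the event loop.
    catalog, results = await asyncio.to_thread(fetch_profile_catalog, user_id)
    cache[user_id] = (now + _CACHE_TTL_SECONDS, catalog, results)
    return catalog, results
```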

Comment thread on src/pipelines/retrieval.py (outdated), lines +140 to +142:
self._profile_catalog_cache: Dict[str, tuple[float, List[Dict[str, str]], List[Any]]] = {}
self._retrieval_plan_cache: Dict[tuple[str, str, int, str], tuple[float, AIMessage]] = {}
self._latency_samples: Dict[str, List[float]] = {}

Severity: medium

The caches _profile_catalog_cache and _retrieval_plan_cache are unbounded dictionaries that only expire entries on access. This can lead to a memory leak as entries for users who do not return will persist indefinitely. Consider using a cache with a maximum size and an eviction policy (e.g., cachetools.TTLCache).

strongkeep-debug (Author) replied:

Resolved in the current head: _profile_catalog_cache and _retrieval_plan_cache are now OrderedDict caches with TTL checks plus explicit max-size trimming via _trim_cache. Latency samples are also bounded to avoid unbounded growth.
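A minimal sketch of that bounded TTL-cache shape; the constants and the _trim_cache name follow the description above, but the real implementation on the branch may differ in detail.

```python
import time
from collections import OrderedDict
from typing import Any, Hashable, Optional

_CACHE_TTL_SECONDS = 60.0   # illustrative values; the real constants live in retrieval.py
_CACHE_MAX_ENTRIES = 256

def _trim_cache(cache: "OrderedDict[Hashable, tuple[float, Any]]") -> None:
    # Evict oldest insertions first once the cache exceeds its size budget.
    while len(cache) > _CACHE_MAX_ENTRIES:
        cache.popitem(last=False)

def cache_put(cache: "OrderedDict[Hashable, tuple[float, Any]]", key: Hashable, value: Any) -> None:
    cache[key] = (time.monotonic() + _CACHE_TTL_SECONDS, value)
    _trim_cache(cache)

def cache_get(cache: "OrderedDict[Hashable, tuple[float, Any]]", key: Hashable) -> Optional[Any]:
    entry = cache.get(key)
    if entry is None or entry[0] <= time.monotonic():
        cache.pop(key, None)  # expired entries are dropped on access
        return None
    return entry[1]
```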

Comment thread on src/pipelines/retrieval.py (outdated):
]

ai_response: AIMessage = await self.model_with_tools.ainvoke(messages)
plan_key = (user_id, query.strip(), top_k, catalog_text)

Severity: medium

Using the entire catalog_text as part of the cache key for _retrieval_plan_cache can be memory-intensive if the catalog is large. Consider using a hash of the catalog_text instead.

        import hashlib
        catalog_hash = hashlib.sha256(catalog_text.encode()).hexdigest()
        plan_key = (user_id, query.strip(), top_k, catalog_hash)

strongkeep-debug (Author) replied:

Resolved in the current head: the retrieval plan key now uses hashlib.sha256(catalog_text.encode("utf-8")).hexdigest() instead of storing the full catalog text in the cache key.

strongkeep-debug (Author) commented May 11, 2026

Follow-up after the latest push at that time: 5548b63 included the requested code domain in the raw search path, preserved repository/file/symbol/type/severity metadata, and hardened raw search against missing or non-finite backend scores. It also kept healthy domain results if another requested domain failed.

The earlier performance review items were addressed in the same branch: raw domain searches run concurrently, profile catalog lookup is async, and both caches are bounded with hashed plan keys. Local verification at that point was 50 passed for the full suite, 12 passed for the targeted retrieval/API tests, touched-file Ruff clean, and git diff --check clean.

Update from the same 2026-05-11 head: GitHub Actions Test Suite was green, with Unit, API, and Integration Tests and End-to-End Tests passing on 5548b63. A later test-only commit, a3f6429, added missing-score regression coverage; see the newer comment for its current CI state.

ved015 (Contributor) commented May 13, 2026

@strongkeep-debug thank you for your contribution. Please review the Gemini suggestions and resolve them, and make sure to also add a comment on each suggestion :)

strongkeep-debug (Author) commented May 13, 2026

@ved015 done. I replied directly on all four Gemini suggestions and verified that the branch head covered them at the time: raw domain searches run concurrently with asyncio.gather, profile catalog loading is async via asyncio.to_thread, the profile/plan caches and latency samples are bounded, and the plan cache key uses a SHA-256 catalog hash. PR checks were green on that earlier head; the newer test-only follow-up comment below has the current CI state.

strongkeep-debug (Author) commented May 13, 2026

Small follow-up pushed: a3f6429 adds a regression test proving answer_from_sources() handles sources with score=None without exposing a score marker or re-entering tool selection. This is test-only; runtime behavior is unchanged from the prior hardening commit.
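For readers skimming the thread, a hedged sketch of what such a regression looks like; the fixture names, the source object, and the mock setup below are assumptions for illustration, not the actual test added in a3f6429.

```python
import pytest

# Hypothetical shape of the regression described above. The "pipeline" and
# "snippet_source" fixtures, the answer_from_sources signature, and the mocked
# model_with_tools are assumptions for illustration only.
@pytest.mark.asyncio
async def test_answer_from_sources_tolerates_missing_score(pipeline, snippet_source):
    snippet_source.score = None  # backend returned no score for this hit
    answer = await pipeline.answer_from_sources("retry budget config", sources=[snippet_source])
    assert "score" not in answer.lower()  # no raw score marker leaks into the synthesized answer
    pipeline.model_with_tools.ainvoke.assert_not_called()  # no re-entry into tool selection
```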

Local validation on the current head:

.venv\Scripts\python -m pytest tests/unit tests/api tests/integration tests/test_deterministic_memory_layer.py tests/test_enterprise_chat.py --cov=src/utils --cov=src/schemas --cov=src/pipelines --cov=src/enterprise --cov=src/api --cov-report=term-missing --cov-report=xml --cov-fail-under=70
50 passed, coverage 79.61% (70% gate)

.venv\Scripts\python -m pytest tests/e2e
1 passed

.venv\Scripts\python -m pytest tests\integration\test_retrieval_pipeline.py tests\api\test_memory_search_routes.py -q
13 passed

git diff --check
clean

The PR label check passed on the new head. The Test Suite workflow is currently marked action_required, so it may need maintainer approval to run on the forked PR; the local commands above mirror the two jobs in .github/workflows/tests.yml with the same test env values.
