Add low-latency raw memory search #173
Conversation
Code Review
This pull request introduces a low-latency raw search endpoint and enhances the existing search functionality with optional answer synthesis and latency tracking. Key changes include the implementation of search_raw and answer_from_sources in the retrieval pipeline, the addition of TTL-based caching for profile catalogs and retrieval plans, and the inclusion of detailed latency metrics in search responses. Feedback focuses on optimizing performance by parallelizing domain searches, ensuring the profile catalog retrieval is asynchronous to avoid blocking the event loop, and managing cache memory usage through bounded collections and hashed keys.
```python
if "profile" in domain_set:
    results.extend(await self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
    results.extend(await self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
    results.extend(await self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
    results.extend(await self._search_snippet(query, user_id, top_k))
```
The current implementation of search_raw executes searches across the different domains sequentially. To achieve the low latency this PR intends, these searches should be executed in parallel using asyncio.gather.
Suggested change:

```python
domain_set = set(domains)
tasks = []
if "profile" in domain_set:
    tasks.append(self._search_profile_raw(query, user_id, top_k))
if "temporal" in domain_set:
    tasks.append(self._search_temporal(query, user_id, top_k))
if "summary" in domain_set:
    tasks.append(self._search_summary(query, user_id, top_k))
if "snippet" in domain_set:
    tasks.append(self._search_snippet(query, user_id, top_k))
task_results = await asyncio.gather(*tasks)
results: List[SourceRecord] = [item for sublist in task_results for item in sublist]
```
Resolved in the current head: search_raw now builds per-domain tasks and awaits asyncio.gather(*tasks, return_exceptions=True), so the raw path runs requested domain searches concurrently while preserving healthy domain results if one domain fails. The same concurrent path now includes the code domain as well.
```python
def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]

    catalog, results = self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results
```
The _get_profile_catalog method is synchronous and performs network I/O via _fetch_profile_catalog (which calls vector_store.search_by_metadata). Calling this from an async context like run or search_raw will block the entire event loop, significantly impacting performance and defeating the purpose of a low-latency path. This should be made asynchronous.
Suggested change:

```python
async def _get_profile_catalog(self, user_id: str):
    cached = self._profile_catalog_cache.get(user_id)
    now = time.monotonic()
    if cached and cached[0] > now:
        return cached[1], cached[2]
    # Assuming search_by_metadata is made async or wrapped in an executor
    catalog, results = await self._fetch_profile_catalog(user_id)
    self._profile_catalog_cache[user_id] = (
        now + _CACHE_TTL_SECONDS,
        catalog,
        results,
    )
    return catalog, results
```
Resolved in the current head: _get_profile_catalog is now async, and the blocking metadata fetch runs through asyncio.to_thread(self._fetch_profile_catalog, user_id), so the retrieval path no longer blocks the event loop while loading the profile catalog.
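The asyncio.to_thread pattern described above can be sketched as follows (a minimal illustration; fetch_profile_catalog and its return values are stand-ins for the real vector-store call, not code from this PR). The blocking fetch runs in a worker thread, so awaiting it does not stall the event loop.

```python
import asyncio
import time


def fetch_profile_catalog(user_id: str):
    # Stand-in for the blocking vector-store metadata search (hypothetical body).
    time.sleep(0.01)
    return {"user": user_id}, ["record-1"]


async def get_profile_catalog(user_id: str):
    # Dispatch the blocking call to a thread; the event loop stays free to
    # service other coroutines while the fetch runs.
    return await asyncio.to_thread(fetch_profile_catalog, user_id)


catalog, results = asyncio.run(get_profile_catalog("u1"))
print(catalog, results)
```

asyncio.to_thread requires Python 3.9+; on older versions the equivalent is loop.run_in_executor(None, fetch_profile_catalog, user_id).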
```python
self._profile_catalog_cache: Dict[str, tuple[float, List[Dict[str, str]], List[Any]]] = {}
self._retrieval_plan_cache: Dict[tuple[str, str, int, str], tuple[float, AIMessage]] = {}
self._latency_samples: Dict[str, List[float]] = {}
```
The caches _profile_catalog_cache and _retrieval_plan_cache are unbounded dictionaries that only expire entries on access. This can lead to a memory leak as entries for users who do not return will persist indefinitely. Consider using a cache with a maximum size and an eviction policy (e.g., cachetools.TTLCache).
Resolved in the current head: _profile_catalog_cache and _retrieval_plan_cache are now OrderedDict caches with TTL checks plus explicit max-size trimming via _trim_cache. Latency samples are also bounded to avoid unbounded growth.
```python
]

ai_response: AIMessage = await self.model_with_tools.ainvoke(messages)
plan_key = (user_id, query.strip(), top_k, catalog_text)
```
Using the entire catalog_text as part of the cache key for _retrieval_plan_cache can be memory-intensive if the catalog is large. Consider using a hash of the catalog_text instead.
```python
import hashlib

catalog_hash = hashlib.sha256(catalog_text.encode()).hexdigest()
plan_key = (user_id, query.strip(), top_k, catalog_hash)
```
Resolved in the current head: the retrieval plan key now uses hashlib.sha256(catalog_text.encode("utf-8")).hexdigest() instead of storing the full catalog text in the cache key.
Follow-up after the latest push at that time: the earlier performance review items were addressed in the same branch. Raw domain searches run concurrently, profile catalog lookup is async, and both caches are bounded with hashed plan keys. Update from the same 2026-05-11 head: the GitHub Actions Test Suite was green, with Unit, API, and Integration Tests and End-to-End Tests all passing.
@strongkeep-debug thank you for your contribution! Please review the Gemini suggestions and resolve them, and make sure to also add a comment on each suggestion :)
@ved015 done. I replied directly on all four Gemini suggestions and verified that the branch head covered them at the time: raw domain searches run concurrently with asyncio.gather, profile catalog loading is async via asyncio.to_thread, the profile/plan caches and latency samples are bounded, and the plan cache key uses a SHA-256 catalog hash. PR checks were green on that earlier head; the newer test-only follow-up comment below has the current CI state.
Small follow-up pushed; local validation was run on the current head. The PR label check passed on the new head. The Test Suite workflow is currently marked …
Addresses #163.
This PR turns memory search into a true low-latency path.

- Raw search now goes through `RetrievalPipeline.search_raw` and returns ranked profile, temporal, summary, snippet, and code annotation hits without retrieval-plan tool selection. `test_raw_search_returns_ranked_hits_without_tool_selection` confirms no tool-selection call is made and verifies a code hit keeps file and symbol metadata.
- `answer=true` synthesizes from those already-fetched hits when a caller wants a generated answer, and the root `/search` alias is wired for clients that need the shorter path.
- `/v1/memory/search` accepts `code` in the domain list and includes it in the default raw search domain set. `test_memory_search_route_accepts_code_domain` covers the request validator and serialized response shape.
- `answer=true` synthesizes from collected raw hits without doing agentic retrieval planning first. `test_root_search_alias_can_synthesize_answer` covers the alias and answer mode.
- `test_retrieval_pipeline_caches_catalog_and_retrieval_plan` covers cache reuse.
- `test_raw_search_skips_failed_domains_and_normalizes_scores` and the API route regression cover both pipeline and serialization behavior.

Validation was run locally: