
[Feature] Add Distributed Posting Router for SPANN #448

Open
TerrenceZhangX wants to merge 119 commits into
users/qiazh/pre-merge-tikv-bugfix from
users/zhangt/merge-distributed-to-tikv

Conversation

@TerrenceZhangX

No description provided.

zhol01825 and others added 30 commits April 7, 2026 07:33
…c, benchmarks

Core routing (PostingRouter.h):
- Hash routing: GetOwner uses headID % NumNodes for deterministic assignment
- RemoteLock RPC for cross-node Merge serialization (try_lock + retry)
- BatchAppend, HeadSync, InsertBatch packet types and handlers
- TCP-based server/client for inter-node communication
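The modulo owner assignment can be sketched in a few lines; this is an illustrative stand-in, not the actual GetOwner signature from PostingRouter.h:

```cpp
#include <cstdint>

// Hypothetical sketch of the hash-routing rule described above: every node
// computes the same owner for a headID without any coordination.
inline int GetOwner(std::int32_t headID, int numNodes) {
    return static_cast<int>(headID % numNodes);
}
```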

ExtraDynamicSearcher.h integration:
- EnableRouter/AdoptRouter for index lifecycle management
- Split: BroadcastHeadSync after creating/deleting heads
- MergePostings: Cross-node lock for neighbor headID on different node
- MergePostings: BroadcastHeadSync for deleted head after merge
- Reassign: Route Append to owner node + FlushRemoteAppends
- AddIndex: Route appends to owner node via QueueRemoteAppend
- SetHeadSyncCallback: Wire up HeadSync + RemoteLock callbacks

Infrastructure:
- IExtraSearcher/Index/VectorIndex: Add routing virtual method chain
- Options/ParameterDefinitionList: RouterEnabled, RouterLocalNodeIndex,
  RouterNodeAddrs, RouterNodeStores config params
- CMakeLists: Link Socket sources and Boost into SPTAGLibStatic
- Connection.cpp: Safe remote_endpoint() with error_code (no throw)
- Packet.h: Append, BatchAppend, InsertBatch, HeadSync, RemoteLock types
- SPFreshTest.cpp: ApplyRouterParams, FlushRemoteAppends, WorkerNode test
- Benchmark configs: 100k/1m/10m x 1/2/3 node
- run_scale_benchmarks.sh: Automated benchmark runner
- docker/tikv: TiKV cluster docker-compose + config

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix index dir creation: create parent dir only (not spann_index subdir)
  so the build code creates spann_index and the safety check passes
- Clear checkpoint after build phase so driver re-runs all insert batches
- Add VectorIndex::AdoptRouter to transfer router between batch clones
  instead of creating new TCP server per batch (port conflict fix)
- Fix ExtraDynamicSearcher::AdoptRouter to override IExtraSearcher interface

100k results (routing works, ~50/50 local/remote split):
  1-node steady-state: 90.2 vps
  2-node steady-state: 80.5 vps
  3-node steady-state: 77.7 vps
No scaling at 100k due to small batch size (100 vectors) and shared TiKV.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
12 slides covering: problem statement, solution architecture, full flow
comparison (single vs distributed), hash routing, append write path,
split/merge/reassign routing, HeadSync broadcast, 3-node sequence
diagram, design decisions, network protocol, config, and 100k results.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix ini thread params: replace NumThreads with NumSearchThreads/NumInsertThreads
- Add 100M benchmark ini files (1-node, 2-node, 3-node)
- Update data paths to /mnt/data_disk/sift1b in all ini files
- Add BENCHMARK_GUIDE_SCALE_COMPUTE.md (English)
- Add BENCHMARK_RESULTS_SCALE_COMPUTE.md with 100K/1M/10M results
- Update docker-compose and tikv.toml for 3-PD/3-TiKV cluster
- Update run_scale_benchmarks.sh with multi-scale orchestration
- Add .gitignore entries for generated benchmark artifacts
- Fix FullSearch routing for multi-node search (per-node build)
- Update 10M benchmark: insert throughput 2-node 1.65x, 3-node 1.98x
- Search latency 10M: 2-node -35%, 3-node -50% vs 1-node
- Near-linear insert scaling across all data sizes (100K, 1M, 10M)
- Update benchmark configs, test harness, and scale benchmark script
- Delete all 24 benchmark INI files from Test/
- Replace section 5 in BENCHMARK_GUIDE_SCALE_COMPUTE.md with complete
  deterministic generation rules (scale table, topology rules, template,
  Python generator script)
- INI files can be regenerated on demand via the guide
- Embed full docker-compose.yml and tikv.toml contents in
  BENCHMARK_GUIDE_SCALE_COMPUTE.md section 4.1
- Remove docker/tikv/ from git tracking (files stay on disk)
- Use <NVME_DIR> placeholder instead of absolute paths

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Bug fixes:
- SPANNIndex.cpp: Remove redundant SortResult() in FullSearchCallback that
  corrupted remote search results (heapsort on already-sorted data)
- TestDataGenerator.cpp: Fix EvaluateRecall truth NN stride from 1 to K

Feature:
- SPFreshTest.cpp: Add BuildOnly parameter to skip insert batches

Benchmark results (Float32/dim64, 10M scale):
- 1-node: 93.8 vps, 2-node: 200.0 vps (2.13x), 3-node: 271.4 vps (2.89x)
- Recall stable within each config after double-sort fix

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Measure RPC round-trip time (sendTime → future.get()) and assign
per-query latency for remote search results. Previously p50/min
latency showed 0 for remote queries.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Revert TiKVVersionMap node-scoped prefix optimization (vc:{nodeIndex}:...)
that broke cross-node version check correctness. Version map now uses
shared namespace (vc:{layer}:...) so all nodes can read/write the same
version data. Node-scoped optimization deferred to future branch.

Also add explanatory comments for AdoptRouter and HeadSync broadcast.
Restore commented-out debug log lines.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
These were only used by the reverted node-scoped version map.
LocalToGlobalVID is kept (used in insert path for VID uniqueness).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
InsertVectors now uses dual path: per-vector multi-threaded insert
for single-node (original behavior), bulk AddIndex for router-enabled
multi-node (amortizes RPC overhead via batched remote appends).

Also add comment on AddIndex explaining caller-side shard partitioning
and LocalToGlobalVID purpose.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Restore the original vidIdx counter loop for post-heap version
filtering instead of the candidateIndices array approach. Both are
functionally equivalent but the original pattern is simpler.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Restore 'For quantized index, pass GetFeatureDim()' comment
- Remove else { func(); } branch that was not in the original code

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use ConvertToString(valueType) and INI parameters to build the
perftest_* filenames, matching TestDataGenerator::RunLargeBatches
convention. Moves filename construction out of the per-batch loop.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Document the 3-file INI pattern for multi-node benchmarks:
- _build.ini: Rebuild=true, no Router (build phase)
- _driver.ini: Rebuild=false, Router enabled (driver/n0)
- _n{i}.ini: worker nodes (n1, n2, ...)

Update Python generator and shell script to match. Remove the
sed-based approach of patching _n0.ini at runtime.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move Router parameters (RouterEnabled, RouterLocalNodeIndex,
RouterNodeAddrs, RouterNodeStores) from DefineSSDParameter to
DefineRouterParameter with its own [Router] section in:
- ParameterDefinitionList.h: new DefineRouterParameter macro
- Options.h: SetParameter/GetParameter handle 'Router' section
- SPANNIndex.cpp: SaveConfig outputs [Router] block
- SPFreshTest.cpp: read [Router] INI section, ApplyRouterParams
  uses 'Router' section
- Benchmark guide: updated INI template and Python generator

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Distributed scale results belong in BENCHMARK_RESULTS_SCALE_COMPUTE.md,
not the single-node 10M results file.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
DefineRouterParameter was nested inside #ifdef DefineSSDParameter, causing
router parameters to be silently ignored. Moved the #endif to the correct
position so DefineRouterParameter is at top-level scope. Also removed a
debug log line from the DefineRouterParameter macro in Options.h.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Implement benchmark-level query distribution where each node independently
searches its contiguous partition of the query set, coordinated by barrier
files (same mechanism as insert distribution). This replaces the previous
RPC-based approach, eliminating RPC overhead and serial head search.

Key changes in SPFreshTest.cpp:
- BenchmarkQueryPerformance: partition queries across nodes, use barrier
  files for synchronization, compute QPS = totalQueries / max(wallTime)
- WorkerNode: unified command loop handling both search and insert commands
  via shared index directory

Results (10M Float32, 200 queries, TopK=5):
- 1-node: 194 QPS baseline
- 2-node: 404 QPS (2.08x speedup, super-linear due to cache effects)
- 3-node: 488 QPS (2.52x speedup)
- Insert scaling: 1→2→3 node = 119→211→314 vec/s (1.77x, 2.64x)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1. Fix remote lock leak in MergePostings (ExtraDynamicSearcher.h)
   - RAII RemoteLockGuard ensures remote lock is released on all exit
     paths (continue/return/exception), preventing distributed deadlock

2. Fix buffer overflow in BatchRouteSearch (SPANNIndex.cpp)
   - Validate response array sizes before accessing result vectors
   - Fall back to local search on size mismatch

3. Fix missing send-failure callback in SendRemoteLock (PostingRouter.h)
   - Add failure callback to complete the promise on send error,
     matching the pattern used by all other SendPacket call sites
   - Prevents 5-second stall on every send failure

4. Normalize atomic operation in SendRemoteLock (PostingRouter.h)
   - Change m_nextResourceId++ to fetch_add(1) for consistency

5. Fix uninitialized workerTime in barrier coordination (SPFreshTest.cpp)
   - Initialize workerTime and validate ifstream read
   - Skip worker timing on parse failure instead of using garbage value
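The RAII shape from fix 1 can be sketched as below; acquire/release are hypothetical callables standing in for the RemoteLock RPC pair, and the real RemoteLockGuard in ExtraDynamicSearcher.h may differ:

```cpp
#include <functional>
#include <utility>

// Minimal RAII guard: release runs on every exit path (continue, return,
// exception), so a held remote lock can never leak.
class RemoteLockGuard {
public:
    RemoteLockGuard(std::function<bool()> tryAcquire,
                    std::function<void()> release)
        : m_release(std::move(release)), m_held(tryAcquire()) {}
    ~RemoteLockGuard() {
        if (m_held) m_release();  // unconditional cleanup at scope exit
    }
    bool Held() const { return m_held; }
    RemoteLockGuard(const RemoteLockGuard&) = delete;
    RemoteLockGuard& operator=(const RemoteLockGuard&) = delete;
private:
    std::function<void()> m_release;
    bool m_held;
};

// Demo helper: returns how many times release fired for one guard scope.
inline int DemoGuard(bool acquired) {
    int releases = 0;
    {
        RemoteLockGuard g([acquired] { return acquired; },
                          [&releases] { ++releases; });
    }
    return releases;
}
```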

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Remove BatchRouteSearch, SetFullSearchCallback, GetSearchNodeCount, and all
supporting RPC infrastructure (SearchPostingRequest/Response,
FullSearchBatchRequest/Response structs, search callbacks, handler methods,
and related member variables) from PostingRouter, SPANNIndex, Index,
VectorIndex, ExtraDynamicSearcher, SPFresh, and Packet.

Tests use barrier-based distributed search exclusively; the RPC-based search
routing is dead code. Existing SearchRequest/SearchResponse packet types are
preserved as they are used by the pre-existing Aggregator/Client/Server code.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Critical fixes:
- Fix integer overflow in DecodeVectorSearchResponse (ExtraTiKVController.h)
  Prevents OOB reads from corrupt TiKV responses with large numResults.
- Fix m_mergeJobsInFlight counter underflow in MergePostings retry paths
  (ExtraDynamicSearcher.h) Add increment before re-enqueued MergeAsyncJob
  to match the unconditional decrement in exec().

High fixes:
- Add FlushRemoteAppends after Split reassignment (ExtraDynamicSearcher.h)
  Ensures queued remote appends are sent after CollectReAssign in Split().
- Fix data race on m_nodeAddrs in ConnectToPeer (PostingRouter.h)
  Snapshot address under m_connMutex before retry loop.
- Fix BroadcastHeadSync reading m_nodeAddrs without lock (PostingRouter.h)
  Snapshot node count under m_connMutex before iterating.

Medium fixes:
- Fix m_storeToNodes race in AddNode - move inside m_connMutex scope.
- Fix unvalidated entryCount in HandleHeadSyncRequest with buffer-end
  tracking to prevent overruns from corrupt packets.
- Add buffer-end tracking in BatchRemoteAppendRequest::Read to catch
  overruns during per-item deserialization.
- Make m_asyncStatus atomic to fix race between async jobs and Checkpoint.
  Use exchange() for atomic read-and-reset in Checkpoint.
- Make shared ErrorCode ret atomic in LoadIndex and WriteDownAllPostingToDB
  parallel loops.
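The exchange() pattern used for Checkpoint can be sketched as a one-liner; m_asyncStatus here is assumed to be an integer error flag set by async jobs:

```cpp
#include <atomic>

// Atomic read-and-reset: exchange() returns the previous value and clears
// the flag in one step, so a concurrent job's write cannot be lost between
// a separate read and reset.
inline int TakeAsyncStatus(std::atomic<int>& status) {
    return status.exchange(0);
}
```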

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
zhangt and others added 22 commits May 9, 2026 05:23
Address feedback: doc previously jumped into mechanics without
establishing motivation. Add §0 covering:

  0.1 What problem are we solving — three single-node walls
      (storage, insert throughput, search tail) and three explicit
      non-goals (no replication, no distributed transactions, no
      live re-sharding).
  0.2 Why shard by headID — comparison vs per-vector / per-query /
      per-bucket alternatives. Posting is the natural unit of
      read/write/storage; head index stays cheap to replicate.
  0.3 Why a consistent hash ring (not modulo) — minimal remap on
      bring-up, vnodes for small-N smoothing, sensitivity to vnode
      hash quality (Bug 27).
  0.4 Operations and jobs at a glance — moved the original short
      list of Search/Insert/Append/Split/Merge/Reassign here.

Existing §1..§8 unchanged in content; numbering preserved.
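The §0.3 ring-with-vnodes idea can be sketched with a sorted map; this toy uses std::hash as the vnode hash, which is exactly the kind of choice the Bug 27 note flags as quality-sensitive, so treat it as illustrative only:

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Toy consistent-hash ring: each node contributes several vnodes; a headID
// is owned by the first vnode at or after its hash (wrapping around).
class Ring {
public:
    void AddNode(int node, int vnodes) {
        for (int v = 0; v < vnodes; ++v) {
            auto key = std::hash<std::string>{}(
                std::to_string(node) + "#" + std::to_string(v));
            m_ring[key] = node;
        }
    }
    int GetOwner(std::int32_t headID) const {
        auto it = m_ring.lower_bound(std::hash<std::int32_t>{}(headID));
        return it == m_ring.end() ? m_ring.begin()->second : it->second;
    }
private:
    std::map<std::size_t, int> m_ring;  // vnode hash -> owning node
};
```

Unlike modulo, adding a node only remaps the headIDs that fall between the new vnodes and their predecessors, which is the minimal-remap property §0.3 argues for.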

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Worker side observed 3,873 "MergePostings: abandoning headID … after
64 retries" warnings during v20 BATCH 1 (39% of submitted merges
abandoned), driver side had zero. Root cause:

* The receive-side BatchAppend handler runs Append() with
  m_rwLocks[headID] held during a TiKV Get+Put round-trip
  (tens to hundreds of ms in distributed mode).
* MergePostings tries to acquire m_rwLocks[candidate] with a
  non-blocking try_lock and re-queues on failure.
* The merge re-queue cycle through m_splitThreadPool is much
  shorter than the Append hold time, so the candidate is
  essentially never observed unlocked within the 64-retry budget.
* Driver doesn't see this because it doesn't process incoming
  BatchAppend RPCs at the same volume — appends to driver-owned
  heads come from local SPFreshTest threads, which release the
  lock between the Get+Put RTTs of bulk inserts.

Fix: use try_lock_for(50ms) so a single attempt can absorb one
Append RTT instead of bouncing through the thread pool. Total
worst-case wall time per merge stays bounded at 64 * 50ms = 3.2s,
preserving the original anti-livelock property.
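The timed-lock change can be sketched as below; a single std::timed_mutex stands in for the per-headID m_rwLocks, whose actual type may differ:

```cpp
#include <chrono>
#include <mutex>

// Was: lock.try_lock() -> re-queue through the thread pool on failure,
// which bounced 64 times against an Append holding the lock across a
// TiKV round-trip. Now a single attempt waits up to 50ms, so it can
// absorb one Append RTT while keeping the 64 * 50ms = 3.2s worst case.
inline bool TryMergeLock(std::timed_mutex& lock) {
    return lock.try_lock_for(std::chrono::milliseconds(50));
}
```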

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The stride-shard env var was being read on the driver but the SSH
command line for workers wasn't forwarding it, so workers always saw
stride=0. Add explicit forwarding (defaulting to 0 when unset) for
both the build-receiver and run paths.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
LocalToGlobalVID was computing globalVID = init + offset*numNodes + slotIdx
where numNodes was m_nodeAddrs.size() (dispatcher + workers = 3 for 2-worker
config) and slotIdx was the 1-indexed network slot. The correct interleaving
formula for stride sharding requires the COMPUTE worker count and 0-indexed
shard position. The off-by-shape mapping placed inserted vectors' globalVIDs
at e.g. 5.5M (= 1M + 1.5M*3 + 2) when the truth file expected them in
[1M, 1.5M), causing BATCH 1 recall to collapse from 0.94 to 0.218.

Fix: introduce explicit compute-role accessors on NetworkNode:
  - GetNumWorkerNodes()    -- count of data-bearing shards (was m_nodeAddrs.size()-1)
  - GetNumDispatchNodes()  -- count of dispatchers (typically 1)
  - GetWorkerNodeIndex()   -- 0-indexed shard position (-1 if dispatcher-only)

Subclass Initialize() populates the m_num*Nodes / m_workerNodeIndex fields
explicitly; the existing GetLocalNodeIndex() / GetNumNodes() are kept
unchanged for callers that want network-routing semantics (dispatch
coordinator, packet routing, peer connection table).

ExtraDynamicSearcher's local wrappers are renamed to GetNumWorkerNodes /
GetWorkerNodeIndex so the five callers (LocalToGlobalVID, AddIDCapacity,
three Bug 25 grow paths) automatically use the right values. Log strings
updated to say 'numWorkers' instead of 'numNodes' for clarity.
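The corrected interleaving can be sketched as follows, assuming a stride-shard layout where worker w owns local slots {w, w + numWorkers, w + 2*numWorkers, ...} above a shared init offset; names are illustrative, not the exact SPTAG signatures:

```cpp
#include <cstdint>

// Correct mapping: uses the COMPUTE worker count and the 0-indexed shard
// position. The buggy version used m_nodeAddrs.size() (dispatcher included)
// and a 1-indexed network slot, pushing globalVIDs far past the truth
// file's expected range.
inline std::int64_t LocalToGlobalVID(std::int64_t localOffset,
                                     int numWorkers, int workerIndex,
                                     std::int64_t init) {
    return init + localOffset * numWorkers + workerIndex;
}
```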

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
EvaluateRecall used res.size() to compute both distbase and the truth
file row index. In distributed mode each node only owns Q/N queries
(driver runs queries [0, Q/N), worker runs [Q/N, 2Q/N), ...), so
res.size() = Q/N rather than the global query count Q. The truth file
is laid out by global query count: row[batch * Q + q] holds the post-
batch ground truth for query q. With res.size()=Q/N at batch>=1, each
node read iter=0 truth rows for the next slice of queries instead of
iter=batch truth rows for its own queries, producing recall ~= 0 by
random chance. BATCH 0 happened to look correct because batch * res.size()
collapses to 0 regardless.

v23 BATCH 1 recall = 0.0000 was caused by this bug, not by the Bug 30
LocalToGlobalVID refactor (which is separately verified as correct).
v24 with this fix: BATCH 0 = 0.9180, BATCH 1 = 0.7820 on the shrunken
200k+100k 2-node verification, with 0 [Bug 25] grew versionMap markers
on either node.

Add optional totalQueries / queryOffset parameters (default -1, 0) to
EvaluateRecall; legacy single-node call sites stay correct via defaults.
BenchmarkQueryPerformance passes the global numQueries and per-node
myStart in distributed mode.
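The corrected row arithmetic can be sketched directly; the helper name is hypothetical, but the layout matches the commit's description (row[batch * Q + q] holds the post-batch truth for global query q):

```cpp
// Truth-file row for this node's local query localQ (0-based within its
// slice): computed from the GLOBAL query count and the node's offset.
// The buggy version used batch * localResultCount, which reads iter=0
// rows for the wrong queries whenever batch >= 1.
inline long TruthRow(int batch, long totalQueries, long queryOffset,
                     long localQ) {
    return batch * totalQueries + queryOffset + localQ;
}
```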

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Previously SearchIndex ran two phases sequentially:
  Stage 1: db->MultiGet (or MultiScanPostings) for local TiKV postings
  Stage 2: per-remote-node SendRemoteFetchPostings RPC fan-out
Stage 2 only began after Stage 1 returned, so total search latency was
the sum of local TiKV IO + cross-node RPC even though they target
disjoint data. Profiles showed ~50% of search latency was Stage 2
serialised behind Stage 1.

Plan A overlaps the two stages with a barrier-based merge that keeps
versionMap consistent for the Stage 3 scoring loop:
  Stage 0 (cheap): build per-remote-node fan-out plan from postingIDs.
  Stage 1A (peer threads): launch one std::thread per remote node;
                           each thread runs SendRemoteFetchPostings
                           and stashes results in PER-FETCHER SCRATCH
                           ONLY (rf.postings). They DO NOT touch
                           m_pageBuffers or m_versionMap, so they
                           cannot race with the in-flight MultiGet
                           (which zeroes-then-fills every pageBuffer
                           slot) or with each other.
  Stage 1B (this thread): db->MultiGet runs concurrently.
  Barrier:                join all fetchers regardless of localRC.
  Stage 2 (this thread, sequential): copy each fetched posting into
                           its pageBuffer slot and apply Bug 21/22/25/30
                           versionMap sync. Single-writer post-barrier
                           guarantees record A and curVer A come from
                           the same versionMap snapshot during the
                           Stage 3 scoring loop.

Functional behaviour for Bug 17 (scatter-gather), Bug 21 (mirror
remote versions), Bug 22 (SetVersionBatch), Bug 25 (auto-grow
versionMap for cross-node VIDs), and Bug 30 (numWorkers safeBound)
is preserved bit-for-bit; only the schedule changes.

Expected latency saving: ~30% of total query time (the cross-node
RPC duration that previously serialised behind the local read).
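The Plan A schedule reduces to a fork/join shape; this sketch uses placeholder callables for the remote fetchers, the local MultiGet, and the single-writer merge (the real code stashes fetched postings in per-fetcher scratch before Stage 2):

```cpp
#include <functional>
#include <thread>
#include <vector>

// Stage 1A: one thread per remote node runs its fetch concurrently with
// Stage 1B (the local MultiGet on this thread). The join is the barrier;
// only after it does the single-writer merge touch shared buffers.
inline void OverlappedFetch(
    const std::vector<std::function<void()>>& remoteFetchers,
    const std::function<void()>& localMultiGet,
    const std::function<void()>& mergeIntoBuffers) {
    std::vector<std::thread> peers;
    for (auto& f : remoteFetchers) peers.emplace_back(f);  // Stage 1A
    localMultiGet();                                       // Stage 1B
    for (auto& t : peers) t.join();                        // barrier
    mergeIntoBuffers();                                    // Stage 2
}
```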

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…etry budget

Two-part fix for the BATCH 1 livelock observed in v25 (1M base + 1M insert).

Part A (TiKVVersionMap.h): five RMW paths inside SetVersionBatch /
WriteVersionByte / IncVersion / AddBatch / AddBatchInitialDeleted now use
the cached chunk reader (ReadChunkCached) instead of a fresh ReadChunk.
This keeps blocking TiKV gRPC out of the chunk-stripe critical sections
on the hot path, so HandleBatchAppendRequest receivers no longer
serialize behind the SetVersionBatch worker holding the chunk mutex.

Part B (ExtraTiKVController.h): bound a single Get/Put attempt to a
15-second per-RPC deadline (down from 60s) and add an outer wallclock
retry budget of 3 attempts (45s). Callers commonly pass MaxTimeout =
microseconds::max(); without the explicit cap and overflow guard, gRPC
deadline math overflows and the BlockingUnaryCall stalls indefinitely
when the channel is under heavy concurrent load. With this fix, stuck
calls fail-fast and the outer retry path can recover or surface the
error rather than wedging the receiver pool.
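Part B's retry shape can be sketched generically; the callable stands in for one capped-deadline Get/Put attempt, and the budget numbers mirror the commit (3 attempts at 15s each, 45s total):

```cpp
#include <functional>

// Bounded retry: overall wallclock is capped at maxAttempts times the
// per-RPC deadline, instead of inheriting microseconds::max() from the
// caller and letting gRPC deadline math overflow.
inline bool BoundedRetry(const std::function<bool()>& attemptWith15sDeadline,
                         int maxAttempts = 3) {
    for (int i = 0; i < maxAttempts; ++i) {
        if (attemptWith15sDeadline()) return true;
    }
    return false;  // fail-fast: surface the error to the caller
}
```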

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…e::Initialize

Initialize() previously created m_server first, then m_client. This is unsafe
because server handlers can fire as soon as a peer connects to the listening
socket. In particular, the ring-update / HeadSync handlers call back into
ConnectToPeer → m_client->ConnectToServer, which dereferences m_client.
With the original ordering, that lookup hits a null unique_ptr and
segfaults during the Pre-build phase.

Symptom (segfault during Pre-build, several launches in v27/v28):
  ConnectionID Client::ConnectToServer(...)
    + WorkerNode::RegisterServerHandlers lambda
    + NetworkNode::ConnectToPeer
    + null m_client.get() deref

Fix: construct m_client first. The server starts only after the client is
ready, so any handler invocation that needs the client finds a live
unique_ptr.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The AppendCallback head-record probe at ExtraDynamicSearcher.h:538 used
MaxTimeout, which is std::chrono::microseconds::max(). Combined with the
gRPC channel saturation we hit during BATCH 1 cross-node drain, the
BlockingUnaryCall wedges indefinitely (deadlines don't fire under load,
attempt 0 never returns, retry budget never reached).

Fix: pass an explicit 5s timeout for this best-effort probe. The callback
already handles the failure path by injecting the head record. A
duplicate head record is benign — MergePostings dedups on the first
VID==headID match in the merged posting list, so a second entry is just
slight wasted bytes.

Note: this fixes one specific path. The general gRPC-deadline-starvation
problem (BATCH 1 receiver-pool stalls with all 32 sub-workers in
gpr_cv_wait) needs a watchdog around RawGet/RawPut (Bug 32d, separate
commit).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
End-of-bulk hangs persisted after Bug 32 (deadline) and Bug 32c
(AppendCallback head-probe) — gdb of v28c/v29 driver showed receivers
stuck in BlockingUnaryCallImpl<RawGet/RawPut> for 10+ minutes despite
ctx.set_deadline(15s). gRPC's deadline timer apparently never fired
under the saturated callback-driven Append path.

Bug 32d adds an explicit application-level watchdog around every
BlockingUnaryCall: each call now runs in std::async and the calling
thread fut.wait_for(budget+3s grace); on timeout we ctx.TryCancel().
All 19 RawGet/RawPut/RawDelete/RawScan/RawBatchPut sites converted via
sed; 1 RawScan in StartToScan special-cased to add an explicit 15s
local timeout (no enclosing variable existed).
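The watchdog wrapper can be sketched as below; the blocking call and cancel hook are placeholders for BlockingUnaryCall and ctx.TryCancel(), and the grace period matches the commit's budget+3s:

```cpp
#include <chrono>
#include <functional>
#include <future>

// Application-level watchdog: run the blocking RPC in std::async and wait
// with a grace period; on timeout, fire the cancel hook. Note the caveat
// from the v29 hang: if cancel cannot progress on a wedged channel, the
// final wait can still block (the Bug 32e follow-up addresses that).
inline bool WatchdogCall(const std::function<void()>& blockingCall,
                         const std::function<void()>& tryCancel,
                         std::chrono::milliseconds budget) {
    auto fut = std::async(std::launch::async, blockingCall);
    auto grace = budget + std::chrono::seconds(3);
    if (fut.wait_for(grace) == std::future_status::timeout) {
        tryCancel();   // best effort
        fut.wait();    // may still block if cancellation cannot progress
        return false;
    }
    fut.get();
    return true;
}
```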

V29 BATCH 0 query: recall=0.876, p50=19ms (target ≥0.78 / 30ms ✅).
~6ms overhead per call from std::async thread spawn.

V29 BATCH 1: still hung at end-of-bulk. fut.wait_for did time out (gdb
threads cycle), but fut.get() blocks indefinitely because TryCancel
also can't progress under whatever wedges the channel — see Bug 32e
follow-up that adds gRPC keepalive and reduces sub-worker fan-out.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Bug 32d watchdog could not unblock end-of-bulk hangs because both
ctx.set_deadline() and ctx.TryCancel() failed to fire on the wedged
channel — gdb of the v29 driver showed grpc internal threads stuck in
epoll_wait(timeout=-1), meaning no timer was ever scheduled to wake
them.

Two complementary changes:

  1. ChannelArguments now sets HTTP/2 keepalive: PING every 10s,
     teardown if no PING ack within 5s, allow PING with zero active
     streams. This is the only mechanism that reliably forces grpc to
     mark a stuck channel UNAVAILABLE so RPCs return error and our
     outer retry loop can fire.

  2. HandleBatchAppendRequest / HandleBatchPutRequest sub-worker fan-
     out reduced from 4 → 2. With 2-way fan-out the channel sees half
     the in-flight BlockingUnaryCalls per receiver pool, which on its
     own may avoid the wedge the v29 hang exposed; combined with
     keepalive we have belt-and-braces.
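The keepalive settings described in point 1 map onto the standard grpc::ChannelArguments API roughly as follows (a configuration fragment, not standalone code; the exact values are those stated above):

```cpp
grpc::ChannelArguments args;
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);           // PING every 10s
args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);         // teardown if no ack in 5s
args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);  // PING with zero active streams
```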

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Bug 32e cut HandleBatchAppendRequest/Put receiver fan-out from 4 to 2,
which was the wrong direction: the worker's overload comes from its
local AppendThread/InsertThread pools (~40 threads), not from the
receiver pool (4 threads, ~5% of total). Halving the receiver pool
only doubled per-chunk processing time on the receiving side, pushing
the driver's 180s wait_for past timeout and causing v30/v31 BATCH 1
end-of-bulk hangs.

This commit rebalances within the worker's 48-thread budget:
- AppendThreadNum:           32 -> 16  (template ini)
- NumInsertThreads:           8 -> 4   (template ini)
- HandleBatchAppendRequest:   2 -> 8   (RemotePostingOps.h)
- HandleBatchPutRequest:      2 -> 8   (RemotePostingOps.h)

Net active TiKV concurrency on worker drops from ~42 to ~32, while
receiver throughput quadruples. A 50k-item BatchAppend chunk now
drains in ~62s typical (vs ~250s with pool=2), comfortably under the
180s wait_for timeout in SendBatchRemoteAppendChunk.

Also adds LL_Info instrumentation around SendBatchRemoteAppendChunk's
wait_for so we can confirm slow-vs-deadlock at end-of-bulk.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Distributed BATCH 1 recall stuck at 0.62 in v32 even though latency and
BATCH 0 recall met targets. Diagnosis from v32 logs:

  driver layer-1 BKTree:  117K samples, RMWs=451,        splits=0
  worker layer-1 BKTree:  657K samples, RMWs=4,197,126,  splits=273
  global alive heads:     200K

Driver's BKTree (and underlying m_pSamples) is missing ~83K of the 200K
alive heads — its query-time head index is blind to ~40% of the corpus,
which matches the observed recall drop. Underlying mechanism is that
BroadcastHeadSync was fire-and-forget: when the per-peer SendPacket
completion reported failure, entries were logged at LL_Debug and
dropped forever. Under v32's worker overload, many sends fail and the
peer's headIndex / m_pSamples diverge.

This commit:

1. RemotePostingOps: counters + per-peer retry queue
   - Atomic counters for broadcast entries, send OK / fail, recv
     entries, applied Add/Delete, retry enqueued / succeeded /
     dropped.
   - Per-peer HeadSyncBacklog (cap 256K entries) holds entries when
     a send completes with success=false; a dedicated retry thread
     periodically (every 500ms, tunable via SPTAG_HEADSYNC_RETRY_INTERVAL_MS)
     re-broadcasts up to 1024 entries per peer.
   - DumpHeadSyncStats() called at every layer-N ALL_DONE boundary +
     every 30s from the retry thread, so we can diff sender/receiver
     and detect any remaining gap.

2. ExtraDynamicSearcher Stage 2B fallback (GatherBarrier silent drop)
   - When a remote FetchPostings batch fails (rf.ok=false), instead
     of silently skipping ~rf.items.size() postings, fall back to
     local TiKV reads for those headIDs. Driver and worker share
     the same TiKV cluster so the routing was an optimization, not
     a correctness boundary.
   - Adds [v33] LL_Warning log + per-batch fallback hit counter so
     we can see when the path triggers.

3. ExtraDynamicSearcher worker FetchPostingsCallback diagnostic
   - Distinguish ErrorCode::Key_NotFound (legitimate empty posting)
     from any other failure inside GetPostingFromDB; log
     LL_Warning with fail / notFound counts so transient TiKV
     errors are no longer indistinguishable from empty-key misses.

4. WorkerNode forward methods for DumpHeadSyncStats /
   GetHeadSyncBacklogSize / DrainHeadSyncBacklog /
   NoteHeadSyncApplyAdd / NoteHeadSyncApplyDelete.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Root cause of BATCH 1 recall regression (v32 0.62 / v33 0.60 vs target
0.78): RemotePostingOps held SINGLE-SLOT callbacks (m_appendCallback,
m_headSyncCallback, m_remoteLockCallback, m_putPostingCallback,
m_fetchPostingsCallback). With Layers=2, each layer's
ExtraDynamicSearcher::SetWorker overwrites the previous; layer 1's
SetWorker runs last so all five callbacks captured layer-1's `this`
and m_layer=1. Every incoming RemoteAppend/HeadSync/Lock/Put/Fetch
event from peers — regardless of which layer it originated from —
routed to layer-1's lambda and called AddHeadIndex(..., m_layer+1=2,
...) which routes to the BKT (Index.h:246-269 fallback). Layer-0 head
VIDs polluted the BKT (m_pSamples is append-only, so churn caused
unbounded growth: worker BKT 327K mid-batch tracking v32's 657K final).

Forensics confirm HeadSync delivery is 100% correct (v33 counters:
broadcast 6720, send_ok 6720, send_fail 0, recv 533580). The 'loss'
suspected from v32 was a counter artifact (dispatcher counted as a
peer in m_nodeAddrs leading to double-count on send). Fix is therefore
NOT delivery — it is dispatch routing.

Generic per-layer design (supports >2 layers):
- DistributedProtocol.h: add std::int32_t m_layer = 0 to all 5 wire
  formats. Bumped MirrorVersion 0->1 on the four formats with a
  preamble (RemoteAppendRequest, RemoteLockRequest,
  RemotePutPostingRequest, RemoteFetchPostingsRequest) with conditional
  read on mirrorVer >= 1 so legacy packets default to layer=0.
  HeadSyncEntry has no preamble; appended unconditionally (driver and
  worker rebuilt monolithically).
- RemotePostingOps.h: replaced single-slot callbacks with
  std::vector<Callback> indexed by layer. EnsureLayerSlot_NoLock(layer)
  lazily grows. Owners array is std::vector<std::atomic<const void*>>
  with manual element migration on growth (atomic isn't movable). All
  5 Handle*Request dispatch sites look up by req.m_layer; missing
  callbacks emit LL_Warning rather than crash.
- WorkerNode.h: pass-throughs take int layer first arg. Default
  layer-agnostic Put/Fetch installed at layer 0. Lookup helpers fall
  back to layer 0 for PutPosting/FetchPostings only (layer-agnostic
  raw m_db ops; build-receiver mode still works without ES binding).
  Append/HeadSync/RemoteLock have NO fallback (layer semantics are
  required for correct routing).
- ExtraDynamicSearcher.h: SetWorker passes m_layer to all 5 setters,
  ClaimCallbackOwnership and ClearCallbacksIfOwner. All call sites
  set req.m_layer = m_layer before sending (4 QueueRemoteAppend, 2
  HeadSync entry construction blocks, SendRemoteFetchPostings,
  SendRemoteLock RAII guard).

This makes the per-layer callback routing the source of truth: the
dispatch table is keyed by layer, the captured `this` in each lambda
is the right instance for that layer, and the wire format carries
the layer so the receiver can route correctly without inferring it.
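The per-layer dispatch table can be sketched as below; Callback and the lazy-grow/warn-don't-crash behavior follow the description above, while the real RemotePostingOps.h additionally keeps atomic owner slots:

```cpp
#include <functional>
#include <mutex>
#include <vector>

// Callbacks indexed by layer: each layer's SetWorker installs its own slot
// instead of overwriting a single shared one, and dispatch looks up by the
// layer carried in the wire format.
class LayerCallbacks {
public:
    using Callback = std::function<void(int /*layer*/)>;
    void Set(int layer, Callback cb) {
        std::lock_guard<std::mutex> g(m_mutex);
        if (layer >= static_cast<int>(m_slots.size()))
            m_slots.resize(layer + 1);   // lazy grow, like EnsureLayerSlot
        m_slots[layer] = std::move(cb);
    }
    bool Dispatch(int layer) {
        std::lock_guard<std::mutex> g(m_mutex);
        if (layer < 0 || layer >= static_cast<int>(m_slots.size()) ||
            !m_slots[layer])
            return false;  // missing callback: warn, never crash
        m_slots[layer](layer);
        return true;
    }
private:
    std::mutex m_mutex;
    std::vector<Callback> m_slots;  // index == layer
};
```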

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…it lock storm

Root cause:
- AppendBatchAsync / Append() had no ownership check. For splits with
  ~119 reassign-target heads, ~50% are remote-owned. The lock pipeline
  in AddIndexAsyncSingleKey acquired m_rwLocks[headID] for ALL of them
  (held across Pass 1-4, blocking 16 concurrent split workers), then
  did MultiGet against LOCAL TiKV (returning NotFound for remote heads),
  then Pass-4 serial Append() retries that wrote the new bytes under the
  remote head's key, creating orphan data the real owner never sees.
- Effect: 99-225s per split (avg) vs 2.2s on 1-node = 45-100x slower.

Fix (two complementary guards):
1. AppendBatchAsync entry-time pre-filter: when worker is enabled,
   partition headAppends into local vs remote; remote -> QueueRemoteAppend
   immediately; local-only subset flows through existing lock pipeline.
2. Append() entry-time defensive check mirroring the same routing.
   Catches direct callers that bypass AppendBatchAsync (ReassignAsync,
   recursive Split, Pass-4 sync-retry fallback).

Consistent with existing ownership checks in Split (line 996),
MergePostings (line 1714), AddIndex distributed path (line 3782).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
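The entry-time pre-filter described above can be sketched as a pure partitioning step. This is a simplified model: PreFilter and Partition are hypothetical names, the payloads are plain strings, and the ownership rule assumes the headID % NumNodes hash routing this PR uses; the real code hands the remote half to QueueRemoteAppend and sends only the local half through the m_rwLocks pipeline.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hash routing assumed by this sketch: owner = headID % numNodes.
inline int GetOwner(int headID, int numNodes) { return headID % numNodes; }

struct Partition {
    std::map<int, std::string> local;   // flows through the existing lock pipeline
    std::map<int, std::string> remote;  // queued via QueueRemoteAppend instead
};

// Partition a batch of head appends before any lock is taken, so
// remote-owned heads never enter m_rwLocks or local TiKV MultiGet.
Partition PreFilter(const std::map<int, std::string>& headAppends,
                    int localNode, int numNodes) {
    Partition p;
    for (const auto& [head, bytes] : headAppends) {
        if (GetOwner(head, numNodes) == localNode) p.local[head] = bytes;
        else p.remote[head] = bytes;
    }
    return p;
}
```

With ~50% of reassign-target heads remote-owned, this halves the set of heads the lock pipeline touches and removes the orphan writes under remote keys.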
… storm

In 2-node sharded mode, each node only writes half of the layer-1 posting
data locally. This makes layer-1 postings unnaturally small (<10
entries), so the AsyncMergeInSearch trigger in SearchIndex
(realNum<=mergeThreshold) fires on nearly every visited layer-1 head:
  1-node BATCH 1: 182 layer-1 merges total
  2-node BATCH 1: 14M (driver) + 18M (worker) = 32M layer-1 merges

That floods m_splitThreadPool with no-op merge jobs (the Bug-11 fast path
drops 50% as remote-owned), starving real layer-0 split/append work.
Layer 1+ in SPANN holds BKT-structural data; merge-on-small is
meaningless there.

Fix: in distributed mode gate MergeAsync to (a) leaf layer (m_layer==0)
(b) locally-owned head. 1-node behavior preserved.
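The gate above reduces to a small predicate. This is a hedged sketch, not the actual MergeAsync code: the function name is hypothetical and ownership is assumed to follow the PR's headID % NumNodes hash routing.

```cpp
#include <cassert>

// Should a merge job be submitted for this head? In distributed mode,
// only leaf-layer (m_layer == 0), locally-owned heads qualify; in
// single-node mode the original trigger is unchanged.
bool ShouldSubmitMerge(bool distributed, int layer,
                       int headID, int localNode, int numNodes) {
    if (!distributed) return true;             // 1-node behavior preserved
    if (layer != 0) return false;              // layer 1+ holds BKT-structural data
    return headID % numNodes == localNode;     // drop remote-owned heads
}
```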

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…sync

In sharded distributed mode, AddIndex was synchronously calling Append()
per vector for the ~50% of selections owned by the local node. Each
Append() does a TiKV Get+Put roundtrip (~8ms), and AddIndex runs on a
single insert thread, capping pre-split insert throughput at ~125/s/node
vs 466/s for the 1-node baseline (which batches all appends via
AppendBatchAsync and fans out through m_splitThreadPool).

This change mirrors the 1-node path: accumulate local-owned appends into
a headID->buffer map and call AppendBatchAsync once at end-of-batch.
Remote-owned appends continue to flow through QueueRemoteAppend.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move 2-node from independent-PD-per-node to a single PD-raft group with
max-replicas=1. Compute nodes become stateless TiKV clients: PD routes
each region read/write to whichever store owns it, so the distributed
routing layer no longer needs cross-compute fetch RPCs.

Files

run_distributed.sh
  - tikv_start: build a shared initial-cluster and pd-endpoints list;
    every PD/TiKV joins the same raft group.
  - PD readiness now waits for the expected member count, not just
    /pd/api/v1/members reachability.
  - Fix max-replicas POST: previous code used GET-only /config/replicate
    and silently failed. Now POSTs to /pd/api/v1/config, verifies via
    GET, retries up to 5x. (Still racy if PD raft is slow to settle;
    manual workaround is to POST after Phase 0.)
  - tikv_switch_to_nocache mirrors the shared pd-endpoints layout.

SPFreshTest.cpp
  - BenchmarkFromConfig now sets TiKVPDAddresses to the FULL endpoint
    list (was per-worker local PD only). Same for the build-distribute
    receiver path.

cluster_2node.conf
  - Documentation block on shared raft mode.

tikv.toml (server tuning, v41)
  - apply-pool-size 4 -> 32, store-pool-size 4 -> 16: apply pool was the
    primary write-amp source (TiKV pegged at ~8/96 cores while client
    ops queued). Moves us to ~10 cores per node under burst.
  - grpc-concurrency 16 -> 32, grpc-memory-pool-quota 8GB -> 16GB.
  - scheduler-worker-pool-size 16 (new).
  - readpool.unified max=32 min=8 (new): cap reads so they don't steal
    CPU from writes.
  - max-background-flushes=8, raft-write-batch-size=1MB (new).
  - defaultcf write-buffer-size 512MB -> 1GB, max-write-buffer-number
    5 -> 8, level0 slowdown/stop 28/40 -> 40/60.

ExtraDynamicSearcher.h (HeadSync top-only + remote-fetch removal)
  - Broadcast HeadSync for split/merge only when the head update targets
    the in-memory BKT (m_headIndex->GetDiskIndex(m_layer+1) == nullptr).
    Lower-layer head updates already write to shared TiKV via the local
    DiskIndex chain and are visible to every compute, so broadcasting
    them duplicates writes and can pick a different posting if peer BKT
    state diverges.
  - MultiGet/MultiScan no longer fan out SendRemoteFetchPostings;
    everything goes through the local PD-routed TiKVIO path.

WorkerNode.h (auto-flush + per-node inflight cap)
  - QueueRemoteAppend auto-flushes per node once the per-node queue
    reaches kAutoFlushThreshold=50000 (was: hold everything until
    end-of-batch FlushRemoteAppends, then serial drain). Up to
    kMaxInflightPerNode=4 chunks may be in flight per node so a
    producer burst (split fan-out, reassign wave) can saturate the
    receiver bg-executor pool.
  - FlushRemoteAppends waits for any straggler auto-flush to drain
    before sending the final tail. Per-node mutex on the end-of-batch
    sender keeps tail-to-same-node sends ordered.
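The auto-flush policy can be modeled deterministically without threads or RPCs. The constants kAutoFlushThreshold and kMaxInflightPerNode come from the commit message; everything else (class name, counter-based "send") is a hypothetical stand-in for the real BatchAppendChunk pipeline, where OnChunkAcked would be driven by the receiver's ack.

```cpp
#include <cassert>
#include <cstddef>

// Per-node queue model: appends accumulate until the threshold, then a
// chunk "ships" if an inflight slot is free; FlushRemoteAppends sends the
// remaining tail at end-of-batch.
class RemoteAppendQueue {
public:
    static constexpr std::size_t kAutoFlushThreshold = 50000;
    static constexpr int kMaxInflightPerNode = 4;

    // Returns true when this append crossed the threshold and a chunk
    // was dispatched (an inflight slot was available).
    bool QueueRemoteAppend() {
        ++m_pending;
        if (m_pending < kAutoFlushThreshold || m_inflight >= kMaxInflightPerNode)
            return false;
        m_pending = 0;
        ++m_inflight;
        ++m_chunksSent;
        return true;
    }
    void OnChunkAcked() { if (m_inflight > 0) --m_inflight; }

    // End-of-batch: ship whatever remains as one tail chunk.
    std::size_t FlushRemoteAppends() {
        std::size_t tail = m_pending;
        m_pending = 0;
        if (tail > 0) ++m_chunksSent;
        return tail;
    }

    std::size_t ChunksSent() const { return m_chunksSent; }
    int Inflight() const { return m_inflight; }

private:
    std::size_t m_pending = 0;
    int m_inflight = 0;
    std::size_t m_chunksSent = 0;
};
```

The inflight cap is what bounds receiver pressure: even a split fan-out burst can have at most 4 unacked chunks per node in flight.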

RemotePostingOps.h
  - kChunkSize history block; current setting 50000. v42 (10k) was
    throughput-best (906/s) but during-insert p50 222ms; v43 (50k)
    trades throughput (-22% -> 704/s) for during-insert p50 (-36% ->
    141ms) and post-insert r1 QPS 47 -> 85. v44 (100k) blew up tail
    drain: a single 100k chunk took 116s on the receiver -> 40+ min
    end-of-batch drain (vs 8 min at 50k). 50k is the sweet spot.
  - Per-chunk sub-worker fan-out comment: kept at 8 (v39 baseline);
    combined with cap=4 inflight yields TiKV-side concurrency 4*8=32.

benchmark_insert_dominant_2node.ini
  - VersionCacheTTLMs=0, VersionCacheMaxChunks=0 for nocache mode.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Background

ExtraDynamicSearcher::AddIndex had two near-identical per-vector loops:
one for the distributed (router-enabled) path with cross-node ownership
routing, and one for the single-node fallback with the WAL hook.
PutAssignment was only called in the second loop, so the in-flight
queue / RPC pipeline in the distributed path bypassed the WAL even when
EnableWAL=true.

Change

Collapse the two loops into one. RNGSelection, version bump, Serialize,
and the WAL PutAssignment happen exactly once per vector, before any
routing decision. The routing branch (m_worker->GetOwner ->
QueueRemoteAppend) lives inside the per-replica inner loop and only
fires when m_worker && m_worker->IsEnabled(). Local-owned (and
non-routed) heads accumulate into a single headAppends map and are
flushed via AppendBatchAsync at the end.
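The collapsed loop's control flow can be sketched like this. It is a simplified model, not the real AddIndex: the Ops struct of lambdas stands in for the WAL hook, worker, and batch sender; the names PutAssignment, QueueRemoteAppend, and AppendBatchAsync follow the commit message, and ownership is assumed to be headID % numNodes.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in callbacks for the real index operations.
struct Ops {
    std::function<void(int, const std::string&)> PutAssignment;     // WAL hook
    std::function<void(int, const std::string&)> QueueRemoteAppend; // remote path
    std::function<void(const std::map<int, std::string>&)> AppendBatchAsync;
};

// One unified loop: the WAL write happens exactly once per selection,
// BEFORE any routing decision, so remote-owned appends are covered too.
// Local-owned (and non-routed) heads accumulate and flush once at the end.
void AddIndexUnified(const std::vector<std::pair<int, std::string>>& selections,
                     bool routed, int localNode, int numNodes, Ops& ops) {
    std::map<int, std::string> headAppends;  // local-owned accumulator
    for (const auto& [head, bytes] : selections) {
        ops.PutAssignment(head, bytes);      // durability first, routing second
        if (routed && head % numNodes != localNode)
            ops.QueueRemoteAppend(head, bytes);
        else
            headAppends[head] += bytes;
    }
    ops.AppendBatchAsync(headAppends);       // single end-of-batch flush
}
```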

VID encoding

routed mode serializes LocalToGlobalVID(VID) for cross-node uniqueness;
non-routed mode keeps the local VID. Same behavior as before, just
expressed via a single payloadVID local.

Durability semantics

Gated by m_opt->m_enableWAL (default false in
ParameterDefinitionList.h), so this commit is a no-op at runtime for
existing benchmarks. When EnableWAL=true is set, AddIndex Success now
implies that every vector — including remote-owned ones — has been
persisted to the local PersistentBuffer (RocksDB write, default
sync=false) before the producer returns. The in-memory append queue
and BatchAppendChunk RPC pipeline can then be replayed from the WAL on
restart.

Note: PersistentBuffer's underlying RocksDBIO::Put still uses default
WriteOptions (sync=false), so this protects against producer crash but
not power loss. Upgrading to options.sync=true is a follow-up and
should be measured against a durable-throughput target separately.

Replay caveat

PersistentBuffer's existing replay (StartToScan/NextToScan) was written
for local-VID-only records. When EnableWAL=true is turned on for a
distributed run, replay will need to be made VID-mode aware (decode the
serialized record and re-dispatch via the same routed/non-routed split
this commit just unified). Left as a follow-up since the immediate goal
was to have the code path present without measuring overhead.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ibuted-to-tikv

Merged SearchStats latency breakdown support into distributed query paths.
Both distributed (multi-node dispatch) and single-node search now use
spannIndex->SearchIndex(result, &searchStats) for per-query latency breakdown.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@TerrenceZhangX TerrenceZhangX marked this pull request as ready for review May 14, 2026 10:54
TerrenceZhangX and others added 5 commits May 14, 2026 13:10
…batch version checks)

Conflicts resolved:
  - .gitignore: kept both sides' ignore patterns
  - Test/src/SPFreshTest.cpp: kept HEAD's distributed-mode BenchmarkQueryPerformance
    routing (dispatcher broadcast + per-node query partition); HEAD already has
    the SearchBreakdown JSON output block that qiazh added at a different
    location, so qiazh's duplicate was dropped.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
gRPC dedupes subchannels by (address, channel-args). With identical args
across all kStubPoolSize stubs, every stub multiplexed onto a SINGLE TCP
socket — serviced by ONE grpc-server worker thread regardless of the
TiKV-side grpc-concurrency setting. Under high client concurrency this
single server thread saturates near 85% CPU, queues all incoming RPCs,
and the 100ms client deadline expires while requests sit in the server
completion queue (manifests as a flood of "Deadline Exceeded" even
though RocksDB engine_get p99 is <100us and there is no write stall).

Setting GRPC_ARG_CHANNEL_POOL_DOMAIN to a per-stub unique string
("sptag-stub-" + index) puts each channel in its own subchannel pool,
forcing a separate TCP connection per stub. After this fix:
  * TCP connections to TiKV: 1 → 96 (48 stubs × {TiKV, PD})
  * grpc-server-0 CPU: 84% → 3.6% (load balanced across all 32 threads)
  * Deadline Exceeded / retry exhausted / read postings fail: all 0
during a 26-min insert_dominant bench (1M base + 1M insert).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
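The dedup mechanism can be illustrated with a toy fingerprint model (this is not the gRPC API: the real fix sets the GRPC_ARG_CHANNEL_POOL_DOMAIN channel argument via grpc::ChannelArguments, per the commit message; here the fingerprint string and arg keys are invented to show why identical args collapse to one connection).

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

constexpr int kStubPoolSize = 48;  // per the commit message

// Model of gRPC's subchannel key: the (address, channel-args) pair.
std::string Fingerprint(const std::string& addr,
                        const std::map<std::string, std::string>& args) {
    std::string fp = addr;
    for (const auto& [k, v] : args) fp += "|" + k + "=" + v;
    return fp;
}

// Count distinct subchannels (i.e., TCP connections) a stub pool produces.
int DistinctConnections(bool uniquePoolDomain) {
    std::set<std::string> subchannels;
    for (int i = 0; i < kStubPoolSize; ++i) {
        std::map<std::string, std::string> args{{"keepalive_time_ms", "10000"}};
        if (uniquePoolDomain)  // per-stub unique string, as in the fix
            args["channel_pool_domain"] = "sptag-stub-" + std::to_string(i);
        subchannels.insert(Fingerprint("tikv:20160", args));
    }
    return static_cast<int>(subchannels.size());
}
```

With identical args every stub maps to the same fingerprint (one socket, one server worker thread); a per-stub domain string makes each fingerprint unique, forcing one connection per stub.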
When MergePostings runs locally and the surviving head wins over a remote
candidate's posting, the initiator now sends a RemoteDeletePosting to the
owner node, which deletes its posting, removes the local head entry, and
broadcasts a HeadSync(Delete) to peers so other BKTs drop the head.
Idempotent on the owner side (already-deleted returns Success).

Also adds HeadSync aggregation during RefineIndex: each MergePostings
firing its own single-entry BroadcastHeadSync produces
|mergelist| * (numNodes-1) tiny RPCs at RefineIndex time. The new
m_refineHeadSyncBuffer collects them so RefineIndex flushes one
batched broadcast after all submitted merges drain.
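A counting model of the aggregation (names other than m_refineHeadSyncBuffer are hypothetical; RPCs are counted, not sent) shows the RPC reduction:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct HeadSyncAggregator {
    std::vector<int> m_refineHeadSyncBuffer;  // deleted headIDs from merges
    std::size_t rpcsSent = 0;
    int numNodes;

    explicit HeadSyncAggregator(int nodes) : numNodes(nodes) {}

    // Each MergePostings records its deleted head instead of broadcasting.
    void Record(int deletedHead) { m_refineHeadSyncBuffer.push_back(deletedHead); }

    // RefineIndex flushes once after all submitted merges drain:
    // one batched broadcast = (numNodes - 1) RPCs total.
    void Flush() {
        if (m_refineHeadSyncBuffer.empty()) return;
        rpcsSent += static_cast<std::size_t>(numNodes - 1);
        m_refineHeadSyncBuffer.clear();
    }

    // Baseline: each merge broadcasting its own single-entry HeadSync.
    static std::size_t Unbatched(std::size_t merges, int nodes) {
        return merges * static_cast<std::size_t>(nodes - 1);
    }
};
```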

Files:
  * Packet.h           - DeletePostingRequest/Response opcodes (0x10)
  * DistributedProtocol.h - RemoteDeletePostingRequest/Response structs
  * RemotePostingOps.h - DeletePostingCallback registration + send path
  * WorkerNode.h       - SendRemoteDeletePosting / SetDeletePostingCallback
  * ExtraDynamicSearcher.h - integrate delete-posting callback,
                             HeadSync aggregation, RefineIndex flush

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
  * Add benchmark_10m_template.ini: 9M base + 1M insert (1.2GB head index,
    ~13 min build). Useful intermediate between insert_dominant (1M+1M,
    ~10 min total) and 100m (99M+1M, ~5h total) for validating 1node→2node
    scaling without paying the full 100M build cost.

  * Fix benchmark_100m_template.ini + benchmark_insert_dominant_template.ini
    dataset paths from /mnt/data/sift1b/{base.1B,query.public.10K}.u8bin
    (reference machine layout) to /mnt/nvme/sift1b/{bigann_base,query.10K}.u8bin
    (our layout). insert_dominant scale fixed at 1M base + 1M insert (the
    1M+10M variant was too long to drive iteration).

  * cluster_{2,3}node.conf: document image-ref overrides (tikv_image,
    pd_image, helper_image) for environments that cannot push to the
    registry — pull-only ACR usage now self-explained.

  * run_distributed.sh (+181/-40): assorted benchmark driver fixes
    including better TiKV start/stop ordering, SKIP_HEAD_BUILD support
    at 100M scale, and improved logging.

  * .gitignore: ignore generated benchmark_*_{1,2,3}node.ini snapshots
    (created at runtime from templates) and tikv.toml.v*bak backups.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
These three docs predate the current TiKV-backed distributed design and
have diverged from the actual implementation:

  * distributed-bugs.md          - early bug list, no longer accurate
  * distributed-flow-diagrams.md - stride-sharding-era flow diagrams
  * distributed-job-routing.md   - superseded job-routing design

The authoritative distributed design now lives in
docs/TiKVDistributedVersionMapDesign.md (kept).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>