Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions pkg/ring/token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ring

import (
"container/heap"
"hash/fnv"
"math"
"math/rand"
"slices"
Expand Down Expand Up @@ -65,6 +66,15 @@ func (g *RandomTokenGenerator) GenerateTokens(ring *Desc, _, _ string, numTokens
return tokens
}

// instanceHash maps an ingester ID and a round counter to a deterministic uint32.
// The value is the 32-bit FNV-1a hash of the ID bytes followed by the low 32 bits
// of round in big-endian order, so the same (id, round) pair always yields the
// same result. Callers use it to choose among candidate gaps so that concurrent
// ingesters tend to pick different ones.
func instanceHash(id string, round int) uint32 {
	// Assemble the whole input up front: the ID followed by the round counter
	// encoded as four big-endian bytes.
	input := append([]byte(id),
		byte(round>>24),
		byte(round>>16),
		byte(round>>8),
		byte(round),
	)

	hasher := fnv.New32a()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write(input)
	return hasher.Sum32()
}

// MinimizeSpreadTokenGenerator is a token generator whose GenerateTokens method
// derives new tokens from the current ring state. It wraps another TokenGenerator.
// NOTE(review): innerGenerator looks like the fallback used when the ring state
// cannot drive generation (presumably the random generator) — confirm against
// the constructor and GenerateTokens.
type MinimizeSpreadTokenGenerator struct {
// innerGenerator is the wrapped TokenGenerator.
innerGenerator TokenGenerator
}
Expand Down Expand Up @@ -160,6 +170,9 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
distancesHeap := &tokenDistanceHeap{}

for _, perInstance := range tokensPerInstanceWithDistance {
if len(perInstance.tokens) == 0 {
continue
}
sort.Slice(perInstance.tokens, func(i, j int) bool {
return perInstance.tokens[i].distance > perInstance.tokens[j].distance
})
Expand All @@ -182,7 +195,27 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
// Calculate the expected distance per step, taking into consideration the tokens already created.
expectedDistanceStep := (expectedOwnershipDistance - currentInstance.totalDistance) / int64(numTokens-len(r))

m := heap.Pop(distancesHeap).(*totalTokenPerInstance)
// Collect up to 4 candidates from the top of the heap. Each ingester uses a
// per-instance hash to choose among them, so two concurrent ingesters are likely
// to pick different candidates instead of both splitting the same instance's gap,
// which would cause an uneven distribution.
const maxCandidates = 4
candidates := make([]*totalTokenPerInstance, 0, maxCandidates)
for len(*distancesHeap) > 0 && len(candidates) < maxCandidates {
candidates = append(candidates, heap.Pop(distancesHeap).(*totalTokenPerInstance))
}

var pick int
if len(candidates) > 1 {
pick = int(instanceHash(id, len(r))) % len(candidates)
}
m := candidates[pick]

// Push back the non-selected candidates.
for idx, c := range candidates {
if idx != pick {
heap.Push(distancesHeap, c)
}
}

i := findFirst(len(m.tokens), func(x int) bool {
return m.tokens[x].distance > expectedDistanceStep
Expand All @@ -199,10 +232,20 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
}

var newToken uint32
if int64(tokenToSplit.prev)+expectedDistanceStep > maxTokenValue {
newToken = uint32(int64(tokenToSplit.prev) + expectedDistanceStep - maxTokenValue)
// When only one candidate was available, two concurrent ingesters will pick
// the same instance and gap. Apply a small per-instance jitter to differentiate.
offset := expectedDistanceStep
if len(candidates) <= 1 {
h := instanceHash(id, len(r))
maxJitter := expectedDistanceStep / 100
if maxJitter > 0 {
offset += int64(h) % maxJitter
}
}
if int64(tokenToSplit.prev)+offset > maxTokenValue {
newToken = uint32(int64(tokenToSplit.prev) + offset - maxTokenValue)
} else {
newToken = uint32(int64(tokenToSplit.prev) + expectedDistanceStep)
newToken = uint32(int64(tokenToSplit.prev) + offset)
}

if _, ok := usedTokens[newToken]; !ok {
Expand Down
50 changes: 48 additions & 2 deletions pkg/ring/token_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,12 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) {
require.Equal(t, mTokenGenerator.called, len(zones))

// Should Generate tokens based on the ring state
// Tolerance is 5% (vs original 2%) because candidate selection among heap entries
// trades a small amount of optimality for collision avoidance. The impact is only
// visible with very few ingesters; with many ingesters the distribution converges.
for i := range 50 {
generateTokensForIngesters(t, rindDesc, fmt.Sprintf("minimize-%v", i), zones, minimizeTokenGenerator, dups)
assertDistancePerIngester(t, rindDesc, 0.02)
assertDistancePerIngester(t, rindDesc, 0.05)
}
require.Equal(t, mTokenGenerator.called, len(zones))

Expand All @@ -103,7 +106,7 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) {
rindDesc.AddIngester("partial", "partial", zones[0], rTokens, ACTIVE, time.Now())
nTokens := minimizeTokenGenerator.GenerateTokens(rindDesc, "partial", zones[0], 256, true)
rindDesc.AddIngester("partial", "partial", zones[0], append(rTokens, nTokens...), ACTIVE, time.Now())
assertDistancePerIngester(t, rindDesc, 0.02)
assertDistancePerIngester(t, rindDesc, 0.05)

mTokenGenerator.called = 0
// Should fallback to random generator when more than 1 ingester does not have tokens and force flag is set
Expand Down Expand Up @@ -204,3 +207,46 @@ func assertDistancePerIngester(t testing.TB, d *Desc, tolerance float64) {
}, "[%v] expected and real distance error is greater than %v -> %v[%v/%v]", s, tolerance, 1-math.Abs(expectedDistance/realDistance), expectedDistance, realDistance)
}
}

// TestMinimizeSpreadTokenGenerator_NoDuplicatesOnConcurrentJoin checks that two
// ingesters generating tokens from the same ring snapshot do not end up with any
// token in common, and that each of them still receives the full token count.
func TestMinimizeSpreadTokenGenerator_NoDuplicatesOnConcurrentJoin(t *testing.T) {
	const tokensPerIngester = 512

	zones := []string{"zone1", "zone2", "zone3"}
	generator := NewMinimizeSpreadTokenGenerator()

	// Populate the ring with three ingesters per zone so the new ingesters
	// generate their tokens against an already populated ring.
	ringDesc := NewDesc()
	for i := 0; i < 3; i++ {
		for _, zone := range zones {
			existingID := fmt.Sprintf("existing-%d-%s", i, zone)
			existingTokens := generator.GenerateTokens(ringDesc, existingID, zone, tokensPerIngester, true)
			ringDesc.AddIngester(existingID, existingID, zone, existingTokens, ACTIVE, time.Now())
		}
	}

	// Register both newcomers in the same zone without tokens, so each one
	// generates against a ring in which the other owns nothing yet — the
	// situation of a concurrent join.
	joinZone := zones[0]
	registeredAt := time.Now()
	for _, id := range []string{"new-ingester-A", "new-ingester-B"} {
		ringDesc.AddIngester(id, id, joinZone, []uint32{}, ACTIVE, registeredAt)
	}

	// Neither result is written back to the ring, so both calls observe the
	// same state.
	tokensA := generator.GenerateTokens(ringDesc, "new-ingester-A", joinZone, tokensPerIngester, true)
	tokensB := generator.GenerateTokens(ringDesc, "new-ingester-B", joinZone, tokensPerIngester, true)

	// Index A's tokens, then collect every token of B that A also produced.
	seenInA := make(map[uint32]struct{}, len(tokensA))
	for _, tok := range tokensA {
		seenInA[tok] = struct{}{}
	}

	var duplicates []uint32
	for _, tok := range tokensB {
		if _, clash := seenInA[tok]; clash {
			duplicates = append(duplicates, tok)
		}
	}

	require.Empty(t, duplicates, "ingesters A and B produced %d duplicate tokens from the same ring state", len(duplicates))
	require.Len(t, tokensA, tokensPerIngester)
	require.Len(t, tokensB, tokensPerIngester)
}
Loading