Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Server/server.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package main

import (
"context"
"fmt"
"time"

"github.com/NeverENG/BanDB/network/banNet"
"github.com/NeverENG/BanDB/pkg/metrics"
"github.com/NeverENG/BanDB/pkg/proto"
"github.com/NeverENG/BanDB/service"
"github.com/NeverENG/BanDB/service/ingesthook"
Expand Down Expand Up @@ -39,6 +42,9 @@ func main() {
server.SetConnStartFunc(router.OnConnStart)
server.SetConnStopFunc(router.OnConnStop)

// 启动周期性指标快照:headless 边缘设备 tail 日志即可观测运行状态
metrics.StartLogger(context.Background(), 10*time.Second)

// 启动服务
fmt.Println("Starting Server...")
fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy())
Expand Down
6 changes: 6 additions & 0 deletions Server/server_pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"time"

"github.com/NeverENG/BanDB/network/banNet"
"github.com/NeverENG/BanDB/pkg/metrics"
"github.com/NeverENG/BanDB/pkg/proto"
"github.com/NeverENG/BanDB/service"
"github.com/NeverENG/BanDB/service/ingesthook"
Expand Down Expand Up @@ -49,6 +52,9 @@ func main() {
server.SetConnStartFunc(router.OnConnStart)
server.SetConnStopFunc(router.OnConnStop)

// 启动周期性指标快照:headless 边缘设备 tail 日志即可观测运行状态
metrics.StartLogger(context.Background(), 10*time.Second)

// 启动服务
fmt.Println("Starting Server...")
fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy())
Expand Down
115 changes: 115 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Package metrics 提供零依赖的进程内可观测性:一组原子计数器 + 仪表回调,
// 以「周期性 slog 快照」作为暴露出口——headless 边缘设备直接 tail 日志即可观测,
// 无需开端口、无需 Prometheus/Grafana 等外部基础设施。
//
// 设计上把「埋点」与「暴露」解耦:业务代码只管在此处累加计数,未来若要再接
// HTTP /metrics 或 Prometheus 出口,只需新增读取出口,无需改动任何埋点。
package metrics

import (
"context"
"log/slog"
"sync/atomic"
"time"
)

// Counters: monotonically increasing cumulative totals. Each code path bumps
// its own counter directly with .Add(1); nothing ever decrements or resets them.
var (
	FramesDroppedMalformed    atomic.Int64 // frames dropped by the ingest hook as malformed
	FramesDroppedOversized    atomic.Int64 // frames dropped by the ingest hook because the value exceeds the size limit
	FramesDroppedNonMonotonic atomic.Int64 // frames dropped by the ingest hook as timestamp regressions/replays
	Writes                    atomic.Int64 // number of successful writes (PUT)
	Reads                     atomic.Int64 // number of reads (GET)
	Deletes                   atomic.Int64 // number of successful deletes (DEL)
	WriteErrors               atomic.Int64 // number of failed writes/deletes
	BackpressureStalls        atomic.Int64 // number of writes stalled by byte-credit backpressure
)

// Gauges: instantaneous values. The owner registers a callback once and every
// snapshot reads the live value through it.
var (
	// memTableInflightFn holds a func() int64; it stays empty until the first
	// successful registration.
	memTableInflightFn atomic.Value
	// memTableBudget is the byte budget most recently passed to SetMemTableGauges.
	memTableBudget atomic.Int64
)

// SetMemTableGauges registers the callback that reports the MemTable's
// unflushed byte count, together with its byte budget. It is normally called
// once, when the MemTable is constructed.
//
// A nil inflight leaves any previously registered callback in place; budget is
// always overwritten.
func SetMemTableGauges(inflight func() int64, budget int64) {
	// Deferred so the budget is published after the callback on every path.
	defer memTableBudget.Store(budget)

	if inflight == nil {
		return
	}
	memTableInflightFn.Store(inflight)
}

// memTableInflight returns the value reported by the registered callback, or 0
// when no callback has been registered yet.
func memTableInflight() int64 {
	read, registered := memTableInflightFn.Load().(func() int64)
	if !registered {
		return 0
	}
	return read()
}

// Snapshot holds the value of every metric at one point in time.
// The counter fields are cumulative totals; the MemTable fields are gauges.
type Snapshot struct {
	DroppedMalformed      int64 // value of FramesDroppedMalformed
	DroppedOversized      int64 // value of FramesDroppedOversized
	DroppedNonMonotonic   int64 // value of FramesDroppedNonMonotonic
	Writes                int64 // value of Writes
	Reads                 int64 // value of Reads
	Deletes               int64 // value of Deletes
	WriteErrors           int64 // value of WriteErrors
	BackpressureStalls    int64 // value of BackpressureStalls
	MemTableInflightBytes int64 // result of the registered inflight callback, 0 if none is registered
	MemTableBudgetBytes   int64 // budget last passed to SetMemTableGauges
}

// Take reads every metric and returns the values as a Snapshot.
//
// Each field is loaded independently, so the snapshot is not an atomic view
// across counters that are being updated concurrently.
func Take() Snapshot {
	var snap Snapshot

	// Counters.
	snap.DroppedMalformed = FramesDroppedMalformed.Load()
	snap.DroppedOversized = FramesDroppedOversized.Load()
	snap.DroppedNonMonotonic = FramesDroppedNonMonotonic.Load()
	snap.Writes = Writes.Load()
	snap.Reads = Reads.Load()
	snap.Deletes = Deletes.Load()
	snap.WriteErrors = WriteErrors.Load()
	snap.BackpressureStalls = BackpressureStalls.Load()

	// Gauges.
	snap.MemTableInflightBytes = memTableInflight()
	snap.MemTableBudgetBytes = memTableBudget.Load()

	return snap
}

// LogSnapshot takes a snapshot and emits it as a single Info-level record
// named "metrics" on the default slog logger.
func LogSnapshot() {
	snap := Take()

	// Alternating key/value pairs, in the order they appear in the log line.
	fields := []any{
		"dropped_malformed", snap.DroppedMalformed,
		"dropped_oversized", snap.DroppedOversized,
		"dropped_non_monotonic", snap.DroppedNonMonotonic,
		"writes", snap.Writes,
		"reads", snap.Reads,
		"deletes", snap.Deletes,
		"write_errors", snap.WriteErrors,
		"backpressure_stalls", snap.BackpressureStalls,
		"memtable_inflight_bytes", snap.MemTableInflightBytes,
		"memtable_budget_bytes", snap.MemTableBudgetBytes,
	}
	slog.Info("metrics", fields...)
}

// StartLogger launches a background goroutine that logs a metrics snapshot
// every interval until ctx is cancelled. It returns immediately; when
// interval <= 0 no goroutine is started.
func StartLogger(ctx context.Context, interval time.Duration) {
	if interval > 0 {
		go snapshotLoop(ctx, interval)
	}
}

// snapshotLoop calls LogSnapshot on every tick and returns once ctx is done.
func snapshotLoop(ctx context.Context, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			return
		case <-tick.C:
			LogSnapshot()
		}
	}
}
57 changes: 57 additions & 0 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package metrics

import "testing"

// TestCountersAndSnapshot bumps several counters and checks that Take reflects
// them. The counters are process-wide globals, so every assertion is made on
// the delta between two snapshots rather than on absolute values; that keeps
// the test independent of whatever ran before it.
func TestCountersAndSnapshot(t *testing.T) {
	base := Take()

	FramesDroppedMalformed.Add(2)
	FramesDroppedNonMonotonic.Add(1)
	Writes.Add(5)
	WriteErrors.Add(1)
	BackpressureStalls.Add(3)

	now := Take()

	malformed := now.DroppedMalformed - base.DroppedMalformed
	nonMonotonic := now.DroppedNonMonotonic - base.DroppedNonMonotonic
	writes := now.Writes - base.Writes
	writeErrors := now.WriteErrors - base.WriteErrors
	stalls := now.BackpressureStalls - base.BackpressureStalls

	// Checked in order; the first mismatch aborts the test.
	switch {
	case malformed != 2:
		t.Fatalf("DroppedMalformed 增量应为 2,得到 %d", malformed)
	case nonMonotonic != 1:
		t.Fatalf("DroppedNonMonotonic 增量应为 1,得到 %d", nonMonotonic)
	case writes != 5:
		t.Fatalf("Writes 增量应为 5,得到 %d", writes)
	case writeErrors != 1:
		t.Fatalf("WriteErrors 增量应为 1,得到 %d", writeErrors)
	case stalls != 3:
		t.Fatalf("BackpressureStalls 增量应为 3,得到 %d", stalls)
	}
}

func TestMemTableGauges(t *testing.T) {
// 未注册回调时仪表读 0,不应 panic。
s := Take()
if s.MemTableInflightBytes != 0 {
t.Fatalf("未注册回调时 inflight 应为 0,得到 %d", s.MemTableInflightBytes)
}

inflight := int64(8192)
SetMemTableGauges(func() int64 { return inflight }, 16384)

s = Take()
if s.MemTableInflightBytes != 8192 {
t.Fatalf("inflight 应实时读回调值 8192,得到 %d", s.MemTableInflightBytes)
}
if s.MemTableBudgetBytes != 16384 {
t.Fatalf("budget 应为 16384,得到 %d", s.MemTableBudgetBytes)
}

// 回调实时反映变化。
inflight = 12288
if got := Take().MemTableInflightBytes; got != 12288 {
t.Fatalf("inflight 应实时反映为 12288,得到 %d", got)
}
}
4 changes: 4 additions & 0 deletions service/ingesthook/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"

"github.com/NeverENG/BanDB/network/banIface"
"github.com/NeverENG/BanDB/pkg/metrics"
"github.com/NeverENG/BanDB/pkg/proto"
)

Expand Down Expand Up @@ -54,10 +55,12 @@ func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction {

key, value, ok := parsePut(req.GetMsgData())
if !ok {
metrics.FramesDroppedMalformed.Add(1)
return banIface.HookDrop // 畸形帧:长度字段与实际数据不符
}

if f.maxValueLen > 0 && len(value) > f.maxValueLen {
metrics.FramesDroppedOversized.Add(1)
return banIface.HookDrop // 畸形帧:value 超过上限
}

Expand All @@ -70,6 +73,7 @@ func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction {
last, seen := f.lastTS[device]
if seen && ts <= last {
f.mu.Unlock()
metrics.FramesDroppedNonMonotonic.Add(1)
return banIface.HookDrop // 回退/重放帧
}
f.lastTS[device] = ts
Expand Down
6 changes: 6 additions & 0 deletions service/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"

"github.com/NeverENG/BanDB/network/banIface"
"github.com/NeverENG/BanDB/pkg/metrics"
"github.com/NeverENG/BanDB/pkg/proto"
)

Expand Down Expand Up @@ -113,10 +114,12 @@ func (r *Router) handlePut(data []byte, request banIface.IRequest) {

if err := r.kv.Write(cmd); err != nil {
slog.Error("[ERROR] handlePut: write failed", "error", err)
metrics.WriteErrors.Add(1)
sendErr(request)
return
}

metrics.Writes.Add(1)
sendOK(request)
}

Expand All @@ -134,6 +137,7 @@ func (r *Router) handleGet(data []byte, request banIface.IRequest) {

key := data[4 : 4+keyLen]

metrics.Reads.Add(1)
value, err := r.kv.Get(key)
if err != nil {
sendErr(request)
Expand Down Expand Up @@ -171,10 +175,12 @@ func (r *Router) handleDelete(data []byte, request banIface.IRequest) {
}

if err := r.kv.Write(cmd); err != nil {
metrics.WriteErrors.Add(1)
sendErr(request)
return
}

metrics.Deletes.Add(1)
sendOK(request)
}

Expand Down
7 changes: 6 additions & 1 deletion storage/zstorage/memtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/NeverENG/BanDB/config"
"github.com/NeverENG/BanDB/pkg/credit"
"github.com/NeverENG/BanDB/pkg/metrics"
"github.com/NeverENG/BanDB/storage/istorage"
)

Expand Down Expand Up @@ -70,6 +71,9 @@ func NewMemTable() *MemTable {
sst: NewSSTable(),
credits: credit.New(config.G.MemTableMaxInflightBytes),
}
// 注册未刷盘字节数仪表,供周期性指标快照实时读取。
metrics.SetMemTableGauges(mt.InflightBytes, config.G.MemTableMaxInflightBytes)

go mt.FlushWorker()
go mt.ListenCompactCh()

Expand Down Expand Up @@ -191,7 +195,8 @@ func (m *MemTable) acquireCredit(n int64) {
if m.credits.TryAcquire(n) {
return
}
m.StartFlush() // 确保有 flush 在路上来归还信用,避免永久阻塞
metrics.BackpressureStalls.Add(1) // 快路径未命中,将触发刷盘并阻塞等待信用
m.StartFlush() // 确保有 flush 在路上来归还信用,避免永久阻塞
m.credits.Acquire(n)
}

Expand Down
Loading