diff --git a/Server/server.go b/Server/server.go index 2a37702..e044c06 100644 --- a/Server/server.go +++ b/Server/server.go @@ -1,9 +1,12 @@ package main import ( + "context" "fmt" + "time" "github.com/NeverENG/BanDB/network/banNet" + "github.com/NeverENG/BanDB/pkg/metrics" "github.com/NeverENG/BanDB/pkg/proto" "github.com/NeverENG/BanDB/service" "github.com/NeverENG/BanDB/service/ingesthook" @@ -39,6 +42,9 @@ func main() { server.SetConnStartFunc(router.OnConnStart) server.SetConnStopFunc(router.OnConnStop) + // 启动周期性指标快照:headless 边缘设备 tail 日志即可观测运行状态 + metrics.StartLogger(context.Background(), 10*time.Second) + // 启动服务 fmt.Println("Starting Server...") fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy()) diff --git a/Server/server_pprof.go b/Server/server_pprof.go index 31c76ce..ed08997 100644 --- a/Server/server_pprof.go +++ b/Server/server_pprof.go @@ -3,11 +3,14 @@ package main import ( + "context" "fmt" "net/http" _ "net/http/pprof" + "time" "github.com/NeverENG/BanDB/network/banNet" + "github.com/NeverENG/BanDB/pkg/metrics" "github.com/NeverENG/BanDB/pkg/proto" "github.com/NeverENG/BanDB/service" "github.com/NeverENG/BanDB/service/ingesthook" @@ -49,6 +52,9 @@ func main() { server.SetConnStartFunc(router.OnConnStart) server.SetConnStopFunc(router.OnConnStop) + // 启动周期性指标快照:headless 边缘设备 tail 日志即可观测运行状态 + metrics.StartLogger(context.Background(), 10*time.Second) + // 启动服务 fmt.Println("Starting Server...") fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy()) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..d29d654 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,115 @@ +// Package metrics 提供零依赖的进程内可观测性:一组原子计数器 + 仪表回调, +// 以「周期性 slog 快照」作为暴露出口——headless 边缘设备直接 tail 日志即可观测, +// 无需开端口、无需 Prometheus/Grafana 等外部基础设施。 +// +// 设计上把「埋点」与「暴露」解耦:业务代码只管在此处累加计数,未来若要再接 +// HTTP /metrics 或 Prometheus 出口,只需新增读取出口,无需改动任何埋点。 +package metrics + +import ( + "context" + "log/slog" + "sync/atomic" + "time" +) + +// 计数器:单调递增的累计量。各业务路径直接 .Add(1)。 +var ( + FramesDroppedMalformed atomic.Int64 // 被钩子按「畸形帧」丢弃 + FramesDroppedOversized atomic.Int64 // 被钩子按「value 超限」丢弃 + FramesDroppedNonMonotonic atomic.Int64 // 被钩子按「时间戳回退/重放」丢弃 + Writes atomic.Int64 // 成功写入(PUT)次数 + Reads atomic.Int64 // 读取(GET)次数 + Deletes atomic.Int64 // 成功删除(DEL)次数 + WriteErrors atomic.Int64 // 写入/删除失败次数 + BackpressureStalls atomic.Int64 // 写入被字节信用背压阻塞的次数 +) + +// 仪表:当前瞬时值,由持有者注册回调,快照时实时读取。 +var ( + memTableInflightFn atomic.Value // 存 func() int64 + memTableBudget atomic.Int64 +) + +// SetMemTableGauges 注册 MemTable 未刷盘字节数的实时读取回调与字节预算。 +// 通常在 MemTable 构造时调用一次。 +func SetMemTableGauges(inflight func() int64, budget int64) { + if inflight != nil { + memTableInflightFn.Store(inflight) + } + memTableBudget.Store(budget) +} + +func memTableInflight() int64 { + if f, ok := memTableInflightFn.Load().(func() int64); ok { + return f() + } + return 0 +} + +// Snapshot 是某一时刻全部指标的取值。 +type Snapshot struct { + DroppedMalformed int64 + DroppedOversized int64 + DroppedNonMonotonic int64 + Writes int64 + Reads int64 + Deletes int64 + WriteErrors int64 + BackpressureStalls int64 + MemTableInflightBytes int64 + MemTableBudgetBytes int64 +} + +// Take 读取当前各指标,组成一份快照。 +func Take() Snapshot { + return Snapshot{ + DroppedMalformed: FramesDroppedMalformed.Load(), + DroppedOversized: FramesDroppedOversized.Load(), + DroppedNonMonotonic: FramesDroppedNonMonotonic.Load(), + Writes: Writes.Load(), + Reads: Reads.Load(), + Deletes: Deletes.Load(), + WriteErrors: WriteErrors.Load(), + BackpressureStalls: BackpressureStalls.Load(), + MemTableInflightBytes: memTableInflight(), + MemTableBudgetBytes: memTableBudget.Load(), + } +} + +// LogSnapshot 用默认 slog logger 打印一行指标快照。 +func LogSnapshot() { + s := Take() + slog.Info("metrics", + "dropped_malformed", s.DroppedMalformed, + "dropped_oversized", s.DroppedOversized, + "dropped_non_monotonic", s.DroppedNonMonotonic, + "writes", s.Writes, + "reads", s.Reads, + "deletes", s.Deletes, + "write_errors", s.WriteErrors, + "backpressure_stalls", s.BackpressureStalls, + "memtable_inflight_bytes", s.MemTableInflightBytes, + "memtable_budget_bytes", s.MemTableBudgetBytes, + ) +} + +// StartLogger 启动后台 goroutine,每隔 interval 打印一次指标快照, +// 直到 ctx 取消。interval<=0 时不启动。 +func StartLogger(ctx context.Context, interval time.Duration) { + if interval <= 0 { + return + } + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + LogSnapshot() + } + } + }() +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 0000000..7b96fac --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,57 @@ +package metrics + +import "testing" + +// 计数器为进程级全局,本测试以「增量」断言,避免依赖其它测试是否先跑过。 +func TestCountersAndSnapshot(t *testing.T) { + before := Take() + + FramesDroppedMalformed.Add(2) + FramesDroppedNonMonotonic.Add(1) + Writes.Add(5) + WriteErrors.Add(1) + BackpressureStalls.Add(3) + + after := Take() + + if d := after.DroppedMalformed - before.DroppedMalformed; d != 2 { + t.Fatalf("DroppedMalformed 增量应为 2,得到 %d", d) + } + if d := after.DroppedNonMonotonic - before.DroppedNonMonotonic; d != 1 { + t.Fatalf("DroppedNonMonotonic 增量应为 1,得到 %d", d) + } + if d := after.Writes - before.Writes; d != 5 { + t.Fatalf("Writes 增量应为 5,得到 %d", d) + } + if d := after.WriteErrors - before.WriteErrors; d != 1 { + t.Fatalf("WriteErrors 增量应为 1,得到 %d", d) + } + if d := after.BackpressureStalls - before.BackpressureStalls; d != 3 { + t.Fatalf("BackpressureStalls 增量应为 3,得到 %d", d) + } +} + +func TestMemTableGauges(t *testing.T) { + // 未注册回调时仪表读 0,不应 panic。 + s := Take() + if s.MemTableInflightBytes != 0 { + t.Fatalf("未注册回调时 inflight 应为 0,得到 %d", s.MemTableInflightBytes) + } + + inflight := int64(8192) + SetMemTableGauges(func() int64 { return inflight }, 16384) + + s = Take() + if s.MemTableInflightBytes != 8192 { + t.Fatalf("inflight 应实时读回调值 8192,得到 %d", s.MemTableInflightBytes) + } + if s.MemTableBudgetBytes != 16384 { + t.Fatalf("budget 应为 16384,得到 %d", s.MemTableBudgetBytes) + } + + // 回调实时反映变化。 + inflight = 12288 + if got := Take().MemTableInflightBytes; got != 12288 { + t.Fatalf("inflight 应实时反映为 12288,得到 %d", got) + } +} diff --git a/service/ingesthook/filter.go b/service/ingesthook/filter.go index e437e39..e75ba26 100644 --- a/service/ingesthook/filter.go +++ b/service/ingesthook/filter.go @@ -14,6 +14,7 @@ import ( "sync" "github.com/NeverENG/BanDB/network/banIface" + "github.com/NeverENG/BanDB/pkg/metrics" "github.com/NeverENG/BanDB/pkg/proto" ) @@ -54,10 +55,12 @@ func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction { key, value, ok := parsePut(req.GetMsgData()) if !ok { + metrics.FramesDroppedMalformed.Add(1) return banIface.HookDrop // 畸形帧:长度字段与实际数据不符 } if f.maxValueLen > 0 && len(value) > f.maxValueLen { + metrics.FramesDroppedOversized.Add(1) return banIface.HookDrop // 畸形帧:value 超过上限 } @@ -70,6 +73,7 @@ func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction { last, seen := f.lastTS[device] if seen && ts <= last { f.mu.Unlock() + metrics.FramesDroppedNonMonotonic.Add(1) return banIface.HookDrop // 回退/重放帧 } f.lastTS[device] = ts diff --git a/service/router.go b/service/router.go index 5796205..e781c75 100644 --- a/service/router.go +++ b/service/router.go @@ -5,6 +5,7 @@ import ( "log/slog" "github.com/NeverENG/BanDB/network/banIface" + "github.com/NeverENG/BanDB/pkg/metrics" "github.com/NeverENG/BanDB/pkg/proto" ) @@ -113,10 +114,12 @@ func (r *Router) handlePut(data []byte, request banIface.IRequest) { if err := r.kv.Write(cmd); err != nil { slog.Error("[ERROR] handlePut: write failed", "error", err) + metrics.WriteErrors.Add(1) sendErr(request) return } + metrics.Writes.Add(1) sendOK(request) } @@ -134,6 +137,7 @@ func (r *Router) handleGet(data []byte, request banIface.IRequest) { key := data[4 : 4+keyLen] + metrics.Reads.Add(1) value, err := r.kv.Get(key) if err != nil { sendErr(request) @@ -171,10 +175,12 @@ func (r *Router) handleDelete(data []byte, request banIface.IRequest) { } if err := r.kv.Write(cmd); err != nil { + metrics.WriteErrors.Add(1) sendErr(request) return } + metrics.Deletes.Add(1) sendOK(request) } diff --git a/storage/zstorage/memtable.go b/storage/zstorage/memtable.go index 8dbf4b8..0cc7f34 100644 --- a/storage/zstorage/memtable.go +++ b/storage/zstorage/memtable.go @@ -10,6 +10,7 @@ import ( "github.com/NeverENG/BanDB/config" "github.com/NeverENG/BanDB/pkg/credit" + "github.com/NeverENG/BanDB/pkg/metrics" "github.com/NeverENG/BanDB/storage/istorage" ) @@ -70,6 +71,9 @@ func NewMemTable() *MemTable { sst: NewSSTable(), credits: credit.New(config.G.MemTableMaxInflightBytes), } + // 注册未刷盘字节数仪表,供周期性指标快照实时读取。 + metrics.SetMemTableGauges(mt.InflightBytes, config.G.MemTableMaxInflightBytes) + go mt.FlushWorker() go mt.ListenCompactCh() @@ -191,7 +195,8 @@ func (m *MemTable) acquireCredit(n int64) { if m.credits.TryAcquire(n) { return } - m.StartFlush() // 确保有 flush 在路上来归还信用,避免永久阻塞 + metrics.BackpressureStalls.Add(1) // 快路径未命中,将触发刷盘并阻塞等待信用 + m.StartFlush() // 确保有 flush 在路上来归还信用,避免永久阻塞 m.credits.Acquire(n) }