From 22703d3fd81855d5931d81260367433329125c8d Mon Sep 17 00:00:00 2001
From: bang <3656828039@qq.com>
Date: Sun, 14 Jun 2026 20:57:56 +0800
Subject: [PATCH 1/2] =?UTF-8?q?feat(metrics):=20=E9=9B=B6=E4=BE=9D?=
 =?UTF-8?q?=E8=B5=96=E8=BF=9B=E7=A8=8B=E5=86=85=E6=8C=87=E6=A0=87=E2=80=94?=
 =?UTF-8?q?=E2=80=94=E5=8E=9F=E5=AD=90=E8=AE=A1=E6=95=B0=E5=99=A8+?=
 =?UTF-8?q?=E4=BB=AA=E8=A1=A8=E5=9B=9E=E8=B0=83+=E5=91=A8=E6=9C=9F=20slog?=
 =?UTF-8?q?=20=E5=BF=AB=E7=85=A7?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

新增 pkg/metrics:埋点与暴露解耦。计数器(丢帧/读写/背压)就地累加,
仪表(MemTable 未刷盘字节/预算)经回调实时读取,StartLogger 周期性用默认 slog 打一行快照——headless 边缘设备 tail 日志即可观测,无需端口/Prometheus。
未来可再接 HTTP 出口而无需改埋点。

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 pkg/metrics/metrics.go      | 115 ++++++++++++++++++++++++++++++++++++
 pkg/metrics/metrics_test.go |  57 ++++++++++++++++++
 2 files changed, 172 insertions(+)
 create mode 100644 pkg/metrics/metrics.go
 create mode 100644 pkg/metrics/metrics_test.go

diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
new file mode 100644
index 0000000..d29d654
--- /dev/null
+++ b/pkg/metrics/metrics.go
@@ -0,0 +1,115 @@
+// Package metrics 提供零依赖的进程内可观测性:一组原子计数器 + 仪表回调,
+// 以「周期性 slog 快照」作为暴露出口——headless 边缘设备直接 tail 日志即可观测,
+// 无需开端口、无需 Prometheus/Grafana 等外部基础设施。
+//
+// 设计上把「埋点」与「暴露」解耦:业务代码只管在此处累加计数,未来若要再接
+// HTTP /metrics 或 Prometheus 出口,只需新增读取出口,无需改动任何埋点。
+package metrics
+
+import (
+	"context"
+	"log/slog"
+	"sync/atomic"
+	"time"
+)
+
+// 计数器:单调递增的累计量。各业务路径直接 .Add(1)。
+var (
+	FramesDroppedMalformed    atomic.Int64 // 被钩子按「畸形帧」丢弃
+	FramesDroppedOversized    atomic.Int64 // 被钩子按「value 超限」丢弃
+	FramesDroppedNonMonotonic atomic.Int64 // 被钩子按「时间戳回退/重放」丢弃
+	Writes                    atomic.Int64 // 成功写入(PUT)次数
+	Reads                     atomic.Int64 // 读取(GET)次数
+	Deletes                   atomic.Int64 // 成功删除(DEL)次数
+	WriteErrors               atomic.Int64 // 写入/删除失败次数
+	BackpressureStalls        atomic.Int64 // 写入被字节信用背压阻塞的次数
+)
+
+// 仪表:当前瞬时值,由持有者注册回调,快照时实时读取。
+var (
+	memTableInflightFn atomic.Value // 存 func() int64
+	memTableBudget     atomic.Int64
+)
+
+// SetMemTableGauges 注册 MemTable 未刷盘字节数的实时读取回调与字节预算。
+// 通常在 MemTable 构造时调用一次。
+func SetMemTableGauges(inflight func() int64, budget int64) {
+	if inflight != nil {
+		memTableInflightFn.Store(inflight)
+	}
+	memTableBudget.Store(budget)
+}
+
+func memTableInflight() int64 {
+	if f, ok := memTableInflightFn.Load().(func() int64); ok {
+		return f()
+	}
+	return 0
+}
+
+// Snapshot 是某一时刻全部指标的取值。
+type Snapshot struct {
+	DroppedMalformed      int64
+	DroppedOversized      int64
+	DroppedNonMonotonic   int64
+	Writes                int64
+	Reads                 int64
+	Deletes               int64
+	WriteErrors           int64
+	BackpressureStalls    int64
+	MemTableInflightBytes int64
+	MemTableBudgetBytes   int64
+}
+
+// Take 读取当前各指标,组成一份快照。
+func Take() Snapshot {
+	return Snapshot{
+		DroppedMalformed:      FramesDroppedMalformed.Load(),
+		DroppedOversized:      FramesDroppedOversized.Load(),
+		DroppedNonMonotonic:   FramesDroppedNonMonotonic.Load(),
+		Writes:                Writes.Load(),
+		Reads:                 Reads.Load(),
+		Deletes:               Deletes.Load(),
+		WriteErrors:           WriteErrors.Load(),
+		BackpressureStalls:    BackpressureStalls.Load(),
+		MemTableInflightBytes: memTableInflight(),
+		MemTableBudgetBytes:   memTableBudget.Load(),
+	}
+}
+
+// LogSnapshot 用默认 slog logger 打印一行指标快照。
+func LogSnapshot() {
+	s := Take()
+	slog.Info("metrics",
+		"dropped_malformed", s.DroppedMalformed,
+		"dropped_oversized", s.DroppedOversized,
+		"dropped_non_monotonic", s.DroppedNonMonotonic,
+		"writes", s.Writes,
+		"reads", s.Reads,
+		"deletes", s.Deletes,
+		"write_errors", s.WriteErrors,
+		"backpressure_stalls", s.BackpressureStalls,
+		"memtable_inflight_bytes", s.MemTableInflightBytes,
+		"memtable_budget_bytes", s.MemTableBudgetBytes,
+	)
+}
+
+// StartLogger 启动后台 goroutine,每隔 interval 打印一次指标快照,
+// 直到 ctx 取消。interval<=0 时不启动。
+func StartLogger(ctx context.Context, interval time.Duration) {
+	if interval <= 0 {
+		return
+	}
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				LogSnapshot()
+			}
+		}
+	}()
+}
diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go
new file mode 100644
index 0000000..7b96fac
--- /dev/null
+++ b/pkg/metrics/metrics_test.go
@@ -0,0 +1,57 @@
+package metrics
+
+import "testing"
+
+// 计数器为进程级全局,本测试以「增量」断言,避免依赖其它测试是否先跑过。
+func TestCountersAndSnapshot(t *testing.T) {
+	before := Take()
+
+	FramesDroppedMalformed.Add(2)
+	FramesDroppedNonMonotonic.Add(1)
+	Writes.Add(5)
+	WriteErrors.Add(1)
+	BackpressureStalls.Add(3)
+
+	after := Take()
+
+	if d := after.DroppedMalformed - before.DroppedMalformed; d != 2 {
+		t.Fatalf("DroppedMalformed 增量应为 2,得到 %d", d)
+	}
+	if d := after.DroppedNonMonotonic - before.DroppedNonMonotonic; d != 1 {
+		t.Fatalf("DroppedNonMonotonic 增量应为 1,得到 %d", d)
+	}
+	if d := after.Writes - before.Writes; d != 5 {
+		t.Fatalf("Writes 增量应为 5,得到 %d", d)
+	}
+	if d := after.WriteErrors - before.WriteErrors; d != 1 {
+		t.Fatalf("WriteErrors 增量应为 1,得到 %d", d)
+	}
+	if d := after.BackpressureStalls - before.BackpressureStalls; d != 3 {
+		t.Fatalf("BackpressureStalls 增量应为 3,得到 %d", d)
+	}
+}
+
+func TestMemTableGauges(t *testing.T) {
+	// 未注册回调时仪表读 0,不应 panic。
+	s := Take()
+	if s.MemTableInflightBytes != 0 {
+		t.Fatalf("未注册回调时 inflight 应为 0,得到 %d", s.MemTableInflightBytes)
+	}
+
+	inflight := int64(8192)
+	SetMemTableGauges(func() int64 { return inflight }, 16384)
+
+	s = Take()
+	if s.MemTableInflightBytes != 8192 {
+		t.Fatalf("inflight 应实时读回调值 8192,得到 %d", s.MemTableInflightBytes)
+	}
+	if s.MemTableBudgetBytes != 16384 {
+		t.Fatalf("budget 应为 16384,得到 %d", s.MemTableBudgetBytes)
+	}
+
+	// 回调实时反映变化。
+	inflight = 12288
+	if got := Take().MemTableInflightBytes; got != 12288 {
+		t.Fatalf("inflight 应实时反映为 12288,得到 %d", got)
+	}
+}

From 7bd348079cabfb12109e62f87bcbdd5ac2db6863 Mon Sep 17 00:00:00 2001
From: bang <3656828039@qq.com>
Date: Sun, 14 Jun 2026 21:01:15 +0800
Subject: [PATCH 2/2] =?UTF-8?q?feat(metrics):=20=E5=9C=A8=E9=92=A9?=
 =?UTF-8?q?=E5=AD=90/=E8=B7=AF=E7=94=B1/=E8=83=8C=E5=8E=8B=E8=B7=AF?=
 =?UTF-8?q?=E5=BE=84=E5=9F=8B=E7=82=B9=E5=B9=B6=E5=90=AF=E5=8A=A8=E5=91=A8?=
 =?UTF-8?q?=E6=9C=9F=E5=BF=AB=E7=85=A7?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- ingesthook: 三类丢帧分别计数(畸形/超限/回退)
- router: PUT/GET/DEL 读写计数与写错误计数
- memtable: 背压慢路径停顿计数,并注册未刷盘字节数仪表
- Server(两个 main): 每 10s 打印一行指标快照,tail 日志即可观测

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 Server/server.go             | 6 ++++++
 Server/server_pprof.go       | 6 ++++++
 service/ingesthook/filter.go | 4 ++++
 service/router.go            | 6 ++++++
 storage/zstorage/memtable.go | 7 ++++++-
 5 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/Server/server.go b/Server/server.go
index 2a37702..e044c06 100644
--- a/Server/server.go
+++ b/Server/server.go
@@ -1,9 +1,12 @@
 package main
 
 import (
+	"context"
 	"fmt"
+	"time"
 
 	"github.com/NeverENG/BanDB/network/banNet"
+	"github.com/NeverENG/BanDB/pkg/metrics"
 	"github.com/NeverENG/BanDB/pkg/proto"
 	"github.com/NeverENG/BanDB/service"
 	"github.com/NeverENG/BanDB/service/ingesthook"
@@ -39,6 +42,9 @@ func main() {
 	server.SetConnStartFunc(router.OnConnStart)
 	server.SetConnStopFunc(router.OnConnStop)
 
+	// 启动周期性指标快照:headless 边缘设备 tail 日志即可观测运行状态
+	metrics.StartLogger(context.Background(), 10*time.Second)
+
 	// 启动服务
 	fmt.Println("Starting Server...")
 	fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy())
diff --git a/Server/server_pprof.go b/Server/server_pprof.go
index 31c76ce..ed08997 100644
--- a/Server/server_pprof.go
+++ b/Server/server_pprof.go
@@ -3,11 +3,14 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	_ "net/http/pprof"
+	"time"
 
 	"github.com/NeverENG/BanDB/network/banNet"
+	"github.com/NeverENG/BanDB/pkg/metrics"
 	"github.com/NeverENG/BanDB/pkg/proto"
 	"github.com/NeverENG/BanDB/service"
 	"github.com/NeverENG/BanDB/service/ingesthook"
@@ -49,6 +52,9 @@ func main() {
 	server.SetConnStartFunc(router.OnConnStart)
 	server.SetConnStopFunc(router.OnConnStop)
 
+	// 启动周期性指标快照:headless 边缘设备 tail 日志即可观测运行状态
+	metrics.StartLogger(context.Background(), 10*time.Second)
+
 	// 启动服务
 	fmt.Println("Starting Server...")
 	fmt.Printf("HA initialized, initial health status: %v\n", ha.IsHealthy())
diff --git a/service/ingesthook/filter.go b/service/ingesthook/filter.go
index e437e39..e75ba26 100644
--- a/service/ingesthook/filter.go
+++ b/service/ingesthook/filter.go
@@ -14,6 +14,7 @@ import (
 	"sync"
 
 	"github.com/NeverENG/BanDB/network/banIface"
+	"github.com/NeverENG/BanDB/pkg/metrics"
 	"github.com/NeverENG/BanDB/pkg/proto"
 )
 
@@ -54,10 +55,12 @@ func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction {
 
 	key, value, ok := parsePut(req.GetMsgData())
 	if !ok {
+		metrics.FramesDroppedMalformed.Add(1)
 		return banIface.HookDrop // 畸形帧:长度字段与实际数据不符
 	}
 
 	if f.maxValueLen > 0 && len(value) > f.maxValueLen {
+		metrics.FramesDroppedOversized.Add(1)
 		return banIface.HookDrop // 畸形帧:value 超过上限
 	}
 
@@ -70,6 +73,7 @@ func (f *Filter) Handle(req banIface.IRequest) banIface.HookAction {
 	last, seen := f.lastTS[device]
 	if seen && ts <= last {
 		f.mu.Unlock()
+		metrics.FramesDroppedNonMonotonic.Add(1)
 		return banIface.HookDrop // 回退/重放帧
 	}
 	f.lastTS[device] = ts
diff --git a/service/router.go b/service/router.go
index 5796205..e781c75 100644
--- a/service/router.go
+++ b/service/router.go
@@ -5,6 +5,7 @@ import (
 	"log/slog"
 
 	"github.com/NeverENG/BanDB/network/banIface"
+	"github.com/NeverENG/BanDB/pkg/metrics"
 	"github.com/NeverENG/BanDB/pkg/proto"
 )
 
@@ -113,10 +114,12 @@ func (r *Router) handlePut(data []byte, request banIface.IRequest) {
 
 	if err := r.kv.Write(cmd); err != nil {
 		slog.Error("[ERROR] handlePut: write failed", "error", err)
+		metrics.WriteErrors.Add(1)
 		sendErr(request)
 		return
 	}
 
+	metrics.Writes.Add(1)
 	sendOK(request)
 }
 
@@ -134,6 +137,7 @@ func (r *Router) handleGet(data []byte, request banIface.IRequest) {
 
 	key := data[4 : 4+keyLen]
 
+	metrics.Reads.Add(1)
 	value, err := r.kv.Get(key)
 	if err != nil {
 		sendErr(request)
@@ -171,10 +175,12 @@ func (r *Router) handleDelete(data []byte, request banIface.IRequest) {
 	}
 
 	if err := r.kv.Write(cmd); err != nil {
+		metrics.WriteErrors.Add(1)
 		sendErr(request)
 		return
 	}
 
+	metrics.Deletes.Add(1)
 	sendOK(request)
 }
 
diff --git a/storage/zstorage/memtable.go b/storage/zstorage/memtable.go
index 8dbf4b8..0cc7f34 100644
--- a/storage/zstorage/memtable.go
+++ b/storage/zstorage/memtable.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/NeverENG/BanDB/config"
 	"github.com/NeverENG/BanDB/pkg/credit"
+	"github.com/NeverENG/BanDB/pkg/metrics"
 	"github.com/NeverENG/BanDB/storage/istorage"
 )
 
@@ -70,6 +71,9 @@ func NewMemTable() *MemTable {
 		sst:     NewSSTable(),
 		credits: credit.New(config.G.MemTableMaxInflightBytes),
 	}
+	// 注册未刷盘字节数仪表,供周期性指标快照实时读取。
+	metrics.SetMemTableGauges(mt.InflightBytes, config.G.MemTableMaxInflightBytes)
+
 	go mt.FlushWorker()
 	go mt.ListenCompactCh()
 
@@ -191,7 +195,8 @@ func (m *MemTable) acquireCredit(n int64) {
 	if m.credits.TryAcquire(n) {
 		return
 	}
-	m.StartFlush() // 确保有 flush 在路上来归还信用,避免永久阻塞
+	metrics.BackpressureStalls.Add(1) // 快路径未命中,将触发刷盘并阻塞等待信用
+	m.StartFlush()                    // 确保有 flush 在路上来归还信用,避免永久阻塞
 	m.credits.Acquire(n)
 }
 