Skip to content

feat: 采集入口可编程钩子真正可拦截/改写 + 真实过滤示例(方向 A)#130

Merged
NeverENG merged 5 commits into
mainfrom
test/bangd
Jun 14, 2026
Merged

feat: 采集入口可编程钩子真正可拦截/改写 + 真实过滤示例(方向 A)#130
NeverENG merged 5 commits into
mainfrom
test/bangd

Conversation

@NeverENG

@NeverENG NeverENG commented Jun 14, 2026

Copy link
Copy Markdown
Owner

背景(采集员视角痛点 #2

文档把「落盘前可编程预处理」当唯一护城河,但代码层 PreHandle 是只读旁观者——签名 func(IRequest) 无返回值,Handle 又自己重读 GetMsgData(),钩子既不能丢帧也不能改写。本 PR 把它补成真的。

改动(两个原子 commit)

1. 机制层 feat(net) — 钩子可拦截/改写

  • banIface 新增 HookAction(HookPass/HookDrop),IRouter.PreHandle 返回处置决定
  • DoMsgHandleHookDrop 时短路 Handle
  • IRequest.SetMsgData 改写负载并同步 DataLen(脱敏/裁剪用)
  • service 层持有「丢弃即回写唯一 StatusDropped 响应」的不变式,避免纯请求-响应协议错位

2. 内容层 feat(service) — 真实采集入口过滤钩子 service/ingesthook.Filter

  • 丢弃畸形 PUT 帧(长度字段不符 / value 超限)
  • 按设备 best-effort 时间戳单调校验,丢回退/重放帧(work-stealing 下乱序非顺序保证,注释已说明,可关闭)
  • 对 JSON 敏感字段(gps/user_id)脱敏,改写时重建 valueLen 前缀
  • 仅作用于 PUT,GET/DELETE 原样放行

测试

go build ./... 通过;service/ingesthook 8 个单测覆盖 GET 放行、畸形丢弃、超限丢弃、单调丢弃/关闭、无约定 key 放行、脱敏重编码、非 JSON 原样;service 既有 FSM 测试不回归。

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added standalone mode operation as alternative to Raft-based clustering
    • Added input filtering and validation to process frames before persistence
    • Added sensitive field redaction for GPS and user ID data
    • Added per-device timestamp monotonicity validation
    • Added frame dropping for malformed payloads
  • Configuration

    • New "Mode" configuration option to select standalone or Raft deployment

NeverENG and others added 5 commits June 7, 2026 17:36
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…与 StatusDropped

钩子从只读旁观者升级为可丢弃/改写一帧的真实拦截点:
- banIface 新增 HookAction(HookPass/HookDrop),IRouter.PreHandle 返回处置决定
- DoMsgHandle 在 HookDrop 时短路 Handle
- IRequest.SetMsgData 改写负载并同步 DataLen,供脱敏/裁剪
- service 层 PreHandle 持有「丢弃即回写唯一 StatusDropped 响应」的不变式,避免响应错位

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
新增 service/ingesthook.Filter 并在两个 Server main 上用 SetPreHandle 挂载,
把「落盘前可编程预处理」从挂载点变成端到端可演示能力:
- 丢弃畸形 PUT 帧(长度字段与数据不符 / value 超限)
- 按设备 best-effort 时间戳单调校验,丢回退/重放帧(work-stealing 下乱序
  非顺序保证,已在注释说明,可经 dropBackward 关闭)
- 对 JSON 中 gps/user_id 等敏感字段脱敏,改写时重建 valueLen 前缀
仅针对 PUT,GET/DELETE 原样放行;钩子不触碰连接,丢弃响应由 Router 统一回写。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@NeverENG NeverENG merged commit e2ef3e3 into main Jun 14, 2026
3 of 4 checks passed
@coderabbitai

coderabbitai Bot commented Jun 14, 2026

Copy link
Copy Markdown

Review Change Stack

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 1526e7b0-6a8d-4ad1-a0ed-9bc5851488c2

📥 Commits

Reviewing files that changed from the base of the PR and between f71e8a0 and 6f3bbd7.

📒 Files selected for processing (20)
  • Server/server.go
  • Server/server_pprof.go
  • config/config.json
  • config/global.go
  • config/global_test.go
  • docs/edge-positioning-feasibility.md
  • network/banIface/iRequest.go
  • network/banIface/iRouter.go
  • network/banNet/msgHandle.go
  • network/banNet/request.go
  • network/banNet/router.go
  • pkg/proto/codes.go
  • service/fsm.go
  • service/fsm_test.go
  • service/ha.go
  • service/ingesthook/filter.go
  • service/ingesthook/filter_test.go
  • service/router.go
  • storage/wal.go
  • storage/wal_test.go

📝 Walkthrough

Walkthrough

This PR adds two independent capabilities: (1) a WAL-backed standalone mode for KVServer (bypassing Raft) with a new Mode config field, and (2) a typed HookAction PreHandle system with an ingest filter that drops malformed frames, enforces per-device timestamp monotonicity, and redacts JSON fields before persistence.

Changes

Standalone WAL Mode

Layer / File(s) Summary
WAL storage implementation and tests
storage/wal.go, storage/wal_test.go
Defines WALOpPut/WALOpDelete, WAL struct, NewWAL, Append (BigEndian length-prefixed records with Sync), and Replay (callback iteration stopping on torn tail). Tests cover append/replay, reopen persistence, torn-tail stop, and missing-file replay.
Mode constants, GlobalConfig field, and resolveMode
config/global.go, config/config.json, config/global_test.go
Adds ModeStandalone/ModeRaft constants and Mode string to GlobalConfig. ParseFlags calls resolveMode() which infers mode from Peers length when empty and panics on invalid values. Standalone mode skips Me/Peers validation. config.json sets "standalone". Tests parameterize peer validation by mode.
KVServer standalone path, health check, and FSM tests
service/fsm.go, service/ha.go, service/fsm_test.go
NewKVServer branches on ModeStandalone to open and replay WAL without starting Raft. Write routes to writeStandalone (WAL append then storage apply) when raft is nil. Run and WaitUntilReady no-op in standalone. checkHealth guards GetState() behind a nil-raft check. Adds TestStandalone_WriteAndRecover for persistence/recovery.
handlePut/handleDelete unified to kv.Write
service/router.go
Replaces AppendEntry+WaitForCommit in handlePut and handleDelete with a single r.kv.Write(cmd), routing both standalone and Raft through the same write abstraction.

PreHandle Hook & Ingest Filter

Layer / File(s) Summary
HookAction interface contracts and StatusDropped
network/banIface/iRouter.go, network/banIface/iRequest.go, pkg/proto/codes.go
Adds HookAction type with HookPass/HookDrop constants. Updates IRouter.PreHandle to return HookAction. Extends IRequest with SetMsgData([]byte). Adds StatusDropped response constant.
Network layer enforcement
network/banNet/request.go, network/banNet/router.go, network/banNet/msgHandle.go
Request.SetMsgData updates data and syncs MsgLen. BaseRouter.PreHandle returns HookPass. DoMsgHandle checks the PreHandle return for HookDrop and exits early.
Service router PreHandle wiring and sendDropped
service/router.go
preHandleFunc field and SetPreHandle parameter now return HookAction. PreHandle returns the hook action and calls sendDropped (writes MsgRespErr with StatusDropped) on HookDrop.
Ingest filter: validation, monotonicity, redaction
service/ingesthook/filter.go, service/ingesthook/filter_test.go
Filter drops malformed/oversized PUT frames, enforces per-device timestamp monotonicity when dropBackward is set, and rewrites payloads with JSON field redaction. Helpers: parsePut/encodePut, parseKey, redact. Tests cover all filter behaviors including non-JSON pass-through.
Server entry points wired with ingesthook filter
Server/server.go, Server/server_pprof.go
Both server entry points construct a Filter with gps/user_id redact fields and monotonic options, and register it via router.SetPreHandle(filter.Handle).

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant DoMsgHandle
  participant Router as service.Router
  participant Filter as ingesthook.Filter
  participant KVServer

  Client->>DoMsgHandle: frame
  DoMsgHandle->>Router: PreHandle(request)
  Router->>Filter: Handle(request)

  alt malformed / oversized / backward timestamp
    Filter-->>Router: HookDrop
    Router->>Client: MsgRespErr (StatusDropped)
    Router-->>DoMsgHandle: HookDrop
    note over DoMsgHandle: return early
  else valid frame (with optional field redaction)
    Filter->>Filter: SetMsgData(redacted payload)
    Filter-->>Router: HookPass
    Router-->>DoMsgHandle: HookPass
    DoMsgHandle->>Router: Handle(request)
    Router->>KVServer: Write(cmd)
    KVServer->>KVServer: writeStandalone → WAL.Append + storage.Put/Delete
    KVServer-->>Router: nil
    Router->>Client: MsgRespOK (StatusOK)
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • NeverENG/BanDB#89: Modifies the same service/router.go and pkg/proto/codes.go plumbing for router pre-handle behavior and status payload encoding, directly overlapping with this PR's hook action and drop response changes.

Poem

🐰 Hoppity-hop through the WAL I go,
Appending each record with BigEndian flow.
Malformed frames? I drop with a flick!
Redact the gps field—oh so slick.
Standalone or Raft, the bunny decides,
No backward timestamps on these furry rides! 🌿

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch test/bangd

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions

Copy link
Copy Markdown

🐯 BanGD 数据库内核评审

整体风险:🔴 高

变更总结:这个 PR 做了四件事,跨越两个维度:
1. 钩子机制真正可拦截/改写(network/banIface + banNet 层):将 PreHandle 签名从无返回值改为返回 HookActionHookPass / HookDrop),新增 IRequest.SetMsgData 支持改写负载并同步 DataLenDoMsgHandleHookDrop 时短路 Handle。这是系统网络层与路由层之间控制流不变量的变更——之前钩子只是只读旁路,现在是真正的处置网关。

2. 采集入口过滤钩子实现(service/ingesthook):新增 filter.go 实现三个过滤器——丢弃畸形 PUT 帧(长度不符/value 超限)、按设备 best-effort 时间戳单调校验(丢回退/重放)、JSON 敏感字段脱敏。这为「可编程边缘采集缓冲网关」差异化提供了真实代码演示。

3. 运行模式独立化 + 存储层 WAL(service/fsm.go, config/global.go, storage/wal.go):新增 Mode 配置(standalone/raft),standalone 模式下不启动 Raft、使用存储层 WAL(storage/wal.go),写路径改为 WAL→memtable(先 append+fsync 再写存储),并在启动时重放 WAL 恢复。原有 Raft 路径写经 AppendEntry→WaitForCommit→Apply 的流程不受影响。这是系统架构层从「强依赖 Raft 才可持久化」到「standalone 模式独立 WAL 路径」的演变——增加了运行时模式选择,但引入了两条不同的持久化路径。

4. 协议层新增 StatusDropped 响应状态 + Router 层的「丢弃即回写」不变式(pkg/proto/codes.go, service/router.go):钩子丢弃帧时,Router.PreHandle 代为回写唯一响应,确保纯请求-响应协议下不发生响应错位。

此外有 docs 文档微调和 config.json 的 Mode 字段初始化。

本评审不阻塞合入;架构级建议以 Issue 形式跟踪,普通问题在下方内联列出。

架构问题(共 5 项)

普通问题(共 5 项)

🛑 [阻塞 · 逻辑错误] service/ingesthook/filter.go:72 单调性校验中相等时间戳判为丢弃与文档不一致

  • 第 72 行 if seen && ts <= last 将相等时间戳视为回退丢弃。文档注释(第 106 行)说「时间戳回退/重放的帧」,重放=相同时间戳被丢弃。但文档第 56–57 行说「时间戳单调性校验」——单调递增通常允许相等(非严格单调)。这是一个语义分歧。PR 描述说「丢回退/重放帧」。如果设计意图是严格单调(ts 必须大于 last),那代码正确;如果意图是非严格单调(ts >= last 就放行),则应该用 < 而非 <=。代码与 PR 描述一致(丢重放帧),但注释「单调校验」可能被误解为允许相等。
  • 建议:如果设计意图是严格单调(不允许相等重放),保持 <= 但将注释从「单调校验」改为「严格单调递增校验」。如果要允许相等(非严格单调),改为 ts < last

🛑 [阻塞 · 逻辑错误] service/ingesthook/filter.go:49 parsePut 中 keyLen/valueLen 负数检查对 uint32 无效

  • 第 55 行 if keyLen < 0 || valueLen < 0keyLenvalueLenint(binary.LittleEndian.Uint32(...)) 转换而来。binary.LittleEndian.Uint32 返回 uint32,转换为 int 在 Go 中当 uint32 值在 [0, 2^31-1] 范围内时为正数,在 [2^31, 2^32-1] 范围内时为负数(int 是带符号类型)。这个检查确实能捕获超大的 uint32 值(当高位为 1 时 int 变为负数)。但这依赖于 int 在 64 位平台上是 64 位宽——在 32 位平台上,uint32 转 int 永远不会为负(因为 int 也是 32 位,uint32 最大可表示)。所以这个检查在 32 位架构上是无效的。更严重的是,当 keyLen 和 valueLen 被解析为极大数据(接近 2^32)时,8+keyLen+valueLen 会整型溢出变为小值,绕过长度检查。
  • 建议:将 keyLen/valueLen 改为 uint32 类型,长度检查改为 if 8+uint64(keyLen)+uint64(valueLen) > uint64(len(data)),防止整型溢出。

⚠️ [重要 · 逻辑错误] service/fsm.go:78 NewKVServer 中 raft 模式丢弃初始化时的错误

  • 第 72 行 kv.raft = Raft.NewRaft(config.G.Peers, config.G.Me)NewRaft 返回 *Raft.Raft 但不返回 error。如果 Raft 初始化失败(如端口占用、WAL 打开失败),该错误被静默吞掉,kv.raft 可能是一个部分初始化的 Raft 实例(依赖 panic 而非 error 传递)。虽然这是已有代码的问题,但本 PR 新增了 standalone 分支(第 58–67 行使用 error 检查的 NewWAL),使两种模式的错误处理风格不一致。
  • 建议:将 Raft.NewRaft 改为返回 error 的签名,或在调用后检查 Raft 状态是否可用。与 standalone 分支的错误处理风格保持一致。

⚠️ [重要 · 资源泄漏] service/ingesthook/filter.go:77 redact 中 JSON 反序列化失败时可能泄漏 map 内存

  • 第 127 行 var m map[string]json.RawMessagejson.Unmarshal 失败时,m 为零值(nil map),changed 为 false,函数返回 (value, false)。这不是内存泄漏——Go GC 会回收。但考虑性能:每次 PUT 帧(包括非 JSON value 的帧)都会执行 json.Unmarshal,产生一次堆分配(map 的底层结构),即使该帧是二进制负载本就不该走 JSON 路径。在高频采集(数千 Hz)下,这个分配会显著增加 GC 压力。文档说「非 JSON 原样放行」是正确的,但代价是每次 PUT 都触发一次 JSON 解析尝试。
  • 建议:在 redact 入口处检查 value 的第一个字符是否为 {[(JSON 对象/数组的起始字节),如果非 {,立即返回 (value, false) 避免不必要的 json.Unmarshal 调用。

💡 [建议 · 错误处理] storage/wal.go:67 WAL.Append 中 fsync 后 Write 错误被掩盖

  • 第 66–68 行:如果 w.file.Write(buf) 失败返回 error,不会执行 w.file.Sync(),函数返回 Write 的错误。但如果 Write 成功、Sync 失败,函数返回 Sync 的错误——此时数据已写入文件但可能未落盘(fsync 失败)。如果调用方认为 error 意味着「数据未写入」,可能错误地认为可以在不持久化的情况下继续。正确的 crash-safe 处理是:Sync 失败时仍需记录数据可能已部分落盘。
  • 建议:在 Sync 失败时,标记 WAL 为「损坏」状态,后续 Append 和 Replay 操作应拒绝执行,避免在损坏的 WAL 上继续写入。

本次评审消耗 token:共 544028 tokens(输入 494574,输出 15022,缓存命中 34432,缓存写入 0)|维度 [concurrency, memory, lock, storage, schema]|对抗式复核 3 票/条,过滤疑似误报 0 条

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant