node_client_traffic_sum_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package service
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "testing"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  10. "gorm.io/gorm"
  11. )
  12. func initTrafficTestDB(t *testing.T) *gorm.DB {
  13. t.Helper()
  14. dbDir := t.TempDir()
  15. t.Setenv("XUI_DB_FOLDER", dbDir)
  16. if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
  17. t.Fatalf("InitDB: %v", err)
  18. }
  19. t.Cleanup(func() { _ = database.CloseDB() })
  20. return database.GetDB()
  21. }
  22. func createNodeInbound(t *testing.T, db *gorm.DB, nodeID int, tag string, port int) {
  23. t.Helper()
  24. nid := nodeID
  25. ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid}
  26. if err := db.Create(ib).Error; err != nil {
  27. t.Fatalf("create node inbound %q: %v", tag, err)
  28. }
  29. }
  30. // createNodeInboundWithClient mirrors createNodeInbound but stores the client
  31. // in the settings JSON so emailUsedByOtherInbounds can see the attachment.
  32. func createNodeInboundWithClient(t *testing.T, db *gorm.DB, nodeID int, tag string, port int, email string) {
  33. t.Helper()
  34. nid := nodeID
  35. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  36. ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid, Settings: settings}
  37. if err := db.Create(ib).Error; err != nil {
  38. t.Fatalf("create node inbound %q: %v", tag, err)
  39. }
  40. }
  41. func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats ...xray.ClientTraffic) {
  42. t.Helper()
  43. snap := &runtime.TrafficSnapshot{
  44. Inbounds: []*model.Inbound{{Tag: tag, ClientStats: stats}},
  45. }
  46. if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
  47. t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
  48. }
  49. }
  50. // syncNodeWithSettings mirrors syncNode but carries a real settings JSON on
  51. // the snapshot inbound, like production nodes do — the sync mirrors snapshot
  52. // settings onto the central row, and the shared-accumulator guard reads the
  53. // clients out of those settings.
  54. func syncNodeWithSettings(t *testing.T, svc *InboundService, nodeID int, tag, settings string, stats ...xray.ClientTraffic) {
  55. t.Helper()
  56. snap := &runtime.TrafficSnapshot{
  57. Inbounds: []*model.Inbound{{Tag: tag, Settings: settings, ClientStats: stats}},
  58. }
  59. if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
  60. t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
  61. }
  62. }
  63. func readTraffic(t *testing.T, db *gorm.DB, email string) xray.ClientTraffic {
  64. t.Helper()
  65. var ct xray.ClientTraffic
  66. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).First(&ct).Error; err != nil {
  67. t.Fatalf("read client_traffics %q: %v", email, err)
  68. }
  69. return ct
  70. }
  71. func assertUpDown(t *testing.T, ct xray.ClientTraffic, wantUp, wantDown int64, when string) {
  72. t.Helper()
  73. if ct.Up != wantUp || ct.Down != wantDown {
  74. t.Errorf("%s: up=%d down=%d, want %d/%d", when, ct.Up, ct.Down, wantUp, wantDown)
  75. }
  76. }
  77. func TestTwoNodesShareEmail_SumsCorrectly(t *testing.T) {
  78. db := initTrafficTestDB(t)
  79. createNodeInbound(t, db, 1, "n1-in", 41001)
  80. createNodeInbound(t, db, 2, "n2-in", 41002)
  81. svc := &InboundService{}
  82. const email = "shared"
  83. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  84. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  85. assertUpDown(t, readTraffic(t, db, email), 100, 100, "after baselines")
  86. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
  87. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
  88. assertUpDown(t, readTraffic(t, db, email), 210, 210, "after both nodes grow")
  89. }
  90. func TestSingleNode_MirrorsCorrectly(t *testing.T) {
  91. db := initTrafficTestDB(t)
  92. createNodeInbound(t, db, 1, "n1-in", 41001)
  93. svc := &InboundService{}
  94. const email = "solo"
  95. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 500, Down: 600, Enable: true})
  96. assertUpDown(t, readTraffic(t, db, email), 500, 600, "first sync")
  97. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 700, Down: 800, Enable: true})
  98. assertUpDown(t, readTraffic(t, db, email), 700, 800, "second sync mirrors cumulative")
  99. }
  100. func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
  101. db := initTrafficTestDB(t)
  102. createNodeInbound(t, db, 1, "n1-in", 41001)
  103. svc := &InboundService{}
  104. const email = "legacy"
  105. var ib model.Inbound
  106. if err := db.Where("tag = ?", "n1-in").First(&ib).Error; err != nil {
  107. t.Fatalf("load inbound: %v", err)
  108. }
  109. if err := db.Create(&xray.ClientTraffic{InboundId: ib.Id, Email: email, Up: 1000, Down: 2000, Enable: true}).Error; err != nil {
  110. t.Fatalf("seed pre-existing row: %v", err)
  111. }
  112. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 1000, Down: 2000, Enable: true})
  113. assertUpDown(t, readTraffic(t, db, email), 1000, 2000, "first snapshot must not double-count")
  114. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 1100, Down: 2100, Enable: true})
  115. assertUpDown(t, readTraffic(t, db, email), 1100, 2100, "growth after upgrade accrues")
  116. }
  117. func TestNodeCounterReset_Clamped(t *testing.T) {
  118. db := initTrafficTestDB(t)
  119. createNodeInbound(t, db, 1, "n1-in", 41001)
  120. svc := &InboundService{}
  121. const email = "restart"
  122. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 900, Down: 900, Enable: true})
  123. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
  124. assertUpDown(t, readTraffic(t, db, email), 950, 950, "before node reset")
  125. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
  126. ct := readTraffic(t, db, email)
  127. if ct.Up < 0 || ct.Down < 0 {
  128. t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
  129. }
  130. assertUpDown(t, ct, 1000, 1000, "after node counter reset (clamped)")
  131. }
  132. func TestCentralReset_NoReAdd(t *testing.T) {
  133. db := initTrafficTestDB(t)
  134. createNodeInbound(t, db, 1, "n1-in", 41001)
  135. createNodeInbound(t, db, 2, "n2-in", 41002)
  136. svc := &InboundService{}
  137. const email = "reset"
  138. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  139. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  140. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  141. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  142. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).
  143. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  144. t.Fatalf("simulate central reset: %v", err)
  145. }
  146. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 210, Down: 210, Enable: true})
  147. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 205, Down: 205, Enable: true})
  148. assertUpDown(t, readTraffic(t, db, email), 15, 15, "after central reset only increments accrue")
  149. }
  150. func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
  151. db := initTrafficTestDB(t)
  152. createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
  153. createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared")
  154. svc := &InboundService{}
  155. const email = "shared"
  156. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  157. syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  158. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  159. syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
  160. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
  161. assertUpDown(t, readTraffic(t, db, email), 210, 210, "baseline sum")
  162. // Node 1 rebuilt (reinstall / another master's reconcile): its inbound
  163. // vanishes from the snapshot. The shared accumulator must survive — losing
  164. // it would let the next node sync re-seed the row with that node's counter
  165. // alone, showing only the last panel's number instead of the sum.
  166. if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil {
  167. t.Fatalf("sync node 1 with empty snapshot: %v", err)
  168. }
  169. assertUpDown(t, readTraffic(t, db, email), 210, 210, "after node 1 inbound removal")
  170. // Node 2 keeps accruing onto the surviving row.
  171. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
  172. assertUpDown(t, readTraffic(t, db, email), 250, 250, "after node 2 grows")
  173. }
  174. func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
  175. db := initTrafficTestDB(t)
  176. createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
  177. createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared")
  178. svc := &InboundService{}
  179. const email = "shared"
  180. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  181. syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  182. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  183. // Client detached from node 1's inbound only: its stats vanish from that
  184. // inbound's snapshot while node 2 still hosts the email.
  185. syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`)
  186. assertUpDown(t, readTraffic(t, db, email), 100, 100, "after client left node 1")
  187. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true})
  188. assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing")
  189. }
  190. // TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
  191. // sweep bug behind #5202: the client is attached to two inbounds of the SAME
  192. // node, the node reports its stats under n1-a only, but the master-side row
  193. // is owned by n1-b's mirror. The per-email sweep for n1-b must not drop the
  194. // (node, email) baseline that n1-a's delta computation needs — doing so every
  195. // cycle froze the client's counter permanently.
  196. func TestStatsUnderSiblingInbound_KeepsNodeBaseline(t *testing.T) {
  197. db := initTrafficTestDB(t)
  198. createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "fresh")
  199. createNodeInbound(t, db, 1, "n1-b", 41002)
  200. svc := &InboundService{}
  201. const email = "fresh"
  202. var ibB model.Inbound
  203. if err := db.Where("tag = ?", "n1-b").First(&ibB).Error; err != nil {
  204. t.Fatalf("load n1-b: %v", err)
  205. }
  206. // Master-side row created when the client was added on the panel, owned by
  207. // n1-b's mirror (e.g. the client form targeted that inbound).
  208. if err := db.Create(&xray.ClientTraffic{InboundId: ibB.Id, Email: email, Enable: true}).Error; err != nil {
  209. t.Fatalf("seed master row: %v", err)
  210. }
  211. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  212. sync := func(up, down int64) {
  213. t.Helper()
  214. snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{
  215. {Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: up, Down: down, Enable: true}}},
  216. {Tag: "n1-b", Settings: `{"clients": []}`},
  217. }}
  218. if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil {
  219. t.Fatalf("sync: %v", err)
  220. }
  221. }
  222. sync(630, 630)
  223. var baselines int64
  224. if err := db.Model(&model.NodeClientTraffic{}).Where("node_id = ? AND email = ?", 1, email).Count(&baselines).Error; err != nil {
  225. t.Fatalf("count baselines: %v", err)
  226. }
  227. if baselines != 1 {
  228. t.Fatalf("baseline must survive the sibling-inbound sweep, found %d rows", baselines)
  229. }
  230. sync(700, 700)
  231. assertUpDown(t, readTraffic(t, db, email), 70, 70, "delta accrues once baseline survives")
  232. }
  233. func TestDelClientStat_CleansNodeBaselines(t *testing.T) {
  234. db := initTrafficTestDB(t)
  235. svc := &InboundService{}
  236. const email = "gone"
  237. if err := db.Create(&xray.ClientTraffic{InboundId: 1, Email: email, Enable: true}).Error; err != nil {
  238. t.Fatalf("seed client_traffics: %v", err)
  239. }
  240. if err := db.Create(&model.NodeClientTraffic{NodeId: 1, Email: email, Up: 10, Down: 10}).Error; err != nil {
  241. t.Fatalf("seed node baseline 1: %v", err)
  242. }
  243. if err := db.Create(&model.NodeClientTraffic{NodeId: 2, Email: email, Up: 20, Down: 20}).Error; err != nil {
  244. t.Fatalf("seed node baseline 2: %v", err)
  245. }
  246. if err := svc.DelClientStat(db, email); err != nil {
  247. t.Fatalf("DelClientStat: %v", err)
  248. }
  249. var cnt int64
  250. if err := db.Model(&model.NodeClientTraffic{}).Where("email = ?", email).Count(&cnt).Error; err != nil {
  251. t.Fatalf("count baselines: %v", err)
  252. }
  253. if cnt != 0 {
  254. t.Errorf("expected node baselines cleaned, found %d", cnt)
  255. }
  256. }
  257. func TestNodeDelete_CleansNodeBaselines(t *testing.T) {
  258. db := initTrafficTestDB(t)
  259. nodeSvc := NodeService{}
  260. if err := db.Create(&model.NodeClientTraffic{NodeId: 7, Email: "a", Up: 1, Down: 1}).Error; err != nil {
  261. t.Fatalf("seed node 7 a: %v", err)
  262. }
  263. if err := db.Create(&model.NodeClientTraffic{NodeId: 7, Email: "b", Up: 2, Down: 2}).Error; err != nil {
  264. t.Fatalf("seed node 7 b: %v", err)
  265. }
  266. if err := db.Create(&model.NodeClientTraffic{NodeId: 8, Email: "c", Up: 3, Down: 3}).Error; err != nil {
  267. t.Fatalf("seed node 8 c: %v", err)
  268. }
  269. if err := nodeSvc.Delete(7); err != nil {
  270. t.Fatalf("NodeService.Delete(7): %v", err)
  271. }
  272. var sevenCnt, eightCnt int64
  273. db.Model(&model.NodeClientTraffic{}).Where("node_id = ?", 7).Count(&sevenCnt)
  274. db.Model(&model.NodeClientTraffic{}).Where("node_id = ?", 8).Count(&eightCnt)
  275. if sevenCnt != 0 {
  276. t.Errorf("node 7 baselines not cleaned: %d remain", sevenCnt)
  277. }
  278. if eightCnt != 1 {
  279. t.Errorf("node 8 baseline should survive, found %d", eightCnt)
  280. }
  281. }