node_client_traffic_sum_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. package service
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "strings"
  6. "testing"
  7. "gorm.io/gorm"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  11. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  12. )
  13. func initTrafficTestDB(t *testing.T) *gorm.DB {
  14. t.Helper()
  15. dbDir := t.TempDir()
  16. t.Setenv("XUI_DB_FOLDER", dbDir)
  17. if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
  18. t.Fatalf("InitDB: %v", err)
  19. }
  20. t.Cleanup(func() { _ = database.CloseDB() })
  21. return database.GetDB()
  22. }
  23. func createNodeInbound(t *testing.T, db *gorm.DB, nodeID int, tag string, port int) {
  24. t.Helper()
  25. nid := nodeID
  26. ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid}
  27. if err := db.Create(ib).Error; err != nil {
  28. t.Fatalf("create node inbound %q: %v", tag, err)
  29. }
  30. }
  31. // createNodeInboundWithClient mirrors createNodeInbound but stores the client
  32. // in the settings JSON so emailUsedByOtherInbounds can see the attachment.
  33. func createNodeInboundWithClient(t *testing.T, db *gorm.DB, nodeID int, tag string, port int, email string) {
  34. t.Helper()
  35. nid := nodeID
  36. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  37. ib := &model.Inbound{UserId: 1, Tag: tag, Enable: true, Port: port, Protocol: model.VLESS, NodeID: &nid, Settings: settings}
  38. if err := db.Create(ib).Error; err != nil {
  39. t.Fatalf("create node inbound %q: %v", tag, err)
  40. }
  41. }
  42. func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats ...xray.ClientTraffic) {
  43. t.Helper()
  44. snap := &runtime.TrafficSnapshot{
  45. Inbounds: []*model.Inbound{{Tag: tag, ClientStats: stats}},
  46. }
  47. if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
  48. t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
  49. }
  50. }
  51. // syncNodeWithSettings mirrors syncNode but carries a real settings JSON on
  52. // the snapshot inbound, like production nodes do — the sync mirrors snapshot
  53. // settings onto the central row, and the shared-accumulator guard reads the
  54. // clients out of those settings.
  55. func syncNodeWithSettings(t *testing.T, svc *InboundService, nodeID int, tag, settings string, stats ...xray.ClientTraffic) {
  56. t.Helper()
  57. snap := &runtime.TrafficSnapshot{
  58. Inbounds: []*model.Inbound{{Tag: tag, Settings: settings, ClientStats: stats}},
  59. }
  60. if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
  61. t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
  62. }
  63. }
  64. func readTraffic(t *testing.T, db *gorm.DB, email string) xray.ClientTraffic {
  65. t.Helper()
  66. var ct xray.ClientTraffic
  67. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).First(&ct).Error; err != nil {
  68. t.Fatalf("read client_traffics %q: %v", email, err)
  69. }
  70. return ct
  71. }
  72. func assertUpDown(t *testing.T, ct xray.ClientTraffic, wantUp, wantDown int64, when string) {
  73. t.Helper()
  74. if ct.Up != wantUp || ct.Down != wantDown {
  75. t.Errorf("%s: up=%d down=%d, want %d/%d", when, ct.Up, ct.Down, wantUp, wantDown)
  76. }
  77. }
  78. func TestTwoNodesShareEmail_SumsCorrectly(t *testing.T) {
  79. db := initTrafficTestDB(t)
  80. createNodeInbound(t, db, 1, "n1-in", 41001)
  81. createNodeInbound(t, db, 2, "n2-in", 41002)
  82. svc := &InboundService{}
  83. const email = "shared"
  84. // First-ever sync from each node: the row is created at 0 and the current
  85. // node counters become the baseline. Only deltas beyond those baselines count.
  86. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  87. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  88. assertUpDown(t, readTraffic(t, db, email), 0, 0, "after baselines — historical node traffic not imported")
  89. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
  90. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
  91. assertUpDown(t, readTraffic(t, db, email), 110, 110, "after both nodes grow — deltas (50+60) accrue")
  92. }
  93. func TestSingleNode_MirrorsCorrectly(t *testing.T) {
  94. db := initTrafficTestDB(t)
  95. createNodeInbound(t, db, 1, "n1-in", 41001)
  96. svc := &InboundService{}
  97. const email = "solo"
  98. // First sync: row seeded at 0, current counter becomes the baseline.
  99. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 500, Down: 600, Enable: true})
  100. assertUpDown(t, readTraffic(t, db, email), 0, 0, "first sync — historical traffic not imported")
  101. // Second sync: delta (700-500=200 / 800-600=200) accrues normally.
  102. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 700, Down: 800, Enable: true})
  103. assertUpDown(t, readTraffic(t, db, email), 200, 200, "second sync — delta accrues")
  104. }
  105. func TestNodeAdd_ImportsClientHistoryWithNewInbound(t *testing.T) {
  106. db := initTrafficTestDB(t)
  107. svc := &InboundService{}
  108. const email = "newnode-client"
  109. const histUp, histDown int64 = 6_000_000_000, 200_000_000_000
  110. syncNode(t, svc, 1, "fresh-in", xray.ClientTraffic{Email: email, Up: histUp, Down: histDown, Enable: true})
  111. assertUpDown(t, readTraffic(t, db, email), histUp, histDown, "node-add: client history imported with its brand-new inbound")
  112. syncNode(t, svc, 1, "fresh-in", xray.ClientTraffic{Email: email, Up: histUp + 1024, Down: histDown + 2048, Enable: true})
  113. assertUpDown(t, readTraffic(t, db, email), histUp+1024, histDown+2048, "post-import delta accrues, no double count")
  114. }
  115. // TestStaleSnapshot_DeletedClientNotResurrected reproduces #5739: a snapshot
  116. // fetched just before a client's deletion still names the email. Applying it
  117. // must neither recreate the client_traffics row (at zero) nor re-add the
  118. // client to the central inbound's settings while the delete tombstone lives.
  119. func TestStaleSnapshot_DeletedClientNotResurrected(t *testing.T) {
  120. db := initTrafficTestDB(t)
  121. createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "victim-5739")
  122. svc := &InboundService{}
  123. const email = "victim-5739"
  124. withClient := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  125. syncNodeWithSettings(t, svc, 1, "n1-in", withClient, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  126. if err := db.Model(&model.Inbound{}).Where("tag = ?", "n1-in").
  127. Update("settings", `{"clients": []}`).Error; err != nil {
  128. t.Fatalf("clear central settings: %v", err)
  129. }
  130. if err := db.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
  131. t.Fatalf("delete stats row: %v", err)
  132. }
  133. tombstoneClientEmail(email)
  134. syncNodeWithSettings(t, svc, 1, "n1-in", withClient, xray.ClientTraffic{Email: email, Up: 120, Down: 120, Enable: true})
  135. var rows int64
  136. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Count(&rows).Error; err != nil {
  137. t.Fatalf("count stats rows: %v", err)
  138. }
  139. if rows != 0 {
  140. t.Errorf("deleted client's stats row resurrected by stale snapshot (%d rows)", rows)
  141. }
  142. var ib model.Inbound
  143. if err := db.Where("tag = ?", "n1-in").First(&ib).Error; err != nil {
  144. t.Fatalf("load inbound: %v", err)
  145. }
  146. if strings.Contains(ib.Settings, email) {
  147. t.Errorf("deleted client re-added to central settings: %s", ib.Settings)
  148. }
  149. }
  150. func TestNodeAdd_TombstonedClientNotResurrected(t *testing.T) {
  151. db := initTrafficTestDB(t)
  152. svc := &InboundService{}
  153. const email = "deleted-ghost"
  154. const stale int64 = 50_000_000_000
  155. tombstoneClientEmail(email)
  156. syncNode(t, svc, 1, "fresh-in", xray.ClientTraffic{Email: email, Up: stale, Down: stale, Enable: true})
  157. assertUpDown(t, readTraffic(t, db, email), 0, 0, "tombstoned client must not resurrect via node-add seed")
  158. }
  159. func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
  160. db := initTrafficTestDB(t)
  161. createNodeInbound(t, db, 1, "n1-in", 41001)
  162. svc := &InboundService{}
  163. const email = "legacy"
  164. var ib model.Inbound
  165. if err := db.Where("tag = ?", "n1-in").First(&ib).Error; err != nil {
  166. t.Fatalf("load inbound: %v", err)
  167. }
  168. if err := db.Create(&xray.ClientTraffic{InboundId: ib.Id, Email: email, Up: 1000, Down: 2000, Enable: true}).Error; err != nil {
  169. t.Fatalf("seed pre-existing row: %v", err)
  170. }
  171. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 1000, Down: 2000, Enable: true})
  172. assertUpDown(t, readTraffic(t, db, email), 1000, 2000, "first snapshot must not double-count")
  173. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 1100, Down: 2100, Enable: true})
  174. assertUpDown(t, readTraffic(t, db, email), 1100, 2100, "growth after upgrade accrues")
  175. }
  176. // TestGhostData_NoPhantomTraffic reproduces the 50 GB phantom traffic bug:
  177. // a node retains stale data for a deleted client (e.g. de-1 was offline during
  178. // deletion). When the master first syncs this email again it must NOT import the
  179. // stale counter — it sets Up=0 and records the current node value as the
  180. // baseline so only future deltas count.
  181. func TestGhostData_NoPhantomTraffic(t *testing.T) {
  182. db := initTrafficTestDB(t)
  183. createNodeInbound(t, db, 1, "de1-in", 41001)
  184. svc := &InboundService{}
  185. const email = "pouya"
  186. const staleBytes int64 = 50 * 1024 * 1024 * 1024 // 50 GB stale on de-1
  187. // Node reports a client with a large pre-existing counter (ghost data from a
  188. // deleted account that survived on the node). Master has no row for this email.
  189. syncNode(t, svc, 1, "de1-in", xray.ClientTraffic{Email: email, Up: staleBytes, Down: staleBytes, Enable: true})
  190. ct := readTraffic(t, db, email)
  191. if ct.Up >= staleBytes || ct.Down >= staleBytes {
  192. t.Errorf("ghost data imported: up=%d down=%d; stale node counter must not count as new traffic", ct.Up, ct.Down)
  193. }
  194. assertUpDown(t, ct, 0, 0, "first sync of ghost email — imported as 0, not 50 GB")
  195. // Subsequent syncs only add incremental usage beyond the baseline.
  196. syncNode(t, svc, 1, "de1-in", xray.ClientTraffic{Email: email, Up: staleBytes + 1024, Down: staleBytes + 2048, Enable: true})
  197. assertUpDown(t, readTraffic(t, db, email), 1024, 2048, "only incremental traffic beyond baseline counts")
  198. }
  199. func TestNodeCounterReset_NoReAdd(t *testing.T) {
  200. db := initTrafficTestDB(t)
  201. createNodeInbound(t, db, 1, "n1-in", 41001)
  202. svc := &InboundService{}
  203. const email = "restart"
  204. // First sync seeds at 0 (baseline=900). Second sync adds delta 950-900=50.
  205. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 900, Down: 900, Enable: true})
  206. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
  207. assertUpDown(t, readTraffic(t, db, email), 50, 50, "before node reset")
  208. // Node reboot drops the counter to 50. delta=50-950=-900 is a counter reset,
  209. // not new traffic: add 0 and rebaseline to 50, never re-add the node's full
  210. // cumulative counter onto the master total (#5456).
  211. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
  212. ct := readTraffic(t, db, email)
  213. if ct.Up < 0 || ct.Down < 0 {
  214. t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
  215. }
  216. assertUpDown(t, ct, 50, 50, "after node counter reset: rebaselined, not re-added")
  217. // Post-reset accrual resumes from the new baseline: 80-50=30.
  218. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 80, Down: 80, Enable: true})
  219. assertUpDown(t, readTraffic(t, db, email), 80, 80, "post-reset delta accrues from rebaselined counter")
  220. }
  221. func TestCentralReset_NoReAdd(t *testing.T) {
  222. db := initTrafficTestDB(t)
  223. createNodeInbound(t, db, 1, "n1-in", 41001)
  224. createNodeInbound(t, db, 2, "n2-in", 41002)
  225. svc := &InboundService{}
  226. const email = "reset"
  227. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  228. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  229. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  230. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  231. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).
  232. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  233. t.Fatalf("simulate central reset: %v", err)
  234. }
  235. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 210, Down: 210, Enable: true})
  236. syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 205, Down: 205, Enable: true})
  237. assertUpDown(t, readTraffic(t, db, email), 15, 15, "after central reset only increments accrue")
  238. }
  239. // A real reset (ResetClientTrafficByEmail) must clear the per-node baseline so
  240. // the node's pre-reset cumulative — including traffic it counted but had not yet
  241. // synced — cannot leak back onto the master after the reset (#5476, #5390).
  242. func TestCentralResetClearsNodeBaseline_NoLeak(t *testing.T) {
  243. db := initTrafficTestDB(t)
  244. createNodeInbound(t, db, 1, "n1-in", 41001)
  245. StartTrafficWriter()
  246. svc := &InboundService{}
  247. const email = "reset-revert"
  248. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  249. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
  250. assertUpDown(t, readTraffic(t, db, email), 200, 200, "before reset")
  251. if err := svc.ResetClientTrafficByEmail(email); err != nil {
  252. t.Fatalf("ResetClientTrafficByEmail: %v", err)
  253. }
  254. assertUpDown(t, readTraffic(t, db, email), 0, 0, "right after reset")
  255. var baselines int64
  256. if err := db.Model(&model.NodeClientTraffic{}).Where("email = ?", email).Count(&baselines).Error; err != nil {
  257. t.Fatalf("count baselines: %v", err)
  258. }
  259. if baselines != 0 {
  260. t.Fatalf("reset must clear node baseline rows, found %d", baselines)
  261. }
  262. // Node still reports its pre-reset cumulative (340 > last synced 300: usage it
  263. // had not synced before the reset). It must not revert the reset.
  264. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 340, Down: 340, Enable: true})
  265. assertUpDown(t, readTraffic(t, db, email), 0, 0, "stale node counter must not revert reset")
  266. // Genuine post-reset usage accrues from the rebaselined counter: 370-340=30.
  267. syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 370, Down: 370, Enable: true})
  268. assertUpDown(t, readTraffic(t, db, email), 30, 30, "post-reset usage accrues")
  269. }
  270. func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
  271. db := initTrafficTestDB(t)
  272. createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
  273. createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared")
  274. svc := &InboundService{}
  275. const email = "shared"
  276. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  277. // Baselines set: node1=100, node2=200. Row seeded at 0.
  278. syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  279. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  280. // node1 delta=50, node2 delta=60 → total=110.
  281. syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
  282. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
  283. assertUpDown(t, readTraffic(t, db, email), 110, 110, "baseline sum")
  284. // Node 1 rebuilt (reinstall / another master's reconcile): its inbound
  285. // vanishes from the snapshot. The shared accumulator must survive — losing
  286. // it would let the next node sync re-seed the row with that node's counter
  287. // alone, showing only the last panel's number instead of the sum.
  288. if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil {
  289. t.Fatalf("sync node 1 with empty snapshot: %v", err)
  290. }
  291. assertUpDown(t, readTraffic(t, db, email), 110, 110, "after node 1 inbound removal")
  292. // Node 2 keeps accruing onto the surviving row. node2 delta=300-260=40 → 150.
  293. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
  294. assertUpDown(t, readTraffic(t, db, email), 150, 150, "after node 2 grows")
  295. }
  296. func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
  297. db := initTrafficTestDB(t)
  298. createNodeInboundWithClient(t, db, 1, "n1-in", 41001, "shared")
  299. createNodeInboundWithClient(t, db, 2, "n2-in", 41002, "shared")
  300. svc := &InboundService{}
  301. const email = "shared"
  302. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  303. // First syncs set baselines (node1=100, node2=200). Row seeded at 0.
  304. syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
  305. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
  306. // Client detached from node 1's inbound only: its stats vanish from that
  307. // inbound's snapshot while node 2 still hosts the email.
  308. syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`)
  309. assertUpDown(t, readTraffic(t, db, email), 0, 0, "after client left node 1 — row unchanged at 0")
  310. // Node 2 delta: 240-200=40 → accrues to 40.
  311. syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true})
  312. assertUpDown(t, readTraffic(t, db, email), 40, 40, "node 2 keeps accruing")
  313. }
  314. // TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
  315. // sweep bug behind #5202: the client is attached to two inbounds of the SAME
  316. // node, the node reports its stats under n1-a only, but the master-side row
  317. // is owned by n1-b's mirror. The per-email sweep for n1-b must not drop the
  318. // (node, email) baseline that n1-a's delta computation needs — doing so every
  319. // cycle froze the client's counter permanently.
  320. func TestStatsUnderSiblingInbound_KeepsNodeBaseline(t *testing.T) {
  321. db := initTrafficTestDB(t)
  322. createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "fresh")
  323. createNodeInbound(t, db, 1, "n1-b", 41002)
  324. svc := &InboundService{}
  325. const email = "fresh"
  326. var ibB model.Inbound
  327. if err := db.Where("tag = ?", "n1-b").First(&ibB).Error; err != nil {
  328. t.Fatalf("load n1-b: %v", err)
  329. }
  330. // Master-side row created when the client was added on the panel, owned by
  331. // n1-b's mirror (e.g. the client form targeted that inbound).
  332. if err := db.Create(&xray.ClientTraffic{InboundId: ibB.Id, Email: email, Enable: true}).Error; err != nil {
  333. t.Fatalf("seed master row: %v", err)
  334. }
  335. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  336. sync := func(up, down int64) {
  337. t.Helper()
  338. snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{
  339. {Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: up, Down: down, Enable: true}}},
  340. {Tag: "n1-b", Settings: `{"clients": []}`},
  341. }}
  342. if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil {
  343. t.Fatalf("sync: %v", err)
  344. }
  345. }
  346. sync(630, 630)
  347. var baselines int64
  348. if err := db.Model(&model.NodeClientTraffic{}).Where("node_id = ? AND email = ?", 1, email).Count(&baselines).Error; err != nil {
  349. t.Fatalf("count baselines: %v", err)
  350. }
  351. if baselines != 1 {
  352. t.Fatalf("baseline must survive the sibling-inbound sweep, found %d rows", baselines)
  353. }
  354. sync(700, 700)
  355. assertUpDown(t, readTraffic(t, db, email), 70, 70, "delta accrues once baseline survives")
  356. }
  357. // TestMultiAttach_SameNode_DivergentSiblings reproduces #5274: a client is
  358. // attached to several inbounds of the SAME node. Xray reports client traffic
  359. // globally per email, so the node's enriched inbound list copies one shared
  360. // counter onto every inbound the client is on. When those copies diverge — a
  361. // legacy per-inbound row surviving the v3.2.x→v3.3.x upgrade, or any drift —
  362. // the per-inbound delta loop used to treat the lower sibling as a node-counter
  363. // reset and re-add its full value, inflating the client far past real usage.
  364. // The merge must collapse the email to its node-wide total and count it once.
  365. func TestMultiAttach_SameNode_DivergentSiblings(t *testing.T) {
  366. db := initTrafficTestDB(t)
  367. createNodeInboundWithClient(t, db, 1, "n1-a", 41001, "multi")
  368. createNodeInboundWithClient(t, db, 1, "n1-b", 41002, "multi")
  369. createNodeInboundWithClient(t, db, 1, "n1-c", 41003, "multi")
  370. svc := &InboundService{}
  371. const email = "multi"
  372. settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
  373. // The three inbounds report the same email with diverging values; the
  374. // node's true per-email total is the largest (the shared global counter).
  375. sync := func(a, b, c int64) {
  376. t.Helper()
  377. snap := &runtime.TrafficSnapshot{Inbounds: []*model.Inbound{
  378. {Tag: "n1-a", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: a, Down: a, Enable: true}}},
  379. {Tag: "n1-b", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: b, Down: b, Enable: true}}},
  380. {Tag: "n1-c", Settings: settings, ClientStats: []xray.ClientTraffic{{Email: email, Up: c, Down: c, Enable: true}}},
  381. }}
  382. if _, err := svc.setRemoteTrafficLocked(1, snap, false); err != nil {
  383. t.Fatalf("sync: %v", err)
  384. }
  385. }
  386. // First-ever sync: row seeded at 0, canon (max of siblings = 100) becomes the
  387. // baseline. The node total is NOT imported as historical traffic — this prevents
  388. // ghost data from a previously-deleted account being counted as real usage.
  389. sync(100, 50, 80)
  390. assertUpDown(t, readTraffic(t, db, email), 0, 0, "first sync seeds at 0 — not the sum (230) nor the max (100)")
  391. // Second sync: canon grew from 100→150, delta=50 accrues. Siblings 60 and 90
  392. // do not re-add their full values — the canon clamp prevents inflation.
  393. sync(150, 60, 90)
  394. assertUpDown(t, readTraffic(t, db, email), 50, 50, "second sync: only the 50-unit increment counts, not every sibling")
  395. // Equal siblings (the healthy current-schema case): canon grew from 150→200,
  396. // delta=50 accrues once.
  397. sync(200, 200, 200)
  398. assertUpDown(t, readTraffic(t, db, email), 100, 100, "equal siblings accrue the single increment")
  399. }
  400. func TestDelClientStat_CleansNodeBaselines(t *testing.T) {
  401. db := initTrafficTestDB(t)
  402. svc := &InboundService{}
  403. const email = "gone"
  404. if err := db.Create(&xray.ClientTraffic{InboundId: 1, Email: email, Enable: true}).Error; err != nil {
  405. t.Fatalf("seed client_traffics: %v", err)
  406. }
  407. if err := db.Create(&model.NodeClientTraffic{NodeId: 1, Email: email, Up: 10, Down: 10}).Error; err != nil {
  408. t.Fatalf("seed node baseline 1: %v", err)
  409. }
  410. if err := db.Create(&model.NodeClientTraffic{NodeId: 2, Email: email, Up: 20, Down: 20}).Error; err != nil {
  411. t.Fatalf("seed node baseline 2: %v", err)
  412. }
  413. if err := svc.DelClientStat(db, email); err != nil {
  414. t.Fatalf("DelClientStat: %v", err)
  415. }
  416. var cnt int64
  417. if err := db.Model(&model.NodeClientTraffic{}).Where("email = ?", email).Count(&cnt).Error; err != nil {
  418. t.Fatalf("count baselines: %v", err)
  419. }
  420. if cnt != 0 {
  421. t.Errorf("expected node baselines cleaned, found %d", cnt)
  422. }
  423. }
  424. func TestNodeDelete_CleansNodeBaselines(t *testing.T) {
  425. db := initTrafficTestDB(t)
  426. nodeSvc := NodeService{}
  427. if err := db.Create(&model.NodeClientTraffic{NodeId: 7, Email: "a", Up: 1, Down: 1}).Error; err != nil {
  428. t.Fatalf("seed node 7 a: %v", err)
  429. }
  430. if err := db.Create(&model.NodeClientTraffic{NodeId: 7, Email: "b", Up: 2, Down: 2}).Error; err != nil {
  431. t.Fatalf("seed node 7 b: %v", err)
  432. }
  433. if err := db.Create(&model.NodeClientTraffic{NodeId: 8, Email: "c", Up: 3, Down: 3}).Error; err != nil {
  434. t.Fatalf("seed node 8 c: %v", err)
  435. }
  436. if err := nodeSvc.Delete(7); err != nil {
  437. t.Fatalf("NodeService.Delete(7): %v", err)
  438. }
  439. var sevenCnt, eightCnt int64
  440. db.Model(&model.NodeClientTraffic{}).Where("node_id = ?", 7).Count(&sevenCnt)
  441. db.Model(&model.NodeClientTraffic{}).Where("node_id = ?", 8).Count(&eightCnt)
  442. if sevenCnt != 0 {
  443. t.Errorf("node 7 baselines not cleaned: %d remain", sevenCnt)
  444. }
  445. if eightCnt != 1 {
  446. t.Errorf("node 8 baseline should survive, found %d", eightCnt)
  447. }
  448. }