1
0

check_client_ip_scale_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package job
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strconv"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/op/go-logging"
  12. "gorm.io/gorm"
  13. "github.com/mhsanaei/3x-ui/v3/internal/config"
  14. "github.com/mhsanaei/3x-ui/v3/internal/database"
  15. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  16. xuilogger "github.com/mhsanaei/3x-ui/v3/internal/logger"
  17. )
  18. // setupScaleJobDB mirrors the service package's scale gating: Postgres via
  19. // XUI_DB_TYPE/XUI_DB_DSN, SQLite via XUI_SCALE_TEST=1, skip otherwise.
  20. func setupScaleJobDB(t *testing.T) {
  21. t.Helper()
  22. loggerInitOnce.Do(func() { xuilogger.InitLogger(logging.ERROR) })
  23. t.Setenv("XUI_LOG_FOLDER", t.TempDir())
  24. if os.Getenv("XUI_DB_TYPE") == "postgres" && strings.TrimSpace(os.Getenv("XUI_DB_DSN")) != "" {
  25. if err := database.InitDB(""); err != nil {
  26. t.Fatalf("InitDB(postgres): %v", err)
  27. }
  28. t.Cleanup(func() { _ = database.CloseDB() })
  29. return
  30. }
  31. switch strings.ToLower(strings.TrimSpace(os.Getenv("XUI_SCALE_TEST"))) {
  32. case "1", "true", "yes":
  33. if err := database.InitDB(filepath.Join(t.TempDir(), "scale.db")); err != nil {
  34. t.Fatalf("InitDB(sqlite): %v", err)
  35. }
  36. t.Cleanup(func() { _ = database.CloseDB() })
  37. return
  38. }
  39. t.Skip("set XUI_SCALE_TEST=1 (sqlite) or XUI_DB_TYPE=postgres + XUI_DB_DSN (postgres) to run the scale benchmark")
  40. }
  41. func scaleJobSizes(t *testing.T, def ...int) []int {
  42. t.Helper()
  43. raw := strings.TrimSpace(os.Getenv("XUI_SCALE_SIZES"))
  44. if raw == "" {
  45. return def
  46. }
  47. var out []int
  48. for _, part := range strings.Split(raw, ",") {
  49. part = strings.TrimSpace(part)
  50. if part == "" {
  51. continue
  52. }
  53. n, err := strconv.Atoi(part)
  54. if err != nil || n <= 0 {
  55. t.Fatalf("XUI_SCALE_SIZES: invalid size %q", part)
  56. }
  57. out = append(out, n)
  58. }
  59. if len(out) == 0 {
  60. return def
  61. }
  62. return out
  63. }
  64. func resetScaleJobTables(t *testing.T, db *gorm.DB) {
  65. t.Helper()
  66. if config.GetDBKind() == "postgres" {
  67. if err := db.Exec("TRUNCATE TABLE inbounds, clients, client_inbounds RESTART IDENTITY CASCADE").Error; err != nil {
  68. t.Fatalf("truncate: %v", err)
  69. }
  70. } else {
  71. for _, tbl := range []string{"inbounds", "clients", "client_inbounds"} {
  72. if err := db.Exec("DELETE FROM " + tbl).Error; err != nil {
  73. t.Fatalf("delete %s: %v", tbl, err)
  74. }
  75. }
  76. db.Exec("DELETE FROM sqlite_sequence")
  77. }
  78. if err := db.Where("1 = 1").Delete(&model.InboundClientIps{}).Error; err != nil {
  79. t.Fatalf("clear inbound client ips: %v", err)
  80. }
  81. if err := db.Where("1 = 1").Delete(&model.NodeClientIp{}).Error; err != nil {
  82. t.Fatalf("clear node client ips: %v", err)
  83. }
  84. }
  85. // seedScaleIPDataset seeds n clients across numInbounds inbounds. Every client
  86. // in the LAST inbound carries limitIp=3 (and 0 elsewhere), so hasLimitIp pays
  87. // its full scan cost before finding a hit, and the returned emails all resolve
  88. // to that last inbound for the processObserved measurement.
  89. func seedScaleIPDataset(t *testing.T, n, numInbounds int) []string {
  90. t.Helper()
  91. db := database.GetDB()
  92. resetScaleJobTables(t, db)
  93. tx := db.Begin()
  94. if tx.Error != nil {
  95. t.Fatalf("begin seed tx: %v", tx.Error)
  96. }
  97. committed := false
  98. defer func() {
  99. if !committed {
  100. tx.Rollback()
  101. }
  102. }()
  103. var limitedEmails []string
  104. per := n / numInbounds
  105. for i := range numInbounds {
  106. lo, hi := i*per, (i+1)*per
  107. if i == numInbounds-1 {
  108. hi = n
  109. }
  110. limitIp := 0
  111. if i == numInbounds-1 {
  112. limitIp = 3
  113. }
  114. clients := make([]model.Client, 0, hi-lo)
  115. records := make([]*model.ClientRecord, 0, hi-lo)
  116. for j := lo; j < hi; j++ {
  117. email := fmt.Sprintf("user-%07d@ipscale", j)
  118. clients = append(clients, model.Client{Email: email, LimitIP: limitIp, Enable: true})
  119. records = append(records, &model.ClientRecord{Email: email, LimitIP: limitIp, Enable: true})
  120. if limitIp > 0 {
  121. limitedEmails = append(limitedEmails, email)
  122. }
  123. }
  124. settings, err := json.Marshal(map[string][]model.Client{"clients": clients})
  125. if err != nil {
  126. t.Fatalf("marshal settings: %v", err)
  127. }
  128. ib := &model.Inbound{
  129. UserId: 1,
  130. Tag: fmt.Sprintf("ipscale-%d-%d", n, i),
  131. Enable: true,
  132. Port: 42000 + i,
  133. Protocol: model.VLESS,
  134. Settings: string(settings),
  135. }
  136. if err := tx.Create(ib).Error; err != nil {
  137. t.Fatalf("seed inbound %d: %v", i, err)
  138. }
  139. if err := tx.CreateInBatches(records, 500).Error; err != nil {
  140. t.Fatalf("seed clients %d: %v", i, err)
  141. }
  142. links := make([]model.ClientInbound, len(records))
  143. for j := range records {
  144. links[j] = model.ClientInbound{ClientId: records[j].Id, InboundId: ib.Id}
  145. }
  146. if err := tx.CreateInBatches(links, 1000).Error; err != nil {
  147. t.Fatalf("seed client_inbounds %d: %v", i, err)
  148. }
  149. }
  150. if err := tx.Commit().Error; err != nil {
  151. t.Fatalf("commit seed tx: %v", err)
  152. }
  153. committed = true
  154. db.Exec("ANALYZE")
  155. return limitedEmails
  156. }
  157. // TestCheckClientIpScale measures the @every 10s ip-limit job pieces: the
  158. // hasLimitIp gate (settings LIKE scan + full JSON parse of every matching
  159. // inbound) and processObserved with M online users (per-email inbound lookup,
  160. // settings parse and autocommit save). Run twice: first scan half add / half
  161. // update, second scan all update path.
  162. func TestCheckClientIpScale(t *testing.T) {
  163. shapes := []struct {
  164. name string
  165. inbounds int
  166. observed int
  167. }{{"single", 1, 50}, {"spread50", 50, 1000}}
  168. for _, n := range scaleJobSizes(t, 10000, 100000) {
  169. for _, shape := range shapes {
  170. t.Run(fmt.Sprintf("N=%d_%s", n, shape.name), func(t *testing.T) {
  171. setupScaleJobDB(t)
  172. limited := seedScaleIPDataset(t, n, shape.inbounds)
  173. m := min(shape.observed, len(limited))
  174. j := NewCheckClientIpJob()
  175. const reps = 3
  176. start := time.Now()
  177. for range reps {
  178. if !j.hasLimitIp() {
  179. t.Fatal("hasLimitIp = false, want true")
  180. }
  181. }
  182. t.Logf("N=%-7d shape=%-8s hasLimitIp=%v/call", n, shape.name, (time.Since(start) / reps).Round(time.Millisecond))
  183. now := time.Now().Unix()
  184. observed := make(map[string]map[string]int64, m)
  185. for i := range m {
  186. observed[limited[i]] = map[string]int64{
  187. fmt.Sprintf("10.0.%d.%d", i/250, i%250+1): now,
  188. }
  189. }
  190. for i := range m / 2 {
  191. seedClientIps(t, limited[i], []IPWithTimestamp{{IP: "10.99.0.1", Timestamp: now - 60}})
  192. }
  193. start = time.Now()
  194. j.processObserved(observed, true, true)
  195. firstScan := time.Since(start)
  196. start = time.Now()
  197. j.processObserved(observed, true, true)
  198. secondScan := time.Since(start)
  199. t.Logf("N=%-7d shape=%-8s processObserved M=%-5d first=%-10v second=%-10v (%.1fms/email)",
  200. n, shape.name, m, firstScan.Round(time.Millisecond), secondScan.Round(time.Millisecond),
  201. float64(secondScan.Milliseconds())/float64(m))
  202. var rows int64
  203. if err := database.GetDB().Model(&model.InboundClientIps{}).Count(&rows).Error; err != nil {
  204. t.Fatalf("count ip rows: %v", err)
  205. }
  206. if rows != int64(m) {
  207. t.Fatalf("inbound_client_ips rows = %d, want %d", rows, m)
  208. }
  209. })
  210. }
  211. }
  212. }