sync_scale_postgres_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "testing"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "gorm.io/gorm"
  12. )
  13. func syncInboundOld(tx *gorm.DB, inboundId int, clients []model.Client) error {
  14. if tx == nil {
  15. tx = database.GetDB()
  16. }
  17. if err := tx.Where("inbound_id = ?", inboundId).Delete(&model.ClientInbound{}).Error; err != nil {
  18. return err
  19. }
  20. for i := range clients {
  21. c := clients[i]
  22. email := strings.TrimSpace(c.Email)
  23. if email == "" {
  24. continue
  25. }
  26. incoming := c.ToRecord()
  27. row := &model.ClientRecord{}
  28. err := tx.Where("email = ?", email).First(row).Error
  29. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  30. return err
  31. }
  32. if errors.Is(err, gorm.ErrRecordNotFound) {
  33. if err := tx.Create(incoming).Error; err != nil {
  34. return err
  35. }
  36. row = incoming
  37. } else {
  38. row.Flow = incoming.Flow
  39. row.SubID = incoming.SubID
  40. row.LimitIP = incoming.LimitIP
  41. row.TotalGB = incoming.TotalGB
  42. row.ExpiryTime = incoming.ExpiryTime
  43. row.Enable = incoming.Enable
  44. row.TgID = incoming.TgID
  45. row.Comment = incoming.Comment
  46. row.Reset = incoming.Reset
  47. preservedUpdatedAt := max(incoming.UpdatedAt, row.UpdatedAt)
  48. row.UpdatedAt = preservedUpdatedAt
  49. if err := tx.Save(row).Error; err != nil {
  50. return err
  51. }
  52. if err := tx.Model(&model.ClientRecord{}).
  53. Where("id = ?", row.Id).
  54. UpdateColumn("updated_at", preservedUpdatedAt).Error; err != nil {
  55. return err
  56. }
  57. }
  58. link := model.ClientInbound{ClientId: row.Id, InboundId: inboundId, FlowOverride: c.Flow}
  59. if err := tx.Create(&link).Error; err != nil {
  60. return err
  61. }
  62. }
  63. return nil
  64. }
  65. func makeScaleClients(n int) []model.Client {
  66. out := make([]model.Client, n)
  67. for i := range n {
  68. out[i] = model.Client{
  69. ID: uuid.NewString(),
  70. Email: fmt.Sprintf("user-%07d@scale", i),
  71. SubID: fmt.Sprintf("sub-%07d", i),
  72. Enable: true,
  73. }
  74. }
  75. return out
  76. }
  77. func TestSyncInboundPostgresScale(t *testing.T) {
  78. setupScaleDB(t)
  79. svc := &ClientService{}
  80. sizes := []int{5000, 10000, 20000, 50000, 100000, 200000}
  81. for _, n := range sizes {
  82. t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
  83. db := database.GetDB()
  84. resetScaleTables(t, db, "inbounds", "clients", "client_inbounds")
  85. clients := makeScaleClients(n)
  86. ib := &model.Inbound{
  87. Tag: fmt.Sprintf("scale-%d", n),
  88. Enable: true,
  89. Port: 40000,
  90. Protocol: model.VLESS,
  91. Settings: clientsSettings(t, clients),
  92. }
  93. if err := db.Create(ib).Error; err != nil {
  94. t.Fatalf("create inbound: %v", err)
  95. }
  96. start := time.Now()
  97. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  98. t.Fatalf("seed SyncInbound: %v", err)
  99. }
  100. seed := time.Since(start)
  101. clients[n/2].Enable = !clients[n/2].Enable
  102. start = time.Now()
  103. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  104. t.Fatalf("toggle SyncInbound (new): %v", err)
  105. }
  106. toggleNew := time.Since(start)
  107. start = time.Now()
  108. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  109. t.Fatalf("noop SyncInbound (new): %v", err)
  110. }
  111. noopNew := time.Since(start)
  112. toggleOld := time.Duration(0)
  113. if n <= 10000 {
  114. clients[n/2].Enable = !clients[n/2].Enable
  115. start = time.Now()
  116. if err := syncInboundOld(db, ib.Id, clients); err != nil {
  117. t.Fatalf("toggle SyncInbound (old): %v", err)
  118. }
  119. toggleOld = time.Since(start)
  120. }
  121. var linkCount, recCount int64
  122. db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount)
  123. db.Model(&model.ClientRecord{}).Count(&recCount)
  124. if int(linkCount) != n || int(recCount) != n {
  125. t.Fatalf("row mismatch: links=%d records=%d want %d", linkCount, recCount, n)
  126. }
  127. oldStr, speedup := "skipped", ""
  128. if toggleOld > 0 {
  129. oldStr = toggleOld.Round(time.Millisecond).String()
  130. speedup = fmt.Sprintf(" speedup=%.0fx", float64(toggleOld)/float64(maxDur(toggleNew, time.Millisecond)))
  131. }
  132. t.Logf("N=%-7d seed=%-10v toggle_new=%-10v noop_new=%-10v toggle_old=%-10s%s",
  133. n, seed.Round(time.Millisecond), toggleNew.Round(time.Millisecond),
  134. noopNew.Round(time.Millisecond), oldStr, speedup)
  135. })
  136. }
  137. }
  138. func maxDur(d, floor time.Duration) time.Duration {
  139. if d < floor {
  140. return floor
  141. }
  142. return d
  143. }
  144. func TestAddDelClientPostgresScale(t *testing.T) {
  145. setupScaleDB(t)
  146. svc := &ClientService{}
  147. inboundSvc := &InboundService{}
  148. sizes := []int{5000, 20000, 50000, 100000, 200000}
  149. for _, n := range sizes {
  150. t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
  151. db := database.GetDB()
  152. resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
  153. clients := makeScaleClients(n)
  154. ib := &model.Inbound{
  155. Tag: fmt.Sprintf("adddel-%d", n),
  156. Enable: true,
  157. Port: 40000,
  158. Protocol: model.VLESS,
  159. Settings: clientsSettings(t, clients),
  160. }
  161. if err := db.Create(ib).Error; err != nil {
  162. t.Fatalf("create inbound: %v", err)
  163. }
  164. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  165. t.Fatalf("seed SyncInbound: %v", err)
  166. }
  167. newC := model.Client{
  168. ID: uuid.NewString(),
  169. Email: "added-client@scale",
  170. SubID: "added-sub",
  171. Enable: true,
  172. }
  173. addData := &model.Inbound{Id: ib.Id, Protocol: model.VLESS, Settings: clientsSettings(t, []model.Client{newC})}
  174. start := time.Now()
  175. if _, err := svc.AddInboundClient(inboundSvc, addData); err != nil {
  176. t.Fatalf("AddInboundClient: %v", err)
  177. }
  178. addDur := time.Since(start)
  179. delEmail := clients[n/2].Email
  180. start = time.Now()
  181. if _, err := svc.DelInboundClientByEmail(inboundSvc, ib.Id, delEmail, false); err != nil {
  182. t.Fatalf("DelInboundClientByEmail: %v", err)
  183. }
  184. delDur := time.Since(start)
  185. var recCount, linkCount int64
  186. db.Model(&model.ClientRecord{}).Count(&recCount)
  187. db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount)
  188. t.Logf("N=%-7d add=%-10v del=%-10v records=%d links=%d", n,
  189. addDur.Round(time.Millisecond), delDur.Round(time.Millisecond), recCount, linkCount)
  190. })
  191. }
  192. }
  193. func TestGroupAndListPostgresScale(t *testing.T) {
  194. setupScaleDB(t)
  195. svc := &ClientService{}
  196. sizes := []int{5000, 100000}
  197. for _, n := range sizes {
  198. t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
  199. db := database.GetDB()
  200. resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
  201. clients := makeScaleClients(n)
  202. ib := &model.Inbound{Tag: fmt.Sprintf("grp-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
  203. if err := db.Create(ib).Error; err != nil {
  204. t.Fatalf("create inbound: %v", err)
  205. }
  206. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  207. t.Fatalf("seed SyncInbound: %v", err)
  208. }
  209. db.Exec("ANALYZE")
  210. emails := make([]string, n)
  211. for i := range n {
  212. emails[i] = clients[i].Email
  213. }
  214. start := time.Now()
  215. if _, err := svc.AddToGroup(emails, "benchgroup"); err != nil {
  216. t.Fatalf("AddToGroup: %v", err)
  217. }
  218. addDur := time.Since(start)
  219. start = time.Now()
  220. if _, err := svc.RemoveFromGroup(emails); err != nil {
  221. t.Fatalf("RemoveFromGroup: %v", err)
  222. }
  223. rmDur := time.Since(start)
  224. start = time.Now()
  225. list, err := svc.List()
  226. if err != nil {
  227. t.Fatalf("List: %v", err)
  228. }
  229. listDur := time.Since(start)
  230. if len(list) != n {
  231. t.Fatalf("List returned %d, want %d", len(list), n)
  232. }
  233. t.Logf("N=%-7d bulkAdd=%-9v bulkRemove=%-9v list=%-9v", n,
  234. addDur.Round(time.Millisecond), rmDur.Round(time.Millisecond), listDur.Round(time.Millisecond))
  235. })
  236. }
  237. }
  238. func TestDelAllClientsPostgresScale(t *testing.T) {
  239. setupScaleDB(t)
  240. svc := &ClientService{}
  241. inboundSvc := &InboundService{}
  242. sizes := []int{5000, 50000, 100000}
  243. for _, n := range sizes {
  244. t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
  245. db := database.GetDB()
  246. resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
  247. clients := makeScaleClients(n)
  248. ib := &model.Inbound{Tag: fmt.Sprintf("delall-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
  249. if err := db.Create(ib).Error; err != nil {
  250. t.Fatalf("create inbound: %v", err)
  251. }
  252. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  253. t.Fatalf("seed SyncInbound: %v", err)
  254. }
  255. emails, err := inboundSvc.EmailsByInbound(ib.Id)
  256. if err != nil {
  257. t.Fatalf("EmailsByInbound: %v", err)
  258. }
  259. start := time.Now()
  260. res, _, err := svc.BulkDelete(inboundSvc, emails, false)
  261. if err != nil {
  262. t.Fatalf("BulkDelete: %v", err)
  263. }
  264. dur := time.Since(start)
  265. var recCount, linkCount int64
  266. db.Model(&model.ClientRecord{}).Count(&recCount)
  267. db.Model(&model.ClientInbound{}).Where("inbound_id = ?", ib.Id).Count(&linkCount)
  268. if recCount != 0 || linkCount != 0 {
  269. t.Fatalf("after delAll: records=%d links=%d want 0/0", recCount, linkCount)
  270. }
  271. t.Logf("N=%-7d delAllClients=%-10v deleted=%d", n, dur.Round(time.Millisecond), res.Deleted)
  272. })
  273. }
  274. }
  275. func TestBulkOpsPostgresScale(t *testing.T) {
  276. setupScaleDB(t)
  277. svc := &ClientService{}
  278. inboundSvc := &InboundService{}
  279. sizes := []int{5000, 20000, 50000, 100000}
  280. const m = 2000
  281. for _, n := range sizes {
  282. t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
  283. db := database.GetDB()
  284. resetScaleTables(t, db, "inbounds", "clients", "client_inbounds", "client_traffics")
  285. clients := makeScaleClients(n)
  286. exp := time.Now().AddDate(1, 0, 0).UnixMilli()
  287. for i := range clients {
  288. clients[i].ExpiryTime = exp
  289. clients[i].TotalGB = 100 << 30
  290. }
  291. ib := &model.Inbound{Tag: fmt.Sprintf("bulk-%d", n), Enable: true, Port: 40000, Protocol: model.VLESS, Settings: clientsSettings(t, clients)}
  292. if err := db.Create(ib).Error; err != nil {
  293. t.Fatalf("create inbound: %v", err)
  294. }
  295. if err := svc.SyncInbound(nil, ib.Id, clients); err != nil {
  296. t.Fatalf("seed SyncInbound: %v", err)
  297. }
  298. ib2 := &model.Inbound{Tag: fmt.Sprintf("bulk2-%d", n), Enable: true, Port: 40001, Protocol: model.VLESS, Settings: `{"clients":[]}`}
  299. if err := db.Create(ib2).Error; err != nil {
  300. t.Fatalf("create inbound2: %v", err)
  301. }
  302. emailsM := make([]string, m)
  303. for i := range m {
  304. emailsM[i] = clients[i].Email
  305. }
  306. t0 := time.Now()
  307. if _, _, err := svc.BulkAdjust(inboundSvc, emailsM, 7, 1<<30); err != nil {
  308. t.Fatalf("BulkAdjust: %v", err)
  309. }
  310. adjustDur := time.Since(t0)
  311. t0 = time.Now()
  312. if _, _, err := svc.BulkAttach(inboundSvc, emailsM, []int{ib2.Id}); err != nil {
  313. t.Fatalf("BulkAttach: %v", err)
  314. }
  315. attachDur := time.Since(t0)
  316. t0 = time.Now()
  317. if _, _, err := svc.BulkDetach(inboundSvc, emailsM, []int{ib2.Id}); err != nil {
  318. t.Fatalf("BulkDetach: %v", err)
  319. }
  320. detachDur := time.Since(t0)
  321. payloads := make([]ClientCreatePayload, m)
  322. for i := range m {
  323. payloads[i] = ClientCreatePayload{
  324. Client: model.Client{ID: uuid.NewString(), Email: fmt.Sprintf("bulknew-%07d@scale", i), SubID: fmt.Sprintf("bnsub-%07d", i), Enable: true},
  325. InboundIds: []int{ib.Id},
  326. }
  327. }
  328. t0 = time.Now()
  329. if _, _, err := svc.BulkCreate(inboundSvc, payloads); err != nil {
  330. t.Fatalf("BulkCreate: %v", err)
  331. }
  332. createDur := time.Since(t0)
  333. t0 = time.Now()
  334. if _, _, err := svc.BulkDelete(inboundSvc, emailsM, false); err != nil {
  335. t.Fatalf("BulkDelete: %v", err)
  336. }
  337. deleteDur := time.Since(t0)
  338. t.Logf("N=%-6d M=%d adjust=%-9v attach=%-9v detach=%-9v create=%-9v delete=%-9v", n, m,
  339. adjustDur.Round(time.Millisecond), attachDur.Round(time.Millisecond), detachDur.Round(time.Millisecond),
  340. createDur.Round(time.Millisecond), deleteDur.Round(time.Millisecond))
  341. })
  342. }
  343. }