inbound_traffic_global.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package service
  2. import (
  3. "strings"
  4. "time"
  5. "github.com/mhsanaei/3x-ui/v3/internal/database"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  8. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  9. "gorm.io/gorm"
  10. "gorm.io/gorm/clause"
  11. )
  12. // AcceptGlobalTraffic ingests a master panel's aggregated per-client usage
  13. // into client_global_traffics, keyed by (masterGuid, email). The numbers are
  14. // display/enforcement inputs only — client_traffics keeps this panel's
  15. // local-only counters, so the pushing master's (and any other master's)
  16. // delta accounting over our snapshot stays correct.
  17. //
  18. // Rows are overwritten, not max-merged: a reset on the master propagates here
  19. // on its next push. Emails this panel doesn't host are dropped.
  20. func (s *InboundService) AcceptGlobalTraffic(masterGuid string, traffics []*xray.ClientTraffic) error {
  21. masterGuid = strings.TrimSpace(masterGuid)
  22. if masterGuid == "" {
  23. return nil
  24. }
  25. emails := make([]string, 0, len(traffics))
  26. byEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  27. for _, t := range traffics {
  28. if t == nil || t.Email == "" {
  29. continue
  30. }
  31. if _, dup := byEmail[t.Email]; !dup {
  32. emails = append(emails, t.Email)
  33. }
  34. byEmail[t.Email] = t
  35. }
  36. if len(emails) == 0 {
  37. return nil
  38. }
  39. return submitTrafficWrite(func() error {
  40. db := database.GetDB()
  41. known := make([]string, 0, len(emails))
  42. for _, batch := range chunkStrings(emails, sqlInChunk) {
  43. var page []string
  44. if err := db.Model(xray.ClientTraffic{}).
  45. Where("email IN ?", batch).
  46. Pluck("email", &page).Error; err != nil {
  47. return err
  48. }
  49. known = append(known, page...)
  50. }
  51. if len(known) == 0 {
  52. return nil
  53. }
  54. now := time.Now().UnixMilli()
  55. rows := make([]model.ClientGlobalTraffic, 0, len(known))
  56. for _, email := range known {
  57. t := byEmail[email]
  58. if t == nil {
  59. continue
  60. }
  61. rows = append(rows, model.ClientGlobalTraffic{
  62. MasterGuid: masterGuid,
  63. Email: email,
  64. Up: t.Up,
  65. Down: t.Down,
  66. UpdatedAt: now,
  67. })
  68. }
  69. return db.Transaction(func(tx *gorm.DB) error {
  70. for _, batch := range chunkGlobalRows(rows, 200) {
  71. if err := tx.Clauses(clause.OnConflict{
  72. Columns: []clause.Column{{Name: "master_guid"}, {Name: "email"}},
  73. DoUpdates: clause.AssignmentColumns([]string{"up", "down", "updated_at"}),
  74. }).Create(&batch).Error; err != nil {
  75. return err
  76. }
  77. }
  78. return nil
  79. })
  80. })
  81. }
  82. func chunkGlobalRows(rows []model.ClientGlobalTraffic, size int) [][]model.ClientGlobalTraffic {
  83. if len(rows) == 0 {
  84. return nil
  85. }
  86. out := make([][]model.ClientGlobalTraffic, 0, (len(rows)+size-1)/size)
  87. for start := 0; start < len(rows); start += size {
  88. end := min(start+size, len(rows))
  89. out = append(out, rows[start:end])
  90. }
  91. return out
  92. }
  93. // overlayGlobalTraffic raises Up/Down on the given rows to the largest global
  94. // value any master pushed for that email. Read-path only — callers hand it
  95. // rows about to be serialized for display; the stored counters are untouched.
  96. func overlayGlobalTraffic(db *gorm.DB, rows []*xray.ClientTraffic) {
  97. if len(rows) == 0 {
  98. return
  99. }
  100. // Cheap short-circuit for the common case (a panel no master pushes to).
  101. var probe int64
  102. if err := db.Model(&model.ClientGlobalTraffic{}).Limit(1).Count(&probe).Error; err != nil || probe == 0 {
  103. return
  104. }
  105. emails := make([]string, 0, len(rows))
  106. byEmail := make(map[string][]*xray.ClientTraffic, len(rows))
  107. for _, r := range rows {
  108. if r == nil || r.Email == "" {
  109. continue
  110. }
  111. key := strings.ToLower(r.Email)
  112. if _, ok := byEmail[key]; !ok {
  113. emails = append(emails, r.Email)
  114. }
  115. byEmail[key] = append(byEmail[key], r)
  116. }
  117. for _, batch := range chunkStrings(emails, sqlInChunk) {
  118. var globals []model.ClientGlobalTraffic
  119. if err := db.Where("email IN ?", batch).Find(&globals).Error; err != nil {
  120. logger.Warning("overlayGlobalTraffic:", err)
  121. return
  122. }
  123. for i := range globals {
  124. for _, r := range byEmail[strings.ToLower(globals[i].Email)] {
  125. if globals[i].Up > r.Up {
  126. r.Up = globals[i].Up
  127. }
  128. if globals[i].Down > r.Down {
  129. r.Down = globals[i].Down
  130. }
  131. }
  132. }
  133. }
  134. }
  135. // overlayGlobalTrafficValues is overlayGlobalTraffic for value slices.
  136. func overlayGlobalTrafficValues(db *gorm.DB, rows []xray.ClientTraffic) {
  137. if len(rows) == 0 {
  138. return
  139. }
  140. ptrs := make([]*xray.ClientTraffic, 0, len(rows))
  141. for i := range rows {
  142. ptrs = append(ptrs, &rows[i])
  143. }
  144. overlayGlobalTraffic(db, ptrs)
  145. }
  146. // GetNodeClientTraffics returns this panel's aggregated traffic rows for the
  147. // clients known to live on the given node (those with a delta baseline) —
  148. // the payload for Remote.PushGlobalClientTraffics. The rows carry the global
  149. // overlay so a mid-chain panel forwards the widest view it has seen, not just
  150. // its own aggregate.
  151. func (s *InboundService) GetNodeClientTraffics(nodeID int) ([]*xray.ClientTraffic, error) {
  152. db := database.GetDB()
  153. var emails []string
  154. if err := db.Model(&model.NodeClientTraffic{}).
  155. Where("node_id = ?", nodeID).
  156. Pluck("email", &emails).Error; err != nil {
  157. return nil, err
  158. }
  159. if len(emails) == 0 {
  160. return nil, nil
  161. }
  162. out := make([]*xray.ClientTraffic, 0, len(emails))
  163. for _, batch := range chunkStrings(emails, sqlInChunk) {
  164. var page []*xray.ClientTraffic
  165. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  166. return nil, err
  167. }
  168. out = append(out, page...)
  169. }
  170. overlayGlobalTraffic(db, out)
  171. return out, nil
  172. }
  173. // overlayInboundsClientStats applies the global overlay to every preloaded
  174. // ClientStats row across the given inbounds. UI read paths only — never the
  175. // full /panel/api/inbounds/list payload, which doubles as the traffic
  176. // snapshot masters poll: overlaying that would leak pushed globals back into
  177. // the masters' delta accounting.
  178. func (s *InboundService) overlayInboundsClientStats(db *gorm.DB, inbounds []*model.Inbound) {
  179. rows := make([]*xray.ClientTraffic, 0)
  180. for _, ib := range inbounds {
  181. for j := range ib.ClientStats {
  182. rows = append(rows, &ib.ClientStats[j])
  183. }
  184. }
  185. overlayGlobalTraffic(db, rows)
  186. }
  187. // clearGlobalTraffic drops every master's pushed rows for the given emails.
  188. // Used by client deletion and traffic-reset flows: after a node-local reset
  189. // the next master push restores the master's (authoritative) numbers, and
  190. // after a master-side reset that push carries the reset values anyway.
  191. func clearGlobalTraffic(tx *gorm.DB, emails ...string) error {
  192. if len(emails) == 0 {
  193. return nil
  194. }
  195. for _, batch := range chunkStrings(emails, sqlInChunk) {
  196. if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
  197. return err
  198. }
  199. }
  200. return nil
  201. }