client_traffic_sync_job.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package job
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. "x-ui/database"
  7. "x-ui/database/model"
  8. "x-ui/logger"
  9. "x-ui/xray"
  10. "gorm.io/gorm"
  11. )
  12. type SyncClientTrafficJob struct {
  13. subClientsCollection map[string][]string
  14. }
  15. func NewClientTrafficSyncJob() *SyncClientTrafficJob {
  16. return new(SyncClientTrafficJob)
  17. }
  18. func (j *SyncClientTrafficJob) Run() {
  19. // Step 1: Group clients by SubID
  20. subClientsCollection := j.collectClientsGroupedBySubId()
  21. // Step 2: Sync client traffics for each SubID group
  22. for subId, emails := range subClientsCollection {
  23. err := j.syncClientTraffics(map[string][]string{subId: emails})
  24. if err != nil {
  25. logger.Error("Failed to sync traffics for SubID ", subId, ": ", err)
  26. }
  27. }
  28. }
  29. // collectClientsGroupedBySubId groups clients by their SubIDs
  30. func (j *SyncClientTrafficJob) collectClientsGroupedBySubId() map[string][]string {
  31. db := database.GetDB()
  32. result := make(map[string][]string)
  33. // Fetch all inbounds
  34. var inbounds []*model.Inbound
  35. if err := db.Model(&model.Inbound{}).Find(&inbounds).Error; err != nil {
  36. logger.Error("Error fetching inbounds: ", err)
  37. return result // Return empty map on error
  38. }
  39. // Process each inbound
  40. for _, inbound := range inbounds {
  41. if inbound.Settings == "" {
  42. continue
  43. }
  44. settingsMap, err := parseSettings(inbound.Settings, uint(inbound.Id))
  45. if err != nil {
  46. logger.Error(err)
  47. continue
  48. }
  49. clients, ok := settingsMap["clients"].([]interface{})
  50. if !ok {
  51. continue
  52. }
  53. processClients(clients, result)
  54. }
  55. // Remove SubIDs with one or fewer emails
  56. filterSingleEmailSubIDs(result)
  57. return result
  58. }
  59. // parseSettings unmarshals the JSON settings and returns it as a map
  60. func parseSettings(settings string, inboundID uint) (map[string]interface{}, error) {
  61. if !json.Valid([]byte(settings)) {
  62. return nil, fmt.Errorf("Invalid JSON format in Settings for inbound ID %d", inboundID)
  63. }
  64. var tempData map[string]interface{}
  65. if err := json.Unmarshal([]byte(settings), &tempData); err != nil {
  66. return nil, fmt.Errorf("Error unmarshalling settings for inbound ID %d: %v", inboundID, err)
  67. }
  68. return tempData, nil
  69. }
  70. // processClients extracts SubID and email from the clients and populates the result map
  71. func processClients(clients []interface{}, result map[string][]string) {
  72. for _, client := range clients {
  73. clientMap, ok := client.(map[string]interface{})
  74. if !ok {
  75. continue
  76. }
  77. subId, ok := clientMap["subId"].(string)
  78. if !ok || subId == "" {
  79. continue
  80. }
  81. email, ok := clientMap["email"].(string)
  82. if !ok || email == "" {
  83. continue
  84. }
  85. result[subId] = append(result[subId], email)
  86. }
  87. }
  88. // filterSingleEmailSubIDs removes SubIDs with one or fewer emails from the result map
  89. func filterSingleEmailSubIDs(result map[string][]string) {
  90. for subId, emails := range result {
  91. if len(emails) <= 1 {
  92. delete(result, subId)
  93. }
  94. }
  95. }
  96. // syncClientTraffics synchronizes traffic data for each SubID group
  97. func (j *SyncClientTrafficJob) syncClientTraffics(result map[string][]string) error {
  98. for subId, emails := range result {
  99. db := database.GetDB()
  100. // Step 1: Calculate maxUp and maxDown (outside transaction)
  101. var maxUp, maxDown int64
  102. err := calculateMaxTraffic(db, emails, &maxUp, &maxDown)
  103. if err != nil {
  104. logger.Error("Failed to calculate max traffic for SubID ", subId, ": ", err)
  105. continue
  106. }
  107. // Step 2: Update traffic data with retry mechanism
  108. err = retryOperation(func() error {
  109. return updateTraffic(db, emails, maxUp, maxDown)
  110. }, 5, 100*time.Millisecond)
  111. if err != nil {
  112. logger.Error("Failed to update client traffics for SubID ", subId, ": ", err)
  113. }
  114. }
  115. return nil
  116. }
  117. // calculateMaxTraffic calculates max up and down traffic for a group of emails
  118. func calculateMaxTraffic(db *gorm.DB, emails []string, maxUp, maxDown *int64) error {
  119. return db.Model(&xray.ClientTraffic{}).
  120. Where("email IN ?", emails).
  121. Select("MAX(up) AS max_up, MAX(down) AS max_down").
  122. Row().
  123. Scan(maxUp, maxDown)
  124. }
  125. // updateTraffic updates the traffic data in the database within a transaction
  126. func updateTraffic(db *gorm.DB, emails []string, maxUp, maxDown int64) error {
  127. return db.Transaction(func(tx *gorm.DB) error {
  128. return tx.Model(&xray.ClientTraffic{}).
  129. Where("email IN ?", emails).
  130. Updates(map[string]interface{}{
  131. "up": maxUp,
  132. "down": maxDown,
  133. }).Error
  134. })
  135. }
  136. // retryOperation retries an operation multiple times with a delay
  137. func retryOperation(operation func() error, maxRetries int, delay time.Duration) error {
  138. var err error
  139. for i := 0; i < maxRetries; i++ {
  140. err = operation()
  141. if err == nil {
  142. return nil
  143. }
  144. logger.Info(fmt.Sprintf("Retry %d/%d failed: %v", i+1, maxRetries, err))
  145. time.Sleep(delay)
  146. }
  147. return err
  148. }