node_traffic_sync_job.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package job
  2. import (
  3. "context"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  9. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  10. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  11. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  12. "github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
  13. )
  14. const (
  15. nodeTrafficSyncConcurrency = 8
  16. nodeTrafficSyncRequestTimeout = 4 * time.Second
  17. nodeReconcileTimeout = 30 * time.Second
  18. nodeClientIpSyncInterval = 10 * time.Second
  19. nodeClientIpSyncTimeout = 6 * time.Second
  20. nodeGlobalPushInterval = 30 * time.Second
  21. )
  22. type NodeTrafficSyncJob struct {
  23. nodeService service.NodeService
  24. inboundService service.InboundService
  25. settingService service.SettingService
  26. xrayService service.XrayService
  27. running sync.Mutex
  28. structural atomicBool
  29. ipSyncMu sync.Mutex
  30. lastIpSync int64
  31. globalPushMu sync.Mutex
  32. lastGlobalPush int64
  33. }
  34. type atomicBool struct {
  35. mu sync.Mutex
  36. v bool
  37. }
  38. func (a *atomicBool) set() {
  39. a.mu.Lock()
  40. a.v = true
  41. a.mu.Unlock()
  42. }
  43. func (a *atomicBool) takeAndReset() bool {
  44. a.mu.Lock()
  45. v := a.v
  46. a.v = false
  47. a.mu.Unlock()
  48. return v
  49. }
  50. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  51. return &NodeTrafficSyncJob{}
  52. }
  53. func (j *NodeTrafficSyncJob) Run() {
  54. if !j.running.TryLock() {
  55. return
  56. }
  57. defer j.running.Unlock()
  58. mgr := runtime.GetManager()
  59. if mgr == nil {
  60. return
  61. }
  62. nodes, err := j.nodeService.GetAll()
  63. if err != nil {
  64. logger.Warning("node traffic sync: load nodes failed:", err)
  65. return
  66. }
  67. if len(nodes) == 0 {
  68. return
  69. }
  70. // Decide once per tick whether this run also syncs client IPs, and stamp the
  71. // clock before the loop so two back-to-back 5s ticks can't both qualify.
  72. doIpSync := false
  73. j.ipSyncMu.Lock()
  74. if now := time.Now().Unix(); now-j.lastIpSync >= int64(nodeClientIpSyncInterval/time.Second) {
  75. doIpSync = true
  76. j.lastIpSync = now
  77. }
  78. j.ipSyncMu.Unlock()
  79. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  80. var wg sync.WaitGroup
  81. for _, n := range nodes {
  82. if !n.Enable || n.Status != "online" {
  83. continue
  84. }
  85. wg.Add(1)
  86. sem <- struct{}{}
  87. n := n
  88. common.GoRecover("node-traffic-sync:"+n.Name, func() {
  89. defer wg.Done()
  90. defer func() { <-sem }()
  91. j.syncOne(mgr, n, doIpSync)
  92. })
  93. }
  94. wg.Wait()
  95. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  96. if err != nil {
  97. logger.Warning("node traffic sync: depletion check failed:", err)
  98. }
  99. if clientsDisabled {
  100. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  101. if err := j.xrayService.RestartXray(true); err != nil {
  102. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  103. j.xrayService.SetToNeedRestart()
  104. }
  105. } else if settingErr != nil {
  106. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  107. }
  108. j.structural.set()
  109. }
  110. j.maybePushGlobals(mgr, nodes)
  111. lastOnline, err := j.inboundService.GetClientsLastOnline()
  112. if err != nil {
  113. logger.Warning("node traffic sync: get last-online failed:", err)
  114. }
  115. if lastOnline == nil {
  116. lastOnline = map[string]int64{}
  117. }
  118. // Prune stale local-online entries (no local active emails or inbound tags
  119. // to add here — only the local xray poll feeds those) so a stopped local
  120. // xray's clients and inbounds still age out between traffic polls.
  121. j.inboundService.RefreshLocalOnlineClients(nil, nil)
  122. if !websocket.HasClients() {
  123. return
  124. }
  125. online := j.inboundService.GetOnlineClients()
  126. if online == nil {
  127. online = []string{}
  128. }
  129. websocket.BroadcastTraffic(map[string]any{
  130. "onlineClients": online,
  131. "onlineByGuid": j.inboundService.GetOnlineClientsByGuid(),
  132. "activeInbounds": j.inboundService.GetActiveInboundsByGuid(),
  133. "lastOnlineMap": lastOnline,
  134. })
  135. clientStats := map[string]any{}
  136. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  137. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  138. } else if len(stats) > 0 {
  139. clientStats["clients"] = stats
  140. }
  141. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  142. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  143. } else if len(summary) > 0 {
  144. clientStats["inbounds"] = summary
  145. }
  146. if len(clientStats) > 0 {
  147. websocket.BroadcastClientStats(clientStats)
  148. }
  149. if j.structural.takeAndReset() {
  150. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  151. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  152. }
  153. }
  154. // maybePushGlobals broadcasts this panel's aggregated per-client usage to its
  155. // online nodes so each node can display the client's cross-panel total and
  156. // enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped
  157. // per node to the clients that node actually hosts, and throttled — the
  158. // aggregates only need to reach nodes on a human timescale, not every poll.
  159. func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) {
  160. j.globalPushMu.Lock()
  161. now := time.Now().Unix()
  162. if now-j.lastGlobalPush < int64(nodeGlobalPushInterval/time.Second) {
  163. j.globalPushMu.Unlock()
  164. return
  165. }
  166. j.lastGlobalPush = now
  167. j.globalPushMu.Unlock()
  168. masterGuid, err := j.settingService.GetPanelGuid()
  169. if err != nil || masterGuid == "" {
  170. return
  171. }
  172. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  173. var wg sync.WaitGroup
  174. for _, n := range nodes {
  175. if !n.Enable || n.Status != "online" {
  176. continue
  177. }
  178. remote, err := mgr.RemoteFor(n)
  179. if err != nil {
  180. continue
  181. }
  182. traffics, err := j.inboundService.GetNodeClientTraffics(n.Id)
  183. if err != nil {
  184. logger.Warningf("node traffic sync: load globals for %s failed: %v", n.Name, err)
  185. continue
  186. }
  187. if len(traffics) == 0 {
  188. continue
  189. }
  190. wg.Add(1)
  191. sem <- struct{}{}
  192. n, remote, traffics := n, remote, traffics
  193. common.GoRecover("node-global-push:"+n.Name, func() {
  194. defer wg.Done()
  195. defer func() { <-sem }()
  196. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  197. defer cancel()
  198. if err := remote.PushGlobalClientTraffics(ctx, masterGuid, traffics); err != nil {
  199. // An old-build node without the endpoint answers 404 — not worth a
  200. // warning every cycle.
  201. if strings.Contains(err.Error(), "HTTP 404") {
  202. logger.Debugf("node traffic sync: node %s has no global-traffic endpoint (old build)", n.Name)
  203. } else {
  204. logger.Warningf("node traffic sync: push globals to %s failed: %v", n.Name, err)
  205. }
  206. }
  207. })
  208. }
  209. wg.Wait()
  210. }
  211. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
  212. rt, err := mgr.RemoteFor(n)
  213. if err != nil {
  214. logger.Warningf("node traffic sync: remote lookup failed for %s: %v", n.Name, err)
  215. return
  216. }
  217. if n.ConfigDirty {
  218. reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout)
  219. reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n)
  220. reconcileCancel()
  221. if reconcileErr != nil {
  222. logger.Warningf("node traffic sync: reconcile for %s failed: %v", n.Name, reconcileErr)
  223. return
  224. }
  225. if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
  226. logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr)
  227. }
  228. j.structural.set()
  229. }
  230. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  231. defer cancel()
  232. snap, err := rt.FetchTrafficSnapshot(ctx)
  233. if err != nil {
  234. logger.Warningf("node traffic sync: fetch from %s failed: %v", n.Name, err)
  235. j.inboundService.ClearNodeOnlineClients(n.Id)
  236. return
  237. }
  238. service.FilterNodeSnapshot(n, snap)
  239. _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
  240. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
  241. if err != nil {
  242. logger.Warningf("node traffic sync: merge for %s failed: %v", n.Name, err)
  243. return
  244. }
  245. if changed {
  246. j.structural.set()
  247. }
  248. if !doIpSync {
  249. return
  250. }
  251. ipCtx, ipCancel := context.WithTimeout(context.Background(), nodeClientIpSyncTimeout)
  252. defer ipCancel()
  253. nodeIps, err := rt.FetchAllClientIps(ipCtx)
  254. if err == nil && len(nodeIps) > 0 {
  255. if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil {
  256. logger.Warningf("node traffic sync: merge client ips from %s failed: %v", n.Name, err)
  257. }
  258. } else if err != nil {
  259. logger.Warningf("node traffic sync: fetch client ips from %s failed: %v", n.Name, err)
  260. }
  261. masterIps, err := j.inboundService.GetAllInboundClientIps()
  262. if err != nil {
  263. logger.Warningf("node traffic sync: load client ips for push to %s failed: %v", n.Name, err)
  264. return
  265. }
  266. if len(masterIps) > 0 {
  267. if err := rt.PushAllClientIps(ipCtx, masterIps); err != nil {
  268. logger.Warningf("node traffic sync: push client ips to %s failed: %v", n.Name, err)
  269. }
  270. }
  271. // Per-node IP attribution: pull the node's guid-keyed subtree (its own
  272. // observations plus any descendants) so the master can tell which node each
  273. // IP is on. Old nodes without the endpoint just return an error — skip them.
  274. if guidTrees, err := rt.FetchClientIpsByGuid(ipCtx); err != nil {
  275. logger.Debugf("node traffic sync: fetch client ip attribution from %s failed: %v", n.Name, err)
  276. } else if len(guidTrees) > 0 {
  277. if err := j.inboundService.MergeClientIpsByGuid(guidTrees); err != nil {
  278. logger.Warningf("node traffic sync: merge client ip attribution from %s failed: %v", n.Name, err)
  279. }
  280. }
  281. }