node_traffic_sync_job.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package job
  2. import (
  3. "context"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  9. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  10. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  11. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  12. "github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
  13. )
  14. const (
  15. nodeTrafficSyncConcurrency = 8
  16. nodeTrafficSyncRequestTimeout = 4 * time.Second
  17. nodeReconcileTimeout = 30 * time.Second
  18. nodeClientIpSyncInterval = 10 * time.Second
  19. nodeGlobalPushInterval = 30 * time.Second
  20. )
  21. type NodeTrafficSyncJob struct {
  22. nodeService service.NodeService
  23. inboundService service.InboundService
  24. settingService service.SettingService
  25. xrayService service.XrayService
  26. running sync.Mutex
  27. structural atomicBool
  28. ipSyncMu sync.Mutex
  29. lastIpSync int64
  30. globalPushMu sync.Mutex
  31. lastGlobalPush int64
  32. }
  33. type atomicBool struct {
  34. mu sync.Mutex
  35. v bool
  36. }
  37. func (a *atomicBool) set() {
  38. a.mu.Lock()
  39. a.v = true
  40. a.mu.Unlock()
  41. }
  42. func (a *atomicBool) takeAndReset() bool {
  43. a.mu.Lock()
  44. v := a.v
  45. a.v = false
  46. a.mu.Unlock()
  47. return v
  48. }
  49. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  50. return &NodeTrafficSyncJob{}
  51. }
  52. func (j *NodeTrafficSyncJob) Run() {
  53. if !j.running.TryLock() {
  54. return
  55. }
  56. defer j.running.Unlock()
  57. mgr := runtime.GetManager()
  58. if mgr == nil {
  59. return
  60. }
  61. nodes, err := j.nodeService.GetAll()
  62. if err != nil {
  63. logger.Warning("node traffic sync: load nodes failed:", err)
  64. return
  65. }
  66. if len(nodes) == 0 {
  67. return
  68. }
  69. // Decide once per tick whether this run also syncs client IPs, and stamp the
  70. // clock before the loop so two back-to-back 5s ticks can't both qualify.
  71. doIpSync := false
  72. j.ipSyncMu.Lock()
  73. if now := time.Now().Unix(); now-j.lastIpSync >= int64(nodeClientIpSyncInterval/time.Second) {
  74. doIpSync = true
  75. j.lastIpSync = now
  76. }
  77. j.ipSyncMu.Unlock()
  78. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  79. var wg sync.WaitGroup
  80. for _, n := range nodes {
  81. if !n.Enable || n.Status != "online" {
  82. continue
  83. }
  84. wg.Add(1)
  85. sem <- struct{}{}
  86. n := n
  87. common.GoRecover("node-traffic-sync:"+n.Name, func() {
  88. defer wg.Done()
  89. defer func() { <-sem }()
  90. j.syncOne(mgr, n, doIpSync)
  91. })
  92. }
  93. wg.Wait()
  94. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  95. if err != nil {
  96. logger.Warning("node traffic sync: depletion check failed:", err)
  97. }
  98. if clientsDisabled {
  99. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  100. if err := j.xrayService.RestartXray(true); err != nil {
  101. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  102. j.xrayService.SetToNeedRestart()
  103. }
  104. } else if settingErr != nil {
  105. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  106. }
  107. j.structural.set()
  108. }
  109. j.maybePushGlobals(mgr, nodes)
  110. lastOnline, err := j.inboundService.GetClientsLastOnline()
  111. if err != nil {
  112. logger.Warning("node traffic sync: get last-online failed:", err)
  113. }
  114. if lastOnline == nil {
  115. lastOnline = map[string]int64{}
  116. }
  117. // Prune stale local-online entries (no local active emails or inbound tags
  118. // to add here — only the local xray poll feeds those) so a stopped local
  119. // xray's clients and inbounds still age out between traffic polls.
  120. j.inboundService.RefreshLocalOnlineClients(nil, nil)
  121. if !websocket.HasClients() {
  122. return
  123. }
  124. online := j.inboundService.GetOnlineClients()
  125. if online == nil {
  126. online = []string{}
  127. }
  128. websocket.BroadcastTraffic(map[string]any{
  129. "onlineClients": online,
  130. "onlineByGuid": j.inboundService.GetOnlineClientsByGuid(),
  131. "activeInbounds": j.inboundService.GetActiveInboundsByGuid(),
  132. "lastOnlineMap": lastOnline,
  133. })
  134. clientStats := map[string]any{}
  135. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  136. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  137. } else if len(stats) > 0 {
  138. clientStats["clients"] = stats
  139. }
  140. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  141. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  142. } else if len(summary) > 0 {
  143. clientStats["inbounds"] = summary
  144. }
  145. if len(clientStats) > 0 {
  146. websocket.BroadcastClientStats(clientStats)
  147. }
  148. if j.structural.takeAndReset() {
  149. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  150. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  151. }
  152. }
  153. // maybePushGlobals broadcasts this panel's aggregated per-client usage to its
  154. // online nodes so each node can display the client's cross-panel total and
  155. // enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped
  156. // per node to the clients that node actually hosts, and throttled — the
  157. // aggregates only need to reach nodes on a human timescale, not every poll.
  158. func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) {
  159. j.globalPushMu.Lock()
  160. now := time.Now().Unix()
  161. if now-j.lastGlobalPush < int64(nodeGlobalPushInterval/time.Second) {
  162. j.globalPushMu.Unlock()
  163. return
  164. }
  165. j.lastGlobalPush = now
  166. j.globalPushMu.Unlock()
  167. masterGuid, err := j.settingService.GetPanelGuid()
  168. if err != nil || masterGuid == "" {
  169. return
  170. }
  171. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  172. var wg sync.WaitGroup
  173. for _, n := range nodes {
  174. if !n.Enable || n.Status != "online" {
  175. continue
  176. }
  177. remote, err := mgr.RemoteFor(n)
  178. if err != nil {
  179. continue
  180. }
  181. traffics, err := j.inboundService.GetNodeClientTraffics(n.Id)
  182. if err != nil {
  183. logger.Warning("node traffic sync: load globals for", n.Name, "failed:", err)
  184. continue
  185. }
  186. if len(traffics) == 0 {
  187. continue
  188. }
  189. wg.Add(1)
  190. sem <- struct{}{}
  191. n, remote, traffics := n, remote, traffics
  192. common.GoRecover("node-global-push:"+n.Name, func() {
  193. defer wg.Done()
  194. defer func() { <-sem }()
  195. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  196. defer cancel()
  197. if err := remote.PushGlobalClientTraffics(ctx, masterGuid, traffics); err != nil {
  198. // An old-build node without the endpoint answers 404 — not worth a
  199. // warning every cycle.
  200. if strings.Contains(err.Error(), "HTTP 404") {
  201. logger.Debug("node traffic sync: node", n.Name, "has no global-traffic endpoint (old build)")
  202. } else {
  203. logger.Warning("node traffic sync: push globals to", n.Name, "failed:", err)
  204. }
  205. }
  206. })
  207. }
  208. wg.Wait()
  209. }
  210. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
  211. rt, err := mgr.RemoteFor(n)
  212. if err != nil {
  213. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  214. return
  215. }
  216. if n.ConfigDirty {
  217. reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout)
  218. reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n)
  219. reconcileCancel()
  220. if reconcileErr != nil {
  221. logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr)
  222. return
  223. }
  224. if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
  225. logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr)
  226. }
  227. j.structural.set()
  228. }
  229. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  230. defer cancel()
  231. snap, err := rt.FetchTrafficSnapshot(ctx)
  232. if err != nil {
  233. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  234. j.inboundService.ClearNodeOnlineClients(n.Id)
  235. return
  236. }
  237. service.FilterNodeSnapshot(n, snap)
  238. _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
  239. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
  240. if err != nil {
  241. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  242. return
  243. }
  244. if changed {
  245. j.structural.set()
  246. }
  247. if !doIpSync {
  248. return
  249. }
  250. nodeIps, err := rt.FetchAllClientIps(ctx)
  251. if err == nil && len(nodeIps) > 0 {
  252. if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil {
  253. logger.Warning("node traffic sync: merge client ips from", n.Name, "failed:", err)
  254. }
  255. } else if err != nil {
  256. logger.Warning("node traffic sync: fetch client ips from", n.Name, "failed:", err)
  257. }
  258. masterIps, err := j.inboundService.GetAllInboundClientIps()
  259. if err != nil {
  260. logger.Warning("node traffic sync: load client ips for push to", n.Name, "failed:", err)
  261. return
  262. }
  263. if len(masterIps) > 0 {
  264. if err := rt.PushAllClientIps(ctx, masterIps); err != nil {
  265. logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err)
  266. }
  267. }
  268. // Per-node IP attribution: pull the node's guid-keyed subtree (its own
  269. // observations plus any descendants) so the master can tell which node each
  270. // IP is on. Old nodes without the endpoint just return an error — skip them.
  271. if guidTrees, err := rt.FetchClientIpsByGuid(ctx); err != nil {
  272. logger.Debug("node traffic sync: fetch client ip attribution from", n.Name, "failed:", err)
  273. } else if len(guidTrees) > 0 {
  274. if err := j.inboundService.MergeClientIpsByGuid(guidTrees); err != nil {
  275. logger.Warning("node traffic sync: merge client ip attribution from", n.Name, "failed:", err)
  276. }
  277. }
  278. }