node_traffic_sync_job.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. package job
import (
	"context"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/mhsanaei/3x-ui/v3/internal/database/model"
	"github.com/mhsanaei/3x-ui/v3/internal/logger"
	"github.com/mhsanaei/3x-ui/v3/internal/util/common"
	"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
	"github.com/mhsanaei/3x-ui/v3/internal/web/service"
	"github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
	"github.com/mhsanaei/3x-ui/v3/internal/xray"
)
  15. const (
  16. nodeTrafficSyncConcurrency = 8
  17. nodeTrafficSyncRequestTimeout = 4 * time.Second
  18. nodeReconcileTimeout = 30 * time.Second
  19. nodeClientIpSyncInterval = 10 * time.Second
  20. nodeClientIpSyncTimeout = 6 * time.Second
  21. nodeGlobalPushInterval = 30 * time.Second
  22. // nodeInboundSpeedWindowMs is the poll window node-inbound speed deltas are
  23. // normalized to; it MUST match the dashboard's TRAFFIC_POLL_INTERVAL_S (5s),
  24. // the fixed divisor the frontend applies to turn a delta into a rate.
  25. nodeInboundSpeedWindowMs int64 = 5000
  26. )
  27. // inboundSample is a node inbound's last-seen cumulative up/down and the time
  28. // (unix millis) its counter last changed, used to derive a normalized speed.
  29. type inboundSample struct {
  30. up, down, at int64
  31. }
// NodeTrafficSyncJob periodically pulls traffic from the panel's nodes, merges
// it into the local accounting, and broadcasts the result to websocket
// clients. Its zero value is what NewNodeTrafficSyncJob returns.
type NodeTrafficSyncJob struct {
	// Services the job calls into.
	nodeService    service.NodeService
	inboundService service.InboundService
	settingService service.SettingService
	xrayService    service.XrayService
	// running is held for the whole of Run; Run uses TryLock, so a tick that
	// arrives while the previous one is still running returns immediately.
	running sync.Mutex
	// structural is raised when a tick changed something that needs an
	// inbounds/clients invalidation broadcast (a successful reconcile, a merge
	// that reported a change, clients disabled by the depletion check) and is
	// consumed at the end of Run.
	structural atomicBool
	// ipSyncMu guards lastIpSync, the unix-seconds time of the last tick that
	// also synced client IPs.
	ipSyncMu   sync.Mutex
	lastIpSync int64
	// globalPushMu guards lastGlobalPush, the unix-seconds time of the last
	// global-usage push.
	globalPushMu   sync.Mutex
	lastGlobalPush int64
	// noGuidIpEndpoint tracks nodes (by id) whose client-IP attribution endpoint
	// returned 404, so an old-build node is noted once instead of every cycle.
	noGuidIpEndpoint sync.Map
	// prevInboundTotals holds the previous poll's cumulative up/down (and the time
	// the counter last changed) per node inbound tag, so the next poll can derive
	// a per-inbound speed delta — node inbounds have no local Xray poll. Touched
	// only from Run (serialized).
	prevInboundTotals map[string]inboundSample
}
  52. type atomicBool struct {
  53. mu sync.Mutex
  54. v bool
  55. }
  56. func (a *atomicBool) set() {
  57. a.mu.Lock()
  58. a.v = true
  59. a.mu.Unlock()
  60. }
  61. func (a *atomicBool) takeAndReset() bool {
  62. a.mu.Lock()
  63. v := a.v
  64. a.v = false
  65. a.mu.Unlock()
  66. return v
  67. }
  68. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  69. return &NodeTrafficSyncJob{}
  70. }
  71. func (j *NodeTrafficSyncJob) Run() {
  72. if !j.running.TryLock() {
  73. return
  74. }
  75. defer j.running.Unlock()
  76. mgr := runtime.GetManager()
  77. if mgr == nil {
  78. return
  79. }
  80. nodes, err := j.nodeService.GetAll()
  81. if err != nil {
  82. logger.Warning("node traffic sync: load nodes failed:", err)
  83. return
  84. }
  85. if len(nodes) == 0 {
  86. return
  87. }
  88. // Decide once per tick whether this run also syncs client IPs, and stamp the
  89. // clock before the loop so two back-to-back 5s ticks can't both qualify.
  90. doIpSync := false
  91. j.ipSyncMu.Lock()
  92. if now := time.Now().Unix(); now-j.lastIpSync >= int64(nodeClientIpSyncInterval/time.Second) {
  93. doIpSync = true
  94. j.lastIpSync = now
  95. }
  96. j.ipSyncMu.Unlock()
  97. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  98. var wg sync.WaitGroup
  99. var activeMu sync.Mutex
  100. var activeEmails []string
  101. for _, n := range nodes {
  102. if !n.Enable || n.Status != "online" {
  103. continue
  104. }
  105. wg.Add(1)
  106. sem <- struct{}{}
  107. n := n
  108. common.GoRecover("node-traffic-sync:"+n.Name, func() {
  109. defer wg.Done()
  110. defer func() { <-sem }()
  111. if emails := j.syncOne(mgr, n, doIpSync); len(emails) > 0 {
  112. activeMu.Lock()
  113. activeEmails = append(activeEmails, emails...)
  114. activeMu.Unlock()
  115. }
  116. })
  117. }
  118. wg.Wait()
  119. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  120. if err != nil {
  121. logger.Warning("node traffic sync: depletion check failed:", err)
  122. }
  123. if clientsDisabled {
  124. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  125. if err := j.xrayService.RestartXray(true); err != nil {
  126. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  127. j.xrayService.SetToNeedRestart()
  128. }
  129. } else if settingErr != nil {
  130. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  131. }
  132. j.structural.set()
  133. }
  134. j.maybePushGlobals(mgr, nodes)
  135. // Prune stale local-online entries (no local active emails or inbound tags
  136. // to add here — only the local xray poll feeds those) so a stopped local
  137. // xray's clients and inbounds still age out between traffic polls.
  138. j.inboundService.RefreshLocalOnlineClients(nil, nil)
  139. // Derive per-node-inbound speed every tick (keeps the baseline fresh even
  140. // with no dashboard open); only broadcast it when someone is watching.
  141. inboundSpeed := j.nodeInboundSpeed()
  142. if !websocket.HasClients() {
  143. return
  144. }
  145. // Same snapshot-vs-delta split as the local traffic job: above the
  146. // threshold a full snapshot would be dropped by the hub's payload cap, so
  147. // send only the rows for clients online on the synced nodes this tick.
  148. snapshot := true
  149. if total, countErr := j.inboundService.CountClientTraffics(); countErr != nil {
  150. logger.Warning("node traffic sync: count client traffics failed:", countErr)
  151. } else if total > clientStatsSnapshotMaxClients {
  152. snapshot = false
  153. }
  154. var stats []*xray.ClientTraffic
  155. var statsErr error
  156. if snapshot {
  157. stats, statsErr = j.inboundService.GetAllClientTraffics()
  158. } else {
  159. stats, statsErr = j.inboundService.GetActiveClientTraffics(activeEmails)
  160. }
  161. if statsErr != nil {
  162. logger.Warning("node traffic sync: get client traffics for websocket failed:", statsErr)
  163. }
  164. var lastOnline map[string]int64
  165. if snapshot {
  166. var loErr error
  167. if lastOnline, loErr = j.inboundService.GetClientsLastOnline(); loErr != nil {
  168. logger.Warning("node traffic sync: get last-online failed:", loErr)
  169. }
  170. } else {
  171. lastOnline = make(map[string]int64, len(stats))
  172. for _, ct := range stats {
  173. if ct != nil {
  174. lastOnline[ct.Email] = ct.LastOnline
  175. }
  176. }
  177. }
  178. if lastOnline == nil {
  179. lastOnline = map[string]int64{}
  180. }
  181. online := j.inboundService.GetOnlineClients()
  182. if online == nil {
  183. online = []string{}
  184. }
  185. trafficPayload := map[string]any{
  186. "onlineClients": online,
  187. "onlineByGuid": j.inboundService.GetOnlineClientsByGuid(),
  188. "activeInbounds": j.inboundService.GetActiveInboundsByGuid(),
  189. "lastOnlineMap": lastOnline,
  190. }
  191. // Always send the key so the dashboard clears node inbounds that went idle
  192. // this tick. A nil result (query error) marshals to null and is skipped
  193. // client-side, leaving the last shown value untouched; an empty (non-nil)
  194. // slice marshals to [] and clears stale speeds.
  195. trafficPayload["nodeTraffics"] = inboundSpeed
  196. websocket.BroadcastTraffic(trafficPayload)
  197. clientStats := map[string]any{"snapshot": snapshot}
  198. if len(stats) > 0 {
  199. clientStats["clients"] = stats
  200. }
  201. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  202. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  203. } else if len(summary) > 0 {
  204. clientStats["inbounds"] = summary
  205. }
  206. if len(clientStats) > 1 {
  207. websocket.BroadcastClientStats(clientStats)
  208. }
  209. if j.structural.takeAndReset() {
  210. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  211. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  212. }
  213. }
  214. // nodeInboundSpeed derives a per-node-inbound speed delta by diffing the current
  215. // cumulative up/down against the previous poll's, keyed by the central tag the
  216. // dashboard matches. The node's counter keeps climbing while the master can't
  217. // reach it, so the first delta after a gap (node outage, skipped poll, slow
  218. // node) spans more than one poll window; it is normalized to the fixed
  219. // nodeInboundSpeedWindowMs using the real elapsed time so the dashboard's fixed
  220. // divisor yields the true average rate over the gap instead of an impossible
  221. // one-tick spike. The change timestamp only advances when the value actually
  222. // moves, so an idle stretch is averaged correctly when traffic resumes. A reset
  223. // rebaselines to the lower value; a first-seen tag yields no delta until the
  224. // next poll.
  225. func (j *NodeTrafficSyncJob) nodeInboundSpeed() []*xray.Traffic {
  226. totals, err := j.inboundService.GetNodeInboundTrafficTotals()
  227. if err != nil {
  228. return nil
  229. }
  230. now := time.Now().UnixMilli()
  231. deltas := make([]*xray.Traffic, 0, len(totals))
  232. next := make(map[string]inboundSample, len(totals))
  233. for tag, cur := range totals {
  234. prev, ok := j.prevInboundTotals[tag]
  235. if !ok {
  236. next[tag] = inboundSample{up: cur[0], down: cur[1], at: now}
  237. continue
  238. }
  239. dUp := cur[0] - prev.up
  240. dDown := cur[1] - prev.down
  241. if dUp <= 0 && dDown <= 0 {
  242. // No movement, or a counter reset: hold the change timestamp so a
  243. // later jump is averaged over the real elapsed window, not shown as a
  244. // spike. Adopt the lower value on a reset.
  245. if cur[0] < prev.up || cur[1] < prev.down {
  246. next[tag] = inboundSample{up: cur[0], down: cur[1], at: now}
  247. } else {
  248. next[tag] = prev
  249. }
  250. continue
  251. }
  252. if dUp < 0 {
  253. dUp = 0
  254. }
  255. if dDown < 0 {
  256. dDown = 0
  257. }
  258. elapsed := max(now-prev.at, nodeInboundSpeedWindowMs)
  259. up := dUp * nodeInboundSpeedWindowMs / elapsed
  260. down := dDown * nodeInboundSpeedWindowMs / elapsed
  261. if up > 0 || down > 0 {
  262. deltas = append(deltas, &xray.Traffic{Tag: tag, IsInbound: true, Up: up, Down: down})
  263. }
  264. next[tag] = inboundSample{up: cur[0], down: cur[1], at: now}
  265. }
  266. j.prevInboundTotals = next
  267. return deltas
  268. }
  269. // maybePushGlobals broadcasts this panel's aggregated per-client usage to its
  270. // online nodes so each node can display the client's cross-panel total and
  271. // enforce its quota locally (see InboundService.AcceptGlobalTraffic). Scoped
  272. // per node to the clients that node actually hosts, and throttled — the
  273. // aggregates only need to reach nodes on a human timescale, not every poll.
  274. func (j *NodeTrafficSyncJob) maybePushGlobals(mgr *runtime.Manager, nodes []*model.Node) {
  275. j.globalPushMu.Lock()
  276. now := time.Now().Unix()
  277. if now-j.lastGlobalPush < int64(nodeGlobalPushInterval/time.Second) {
  278. j.globalPushMu.Unlock()
  279. return
  280. }
  281. j.lastGlobalPush = now
  282. j.globalPushMu.Unlock()
  283. masterGuid, err := j.settingService.GetPanelGuid()
  284. if err != nil || masterGuid == "" {
  285. return
  286. }
  287. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  288. var wg sync.WaitGroup
  289. for _, n := range nodes {
  290. if !n.Enable || n.Status != "online" {
  291. continue
  292. }
  293. remote, err := mgr.RemoteFor(n)
  294. if err != nil {
  295. continue
  296. }
  297. traffics, err := j.inboundService.GetNodeClientTraffics(n.Id)
  298. if err != nil {
  299. logger.Warningf("node traffic sync: load globals for %s failed: %v", n.Name, err)
  300. continue
  301. }
  302. if len(traffics) == 0 {
  303. continue
  304. }
  305. wg.Add(1)
  306. sem <- struct{}{}
  307. n, remote, traffics := n, remote, traffics
  308. common.GoRecover("node-global-push:"+n.Name, func() {
  309. defer wg.Done()
  310. defer func() { <-sem }()
  311. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  312. defer cancel()
  313. if err := remote.PushGlobalClientTraffics(ctx, masterGuid, traffics); err != nil {
  314. // An old-build node without the endpoint answers 404 — not worth a
  315. // warning every cycle.
  316. if strings.Contains(err.Error(), "HTTP 404") {
  317. logger.Debugf("node traffic sync: node %s has no global-traffic endpoint (old build)", n.Name)
  318. } else {
  319. logger.Warningf("node traffic sync: push globals to %s failed: %v", n.Name, err)
  320. }
  321. }
  322. })
  323. }
  324. wg.Wait()
  325. }
  326. // syncOne pulls one node's traffic snapshot and merges it. It returns the
  327. // emails online on that node this tick, feeding the delta broadcast above the
  328. // snapshot threshold; nil on any failure path.
  329. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) []string {
  330. rt, err := mgr.RemoteFor(n)
  331. if err != nil {
  332. logger.Warningf("node traffic sync: remote lookup failed for %s: %v", n.Name, err)
  333. return nil
  334. }
  335. if n.ConfigDirty {
  336. reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout)
  337. reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n)
  338. reconcileCancel()
  339. if reconcileErr != nil {
  340. // The dirty flag stays set so reconcile retries next tick, but traffic
  341. // accounting must keep flowing: one rejected inbound used to starve the
  342. // whole node's traffic/online sync forever (#5685).
  343. logger.Warningf("node traffic sync: reconcile for %s failed, continuing with traffic pull: %v", n.Name, reconcileErr)
  344. } else {
  345. if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
  346. logger.Warningf("node traffic sync: clear dirty for %s failed: %v", n.Name, clearErr)
  347. }
  348. j.structural.set()
  349. }
  350. }
  351. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  352. defer cancel()
  353. snap, err := rt.FetchTrafficSnapshot(ctx)
  354. if err != nil {
  355. logger.Warningf("node traffic sync: fetch from %s failed: %v", n.Name, err)
  356. j.inboundService.ClearNodeOnlineClients(n.Id)
  357. return nil
  358. }
  359. service.FilterNodeSnapshot(n, snap)
  360. _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
  361. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
  362. if err != nil {
  363. logger.Warningf("node traffic sync: merge for %s failed: %v", n.Name, err)
  364. return nil
  365. }
  366. if changed {
  367. j.structural.set()
  368. }
  369. active := make([]string, 0, len(snap.OnlineEmails))
  370. active = append(active, snap.OnlineEmails...)
  371. for _, emails := range snap.OnlineTree {
  372. active = append(active, emails...)
  373. }
  374. if !doIpSync {
  375. return active
  376. }
  377. ipCtx, ipCancel := context.WithTimeout(context.Background(), nodeClientIpSyncTimeout)
  378. defer ipCancel()
  379. nodeIps, err := rt.FetchAllClientIps(ipCtx)
  380. if err == nil && len(nodeIps) > 0 {
  381. if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil {
  382. logger.Warningf("node traffic sync: merge client ips from %s failed: %v", n.Name, err)
  383. }
  384. } else if err != nil {
  385. logger.Warningf("node traffic sync: fetch client ips from %s failed: %v", n.Name, err)
  386. }
  387. masterIps, err := j.inboundService.GetAllInboundClientIps()
  388. if err != nil {
  389. logger.Warningf("node traffic sync: load client ips for push to %s failed: %v", n.Name, err)
  390. return active
  391. }
  392. if len(masterIps) > 0 {
  393. if err := rt.PushAllClientIps(ipCtx, masterIps); err != nil {
  394. logger.Warningf("node traffic sync: push client ips to %s failed: %v", n.Name, err)
  395. }
  396. }
  397. // Per-node IP attribution: pull the node's guid-keyed subtree (its own
  398. // observations plus any descendants) so the master can tell which node each
  399. // IP is on. Old nodes without the endpoint return HTTP 404 every cycle — note
  400. // it once per node (re-armed on recovery) instead of flooding the log.
  401. if guidTrees, err := rt.FetchClientIpsByGuid(ipCtx); err != nil {
  402. if strings.Contains(err.Error(), "HTTP 404") {
  403. if _, seen := j.noGuidIpEndpoint.LoadOrStore(n.Id, true); !seen {
  404. logger.Debugf("node traffic sync: node %s has no client-IP attribution endpoint (old build)", n.Name)
  405. }
  406. } else {
  407. logger.Debugf("node traffic sync: fetch client ip attribution from %s failed: %v", n.Name, err)
  408. }
  409. } else {
  410. j.noGuidIpEndpoint.Delete(n.Id)
  411. if len(guidTrees) > 0 {
  412. if err := j.inboundService.MergeClientIpsByGuid(n, guidTrees); err != nil {
  413. logger.Warningf("node traffic sync: merge client ip attribution from %s failed: %v", n.Name, err)
  414. }
  415. }
  416. }
  417. return active
  418. }