package job

import (
	"context"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v2/database/model"
	"github.com/mhsanaei/3x-ui/v2/logger"
	"github.com/mhsanaei/3x-ui/v2/web/runtime"
	"github.com/mhsanaei/3x-ui/v2/web/service"
	"github.com/mhsanaei/3x-ui/v2/web/websocket"
)

// nodeTrafficSyncConcurrency caps how many nodes we sync simultaneously.
// Each sync does three HTTP calls in series, so the wall-clock budget
// per node is the request timeout below — keeping the cap modest avoids
// flooding the network while still getting through dozens of nodes
// inside a 10s tick.
const nodeTrafficSyncConcurrency = 8

// nodeTrafficSyncRequestTimeout bounds the whole per-node sync. Giving each
// of the three probes its own 8s would blow past the 10s cron interval, so
// this single budget covers the entire snapshot: FetchTrafficSnapshot
// internally caps each HTTP call at the runtime's own 10s ceiling but uses
// ctx for the outer total.
const nodeTrafficSyncRequestTimeout = 8 * time.Second

// NodeTrafficSyncJob pulls absolute traffic + online stats from every
// enabled, currently-online remote node and merges them into the central
// DB. Mirrors NodeHeartbeatJob's structure: TryLock to skip pile-ups,
// errgroup-style fan-out with a concurrency cap, per-node ctx timeout.
//
// Offline nodes are skipped entirely — the heartbeat job already owns
// status tracking, and we'd just waste sockets retrying a node we know
// is unreachable. As soon as heartbeat marks a node online again, the
// next traffic tick picks it up.
type NodeTrafficSyncJob struct {
	nodeService    service.NodeService
	inboundService service.InboundService

	// Coarse mutex prevents two ticks running concurrently if a single
	// sync stalls past the 10s cron interval (rare but possible when
	// many nodes are slow simultaneously).
	running sync.Mutex
}

// NewNodeTrafficSyncJob builds a singleton sync job. Cron hands the same
// instance to every tick so the running mutex is preserved across runs.
func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
	return &NodeTrafficSyncJob{}
}
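
// Run executes one sync tick. It skips the tick if the previous one is
// still in flight, fans out to every enabled online node under the
// concurrency cap, then pushes a single batched traffic update to any
// connected WebSocket clients.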
func (j *NodeTrafficSyncJob) Run() {
	if !j.running.TryLock() {
		return
	}
	defer j.running.Unlock()

	mgr := runtime.GetManager()
	if mgr == nil {
		// Server still booting — pre-Manager runs are normal during
		// the first few seconds of startup.
		return
	}

	nodes, err := j.nodeService.GetAll()
	if err != nil {
		logger.Warning("node traffic sync: load nodes failed:", err)
		return
	}
	if len(nodes) == 0 {
		return
	}
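
	// Bounded fan-out: the buffered channel is a counting semaphore. A send
	// acquires a slot before each goroutine starts and the deferred receive
	// releases it, so at most nodeTrafficSyncConcurrency syncs run at once
	// while wg.Wait() holds the tick open until every one of them returns.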
	sem := make(chan struct{}, nodeTrafficSyncConcurrency)
	var wg sync.WaitGroup
	for _, n := range nodes {
		if !n.Enable || n.Status != "online" {
			continue
		}
		wg.Add(1)
		sem <- struct{}{}
		go func(n *model.Node) {
			defer wg.Done()
			defer func() { <-sem }()
			j.syncOne(mgr, n)
		}(n)
	}
	wg.Wait()

	// One broadcast per tick, batched across all nodes: the frontend doesn't
	// care whether the rows came from local xray or a remote node, so we
	// reuse the same WebSocket envelope XrayTrafficJob uses.
	if websocket.HasClients() {
		online := j.inboundService.GetOnlineClients()
		if online == nil {
			online = []string{}
		}
		lastOnline, err := j.inboundService.GetClientsLastOnline()
		if err != nil {
			logger.Warning("node traffic sync: get last-online failed:", err)
		}
		if lastOnline == nil {
			lastOnline = map[string]int64{}
		}
		websocket.BroadcastTraffic(map[string]any{
			"onlineClients": online,
			"lastOnlineMap": lastOnline,
		})
	}
}

// syncOne fetches and merges one node's snapshot. Errors are logged
// per-node and don't propagate; one slow node shouldn't keep the rest
// from running.
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
	ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
	defer cancel()

	rt, err := mgr.RemoteFor(n)
	if err != nil {
		logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
		return
	}

	snap, err := rt.FetchTrafficSnapshot(ctx)
	if err != nil {
		logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
		// Drop node-online contribution so a hiccup doesn't leave the
		// online filter showing stale clients indefinitely.
		j.inboundService.ClearNodeOnlineClients(n.Id)
		return
	}

	if err := j.inboundService.SetRemoteTraffic(n.Id, snap); err != nil {
		logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
	}
}
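
// Registration sketch (an assumption about the surrounding wiring, not part
// of this file): like the other jobs in this package, the sync job is
// expected to be handed to the web server's cron scheduler with a spec
// matching the 10s tick referenced above, roughly:
//
//	c := cron.New()                                        // github.com/robfig/cron/v3
//	_, _ = c.AddJob("@every 10s", NewNodeTrafficSyncJob()) // spec is illustrative
//	c.Start()
//
// The real spec string and scheduler instance live in the web server setup.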