| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004 |
- package service
- import (
- "context"
- "errors"
- "fmt"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/mhsanaei/3x-ui/v3/internal/database"
- "github.com/mhsanaei/3x-ui/v3/internal/database/model"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
- "github.com/mhsanaei/3x-ui/v3/internal/xray"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
- )
- var reportedRemoteTagConflict sync.Map
- // nodeBulkPushThreshold caps how many per-client RPCs a single operation will
- // stream to a remote node. Above it, the panel marks the node dirty instead and
- // lets one ReconcileNode push converge the whole inbound — far cheaper than M
- // sequential round-trips. Small ops stay on the live per-client path.
- const nodeBulkPushThreshold = 32
- func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
- mgr := runtime.GetManager()
- if mgr == nil {
- return nil, fmt.Errorf("runtime manager not initialised")
- }
- return mgr.RuntimeFor(ib.NodeID)
- }
- func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
- if ib.NodeID == nil {
- rt, err := s.runtimeFor(ib)
- if err != nil {
- return nil, false, false, nil
- }
- return rt, true, false, nil
- }
- nodeSvc := NodeService{}
- enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID)
- if err != nil {
- return nil, false, false, err
- }
- if !enabled || status == "offline" {
- return nil, false, true, nil
- }
- rt, err := s.runtimeFor(ib)
- if err != nil {
- return nil, false, true, nil
- }
- return rt, true, false, nil
- }
- func (s *InboundService) NodeIsPending(nodeID *int) bool {
- if nodeID == nil {
- return false
- }
- return (&NodeService{}).IsNodePending(*nodeID)
- }
- func (s *InboundService) AnyNodePending(inboundIds []int) bool {
- if len(inboundIds) == 0 {
- return false
- }
- nodeSvc := NodeService{}
- for _, id := range inboundIds {
- ib, err := s.GetInbound(id)
- if err != nil || ib.NodeID == nil {
- continue
- }
- if nodeSvc.IsNodePending(*ib.NodeID) {
- return true
- }
- }
- return false
- }
- func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, n *model.Node) error {
- if rt == nil || n == nil || n.Id <= 0 {
- return nil
- }
- nodeID := n.Id
- db := database.GetDB()
- var inbounds []*model.Inbound
- if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil {
- return err
- }
- remoteTags, err := rt.ListRemoteTags(ctx)
- if err != nil {
- return err
- }
- remoteTagSet := make(map[string]struct{}, len(remoteTags))
- for _, tag := range remoteTags {
- remoteTagSet[tag] = struct{}{}
- }
- prefix := nodeTagPrefix(&nodeID)
- desiredTags := make(map[string]struct{}, len(inbounds)*2)
- for _, ib := range inbounds {
- desiredTags[ib.Tag] = struct{}{}
- // existsOnNode: does the node already report this inbound under any of the
- // tag forms it may be stored as? If so, an unchanged push can be skipped.
- _, existsOnNode := remoteTagSet[ib.Tag]
- if prefix != "" {
- if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
- desiredTags[stripped] = struct{}{}
- if _, ok := remoteTagSet[stripped]; ok {
- existsOnNode = true
- }
- } else {
- desiredTags[prefix+ib.Tag] = struct{}{}
- if _, ok := remoteTagSet[prefix+ib.Tag]; ok {
- existsOnNode = true
- }
- }
- }
- if _, err := rt.ReconcileInbound(ctx, ib, existsOnNode); err != nil {
- return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
- }
- }
- // In "selected" sync mode the panel only manages the selected tags: the
- // rest were never imported, so their absence from the local DB must not
- // delete them from the node. Only a selected tag missing locally (the
- // panel deleted it while the node was unreachable) may be swept.
- var selected map[string]struct{}
- if n.InboundSyncMode == "selected" {
- selected = make(map[string]struct{}, len(n.InboundTags))
- for _, tag := range n.InboundTags {
- selected[tag] = struct{}{}
- }
- }
- for _, tag := range remoteTags {
- if _, want := desiredTags[tag]; want {
- continue
- }
- if selected != nil {
- if _, managed := selected[tag]; !managed {
- continue
- }
- }
- if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
- return fmt.Errorf("reconcile delete %q: %w", tag, err)
- }
- }
- return nil
- }
- const resetGracePeriodMs int64 = 30000
- // onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
- // Xray's stats counters often report a zero delta for an active session across
- // a single poll, so a 5s grace would still drop the client on the next tick.
- // ~4 polls of slack keeps idle-but-connected clients visible without lingering
- // long after a real disconnect.
- const onlineGracePeriodMs int64 = 20000
- type nodeTrafficCounter struct {
- Up int64
- Down int64
- }
- func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
- return tx.Clauses(clause.OnConflict{
- Columns: []clause.Column{{Name: "node_id"}, {Name: "email"}},
- DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
- }).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
- }
- // mergeActivationExpiry reconciles a node-reported client expiry with the value
- // already stored on the master. "Start after first connect" persists a negative
- // duration that each node converts to an absolute deadline (now+duration) the
- // first time the client connects there. The per-email client_traffics row is
- // shared across every node, so a node that has not yet seen a first connection
- // keeps reporting the negative duration — which must never reset a deadline
- // another node already activated.
- //
- // A node may legitimately move an already-activated deadline forward (traffic
- // reset / auto-renew extends it), so any positive node value is still adopted —
- // only an un-activated (<= 0) value is rejected once an absolute deadline
- // exists. Kept in lockstep with the SQL CASE in setRemoteTrafficLocked.
- func mergeActivationExpiry(existing, node int64) int64 {
- if existing > 0 && node <= 0 {
- return existing
- }
- return node
- }
- func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
- var structuralChange bool
- err := submitTrafficWrite(func() error {
- var inner error
- structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty)
- return inner
- })
- return structuralChange, err
- }
- func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
- if snap == nil || nodeID <= 0 {
- return false, nil
- }
- db := database.GetDB()
- now := time.Now().UnixMilli()
- // originGuidFor attributes a synced inbound to the panel that physically
- // hosts it: inbounds the node forwards from its own sub-nodes already carry
- // a non-empty OriginNodeGuid (kept as-is across hops); the node's own local
- // inbounds report empty, so they are attributed to the node's own GUID. An
- // empty result (old-build node with no GUID yet) leaves attribution to the
- // node_id fallback downstream (#4983).
- var nodeRow model.Node
- db.Select("guid").Where("id = ?", nodeID).First(&nodeRow)
- originGuidFor := func(snapIb *model.Inbound) string {
- if snapIb.OriginNodeGuid != "" {
- return snapIb.OriginNodeGuid
- }
- return nodeRow.Guid
- }
- var central []model.Inbound
- if err := db.Model(model.Inbound{}).
- Where("node_id = ?", nodeID).
- Find(¢ral).Error; err != nil {
- return false, err
- }
- // Index under the stored tag and its prefix-flipped form so a snap matches
- // whether the n<id>- prefix lives on the node side, the central side, or
- // neither — a mismatch must never spawn a duplicate central inbound.
- tagToCentral := make(map[string]*model.Inbound, len(central)*2)
- prefix := nodeTagPrefix(&nodeID)
- for i := range central {
- tagToCentral[central[i].Tag] = ¢ral[i]
- if prefix != "" {
- if stripped, found := strings.CutPrefix(central[i].Tag, prefix); found {
- tagToCentral[stripped] = ¢ral[i]
- } else {
- tagToCentral[prefix+central[i].Tag] = ¢ral[i]
- }
- }
- }
- var centralClientStats []xray.ClientTraffic
- if len(central) > 0 {
- ids := make([]int, 0, len(central))
- for i := range central {
- ids = append(ids, central[i].Id)
- }
- if err := db.Model(xray.ClientTraffic{}).
- Where("inbound_id IN ?", ids).
- Find(¢ralClientStats).Error; err != nil {
- return false, err
- }
- }
- type csKey struct {
- inboundID int
- email string
- }
- centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
- centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
- for i := range centralClientStats {
- centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = ¢ralClientStats[i]
- centralCSByEmail[centralClientStats[i].Email] = ¢ralClientStats[i]
- }
- nodeBaselines := make(map[string]nodeTrafficCounter)
- var baselineRows []model.NodeClientTraffic
- if err := db.Model(&model.NodeClientTraffic{}).
- Where("node_id = ?", nodeID).
- Find(&baselineRows).Error; err != nil {
- return false, err
- }
- for i := range baselineRows {
- nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
- }
- var defaultUserId int
- if len(central) > 0 {
- defaultUserId = central[0].UserId
- } else {
- var u model.User
- if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
- defaultUserId = u.Id
- } else {
- defaultUserId = 1
- }
- }
- // Union of every email the snapshot still reports, across all inbounds.
- // The (node, email) baseline rows are keyed per node, not per inbound, so
- // the sweeps below must only drop one when the email left the node
- // entirely — an email whose stats moved to (or always lived under) a
- // sibling inbound still needs its baseline for the sibling's delta
- // computation (#5202).
- //
- // Xray counts traffic per email, not per inbound, so a multi-attached
- // client's shared counter is copied onto every inbound it's on. Fold each
- // email to its per-field max (nodeEmailTotals) so divergent copies can't make
- // the reset clamp re-add a lower sibling as fresh traffic (#5274).
- snapEmailsAll := make(map[string]struct{})
- nodeEmailTotals := make(map[string]nodeTrafficCounter)
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- for i := range snapIb.ClientStats {
- email := snapIb.ClientStats[i].Email
- snapEmailsAll[email] = struct{}{}
- cur := nodeEmailTotals[email]
- if snapIb.ClientStats[i].Up > cur.Up {
- cur.Up = snapIb.ClientStats[i].Up
- }
- if snapIb.ClientStats[i].Down > cur.Down {
- cur.Down = snapIb.ClientStats[i].Down
- }
- nodeEmailTotals[email] = cur
- }
- }
- // Membership set for the rowExists checks below. Only the snapshot's emails
- // are ever probed, so scope the lookup to those instead of plucking the whole
- // client_traffics table (50k+ rows) on every node poll.
- existingEmails := make(map[string]struct{}, len(snapEmailsAll))
- if len(snapEmailsAll) > 0 {
- snapEmailList := make([]string, 0, len(snapEmailsAll))
- for email := range snapEmailsAll {
- snapEmailList = append(snapEmailList, email)
- }
- for _, batch := range chunkStrings(snapEmailList, sqliteMaxVars) {
- var found []string
- if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Pluck("email", &found).Error; err != nil {
- return false, err
- }
- for _, e := range found {
- existingEmails[e] = struct{}{}
- }
- }
- }
- tx := db.Begin()
- committed := false
- defer func() {
- if !committed {
- tx.Rollback()
- }
- }()
- structuralChange := false
- snapTags := make(map[string]struct{}, len(snap.Inbounds))
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- snapTags[snapIb.Tag] = struct{}{}
- // Record the prefix-flipped form too so the orphan sweep below keeps a
- // central inbound whether its tag carries the n<id>- prefix or not.
- if prefix != "" {
- if stripped, found := strings.CutPrefix(snapIb.Tag, prefix); found {
- snapTags[stripped] = struct{}{}
- } else {
- snapTags[prefix+snapIb.Tag] = struct{}{}
- }
- }
- c, ok := tagToCentral[snapIb.Tag]
- if !ok {
- if dirty {
- continue
- }
- // Try snap.Tag first; on collision fall back to the n<id>-
- // prefixed form so local+node can both own the same port.
- pickFreeTag := func() (string, error) {
- candidates := []string{snapIb.Tag}
- if prefix != "" && !strings.HasPrefix(snapIb.Tag, prefix) {
- candidates = append(candidates, prefix+snapIb.Tag)
- }
- for _, t := range candidates {
- var owner model.Inbound
- err := tx.Where("tag = ?", t).First(&owner).Error
- if errors.Is(err, gorm.ErrRecordNotFound) {
- return t, nil
- }
- if err != nil {
- return "", err
- }
- }
- return "", nil
- }
- chosenTag, err := pickFreeTag()
- if err != nil {
- logger.Warningf("setRemoteTraffic: check tag %q failed: %v", snapIb.Tag, err)
- continue
- }
- if chosenTag == "" {
- key := fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)
- if _, seen := reportedRemoteTagConflict.LoadOrStore(key, struct{}{}); !seen {
- logger.Warningf(
- "setRemoteTraffic: tag %q from node %d collides with an existing inbound even after the n%d- prefix — skipping (rename one side to remove the duplicate)",
- snapIb.Tag, nodeID, nodeID,
- )
- }
- continue
- }
- newIb := model.Inbound{
- UserId: defaultUserId,
- NodeID: &nodeID,
- OriginNodeGuid: originGuidFor(snapIb),
- Tag: chosenTag,
- Listen: snapIb.Listen,
- Port: snapIb.Port,
- Protocol: snapIb.Protocol,
- Settings: snapIb.Settings,
- StreamSettings: snapIb.StreamSettings,
- Sniffing: snapIb.Sniffing,
- TrafficReset: snapIb.TrafficReset,
- LastTrafficResetTime: snapIb.LastTrafficResetTime,
- Enable: snapIb.Enable,
- Remark: snapIb.Remark,
- SubSortIndex: normalizeSubSortIndex(snapIb.SubSortIndex),
- Total: snapIb.Total,
- ExpiryTime: snapIb.ExpiryTime,
- Up: snapIb.Up,
- Down: snapIb.Down,
- ShareAddrStrategy: "node",
- }
- if err := tx.Create(&newIb).Error; err != nil {
- logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
- continue
- }
- tagToCentral[snapIb.Tag] = &newIb
- if newIb.Tag != snapIb.Tag {
- tagToCentral[newIb.Tag] = &newIb
- }
- structuralChange = true
- continue
- }
- inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
- updates := map[string]any{}
- if !dirty {
- updates["enable"] = snapIb.Enable
- updates["remark"] = snapIb.Remark
- updates["sub_sort_index"] = normalizeSubSortIndex(snapIb.SubSortIndex)
- updates["listen"] = snapIb.Listen
- updates["port"] = snapIb.Port
- updates["protocol"] = snapIb.Protocol
- updates["total"] = snapIb.Total
- updates["expiry_time"] = snapIb.ExpiryTime
- updates["settings"] = snapIb.Settings
- updates["stream_settings"] = snapIb.StreamSettings
- updates["sniffing"] = snapIb.Sniffing
- updates["traffic_reset"] = snapIb.TrafficReset
- updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
- }
- if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
- updates["up"] = snapIb.Up
- updates["down"] = snapIb.Down
- }
- // Physical-home attribution is independent of config-dirty state, so
- // keep it current even while the node has pending offline edits. Writes
- // once to backfill an existing row, then stays equal (#4983).
- if og := originGuidFor(snapIb); c.OriginNodeGuid != og {
- updates["origin_node_guid"] = og
- }
- if !dirty && (c.Settings != snapIb.Settings ||
- c.Remark != snapIb.Remark ||
- c.Listen != snapIb.Listen ||
- c.Port != snapIb.Port ||
- c.Total != snapIb.Total ||
- c.ExpiryTime != snapIb.ExpiryTime ||
- c.Enable != snapIb.Enable) {
- structuralChange = true
- }
- if len(updates) > 0 {
- if err := tx.Model(model.Inbound{}).
- Where("id = ?", c.Id).
- Updates(updates).Error; err != nil {
- return false, err
- }
- }
- }
- for _, c := range central {
- if dirty {
- continue
- }
- if _, kept := snapTags[c.Tag]; kept {
- continue
- }
- var goneEmails []string
- if err := tx.Model(xray.ClientTraffic{}).
- Where("inbound_id = ?", c.Id).
- Pluck("email", &goneEmails).Error; err != nil {
- return false, err
- }
- if len(goneEmails) > 0 {
- // Baselines are per (node, email), not per inbound: keep them for
- // emails the snapshot still reports under a sibling inbound (#5202).
- baselineGone := make([]string, 0, len(goneEmails))
- for _, e := range goneEmails {
- if _, still := snapEmailsAll[e]; !still {
- baselineGone = append(baselineGone, e)
- }
- }
- // Chunk to avoid SQLite bind var limit when a node has many clients
- // removed (e.g. after API bulk delete or structural change on node inbound).
- for _, batch := range chunkStrings(baselineGone, sqliteMaxVars) {
- if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
- Delete(&model.NodeClientTraffic{}).Error; err != nil {
- return false, err
- }
- }
- // The per-email row is the shared accumulator across every inbound
- // (and node) the email is attached to. Only drop it when this was the
- // email's last inbound — wiping it while a sibling still feeds it
- // loses the summed history, and the next node sync would re-seed the
- // row with that node's counter alone.
- sharedEmails, sErr := s.emailsUsedByOtherInbounds(goneEmails, c.Id)
- if sErr != nil {
- return false, sErr
- }
- delEmails := make([]string, 0, len(goneEmails))
- for _, e := range goneEmails {
- if !sharedEmails[strings.ToLower(strings.TrimSpace(e))] {
- delEmails = append(delEmails, e)
- }
- }
- for _, batch := range chunkStrings(delEmails, sqliteMaxVars) {
- if err := tx.Where("inbound_id = ? AND email IN ?", c.Id, batch).
- Delete(&xray.ClientTraffic{}).Error; err != nil {
- return false, err
- }
- }
- }
- if err := s.clientService.DetachInbound(tx, c.Id); err != nil {
- return false, err
- }
- if err := tx.Where("id = ?", c.Id).
- Delete(&model.Inbound{}).Error; err != nil {
- return false, err
- }
- delete(tagToCentral, c.Tag)
- structuralChange = true
- }
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- c, ok := tagToCentral[snapIb.Tag]
- if !ok {
- continue
- }
- snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
- for _, cs := range snapIb.ClientStats {
- snapEmails[cs.Email] = struct{}{}
- // Node-wide total, not this inbound's possibly-stale copy (#5274).
- canon := nodeEmailTotals[cs.Email]
- base, seen := nodeBaselines[cs.Email]
- var deltaUp, deltaDown int64
- if seen {
- if deltaUp = canon.Up - base.Up; deltaUp < 0 {
- deltaUp = canon.Up
- }
- if deltaDown = canon.Down - base.Down; deltaDown < 0 {
- deltaDown = canon.Down
- }
- }
- if _, rowExists := existingEmails[cs.Email]; !rowExists {
- if dirty {
- continue
- }
- row := &xray.ClientTraffic{
- InboundId: c.Id,
- Email: cs.Email,
- Enable: cs.Enable,
- Total: cs.Total,
- ExpiryTime: cs.ExpiryTime,
- Reset: cs.Reset,
- Up: 0,
- Down: 0,
- LastOnline: cs.LastOnline,
- }
- if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
- Create(row).Error; err != nil {
- return false, err
- }
- centralCS[csKey{c.Id, cs.Email}] = row
- centralCSByEmail[cs.Email] = row
- existingEmails[cs.Email] = struct{}{}
- structuralChange = true
- if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, canon.Up, canon.Down); err != nil {
- return false, err
- }
- nodeBaselines[cs.Email] = nodeTrafficCounter{Up: canon.Up, Down: canon.Down}
- continue
- }
- if existing := centralCSByEmail[cs.Email]; existing != nil &&
- (existing.Enable != cs.Enable ||
- existing.Total != cs.Total ||
- existing.ExpiryTime != mergeActivationExpiry(existing.ExpiryTime, cs.ExpiryTime) ||
- existing.Reset != cs.Reset) {
- structuralChange = true
- }
- enableExpr := database.ClientTrafficEnableMergeExpr()
- // expiry_time merge mirrors mergeActivationExpiry: a node that has not
- // yet seen the client's first connection keeps reporting the negative
- // "start after first connect" duration, which must never reset the
- // absolute deadline another node already activated. A positive node
- // value is still adopted (e.g. auto-renew moves the deadline forward).
- // CAST(? AS BIGINT): in the `<= 0` comparison Postgres would otherwise
- // infer int4 from the literal and overflow on real expiry values.
- if err := tx.Exec(
- fmt.Sprintf(
- `UPDATE client_traffics
- SET up = up + ?, down = down + ?, enable = %s, total = ?,
- expiry_time = CASE WHEN expiry_time > 0 AND CAST(? AS BIGINT) <= 0 THEN expiry_time ELSE CAST(? AS BIGINT) END,
- reset = ?, last_online = %s
- WHERE email = ?`,
- enableExpr,
- database.GreatestExpr("last_online", "?"),
- ),
- deltaUp, deltaDown, cs.Enable, cs.Total,
- cs.ExpiryTime, cs.ExpiryTime, cs.Reset,
- cs.LastOnline, cs.Email,
- ).Error; err != nil {
- return false, err
- }
- if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, canon.Up, canon.Down); err != nil {
- return false, err
- }
- nodeBaselines[cs.Email] = nodeTrafficCounter{Up: canon.Up, Down: canon.Down}
- }
- for k, existing := range centralCS {
- if dirty {
- continue
- }
- if k.inboundID != c.Id {
- continue
- }
- if _, kept := snapEmails[k.email]; kept {
- continue
- }
- // Gone from this inbound's stats but still reported by the node under
- // a sibling inbound: both the shared accumulator row and the (node,
- // email) baseline must survive, or the sibling's next delta would
- // compute against nothing and freeze the counter (#5202).
- if _, still := snapEmailsAll[k.email]; still {
- continue
- }
- if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
- Delete(&model.NodeClientTraffic{}).Error; err != nil {
- return false, err
- }
- // Same shared-accumulator rule as the inbound-removal sweep above:
- // keep the row while another inbound still references the email.
- stillUsed, uErr := s.emailUsedByOtherInbounds(existing.Email, c.Id)
- if uErr != nil {
- return false, uErr
- }
- if !stillUsed {
- if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
- Delete(&xray.ClientTraffic{}).Error; err != nil {
- return false, err
- }
- }
- structuralChange = true
- }
- }
- type oldSet struct {
- inboundID int
- emails map[string]struct{}
- }
- var perInboundOld []oldSet
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- c, ok := tagToCentral[snapIb.Tag]
- if !ok {
- continue
- }
- if dirty {
- continue
- }
- var oldEmailsRows []string
- if err := tx.Table("clients").
- Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
- Where("client_inbounds.inbound_id = ?", c.Id).
- Pluck("email", &oldEmailsRows).Error; err == nil {
- oldEmails := make(map[string]struct{}, len(oldEmailsRows))
- for _, e := range oldEmailsRows {
- if e != "" {
- oldEmails[e] = struct{}{}
- }
- }
- perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
- }
- clients, gcErr := s.GetClients(snapIb)
- if gcErr != nil {
- logger.Warningf("setRemoteTraffic: parse clients for tag %q failed: %v", snapIb.Tag, gcErr)
- continue
- }
- csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
- for _, cs := range snapIb.ClientStats {
- csEnableByEmail[cs.Email] = cs.Enable
- }
- filtered := clients[:0]
- for i := range clients {
- if isClientEmailTombstoned(clients[i].Email) {
- continue
- }
- if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
- clients[i].Enable = false
- }
- filtered = append(filtered, clients[i])
- }
- localEmails := make([]string, 0, len(filtered))
- for i := range filtered {
- if filtered[i].Email != "" {
- localEmails = append(localEmails, filtered[i].Email)
- }
- }
- if len(localEmails) > 0 {
- var localMeta []struct {
- Email string
- Comment string `gorm:"column:comment"`
- }
- if err := tx.Table("clients").
- Select("email, comment").
- Where("email IN ?", localEmails).
- Find(&localMeta).Error; err == nil {
- commentByEmail := make(map[string]string, len(localMeta))
- for _, m := range localMeta {
- commentByEmail[m.Email] = m.Comment
- }
- for i := range filtered {
- if cmt, ok := commentByEmail[filtered[i].Email]; ok {
- filtered[i].Comment = cmt
- }
- }
- }
- }
- if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
- logger.Warningf("setRemoteTraffic: sync clients for tag %q failed: %v", snapIb.Tag, err)
- }
- }
- for _, old := range perInboundOld {
- var stillAttached []string
- if err := tx.Table("clients").
- Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
- Where("client_inbounds.inbound_id = ?", old.inboundID).
- Pluck("email", &stillAttached).Error; err != nil {
- continue
- }
- stillSet := make(map[string]struct{}, len(stillAttached))
- for _, e := range stillAttached {
- stillSet[e] = struct{}{}
- }
- for email := range old.emails {
- if _, kept := stillSet[email]; kept {
- continue
- }
- var attachmentCount int64
- if err := tx.Table("client_inbounds").
- Joins("JOIN clients ON clients.id = client_inbounds.client_id").
- Where("clients.email = ?", email).
- Count(&attachmentCount).Error; err != nil {
- continue
- }
- if attachmentCount > 0 {
- continue
- }
- if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
- logger.Warningf("setRemoteTraffic: delete ClientRecord %q failed: %v", email, err)
- }
- if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
- logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
- }
- if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
- logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
- }
- structuralChange = true
- }
- }
- if err := tx.Commit().Error; err != nil {
- return false, err
- }
- committed = true
- if p != nil {
- tree := snap.OnlineTree
- if len(tree) == 0 && len(snap.OnlineEmails) > 0 {
- // Old-build node (no GUID tree): key its flat online list under its
- // own effective identity so attribution still works for that branch.
- effectiveGuid := nodeRow.Guid
- if effectiveGuid == "" {
- effectiveGuid = synthNodeGuid(nodeID)
- }
- tree = map[string][]string{effectiveGuid: snap.OnlineEmails}
- }
- p.SetNodeOnlineTree(nodeID, tree)
- }
- return structuralChange, nil
- }
- func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
- restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
- if err != nil {
- logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
- return
- }
- if !restartOnDisable {
- return
- }
- for _, nodeID := range nodeIDs {
- nodeIDCopy := nodeID
- rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
- if rtErr != nil {
- logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
- continue
- }
- if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
- logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
- }
- }
- }
- func (s *InboundService) GetOnlineClients() []string {
- if p == nil {
- return []string{}
- }
- return p.GetOnlineClients()
- }
- // GetOnlineClientsByGuid returns online emails keyed by the panelGuid of the
- // node that physically hosts each set: this panel's own clients under its own
- // GUID, plus every node in the tree under its GUID (#4983). Replaces the old
- // node-id keying so a client three hops down is attributed to its real node,
- // not the intermediate one it was synced through.
- func (s *InboundService) GetOnlineClientsByGuid() map[string][]string {
- if p == nil {
- return map[string][]string{}
- }
- out := p.GetMergedNodeTrees()
- if local := p.GetLocalOnlineClients(); len(local) > 0 {
- if guid := s.panelGuid(); guid != "" {
- out[guid] = mergeEmails(out[guid], local)
- }
- }
- return out
- }
- // GetActiveInboundsByGuid returns the inbound tags that carried traffic within
- // the grace window for THIS panel, under its own GUID. Remote nodes don't
- // report per-inbound activity, so a GUID missing from the map means "don't
- // gate" for that node's inbounds.
- func (s *InboundService) GetActiveInboundsByGuid() map[string][]string {
- if p == nil {
- return map[string][]string{}
- }
- active := p.GetLocalActiveInbounds()
- if len(active) == 0 {
- return map[string][]string{}
- }
- guid := s.panelGuid()
- if guid == "" {
- return map[string][]string{}
- }
- return map[string][]string{guid: active}
- }
- func (s *InboundService) SetNodeOnlineTree(nodeID int, tree map[string][]string) {
- if p != nil {
- p.SetNodeOnlineTree(nodeID, tree)
- }
- }
- func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
- if p != nil {
- p.ClearNodeOnlineClients(nodeID)
- }
- }
- // panelGuid returns this panel's stable self-identifier, used to key the local
- // panel's own clients in the per-node online maps (#4983).
- func (s *InboundService) panelGuid() string {
- guid, _ := (&SettingService{}).GetPanelGuid()
- return guid
- }
- // synthNodeGuid is the stable per-node fallback identity for a directly-attached
- // node whose panel hasn't reported a panelGuid yet (old build). Node ids are
- // master-local, so this only composes for direct nodes — exactly the pre-#4983
- // flat-topology case where an old-build node appears.
- func synthNodeGuid(nodeID int) string {
- return fmt.Sprintf("node:%d", nodeID)
- }
- // mergeEmails returns the deduped union of two email slices.
- func mergeEmails(a, b []string) []string {
- if len(a) == 0 {
- return b
- }
- seen := make(map[string]struct{}, len(a)+len(b))
- out := make([]string, 0, len(a)+len(b))
- for _, e := range a {
- if _, ok := seen[e]; !ok {
- seen[e] = struct{}{}
- out = append(out, e)
- }
- }
- for _, e := range b {
- if _, ok := seen[e]; !ok {
- seen[e] = struct{}{}
- out = append(out, e)
- }
- }
- return out
- }
- func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
- db := database.GetDB()
- var rows []xray.ClientTraffic
- err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
- if err != nil && err != gorm.ErrRecordNotFound {
- return nil, err
- }
- result := make(map[string]int64, len(rows))
- for _, r := range rows {
- result[r.Email] = r.LastOnline
- }
- return result, nil
- }
- // RefreshLocalOnlineClients folds the emails and inbound tags active on this
- // panel's own xray this poll into the local online/active sets, applying the
- // grace window and pruning stale entries. Pass nil to only prune. See
- // xray.Process for why the local sets are kept separate from the shared
- // last_online column.
- func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
- if p != nil {
- p.RefreshLocalOnline(activeEmails, activeInboundTags, time.Now().UnixMilli(), onlineGracePeriodMs)
- }
- }
- func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
- db := database.GetDB()
- // Step 1: Get ClientTraffic records for emails in the input list.
- // Chunked to stay under SQLite's bind-variable limit on huge inputs.
- uniqEmails := uniqueNonEmptyStrings(emails)
- clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
- for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
- var page []xray.ClientTraffic
- if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
- return nil, nil, err
- }
- clients = append(clients, page...)
- }
- // Step 2: Sort clients by (Up + Down) descending
- sort.Slice(clients, func(i, j int) bool {
- return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
- })
- // Step 3: Extract sorted valid emails and track found ones
- validEmails := make([]string, 0, len(clients))
- found := make(map[string]bool)
- for _, client := range clients {
- validEmails = append(validEmails, client.Email)
- found[client.Email] = true
- }
- // Step 4: Identify emails that were not found in the database
- extraEmails := make([]string, 0)
- for _, email := range emails {
- if !found[email] {
- extraEmails = append(extraEmails, email)
- }
- }
- return validEmails, extraEmails, nil
- }
|