| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852 |
- package service
- import (
- "context"
- "errors"
- "fmt"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/mhsanaei/3x-ui/v3/internal/database"
- "github.com/mhsanaei/3x-ui/v3/internal/database/model"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
- "github.com/mhsanaei/3x-ui/v3/internal/xray"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
- )
- var reportedRemoteTagConflict sync.Map
- func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
- mgr := runtime.GetManager()
- if mgr == nil {
- return nil, fmt.Errorf("runtime manager not initialised")
- }
- return mgr.RuntimeFor(ib.NodeID)
- }
- func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
- if ib.NodeID == nil {
- rt, err := s.runtimeFor(ib)
- if err != nil {
- return nil, false, false, nil
- }
- return rt, true, false, nil
- }
- nodeSvc := NodeService{}
- enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID)
- if err != nil {
- return nil, false, false, err
- }
- if !enabled || status == "offline" {
- return nil, false, true, nil
- }
- rt, err := s.runtimeFor(ib)
- if err != nil {
- return nil, false, true, nil
- }
- return rt, true, false, nil
- }
- func (s *InboundService) NodeIsPending(nodeID *int) bool {
- if nodeID == nil {
- return false
- }
- return (&NodeService{}).IsNodePending(*nodeID)
- }
- func (s *InboundService) AnyNodePending(inboundIds []int) bool {
- if len(inboundIds) == 0 {
- return false
- }
- nodeSvc := NodeService{}
- for _, id := range inboundIds {
- ib, err := s.GetInbound(id)
- if err != nil || ib.NodeID == nil {
- continue
- }
- if nodeSvc.IsNodePending(*ib.NodeID) {
- return true
- }
- }
- return false
- }
- func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, nodeID int) error {
- if rt == nil || nodeID <= 0 {
- return nil
- }
- db := database.GetDB()
- var inbounds []*model.Inbound
- if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil {
- return err
- }
- remoteTags, err := rt.ListRemoteTags(ctx)
- if err != nil {
- return err
- }
- prefix := nodeTagPrefix(&nodeID)
- desiredTags := make(map[string]struct{}, len(inbounds)*2)
- for _, ib := range inbounds {
- desiredTags[ib.Tag] = struct{}{}
- if prefix != "" {
- if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
- desiredTags[stripped] = struct{}{}
- } else {
- desiredTags[prefix+ib.Tag] = struct{}{}
- }
- }
- if err := rt.UpdateInbound(ctx, ib, ib); err != nil {
- return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
- }
- }
- for _, tag := range remoteTags {
- if _, want := desiredTags[tag]; want {
- continue
- }
- if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
- return fmt.Errorf("reconcile delete %q: %w", tag, err)
- }
- }
- return nil
- }
- const resetGracePeriodMs int64 = 30000
- // onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
- // Xray's stats counters often report a zero delta for an active session across
- // a single poll, so a 5s grace would still drop the client on the next tick.
- // ~4 polls of slack keeps idle-but-connected clients visible without lingering
- // long after a real disconnect.
- const onlineGracePeriodMs int64 = 20000
- type nodeTrafficCounter struct {
- Up int64
- Down int64
- }
- func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email string, up, down int64) error {
- return tx.Clauses(clause.OnConflict{
- Columns: []clause.Column{{Name: "node_id"}, {Name: "email"}},
- DoUpdates: clause.AssignmentColumns([]string{"up", "down"}),
- }).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
- }
- func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
- var structuralChange bool
- err := submitTrafficWrite(func() error {
- var inner error
- structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty)
- return inner
- })
- return structuralChange, err
- }
- func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
- if snap == nil || nodeID <= 0 {
- return false, nil
- }
- db := database.GetDB()
- now := time.Now().UnixMilli()
- // originGuidFor attributes a synced inbound to the panel that physically
- // hosts it: inbounds the node forwards from its own sub-nodes already carry
- // a non-empty OriginNodeGuid (kept as-is across hops); the node's own local
- // inbounds report empty, so they are attributed to the node's own GUID. An
- // empty result (old-build node with no GUID yet) leaves attribution to the
- // node_id fallback downstream (#4983).
- var nodeRow model.Node
- db.Select("guid").Where("id = ?", nodeID).First(&nodeRow)
- originGuidFor := func(snapIb *model.Inbound) string {
- if snapIb.OriginNodeGuid != "" {
- return snapIb.OriginNodeGuid
- }
- return nodeRow.Guid
- }
- var central []model.Inbound
- if err := db.Model(model.Inbound{}).
- Where("node_id = ?", nodeID).
- Find(¢ral).Error; err != nil {
- return false, err
- }
- // Index under the stored tag and its prefix-flipped form so a snap matches
- // whether the n<id>- prefix lives on the node side, the central side, or
- // neither — a mismatch must never spawn a duplicate central inbound.
- tagToCentral := make(map[string]*model.Inbound, len(central)*2)
- prefix := nodeTagPrefix(&nodeID)
- for i := range central {
- tagToCentral[central[i].Tag] = ¢ral[i]
- if prefix != "" {
- if stripped, found := strings.CutPrefix(central[i].Tag, prefix); found {
- tagToCentral[stripped] = ¢ral[i]
- } else {
- tagToCentral[prefix+central[i].Tag] = ¢ral[i]
- }
- }
- }
- var centralClientStats []xray.ClientTraffic
- if len(central) > 0 {
- ids := make([]int, 0, len(central))
- for i := range central {
- ids = append(ids, central[i].Id)
- }
- if err := db.Model(xray.ClientTraffic{}).
- Where("inbound_id IN ?", ids).
- Find(¢ralClientStats).Error; err != nil {
- return false, err
- }
- }
- type csKey struct {
- inboundID int
- email string
- }
- centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
- centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
- for i := range centralClientStats {
- centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = ¢ralClientStats[i]
- centralCSByEmail[centralClientStats[i].Email] = ¢ralClientStats[i]
- }
- nodeBaselines := make(map[string]nodeTrafficCounter)
- var baselineRows []model.NodeClientTraffic
- if err := db.Model(&model.NodeClientTraffic{}).
- Where("node_id = ?", nodeID).
- Find(&baselineRows).Error; err != nil {
- return false, err
- }
- for i := range baselineRows {
- nodeBaselines[baselineRows[i].Email] = nodeTrafficCounter{Up: baselineRows[i].Up, Down: baselineRows[i].Down}
- }
- var existingEmailsList []string
- if err := db.Model(xray.ClientTraffic{}).Pluck("email", &existingEmailsList).Error; err != nil {
- return false, err
- }
- existingEmails := make(map[string]struct{}, len(existingEmailsList))
- for _, e := range existingEmailsList {
- existingEmails[e] = struct{}{}
- }
- var defaultUserId int
- if len(central) > 0 {
- defaultUserId = central[0].UserId
- } else {
- var u model.User
- if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
- defaultUserId = u.Id
- } else {
- defaultUserId = 1
- }
- }
- tx := db.Begin()
- committed := false
- defer func() {
- if !committed {
- tx.Rollback()
- }
- }()
- structuralChange := false
- snapTags := make(map[string]struct{}, len(snap.Inbounds))
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- snapTags[snapIb.Tag] = struct{}{}
- // Record the prefix-flipped form too so the orphan sweep below keeps a
- // central inbound whether its tag carries the n<id>- prefix or not.
- if prefix != "" {
- if stripped, found := strings.CutPrefix(snapIb.Tag, prefix); found {
- snapTags[stripped] = struct{}{}
- } else {
- snapTags[prefix+snapIb.Tag] = struct{}{}
- }
- }
- c, ok := tagToCentral[snapIb.Tag]
- if !ok {
- if dirty {
- continue
- }
- // Try snap.Tag first; on collision fall back to the n<id>-
- // prefixed form so local+node can both own the same port.
- pickFreeTag := func() (string, error) {
- candidates := []string{snapIb.Tag}
- if prefix != "" && !strings.HasPrefix(snapIb.Tag, prefix) {
- candidates = append(candidates, prefix+snapIb.Tag)
- }
- for _, t := range candidates {
- var owner model.Inbound
- err := tx.Where("tag = ?", t).First(&owner).Error
- if errors.Is(err, gorm.ErrRecordNotFound) {
- return t, nil
- }
- if err != nil {
- return "", err
- }
- }
- return "", nil
- }
- chosenTag, err := pickFreeTag()
- if err != nil {
- logger.Warningf("setRemoteTraffic: check tag %q failed: %v", snapIb.Tag, err)
- continue
- }
- if chosenTag == "" {
- key := fmt.Sprintf("%d:%s", nodeID, snapIb.Tag)
- if _, seen := reportedRemoteTagConflict.LoadOrStore(key, struct{}{}); !seen {
- logger.Warningf(
- "setRemoteTraffic: tag %q from node %d collides with an existing inbound even after the n%d- prefix — skipping (rename one side to remove the duplicate)",
- snapIb.Tag, nodeID, nodeID,
- )
- }
- continue
- }
- newIb := model.Inbound{
- UserId: defaultUserId,
- NodeID: &nodeID,
- OriginNodeGuid: originGuidFor(snapIb),
- Tag: chosenTag,
- Listen: snapIb.Listen,
- Port: snapIb.Port,
- Protocol: snapIb.Protocol,
- Settings: snapIb.Settings,
- StreamSettings: snapIb.StreamSettings,
- Sniffing: snapIb.Sniffing,
- TrafficReset: snapIb.TrafficReset,
- LastTrafficResetTime: snapIb.LastTrafficResetTime,
- Enable: snapIb.Enable,
- Remark: snapIb.Remark,
- Total: snapIb.Total,
- ExpiryTime: snapIb.ExpiryTime,
- Up: snapIb.Up,
- Down: snapIb.Down,
- }
- if err := tx.Create(&newIb).Error; err != nil {
- logger.Warningf("setRemoteTraffic: create central inbound for tag %q failed: %v", snapIb.Tag, err)
- continue
- }
- tagToCentral[snapIb.Tag] = &newIb
- if newIb.Tag != snapIb.Tag {
- tagToCentral[newIb.Tag] = &newIb
- }
- structuralChange = true
- continue
- }
- inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
- updates := map[string]any{}
- if !dirty {
- updates["enable"] = snapIb.Enable
- updates["remark"] = snapIb.Remark
- updates["listen"] = snapIb.Listen
- updates["port"] = snapIb.Port
- updates["protocol"] = snapIb.Protocol
- updates["total"] = snapIb.Total
- updates["expiry_time"] = snapIb.ExpiryTime
- updates["settings"] = snapIb.Settings
- updates["stream_settings"] = snapIb.StreamSettings
- updates["sniffing"] = snapIb.Sniffing
- updates["traffic_reset"] = snapIb.TrafficReset
- updates["last_traffic_reset_time"] = snapIb.LastTrafficResetTime
- }
- if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
- updates["up"] = snapIb.Up
- updates["down"] = snapIb.Down
- }
- // Physical-home attribution is independent of config-dirty state, so
- // keep it current even while the node has pending offline edits. Writes
- // once to backfill an existing row, then stays equal (#4983).
- if og := originGuidFor(snapIb); c.OriginNodeGuid != og {
- updates["origin_node_guid"] = og
- }
- if !dirty && (c.Settings != snapIb.Settings ||
- c.Remark != snapIb.Remark ||
- c.Listen != snapIb.Listen ||
- c.Port != snapIb.Port ||
- c.Total != snapIb.Total ||
- c.ExpiryTime != snapIb.ExpiryTime ||
- c.Enable != snapIb.Enable) {
- structuralChange = true
- }
- if len(updates) > 0 {
- if err := tx.Model(model.Inbound{}).
- Where("id = ?", c.Id).
- Updates(updates).Error; err != nil {
- return false, err
- }
- }
- }
- for _, c := range central {
- if dirty {
- continue
- }
- if _, kept := snapTags[c.Tag]; kept {
- continue
- }
- var goneEmails []string
- if err := tx.Model(xray.ClientTraffic{}).
- Where("inbound_id = ?", c.Id).
- Pluck("email", &goneEmails).Error; err != nil {
- return false, err
- }
- if len(goneEmails) > 0 {
- // Chunk to avoid SQLite bind var limit when a node has many clients
- // removed (e.g. after API bulk delete or structural change on node inbound).
- for _, batch := range chunkStrings(goneEmails, sqliteMaxVars) {
- if err := tx.Where("node_id = ? AND email IN ?", nodeID, batch).
- Delete(&model.NodeClientTraffic{}).Error; err != nil {
- return false, err
- }
- }
- }
- if err := tx.Where("inbound_id = ?", c.Id).
- Delete(&xray.ClientTraffic{}).Error; err != nil {
- return false, err
- }
- if err := s.clientService.DetachInbound(tx, c.Id); err != nil {
- return false, err
- }
- if err := tx.Where("id = ?", c.Id).
- Delete(&model.Inbound{}).Error; err != nil {
- return false, err
- }
- delete(tagToCentral, c.Tag)
- structuralChange = true
- }
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- c, ok := tagToCentral[snapIb.Tag]
- if !ok {
- continue
- }
- snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
- for _, cs := range snapIb.ClientStats {
- snapEmails[cs.Email] = struct{}{}
- base, seen := nodeBaselines[cs.Email]
- var deltaUp, deltaDown int64
- if seen {
- if deltaUp = cs.Up - base.Up; deltaUp < 0 {
- deltaUp = cs.Up
- }
- if deltaDown = cs.Down - base.Down; deltaDown < 0 {
- deltaDown = cs.Down
- }
- }
- if _, rowExists := existingEmails[cs.Email]; !rowExists {
- if dirty {
- continue
- }
- row := &xray.ClientTraffic{
- InboundId: c.Id,
- Email: cs.Email,
- Enable: cs.Enable,
- Total: cs.Total,
- ExpiryTime: cs.ExpiryTime,
- Reset: cs.Reset,
- Up: cs.Up,
- Down: cs.Down,
- LastOnline: cs.LastOnline,
- }
- if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
- Create(row).Error; err != nil {
- return false, err
- }
- centralCS[csKey{c.Id, cs.Email}] = row
- centralCSByEmail[cs.Email] = row
- existingEmails[cs.Email] = struct{}{}
- structuralChange = true
- if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
- return false, err
- }
- nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
- continue
- }
- if existing := centralCSByEmail[cs.Email]; existing != nil &&
- (existing.Enable != cs.Enable ||
- existing.Total != cs.Total ||
- existing.ExpiryTime != cs.ExpiryTime ||
- existing.Reset != cs.Reset) {
- structuralChange = true
- }
- enableExpr := database.ClientTrafficEnableMergeExpr()
- if err := tx.Exec(
- fmt.Sprintf(
- `UPDATE client_traffics
- SET up = up + ?, down = down + ?, enable = %s, total = ?, expiry_time = ?, reset = ?,
- last_online = %s
- WHERE email = ?`,
- enableExpr,
- database.GreatestExpr("last_online", "?"),
- ),
- deltaUp, deltaDown, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
- cs.LastOnline, cs.Email,
- ).Error; err != nil {
- return false, err
- }
- if err := s.upsertNodeBaseline(tx, nodeID, cs.Email, cs.Up, cs.Down); err != nil {
- return false, err
- }
- nodeBaselines[cs.Email] = nodeTrafficCounter{Up: cs.Up, Down: cs.Down}
- }
- for k, existing := range centralCS {
- if dirty {
- continue
- }
- if k.inboundID != c.Id {
- continue
- }
- if _, kept := snapEmails[k.email]; kept {
- continue
- }
- if err := tx.Where("node_id = ? AND email = ?", nodeID, existing.Email).
- Delete(&model.NodeClientTraffic{}).Error; err != nil {
- return false, err
- }
- if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
- Delete(&xray.ClientTraffic{}).Error; err != nil {
- return false, err
- }
- structuralChange = true
- }
- }
- type oldSet struct {
- inboundID int
- emails map[string]struct{}
- }
- var perInboundOld []oldSet
- for _, snapIb := range snap.Inbounds {
- if snapIb == nil {
- continue
- }
- c, ok := tagToCentral[snapIb.Tag]
- if !ok {
- continue
- }
- if dirty {
- continue
- }
- var oldEmailsRows []string
- if err := tx.Table("clients").
- Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
- Where("client_inbounds.inbound_id = ?", c.Id).
- Pluck("email", &oldEmailsRows).Error; err == nil {
- oldEmails := make(map[string]struct{}, len(oldEmailsRows))
- for _, e := range oldEmailsRows {
- if e != "" {
- oldEmails[e] = struct{}{}
- }
- }
- perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
- }
- clients, gcErr := s.GetClients(snapIb)
- if gcErr != nil {
- logger.Warningf("setRemoteTraffic: parse clients for tag %q failed: %v", snapIb.Tag, gcErr)
- continue
- }
- csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
- for _, cs := range snapIb.ClientStats {
- csEnableByEmail[cs.Email] = cs.Enable
- }
- filtered := clients[:0]
- for i := range clients {
- if isClientEmailTombstoned(clients[i].Email) {
- continue
- }
- if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
- clients[i].Enable = false
- }
- filtered = append(filtered, clients[i])
- }
- localEmails := make([]string, 0, len(filtered))
- for i := range filtered {
- if filtered[i].Email != "" {
- localEmails = append(localEmails, filtered[i].Email)
- }
- }
- if len(localEmails) > 0 {
- var localMeta []struct {
- Email string
- Comment string `gorm:"column:comment"`
- }
- if err := tx.Table("clients").
- Select("email, comment").
- Where("email IN ?", localEmails).
- Find(&localMeta).Error; err == nil {
- commentByEmail := make(map[string]string, len(localMeta))
- for _, m := range localMeta {
- commentByEmail[m.Email] = m.Comment
- }
- for i := range filtered {
- if cmt, ok := commentByEmail[filtered[i].Email]; ok {
- filtered[i].Comment = cmt
- }
- }
- }
- }
- if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
- logger.Warningf("setRemoteTraffic: sync clients for tag %q failed: %v", snapIb.Tag, err)
- }
- }
- for _, old := range perInboundOld {
- var stillAttached []string
- if err := tx.Table("clients").
- Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
- Where("client_inbounds.inbound_id = ?", old.inboundID).
- Pluck("email", &stillAttached).Error; err != nil {
- continue
- }
- stillSet := make(map[string]struct{}, len(stillAttached))
- for _, e := range stillAttached {
- stillSet[e] = struct{}{}
- }
- for email := range old.emails {
- if _, kept := stillSet[email]; kept {
- continue
- }
- var attachmentCount int64
- if err := tx.Table("client_inbounds").
- Joins("JOIN clients ON clients.id = client_inbounds.client_id").
- Where("clients.email = ?", email).
- Count(&attachmentCount).Error; err != nil {
- continue
- }
- if attachmentCount > 0 {
- continue
- }
- if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
- logger.Warningf("setRemoteTraffic: delete ClientRecord %q failed: %v", email, err)
- }
- if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
- logger.Warningf("setRemoteTraffic: delete ClientTraffic %q failed: %v", email, err)
- }
- if err := tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error; err != nil {
- logger.Warningf("setRemoteTraffic: delete NodeClientTraffic %q failed: %v", email, err)
- }
- structuralChange = true
- }
- }
- if err := tx.Commit().Error; err != nil {
- return false, err
- }
- committed = true
- if p != nil {
- tree := snap.OnlineTree
- if len(tree) == 0 && len(snap.OnlineEmails) > 0 {
- // Old-build node (no GUID tree): key its flat online list under its
- // own effective identity so attribution still works for that branch.
- effectiveGuid := nodeRow.Guid
- if effectiveGuid == "" {
- effectiveGuid = synthNodeGuid(nodeID)
- }
- tree = map[string][]string{effectiveGuid: snap.OnlineEmails}
- }
- p.SetNodeOnlineTree(nodeID, tree)
- }
- return structuralChange, nil
- }
- func (s *InboundService) restartRemoteNodesOnDisable(nodeIDs []int) {
- restartOnDisable, err := (&SettingService{}).GetRestartXrayOnClientDisable()
- if err != nil {
- logger.Warning("disableInvalidClients: get RestartXrayOnClientDisable failed:", err)
- return
- }
- if !restartOnDisable {
- return
- }
- for _, nodeID := range nodeIDs {
- nodeIDCopy := nodeID
- rt, rtErr := runtime.GetManager().RuntimeFor(&nodeIDCopy)
- if rtErr != nil {
- logger.Warning("disableInvalidClients: get runtime for node", nodeID, "failed:", rtErr)
- continue
- }
- if rtErr = rt.RestartXray(context.Background()); rtErr != nil {
- logger.Warning("disableInvalidClients: restart xray on node", nodeID, "failed:", rtErr)
- }
- }
- }
- func (s *InboundService) GetOnlineClients() []string {
- if p == nil {
- return []string{}
- }
- return p.GetOnlineClients()
- }
- // GetOnlineClientsByGuid returns online emails keyed by the panelGuid of the
- // node that physically hosts each set: this panel's own clients under its own
- // GUID, plus every node in the tree under its GUID (#4983). Replaces the old
- // node-id keying so a client three hops down is attributed to its real node,
- // not the intermediate one it was synced through.
- func (s *InboundService) GetOnlineClientsByGuid() map[string][]string {
- if p == nil {
- return map[string][]string{}
- }
- out := p.GetMergedNodeTrees()
- if local := p.GetLocalOnlineClients(); len(local) > 0 {
- if guid := s.panelGuid(); guid != "" {
- out[guid] = mergeEmails(out[guid], local)
- }
- }
- return out
- }
- // GetActiveInboundsByGuid returns the inbound tags that carried traffic within
- // the grace window for THIS panel, under its own GUID. Remote nodes don't
- // report per-inbound activity, so a GUID missing from the map means "don't
- // gate" for that node's inbounds.
- func (s *InboundService) GetActiveInboundsByGuid() map[string][]string {
- if p == nil {
- return map[string][]string{}
- }
- active := p.GetLocalActiveInbounds()
- if len(active) == 0 {
- return map[string][]string{}
- }
- guid := s.panelGuid()
- if guid == "" {
- return map[string][]string{}
- }
- return map[string][]string{guid: active}
- }
- func (s *InboundService) SetNodeOnlineTree(nodeID int, tree map[string][]string) {
- if p != nil {
- p.SetNodeOnlineTree(nodeID, tree)
- }
- }
- func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
- if p != nil {
- p.ClearNodeOnlineClients(nodeID)
- }
- }
- // panelGuid returns this panel's stable self-identifier, used to key the local
- // panel's own clients in the per-node online maps (#4983).
- func (s *InboundService) panelGuid() string {
- guid, _ := (&SettingService{}).GetPanelGuid()
- return guid
- }
- // synthNodeGuid is the stable per-node fallback identity for a directly-attached
- // node whose panel hasn't reported a panelGuid yet (old build). Node ids are
- // master-local, so this only composes for direct nodes — exactly the pre-#4983
- // flat-topology case where an old-build node appears.
- func synthNodeGuid(nodeID int) string {
- return fmt.Sprintf("node:%d", nodeID)
- }
- // mergeEmails returns the deduped union of two email slices.
- func mergeEmails(a, b []string) []string {
- if len(a) == 0 {
- return b
- }
- seen := make(map[string]struct{}, len(a)+len(b))
- out := make([]string, 0, len(a)+len(b))
- for _, e := range a {
- if _, ok := seen[e]; !ok {
- seen[e] = struct{}{}
- out = append(out, e)
- }
- }
- for _, e := range b {
- if _, ok := seen[e]; !ok {
- seen[e] = struct{}{}
- out = append(out, e)
- }
- }
- return out
- }
- func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
- db := database.GetDB()
- var rows []xray.ClientTraffic
- err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
- if err != nil && err != gorm.ErrRecordNotFound {
- return nil, err
- }
- result := make(map[string]int64, len(rows))
- for _, r := range rows {
- result[r.Email] = r.LastOnline
- }
- return result, nil
- }
- // RefreshLocalOnlineClients folds the emails and inbound tags active on this
- // panel's own xray this poll into the local online/active sets, applying the
- // grace window and pruning stale entries. Pass nil to only prune. See
- // xray.Process for why the local sets are kept separate from the shared
- // last_online column.
- func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
- if p != nil {
- p.RefreshLocalOnline(activeEmails, activeInboundTags, time.Now().UnixMilli(), onlineGracePeriodMs)
- }
- }
- func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
- db := database.GetDB()
- // Step 1: Get ClientTraffic records for emails in the input list.
- // Chunked to stay under SQLite's bind-variable limit on huge inputs.
- uniqEmails := uniqueNonEmptyStrings(emails)
- clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
- for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
- var page []xray.ClientTraffic
- if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
- return nil, nil, err
- }
- clients = append(clients, page...)
- }
- // Step 2: Sort clients by (Up + Down) descending
- sort.Slice(clients, func(i, j int) bool {
- return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
- })
- // Step 3: Extract sorted valid emails and track found ones
- validEmails := make([]string, 0, len(clients))
- found := make(map[string]bool)
- for _, client := range clients {
- validEmails = append(validEmails, client.Email)
- found[client.Email] = true
- }
- // Step 4: Identify emails that were not found in the database
- extraEmails := make([]string, 0)
- for _, email := range emails {
- if !found[email] {
- extraEmails = append(extraEmails, email)
- }
- }
- return validEmails, extraEmails, nil
- }
|