@@ -23,25 +23,10 @@ import (
 	"gorm.io/gorm/clause"
 )
 
-// InboundService provides business logic for managing Xray inbound configurations.
-// It handles CRUD operations for inbounds, client management, traffic monitoring,
-// and integration with the Xray API for real-time updates.
 type InboundService struct {
-	// xrayApi is retained for backwards compatibility with bulk paths
-	// that still talk to the local engine directly (e.g. traffic-reset
-	// jobs that scope to NodeID IS NULL inbounds anyway). New code paths
-	// route through runtimeFor() instead so they can target remote nodes.
 	xrayApi xray.XrayAPI
 }
 
-// runtimeFor returns the Runtime adapter for an inbound's destination
-// engine. Returns the local runtime when the inbound has no NodeID
-// (legacy/local inbounds); otherwise the cached Remote for that node.
-//
-// nil is returned only when the runtime Manager hasn't been wired yet
-// (extremely early bootstrap). Callers treat nil as a transient error
-// and either fall back to needRestart=true or surface "panel still
-// starting" upstream.
 func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
 	mgr := runtime.GetManager()
 	if mgr == nil {
@@ -399,10 +384,6 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
 	if inbound.Enable {
 		rt, rterr := s.runtimeFor(inbound)
 		if rterr != nil {
-			// Fail-fast on remote routing errors. Assign to the named
-			// `err` so the deferred tx handler rolls back the central
-			// DB row that tx.Save just inserted — otherwise we'd leave
-			// an orphan that the user sees succeed despite the toast.
 			err = rterr
 			return inbound, false, err
 		}
@@ -411,12 +392,9 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
 		} else {
 			logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
 			if inbound.NodeID != nil {
-				// Remote add failed — roll back so central + node stay
-				// in sync (no row on either side).
 				err = err1
 				return inbound, false, err
 			}
-			// Local: keep the existing fall-through-to-restart behaviour.
 			needRestart = true
 		}
 	}
@@ -424,25 +402,13 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
 	return inbound, needRestart, err
 }
 
-// DelInbound deletes an inbound configuration by ID.
-// It removes the inbound from the database and the running Xray instance if active.
-// Returns whether Xray needs restart and any error.
 func (s *InboundService) DelInbound(id int) (bool, error) {
 	db := database.GetDB()
 
 	needRestart := false
-	// Load the full inbound (not just the tag) so we know its NodeID and
-	// can route the runtime call to the right engine. Skip-on-not-found
-	// preserves the old "no-op when DB row doesn't exist" behaviour.
 	var ib model.Inbound
 	loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error
 	if loadErr == nil {
-		// Delete is best-effort on the runtime side: the user's intent is
-		// to get rid of the inbound, so a missing node row, an offline
-		// node, or a remote-side "already gone" should NEVER block the
-		// central DB cleanup. Worst case the remote keeps an orphan that
-		// the user can clean up manually — far less painful than the row
-		// being stuck on central.
 		rt, rterr := s.runtimeFor(&ib)
 		if rterr != nil {
 			logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr)
@@ -531,11 +497,6 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
 	}
 	inbound.Enable = enable
 
-	// Sync xray runtime via the Runtime adapter. For local inbounds we
-	// also rebuild the runtime config (drops clients flagged as disabled
-	// in ClientTraffic) so the live xray sees the same filtered view it
-	// did pre-refactor. Remote runtimes ship the unfiltered inbound —
-	// the remote panel does its own filtering before pushing to its xray.
 	needRestart := false
 	rt, rterr := s.runtimeFor(inbound)
 	if rterr != nil {
@@ -573,9 +534,6 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
 	return needRestart, nil
 }
 
-// UpdateInbound modifies an existing inbound configuration.
-// It validates changes, updates the database, and syncs with the running Xray instance.
-// Returns the updated inbound, whether Xray needs restart, and any error.
 func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
 	exist, err := s.checkPortConflict(inbound, inbound.Id)
 	if err != nil {
@@ -667,8 +625,6 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
 		}
 	}
 
-	oldInbound.Up = inbound.Up
-	oldInbound.Down = inbound.Down
 	oldInbound.Total = inbound.Total
 	oldInbound.Remark = inbound.Remark
 	oldInbound.Enable = inbound.Enable
@@ -696,13 +652,9 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
 		}
 		needRestart = true
 	} else {
-		// Use a snapshot of the OLD tag so the remote can resolve its
-		// remote-id even when the new tag has changed (port/listen edit).
 		oldSnapshot := *oldInbound
 		oldSnapshot.Tag = tag
 		if oldInbound.NodeID == nil {
-			// Local: keep the old del-then-add-filtered behaviour to
-			// preserve runtime client filtering.
 			if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
 				logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
 			}
@@ -719,10 +671,6 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
 			}
 		}
 	} else {
-		// Remote: a single UpdateInbound call (the Remote adapter
-		// resolves remote-id by old tag, then POSTs /update/{id}).
-		// Assign to the outer `err` on failure so the deferred tx
-		// handler rolls back the central DB write.
 		if !inbound.Enable {
 			if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
 				err = err2
@@ -851,13 +799,15 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
 			return err
 		}
 	}
-	// Added clients — create their stats rows.
 	for i := range newClients {
 		email := newClients[i].Email
 		if email == "" {
 			continue
 		}
 		if _, existed := oldEmails[email]; existed {
+			if err := s.UpdateClientStat(tx, email, &newClients[i]); err != nil {
+				return err
+			}
 			continue
 		}
 		if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
@@ -964,9 +914,6 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
 		}
 		needRestart = true
 	} else if oldInbound.NodeID == nil {
-		// Local: per-client AddUser keeps existing connections alive
-		// (incremental hot-add). Walk every new client; on any failure
-		// fall back to needRestart so cron rebuilds from scratch.
 		for _, client := range clients {
 			if len(client.Email) == 0 {
 				needRestart = true
@@ -997,11 +944,6 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
 			}
 		}
 	} else {
-		// Remote: a single UpdateInbound ships the new clients in one
-		// HTTP round-trip rather than N. Settings are already mutated
-		// in-memory (oldInbound.Settings) so the remote sees the final
-		// state. Per-client ClientStat rows still need the central DB
-		// update so the loop runs that branch first.
 		for _, client := range clients {
 			if len(client.Email) > 0 {
 				s.AddClientStat(tx, data.Id, &client)
@@ -1318,8 +1260,6 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool,
 			needRestart = true
 		}
 	} else {
-		// Remote: settings already mutated above; one UpdateInbound
-		// ships the post-deletion state to the node.
 		if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
 			return false, err1
 		}
@@ -1530,8 +1470,6 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
 		}
 		needRestart = true
 	} else if oldInbound.NodeID == nil {
-		// Local: paired Remove+Add on the live xray, keeping other
-		// clients online (full-restart fallback on partial failure).
 		if oldClients[clientIndex].Enable {
 			err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail)
 			if err1 == nil {
@@ -1565,7 +1503,6 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
 			}
 		}
 	} else {
-		// Remote: settings already mutated; one UpdateInbound suffices.
 		if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
 			err = err1
 			return false, err
@@ -1578,43 +1515,69 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
 	return needRestart, tx.Save(oldInbound).Error
 }
 
-// resetGracePeriodMs is the window after a reset during which incoming
-// traffic snapshots from the node are ignored if they would resurrect
-// non-zero counters. Three sync ticks (10s each) is enough headroom for
-// the central → node reset HTTP call to land before the next pull.
 const resetGracePeriodMs int64 = 30000
 
-// SetRemoteTraffic merges absolute counters from a remote node into the
-// central DB. Unlike AddTraffic, which adds deltas pulled from the local
-// xray gRPC stats endpoint, this SETs the values — the node already has
-// the canonical absolute value and we just mirror it.
-//
-// Rows in the post-reset grace window are skipped if the snapshot would
-// regress them, so a user-initiated reset survives until the propagation
-// HTTP call has completed on the node. After the grace window expires
-// the snapshot wins regardless (the node is authoritative for the
-// inbounds it hosts).
-func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) error {
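+// SetRemoteTraffic mirrors a node's absolute traffic snapshot into the
+// central DB. The write runs through submitTrafficWrite, and the returned
+// bool reports whether the snapshot changed anything structural (inbounds
+// or clients created, deleted, or reconfigured) rather than just counters.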
+func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
+	var structuralChange bool
+	err := submitTrafficWrite(func() error {
+		var inner error
+		structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap)
+		return inner
+	})
+	return structuralChange, err
+}
+
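+// setRemoteTrafficLocked performs the actual merge: it adopts inbounds the
+// panel doesn't know about yet, mirrors configuration and counters for the
+// ones it does, prunes rows the node no longer reports, and reconciles
+// per-client stats while honouring the post-reset grace window.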
+func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
 	if snap == nil || nodeID <= 0 {
-		return nil
+		return false, nil
 	}
 	db := database.GetDB()
 	now := time.Now().UnixMilli()
 
-	// Load central inbounds for this node so we can resolve tag→id and
-	// honour the per-inbound grace window. One query covers every row
-	// touched in this tick.
 	var central []model.Inbound
 	if err := db.Model(model.Inbound{}).
 		Where("node_id = ?", nodeID).
		Find(&central).Error; err != nil {
-		return err
+		return false, err
 	}
 	tagToCentral := make(map[string]*model.Inbound, len(central))
 	for i := range central {
 		tagToCentral[central[i].Tag] = &central[i]
 	}
 
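+	// Preload this node's client_traffics rows once so the per-client merge
+	// below can tell "create" apart from "update" without a query per row.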
+	var centralClientStats []xray.ClientTraffic
+	if len(central) > 0 {
+		ids := make([]int, 0, len(central))
+		for i := range central {
+			ids = append(ids, central[i].Id)
+		}
+		if err := db.Model(xray.ClientTraffic{}).
+			Where("inbound_id IN ?", ids).
+			Find(&centralClientStats).Error; err != nil {
+			return false, err
+		}
+	}
+	type csKey struct {
+		inboundID int
+		email     string
+	}
+	centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
+	for i := range centralClientStats {
+		centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
+	}
+
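+	// Owner for inbound rows adopted from the snapshot: reuse the owner of
+	// the node's existing rows, otherwise fall back to the first panel user.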
+	var defaultUserId int
+	if len(central) > 0 {
+		defaultUserId = central[0].UserId
+	} else {
+		var u model.User
+		if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
+			defaultUserId = u.Id
+		} else {
+			defaultUserId = 1
+		}
+	}
+
 	tx := db.Begin()
 	committed := false
 	defer func() {
@@ -1623,42 +1586,101 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps
 		}
 	}()
 
-	// Per-inbound counter merge. Skip rows whose central allTime is
-	// suspiciously lower than the snapshot AND we're inside the grace
-	// window — that's the "reset hit central but not the node yet"
-	// pattern we want to defer until next tick.
+	structuralChange := false
+
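+	// Record every tag the node still reports so inbounds missing from the
+	// snapshot can be pruned after this loop.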
+	snapTags := make(map[string]struct{}, len(snap.Inbounds))
 	for _, snapIb := range snap.Inbounds {
 		if snapIb == nil {
 			continue
 		}
+		snapTags[snapIb.Tag] = struct{}{}
+
 		c, ok := tagToCentral[snapIb.Tag]
 		if !ok {
-			continue // node has an inbound the central doesn't know about — ignore
-		}
-		snapAllTime := snapIb.AllTime
-		if snapAllTime == 0 {
-			snapAllTime = snapIb.Up + snapIb.Down
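+			// The node reports an inbound the panel has no row for: adopt it
+			// by creating a central row owned by defaultUserId.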
+			newIb := model.Inbound{
+				UserId:         defaultUserId,
+				NodeID:         &nodeID,
+				Tag:            snapIb.Tag,
+				Listen:         snapIb.Listen,
+				Port:           snapIb.Port,
+				Protocol:       snapIb.Protocol,
+				Settings:       snapIb.Settings,
+				StreamSettings: snapIb.StreamSettings,
+				Sniffing:       snapIb.Sniffing,
+				TrafficReset:   snapIb.TrafficReset,
+				Enable:         snapIb.Enable,
+				Remark:         snapIb.Remark,
+				Total:          snapIb.Total,
+				ExpiryTime:     snapIb.ExpiryTime,
+				Up:             snapIb.Up,
+				Down:           snapIb.Down,
+				AllTime:        snapIb.AllTime,
+			}
+			if err := tx.Create(&newIb).Error; err != nil {
+				logger.Warning("setRemoteTraffic: create central inbound for tag", snapIb.Tag, "failed:", err)
+				continue
+			}
+			tagToCentral[snapIb.Tag] = &newIb
+			structuralChange = true
+			continue
 		}
+
 		inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
-		if inGrace && snapAllTime > c.AllTime {
-			logger.Debug("SetRemoteTraffic: skipping inbound", c.Id, "in reset grace window")
-			continue
+
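+		// Mirror the node's configuration unconditionally; counters only move
+		// when outside the reset grace window (or when they would not regress),
+		// and all_time stays monotonic.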
+		updates := map[string]any{
+			"enable":          snapIb.Enable,
+			"remark":          snapIb.Remark,
+			"listen":          snapIb.Listen,
+			"port":            snapIb.Port,
+			"protocol":        snapIb.Protocol,
+			"total":           snapIb.Total,
+			"expiry_time":     snapIb.ExpiryTime,
+			"settings":        snapIb.Settings,
+			"stream_settings": snapIb.StreamSettings,
+			"sniffing":        snapIb.Sniffing,
+			"traffic_reset":   snapIb.TrafficReset,
+		}
+		if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
+			updates["up"] = snapIb.Up
+			updates["down"] = snapIb.Down
+		}
+		if snapIb.AllTime > c.AllTime {
+			updates["all_time"] = snapIb.AllTime
+		}
+
+		if c.Settings != snapIb.Settings ||
+			c.Remark != snapIb.Remark ||
+			c.Listen != snapIb.Listen ||
+			c.Port != snapIb.Port ||
+			c.Total != snapIb.Total ||
+			c.ExpiryTime != snapIb.ExpiryTime ||
+			c.Enable != snapIb.Enable {
+			structuralChange = true
 		}
+
 		if err := tx.Model(model.Inbound{}).
 			Where("id = ?", c.Id).
-			Updates(map[string]any{
-				"up":       snapIb.Up,
-				"down":     snapIb.Down,
-				"all_time": snapAllTime,
-			}).Error; err != nil {
-			return err
+			Updates(updates).Error; err != nil {
+			return false, err
 		}
 	}
 
-	// Per-client merge. The snapshot's ClientStats are nested under
-	// each Inbound, so flatten before walking. Each client_traffics row
-	// is keyed by (inbound_id, email) — we resolve inbound_id from the
-	// central inbound row matched above.
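+	// Drop central rows for inbounds this node no longer reports, together
+	// with their client stats.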
+	for _, c := range central {
+		if _, kept := snapTags[c.Tag]; kept {
+			continue
+		}
+		if err := tx.Where("inbound_id = ?", c.Id).
+			Delete(&xray.ClientTraffic{}).Error; err != nil {
+			return false, err
+		}
+		if err := tx.Where("id = ?", c.Id).
+			Delete(&model.Inbound{}).Error; err != nil {
+			return false, err
+		}
+		delete(tagToCentral, c.Tag)
+		structuralChange = true
+	}
+
 	for _, snapIb := range snap.Inbounds {
 		if snapIb == nil {
 			continue
@@ -1667,52 +1689,105 @@ func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnaps
 		if !ok {
 			continue
 		}
-		// Honour the same grace window for client rows: if the parent
-		// inbound was just reset, leave its clients alone too.
 		inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
+
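+		// Track which emails the node still reports for this inbound so stale
+		// client rows can be deleted after the merge below.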
+		snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
 		for _, cs := range snapIb.ClientStats {
-			snapAllTime := cs.AllTime
-			if snapAllTime == 0 {
-				snapAllTime = cs.Up + cs.Down
-			}
-			if inGrace {
-				// Skip client rows whose snapshot would push counters
-				// back up; allow rows that are zero on the node side
-				// (those are normal — node was reset alongside central).
-				if snapAllTime > 0 {
-					continue
+			snapEmails[cs.Email] = struct{}{}
+
+			existing := centralCS[csKey{c.Id, cs.Email}]
+			if existing == nil {
+				if err := tx.Create(&xray.ClientTraffic{
+					InboundId:  c.Id,
+					Email:      cs.Email,
+					Enable:     cs.Enable,
+					Total:      cs.Total,
+					ExpiryTime: cs.ExpiryTime,
+					Reset:      cs.Reset,
+					Up:         cs.Up,
+					Down:       cs.Down,
+					AllTime:    cs.AllTime,
+					LastOnline: cs.LastOnline,
+				}).Error; err != nil {
+					return false, err
+				}
+				structuralChange = true
+				continue
+			}
+
+			if existing.Enable != cs.Enable ||
+				existing.Total != cs.Total ||
+				existing.ExpiryTime != cs.ExpiryTime ||
+				existing.Reset != cs.Reset {
+				structuralChange = true
+			}
+
+			allTime := existing.AllTime
+			if cs.AllTime > allTime {
+				allTime = cs.AllTime
+			}
+
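+			// Inside the grace window only metadata and all_time are mirrored,
+			// so freshly reset up/down counters are not resurrected by the node.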
+			if inGrace && cs.Up+cs.Down > 0 {
+				if err := tx.Exec(
+					`UPDATE client_traffics
+					 SET enable = ?, total = ?, expiry_time = ?, reset = ?, all_time = ?
+					 WHERE inbound_id = ? AND email = ?`,
+					cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime, c.Id, cs.Email,
+				).Error; err != nil {
+					return false, err
 				}
+				continue
 			}
-			// MAX(last_online, ?) so a momentary clock skew on the node
-			// can't regress the central row's last-seen timestamp.
+
 			if err := tx.Exec(
 				`UPDATE client_traffics
-				 SET up = ?, down = ?, all_time = ?, last_online = MAX(last_online, ?)
+				 SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
+				     all_time = ?, last_online = MAX(last_online, ?)
 				 WHERE inbound_id = ? AND email = ?`,
-				cs.Up, cs.Down, snapAllTime, cs.LastOnline, c.Id, cs.Email,
+				cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime,
+				cs.LastOnline, c.Id, cs.Email,
 			).Error; err != nil {
-				return err
+				return false, err
+			}
+		}
+
+		for k, existing := range centralCS {
+			if k.inboundID != c.Id {
+				continue
+			}
+			if _, kept := snapEmails[k.email]; kept {
+				continue
 			}
+			if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
+				Delete(&xray.ClientTraffic{}).Error; err != nil {
+				return false, err
+			}
+			structuralChange = true
 		}
 	}
 
 	if err := tx.Commit().Error; err != nil {
-		return err
+		return false, err
 	}
 	committed = true
 
-	// Push the node's online-clients contribution into xray.Process so
-	// GetOnlineClients returns the union of local + every node. Empty
-	// list still calls Set so a node that just had everyone disconnect
-	// updates promptly.
 	if p != nil {
 		p.SetNodeOnlineClients(nodeID, snap.OnlineEmails)
 	}
 
-	return nil
+	return structuralChange, nil
+}
+
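+// AddTraffic keeps its public signature but now routes the write through
+// submitTrafficWrite (the same gate SetRemoteTraffic uses), with the
+// original body moved to addTrafficLocked.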
+func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
+	err = submitTrafficWrite(func() error {
+		var inner error
+		needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
+		return inner
+	})
+	return
 }
 
-func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
+func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
 	var err error
 	db := database.GetDB()
 	tx := db.Begin()
@@ -1767,7 +1842,7 @@ func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic
 
 	for _, traffic := range traffics {
 		if traffic.IsInbound {
-			err = tx.Model(&model.Inbound{}).Where("tag = ?", traffic.Tag).
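+			// Scope local gRPC deltas to locally hosted inbounds; rows with a
+			// node_id are maintained by SetRemoteTraffic's snapshot merge.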
+			err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
 				Updates(map[string]any{
 					"up":   gorm.Expr("up + ?", traffic.Up),
 					"down": gorm.Expr("down + ?", traffic.Down),
@@ -1797,7 +1872,10 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
 		emails = append(emails, traffic.Email)
 	}
 	dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
-	err = tx.Model(xray.ClientTraffic{}).Where("email IN (?)", emails).Find(&dbClientTraffics).Error
+	err = tx.Model(xray.ClientTraffic{}).
+		Where("email IN (?) AND inbound_id IN (?)", emails,
+			tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
+		Find(&dbClientTraffics).Error
 	if err != nil {
 		return err
 	}
@@ -1911,7 +1989,10 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
 	now := time.Now().Unix() * 1000
 	var err, err1 error
 
-	err = tx.Model(xray.ClientTraffic{}).Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).Find(&traffics).Error
+	err = tx.Model(xray.ClientTraffic{}).
+		Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
+		Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
+		Find(&traffics).Error
 	if err != nil {
 		return false, 0, err
 	}
@@ -2017,7 +2098,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error
 	var tags []string
 	err := tx.Table("inbounds").
 		Select("inbounds.tag").
-		Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
+		Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
 		Scan(&tags).Error
 	if err != nil {
 		return false, 0, err
@@ -2036,7 +2117,7 @@ func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error
 	}
 
 	result := tx.Model(model.Inbound{}).
-		Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
+		Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
 		Update("enable", false)
 	err := result.Error
 	count := result.RowsAffected
@@ -2050,6 +2131,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
 	var depletedRows []xray.ClientTraffic
 	err := tx.Model(xray.ClientTraffic{}).
 		Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
+		Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
 		Find(&depletedRows).Error
 	if err != nil {
 		return false, 0, err
@@ -2152,6 +2234,7 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
 
 	result := tx.Model(xray.ClientTraffic{}).
 		Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
+		Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
 		Update("enable", false)
 	err = result.Error
 	count := result.RowsAffected
@@ -2163,8 +2246,6 @@ func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error)
 		return needRestart, count, nil
 	}
 
-	// Mirror enable=false + the row's authoritative quota/expiry into every
-	// (inbound, email) we just removed via the API.
 	inboundEmailMap := make(map[int]map[string]struct{})
 	for _, t := range targets {
 		if inboundEmailMap[t.InboundId] == nil {
@@ -2744,22 +2825,24 @@ func (s *InboundService) ResetClientTrafficLimitByEmail(clientEmail string, tota
 }
 
 func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
-	db := database.GetDB()
-
-	// Reset traffic stats in ClientTraffic table
-	result := db.Model(xray.ClientTraffic{}).
-		Where("email = ?", clientEmail).
-		Updates(map[string]any{"enable": true, "up": 0, "down": 0})
-
-	err := result.Error
-	if err != nil {
-		return err
-	}
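+	// Same update as before, now funnelled through submitTrafficWrite like
+	// the other traffic writes in this file.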
+	return submitTrafficWrite(func() error {
+		db := database.GetDB()
+		return db.Model(xray.ClientTraffic{}).
+			Where("email = ?", clientEmail).
+			Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
+	})
+}
-	return nil
+func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
+	err = submitTrafficWrite(func() error {
+		var inner error
+		needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
+		return inner
+	})
+	return
 }
 
-func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, error) {
+func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
 	needRestart := false
 
 	traffic, err := s.GetClientTrafficByEmail(clientEmail)
@@ -2825,18 +2908,11 @@ func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, e
 		return false, err
 	}
 
-	// Stamp last_traffic_reset_time on the parent inbound so the next
-	// NodeTrafficSyncJob tick honours the grace window and doesn't pull
-	// the pre-reset absolute back from the node.
 	now := time.Now().UnixMilli()
 	_ = db.Model(model.Inbound{}).
 		Where("id = ?", id).
 		Update("last_traffic_reset_time", now).Error
 
-	// Propagate to the remote node if this inbound is node-managed.
-	// Best-effort: an offline node shouldn't block a user-driven reset
-	// — the central DB is already zeroed and the next successful sync
-	// (within the grace window) will re-pull whatever the node has.
 	inbound, err := s.GetInbound(id)
 	if err == nil && inbound != nil && inbound.NodeID != nil {
 		if rt, rterr := s.runtimeFor(inbound); rterr == nil {
@@ -2852,6 +2928,12 @@ func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, e
 }
 
 func (s *InboundService) ResetAllClientTraffics(id int) error {
+	return submitTrafficWrite(func() error {
+		return s.resetAllClientTrafficsLocked(id)
+	})
+}
+
+func (s *InboundService) resetAllClientTrafficsLocked(id int) error {
 	db := database.GetDB()
 	now := time.Now().Unix() * 1000
 
@@ -2889,19 +2971,12 @@ func (s *InboundService) ResetAllClientTraffics(id int) error {
 		return err
 	}
 
-	// Propagate to remote nodes after the central DB is settled. Single
-	// inbound: one rt.ResetInboundClientTraffics call. id == -1 (all
-	// inbounds across panel): walk every node-managed inbound and call
-	// the per-inbound endpoint — there's no panel-wide endpoint that
-	// only resets clients without zeroing inbound counters.
 	var inbounds []model.Inbound
 	q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL")
 	if id != -1 {
 		q = q.Where("id = ?", id)
 	}
 	if err := q.Find(&inbounds).Error; err != nil {
-		// Failed to discover which inbounds to propagate to — central
-		// DB is already correct, log and move on.
 		logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err)
 		return nil
 	}
@@ -2920,6 +2995,12 @@ func (s *InboundService) ResetAllClientTraffics(id int) error {
 }
 
 func (s *InboundService) ResetAllTraffics() error {
+	return submitTrafficWrite(func() error {
+		return s.resetAllTrafficsLocked()
+	})
+}
+
+func (s *InboundService) resetAllTrafficsLocked() error {
 	db := database.GetDB()
 	now := time.Now().UnixMilli()
 
@@ -2933,10 +3014,6 @@ func (s *InboundService) ResetAllTraffics() error {
 		return err
 	}
 
-	// Propagate to every node that has at least one inbound on this
-	// panel. We can't blanket-call rt.ResetAllTraffics because that
-	// would also zero traffic for inbounds the node hosts but the
-	// central panel doesn't know about — instead reset per inbound.
 	var inbounds []model.Inbound
 	if err := db.Model(model.Inbound{}).
 		Where("node_id IS NOT NULL").
@@ -2959,13 +3036,12 @@ func (s *InboundService) ResetAllTraffics() error {
 }
 
 func (s *InboundService) ResetInboundTraffic(id int) error {
-	db := database.GetDB()
-
-	result := db.Model(model.Inbound{}).
-		Where("id = ?", id).
-		Updates(map[string]any{"up": 0, "down": 0})
-
-	return result.Error
+	return submitTrafficWrite(func() error {
+		db := database.GetDB()
+		return db.Model(model.Inbound{}).
+			Where("id = ?", id).
+			Updates(map[string]any{"up": 0, "down": 0}).Error
+	})
 }
 
 func (s *InboundService) DelDepletedClients(id int) (err error) {
|
@@ -3229,11 +3305,6 @@ func chunkInts(s []int, size int) [][]int {
|
|
|
return out
|
|
return out
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// GetActiveClientTraffics returns the absolute ClientTraffic rows for the given
|
|
|
|
|
-// emails. Used by the WebSocket delta path to push per-client absolute
|
|
|
|
|
-// counters without re-serializing the full inbound list. The query is chunked
|
|
|
|
|
-// to stay under SQLite's bind-variable limit on very large active sets.
|
|
|
|
|
-// Empty input returns (nil, nil).
|
|
|
|
|
func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
|
|
func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
|
|
|
uniq := uniqueNonEmptyStrings(emails)
|
|
uniq := uniqueNonEmptyStrings(emails)
|
|
|
if len(uniq) == 0 {
|
|
if len(uniq) == 0 {
|
|
@@ -3251,9 +3322,6 @@ func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.Clien
|
|
|
return traffics, nil
|
|
return traffics, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// InboundTrafficSummary is the minimal projection of an inbound's traffic
|
|
|
|
|
-// counters used by the WebSocket delta path. Excludes Settings/StreamSettings
|
|
|
|
|
-// blobs so the broadcast stays compact even with many inbounds.
|
|
|
|
|
type InboundTrafficSummary struct {
|
|
type InboundTrafficSummary struct {
|
|
|
Id int `json:"id"`
|
|
Id int `json:"id"`
|
|
|
Up int64 `json:"up"`
|
|
Up int64 `json:"up"`
|
|
@@ -3263,9 +3331,6 @@ type InboundTrafficSummary struct {
|
|
|
Enable bool `json:"enable"`
|
|
Enable bool `json:"enable"`
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// GetInboundsTrafficSummary returns inbound-level absolute traffic counters
|
|
|
|
|
-// (no per-client expansion). Companion to GetActiveClientTraffics — together
|
|
|
|
|
-// they replace the heavy "full inbound list" broadcast on each cron tick.
|
|
|
|
|
func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
|
|
func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
|
|
|
db := database.GetDB()
|
|
db := database.GetDB()
|
|
|
var summaries []InboundTrafficSummary
|
|
var summaries []InboundTrafficSummary
|
|
@@ -3293,26 +3358,20 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
|
|
func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
|
|
|
- db := database.GetDB()
|
|
|
|
|
-
|
|
|
|
|
- // Keep all_time monotonic: it represents historical cumulative usage and
|
|
|
|
|
- // must never be less than the currently-tracked up+down. Without this,
|
|
|
|
|
- // the UI showed "Общий трафик" (allTime) below the live consumed value
|
|
|
|
|
- // after admins manually edited a client's counters.
|
|
|
|
|
- result := db.Model(xray.ClientTraffic{}).
|
|
|
|
|
- Where("email = ?", email).
|
|
|
|
|
- Updates(map[string]any{
|
|
|
|
|
- "up": upload,
|
|
|
|
|
- "down": download,
|
|
|
|
|
- "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- err := result.Error
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
|
|
|
|
|
|
|
+ return submitTrafficWrite(func() error {
|
|
|
|
|
+ db := database.GetDB()
|
|
|
|
|
+ err := db.Model(xray.ClientTraffic{}).
|
|
|
|
|
+ Where("email = ?", email).
|
|
|
|
|
+ Updates(map[string]any{
|
|
|
|
|
+ "up": upload,
|
|
|
|
|
+ "down": download,
|
|
|
|
|
+ "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
|
|
|
|
|
+ }).Error
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
|
|
|
|
|
+ }
|
|
|
return err
|
|
return err
|
|
|
- }
|
|
|
|
|
- return nil
|
|
|
|
|
|
|
+ })
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *InboundService) GetClientTrafficByID(id string) ([]xray.ClientTraffic, error) {
|
|
func (s *InboundService) GetClientTrafficByID(id string) ([]xray.ClientTraffic, error) {
|
|
@@ -3642,18 +3701,12 @@ func (s *InboundService) GetOnlineClients() []string {
 	return p.GetOnlineClients()
 }
 
-// SetNodeOnlineClients records a remote node's online-clients list on
-// the panel-wide xray.Process so GetOnlineClients returns the union of
-// local + every node's contribution. Called by NodeTrafficSyncJob.
 func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
 	if p != nil {
 		p.SetNodeOnlineClients(nodeID, emails)
 	}
 }
 
-// ClearNodeOnlineClients drops one node's contribution to the online
-// set. Used when the per-node sync probe fails so a downed node
-// doesn't keep its clients listed as online forever.
 func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
 	if p != nil {
 		p.ClearNodeOnlineClients(nodeID)