|
|
@@ -1877,71 +1877,101 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
|
|
|
}
|
|
|
|
|
|
func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
|
|
|
- inboundIds := make([]int, 0, len(dbClientTraffics))
|
|
|
- for _, dbClientTraffic := range dbClientTraffics {
|
|
|
- if dbClientTraffic.ExpiryTime < 0 {
|
|
|
- inboundIds = append(inboundIds, dbClientTraffic.InboundId)
|
|
|
+ now := time.Now().UnixMilli()
|
|
|
+
|
|
|
+ // "Start After First Use" stores a negative expiry (the duration). On the
|
|
|
+ // first traffic tick it becomes an absolute deadline of now+duration. Compute
|
|
|
+ // it once per email so every inbound the client is attached to lands on the
|
|
|
+ // same value (recomputing per inbound would skip all but the first one).
|
|
|
+ newExpiryByEmail := make(map[string]int64, len(dbClientTraffics))
|
|
|
+ for traffic_index := range dbClientTraffics {
|
|
|
+ if dbClientTraffics[traffic_index].ExpiryTime < 0 {
|
|
|
+ newExpiryByEmail[dbClientTraffics[traffic_index].Email] = now - dbClientTraffics[traffic_index].ExpiryTime
|
|
|
}
|
|
|
}
|
|
|
+ if len(newExpiryByEmail) == 0 {
|
|
|
+ return dbClientTraffics, nil
|
|
|
+ }
|
|
|
|
|
|
- if len(inboundIds) > 0 {
|
|
|
- var inbounds []*model.Inbound
|
|
|
- err := tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- for inbound_index := range inbounds {
|
|
|
- settings := map[string]any{}
|
|
|
- json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
|
|
|
- clients, ok := settings["clients"].([]any)
|
|
|
- if ok {
|
|
|
- var newClients []any
|
|
|
- for client_index := range clients {
|
|
|
- c := clients[client_index].(map[string]any)
|
|
|
- for traffic_index := range dbClientTraffics {
|
|
|
- if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email {
|
|
|
- oldExpiryTime := c["expiryTime"].(float64)
|
|
|
- newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime)
|
|
|
- c["expiryTime"] = newExpiryTime
|
|
|
- c["updated_at"] = time.Now().Unix() * 1000
|
|
|
- dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if _, ok := c["created_at"]; !ok {
|
|
|
- c["created_at"] = time.Now().Unix() * 1000
|
|
|
- }
|
|
|
- if _, ok := c["updated_at"]; !ok {
|
|
|
- c["updated_at"] = time.Now().Unix() * 1000
|
|
|
- }
|
|
|
- newClients = append(newClients, any(c))
|
|
|
- }
|
|
|
- settings["clients"] = newClients
|
|
|
- modifiedSettings, err := json.MarshalIndent(settings, "", " ")
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
+ delayedEmails := make([]string, 0, len(newExpiryByEmail))
|
|
|
+ for email := range newExpiryByEmail {
|
|
|
+ delayedEmails = append(delayedEmails, email)
|
|
|
+ }
|
|
|
|
|
|
- inbounds[inbound_index].Settings = string(modifiedSettings)
|
|
|
- }
|
|
|
- }
|
|
|
- err = tx.Save(inbounds).Error
|
|
|
- if err != nil {
|
|
|
- logger.Warning("AddClientTraffic update inbounds ", err)
|
|
|
- logger.Error(inbounds)
|
|
|
- } else {
|
|
|
- for _, ib := range inbounds {
|
|
|
- if ib == nil {
|
|
|
- continue
|
|
|
+ // Resolve the owning inbounds through the client_inbounds link, which is
|
|
|
+ // authoritative. client_traffics.inbound_id goes stale when an inbound is
|
|
|
+ // deleted and recreated, which would leave the negative expiry unconverted.
|
|
|
+ var inboundIds []int
|
|
|
+ err := tx.Table("client_inbounds").
|
|
|
+ Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
|
+ Where("clients.email IN (?)", delayedEmails).
|
|
|
+ Distinct().
|
|
|
+ Pluck("client_inbounds.inbound_id", &inboundIds).Error
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if len(inboundIds) == 0 {
|
|
|
+ return dbClientTraffics, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ var inbounds []*model.Inbound
|
|
|
+ err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ for inbound_index := range inbounds {
|
|
|
+ settings := map[string]any{}
|
|
|
+ json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
|
|
|
+ clients, ok := settings["clients"].([]any)
|
|
|
+ if ok {
|
|
|
+ var newClients []any
|
|
|
+ for client_index := range clients {
|
|
|
+ c := clients[client_index].(map[string]any)
|
|
|
+ email, _ := c["email"].(string)
|
|
|
+ if newExpiry, ok := newExpiryByEmail[email]; ok {
|
|
|
+ c["expiryTime"] = newExpiry
|
|
|
+ c["updated_at"] = now
|
|
|
}
|
|
|
- cs, gcErr := s.GetClients(ib)
|
|
|
- if gcErr != nil {
|
|
|
- logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
|
|
|
- continue
|
|
|
+ if _, ok := c["created_at"]; !ok {
|
|
|
+ c["created_at"] = now
|
|
|
}
|
|
|
- if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
|
|
|
- logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
|
|
|
+ if _, ok := c["updated_at"]; !ok {
|
|
|
+ c["updated_at"] = now
|
|
|
}
|
|
|
+ newClients = append(newClients, any(c))
|
|
|
+ }
|
|
|
+ settings["clients"] = newClients
|
|
|
+ modifiedSettings, err := json.MarshalIndent(settings, "", " ")
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ inbounds[inbound_index].Settings = string(modifiedSettings)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for traffic_index := range dbClientTraffics {
|
|
|
+ if newExpiry, ok := newExpiryByEmail[dbClientTraffics[traffic_index].Email]; ok {
|
|
|
+ dbClientTraffics[traffic_index].ExpiryTime = newExpiry
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ err = tx.Save(inbounds).Error
|
|
|
+ if err != nil {
|
|
|
+ logger.Warning("AddClientTraffic update inbounds ", err)
|
|
|
+ logger.Error(inbounds)
|
|
|
+ } else {
|
|
|
+ for _, ib := range inbounds {
|
|
|
+ if ib == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ cs, gcErr := s.GetClients(ib)
|
|
|
+ if gcErr != nil {
|
|
|
+ logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
|
|
|
+ logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -1976,8 +2006,23 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
|
|
|
client map[string]any
|
|
|
}
|
|
|
|
|
|
+ // Resolve the inbounds to renew through the client_inbounds link rather than
|
|
|
+ // client_traffics.inbound_id, which goes stale after an inbound is deleted and
|
|
|
+ // recreated and would otherwise skip the renew entirely.
|
|
|
+ renewEmails := make([]string, 0, len(traffics))
|
|
|
for _, traffic := range traffics {
|
|
|
- inbound_ids = append(inbound_ids, traffic.InboundId)
|
|
|
+ renewEmails = append(renewEmails, traffic.Email)
|
|
|
+ }
|
|
|
+ for _, batch := range chunkStrings(renewEmails, sqliteMaxVars) {
|
|
|
+ var ids []int
|
|
|
+ if err = tx.Table("client_inbounds").
|
|
|
+ Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
|
|
+ Where("clients.email IN ?", batch).
|
|
|
+ Distinct().
|
|
|
+ Pluck("client_inbounds.inbound_id", &ids).Error; err != nil {
|
|
|
+ return false, 0, err
|
|
|
+ }
|
|
|
+ inbound_ids = append(inbound_ids, ids...)
|
|
|
}
|
|
|
// Dedupe so an inbound hosting N expired clients is fetched and saved once
|
|
|
// per tick instead of N times across chunk boundaries.
|
|
|
@@ -2401,11 +2446,24 @@ func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xr
|
|
|
logger.Warningf("Error retrieving ClientTraffic with trafficId %d: %v", trafficId, err)
|
|
|
return nil, nil, err
|
|
|
}
|
|
|
- if len(traffics) > 0 {
|
|
|
- inbound, err = s.GetInbound(traffics[0].InboundId)
|
|
|
- return traffics[0], inbound, err
|
|
|
+ if len(traffics) == 0 {
|
|
|
+ return nil, nil, nil
|
|
|
+ }
|
|
|
+ traffic = traffics[0]
|
|
|
+
|
|
|
+ inbound, err = s.GetInbound(traffic.InboundId)
|
|
|
+ if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
+ // client_traffics.inbound_id goes stale when an inbound is deleted and
|
|
|
+ // recreated; fall back to the authoritative client_inbounds link by email.
|
|
|
+ ids, idErr := s.clientService.GetInboundIdsForEmail(db, traffic.Email)
|
|
|
+ if idErr != nil {
|
|
|
+ return traffic, nil, idErr
|
|
|
+ }
|
|
|
+ if len(ids) > 0 {
|
|
|
+ inbound, err = s.GetInbound(ids[0])
|
|
|
+ }
|
|
|
}
|
|
|
- return nil, nil, nil
|
|
|
+ return traffic, inbound, err
|
|
|
}
|
|
|
|
|
|
func (s *InboundService) GetClientInboundByEmail(email string) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
|
|
|
@@ -2416,11 +2474,26 @@ func (s *InboundService) GetClientInboundByEmail(email string) (traffic *xray.Cl
|
|
|
logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
|
|
|
return nil, nil, err
|
|
|
}
|
|
|
- if len(traffics) > 0 {
|
|
|
- inbound, err = s.GetInbound(traffics[0].InboundId)
|
|
|
- return traffics[0], inbound, err
|
|
|
+ if len(traffics) == 0 {
|
|
|
+ return nil, nil, nil
|
|
|
+ }
|
|
|
+ traffic = traffics[0]
|
|
|
+
|
|
|
+ inbound, err = s.GetInbound(traffic.InboundId)
|
|
|
+ if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
+ // client_traffics.inbound_id is a legacy single-inbound pointer that goes
|
|
|
+ // stale when an inbound is deleted and recreated: the email-keyed traffic
|
|
|
+ // row survives but still references the missing inbound. Fall back to the
|
|
|
+ // authoritative client_inbounds link so email lookups (reset, info, …) work.
|
|
|
+ ids, idErr := s.clientService.GetInboundIdsForEmail(db, email)
|
|
|
+ if idErr != nil {
|
|
|
+ return traffic, nil, idErr
|
|
|
+ }
|
|
|
+ if len(ids) > 0 {
|
|
|
+ inbound, err = s.GetInbound(ids[0])
|
|
|
+ }
|
|
|
}
|
|
|
- return nil, nil, nil
|
|
|
+ return traffic, inbound, err
|
|
|
}
|
|
|
|
|
|
func (s *InboundService) GetClientByEmail(clientEmail string) (*xray.ClientTraffic, *model.Client, error) {
|