| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- package service
- import (
- "encoding/json"
- "sort"
- "time"
- "github.com/mhsanaei/3x-ui/v3/internal/database"
- "github.com/mhsanaei/3x-ui/v3/internal/database/model"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
- )
- func (s *InboundService) GetAllInboundClientIps() ([]model.InboundClientIps, error) {
- db := database.GetDB()
- var ips []model.InboundClientIps
- err := db.Model(&model.InboundClientIps{}).Find(&ips).Error
- return ips, err
- }
- // clientIpStaleAfterSeconds mirrors job.ipStaleAfterSeconds: client IPs older than
- // 30 minutes are evicted. Applying the same cutoff inside the cross-node merge keeps
- // the synced blob bounded and stops the master's push-back from resurrecting IPs that
- // a node has already pruned (otherwise the merge defeats the eviction cluster-wide).
- const clientIpStaleAfterSeconds = int64(30 * 60)
- // clientIpEntry is the on-disk shape of each element of InboundClientIps.Ips. Tags
- // match job.IPWithTimestamp so the blob round-trips with the access.log scanner.
- type clientIpEntry struct {
- IP string `json:"ip"`
- Timestamp int64 `json:"timestamp"`
- }
- // mergeClientIpEntries unions old and incoming IP observations, dropping anything
- // older than cutoff, keeping the most recent timestamp per IP, and returning the
- // result sorted newest-first.
- func mergeClientIpEntries(old, incoming []clientIpEntry, cutoff int64) []clientIpEntry {
- ipMap := make(map[string]int64, len(old)+len(incoming))
- for _, e := range old {
- if e.Timestamp < cutoff {
- continue
- }
- ipMap[e.IP] = e.Timestamp
- }
- for _, e := range incoming {
- if e.Timestamp < cutoff {
- continue
- }
- if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
- ipMap[e.IP] = e.Timestamp
- }
- }
- out := make([]clientIpEntry, 0, len(ipMap))
- for ip, ts := range ipMap {
- out = append(out, clientIpEntry{IP: ip, Timestamp: ts})
- }
- sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
- return out
- }
- // MergeInboundClientIps folds client IPs synced from another node into the local
- // inbound_client_ips table without double-counting an IP seen on multiple nodes and
- // without resurrecting stale entries. Existing rows are updated in place; brand-new
- // clients (typically node-only clients with no local row) are created with a fresh
- // local id.
- func (s *InboundService) MergeInboundClientIps(incomingIps []model.InboundClientIps) error {
- db := database.GetDB()
- var currentIps []model.InboundClientIps
- if err := db.Model(&model.InboundClientIps{}).Find(¤tIps).Error; err != nil {
- return err
- }
- currentMap := make(map[string]*model.InboundClientIps, len(currentIps))
- for i := range currentIps {
- currentMap[currentIps[i].ClientEmail] = ¤tIps[i]
- }
- now := time.Now().Unix()
- cutoff := now - clientIpStaleAfterSeconds
- tx := db.Begin()
- defer func() {
- if r := recover(); r != nil {
- tx.Rollback()
- }
- }()
- for _, incoming := range incomingIps {
- if incoming.ClientEmail == "" || incoming.Ips == "" {
- continue
- }
- var incomingEntries []clientIpEntry
- _ = json.Unmarshal([]byte(incoming.Ips), &incomingEntries)
- current, exists := currentMap[incoming.ClientEmail]
- if !exists {
- // New client we've never seen locally. Drop stale entries up front and
- // skip the row entirely if nothing is fresh, so we don't persist a row
- // that is dead on arrival.
- fresh := mergeClientIpEntries(nil, incomingEntries, cutoff)
- if len(fresh) == 0 {
- continue
- }
- b, _ := json.Marshal(fresh)
- incoming.Ips = string(b)
- // Never carry the remote node's primary key into the local table: id
- // spaces are independent across nodes and the remote id would collide
- // with an unrelated local row. OnConflict guards the race where
- // check_client_ip_job creates the same brand-new email between the
- // snapshot above and this insert.
- incoming.Id = 0
- if err := tx.Clauses(clause.OnConflict{
- Columns: []clause.Column{{Name: "client_email"}},
- DoNothing: true,
- }).Create(&incoming).Error; err != nil {
- tx.Rollback()
- return err
- }
- continue
- }
- var oldEntries []clientIpEntry
- if current.Ips != "" {
- _ = json.Unmarshal([]byte(current.Ips), &oldEntries)
- }
- merged := mergeClientIpEntries(oldEntries, incomingEntries, cutoff)
- b, _ := json.Marshal(merged)
- mergedStr := string(b)
- // A concurrent check_client_ip_job db.Save on the same row can interleave
- // with this update (benign last-writer-wins; any dropped IP reappears on the
- // next scan/sync), so only write when the blob actually changed.
- if current.Ips != mergedStr {
- if err := tx.Model(&model.InboundClientIps{}).Where("id = ?", current.Id).Update("ips", mergedStr).Error; err != nil {
- tx.Rollback()
- return err
- }
- }
- }
- return tx.Commit().Error
- }
- func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail string) error {
- return tx.Model(model.InboundClientIps{}).Where("client_email = ?", oldEmail).Update("client_email", newEmail).Error
- }
- func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {
- return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error
- }
- func (s *InboundService) delClientIPsByEmails(tx *gorm.DB, emails []string) error {
- const chunk = 400
- for start := 0; start < len(emails); start += chunk {
- end := min(start+chunk, len(emails))
- if err := tx.Where("client_email IN ?", emails[start:end]).Delete(model.InboundClientIps{}).Error; err != nil {
- return err
- }
- }
- return nil
- }
- func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error) {
- db := database.GetDB()
- InboundClientIps := &model.InboundClientIps{}
- err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
- if err != nil {
- return "", err
- }
- if InboundClientIps.Ips == "" {
- return "", nil
- }
- // Try to parse as new format (with timestamps)
- type IPWithTimestamp struct {
- IP string `json:"ip"`
- Timestamp int64 `json:"timestamp"`
- }
- var ipsWithTime []IPWithTimestamp
- err = json.Unmarshal([]byte(InboundClientIps.Ips), &ipsWithTime)
- // If successfully parsed as new format, return with timestamps
- if err == nil && len(ipsWithTime) > 0 {
- return InboundClientIps.Ips, nil
- }
- // Otherwise, assume it's old format (simple string array)
- // Try to parse as simple array and convert to new format
- var oldIps []string
- err = json.Unmarshal([]byte(InboundClientIps.Ips), &oldIps)
- if err == nil && len(oldIps) > 0 {
- // Convert old format to new format with current timestamp
- newIpsWithTime := make([]IPWithTimestamp, len(oldIps))
- for i, ip := range oldIps {
- newIpsWithTime[i] = IPWithTimestamp{
- IP: ip,
- Timestamp: time.Now().Unix(),
- }
- }
- result, _ := json.Marshal(newIpsWithTime)
- return string(result), nil
- }
- // Return as-is if parsing fails
- return InboundClientIps.Ips, nil
- }
- func (s *InboundService) ClearClientIps(clientEmail string) error {
- db := database.GetDB()
- result := db.Model(model.InboundClientIps{}).
- Where("client_email = ?", clientEmail).
- Update("ips", "")
- err := result.Error
- if err != nil {
- return err
- }
- return nil
- }
|