| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/mhsanaei/3x-ui/v3/database"
- "github.com/mhsanaei/3x-ui/v3/database/model"
- "github.com/mhsanaei/3x-ui/v3/logger"
- "github.com/mhsanaei/3x-ui/v3/util/common"
- "github.com/mhsanaei/3x-ui/v3/util/link"
- )
- // OutboundSubscriptionService manages remote outbound subscriptions.
- type OutboundSubscriptionService struct {
- settingService SettingService
- }
- // NewOutboundSubscriptionService returns a service for managing outbound subscriptions.
- func NewOutboundSubscriptionService() *OutboundSubscriptionService {
- return &OutboundSubscriptionService{}
- }
- // List returns all subscriptions (newest first).
- func (s *OutboundSubscriptionService) List() ([]*model.OutboundSubscription, error) {
- db := database.GetDB()
- var subs []*model.OutboundSubscription
- if err := db.Model(&model.OutboundSubscription{}).Order("priority asc, id asc").Find(&subs).Error; err != nil {
- return nil, err
- }
- for _, sub := range subs {
- sub.OutboundCount = countOutbounds(sub.LastFetchedOutbounds)
- // Don't ship the heavy raw blobs to the list view.
- sub.LastFetchedOutbounds = ""
- sub.LinkIdentities = ""
- }
- return subs, nil
- }
- // countOutbounds returns the number of outbounds in a stored LastFetchedOutbounds
- // JSON array (0 for empty/invalid).
- func countOutbounds(raw string) int {
- if strings.TrimSpace(raw) == "" {
- return 0
- }
- var arr []any
- if json.Unmarshal([]byte(raw), &arr) != nil {
- return 0
- }
- return len(arr)
- }
- // Get returns a single subscription by id.
- func (s *OutboundSubscriptionService) Get(id int) (*model.OutboundSubscription, error) {
- db := database.GetDB()
- var sub model.OutboundSubscription
- if err := db.First(&sub, id).Error; err != nil {
- return nil, err
- }
- return &sub, nil
- }
- // Create persists a new subscription. It does not fetch immediately; the caller
- // can call Refresh on the returned id if desired.
- var defaultPrefixRe = regexp.MustCompile(`^sub(\d+)-$`)
- // defaultPrefixNumber returns the smallest positive integer N that is not already
- // in use as a "subN-" tag prefix among the given subscriptions. This is used to
- // auto-name a subscription's outbounds when the user leaves the prefix blank, so
- // deleting a subscription frees its number for reuse instead of letting the
- // number grow forever with the auto-increment DB id. A subscription with a blank
- // prefix reserves its own id (it falls back to id-based "sub<id>-" tags).
- func defaultPrefixNumber(subs []*model.OutboundSubscription, excludeId int) int {
- used := map[int]bool{}
- for _, sub := range subs {
- if sub.Id == excludeId {
- continue
- }
- if sub.TagPrefix == "" {
- used[sub.Id] = true
- continue
- }
- if m := defaultPrefixRe.FindStringSubmatch(sub.TagPrefix); m != nil {
- if n, err := strconv.Atoi(m[1]); err == nil {
- used[n] = true
- }
- }
- }
- n := 1
- for used[n] {
- n++
- }
- return n
- }
- // nextDefaultSubPrefix builds the default "subN-" prefix for a new/edited
- // subscription, picking the smallest free N (excludeId skips a subscription's
- // own current prefix when editing).
- func (s *OutboundSubscriptionService) nextDefaultSubPrefix(excludeId int) string {
- var subs []*model.OutboundSubscription
- _ = database.GetDB().Find(&subs).Error
- return fmt.Sprintf("sub%d-", defaultPrefixNumber(subs, excludeId))
- }
- func (s *OutboundSubscriptionService) Create(remark, rawURL, tagPrefix string, enabled bool, updateInterval int, allowPrivate, prepend bool) (*model.OutboundSubscription, error) {
- cleanURL, err := SanitizePublicHTTPURL(rawURL, allowPrivate)
- if err != nil {
- return nil, common.NewError("invalid subscription URL:", err)
- }
- if cleanURL == "" {
- return nil, common.NewError("subscription URL is required")
- }
- if updateInterval <= 0 {
- updateInterval = 600
- }
- prefix := strings.TrimSpace(tagPrefix)
- if prefix == "" {
- prefix = s.nextDefaultSubPrefix(0)
- }
- // New subscriptions go to the end of the priority order.
- var count int64
- database.GetDB().Model(&model.OutboundSubscription{}).Count(&count)
- sub := &model.OutboundSubscription{
- Remark: strings.TrimSpace(remark),
- Url: cleanURL,
- Enabled: enabled,
- AllowPrivate: allowPrivate,
- Prepend: prepend,
- Priority: int(count),
- TagPrefix: prefix,
- UpdateInterval: updateInterval,
- }
- if err := database.GetDB().Create(sub).Error; err != nil {
- return nil, err
- }
- return sub, nil
- }
- // Update updates editable fields.
- func (s *OutboundSubscriptionService) Update(id int, remark, rawURL, tagPrefix string, enabled bool, updateInterval int, allowPrivate, prepend bool) error {
- sub, err := s.Get(id)
- if err != nil {
- return err
- }
- cleanURL, err := SanitizePublicHTTPURL(rawURL, allowPrivate)
- if err != nil {
- return common.NewError("invalid subscription URL:", err)
- }
- if cleanURL == "" {
- return common.NewError("subscription URL is required")
- }
- if updateInterval <= 0 {
- updateInterval = 600
- }
- prefix := strings.TrimSpace(tagPrefix)
- if prefix == "" {
- prefix = s.nextDefaultSubPrefix(sub.Id)
- }
- sub.Remark = strings.TrimSpace(remark)
- sub.Url = cleanURL
- sub.Enabled = enabled
- sub.AllowPrivate = allowPrivate
- sub.Prepend = prepend
- sub.TagPrefix = prefix
- sub.UpdateInterval = updateInterval
- return database.GetDB().Save(sub).Error
- }
- // Delete removes a subscription.
- func (s *OutboundSubscriptionService) Delete(id int) error {
- return database.GetDB().Delete(&model.OutboundSubscription{}, id).Error
- }
- // GetLastOutbounds returns the last successfully fetched outbounds for a subscription
- // (as raw interface slice ready for JSON merge). Returns nil slice when none.
- func (s *OutboundSubscriptionService) GetLastOutbounds(id int) ([]any, error) {
- sub, err := s.Get(id)
- if err != nil {
- return nil, err
- }
- if strings.TrimSpace(sub.LastFetchedOutbounds) == "" {
- return nil, nil
- }
- var arr []any
- if err := json.Unmarshal([]byte(sub.LastFetchedOutbounds), &arr); err != nil {
- return nil, err
- }
- return arr, nil
- }
- // Refresh fetches the subscription URL, parses the links, assigns stable tags,
- // persists the results, and returns the generated outbounds.
- func (s *OutboundSubscriptionService) Refresh(id int) ([]any, error) {
- sub, err := s.Get(id)
- if err != nil {
- return nil, err
- }
- outbounds, err := s.fetchAndStore(sub)
- return outbounds, err
- }
- // RefreshAllEnabled fetches every enabled subscription whose due time has passed
- // (lastUpdated + updateInterval <= now). It returns the number of subscriptions
- // that were actually refreshed.
- func (s *OutboundSubscriptionService) RefreshAllEnabled() (int, error) {
- db := database.GetDB()
- var subs []*model.OutboundSubscription
- if err := db.Where("enabled = ?", true).Find(&subs).Error; err != nil {
- return 0, err
- }
- now := time.Now().Unix()
- refreshed := 0
- for _, sub := range subs {
- due := sub.LastUpdated + int64(sub.UpdateInterval)
- if sub.LastUpdated == 0 || due <= now {
- if _, err := s.fetchAndStore(sub); err != nil {
- logger.Warningf("outbound sub %d (%s) refresh failed: %v", sub.Id, sub.Remark, err)
- // continue with others
- } else {
- refreshed++
- }
- }
- }
- return refreshed, nil
- }
- // fetchAndStore does the actual network + parse + stability + persist work.
- func (s *OutboundSubscriptionService) fetchAndStore(sub *model.OutboundSubscription) ([]any, error) {
- // Re-sanitize on every fetch (handles legacy rows + defense in depth against
- // any direct DB tampering). Private targets are blocked unless this
- // subscription was explicitly created with AllowPrivate.
- cleanURL, err := SanitizePublicHTTPURL(sub.Url, sub.AllowPrivate)
- if err != nil {
- s.recordError(sub, err)
- return nil, err
- }
- if cleanURL == "" {
- return nil, common.NewError("subscription has no valid URL")
- }
- sub.Url = cleanURL // persist the cleaned version
- client := s.settingService.NewProxiedHTTPClient(30 * time.Second)
- // Re-validate every redirect hop: the initial host is checked above, but a
- // redirect could still point at a private/internal address (SSRF). Cap the
- // redirect chain as well.
- client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
- if len(via) >= 10 {
- return fmt.Errorf("stopped after 10 redirects")
- }
- if sub.AllowPrivate {
- return nil
- }
- ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
- defer cancel()
- return rejectPrivateHost(ctx, req.URL.Hostname())
- }
- req, err := http.NewRequest("GET", sub.Url, nil)
- if err != nil {
- s.recordError(sub, err)
- return nil, err
- }
- req.Header.Set("User-Agent", "3x-ui-outbound-sub/1.0")
- resp, err := client.Do(req)
- if err != nil {
- s.recordError(sub, err)
- return nil, err
- }
- defer resp.Body.Close()
- if resp.StatusCode != 200 {
- err := fmt.Errorf("http %d", resp.StatusCode)
- s.recordError(sub, err)
- return nil, err
- }
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- s.recordError(sub, err)
- return nil, err
- }
- parsed, identities, err := link.ParseSubscriptionBody(body)
- if err != nil {
- s.recordError(sub, err)
- return nil, err
- }
- // Load previous identities -> tags for stability
- prev := map[string]string{}
- if strings.TrimSpace(sub.LinkIdentities) != "" {
- _ = json.Unmarshal([]byte(sub.LinkIdentities), &prev)
- }
- // Also load previous outbounds so we can reuse tags even for identities we
- // temporarily lost (defensive).
- prevTagByIndex := map[int]string{}
- if strings.TrimSpace(sub.LastFetchedOutbounds) != "" {
- var prevObs []any
- if json.Unmarshal([]byte(sub.LastFetchedOutbounds), &prevObs) == nil {
- for i, o := range prevObs {
- if m, ok := o.(map[string]any); ok {
- if tag, _ := m["tag"].(string); tag != "" {
- prevTagByIndex[i] = tag
- }
- }
- }
- }
- }
- // Assign tags with stability (identity reuse, positional fallback, then a
- // fresh allocation), keeping tags unique within this batch. Extracted into a
- // pure function so it can be unit-tested without network/DB. Tags are written
- // back into the parsed outbounds in place.
- assigned := assignStableTags(parsed, identities, prev, prevTagByIndex, sub.Id, sub.TagPrefix)
- // Persist identities for next time
- newIdent := map[string]string{}
- for i, id := range identities {
- newIdent[id] = assigned[i]
- }
- identJSON, _ := json.Marshal(newIdent)
- // Persist the outbounds (as compact JSON array)
- obsJSON, _ := json.Marshal(parsed)
- sub.LastFetchedOutbounds = string(obsJSON)
- sub.LinkIdentities = string(identJSON)
- sub.LastUpdated = time.Now().Unix()
- sub.LastError = ""
- if err := database.GetDB().Save(sub).Error; err != nil {
- return nil, err
- }
- // Return as []any for the config merger
- result := make([]any, len(parsed))
- for i := range parsed {
- result[i] = parsed[i]
- }
- return result, nil
- }
- func (s *OutboundSubscriptionService) recordError(sub *model.OutboundSubscription, err error) {
- sub.LastError = err.Error()
- _ = database.GetDB().Model(sub).Update("last_error", sub.LastError).Error
- }
- // assignStableTags assigns a tag to each parsed outbound, preferring stability:
- // 1. reuse the tag previously mapped to the link's identity (prev),
- // 2. else reuse the tag at the same position from the last fetch (prevTagByIndex),
- // 3. else allocate a fresh tag from the prefix + remark (link.SuggestTag).
- //
- // Tags are kept unique within the batch by appending "-N" on collision, and are
- // written back into parsed[i]["tag"]. The returned slice holds the assigned tags
- // in order. When tagPrefix is empty a "sub<subID>-" prefix is used for fresh tags.
- func assignStableTags(parsed []link.Outbound, identities []string, prev map[string]string, prevTagByIndex map[int]string, subID int, tagPrefix string) []string {
- used := map[string]bool{} // uniqueness within this refresh batch
- assigned := make([]string, len(parsed))
- for i := range parsed {
- id := ""
- if i < len(identities) {
- id = identities[i]
- }
- candidate := ""
- if old, ok := prev[id]; ok && old != "" {
- candidate = old
- }
- if candidate == "" {
- // try to reuse by rough positional match from previous fetch (best effort)
- if old, ok := prevTagByIndex[i]; ok && old != "" {
- candidate = old
- }
- }
- if candidate == "" {
- // fresh allocation
- prefix := tagPrefix
- if prefix == "" {
- prefix = fmt.Sprintf("sub%d-", subID)
- }
- remark := ""
- if m, ok := parsed[i]["tag"].(string); ok {
- remark = m
- }
- candidate = link.SuggestTag(prefix, remark, i)
- }
- // ensure local uniqueness inside this batch
- final := candidate
- for k := 1; used[final]; k++ {
- final = fmt.Sprintf("%s-%d", candidate, k)
- }
- used[final] = true
- assigned[i] = final
- // write back the tag into the outbound
- parsed[i]["tag"] = final
- }
- return assigned
- }
- // AllActiveOutbounds returns the concatenation of the last-fetched outbounds
- // for every enabled subscription. This is the set that should be merged into
- // the final Xray config. Order: subscription creation order (by id asc) so
- // that later subscriptions can shadow earlier ones if the admin uses colliding
- // prefixes (last writer wins inside xray, but we try to keep tags unique).
- func (s *OutboundSubscriptionService) AllActiveOutbounds() ([]any, error) {
- prepend, appendList, err := s.activeOutboundsSplit()
- if err != nil {
- return nil, err
- }
- return append(prepend, appendList...), nil
- }
- // activeOutboundsSplit returns the active subscription outbounds split into those
- // that should be placed BEFORE the manual template outbounds (Prepend) and those
- // placed AFTER. Within each group, subscriptions are ordered by Priority (then id)
- // so the admin can control the merged order.
- func (s *OutboundSubscriptionService) activeOutboundsSplit() (prepend []any, appendList []any, err error) {
- db := database.GetDB()
- var subs []*model.OutboundSubscription
- if err := db.Where("enabled = ?", true).Order("priority asc, id asc").Find(&subs).Error; err != nil {
- return nil, nil, err
- }
- for _, sub := range subs {
- if strings.TrimSpace(sub.LastFetchedOutbounds) == "" {
- continue
- }
- var arr []any
- if err := json.Unmarshal([]byte(sub.LastFetchedOutbounds), &arr); err != nil {
- logger.Warningf("outbound sub %d has corrupt LastFetchedOutbounds: %v", sub.Id, err)
- continue
- }
- if sub.Prepend {
- prepend = append(prepend, arr...)
- } else {
- appendList = append(appendList, arr...)
- }
- }
- return prepend, appendList, nil
- }
- // Move shifts a subscription one step up or down in the priority order and
- // re-normalizes all priorities to a 0..n-1 sequence.
- func (s *OutboundSubscriptionService) Move(id int, up bool) error {
- db := database.GetDB()
- var subs []*model.OutboundSubscription
- if err := db.Order("priority asc, id asc").Find(&subs).Error; err != nil {
- return err
- }
- idx := -1
- for i, sub := range subs {
- if sub.Id == id {
- idx = i
- break
- }
- }
- if idx == -1 {
- return common.NewError("subscription not found")
- }
- swap := idx + 1
- if up {
- swap = idx - 1
- }
- if swap < 0 || swap >= len(subs) {
- return nil // already at the edge
- }
- subs[idx], subs[swap] = subs[swap], subs[idx]
- for i, sub := range subs {
- if sub.Priority != i {
- if err := db.Model(sub).Update("priority", i).Error; err != nil {
- return err
- }
- }
- }
- return nil
- }
- // AllActiveOutboundTags returns only the tags of active subscription outbounds.
- // Useful for populating balancer / routing selectors without shipping full objects.
- func (s *OutboundSubscriptionService) AllActiveOutboundTags() ([]string, error) {
- obs, err := s.AllActiveOutbounds()
- if err != nil {
- return nil, err
- }
- tags := make([]string, 0, len(obs))
- for _, o := range obs {
- if m, ok := o.(map[string]any); ok {
- if t, _ := m["tag"].(string); t != "" {
- tags = append(tags, t)
- }
- }
- }
- return tags, nil
- }
- /*
- Tag stability strategy (important for balancers and routing rules)
- When a subscription is refreshed we try very hard to keep the *same* tag for the
- same logical outbound so that existing balancers and routing rules keep working.
- How we do it:
- - On every successful parse we compute a stable "identity" for each link
- (the core of the URI with the remark fragment removed, or for vmess the inner
- JSON without the "ps" field).
- - We persist a map identity -> tag in the LinkIdentities column.
- - On the next refresh, if we see the same identity again we reuse the previous tag,
- even if the remark changed or minor parameters moved.
- - Only when we have never seen the identity before do we allocate a fresh tag
- using the user-supplied TagPrefix + slug(remark) (or an index fallback).
- - Within one refresh we still deduplicate with -N suffixes.
- Consequences for balancers / routing:
- - If you use an *exact* tag in a balancer selector or a routing rule, that
- specific server will continue to be used after refreshes (as long as the
- provider still returns a link that produces the same identity).
- - If you use a *prefix/wildcard* selector (e.g. "hk-*", "sg-.*"), then any
- *new* servers that the subscription later returns will automatically be
- eligible for that balancer on the next Xray reload — this is the recommended
- way to "subscribe to a pool".
- - When a server disappears from the subscription, its tag simply stops
- existing in the final outbounds array. The balancer will have fewer
- candidates. If you configured a `fallbackTag` on the balancer, Xray will use
- it. Otherwise connections that would have used the missing member may fail
- or be routed by the next rule.
- - If the provider rotates credentials/UUIDs/hosts for a server, the identity
- changes → we treat it as a brand new outbound and give it a new tag. Any
- balancer/rule that referenced the *old* tag will no longer see it. This is
- an inherent limitation of subscription-based outbounds.
- We deliberately do *not* mutate the saved xrayTemplateConfig. Subscription
- outbounds are always injected at runtime in GetXrayConfig.
- */
|