1
0

outbound_subscription.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "regexp"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/mhsanaei/3x-ui/v3/database"
  13. "github.com/mhsanaei/3x-ui/v3/database/model"
  14. "github.com/mhsanaei/3x-ui/v3/logger"
  15. "github.com/mhsanaei/3x-ui/v3/util/common"
  16. "github.com/mhsanaei/3x-ui/v3/util/link"
  17. )
// OutboundSubscriptionService manages remote outbound subscriptions: it stores
// and edits the subscription rows, fetches and parses the remote link lists,
// and assembles the resulting outbounds for merging into the Xray config.
type OutboundSubscriptionService struct {
	// settingService supplies the proxied HTTP client used when fetching a
	// subscription URL.
	settingService SettingService
}
  22. // NewOutboundSubscriptionService returns a service for managing outbound subscriptions.
  23. func NewOutboundSubscriptionService() *OutboundSubscriptionService {
  24. return &OutboundSubscriptionService{}
  25. }
  26. // List returns all subscriptions (newest first).
  27. func (s *OutboundSubscriptionService) List() ([]*model.OutboundSubscription, error) {
  28. db := database.GetDB()
  29. var subs []*model.OutboundSubscription
  30. if err := db.Model(&model.OutboundSubscription{}).Order("priority asc, id asc").Find(&subs).Error; err != nil {
  31. return nil, err
  32. }
  33. for _, sub := range subs {
  34. sub.OutboundCount = countOutbounds(sub.LastFetchedOutbounds)
  35. // Don't ship the heavy raw blobs to the list view.
  36. sub.LastFetchedOutbounds = ""
  37. sub.LinkIdentities = ""
  38. }
  39. return subs, nil
  40. }
  41. // countOutbounds returns the number of outbounds in a stored LastFetchedOutbounds
  42. // JSON array (0 for empty/invalid).
  43. func countOutbounds(raw string) int {
  44. if strings.TrimSpace(raw) == "" {
  45. return 0
  46. }
  47. var arr []any
  48. if json.Unmarshal([]byte(raw), &arr) != nil {
  49. return 0
  50. }
  51. return len(arr)
  52. }
  53. // Get returns a single subscription by id.
  54. func (s *OutboundSubscriptionService) Get(id int) (*model.OutboundSubscription, error) {
  55. db := database.GetDB()
  56. var sub model.OutboundSubscription
  57. if err := db.First(&sub, id).Error; err != nil {
  58. return nil, err
  59. }
  60. return &sub, nil
  61. }
// defaultPrefixRe matches an auto-generated tag prefix of the form "subN-" and
// captures the decimal number N.
var defaultPrefixRe = regexp.MustCompile(`^sub(\d+)-$`)
  65. // defaultPrefixNumber returns the smallest positive integer N that is not already
  66. // in use as a "subN-" tag prefix among the given subscriptions. This is used to
  67. // auto-name a subscription's outbounds when the user leaves the prefix blank, so
  68. // deleting a subscription frees its number for reuse instead of letting the
  69. // number grow forever with the auto-increment DB id. A subscription with a blank
  70. // prefix reserves its own id (it falls back to id-based "sub<id>-" tags).
  71. func defaultPrefixNumber(subs []*model.OutboundSubscription, excludeId int) int {
  72. used := map[int]bool{}
  73. for _, sub := range subs {
  74. if sub.Id == excludeId {
  75. continue
  76. }
  77. if sub.TagPrefix == "" {
  78. used[sub.Id] = true
  79. continue
  80. }
  81. if m := defaultPrefixRe.FindStringSubmatch(sub.TagPrefix); m != nil {
  82. if n, err := strconv.Atoi(m[1]); err == nil {
  83. used[n] = true
  84. }
  85. }
  86. }
  87. n := 1
  88. for used[n] {
  89. n++
  90. }
  91. return n
  92. }
  93. // nextDefaultSubPrefix builds the default "subN-" prefix for a new/edited
  94. // subscription, picking the smallest free N (excludeId skips a subscription's
  95. // own current prefix when editing).
  96. func (s *OutboundSubscriptionService) nextDefaultSubPrefix(excludeId int) string {
  97. var subs []*model.OutboundSubscription
  98. _ = database.GetDB().Find(&subs).Error
  99. return fmt.Sprintf("sub%d-", defaultPrefixNumber(subs, excludeId))
  100. }
  101. func (s *OutboundSubscriptionService) Create(remark, rawURL, tagPrefix string, enabled bool, updateInterval int, allowPrivate, prepend bool) (*model.OutboundSubscription, error) {
  102. cleanURL, err := SanitizePublicHTTPURL(rawURL, allowPrivate)
  103. if err != nil {
  104. return nil, common.NewError("invalid subscription URL:", err)
  105. }
  106. if cleanURL == "" {
  107. return nil, common.NewError("subscription URL is required")
  108. }
  109. if updateInterval <= 0 {
  110. updateInterval = 600
  111. }
  112. prefix := strings.TrimSpace(tagPrefix)
  113. if prefix == "" {
  114. prefix = s.nextDefaultSubPrefix(0)
  115. }
  116. // New subscriptions go to the end of the priority order.
  117. var count int64
  118. database.GetDB().Model(&model.OutboundSubscription{}).Count(&count)
  119. sub := &model.OutboundSubscription{
  120. Remark: strings.TrimSpace(remark),
  121. Url: cleanURL,
  122. Enabled: enabled,
  123. AllowPrivate: allowPrivate,
  124. Prepend: prepend,
  125. Priority: int(count),
  126. TagPrefix: prefix,
  127. UpdateInterval: updateInterval,
  128. }
  129. if err := database.GetDB().Create(sub).Error; err != nil {
  130. return nil, err
  131. }
  132. return sub, nil
  133. }
  134. // Update updates editable fields.
  135. func (s *OutboundSubscriptionService) Update(id int, remark, rawURL, tagPrefix string, enabled bool, updateInterval int, allowPrivate, prepend bool) error {
  136. sub, err := s.Get(id)
  137. if err != nil {
  138. return err
  139. }
  140. cleanURL, err := SanitizePublicHTTPURL(rawURL, allowPrivate)
  141. if err != nil {
  142. return common.NewError("invalid subscription URL:", err)
  143. }
  144. if cleanURL == "" {
  145. return common.NewError("subscription URL is required")
  146. }
  147. if updateInterval <= 0 {
  148. updateInterval = 600
  149. }
  150. prefix := strings.TrimSpace(tagPrefix)
  151. if prefix == "" {
  152. prefix = s.nextDefaultSubPrefix(sub.Id)
  153. }
  154. sub.Remark = strings.TrimSpace(remark)
  155. sub.Url = cleanURL
  156. sub.Enabled = enabled
  157. sub.AllowPrivate = allowPrivate
  158. sub.Prepend = prepend
  159. sub.TagPrefix = prefix
  160. sub.UpdateInterval = updateInterval
  161. return database.GetDB().Save(sub).Error
  162. }
  163. // Delete removes a subscription.
  164. func (s *OutboundSubscriptionService) Delete(id int) error {
  165. return database.GetDB().Delete(&model.OutboundSubscription{}, id).Error
  166. }
  167. // GetLastOutbounds returns the last successfully fetched outbounds for a subscription
  168. // (as raw interface slice ready for JSON merge). Returns nil slice when none.
  169. func (s *OutboundSubscriptionService) GetLastOutbounds(id int) ([]any, error) {
  170. sub, err := s.Get(id)
  171. if err != nil {
  172. return nil, err
  173. }
  174. if strings.TrimSpace(sub.LastFetchedOutbounds) == "" {
  175. return nil, nil
  176. }
  177. var arr []any
  178. if err := json.Unmarshal([]byte(sub.LastFetchedOutbounds), &arr); err != nil {
  179. return nil, err
  180. }
  181. return arr, nil
  182. }
  183. // Refresh fetches the subscription URL, parses the links, assigns stable tags,
  184. // persists the results, and returns the generated outbounds.
  185. func (s *OutboundSubscriptionService) Refresh(id int) ([]any, error) {
  186. sub, err := s.Get(id)
  187. if err != nil {
  188. return nil, err
  189. }
  190. outbounds, err := s.fetchAndStore(sub)
  191. return outbounds, err
  192. }
  193. // RefreshAllEnabled fetches every enabled subscription whose due time has passed
  194. // (lastUpdated + updateInterval <= now). It returns the number of subscriptions
  195. // that were actually refreshed.
  196. func (s *OutboundSubscriptionService) RefreshAllEnabled() (int, error) {
  197. db := database.GetDB()
  198. var subs []*model.OutboundSubscription
  199. if err := db.Where("enabled = ?", true).Find(&subs).Error; err != nil {
  200. return 0, err
  201. }
  202. now := time.Now().Unix()
  203. refreshed := 0
  204. for _, sub := range subs {
  205. due := sub.LastUpdated + int64(sub.UpdateInterval)
  206. if sub.LastUpdated == 0 || due <= now {
  207. if _, err := s.fetchAndStore(sub); err != nil {
  208. logger.Warningf("outbound sub %d (%s) refresh failed: %v", sub.Id, sub.Remark, err)
  209. // continue with others
  210. } else {
  211. refreshed++
  212. }
  213. }
  214. }
  215. return refreshed, nil
  216. }
  217. // fetchAndStore does the actual network + parse + stability + persist work.
  218. func (s *OutboundSubscriptionService) fetchAndStore(sub *model.OutboundSubscription) ([]any, error) {
  219. // Re-sanitize on every fetch (handles legacy rows + defense in depth against
  220. // any direct DB tampering). Private targets are blocked unless this
  221. // subscription was explicitly created with AllowPrivate.
  222. cleanURL, err := SanitizePublicHTTPURL(sub.Url, sub.AllowPrivate)
  223. if err != nil {
  224. s.recordError(sub, err)
  225. return nil, err
  226. }
  227. if cleanURL == "" {
  228. return nil, common.NewError("subscription has no valid URL")
  229. }
  230. sub.Url = cleanURL // persist the cleaned version
  231. client := s.settingService.NewProxiedHTTPClient(30 * time.Second)
  232. // Re-validate every redirect hop: the initial host is checked above, but a
  233. // redirect could still point at a private/internal address (SSRF). Cap the
  234. // redirect chain as well.
  235. client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
  236. if len(via) >= 10 {
  237. return fmt.Errorf("stopped after 10 redirects")
  238. }
  239. if sub.AllowPrivate {
  240. return nil
  241. }
  242. ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
  243. defer cancel()
  244. return rejectPrivateHost(ctx, req.URL.Hostname())
  245. }
  246. req, err := http.NewRequest("GET", sub.Url, nil)
  247. if err != nil {
  248. s.recordError(sub, err)
  249. return nil, err
  250. }
  251. req.Header.Set("User-Agent", "3x-ui-outbound-sub/1.0")
  252. resp, err := client.Do(req)
  253. if err != nil {
  254. s.recordError(sub, err)
  255. return nil, err
  256. }
  257. defer resp.Body.Close()
  258. if resp.StatusCode != 200 {
  259. err := fmt.Errorf("http %d", resp.StatusCode)
  260. s.recordError(sub, err)
  261. return nil, err
  262. }
  263. body, err := io.ReadAll(resp.Body)
  264. if err != nil {
  265. s.recordError(sub, err)
  266. return nil, err
  267. }
  268. parsed, identities, err := link.ParseSubscriptionBody(body)
  269. if err != nil {
  270. s.recordError(sub, err)
  271. return nil, err
  272. }
  273. // Load previous identities -> tags for stability
  274. prev := map[string]string{}
  275. if strings.TrimSpace(sub.LinkIdentities) != "" {
  276. _ = json.Unmarshal([]byte(sub.LinkIdentities), &prev)
  277. }
  278. // Also load previous outbounds so we can reuse tags even for identities we
  279. // temporarily lost (defensive).
  280. prevTagByIndex := map[int]string{}
  281. if strings.TrimSpace(sub.LastFetchedOutbounds) != "" {
  282. var prevObs []any
  283. if json.Unmarshal([]byte(sub.LastFetchedOutbounds), &prevObs) == nil {
  284. for i, o := range prevObs {
  285. if m, ok := o.(map[string]any); ok {
  286. if tag, _ := m["tag"].(string); tag != "" {
  287. prevTagByIndex[i] = tag
  288. }
  289. }
  290. }
  291. }
  292. }
  293. // Assign tags with stability (identity reuse, positional fallback, then a
  294. // fresh allocation), keeping tags unique within this batch. Extracted into a
  295. // pure function so it can be unit-tested without network/DB. Tags are written
  296. // back into the parsed outbounds in place.
  297. assigned := assignStableTags(parsed, identities, prev, prevTagByIndex, sub.Id, sub.TagPrefix)
  298. // Persist identities for next time
  299. newIdent := map[string]string{}
  300. for i, id := range identities {
  301. newIdent[id] = assigned[i]
  302. }
  303. identJSON, _ := json.Marshal(newIdent)
  304. // Persist the outbounds (as compact JSON array)
  305. obsJSON, _ := json.Marshal(parsed)
  306. sub.LastFetchedOutbounds = string(obsJSON)
  307. sub.LinkIdentities = string(identJSON)
  308. sub.LastUpdated = time.Now().Unix()
  309. sub.LastError = ""
  310. if err := database.GetDB().Save(sub).Error; err != nil {
  311. return nil, err
  312. }
  313. // Return as []any for the config merger
  314. result := make([]any, len(parsed))
  315. for i := range parsed {
  316. result[i] = parsed[i]
  317. }
  318. return result, nil
  319. }
  320. func (s *OutboundSubscriptionService) recordError(sub *model.OutboundSubscription, err error) {
  321. sub.LastError = err.Error()
  322. _ = database.GetDB().Model(sub).Update("last_error", sub.LastError).Error
  323. }
  324. // assignStableTags assigns a tag to each parsed outbound, preferring stability:
  325. // 1. reuse the tag previously mapped to the link's identity (prev),
  326. // 2. else reuse the tag at the same position from the last fetch (prevTagByIndex),
  327. // 3. else allocate a fresh tag from the prefix + remark (link.SuggestTag).
  328. //
  329. // Tags are kept unique within the batch by appending "-N" on collision, and are
  330. // written back into parsed[i]["tag"]. The returned slice holds the assigned tags
  331. // in order. When tagPrefix is empty a "sub<subID>-" prefix is used for fresh tags.
  332. func assignStableTags(parsed []link.Outbound, identities []string, prev map[string]string, prevTagByIndex map[int]string, subID int, tagPrefix string) []string {
  333. used := map[string]bool{} // uniqueness within this refresh batch
  334. assigned := make([]string, len(parsed))
  335. for i := range parsed {
  336. id := ""
  337. if i < len(identities) {
  338. id = identities[i]
  339. }
  340. candidate := ""
  341. if old, ok := prev[id]; ok && old != "" {
  342. candidate = old
  343. }
  344. if candidate == "" {
  345. // try to reuse by rough positional match from previous fetch (best effort)
  346. if old, ok := prevTagByIndex[i]; ok && old != "" {
  347. candidate = old
  348. }
  349. }
  350. if candidate == "" {
  351. // fresh allocation
  352. prefix := tagPrefix
  353. if prefix == "" {
  354. prefix = fmt.Sprintf("sub%d-", subID)
  355. }
  356. remark := ""
  357. if m, ok := parsed[i]["tag"].(string); ok {
  358. remark = m
  359. }
  360. candidate = link.SuggestTag(prefix, remark, i)
  361. }
  362. // ensure local uniqueness inside this batch
  363. final := candidate
  364. for k := 1; used[final]; k++ {
  365. final = fmt.Sprintf("%s-%d", candidate, k)
  366. }
  367. used[final] = true
  368. assigned[i] = final
  369. // write back the tag into the outbound
  370. parsed[i]["tag"] = final
  371. }
  372. return assigned
  373. }
  374. // AllActiveOutbounds returns the concatenation of the last-fetched outbounds
  375. // for every enabled subscription. This is the set that should be merged into
  376. // the final Xray config. Order: subscription creation order (by id asc) so
  377. // that later subscriptions can shadow earlier ones if the admin uses colliding
  378. // prefixes (last writer wins inside xray, but we try to keep tags unique).
  379. func (s *OutboundSubscriptionService) AllActiveOutbounds() ([]any, error) {
  380. prepend, appendList, err := s.activeOutboundsSplit()
  381. if err != nil {
  382. return nil, err
  383. }
  384. return append(prepend, appendList...), nil
  385. }
  386. // activeOutboundsSplit returns the active subscription outbounds split into those
  387. // that should be placed BEFORE the manual template outbounds (Prepend) and those
  388. // placed AFTER. Within each group, subscriptions are ordered by Priority (then id)
  389. // so the admin can control the merged order.
  390. func (s *OutboundSubscriptionService) activeOutboundsSplit() (prepend []any, appendList []any, err error) {
  391. db := database.GetDB()
  392. var subs []*model.OutboundSubscription
  393. if err := db.Where("enabled = ?", true).Order("priority asc, id asc").Find(&subs).Error; err != nil {
  394. return nil, nil, err
  395. }
  396. for _, sub := range subs {
  397. if strings.TrimSpace(sub.LastFetchedOutbounds) == "" {
  398. continue
  399. }
  400. var arr []any
  401. if err := json.Unmarshal([]byte(sub.LastFetchedOutbounds), &arr); err != nil {
  402. logger.Warningf("outbound sub %d has corrupt LastFetchedOutbounds: %v", sub.Id, err)
  403. continue
  404. }
  405. if sub.Prepend {
  406. prepend = append(prepend, arr...)
  407. } else {
  408. appendList = append(appendList, arr...)
  409. }
  410. }
  411. return prepend, appendList, nil
  412. }
  413. // Move shifts a subscription one step up or down in the priority order and
  414. // re-normalizes all priorities to a 0..n-1 sequence.
  415. func (s *OutboundSubscriptionService) Move(id int, up bool) error {
  416. db := database.GetDB()
  417. var subs []*model.OutboundSubscription
  418. if err := db.Order("priority asc, id asc").Find(&subs).Error; err != nil {
  419. return err
  420. }
  421. idx := -1
  422. for i, sub := range subs {
  423. if sub.Id == id {
  424. idx = i
  425. break
  426. }
  427. }
  428. if idx == -1 {
  429. return common.NewError("subscription not found")
  430. }
  431. swap := idx + 1
  432. if up {
  433. swap = idx - 1
  434. }
  435. if swap < 0 || swap >= len(subs) {
  436. return nil // already at the edge
  437. }
  438. subs[idx], subs[swap] = subs[swap], subs[idx]
  439. for i, sub := range subs {
  440. if sub.Priority != i {
  441. if err := db.Model(sub).Update("priority", i).Error; err != nil {
  442. return err
  443. }
  444. }
  445. }
  446. return nil
  447. }
  448. // AllActiveOutboundTags returns only the tags of active subscription outbounds.
  449. // Useful for populating balancer / routing selectors without shipping full objects.
  450. func (s *OutboundSubscriptionService) AllActiveOutboundTags() ([]string, error) {
  451. obs, err := s.AllActiveOutbounds()
  452. if err != nil {
  453. return nil, err
  454. }
  455. tags := make([]string, 0, len(obs))
  456. for _, o := range obs {
  457. if m, ok := o.(map[string]any); ok {
  458. if t, _ := m["tag"].(string); t != "" {
  459. tags = append(tags, t)
  460. }
  461. }
  462. }
  463. return tags, nil
  464. }
  465. /*
  466. Tag stability strategy (important for balancers and routing rules)
  467. When a subscription is refreshed we try very hard to keep the *same* tag for the
  468. same logical outbound so that existing balancers and routing rules keep working.
  469. How we do it:
  470. - On every successful parse we compute a stable "identity" for each link
  471. (the core of the URI with the remark fragment removed, or for vmess the inner
  472. JSON without the "ps" field).
  473. - We persist a map identity -> tag in the LinkIdentities column.
  474. - On the next refresh, if we see the same identity again we reuse the previous tag,
  475. even if the remark changed or minor parameters moved.
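- When the identity has not been seen before, we first try the tag that sat at
the same position in the previous fetch (a best-effort positional fallback).
Only when that is not available either do we allocate a fresh tag using the
user-supplied TagPrefix + slug(remark) (or an index fallback).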
  478. - Within one refresh we still deduplicate with -N suffixes.
  479. Consequences for balancers / routing:
  480. - If you use an *exact* tag in a balancer selector or a routing rule, that
  481. specific server will continue to be used after refreshes (as long as the
  482. provider still returns a link that produces the same identity).
  483. - If you use a *prefix/wildcard* selector (e.g. "hk-*", "sg-.*"), then any
  484. *new* servers that the subscription later returns will automatically be
  485. eligible for that balancer on the next Xray reload — this is the recommended
  486. way to "subscribe to a pool".
  487. - When a server disappears from the subscription, its tag simply stops
  488. existing in the final outbounds array. The balancer will have fewer
  489. candidates. If you configured a `fallbackTag` on the balancer, Xray will use
  490. it. Otherwise connections that would have used the missing member may fail
  491. or be routed by the next rule.
  492. - If the provider rotates credentials/UUIDs/hosts for a server, the identity
  493. changes → we treat it as a brand new outbound and give it a new tag. Any
  494. balancer/rule that referenced the *old* tag will no longer see it. This is
  495. an inherent limitation of subscription-based outbounds.
  496. We deliberately do *not* mutate the saved xrayTemplateConfig. Subscription
  497. outbounds are always injected at runtime in GetXrayConfig.
  498. */