1
0

outbound_subscription.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "regexp"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/mhsanaei/3x-ui/v3/internal/database"
  14. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  15. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  16. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  17. "github.com/mhsanaei/3x-ui/v3/internal/util/link"
  18. )
  19. // maxOutboundSubscriptionBytes caps a single outbound subscription response.
  20. // It is larger than the 2 MiB user-facing subscription cap because an outbound
  21. // subscription may aggregate many upstream outbounds into one document.
  22. const maxOutboundSubscriptionBytes int64 = 8 << 20
  23. var errOutboundSubscriptionBodyTooLarge = errors.New("outbound subscription response body exceeds size limit")
  24. func readBoundedOutboundSubscriptionBody(r io.Reader) ([]byte, error) {
  25. body, err := io.ReadAll(io.LimitReader(r, maxOutboundSubscriptionBytes+1))
  26. if err != nil {
  27. return nil, err
  28. }
  29. if int64(len(body)) > maxOutboundSubscriptionBytes {
  30. return nil, fmt.Errorf("%w (limit: %d bytes)", errOutboundSubscriptionBodyTooLarge, maxOutboundSubscriptionBytes)
  31. }
  32. return body, nil
  33. }
  34. // OutboundSubscriptionService manages remote outbound subscriptions.
  35. type OutboundSubscriptionService struct {
  36. settingService SettingService
  37. }
  38. // NewOutboundSubscriptionService returns a service for managing outbound subscriptions.
  39. func NewOutboundSubscriptionService() *OutboundSubscriptionService {
  40. return &OutboundSubscriptionService{}
  41. }
  42. // List returns all subscriptions (newest first).
  43. func (s *OutboundSubscriptionService) List() ([]*model.OutboundSubscription, error) {
  44. db := database.GetDB()
  45. var subs []*model.OutboundSubscription
  46. if err := db.Model(&model.OutboundSubscription{}).Order("priority asc, id asc").Find(&subs).Error; err != nil {
  47. return nil, err
  48. }
  49. for _, sub := range subs {
  50. sub.OutboundCount = countOutbounds(sub.LastFetchedOutbounds)
  51. // Don't ship the heavy raw blobs to the list view.
  52. sub.LastFetchedOutbounds = ""
  53. sub.LinkIdentities = ""
  54. }
  55. return subs, nil
  56. }
  57. // countOutbounds returns the number of outbounds in a stored LastFetchedOutbounds
  58. // JSON array (0 for empty/invalid).
  59. func countOutbounds(raw string) int {
  60. if strings.TrimSpace(raw) == "" {
  61. return 0
  62. }
  63. var arr []any
  64. if json.Unmarshal([]byte(raw), &arr) != nil {
  65. return 0
  66. }
  67. return len(arr)
  68. }
  69. // Get returns a single subscription by id.
  70. func (s *OutboundSubscriptionService) Get(id int) (*model.OutboundSubscription, error) {
  71. db := database.GetDB()
  72. var sub model.OutboundSubscription
  73. if err := db.First(&sub, id).Error; err != nil {
  74. return nil, err
  75. }
  76. return &sub, nil
  77. }
  78. // Create persists a new subscription. It does not fetch immediately; the caller
  79. // can call Refresh on the returned id if desired.
  80. var defaultPrefixRe = regexp.MustCompile(`^sub(\d+)-$`)
  81. // defaultPrefixNumber returns the smallest positive integer N that is not already
  82. // in use as a "subN-" tag prefix among the given subscriptions. This is used to
  83. // auto-name a subscription's outbounds when the user leaves the prefix blank, so
  84. // deleting a subscription frees its number for reuse instead of letting the
  85. // number grow forever with the auto-increment DB id. A subscription with a blank
  86. // prefix reserves its own id (it falls back to id-based "sub<id>-" tags).
  87. func defaultPrefixNumber(subs []*model.OutboundSubscription, excludeId int) int {
  88. used := map[int]bool{}
  89. for _, sub := range subs {
  90. if sub.Id == excludeId {
  91. continue
  92. }
  93. if sub.TagPrefix == "" {
  94. used[sub.Id] = true
  95. continue
  96. }
  97. if m := defaultPrefixRe.FindStringSubmatch(sub.TagPrefix); m != nil {
  98. if n, err := strconv.Atoi(m[1]); err == nil {
  99. used[n] = true
  100. }
  101. }
  102. }
  103. n := 1
  104. for used[n] {
  105. n++
  106. }
  107. return n
  108. }
  109. // nextDefaultSubPrefix builds the default "subN-" prefix for a new/edited
  110. // subscription, picking the smallest free N (excludeId skips a subscription's
  111. // own current prefix when editing).
  112. func (s *OutboundSubscriptionService) nextDefaultSubPrefix(excludeId int) string {
  113. var subs []*model.OutboundSubscription
  114. _ = database.GetDB().Find(&subs).Error
  115. return fmt.Sprintf("sub%d-", defaultPrefixNumber(subs, excludeId))
  116. }
  117. func (s *OutboundSubscriptionService) Create(remark, rawURL, tagPrefix string, enabled bool, updateInterval int, allowPrivate, prepend bool) (*model.OutboundSubscription, error) {
  118. cleanURL, err := SanitizePublicHTTPURL(rawURL, allowPrivate)
  119. if err != nil {
  120. return nil, common.NewError("invalid subscription URL:", err)
  121. }
  122. if cleanURL == "" {
  123. return nil, common.NewError("subscription URL is required")
  124. }
  125. if updateInterval <= 0 {
  126. updateInterval = 600
  127. }
  128. prefix := strings.TrimSpace(tagPrefix)
  129. if prefix == "" {
  130. prefix = s.nextDefaultSubPrefix(0)
  131. }
  132. // New subscriptions go to the end of the priority order.
  133. var count int64
  134. database.GetDB().Model(&model.OutboundSubscription{}).Count(&count)
  135. sub := &model.OutboundSubscription{
  136. Remark: strings.TrimSpace(remark),
  137. Url: cleanURL,
  138. Enabled: enabled,
  139. AllowPrivate: allowPrivate,
  140. Prepend: prepend,
  141. Priority: int(count),
  142. TagPrefix: prefix,
  143. UpdateInterval: updateInterval,
  144. }
  145. if err := database.GetDB().Create(sub).Error; err != nil {
  146. return nil, err
  147. }
  148. return sub, nil
  149. }
  150. // Update updates editable fields.
  151. func (s *OutboundSubscriptionService) Update(id int, remark, rawURL, tagPrefix string, enabled bool, updateInterval int, allowPrivate, prepend bool) error {
  152. sub, err := s.Get(id)
  153. if err != nil {
  154. return err
  155. }
  156. cleanURL, err := SanitizePublicHTTPURL(rawURL, allowPrivate)
  157. if err != nil {
  158. return common.NewError("invalid subscription URL:", err)
  159. }
  160. if cleanURL == "" {
  161. return common.NewError("subscription URL is required")
  162. }
  163. if updateInterval <= 0 {
  164. updateInterval = 600
  165. }
  166. prefix := strings.TrimSpace(tagPrefix)
  167. if prefix == "" {
  168. prefix = s.nextDefaultSubPrefix(sub.Id)
  169. }
  170. sub.Remark = strings.TrimSpace(remark)
  171. sub.Url = cleanURL
  172. sub.Enabled = enabled
  173. sub.AllowPrivate = allowPrivate
  174. sub.Prepend = prepend
  175. sub.TagPrefix = prefix
  176. sub.UpdateInterval = updateInterval
  177. return database.GetDB().Save(sub).Error
  178. }
  179. // Delete removes a subscription.
  180. func (s *OutboundSubscriptionService) Delete(id int) error {
  181. return database.GetDB().Delete(&model.OutboundSubscription{}, id).Error
  182. }
  183. // GetLastOutbounds returns the last successfully fetched outbounds for a subscription
  184. // (as raw interface slice ready for JSON merge). Returns nil slice when none.
  185. func (s *OutboundSubscriptionService) GetLastOutbounds(id int) ([]any, error) {
  186. sub, err := s.Get(id)
  187. if err != nil {
  188. return nil, err
  189. }
  190. if strings.TrimSpace(sub.LastFetchedOutbounds) == "" {
  191. return nil, nil
  192. }
  193. var arr []any
  194. if err := json.Unmarshal([]byte(sub.LastFetchedOutbounds), &arr); err != nil {
  195. return nil, err
  196. }
  197. return arr, nil
  198. }
  199. // Refresh fetches the subscription URL, parses the links, assigns stable tags,
  200. // persists the results, and returns the generated outbounds.
  201. func (s *OutboundSubscriptionService) Refresh(id int) ([]any, error) {
  202. sub, err := s.Get(id)
  203. if err != nil {
  204. return nil, err
  205. }
  206. outbounds, err := s.fetchAndStore(sub)
  207. return outbounds, err
  208. }
  209. // RefreshAllEnabled fetches every enabled subscription whose due time has passed
  210. // (lastUpdated + updateInterval <= now). It returns the number of subscriptions
  211. // that were actually refreshed.
  212. func (s *OutboundSubscriptionService) RefreshAllEnabled() (int, error) {
  213. db := database.GetDB()
  214. var subs []*model.OutboundSubscription
  215. if err := db.Where("enabled = ?", true).Find(&subs).Error; err != nil {
  216. return 0, err
  217. }
  218. now := time.Now().Unix()
  219. refreshed := 0
  220. for _, sub := range subs {
  221. due := sub.LastUpdated + int64(sub.UpdateInterval)
  222. if sub.LastUpdated == 0 || due <= now {
  223. if _, err := s.fetchAndStore(sub); err != nil {
  224. logger.Warningf("outbound sub %d (%s) refresh failed: %v", sub.Id, sub.Remark, err)
  225. // continue with others
  226. } else {
  227. refreshed++
  228. }
  229. }
  230. }
  231. return refreshed, nil
  232. }
  233. // fetchAndStore does the actual network + parse + stability + persist work.
  234. func (s *OutboundSubscriptionService) fetchAndStore(sub *model.OutboundSubscription) ([]any, error) {
  235. // Re-sanitize on every fetch (handles legacy rows + defense in depth against
  236. // any direct DB tampering). Private targets are blocked unless this
  237. // subscription was explicitly created with AllowPrivate.
  238. cleanURL, err := SanitizePublicHTTPURL(sub.Url, sub.AllowPrivate)
  239. if err != nil {
  240. s.recordError(sub, err)
  241. return nil, err
  242. }
  243. if cleanURL == "" {
  244. return nil, common.NewError("subscription has no valid URL")
  245. }
  246. sub.Url = cleanURL // persist the cleaned version
  247. client := s.settingService.NewProxiedHTTPClient(30 * time.Second)
  248. // Re-validate every redirect hop: the initial host is checked above, but a
  249. // redirect could still point at a private/internal address (SSRF). Cap the
  250. // redirect chain as well.
  251. client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
  252. if len(via) >= 10 {
  253. return fmt.Errorf("stopped after 10 redirects")
  254. }
  255. if sub.AllowPrivate {
  256. return nil
  257. }
  258. ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second)
  259. defer cancel()
  260. return rejectPrivateHost(ctx, req.URL.Hostname())
  261. }
  262. req, err := http.NewRequest("GET", sub.Url, nil)
  263. if err != nil {
  264. s.recordError(sub, err)
  265. return nil, err
  266. }
  267. req.Header.Set("User-Agent", "3x-ui-outbound-sub/1.0")
  268. resp, err := client.Do(req)
  269. if err != nil {
  270. s.recordError(sub, err)
  271. return nil, err
  272. }
  273. defer resp.Body.Close()
  274. if resp.StatusCode != 200 {
  275. err := fmt.Errorf("http %d", resp.StatusCode)
  276. s.recordError(sub, err)
  277. return nil, err
  278. }
  279. body, err := readBoundedOutboundSubscriptionBody(resp.Body)
  280. if err != nil {
  281. s.recordError(sub, err)
  282. return nil, err
  283. }
  284. parsed, identities, err := link.ParseSubscriptionBody(body)
  285. if err != nil {
  286. s.recordError(sub, err)
  287. return nil, err
  288. }
  289. // Load previous identities -> tags for stability
  290. prev := map[string]string{}
  291. if strings.TrimSpace(sub.LinkIdentities) != "" {
  292. _ = json.Unmarshal([]byte(sub.LinkIdentities), &prev)
  293. }
  294. // Also load previous outbounds so we can reuse tags even for identities we
  295. // temporarily lost (defensive).
  296. prevTagByIndex := map[int]string{}
  297. if strings.TrimSpace(sub.LastFetchedOutbounds) != "" {
  298. var prevObs []any
  299. if json.Unmarshal([]byte(sub.LastFetchedOutbounds), &prevObs) == nil {
  300. for i, o := range prevObs {
  301. if m, ok := o.(map[string]any); ok {
  302. if tag, _ := m["tag"].(string); tag != "" {
  303. prevTagByIndex[i] = tag
  304. }
  305. }
  306. }
  307. }
  308. }
  309. // Assign tags with stability (identity reuse, positional fallback, then a
  310. // fresh allocation), keeping tags unique within this batch. Extracted into a
  311. // pure function so it can be unit-tested without network/DB. Tags are written
  312. // back into the parsed outbounds in place.
  313. assigned := assignStableTags(parsed, identities, prev, prevTagByIndex, sub.Id, sub.TagPrefix)
  314. // Persist identities for next time
  315. newIdent := map[string]string{}
  316. for i, id := range identities {
  317. newIdent[id] = assigned[i]
  318. }
  319. identJSON, _ := json.Marshal(newIdent)
  320. // Persist the outbounds (as compact JSON array)
  321. obsJSON, _ := json.Marshal(parsed)
  322. sub.LastFetchedOutbounds = string(obsJSON)
  323. sub.LinkIdentities = string(identJSON)
  324. sub.LastUpdated = time.Now().Unix()
  325. sub.LastError = ""
  326. if err := database.GetDB().Save(sub).Error; err != nil {
  327. return nil, err
  328. }
  329. // Return as []any for the config merger
  330. result := make([]any, len(parsed))
  331. for i := range parsed {
  332. result[i] = parsed[i]
  333. }
  334. return result, nil
  335. }
  336. func (s *OutboundSubscriptionService) recordError(sub *model.OutboundSubscription, err error) {
  337. sub.LastError = err.Error()
  338. _ = database.GetDB().Model(sub).Update("last_error", sub.LastError).Error
  339. }
  340. // assignStableTags assigns a tag to each parsed outbound, preferring stability:
  341. // 1. reuse the tag previously mapped to the link's identity (prev),
  342. // 2. else reuse the tag at the same position from the last fetch (prevTagByIndex),
  343. // 3. else allocate a fresh tag from the prefix + remark (link.SuggestTag).
  344. //
  345. // Tags are kept unique within the batch by appending "-N" on collision, and are
  346. // written back into parsed[i]["tag"]. The returned slice holds the assigned tags
  347. // in order. When tagPrefix is empty a "sub<subID>-" prefix is used for fresh tags.
  348. func assignStableTags(parsed []link.Outbound, identities []string, prev map[string]string, prevTagByIndex map[int]string, subID int, tagPrefix string) []string {
  349. used := map[string]bool{} // uniqueness within this refresh batch
  350. assigned := make([]string, len(parsed))
  351. for i := range parsed {
  352. id := ""
  353. if i < len(identities) {
  354. id = identities[i]
  355. }
  356. candidate := ""
  357. if old, ok := prev[id]; ok && old != "" {
  358. candidate = old
  359. }
  360. if candidate == "" {
  361. // try to reuse by rough positional match from previous fetch (best effort)
  362. if old, ok := prevTagByIndex[i]; ok && old != "" {
  363. candidate = old
  364. }
  365. }
  366. if candidate == "" {
  367. // fresh allocation
  368. prefix := tagPrefix
  369. if prefix == "" {
  370. prefix = fmt.Sprintf("sub%d-", subID)
  371. }
  372. remark := ""
  373. if m, ok := parsed[i]["tag"].(string); ok {
  374. remark = m
  375. }
  376. candidate = link.SuggestTag(prefix, remark, i)
  377. }
  378. // ensure local uniqueness inside this batch
  379. final := candidate
  380. for k := 1; used[final]; k++ {
  381. final = fmt.Sprintf("%s-%d", candidate, k)
  382. }
  383. used[final] = true
  384. assigned[i] = final
  385. // write back the tag into the outbound
  386. parsed[i]["tag"] = final
  387. }
  388. return assigned
  389. }
  390. // AllActiveOutbounds returns the concatenation of the last-fetched outbounds
  391. // for every enabled subscription. This is the set that should be merged into
  392. // the final Xray config. Order: subscription creation order (by id asc) so
  393. // that later subscriptions can shadow earlier ones if the admin uses colliding
  394. // prefixes (last writer wins inside xray, but we try to keep tags unique).
  395. func (s *OutboundSubscriptionService) AllActiveOutbounds() ([]any, error) {
  396. prepend, appendList, err := s.activeOutboundsSplit()
  397. if err != nil {
  398. return nil, err
  399. }
  400. return append(prepend, appendList...), nil
  401. }
  402. // activeOutboundsSplit returns the active subscription outbounds split into those
  403. // that should be placed BEFORE the manual template outbounds (Prepend) and those
  404. // placed AFTER. Within each group, subscriptions are ordered by Priority (then id)
  405. // so the admin can control the merged order.
  406. func (s *OutboundSubscriptionService) activeOutboundsSplit() (prepend []any, appendList []any, err error) {
  407. db := database.GetDB()
  408. var subs []*model.OutboundSubscription
  409. if err := db.Where("enabled = ?", true).Order("priority asc, id asc").Find(&subs).Error; err != nil {
  410. return nil, nil, err
  411. }
  412. for _, sub := range subs {
  413. if strings.TrimSpace(sub.LastFetchedOutbounds) == "" {
  414. continue
  415. }
  416. var arr []any
  417. if err := json.Unmarshal([]byte(sub.LastFetchedOutbounds), &arr); err != nil {
  418. logger.Warningf("outbound sub %d has corrupt LastFetchedOutbounds: %v", sub.Id, err)
  419. continue
  420. }
  421. if sub.Prepend {
  422. prepend = append(prepend, arr...)
  423. } else {
  424. appendList = append(appendList, arr...)
  425. }
  426. }
  427. return prepend, appendList, nil
  428. }
  429. // Move shifts a subscription one step up or down in the priority order and
  430. // re-normalizes all priorities to a 0..n-1 sequence.
  431. func (s *OutboundSubscriptionService) Move(id int, up bool) error {
  432. db := database.GetDB()
  433. var subs []*model.OutboundSubscription
  434. if err := db.Order("priority asc, id asc").Find(&subs).Error; err != nil {
  435. return err
  436. }
  437. idx := -1
  438. for i, sub := range subs {
  439. if sub.Id == id {
  440. idx = i
  441. break
  442. }
  443. }
  444. if idx == -1 {
  445. return common.NewError("subscription not found")
  446. }
  447. swap := idx + 1
  448. if up {
  449. swap = idx - 1
  450. }
  451. if swap < 0 || swap >= len(subs) {
  452. return nil // already at the edge
  453. }
  454. subs[idx], subs[swap] = subs[swap], subs[idx]
  455. for i, sub := range subs {
  456. if sub.Priority != i {
  457. if err := db.Model(sub).Update("priority", i).Error; err != nil {
  458. return err
  459. }
  460. }
  461. }
  462. return nil
  463. }
  464. // AllActiveOutboundTags returns only the tags of active subscription outbounds.
  465. // Useful for populating balancer / routing selectors without shipping full objects.
  466. func (s *OutboundSubscriptionService) AllActiveOutboundTags() ([]string, error) {
  467. obs, err := s.AllActiveOutbounds()
  468. if err != nil {
  469. return nil, err
  470. }
  471. tags := make([]string, 0, len(obs))
  472. for _, o := range obs {
  473. if m, ok := o.(map[string]any); ok {
  474. if t, _ := m["tag"].(string); t != "" {
  475. tags = append(tags, t)
  476. }
  477. }
  478. }
  479. return tags, nil
  480. }
  481. /*
  482. Tag stability strategy (important for balancers and routing rules)
  483. When a subscription is refreshed we try very hard to keep the *same* tag for the
  484. same logical outbound so that existing balancers and routing rules keep working.
  485. How we do it:
  486. - On every successful parse we compute a stable "identity" for each link
  487. (the core of the URI with the remark fragment removed, or for vmess the inner
  488. JSON without the "ps" field).
  489. - We persist a map identity -> tag in the LinkIdentities column.
  490. - On the next refresh, if we see the same identity again we reuse the previous tag,
  491. even if the remark changed or minor parameters moved.
  492. - Only when we have never seen the identity before do we allocate a fresh tag
  493. using the user-supplied TagPrefix + slug(remark) (or an index fallback).
  494. - Within one refresh we still deduplicate with -N suffixes.
  495. Consequences for balancers / routing:
  496. - If you use an *exact* tag in a balancer selector or a routing rule, that
  497. specific server will continue to be used after refreshes (as long as the
  498. provider still returns a link that produces the same identity).
  499. - If you use a *prefix/wildcard* selector (e.g. "hk-*", "sg-.*"), then any
  500. *new* servers that the subscription later returns will automatically be
  501. eligible for that balancer on the next Xray reload — this is the recommended
  502. way to "subscribe to a pool".
  503. - When a server disappears from the subscription, its tag simply stops
  504. existing in the final outbounds array. The balancer will have fewer
  505. candidates. If you configured a `fallbackTag` on the balancer, Xray will use
  506. it. Otherwise connections that would have used the missing member may fail
  507. or be routed by the next rule.
  508. - If the provider rotates credentials/UUIDs/hosts for a server, the identity
  509. changes → we treat it as a brand new outbound and give it a new tag. Any
  510. balancer/rule that referenced the *old* tag will no longer see it. This is
  511. an inherent limitation of subscription-based outbounds.
  512. We deliberately do *not* mutate the saved xrayTemplateConfig. Subscription
  513. outbounds are always injected at runtime in GetXrayConfig.
  514. */