manager.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. package mtproto
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  15. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  16. )
  17. // Instance is the desired runtime configuration of one mtproto inbound.
  18. type Instance struct {
  19. Id int
  20. Tag string
  21. Listen string
  22. Port int
  23. Secret string
  24. // Optional mtg tuning; each is omitted from the generated TOML when
  25. // zero-valued so mtg falls back to its own defaults.
  26. Debug bool
  27. ProxyProtocolListener bool
  28. PreferIP string
  29. FrontingIP string
  30. FrontingPort int
  31. FrontingProxyProtocol bool
  32. // When RouteThroughXray is set, mtg dials Telegram through the loopback
  33. // SOCKS bridge the panel injects into the Xray config at XrayRoutePort, so
  34. // the egress obeys the core's routing rules instead of going out directly.
  35. RouteThroughXray bool
  36. XrayRoutePort int
  37. }
  38. func (inst Instance) bindTo() string {
  39. listen := inst.Listen
  40. if listen == "" {
  41. listen = "0.0.0.0"
  42. }
  43. return fmt.Sprintf("%s:%d", listen, inst.Port)
  44. }
  45. // fingerprint changes whenever any value that ends up in the generated TOML
  46. // changes, so ensureLocked restarts mtg when the operator edits a setting.
  47. func (inst Instance) fingerprint() string {
  48. return strings.Join([]string{
  49. inst.bindTo(),
  50. inst.Secret,
  51. strconv.FormatBool(inst.Debug),
  52. strconv.FormatBool(inst.ProxyProtocolListener),
  53. inst.PreferIP,
  54. inst.FrontingIP,
  55. strconv.Itoa(inst.FrontingPort),
  56. strconv.FormatBool(inst.FrontingProxyProtocol),
  57. strconv.FormatBool(inst.RouteThroughXray),
  58. strconv.Itoa(inst.XrayRoutePort),
  59. }, "|")
  60. }
  61. // Traffic is a per-inbound traffic delta scraped from an mtg metrics endpoint.
  62. type Traffic struct {
  63. Tag string
  64. Up int64
  65. Down int64
  66. }
  67. type managed struct {
  68. proc *Process
  69. tag string
  70. fingerprint string
  71. metricsPort int
  72. lastUp int64
  73. lastDown int64
  74. haveLast bool
  75. }
  76. // Manager owns the set of running mtg processes keyed by inbound id.
  77. type Manager struct {
  78. mu sync.Mutex
  79. procs map[int]*managed
  80. // swept records that the one-time startup cleanup of orphaned mtg
  81. // processes (survivors of a previous x-ui run) has already run.
  82. swept bool
  83. }
  84. var (
  85. managerOnce sync.Once
  86. manager *Manager
  87. )
  88. // GetManager returns the process-wide mtg manager singleton.
  89. func GetManager() *Manager {
  90. managerOnce.Do(func() {
  91. manager = &Manager{procs: map[int]*managed{}}
  92. })
  93. return manager
  94. }
  95. // InstanceFromInbound derives a desired Instance from an mtproto inbound,
  96. // healing the FakeTLS secret so it always matches the configured domain.
  97. // Returns false when the inbound is not a usable mtproto inbound.
  98. func InstanceFromInbound(ib *model.Inbound) (Instance, bool) {
  99. if ib == nil || ib.Protocol != model.MTProto {
  100. return Instance{}, false
  101. }
  102. settings := ib.Settings
  103. if healed, ok := model.HealMtprotoSecret(settings); ok {
  104. settings = healed
  105. }
  106. var parsed struct {
  107. Secret string `json:"secret"`
  108. Debug bool `json:"debug"`
  109. ProxyProtocolListener bool `json:"proxyProtocolListener"`
  110. PreferIP string `json:"preferIp"`
  111. DomainFronting struct {
  112. IP string `json:"ip"`
  113. Port int `json:"port"`
  114. ProxyProtocol bool `json:"proxyProtocol"`
  115. } `json:"domainFronting"`
  116. RouteThroughXray bool `json:"routeThroughXray"`
  117. RouteXrayPort int `json:"routeXrayPort"`
  118. }
  119. if err := json.Unmarshal([]byte(settings), &parsed); err != nil {
  120. return Instance{}, false
  121. }
  122. if parsed.Secret == "" {
  123. return Instance{}, false
  124. }
  125. return Instance{
  126. Id: ib.Id,
  127. Tag: ib.Tag,
  128. Listen: ib.Listen,
  129. Port: ib.Port,
  130. Secret: parsed.Secret,
  131. Debug: parsed.Debug,
  132. ProxyProtocolListener: parsed.ProxyProtocolListener,
  133. PreferIP: parsed.PreferIP,
  134. FrontingIP: parsed.DomainFronting.IP,
  135. FrontingPort: parsed.DomainFronting.Port,
  136. FrontingProxyProtocol: parsed.DomainFronting.ProxyProtocol,
  137. RouteThroughXray: parsed.RouteThroughXray,
  138. XrayRoutePort: parsed.RouteXrayPort,
  139. }, true
  140. }
  141. // Ensure starts the mtg process for an instance, or restarts it when its
  142. // configuration changed. A no-op when the desired process is already running.
  143. func (m *Manager) Ensure(inst Instance) error {
  144. m.mu.Lock()
  145. defer m.mu.Unlock()
  146. m.sweepOrphansLocked()
  147. return m.ensureLocked(inst)
  148. }
  149. // sweepOrphansLocked kills mtg processes left running by a previous x-ui run,
  150. // exactly once per process lifetime and before any of our own mtg are started.
  151. // Because x-ui owns every mtg process, anything alive at this point is an orphan
  152. // that would otherwise keep holding an inbound port with a stale secret.
  153. func (m *Manager) sweepOrphansLocked() {
  154. if m.swept {
  155. return
  156. }
  157. m.swept = true
  158. if n := killStrayMtgProcesses(GetBinaryPath()); n > 0 {
  159. logger.Warningf("mtproto: terminated %d orphaned mtg process(es) from a previous run", n)
  160. }
  161. }
  162. func (m *Manager) ensureLocked(inst Instance) error {
  163. fp := inst.fingerprint()
  164. if cur, ok := m.procs[inst.Id]; ok {
  165. if cur.fingerprint == fp && cur.proc.IsRunning() {
  166. cur.tag = inst.Tag
  167. return nil
  168. }
  169. _ = cur.proc.Stop()
  170. delete(m.procs, inst.Id)
  171. }
  172. metricsPort, err := FreeLocalPort()
  173. if err != nil {
  174. return err
  175. }
  176. cfgPath := configPathForID(inst.Id)
  177. if err := writeConfig(cfgPath, inst, metricsPort); err != nil {
  178. return err
  179. }
  180. proc := newProcess(cfgPath, fmt.Sprintf("inbound %d", inst.Id))
  181. if err := proc.Start(); err != nil {
  182. return err
  183. }
  184. m.procs[inst.Id] = &managed{
  185. proc: proc,
  186. tag: inst.Tag,
  187. fingerprint: fp,
  188. metricsPort: metricsPort,
  189. }
  190. logger.Infof("mtproto: started mtg for inbound %d on %s", inst.Id, inst.bindTo())
  191. return nil
  192. }
  193. // Remove stops and forgets the mtg process for an inbound id.
  194. func (m *Manager) Remove(id int) {
  195. m.mu.Lock()
  196. defer m.mu.Unlock()
  197. if cur, ok := m.procs[id]; ok {
  198. _ = cur.proc.Stop()
  199. delete(m.procs, id)
  200. _ = os.Remove(configPathForID(id))
  201. logger.Infof("mtproto: stopped mtg for inbound %d", id)
  202. }
  203. }
  204. // Reconcile drives the running set toward the desired instances: it stops
  205. // processes that are no longer wanted and (re)starts the rest. Used at boot
  206. // and periodically to recover from crashes.
  207. func (m *Manager) Reconcile(desired []Instance) {
  208. m.mu.Lock()
  209. defer m.mu.Unlock()
  210. m.sweepOrphansLocked()
  211. want := make(map[int]struct{}, len(desired))
  212. for _, inst := range desired {
  213. want[inst.Id] = struct{}{}
  214. }
  215. for id, cur := range m.procs {
  216. if _, ok := want[id]; !ok {
  217. _ = cur.proc.Stop()
  218. delete(m.procs, id)
  219. _ = os.Remove(configPathForID(id))
  220. }
  221. }
  222. for _, inst := range desired {
  223. if err := m.ensureLocked(inst); err != nil {
  224. logger.Warningf("mtproto: reconcile failed for inbound %d: %v", inst.Id, err)
  225. }
  226. }
  227. }
  228. // StopAll stops every managed mtg process. Called on panel shutdown.
  229. func (m *Manager) StopAll() {
  230. m.mu.Lock()
  231. defer m.mu.Unlock()
  232. for id, cur := range m.procs {
  233. _ = cur.proc.Stop()
  234. _ = os.Remove(configPathForID(id))
  235. delete(m.procs, id)
  236. }
  237. }
  238. // CollectTraffic scrapes each running mtg metrics endpoint and returns the
  239. // per-inbound byte deltas since the previous scrape.
  240. func (m *Manager) CollectTraffic() []Traffic {
  241. // Snapshot the state we need under the lock, then release before doing
  242. // network I/O so that Ensure/Reconcile/Remove are not blocked.
  243. type snap struct {
  244. id int
  245. metricsPort int
  246. tag string
  247. haveLast bool
  248. lastUp int64
  249. lastDown int64
  250. }
  251. m.mu.Lock()
  252. snaps := make([]snap, 0, len(m.procs))
  253. for id, cur := range m.procs {
  254. if cur.proc == nil || !cur.proc.IsRunning() {
  255. continue
  256. }
  257. snaps = append(snaps, snap{
  258. id: id,
  259. metricsPort: cur.metricsPort,
  260. tag: cur.tag,
  261. haveLast: cur.haveLast,
  262. lastUp: cur.lastUp,
  263. lastDown: cur.lastDown,
  264. })
  265. }
  266. m.mu.Unlock()
  267. out := make([]Traffic, 0, len(snaps))
  268. for _, s := range snaps {
  269. up, down, ok := scrapeTraffic(s.metricsPort)
  270. if !ok {
  271. continue
  272. }
  273. var du, dd int64
  274. if s.haveLast {
  275. du = up - s.lastUp
  276. dd = down - s.lastDown
  277. if du < 0 {
  278. du = 0
  279. }
  280. if dd < 0 {
  281. dd = 0
  282. }
  283. }
  284. // Re-acquire lock to persist the new baseline, but only if the entry
  285. // still exists (it may have been removed during the scrape).
  286. m.mu.Lock()
  287. if cur, ok := m.procs[s.id]; ok {
  288. cur.lastUp = up
  289. cur.lastDown = down
  290. cur.haveLast = true
  291. }
  292. m.mu.Unlock()
  293. if s.haveLast && (du > 0 || dd > 0) {
  294. out = append(out, Traffic{Tag: s.tag, Up: du, Down: dd})
  295. }
  296. }
  297. return out
  298. }
  299. // FreeLocalPort asks the OS for an unused loopback TCP port. It is used both
  300. // for mtg's metrics endpoint and to allocate the per-inbound SOCKS egress
  301. // bridge port persisted into mtproto inbound settings.
  302. func FreeLocalPort() (int, error) {
  303. l, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", "127.0.0.1:0")
  304. if err != nil {
  305. return 0, err
  306. }
  307. defer l.Close()
  308. return l.Addr().(*net.TCPAddr).Port, nil
  309. }
  310. // renderConfig builds the mtg TOML for an instance. Top-level keys must precede
  311. // any [section] header in TOML, so the layout is: required keys, then the
  312. // optional scalar tuning, then [domain-fronting], and finally [stats.prometheus]
  313. // — which x-ui always emits and scrapes for traffic (see scrapeTraffic).
  314. func renderConfig(inst Instance, metricsPort int) string {
  315. var b strings.Builder
  316. fmt.Fprintf(&b, "secret = %q\n", inst.Secret)
  317. fmt.Fprintf(&b, "bind-to = %q\n", inst.bindTo())
  318. if inst.Debug {
  319. b.WriteString("debug = true\n")
  320. }
  321. if inst.ProxyProtocolListener {
  322. b.WriteString("proxy-protocol-listener = true\n")
  323. }
  324. if inst.PreferIP != "" {
  325. fmt.Fprintf(&b, "prefer-ip = %q\n", inst.PreferIP)
  326. }
  327. if inst.FrontingIP != "" || inst.FrontingPort > 0 || inst.FrontingProxyProtocol {
  328. b.WriteString("\n[domain-fronting]\n")
  329. if inst.FrontingIP != "" {
  330. fmt.Fprintf(&b, "ip = %q\n", inst.FrontingIP)
  331. }
  332. if inst.FrontingPort > 0 {
  333. fmt.Fprintf(&b, "port = %d\n", inst.FrontingPort)
  334. }
  335. if inst.FrontingProxyProtocol {
  336. b.WriteString("proxy-protocol = true\n")
  337. }
  338. }
  339. // When the inbound opts into Xray routing, mtg reaches Telegram through the
  340. // loopback SOCKS bridge the panel injects into the running Xray config. mtg
  341. // only supports SOCKS5 upstreams, which is exactly what the bridge exposes.
  342. if inst.RouteThroughXray && inst.XrayRoutePort > 0 {
  343. fmt.Fprintf(&b, "\n[network]\nproxies = [\"socks5://127.0.0.1:%d\"]\n", inst.XrayRoutePort)
  344. }
  345. fmt.Fprintf(&b, "\n[stats.prometheus]\nenabled = true\nbind-to = \"127.0.0.1:%d\"\nhttp-path = \"/metrics\"\nmetric-prefix = \"mtg\"\n", metricsPort)
  346. return b.String()
  347. }
  348. func writeConfig(path string, inst Instance, metricsPort int) error {
  349. if err := os.MkdirAll(configDir(), 0o750); err != nil {
  350. return err
  351. }
  352. return os.WriteFile(path, []byte(renderConfig(inst, metricsPort)), 0o640)
  353. }
  354. // scrapeTraffic reads the mtg Prometheus metrics endpoint and sums byte
  355. // counters by direction. mtg exposes a traffic counter labelled with a
  356. // direction; "to_telegram" is treated as upload and "to_client" as download.
  357. // Best-effort: an unreachable endpoint or unrecognised format yields ok=false.
  358. func scrapeTraffic(port int) (up int64, down int64, ok bool) {
  359. client := http.Client{Timeout: 3 * time.Second}
  360. req, reqErr := http.NewRequestWithContext(context.Background(), http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/metrics", port), nil)
  361. if reqErr != nil {
  362. return 0, 0, false
  363. }
  364. resp, err := client.Do(req)
  365. if err != nil {
  366. return 0, 0, false
  367. }
  368. defer resp.Body.Close()
  369. scanner := bufio.NewScanner(resp.Body)
  370. scanner.Buffer(make([]byte, 64*1024), 1024*1024)
  371. found := false
  372. for scanner.Scan() {
  373. line := strings.TrimSpace(scanner.Text())
  374. if line == "" || line[0] == '#' || !strings.Contains(line, "traffic") {
  375. continue
  376. }
  377. name, labels, value, perr := parseMetricLine(line)
  378. if perr != nil || !strings.HasPrefix(name, "mtg") {
  379. continue
  380. }
  381. switch labels["direction"] {
  382. case "to_telegram", "egress", "up":
  383. up += int64(value)
  384. case "to_client", "ingress", "down":
  385. down += int64(value)
  386. default:
  387. down += int64(value)
  388. }
  389. found = true
  390. }
  391. if err := scanner.Err(); err != nil {
  392. logger.Debug("mtproto: metrics scan error:", err)
  393. }
  394. return up, down, found
  395. }
  396. func parseMetricLine(line string) (name string, labels map[string]string, value float64, err error) {
  397. labels = map[string]string{}
  398. var rest string
  399. if brace := strings.IndexByte(line, '{'); brace >= 0 {
  400. name = line[:brace]
  401. end := strings.IndexByte(line, '}')
  402. if end < brace {
  403. return "", nil, 0, fmt.Errorf("malformed metric line")
  404. }
  405. for kv := range strings.SplitSeq(line[brace+1:end], ",") {
  406. before, after, ok := strings.Cut(kv, "=")
  407. if !ok {
  408. continue
  409. }
  410. labels[strings.TrimSpace(before)] = strings.Trim(strings.TrimSpace(after), `"`)
  411. }
  412. rest = strings.TrimSpace(line[end+1:])
  413. } else {
  414. fields := strings.Fields(line)
  415. if len(fields) < 2 {
  416. return "", nil, 0, fmt.Errorf("malformed metric line")
  417. }
  418. name = fields[0]
  419. rest = fields[1]
  420. }
  421. valFields := strings.Fields(rest)
  422. if len(valFields) == 0 {
  423. return "", nil, 0, fmt.Errorf("missing metric value")
  424. }
  425. value, err = strconv.ParseFloat(valFields[0], 64)
  426. if err != nil {
  427. return "", nil, 0, err
  428. }
  429. return name, labels, value, nil
  430. }