manager.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package mtproto
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/mhsanaei/3x-ui/v3/database/model"
  14. "github.com/mhsanaei/3x-ui/v3/logger"
  15. )
  16. // Instance is the desired runtime configuration of one mtproto inbound.
  17. type Instance struct {
  18. Id int
  19. Tag string
  20. Listen string
  21. Port int
  22. Secret string
  23. // Optional mtg tuning; each is omitted from the generated TOML when
  24. // zero-valued so mtg falls back to its own defaults.
  25. Debug bool
  26. ProxyProtocolListener bool
  27. PreferIP string
  28. FrontingIP string
  29. FrontingPort int
  30. FrontingProxyProtocol bool
  31. }
  32. func (inst Instance) bindTo() string {
  33. listen := inst.Listen
  34. if listen == "" {
  35. listen = "0.0.0.0"
  36. }
  37. return fmt.Sprintf("%s:%d", listen, inst.Port)
  38. }
  39. // fingerprint changes whenever any value that ends up in the generated TOML
  40. // changes, so ensureLocked restarts mtg when the operator edits a setting.
  41. func (inst Instance) fingerprint() string {
  42. return strings.Join([]string{
  43. inst.bindTo(),
  44. inst.Secret,
  45. strconv.FormatBool(inst.Debug),
  46. strconv.FormatBool(inst.ProxyProtocolListener),
  47. inst.PreferIP,
  48. inst.FrontingIP,
  49. strconv.Itoa(inst.FrontingPort),
  50. strconv.FormatBool(inst.FrontingProxyProtocol),
  51. }, "|")
  52. }
  53. // Traffic is a per-inbound traffic delta scraped from an mtg metrics endpoint.
  54. type Traffic struct {
  55. Tag string
  56. Up int64
  57. Down int64
  58. }
  59. type managed struct {
  60. proc *Process
  61. tag string
  62. fingerprint string
  63. metricsPort int
  64. lastUp int64
  65. lastDown int64
  66. haveLast bool
  67. }
  68. // Manager owns the set of running mtg processes keyed by inbound id.
  69. type Manager struct {
  70. mu sync.Mutex
  71. procs map[int]*managed
  72. // swept records that the one-time startup cleanup of orphaned mtg
  73. // processes (survivors of a previous x-ui run) has already run.
  74. swept bool
  75. }
  76. var (
  77. managerOnce sync.Once
  78. manager *Manager
  79. )
  80. // GetManager returns the process-wide mtg manager singleton.
  81. func GetManager() *Manager {
  82. managerOnce.Do(func() {
  83. manager = &Manager{procs: map[int]*managed{}}
  84. })
  85. return manager
  86. }
  87. // InstanceFromInbound derives a desired Instance from an mtproto inbound,
  88. // healing the FakeTLS secret so it always matches the configured domain.
  89. // Returns false when the inbound is not a usable mtproto inbound.
  90. func InstanceFromInbound(ib *model.Inbound) (Instance, bool) {
  91. if ib == nil || ib.Protocol != model.MTProto {
  92. return Instance{}, false
  93. }
  94. settings := ib.Settings
  95. if healed, ok := model.HealMtprotoSecret(settings); ok {
  96. settings = healed
  97. }
  98. var parsed struct {
  99. Secret string `json:"secret"`
  100. Debug bool `json:"debug"`
  101. ProxyProtocolListener bool `json:"proxyProtocolListener"`
  102. PreferIP string `json:"preferIp"`
  103. DomainFronting struct {
  104. IP string `json:"ip"`
  105. Port int `json:"port"`
  106. ProxyProtocol bool `json:"proxyProtocol"`
  107. } `json:"domainFronting"`
  108. }
  109. if err := json.Unmarshal([]byte(settings), &parsed); err != nil {
  110. return Instance{}, false
  111. }
  112. if parsed.Secret == "" {
  113. return Instance{}, false
  114. }
  115. return Instance{
  116. Id: ib.Id,
  117. Tag: ib.Tag,
  118. Listen: ib.Listen,
  119. Port: ib.Port,
  120. Secret: parsed.Secret,
  121. Debug: parsed.Debug,
  122. ProxyProtocolListener: parsed.ProxyProtocolListener,
  123. PreferIP: parsed.PreferIP,
  124. FrontingIP: parsed.DomainFronting.IP,
  125. FrontingPort: parsed.DomainFronting.Port,
  126. FrontingProxyProtocol: parsed.DomainFronting.ProxyProtocol,
  127. }, true
  128. }
  129. // Ensure starts the mtg process for an instance, or restarts it when its
  130. // configuration changed. A no-op when the desired process is already running.
  131. func (m *Manager) Ensure(inst Instance) error {
  132. m.mu.Lock()
  133. defer m.mu.Unlock()
  134. m.sweepOrphansLocked()
  135. return m.ensureLocked(inst)
  136. }
  137. // sweepOrphansLocked kills mtg processes left running by a previous x-ui run,
  138. // exactly once per process lifetime and before any of our own mtg are started.
  139. // Because x-ui owns every mtg process, anything alive at this point is an orphan
  140. // that would otherwise keep holding an inbound port with a stale secret.
  141. func (m *Manager) sweepOrphansLocked() {
  142. if m.swept {
  143. return
  144. }
  145. m.swept = true
  146. if n := killStrayMtgProcesses(GetBinaryPath()); n > 0 {
  147. logger.Warningf("mtproto: terminated %d orphaned mtg process(es) from a previous run", n)
  148. }
  149. }
  150. func (m *Manager) ensureLocked(inst Instance) error {
  151. fp := inst.fingerprint()
  152. if cur, ok := m.procs[inst.Id]; ok {
  153. if cur.fingerprint == fp && cur.proc.IsRunning() {
  154. cur.tag = inst.Tag
  155. return nil
  156. }
  157. cur.proc.Stop()
  158. delete(m.procs, inst.Id)
  159. }
  160. metricsPort, err := freeLocalPort()
  161. if err != nil {
  162. return err
  163. }
  164. cfgPath := configPathForID(inst.Id)
  165. if err := writeConfig(cfgPath, inst, metricsPort); err != nil {
  166. return err
  167. }
  168. proc := newProcess(cfgPath, fmt.Sprintf("inbound %d", inst.Id))
  169. if err := proc.Start(); err != nil {
  170. return err
  171. }
  172. m.procs[inst.Id] = &managed{
  173. proc: proc,
  174. tag: inst.Tag,
  175. fingerprint: fp,
  176. metricsPort: metricsPort,
  177. }
  178. logger.Infof("mtproto: started mtg for inbound %d on %s", inst.Id, inst.bindTo())
  179. return nil
  180. }
  181. // Remove stops and forgets the mtg process for an inbound id.
  182. func (m *Manager) Remove(id int) {
  183. m.mu.Lock()
  184. defer m.mu.Unlock()
  185. if cur, ok := m.procs[id]; ok {
  186. cur.proc.Stop()
  187. delete(m.procs, id)
  188. _ = os.Remove(configPathForID(id))
  189. logger.Infof("mtproto: stopped mtg for inbound %d", id)
  190. }
  191. }
  192. // Reconcile drives the running set toward the desired instances: it stops
  193. // processes that are no longer wanted and (re)starts the rest. Used at boot
  194. // and periodically to recover from crashes.
  195. func (m *Manager) Reconcile(desired []Instance) {
  196. m.mu.Lock()
  197. defer m.mu.Unlock()
  198. m.sweepOrphansLocked()
  199. want := make(map[int]struct{}, len(desired))
  200. for _, inst := range desired {
  201. want[inst.Id] = struct{}{}
  202. }
  203. for id, cur := range m.procs {
  204. if _, ok := want[id]; !ok {
  205. cur.proc.Stop()
  206. delete(m.procs, id)
  207. _ = os.Remove(configPathForID(id))
  208. }
  209. }
  210. for _, inst := range desired {
  211. if err := m.ensureLocked(inst); err != nil {
  212. logger.Warningf("mtproto: reconcile failed for inbound %d: %v", inst.Id, err)
  213. }
  214. }
  215. }
  216. // StopAll stops every managed mtg process. Called on panel shutdown.
  217. func (m *Manager) StopAll() {
  218. m.mu.Lock()
  219. defer m.mu.Unlock()
  220. for id, cur := range m.procs {
  221. _ = cur.proc.Stop()
  222. _ = os.Remove(configPathForID(id))
  223. delete(m.procs, id)
  224. }
  225. }
  226. // CollectTraffic scrapes each running mtg metrics endpoint and returns the
  227. // per-inbound byte deltas since the previous scrape.
  228. func (m *Manager) CollectTraffic() []Traffic {
  229. // Snapshot the state we need under the lock, then release before doing
  230. // network I/O so that Ensure/Reconcile/Remove are not blocked.
  231. type snap struct {
  232. id int
  233. metricsPort int
  234. tag string
  235. haveLast bool
  236. lastUp int64
  237. lastDown int64
  238. }
  239. m.mu.Lock()
  240. snaps := make([]snap, 0, len(m.procs))
  241. for id, cur := range m.procs {
  242. if cur.proc == nil || !cur.proc.IsRunning() {
  243. continue
  244. }
  245. snaps = append(snaps, snap{
  246. id: id,
  247. metricsPort: cur.metricsPort,
  248. tag: cur.tag,
  249. haveLast: cur.haveLast,
  250. lastUp: cur.lastUp,
  251. lastDown: cur.lastDown,
  252. })
  253. }
  254. m.mu.Unlock()
  255. out := make([]Traffic, 0, len(snaps))
  256. for _, s := range snaps {
  257. up, down, ok := scrapeTraffic(s.metricsPort)
  258. if !ok {
  259. continue
  260. }
  261. var du, dd int64
  262. if s.haveLast {
  263. du = up - s.lastUp
  264. dd = down - s.lastDown
  265. if du < 0 {
  266. du = 0
  267. }
  268. if dd < 0 {
  269. dd = 0
  270. }
  271. }
  272. // Re-acquire lock to persist the new baseline, but only if the entry
  273. // still exists (it may have been removed during the scrape).
  274. m.mu.Lock()
  275. if cur, ok := m.procs[s.id]; ok {
  276. cur.lastUp = up
  277. cur.lastDown = down
  278. cur.haveLast = true
  279. }
  280. m.mu.Unlock()
  281. if s.haveLast && (du > 0 || dd > 0) {
  282. out = append(out, Traffic{Tag: s.tag, Up: du, Down: dd})
  283. }
  284. }
  285. return out
  286. }
  287. func freeLocalPort() (int, error) {
  288. l, err := net.Listen("tcp", "127.0.0.1:0")
  289. if err != nil {
  290. return 0, err
  291. }
  292. defer l.Close()
  293. return l.Addr().(*net.TCPAddr).Port, nil
  294. }
  295. // renderConfig builds the mtg TOML for an instance. Top-level keys must precede
  296. // any [section] header in TOML, so the layout is: required keys, then the
  297. // optional scalar tuning, then [domain-fronting], and finally [stats.prometheus]
  298. // — which x-ui always emits and scrapes for traffic (see scrapeTraffic).
  299. func renderConfig(inst Instance, metricsPort int) string {
  300. var b strings.Builder
  301. fmt.Fprintf(&b, "secret = %q\n", inst.Secret)
  302. fmt.Fprintf(&b, "bind-to = %q\n", inst.bindTo())
  303. if inst.Debug {
  304. b.WriteString("debug = true\n")
  305. }
  306. if inst.ProxyProtocolListener {
  307. b.WriteString("proxy-protocol-listener = true\n")
  308. }
  309. if inst.PreferIP != "" {
  310. fmt.Fprintf(&b, "prefer-ip = %q\n", inst.PreferIP)
  311. }
  312. if inst.FrontingIP != "" || inst.FrontingPort > 0 || inst.FrontingProxyProtocol {
  313. b.WriteString("\n[domain-fronting]\n")
  314. if inst.FrontingIP != "" {
  315. fmt.Fprintf(&b, "ip = %q\n", inst.FrontingIP)
  316. }
  317. if inst.FrontingPort > 0 {
  318. fmt.Fprintf(&b, "port = %d\n", inst.FrontingPort)
  319. }
  320. if inst.FrontingProxyProtocol {
  321. b.WriteString("proxy-protocol = true\n")
  322. }
  323. }
  324. fmt.Fprintf(&b, "\n[stats.prometheus]\nenabled = true\nbind-to = \"127.0.0.1:%d\"\nhttp-path = \"/metrics\"\nmetric-prefix = \"mtg\"\n", metricsPort)
  325. return b.String()
  326. }
  327. func writeConfig(path string, inst Instance, metricsPort int) error {
  328. if err := os.MkdirAll(configDir(), 0o750); err != nil {
  329. return err
  330. }
  331. return os.WriteFile(path, []byte(renderConfig(inst, metricsPort)), 0o640)
  332. }
  333. // scrapeTraffic reads the mtg Prometheus metrics endpoint and sums byte
  334. // counters by direction. mtg exposes a traffic counter labelled with a
  335. // direction; "to_telegram" is treated as upload and "to_client" as download.
  336. // Best-effort: an unreachable endpoint or unrecognised format yields ok=false.
  337. func scrapeTraffic(port int) (up int64, down int64, ok bool) {
  338. client := http.Client{Timeout: 3 * time.Second}
  339. resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
  340. if err != nil {
  341. return 0, 0, false
  342. }
  343. defer resp.Body.Close()
  344. scanner := bufio.NewScanner(resp.Body)
  345. scanner.Buffer(make([]byte, 64*1024), 1024*1024)
  346. found := false
  347. for scanner.Scan() {
  348. line := strings.TrimSpace(scanner.Text())
  349. if line == "" || line[0] == '#' || !strings.Contains(line, "traffic") {
  350. continue
  351. }
  352. name, labels, value, perr := parseMetricLine(line)
  353. if perr != nil || !strings.HasPrefix(name, "mtg") {
  354. continue
  355. }
  356. switch labels["direction"] {
  357. case "to_telegram", "egress", "up":
  358. up += int64(value)
  359. case "to_client", "ingress", "down":
  360. down += int64(value)
  361. default:
  362. down += int64(value)
  363. }
  364. found = true
  365. }
  366. if err := scanner.Err(); err != nil {
  367. logger.Debug("mtproto: metrics scan error:", err)
  368. }
  369. return up, down, found
  370. }
  371. func parseMetricLine(line string) (name string, labels map[string]string, value float64, err error) {
  372. labels = map[string]string{}
  373. rest := line
  374. if brace := strings.IndexByte(line, '{'); brace >= 0 {
  375. name = line[:brace]
  376. end := strings.IndexByte(line, '}')
  377. if end < brace {
  378. return "", nil, 0, fmt.Errorf("malformed metric line")
  379. }
  380. for _, kv := range strings.Split(line[brace+1:end], ",") {
  381. eq := strings.IndexByte(kv, '=')
  382. if eq < 0 {
  383. continue
  384. }
  385. labels[strings.TrimSpace(kv[:eq])] = strings.Trim(strings.TrimSpace(kv[eq+1:]), `"`)
  386. }
  387. rest = strings.TrimSpace(line[end+1:])
  388. } else {
  389. fields := strings.Fields(line)
  390. if len(fields) < 2 {
  391. return "", nil, 0, fmt.Errorf("malformed metric line")
  392. }
  393. name = fields[0]
  394. rest = fields[1]
  395. }
  396. valFields := strings.Fields(rest)
  397. if len(valFields) == 0 {
  398. return "", nil, 0, fmt.Errorf("missing metric value")
  399. }
  400. value, err = strconv.ParseFloat(valFields[0], 64)
  401. if err != nil {
  402. return "", nil, 0, err
  403. }
  404. return name, labels, value, nil
  405. }