manager.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package mtproto
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/mhsanaei/3x-ui/v3/database/model"
  14. "github.com/mhsanaei/3x-ui/v3/logger"
  15. )
  16. // Instance is the desired runtime configuration of one mtproto inbound.
  17. type Instance struct {
  18. Id int
  19. Tag string
  20. Listen string
  21. Port int
  22. Secret string
  23. }
  24. func (inst Instance) bindTo() string {
  25. listen := inst.Listen
  26. if listen == "" {
  27. listen = "0.0.0.0"
  28. }
  29. return fmt.Sprintf("%s:%d", listen, inst.Port)
  30. }
  31. func (inst Instance) fingerprint() string {
  32. return fmt.Sprintf("%s|%s", inst.bindTo(), inst.Secret)
  33. }
  34. // Traffic is a per-inbound traffic delta scraped from an mtg metrics endpoint.
  35. type Traffic struct {
  36. Tag string
  37. Up int64
  38. Down int64
  39. }
  40. type managed struct {
  41. proc *Process
  42. tag string
  43. fingerprint string
  44. metricsPort int
  45. lastUp int64
  46. lastDown int64
  47. haveLast bool
  48. }
  49. // Manager owns the set of running mtg processes keyed by inbound id.
  50. type Manager struct {
  51. mu sync.Mutex
  52. procs map[int]*managed
  53. // swept records that the one-time startup cleanup of orphaned mtg
  54. // processes (survivors of a previous x-ui run) has already run.
  55. swept bool
  56. }
  57. var (
  58. managerOnce sync.Once
  59. manager *Manager
  60. )
  61. // GetManager returns the process-wide mtg manager singleton.
  62. func GetManager() *Manager {
  63. managerOnce.Do(func() {
  64. manager = &Manager{procs: map[int]*managed{}}
  65. })
  66. return manager
  67. }
  68. // InstanceFromInbound derives a desired Instance from an mtproto inbound,
  69. // healing the FakeTLS secret so it always matches the configured domain.
  70. // Returns false when the inbound is not a usable mtproto inbound.
  71. func InstanceFromInbound(ib *model.Inbound) (Instance, bool) {
  72. if ib == nil || ib.Protocol != model.MTProto {
  73. return Instance{}, false
  74. }
  75. settings := ib.Settings
  76. if healed, ok := model.HealMtprotoSecret(settings); ok {
  77. settings = healed
  78. }
  79. var parsed struct {
  80. Secret string `json:"secret"`
  81. }
  82. if err := json.Unmarshal([]byte(settings), &parsed); err != nil {
  83. return Instance{}, false
  84. }
  85. if parsed.Secret == "" {
  86. return Instance{}, false
  87. }
  88. return Instance{
  89. Id: ib.Id,
  90. Tag: ib.Tag,
  91. Listen: ib.Listen,
  92. Port: ib.Port,
  93. Secret: parsed.Secret,
  94. }, true
  95. }
  96. // Ensure starts the mtg process for an instance, or restarts it when its
  97. // configuration changed. A no-op when the desired process is already running.
  98. func (m *Manager) Ensure(inst Instance) error {
  99. m.mu.Lock()
  100. defer m.mu.Unlock()
  101. m.sweepOrphansLocked()
  102. return m.ensureLocked(inst)
  103. }
  104. // sweepOrphansLocked kills mtg processes left running by a previous x-ui run,
  105. // exactly once per process lifetime and before any of our own mtg are started.
  106. // Because x-ui owns every mtg process, anything alive at this point is an orphan
  107. // that would otherwise keep holding an inbound port with a stale secret.
  108. func (m *Manager) sweepOrphansLocked() {
  109. if m.swept {
  110. return
  111. }
  112. m.swept = true
  113. if n := killStrayMtgProcesses(GetBinaryPath()); n > 0 {
  114. logger.Warningf("mtproto: terminated %d orphaned mtg process(es) from a previous run", n)
  115. }
  116. }
  117. func (m *Manager) ensureLocked(inst Instance) error {
  118. fp := inst.fingerprint()
  119. if cur, ok := m.procs[inst.Id]; ok {
  120. if cur.fingerprint == fp && cur.proc.IsRunning() {
  121. cur.tag = inst.Tag
  122. return nil
  123. }
  124. cur.proc.Stop()
  125. delete(m.procs, inst.Id)
  126. }
  127. metricsPort, err := freeLocalPort()
  128. if err != nil {
  129. return err
  130. }
  131. cfgPath := configPathForID(inst.Id)
  132. if err := writeConfig(cfgPath, inst.Secret, inst.bindTo(), metricsPort); err != nil {
  133. return err
  134. }
  135. proc := newProcess(cfgPath, fmt.Sprintf("inbound %d", inst.Id))
  136. if err := proc.Start(); err != nil {
  137. return err
  138. }
  139. m.procs[inst.Id] = &managed{
  140. proc: proc,
  141. tag: inst.Tag,
  142. fingerprint: fp,
  143. metricsPort: metricsPort,
  144. }
  145. logger.Infof("mtproto: started mtg for inbound %d on %s", inst.Id, inst.bindTo())
  146. return nil
  147. }
  148. // Remove stops and forgets the mtg process for an inbound id.
  149. func (m *Manager) Remove(id int) {
  150. m.mu.Lock()
  151. defer m.mu.Unlock()
  152. if cur, ok := m.procs[id]; ok {
  153. cur.proc.Stop()
  154. delete(m.procs, id)
  155. _ = os.Remove(configPathForID(id))
  156. logger.Infof("mtproto: stopped mtg for inbound %d", id)
  157. }
  158. }
  159. // Reconcile drives the running set toward the desired instances: it stops
  160. // processes that are no longer wanted and (re)starts the rest. Used at boot
  161. // and periodically to recover from crashes.
  162. func (m *Manager) Reconcile(desired []Instance) {
  163. m.mu.Lock()
  164. defer m.mu.Unlock()
  165. m.sweepOrphansLocked()
  166. want := make(map[int]struct{}, len(desired))
  167. for _, inst := range desired {
  168. want[inst.Id] = struct{}{}
  169. }
  170. for id, cur := range m.procs {
  171. if _, ok := want[id]; !ok {
  172. cur.proc.Stop()
  173. delete(m.procs, id)
  174. _ = os.Remove(configPathForID(id))
  175. }
  176. }
  177. for _, inst := range desired {
  178. if err := m.ensureLocked(inst); err != nil {
  179. logger.Warningf("mtproto: reconcile failed for inbound %d: %v", inst.Id, err)
  180. }
  181. }
  182. }
  183. // StopAll stops every managed mtg process. Called on panel shutdown.
  184. func (m *Manager) StopAll() {
  185. m.mu.Lock()
  186. defer m.mu.Unlock()
  187. for id, cur := range m.procs {
  188. _ = cur.proc.Stop()
  189. _ = os.Remove(configPathForID(id))
  190. delete(m.procs, id)
  191. }
  192. }
  193. // CollectTraffic scrapes each running mtg metrics endpoint and returns the
  194. // per-inbound byte deltas since the previous scrape.
  195. func (m *Manager) CollectTraffic() []Traffic {
  196. // Snapshot the state we need under the lock, then release before doing
  197. // network I/O so that Ensure/Reconcile/Remove are not blocked.
  198. type snap struct {
  199. id int
  200. metricsPort int
  201. tag string
  202. haveLast bool
  203. lastUp int64
  204. lastDown int64
  205. }
  206. m.mu.Lock()
  207. snaps := make([]snap, 0, len(m.procs))
  208. for id, cur := range m.procs {
  209. if cur.proc == nil || !cur.proc.IsRunning() {
  210. continue
  211. }
  212. snaps = append(snaps, snap{
  213. id: id,
  214. metricsPort: cur.metricsPort,
  215. tag: cur.tag,
  216. haveLast: cur.haveLast,
  217. lastUp: cur.lastUp,
  218. lastDown: cur.lastDown,
  219. })
  220. }
  221. m.mu.Unlock()
  222. out := make([]Traffic, 0, len(snaps))
  223. for _, s := range snaps {
  224. up, down, ok := scrapeTraffic(s.metricsPort)
  225. if !ok {
  226. continue
  227. }
  228. var du, dd int64
  229. if s.haveLast {
  230. du = up - s.lastUp
  231. dd = down - s.lastDown
  232. if du < 0 {
  233. du = 0
  234. }
  235. if dd < 0 {
  236. dd = 0
  237. }
  238. }
  239. // Re-acquire lock to persist the new baseline, but only if the entry
  240. // still exists (it may have been removed during the scrape).
  241. m.mu.Lock()
  242. if cur, ok := m.procs[s.id]; ok {
  243. cur.lastUp = up
  244. cur.lastDown = down
  245. cur.haveLast = true
  246. }
  247. m.mu.Unlock()
  248. if s.haveLast && (du > 0 || dd > 0) {
  249. out = append(out, Traffic{Tag: s.tag, Up: du, Down: dd})
  250. }
  251. }
  252. return out
  253. }
  254. func freeLocalPort() (int, error) {
  255. l, err := net.Listen("tcp", "127.0.0.1:0")
  256. if err != nil {
  257. return 0, err
  258. }
  259. defer l.Close()
  260. return l.Addr().(*net.TCPAddr).Port, nil
  261. }
  262. func writeConfig(path, secret, bindTo string, metricsPort int) error {
  263. if err := os.MkdirAll(configDir(), 0o750); err != nil {
  264. return err
  265. }
  266. content := fmt.Sprintf("secret = %q\nbind-to = %q\n\n[stats.prometheus]\nenabled = true\nbind-to = \"127.0.0.1:%d\"\nhttp-path = \"/metrics\"\nmetric-prefix = \"mtg\"\n",
  267. secret, bindTo, metricsPort)
  268. return os.WriteFile(path, []byte(content), 0o640)
  269. }
  270. // scrapeTraffic reads the mtg Prometheus metrics endpoint and sums byte
  271. // counters by direction. mtg exposes a traffic counter labelled with a
  272. // direction; "to_telegram" is treated as upload and "to_client" as download.
  273. // Best-effort: an unreachable endpoint or unrecognised format yields ok=false.
  274. func scrapeTraffic(port int) (up int64, down int64, ok bool) {
  275. client := http.Client{Timeout: 3 * time.Second}
  276. resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
  277. if err != nil {
  278. return 0, 0, false
  279. }
  280. defer resp.Body.Close()
  281. scanner := bufio.NewScanner(resp.Body)
  282. scanner.Buffer(make([]byte, 64*1024), 1024*1024)
  283. found := false
  284. for scanner.Scan() {
  285. line := strings.TrimSpace(scanner.Text())
  286. if line == "" || line[0] == '#' || !strings.Contains(line, "traffic") {
  287. continue
  288. }
  289. name, labels, value, perr := parseMetricLine(line)
  290. if perr != nil || !strings.HasPrefix(name, "mtg") {
  291. continue
  292. }
  293. switch labels["direction"] {
  294. case "to_telegram", "egress", "up":
  295. up += int64(value)
  296. case "to_client", "ingress", "down":
  297. down += int64(value)
  298. default:
  299. down += int64(value)
  300. }
  301. found = true
  302. }
  303. if err := scanner.Err(); err != nil {
  304. logger.Debug("mtproto: metrics scan error:", err)
  305. }
  306. return up, down, found
  307. }
  308. func parseMetricLine(line string) (name string, labels map[string]string, value float64, err error) {
  309. labels = map[string]string{}
  310. rest := line
  311. if brace := strings.IndexByte(line, '{'); brace >= 0 {
  312. name = line[:brace]
  313. end := strings.IndexByte(line, '}')
  314. if end < brace {
  315. return "", nil, 0, fmt.Errorf("malformed metric line")
  316. }
  317. for _, kv := range strings.Split(line[brace+1:end], ",") {
  318. eq := strings.IndexByte(kv, '=')
  319. if eq < 0 {
  320. continue
  321. }
  322. labels[strings.TrimSpace(kv[:eq])] = strings.Trim(strings.TrimSpace(kv[eq+1:]), `"`)
  323. }
  324. rest = strings.TrimSpace(line[end+1:])
  325. } else {
  326. fields := strings.Fields(line)
  327. if len(fields) < 2 {
  328. return "", nil, 0, fmt.Errorf("malformed metric line")
  329. }
  330. name = fields[0]
  331. rest = fields[1]
  332. }
  333. valFields := strings.Fields(rest)
  334. if len(valFields) == 0 {
  335. return "", nil, 0, fmt.Errorf("missing metric value")
  336. }
  337. value, err = strconv.ParseFloat(valFields[0], 64)
  338. if err != nil {
  339. return "", nil, 0, err
  340. }
  341. return name, labels, value, nil
  342. }