manager.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package mtproto
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/mhsanaei/3x-ui/v3/database/model"
  14. "github.com/mhsanaei/3x-ui/v3/logger"
  15. )
  // Instance describes the runtime configuration wanted for a single mtproto
  // inbound. The Manager starts, restarts or leaves alone an mtg process so that
  // it matches this description.
  type Instance struct {
  	// Id is the inbound id; the Manager keys its processes by this value.
  	Id int
  	// Tag is the inbound tag reported back on collected Traffic entries.
  	Tag string
  	// Listen is the address to bind; an empty value falls back to "0.0.0.0"
  	// when the bind address is built.
  	Listen string
  	// Port is the port the inbound listens on.
  	Port int
  	// Secret is the mtg secret written into the generated config file.
  	Secret string
  }
  24. func (inst Instance) bindTo() string {
  25. listen := inst.Listen
  26. if listen == "" {
  27. listen = "0.0.0.0"
  28. }
  29. return fmt.Sprintf("%s:%d", listen, inst.Port)
  30. }
  31. func (inst Instance) fingerprint() string {
  32. return fmt.Sprintf("%s|%s", inst.bindTo(), inst.Secret)
  33. }
  // Traffic carries the byte counts one inbound moved since the previous scrape
  // of its mtg metrics endpoint.
  type Traffic struct {
  	// Tag identifies the inbound the delta belongs to.
  	Tag string
  	// Up is the upload delta.
  	Up int64
  	// Down is the download delta.
  	Down int64
  }
  40. type managed struct {
  41. proc *Process
  42. tag string
  43. fingerprint string
  44. metricsPort int
  45. lastUp int64
  46. lastDown int64
  47. haveLast bool
  48. }
  49. // Manager owns the set of running mtg processes keyed by inbound id.
  50. type Manager struct {
  51. mu sync.Mutex
  52. procs map[int]*managed
  53. }
  54. var (
  55. managerOnce sync.Once
  56. manager *Manager
  57. )
  58. // GetManager returns the process-wide mtg manager singleton.
  59. func GetManager() *Manager {
  60. managerOnce.Do(func() {
  61. manager = &Manager{procs: map[int]*managed{}}
  62. })
  63. return manager
  64. }
  65. // InstanceFromInbound derives a desired Instance from an mtproto inbound,
  66. // healing the FakeTLS secret so it always matches the configured domain.
  67. // Returns false when the inbound is not a usable mtproto inbound.
  68. func InstanceFromInbound(ib *model.Inbound) (Instance, bool) {
  69. if ib == nil || ib.Protocol != model.MTProto {
  70. return Instance{}, false
  71. }
  72. settings := ib.Settings
  73. if healed, ok := model.HealMtprotoSecret(settings); ok {
  74. settings = healed
  75. }
  76. var parsed struct {
  77. Secret string `json:"secret"`
  78. }
  79. if err := json.Unmarshal([]byte(settings), &parsed); err != nil {
  80. return Instance{}, false
  81. }
  82. if parsed.Secret == "" {
  83. return Instance{}, false
  84. }
  85. return Instance{
  86. Id: ib.Id,
  87. Tag: ib.Tag,
  88. Listen: ib.Listen,
  89. Port: ib.Port,
  90. Secret: parsed.Secret,
  91. }, true
  92. }
  93. // Ensure starts the mtg process for an instance, or restarts it when its
  94. // configuration changed. A no-op when the desired process is already running.
  95. func (m *Manager) Ensure(inst Instance) error {
  96. m.mu.Lock()
  97. defer m.mu.Unlock()
  98. return m.ensureLocked(inst)
  99. }
  100. func (m *Manager) ensureLocked(inst Instance) error {
  101. fp := inst.fingerprint()
  102. if cur, ok := m.procs[inst.Id]; ok {
  103. if cur.fingerprint == fp && cur.proc.IsRunning() {
  104. cur.tag = inst.Tag
  105. return nil
  106. }
  107. cur.proc.Stop()
  108. delete(m.procs, inst.Id)
  109. }
  110. metricsPort, err := freeLocalPort()
  111. if err != nil {
  112. return err
  113. }
  114. cfgPath := configPathForID(inst.Id)
  115. if err := writeConfig(cfgPath, inst.Secret, inst.bindTo(), metricsPort); err != nil {
  116. return err
  117. }
  118. proc := newProcess(cfgPath)
  119. if err := proc.Start(); err != nil {
  120. return err
  121. }
  122. m.procs[inst.Id] = &managed{
  123. proc: proc,
  124. tag: inst.Tag,
  125. fingerprint: fp,
  126. metricsPort: metricsPort,
  127. }
  128. logger.Info("mtproto: started mtg for inbound", inst.Id, "on", inst.bindTo())
  129. return nil
  130. }
  131. // Remove stops and forgets the mtg process for an inbound id.
  132. func (m *Manager) Remove(id int) {
  133. m.mu.Lock()
  134. defer m.mu.Unlock()
  135. if cur, ok := m.procs[id]; ok {
  136. cur.proc.Stop()
  137. delete(m.procs, id)
  138. _ = os.Remove(configPathForID(id))
  139. logger.Info("mtproto: stopped mtg for inbound", id)
  140. }
  141. }
  142. // Reconcile drives the running set toward the desired instances: it stops
  143. // processes that are no longer wanted and (re)starts the rest. Used at boot
  144. // and periodically to recover from crashes.
  145. func (m *Manager) Reconcile(desired []Instance) {
  146. m.mu.Lock()
  147. defer m.mu.Unlock()
  148. want := make(map[int]struct{}, len(desired))
  149. for _, inst := range desired {
  150. want[inst.Id] = struct{}{}
  151. }
  152. for id, cur := range m.procs {
  153. if _, ok := want[id]; !ok {
  154. cur.proc.Stop()
  155. delete(m.procs, id)
  156. _ = os.Remove(configPathForID(id))
  157. }
  158. }
  159. for _, inst := range desired {
  160. if err := m.ensureLocked(inst); err != nil {
  161. logger.Warning("mtproto: reconcile failed for inbound", inst.Id, ":", err)
  162. }
  163. }
  164. }
  165. // StopAll stops every managed mtg process. Called on panel shutdown.
  166. func (m *Manager) StopAll() {
  167. m.mu.Lock()
  168. defer m.mu.Unlock()
  169. for id, cur := range m.procs {
  170. cur.proc.Stop()
  171. delete(m.procs, id)
  172. }
  173. }
  174. // CollectTraffic scrapes each running mtg metrics endpoint and returns the
  175. // per-inbound byte deltas since the previous scrape.
  176. func (m *Manager) CollectTraffic() []Traffic {
  177. m.mu.Lock()
  178. defer m.mu.Unlock()
  179. out := make([]Traffic, 0, len(m.procs))
  180. for _, cur := range m.procs {
  181. if cur.proc == nil || !cur.proc.IsRunning() {
  182. continue
  183. }
  184. up, down, ok := scrapeTraffic(cur.metricsPort)
  185. if !ok {
  186. continue
  187. }
  188. if cur.haveLast {
  189. du := up - cur.lastUp
  190. dd := down - cur.lastDown
  191. if du < 0 {
  192. du = 0
  193. }
  194. if dd < 0 {
  195. dd = 0
  196. }
  197. if du > 0 || dd > 0 {
  198. out = append(out, Traffic{Tag: cur.tag, Up: du, Down: dd})
  199. }
  200. }
  201. cur.lastUp = up
  202. cur.lastDown = down
  203. cur.haveLast = true
  204. }
  205. return out
  206. }
  // freeLocalPort asks the OS for an unused TCP port on 127.0.0.1 by listening on
  // port 0, reads back the port that was assigned and closes the listener again.
  // The port is only free at the time of the call; nothing reserves it afterwards.
  func freeLocalPort() (int, error) {
  	probe, err := net.Listen("tcp", "127.0.0.1:0")
  	if err != nil {
  		return 0, err
  	}
  	defer probe.Close()

  	tcpAddr := probe.Addr().(*net.TCPAddr)
  	return tcpAddr.Port, nil
  }
  215. func writeConfig(path, secret, bindTo string, metricsPort int) error {
  216. if err := os.MkdirAll(configDir(), 0o750); err != nil {
  217. return err
  218. }
  219. content := fmt.Sprintf("secret = %q\nbind-to = %q\n\n[stats.prometheus]\nenabled = true\nbind-to = \"127.0.0.1:%d\"\nhttp-path = \"/metrics\"\nmetric-prefix = \"mtg\"\n",
  220. secret, bindTo, metricsPort)
  221. return os.WriteFile(path, []byte(content), 0o640)
  222. }
  223. // scrapeTraffic reads the mtg Prometheus metrics endpoint and sums byte
  224. // counters by direction. mtg exposes a traffic counter labelled with a
  225. // direction; "to_telegram" is treated as upload and "to_client" as download.
  226. // Best-effort: an unreachable endpoint or unrecognised format yields ok=false.
  227. func scrapeTraffic(port int) (up int64, down int64, ok bool) {
  228. client := http.Client{Timeout: 3 * time.Second}
  229. resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
  230. if err != nil {
  231. return 0, 0, false
  232. }
  233. defer resp.Body.Close()
  234. scanner := bufio.NewScanner(resp.Body)
  235. scanner.Buffer(make([]byte, 64*1024), 1024*1024)
  236. found := false
  237. for scanner.Scan() {
  238. line := strings.TrimSpace(scanner.Text())
  239. if line == "" || line[0] == '#' || !strings.Contains(line, "traffic") {
  240. continue
  241. }
  242. name, labels, value, perr := parseMetricLine(line)
  243. if perr != nil || !strings.HasPrefix(name, "mtg") {
  244. continue
  245. }
  246. switch labels["direction"] {
  247. case "to_telegram", "egress", "up":
  248. up += int64(value)
  249. case "to_client", "ingress", "down":
  250. down += int64(value)
  251. default:
  252. down += int64(value)
  253. }
  254. found = true
  255. }
  256. if err := scanner.Err(); err != nil {
  257. logger.Debug("mtproto: metrics scan error:", err)
  258. }
  259. return up, down, found
  260. }
  // parseMetricLine splits one metrics sample line into its metric name, label
  // set and numeric value.
  //
  // Two shapes are accepted: `name{k="v",...} value` and `name value`. Anything
  // after the value (such as a timestamp) is ignored. Labels are split on commas
  // and "=", with surrounding whitespace and double quotes stripped from the
  // value; escape sequences and commas inside quoted values are not interpreted.
  // Label entries without "=" are skipped.
  //
  // An error is returned, with zero values for the other results, when the line
  // has an unclosed label block, lacks a value, or the value is not a float.
  func parseMetricLine(line string) (name string, labels map[string]string, value float64, err error) {
  	labels = make(map[string]string)
  	var tail string

  	open := strings.IndexByte(line, '{')
  	switch {
  	case open < 0:
  		// No label block: "<name> <value> [...]".
  		parts := strings.Fields(line)
  		if len(parts) < 2 {
  			return "", nil, 0, fmt.Errorf("malformed metric line")
  		}
  		name, tail = parts[0], parts[1]
  	default:
  		// A missing '}' gives -1, which is also caught by this comparison.
  		closing := strings.IndexByte(line, '}')
  		if closing < open {
  			return "", nil, 0, fmt.Errorf("malformed metric line")
  		}
  		name = line[:open]
  		for _, pair := range strings.Split(line[open+1:closing], ",") {
  			sep := strings.IndexByte(pair, '=')
  			if sep < 0 {
  				continue
  			}
  			key := strings.TrimSpace(pair[:sep])
  			val := strings.TrimSpace(pair[sep+1:])
  			labels[key] = strings.Trim(val, `"`)
  		}
  		tail = strings.TrimSpace(line[closing+1:])
  	}

  	tokens := strings.Fields(tail)
  	if len(tokens) == 0 {
  		return "", nil, 0, fmt.Errorf("missing metric value")
  	}
  	parsed, convErr := strconv.ParseFloat(tokens[0], 64)
  	if convErr != nil {
  		return "", nil, 0, convErr
  	}
  	return name, labels, parsed, nil
  }