1
0

xray_metrics.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "regexp"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/mhsanaei/3x-ui/v3/logger"
  13. )
  14. type xrayMetricsState struct {
  15. Enabled bool `json:"enabled"`
  16. Listen string `json:"listen"`
  17. Reason string `json:"reason,omitempty"`
  18. }
  19. type ObsTagSnapshot struct {
  20. Tag string `json:"tag"`
  21. Alive bool `json:"alive"`
  22. Delay int64 `json:"delay"`
  23. LastSeenTime int64 `json:"lastSeenTime"`
  24. LastTryTime int64 `json:"lastTryTime"`
  25. UpdatedAt int64 `json:"updatedAt"`
  26. }
  27. type XrayMetricsService struct {
  28. settingService SettingService
  29. mu sync.RWMutex
  30. state xrayMetricsState
  31. client *http.Client
  32. obsByTag map[string]ObsTagSnapshot
  33. }
  34. var validObsTag = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
  35. func obsHistoryKey(tag string) string {
  36. return "xrObs." + tag + ".delay"
  37. }
  38. func newXrayMetricsClient() *http.Client {
  39. return &http.Client{Timeout: 1500 * time.Millisecond}
  40. }
  41. func (s *XrayMetricsService) getClient() *http.Client {
  42. s.mu.Lock()
  43. defer s.mu.Unlock()
  44. if s.client == nil {
  45. s.client = newXrayMetricsClient()
  46. }
  47. return s.client
  48. }
  49. func (s *XrayMetricsService) State() xrayMetricsState {
  50. s.mu.RLock()
  51. defer s.mu.RUnlock()
  52. return s.state
  53. }
  54. func (s *XrayMetricsService) AggregateMetric(metric string, bucketSeconds, maxPoints int) []map[string]any {
  55. return xrayMetrics.aggregate(metric, bucketSeconds, maxPoints)
  56. }
  57. func (s *XrayMetricsService) ObservatorySnapshot() []ObsTagSnapshot {
  58. s.mu.RLock()
  59. defer s.mu.RUnlock()
  60. out := make([]ObsTagSnapshot, 0, len(s.obsByTag))
  61. for _, v := range s.obsByTag {
  62. out = append(out, v)
  63. }
  64. sort.Slice(out, func(i, j int) bool { return out[i].Tag < out[j].Tag })
  65. return out
  66. }
  67. func (s *XrayMetricsService) HasObservatoryTag(tag string) bool {
  68. if !validObsTag.MatchString(tag) {
  69. return false
  70. }
  71. s.mu.RLock()
  72. defer s.mu.RUnlock()
  73. _, ok := s.obsByTag[tag]
  74. return ok
  75. }
  76. func (s *XrayMetricsService) AggregateObservatory(tag string, bucketSeconds, maxPoints int) []map[string]any {
  77. if !validObsTag.MatchString(tag) {
  78. return []map[string]any{}
  79. }
  80. return xrayMetrics.aggregate(obsHistoryKey(tag), bucketSeconds, maxPoints)
  81. }
  82. func (s *XrayMetricsService) discoverListen() (string, error) {
  83. tmpl, err := s.settingService.GetXrayConfigTemplate()
  84. if err != nil {
  85. return "", err
  86. }
  87. var parsed struct {
  88. Metrics *struct {
  89. Listen string `json:"listen"`
  90. } `json:"metrics"`
  91. }
  92. if err := json.Unmarshal([]byte(tmpl), &parsed); err != nil {
  93. return "", err
  94. }
  95. if parsed.Metrics == nil || strings.TrimSpace(parsed.Metrics.Listen) == "" {
  96. return "", nil
  97. }
  98. return strings.TrimSpace(parsed.Metrics.Listen), nil
  99. }
  100. type rawObsEntry struct {
  101. Alive bool `json:"alive"`
  102. Delay int64 `json:"delay"`
  103. LastSeenTime int64 `json:"last_seen_time"`
  104. LastTryTime int64 `json:"last_try_time"`
  105. OutboundTag string `json:"outbound_tag"`
  106. }
  107. func (s *XrayMetricsService) Sample(t time.Time) {
  108. listen, err := s.discoverListen()
  109. if err != nil {
  110. s.setState(xrayMetricsState{Reason: err.Error()})
  111. return
  112. }
  113. if listen == "" {
  114. s.setState(xrayMetricsState{Reason: "metrics block not configured in xray template"})
  115. return
  116. }
  117. ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
  118. defer cancel()
  119. url := fmt.Sprintf("http://%s/debug/vars", listen)
  120. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  121. if err != nil {
  122. s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
  123. return
  124. }
  125. resp, err := s.getClient().Do(req)
  126. if err != nil {
  127. s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
  128. return
  129. }
  130. defer resp.Body.Close()
  131. if resp.StatusCode != http.StatusOK {
  132. s.setState(xrayMetricsState{Listen: listen, Reason: fmt.Sprintf("HTTP %d", resp.StatusCode)})
  133. return
  134. }
  135. var payload struct {
  136. MemStats struct {
  137. HeapAlloc uint64 `json:"HeapAlloc"`
  138. Sys uint64 `json:"Sys"`
  139. HeapObjects uint64 `json:"HeapObjects"`
  140. NumGC uint32 `json:"NumGC"`
  141. PauseNs [256]uint64 `json:"PauseNs"`
  142. } `json:"memstats"`
  143. Observatory map[string]rawObsEntry `json:"observatory"`
  144. }
  145. if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
  146. s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
  147. return
  148. }
  149. xrayMetrics.append("xrAlloc", t, float64(payload.MemStats.HeapAlloc))
  150. xrayMetrics.append("xrSys", t, float64(payload.MemStats.Sys))
  151. xrayMetrics.append("xrHeapObjects", t, float64(payload.MemStats.HeapObjects))
  152. xrayMetrics.append("xrNumGC", t, float64(payload.MemStats.NumGC))
  153. var lastPause uint64
  154. if payload.MemStats.NumGC > 0 {
  155. idx := (payload.MemStats.NumGC + 255) % 256
  156. lastPause = payload.MemStats.PauseNs[idx]
  157. }
  158. xrayMetrics.append("xrPauseNs", t, float64(lastPause))
  159. s.applyObservatory(t, payload.Observatory)
  160. s.setState(xrayMetricsState{Enabled: true, Listen: listen})
  161. }
  162. func (s *XrayMetricsService) applyObservatory(t time.Time, entries map[string]rawObsEntry) {
  163. next := make(map[string]ObsTagSnapshot, len(entries))
  164. for key, e := range entries {
  165. tag := e.OutboundTag
  166. if tag == "" {
  167. tag = key
  168. }
  169. if !validObsTag.MatchString(tag) {
  170. continue
  171. }
  172. snap := ObsTagSnapshot{
  173. Tag: tag,
  174. Alive: e.Alive,
  175. Delay: e.Delay,
  176. LastSeenTime: e.LastSeenTime,
  177. LastTryTime: e.LastTryTime,
  178. UpdatedAt: t.Unix(),
  179. }
  180. next[tag] = snap
  181. xrayMetrics.append(obsHistoryKey(tag), t, float64(e.Delay))
  182. }
  183. s.mu.Lock()
  184. for tag := range s.obsByTag {
  185. if _, kept := next[tag]; !kept {
  186. xrayMetrics.drop(obsHistoryKey(tag))
  187. }
  188. }
  189. s.obsByTag = next
  190. s.mu.Unlock()
  191. }
  192. func (s *XrayMetricsService) setState(st xrayMetricsState) {
  193. s.mu.Lock()
  194. s.state = st
  195. s.mu.Unlock()
  196. if !st.Enabled && st.Reason != "" {
  197. logger.Debugf("xray metrics unavailable: %s", st.Reason)
  198. }
  199. }