xray_metrics.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "regexp"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/mhsanaei/3x-ui/v3/internal/eventbus"
  13. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  14. )
  15. type xrayMetricsState struct {
  16. Enabled bool `json:"enabled"`
  17. Listen string `json:"listen"`
  18. Reason string `json:"reason,omitempty"`
  19. }
  20. type ObsTagSnapshot struct {
  21. Tag string `json:"tag"`
  22. Alive bool `json:"alive"`
  23. Delay int64 `json:"delay"`
  24. LastSeenTime int64 `json:"lastSeenTime"`
  25. LastTryTime int64 `json:"lastTryTime"`
  26. UpdatedAt int64 `json:"updatedAt"`
  27. }
  28. // eventBus is the shared bus for publishing observatory state-change events.
  29. // Set once during startup via SetEventBus; nil when no bus is configured.
  30. var eventBus *eventbus.Bus
  31. // SetEventBus assigns the global event bus used by applyObservatory to publish
  32. // outbound health transitions. Must be called once during startup before any
  33. // Sample tick runs.
  34. func SetEventBus(b *eventbus.Bus) { eventBus = b }
  35. type XrayMetricsService struct {
  36. settingService SettingService
  37. mu sync.RWMutex
  38. state xrayMetricsState
  39. client *http.Client
  40. obsByTag map[string]ObsTagSnapshot
  41. }
  42. var validObsTag = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
  43. func obsHistoryKey(tag string) string {
  44. return "xrObs." + tag + ".delay"
  45. }
  46. func newXrayMetricsClient() *http.Client {
  47. return &http.Client{Timeout: 1500 * time.Millisecond}
  48. }
  49. func (s *XrayMetricsService) getClient() *http.Client {
  50. s.mu.Lock()
  51. defer s.mu.Unlock()
  52. if s.client == nil {
  53. s.client = newXrayMetricsClient()
  54. }
  55. return s.client
  56. }
  57. func (s *XrayMetricsService) State() xrayMetricsState {
  58. s.mu.RLock()
  59. defer s.mu.RUnlock()
  60. return s.state
  61. }
  62. func (s *XrayMetricsService) AggregateMetric(metric string, bucketSeconds, maxPoints int) []map[string]any {
  63. return xrayMetrics.aggregate(metric, bucketSeconds, maxPoints)
  64. }
  65. func (s *XrayMetricsService) ObservatorySnapshot() []ObsTagSnapshot {
  66. s.mu.RLock()
  67. defer s.mu.RUnlock()
  68. out := make([]ObsTagSnapshot, 0, len(s.obsByTag))
  69. for _, v := range s.obsByTag {
  70. out = append(out, v)
  71. }
  72. sort.Slice(out, func(i, j int) bool { return out[i].Tag < out[j].Tag })
  73. return out
  74. }
  75. func (s *XrayMetricsService) HasObservatoryTag(tag string) bool {
  76. if !validObsTag.MatchString(tag) {
  77. return false
  78. }
  79. s.mu.RLock()
  80. defer s.mu.RUnlock()
  81. _, ok := s.obsByTag[tag]
  82. return ok
  83. }
  84. func (s *XrayMetricsService) AggregateObservatory(tag string, bucketSeconds, maxPoints int) []map[string]any {
  85. if !validObsTag.MatchString(tag) {
  86. return []map[string]any{}
  87. }
  88. return xrayMetrics.aggregate(obsHistoryKey(tag), bucketSeconds, maxPoints)
  89. }
  90. func (s *XrayMetricsService) discoverListen() (string, error) {
  91. tmpl, err := s.settingService.GetXrayConfigTemplate()
  92. if err != nil {
  93. return "", err
  94. }
  95. var parsed struct {
  96. Metrics *struct {
  97. Listen string `json:"listen"`
  98. } `json:"metrics"`
  99. }
  100. if err := json.Unmarshal([]byte(tmpl), &parsed); err != nil {
  101. return "", err
  102. }
  103. if parsed.Metrics == nil || strings.TrimSpace(parsed.Metrics.Listen) == "" {
  104. return "", nil
  105. }
  106. return strings.TrimSpace(parsed.Metrics.Listen), nil
  107. }
  108. type rawObsEntry struct {
  109. Alive bool `json:"alive"`
  110. Delay int64 `json:"delay"`
  111. LastSeenTime int64 `json:"last_seen_time"`
  112. LastTryTime int64 `json:"last_try_time"`
  113. OutboundTag string `json:"outbound_tag"`
  114. }
  115. func (s *XrayMetricsService) Sample(t time.Time) {
  116. listen, err := s.discoverListen()
  117. if err != nil {
  118. s.setState(xrayMetricsState{Reason: err.Error()})
  119. return
  120. }
  121. if listen == "" {
  122. s.setState(xrayMetricsState{Reason: "metrics block not configured in xray template"})
  123. return
  124. }
  125. ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
  126. defer cancel()
  127. url := fmt.Sprintf("http://%s/debug/vars", listen)
  128. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  129. if err != nil {
  130. s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
  131. return
  132. }
  133. resp, err := s.getClient().Do(req)
  134. if err != nil {
  135. s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
  136. return
  137. }
  138. defer resp.Body.Close()
  139. if resp.StatusCode != http.StatusOK {
  140. s.setState(xrayMetricsState{Listen: listen, Reason: fmt.Sprintf("HTTP %d", resp.StatusCode)})
  141. return
  142. }
  143. var payload struct {
  144. MemStats struct {
  145. HeapAlloc uint64 `json:"HeapAlloc"`
  146. Sys uint64 `json:"Sys"`
  147. HeapObjects uint64 `json:"HeapObjects"`
  148. NumGC uint32 `json:"NumGC"`
  149. PauseNs [256]uint64 `json:"PauseNs"`
  150. } `json:"memstats"`
  151. Observatory map[string]rawObsEntry `json:"observatory"`
  152. }
  153. if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
  154. s.setState(xrayMetricsState{Listen: listen, Reason: err.Error()})
  155. return
  156. }
  157. xrayMetrics.append("xrAlloc", t, float64(payload.MemStats.HeapAlloc))
  158. xrayMetrics.append("xrSys", t, float64(payload.MemStats.Sys))
  159. xrayMetrics.append("xrHeapObjects", t, float64(payload.MemStats.HeapObjects))
  160. xrayMetrics.append("xrNumGC", t, float64(payload.MemStats.NumGC))
  161. var lastPause uint64
  162. if payload.MemStats.NumGC > 0 {
  163. idx := (payload.MemStats.NumGC + 255) % 256
  164. lastPause = payload.MemStats.PauseNs[idx]
  165. }
  166. xrayMetrics.append("xrPauseNs", t, float64(lastPause))
  167. s.applyObservatory(t, payload.Observatory)
  168. s.setState(xrayMetricsState{Enabled: true, Listen: listen})
  169. }
  170. func (s *XrayMetricsService) applyObservatory(t time.Time, entries map[string]rawObsEntry) {
  171. next := make(map[string]ObsTagSnapshot, len(entries))
  172. for key, e := range entries {
  173. tag := e.OutboundTag
  174. if tag == "" {
  175. tag = key
  176. }
  177. if !validObsTag.MatchString(tag) {
  178. continue
  179. }
  180. snap := ObsTagSnapshot{
  181. Tag: tag,
  182. Alive: e.Alive,
  183. Delay: e.Delay,
  184. LastSeenTime: e.LastSeenTime,
  185. LastTryTime: e.LastTryTime,
  186. UpdatedAt: t.Unix(),
  187. }
  188. next[tag] = snap
  189. xrayMetrics.append(obsHistoryKey(tag), t, float64(e.Delay))
  190. }
  191. s.mu.Lock()
  192. // Detect transitions and publish events
  193. if eventBus != nil {
  194. // Check existing tags for state changes
  195. for tag, old := range s.obsByTag {
  196. cur, exists := next[tag]
  197. if !exists {
  198. // Tag disappeared from observatory — skip, not a real failure
  199. continue
  200. }
  201. if old.Alive && !cur.Alive {
  202. errMsg := ""
  203. if cur.Delay < 0 {
  204. errMsg = "probe failed"
  205. }
  206. eventBus.Publish(eventbus.Event{
  207. Type: eventbus.EventOutboundDown,
  208. Source: tag,
  209. Data: &eventbus.OutboundHealthData{Delay: cur.Delay, Error: errMsg},
  210. })
  211. } else if !old.Alive && cur.Alive {
  212. eventBus.Publish(eventbus.Event{
  213. Type: eventbus.EventOutboundUp,
  214. Source: tag,
  215. Data: &eventbus.OutboundHealthData{Delay: cur.Delay},
  216. })
  217. }
  218. }
  219. }
  220. for tag := range s.obsByTag {
  221. if _, kept := next[tag]; !kept {
  222. xrayMetrics.drop(obsHistoryKey(tag))
  223. }
  224. }
  225. s.obsByTag = next
  226. s.mu.Unlock()
  227. }
  228. func (s *XrayMetricsService) setState(st xrayMetricsState) {
  229. s.mu.Lock()
  230. s.state = st
  231. s.mu.Unlock()
  232. if !st.Enabled && st.Reason != "" {
  233. logger.Debugf("xray metrics unavailable: %s", st.Reason)
  234. }
  235. }