metric_history.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package service
import (
	"encoding/gob"
	"math"
	"os"
	"path/filepath"
	"slices"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v3/internal/config"
	"github.com/mhsanaei/3x-ui/v3/internal/logger"
)
  12. // MetricSample is one point of any time-series we keep in memory.
  13. // The frontend deserializes both keys, so they must stay short.
  14. type MetricSample struct {
  15. T int64 `json:"t"`
  16. V float64 `json:"v"`
  17. }
  18. // metricCapacityDefault caps each ring buffer at ~5h worth of @2s samples
  19. // or ~25h worth of @10s samples. Plenty for the bucketed aggregation
  20. // view and small enough that the working set per metric stays under
  21. // ~150 KiB.
  22. const metricCapacityDefault = 9000
  23. // metricHistory is a thread-safe, in-memory ring buffer keyed by
  24. // arbitrary strings. Two singletons live below: one for system-wide
  25. // host metrics, one for per-node metrics. Keeping them in this file
  26. // (rather than scattered across services) makes the storage model
  27. // easy to reason about and avoids double-locking.
  28. type metricHistory struct {
  29. mu sync.Mutex
  30. metrics map[string][]MetricSample
  31. }
  32. func newMetricHistory() *metricHistory {
  33. return &metricHistory{metrics: map[string][]MetricSample{}}
  34. }
  35. // append stores a single sample for the given metric, deduping when
  36. // two appends happen within the same wall-clock second (which can
  37. // happen if the cron tick is faster than the metric's natural rate).
  38. func (h *metricHistory) append(metric string, t time.Time, v float64) {
  39. h.mu.Lock()
  40. defer h.mu.Unlock()
  41. buf := h.metrics[metric]
  42. p := MetricSample{T: t.Unix(), V: v}
  43. if n := len(buf); n > 0 && buf[n-1].T == p.T {
  44. buf[n-1] = p
  45. } else {
  46. buf = append(buf, p)
  47. }
  48. if len(buf) > metricCapacityDefault {
  49. buf = buf[len(buf)-metricCapacityDefault:]
  50. }
  51. h.metrics[metric] = buf
  52. }
  53. // drop removes the entire history for one metric. Used when a node is
  54. // deleted so its old samples don't linger forever in the singleton.
  55. func (h *metricHistory) drop(metric string) {
  56. h.mu.Lock()
  57. delete(h.metrics, metric)
  58. h.mu.Unlock()
  59. }
  60. // snapshot returns a deep copy of every series, safe to serialize without
  61. // holding the lock during disk I/O.
  62. func (h *metricHistory) snapshot() map[string][]MetricSample {
  63. h.mu.Lock()
  64. defer h.mu.Unlock()
  65. out := make(map[string][]MetricSample, len(h.metrics))
  66. for k, v := range h.metrics {
  67. cp := make([]MetricSample, len(v))
  68. copy(cp, v)
  69. out[k] = cp
  70. }
  71. return out
  72. }
  73. // restore replaces the in-memory series with a previously persisted set,
  74. // re-applying the per-series capacity cap so a tampered or oversized file
  75. // can't grow the working set unbounded.
  76. func (h *metricHistory) restore(data map[string][]MetricSample) {
  77. h.mu.Lock()
  78. defer h.mu.Unlock()
  79. for k, v := range data {
  80. if len(v) > metricCapacityDefault {
  81. v = v[len(v)-metricCapacityDefault:]
  82. }
  83. h.metrics[k] = v
  84. }
  85. }
  86. // aggregate returns up to maxPoints buckets of size bucketSeconds,
  87. // each bucket carrying the arithmetic mean of the underlying samples.
  88. // Bucket alignment is to absolute Unix-second boundaries so two
  89. // concurrent calls (e.g. two browser tabs) see identical x-axes.
  90. func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
  91. if bucketSeconds <= 0 || maxPoints <= 0 {
  92. return []map[string]any{}
  93. }
  94. cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
  95. h.mu.Lock()
  96. hist := h.metrics[metric]
  97. startIdx := 0
  98. for i, h := range slices.Backward(hist) {
  99. if h.T < cutoff {
  100. startIdx = i + 1
  101. break
  102. }
  103. }
  104. if startIdx >= len(hist) {
  105. h.mu.Unlock()
  106. return []map[string]any{}
  107. }
  108. tmp := make([]MetricSample, len(hist)-startIdx)
  109. copy(tmp, hist[startIdx:])
  110. h.mu.Unlock()
  111. if len(tmp) == 0 {
  112. return []map[string]any{}
  113. }
  114. bSize := int64(bucketSeconds)
  115. curBucket := (tmp[0].T / bSize) * bSize
  116. var out []map[string]any
  117. var acc []float64
  118. flush := func(ts int64) {
  119. if len(acc) == 0 {
  120. return
  121. }
  122. sum := 0.0
  123. for _, v := range acc {
  124. sum += v
  125. }
  126. out = append(out, map[string]any{"t": ts, "v": sum / float64(len(acc))})
  127. acc = acc[:0]
  128. }
  129. for _, p := range tmp {
  130. b := (p.T / bSize) * bSize
  131. if b != curBucket {
  132. flush(curBucket)
  133. curBucket = b
  134. }
  135. acc = append(acc, p.V)
  136. }
  137. flush(curBucket)
  138. if len(out) > maxPoints {
  139. out = out[len(out)-maxPoints:]
  140. }
  141. if out == nil {
  142. return []map[string]any{}
  143. }
  144. return out
  145. }
  146. // systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
  147. // fed by ServerService.RefreshStatus every 2s. nodeMetrics holds
  148. // per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
  149. // process-local — survival across panel restart is not required.
  150. var (
  151. systemMetrics = newMetricHistory()
  152. nodeMetrics = newMetricHistory()
  153. xrayMetrics = newMetricHistory()
  154. )
  155. // SystemMetricKeys lists the metric names ServerService writes on every
  156. // status sample. Exposed for documentation/test purposes; the
  157. // controller validates incoming names against an allow-list.
  158. var SystemMetricKeys = []string{
  159. "cpu", "mem", "swap", "netUp", "netDown", "pktUp", "pktDown", "diskRead", "diskWrite", "diskUsage", "tcpCount", "udpCount", "online", "load1", "load5", "load15",
  160. }
  161. // NodeMetricKeys lists the per-node metric names NodeHeartbeatJob writes.
  162. var NodeMetricKeys = []string{"cpu", "mem", "netUp", "netDown"}
  163. // XrayMetricKeys lists series sourced from xray's /debug/vars expvar
  164. // endpoint. Populated by XrayMetricsService.Sample on the same 2s cadence
  165. // as the system metrics, but only when the xray config has a `metrics`
  166. // block configured.
  167. var XrayMetricKeys = []string{
  168. "xrAlloc", "xrSys", "xrHeapObjects", "xrNumGC", "xrPauseNs",
  169. }
  170. // systemMetricsStorePath is where the host time-series is persisted between
  171. // restarts. It lives next to the database so a single volume mount carries
  172. // both. Only systemMetrics is persisted — node and xray series are cheap to
  173. // rebuild and tied to live connections.
  174. func systemMetricsStorePath() string {
  175. return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob")
  176. }
  177. // PersistSystemMetrics writes the host time-series to disk via a temp file +
  178. // rename so a crash mid-write can't corrupt the previous snapshot. Called on a
  179. // timer and at shutdown.
  180. func PersistSystemMetrics() error {
  181. path := systemMetricsStorePath()
  182. tmp := path + ".tmp"
  183. f, err := os.Create(tmp)
  184. if err != nil {
  185. return err
  186. }
  187. if err := gob.NewEncoder(f).Encode(systemMetrics.snapshot()); err != nil {
  188. f.Close()
  189. os.Remove(tmp)
  190. return err
  191. }
  192. if err := f.Close(); err != nil {
  193. os.Remove(tmp)
  194. return err
  195. }
  196. return os.Rename(tmp, path)
  197. }
  198. // RestoreSystemMetrics loads a previously persisted host time-series on startup.
  199. // A missing file is not an error (first boot). Aggregation already windows by
  200. // time, so any gap from downtime is handled by the readers.
  201. func RestoreSystemMetrics() {
  202. path := systemMetricsStorePath()
  203. f, err := os.Open(path)
  204. if err != nil {
  205. if !os.IsNotExist(err) {
  206. logger.Warning("restore system metrics failed:", err)
  207. }
  208. return
  209. }
  210. defer f.Close()
  211. var data map[string][]MetricSample
  212. if err := gob.NewDecoder(f).Decode(&data); err != nil {
  213. logger.Warning("decode system metrics failed:", err)
  214. return
  215. }
  216. systemMetrics.restore(data)
  217. }