metric_history.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package service
  2. import (
  3. "encoding/gob"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/config"
  9. "github.com/mhsanaei/3x-ui/v3/logger"
  10. )
  // MetricSample is a single (timestamp, value) point of an in-memory
  // time series. The JSON keys are deliberately one letter long because
  // the frontend deserializes both of them.
  type MetricSample struct {
  	// T is the sample time; writers in this file fill it with Unix seconds.
  	T int64 `json:"t"`
  	// V is the value recorded at T.
  	V float64 `json:"v"`
  }
  // metricCapacityDefault is the maximum number of samples kept per series:
  // five hours of samples taken every two seconds (9000 points), which is
  // also about 25 hours at one sample every ten seconds. That is enough for
  // the bucketed aggregation view while keeping each series under roughly
  // 150 KiB.
  const metricCapacityDefault = 5 * 60 * 60 / 2
  22. // metricHistory is a thread-safe, in-memory ring buffer keyed by
  23. // arbitrary strings. Two singletons live below: one for system-wide
  24. // host metrics, one for per-node metrics. Keeping them in this file
  25. // (rather than scattered across services) makes the storage model
  26. // easy to reason about and avoids double-locking.
  27. type metricHistory struct {
  28. mu sync.Mutex
  29. metrics map[string][]MetricSample
  30. }
  31. func newMetricHistory() *metricHistory {
  32. return &metricHistory{metrics: map[string][]MetricSample{}}
  33. }
  34. // append stores a single sample for the given metric, deduping when
  35. // two appends happen within the same wall-clock second (which can
  36. // happen if the cron tick is faster than the metric's natural rate).
  37. func (h *metricHistory) append(metric string, t time.Time, v float64) {
  38. h.mu.Lock()
  39. defer h.mu.Unlock()
  40. buf := h.metrics[metric]
  41. p := MetricSample{T: t.Unix(), V: v}
  42. if n := len(buf); n > 0 && buf[n-1].T == p.T {
  43. buf[n-1] = p
  44. } else {
  45. buf = append(buf, p)
  46. }
  47. if len(buf) > metricCapacityDefault {
  48. buf = buf[len(buf)-metricCapacityDefault:]
  49. }
  50. h.metrics[metric] = buf
  51. }
  52. // drop removes the entire history for one metric. Used when a node is
  53. // deleted so its old samples don't linger forever in the singleton.
  54. func (h *metricHistory) drop(metric string) {
  55. h.mu.Lock()
  56. delete(h.metrics, metric)
  57. h.mu.Unlock()
  58. }
  59. // snapshot returns a deep copy of every series, safe to serialize without
  60. // holding the lock during disk I/O.
  61. func (h *metricHistory) snapshot() map[string][]MetricSample {
  62. h.mu.Lock()
  63. defer h.mu.Unlock()
  64. out := make(map[string][]MetricSample, len(h.metrics))
  65. for k, v := range h.metrics {
  66. cp := make([]MetricSample, len(v))
  67. copy(cp, v)
  68. out[k] = cp
  69. }
  70. return out
  71. }
  72. // restore replaces the in-memory series with a previously persisted set,
  73. // re-applying the per-series capacity cap so a tampered or oversized file
  74. // can't grow the working set unbounded.
  75. func (h *metricHistory) restore(data map[string][]MetricSample) {
  76. h.mu.Lock()
  77. defer h.mu.Unlock()
  78. for k, v := range data {
  79. if len(v) > metricCapacityDefault {
  80. v = v[len(v)-metricCapacityDefault:]
  81. }
  82. h.metrics[k] = v
  83. }
  84. }
  85. // aggregate returns up to maxPoints buckets of size bucketSeconds,
  86. // each bucket carrying the arithmetic mean of the underlying samples.
  87. // Bucket alignment is to absolute Unix-second boundaries so two
  88. // concurrent calls (e.g. two browser tabs) see identical x-axes.
  89. func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
  90. if bucketSeconds <= 0 || maxPoints <= 0 {
  91. return []map[string]any{}
  92. }
  93. cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
  94. h.mu.Lock()
  95. hist := h.metrics[metric]
  96. startIdx := 0
  97. for i := len(hist) - 1; i >= 0; i-- {
  98. if hist[i].T < cutoff {
  99. startIdx = i + 1
  100. break
  101. }
  102. }
  103. if startIdx >= len(hist) {
  104. h.mu.Unlock()
  105. return []map[string]any{}
  106. }
  107. tmp := make([]MetricSample, len(hist)-startIdx)
  108. copy(tmp, hist[startIdx:])
  109. h.mu.Unlock()
  110. if len(tmp) == 0 {
  111. return []map[string]any{}
  112. }
  113. bSize := int64(bucketSeconds)
  114. curBucket := (tmp[0].T / bSize) * bSize
  115. var out []map[string]any
  116. var acc []float64
  117. flush := func(ts int64) {
  118. if len(acc) == 0 {
  119. return
  120. }
  121. sum := 0.0
  122. for _, v := range acc {
  123. sum += v
  124. }
  125. out = append(out, map[string]any{"t": ts, "v": sum / float64(len(acc))})
  126. acc = acc[:0]
  127. }
  128. for _, p := range tmp {
  129. b := (p.T / bSize) * bSize
  130. if b != curBucket {
  131. flush(curBucket)
  132. curBucket = b
  133. }
  134. acc = append(acc, p.V)
  135. }
  136. flush(curBucket)
  137. if len(out) > maxPoints {
  138. out = out[len(out)-maxPoints:]
  139. }
  140. if out == nil {
  141. return []map[string]any{}
  142. }
  143. return out
  144. }
  145. // systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
  146. // fed by ServerService.RefreshStatus every 2s. nodeMetrics holds
  147. // per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
  148. // process-local — survival across panel restart is not required.
  149. var (
  150. systemMetrics = newMetricHistory()
  151. nodeMetrics = newMetricHistory()
  152. xrayMetrics = newMetricHistory()
  153. )
  // SystemMetricKeys enumerates the metric names that ServerService writes
  // with every status sample. It is exported for documentation and tests;
  // the controller checks incoming names against an allow-list.
  var SystemMetricKeys = []string{
  	"cpu", "mem", "swap",
  	"netUp", "netDown", "pktUp", "pktDown",
  	"diskRead", "diskWrite", "diskUsage",
  	"tcpCount", "udpCount", "online",
  	"load1", "load5", "load15",
  }
  // NodeMetricKeys enumerates the per-node metric names written by
  // NodeHeartbeatJob.
  var NodeMetricKeys = []string{
  	"cpu",
  	"mem",
  }
  // XrayMetricKeys enumerates the series taken from xray's /debug/vars
  // expvar endpoint. XrayMetricsService.Sample fills them on the same 2s
  // cadence as the system metrics, but only when the xray config contains
  // a `metrics` block.
  var XrayMetricKeys = []string{
  	"xrAlloc",
  	"xrSys",
  	"xrHeapObjects",
  	"xrNumGC",
  	"xrPauseNs",
  }
  169. // systemMetricsStorePath is where the host time-series is persisted between
  170. // restarts. It lives next to the database so a single volume mount carries
  171. // both. Only systemMetrics is persisted — node and xray series are cheap to
  172. // rebuild and tied to live connections.
  173. func systemMetricsStorePath() string {
  174. return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob")
  175. }
  176. // PersistSystemMetrics writes the host time-series to disk via a temp file +
  177. // rename so a crash mid-write can't corrupt the previous snapshot. Called on a
  178. // timer and at shutdown.
  179. func PersistSystemMetrics() error {
  180. path := systemMetricsStorePath()
  181. tmp := path + ".tmp"
  182. f, err := os.Create(tmp)
  183. if err != nil {
  184. return err
  185. }
  186. if err := gob.NewEncoder(f).Encode(systemMetrics.snapshot()); err != nil {
  187. f.Close()
  188. os.Remove(tmp)
  189. return err
  190. }
  191. if err := f.Close(); err != nil {
  192. os.Remove(tmp)
  193. return err
  194. }
  195. return os.Rename(tmp, path)
  196. }
  197. // RestoreSystemMetrics loads a previously persisted host time-series on startup.
  198. // A missing file is not an error (first boot). Aggregation already windows by
  199. // time, so any gap from downtime is handled by the readers.
  200. func RestoreSystemMetrics() {
  201. path := systemMetricsStorePath()
  202. f, err := os.Open(path)
  203. if err != nil {
  204. if !os.IsNotExist(err) {
  205. logger.Warning("restore system metrics failed:", err)
  206. }
  207. return
  208. }
  209. defer f.Close()
  210. var data map[string][]MetricSample
  211. if err := gob.NewDecoder(f).Decode(&data); err != nil {
  212. logger.Warning("decode system metrics failed:", err)
  213. return
  214. }
  215. systemMetrics.restore(data)
  216. }