1
0

metric_history.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package service
  2. import (
  3. "encoding/gob"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/config"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. )
  // MetricSample is a single reading in a series: T is a Unix time in seconds
  // and V is the value recorded for it. The frontend decodes both JSON keys by
  // these exact names, so they are kept short and must not change.
  type MetricSample struct {
  	T int64   `json:"t"` // Unix seconds
  	V float64 `json:"v"` // reading (or bucket mean)
  }
  // tierSpec configures one layer of the rollup ladder. A layer keeps at most
  // capacity buckets, each resolution seconds wide, so it spans a window of
  // resolution*capacity seconds.
  type tierSpec struct {
  	resolution int // bucket width in seconds
  	capacity   int // maximum number of buckets retained
  }
  23. // metricTiers is the rollup ladder applied to every series. High resolution is
  24. // kept only for the recent past; older samples roll up into progressively
  25. // coarser, cheaper layers (RRDtool-style). Per series this totals ~5700 samples
  26. // (~90 KiB) yet spans a live 2s view through ~7 days of history.
  27. var metricTiers = []tierSpec{
  28. {resolution: 2, capacity: 1800}, // 1h at 2s
  29. {resolution: 60, capacity: 2880}, // 48h at 1m
  30. {resolution: 600, capacity: 1008}, // 7d at 10m
  31. }
  32. // tierBuf is one fixed-resolution ring of a series. Samples land in an open
  33. // bucket and are averaged into the ring only when the next bucket begins, so a
  34. // coarse tier carries one mean per bucket instead of every raw point.
  35. type tierBuf struct {
  36. resolution int
  37. capacity int
  38. samples []MetricSample
  39. open bool
  40. openStart int64
  41. openSum float64
  42. openCount int
  43. }
  44. func (tb *tierBuf) add(unixSec int64, v float64) {
  45. res := int64(tb.resolution)
  46. b := (unixSec / res) * res
  47. if tb.open && b != tb.openStart {
  48. tb.flush()
  49. }
  50. tb.open = true
  51. tb.openStart = b
  52. tb.openSum += v
  53. tb.openCount++
  54. }
  55. func (tb *tierBuf) flush() {
  56. if tb.openCount == 0 {
  57. tb.open = false
  58. return
  59. }
  60. tb.samples = append(tb.samples, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)})
  61. if len(tb.samples) > tb.capacity {
  62. tb.samples = tb.samples[len(tb.samples)-tb.capacity:]
  63. }
  64. tb.open = false
  65. tb.openStart = 0
  66. tb.openSum = 0
  67. tb.openCount = 0
  68. }
  69. // readSamples returns a copy of the closed buckets plus the still-open one, so
  70. // the most recent point is visible before its bucket boundary closes.
  71. func (tb *tierBuf) readSamples() []MetricSample {
  72. out := make([]MetricSample, len(tb.samples), len(tb.samples)+1)
  73. copy(out, tb.samples)
  74. if tb.openCount > 0 {
  75. out = append(out, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)})
  76. }
  77. return out
  78. }
  79. // series is the rollup ladder for one metric: a sample is fed to every tier.
  80. type series struct {
  81. tiers []*tierBuf
  82. }
  83. func newSeries() *series {
  84. s := &series{tiers: make([]*tierBuf, len(metricTiers))}
  85. for i, spec := range metricTiers {
  86. s.tiers[i] = &tierBuf{resolution: spec.resolution, capacity: spec.capacity}
  87. }
  88. return s
  89. }
  90. func (s *series) add(unixSec int64, v float64) {
  91. for _, tb := range s.tiers {
  92. tb.add(unixSec, v)
  93. }
  94. }
  95. // pickTier returns the finest tier whose window covers spanSeconds, falling back
  96. // to the coarsest (longest-window) tier when nothing covers it.
  97. func (s *series) pickTier(spanSeconds int64) *tierBuf {
  98. for _, tb := range s.tiers {
  99. if int64(tb.resolution)*int64(tb.capacity) >= spanSeconds {
  100. return tb
  101. }
  102. }
  103. return s.tiers[len(s.tiers)-1]
  104. }
  105. // metricHistory is a thread-safe, in-memory store of tiered series keyed by
  106. // arbitrary strings. Three singletons live below: system-wide host metrics,
  107. // per-node metrics, and xray expvar metrics.
  108. type metricHistory struct {
  109. mu sync.Mutex
  110. series map[string]*series
  111. }
  112. func newMetricHistory() *metricHistory {
  113. return &metricHistory{series: map[string]*series{}}
  114. }
  115. // append stores a single sample for the given metric across all tiers.
  116. func (h *metricHistory) append(metric string, t time.Time, v float64) {
  117. h.mu.Lock()
  118. defer h.mu.Unlock()
  119. s := h.series[metric]
  120. if s == nil {
  121. s = newSeries()
  122. h.series[metric] = s
  123. }
  124. s.add(t.Unix(), v)
  125. }
  126. // drop removes the entire history for one metric. Used when a node is deleted so
  127. // its old samples don't linger forever in the singleton.
  128. func (h *metricHistory) drop(metric string) {
  129. h.mu.Lock()
  130. delete(h.series, metric)
  131. h.mu.Unlock()
  132. }
  133. // aggregate returns up to maxPoints buckets of size bucketSeconds, each carrying
  134. // the arithmetic mean of the underlying samples from the finest tier that covers
  135. // the requested span. Bucket alignment is to absolute Unix-second boundaries so
  136. // two concurrent calls see identical x-axes.
  137. func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
  138. empty := []map[string]any{}
  139. if bucketSeconds <= 0 || maxPoints <= 0 {
  140. return empty
  141. }
  142. span := int64(bucketSeconds) * int64(maxPoints)
  143. cutoff := time.Now().Unix() - span
  144. h.mu.Lock()
  145. s := h.series[metric]
  146. if s == nil {
  147. h.mu.Unlock()
  148. return empty
  149. }
  150. raw := s.pickTier(span).readSamples()
  151. h.mu.Unlock()
  152. startIdx := len(raw)
  153. for i := len(raw) - 1; i >= 0; i-- {
  154. if raw[i].T < cutoff {
  155. break
  156. }
  157. startIdx = i
  158. }
  159. tmp := raw[startIdx:]
  160. if len(tmp) == 0 {
  161. return empty
  162. }
  163. bSize := int64(bucketSeconds)
  164. curBucket := (tmp[0].T / bSize) * bSize
  165. var out []map[string]any
  166. var acc []float64
  167. flush := func(ts int64) {
  168. if len(acc) == 0 {
  169. return
  170. }
  171. sum := 0.0
  172. for _, v := range acc {
  173. sum += v
  174. }
  175. out = append(out, map[string]any{"t": ts, "v": sum / float64(len(acc))})
  176. acc = acc[:0]
  177. }
  178. for _, p := range tmp {
  179. b := (p.T / bSize) * bSize
  180. if b != curBucket {
  181. flush(curBucket)
  182. curBucket = b
  183. }
  184. acc = append(acc, p.V)
  185. }
  186. flush(curBucket)
  187. if len(out) > maxPoints {
  188. out = out[len(out)-maxPoints:]
  189. }
  190. if out == nil {
  191. return empty
  192. }
  193. return out
  194. }
  195. // persistedTier and persistedSeries are the on-disk shape of a series. Tiers are
  196. // matched back by resolution on restore, so changing the ladder degrades
  197. // gracefully (unmatched layers are dropped) instead of corrupting state.
  198. type persistedTier struct {
  199. Resolution int
  200. Samples []MetricSample
  201. }
  202. type persistedSeries struct {
  203. Tiers []persistedTier
  204. }
  205. // snapshot returns a deep copy of every series' closed buckets, safe to
  206. // serialize without holding the lock during disk I/O.
  207. func (h *metricHistory) snapshot() map[string]persistedSeries {
  208. h.mu.Lock()
  209. defer h.mu.Unlock()
  210. out := make(map[string]persistedSeries, len(h.series))
  211. for k, s := range h.series {
  212. ps := persistedSeries{Tiers: make([]persistedTier, len(s.tiers))}
  213. for i, tb := range s.tiers {
  214. cp := make([]MetricSample, len(tb.samples))
  215. copy(cp, tb.samples)
  216. ps.Tiers[i] = persistedTier{Resolution: tb.resolution, Samples: cp}
  217. }
  218. out[k] = ps
  219. }
  220. return out
  221. }
  222. // restore replaces the in-memory series with a previously persisted set,
  223. // re-applying each tier's capacity cap so a tampered or oversized file can't grow
  224. // the working set unbounded.
  225. func (h *metricHistory) restore(data map[string]persistedSeries) {
  226. h.mu.Lock()
  227. defer h.mu.Unlock()
  228. for k, ps := range data {
  229. s := newSeries()
  230. for _, pt := range ps.Tiers {
  231. for _, tb := range s.tiers {
  232. if tb.resolution != pt.Resolution {
  233. continue
  234. }
  235. samples := pt.Samples
  236. if len(samples) > tb.capacity {
  237. samples = samples[len(samples)-tb.capacity:]
  238. }
  239. tb.samples = samples
  240. break
  241. }
  242. }
  243. h.series[k] = s
  244. }
  245. }
  246. // systemMetrics holds whole-host time series (cpu, mem, netUp, etc.) fed by
  247. // ServerService.RefreshStatus every 2s. nodeMetrics holds per-node CPU/Mem fed
  248. // by NodeHeartbeatJob. xrayMetrics holds xray expvar series. Only systemMetrics
  249. // is persisted; the others rebuild from live connections.
  250. var (
  251. systemMetrics = newMetricHistory()
  252. nodeMetrics = newMetricHistory()
  253. xrayMetrics = newMetricHistory()
  254. )
  // SystemMetricKeys names, in a fixed order, the series ServerService writes on
  // every status sample. It is exported for documentation and tests; the
  // controller validates requested names against its own allow-list.
  var SystemMetricKeys = []string{
  	"cpu", "mem", "swap",
  	"netUp", "netDown", "pktUp", "pktDown",
  	"diskRead", "diskWrite", "diskUsage",
  	"tcpCount", "udpCount", "online",
  	"load1", "load5", "load15",
  }
  // NodeMetricKeys names the per-node series written by NodeHeartbeatJob.
  var NodeMetricKeys = []string{
  	"cpu",
  	"mem",
  	"netUp",
  	"netDown",
  }
  // XrayMetricKeys names the series collected from xray's /debug/vars expvar
  // endpoint.
  var XrayMetricKeys = []string{"xrAlloc", "xrSys", "xrHeapObjects", "xrNumGC", "xrPauseNs"}
  267. // systemMetricsStorePath is where the host time-series is persisted between
  268. // restarts. It lives next to the database so a single volume mount carries both.
  269. func systemMetricsStorePath() string {
  270. return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob")
  271. }
  272. // PersistSystemMetrics writes the host time-series to disk via a temp file +
  273. // rename so a crash mid-write can't corrupt the previous snapshot. Called on a
  274. // timer and at shutdown.
  275. func PersistSystemMetrics() error {
  276. path := systemMetricsStorePath()
  277. tmp := path + ".tmp"
  278. f, err := os.Create(tmp)
  279. if err != nil {
  280. return err
  281. }
  282. if err := gob.NewEncoder(f).Encode(systemMetrics.snapshot()); err != nil {
  283. f.Close()
  284. os.Remove(tmp)
  285. return err
  286. }
  287. if err := f.Close(); err != nil {
  288. os.Remove(tmp)
  289. return err
  290. }
  291. return os.Rename(tmp, path)
  292. }
  293. // RestoreSystemMetrics loads a previously persisted host time-series on startup.
  294. // A missing file is not an error (first boot). A pre-tier flat snapshot is
  295. // migrated by replaying its samples through the rollup.
  296. func RestoreSystemMetrics() {
  297. path := systemMetricsStorePath()
  298. f, err := os.Open(path)
  299. if err != nil {
  300. if !os.IsNotExist(err) {
  301. logger.Warning("restore system metrics failed:", err)
  302. }
  303. return
  304. }
  305. var data map[string]persistedSeries
  306. decErr := gob.NewDecoder(f).Decode(&data)
  307. f.Close()
  308. if decErr == nil {
  309. systemMetrics.restore(data)
  310. return
  311. }
  312. if migrateLegacySystemMetrics(path) {
  313. return
  314. }
  315. logger.Warning("decode system metrics failed:", decErr)
  316. }
  317. // migrateLegacySystemMetrics loads a pre-tier flat snapshot
  318. // (map[string][]MetricSample) and replays it through append so the new tiers are
  319. // seeded from the existing history instead of starting empty.
  320. func migrateLegacySystemMetrics(path string) bool {
  321. f, err := os.Open(path)
  322. if err != nil {
  323. return false
  324. }
  325. defer f.Close()
  326. var legacy map[string][]MetricSample
  327. if err := gob.NewDecoder(f).Decode(&legacy); err != nil {
  328. return false
  329. }
  330. for metric, samples := range legacy {
  331. for _, p := range samples {
  332. systemMetrics.append(metric, time.Unix(p.T, 0), p.V)
  333. }
  334. }
  335. return true
  336. }