// metric_history.go
package service
import (
	"sort"
	"sync"
	"time"
)
// MetricSample is one point of any time-series we keep in memory.
// The frontend deserializes both keys, so they must stay short.
type MetricSample struct {
	T int64   `json:"t"` // sample timestamp, Unix seconds
	V float64 `json:"v"` // sample value
}
// metricCapacityDefault caps each ring buffer at ~5h worth of @2s samples
// or ~25h worth of @10s samples. Plenty for the bucketed aggregation
// view and small enough that the working set per metric stays under
// ~150 KiB.
const metricCapacityDefault = 9000
// metricHistory is a thread-safe, in-memory ring buffer keyed by
// arbitrary strings. Two singletons live below: one for system-wide
// host metrics, one for per-node metrics. Keeping them in this file
// (rather than scattered across services) makes the storage model
// easy to reason about and avoids double-locking.
type metricHistory struct {
	mu      sync.Mutex                // guards metrics; every method takes it
	metrics map[string][]MetricSample // per-metric samples, appended in time order
}
  26. func newMetricHistory() *metricHistory {
  27. return &metricHistory{metrics: map[string][]MetricSample{}}
  28. }
  29. // append stores a single sample for the given metric, deduping when
  30. // two appends happen within the same wall-clock second (which can
  31. // happen if the cron tick is faster than the metric's natural rate).
  32. func (h *metricHistory) append(metric string, t time.Time, v float64) {
  33. h.mu.Lock()
  34. defer h.mu.Unlock()
  35. buf := h.metrics[metric]
  36. p := MetricSample{T: t.Unix(), V: v}
  37. if n := len(buf); n > 0 && buf[n-1].T == p.T {
  38. buf[n-1] = p
  39. } else {
  40. buf = append(buf, p)
  41. }
  42. if len(buf) > metricCapacityDefault {
  43. buf = buf[len(buf)-metricCapacityDefault:]
  44. }
  45. h.metrics[metric] = buf
  46. }
  47. // drop removes the entire history for one metric. Used when a node is
  48. // deleted so its old samples don't linger forever in the singleton.
  49. func (h *metricHistory) drop(metric string) {
  50. h.mu.Lock()
  51. delete(h.metrics, metric)
  52. h.mu.Unlock()
  53. }
  54. // aggregate returns up to maxPoints buckets of size bucketSeconds,
  55. // each bucket carrying the arithmetic mean of the underlying samples.
  56. // Bucket alignment is to absolute Unix-second boundaries so two
  57. // concurrent calls (e.g. two browser tabs) see identical x-axes.
  58. func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
  59. if bucketSeconds <= 0 || maxPoints <= 0 {
  60. return []map[string]any{}
  61. }
  62. cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
  63. h.mu.Lock()
  64. hist := h.metrics[metric]
  65. startIdx := 0
  66. for i := len(hist) - 1; i >= 0; i-- {
  67. if hist[i].T < cutoff {
  68. startIdx = i + 1
  69. break
  70. }
  71. }
  72. if startIdx >= len(hist) {
  73. h.mu.Unlock()
  74. return []map[string]any{}
  75. }
  76. tmp := make([]MetricSample, len(hist)-startIdx)
  77. copy(tmp, hist[startIdx:])
  78. h.mu.Unlock()
  79. if len(tmp) == 0 {
  80. return []map[string]any{}
  81. }
  82. bSize := int64(bucketSeconds)
  83. curBucket := (tmp[0].T / bSize) * bSize
  84. var out []map[string]any
  85. var acc []float64
  86. flush := func(ts int64) {
  87. if len(acc) == 0 {
  88. return
  89. }
  90. sum := 0.0
  91. for _, v := range acc {
  92. sum += v
  93. }
  94. out = append(out, map[string]any{"t": ts, "v": sum / float64(len(acc))})
  95. acc = acc[:0]
  96. }
  97. for _, p := range tmp {
  98. b := (p.T / bSize) * bSize
  99. if b != curBucket {
  100. flush(curBucket)
  101. curBucket = b
  102. }
  103. acc = append(acc, p.V)
  104. }
  105. flush(curBucket)
  106. if len(out) > maxPoints {
  107. out = out[len(out)-maxPoints:]
  108. }
  109. if out == nil {
  110. return []map[string]any{}
  111. }
  112. return out
  113. }
// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
// fed by ServerController.refreshStatus every 2s. nodeMetrics holds
// per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
// process-local — survival across panel restart is not required.
var (
	systemMetrics = newMetricHistory()
	nodeMetrics   = newMetricHistory()
)
// SystemMetricKeys lists the metric names ServerService writes on every
// status sample. Exposed for documentation/test purposes; the
// controller validates incoming names against an allow-list.
var SystemMetricKeys = []string{
	"cpu", "mem", "netUp", "netDown", "online", "load1", "load5", "load15",
}
// NodeMetricKeys lists the per-node metric names NodeHeartbeatJob writes.
var NodeMetricKeys = []string{"cpu", "mem"}