|
|
@@ -4,7 +4,6 @@ import (
|
|
|
"encoding/gob"
|
|
|
"os"
|
|
|
"path/filepath"
|
|
|
- "slices"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
@@ -19,109 +18,167 @@ type MetricSample struct {
|
|
|
V float64 `json:"v"`
|
|
|
}
|
|
|
|
|
|
-// metricCapacityDefault caps each ring buffer at 48h worth of @2s samples.
|
|
|
-// Node metrics arrive less frequently, so they fit the same retention window
|
|
|
-// with room to spare.
|
|
|
-const metricCapacityDefault = 86400
|
|
|
+// tierSpec defines one resolution layer of the rollup ladder: a fixed bucket
|
|
|
+// size in seconds and how many buckets to retain. window = resolution*capacity.
|
|
|
+type tierSpec struct {
|
|
|
+ resolution int
|
|
|
+ capacity int
|
|
|
+}
|
|
|
|
|
|
-// metricHistory is a thread-safe, in-memory ring buffer keyed by
|
|
|
-// arbitrary strings. Two singletons live below: one for system-wide
|
|
|
-// host metrics, one for per-node metrics. Keeping them in this file
|
|
|
-// (rather than scattered across services) makes the storage model
|
|
|
-// easy to reason about and avoids double-locking.
|
|
|
-type metricHistory struct {
|
|
|
- mu sync.Mutex
|
|
|
- metrics map[string][]MetricSample
|
|
|
+// metricTiers is the rollup ladder applied to every series. High resolution is
|
|
|
+// kept only for the recent past; older samples roll up into progressively
|
|
|
+// coarser, cheaper layers (RRDtool-style). Per series this totals ~5700 samples
|
|
|
+// (~90 KiB) yet spans a live 2s view through ~7 days of history.
|
|
|
+var metricTiers = []tierSpec{
|
|
|
+ {resolution: 2, capacity: 1800}, // 1h at 2s
|
|
|
+ {resolution: 60, capacity: 2880}, // 48h at 1m
|
|
|
+ {resolution: 600, capacity: 1008}, // 7d at 10m
|
|
|
}
|
|
|
|
|
|
-func newMetricHistory() *metricHistory {
|
|
|
- return &metricHistory{metrics: map[string][]MetricSample{}}
|
|
|
+// tierBuf is one fixed-resolution ring of a series. Samples land in an open
|
|
|
+// bucket and are averaged into the ring only when the next bucket begins, so a
|
|
|
+// coarse tier carries one mean per bucket instead of every raw point.
|
|
|
+type tierBuf struct {
|
|
|
+ resolution int
|
|
|
+ capacity int
|
|
|
+ samples []MetricSample
|
|
|
+ open bool
|
|
|
+ openStart int64
|
|
|
+ openSum float64
|
|
|
+ openCount int
|
|
|
}
|
|
|
|
|
|
-// append stores a single sample for the given metric, deduping when
|
|
|
-// two appends happen within the same wall-clock second (which can
|
|
|
-// happen if the cron tick is faster than the metric's natural rate).
|
|
|
-func (h *metricHistory) append(metric string, t time.Time, v float64) {
|
|
|
- h.mu.Lock()
|
|
|
- defer h.mu.Unlock()
|
|
|
- buf := h.metrics[metric]
|
|
|
- p := MetricSample{T: t.Unix(), V: v}
|
|
|
- if n := len(buf); n > 0 && buf[n-1].T == p.T {
|
|
|
- buf[n-1] = p
|
|
|
- } else {
|
|
|
- buf = append(buf, p)
|
|
|
- }
|
|
|
- if len(buf) > metricCapacityDefault {
|
|
|
- buf = buf[len(buf)-metricCapacityDefault:]
|
|
|
+func (tb *tierBuf) add(unixSec int64, v float64) {
|
|
|
+ res := int64(tb.resolution)
|
|
|
+ b := (unixSec / res) * res
|
|
|
+ if tb.open && b != tb.openStart {
|
|
|
+ tb.flush()
|
|
|
}
|
|
|
- h.metrics[metric] = buf
|
|
|
+ tb.open = true
|
|
|
+ tb.openStart = b
|
|
|
+ tb.openSum += v
|
|
|
+ tb.openCount++
|
|
|
}
|
|
|
|
|
|
-// drop removes the entire history for one metric. Used when a node is
|
|
|
-// deleted so its old samples don't linger forever in the singleton.
|
|
|
-func (h *metricHistory) drop(metric string) {
|
|
|
- h.mu.Lock()
|
|
|
- delete(h.metrics, metric)
|
|
|
- h.mu.Unlock()
|
|
|
+func (tb *tierBuf) flush() {
|
|
|
+ if tb.openCount == 0 {
|
|
|
+ tb.open = false
|
|
|
+ return
|
|
|
+ }
|
|
|
+ tb.samples = append(tb.samples, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)})
|
|
|
+ if len(tb.samples) > tb.capacity {
|
|
|
+ tb.samples = tb.samples[len(tb.samples)-tb.capacity:]
|
|
|
+ }
|
|
|
+ tb.open = false
|
|
|
+ tb.openStart = 0
|
|
|
+ tb.openSum = 0
|
|
|
+ tb.openCount = 0
|
|
|
}
|
|
|
|
|
|
-// snapshot returns a deep copy of every series, safe to serialize without
|
|
|
-// holding the lock during disk I/O.
|
|
|
-func (h *metricHistory) snapshot() map[string][]MetricSample {
|
|
|
- h.mu.Lock()
|
|
|
- defer h.mu.Unlock()
|
|
|
- out := make(map[string][]MetricSample, len(h.metrics))
|
|
|
- for k, v := range h.metrics {
|
|
|
- cp := make([]MetricSample, len(v))
|
|
|
- copy(cp, v)
|
|
|
- out[k] = cp
|
|
|
+// readSamples returns a copy of the closed buckets plus the still-open one, so
|
|
|
+// the most recent point is visible before its bucket boundary closes.
|
|
|
+func (tb *tierBuf) readSamples() []MetricSample {
|
|
|
+ out := make([]MetricSample, len(tb.samples), len(tb.samples)+1)
|
|
|
+ copy(out, tb.samples)
|
|
|
+ if tb.openCount > 0 {
|
|
|
+ out = append(out, MetricSample{T: tb.openStart, V: tb.openSum / float64(tb.openCount)})
|
|
|
}
|
|
|
return out
|
|
|
}
|
|
|
|
|
|
-// restore replaces the in-memory series with a previously persisted set,
|
|
|
-// re-applying the per-series capacity cap so a tampered or oversized file
|
|
|
-// can't grow the working set unbounded.
|
|
|
-func (h *metricHistory) restore(data map[string][]MetricSample) {
|
|
|
+// series is the rollup ladder for one metric: a sample is fed to every tier.
|
|
|
+type series struct {
|
|
|
+ tiers []*tierBuf
|
|
|
+}
|
|
|
+
|
|
|
+func newSeries() *series {
|
|
|
+ s := &series{tiers: make([]*tierBuf, len(metricTiers))}
|
|
|
+ for i, spec := range metricTiers {
|
|
|
+ s.tiers[i] = &tierBuf{resolution: spec.resolution, capacity: spec.capacity}
|
|
|
+ }
|
|
|
+ return s
|
|
|
+}
|
|
|
+
|
|
|
+func (s *series) add(unixSec int64, v float64) {
|
|
|
+ for _, tb := range s.tiers {
|
|
|
+ tb.add(unixSec, v)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// pickTier returns the finest tier whose window covers spanSeconds, falling back
|
|
|
+// to the coarsest (longest-window) tier when nothing covers it.
|
|
|
+func (s *series) pickTier(spanSeconds int64) *tierBuf {
|
|
|
+ for _, tb := range s.tiers {
|
|
|
+ if int64(tb.resolution)*int64(tb.capacity) >= spanSeconds {
|
|
|
+ return tb
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return s.tiers[len(s.tiers)-1]
|
|
|
+}
|
|
|
+
|
|
|
+// metricHistory is a thread-safe, in-memory store of tiered series keyed by
|
|
|
+// arbitrary strings. Three singletons live below: system-wide host metrics,
|
|
|
+// per-node metrics, and xray expvar metrics.
|
|
|
+type metricHistory struct {
|
|
|
+ mu sync.Mutex
|
|
|
+ series map[string]*series
|
|
|
+}
|
|
|
+
|
|
|
+func newMetricHistory() *metricHistory {
|
|
|
+ return &metricHistory{series: map[string]*series{}}
|
|
|
+}
|
|
|
+
|
|
|
+// append stores a single sample for the given metric across all tiers.
|
|
|
+func (h *metricHistory) append(metric string, t time.Time, v float64) {
|
|
|
h.mu.Lock()
|
|
|
defer h.mu.Unlock()
|
|
|
- for k, v := range data {
|
|
|
- if len(v) > metricCapacityDefault {
|
|
|
- v = v[len(v)-metricCapacityDefault:]
|
|
|
- }
|
|
|
- h.metrics[k] = v
|
|
|
+ s := h.series[metric]
|
|
|
+ if s == nil {
|
|
|
+ s = newSeries()
|
|
|
+ h.series[metric] = s
|
|
|
}
|
|
|
+ s.add(t.Unix(), v)
|
|
|
}
|
|
|
|
|
|
-// aggregate returns up to maxPoints buckets of size bucketSeconds,
|
|
|
-// each bucket carrying the arithmetic mean of the underlying samples.
|
|
|
-// Bucket alignment is to absolute Unix-second boundaries so two
|
|
|
-// concurrent calls (e.g. two browser tabs) see identical x-axes.
|
|
|
+// drop removes the entire history for one metric. Used when a node is deleted so
|
|
|
+// its old samples don't linger forever in the singleton.
|
|
|
+func (h *metricHistory) drop(metric string) {
|
|
|
+ h.mu.Lock()
|
|
|
+ delete(h.series, metric)
|
|
|
+ h.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+// aggregate returns up to maxPoints buckets of size bucketSeconds, each carrying
|
|
|
+// the arithmetic mean of the underlying samples from the finest tier that covers
|
|
|
+// the requested span. Bucket alignment is to absolute Unix-second boundaries so
|
|
|
+// two concurrent calls see identical x-axes.
|
|
|
func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
|
|
|
+ empty := []map[string]any{}
|
|
|
if bucketSeconds <= 0 || maxPoints <= 0 {
|
|
|
- return []map[string]any{}
|
|
|
+ return empty
|
|
|
}
|
|
|
- cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
|
|
|
+ span := int64(bucketSeconds) * int64(maxPoints)
|
|
|
+ cutoff := time.Now().Unix() - span
|
|
|
|
|
|
h.mu.Lock()
|
|
|
- hist := h.metrics[metric]
|
|
|
- startIdx := 0
|
|
|
- for i, h := range slices.Backward(hist) {
|
|
|
- if h.T < cutoff {
|
|
|
- startIdx = i + 1
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if startIdx >= len(hist) {
|
|
|
+ s := h.series[metric]
|
|
|
+ if s == nil {
|
|
|
h.mu.Unlock()
|
|
|
- return []map[string]any{}
|
|
|
+ return empty
|
|
|
}
|
|
|
- tmp := make([]MetricSample, len(hist)-startIdx)
|
|
|
- copy(tmp, hist[startIdx:])
|
|
|
+ raw := s.pickTier(span).readSamples()
|
|
|
h.mu.Unlock()
|
|
|
|
|
|
+ startIdx := len(raw)
|
|
|
+ for i := len(raw) - 1; i >= 0; i-- {
|
|
|
+ if raw[i].T < cutoff {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ startIdx = i
|
|
|
+ }
|
|
|
+ tmp := raw[startIdx:]
|
|
|
if len(tmp) == 0 {
|
|
|
- return []map[string]any{}
|
|
|
+ return empty
|
|
|
}
|
|
|
|
|
|
bSize := int64(bucketSeconds)
|
|
|
@@ -152,24 +209,79 @@ func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints in
|
|
|
out = out[len(out)-maxPoints:]
|
|
|
}
|
|
|
if out == nil {
|
|
|
- return []map[string]any{}
|
|
|
+ return empty
|
|
|
+ }
|
|
|
+ return out
|
|
|
+}
|
|
|
+
|
|
|
+// persistedTier and persistedSeries are the on-disk shape of a series. Tiers are
|
|
|
+// matched back by resolution on restore, so changing the ladder degrades
|
|
|
+// gracefully (unmatched layers are dropped) instead of corrupting state.
|
|
|
+type persistedTier struct {
|
|
|
+ Resolution int
|
|
|
+ Samples []MetricSample
|
|
|
+}
|
|
|
+
|
|
|
+type persistedSeries struct {
|
|
|
+ Tiers []persistedTier
|
|
|
+}
|
|
|
+
|
|
|
+// snapshot returns a deep copy of every series' closed buckets, safe to
|
|
|
+// serialize without holding the lock during disk I/O.
|
|
|
+func (h *metricHistory) snapshot() map[string]persistedSeries {
|
|
|
+ h.mu.Lock()
|
|
|
+ defer h.mu.Unlock()
|
|
|
+ out := make(map[string]persistedSeries, len(h.series))
|
|
|
+ for k, s := range h.series {
|
|
|
+ ps := persistedSeries{Tiers: make([]persistedTier, len(s.tiers))}
|
|
|
+ for i, tb := range s.tiers {
|
|
|
+ cp := make([]MetricSample, len(tb.samples))
|
|
|
+ copy(cp, tb.samples)
|
|
|
+ ps.Tiers[i] = persistedTier{Resolution: tb.resolution, Samples: cp}
|
|
|
+ }
|
|
|
+ out[k] = ps
|
|
|
}
|
|
|
return out
|
|
|
}
|
|
|
|
|
|
-// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
|
|
|
-// fed by ServerService.RefreshStatus every 2s. nodeMetrics holds
|
|
|
-// per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
|
|
|
-// process-local — survival across panel restart is not required.
|
|
|
+// restore replaces the in-memory series with a previously persisted set,
|
|
|
+// re-applying each tier's capacity cap so a tampered or oversized file can't grow
|
|
|
+// the working set unbounded.
|
|
|
+func (h *metricHistory) restore(data map[string]persistedSeries) {
|
|
|
+ h.mu.Lock()
|
|
|
+ defer h.mu.Unlock()
|
|
|
+ for k, ps := range data {
|
|
|
+ s := newSeries()
|
|
|
+ for _, pt := range ps.Tiers {
|
|
|
+ for _, tb := range s.tiers {
|
|
|
+ if tb.resolution != pt.Resolution {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ samples := pt.Samples
|
|
|
+ if len(samples) > tb.capacity {
|
|
|
+ samples = samples[len(samples)-tb.capacity:]
|
|
|
+ }
|
|
|
+ tb.samples = samples
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ h.series[k] = s
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// systemMetrics holds whole-host time series (cpu, mem, netUp, etc.) fed by
|
|
|
+// ServerService.RefreshStatus every 2s. nodeMetrics holds per-node CPU/Mem fed
|
|
|
+// by NodeHeartbeatJob. xrayMetrics holds xray expvar series. Only systemMetrics
|
|
|
+// is persisted; the others rebuild from live connections.
|
|
|
var (
|
|
|
systemMetrics = newMetricHistory()
|
|
|
nodeMetrics = newMetricHistory()
|
|
|
xrayMetrics = newMetricHistory()
|
|
|
)
|
|
|
|
|
|
-// SystemMetricKeys lists the metric names ServerService writes on every
|
|
|
-// status sample. Exposed for documentation/test purposes; the
|
|
|
-// controller validates incoming names against an allow-list.
|
|
|
+// SystemMetricKeys lists the metric names ServerService writes on every status
|
|
|
+// sample. Exposed for documentation/test purposes; the controller validates
|
|
|
+// incoming names against an allow-list.
|
|
|
var SystemMetricKeys = []string{
|
|
|
"cpu", "mem", "swap", "netUp", "netDown", "pktUp", "pktDown", "diskRead", "diskWrite", "diskUsage", "tcpCount", "udpCount", "online", "load1", "load5", "load15",
|
|
|
}
|
|
|
@@ -177,18 +289,13 @@ var SystemMetricKeys = []string{
|
|
|
// NodeMetricKeys lists the per-node metric names NodeHeartbeatJob writes.
|
|
|
var NodeMetricKeys = []string{"cpu", "mem", "netUp", "netDown"}
|
|
|
|
|
|
-// XrayMetricKeys lists series sourced from xray's /debug/vars expvar
|
|
|
-// endpoint. Populated by XrayMetricsService.Sample on the same 2s cadence
|
|
|
-// as the system metrics, but only when the xray config has a `metrics`
|
|
|
-// block configured.
|
|
|
+// XrayMetricKeys lists series sourced from xray's /debug/vars expvar endpoint.
|
|
|
var XrayMetricKeys = []string{
|
|
|
"xrAlloc", "xrSys", "xrHeapObjects", "xrNumGC", "xrPauseNs",
|
|
|
}
|
|
|
|
|
|
// systemMetricsStorePath is where the host time-series is persisted between
|
|
|
-// restarts. It lives next to the database so a single volume mount carries
|
|
|
-// both. Only systemMetrics is persisted — node and xray series are cheap to
|
|
|
-// rebuild and tied to live connections.
|
|
|
+// restarts. It lives next to the database so a single volume mount carries both.
|
|
|
func systemMetricsStorePath() string {
|
|
|
return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob")
|
|
|
}
|
|
|
@@ -216,8 +323,8 @@ func PersistSystemMetrics() error {
|
|
|
}
|
|
|
|
|
|
// RestoreSystemMetrics loads a previously persisted host time-series on startup.
|
|
|
-// A missing file is not an error (first boot). Aggregation already windows by
|
|
|
-// time, so any gap from downtime is handled by the readers.
|
|
|
+// A missing file is not an error (first boot). A pre-tier flat snapshot is
|
|
|
+// migrated by replaying its samples through the rollup.
|
|
|
func RestoreSystemMetrics() {
|
|
|
path := systemMetricsStorePath()
|
|
|
f, err := os.Open(path)
|
|
|
@@ -227,11 +334,36 @@ func RestoreSystemMetrics() {
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
- defer f.Close()
|
|
|
- var data map[string][]MetricSample
|
|
|
- if err := gob.NewDecoder(f).Decode(&data); err != nil {
|
|
|
- logger.Warning("decode system metrics failed:", err)
|
|
|
+ var data map[string]persistedSeries
|
|
|
+ decErr := gob.NewDecoder(f).Decode(&data)
|
|
|
+ f.Close()
|
|
|
+ if decErr == nil {
|
|
|
+ systemMetrics.restore(data)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if migrateLegacySystemMetrics(path) {
|
|
|
return
|
|
|
}
|
|
|
- systemMetrics.restore(data)
|
|
|
+ logger.Warning("decode system metrics failed:", decErr)
|
|
|
+}
|
|
|
+
|
|
|
+// migrateLegacySystemMetrics loads a pre-tier flat snapshot
|
|
|
+// (map[string][]MetricSample) and replays it through append so the new tiers are
|
|
|
+// seeded from the existing history instead of starting empty.
|
|
|
+func migrateLegacySystemMetrics(path string) bool {
|
|
|
+ f, err := os.Open(path)
|
|
|
+ if err != nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ defer f.Close()
|
|
|
+ var legacy map[string][]MetricSample
|
|
|
+ if err := gob.NewDecoder(f).Decode(&legacy); err != nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ for metric, samples := range legacy {
|
|
|
+ for _, p := range samples {
|
|
|
+ systemMetrics.append(metric, time.Unix(p.T, 0), p.V)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true
|
|
|
}
|