| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- package service
- import (
- "encoding/gob"
- "os"
- "path/filepath"
- "sync"
- "time"
- "github.com/mhsanaei/3x-ui/v3/config"
- "github.com/mhsanaei/3x-ui/v3/logger"
- )
- // MetricSample is one point of any time-series we keep in memory.
- // The frontend deserializes both keys, so they must stay short.
- type MetricSample struct {
- T int64 `json:"t"`
- V float64 `json:"v"`
- }
- // metricCapacityDefault caps each ring buffer at ~5h worth of @2s samples
- // or ~25h worth of @10s samples. Plenty for the bucketed aggregation
- // view and small enough that the working set per metric stays under
- // ~150 KiB.
- const metricCapacityDefault = 9000
- // metricHistory is a thread-safe, in-memory ring buffer keyed by
- // arbitrary strings. Two singletons live below: one for system-wide
- // host metrics, one for per-node metrics. Keeping them in this file
- // (rather than scattered across services) makes the storage model
- // easy to reason about and avoids double-locking.
- type metricHistory struct {
- mu sync.Mutex
- metrics map[string][]MetricSample
- }
- func newMetricHistory() *metricHistory {
- return &metricHistory{metrics: map[string][]MetricSample{}}
- }
- // append stores a single sample for the given metric, deduping when
- // two appends happen within the same wall-clock second (which can
- // happen if the cron tick is faster than the metric's natural rate).
- func (h *metricHistory) append(metric string, t time.Time, v float64) {
- h.mu.Lock()
- defer h.mu.Unlock()
- buf := h.metrics[metric]
- p := MetricSample{T: t.Unix(), V: v}
- if n := len(buf); n > 0 && buf[n-1].T == p.T {
- buf[n-1] = p
- } else {
- buf = append(buf, p)
- }
- if len(buf) > metricCapacityDefault {
- buf = buf[len(buf)-metricCapacityDefault:]
- }
- h.metrics[metric] = buf
- }
- // drop removes the entire history for one metric. Used when a node is
- // deleted so its old samples don't linger forever in the singleton.
- func (h *metricHistory) drop(metric string) {
- h.mu.Lock()
- delete(h.metrics, metric)
- h.mu.Unlock()
- }
- // snapshot returns a deep copy of every series, safe to serialize without
- // holding the lock during disk I/O.
- func (h *metricHistory) snapshot() map[string][]MetricSample {
- h.mu.Lock()
- defer h.mu.Unlock()
- out := make(map[string][]MetricSample, len(h.metrics))
- for k, v := range h.metrics {
- cp := make([]MetricSample, len(v))
- copy(cp, v)
- out[k] = cp
- }
- return out
- }
- // restore replaces the in-memory series with a previously persisted set,
- // re-applying the per-series capacity cap so a tampered or oversized file
- // can't grow the working set unbounded.
- func (h *metricHistory) restore(data map[string][]MetricSample) {
- h.mu.Lock()
- defer h.mu.Unlock()
- for k, v := range data {
- if len(v) > metricCapacityDefault {
- v = v[len(v)-metricCapacityDefault:]
- }
- h.metrics[k] = v
- }
- }
- // aggregate returns up to maxPoints buckets of size bucketSeconds,
- // each bucket carrying the arithmetic mean of the underlying samples.
- // Bucket alignment is to absolute Unix-second boundaries so two
- // concurrent calls (e.g. two browser tabs) see identical x-axes.
- func (h *metricHistory) aggregate(metric string, bucketSeconds int, maxPoints int) []map[string]any {
- if bucketSeconds <= 0 || maxPoints <= 0 {
- return []map[string]any{}
- }
- cutoff := time.Now().Add(-time.Duration(bucketSeconds*maxPoints) * time.Second).Unix()
- h.mu.Lock()
- hist := h.metrics[metric]
- startIdx := 0
- for i := len(hist) - 1; i >= 0; i-- {
- if hist[i].T < cutoff {
- startIdx = i + 1
- break
- }
- }
- if startIdx >= len(hist) {
- h.mu.Unlock()
- return []map[string]any{}
- }
- tmp := make([]MetricSample, len(hist)-startIdx)
- copy(tmp, hist[startIdx:])
- h.mu.Unlock()
- if len(tmp) == 0 {
- return []map[string]any{}
- }
- bSize := int64(bucketSeconds)
- curBucket := (tmp[0].T / bSize) * bSize
- var out []map[string]any
- var acc []float64
- flush := func(ts int64) {
- if len(acc) == 0 {
- return
- }
- sum := 0.0
- for _, v := range acc {
- sum += v
- }
- out = append(out, map[string]any{"t": ts, "v": sum / float64(len(acc))})
- acc = acc[:0]
- }
- for _, p := range tmp {
- b := (p.T / bSize) * bSize
- if b != curBucket {
- flush(curBucket)
- curBucket = b
- }
- acc = append(acc, p.V)
- }
- flush(curBucket)
- if len(out) > maxPoints {
- out = out[len(out)-maxPoints:]
- }
- if out == nil {
- return []map[string]any{}
- }
- return out
- }
- // systemMetrics holds whole-host time series (cpu, mem, netUp, etc.)
- // fed by ServerService.RefreshStatus every 2s. nodeMetrics holds
- // per-node CPU/Mem fed by NodeHeartbeatJob every 10s. Both are
- // process-local — survival across panel restart is not required.
- var (
- systemMetrics = newMetricHistory()
- nodeMetrics = newMetricHistory()
- xrayMetrics = newMetricHistory()
- )
- // SystemMetricKeys lists the metric names ServerService writes on every
- // status sample. Exposed for documentation/test purposes; the
- // controller validates incoming names against an allow-list.
- var SystemMetricKeys = []string{
- "cpu", "mem", "swap", "netUp", "netDown", "pktUp", "pktDown", "diskRead", "diskWrite", "diskUsage", "tcpCount", "udpCount", "online", "load1", "load5", "load15",
- }
- // NodeMetricKeys lists the per-node metric names NodeHeartbeatJob writes.
- var NodeMetricKeys = []string{"cpu", "mem"}
- // XrayMetricKeys lists series sourced from xray's /debug/vars expvar
- // endpoint. Populated by XrayMetricsService.Sample on the same 2s cadence
- // as the system metrics, but only when the xray config has a `metrics`
- // block configured.
- var XrayMetricKeys = []string{
- "xrAlloc", "xrSys", "xrHeapObjects", "xrNumGC", "xrPauseNs",
- }
- // systemMetricsStorePath is where the host time-series is persisted between
- // restarts. It lives next to the database so a single volume mount carries
- // both. Only systemMetrics is persisted — node and xray series are cheap to
- // rebuild and tied to live connections.
- func systemMetricsStorePath() string {
- return filepath.Join(config.GetDBFolderPath(), "system_metrics.gob")
- }
- // PersistSystemMetrics writes the host time-series to disk via a temp file +
- // rename so a crash mid-write can't corrupt the previous snapshot. Called on a
- // timer and at shutdown.
- func PersistSystemMetrics() error {
- path := systemMetricsStorePath()
- tmp := path + ".tmp"
- f, err := os.Create(tmp)
- if err != nil {
- return err
- }
- if err := gob.NewEncoder(f).Encode(systemMetrics.snapshot()); err != nil {
- f.Close()
- os.Remove(tmp)
- return err
- }
- if err := f.Close(); err != nil {
- os.Remove(tmp)
- return err
- }
- return os.Rename(tmp, path)
- }
- // RestoreSystemMetrics loads a previously persisted host time-series on startup.
- // A missing file is not an error (first boot). Aggregation already windows by
- // time, so any gap from downtime is handled by the readers.
- func RestoreSystemMetrics() {
- path := systemMetricsStorePath()
- f, err := os.Open(path)
- if err != nil {
- if !os.IsNotExist(err) {
- logger.Warning("restore system metrics failed:", err)
- }
- return
- }
- defer f.Close()
- var data map[string][]MetricSample
- if err := gob.NewDecoder(f).Decode(&data); err != nil {
- logger.Warning("decode system metrics failed:", err)
- return
- }
- systemMetrics.restore(data)
- }
|