Quellcode durchsuchen

perf: prevent cron job overlap, auto-set GOMEMLIMIT, fix tgbot userStates race

cron: SkipIfStillRunning stops a slow 5s/10s job from overlapping itself and racing the shared xrayAPI (grpc conn leak) and the StatsLastValues map (fatal concurrent map write). memlimit: set a Go soft memory limit from XUI_MEMORY_LIMIT when given, otherwise auto-detect it as about 90 percent of the cgroup limit or, failing that, system RAM (an explicit GOMEMLIMIT is left to the runtime); opt-in pprof via XUI_PPROF. tgbot: userStates now goes through a mutex-guarded store with TTL pruning (was raced by worker-pool and delayed-delete goroutines). check_client_ip: prefilter inbounds by settings LIKE limitIp instead of loading and JSON-parsing all of them every scan. minor: prune StatsLastValues, RateLimiter.lastSent, reportedRemoteTagConflict. docker-compose: document the memory knobs.
MHSanaei vor 9 Stunden
Ursprung
Commit
7d23a2c15b

+ 8 - 0
docker-compose.yml

@@ -5,6 +5,9 @@ services:
       dockerfile: ./Dockerfile
     container_name: 3xui_app
     # hostname: yourhostname <- optional
+    # Optional hard memory cap. When set, the panel auto-derives its Go soft
+    # limit (GOMEMLIMIT, ~90%) from this so it GCs before the OOM killer fires.
+    # mem_limit: 512m
     # The bundled Fail2ban (XUI_ENABLE_FAIL2BAN below) enforces the IP limit
     # with iptables, which needs NET_ADMIN. Without these caps a ban is logged
     # and shown in fail2ban status but never actually applied. NET_RAW covers
@@ -18,6 +21,11 @@ services:
     environment:
       XRAY_VMESS_AEAD_FORCED: "false"
       XUI_ENABLE_FAIL2BAN: "true"
+      # Go memory soft limit. If neither variable is set, the panel auto-detects
+      # the cgroup/host limit and targets ~90%. Pin it explicitly with one of these two:
+      # XUI_MEMORY_LIMIT: "400"      # in MiB
+      # GOMEMLIMIT: "400MiB"         # Go syntax, takes precedence
+      # XUI_PPROF: "true"           # separate knob: expose pprof on 127.0.0.1:6060 for profiling
       # XUI_INIT_WEB_BASE_PATH: "/"
       # XUI_PORT: "8080"
       # To use PostgreSQL instead of the default SQLite, run:

+ 24 - 5
internal/eventbus/filter.go

@@ -7,9 +7,10 @@ import (
 
 // RateLimiter prevents notification spam from flapping events.
 type RateLimiter struct {
-	mu       sync.Mutex
-	lastSent map[string]time.Time
-	cooldown time.Duration
+	mu        sync.Mutex
+	lastSent  map[string]time.Time
+	cooldown  time.Duration
+	lastPrune time.Time
 }
 
 // NewRateLimiter creates a rate limiter with the given cooldown period.
@@ -23,11 +24,29 @@ func NewRateLimiter(cooldown time.Duration) *RateLimiter {
 // Allow returns true if the event should be sent (cooldown has elapsed).
 func (r *RateLimiter) Allow(eventType EventType, source string) bool {
 	key := string(eventType) + ":" + source
+	now := time.Now()
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	if time.Since(r.lastSent[key]) < r.cooldown {
+	r.pruneLocked(now)
+	if now.Sub(r.lastSent[key]) < r.cooldown {
 		return false
 	}
-	r.lastSent[key] = time.Now()
+	r.lastSent[key] = now
 	return true
 }
+
+// pruneLocked evicts every key whose last send is at least one cooldown old.
+// Allow would let such a key through anyway, so dropping it cannot change a
+// decision; it only keeps lastSent from growing by one entry per (eventType,
+// source) ever seen. Sweeps run at most once per cooldown. Caller holds r.mu.
+func (r *RateLimiter) pruneLocked(now time.Time) {
+	if sinceSweep := now.Sub(r.lastPrune); sinceSweep < r.cooldown {
+		return
+	}
+	r.lastPrune = now
+	for key, sentAt := range r.lastSent {
+		if expired := now.Sub(sentAt) >= r.cooldown; expired {
+			delete(r.lastSent, key)
+		}
+	}
+}

+ 78 - 0
internal/util/sys/memlimit.go

@@ -0,0 +1,78 @@
+package sys
+
+import (
+	"os"
+	"runtime/debug"
+	"strconv"
+	"strings"
+
+	"github.com/shirou/gopsutil/v4/mem"
+)
+
+// memLimitHeadroomPercent is the share of detected memory used for the soft
+// limit, leaving room for non-heap (stacks, mmap, the xray child) before the OS
+// OOM-kills the process.
+const memLimitHeadroomPercent = 90
+
+// ApplyMemoryLimit sets the Go soft memory limit (the runtime's GOMEMLIMIT)
+// unless one is already configured, so a memory-capped container or VPS makes
+// the panel collect garbage near the cap instead of getting it OOM-killed.
+// Precedence: a non-empty GOMEMLIMIT env is left to the runtime; then a valid
+// XUI_MEMORY_LIMIT (MiB); then memLimitHeadroomPercent% of the cgroup limit or,
+// failing that, of system RAM. Returns the applied bytes (0 if none) and a label.
+func ApplyMemoryLimit() (int64, string) {
+	if strings.TrimSpace(os.Getenv("GOMEMLIMIT")) != "" {
+		return 0, "GOMEMLIMIT env (handled by the Go runtime)"
+	}
+
+	if v := strings.TrimSpace(os.Getenv("XUI_MEMORY_LIMIT")); v != "" {
+		// Reject values whose byte count (mb<<20) would overflow int64; like any
+		// other invalid value they fall through to auto-detection below.
+		if mb, err := strconv.ParseInt(v, 10, 64); err == nil && mb > 0 && mb <= (1<<63-1)>>20 {
+			limit := mb << 20
+			debug.SetMemoryLimit(limit)
+			return limit, "XUI_MEMORY_LIMIT=" + v + "MiB"
+		}
+	}
+
+	total, source := detectAvailableMemory()
+	if total <= 0 {
+		return 0, "undetectable; left at Go default"
+	}
+	// Divide before multiplying so the product cannot overflow int64.
+	limit := total / 100 * memLimitHeadroomPercent
+	debug.SetMemoryLimit(limit)
+	return limit, source
+}
+
+// detectAvailableMemory returns the memory budget in bytes together with a
+// label naming where it came from: the cgroup limit when one is found,
+// otherwise the total system RAM. It returns (0, "") when neither is available.
+func detectAvailableMemory() (int64, string) {
+	if limit, found := cgroupMemoryLimit(); found {
+		return limit, "cgroup limit"
+	}
+	vm, err := mem.VirtualMemory()
+	if err != nil || vm.Total == 0 {
+		return 0, ""
+	}
+	return int64(vm.Total), "system RAM"
+}
+
+// cgroupMemoryLimit returns the container memory limit in bytes, probing the
+// cgroup v2 file first and the cgroup v1 file second. A file that is missing
+// (as it is off Linux), not a number (v2 holds "max" when unlimited), or whose
+// value is non-positive or at or above 1<<62 (the v1 unlimited sentinel is
+// ~8 EiB) is skipped. When no file yields a usable number it reports not-found
+// so the caller can fall back to system RAM.
+func cgroupMemoryLimit() (int64, bool) {
+	const unlimited = int64(1) << 62
+
+	for _, path := range [...]string{
+		"/sys/fs/cgroup/memory.max",                   // cgroup v2
+		"/sys/fs/cgroup/memory/memory.limit_in_bytes", // cgroup v1
+	} {
+		raw, err := os.ReadFile(path)
+		if err != nil {
+			continue
+		}
+		// "max" and empty content fail ParseInt and are skipped like any other
+		// non-numeric value.
+		limit, err := strconv.ParseInt(strings.TrimSpace(string(raw)), 10, 64)
+		if err != nil || limit <= 0 || limit >= unlimited {
+			continue
+		}
+		return limit, true
+	}
+	return 0, false
+}

+ 1 - 1
internal/web/job/check_client_ip_job.go

@@ -157,7 +157,7 @@ func (j *CheckClientIpJob) hasLimitIp() bool {
 	db := database.GetDB()
 	var inbounds []*model.Inbound
 
-	err := db.Model(model.Inbound{}).Find(&inbounds).Error
+	err := db.Model(model.Inbound{}).Where("settings LIKE ?", "%limitIp%").Find(&inbounds).Error
 	if err != nil {
 		return false
 	}

+ 1 - 0
internal/web/service/inbound_node.go

@@ -407,6 +407,7 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
 				}
 				continue
 			}
+			reportedRemoteTagConflict.Delete(fmt.Sprintf("%d:%s", nodeID, snapIb.Tag))
 			newIb := model.Inbound{
 				UserId:               defaultUserId,
 				NodeID:               &nodeID,

+ 61 - 1
internal/web/service/tgbot/tgbot.go

@@ -83,7 +83,65 @@ var (
 	client_Reset         int
 )
 
-var userStates = make(map[int64]string)
+// userStateStore holds the per-chat conversation states behind a mutex. The
+// Telegram command and callback handlers run on worker-pool goroutines while the
+// message handler runs on the dispatch goroutine, so an unguarded map would risk
+// a fatal concurrent map write. maybePrune also expires abandoned conversations,
+// so a user who starts a flow and goes silent doesn't leave an entry forever.
+type userStateStore struct {
+	mu        sync.Mutex // guards states and lastPrune
+	states    map[int64]userStateEntry // chat ID -> current conversation state
+	lastPrune time.Time // when maybePrune last swept the map
+}
+
+type userStateEntry struct {
+	state string // pending step, e.g. "awaiting_email"
+	at    time.Time // when set stored this entry; maybePrune ages it against maxAge
+}
+
+var userStateMgr = &userStateStore{states: make(map[int64]userStateEntry)} // package-wide store; StopBot resets it
+
+// set records state as the current conversation state for chatID and stamps
+// the entry with the current time, replacing any previous entry.
+func (s *userStateStore) set(chatID int64, state string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.states[chatID] = userStateEntry{at: time.Now(), state: state}
+}
+
+// get returns the conversation state stored for chatID and whether an entry
+// exists. Entries are not age-checked here; expiry happens only in maybePrune.
+func (s *userStateStore) get(chatID int64) (string, bool) {
+	s.mu.Lock()
+	entry, found := s.states[chatID]
+	s.mu.Unlock()
+	return entry.state, found
+}
+
+// clear forgets the conversation state for chatID; it is a no-op when the
+// chat has no entry.
+func (s *userStateStore) clear(chatID int64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	delete(s.states, chatID)
+}
+
+// reset discards every stored conversation by swapping in a fresh, empty map.
+func (s *userStateStore) reset() {
+	fresh := make(map[int64]userStateEntry)
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.states = fresh
+}
+
+// maybePrune removes conversations whose entry is older than maxAge. The sweep
+// itself runs at most once per maxAge, so a busy bot does not walk the whole
+// map on every incoming message.
+func (s *userStateStore) maybePrune(maxAge time.Duration) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	now := time.Now()
+	if now.Sub(s.lastPrune) >= maxAge {
+		s.lastPrune = now
+		for chatID, entry := range s.states {
+			if now.Sub(entry.at) <= maxAge {
+				continue
+			}
+			delete(s.states, chatID)
+		}
+	}
+}
 
 // LoginStatus represents the result of a login attempt.
 type LoginStatus byte
@@ -411,6 +469,8 @@ func StopBot() {
 	isRunning = false
 	tgBotMutex.Unlock()
 
+	userStateMgr.reset()
+
 	if handler != nil {
 		handler.Stop()
 	}

+ 17 - 16
internal/web/service/tgbot/tgbot_router.go

@@ -47,7 +47,7 @@ func (t *Tgbot) OnReceive() {
 		tgBotMutex.Unlock()
 
 		h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
-			delete(userStates, message.Chat.ID)
+			userStateMgr.clear(message.Chat.ID)
 			t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove())
 			return nil
 		}, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard")))
@@ -62,7 +62,7 @@ func (t *Tgbot) OnReceive() {
 				messageWorkerPool <- struct{}{}        // Acquire worker
 				defer func() { <-messageWorkerPool }() // Release worker
 
-				delete(userStates, message.Chat.ID)
+				userStateMgr.clear(message.Chat.ID)
 				t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID))
 			}()
 			return nil
@@ -74,25 +74,26 @@ func (t *Tgbot) OnReceive() {
 				messageWorkerPool <- struct{}{}        // Acquire worker
 				defer func() { <-messageWorkerPool }() // Release worker
 
-				delete(userStates, query.Message.GetChat().ID)
+				userStateMgr.clear(query.Message.GetChat().ID)
 				t.answerCallback(&query, checkAdmin(query.From.ID))
 			}()
 			return nil
 		}, th.AnyCallbackQueryWithMessage())
 
 		h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
-			if userState, exists := userStates[message.Chat.ID]; exists {
+			userStateMgr.maybePrune(time.Hour)
+			if userState, exists := userStateMgr.get(message.Chat.ID); exists {
 				switch userState {
 				case "awaiting_email":
 					if client_Email == strings.TrimSpace(message.Text) {
 						t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
-						delete(userStates, message.Chat.ID)
+						userStateMgr.clear(message.Chat.ID)
 						return nil
 					}
 
 					client_Email = strings.TrimSpace(message.Text)
 					if t.isSingleWord(client_Email) {
-						userStates[message.Chat.ID] = "awaiting_email"
+						userStateMgr.set(message.Chat.ID, "awaiting_email")
 
 						cancel_btn_markup := tu.InlineKeyboard(
 							tu.InlineKeyboardRow(
@@ -103,26 +104,26 @@ func (t *Tgbot) OnReceive() {
 						t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
 					} else {
 						t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_email"), 3, tu.ReplyKeyboardRemove())
-						delete(userStates, message.Chat.ID)
+						userStateMgr.clear(message.Chat.ID)
 						t.addClient(message.Chat.ID, t.BuildClientDraftMessage())
 					}
 				case "awaiting_comment":
 					if client_Comment == strings.TrimSpace(message.Text) {
 						t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
-						delete(userStates, message.Chat.ID)
+						userStateMgr.clear(message.Chat.ID)
 						return nil
 					}
 
 					client_Comment = strings.TrimSpace(message.Text)
 					t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_comment"), 3, tu.ReplyKeyboardRemove())
-					delete(userStates, message.Chat.ID)
+					userStateMgr.clear(message.Chat.ID)
 					t.addClient(message.Chat.ID, t.BuildClientDraftMessage())
 				case "awaiting_tg_id":
 					input := strings.TrimSpace(message.Text)
 					if input == "" || input == "-" || strings.EqualFold(input, "none") {
 						client_TgID = ""
 						t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
-						delete(userStates, message.Chat.ID)
+						userStateMgr.clear(message.Chat.ID)
 						t.addClient(message.Chat.ID, t.BuildClientDraftMessage())
 						return nil
 					}
@@ -137,7 +138,7 @@ func (t *Tgbot) OnReceive() {
 					}
 					client_TgID = input
 					t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.userSaved"), 3, tu.ReplyKeyboardRemove())
-					delete(userStates, message.Chat.ID)
+					userStateMgr.clear(message.Chat.ID)
 					t.addClient(message.Chat.ID, t.BuildClientDraftMessage())
 				}
 
@@ -1236,7 +1237,7 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool
 		t.SendMsgToTgbot(chatId, t.I18nBot("tgbot.answers.chooseInbound"), inbounds)
 	case "add_client_ch_default_email":
 		t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID())
-		userStates[chatId] = "awaiting_email"
+		userStateMgr.set(chatId, "awaiting_email")
 		cancel_btn_markup := tu.InlineKeyboard(
 			tu.InlineKeyboardRow(
 				tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
@@ -1246,7 +1247,7 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool
 		t.SendMsgToTgbot(chatId, prompt_message, cancel_btn_markup)
 	case "add_client_ch_default_comment":
 		t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID())
-		userStates[chatId] = "awaiting_comment"
+		userStateMgr.set(chatId, "awaiting_comment")
 		cancel_btn_markup := tu.InlineKeyboard(
 			tu.InlineKeyboardRow(
 				tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
@@ -1256,7 +1257,7 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool
 		t.SendMsgToTgbot(chatId, prompt_message, cancel_btn_markup)
 	case "add_client_ch_default_tg_id":
 		t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID())
-		userStates[chatId] = "awaiting_tg_id"
+		userStateMgr.set(chatId, "awaiting_tg_id")
 		cancel_btn_markup := tu.InlineKeyboard(
 			tu.InlineKeyboardRow(
 				tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
@@ -1357,10 +1358,10 @@ func (t *Tgbot) answerCallback(callbackQuery *telego.CallbackQuery, isAdmin bool
 	case "add_client_default_info":
 		t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID())
 		t.SendMsgToTgbotDeleteAfter(chatId, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
-		delete(userStates, chatId)
+		userStateMgr.clear(chatId)
 		t.addClient(chatId, t.BuildClientDraftMessage())
 	case "add_client_cancel":
-		delete(userStates, chatId)
+		userStateMgr.clear(chatId)
 		receiver_inbound_ID = 0
 		receiver_inbound_IDs = nil
 		t.deleteMessageTgBot(chatId, callbackQuery.Message.GetMessageID())

+ 1 - 1
internal/web/service/tgbot/tgbot_send.go

@@ -232,7 +232,7 @@ func (t *Tgbot) SendMsgToTgbotDeleteAfter(chatId int64, msg string, delayInSecon
 	go func() {
 		time.Sleep(time.Duration(delayInSeconds) * time.Second) // Wait for the specified delay
 		t.deleteMessageTgBot(chatId, sentMsg.MessageID)         // Delete the message
-		delete(userStates, chatId)
+		userStateMgr.clear(chatId)
 	}()
 }
 

+ 13 - 3
internal/web/web.go

@@ -476,9 +476,19 @@ func (s *Server) start(restartXray bool, startTgBot bool) (err error) {
 	}
 	service.StartTrafficWriter()
 
-	// cron.Recover wraps every job so a panic is logged and the scheduler keeps
-	// running, instead of the panic taking down the whole panel process.
-	s.cron = cron.New(cron.WithLocation(loc), cron.WithSeconds(), cron.WithChain(cron.Recover(cron.PrintfLogger(cronPanicLogger{}))))
+	// SkipIfStillRunning stops a slow job (e.g. the 5s traffic poll on a large
+	// install) from overlapping itself: two concurrent runs of the same job race
+	// the shared xrayAPI — leaking a grpc connection — and the StatsLastValues
+	// map, whose concurrent write is a fatal runtime throw cron.Recover can't
+	// catch. cron.Recover then logs any panic and keeps the scheduler alive.
+	s.cron = cron.New(
+		cron.WithLocation(loc),
+		cron.WithSeconds(),
+		cron.WithChain(
+			cron.SkipIfStillRunning(cron.DiscardLogger),
+			cron.Recover(cron.PrintfLogger(cronPanicLogger{})),
+		),
+	)
 	s.cron.Start()
 
 	// Wire the inbound-runtime manager once so InboundService can route

+ 13 - 0
internal/xray/api.go

@@ -594,6 +594,19 @@ func (x *XrayAPI) GetTraffic() ([]*Traffic, []*ClientTraffic, error) {
 			processClientTraffic(matches, value, emailTrafficMap)
 		}
 	}
+
+	// Drop delta baselines for stats that no longer exist (deleted inbounds or
+	// clients), which otherwise linger until the next Xray restart. Only rebuild
+	// when the map has drifted past 2x the live set, so the steady-state hot path
+	// stays allocation-free.
+	if n := len(resp.GetStat()); n > 0 && len(x.StatsLastValues) > 2*n {
+		pruned := make(map[string]int64, n)
+		for _, stat := range resp.GetStat() {
+			pruned[stat.Name] = x.StatsLastValues[stat.Name]
+		}
+		x.StatsLastValues = pruned
+	}
+
 	return mapToSlice(tagTrafficMap), mapToSlice(emailTrafficMap), nil
 }
 

+ 17 - 0
main.go

@@ -6,6 +6,8 @@ import (
 	"flag"
 	"fmt"
 	"log"
+	"net/http"
+	_ "net/http/pprof"
 	"os"
 	"os/signal"
 	"syscall"
@@ -49,6 +51,21 @@ func runWebServer() {
 
 	godotenv.Load()
 
+	if limit, source := sys.ApplyMemoryLimit(); limit > 0 {
+		logger.Infof("Go memory soft limit set to %d MiB (%s)", limit>>20, source)
+	} else {
+		logger.Info("Go memory soft limit not enforced: ", source)
+	}
+
+	if os.Getenv("XUI_PPROF") == "true" {
+		go func() {
+			logger.Info("pprof profiling server listening on 127.0.0.1:6060")
+			if err := http.ListenAndServe("127.0.0.1:6060", nil); err != nil {
+				logger.Warning("pprof server stopped: ", err)
+			}
+		}()
+	}
+
 	err := database.InitDB(config.GetDBPath())
 	if err != nil {
 		log.Fatalf("Error initializing database: %v", err)