Преглед изворни кода

Fix: traffic writer restart freeze (#4265)

* feat(traffic_writer): enhance traffic writer with concurrency safety and state management

* Revert "feat(traffic_writer): enhance traffic writer with concurrency safety and state management"

This reverts commit e6760ae39629a592dec293197768f27ff0f5a578.

* feat(traffic_writer): enhance traffic writer with concurrency safety and state management

* feat(web): implement panel-only start/stop methods for in-process restarts
Farhad H. P. Shirvan пре 16 часа
родитељ
комит
d86e87ed30
4 измењених фајлова са 293 додато и 35 уклоњено
  1. 2 6
      main.go
  2. 63 17
      web/service/traffic_writer.go
  3. 190 0
      web/service/traffic_writer_test.go
  4. 38 12
      web/web.go

+ 2 - 6
main.go

@@ -81,11 +81,7 @@ func runWebServer() {
 		case syscall.SIGHUP:
 			logger.Info("Received SIGHUP signal. Restarting servers...")
 
-			// --- FIX FOR TELEGRAM BOT CONFLICT (409): Stop bot before restart ---
-			service.StopBot()
-			// --
-
-			err := server.Stop()
+			err := server.StopPanelOnly()
 			if err != nil {
 				logger.Debug("Error stopping web server:", err)
 			}
@@ -96,7 +92,7 @@ func runWebServer() {
 
 			server = web.NewServer()
 			global.SetWebServer(server)
-			err = server.Start()
+			err = server.StartPanelOnly()
 			if err != nil {
 				log.Fatalf("Error restarting web server: %v", err)
 				return

+ 63 - 17
web/service/traffic_writer.go

@@ -23,6 +23,7 @@ type trafficWriteRequest struct {
 var (
 	twMu     sync.Mutex
 	twQueue  chan *trafficWriteRequest
+	twCtx    context.Context
 	twCancel context.CancelFunc
 	twDone   chan struct{}
 )
@@ -37,16 +38,26 @@ var (
 func StartTrafficWriter() {
 	twMu.Lock()
 	defer twMu.Unlock()
-	if twQueue != nil {
-		return
+
+	if twCancel != nil && twDone != nil {
+		select {
+		case <-twDone:
+			clearTrafficWriterState()
+		default:
+			return
+		}
 	}
+
 	queue := make(chan *trafficWriteRequest, trafficWriterQueueSize)
 	ctx, cancel := context.WithCancel(context.Background())
 	done := make(chan struct{})
+
 	twQueue = queue
+	twCtx = ctx
 	twCancel = cancel
 	twDone = done
-	go runTrafficWriter(queue, ctx, done)
+
+	go runTrafficWriter(ctx, queue, done)
 }
 
 // StopTrafficWriter cancels the writer context and waits for the goroutine to
@@ -56,20 +67,30 @@ func StopTrafficWriter() {
 	twMu.Lock()
 	cancel := twCancel
 	done := twDone
-	twQueue = nil
-	twCancel = nil
-	twDone = nil
+	if cancel == nil || done == nil {
+		twMu.Unlock()
+		return
+	}
+	cancel()
 	twMu.Unlock()
 
-	if cancel != nil {
-		cancel()
-	}
-	if done != nil {
-		<-done
+	<-done
+
+	twMu.Lock()
+	if twDone == done {
+		clearTrafficWriterState()
 	}
+	twMu.Unlock()
 }
 
-func runTrafficWriter(queue chan *trafficWriteRequest, ctx context.Context, done chan struct{}) {
+func clearTrafficWriterState() {
+	twQueue = nil
+	twCtx = nil
+	twCancel = nil
+	twDone = nil
+}
+
+func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done chan struct{}) {
 	defer close(done)
 	for {
 		select {
@@ -99,18 +120,43 @@ func safeApply(fn func() error) (err error) {
 }
 
 func submitTrafficWrite(fn func() error) error {
+	req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
+
 	twMu.Lock()
 	queue := twQueue
-	twMu.Unlock()
+	ctx := twCtx
+	done := twDone
+	if queue == nil || ctx == nil || done == nil {
+		twMu.Unlock()
+		return safeApply(fn)
+	}
 
-	if queue == nil {
+	select {
+	case <-ctx.Done():
+		twMu.Unlock()
 		return safeApply(fn)
+	default:
 	}
-	req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
+
+	timer := time.NewTimer(trafficWriterSubmitTimeout)
+	defer timer.Stop()
 	select {
 	case queue <- req:
-	case <-time.After(trafficWriterSubmitTimeout):
+		twMu.Unlock()
+	case <-timer.C:
+		twMu.Unlock()
 		return errors.New("traffic writer queue full")
 	}
-	return <-req.done
+
+	select {
+	case err := <-req.done:
+		return err
+	case <-done:
+		select {
+		case err := <-req.done:
+			return err
+		default:
+			return errors.New("traffic writer stopped before write completed")
+		}
+	}
 }

+ 190 - 0
web/service/traffic_writer_test.go

@@ -0,0 +1,190 @@
+package service
+
+import (
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestTrafficWriterStartStopStartAcceptsWrites(t *testing.T) {
+	resetTrafficWriterForTest(t)
+
+	StartTrafficWriter()
+	var writes atomic.Int32
+	if err := submitTrafficWrite(func() error {
+		writes.Add(1)
+		return nil
+	}); err != nil {
+		t.Fatalf("first submitTrafficWrite: %v", err)
+	}
+
+	StopTrafficWriter()
+	StartTrafficWriter()
+	if err := submitTrafficWrite(func() error {
+		writes.Add(1)
+		return nil
+	}); err != nil {
+		t.Fatalf("second submitTrafficWrite: %v", err)
+	}
+
+	if got := writes.Load(); got != 2 {
+		t.Fatalf("writes = %d, want 2", got)
+	}
+}
+
+func TestTrafficWriterSubmitAfterStopRunsInline(t *testing.T) {
+	resetTrafficWriterForTest(t)
+
+	StartTrafficWriter()
+	StopTrafficWriter()
+
+	ran := make(chan struct{})
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- submitTrafficWrite(func() error {
+			close(ran)
+			return nil
+		})
+	}()
+
+	select {
+	case <-ran:
+	case <-time.After(time.Second):
+		t.Fatal("submitTrafficWrite did not run after traffic writer stopped")
+	}
+	if err := waitTrafficWriterErr(t, errCh); err != nil {
+		t.Fatalf("submitTrafficWrite after stop: %v", err)
+	}
+}
+
+func TestTrafficWriterStopDrainsQueuedWrite(t *testing.T) {
+	resetTrafficWriterForTest(t)
+
+	StartTrafficWriter()
+	firstStarted := make(chan struct{})
+	releaseFirst := make(chan struct{})
+	firstErr := make(chan error, 1)
+	go func() {
+		firstErr <- submitTrafficWrite(func() error {
+			close(firstStarted)
+			<-releaseFirst
+			return nil
+		})
+	}()
+	waitTrafficWriterSignal(t, firstStarted, "first write did not start")
+
+	secondRan := make(chan struct{})
+	secondErr := make(chan error, 1)
+	go func() {
+		secondErr <- submitTrafficWrite(func() error {
+			close(secondRan)
+			return nil
+		})
+	}()
+	waitTrafficWriterQueued(t)
+
+	stopDone := make(chan struct{})
+	go func() {
+		StopTrafficWriter()
+		close(stopDone)
+	}()
+
+	select {
+	case <-stopDone:
+		t.Fatal("StopTrafficWriter returned before in-flight write was released")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	close(releaseFirst)
+	waitTrafficWriterSignal(t, stopDone, "StopTrafficWriter did not return")
+	waitTrafficWriterSignal(t, secondRan, "queued write was not drained")
+
+	if err := waitTrafficWriterErr(t, firstErr); err != nil {
+		t.Fatalf("first submitTrafficWrite: %v", err)
+	}
+	if err := waitTrafficWriterErr(t, secondErr); err != nil {
+		t.Fatalf("second submitTrafficWrite: %v", err)
+	}
+}
+
+func TestTrafficWriterConcurrentStopDuringSubmitDoesNotHang(t *testing.T) {
+	resetTrafficWriterForTest(t)
+
+	StartTrafficWriter()
+	started := make(chan struct{})
+	release := make(chan struct{})
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- submitTrafficWrite(func() error {
+			close(started)
+			<-release
+			return nil
+		})
+	}()
+	waitTrafficWriterSignal(t, started, "write did not start")
+
+	stopDone := make(chan struct{})
+	go func() {
+		StopTrafficWriter()
+		close(stopDone)
+	}()
+
+	close(release)
+	waitTrafficWriterSignal(t, stopDone, "StopTrafficWriter hung during submit")
+	if err := waitTrafficWriterErr(t, errCh); err != nil {
+		t.Fatalf("submitTrafficWrite during stop: %v", err)
+	}
+}
+
+func resetTrafficWriterForTest(t *testing.T) {
+	t.Helper()
+	StopTrafficWriter()
+	twMu.Lock()
+	clearTrafficWriterState()
+	twMu.Unlock()
+	t.Cleanup(func() {
+		StopTrafficWriter()
+		twMu.Lock()
+		clearTrafficWriterState()
+		twMu.Unlock()
+	})
+}
+
+func waitTrafficWriterQueued(t *testing.T) {
+	t.Helper()
+
+	deadline := time.Now().Add(time.Second)
+	for time.Now().Before(deadline) {
+		twMu.Lock()
+		queued := 0
+		if twQueue != nil {
+			queued = len(twQueue)
+		}
+		twMu.Unlock()
+		if queued > 0 {
+			return
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+	t.Fatal("write was not queued")
+}
+
+func waitTrafficWriterSignal(t *testing.T, ch <-chan struct{}, msg string) {
+	t.Helper()
+	select {
+	case <-ch:
+	case <-time.After(time.Second):
+		t.Fatal(msg)
+	}
+}
+
+func waitTrafficWriterErr(t *testing.T, ch <-chan error) error {
+	t.Helper()
+	select {
+	case err := <-ch:
+		return err
+	case <-time.After(time.Second):
+		t.Fatal("timed out waiting for traffic writer result")
+		return nil
+	}
+}

+ 38 - 12
web/web.go

@@ -259,11 +259,13 @@ func (s *Server) initRouter() (*gin.Engine, error) {
 
 // startTask schedules background jobs (Xray checks, traffic jobs, cron
 // jobs) which the panel relies on for periodic maintenance and monitoring.
-func (s *Server) startTask() {
+func (s *Server) startTask(restartXray bool) {
 	s.customGeoService.EnsureOnStartup()
-	err := s.xrayService.RestartXray(true)
-	if err != nil {
-		logger.Warning("start xray failed:", err)
+	if restartXray {
+		err := s.xrayService.RestartXray(true)
+		if err != nil {
+			logger.Warning("start xray failed:", err)
+		}
 	}
 	// Check whether xray is running every second
 	s.cron.AddJob("@every 1s", job.NewCheckXrayRunningJob())
@@ -348,6 +350,15 @@ func (s *Server) startTask() {
 
 // Start initializes and starts the web server with configured settings, routes, and background jobs.
 func (s *Server) Start() (err error) {
+	return s.start(true, true)
+}
+
+// StartPanelOnly initializes the panel during an in-process panel restart without cycling Xray.
+func (s *Server) StartPanelOnly() (err error) {
+	return s.start(false, false)
+}
+
+func (s *Server) start(restartXray bool, startTgBot bool) (err error) {
 	// This is an anonymous function, no function name
 	defer func() {
 		if err != nil {
@@ -427,12 +438,14 @@ func (s *Server) Start() (err error) {
 		s.httpServer.Serve(listener)
 	}()
 
-	s.startTask()
+	s.startTask(restartXray)
 
-	isTgbotenabled, err := s.settingService.GetTgbotEnabled()
-	if (err == nil) && (isTgbotenabled) {
-		tgBot := s.tgbotService.NewTgbot()
-		tgBot.Start(i18nFS)
+	if startTgBot {
+		isTgbotenabled, err := s.settingService.GetTgbotEnabled()
+		if (err == nil) && (isTgbotenabled) {
+			tgBot := s.tgbotService.NewTgbot()
+			tgBot.Start(i18nFS)
+		}
 	}
 
 	return nil
@@ -440,13 +453,26 @@ func (s *Server) Start() (err error) {
 
 // Stop gracefully shuts down the web server, stops Xray, cron jobs, and Telegram bot.
 func (s *Server) Stop() error {
+	return s.stop(true, true)
+}
+
+// StopPanelOnly stops only panel-owned HTTP/background resources for an in-process panel restart.
+func (s *Server) StopPanelOnly() error {
+	return s.stop(false, false)
+}
+
+func (s *Server) stop(stopXray bool, stopTgBot bool) error {
 	s.cancel()
-	s.xrayService.StopXray()
+	if stopXray {
+		s.xrayService.StopXray()
+	}
 	if s.cron != nil {
 		s.cron.Stop()
 	}
-	service.StopTrafficWriter()
-	if s.tgbotService.IsRunning() {
+	if stopXray {
+		service.StopTrafficWriter()
+	}
+	if stopTgBot && s.tgbotService.IsRunning() {
 		s.tgbotService.Stop()
 	}
 	// Gracefully stop WebSocket hub