Przeglądaj źródła

fix(xray): guard process lifecycle fields against concurrent access (#5395)

The process cmd, done and exitErr fields were written by Start/startCommand and
the waitForCommand goroutine while IsRunning/GetErr/GetResult/Stop read them
concurrently from other goroutines (the status endpoint and the check-xray
job) — a data race. Guard them with a RWMutex: writers take the write lock;
readers snapshot under the read lock and run any blocking syscall
(Wait/Signal/Kill) on the local copy without holding it. IsRunning now uses the
done channel as the exit signal instead of reading cmd.ProcessState, which
races with cmd.Wait. Adds a -race regression test.
n0ctal 1 dzień temu
rodzic
commit
abffa8f6c9
2 zmienionych plików z 118 dodań i 21 usunięć
  1. 58 21
      internal/xray/process.go
  2. 60 0
      internal/xray/process_race_test.go

+ 58 - 21
internal/xray/process.go

@@ -127,6 +127,12 @@ func NewTestProcess(xrayConfig *Config, configPath string) *Process {
 }
 
 type process struct {
+	// mu guards the process lifecycle fields (cmd, done, exitErr) which are
+	// written by Start/startCommand and the waitForCommand goroutine while being
+	// read concurrently by IsRunning/GetErr/GetResult/Stop from other goroutines
+	// (status endpoint, check-xray-running job). Snapshot under the lock, then do
+	// any blocking syscall (Wait/Signal/Kill) on the local copy without holding it.
+	mu   sync.RWMutex
 	cmd  *exec.Cmd
 	done chan struct{}
 
@@ -236,31 +242,39 @@ func newTestProcess(config *Config, configPath string) *process {
 
 // IsRunning returns true if the Xray process is currently running.
 func (p *process) IsRunning() bool {
-	if p.cmd == nil || p.cmd.Process == nil {
+	p.mu.RLock()
+	cmd, done := p.cmd, p.done
+	p.mu.RUnlock()
+	if cmd == nil || cmd.Process == nil {
 		return false
 	}
-	if p.done != nil {
+	// done is closed by the waitForCommand goroutine exactly when cmd.Wait
+	// returns, i.e. when the process has exited; it is the race-free signal here
+	// (reading cmd.ProcessState would race with that Wait).
+	if done != nil {
 		select {
-		case <-p.done:
+		case <-done:
 			return false
 		default:
 		}
 	}
-	if p.cmd.ProcessState == nil {
-		return true
-	}
-	return false
+	return true
 }
 
 // GetErr returns the last error encountered by the Xray process.
 func (p *process) GetErr() error {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 	return p.exitErr
 }
 
 // GetResult returns the last log line or error from the Xray process.
 func (p *process) GetResult() string {
-	if len(p.logWriter.lastLine) == 0 && p.exitErr != nil {
-		return p.exitErr.Error()
+	p.mu.RLock()
+	exitErr := p.exitErr
+	p.mu.RUnlock()
+	if len(p.logWriter.lastLine) == 0 && exitErr != nil {
+		return exitErr.Error()
 	}
 	return p.logWriter.lastLine
 }
@@ -493,7 +507,7 @@ func (p *process) Start() (err error) {
 	defer func() {
 		if err != nil {
 			logger.Error("Failure in running xray-core process: ", err)
-			p.exitErr = err
+			p.setExitErr(err)
 		}
 	}()
 
@@ -532,25 +546,36 @@ func (p *process) Start() (err error) {
 }
 
 func (p *process) startCommand(cmd *exec.Cmd) error {
+	p.mu.Lock()
 	p.cmd = cmd
 	p.done = make(chan struct{})
 	p.exitErr = nil
+	done := p.done
+	p.mu.Unlock()
 	p.intentionalStop.Store(false)
 
 	if err := cmd.Start(); err != nil {
-		close(p.done)
+		close(done)
+		p.mu.Lock()
 		p.cmd = nil
+		p.mu.Unlock()
 		return err
 	}
 
 	attachChildLifetime(cmd)
 
-	go p.waitForCommand(cmd)
+	go p.waitForCommand(cmd, done)
 	return nil
 }
 
-func (p *process) waitForCommand(cmd *exec.Cmd) {
-	defer close(p.done)
+func (p *process) setExitErr(err error) {
+	p.mu.Lock()
+	p.exitErr = err
+	p.mu.Unlock()
+}
+
+func (p *process) waitForCommand(cmd *exec.Cmd, done chan struct{}) {
+	defer close(done)
 
 	err := cmd.Wait()
 	if err == nil || p.intentionalStop.Load() {
@@ -561,13 +586,13 @@ func (p *process) waitForCommand(cmd *exec.Cmd) {
 	if runtime.GOOS == "windows" {
 		errStr := strings.ToLower(err.Error())
 		if strings.Contains(errStr, "exit status 1") {
-			p.exitErr = err
+			p.setExitErr(err)
 			return
 		}
 	}
 
 	logger.Error("Failure in running xray-core:", err)
-	p.exitErr = err
+	p.setExitErr(err)
 	if OnCrash != nil {
 		OnCrash(err)
 	}
@@ -580,6 +605,15 @@ func (p *process) Stop() error {
 	}
 	p.intentionalStop.Store(true)
 
+	// Snapshot cmd once, then run the blocking Signal/Kill/Wait on the local copy
+	// without holding the lock.
+	p.mu.RLock()
+	cmd := p.cmd
+	p.mu.RUnlock()
+	if cmd == nil || cmd.Process == nil {
+		return errors.New("xray is not running")
+	}
+
 	// Remove temporary config file used for test runs so main config is never touched
 	if p.configPath != "" {
 		if p.configPath != GetConfigPath() {
@@ -591,13 +625,13 @@ func (p *process) Stop() error {
 	}
 
 	if runtime.GOOS == "windows" {
-		if err := p.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
+		if err := cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
 			return err
 		}
 		return p.waitForExit(xrayForceStopTimeout)
 	}
 
-	if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil {
+	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
 		if errors.Is(err, os.ErrProcessDone) {
 			return p.waitForExit(xrayForceStopTimeout)
 		}
@@ -609,14 +643,17 @@ func (p *process) Stop() error {
 	}
 
 	logger.Warning("xray-core did not stop after SIGTERM, killing process")
-	if err := p.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
+	if err := cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
 		return err
 	}
 	return p.waitForExit(xrayForceStopTimeout)
 }
 
 func (p *process) waitForExit(timeout time.Duration) error {
-	if p.done == nil {
+	p.mu.RLock()
+	done := p.done
+	p.mu.RUnlock()
+	if done == nil {
 		return nil
 	}
 
@@ -624,7 +661,7 @@ func (p *process) waitForExit(timeout time.Duration) error {
 	defer timer.Stop()
 
 	select {
-	case <-p.done:
+	case <-done:
 		return nil
 	case <-timer.C:
 		return common.NewErrorf("timed out waiting for xray-core process to stop after %s", timeout)

+ 60 - 0
internal/xray/process_race_test.go

@@ -0,0 +1,60 @@
+package xray
+
+import (
+	"errors"
+	"os/exec"
+	"sync"
+	"testing"
+	"time"
+)
+
+// TestProcessLifecycleFieldsRaceSafe drives the lifecycle fields (cmd, done,
+// exitErr) the way Start/startCommand and the waitForCommand goroutine do, while
+// the status getters read them concurrently. Run with -race: any unsynchronized
+// access to those fields is reported as a data race.
+func TestProcessLifecycleFieldsRaceSafe(t *testing.T) {
+	p := &process{logWriter: NewLogWriter()}
+
+	var wg sync.WaitGroup
+	stop := make(chan struct{})
+
+	// Writer: churn cmd/done/exitErr like Start + waitForCommand.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-stop:
+				return
+			default:
+			}
+			p.mu.Lock()
+			p.cmd = &exec.Cmd{}
+			p.done = make(chan struct{})
+			p.mu.Unlock()
+			p.setExitErr(errors.New("boom"))
+		}
+	}()
+
+	// Readers: the concurrent status getters.
+	for i := 0; i < 4; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-stop:
+					return
+				default:
+				}
+				_ = p.IsRunning()
+				_ = p.GetErr()
+				_ = p.GetResult()
+			}
+		}()
+	}
+
+	time.Sleep(50 * time.Millisecond)
+	close(stop)
+	wg.Wait()
+}