Prechádzať zdrojové kódy

fix(xray): guard log-writer race and bound handler gRPC deadlines (#5442)

* perf(xray): compile log/traffic regexps once at package scope

GetTraffic recompiled two stats regexps on every traffic tick, and LogWriter.Write
recompiled two more on every log line. Hoist all four to package-level vars so they
compile once at load instead of per call on hot paths.

* fix(xray): guard LogWriter.lastLine against the GetResult reader race

Write is driven by the Xray process goroutine while Process.GetResult
reads lastLine from the caller's goroutine, so the unsynchronized field
is a data race under `go test -race`. Add an RWMutex and route every
write through setLastLine; GetResult reads via LastLine().

* fix(xray): bound handler gRPC calls with a deadline

AddInbound, DelInbound and the AddUser AlterInbound call used
context.Background(), so a hung core connection could block the caller
indefinitely (for example while the process restart lock is held). Give
them a 10s deadline (handlerRPCTimeout) and a nil-client guard, matching
the other handler operations.
n0ctal 1 deň pred
rodič
commit
2bb29468d8

+ 23 - 3
internal/xray/api.go

@@ -123,8 +123,16 @@ func (x *XrayAPI) Close() {
 	x.isConnected = false
 }
 
+// handlerRPCTimeout bounds per-call gRPC handler operations (add/remove inbound,
+// alter user) so a hung core connection cannot block the caller indefinitely —
+// for example while the process restart lock is held.
+const handlerRPCTimeout = 10 * time.Second
+
 // AddInbound adds a new inbound configuration to the Xray core via gRPC.
 func (x *XrayAPI) AddInbound(inbound []byte) error {
+	if x.HandlerServiceClient == nil {
+		return common.NewError("xray HandlerServiceClient is not initialized")
+	}
 	client := *x.HandlerServiceClient
 
 	conf := new(conf.InboundDetourConfig)
@@ -140,15 +148,22 @@ func (x *XrayAPI) AddInbound(inbound []byte) error {
 	}
 	inboundConfig := command.AddInboundRequest{Inbound: config}
 
-	_, err = client.AddInbound(context.Background(), &inboundConfig)
+	ctx, cancel := context.WithTimeout(context.Background(), handlerRPCTimeout)
+	defer cancel()
+	_, err = client.AddInbound(ctx, &inboundConfig)
 
 	return err
 }
 
 // DelInbound removes an inbound configuration from the Xray core by tag.
 func (x *XrayAPI) DelInbound(tag string) error {
+	if x.HandlerServiceClient == nil {
+		return common.NewError("xray HandlerServiceClient is not initialized")
+	}
 	client := *x.HandlerServiceClient
-	_, err := client.RemoveInbound(context.Background(), &command.RemoveInboundRequest{
+	ctx, cancel := context.WithTimeout(context.Background(), handlerRPCTimeout)
+	defer cancel()
+	_, err := client.RemoveInbound(ctx, &command.RemoveInboundRequest{
 		Tag: tag,
 	})
 	return err
@@ -505,9 +520,14 @@ func (x *XrayAPI) AddUser(Protocol string, inboundTag string, user map[string]an
 		return nil
 	}
 
+	if x.HandlerServiceClient == nil {
+		return common.NewError("xray HandlerServiceClient is not initialized")
+	}
 	client := *x.HandlerServiceClient
 
-	_, err = client.AlterInbound(context.Background(), &command.AlterInboundRequest{
+	ctx, cancel := context.WithTimeout(context.Background(), handlerRPCTimeout)
+	defer cancel()
+	_, err = client.AlterInbound(ctx, &command.AlterInboundRequest{
 		Tag: inboundTag,
 		Operation: serial.ToTypedMessage(&command.AddUserOperation{
 			User: &protocol.User{

+ 22 - 5
internal/xray/log_writer.go

@@ -4,6 +4,7 @@ import (
 	"regexp"
 	"runtime"
 	"strings"
+	"sync"
 
 	"github.com/mhsanaei/3x-ui/v3/internal/logger"
 )
@@ -22,9 +23,25 @@ func NewLogWriter() *LogWriter {
 
 // LogWriter processes and filters log output from the Xray process, handling crash detection and message filtering.
 type LogWriter struct {
+	mu       sync.RWMutex
 	lastLine string
 }
 
+// LastLine returns the most recently processed Xray log line. It is safe for
+// concurrent use: Process.GetResult reads it from a different goroutine than the
+// one Xray drives Write from.
+func (lw *LogWriter) LastLine() string {
+	lw.mu.RLock()
+	defer lw.mu.RUnlock()
+	return lw.lastLine
+}
+
+func (lw *LogWriter) setLastLine(line string) {
+	lw.mu.Lock()
+	lw.lastLine = line
+	lw.mu.Unlock()
+}
+
 // Write processes and filters log output from the Xray process, handling crash detection and message filtering.
 func (lw *LogWriter) Write(m []byte) (n int, err error) {
 	// Convert the data to a string
@@ -39,7 +56,7 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) {
 	// Check if the message contains a crash
 	if crashRegex.MatchString(message) {
 		logger.Debug("Core crash detected:\n", message)
-		lw.lastLine = message
+		lw.setLastLine(message)
 		err1 := writeCrashReport(m)
 		if err1 != nil {
 			logger.Error("Unable to write crash report:", err1)
@@ -60,7 +77,7 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) {
 			if strings.Contains(msgBodyLower, "tls handshake error") ||
 				strings.Contains(msgBodyLower, "connection ends") {
 				logger.Debug("XRAY: " + msgBody)
-				lw.lastLine = ""
+				lw.setLastLine("")
 				continue
 			}
 
@@ -80,14 +97,14 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) {
 					logger.Debug("XRAY: " + msg)
 				}
 			}
-			lw.lastLine = ""
+			lw.setLastLine("")
 		} else if msg != "" {
 			msgLower := strings.ToLower(msg)
 
 			if strings.Contains(msgLower, "tls handshake error") ||
 				strings.Contains(msgLower, "connection ends") {
 				logger.Debug("XRAY: " + msg)
-				lw.lastLine = msg
+				lw.setLastLine(msg)
 				continue
 			}
 
@@ -96,7 +113,7 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) {
 			} else {
 				logger.Debug("XRAY: " + msg)
 			}
-			lw.lastLine = msg
+			lw.setLastLine(msg)
 		}
 	}
 

+ 36 - 0
internal/xray/log_writer_race_test.go

@@ -0,0 +1,36 @@
+package xray
+
+import (
+	"sync"
+	"testing"
+)
+
+// TestLogWriterLastLineConcurrent exercises the LogWriter from multiple
+// goroutines: Xray drives Write while another goroutine (Process.GetResult)
+// reads the last line. Run under `go test -race` this fails on an unguarded
+// lastLine field and passes once the access is serialized.
+func TestLogWriterLastLineConcurrent(t *testing.T) {
+	lw := NewLogWriter()
+	const writers, readers, iterations = 4, 4, 500
+
+	var wg sync.WaitGroup
+	wg.Add(writers + readers)
+
+	for i := 0; i < writers; i++ {
+		go func() {
+			defer wg.Done()
+			for j := 0; j < iterations; j++ {
+				_, _ = lw.Write([]byte("2024/01/01 00:00:00.000000 [Info] connection accepted"))
+			}
+		}()
+	}
+	for i := 0; i < readers; i++ {
+		go func() {
+			defer wg.Done()
+			for j := 0; j < iterations; j++ {
+				_ = lw.LastLine()
+			}
+		}()
+	}
+	wg.Wait()
+}

+ 3 - 2
internal/xray/process.go

@@ -273,10 +273,11 @@ func (p *process) GetResult() string {
 	p.mu.RLock()
 	exitErr := p.exitErr
 	p.mu.RUnlock()
-	if len(p.logWriter.lastLine) == 0 && exitErr != nil {
+	lastLine := p.logWriter.LastLine()
+	if len(lastLine) == 0 && exitErr != nil {
 		return exitErr.Error()
 	}
-	return p.logWriter.lastLine
+	return lastLine
 }
 
 // GetVersion returns the version string of the Xray process.