|
|
@@ -21,39 +21,64 @@ type trafficWriteRequest struct {
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
+ twMu sync.Mutex
|
|
|
twQueue chan *trafficWriteRequest
|
|
|
- twCtx context.Context
|
|
|
twCancel context.CancelFunc
|
|
|
twDone chan struct{}
|
|
|
- twOnce sync.Once
|
|
|
)
|
|
|
|
|
|
+// StartTrafficWriter spins up the serial writer goroutine. Safe to call again
|
|
|
+// after StopTrafficWriter — each Start/Stop cycle gets fresh channels. The
|
|
|
+// previous sync.Once-based implementation deadlocked after a SIGHUP-driven
|
|
|
+// panel restart: Stop killed the consumer goroutine but Once prevented Start
|
|
|
+// from spawning a new one, so every later submitTrafficWrite blocked forever
|
|
|
+// on <-req.done with no consumer (including the AddTraffic call inside
|
|
|
+// XrayService.GetXrayConfig that runs from startTask).
|
|
|
func StartTrafficWriter() {
|
|
|
- twOnce.Do(func() {
|
|
|
- twQueue = make(chan *trafficWriteRequest, trafficWriterQueueSize)
|
|
|
- twCtx, twCancel = context.WithCancel(context.Background())
|
|
|
- twDone = make(chan struct{})
|
|
|
- go runTrafficWriter()
|
|
|
- })
|
|
|
+ twMu.Lock()
|
|
|
+ defer twMu.Unlock()
|
|
|
+ if twQueue != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ queue := make(chan *trafficWriteRequest, trafficWriterQueueSize)
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ done := make(chan struct{})
|
|
|
+ twQueue = queue
|
|
|
+ twCancel = cancel
|
|
|
+ twDone = done
|
|
|
+ go runTrafficWriter(queue, ctx, done)
|
|
|
}
|
|
|
|
|
|
+// StopTrafficWriter cancels the writer context and waits for the goroutine to
|
|
|
+// drain any pending requests before returning. Resets the package state so a
|
|
|
+// subsequent StartTrafficWriter can spawn a fresh consumer.
|
|
|
func StopTrafficWriter() {
|
|
|
- if twCancel != nil {
|
|
|
- twCancel()
|
|
|
- <-twDone
|
|
|
+ twMu.Lock()
|
|
|
+ cancel := twCancel
|
|
|
+ done := twDone
|
|
|
+ twQueue = nil
|
|
|
+ twCancel = nil
|
|
|
+ twDone = nil
|
|
|
+ twMu.Unlock()
|
|
|
+
|
|
|
+ if cancel != nil {
|
|
|
+ cancel()
|
|
|
+ }
|
|
|
+ if done != nil {
|
|
|
+ <-done
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func runTrafficWriter() {
|
|
|
- defer close(twDone)
|
|
|
+func runTrafficWriter(queue chan *trafficWriteRequest, ctx context.Context, done chan struct{}) {
|
|
|
+ defer close(done)
|
|
|
for {
|
|
|
select {
|
|
|
- case req := <-twQueue:
|
|
|
+ case req := <-queue:
|
|
|
req.done <- safeApply(req.apply)
|
|
|
- case <-twCtx.Done():
|
|
|
+ case <-ctx.Done():
|
|
|
for {
|
|
|
select {
|
|
|
- case req := <-twQueue:
|
|
|
+ case req := <-queue:
|
|
|
req.done <- safeApply(req.apply)
|
|
|
default:
|
|
|
return
|
|
|
@@ -74,12 +99,16 @@ func safeApply(fn func() error) (err error) {
|
|
|
}
|
|
|
|
|
|
func submitTrafficWrite(fn func() error) error {
|
|
|
- if twQueue == nil {
|
|
|
+ twMu.Lock()
|
|
|
+ queue := twQueue
|
|
|
+ twMu.Unlock()
|
|
|
+
|
|
|
+ if queue == nil {
|
|
|
return safeApply(fn)
|
|
|
}
|
|
|
req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
|
|
|
select {
|
|
|
- case twQueue <- req:
|
|
|
+ case queue <- req:
|
|
|
case <-time.After(trafficWriterSubmitTimeout):
|
|
|
return errors.New("traffic writer queue full")
|
|
|
}
|