// traffic_writer_test.go (4.0 KB)
  1. package service
  2. import (
  3. "sync/atomic"
  4. "testing"
  5. "time"
  6. )
  7. func TestTrafficWriterStartStopStartAcceptsWrites(t *testing.T) {
  8. resetTrafficWriterForTest(t)
  9. StartTrafficWriter()
  10. var writes atomic.Int32
  11. if err := submitTrafficWrite(func() error {
  12. writes.Add(1)
  13. return nil
  14. }); err != nil {
  15. t.Fatalf("first submitTrafficWrite: %v", err)
  16. }
  17. StopTrafficWriter()
  18. StartTrafficWriter()
  19. if err := submitTrafficWrite(func() error {
  20. writes.Add(1)
  21. return nil
  22. }); err != nil {
  23. t.Fatalf("second submitTrafficWrite: %v", err)
  24. }
  25. if got := writes.Load(); got != 2 {
  26. t.Fatalf("writes = %d, want 2", got)
  27. }
  28. }
  29. func TestTrafficWriterSubmitAfterStopRunsInline(t *testing.T) {
  30. resetTrafficWriterForTest(t)
  31. StartTrafficWriter()
  32. StopTrafficWriter()
  33. ran := make(chan struct{})
  34. errCh := make(chan error, 1)
  35. go func() {
  36. errCh <- submitTrafficWrite(func() error {
  37. close(ran)
  38. return nil
  39. })
  40. }()
  41. select {
  42. case <-ran:
  43. case <-time.After(time.Second):
  44. t.Fatal("submitTrafficWrite did not run after traffic writer stopped")
  45. }
  46. if err := waitTrafficWriterErr(t, errCh); err != nil {
  47. t.Fatalf("submitTrafficWrite after stop: %v", err)
  48. }
  49. }
  50. func TestTrafficWriterStopDrainsQueuedWrite(t *testing.T) {
  51. resetTrafficWriterForTest(t)
  52. StartTrafficWriter()
  53. firstStarted := make(chan struct{})
  54. releaseFirst := make(chan struct{})
  55. firstErr := make(chan error, 1)
  56. go func() {
  57. firstErr <- submitTrafficWrite(func() error {
  58. close(firstStarted)
  59. <-releaseFirst
  60. return nil
  61. })
  62. }()
  63. waitTrafficWriterSignal(t, firstStarted, "first write did not start")
  64. secondRan := make(chan struct{})
  65. secondErr := make(chan error, 1)
  66. go func() {
  67. secondErr <- submitTrafficWrite(func() error {
  68. close(secondRan)
  69. return nil
  70. })
  71. }()
  72. waitTrafficWriterQueued(t)
  73. stopDone := make(chan struct{})
  74. go func() {
  75. StopTrafficWriter()
  76. close(stopDone)
  77. }()
  78. select {
  79. case <-stopDone:
  80. t.Fatal("StopTrafficWriter returned before in-flight write was released")
  81. case <-time.After(50 * time.Millisecond):
  82. }
  83. close(releaseFirst)
  84. waitTrafficWriterSignal(t, stopDone, "StopTrafficWriter did not return")
  85. waitTrafficWriterSignal(t, secondRan, "queued write was not drained")
  86. if err := waitTrafficWriterErr(t, firstErr); err != nil {
  87. t.Fatalf("first submitTrafficWrite: %v", err)
  88. }
  89. if err := waitTrafficWriterErr(t, secondErr); err != nil {
  90. t.Fatalf("second submitTrafficWrite: %v", err)
  91. }
  92. }
  93. func TestTrafficWriterConcurrentStopDuringSubmitDoesNotHang(t *testing.T) {
  94. resetTrafficWriterForTest(t)
  95. StartTrafficWriter()
  96. started := make(chan struct{})
  97. release := make(chan struct{})
  98. errCh := make(chan error, 1)
  99. go func() {
  100. errCh <- submitTrafficWrite(func() error {
  101. close(started)
  102. <-release
  103. return nil
  104. })
  105. }()
  106. waitTrafficWriterSignal(t, started, "write did not start")
  107. stopDone := make(chan struct{})
  108. go func() {
  109. StopTrafficWriter()
  110. close(stopDone)
  111. }()
  112. close(release)
  113. waitTrafficWriterSignal(t, stopDone, "StopTrafficWriter hung during submit")
  114. if err := waitTrafficWriterErr(t, errCh); err != nil {
  115. t.Fatalf("submitTrafficWrite during stop: %v", err)
  116. }
  117. }
  118. func resetTrafficWriterForTest(t *testing.T) {
  119. t.Helper()
  120. StopTrafficWriter()
  121. twMu.Lock()
  122. clearTrafficWriterState()
  123. twMu.Unlock()
  124. t.Cleanup(func() {
  125. StopTrafficWriter()
  126. twMu.Lock()
  127. clearTrafficWriterState()
  128. twMu.Unlock()
  129. })
  130. }
  131. func waitTrafficWriterQueued(t *testing.T) {
  132. t.Helper()
  133. deadline := time.Now().Add(time.Second)
  134. for time.Now().Before(deadline) {
  135. twMu.Lock()
  136. queued := 0
  137. if twQueue != nil {
  138. queued = len(twQueue)
  139. }
  140. twMu.Unlock()
  141. if queued > 0 {
  142. return
  143. }
  144. time.Sleep(10 * time.Millisecond)
  145. }
  146. t.Fatal("write was not queued")
  147. }
  148. func waitTrafficWriterSignal(t *testing.T, ch <-chan struct{}, msg string) {
  149. t.Helper()
  150. select {
  151. case <-ch:
  152. case <-time.After(time.Second):
  153. t.Fatal(msg)
  154. }
  155. }
  156. func waitTrafficWriterErr(t *testing.T, ch <-chan error) error {
  157. t.Helper()
  158. select {
  159. case err := <-ch:
  160. return err
  161. case <-time.After(time.Second):
  162. t.Fatal("timed out waiting for traffic writer result")
  163. return nil
  164. }
  165. }