// reconcile_skip_test.go (13 KB)
  1. package runtime
  2. import (
  3. "context"
  4. "net/http"
  5. "net/http/httptest"
  6. "strings"
  7. "sync/atomic"
  8. "testing"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. )
  11. // TestReconcileInbound_SkipsUnchanged proves the delta-skip: a second reconcile
  12. // of an unchanged inbound that the node still reports sends no push, while a
  13. // content change or an absent-on-node inbound forces a fresh push.
  14. func TestReconcileInbound_SkipsUnchanged(t *testing.T) {
  15. var pushes atomic.Int32
  16. srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  17. if r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/panel/api/inbounds/update/") {
  18. pushes.Add(1)
  19. }
  20. w.Header().Set("Content-Type", "application/json")
  21. _, _ = w.Write([]byte(`{"success":true}`))
  22. }))
  23. defer srv.Close()
  24. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  25. ib := &model.Inbound{Tag: "in-1", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
  26. // Pre-seed the tag→id cache so resolveRemoteID needs no network round-trip.
  27. r.cacheSet(ib.Tag, 7)
  28. // First reconcile: node doesn't report it yet → must push and record the fp.
  29. if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed {
  30. t.Fatalf("first reconcile: pushed=%v err=%v, want push", pushed, err)
  31. }
  32. if got := pushes.Load(); got != 1 {
  33. t.Fatalf("after first reconcile pushes=%d, want 1", got)
  34. }
  35. // Second reconcile: unchanged and present on node → skip.
  36. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || pushed {
  37. t.Fatalf("second reconcile: pushed=%v err=%v, want skip", pushed, err)
  38. }
  39. if got := pushes.Load(); got != 1 {
  40. t.Fatalf("unchanged reconcile pushed again: pushes=%d, want 1", got)
  41. }
  42. // Content change → push again even though it's present on node.
  43. ib.Settings = `{"clients":[{"email":"a@x"}]}`
  44. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed {
  45. t.Fatalf("changed reconcile: pushed=%v err=%v, want push", pushed, err)
  46. }
  47. if got := pushes.Load(); got != 2 {
  48. t.Fatalf("changed reconcile pushes=%d, want 2", got)
  49. }
  50. // Absent on node (e.g. node restarted/lost it) → re-push even if fp matches.
  51. if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed {
  52. t.Fatalf("absent-on-node reconcile: pushed=%v err=%v, want push", pushed, err)
  53. }
  54. if got := pushes.Load(); got != 3 {
  55. t.Fatalf("absent-on-node reconcile pushes=%d, want 3", got)
  56. }
  57. }
  58. type nodeCallCounts struct {
  59. adds atomic.Int32
  60. inboundUpdates atomic.Int32
  61. clientMutations atomic.Int32
  62. }
  63. func newCountingNodeServer(t *testing.T, clientsResp string) (*httptest.Server, *nodeCallCounts) {
  64. t.Helper()
  65. counts := &nodeCallCounts{}
  66. srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  67. w.Header().Set("Content-Type", "application/json")
  68. switch {
  69. case strings.Contains(r.URL.Path, "/panel/api/inbounds/add"):
  70. counts.adds.Add(1)
  71. case strings.Contains(r.URL.Path, "/panel/api/inbounds/update/"):
  72. counts.inboundUpdates.Add(1)
  73. case strings.Contains(r.URL.Path, "/panel/api/clients/"):
  74. counts.clientMutations.Add(1)
  75. _, _ = w.Write([]byte(clientsResp))
  76. return
  77. }
  78. _, _ = w.Write([]byte(`{"success":true}`))
  79. }))
  80. t.Cleanup(srv.Close)
  81. return srv, counts
  82. }
  83. func perClientMutationCases(client model.Client) []struct {
  84. name string
  85. run func(*Remote, *model.Inbound) error
  86. } {
  87. return []struct {
  88. name string
  89. run func(*Remote, *model.Inbound) error
  90. }{
  91. {"add", func(r *Remote, ib *model.Inbound) error {
  92. return r.AddClient(context.Background(), ib, client)
  93. }},
  94. {"delete", func(r *Remote, ib *model.Inbound) error {
  95. return r.DeleteUser(context.Background(), ib, client.Email)
  96. }},
  97. {"update", func(r *Remote, ib *model.Inbound) error {
  98. return r.UpdateUser(context.Background(), ib, client.Email, client)
  99. }},
  100. }
  101. }
  102. // TestPerClientMutationsAloneDoNotSeedReconcileFingerprint: a per-client RPC
  103. // only proves one client's slice converged, so without an explicit advance the
  104. // dirty-reconcile backup must still send the full inbound.
  105. func TestPerClientMutationsAloneDoNotSeedReconcileFingerprint(t *testing.T) {
  106. srv, counts := newCountingNodeServer(t, `{"success":true}`)
  107. client := model.Client{ID: "11111111-1111-1111-1111-111111111111", Email: "a@x", SubID: "s", Enable: true}
  108. for _, tt := range perClientMutationCases(client) {
  109. t.Run(tt.name, func(t *testing.T) {
  110. counts.inboundUpdates.Store(0)
  111. counts.clientMutations.Store(0)
  112. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  113. ib := &model.Inbound{Tag: "in-" + tt.name, Protocol: model.VLESS, Port: 443, Settings: `{"clients":[{"id":"11111111-1111-1111-1111-111111111111","email":"a@x","subId":"s","enable":true}]}`}
  114. r.cacheSet(ib.Tag, 7)
  115. if err := tt.run(r, ib); err != nil {
  116. t.Fatalf("%s client mutation: %v", tt.name, err)
  117. }
  118. if got := counts.clientMutations.Load(); got != 1 {
  119. t.Fatalf("%s client mutation requests=%d, want 1", tt.name, got)
  120. }
  121. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed {
  122. t.Fatalf("%s reconcile after unadvanced mutation: pushed=%v err=%v, want full push", tt.name, pushed, err)
  123. }
  124. if got := counts.inboundUpdates.Load(); got != 1 {
  125. t.Fatalf("%s reconcile sent %d full inbound updates, want 1", tt.name, got)
  126. }
  127. })
  128. }
  129. }
  130. // TestAdvancePushedInboundEnablesReconcileSkip: when the node provably held the
  131. // pre-edit payload and every per-client push succeeded, advancing the
  132. // fingerprint lets the next reconcile skip the redundant full push.
  133. func TestAdvancePushedInboundEnablesReconcileSkip(t *testing.T) {
  134. srv, counts := newCountingNodeServer(t, `{"success":true}`)
  135. client := model.Client{ID: "11111111-1111-1111-1111-111111111111", Email: "a@x", SubID: "s", Enable: true}
  136. prevByOp := map[string]string{
  137. "add": `{"clients":[]}`,
  138. "delete": `{"clients":[{"email":"a@x","enable":true}]}`,
  139. "update": `{"clients":[{"email":"a@x","enable":false}]}`,
  140. }
  141. newByOp := map[string]string{
  142. "add": `{"clients":[{"email":"a@x","enable":true}]}`,
  143. "delete": `{"clients":[]}`,
  144. "update": `{"clients":[{"email":"a@x","enable":true}]}`,
  145. }
  146. for _, tt := range perClientMutationCases(client) {
  147. t.Run(tt.name, func(t *testing.T) {
  148. counts.inboundUpdates.Store(0)
  149. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  150. prevIb := &model.Inbound{Tag: "in-adv-" + tt.name, Protocol: model.VLESS, Port: 443, Settings: prevByOp[tt.name]}
  151. ib := &model.Inbound{Tag: prevIb.Tag, Protocol: model.VLESS, Port: 443, Settings: newByOp[tt.name]}
  152. r.cacheSet(ib.Tag, 7)
  153. r.recordPushedInbound(prevIb)
  154. if err := tt.run(r, ib); err != nil {
  155. t.Fatalf("%s client mutation: %v", tt.name, err)
  156. }
  157. r.AdvancePushedInbound(prevIb, ib)
  158. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || pushed {
  159. t.Fatalf("%s reconcile after advance: pushed=%v err=%v, want skip", tt.name, pushed, err)
  160. }
  161. if got := counts.inboundUpdates.Load(); got != 0 {
  162. t.Fatalf("%s reconcile sent %d full inbound updates, want 0", tt.name, got)
  163. }
  164. })
  165. }
  166. }
  167. // TestAdvancePushedInboundRequiresMatchingPreviousFingerprint: if changes were
  168. // folded to dirty (or an earlier push failed), the recorded fingerprint no
  169. // longer matches the pre-edit payload; a later successful client push must not
  170. // mask the pending reconcile.
  171. func TestAdvancePushedInboundRequiresMatchingPreviousFingerprint(t *testing.T) {
  172. srv, counts := newCountingNodeServer(t, `{"success":true}`)
  173. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  174. staleIb := &model.Inbound{Tag: "in-stale", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[{"email":"folded@x"}]}`}
  175. prevIb := &model.Inbound{Tag: "in-stale", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
  176. ib := &model.Inbound{Tag: "in-stale", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[{"email":"a@x"}]}`}
  177. r.cacheSet(ib.Tag, 7)
  178. r.recordPushedInbound(staleIb)
  179. if err := r.UpdateUser(context.Background(), ib, "a@x", model.Client{Email: "a@x"}); err != nil {
  180. t.Fatalf("client mutation: %v", err)
  181. }
  182. r.AdvancePushedInbound(prevIb, ib)
  183. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed {
  184. t.Fatalf("reconcile with unproven pre-state: pushed=%v err=%v, want full push", pushed, err)
  185. }
  186. if got := counts.inboundUpdates.Load(); got != 1 {
  187. t.Fatalf("reconcile sent %d full inbound updates, want 1", got)
  188. }
  189. }
  190. // TestAdoptedSerializationChainKeepsReconcileSkip: after a push the node
  191. // re-serializes settings its own way and the master adopts that form back into
  192. // its DB; stamping the adopted payload keeps edit->advance->skip alive instead
  193. // of degrading every edit to a full reconcile push.
  194. func TestAdoptedSerializationChainKeepsReconcileSkip(t *testing.T) {
  195. srv, counts := newCountingNodeServer(t, `{"success":true}`)
  196. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  197. pushForm := &model.Inbound{Tag: "in-adopt", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[{"email":"a@x"}]}`}
  198. adoptedForm := &model.Inbound{Tag: "in-adopt", Protocol: model.VLESS, Port: 443, Settings: "{\n \"clients\": [{\"email\": \"a@x\"}]\n}"}
  199. edited := &model.Inbound{Tag: "in-adopt", Protocol: model.VLESS, Port: 443, Settings: "{\n \"clients\": [{\"comment\": \"c\", \"email\": \"a@x\"}]\n}"}
  200. r.cacheSet(pushForm.Tag, 7)
  201. r.recordPushedInbound(pushForm)
  202. r.RecordAdoptedInbound(adoptedForm)
  203. if err := r.UpdateUser(context.Background(), edited, "a@x", model.Client{Email: "a@x"}); err != nil {
  204. t.Fatalf("client mutation: %v", err)
  205. }
  206. r.AdvancePushedInbound(adoptedForm, edited)
  207. if pushed, err := r.ReconcileInbound(context.Background(), edited, true); err != nil || pushed {
  208. t.Fatalf("reconcile after adopted-form advance: pushed=%v err=%v, want skip", pushed, err)
  209. }
  210. if got := counts.inboundUpdates.Load(); got != 0 {
  211. t.Fatalf("full inbound updates=%d, want 0", got)
  212. }
  213. }
  214. func TestDeleteUserNotFoundHandling(t *testing.T) {
  215. t.Run("envelope not-found counts as already deleted", func(t *testing.T) {
  216. srv, counts := newCountingNodeServer(t, `{"success":false,"msg":"client not found"}`)
  217. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  218. ib := &model.Inbound{Tag: "in-delete-missing", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
  219. r.cacheSet(ib.Tag, 7)
  220. if err := r.DeleteUser(context.Background(), ib, "missing@x"); err != nil {
  221. t.Fatalf("DeleteUser missing client: %v", err)
  222. }
  223. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed {
  224. t.Fatalf("reconcile after missing-client delete: pushed=%v err=%v, want full push", pushed, err)
  225. }
  226. if got := counts.inboundUpdates.Load(); got != 1 {
  227. t.Fatalf("reconcile sent %d full inbound updates, want 1", got)
  228. }
  229. })
  230. t.Run("http 404 from an old node stays an error", func(t *testing.T) {
  231. srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  232. if strings.Contains(r.URL.Path, "/panel/api/clients/") {
  233. http.Error(w, "404 page not found", http.StatusNotFound)
  234. return
  235. }
  236. w.Header().Set("Content-Type", "application/json")
  237. _, _ = w.Write([]byte(`{"success":true}`))
  238. }))
  239. defer srv.Close()
  240. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  241. ib := &model.Inbound{Tag: "in-delete-404", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
  242. r.cacheSet(ib.Tag, 7)
  243. err := r.DeleteUser(context.Background(), ib, "missing@x")
  244. if err == nil || !strings.Contains(err.Error(), "HTTP 404") {
  245. t.Fatalf("DeleteUser against old node = %v, want HTTP 404 error", err)
  246. }
  247. })
  248. }
  249. // TestDelInboundDropsReconcileFingerprint: deleting an inbound must forget its
  250. // fingerprint so a later same-tag inbound with identical content is re-pushed.
  251. func TestDelInboundDropsReconcileFingerprint(t *testing.T) {
  252. srv, counts := newCountingNodeServer(t, `{"success":true}`)
  253. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  254. ib := &model.Inbound{Tag: "in-del", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
  255. r.cacheSet(ib.Tag, 7)
  256. if pushed, err := r.ReconcileInbound(context.Background(), ib, false); err != nil || !pushed {
  257. t.Fatalf("initial reconcile: pushed=%v err=%v, want push", pushed, err)
  258. }
  259. if err := r.DelInbound(context.Background(), ib); err != nil {
  260. t.Fatalf("DelInbound: %v", err)
  261. }
  262. r.cacheSet(ib.Tag, 7)
  263. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || !pushed {
  264. t.Fatalf("reconcile after DelInbound: pushed=%v err=%v, want full push", pushed, err)
  265. }
  266. if got := counts.inboundUpdates.Load(); got != 2 {
  267. t.Fatalf("full inbound updates=%d, want 2", got)
  268. }
  269. }
  270. func TestUpdateInboundFallbackAddSeedsReconcileFingerprint(t *testing.T) {
  271. srv, counts := newCountingNodeServer(t, `{"success":true}`)
  272. r := NewRemote(nodeForPlainServer(t, srv, "verify", "tok"), nil)
  273. ib := &model.Inbound{Tag: "in-add-fallback", Protocol: model.VLESS, Port: 443, Settings: `{"clients":[]}`}
  274. if err := r.UpdateInbound(context.Background(), ib, ib); err != nil {
  275. t.Fatalf("UpdateInbound fallback add: %v", err)
  276. }
  277. if got := counts.adds.Load(); got != 1 {
  278. t.Fatalf("fallback add requests=%d, want 1", got)
  279. }
  280. if pushed, err := r.ReconcileInbound(context.Background(), ib, true); err != nil || pushed {
  281. t.Fatalf("reconcile after fallback add: pushed=%v err=%v, want skip", pushed, err)
  282. }
  283. if got := counts.inboundUpdates.Load(); got != 0 {
  284. t.Fatalf("reconcile sent %d full inbound updates, want 0", got)
  285. }
  286. }