1
0

ws_payload_scale_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "testing"
  6. "time"
  7. )
  8. // TestWsPayloadScale compares the websocket broadcast payload paths the
  9. // traffic job runs every 5s while a browser is connected: the full snapshot
  10. // (GetAllClientTraffics + full last-online map) against the delta variant
  11. // (GetActiveClientTraffics on this poll's active emails). Payload sizes are
  12. // logged against the hub's 10MB drop threshold.
  13. func TestWsPayloadScale(t *testing.T) {
  14. setupScaleDB(t)
  15. svc := &InboundService{}
  16. const hubPayloadLimit = 10 * 1024 * 1024
  17. for _, n := range scaleSizes(t, 10000, 100000) {
  18. t.Run(fmt.Sprintf("N=%d", n), func(t *testing.T) {
  19. ds := seedScaleDataset(t, n, 1)
  20. k := min(10000, n)
  21. activeEmails := sampleEmails(ds.emails, k)
  22. start := time.Now()
  23. all, err := svc.GetAllClientTraffics()
  24. if err != nil {
  25. t.Fatalf("GetAllClientTraffics: %v", err)
  26. }
  27. fetchAll := time.Since(start)
  28. if len(all) != n {
  29. t.Fatalf("GetAllClientTraffics rows = %d, want %d", len(all), n)
  30. }
  31. start = time.Now()
  32. snapshot, err := json.Marshal(map[string]any{"clients": all})
  33. if err != nil {
  34. t.Fatalf("marshal snapshot: %v", err)
  35. }
  36. marshalAll := time.Since(start)
  37. verdict := "delivered"
  38. if len(snapshot) > hubPayloadLimit {
  39. verdict = "DROPPED by hub (>10MB)"
  40. }
  41. t.Logf("N=%-7d snapshot: fetch=%-9v marshal=%-9v payload=%.1fMB %s",
  42. n, fetchAll.Round(time.Millisecond), marshalAll.Round(time.Millisecond),
  43. float64(len(snapshot))/(1<<20), verdict)
  44. start = time.Now()
  45. active, err := svc.GetActiveClientTraffics(activeEmails)
  46. if err != nil {
  47. t.Fatalf("GetActiveClientTraffics: %v", err)
  48. }
  49. fetchActive := time.Since(start)
  50. if len(active) != k {
  51. t.Fatalf("GetActiveClientTraffics rows = %d, want %d", len(active), k)
  52. }
  53. start = time.Now()
  54. delta, err := json.Marshal(map[string]any{"clients": active})
  55. if err != nil {
  56. t.Fatalf("marshal delta: %v", err)
  57. }
  58. marshalActive := time.Since(start)
  59. t.Logf("N=%-7d delta(K=%d): fetch=%-9v marshal=%-9v payload=%.1fMB",
  60. n, k, fetchActive.Round(time.Millisecond), marshalActive.Round(time.Millisecond),
  61. float64(len(delta))/(1<<20))
  62. start = time.Now()
  63. lastOnline, err := svc.GetClientsLastOnline()
  64. if err != nil {
  65. t.Fatalf("GetClientsLastOnline: %v", err)
  66. }
  67. fullMap := time.Since(start)
  68. if len(lastOnline) != n {
  69. t.Fatalf("GetClientsLastOnline entries = %d, want %d", len(lastOnline), n)
  70. }
  71. start = time.Now()
  72. activeLastOnline := make(map[string]int64, len(active))
  73. for _, ct := range active {
  74. activeLastOnline[ct.Email] = ct.LastOnline
  75. }
  76. activeMap := time.Since(start)
  77. t.Logf("N=%-7d lastOnline: fullMap=%-9v activeMap(K=%d)=%v",
  78. n, fullMap.Round(time.Millisecond), k, activeMap.Round(time.Microsecond))
  79. start = time.Now()
  80. if _, err := svc.GetInboundsTrafficSummary(); err != nil {
  81. t.Fatalf("GetInboundsTrafficSummary: %v", err)
  82. }
  83. t.Logf("N=%-7d inboundsSummary=%v", n, time.Since(start).Round(time.Millisecond))
  84. })
  85. }
  86. }