|
@@ -92,15 +92,17 @@ func TestTwoNodesShareEmail_SumsCorrectly(t *testing.T) {
|
|
|
|
|
|
|
|
const email = "shared"
|
|
const email = "shared"
|
|
|
|
|
|
|
|
|
|
+ // First-ever sync from each node: the row is created at 0 and the current
|
|
|
|
|
+ // node counters become the baseline. Only deltas beyond those baselines count.
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
|
|
|
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 100, 100, "after baselines")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 0, 0, "after baselines — historical node traffic not imported")
|
|
|
|
|
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
|
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
|
|
syncNode(t, svc, 2, "n2-in", xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
|
|
|
|
|
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 210, 210, "after both nodes grow")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 110, 110, "after both nodes grow — deltas (50+60) accrue")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestSingleNode_MirrorsCorrectly(t *testing.T) {
|
|
func TestSingleNode_MirrorsCorrectly(t *testing.T) {
|
|
@@ -109,11 +111,13 @@ func TestSingleNode_MirrorsCorrectly(t *testing.T) {
|
|
|
svc := &InboundService{}
|
|
svc := &InboundService{}
|
|
|
|
|
|
|
|
const email = "solo"
|
|
const email = "solo"
|
|
|
|
|
+ // First sync: row seeded at 0, current counter becomes the baseline.
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 500, Down: 600, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 500, Down: 600, Enable: true})
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 500, 600, "first sync")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 0, 0, "first sync — historical traffic not imported")
|
|
|
|
|
|
|
|
|
|
+ // Second sync: delta (700-500=200 / 800-600=200) accrues normally.
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 700, Down: 800, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 700, Down: 800, Enable: true})
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 700, 800, "second sync mirrors cumulative")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 200, 200, "second sync — delta accrues")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
|
|
func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
|
|
@@ -137,22 +141,52 @@ func TestUpgrade_PreExistingRow_NoDoubleCount(t *testing.T) {
|
|
|
assertUpDown(t, readTraffic(t, db, email), 1100, 2100, "growth after upgrade accrues")
|
|
assertUpDown(t, readTraffic(t, db, email), 1100, 2100, "growth after upgrade accrues")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// TestGhostData_NoPhantomTraffic reproduces the 50 GB phantom traffic bug:
|
|
|
|
|
+// a node retains stale data for a deleted client (e.g. de-1 was offline during
|
|
|
|
|
+// deletion). When the master first syncs this email again it must NOT import the
|
|
|
|
|
+// stale counter — it sets Up=0 and records the current node value as the
|
|
|
|
|
+// baseline so only future deltas count.
|
|
|
|
|
+func TestGhostData_NoPhantomTraffic(t *testing.T) {
|
|
|
|
|
+ db := initTrafficTestDB(t)
|
|
|
|
|
+ createNodeInbound(t, db, 1, "de1-in", 41001)
|
|
|
|
|
+ svc := &InboundService{}
|
|
|
|
|
+
|
|
|
|
|
+ const email = "pouya"
|
|
|
|
|
+ const staleBytes int64 = 50 * 1024 * 1024 * 1024 // 50 GB stale on de-1
|
|
|
|
|
+
|
|
|
|
|
+ // Node reports a client with a large pre-existing counter (ghost data from a
|
|
|
|
|
+ // deleted account that survived on the node). Master has no row for this email.
|
|
|
|
|
+ syncNode(t, svc, 1, "de1-in", xray.ClientTraffic{Email: email, Up: staleBytes, Down: staleBytes, Enable: true})
|
|
|
|
|
+
|
|
|
|
|
+ ct := readTraffic(t, db, email)
|
|
|
|
|
+ if ct.Up >= staleBytes || ct.Down >= staleBytes {
|
|
|
|
|
+ t.Errorf("ghost data imported: up=%d down=%d; stale node counter must not count as new traffic", ct.Up, ct.Down)
|
|
|
|
|
+ }
|
|
|
|
|
+ assertUpDown(t, ct, 0, 0, "first sync of ghost email — imported as 0, not 50 GB")
|
|
|
|
|
+
|
|
|
|
|
+ // Subsequent syncs only add incremental usage beyond the baseline.
|
|
|
|
|
+ syncNode(t, svc, 1, "de1-in", xray.ClientTraffic{Email: email, Up: staleBytes + 1024, Down: staleBytes + 2048, Enable: true})
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 1024, 2048, "only incremental traffic beyond baseline counts")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func TestNodeCounterReset_Clamped(t *testing.T) {
|
|
func TestNodeCounterReset_Clamped(t *testing.T) {
|
|
|
db := initTrafficTestDB(t)
|
|
db := initTrafficTestDB(t)
|
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
createNodeInbound(t, db, 1, "n1-in", 41001)
|
|
|
svc := &InboundService{}
|
|
svc := &InboundService{}
|
|
|
|
|
|
|
|
const email = "restart"
|
|
const email = "restart"
|
|
|
|
|
+ // First sync seeds at 0 (baseline=900). Second sync adds delta 950-900=50.
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 900, Down: 900, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 900, Down: 900, Enable: true})
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 950, Down: 950, Enable: true})
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 950, 950, "before node reset")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 50, 50, "before node reset")
|
|
|
|
|
|
|
|
|
|
+ // Counter resets to 50 (Xray restart). delta=50-950=-900 → clamped → adds 50.
|
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
|
|
syncNode(t, svc, 1, "n1-in", xray.ClientTraffic{Email: email, Up: 50, Down: 50, Enable: true})
|
|
|
ct := readTraffic(t, db, email)
|
|
ct := readTraffic(t, db, email)
|
|
|
if ct.Up < 0 || ct.Down < 0 {
|
|
if ct.Up < 0 || ct.Down < 0 {
|
|
|
t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
|
|
t.Fatalf("row went negative after node reset: up=%d down=%d", ct.Up, ct.Down)
|
|
|
}
|
|
}
|
|
|
- assertUpDown(t, ct, 1000, 1000, "after node counter reset (clamped)")
|
|
|
|
|
|
|
+ assertUpDown(t, ct, 100, 100, "after node counter reset (clamped)")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestCentralReset_NoReAdd(t *testing.T) {
|
|
func TestCentralReset_NoReAdd(t *testing.T) {
|
|
@@ -186,11 +220,13 @@ func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
|
|
|
|
|
|
|
|
const email = "shared"
|
|
const email = "shared"
|
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
|
|
|
+ // Baselines set: node1=100, node2=200. Row seeded at 0.
|
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
|
|
|
+ // node1 delta=50, node2 delta=60 → total=110.
|
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 150, Down: 150, Enable: true})
|
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 260, Down: 260, Enable: true})
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 210, 210, "baseline sum")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 110, 110, "baseline sum")
|
|
|
|
|
|
|
|
// Node 1 rebuilt (reinstall / another master's reconcile): its inbound
|
|
// Node 1 rebuilt (reinstall / another master's reconcile): its inbound
|
|
|
// vanishes from the snapshot. The shared accumulator must survive — losing
|
|
// vanishes from the snapshot. The shared accumulator must survive — losing
|
|
@@ -199,11 +235,11 @@ func TestInboundRemoval_KeepsSharedEmailRow(t *testing.T) {
|
|
|
if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil {
|
|
if _, err := svc.setRemoteTrafficLocked(1, &runtime.TrafficSnapshot{}, false); err != nil {
|
|
|
t.Fatalf("sync node 1 with empty snapshot: %v", err)
|
|
t.Fatalf("sync node 1 with empty snapshot: %v", err)
|
|
|
}
|
|
}
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 210, 210, "after node 1 inbound removal")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 110, 110, "after node 1 inbound removal")
|
|
|
|
|
|
|
|
- // Node 2 keeps accruing onto the surviving row.
|
|
|
|
|
|
|
+ // Node 2 keeps accruing onto the surviving row. node2 delta=300-260=40 → 150.
|
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 300, Down: 300, Enable: true})
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 250, 250, "after node 2 grows")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 150, 150, "after node 2 grows")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
|
|
func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
|
|
@@ -214,16 +250,18 @@ func TestClientGoneFromOneNode_KeepsSharedEmailRow(t *testing.T) {
|
|
|
|
|
|
|
|
const email = "shared"
|
|
const email = "shared"
|
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
settings := fmt.Sprintf(`{"clients": [{"email": %q, "enable": true}]}`, email)
|
|
|
|
|
+ // First syncs set baselines (node1=100, node2=200). Row seeded at 0.
|
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", settings, xray.ClientTraffic{Email: email, Up: 100, Down: 100, Enable: true})
|
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 200, Down: 200, Enable: true})
|
|
|
|
|
|
|
|
// Client detached from node 1's inbound only: its stats vanish from that
|
|
// Client detached from node 1's inbound only: its stats vanish from that
|
|
|
// inbound's snapshot while node 2 still hosts the email.
|
|
// inbound's snapshot while node 2 still hosts the email.
|
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`)
|
|
syncNodeWithSettings(t, svc, 1, "n1-in", `{"clients": []}`)
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 100, 100, "after client left node 1")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 0, 0, "after client left node 1 — row unchanged at 0")
|
|
|
|
|
|
|
|
|
|
+ // Node 2 delta: 240-200=40 → accrues to 40.
|
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true})
|
|
syncNodeWithSettings(t, svc, 2, "n2-in", settings, xray.ClientTraffic{Email: email, Up: 240, Down: 240, Enable: true})
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 140, 140, "node 2 keeps accruing")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 40, 40, "node 2 keeps accruing")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
|
|
// TestStatsUnderSiblingInbound_KeepsNodeBaseline reproduces the recurring
|
|
@@ -306,15 +344,21 @@ func TestMultiAttach_SameNode_DivergentSiblings(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // First-ever sync: row seeded at 0, canon (max of siblings = 100) becomes the
|
|
|
|
|
+ // baseline. The node total is NOT imported as historical traffic — this prevents
|
|
|
|
|
+ // ghost data from a previously-deleted account being counted as real usage.
|
|
|
sync(100, 50, 80)
|
|
sync(100, 50, 80)
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 100, 100, "first sync counts the node total once, not the sum")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 0, 0, "first sync seeds at 0 — not the sum (230) nor the max (100)")
|
|
|
|
|
|
|
|
|
|
+ // Second sync: canon grew from 100→150, delta=50 accrues. Siblings 60 and 90
|
|
|
|
|
+ // do not re-add their full values — the canon clamp prevents inflation.
|
|
|
sync(150, 60, 90)
|
|
sync(150, 60, 90)
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 150, 150, "second sync: grew by 50, not by every sibling")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 50, 50, "second sync: only the 50-unit increment counts, not every sibling")
|
|
|
|
|
|
|
|
- // Equal siblings (the healthy current-schema case) must still accrue once.
|
|
|
|
|
|
|
+ // Equal siblings (the healthy current-schema case): canon grew from 150→200,
|
|
|
|
|
+ // delta=50 accrues once.
|
|
|
sync(200, 200, 200)
|
|
sync(200, 200, 200)
|
|
|
- assertUpDown(t, readTraffic(t, db, email), 200, 200, "equal siblings accrue the single increment")
|
|
|
|
|
|
|
+ assertUpDown(t, readTraffic(t, db, email), 100, 100, "equal siblings accrue the single increment")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestDelClientStat_CleansNodeBaselines(t *testing.T) {
|
|
func TestDelClientStat_CleansNodeBaselines(t *testing.T) {
|