Explorar el Código

feat: synchronize access.log client IPs across nodes (#5098)

* feat: synchronize access.log client IPs across nodes for global fail2ban limits

* fix(nodes): harden cross-node client-IP merge for cluster fail2ban

MergeInboundClientIps inserted new rows with the remote node's primary key,
which collides with the independently auto-incremented local id and rolled
back the whole sync batch — breaking exactly the node-only clients the
feature targets. It also never evicted stale IPs, so the 30-minute cutoff
was defeated cluster-wide (the master pushed its unpruned table back to
nodes, which re-added IPs they had just pruned) and the blobs grew unbounded.

- drop the remote id on create (Id=0) and guard the email-unique race with
  ON CONFLICT DO NOTHING; also fixes a latent Postgres sequence collision
- apply the same 30-minute stale cutoff inside the merge and skip creating
  node-only rows whose IPs are all stale
- throttle the IP fetch/merge/push to ~10s (data only refreshes every 10s)
  instead of running on every 5s traffic tick, cutting SQLite write churn
- log the load error on the push path and tidy the merge response message
- add unit tests for the merge (remote-id, dedup, stale-drop, skips)

---------

Co-authored-by: Rqzbeh <[email protected]>
Co-authored-by: Sanaei <[email protected]>
Rouzbeh† hace 6 horas
padre
commit
9f31d7d056

+ 73 - 0
frontend/public/openapi.json

@@ -4060,6 +4060,79 @@
         }
       }
     },
+    "/panel/api/server/clientIps": {
+      "get": {
+        "tags": [
+          "Server"
+        ],
+        "summary": "Fetch the fully aggregated inbound_client_ips database table. Used by nodes to sync recently active IPs across the cluster.",
+        "operationId": "get_panel_api_server_clientIps",
+        "responses": {
+          "200": {
+            "description": "Successful response",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "type": "object",
+                  "properties": {
+                    "success": {
+                      "type": "boolean"
+                    },
+                    "msg": {
+                      "type": "string"
+                    },
+                    "obj": {
+                      "type": "array",
+                      "items": {
+                        "$ref": "#/components/schemas/InboundClientIps"
+                      }
+                    }
+                  }
+                },
+                "example": {
+                  "success": true,
+                  "obj": [
+                    {
+                      "clientEmail": "",
+                      "id": 0,
+                      "ips": null
+                    }
+                  ]
+                }
+              }
+            }
+          }
+        }
+      },
+      "post": {
+        "tags": [
+          "Server"
+        ],
+        "summary": "Submit a list of recently active IP timestamps. The panel merges them with the existing database to maintain a unified global IP-limit view.",
+        "operationId": "post_panel_api_server_clientIps",
+        "responses": {
+          "200": {
+            "description": "Successful response",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "type": "object",
+                  "properties": {
+                    "success": {
+                      "type": "boolean"
+                    },
+                    "msg": {
+                      "type": "string"
+                    },
+                    "obj": {}
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    },
     "/panel/api/clients/list": {
       "get": {
         "tags": [

+ 15 - 0
frontend/src/pages/api-docs/endpoints.ts

@@ -442,6 +442,21 @@ export const sections: readonly Section[] = [
         body: 'sni=example.com',
         response: '{\n  "success": true,\n  "obj": {\n    "echKeySet": "...",\n    "echServerKeys": [...],\n    "echConfigList": "..."\n  }\n}',
       },
+      {
+        method: 'GET',
+        path: '/panel/api/server/clientIps',
+        summary: 'Fetch the fully aggregated inbound_client_ips database table. Used by nodes to sync recently active IPs across the cluster.',
+        responseSchema: 'InboundClientIps',
+        responseSchemaArray: true,
+      },
+      {
+        method: 'POST',
+        path: '/panel/api/server/clientIps',
+        summary: 'Submit a list of recently active IP timestamps. The panel merges them with the existing database to maintain a unified global IP-limit view.',
+        params: [
+          { name: 'ips', in: 'body (json)', type: 'object[]', desc: 'Array of InboundClientIps to merge.' },
+        ],
+      },
     ],
   },
 

+ 18 - 0
web/controller/server.go

@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/mhsanaei/3x-ui/v3/database"
+	"github.com/mhsanaei/3x-ui/v3/database/model"
 	"github.com/mhsanaei/3x-ui/v3/logger"
 	"github.com/mhsanaei/3x-ui/v3/web/entity"
 	"github.com/mhsanaei/3x-ui/v3/web/global"
@@ -61,6 +62,7 @@ func (a *ServerController) initRouter(g *gin.RouterGroup) {
 	g.GET("/getNewmldsa65", a.getNewmldsa65)
 	g.GET("/getNewmlkem768", a.getNewmlkem768)
 	g.GET("/getNewVlessEnc", a.getNewVlessEnc)
+	g.GET("/clientIps", a.getClientIps)
 
 	g.POST("/stopXrayService", a.stopXrayService)
 	g.POST("/restartXrayService", a.restartXrayService)
@@ -72,6 +74,7 @@ func (a *ServerController) initRouter(g *gin.RouterGroup) {
 	g.POST("/xraylogs/:count", a.getXrayLogs)
 	g.POST("/importDB", a.importDB)
 	g.POST("/getNewEchCert", a.getNewEchCert)
+	g.POST("/clientIps", a.setClientIps)
 }
 
 // startTask registers the @2s ticker that refreshes server status, samples
@@ -420,3 +423,18 @@ func (a *ServerController) getNewmlkem768(c *gin.Context) {
 	}
 	jsonObj(c, out, nil)
 }
+
+func (a *ServerController) getClientIps(c *gin.Context) {
+	ips, err := (&service.InboundService{}).GetAllInboundClientIps()
+	jsonObj(c, ips, err)
+}
+
+func (a *ServerController) setClientIps(c *gin.Context) {
+	var ips []model.InboundClientIps
+	if err := c.ShouldBindJSON(&ips); err != nil {
+		jsonMsg(c, "invalid data", err)
+		return
+	}
+	err := (&service.InboundService{}).MergeInboundClientIps(ips)
+	jsonMsg(c, "Client IPs merged", err)
+}

+ 5 - 1
web/job/check_client_ip_job.go

@@ -242,9 +242,13 @@ func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]in
 func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) (live, historical []IPWithTimestamp) {
 	live = make([]IPWithTimestamp, 0, len(observedThisScan))
 	historical = make([]IPWithTimestamp, 0, len(ipMap))
+	now := time.Now().Unix()
 	for ip, ts := range ipMap {
 		entry := IPWithTimestamp{IP: ip, Timestamp: ts}
-		if observedThisScan[ip] {
+		// Consider an IP "live" if it was seen locally in this scan, OR if its
+		// timestamp from the synced database is very recent (e.g. within 2 minutes).
+		// This ensures cluster-wide limits work even if the IP was seen on another node.
+		if observedThisScan[ip] || now-ts < 120 {
 			live = append(live, entry)
 		} else {
 			historical = append(historical, entry)

+ 21 - 0
web/job/check_client_ip_job_test.go

@@ -6,6 +6,7 @@ import (
 	"reflect"
 	"runtime"
 	"testing"
+	"time"
 )
 
 func TestMergeClientIps_EvictsStaleOldEntries(t *testing.T) {
@@ -149,6 +150,26 @@ func TestPartitionLiveIps_EmptyScanLeavesDbIntact(t *testing.T) {
 	}
 }
 
+func TestPartitionLiveIps_RecentSyncedIpIsLive(t *testing.T) {
+	// Synced IPs from other nodes within 2 minutes should be counted as live
+	// even if they weren't observed in the local scan.
+	now := time.Now().Unix()
+	ipMap := map[string]int64{
+		"A": now - 30,  // synced 30s ago -> live
+		"B": now - 150, // synced 2m30s ago -> historical
+	}
+	observed := map[string]bool{}
+
+	live, historical := partitionLiveIps(ipMap, observed)
+
+	if got := collectIps(live); !reflect.DeepEqual(got, []string{"A"}) {
+		t.Fatalf("recent IP should be live\ngot:  %v\nwant: [A]", got)
+	}
+	if got := collectIps(historical); !reflect.DeepEqual(got, []string{"B"}) {
+		t.Fatalf("older IP should be historical\ngot:  %v\nwant: [B]", got)
+	}
+}
+
 func TestCheckFail2BanInstalled_DisabledEnvSkipsClientProbe(t *testing.T) {
 	t.Setenv("XUI_ENABLE_FAIL2BAN", "false")
 	marker := fakeFail2BanClient(t)

+ 39 - 2
web/job/node_traffic_sync_job.go

@@ -16,6 +16,7 @@ const (
 	nodeTrafficSyncConcurrency    = 8
 	nodeTrafficSyncRequestTimeout = 4 * time.Second
 	nodeReconcileTimeout          = 30 * time.Second
+	nodeClientIpSyncInterval      = 10 * time.Second
 )
 
 type NodeTrafficSyncJob struct {
@@ -25,6 +26,8 @@ type NodeTrafficSyncJob struct {
 	xrayService    service.XrayService
 	running        sync.Mutex
 	structural     atomicBool
+	ipSyncMu       sync.Mutex
+	lastIpSync     int64
 }
 
 type atomicBool struct {
@@ -70,6 +73,16 @@ func (j *NodeTrafficSyncJob) Run() {
 		return
 	}
 
+	// Decide once per tick whether this run also syncs client IPs, and stamp the
+	// clock before the loop so two back-to-back 5s ticks can't both qualify.
+	doIpSync := false
+	j.ipSyncMu.Lock()
+	if now := time.Now().Unix(); now-j.lastIpSync >= int64(nodeClientIpSyncInterval/time.Second) {
+		doIpSync = true
+		j.lastIpSync = now
+	}
+	j.ipSyncMu.Unlock()
+
 	sem := make(chan struct{}, nodeTrafficSyncConcurrency)
 	var wg sync.WaitGroup
 	for _, n := range nodes {
@@ -81,7 +94,7 @@ func (j *NodeTrafficSyncJob) Run() {
 		go func(n *model.Node) {
 			defer wg.Done()
 			defer func() { <-sem }()
-			j.syncOne(mgr, n)
+			j.syncOne(mgr, n, doIpSync)
 		}(n)
 	}
 	wg.Wait()
@@ -151,7 +164,7 @@ func (j *NodeTrafficSyncJob) Run() {
 	}
 }
 
-func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
+func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
 	rt, err := mgr.RemoteFor(n)
 	if err != nil {
 		logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
@@ -190,4 +203,28 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
 	if changed {
 		j.structural.set()
 	}
+
+	if !doIpSync {
+		return
+	}
+
+	nodeIps, err := rt.FetchAllClientIps(ctx)
+	if err == nil && len(nodeIps) > 0 {
+		if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil {
+			logger.Warning("node traffic sync: merge client ips from", n.Name, "failed:", err)
+		}
+	} else if err != nil {
+		logger.Warning("node traffic sync: fetch client ips from", n.Name, "failed:", err)
+	}
+
+	masterIps, err := j.inboundService.GetAllInboundClientIps()
+	if err != nil {
+		logger.Warning("node traffic sync: load client ips for push to", n.Name, "failed:", err)
+		return
+	}
+	if len(masterIps) > 0 {
+		if err := rt.PushAllClientIps(ctx, masterIps); err != nil {
+			logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err)
+		}
+	}
 }

+ 19 - 0
web/runtime/remote.go

@@ -533,3 +533,22 @@ func isNonEmptySlice(v any) bool {
 	s, ok := v.([]any)
 	return ok && len(s) > 0
 }
+
+func (r *Remote) FetchAllClientIps(ctx context.Context) ([]model.InboundClientIps, error) {
+	env, err := r.do(ctx, http.MethodGet, "panel/api/server/clientIps", nil)
+	if err != nil {
+		return nil, err
+	}
+	var ips []model.InboundClientIps
+	if len(env.Obj) > 0 {
+		if err := json.Unmarshal(env.Obj, &ips); err != nil {
+			return nil, fmt.Errorf("decode client ips: %w", err)
+		}
+	}
+	return ips, nil
+}
+
+func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClientIps) error {
+	_, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips)
+	return err
+}

+ 131 - 0
web/service/inbound.go

@@ -399,6 +399,137 @@ func (s *InboundService) GetInboundOptions(userId int) ([]InboundOption, error)
 	return out, nil
 }
 
+func (s *InboundService) GetAllInboundClientIps() ([]model.InboundClientIps, error) {
+	db := database.GetDB()
+	var ips []model.InboundClientIps
+	err := db.Model(&model.InboundClientIps{}).Find(&ips).Error
+	return ips, err
+}
+
+// clientIpStaleAfterSeconds mirrors job.ipStaleAfterSeconds: client IPs older than
+// 30 minutes are evicted. Applying the same cutoff inside the cross-node merge keeps
+// the synced blob bounded and stops the master's push-back from resurrecting IPs that
+// a node has already pruned (otherwise the merge defeats the eviction cluster-wide).
+const clientIpStaleAfterSeconds = int64(30 * 60)
+
+// clientIpEntry is the on-disk shape of each element of InboundClientIps.Ips. Tags
+// match job.IPWithTimestamp so the blob round-trips with the access.log scanner.
+type clientIpEntry struct {
+	IP        string `json:"ip"`
+	Timestamp int64  `json:"timestamp"`
+}
+
+// mergeClientIpEntries unions old and incoming IP observations, dropping anything
+// older than cutoff, keeping the most recent timestamp per IP, and returning the
+// result sorted newest-first.
+func mergeClientIpEntries(old, incoming []clientIpEntry, cutoff int64) []clientIpEntry {
+	ipMap := make(map[string]int64, len(old)+len(incoming))
+	for _, e := range old {
+		if e.Timestamp < cutoff {
+			continue
+		}
+		ipMap[e.IP] = e.Timestamp
+	}
+	for _, e := range incoming {
+		if e.Timestamp < cutoff {
+			continue
+		}
+		if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
+			ipMap[e.IP] = e.Timestamp
+		}
+	}
+	out := make([]clientIpEntry, 0, len(ipMap))
+	for ip, ts := range ipMap {
+		out = append(out, clientIpEntry{IP: ip, Timestamp: ts})
+	}
+	sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
+	return out
+}
+
+// MergeInboundClientIps folds client IPs synced from another node into the local
+// inbound_client_ips table without double-counting an IP seen on multiple nodes and
+// without resurrecting stale entries. Existing rows are updated in place; brand-new
+// clients (typically node-only clients with no local row) are created with a fresh
+// local id.
+func (s *InboundService) MergeInboundClientIps(incomingIps []model.InboundClientIps) error {
+	db := database.GetDB()
+	var currentIps []model.InboundClientIps
+	if err := db.Model(&model.InboundClientIps{}).Find(&currentIps).Error; err != nil {
+		return err
+	}
+
+	currentMap := make(map[string]*model.InboundClientIps, len(currentIps))
+	for i := range currentIps {
+		currentMap[currentIps[i].ClientEmail] = &currentIps[i]
+	}
+
+	now := time.Now().Unix()
+	cutoff := now - clientIpStaleAfterSeconds
+
+	tx := db.Begin()
+	defer func() {
+		if r := recover(); r != nil {
+			tx.Rollback()
+		}
+	}()
+
+	for _, incoming := range incomingIps {
+		if incoming.ClientEmail == "" || incoming.Ips == "" {
+			continue
+		}
+
+		var incomingEntries []clientIpEntry
+		_ = json.Unmarshal([]byte(incoming.Ips), &incomingEntries)
+
+		current, exists := currentMap[incoming.ClientEmail]
+		if !exists {
+			// New client we've never seen locally. Drop stale entries up front and
+			// skip the row entirely if nothing is fresh, so we don't persist a row
+			// that is dead on arrival.
+			fresh := mergeClientIpEntries(nil, incomingEntries, cutoff)
+			if len(fresh) == 0 {
+				continue
+			}
+			b, _ := json.Marshal(fresh)
+			incoming.Ips = string(b)
+			// Never carry the remote node's primary key into the local table: id
+			// spaces are independent across nodes and the remote id would collide
+			// with an unrelated local row. OnConflict guards the race where
+			// check_client_ip_job creates the same brand-new email between the
+			// snapshot above and this insert.
+			incoming.Id = 0
+			if err := tx.Clauses(clause.OnConflict{
+				Columns:   []clause.Column{{Name: "client_email"}},
+				DoNothing: true,
+			}).Create(&incoming).Error; err != nil {
+				tx.Rollback()
+				return err
+			}
+			continue
+		}
+
+		var oldEntries []clientIpEntry
+		if current.Ips != "" {
+			_ = json.Unmarshal([]byte(current.Ips), &oldEntries)
+		}
+
+		merged := mergeClientIpEntries(oldEntries, incomingEntries, cutoff)
+		b, _ := json.Marshal(merged)
+		mergedStr := string(b)
+
+		// A concurrent check_client_ip_job db.Save on the same row can interleave
+		// with this update (benign last-writer-wins; any dropped IP reappears on the
+		// next scan/sync), so only write when the blob actually changed.
+		if current.Ips != mergedStr {
+			if err := tx.Model(&model.InboundClientIps{}).Where("id = ?", current.Id).Update("ips", mergedStr).Error; err != nil {
+				tx.Rollback()
+				return err
+			}
+		}
+	}
+	return tx.Commit().Error
+}
+
 // inboundShadowsocksMethod extracts settings.method for Shadowsocks inbounds so
 // the client UI can generate a valid PSK (base64 of the method's key length)
 // for Shadowsocks 2022 ciphers. Returns "" for non-Shadowsocks inbounds.

+ 211 - 0
web/service/inbound_client_ips_merge_test.go

@@ -0,0 +1,211 @@
+package service
+
+import (
+	"encoding/json"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/mhsanaei/3x-ui/v3/database"
+	"github.com/mhsanaei/3x-ui/v3/database/model"
+)
+
+// setupClientIpTestDB spins up a throwaway SQLite database (migrations + seeders)
+// for a single test, mirroring the harness used by the other service tests.
+func setupClientIpTestDB(t *testing.T) {
+	t.Helper()
+	dbDir := t.TempDir()
+	t.Setenv("XUI_DB_FOLDER", dbDir)
+	if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil {
+		t.Fatalf("InitDB: %v", err)
+	}
+	t.Cleanup(func() { _ = database.CloseDB() })
+}
+
+func marshalIps(t *testing.T, entries ...clientIpEntry) string {
+	t.Helper()
+	b, err := json.Marshal(entries)
+	if err != nil {
+		t.Fatalf("marshal ips: %v", err)
+	}
+	return string(b)
+}
+
+// readClientIps returns the stored IP entries for an email as a map[ip]timestamp,
+// plus whether the row exists at all.
+func readClientIps(t *testing.T, email string) (map[string]int64, bool) {
+	t.Helper()
+	var row model.InboundClientIps
+	err := database.GetDB().Where("client_email = ?", email).First(&row).Error
+	if database.IsNotFound(err) {
+		return nil, false
+	}
+	if err != nil {
+		t.Fatalf("read client ips for %s: %v", email, err)
+	}
+	var entries []clientIpEntry
+	if row.Ips != "" {
+		if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
+			t.Fatalf("unmarshal stored ips for %s: %v", email, err)
+		}
+	}
+	out := make(map[string]int64, len(entries))
+	for _, e := range entries {
+		out[e.IP] = e.Timestamp
+	}
+	return out, true
+}
+
+func TestMergeInboundClientIps_CreatesNodeOnlyRowIgnoringRemoteId(t *testing.T) {
+	setupClientIpTestDB(t)
+	db := database.GetDB()
+	now := time.Now().Unix()
+
+	// Local client occupies id 1.
+	local := &model.InboundClientIps{ClientEmail: "local@x", Ips: marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now})}
+	if err := db.Create(local).Error; err != nil {
+		t.Fatalf("seed local row: %v", err)
+	}
+
+	// Incoming node-only client carries the remote node's id 1, which must not
+	// collide with the local row.
+	incoming := []model.InboundClientIps{{
+		Id:          1,
+		ClientEmail: "node@x",
+		Ips:         marshalIps(t, clientIpEntry{IP: "2.2.2.2", Timestamp: now}),
+	}}
+	if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil {
+		t.Fatalf("merge: %v", err)
+	}
+
+	// Local row is untouched.
+	if ips, ok := readClientIps(t, "local@x"); !ok || ips["1.1.1.1"] != now {
+		t.Fatalf("local@x changed unexpectedly: %v (exists=%v)", ips, ok)
+	}
+
+	// Node row exists with its own ip and a freshly assigned id (not the remote 1).
+	var nodeRow model.InboundClientIps
+	if err := db.Where("client_email = ?", "node@x").First(&nodeRow).Error; err != nil {
+		t.Fatalf("node@x not created: %v", err)
+	}
+	if nodeRow.Id == local.Id {
+		t.Fatalf("node@x reused local id %d instead of a fresh one", nodeRow.Id)
+	}
+	if ips, _ := readClientIps(t, "node@x"); ips["2.2.2.2"] != now {
+		t.Fatalf("node@x missing expected ip: %v", ips)
+	}
+}
+
+func TestMergeInboundClientIps_DedupKeepsMaxTimestamp(t *testing.T) {
+	setupClientIpTestDB(t)
+	db := database.GetDB()
+	now := time.Now().Unix()
+
+	if err := db.Create(&model.InboundClientIps{
+		ClientEmail: "a@x",
+		Ips:         marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now - 100}),
+	}).Error; err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+
+	incoming := []model.InboundClientIps{{
+		ClientEmail: "a@x",
+		Ips: marshalIps(t,
+			clientIpEntry{IP: "1.1.1.1", Timestamp: now - 50}, // newer than stored -> wins
+			clientIpEntry{IP: "2.2.2.2", Timestamp: now - 10},
+		),
+	}}
+	if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil {
+		t.Fatalf("merge: %v", err)
+	}
+
+	ips, _ := readClientIps(t, "a@x")
+	if len(ips) != 2 {
+		t.Fatalf("want 2 ips, got %v", ips)
+	}
+	if ips["1.1.1.1"] != now-50 {
+		t.Fatalf("1.1.1.1 should keep max timestamp %d, got %d", now-50, ips["1.1.1.1"])
+	}
+	if ips["2.2.2.2"] != now-10 {
+		t.Fatalf("2.2.2.2 missing/incorrect: %d", ips["2.2.2.2"])
+	}
+}
+
+func TestMergeInboundClientIps_DropsStaleIps(t *testing.T) {
+	setupClientIpTestDB(t)
+	db := database.GetDB()
+	now := time.Now().Unix()
+
+	if err := db.Create(&model.InboundClientIps{
+		ClientEmail: "a@x",
+		Ips: marshalIps(t,
+			clientIpEntry{IP: "old", Timestamp: now - 3600}, // > 30m -> stale
+			clientIpEntry{IP: "fresh", Timestamp: now - 60},
+		),
+	}).Error; err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+
+	incoming := []model.InboundClientIps{{
+		ClientEmail: "a@x",
+		Ips: marshalIps(t,
+			clientIpEntry{IP: "incStale", Timestamp: now - 4000}, // > 30m -> stale
+			clientIpEntry{IP: "incFresh", Timestamp: now - 10},
+		),
+	}}
+	if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil {
+		t.Fatalf("merge: %v", err)
+	}
+
+	ips, _ := readClientIps(t, "a@x")
+	if len(ips) != 2 {
+		t.Fatalf("want only fresh ips, got %v", ips)
+	}
+	if _, ok := ips["old"]; ok {
+		t.Fatalf("stale local ip not dropped: %v", ips)
+	}
+	if _, ok := ips["incStale"]; ok {
+		t.Fatalf("stale incoming ip not dropped: %v", ips)
+	}
+	if ips["fresh"] != now-60 || ips["incFresh"] != now-10 {
+		t.Fatalf("fresh ips wrong: %v", ips)
+	}
+}
+
+func TestMergeInboundClientIps_SkipsAllStaleCreate(t *testing.T) {
+	setupClientIpTestDB(t)
+	now := time.Now().Unix()
+
+	incoming := []model.InboundClientIps{{
+		ClientEmail: "b@x",
+		Ips:         marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now - 9999}),
+	}}
+	if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil {
+		t.Fatalf("merge: %v", err)
+	}
+
+	if _, ok := readClientIps(t, "b@x"); ok {
+		t.Fatalf("all-stale node-only client should not create a row")
+	}
+}
+
+func TestMergeInboundClientIps_SkipsBlankRows(t *testing.T) {
+	setupClientIpTestDB(t)
+	now := time.Now().Unix()
+
+	incoming := []model.InboundClientIps{
+		{ClientEmail: "", Ips: marshalIps(t, clientIpEntry{IP: "1.1.1.1", Timestamp: now})},
+		{ClientEmail: "c@x", Ips: ""},
+	}
+	if err := (&InboundService{}).MergeInboundClientIps(incoming); err != nil {
+		t.Fatalf("merge: %v", err)
+	}
+
+	var count int64
+	if err := database.GetDB().Model(&model.InboundClientIps{}).Count(&count).Error; err != nil {
+		t.Fatalf("count: %v", err)
+	}
+	if count != 0 {
+		t.Fatalf("blank rows should be skipped, but %d row(s) created", count)
+	}
+}