check_client_ip_job.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "runtime"
  10. "sort"
  11. "time"
  12. "github.com/mhsanaei/3x-ui/v3/internal/database"
  13. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  14. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  15. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  16. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  17. "gorm.io/gorm"
  18. )
  19. // IPWithTimestamp tracks an IP address with its last seen timestamp
  20. type IPWithTimestamp struct {
  21. IP string `json:"ip"`
  22. Timestamp int64 `json:"timestamp"`
  23. }
  24. // CheckClientIpJob monitors client IP addresses and manages IP blocking based
  25. // on configured limits. The per-client IPs come from the core's online-stats
  26. // API; no access log is involved. On a core too old to expose that API the job
  27. // simply skips the run (the bundled core always supports it).
  28. type CheckClientIpJob struct {
  29. disAllowedIps []string
  30. xrayService service.XrayService
  31. }
  32. var job *CheckClientIpJob
  33. const defaultXrayAPIPort = 62789
  34. const ipStaleAfterSeconds = int64(30 * 60)
  35. // NewCheckClientIpJob creates a new client IP monitoring job instance.
  36. func NewCheckClientIpJob() *CheckClientIpJob {
  37. job = new(CheckClientIpJob)
  38. return job
  39. }
  40. func (j *CheckClientIpJob) Run() {
  41. observed, apiMode := j.collectFromOnlineAPI()
  42. if !apiMode {
  43. // xray is down or predates the online-stats API. There is no access-log
  44. // fallback anymore, so there is nothing to do this run.
  45. logger.Debug("[LimitIP] online-stats API unavailable this run; skipping")
  46. return
  47. }
  48. if !isFail2BanEnabled() {
  49. return
  50. }
  51. hasLimit := j.hasLimitIp()
  52. f2bInstalled := false
  53. if hasLimit {
  54. f2bInstalled = j.checkFail2BanInstalled()
  55. }
  56. j.processObserved(observed, j.resolveEnforce(hasLimit, f2bInstalled), true)
  57. }
  58. // resolveEnforce decides whether limits can actually be enforced this run.
  59. // Without fail2ban on a platform that needs it the limit can't be applied, so
  60. // enforcement is skipped (the panel resets these limits to 0 on upgrade and
  61. // disables the field, so this is normally a no-op).
  62. func (j *CheckClientIpJob) resolveEnforce(hasLimit, f2bInstalled bool) bool {
  63. if hasLimit && runtime.GOOS != "windows" && !f2bInstalled {
  64. return false
  65. }
  66. return hasLimit
  67. }
  68. // collectFromOnlineAPI builds per-email IP observations (email -> ip ->
  69. // last-seen unix seconds) from the core's online-stats API. ok=false means the
  70. // API is unavailable — xray not running, an older core, or a transient gRPC
  71. // failure — and the caller skips the run (there is no access-log fallback).
  72. func (j *CheckClientIpJob) collectFromOnlineAPI() (map[string]map[string]int64, bool) {
  73. onlineUsers, ok, err := j.xrayService.GetOnlineUsers()
  74. if err != nil {
  75. logger.Debug("[LimitIP] online-stats API unavailable this run:", err)
  76. return nil, false
  77. }
  78. if !ok {
  79. return nil, false
  80. }
  81. now := time.Now().Unix()
  82. observed := make(map[string]map[string]int64, len(onlineUsers))
  83. for _, user := range onlineUsers {
  84. for _, entry := range user.IPs {
  85. // No localhost guard needed here: the core's OnlineMap.AddIP drops
  86. // 127.0.0.1/[::1] itself, so they never reach this list.
  87. ts := entry.LastSeen
  88. if ts <= 0 {
  89. ts = now
  90. }
  91. if _, exists := observed[user.Email]; !exists {
  92. observed[user.Email] = make(map[string]int64)
  93. }
  94. if existing, seen := observed[user.Email][entry.IP]; !seen || ts > existing {
  95. observed[user.Email][entry.IP] = ts
  96. }
  97. }
  98. }
  99. return observed, true
  100. }
  101. // hasLimitIp reports whether any client carries an IP limit. It probes the
  102. // normalized clients table (limit_ip is synced there by SyncInbound and the
  103. // legacy seeder), replacing the old `settings LIKE '%limitIp%'` scan that
  104. // loaded and JSON-parsed every inbound's settings blob on each 10s run.
  105. func (j *CheckClientIpJob) hasLimitIp() bool {
  106. db := database.GetDB()
  107. var probe int64
  108. err := db.Model(&model.ClientRecord{}).Where("limit_ip > 0").Limit(1).Count(&probe).Error
  109. return err == nil && probe > 0
  110. }
  111. const ipScanChunk = 400
  112. func chunkEmails(s []string, size int) [][]string {
  113. if len(s) == 0 {
  114. return nil
  115. }
  116. chunks := make([][]string, 0, (len(s)+size-1)/size)
  117. for size < len(s) {
  118. s, chunks = s[size:], append(chunks, s[:size])
  119. }
  120. return append(chunks, s)
  121. }
  122. // loadClientLimits maps each observed email to its clients.limit_ip in a few
  123. // chunked queries, replacing the per-email settings-JSON parse that previously
  124. // resolved the limit.
  125. func (j *CheckClientIpJob) loadClientLimits(emails []string) map[string]int {
  126. db := database.GetDB()
  127. out := make(map[string]int, len(emails))
  128. for _, batch := range chunkEmails(emails, ipScanChunk) {
  129. var rows []struct {
  130. Email string
  131. LimitIp int
  132. }
  133. if err := db.Model(&model.ClientRecord{}).
  134. Select("email, limit_ip").
  135. Where("email IN ?", batch).
  136. Scan(&rows).Error; err != nil {
  137. j.checkError(err)
  138. continue
  139. }
  140. for _, r := range rows {
  141. out[r.Email] = r.LimitIp
  142. }
  143. }
  144. return out
  145. }
  146. // loadInboundsByEmails resolves each email's owning inbound through the
  147. // clients/client_inbounds relation in chunked queries. Like the old per-email
  148. // First() it keeps the lowest inbound id when a client spans several inbounds.
  149. func (j *CheckClientIpJob) loadInboundsByEmails(emails []string) map[string]*model.Inbound {
  150. db := database.GetDB()
  151. minInboundByEmail := make(map[string]int, len(emails))
  152. for _, batch := range chunkEmails(emails, ipScanChunk) {
  153. var pairs []struct {
  154. Email string
  155. InboundId int
  156. }
  157. if err := db.Table("client_inbounds").
  158. Select("clients.email AS email, client_inbounds.inbound_id AS inbound_id").
  159. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  160. Where("clients.email IN ?", batch).
  161. Scan(&pairs).Error; err != nil {
  162. j.checkError(err)
  163. return nil
  164. }
  165. for _, p := range pairs {
  166. if cur, ok := minInboundByEmail[p.Email]; !ok || p.InboundId < cur {
  167. minInboundByEmail[p.Email] = p.InboundId
  168. }
  169. }
  170. }
  171. if len(minInboundByEmail) == 0 {
  172. return nil
  173. }
  174. idSet := make(map[int]struct{}, len(minInboundByEmail))
  175. ids := make([]int, 0, len(minInboundByEmail))
  176. for _, id := range minInboundByEmail {
  177. if _, seen := idSet[id]; !seen {
  178. idSet[id] = struct{}{}
  179. ids = append(ids, id)
  180. }
  181. }
  182. sort.Ints(ids)
  183. inboundsById := make(map[int]*model.Inbound, len(ids))
  184. for lo := 0; lo < len(ids); lo += ipScanChunk {
  185. hi := min(lo+ipScanChunk, len(ids))
  186. var page []*model.Inbound
  187. if err := db.Model(&model.Inbound{}).Where("id IN ?", ids[lo:hi]).Find(&page).Error; err != nil {
  188. j.checkError(err)
  189. return nil
  190. }
  191. for _, ib := range page {
  192. inboundsById[ib.Id] = ib
  193. }
  194. }
  195. out := make(map[string]*model.Inbound, len(minInboundByEmail))
  196. for email, id := range minInboundByEmail {
  197. if ib, ok := inboundsById[id]; ok {
  198. out[email] = ib
  199. }
  200. }
  201. return out
  202. }
  203. func (j *CheckClientIpJob) loadClientIpRows(emails []string) map[string]*model.InboundClientIps {
  204. db := database.GetDB()
  205. out := make(map[string]*model.InboundClientIps, len(emails))
  206. for _, batch := range chunkEmails(emails, ipScanChunk) {
  207. var rows []model.InboundClientIps
  208. if err := db.Where("client_email IN ?", batch).Find(&rows).Error; err != nil {
  209. j.checkError(err)
  210. continue
  211. }
  212. for i := range rows {
  213. out[rows[i].ClientEmail] = &rows[i]
  214. }
  215. }
  216. return out
  217. }
  218. // processObserved runs collection + enforcement for one scan's observations
  219. // (email -> ip -> last-seen unix seconds). observedAreLive marks the
  220. // observations as live connections, which bypass the stale cutoff: a connection
  221. // that opened hours ago is still live even though its timestamp is old. The
  222. // online-stats API always reports live connections, so the job passes true.
  223. // Lookups are batched up front and all inbound_client_ips writes share one
  224. // transaction, so a scan costs a handful of queries and one fsync instead of
  225. // several per observed email.
  226. func (j *CheckClientIpJob) processObserved(observed map[string]map[string]int64, enforce, observedAreLive bool) bool {
  227. shouldCleanLog := false
  228. now := time.Now().Unix()
  229. emails := make([]string, 0, len(observed))
  230. for email := range observed {
  231. emails = append(emails, email)
  232. }
  233. sort.Strings(emails)
  234. limitByEmail := j.loadClientLimits(emails)
  235. inboundByEmail := j.loadInboundsByEmails(emails)
  236. ipRowByEmail := j.loadClientIpRows(emails)
  237. // attribution accumulates this scan's local observations per email so they can
  238. // be recorded under this panel's own guid for cross-node IP attribution.
  239. attribution := make(map[string][]model.ClientIpEntry, len(observed))
  240. type pendingDisconnect struct {
  241. inbound *model.Inbound
  242. email string
  243. }
  244. var disconnects []pendingDisconnect
  245. db := database.GetDB()
  246. tx := db.Begin()
  247. if tx.Error != nil {
  248. j.checkError(tx.Error)
  249. return false
  250. }
  251. committed := false
  252. defer func() {
  253. if !committed {
  254. tx.Rollback()
  255. }
  256. }()
  257. for _, email := range emails {
  258. ipTimestamps := observed[email]
  259. // The observations can still reference a client that was just renamed
  260. // or deleted; its email no longer matches any inbound. Skip it (and
  261. // drop any orphaned tracking row) instead of recreating a row and
  262. // logging an ERROR every run (#4963). The batch map resolves through
  263. // the clients relation; the per-email fallback keeps its settings LIKE
  264. // net for clients not yet present there.
  265. inbound, ok := inboundByEmail[email]
  266. if !ok {
  267. var err error
  268. inbound, err = j.getInboundByEmail(email)
  269. if err != nil {
  270. if errors.Is(err, gorm.ErrRecordNotFound) {
  271. logger.Debugf("[LimitIP] skipping stale observed email %q (renamed or deleted)", email)
  272. j.delInboundClientIps(tx, email)
  273. } else {
  274. j.checkError(err)
  275. }
  276. continue
  277. }
  278. }
  279. // Convert to IPWithTimestamp slice
  280. ipsWithTime := make([]IPWithTimestamp, 0, len(ipTimestamps))
  281. attrEntries := make([]model.ClientIpEntry, 0, len(ipTimestamps))
  282. for ip, timestamp := range ipTimestamps {
  283. ipsWithTime = append(ipsWithTime, IPWithTimestamp{IP: ip, Timestamp: timestamp})
  284. // Live API observations may carry an old lastSeen (connection start),
  285. // so stamp attribution with now; otherwise the stale cutoff would evict
  286. // an IP that is connected right now.
  287. attrTs := timestamp
  288. if observedAreLive {
  289. attrTs = now
  290. }
  291. attrEntries = append(attrEntries, model.ClientIpEntry{IP: ip, Timestamp: attrTs})
  292. }
  293. if len(attrEntries) > 0 {
  294. attribution[email] = attrEntries
  295. }
  296. clientIpsRecord, ok := ipRowByEmail[email]
  297. if !ok {
  298. jsonIps, err := json.Marshal(ipsWithTime)
  299. if err != nil {
  300. j.checkError(err)
  301. continue
  302. }
  303. if err := tx.Save(&model.InboundClientIps{ClientEmail: email, Ips: string(jsonIps)}).Error; err != nil {
  304. j.checkError(err)
  305. }
  306. continue
  307. }
  308. cleaned, banned := j.updateInboundClientIps(tx, clientIpsRecord, inbound, email, limitByEmail[email], ipsWithTime, enforce, observedAreLive)
  309. shouldCleanLog = cleaned || shouldCleanLog
  310. if banned {
  311. disconnects = append(disconnects, pendingDisconnect{inbound: inbound, email: email})
  312. }
  313. }
  314. if err := tx.Commit().Error; err != nil {
  315. j.checkError(err)
  316. return shouldCleanLog
  317. }
  318. committed = true
  319. // Xray disconnects run after the commit so their network round-trips never
  320. // extend the scan's write transaction (node syncs upsert the same table).
  321. clientsCache := make(map[int][]model.Client)
  322. for _, d := range disconnects {
  323. clients, cached := clientsCache[d.inbound.Id]
  324. if !cached {
  325. settings := map[string][]model.Client{}
  326. _ = json.Unmarshal([]byte(d.inbound.Settings), &settings)
  327. clients = settings["clients"]
  328. clientsCache[d.inbound.Id] = clients
  329. }
  330. j.disconnectClientTemporarily(d.inbound, d.email, clients)
  331. }
  332. j.recordLocalAttribution(attribution)
  333. return shouldCleanLog
  334. }
  335. // recordLocalAttribution stores this scan's local observations under this panel's
  336. // own guid so a parent panel can attribute each IP to the node it is on.
  337. // Best-effort: attribution is advisory and must never block IP-limit enforcement.
  338. func (j *CheckClientIpJob) recordLocalAttribution(attribution map[string][]model.ClientIpEntry) {
  339. if len(attribution) == 0 {
  340. return
  341. }
  342. guid, err := (&service.SettingService{}).GetPanelGuid()
  343. if err != nil || guid == "" {
  344. return
  345. }
  346. if err := (&service.InboundService{}).RecordLocalClientIps(guid, attribution); err != nil {
  347. logger.Debug("[LimitIP] record local ip attribution failed:", err)
  348. }
  349. }
  350. // mergeClientIps folds this scan's observations into the persisted set,
  351. // dropping entries older than staleCutoff. newAlwaysLive exempts the new
  352. // entries from that cutoff: an API-observed IP is a live connection by
  353. // definition, even when its lastSeen (set at dispatch time) is hours old.
  354. func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64, newAlwaysLive bool) map[string]int64 {
  355. ipMap := make(map[string]int64, len(old)+len(new))
  356. for _, ipTime := range old {
  357. if ipTime.Timestamp < staleCutoff {
  358. continue
  359. }
  360. ipMap[ipTime.IP] = ipTime.Timestamp
  361. }
  362. for _, ipTime := range new {
  363. if !newAlwaysLive && ipTime.Timestamp < staleCutoff {
  364. continue
  365. }
  366. if existingTime, ok := ipMap[ipTime.IP]; !ok || ipTime.Timestamp > existingTime {
  367. ipMap[ipTime.IP] = ipTime.Timestamp
  368. }
  369. }
  370. return ipMap
  371. }
  372. // selectIpsToBan splits the live IPs (sorted oldest-first by partitionLiveIps)
  373. // into the newest `limit` entries to keep and the older remainder to ban.
  374. func selectIpsToBan(live []IPWithTimestamp, limit int) (kept, banned []IPWithTimestamp) {
  375. if limit <= 0 || len(live) <= limit {
  376. return live, nil
  377. }
  378. cutoff := len(live) - limit
  379. return live[cutoff:], live[:cutoff]
  380. }
  381. func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) (live, historical []IPWithTimestamp) {
  382. live = make([]IPWithTimestamp, 0, len(observedThisScan))
  383. historical = make([]IPWithTimestamp, 0, len(ipMap))
  384. now := time.Now().Unix()
  385. for ip, ts := range ipMap {
  386. entry := IPWithTimestamp{IP: ip, Timestamp: ts}
  387. // Consider an IP "live" if it was seen locally in this scan, OR if its
  388. // timestamp from the synced database is very recent (e.g. within 2 minutes).
  389. // This ensures cluster-wide limits work even if the IP was seen on another node.
  390. if observedThisScan[ip] || now-ts < 120 {
  391. live = append(live, entry)
  392. } else {
  393. historical = append(historical, entry)
  394. }
  395. }
  396. sort.Slice(live, func(i, j int) bool { return live[i].Timestamp < live[j].Timestamp })
  397. sort.Slice(historical, func(i, j int) bool { return historical[i].Timestamp < historical[j].Timestamp })
  398. return live, historical
  399. }
  400. func (j *CheckClientIpJob) checkFail2BanInstalled() bool {
  401. if !isFail2BanEnabled() {
  402. return false
  403. }
  404. cmd := "fail2ban-client"
  405. args := []string{"-h"}
  406. err := exec.CommandContext(context.Background(), cmd, args...).Run()
  407. return err == nil
  408. }
  409. func isFail2BanEnabled() bool {
  410. value, ok := os.LookupEnv("XUI_ENABLE_FAIL2BAN")
  411. return !ok || value == "true"
  412. }
  413. func (j *CheckClientIpJob) checkError(e error) {
  414. if e != nil {
  415. logger.Warning("client ip job err:", e)
  416. }
  417. }
  418. // delInboundClientIps drops the inbound_client_ips tracking row for an email
  419. // that no longer maps to any inbound (a renamed or deleted client), so stale
  420. // access-log entries don't keep a ghost row alive (#4963).
  421. func (j *CheckClientIpJob) delInboundClientIps(tx *gorm.DB, clientEmail string) {
  422. if err := tx.Where("client_email = ?", clientEmail).Delete(&model.InboundClientIps{}).Error; err != nil {
  423. j.checkError(err)
  424. }
  425. }
  426. // updateInboundClientIps merges one email's observed IPs into its tracking row
  427. // and applies the IP limit. limitIp comes from the caller (the clients table);
  428. // writes go through the caller's transaction. banned=true asks the caller to
  429. // disconnect the client after the transaction commits.
  430. func (j *CheckClientIpJob) updateInboundClientIps(tx *gorm.DB, inboundClientIps *model.InboundClientIps, inbound *model.Inbound, clientEmail string, limitIp int, newIpsWithTime []IPWithTimestamp, enforce, observedAreLive bool) (shouldCleanLog, banned bool) {
  431. if inbound.Settings == "" {
  432. logger.Debug("wrong data:", inbound)
  433. return false, false
  434. }
  435. if !enforce || limitIp <= 0 || !inbound.Enable {
  436. // Nothing to enforce (collection-only run, no limit on the clients row,
  437. // or inbound disabled): record the observed IPs for the panel and return.
  438. jsonIps, _ := json.Marshal(newIpsWithTime)
  439. inboundClientIps.Ips = string(jsonIps)
  440. if err := tx.Save(inboundClientIps).Error; err != nil {
  441. logger.Error("failed to save inboundClientIps:", err)
  442. }
  443. return false, false
  444. }
  445. // Parse old IPs from database
  446. var oldIpsWithTime []IPWithTimestamp
  447. if inboundClientIps.Ips != "" {
  448. _ = json.Unmarshal([]byte(inboundClientIps.Ips), &oldIpsWithTime)
  449. }
  450. ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, time.Now().Unix()-ipStaleAfterSeconds, observedAreLive)
  451. // only ips seen in this scan count toward the limit. see
  452. // partitionLiveIps.
  453. observedThisScan := make(map[string]bool, len(newIpsWithTime))
  454. for _, ipTime := range newIpsWithTime {
  455. observedThisScan[ipTime.IP] = true
  456. }
  457. liveIps, historicalIps := partitionLiveIps(ipMap, observedThisScan)
  458. j.disAllowedIps = []string{}
  459. // historical db-only ips are excluded from this count on purpose.
  460. keptLive, bannedLive := selectIpsToBan(liveIps, limitIp)
  461. if len(bannedLive) > 0 {
  462. shouldCleanLog = true
  463. banned = true
  464. logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  465. if err != nil {
  466. logger.Errorf("failed to open IP limit log file: %s", err)
  467. return false, false
  468. }
  469. defer logIpFile.Close()
  470. ipLogger := log.New(logIpFile, "", log.LstdFlags)
  471. // log format is load-bearing: x-ui.sh create_iplimit_jails builds
  472. // filter.d/3x-ipl.conf with
  473. // failregex = \[LIMIT_IP\]\s*Email\s*=\s*<F-USER>.+</F-USER>\s*\|\|\s*Disconnecting OLD IP\s*=\s*<ADDR>\s*\|\|\s*Timestamp\s*=\s*\d+
  474. // don't change the wording.
  475. for _, ipTime := range bannedLive {
  476. j.disAllowedIps = append(j.disAllowedIps, ipTime.IP)
  477. ipLogger.Printf("[LIMIT_IP] Email = %s || Disconnecting OLD IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp)
  478. }
  479. }
  480. // keep kept-live + historical in the blob so the panel keeps showing
  481. // recently seen ips. banned live ips are already in the fail2ban log
  482. // and will reappear in the next scan if they reconnect.
  483. dbIps := make([]IPWithTimestamp, 0, len(keptLive)+len(historicalIps))
  484. dbIps = append(dbIps, keptLive...)
  485. dbIps = append(dbIps, historicalIps...)
  486. jsonIps, _ := json.Marshal(dbIps)
  487. inboundClientIps.Ips = string(jsonIps)
  488. if err := tx.Save(inboundClientIps).Error; err != nil {
  489. logger.Error("failed to save inboundClientIps:", err)
  490. return false, banned
  491. }
  492. if len(j.disAllowedIps) > 0 {
  493. logger.Infof("[LIMIT_IP] Client %s: Kept %d live IPs, queued %d old IPs for fail2ban", clientEmail, len(keptLive), len(j.disAllowedIps))
  494. }
  495. return shouldCleanLog, banned
  496. }
  497. // disconnectClientTemporarily removes and re-adds a client to force disconnect banned connections
  498. func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, clientEmail string, clients []model.Client) {
  499. var xrayAPI xray.XrayAPI
  500. apiPort := j.resolveXrayAPIPort()
  501. err := xrayAPI.Init(apiPort)
  502. if err != nil {
  503. logger.Warningf("[LIMIT_IP] Failed to init Xray API for disconnection: %v", err)
  504. return
  505. }
  506. defer xrayAPI.Close()
  507. // Find the client config
  508. var clientConfig map[string]any
  509. for _, client := range clients {
  510. if client.Email == clientEmail {
  511. // Convert client to map for API
  512. clientBytes, _ := json.Marshal(client)
  513. _ = json.Unmarshal(clientBytes, &clientConfig)
  514. break
  515. }
  516. }
  517. if clientConfig == nil {
  518. return
  519. }
  520. // Only perform remove/re-add for protocols supported by XrayAPI.AddUser
  521. protocol := string(inbound.Protocol)
  522. switch protocol {
  523. case "vmess", "vless", "trojan", "shadowsocks":
  524. // supported protocols, continue
  525. default:
  526. logger.Warningf("[LIMIT_IP] Temporary disconnect is not supported for protocol %s on inbound %s", protocol, inbound.Tag)
  527. return
  528. }
  529. // For Shadowsocks, ensure the required "cipher" field is present by
  530. // reading it from the inbound settings (e.g., settings["method"]).
  531. if string(inbound.Protocol) == "shadowsocks" {
  532. var inboundSettings map[string]any
  533. if err := json.Unmarshal([]byte(inbound.Settings), &inboundSettings); err != nil {
  534. logger.Warningf("[LIMIT_IP] Failed to parse inbound settings for shadowsocks cipher: %v", err)
  535. } else {
  536. if method, ok := inboundSettings["method"].(string); ok && method != "" {
  537. clientConfig["cipher"] = method
  538. }
  539. }
  540. }
  541. // Remove user to disconnect all connections
  542. err = xrayAPI.RemoveUser(inbound.Tag, clientEmail)
  543. if err != nil {
  544. logger.Warningf("[LIMIT_IP] Failed to remove user %s: %v", clientEmail, err)
  545. return
  546. }
  547. // Wait a moment for disconnection to take effect
  548. time.Sleep(100 * time.Millisecond)
  549. // Re-add user to allow new connections
  550. err = xrayAPI.AddUser(protocol, inbound.Tag, clientConfig)
  551. if err != nil {
  552. logger.Warningf("[LIMIT_IP] Failed to re-add user %s: %v", clientEmail, err)
  553. }
  554. }
  555. // resolveXrayAPIPort returns the API inbound port from running config, then template config, then default.
  556. func (j *CheckClientIpJob) resolveXrayAPIPort() int {
  557. var configErr error
  558. var templateErr error
  559. if port, err := getAPIPortFromConfigPath(xray.GetConfigPath()); err == nil {
  560. return port
  561. } else {
  562. configErr = err
  563. }
  564. db := database.GetDB()
  565. var template model.Setting
  566. if err := db.Where("key = ?", "xrayTemplateConfig").First(&template).Error; err == nil {
  567. if port, parseErr := getAPIPortFromConfigData([]byte(template.Value)); parseErr == nil {
  568. return port
  569. } else {
  570. templateErr = parseErr
  571. }
  572. } else {
  573. templateErr = err
  574. }
  575. logger.Warningf(
  576. "[LIMIT_IP] Could not determine Xray API port from config or template; falling back to default port %d (config error: %v, template error: %v)",
  577. defaultXrayAPIPort,
  578. configErr,
  579. templateErr,
  580. )
  581. return defaultXrayAPIPort
  582. }
  583. func getAPIPortFromConfigPath(configPath string) (int, error) {
  584. configData, err := os.ReadFile(configPath)
  585. if err != nil {
  586. return 0, err
  587. }
  588. return getAPIPortFromConfigData(configData)
  589. }
  590. func getAPIPortFromConfigData(configData []byte) (int, error) {
  591. xrayConfig := &xray.Config{}
  592. if err := json.Unmarshal(configData, xrayConfig); err != nil {
  593. return 0, err
  594. }
  595. for _, inboundConfig := range xrayConfig.InboundConfigs {
  596. if inboundConfig.Tag == "api" && inboundConfig.Port > 0 {
  597. return inboundConfig.Port, nil
  598. }
  599. }
  600. return 0, errors.New("api inbound port not found")
  601. }
  602. // getInboundByEmail resolves the inbound that owns a client email. It prefers
  603. // the exact clients/client_inbounds relation; a substring "settings LIKE
  604. // %email%" can match the wrong inbound (an email that is a substring of another,
  605. // or text that merely appears elsewhere in the settings JSON). The LIKE + JSON
  606. // scan stays only as a fallback for clients not yet present in the relation, so
  607. // nothing regresses when the join finds no row.
  608. func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
  609. db := database.GetDB()
  610. inbound := &model.Inbound{}
  611. err := db.Model(&model.Inbound{}).
  612. Joins("JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id").
  613. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  614. Where("clients.email = ?", clientEmail).
  615. First(inbound).Error
  616. if err == nil {
  617. return inbound, nil
  618. }
  619. var candidates []model.Inbound
  620. if listErr := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").Find(&candidates).Error; listErr != nil {
  621. return nil, listErr
  622. }
  623. for i := range candidates {
  624. settings := map[string][]model.Client{}
  625. if jsonErr := json.Unmarshal([]byte(candidates[i].Settings), &settings); jsonErr != nil {
  626. continue
  627. }
  628. for _, client := range settings["clients"] {
  629. if client.Email == clientEmail {
  630. return &candidates[i], nil
  631. }
  632. }
  633. }
  634. return nil, err
  635. }