check_client_ip_job.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. package job
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "log"
  8. "os"
  9. "os/exec"
  10. "regexp"
  11. "runtime"
  12. "sort"
  13. "time"
  14. "github.com/mhsanaei/3x-ui/v3/database"
  15. "github.com/mhsanaei/3x-ui/v3/database/model"
  16. "github.com/mhsanaei/3x-ui/v3/logger"
  17. "github.com/mhsanaei/3x-ui/v3/xray"
  18. )
  // IPWithTimestamp pairs a client IP address with the Unix time (in seconds) at
  // which that address was last observed. Slices of it are what this job
  // marshals into the per-client IP record.
  type IPWithTimestamp struct {
  	// IP is the address as captured from the access log.
  	IP string `json:"ip"`
  	// Timestamp is the last-seen time in Unix seconds.
  	Timestamp int64 `json:"timestamp"`
  }
  // CheckClientIpJob scans the Xray access log for client IP addresses, records
  // them per client, and enforces the configured per-client IP limits.
  type CheckClientIpJob struct {
  	// lastClear is the Unix time (seconds) of the last access-log rotation;
  	// it is zero until the first Run.
  	lastClear int64
  	// disAllowedIps holds the IPs queued for banning by the most recent
  	// enforcement pass.
  	disAllowedIps []string
  }
  29. var job *CheckClientIpJob
  30. const defaultXrayAPIPort = 62789
  31. const ipStaleAfterSeconds = int64(30 * 60)
  32. // NewCheckClientIpJob creates a new client IP monitoring job instance.
  33. func NewCheckClientIpJob() *CheckClientIpJob {
  34. job = new(CheckClientIpJob)
  35. return job
  36. }
  37. func (j *CheckClientIpJob) Run() {
  38. if j.lastClear == 0 {
  39. j.lastClear = time.Now().Unix()
  40. }
  41. shouldClearAccessLog := false
  42. fail2BanEnabled := isFail2BanEnabled()
  43. hasLimit := fail2BanEnabled && j.hasLimitIp()
  44. f2bInstalled := false
  45. if hasLimit {
  46. f2bInstalled = j.checkFail2BanInstalled()
  47. }
  48. isAccessLogAvailable := j.checkAccessLogAvailable(hasLimit)
  49. if fail2BanEnabled && isAccessLogAvailable {
  50. enforce := hasLimit
  51. if hasLimit && runtime.GOOS != "windows" && !f2bInstalled {
  52. logger.Warning("[LimitIP] Fail2Ban is not installed, Please install Fail2Ban from the x-ui bash menu.")
  53. enforce = false
  54. }
  55. shouldClearAccessLog = j.processLogFile(enforce)
  56. }
  57. if shouldClearAccessLog || (isAccessLogAvailable && time.Now().Unix()-j.lastClear > 3600) {
  58. j.clearAccessLog()
  59. }
  60. }
  61. func (j *CheckClientIpJob) clearAccessLog() {
  62. logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  63. j.checkError(err)
  64. defer logAccessP.Close()
  65. accessLogPath, err := xray.GetAccessLogPath()
  66. j.checkError(err)
  67. file, err := os.Open(accessLogPath)
  68. j.checkError(err)
  69. defer file.Close()
  70. _, err = io.Copy(logAccessP, file)
  71. j.checkError(err)
  72. err = os.Truncate(accessLogPath, 0)
  73. j.checkError(err)
  74. j.lastClear = time.Now().Unix()
  75. }
  76. func (j *CheckClientIpJob) hasLimitIp() bool {
  77. db := database.GetDB()
  78. var inbounds []*model.Inbound
  79. err := db.Model(model.Inbound{}).Find(&inbounds).Error
  80. if err != nil {
  81. return false
  82. }
  83. for _, inbound := range inbounds {
  84. if inbound.Settings == "" {
  85. continue
  86. }
  87. settings := map[string][]model.Client{}
  88. json.Unmarshal([]byte(inbound.Settings), &settings)
  89. clients := settings["clients"]
  90. for _, client := range clients {
  91. limitIp := client.LimitIP
  92. if limitIp > 0 {
  93. return true
  94. }
  95. }
  96. }
  97. return false
  98. }
  99. func (j *CheckClientIpJob) processLogFile(enforce bool) bool {
  100. ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
  101. emailRegex := regexp.MustCompile(`email: (.+)$`)
  102. timestampRegex := regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})`)
  103. accessLogPath, _ := xray.GetAccessLogPath()
  104. file, _ := os.Open(accessLogPath)
  105. defer file.Close()
  106. // Track IPs with their last seen timestamp
  107. inboundClientIps := make(map[string]map[string]int64, 100)
  108. scanner := bufio.NewScanner(file)
  109. for scanner.Scan() {
  110. line := scanner.Text()
  111. ipMatches := ipRegex.FindStringSubmatch(line)
  112. if len(ipMatches) < 2 {
  113. continue
  114. }
  115. ip := ipMatches[1]
  116. if ip == "127.0.0.1" || ip == "::1" {
  117. continue
  118. }
  119. emailMatches := emailRegex.FindStringSubmatch(line)
  120. if len(emailMatches) < 2 {
  121. continue
  122. }
  123. email := emailMatches[1]
  124. // Extract timestamp from log line
  125. var timestamp int64
  126. timestampMatches := timestampRegex.FindStringSubmatch(line)
  127. if len(timestampMatches) >= 2 {
  128. t, err := time.ParseInLocation("2006/01/02 15:04:05", timestampMatches[1], time.Local)
  129. if err == nil {
  130. timestamp = t.Unix()
  131. } else {
  132. timestamp = time.Now().Unix()
  133. }
  134. } else {
  135. timestamp = time.Now().Unix()
  136. }
  137. if _, exists := inboundClientIps[email]; !exists {
  138. inboundClientIps[email] = make(map[string]int64)
  139. }
  140. // Update timestamp - keep the latest
  141. if existingTime, ok := inboundClientIps[email][ip]; !ok || timestamp > existingTime {
  142. inboundClientIps[email][ip] = timestamp
  143. }
  144. }
  145. if err := scanner.Err(); err != nil {
  146. j.checkError(err)
  147. }
  148. shouldCleanLog := false
  149. for email, ipTimestamps := range inboundClientIps {
  150. // Convert to IPWithTimestamp slice
  151. ipsWithTime := make([]IPWithTimestamp, 0, len(ipTimestamps))
  152. for ip, timestamp := range ipTimestamps {
  153. ipsWithTime = append(ipsWithTime, IPWithTimestamp{IP: ip, Timestamp: timestamp})
  154. }
  155. clientIpsRecord, err := j.getInboundClientIps(email)
  156. if err != nil {
  157. j.addInboundClientIps(email, ipsWithTime)
  158. continue
  159. }
  160. shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, email, ipsWithTime, enforce) || shouldCleanLog
  161. }
  162. return shouldCleanLog
  163. }
  164. func mergeClientIps(old, new []IPWithTimestamp, staleCutoff int64) map[string]int64 {
  165. ipMap := make(map[string]int64, len(old)+len(new))
  166. for _, ipTime := range old {
  167. if ipTime.Timestamp < staleCutoff {
  168. continue
  169. }
  170. ipMap[ipTime.IP] = ipTime.Timestamp
  171. }
  172. for _, ipTime := range new {
  173. if ipTime.Timestamp < staleCutoff {
  174. continue
  175. }
  176. if existingTime, ok := ipMap[ipTime.IP]; !ok || ipTime.Timestamp > existingTime {
  177. ipMap[ipTime.IP] = ipTime.Timestamp
  178. }
  179. }
  180. return ipMap
  181. }
  182. func partitionLiveIps(ipMap map[string]int64, observedThisScan map[string]bool) (live, historical []IPWithTimestamp) {
  183. live = make([]IPWithTimestamp, 0, len(observedThisScan))
  184. historical = make([]IPWithTimestamp, 0, len(ipMap))
  185. for ip, ts := range ipMap {
  186. entry := IPWithTimestamp{IP: ip, Timestamp: ts}
  187. if observedThisScan[ip] {
  188. live = append(live, entry)
  189. } else {
  190. historical = append(historical, entry)
  191. }
  192. }
  193. sort.Slice(live, func(i, j int) bool { return live[i].Timestamp < live[j].Timestamp })
  194. sort.Slice(historical, func(i, j int) bool { return historical[i].Timestamp < historical[j].Timestamp })
  195. return live, historical
  196. }
  197. func (j *CheckClientIpJob) checkFail2BanInstalled() bool {
  198. if !isFail2BanEnabled() {
  199. return false
  200. }
  201. cmd := "fail2ban-client"
  202. args := []string{"-h"}
  203. err := exec.Command(cmd, args...).Run()
  204. return err == nil
  205. }
  // isFail2BanEnabled reports whether fail2ban integration is enabled. It is
  // enabled when the XUI_ENABLE_FAIL2BAN environment variable is unset or is
  // exactly "true"; any other value, including the empty string, disables it.
  func isFail2BanEnabled() bool {
  	setting, present := os.LookupEnv("XUI_ENABLE_FAIL2BAN")
  	if !present {
  		return true
  	}
  	return setting == "true"
  }
  210. func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool {
  211. accessLogPath, err := xray.GetAccessLogPath()
  212. if err != nil {
  213. return false
  214. }
  215. if accessLogPath == "none" || accessLogPath == "" {
  216. if iplimitActive {
  217. logger.Warning("[LimitIP] Access log path is not set, Please configure the access log path in Xray configs.")
  218. }
  219. return false
  220. }
  221. return true
  222. }
  223. func (j *CheckClientIpJob) checkError(e error) {
  224. if e != nil {
  225. logger.Warning("client ip job err:", e)
  226. }
  227. }
  228. func (j *CheckClientIpJob) getInboundClientIps(clientEmail string) (*model.InboundClientIps, error) {
  229. db := database.GetDB()
  230. InboundClientIps := &model.InboundClientIps{}
  231. err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
  232. if err != nil {
  233. return nil, err
  234. }
  235. return InboundClientIps, nil
  236. }
  237. func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ipsWithTime []IPWithTimestamp) error {
  238. inboundClientIps := &model.InboundClientIps{}
  239. jsonIps, err := json.Marshal(ipsWithTime)
  240. j.checkError(err)
  241. inboundClientIps.ClientEmail = clientEmail
  242. inboundClientIps.Ips = string(jsonIps)
  243. db := database.GetDB()
  244. tx := db.Begin()
  245. defer func() {
  246. if err == nil {
  247. tx.Commit()
  248. } else {
  249. tx.Rollback()
  250. }
  251. }()
  252. err = tx.Save(inboundClientIps).Error
  253. if err != nil {
  254. return err
  255. }
  256. return nil
  257. }
  258. func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, clientEmail string, newIpsWithTime []IPWithTimestamp, enforce bool) bool {
  259. // Get the inbound configuration
  260. inbound, err := j.getInboundByEmail(clientEmail)
  261. if err != nil {
  262. logger.Errorf("failed to fetch inbound settings for email %s: %s", clientEmail, err)
  263. return false
  264. }
  265. if inbound.Settings == "" {
  266. logger.Debug("wrong data:", inbound)
  267. return false
  268. }
  269. settings := map[string][]model.Client{}
  270. json.Unmarshal([]byte(inbound.Settings), &settings)
  271. clients := settings["clients"]
  272. // Find the client's IP limit
  273. var limitIp int
  274. var clientFound bool
  275. for _, client := range clients {
  276. if client.Email == clientEmail {
  277. limitIp = client.LimitIP
  278. clientFound = true
  279. break
  280. }
  281. }
  282. if !enforce || !clientFound || limitIp <= 0 || !inbound.Enable {
  283. // Nothing to enforce (collection-only run, no limit, client missing, or
  284. // inbound disabled): record the observed IPs for the panel and return.
  285. jsonIps, _ := json.Marshal(newIpsWithTime)
  286. inboundClientIps.Ips = string(jsonIps)
  287. db := database.GetDB()
  288. db.Save(inboundClientIps)
  289. return false
  290. }
  291. // Parse old IPs from database
  292. var oldIpsWithTime []IPWithTimestamp
  293. if inboundClientIps.Ips != "" {
  294. json.Unmarshal([]byte(inboundClientIps.Ips), &oldIpsWithTime)
  295. }
  296. ipMap := mergeClientIps(oldIpsWithTime, newIpsWithTime, time.Now().Unix()-ipStaleAfterSeconds)
  297. // only ips seen in this scan count toward the limit. see
  298. // partitionLiveIps.
  299. observedThisScan := make(map[string]bool, len(newIpsWithTime))
  300. for _, ipTime := range newIpsWithTime {
  301. observedThisScan[ipTime.IP] = true
  302. }
  303. liveIps, historicalIps := partitionLiveIps(ipMap, observedThisScan)
  304. shouldCleanLog := false
  305. j.disAllowedIps = []string{}
  306. // historical db-only ips are excluded from this count on purpose.
  307. var keptLive []IPWithTimestamp
  308. if len(liveIps) > limitIp {
  309. shouldCleanLog = true
  310. // keep the newest live ips, ban older ones.
  311. cutoff := len(liveIps) - limitIp
  312. keptLive = liveIps[cutoff:]
  313. bannedLive := liveIps[:cutoff]
  314. logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
  315. if err != nil {
  316. logger.Errorf("failed to open IP limit log file: %s", err)
  317. return false
  318. }
  319. defer logIpFile.Close()
  320. ipLogger := log.New(logIpFile, "", log.LstdFlags)
  321. // log format is load-bearing: x-ui.sh create_iplimit_jails builds
  322. // filter.d/3x-ipl.conf with
  323. // failregex = \[LIMIT_IP\]\s*Email\s*=\s*<F-USER>.+</F-USER>\s*\|\|\s*Disconnecting OLD IP\s*=\s*<ADDR>\s*\|\|\s*Timestamp\s*=\s*\d+
  324. // don't change the wording.
  325. for _, ipTime := range bannedLive {
  326. j.disAllowedIps = append(j.disAllowedIps, ipTime.IP)
  327. ipLogger.Printf("[LIMIT_IP] Email = %s || Disconnecting OLD IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp)
  328. }
  329. // force xray to drop existing connections from banned ips
  330. j.disconnectClientTemporarily(inbound, clientEmail, clients)
  331. } else {
  332. keptLive = liveIps
  333. }
  334. // keep kept-live + historical in the blob so the panel keeps showing
  335. // recently seen ips. banned live ips are already in the fail2ban log
  336. // and will reappear in the next scan if they reconnect.
  337. dbIps := make([]IPWithTimestamp, 0, len(keptLive)+len(historicalIps))
  338. dbIps = append(dbIps, keptLive...)
  339. dbIps = append(dbIps, historicalIps...)
  340. jsonIps, _ := json.Marshal(dbIps)
  341. inboundClientIps.Ips = string(jsonIps)
  342. db := database.GetDB()
  343. err = db.Save(inboundClientIps).Error
  344. if err != nil {
  345. logger.Error("failed to save inboundClientIps:", err)
  346. return false
  347. }
  348. if len(j.disAllowedIps) > 0 {
  349. logger.Infof("[LIMIT_IP] Client %s: Kept %d live IPs, queued %d old IPs for fail2ban", clientEmail, len(keptLive), len(j.disAllowedIps))
  350. }
  351. return shouldCleanLog
  352. }
  353. // disconnectClientTemporarily removes and re-adds a client to force disconnect banned connections
  354. func (j *CheckClientIpJob) disconnectClientTemporarily(inbound *model.Inbound, clientEmail string, clients []model.Client) {
  355. var xrayAPI xray.XrayAPI
  356. apiPort := j.resolveXrayAPIPort()
  357. err := xrayAPI.Init(apiPort)
  358. if err != nil {
  359. logger.Warningf("[LIMIT_IP] Failed to init Xray API for disconnection: %v", err)
  360. return
  361. }
  362. defer xrayAPI.Close()
  363. // Find the client config
  364. var clientConfig map[string]any
  365. for _, client := range clients {
  366. if client.Email == clientEmail {
  367. // Convert client to map for API
  368. clientBytes, _ := json.Marshal(client)
  369. json.Unmarshal(clientBytes, &clientConfig)
  370. break
  371. }
  372. }
  373. if clientConfig == nil {
  374. return
  375. }
  376. // Only perform remove/re-add for protocols supported by XrayAPI.AddUser
  377. protocol := string(inbound.Protocol)
  378. switch protocol {
  379. case "vmess", "vless", "trojan", "shadowsocks":
  380. // supported protocols, continue
  381. default:
  382. logger.Warningf("[LIMIT_IP] Temporary disconnect is not supported for protocol %s on inbound %s", protocol, inbound.Tag)
  383. return
  384. }
  385. // For Shadowsocks, ensure the required "cipher" field is present by
  386. // reading it from the inbound settings (e.g., settings["method"]).
  387. if string(inbound.Protocol) == "shadowsocks" {
  388. var inboundSettings map[string]any
  389. if err := json.Unmarshal([]byte(inbound.Settings), &inboundSettings); err != nil {
  390. logger.Warningf("[LIMIT_IP] Failed to parse inbound settings for shadowsocks cipher: %v", err)
  391. } else {
  392. if method, ok := inboundSettings["method"].(string); ok && method != "" {
  393. clientConfig["cipher"] = method
  394. }
  395. }
  396. }
  397. // Remove user to disconnect all connections
  398. err = xrayAPI.RemoveUser(inbound.Tag, clientEmail)
  399. if err != nil {
  400. logger.Warningf("[LIMIT_IP] Failed to remove user %s: %v", clientEmail, err)
  401. return
  402. }
  403. // Wait a moment for disconnection to take effect
  404. time.Sleep(100 * time.Millisecond)
  405. // Re-add user to allow new connections
  406. err = xrayAPI.AddUser(protocol, inbound.Tag, clientConfig)
  407. if err != nil {
  408. logger.Warningf("[LIMIT_IP] Failed to re-add user %s: %v", clientEmail, err)
  409. }
  410. }
  411. // resolveXrayAPIPort returns the API inbound port from running config, then template config, then default.
  412. func (j *CheckClientIpJob) resolveXrayAPIPort() int {
  413. var configErr error
  414. var templateErr error
  415. if port, err := getAPIPortFromConfigPath(xray.GetConfigPath()); err == nil {
  416. return port
  417. } else {
  418. configErr = err
  419. }
  420. db := database.GetDB()
  421. var template model.Setting
  422. if err := db.Where("key = ?", "xrayTemplateConfig").First(&template).Error; err == nil {
  423. if port, parseErr := getAPIPortFromConfigData([]byte(template.Value)); parseErr == nil {
  424. return port
  425. } else {
  426. templateErr = parseErr
  427. }
  428. } else {
  429. templateErr = err
  430. }
  431. logger.Warningf(
  432. "[LIMIT_IP] Could not determine Xray API port from config or template; falling back to default port %d (config error: %v, template error: %v)",
  433. defaultXrayAPIPort,
  434. configErr,
  435. templateErr,
  436. )
  437. return defaultXrayAPIPort
  438. }
  439. func getAPIPortFromConfigPath(configPath string) (int, error) {
  440. configData, err := os.ReadFile(configPath)
  441. if err != nil {
  442. return 0, err
  443. }
  444. return getAPIPortFromConfigData(configData)
  445. }
  446. func getAPIPortFromConfigData(configData []byte) (int, error) {
  447. xrayConfig := &xray.Config{}
  448. if err := json.Unmarshal(configData, xrayConfig); err != nil {
  449. return 0, err
  450. }
  451. for _, inboundConfig := range xrayConfig.InboundConfigs {
  452. if inboundConfig.Tag == "api" && inboundConfig.Port > 0 {
  453. return inboundConfig.Port, nil
  454. }
  455. }
  456. return 0, errors.New("api inbound port not found")
  457. }
  458. func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
  459. db := database.GetDB()
  460. inbound := &model.Inbound{}
  461. err := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").First(inbound).Error
  462. if err != nil {
  463. return nil, err
  464. }
  465. return inbound, nil
  466. }