check_client_ip_job.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package job
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "io"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "regexp"
  10. "runtime"
  11. "sort"
  12. "time"
  13. "github.com/mhsanaei/3x-ui/v2/database"
  14. "github.com/mhsanaei/3x-ui/v2/database/model"
  15. "github.com/mhsanaei/3x-ui/v2/logger"
  16. "github.com/mhsanaei/3x-ui/v2/xray"
  17. )
  18. // IPWithTimestamp tracks an IP address with its last seen timestamp
  19. type IPWithTimestamp struct {
  20. IP string `json:"ip"`
  21. Timestamp int64 `json:"timestamp"`
  22. }
  23. // CheckClientIpJob monitors client IP addresses from access logs and manages IP blocking based on configured limits.
  24. type CheckClientIpJob struct {
  25. lastClear int64
  26. disAllowedIps []string
  27. }
  28. var job *CheckClientIpJob
  29. // NewCheckClientIpJob creates a new client IP monitoring job instance.
  30. func NewCheckClientIpJob() *CheckClientIpJob {
  31. job = new(CheckClientIpJob)
  32. return job
  33. }
  34. func (j *CheckClientIpJob) Run() {
  35. if j.lastClear == 0 {
  36. j.lastClear = time.Now().Unix()
  37. }
  38. shouldClearAccessLog := false
  39. iplimitActive := j.hasLimitIp()
  40. f2bInstalled := j.checkFail2BanInstalled()
  41. isAccessLogAvailable := j.checkAccessLogAvailable(iplimitActive)
  42. if isAccessLogAvailable {
  43. if runtime.GOOS == "windows" {
  44. if iplimitActive {
  45. shouldClearAccessLog = j.processLogFile()
  46. }
  47. } else {
  48. if iplimitActive {
  49. if f2bInstalled {
  50. shouldClearAccessLog = j.processLogFile()
  51. } else {
  52. if !f2bInstalled {
  53. logger.Warning("[LimitIP] Fail2Ban is not installed, Please install Fail2Ban from the x-ui bash menu.")
  54. }
  55. }
  56. }
  57. }
  58. }
  59. if shouldClearAccessLog || (isAccessLogAvailable && time.Now().Unix()-j.lastClear > 3600) {
  60. j.clearAccessLog()
  61. }
  62. }
  63. func (j *CheckClientIpJob) clearAccessLog() {
  64. logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  65. j.checkError(err)
  66. defer logAccessP.Close()
  67. accessLogPath, err := xray.GetAccessLogPath()
  68. j.checkError(err)
  69. file, err := os.Open(accessLogPath)
  70. j.checkError(err)
  71. defer file.Close()
  72. _, err = io.Copy(logAccessP, file)
  73. j.checkError(err)
  74. err = os.Truncate(accessLogPath, 0)
  75. j.checkError(err)
  76. j.lastClear = time.Now().Unix()
  77. }
  78. func (j *CheckClientIpJob) hasLimitIp() bool {
  79. db := database.GetDB()
  80. var inbounds []*model.Inbound
  81. err := db.Model(model.Inbound{}).Find(&inbounds).Error
  82. if err != nil {
  83. return false
  84. }
  85. for _, inbound := range inbounds {
  86. if inbound.Settings == "" {
  87. continue
  88. }
  89. settings := map[string][]model.Client{}
  90. json.Unmarshal([]byte(inbound.Settings), &settings)
  91. clients := settings["clients"]
  92. for _, client := range clients {
  93. limitIp := client.LimitIP
  94. if limitIp > 0 {
  95. return true
  96. }
  97. }
  98. }
  99. return false
  100. }
  101. func (j *CheckClientIpJob) processLogFile() bool {
  102. ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
  103. emailRegex := regexp.MustCompile(`email: (.+)$`)
  104. timestampRegex := regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})`)
  105. accessLogPath, _ := xray.GetAccessLogPath()
  106. file, _ := os.Open(accessLogPath)
  107. defer file.Close()
  108. // Track IPs with their last seen timestamp
  109. inboundClientIps := make(map[string]map[string]int64, 100)
  110. scanner := bufio.NewScanner(file)
  111. for scanner.Scan() {
  112. line := scanner.Text()
  113. ipMatches := ipRegex.FindStringSubmatch(line)
  114. if len(ipMatches) < 2 {
  115. continue
  116. }
  117. ip := ipMatches[1]
  118. if ip == "127.0.0.1" || ip == "::1" {
  119. continue
  120. }
  121. emailMatches := emailRegex.FindStringSubmatch(line)
  122. if len(emailMatches) < 2 {
  123. continue
  124. }
  125. email := emailMatches[1]
  126. // Extract timestamp from log line
  127. var timestamp int64
  128. timestampMatches := timestampRegex.FindStringSubmatch(line)
  129. if len(timestampMatches) >= 2 {
  130. t, err := time.Parse("2006/01/02 15:04:05", timestampMatches[1])
  131. if err == nil {
  132. timestamp = t.Unix()
  133. } else {
  134. timestamp = time.Now().Unix()
  135. }
  136. } else {
  137. timestamp = time.Now().Unix()
  138. }
  139. if _, exists := inboundClientIps[email]; !exists {
  140. inboundClientIps[email] = make(map[string]int64)
  141. }
  142. // Update timestamp - keep the latest
  143. if existingTime, ok := inboundClientIps[email][ip]; !ok || timestamp > existingTime {
  144. inboundClientIps[email][ip] = timestamp
  145. }
  146. }
  147. shouldCleanLog := false
  148. for email, ipTimestamps := range inboundClientIps {
  149. // Convert to IPWithTimestamp slice
  150. ipsWithTime := make([]IPWithTimestamp, 0, len(ipTimestamps))
  151. for ip, timestamp := range ipTimestamps {
  152. ipsWithTime = append(ipsWithTime, IPWithTimestamp{IP: ip, Timestamp: timestamp})
  153. }
  154. clientIpsRecord, err := j.getInboundClientIps(email)
  155. if err != nil {
  156. j.addInboundClientIps(email, ipsWithTime)
  157. continue
  158. }
  159. shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, email, ipsWithTime) || shouldCleanLog
  160. }
  161. return shouldCleanLog
  162. }
  163. func (j *CheckClientIpJob) checkFail2BanInstalled() bool {
  164. cmd := "fail2ban-client"
  165. args := []string{"-h"}
  166. err := exec.Command(cmd, args...).Run()
  167. return err == nil
  168. }
  169. func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool {
  170. accessLogPath, err := xray.GetAccessLogPath()
  171. if err != nil {
  172. return false
  173. }
  174. if accessLogPath == "none" || accessLogPath == "" {
  175. if iplimitActive {
  176. logger.Warning("[LimitIP] Access log path is not set, Please configure the access log path in Xray configs.")
  177. }
  178. return false
  179. }
  180. return true
  181. }
  182. func (j *CheckClientIpJob) checkError(e error) {
  183. if e != nil {
  184. logger.Warning("client ip job err:", e)
  185. }
  186. }
  187. func (j *CheckClientIpJob) getInboundClientIps(clientEmail string) (*model.InboundClientIps, error) {
  188. db := database.GetDB()
  189. InboundClientIps := &model.InboundClientIps{}
  190. err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
  191. if err != nil {
  192. return nil, err
  193. }
  194. return InboundClientIps, nil
  195. }
  196. func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ipsWithTime []IPWithTimestamp) error {
  197. inboundClientIps := &model.InboundClientIps{}
  198. jsonIps, err := json.Marshal(ipsWithTime)
  199. j.checkError(err)
  200. inboundClientIps.ClientEmail = clientEmail
  201. inboundClientIps.Ips = string(jsonIps)
  202. db := database.GetDB()
  203. tx := db.Begin()
  204. defer func() {
  205. if err == nil {
  206. tx.Commit()
  207. } else {
  208. tx.Rollback()
  209. }
  210. }()
  211. err = tx.Save(inboundClientIps).Error
  212. if err != nil {
  213. return err
  214. }
  215. return nil
  216. }
  217. func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, clientEmail string, newIpsWithTime []IPWithTimestamp) bool {
  218. // Get the inbound configuration
  219. inbound, err := j.getInboundByEmail(clientEmail)
  220. if err != nil {
  221. logger.Errorf("failed to fetch inbound settings for email %s: %s", clientEmail, err)
  222. return false
  223. }
  224. if inbound.Settings == "" {
  225. logger.Debug("wrong data:", inbound)
  226. return false
  227. }
  228. settings := map[string][]model.Client{}
  229. json.Unmarshal([]byte(inbound.Settings), &settings)
  230. clients := settings["clients"]
  231. // Find the client's IP limit
  232. var limitIp int
  233. var clientFound bool
  234. for _, client := range clients {
  235. if client.Email == clientEmail {
  236. limitIp = client.LimitIP
  237. clientFound = true
  238. break
  239. }
  240. }
  241. if !clientFound || limitIp <= 0 || !inbound.Enable {
  242. // No limit or inbound disabled, just update and return
  243. jsonIps, _ := json.Marshal(newIpsWithTime)
  244. inboundClientIps.Ips = string(jsonIps)
  245. db := database.GetDB()
  246. db.Save(inboundClientIps)
  247. return false
  248. }
  249. // Parse old IPs from database
  250. var oldIpsWithTime []IPWithTimestamp
  251. if inboundClientIps.Ips != "" {
  252. json.Unmarshal([]byte(inboundClientIps.Ips), &oldIpsWithTime)
  253. }
  254. // Merge old and new IPs, keeping the latest timestamp for each IP
  255. ipMap := make(map[string]int64)
  256. for _, ipTime := range oldIpsWithTime {
  257. ipMap[ipTime.IP] = ipTime.Timestamp
  258. }
  259. for _, ipTime := range newIpsWithTime {
  260. if existingTime, ok := ipMap[ipTime.IP]; !ok || ipTime.Timestamp > existingTime {
  261. ipMap[ipTime.IP] = ipTime.Timestamp
  262. }
  263. }
  264. // Convert back to slice and sort by timestamp (oldest first)
  265. // This ensures we always protect the original/current connections and ban new excess ones.
  266. allIps := make([]IPWithTimestamp, 0, len(ipMap))
  267. for ip, timestamp := range ipMap {
  268. allIps = append(allIps, IPWithTimestamp{IP: ip, Timestamp: timestamp})
  269. }
  270. sort.Slice(allIps, func(i, j int) bool {
  271. return allIps[i].Timestamp < allIps[j].Timestamp // Ascending order (oldest first)
  272. })
  273. shouldCleanLog := false
  274. j.disAllowedIps = []string{}
  275. // Open log file
  276. logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
  277. if err != nil {
  278. logger.Errorf("failed to open IP limit log file: %s", err)
  279. return false
  280. }
  281. defer logIpFile.Close()
  282. log.SetOutput(logIpFile)
  283. log.SetFlags(log.LstdFlags)
  284. // Check if we exceed the limit
  285. if len(allIps) > limitIp {
  286. shouldCleanLog = true
  287. // Keep the oldest IPs (currently active connections) and ban the new excess ones.
  288. keptIps := allIps[:limitIp]
  289. bannedIps := allIps[limitIp:]
  290. // Log banned IPs in the format fail2ban filters expect: [LIMIT_IP] Email = X || Disconnecting OLD IP = Y || Timestamp = Z
  291. for _, ipTime := range bannedIps {
  292. j.disAllowedIps = append(j.disAllowedIps, ipTime.IP)
  293. log.Printf("[LIMIT_IP] Email = %s || Disconnecting OLD IP = %s || Timestamp = %d", clientEmail, ipTime.IP, ipTime.Timestamp)
  294. }
  295. // Update database with only the currently active (kept) IPs
  296. jsonIps, _ := json.Marshal(keptIps)
  297. inboundClientIps.Ips = string(jsonIps)
  298. } else {
  299. // Under limit, save all IPs
  300. jsonIps, _ := json.Marshal(allIps)
  301. inboundClientIps.Ips = string(jsonIps)
  302. }
  303. db := database.GetDB()
  304. err = db.Save(inboundClientIps).Error
  305. if err != nil {
  306. logger.Error("failed to save inboundClientIps:", err)
  307. return false
  308. }
  309. if len(j.disAllowedIps) > 0 {
  310. logger.Infof("[LIMIT_IP] Client %s: Kept %d current IPs, queued %d new IPs for fail2ban", clientEmail, limitIp, len(j.disAllowedIps))
  311. }
  312. return shouldCleanLog
  313. }
  314. func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
  315. db := database.GetDB()
  316. inbound := &model.Inbound{}
  317. err := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").First(inbound).Error
  318. if err != nil {
  319. return nil, err
  320. }
  321. return inbound, nil
  322. }