check_client_ip_job.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package job
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "io"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "regexp"
  10. "sort"
  11. "strings"
  12. "sync"
  13. "time"
  14. "x-ui/config"
  15. "x-ui/database"
  16. "x-ui/database/model"
  17. "x-ui/logger"
  18. "x-ui/xray"
  19. )
  20. type CheckClientIpJob struct {
  21. disAllowedIps []string
  22. }
  23. var job *CheckClientIpJob
  24. var ipFiles = []string{
  25. xray.GetIPLimitLogPath(),
  26. xray.GetIPLimitBannedLogPath(),
  27. xray.GetIPLimitBannedPrevLogPath(),
  28. xray.GetAccessPersistentLogPath(),
  29. xray.GetAccessPersistentPrevLogPath(),
  30. }
  31. func NewCheckClientIpJob() *CheckClientIpJob {
  32. job = new(CheckClientIpJob)
  33. return job
  34. }
  35. func (j *CheckClientIpJob) Run() {
  36. var wg sync.WaitGroup
  37. if j.checkFail2BanInstalled() {
  38. j.openLogFiles(ipFiles)
  39. }
  40. if j.hasLimitIp() {
  41. if j.checkFail2BanInstalled() && xray.GetAccessLogPath() == "./access.log" {
  42. j.processLogFile()
  43. } else {
  44. if !j.checkFail2BanInstalled() {
  45. logger.Warning("fail2ban is not installed. IP limiting may not work properly.")
  46. }
  47. switch xray.GetAccessLogPath() {
  48. case "none":
  49. logger.Warning("Access log is set to 'none', check your Xray Configs")
  50. case "":
  51. logger.Warning("Access log doesn't exist in your Xray Configs")
  52. }
  53. }
  54. }
  55. if !j.checkFail2BanInstalled() && xray.GetAccessLogPath() == "./access.log" {
  56. wg.Add(1)
  57. go func() {
  58. defer wg.Done()
  59. j.clearLogTime()
  60. }()
  61. wg.Wait()
  62. }
  63. }
  64. func (j *CheckClientIpJob) clearLogTime() {
  65. ticker := time.NewTicker(time.Hour)
  66. defer ticker.Stop()
  67. for range ticker.C {
  68. j.clearAccessLog()
  69. }
  70. }
  71. func (j *CheckClientIpJob) clearAccessLog() {
  72. accessLogPath := xray.GetAccessLogPath()
  73. logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
  74. j.checkError(err)
  75. defer logAccessP.Close()
  76. // reopen the access log file for reading
  77. file, err := os.Open(accessLogPath)
  78. j.checkError(err)
  79. // copy access log content to persistent file
  80. _, err = io.Copy(logAccessP, file)
  81. j.checkError(err)
  82. // close the file after copying content
  83. file.Close()
  84. // clean access log
  85. err = os.Truncate(accessLogPath, 0)
  86. j.checkError(err)
  87. }
  88. func (j *CheckClientIpJob) hasLimitIp() bool {
  89. db := database.GetDB()
  90. var inbounds []*model.Inbound
  91. err := db.Model(model.Inbound{}).Find(&inbounds).Error
  92. if err != nil {
  93. return false
  94. }
  95. for _, inbound := range inbounds {
  96. if inbound.Settings == "" {
  97. continue
  98. }
  99. settings := map[string][]model.Client{}
  100. json.Unmarshal([]byte(inbound.Settings), &settings)
  101. clients := settings["clients"]
  102. for _, client := range clients {
  103. limitIp := client.LimitIP
  104. if limitIp > 0 {
  105. return true
  106. }
  107. }
  108. }
  109. return false
  110. }
  111. func (j *CheckClientIpJob) checkFail2BanInstalled() bool {
  112. cmd := "fail2ban-client"
  113. args := []string{"-h"}
  114. err := exec.Command(cmd, args...).Run()
  115. return err == nil
  116. }
  117. func (j *CheckClientIpJob) openLogFiles(ipFiles []string) {
  118. for i := 0; i < len(ipFiles); i++ {
  119. err := os.MkdirAll(config.GetLogFolder(), 0770)
  120. j.checkError(err)
  121. file, err := os.OpenFile(ipFiles[i], os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
  122. j.checkError(err)
  123. defer file.Close()
  124. }
  125. }
  126. func (j *CheckClientIpJob) processLogFile() {
  127. accessLogPath := xray.GetAccessLogPath()
  128. file, err := os.Open(accessLogPath)
  129. j.checkError(err)
  130. defer file.Close()
  131. InboundClientIps := make(map[string][]string)
  132. scanner := bufio.NewScanner(file)
  133. for scanner.Scan() {
  134. line := scanner.Text()
  135. ipRegx, _ := regexp.Compile(`(\d+\.\d+\.\d+\.\d+).* accepted`)
  136. emailRegx, _ := regexp.Compile(`email:.+`)
  137. matches := ipRegx.FindStringSubmatch(line)
  138. if len(matches) > 1 {
  139. ip := matches[1]
  140. if ip == "127.0.0.1" {
  141. continue
  142. }
  143. matchesEmail := emailRegx.FindString(line)
  144. if matchesEmail == "" {
  145. continue
  146. }
  147. matchesEmail = strings.TrimSpace(strings.Split(matchesEmail, "email: ")[1])
  148. if InboundClientIps[matchesEmail] != nil {
  149. if j.contains(InboundClientIps[matchesEmail], ip) {
  150. continue
  151. }
  152. InboundClientIps[matchesEmail] = append(InboundClientIps[matchesEmail], ip)
  153. } else {
  154. InboundClientIps[matchesEmail] = append(InboundClientIps[matchesEmail], ip)
  155. }
  156. }
  157. }
  158. j.checkError(scanner.Err())
  159. shouldCleanLog := false
  160. for clientEmail, ips := range InboundClientIps {
  161. inboundClientIps, err := j.getInboundClientIps(clientEmail)
  162. sort.Strings(ips)
  163. if err != nil {
  164. j.addInboundClientIps(clientEmail, ips)
  165. } else {
  166. shouldCleanLog = j.updateInboundClientIps(inboundClientIps, clientEmail, ips)
  167. }
  168. }
  169. // added delay before cleaning logs to reduce chance of logging IP that already has been banned
  170. time.Sleep(time.Second * 2)
  171. if shouldCleanLog {
  172. j.clearAccessLog()
  173. }
  174. }
  175. func (j *CheckClientIpJob) checkError(e error) {
  176. if e != nil {
  177. logger.Warning("client ip job err:", e)
  178. }
  179. }
  180. func (j *CheckClientIpJob) contains(s []string, str string) bool {
  181. for _, v := range s {
  182. if v == str {
  183. return true
  184. }
  185. }
  186. return false
  187. }
  188. func (j *CheckClientIpJob) getInboundClientIps(clientEmail string) (*model.InboundClientIps, error) {
  189. db := database.GetDB()
  190. InboundClientIps := &model.InboundClientIps{}
  191. err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
  192. if err != nil {
  193. return nil, err
  194. }
  195. return InboundClientIps, nil
  196. }
  197. func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ips []string) error {
  198. inboundClientIps := &model.InboundClientIps{}
  199. jsonIps, err := json.Marshal(ips)
  200. j.checkError(err)
  201. inboundClientIps.ClientEmail = clientEmail
  202. inboundClientIps.Ips = string(jsonIps)
  203. db := database.GetDB()
  204. tx := db.Begin()
  205. defer func() {
  206. if err == nil {
  207. tx.Commit()
  208. } else {
  209. tx.Rollback()
  210. }
  211. }()
  212. err = tx.Save(inboundClientIps).Error
  213. if err != nil {
  214. return err
  215. }
  216. return nil
  217. }
  218. func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, clientEmail string, ips []string) bool {
  219. jsonIps, err := json.Marshal(ips)
  220. j.checkError(err)
  221. inboundClientIps.ClientEmail = clientEmail
  222. inboundClientIps.Ips = string(jsonIps)
  223. // check inbound limitation
  224. inbound, err := j.getInboundByEmail(clientEmail)
  225. j.checkError(err)
  226. if inbound.Settings == "" {
  227. logger.Debug("wrong data ", inbound)
  228. return false
  229. }
  230. settings := map[string][]model.Client{}
  231. json.Unmarshal([]byte(inbound.Settings), &settings)
  232. clients := settings["clients"]
  233. shouldCleanLog := false
  234. j.disAllowedIps = []string{}
  235. // create iplimit log file channel
  236. logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
  237. if err != nil {
  238. logger.Errorf("failed to create or open ip limit log file: %s", err)
  239. }
  240. defer logIpFile.Close()
  241. log.SetOutput(logIpFile)
  242. log.SetFlags(log.LstdFlags)
  243. for _, client := range clients {
  244. if client.Email == clientEmail {
  245. limitIp := client.LimitIP
  246. if limitIp != 0 {
  247. shouldCleanLog = true
  248. if limitIp < len(ips) && inbound.Enable {
  249. j.disAllowedIps = append(j.disAllowedIps, ips[limitIp:]...)
  250. for i := limitIp; i < len(ips); i++ {
  251. log.Printf("[LIMIT_IP] Email = %s || SRC = %s", clientEmail, ips[i])
  252. }
  253. }
  254. }
  255. }
  256. }
  257. sort.Strings(j.disAllowedIps)
  258. if len(j.disAllowedIps) > 0 {
  259. logger.Debug("disAllowedIps ", j.disAllowedIps)
  260. }
  261. db := database.GetDB()
  262. err = db.Save(inboundClientIps).Error
  263. if err != nil {
  264. return shouldCleanLog
  265. }
  266. return shouldCleanLog
  267. }
  268. func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
  269. db := database.GetDB()
  270. var inbounds *model.Inbound
  271. err := db.Model(model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").Find(&inbounds).Error
  272. if err != nil {
  273. return nil, err
  274. }
  275. return inbounds, nil
  276. }