inbound_traffic.go 30 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  11. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  12. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  13. "gorm.io/gorm"
  14. "gorm.io/gorm/clause"
  15. )
  16. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  17. var disabledNodeIDs []int
  18. err = submitTrafficWrite(func() error {
  19. var inner error
  20. needRestart, clientsDisabled, disabledNodeIDs, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  21. return inner
  22. })
  23. if err == nil && len(disabledNodeIDs) > 0 {
  24. s.restartRemoteNodesOnDisable(disabledNodeIDs)
  25. }
  26. return
  27. }
  28. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, []int, error) {
  29. var err error
  30. db := database.GetDB()
  31. tx := db.Begin()
  32. defer func() {
  33. if err != nil {
  34. tx.Rollback()
  35. } else {
  36. tx.Commit()
  37. }
  38. }()
  39. err = s.addInboundTraffic(tx, inboundTraffics)
  40. if err != nil {
  41. return false, false, nil, err
  42. }
  43. err = s.addClientTraffic(tx, clientTraffics)
  44. if err != nil {
  45. return false, false, nil, err
  46. }
  47. needRestart0, count, err := s.autoRenewClients(tx)
  48. if err != nil {
  49. logger.Warning("Error in renew clients:", err)
  50. } else if count > 0 {
  51. logger.Debugf("%v clients renewed", count)
  52. }
  53. disabledClientsCount := int64(0)
  54. needRestart1, count, disabledNodeIDs, err := s.disableInvalidClients(tx)
  55. if err != nil {
  56. logger.Warning("Error in disabling invalid clients:", err)
  57. } else if count > 0 {
  58. logger.Debugf("%v clients disabled", count)
  59. disabledClientsCount = count
  60. }
  61. needRestart2, count, err := s.disableInvalidInbounds(tx)
  62. if err != nil {
  63. logger.Warning("Error in disabling invalid inbounds:", err)
  64. } else if count > 0 {
  65. logger.Debugf("%v inbounds disabled", count)
  66. }
  67. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, disabledNodeIDs, nil
  68. }
  69. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  70. if len(traffics) == 0 {
  71. return nil
  72. }
  73. var err error
  74. for _, traffic := range traffics {
  75. if traffic.IsInbound {
  76. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  77. Updates(map[string]any{
  78. "up": gorm.Expr("up + ?", traffic.Up),
  79. "down": gorm.Expr("down + ?", traffic.Down),
  80. }).Error
  81. if err != nil {
  82. return err
  83. }
  84. }
  85. }
  86. return nil
  87. }
  88. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  89. if len(traffics) == 0 {
  90. return nil
  91. }
  92. emails := make([]string, 0, len(traffics))
  93. for _, traffic := range traffics {
  94. emails = append(emails, traffic.Email)
  95. }
  96. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  97. // Match purely by email. client_traffics is email-keyed (one shared row per
  98. // email regardless of how many inbounds the client is attached to), and these
  99. // emails come from the local xray's report, so they always belong to a client
  100. // attached to a local inbound. The old `inbound_id NOT IN (node inbounds)`
  101. // filter dropped the local traffic of a client attached to both a node and the
  102. // mother inbound whenever the node inbound happened to be attached first — its
  103. // shared row then carried the node inbound's id (AddClientStat uses OnConflict
  104. // DoNothing and never refreshes it), so the local poll skipped it entirely.
  105. err = tx.Model(xray.ClientTraffic{}).
  106. Where("email IN (?)", emails).
  107. Find(&dbClientTraffics).Error
  108. if err != nil {
  109. return err
  110. }
  111. // Avoid empty slice error
  112. if len(dbClientTraffics) == 0 {
  113. return nil
  114. }
  115. dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
  116. if err != nil {
  117. return err
  118. }
  119. // Index by email for O(N) merge — the previous nested loop was O(N²)
  120. // and dominated each cron tick on inbounds with thousands of active
  121. // clients (7500 × 7500 = 56M string comparisons every 10 seconds).
  122. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  123. for i := range traffics {
  124. if traffics[i] != nil {
  125. trafficByEmail[traffics[i].Email] = traffics[i]
  126. }
  127. }
  128. now := time.Now().UnixMilli()
  129. for dbTraffic_index := range dbClientTraffics {
  130. t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
  131. if !ok {
  132. continue
  133. }
  134. dbClientTraffics[dbTraffic_index].Up += t.Up
  135. dbClientTraffics[dbTraffic_index].Down += t.Down
  136. if t.Up+t.Down > 0 {
  137. dbClientTraffics[dbTraffic_index].LastOnline = now
  138. }
  139. }
  140. err = tx.Save(dbClientTraffics).Error
  141. if err != nil {
  142. logger.Warning("AddClientTraffic update data ", err)
  143. }
  144. return nil
  145. }
  146. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
  147. now := time.Now().UnixMilli()
  148. // "Start After First Use" stores a negative expiry (the duration). On the
  149. // first traffic tick it becomes an absolute deadline of now+duration. Compute
  150. // it once per email so every inbound the client is attached to lands on the
  151. // same value (recomputing per inbound would skip all but the first one).
  152. newExpiryByEmail := make(map[string]int64, len(dbClientTraffics))
  153. for traffic_index := range dbClientTraffics {
  154. if dbClientTraffics[traffic_index].ExpiryTime < 0 {
  155. newExpiryByEmail[dbClientTraffics[traffic_index].Email] = now - dbClientTraffics[traffic_index].ExpiryTime
  156. }
  157. }
  158. if len(newExpiryByEmail) == 0 {
  159. return dbClientTraffics, nil
  160. }
  161. delayedEmails := make([]string, 0, len(newExpiryByEmail))
  162. for email := range newExpiryByEmail {
  163. delayedEmails = append(delayedEmails, email)
  164. }
  165. // Resolve the owning inbounds through the client_inbounds link, which is
  166. // authoritative. client_traffics.inbound_id goes stale when an inbound is
  167. // deleted and recreated, which would leave the negative expiry unconverted.
  168. var inboundIds []int
  169. err := tx.Table("client_inbounds").
  170. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  171. Where("clients.email IN (?)", delayedEmails).
  172. Distinct().
  173. Pluck("client_inbounds.inbound_id", &inboundIds).Error
  174. if err != nil {
  175. return nil, err
  176. }
  177. if len(inboundIds) == 0 {
  178. return dbClientTraffics, nil
  179. }
  180. var inbounds []*model.Inbound
  181. err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  182. if err != nil {
  183. return nil, err
  184. }
  185. for inbound_index := range inbounds {
  186. settings := map[string]any{}
  187. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  188. clients, ok := settings["clients"].([]any)
  189. if ok {
  190. var newClients []any
  191. for client_index := range clients {
  192. c := clients[client_index].(map[string]any)
  193. email, _ := c["email"].(string)
  194. if newExpiry, ok := newExpiryByEmail[email]; ok {
  195. c["expiryTime"] = newExpiry
  196. c["updated_at"] = now
  197. }
  198. if _, ok := c["created_at"]; !ok {
  199. c["created_at"] = now
  200. }
  201. if _, ok := c["updated_at"]; !ok {
  202. c["updated_at"] = now
  203. }
  204. newClients = append(newClients, any(c))
  205. }
  206. settings["clients"] = newClients
  207. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  208. if err != nil {
  209. return nil, err
  210. }
  211. inbounds[inbound_index].Settings = string(modifiedSettings)
  212. }
  213. }
  214. for traffic_index := range dbClientTraffics {
  215. if newExpiry, ok := newExpiryByEmail[dbClientTraffics[traffic_index].Email]; ok {
  216. dbClientTraffics[traffic_index].ExpiryTime = newExpiry
  217. }
  218. }
  219. err = tx.Save(inbounds).Error
  220. if err != nil {
  221. logger.Warning("AddClientTraffic update inbounds ", err)
  222. logger.Error(inbounds)
  223. } else {
  224. for _, ib := range inbounds {
  225. if ib == nil {
  226. continue
  227. }
  228. cs, gcErr := s.GetClients(ib)
  229. if gcErr != nil {
  230. logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
  231. continue
  232. }
  233. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  234. logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
  235. }
  236. }
  237. }
  238. return dbClientTraffics, nil
  239. }
  240. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  241. // check for time expired
  242. var traffics []*xray.ClientTraffic
  243. now := time.Now().Unix() * 1000
  244. var err, err1 error
  245. err = tx.Model(xray.ClientTraffic{}).
  246. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  247. Where("inbound_id NOT IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NOT NULL")).
  248. Find(&traffics).Error
  249. if err != nil {
  250. return false, 0, err
  251. }
  252. // return if there is no client to renew
  253. if len(traffics) == 0 {
  254. return false, 0, nil
  255. }
  256. var inbound_ids []int
  257. var inbounds []*model.Inbound
  258. needRestart := false
  259. var clientsToAdd []struct {
  260. protocol string
  261. tag string
  262. client map[string]any
  263. }
  264. // Resolve the inbounds to renew through the client_inbounds link rather than
  265. // client_traffics.inbound_id, which goes stale after an inbound is deleted and
  266. // recreated and would otherwise skip the renew entirely.
  267. renewEmails := make([]string, 0, len(traffics))
  268. for _, traffic := range traffics {
  269. renewEmails = append(renewEmails, traffic.Email)
  270. }
  271. for _, batch := range chunkStrings(renewEmails, sqliteMaxVars) {
  272. var ids []int
  273. if err = tx.Table("client_inbounds").
  274. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  275. Where("clients.email IN ?", batch).
  276. Distinct().
  277. Pluck("client_inbounds.inbound_id", &ids).Error; err != nil {
  278. return false, 0, err
  279. }
  280. inbound_ids = append(inbound_ids, ids...)
  281. }
  282. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  283. // per tick instead of N times across chunk boundaries.
  284. inbound_ids = uniqueInts(inbound_ids)
  285. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  286. // are touched in a single tick.
  287. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  288. var page []*model.Inbound
  289. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  290. return false, 0, err
  291. }
  292. inbounds = append(inbounds, page...)
  293. }
  294. for inbound_index := range inbounds {
  295. settings := map[string]any{}
  296. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  297. clients := settings["clients"].([]any)
  298. for client_index := range clients {
  299. c := clients[client_index].(map[string]any)
  300. for traffic_index, traffic := range traffics {
  301. if traffic.Email == c["email"].(string) {
  302. newExpiryTime := traffic.ExpiryTime
  303. for newExpiryTime < now {
  304. newExpiryTime += (int64(traffic.Reset) * 86400000)
  305. }
  306. c["expiryTime"] = newExpiryTime
  307. traffics[traffic_index].ExpiryTime = newExpiryTime
  308. traffics[traffic_index].Down = 0
  309. traffics[traffic_index].Up = 0
  310. if !traffic.Enable {
  311. traffics[traffic_index].Enable = true
  312. c["enable"] = true
  313. clientsToAdd = append(clientsToAdd,
  314. struct {
  315. protocol string
  316. tag string
  317. client map[string]any
  318. }{
  319. protocol: string(inbounds[inbound_index].Protocol),
  320. tag: inbounds[inbound_index].Tag,
  321. client: c,
  322. })
  323. }
  324. clients[client_index] = any(c)
  325. break
  326. }
  327. }
  328. }
  329. settings["clients"] = clients
  330. newSettings, err := json.MarshalIndent(settings, "", " ")
  331. if err != nil {
  332. return false, 0, err
  333. }
  334. inbounds[inbound_index].Settings = string(newSettings)
  335. }
  336. err = tx.Save(inbounds).Error
  337. if err != nil {
  338. return false, 0, err
  339. }
  340. for _, ib := range inbounds {
  341. if ib == nil {
  342. continue
  343. }
  344. cs, gcErr := s.GetClients(ib)
  345. if gcErr != nil {
  346. logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
  347. continue
  348. }
  349. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  350. logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
  351. }
  352. }
  353. err = tx.Save(traffics).Error
  354. if err != nil {
  355. return false, 0, err
  356. }
  357. // A renewed client starts a fresh quota window: drop the cross-panel rows
  358. // too, or the stale pushed totals would re-deplete it immediately.
  359. if err = clearGlobalTraffic(tx, renewEmails...); err != nil {
  360. return false, 0, err
  361. }
  362. if p != nil {
  363. err1 = s.xrayApi.Init(p.GetAPIPort())
  364. if err1 != nil {
  365. return true, int64(len(traffics)), nil
  366. }
  367. for _, clientToAdd := range clientsToAdd {
  368. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  369. if err1 != nil {
  370. needRestart = true
  371. }
  372. }
  373. s.xrayApi.Close()
  374. }
  375. return needRestart, int64(len(traffics)), nil
  376. }
  377. // AddClientStat inserts a per-client accounting row, no-op on email
  378. // conflict. Xray reports traffic per email, so the surviving row acts as
  379. // the shared accumulator for inbounds that re-use the same identity.
  380. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  381. clientTraffic := xray.ClientTraffic{
  382. InboundId: inboundId,
  383. Email: client.Email,
  384. Total: client.TotalGB,
  385. ExpiryTime: client.ExpiryTime,
  386. Enable: client.Enable,
  387. Reset: client.Reset,
  388. }
  389. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  390. Create(&clientTraffic).Error
  391. }
  392. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  393. result := tx.Model(xray.ClientTraffic{}).
  394. Where("email = ?", email).
  395. Updates(map[string]any{
  396. "enable": client.Enable,
  397. "email": client.Email,
  398. "total": client.TotalGB,
  399. "expiry_time": client.ExpiryTime,
  400. "reset": client.Reset,
  401. })
  402. err := result.Error
  403. return err
  404. }
  405. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  406. if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
  407. return err
  408. }
  409. if err := clearGlobalTraffic(tx, email); err != nil {
  410. return err
  411. }
  412. return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
  413. }
  414. func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error {
  415. const chunk = 400
  416. for start := 0; start < len(emails); start += chunk {
  417. end := min(start+chunk, len(emails))
  418. batch := emails[start:end]
  419. if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
  420. return err
  421. }
  422. if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
  423. return err
  424. }
  425. if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  426. return err
  427. }
  428. }
  429. return nil
  430. }
  431. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  432. return submitTrafficWrite(func() error {
  433. db := database.GetDB()
  434. if err := clearGlobalTraffic(db, clientEmail); err != nil {
  435. return err
  436. }
  437. return db.Model(xray.ClientTraffic{}).
  438. Where("email = ?", clientEmail).
  439. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
  440. })
  441. }
  442. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  443. err = submitTrafficWrite(func() error {
  444. var inner error
  445. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  446. return inner
  447. })
  448. return
  449. }
  450. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  451. needRestart := false
  452. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  453. if err != nil {
  454. return false, err
  455. }
  456. if !traffic.Enable {
  457. inbound, err := s.GetInbound(id)
  458. if err != nil {
  459. return false, err
  460. }
  461. clients, err := s.GetClients(inbound)
  462. if err != nil {
  463. return false, err
  464. }
  465. for _, client := range clients {
  466. if client.Email == clientEmail && client.Enable {
  467. rt, push, dirty, perr := s.nodePushPlan(inbound)
  468. if perr != nil {
  469. return false, perr
  470. }
  471. if !push {
  472. if inbound.NodeID != nil {
  473. if dirty {
  474. if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
  475. logger.Warning("mark node dirty failed:", dErr)
  476. }
  477. }
  478. } else {
  479. needRestart = true
  480. }
  481. break
  482. }
  483. cipher := ""
  484. if string(inbound.Protocol) == "shadowsocks" {
  485. var oldSettings map[string]any
  486. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  487. if err != nil {
  488. return false, err
  489. }
  490. cipher = oldSettings["method"].(string)
  491. }
  492. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  493. "email": client.Email,
  494. "id": client.ID,
  495. "auth": client.Auth,
  496. "security": client.Security,
  497. "flow": client.Flow,
  498. "password": client.Password,
  499. "cipher": cipher,
  500. })
  501. if err1 == nil {
  502. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  503. } else if inbound.NodeID != nil {
  504. logger.Warning("Error in enabling client on", rt.Name(), ":", err1)
  505. if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
  506. logger.Warning("mark node dirty failed:", dErr)
  507. }
  508. } else {
  509. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  510. needRestart = true
  511. }
  512. break
  513. }
  514. }
  515. }
  516. traffic.Up = 0
  517. traffic.Down = 0
  518. traffic.Enable = true
  519. db := database.GetDB()
  520. err = db.Save(traffic).Error
  521. if err != nil {
  522. return false, err
  523. }
  524. if err := clearGlobalTraffic(db, clientEmail); err != nil {
  525. return false, err
  526. }
  527. now := time.Now().UnixMilli()
  528. _ = db.Model(model.Inbound{}).
  529. Where("id = ?", id).
  530. Update("last_traffic_reset_time", now).Error
  531. inbound, err := s.GetInbound(id)
  532. if err == nil && inbound != nil && inbound.NodeID != nil {
  533. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  534. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  535. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  536. }
  537. } else {
  538. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  539. }
  540. }
  541. return needRestart, nil
  542. }
  543. func (s *InboundService) ResetAllTraffics() error {
  544. return submitTrafficWrite(func() error {
  545. return s.resetAllTrafficsLocked()
  546. })
  547. }
  548. func (s *InboundService) resetAllTrafficsLocked() error {
  549. db := database.GetDB()
  550. now := time.Now().UnixMilli()
  551. if err := db.Model(model.Inbound{}).
  552. Where("user_id > ?", 0).
  553. Updates(map[string]any{
  554. "up": 0,
  555. "down": 0,
  556. "last_traffic_reset_time": now,
  557. }).Error; err != nil {
  558. return err
  559. }
  560. nodes, err := (&NodeService{}).GetAll()
  561. if err == nil {
  562. for _, node := range nodes {
  563. if rt, err := runtime.GetManager().RuntimeFor(&node.Id); err == nil {
  564. if e := rt.ResetAllTraffics(context.Background()); e != nil {
  565. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  566. }
  567. }
  568. }
  569. }
  570. return nil
  571. }
  572. func (s *InboundService) ResetInboundTraffic(id int) error {
  573. return submitTrafficWrite(func() error {
  574. db := database.GetDB()
  575. if err := db.Model(model.Inbound{}).
  576. Where("id = ?", id).
  577. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  578. return err
  579. }
  580. inbound, err := s.GetInbound(id)
  581. if err == nil && inbound != nil && inbound.NodeID != nil {
  582. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  583. if e := rt.ResetInboundTraffic(context.Background(), inbound); e != nil {
  584. logger.Warning("ResetInboundTraffic: remote propagation to", rt.Name(), "failed:", e)
  585. }
  586. } else {
  587. logger.Warning("ResetInboundTraffic: runtime lookup failed:", rterr)
  588. }
  589. }
  590. return nil
  591. })
  592. }
  593. func (s *InboundService) DelDepletedClients(id int) (err error) {
  594. db := database.GetDB()
  595. tx := db.Begin()
  596. defer func() {
  597. if err == nil {
  598. tx.Commit()
  599. } else {
  600. tx.Rollback()
  601. }
  602. }()
  603. // Collect depleted emails globally — a shared-email row owned by one
  604. // inbound depletes every sibling that lists the email.
  605. now := time.Now().Unix() * 1000
  606. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  607. var depletedRows []xray.ClientTraffic
  608. err = db.Model(xray.ClientTraffic{}).
  609. Where(depletedClause, now).
  610. Find(&depletedRows).Error
  611. if err != nil {
  612. return err
  613. }
  614. if len(depletedRows) == 0 {
  615. return nil
  616. }
  617. depletedEmails := make(map[string]struct{}, len(depletedRows))
  618. for _, r := range depletedRows {
  619. if r.Email == "" {
  620. continue
  621. }
  622. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  623. }
  624. if len(depletedEmails) == 0 {
  625. return nil
  626. }
  627. var inbounds []*model.Inbound
  628. inboundQuery := db.Model(model.Inbound{})
  629. if id >= 0 {
  630. inboundQuery = inboundQuery.Where("id = ?", id)
  631. }
  632. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  633. return err
  634. }
  635. for _, inbound := range inbounds {
  636. var settings map[string]any
  637. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  638. return err
  639. }
  640. rawClients, ok := settings["clients"].([]any)
  641. if !ok {
  642. continue
  643. }
  644. newClients := make([]any, 0, len(rawClients))
  645. removed := 0
  646. for _, client := range rawClients {
  647. c, ok := client.(map[string]any)
  648. if !ok {
  649. newClients = append(newClients, client)
  650. continue
  651. }
  652. email, _ := c["email"].(string)
  653. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  654. removed++
  655. continue
  656. }
  657. newClients = append(newClients, client)
  658. }
  659. if removed == 0 {
  660. continue
  661. }
  662. if len(newClients) == 0 {
  663. s.DelInbound(inbound.Id)
  664. continue
  665. }
  666. settings["clients"] = newClients
  667. ns, mErr := json.MarshalIndent(settings, "", " ")
  668. if mErr != nil {
  669. return mErr
  670. }
  671. inbound.Settings = string(ns)
  672. if err = tx.Save(inbound).Error; err != nil {
  673. return err
  674. }
  675. survivingClients, gcErr := s.GetClients(inbound)
  676. if gcErr != nil {
  677. err = gcErr
  678. return err
  679. }
  680. if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
  681. return err
  682. }
  683. }
  684. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  685. // no out-of-scope inbound still references the email.
  686. if id < 0 {
  687. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  688. return err
  689. }
  690. emails := make([]string, 0, len(depletedEmails))
  691. for e := range depletedEmails {
  692. emails = append(emails, e)
  693. }
  694. var stillReferenced []string
  695. emailExpr := database.JSONFieldText("client.value", "email")
  696. stillQuery := fmt.Sprintf(
  697. "SELECT DISTINCT LOWER(%s) %s WHERE LOWER(%s) IN ?",
  698. emailExpr,
  699. database.JSONClientsFromInbound(),
  700. emailExpr,
  701. )
  702. if err = tx.Raw(stillQuery, emails).Scan(&stillReferenced).Error; err != nil {
  703. return err
  704. }
  705. stillSet := make(map[string]struct{}, len(stillReferenced))
  706. for _, e := range stillReferenced {
  707. stillSet[e] = struct{}{}
  708. }
  709. toDelete := make([]string, 0, len(emails))
  710. for _, e := range emails {
  711. if _, kept := stillSet[e]; !kept {
  712. toDelete = append(toDelete, e)
  713. }
  714. }
  715. if len(toDelete) > 0 {
  716. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  717. return err
  718. }
  719. }
  720. return nil
  721. }
  722. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  723. db := database.GetDB()
  724. var inbounds []*model.Inbound
  725. // Retrieve inbounds where settings contain the given tgId
  726. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  727. if err != nil && err != gorm.ErrRecordNotFound {
  728. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  729. return nil, err
  730. }
  731. var emails []string
  732. for _, inbound := range inbounds {
  733. clients, err := s.GetClients(inbound)
  734. if err != nil {
  735. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  736. continue
  737. }
  738. for _, client := range clients {
  739. if client.TgID == tgId {
  740. emails = append(emails, client.Email)
  741. }
  742. }
  743. }
  744. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  745. // account owns thousands of clients across inbounds.
  746. uniqEmails := uniqueNonEmptyStrings(emails)
  747. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  748. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  749. var page []*xray.ClientTraffic
  750. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  751. if err == gorm.ErrRecordNotFound {
  752. continue
  753. }
  754. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  755. return nil, err
  756. }
  757. traffics = append(traffics, page...)
  758. }
  759. if len(traffics) == 0 {
  760. logger.Warning("No ClientTraffic records found for emails:", emails)
  761. return nil, nil
  762. }
  763. // Populate UUID and other client data for each traffic record
  764. for i := range traffics {
  765. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  766. traffics[i].Enable = client.Enable
  767. traffics[i].UUID = client.ID
  768. traffics[i].SubId = client.SubID
  769. }
  770. }
  771. return traffics, nil
  772. }
  773. // BumpClientsLastOnline sets client_traffics.last_online to now for the given
  774. // emails. Used in online-API mode for clients that hold a live connection but
  775. // moved no bytes this poll — the traffic path (addClientTraffic) only bumps
  776. // last_online on a non-zero delta, so idle-but-connected clients would
  777. // otherwise show a stale "last online" while being reported online.
  778. func (s *InboundService) BumpClientsLastOnline(emails []string) error {
  779. uniq := uniqueNonEmptyStrings(emails)
  780. if len(uniq) == 0 {
  781. return nil
  782. }
  783. now := time.Now().UnixMilli()
  784. return submitTrafficWrite(func() error {
  785. db := database.GetDB()
  786. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  787. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("last_online", now).Error; err != nil {
  788. return err
  789. }
  790. }
  791. return nil
  792. })
  793. }
  794. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  795. uniq := uniqueNonEmptyStrings(emails)
  796. if len(uniq) == 0 {
  797. return nil, nil
  798. }
  799. db := database.GetDB()
  800. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  801. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  802. var page []*xray.ClientTraffic
  803. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  804. return nil, err
  805. }
  806. traffics = append(traffics, page...)
  807. }
  808. return traffics, nil
  809. }
  810. // GetAllClientTraffics returns the full set of client_traffics rows so the
  811. // websocket broadcasters can ship a complete snapshot every cycle. The old
  812. // delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
  813. // the per-client section whenever no client moved bytes in the cycle or a
  814. // node sync failed, leaving client rows in the UI stuck at stale numbers.
  815. func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
  816. db := database.GetDB()
  817. var traffics []*xray.ClientTraffic
  818. if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
  819. return nil, err
  820. }
  821. overlayGlobalTraffic(db, traffics)
  822. return traffics, nil
  823. }
  824. type InboundTrafficSummary struct {
  825. Id int `json:"id"`
  826. Up int64 `json:"up"`
  827. Down int64 `json:"down"`
  828. Total int64 `json:"total"`
  829. Enable bool `json:"enable"`
  830. }
  831. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  832. db := database.GetDB()
  833. var summaries []InboundTrafficSummary
  834. if err := db.Model(&model.Inbound{}).
  835. Select("id, up, down, total, enable").
  836. Find(&summaries).Error; err != nil {
  837. return nil, err
  838. }
  839. return summaries, nil
  840. }
  841. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  842. db := database.GetDB()
  843. var traffics []*xray.ClientTraffic
  844. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error; err != nil {
  845. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  846. return nil, err
  847. }
  848. if len(traffics) == 0 {
  849. return nil, nil
  850. }
  851. overlayGlobalTraffic(db, traffics)
  852. t := traffics[0]
  853. if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil {
  854. c := rec.ToClient()
  855. t.UUID = c.ID
  856. t.SubId = c.SubID
  857. return t, nil
  858. }
  859. t2, client, err := s.GetClientByEmail(email)
  860. if err != nil {
  861. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  862. return nil, err
  863. }
  864. if t2 != nil && client != nil {
  865. t2.UUID = client.ID
  866. t2.SubId = client.SubID
  867. return t2, nil
  868. }
  869. return nil, nil
  870. }
  871. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  872. return submitTrafficWrite(func() error {
  873. db := database.GetDB()
  874. err := db.Model(xray.ClientTraffic{}).
  875. Where("email = ?", email).
  876. Updates(map[string]any{
  877. "up": upload,
  878. "down": download,
  879. }).Error
  880. if err != nil {
  881. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  882. }
  883. return err
  884. })
  885. }
  886. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  887. db := database.GetDB()
  888. inbound := &model.Inbound{}
  889. traffic = &xray.ClientTraffic{}
  890. // Search for inbound settings that contain the query
  891. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  892. if err != nil {
  893. if err == gorm.ErrRecordNotFound {
  894. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  895. return nil, err
  896. }
  897. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  898. return nil, err
  899. }
  900. traffic.InboundId = inbound.Id
  901. // Unmarshal settings to get clients
  902. settings := map[string][]model.Client{}
  903. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  904. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  905. return nil, err
  906. }
  907. clients := settings["clients"]
  908. for _, client := range clients {
  909. if (client.ID == query || client.Password == query) && client.Email != "" {
  910. traffic.Email = client.Email
  911. break
  912. }
  913. }
  914. if traffic.Email == "" {
  915. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  916. return nil, gorm.ErrRecordNotFound
  917. }
  918. // Retrieve ClientTraffic based on the found email
  919. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  920. if err != nil {
  921. if err == gorm.ErrRecordNotFound {
  922. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  923. return nil, err
  924. }
  925. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  926. return nil, err
  927. }
  928. return traffic, nil
  929. }