1
0

inbound_traffic.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "maps"
  8. "slices"
  9. "strings"
  10. "time"
  11. "github.com/mhsanaei/3x-ui/v3/internal/database"
  12. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  13. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  14. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  15. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  16. "gorm.io/gorm"
  17. "gorm.io/gorm/clause"
  18. )
  19. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  20. err = submitTrafficWrite(func() error {
  21. var inner error
  22. needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  23. return inner
  24. })
  25. return
  26. }
  27. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
  28. var err error
  29. db := database.GetDB()
  30. tx := db.Begin()
  31. defer func() {
  32. if err != nil {
  33. tx.Rollback()
  34. } else {
  35. tx.Commit()
  36. }
  37. }()
  38. err = s.addInboundTraffic(tx, inboundTraffics)
  39. if err != nil {
  40. return false, false, err
  41. }
  42. err = s.addClientTraffic(tx, clientTraffics)
  43. if err != nil {
  44. return false, false, err
  45. }
  46. needRestart0, count, err := s.autoRenewClients(tx)
  47. if err != nil {
  48. logger.Warning("Error in renew clients:", err)
  49. } else if count > 0 {
  50. logger.Debugf("%v clients renewed", count)
  51. }
  52. disabledClientsCount := int64(0)
  53. needRestart1, count, err := s.disableInvalidClients(tx)
  54. if err != nil {
  55. logger.Warning("Error in disabling invalid clients:", err)
  56. } else if count > 0 {
  57. logger.Debugf("%v clients disabled", count)
  58. disabledClientsCount = count
  59. }
  60. needRestart2, count, err := s.disableInvalidInbounds(tx)
  61. if err != nil {
  62. logger.Warning("Error in disabling invalid inbounds:", err)
  63. } else if count > 0 {
  64. logger.Debugf("%v inbounds disabled", count)
  65. }
  66. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
  67. }
  68. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  69. if len(traffics) == 0 {
  70. return nil
  71. }
  72. var err error
  73. for _, traffic := range traffics {
  74. if traffic.IsInbound {
  75. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  76. Updates(map[string]any{
  77. "up": gorm.Expr("up + ?", traffic.Up),
  78. "down": gorm.Expr("down + ?", traffic.Down),
  79. }).Error
  80. if err != nil {
  81. return err
  82. }
  83. }
  84. }
  85. return nil
  86. }
  87. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  88. if len(traffics) == 0 {
  89. return nil
  90. }
  91. emails := make([]string, 0, len(traffics))
  92. for _, traffic := range traffics {
  93. emails = append(emails, traffic.Email)
  94. }
  95. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  96. // Match purely by email. client_traffics is email-keyed (one shared row per
  97. // email regardless of how many inbounds the client is attached to), and these
  98. // emails come from the local xray's report, so they always belong to a client
  99. // attached to a local inbound. The old `inbound_id NOT IN (node inbounds)`
  100. // filter dropped the local traffic of a client attached to both a node and the
  101. // mother inbound whenever the node inbound happened to be attached first — its
  102. // shared row then carried the node inbound's id (AddClientStat uses OnConflict
  103. // DoNothing and never refreshes it), so the local poll skipped it entirely.
  104. err = tx.Model(xray.ClientTraffic{}).
  105. Where("email IN (?)", emails).
  106. Find(&dbClientTraffics).Error
  107. if err != nil {
  108. return err
  109. }
  110. // Avoid empty slice error
  111. if len(dbClientTraffics) == 0 {
  112. return nil
  113. }
  114. dbClientTraffics, convertedExpiryByEmail, err := s.adjustTraffics(tx, dbClientTraffics)
  115. if err != nil {
  116. return err
  117. }
  118. // Index by email for O(N) merge.
  119. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  120. for i := range traffics {
  121. if traffics[i] != nil {
  122. trafficByEmail[traffics[i].Email] = traffics[i]
  123. }
  124. }
  125. now := time.Now().UnixMilli()
  126. // Use atomic per-row UPDATE instead of read-modify-write Save. tx.Save
  127. // issues UPDATEs in slice order, which varies between concurrent callers;
  128. // on PostgreSQL two transactions locking the same rows in opposite order
  129. // deadlock. An atomic "SET up = up + ?" never holds a row lock across a
  130. // subsequent lock acquisition, so concurrent writers cannot deadlock.
  131. for _, ct := range dbClientTraffics {
  132. t, ok := trafficByEmail[ct.Email]
  133. if !ok || (t.Up == 0 && t.Down == 0) {
  134. continue
  135. }
  136. if err = tx.Exec(
  137. fmt.Sprintf(
  138. `UPDATE client_traffics SET up = up + ?, down = down + ?, last_online = %s WHERE email = ?`,
  139. database.GreatestExpr("last_online", "?"),
  140. ),
  141. t.Up, t.Down, now, ct.Email,
  142. ).Error; err != nil {
  143. logger.Warning("AddClientTraffic update data ", err)
  144. }
  145. }
  146. // adjustTraffics converts delayed-start rows (negative ExpiryTime → absolute
  147. // deadline) in-memory. Persist that conversion now since the traffic UPDATE
  148. // above only touches up/down/last_online. Only converted emails are written:
  149. // updating every polled row issued one no-op UPDATE per active client per
  150. // poll. Sorted order keeps concurrent writers lock-compatible on Postgres.
  151. for _, email := range slices.Sorted(maps.Keys(convertedExpiryByEmail)) {
  152. if err = tx.Exec(
  153. `UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
  154. convertedExpiryByEmail[email], email,
  155. ).Error; err != nil {
  156. logger.Warning("AddClientTraffic update expiry_time ", err)
  157. }
  158. }
  159. return nil
  160. }
  161. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, map[string]int64, error) {
  162. now := time.Now().UnixMilli()
  163. // "Start After First Use" stores a negative expiry (the duration). On the
  164. // first traffic tick it becomes an absolute deadline of now+duration. Compute
  165. // it once per email so every inbound the client is attached to lands on the
  166. // same value (recomputing per inbound would skip all but the first one).
  167. newExpiryByEmail := make(map[string]int64, len(dbClientTraffics))
  168. for traffic_index := range dbClientTraffics {
  169. if dbClientTraffics[traffic_index].ExpiryTime < 0 {
  170. newExpiryByEmail[dbClientTraffics[traffic_index].Email] = now - dbClientTraffics[traffic_index].ExpiryTime
  171. }
  172. }
  173. if len(newExpiryByEmail) == 0 {
  174. return dbClientTraffics, nil, nil
  175. }
  176. delayedEmails := make([]string, 0, len(newExpiryByEmail))
  177. for email := range newExpiryByEmail {
  178. delayedEmails = append(delayedEmails, email)
  179. }
  180. // Resolve the owning inbounds through the client_inbounds link, which is
  181. // authoritative. client_traffics.inbound_id goes stale when an inbound is
  182. // deleted and recreated, which would leave the negative expiry unconverted.
  183. var inboundIds []int
  184. err := tx.Table("client_inbounds").
  185. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  186. Where("clients.email IN (?)", delayedEmails).
  187. Distinct().
  188. Pluck("client_inbounds.inbound_id", &inboundIds).Error
  189. if err != nil {
  190. return nil, nil, err
  191. }
  192. if len(inboundIds) == 0 {
  193. return dbClientTraffics, nil, nil
  194. }
  195. var inbounds []*model.Inbound
  196. err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  197. if err != nil {
  198. return nil, nil, err
  199. }
  200. for inbound_index := range inbounds {
  201. settings := map[string]any{}
  202. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  203. clients, ok := settings["clients"].([]any)
  204. if ok {
  205. var newClients []any
  206. for client_index := range clients {
  207. c := clients[client_index].(map[string]any)
  208. email, _ := c["email"].(string)
  209. if newExpiry, ok := newExpiryByEmail[email]; ok {
  210. c["expiryTime"] = newExpiry
  211. c["updated_at"] = now
  212. }
  213. if _, ok := c["created_at"]; !ok {
  214. c["created_at"] = now
  215. }
  216. if _, ok := c["updated_at"]; !ok {
  217. c["updated_at"] = now
  218. }
  219. newClients = append(newClients, any(c))
  220. }
  221. settings["clients"] = newClients
  222. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  223. if err != nil {
  224. return nil, nil, err
  225. }
  226. inbounds[inbound_index].Settings = string(modifiedSettings)
  227. }
  228. }
  229. for traffic_index := range dbClientTraffics {
  230. if newExpiry, ok := newExpiryByEmail[dbClientTraffics[traffic_index].Email]; ok {
  231. dbClientTraffics[traffic_index].ExpiryTime = newExpiry
  232. }
  233. }
  234. err = tx.Save(inbounds).Error
  235. if err != nil {
  236. logger.Warning("AddClientTraffic update inbounds ", err)
  237. logger.Error(inbounds)
  238. } else {
  239. for _, ib := range inbounds {
  240. if ib == nil {
  241. continue
  242. }
  243. cs, gcErr := s.GetClients(ib)
  244. if gcErr != nil {
  245. logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
  246. continue
  247. }
  248. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  249. logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
  250. }
  251. }
  252. }
  253. return dbClientTraffics, newExpiryByEmail, nil
  254. }
  255. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  256. // check for time expired
  257. var traffics []*xray.ClientTraffic
  258. now := time.Now().Unix() * 1000
  259. var err, err1 error
  260. // Filter to clients that have at least one local inbound. Using
  261. // client_traffics.inbound_id is wrong: it goes stale after an inbound is
  262. // deleted/recreated and always points to the first inbound the client was
  263. // attached to, so it could be a node inbound even when the client also has
  264. // local inbounds. The email-based join through client_inbounds is authoritative.
  265. err = tx.Model(xray.ClientTraffic{}).
  266. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  267. Where("email IN (?)", tx.Table("client_inbounds ci").
  268. Select("c.email").
  269. Joins("JOIN clients c ON c.id = ci.client_id").
  270. Joins("JOIN inbounds i ON i.id = ci.inbound_id").
  271. Where("i.node_id IS NULL")).
  272. Find(&traffics).Error
  273. if err != nil {
  274. return false, 0, err
  275. }
  276. // return if there is no client to renew
  277. if len(traffics) == 0 {
  278. return false, 0, nil
  279. }
  280. var inbound_ids []int
  281. var inbounds []*model.Inbound
  282. needRestart := false
  283. var clientsToAdd []struct {
  284. protocol string
  285. tag string
  286. client map[string]any
  287. }
  288. // Resolve the inbounds to renew through the client_inbounds link rather than
  289. // client_traffics.inbound_id, which goes stale after an inbound is deleted and
  290. // recreated and would otherwise skip the renew entirely.
  291. renewEmails := make([]string, 0, len(traffics))
  292. for _, traffic := range traffics {
  293. renewEmails = append(renewEmails, traffic.Email)
  294. }
  295. for _, batch := range chunkStrings(renewEmails, sqliteMaxVars) {
  296. var ids []int
  297. if err = tx.Table("client_inbounds").
  298. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  299. Where("clients.email IN ?", batch).
  300. Distinct().
  301. Pluck("client_inbounds.inbound_id", &ids).Error; err != nil {
  302. return false, 0, err
  303. }
  304. inbound_ids = append(inbound_ids, ids...)
  305. }
  306. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  307. // per tick instead of N times across chunk boundaries.
  308. inbound_ids = uniqueInts(inbound_ids)
  309. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  310. // are touched in a single tick.
  311. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  312. var page []*model.Inbound
  313. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  314. return false, 0, err
  315. }
  316. inbounds = append(inbounds, page...)
  317. }
  318. // Index the expired traffics by email so each client is an O(1) lookup
  319. // instead of a linear scan of every expired row (O(clients × expired) per
  320. // inbound, quadratic at scale). Pointers keep the in-place mutation below.
  321. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  322. for i := range traffics {
  323. trafficByEmail[traffics[i].Email] = traffics[i]
  324. }
  325. for inbound_index := range inbounds {
  326. settings := map[string]any{}
  327. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  328. clients, _ := settings["clients"].([]any)
  329. if len(clients) == 0 {
  330. continue
  331. }
  332. for client_index := range clients {
  333. c := clients[client_index].(map[string]any)
  334. email, _ := c["email"].(string)
  335. traffic, ok := trafficByEmail[email]
  336. if !ok {
  337. continue
  338. }
  339. newExpiryTime := traffic.ExpiryTime
  340. for newExpiryTime < now {
  341. newExpiryTime += (int64(traffic.Reset) * 86400000)
  342. }
  343. c["expiryTime"] = newExpiryTime
  344. traffic.ExpiryTime = newExpiryTime
  345. traffic.Down = 0
  346. traffic.Up = 0
  347. if !traffic.Enable {
  348. traffic.Enable = true
  349. c["enable"] = true
  350. clientsToAdd = append(clientsToAdd,
  351. struct {
  352. protocol string
  353. tag string
  354. client map[string]any
  355. }{
  356. protocol: string(inbounds[inbound_index].Protocol),
  357. tag: inbounds[inbound_index].Tag,
  358. client: c,
  359. })
  360. }
  361. clients[client_index] = any(c)
  362. }
  363. settings["clients"] = clients
  364. newSettings, err := json.MarshalIndent(settings, "", " ")
  365. if err != nil {
  366. return false, 0, err
  367. }
  368. inbounds[inbound_index].Settings = string(newSettings)
  369. }
  370. err = tx.Save(inbounds).Error
  371. if err != nil {
  372. return false, 0, err
  373. }
  374. for _, ib := range inbounds {
  375. if ib == nil {
  376. continue
  377. }
  378. cs, gcErr := s.GetClients(ib)
  379. if gcErr != nil {
  380. logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
  381. continue
  382. }
  383. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  384. logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
  385. }
  386. }
  387. err = tx.Save(traffics).Error
  388. if err != nil {
  389. return false, 0, err
  390. }
  391. // A renewed client starts a fresh quota window: drop the cross-panel rows
  392. // too, or the stale pushed totals would re-deplete it immediately.
  393. if err = clearGlobalTraffic(tx, renewEmails...); err != nil {
  394. return false, 0, err
  395. }
  396. if p != nil {
  397. err1 = s.xrayApi.Init(p.GetAPIPort())
  398. if err1 != nil {
  399. return true, int64(len(traffics)), nil
  400. }
  401. for _, clientToAdd := range clientsToAdd {
  402. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  403. if err1 != nil {
  404. needRestart = true
  405. }
  406. }
  407. s.xrayApi.Close()
  408. }
  409. return needRestart, int64(len(traffics)), nil
  410. }
  411. // AddClientStat inserts a per-client accounting row, no-op on email
  412. // conflict. Xray reports traffic per email, so the surviving row acts as
  413. // the shared accumulator for inbounds that re-use the same identity.
  414. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  415. clientTraffic := xray.ClientTraffic{
  416. InboundId: inboundId,
  417. Email: client.Email,
  418. Total: client.TotalGB,
  419. ExpiryTime: client.ExpiryTime,
  420. Enable: client.Enable,
  421. Reset: client.Reset,
  422. }
  423. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  424. Create(&clientTraffic).Error
  425. }
  426. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  427. result := tx.Model(xray.ClientTraffic{}).
  428. Where("email = ?", email).
  429. Updates(map[string]any{
  430. "enable": client.Enable,
  431. "email": client.Email,
  432. "total": client.TotalGB,
  433. "expiry_time": client.ExpiryTime,
  434. "reset": client.Reset,
  435. })
  436. err := result.Error
  437. return err
  438. }
  439. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  440. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{email}); err != nil {
  441. return err
  442. }
  443. if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
  444. return err
  445. }
  446. if err := clearGlobalTraffic(tx, email); err != nil {
  447. return err
  448. }
  449. return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
  450. }
  451. func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error {
  452. if err := adjustGroupBaselinesForRemovedTraffic(tx, emails); err != nil {
  453. return err
  454. }
  455. const chunk = 400
  456. for start := 0; start < len(emails); start += chunk {
  457. end := min(start+chunk, len(emails))
  458. batch := emails[start:end]
  459. if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
  460. return err
  461. }
  462. if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
  463. return err
  464. }
  465. if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  466. return err
  467. }
  468. }
  469. return nil
  470. }
  471. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  472. return submitTrafficWrite(func() error {
  473. return database.GetDB().Transaction(func(tx *gorm.DB) error {
  474. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{clientEmail}); err != nil {
  475. return err
  476. }
  477. if err := clearGlobalTraffic(tx, clientEmail); err != nil {
  478. return err
  479. }
  480. if err := tx.Model(xray.ClientTraffic{}).
  481. Where("email = ?", clientEmail).
  482. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error; err != nil {
  483. return err
  484. }
  485. return tx.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error
  486. })
  487. })
  488. }
  489. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  490. err = submitTrafficWrite(func() error {
  491. var inner error
  492. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  493. return inner
  494. })
  495. return
  496. }
  497. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  498. needRestart := false
  499. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  500. if err != nil {
  501. return false, err
  502. }
  503. if !traffic.Enable {
  504. inbound, err := s.GetInbound(id)
  505. if err != nil {
  506. return false, err
  507. }
  508. clients, err := s.GetClients(inbound)
  509. if err != nil {
  510. return false, err
  511. }
  512. for _, client := range clients {
  513. if client.Email == clientEmail && client.Enable {
  514. rt, push, _, perr := s.nodePushPlan(inbound)
  515. if perr != nil {
  516. return false, perr
  517. }
  518. if !push {
  519. if inbound.NodeID == nil {
  520. needRestart = true
  521. }
  522. break
  523. }
  524. cipher := ""
  525. if string(inbound.Protocol) == "shadowsocks" {
  526. var oldSettings map[string]any
  527. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  528. if err != nil {
  529. return false, err
  530. }
  531. cipher = oldSettings["method"].(string)
  532. }
  533. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  534. "email": client.Email,
  535. "id": client.ID,
  536. "auth": client.Auth,
  537. "security": client.Security,
  538. "flow": client.Flow,
  539. "password": client.Password,
  540. "cipher": cipher,
  541. })
  542. if err1 == nil {
  543. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  544. } else if inbound.NodeID != nil {
  545. logger.Warning("Error in enabling client on", rt.Name(), ":", err1)
  546. } else {
  547. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  548. needRestart = true
  549. }
  550. break
  551. }
  552. }
  553. }
  554. traffic.Up = 0
  555. traffic.Down = 0
  556. traffic.Enable = true
  557. db := database.GetDB()
  558. now := time.Now().UnixMilli()
  559. inbound, err := s.GetInbound(id)
  560. if err != nil {
  561. return false, err
  562. }
  563. if err := db.Transaction(func(tx *gorm.DB) error {
  564. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{clientEmail}); err != nil {
  565. return err
  566. }
  567. if err := tx.Save(traffic).Error; err != nil {
  568. return err
  569. }
  570. if err := clearGlobalTraffic(tx, clientEmail); err != nil {
  571. return err
  572. }
  573. if err := tx.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  574. return err
  575. }
  576. if err := tx.Model(model.Inbound{}).
  577. Where("id = ?", id).
  578. Update("last_traffic_reset_time", now).Error; err != nil {
  579. return err
  580. }
  581. if inbound != nil && inbound.NodeID != nil {
  582. return (&NodeService{}).MarkNodeDirtyTx(tx, *inbound.NodeID)
  583. }
  584. return nil
  585. }); err != nil {
  586. return false, err
  587. }
  588. if inbound != nil && inbound.NodeID != nil {
  589. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  590. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  591. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  592. }
  593. } else {
  594. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  595. }
  596. }
  597. return needRestart, nil
  598. }
  599. func (s *InboundService) ResetAllTraffics() error {
  600. return submitTrafficWrite(func() error {
  601. return s.resetAllTrafficsLocked()
  602. })
  603. }
  604. func (s *InboundService) resetAllTrafficsLocked() error {
  605. db := database.GetDB()
  606. now := time.Now().UnixMilli()
  607. if err := db.Model(model.Inbound{}).
  608. Where("user_id > ?", 0).
  609. Updates(map[string]any{
  610. "up": 0,
  611. "down": 0,
  612. "last_traffic_reset_time": now,
  613. }).Error; err != nil {
  614. return err
  615. }
  616. nodes, err := (&NodeService{}).GetAll()
  617. if err == nil {
  618. for _, node := range nodes {
  619. if rt, err := runtime.GetManager().RuntimeFor(&node.Id); err == nil {
  620. if e := rt.ResetAllTraffics(context.Background()); e != nil {
  621. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  622. }
  623. }
  624. }
  625. }
  626. return nil
  627. }
  628. func (s *InboundService) ResetInboundTraffic(id int) error {
  629. return submitTrafficWrite(func() error {
  630. db := database.GetDB()
  631. if err := db.Model(model.Inbound{}).
  632. Where("id = ?", id).
  633. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  634. return err
  635. }
  636. inbound, err := s.GetInbound(id)
  637. if err == nil && inbound != nil && inbound.NodeID != nil {
  638. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  639. if e := rt.ResetInboundTraffic(context.Background(), inbound); e != nil {
  640. logger.Warning("ResetInboundTraffic: remote propagation to", rt.Name(), "failed:", e)
  641. }
  642. } else {
  643. logger.Warning("ResetInboundTraffic: runtime lookup failed:", rterr)
  644. }
  645. }
  646. return nil
  647. })
  648. }
  649. func (s *InboundService) DelDepletedClients(id int) (err error) {
  650. db := database.GetDB()
  651. tx := db.Begin()
  652. defer func() {
  653. if err == nil {
  654. tx.Commit()
  655. } else {
  656. tx.Rollback()
  657. }
  658. }()
  659. // Collect depleted emails globally — a shared-email row owned by one
  660. // inbound depletes every sibling that lists the email.
  661. now := time.Now().Unix() * 1000
  662. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  663. var depletedRows []xray.ClientTraffic
  664. err = db.Model(xray.ClientTraffic{}).
  665. Where(depletedClause, now).
  666. Find(&depletedRows).Error
  667. if err != nil {
  668. return err
  669. }
  670. if len(depletedRows) == 0 {
  671. return nil
  672. }
  673. depletedEmails := make(map[string]struct{}, len(depletedRows))
  674. for _, r := range depletedRows {
  675. if r.Email == "" {
  676. continue
  677. }
  678. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  679. }
  680. if len(depletedEmails) == 0 {
  681. return nil
  682. }
  683. var inbounds []*model.Inbound
  684. inboundQuery := db.Model(model.Inbound{})
  685. if id >= 0 {
  686. inboundQuery = inboundQuery.Where("id = ?", id)
  687. }
  688. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  689. return err
  690. }
  691. for _, inbound := range inbounds {
  692. var settings map[string]any
  693. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  694. return err
  695. }
  696. rawClients, ok := settings["clients"].([]any)
  697. if !ok {
  698. continue
  699. }
  700. newClients := make([]any, 0, len(rawClients))
  701. removed := 0
  702. for _, client := range rawClients {
  703. c, ok := client.(map[string]any)
  704. if !ok {
  705. newClients = append(newClients, client)
  706. continue
  707. }
  708. email, _ := c["email"].(string)
  709. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  710. removed++
  711. continue
  712. }
  713. newClients = append(newClients, client)
  714. }
  715. if removed == 0 {
  716. continue
  717. }
  718. if len(newClients) == 0 {
  719. _, _ = s.DelInbound(inbound.Id)
  720. continue
  721. }
  722. settings["clients"] = newClients
  723. ns, mErr := json.MarshalIndent(settings, "", " ")
  724. if mErr != nil {
  725. return mErr
  726. }
  727. inbound.Settings = string(ns)
  728. if err = tx.Save(inbound).Error; err != nil {
  729. return err
  730. }
  731. survivingClients, gcErr := s.GetClients(inbound)
  732. if gcErr != nil {
  733. err = gcErr
  734. return err
  735. }
  736. if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
  737. return err
  738. }
  739. }
  740. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  741. // no out-of-scope inbound still references the email.
  742. if id < 0 {
  743. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  744. return err
  745. }
  746. emails := make([]string, 0, len(depletedEmails))
  747. for e := range depletedEmails {
  748. emails = append(emails, e)
  749. }
  750. var stillReferenced []string
  751. emailExpr := database.JSONFieldText("client.value", "email")
  752. stillQuery := fmt.Sprintf(
  753. "SELECT DISTINCT LOWER(%s) %s WHERE LOWER(%s) IN ?",
  754. emailExpr,
  755. database.JSONClientsFromInbound(),
  756. emailExpr,
  757. )
  758. if err = tx.Raw(stillQuery, emails).Scan(&stillReferenced).Error; err != nil {
  759. return err
  760. }
  761. stillSet := make(map[string]struct{}, len(stillReferenced))
  762. for _, e := range stillReferenced {
  763. stillSet[e] = struct{}{}
  764. }
  765. toDelete := make([]string, 0, len(emails))
  766. for _, e := range emails {
  767. if _, kept := stillSet[e]; !kept {
  768. toDelete = append(toDelete, e)
  769. }
  770. }
  771. if len(toDelete) > 0 {
  772. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  773. return err
  774. }
  775. }
  776. return nil
  777. }
  778. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  779. db := database.GetDB()
  780. var inbounds []*model.Inbound
  781. // Retrieve inbounds where settings contain the given tgId
  782. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  783. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  784. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  785. return nil, err
  786. }
  787. var emails []string
  788. for _, inbound := range inbounds {
  789. clients, err := s.GetClients(inbound)
  790. if err != nil {
  791. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  792. continue
  793. }
  794. for _, client := range clients {
  795. if client.TgID == tgId {
  796. emails = append(emails, client.Email)
  797. }
  798. }
  799. }
  800. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  801. // account owns thousands of clients across inbounds.
  802. uniqEmails := uniqueNonEmptyStrings(emails)
  803. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  804. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  805. var page []*xray.ClientTraffic
  806. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  807. if errors.Is(err, gorm.ErrRecordNotFound) {
  808. continue
  809. }
  810. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  811. return nil, err
  812. }
  813. traffics = append(traffics, page...)
  814. }
  815. if len(traffics) == 0 {
  816. logger.Warning("No ClientTraffic records found for emails:", emails)
  817. return nil, nil
  818. }
  819. // Populate UUID and other client data for each traffic record
  820. for i := range traffics {
  821. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  822. traffics[i].Enable = client.Enable
  823. traffics[i].UUID = client.ID
  824. traffics[i].SubId = client.SubID
  825. }
  826. }
  827. return traffics, nil
  828. }
  829. // BumpClientsLastOnline sets client_traffics.last_online to now for the given
  830. // emails. Used in online-API mode for clients that hold a live connection but
  831. // moved no bytes this poll — the traffic path (addClientTraffic) only bumps
  832. // last_online on a non-zero delta, so idle-but-connected clients would
  833. // otherwise show a stale "last online" while being reported online.
  834. func (s *InboundService) BumpClientsLastOnline(emails []string) error {
  835. uniq := uniqueNonEmptyStrings(emails)
  836. if len(uniq) == 0 {
  837. return nil
  838. }
  839. now := time.Now().UnixMilli()
  840. return submitTrafficWrite(func() error {
  841. db := database.GetDB()
  842. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  843. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("last_online", now).Error; err != nil {
  844. return err
  845. }
  846. }
  847. return nil
  848. })
  849. }
  850. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  851. uniq := uniqueNonEmptyStrings(emails)
  852. if len(uniq) == 0 {
  853. return nil, nil
  854. }
  855. db := database.GetDB()
  856. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  857. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  858. var page []*xray.ClientTraffic
  859. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  860. return nil, err
  861. }
  862. traffics = append(traffics, page...)
  863. }
  864. overlayGlobalTraffic(db, traffics)
  865. return traffics, nil
  866. }
  867. // GetAllClientTraffics returns the full set of client_traffics rows so the
  868. // websocket broadcasters can ship a complete snapshot every cycle. A pure
  869. // delta path silently dropped the per-client section whenever no client moved
  870. // bytes in the cycle or a node sync failed, leaving client rows in the UI
  871. // stuck at stale numbers — so small installs broadcast this snapshot, and only
  872. // above the traffic job's snapshot threshold (where the marshaled snapshot
  873. // would exceed the hub's payload cap and be dropped wholesale) does the job
  874. // fall back to active-row deltas.
  875. func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
  876. db := database.GetDB()
  877. var traffics []*xray.ClientTraffic
  878. if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
  879. return nil, err
  880. }
  881. overlayGlobalTraffic(db, traffics)
  882. return traffics, nil
  883. }
  884. func (s *InboundService) CountClientTraffics() (int64, error) {
  885. db := database.GetDB()
  886. var count int64
  887. err := db.Model(xray.ClientTraffic{}).Count(&count).Error
  888. return count, err
  889. }
  // InboundTrafficSummary is a slim projection of an inbound holding only its
  // id, traffic counters and enabled flag. GetInboundsTrafficSummary fills it
  // from the id, up, down, total and enable columns.
  type InboundTrafficSummary struct {
  	// Id is the inbound's identifier.
  	Id int `json:"id"`
  	// Up is the value of the inbound's "up" column.
  	Up int64 `json:"up"`
  	// Down is the value of the inbound's "down" column.
  	Down int64 `json:"down"`
  	// Total is the value of the inbound's "total" column.
  	// NOTE(review): presumably the traffic quota rather than up+down — confirm.
  	Total int64 `json:"total"`
  	// Enable is the value of the inbound's "enable" column.
  	Enable bool `json:"enable"`
  }
  897. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  898. db := database.GetDB()
  899. var summaries []InboundTrafficSummary
  900. if err := db.Model(&model.Inbound{}).
  901. Select("id, up, down, total, enable").
  902. Find(&summaries).Error; err != nil {
  903. return nil, err
  904. }
  905. return summaries, nil
  906. }
  907. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  908. db := database.GetDB()
  909. var traffics []*xray.ClientTraffic
  910. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error; err != nil {
  911. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  912. return nil, err
  913. }
  914. if len(traffics) == 0 {
  915. return nil, nil
  916. }
  917. overlayGlobalTraffic(db, traffics)
  918. t := traffics[0]
  919. if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil {
  920. c := rec.ToClient()
  921. t.UUID = c.ID
  922. t.SubId = c.SubID
  923. return t, nil
  924. }
  925. t2, client, err := s.GetClientByEmail(email)
  926. if err != nil {
  927. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  928. return nil, err
  929. }
  930. if t2 != nil && client != nil {
  931. t2.UUID = client.ID
  932. t2.SubId = client.SubID
  933. return t2, nil
  934. }
  935. return nil, nil
  936. }
  937. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  938. return submitTrafficWrite(func() error {
  939. db := database.GetDB()
  940. err := db.Model(xray.ClientTraffic{}).
  941. Where("email = ?", email).
  942. Updates(map[string]any{
  943. "up": upload,
  944. "down": download,
  945. }).Error
  946. if err != nil {
  947. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  948. }
  949. return err
  950. })
  951. }
  952. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  953. db := database.GetDB()
  954. inbound := &model.Inbound{}
  955. traffic = &xray.ClientTraffic{}
  956. // Search for inbound settings that contain the query
  957. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  958. if err != nil {
  959. if errors.Is(err, gorm.ErrRecordNotFound) {
  960. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  961. return nil, err
  962. }
  963. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  964. return nil, err
  965. }
  966. traffic.InboundId = inbound.Id
  967. // Unmarshal settings to get clients
  968. settings := map[string][]model.Client{}
  969. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  970. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  971. return nil, err
  972. }
  973. clients := settings["clients"]
  974. for _, client := range clients {
  975. if (client.ID == query || client.Password == query) && client.Email != "" {
  976. traffic.Email = client.Email
  977. break
  978. }
  979. }
  980. if traffic.Email == "" {
  981. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  982. return nil, gorm.ErrRecordNotFound
  983. }
  984. // Retrieve ClientTraffic based on the found email
  985. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  986. if err != nil {
  987. if errors.Is(err, gorm.ErrRecordNotFound) {
  988. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  989. return nil, err
  990. }
  991. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  992. return nil, err
  993. }
  994. return traffic, nil
  995. }