inbound_traffic.go 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. "gorm.io/gorm/clause"
  16. )
  17. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  18. var disabledNodeIDs []int
  19. err = submitTrafficWrite(func() error {
  20. var inner error
  21. needRestart, clientsDisabled, disabledNodeIDs, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  22. return inner
  23. })
  24. if err == nil && len(disabledNodeIDs) > 0 {
  25. s.restartRemoteNodesOnDisable(disabledNodeIDs)
  26. }
  27. return
  28. }
  29. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, []int, error) {
  30. var err error
  31. db := database.GetDB()
  32. tx := db.Begin()
  33. defer func() {
  34. if err != nil {
  35. tx.Rollback()
  36. } else {
  37. tx.Commit()
  38. }
  39. }()
  40. err = s.addInboundTraffic(tx, inboundTraffics)
  41. if err != nil {
  42. return false, false, nil, err
  43. }
  44. err = s.addClientTraffic(tx, clientTraffics)
  45. if err != nil {
  46. return false, false, nil, err
  47. }
  48. needRestart0, count, err := s.autoRenewClients(tx)
  49. if err != nil {
  50. logger.Warning("Error in renew clients:", err)
  51. } else if count > 0 {
  52. logger.Debugf("%v clients renewed", count)
  53. }
  54. disabledClientsCount := int64(0)
  55. needRestart1, count, disabledNodeIDs, err := s.disableInvalidClients(tx)
  56. if err != nil {
  57. logger.Warning("Error in disabling invalid clients:", err)
  58. } else if count > 0 {
  59. logger.Debugf("%v clients disabled", count)
  60. disabledClientsCount = count
  61. }
  62. needRestart2, count, err := s.disableInvalidInbounds(tx)
  63. if err != nil {
  64. logger.Warning("Error in disabling invalid inbounds:", err)
  65. } else if count > 0 {
  66. logger.Debugf("%v inbounds disabled", count)
  67. }
  68. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, disabledNodeIDs, nil
  69. }
  70. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  71. if len(traffics) == 0 {
  72. return nil
  73. }
  74. var err error
  75. for _, traffic := range traffics {
  76. if traffic.IsInbound {
  77. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  78. Updates(map[string]any{
  79. "up": gorm.Expr("up + ?", traffic.Up),
  80. "down": gorm.Expr("down + ?", traffic.Down),
  81. }).Error
  82. if err != nil {
  83. return err
  84. }
  85. }
  86. }
  87. return nil
  88. }
  89. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  90. if len(traffics) == 0 {
  91. return nil
  92. }
  93. emails := make([]string, 0, len(traffics))
  94. for _, traffic := range traffics {
  95. emails = append(emails, traffic.Email)
  96. }
  97. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  98. // Match purely by email. client_traffics is email-keyed (one shared row per
  99. // email regardless of how many inbounds the client is attached to), and these
  100. // emails come from the local xray's report, so they always belong to a client
  101. // attached to a local inbound. The old `inbound_id NOT IN (node inbounds)`
  102. // filter dropped the local traffic of a client attached to both a node and the
  103. // mother inbound whenever the node inbound happened to be attached first — its
  104. // shared row then carried the node inbound's id (AddClientStat uses OnConflict
  105. // DoNothing and never refreshes it), so the local poll skipped it entirely.
  106. err = tx.Model(xray.ClientTraffic{}).
  107. Where("email IN (?)", emails).
  108. Find(&dbClientTraffics).Error
  109. if err != nil {
  110. return err
  111. }
  112. // Avoid empty slice error
  113. if len(dbClientTraffics) == 0 {
  114. return nil
  115. }
  116. dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
  117. if err != nil {
  118. return err
  119. }
  120. // Index by email for O(N) merge.
  121. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  122. for i := range traffics {
  123. if traffics[i] != nil {
  124. trafficByEmail[traffics[i].Email] = traffics[i]
  125. }
  126. }
  127. now := time.Now().UnixMilli()
  128. // Use atomic per-row UPDATE instead of read-modify-write Save. tx.Save
  129. // issues UPDATEs in slice order, which varies between concurrent callers;
  130. // on PostgreSQL two transactions locking the same rows in opposite order
  131. // deadlock. An atomic "SET up = up + ?" never holds a row lock across a
  132. // subsequent lock acquisition, so concurrent writers cannot deadlock.
  133. for _, ct := range dbClientTraffics {
  134. t, ok := trafficByEmail[ct.Email]
  135. if !ok || (t.Up == 0 && t.Down == 0) {
  136. continue
  137. }
  138. if err = tx.Exec(
  139. fmt.Sprintf(
  140. `UPDATE client_traffics SET up = up + ?, down = down + ?, last_online = %s WHERE email = ?`,
  141. database.GreatestExpr("last_online", "?"),
  142. ),
  143. t.Up, t.Down, now, ct.Email,
  144. ).Error; err != nil {
  145. logger.Warning("AddClientTraffic update data ", err)
  146. }
  147. }
  148. // adjustTraffics converts delayed-start rows (negative ExpiryTime → absolute
  149. // deadline) in-memory. Persist that conversion now since the traffic UPDATE
  150. // above only touches up/down/last_online.
  151. for _, ct := range dbClientTraffics {
  152. if ct.ExpiryTime > 0 {
  153. if err = tx.Exec(
  154. `UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
  155. ct.ExpiryTime, ct.Email,
  156. ).Error; err != nil {
  157. logger.Warning("AddClientTraffic update expiry_time ", err)
  158. }
  159. }
  160. }
  161. return nil
  162. }
  163. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
  164. now := time.Now().UnixMilli()
  165. // "Start After First Use" stores a negative expiry (the duration). On the
  166. // first traffic tick it becomes an absolute deadline of now+duration. Compute
  167. // it once per email so every inbound the client is attached to lands on the
  168. // same value (recomputing per inbound would skip all but the first one).
  169. newExpiryByEmail := make(map[string]int64, len(dbClientTraffics))
  170. for traffic_index := range dbClientTraffics {
  171. if dbClientTraffics[traffic_index].ExpiryTime < 0 {
  172. newExpiryByEmail[dbClientTraffics[traffic_index].Email] = now - dbClientTraffics[traffic_index].ExpiryTime
  173. }
  174. }
  175. if len(newExpiryByEmail) == 0 {
  176. return dbClientTraffics, nil
  177. }
  178. delayedEmails := make([]string, 0, len(newExpiryByEmail))
  179. for email := range newExpiryByEmail {
  180. delayedEmails = append(delayedEmails, email)
  181. }
  182. // Resolve the owning inbounds through the client_inbounds link, which is
  183. // authoritative. client_traffics.inbound_id goes stale when an inbound is
  184. // deleted and recreated, which would leave the negative expiry unconverted.
  185. var inboundIds []int
  186. err := tx.Table("client_inbounds").
  187. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  188. Where("clients.email IN (?)", delayedEmails).
  189. Distinct().
  190. Pluck("client_inbounds.inbound_id", &inboundIds).Error
  191. if err != nil {
  192. return nil, err
  193. }
  194. if len(inboundIds) == 0 {
  195. return dbClientTraffics, nil
  196. }
  197. var inbounds []*model.Inbound
  198. err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  199. if err != nil {
  200. return nil, err
  201. }
  202. for inbound_index := range inbounds {
  203. settings := map[string]any{}
  204. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  205. clients, ok := settings["clients"].([]any)
  206. if ok {
  207. var newClients []any
  208. for client_index := range clients {
  209. c := clients[client_index].(map[string]any)
  210. email, _ := c["email"].(string)
  211. if newExpiry, ok := newExpiryByEmail[email]; ok {
  212. c["expiryTime"] = newExpiry
  213. c["updated_at"] = now
  214. }
  215. if _, ok := c["created_at"]; !ok {
  216. c["created_at"] = now
  217. }
  218. if _, ok := c["updated_at"]; !ok {
  219. c["updated_at"] = now
  220. }
  221. newClients = append(newClients, any(c))
  222. }
  223. settings["clients"] = newClients
  224. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  225. if err != nil {
  226. return nil, err
  227. }
  228. inbounds[inbound_index].Settings = string(modifiedSettings)
  229. }
  230. }
  231. for traffic_index := range dbClientTraffics {
  232. if newExpiry, ok := newExpiryByEmail[dbClientTraffics[traffic_index].Email]; ok {
  233. dbClientTraffics[traffic_index].ExpiryTime = newExpiry
  234. }
  235. }
  236. err = tx.Save(inbounds).Error
  237. if err != nil {
  238. logger.Warning("AddClientTraffic update inbounds ", err)
  239. logger.Error(inbounds)
  240. } else {
  241. for _, ib := range inbounds {
  242. if ib == nil {
  243. continue
  244. }
  245. cs, gcErr := s.GetClients(ib)
  246. if gcErr != nil {
  247. logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
  248. continue
  249. }
  250. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  251. logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
  252. }
  253. }
  254. }
  255. return dbClientTraffics, nil
  256. }
  257. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  258. // check for time expired
  259. var traffics []*xray.ClientTraffic
  260. now := time.Now().Unix() * 1000
  261. var err, err1 error
  262. // Filter to clients that have at least one local inbound. Using
  263. // client_traffics.inbound_id is wrong: it goes stale after an inbound is
  264. // deleted/recreated and always points to the first inbound the client was
  265. // attached to, so it could be a node inbound even when the client also has
  266. // local inbounds. The email-based join through client_inbounds is authoritative.
  267. err = tx.Model(xray.ClientTraffic{}).
  268. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  269. Where("email IN (?)", tx.Table("client_inbounds ci").
  270. Select("c.email").
  271. Joins("JOIN clients c ON c.id = ci.client_id").
  272. Joins("JOIN inbounds i ON i.id = ci.inbound_id").
  273. Where("i.node_id IS NULL")).
  274. Find(&traffics).Error
  275. if err != nil {
  276. return false, 0, err
  277. }
  278. // return if there is no client to renew
  279. if len(traffics) == 0 {
  280. return false, 0, nil
  281. }
  282. var inbound_ids []int
  283. var inbounds []*model.Inbound
  284. needRestart := false
  285. var clientsToAdd []struct {
  286. protocol string
  287. tag string
  288. client map[string]any
  289. }
  290. // Resolve the inbounds to renew through the client_inbounds link rather than
  291. // client_traffics.inbound_id, which goes stale after an inbound is deleted and
  292. // recreated and would otherwise skip the renew entirely.
  293. renewEmails := make([]string, 0, len(traffics))
  294. for _, traffic := range traffics {
  295. renewEmails = append(renewEmails, traffic.Email)
  296. }
  297. for _, batch := range chunkStrings(renewEmails, sqliteMaxVars) {
  298. var ids []int
  299. if err = tx.Table("client_inbounds").
  300. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  301. Where("clients.email IN ?", batch).
  302. Distinct().
  303. Pluck("client_inbounds.inbound_id", &ids).Error; err != nil {
  304. return false, 0, err
  305. }
  306. inbound_ids = append(inbound_ids, ids...)
  307. }
  308. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  309. // per tick instead of N times across chunk boundaries.
  310. inbound_ids = uniqueInts(inbound_ids)
  311. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  312. // are touched in a single tick.
  313. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  314. var page []*model.Inbound
  315. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  316. return false, 0, err
  317. }
  318. inbounds = append(inbounds, page...)
  319. }
  320. // Index the expired traffics by email so each client is an O(1) lookup
  321. // instead of a linear scan of every expired row (O(clients × expired) per
  322. // inbound, quadratic at scale). Pointers keep the in-place mutation below.
  323. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  324. for i := range traffics {
  325. trafficByEmail[traffics[i].Email] = traffics[i]
  326. }
  327. for inbound_index := range inbounds {
  328. settings := map[string]any{}
  329. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  330. clients, _ := settings["clients"].([]any)
  331. if len(clients) == 0 {
  332. continue
  333. }
  334. for client_index := range clients {
  335. c := clients[client_index].(map[string]any)
  336. email, _ := c["email"].(string)
  337. traffic, ok := trafficByEmail[email]
  338. if !ok {
  339. continue
  340. }
  341. newExpiryTime := traffic.ExpiryTime
  342. for newExpiryTime < now {
  343. newExpiryTime += (int64(traffic.Reset) * 86400000)
  344. }
  345. c["expiryTime"] = newExpiryTime
  346. traffic.ExpiryTime = newExpiryTime
  347. traffic.Down = 0
  348. traffic.Up = 0
  349. if !traffic.Enable {
  350. traffic.Enable = true
  351. c["enable"] = true
  352. clientsToAdd = append(clientsToAdd,
  353. struct {
  354. protocol string
  355. tag string
  356. client map[string]any
  357. }{
  358. protocol: string(inbounds[inbound_index].Protocol),
  359. tag: inbounds[inbound_index].Tag,
  360. client: c,
  361. })
  362. }
  363. clients[client_index] = any(c)
  364. }
  365. settings["clients"] = clients
  366. newSettings, err := json.MarshalIndent(settings, "", " ")
  367. if err != nil {
  368. return false, 0, err
  369. }
  370. inbounds[inbound_index].Settings = string(newSettings)
  371. }
  372. err = tx.Save(inbounds).Error
  373. if err != nil {
  374. return false, 0, err
  375. }
  376. for _, ib := range inbounds {
  377. if ib == nil {
  378. continue
  379. }
  380. cs, gcErr := s.GetClients(ib)
  381. if gcErr != nil {
  382. logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
  383. continue
  384. }
  385. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  386. logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
  387. }
  388. }
  389. err = tx.Save(traffics).Error
  390. if err != nil {
  391. return false, 0, err
  392. }
  393. // A renewed client starts a fresh quota window: drop the cross-panel rows
  394. // too, or the stale pushed totals would re-deplete it immediately.
  395. if err = clearGlobalTraffic(tx, renewEmails...); err != nil {
  396. return false, 0, err
  397. }
  398. if p != nil {
  399. err1 = s.xrayApi.Init(p.GetAPIPort())
  400. if err1 != nil {
  401. return true, int64(len(traffics)), nil
  402. }
  403. for _, clientToAdd := range clientsToAdd {
  404. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  405. if err1 != nil {
  406. needRestart = true
  407. }
  408. }
  409. s.xrayApi.Close()
  410. }
  411. return needRestart, int64(len(traffics)), nil
  412. }
  413. // AddClientStat inserts a per-client accounting row, no-op on email
  414. // conflict. Xray reports traffic per email, so the surviving row acts as
  415. // the shared accumulator for inbounds that re-use the same identity.
  416. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  417. clientTraffic := xray.ClientTraffic{
  418. InboundId: inboundId,
  419. Email: client.Email,
  420. Total: client.TotalGB,
  421. ExpiryTime: client.ExpiryTime,
  422. Enable: client.Enable,
  423. Reset: client.Reset,
  424. }
  425. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  426. Create(&clientTraffic).Error
  427. }
  428. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  429. result := tx.Model(xray.ClientTraffic{}).
  430. Where("email = ?", email).
  431. Updates(map[string]any{
  432. "enable": client.Enable,
  433. "email": client.Email,
  434. "total": client.TotalGB,
  435. "expiry_time": client.ExpiryTime,
  436. "reset": client.Reset,
  437. })
  438. err := result.Error
  439. return err
  440. }
  441. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  442. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{email}); err != nil {
  443. return err
  444. }
  445. if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
  446. return err
  447. }
  448. if err := clearGlobalTraffic(tx, email); err != nil {
  449. return err
  450. }
  451. return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
  452. }
  453. func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error {
  454. if err := adjustGroupBaselinesForRemovedTraffic(tx, emails); err != nil {
  455. return err
  456. }
  457. const chunk = 400
  458. for start := 0; start < len(emails); start += chunk {
  459. end := min(start+chunk, len(emails))
  460. batch := emails[start:end]
  461. if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
  462. return err
  463. }
  464. if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
  465. return err
  466. }
  467. if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  468. return err
  469. }
  470. }
  471. return nil
  472. }
  473. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  474. return submitTrafficWrite(func() error {
  475. return database.GetDB().Transaction(func(tx *gorm.DB) error {
  476. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{clientEmail}); err != nil {
  477. return err
  478. }
  479. if err := clearGlobalTraffic(tx, clientEmail); err != nil {
  480. return err
  481. }
  482. if err := tx.Model(xray.ClientTraffic{}).
  483. Where("email = ?", clientEmail).
  484. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error; err != nil {
  485. return err
  486. }
  487. return tx.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error
  488. })
  489. })
  490. }
  491. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  492. err = submitTrafficWrite(func() error {
  493. var inner error
  494. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  495. return inner
  496. })
  497. return
  498. }
  499. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  500. needRestart := false
  501. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  502. if err != nil {
  503. return false, err
  504. }
  505. if !traffic.Enable {
  506. inbound, err := s.GetInbound(id)
  507. if err != nil {
  508. return false, err
  509. }
  510. clients, err := s.GetClients(inbound)
  511. if err != nil {
  512. return false, err
  513. }
  514. for _, client := range clients {
  515. if client.Email == clientEmail && client.Enable {
  516. rt, push, _, perr := s.nodePushPlan(inbound)
  517. if perr != nil {
  518. return false, perr
  519. }
  520. if !push {
  521. if inbound.NodeID == nil {
  522. needRestart = true
  523. }
  524. break
  525. }
  526. cipher := ""
  527. if string(inbound.Protocol) == "shadowsocks" {
  528. var oldSettings map[string]any
  529. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  530. if err != nil {
  531. return false, err
  532. }
  533. cipher = oldSettings["method"].(string)
  534. }
  535. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  536. "email": client.Email,
  537. "id": client.ID,
  538. "auth": client.Auth,
  539. "security": client.Security,
  540. "flow": client.Flow,
  541. "password": client.Password,
  542. "cipher": cipher,
  543. })
  544. if err1 == nil {
  545. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  546. } else if inbound.NodeID != nil {
  547. logger.Warning("Error in enabling client on", rt.Name(), ":", err1)
  548. } else {
  549. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  550. needRestart = true
  551. }
  552. break
  553. }
  554. }
  555. }
  556. traffic.Up = 0
  557. traffic.Down = 0
  558. traffic.Enable = true
  559. db := database.GetDB()
  560. now := time.Now().UnixMilli()
  561. inbound, err := s.GetInbound(id)
  562. if err != nil {
  563. return false, err
  564. }
  565. if err := db.Transaction(func(tx *gorm.DB) error {
  566. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{clientEmail}); err != nil {
  567. return err
  568. }
  569. if err := tx.Save(traffic).Error; err != nil {
  570. return err
  571. }
  572. if err := clearGlobalTraffic(tx, clientEmail); err != nil {
  573. return err
  574. }
  575. if err := tx.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  576. return err
  577. }
  578. if err := tx.Model(model.Inbound{}).
  579. Where("id = ?", id).
  580. Update("last_traffic_reset_time", now).Error; err != nil {
  581. return err
  582. }
  583. if inbound != nil && inbound.NodeID != nil {
  584. return (&NodeService{}).MarkNodeDirtyTx(tx, *inbound.NodeID)
  585. }
  586. return nil
  587. }); err != nil {
  588. return false, err
  589. }
  590. if inbound != nil && inbound.NodeID != nil {
  591. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  592. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  593. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  594. }
  595. } else {
  596. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  597. }
  598. }
  599. return needRestart, nil
  600. }
  601. func (s *InboundService) ResetAllTraffics() error {
  602. return submitTrafficWrite(func() error {
  603. return s.resetAllTrafficsLocked()
  604. })
  605. }
  606. func (s *InboundService) resetAllTrafficsLocked() error {
  607. db := database.GetDB()
  608. now := time.Now().UnixMilli()
  609. if err := db.Model(model.Inbound{}).
  610. Where("user_id > ?", 0).
  611. Updates(map[string]any{
  612. "up": 0,
  613. "down": 0,
  614. "last_traffic_reset_time": now,
  615. }).Error; err != nil {
  616. return err
  617. }
  618. nodes, err := (&NodeService{}).GetAll()
  619. if err == nil {
  620. for _, node := range nodes {
  621. if rt, err := runtime.GetManager().RuntimeFor(&node.Id); err == nil {
  622. if e := rt.ResetAllTraffics(context.Background()); e != nil {
  623. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  624. }
  625. }
  626. }
  627. }
  628. return nil
  629. }
  630. func (s *InboundService) ResetInboundTraffic(id int) error {
  631. return submitTrafficWrite(func() error {
  632. db := database.GetDB()
  633. if err := db.Model(model.Inbound{}).
  634. Where("id = ?", id).
  635. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  636. return err
  637. }
  638. inbound, err := s.GetInbound(id)
  639. if err == nil && inbound != nil && inbound.NodeID != nil {
  640. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  641. if e := rt.ResetInboundTraffic(context.Background(), inbound); e != nil {
  642. logger.Warning("ResetInboundTraffic: remote propagation to", rt.Name(), "failed:", e)
  643. }
  644. } else {
  645. logger.Warning("ResetInboundTraffic: runtime lookup failed:", rterr)
  646. }
  647. }
  648. return nil
  649. })
  650. }
  651. func (s *InboundService) DelDepletedClients(id int) (err error) {
  652. db := database.GetDB()
  653. tx := db.Begin()
  654. defer func() {
  655. if err == nil {
  656. tx.Commit()
  657. } else {
  658. tx.Rollback()
  659. }
  660. }()
  661. // Collect depleted emails globally — a shared-email row owned by one
  662. // inbound depletes every sibling that lists the email.
  663. now := time.Now().Unix() * 1000
  664. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  665. var depletedRows []xray.ClientTraffic
  666. err = db.Model(xray.ClientTraffic{}).
  667. Where(depletedClause, now).
  668. Find(&depletedRows).Error
  669. if err != nil {
  670. return err
  671. }
  672. if len(depletedRows) == 0 {
  673. return nil
  674. }
  675. depletedEmails := make(map[string]struct{}, len(depletedRows))
  676. for _, r := range depletedRows {
  677. if r.Email == "" {
  678. continue
  679. }
  680. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  681. }
  682. if len(depletedEmails) == 0 {
  683. return nil
  684. }
  685. var inbounds []*model.Inbound
  686. inboundQuery := db.Model(model.Inbound{})
  687. if id >= 0 {
  688. inboundQuery = inboundQuery.Where("id = ?", id)
  689. }
  690. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  691. return err
  692. }
  693. for _, inbound := range inbounds {
  694. var settings map[string]any
  695. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  696. return err
  697. }
  698. rawClients, ok := settings["clients"].([]any)
  699. if !ok {
  700. continue
  701. }
  702. newClients := make([]any, 0, len(rawClients))
  703. removed := 0
  704. for _, client := range rawClients {
  705. c, ok := client.(map[string]any)
  706. if !ok {
  707. newClients = append(newClients, client)
  708. continue
  709. }
  710. email, _ := c["email"].(string)
  711. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  712. removed++
  713. continue
  714. }
  715. newClients = append(newClients, client)
  716. }
  717. if removed == 0 {
  718. continue
  719. }
  720. if len(newClients) == 0 {
  721. _, _ = s.DelInbound(inbound.Id)
  722. continue
  723. }
  724. settings["clients"] = newClients
  725. ns, mErr := json.MarshalIndent(settings, "", " ")
  726. if mErr != nil {
  727. return mErr
  728. }
  729. inbound.Settings = string(ns)
  730. if err = tx.Save(inbound).Error; err != nil {
  731. return err
  732. }
  733. survivingClients, gcErr := s.GetClients(inbound)
  734. if gcErr != nil {
  735. err = gcErr
  736. return err
  737. }
  738. if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
  739. return err
  740. }
  741. }
  742. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  743. // no out-of-scope inbound still references the email.
  744. if id < 0 {
  745. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  746. return err
  747. }
  748. emails := make([]string, 0, len(depletedEmails))
  749. for e := range depletedEmails {
  750. emails = append(emails, e)
  751. }
  752. var stillReferenced []string
  753. emailExpr := database.JSONFieldText("client.value", "email")
  754. stillQuery := fmt.Sprintf(
  755. "SELECT DISTINCT LOWER(%s) %s WHERE LOWER(%s) IN ?",
  756. emailExpr,
  757. database.JSONClientsFromInbound(),
  758. emailExpr,
  759. )
  760. if err = tx.Raw(stillQuery, emails).Scan(&stillReferenced).Error; err != nil {
  761. return err
  762. }
  763. stillSet := make(map[string]struct{}, len(stillReferenced))
  764. for _, e := range stillReferenced {
  765. stillSet[e] = struct{}{}
  766. }
  767. toDelete := make([]string, 0, len(emails))
  768. for _, e := range emails {
  769. if _, kept := stillSet[e]; !kept {
  770. toDelete = append(toDelete, e)
  771. }
  772. }
  773. if len(toDelete) > 0 {
  774. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  775. return err
  776. }
  777. }
  778. return nil
  779. }
  780. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  781. db := database.GetDB()
  782. var inbounds []*model.Inbound
  783. // Retrieve inbounds where settings contain the given tgId
  784. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  785. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  786. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  787. return nil, err
  788. }
  789. var emails []string
  790. for _, inbound := range inbounds {
  791. clients, err := s.GetClients(inbound)
  792. if err != nil {
  793. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  794. continue
  795. }
  796. for _, client := range clients {
  797. if client.TgID == tgId {
  798. emails = append(emails, client.Email)
  799. }
  800. }
  801. }
  802. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  803. // account owns thousands of clients across inbounds.
  804. uniqEmails := uniqueNonEmptyStrings(emails)
  805. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  806. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  807. var page []*xray.ClientTraffic
  808. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  809. if errors.Is(err, gorm.ErrRecordNotFound) {
  810. continue
  811. }
  812. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  813. return nil, err
  814. }
  815. traffics = append(traffics, page...)
  816. }
  817. if len(traffics) == 0 {
  818. logger.Warning("No ClientTraffic records found for emails:", emails)
  819. return nil, nil
  820. }
  821. // Populate UUID and other client data for each traffic record
  822. for i := range traffics {
  823. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  824. traffics[i].Enable = client.Enable
  825. traffics[i].UUID = client.ID
  826. traffics[i].SubId = client.SubID
  827. }
  828. }
  829. return traffics, nil
  830. }
  831. // BumpClientsLastOnline sets client_traffics.last_online to now for the given
  832. // emails. Used in online-API mode for clients that hold a live connection but
  833. // moved no bytes this poll — the traffic path (addClientTraffic) only bumps
  834. // last_online on a non-zero delta, so idle-but-connected clients would
  835. // otherwise show a stale "last online" while being reported online.
  836. func (s *InboundService) BumpClientsLastOnline(emails []string) error {
  837. uniq := uniqueNonEmptyStrings(emails)
  838. if len(uniq) == 0 {
  839. return nil
  840. }
  841. now := time.Now().UnixMilli()
  842. return submitTrafficWrite(func() error {
  843. db := database.GetDB()
  844. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  845. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("last_online", now).Error; err != nil {
  846. return err
  847. }
  848. }
  849. return nil
  850. })
  851. }
  852. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  853. uniq := uniqueNonEmptyStrings(emails)
  854. if len(uniq) == 0 {
  855. return nil, nil
  856. }
  857. db := database.GetDB()
  858. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  859. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  860. var page []*xray.ClientTraffic
  861. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  862. return nil, err
  863. }
  864. traffics = append(traffics, page...)
  865. }
  866. return traffics, nil
  867. }
  868. // GetAllClientTraffics returns the full set of client_traffics rows so the
  869. // websocket broadcasters can ship a complete snapshot every cycle. The old
  870. // delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
  871. // the per-client section whenever no client moved bytes in the cycle or a
  872. // node sync failed, leaving client rows in the UI stuck at stale numbers.
  873. func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
  874. db := database.GetDB()
  875. var traffics []*xray.ClientTraffic
  876. if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
  877. return nil, err
  878. }
  879. overlayGlobalTraffic(db, traffics)
  880. return traffics, nil
  881. }
  882. type InboundTrafficSummary struct {
  883. Id int `json:"id"`
  884. Up int64 `json:"up"`
  885. Down int64 `json:"down"`
  886. Total int64 `json:"total"`
  887. Enable bool `json:"enable"`
  888. }
  889. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  890. db := database.GetDB()
  891. var summaries []InboundTrafficSummary
  892. if err := db.Model(&model.Inbound{}).
  893. Select("id, up, down, total, enable").
  894. Find(&summaries).Error; err != nil {
  895. return nil, err
  896. }
  897. return summaries, nil
  898. }
  899. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  900. db := database.GetDB()
  901. var traffics []*xray.ClientTraffic
  902. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error; err != nil {
  903. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  904. return nil, err
  905. }
  906. if len(traffics) == 0 {
  907. return nil, nil
  908. }
  909. overlayGlobalTraffic(db, traffics)
  910. t := traffics[0]
  911. if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil {
  912. c := rec.ToClient()
  913. t.UUID = c.ID
  914. t.SubId = c.SubID
  915. return t, nil
  916. }
  917. t2, client, err := s.GetClientByEmail(email)
  918. if err != nil {
  919. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  920. return nil, err
  921. }
  922. if t2 != nil && client != nil {
  923. t2.UUID = client.ID
  924. t2.SubId = client.SubID
  925. return t2, nil
  926. }
  927. return nil, nil
  928. }
  929. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  930. return submitTrafficWrite(func() error {
  931. db := database.GetDB()
  932. err := db.Model(xray.ClientTraffic{}).
  933. Where("email = ?", email).
  934. Updates(map[string]any{
  935. "up": upload,
  936. "down": download,
  937. }).Error
  938. if err != nil {
  939. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  940. }
  941. return err
  942. })
  943. }
  944. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  945. db := database.GetDB()
  946. inbound := &model.Inbound{}
  947. traffic = &xray.ClientTraffic{}
  948. // Search for inbound settings that contain the query
  949. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  950. if err != nil {
  951. if errors.Is(err, gorm.ErrRecordNotFound) {
  952. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  953. return nil, err
  954. }
  955. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  956. return nil, err
  957. }
  958. traffic.InboundId = inbound.Id
  959. // Unmarshal settings to get clients
  960. settings := map[string][]model.Client{}
  961. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  962. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  963. return nil, err
  964. }
  965. clients := settings["clients"]
  966. for _, client := range clients {
  967. if (client.ID == query || client.Password == query) && client.Email != "" {
  968. traffic.Email = client.Email
  969. break
  970. }
  971. }
  972. if traffic.Email == "" {
  973. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  974. return nil, gorm.ErrRecordNotFound
  975. }
  976. // Retrieve ClientTraffic based on the found email
  977. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  978. if err != nil {
  979. if errors.Is(err, gorm.ErrRecordNotFound) {
  980. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  981. return nil, err
  982. }
  983. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  984. return nil, err
  985. }
  986. return traffic, nil
  987. }