inbound_traffic.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. "gorm.io/gorm/clause"
  16. )
  17. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  18. err = submitTrafficWrite(func() error {
  19. var inner error
  20. needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  21. return inner
  22. })
  23. return
  24. }
  25. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
  26. var err error
  27. db := database.GetDB()
  28. tx := db.Begin()
  29. defer func() {
  30. if err != nil {
  31. tx.Rollback()
  32. } else {
  33. tx.Commit()
  34. }
  35. }()
  36. err = s.addInboundTraffic(tx, inboundTraffics)
  37. if err != nil {
  38. return false, false, err
  39. }
  40. err = s.addClientTraffic(tx, clientTraffics)
  41. if err != nil {
  42. return false, false, err
  43. }
  44. needRestart0, count, err := s.autoRenewClients(tx)
  45. if err != nil {
  46. logger.Warning("Error in renew clients:", err)
  47. } else if count > 0 {
  48. logger.Debugf("%v clients renewed", count)
  49. }
  50. disabledClientsCount := int64(0)
  51. needRestart1, count, err := s.disableInvalidClients(tx)
  52. if err != nil {
  53. logger.Warning("Error in disabling invalid clients:", err)
  54. } else if count > 0 {
  55. logger.Debugf("%v clients disabled", count)
  56. disabledClientsCount = count
  57. }
  58. needRestart2, count, err := s.disableInvalidInbounds(tx)
  59. if err != nil {
  60. logger.Warning("Error in disabling invalid inbounds:", err)
  61. } else if count > 0 {
  62. logger.Debugf("%v inbounds disabled", count)
  63. }
  64. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
  65. }
  66. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  67. if len(traffics) == 0 {
  68. return nil
  69. }
  70. var err error
  71. for _, traffic := range traffics {
  72. if traffic.IsInbound {
  73. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  74. Updates(map[string]any{
  75. "up": gorm.Expr("up + ?", traffic.Up),
  76. "down": gorm.Expr("down + ?", traffic.Down),
  77. }).Error
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. }
  83. return nil
  84. }
  85. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  86. if len(traffics) == 0 {
  87. return nil
  88. }
  89. emails := make([]string, 0, len(traffics))
  90. for _, traffic := range traffics {
  91. emails = append(emails, traffic.Email)
  92. }
  93. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  94. // Match purely by email. client_traffics is email-keyed (one shared row per
  95. // email regardless of how many inbounds the client is attached to), and these
  96. // emails come from the local xray's report, so they always belong to a client
  97. // attached to a local inbound. The old `inbound_id NOT IN (node inbounds)`
  98. // filter dropped the local traffic of a client attached to both a node and the
  99. // mother inbound whenever the node inbound happened to be attached first — its
  100. // shared row then carried the node inbound's id (AddClientStat uses OnConflict
  101. // DoNothing and never refreshes it), so the local poll skipped it entirely.
  102. err = tx.Model(xray.ClientTraffic{}).
  103. Where("email IN (?)", emails).
  104. Find(&dbClientTraffics).Error
  105. if err != nil {
  106. return err
  107. }
  108. // Avoid empty slice error
  109. if len(dbClientTraffics) == 0 {
  110. return nil
  111. }
  112. dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
  113. if err != nil {
  114. return err
  115. }
  116. // Index by email for O(N) merge.
  117. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  118. for i := range traffics {
  119. if traffics[i] != nil {
  120. trafficByEmail[traffics[i].Email] = traffics[i]
  121. }
  122. }
  123. now := time.Now().UnixMilli()
  124. // Use atomic per-row UPDATE instead of read-modify-write Save. tx.Save
  125. // issues UPDATEs in slice order, which varies between concurrent callers;
  126. // on PostgreSQL two transactions locking the same rows in opposite order
  127. // deadlock. An atomic "SET up = up + ?" never holds a row lock across a
  128. // subsequent lock acquisition, so concurrent writers cannot deadlock.
  129. for _, ct := range dbClientTraffics {
  130. t, ok := trafficByEmail[ct.Email]
  131. if !ok || (t.Up == 0 && t.Down == 0) {
  132. continue
  133. }
  134. if err = tx.Exec(
  135. fmt.Sprintf(
  136. `UPDATE client_traffics SET up = up + ?, down = down + ?, last_online = %s WHERE email = ?`,
  137. database.GreatestExpr("last_online", "?"),
  138. ),
  139. t.Up, t.Down, now, ct.Email,
  140. ).Error; err != nil {
  141. logger.Warning("AddClientTraffic update data ", err)
  142. }
  143. }
  144. // adjustTraffics converts delayed-start rows (negative ExpiryTime → absolute
  145. // deadline) in-memory. Persist that conversion now since the traffic UPDATE
  146. // above only touches up/down/last_online.
  147. for _, ct := range dbClientTraffics {
  148. if ct.ExpiryTime > 0 {
  149. if err = tx.Exec(
  150. `UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
  151. ct.ExpiryTime, ct.Email,
  152. ).Error; err != nil {
  153. logger.Warning("AddClientTraffic update expiry_time ", err)
  154. }
  155. }
  156. }
  157. return nil
  158. }
  159. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
  160. now := time.Now().UnixMilli()
  161. // "Start After First Use" stores a negative expiry (the duration). On the
  162. // first traffic tick it becomes an absolute deadline of now+duration. Compute
  163. // it once per email so every inbound the client is attached to lands on the
  164. // same value (recomputing per inbound would skip all but the first one).
  165. newExpiryByEmail := make(map[string]int64, len(dbClientTraffics))
  166. for traffic_index := range dbClientTraffics {
  167. if dbClientTraffics[traffic_index].ExpiryTime < 0 {
  168. newExpiryByEmail[dbClientTraffics[traffic_index].Email] = now - dbClientTraffics[traffic_index].ExpiryTime
  169. }
  170. }
  171. if len(newExpiryByEmail) == 0 {
  172. return dbClientTraffics, nil
  173. }
  174. delayedEmails := make([]string, 0, len(newExpiryByEmail))
  175. for email := range newExpiryByEmail {
  176. delayedEmails = append(delayedEmails, email)
  177. }
  178. // Resolve the owning inbounds through the client_inbounds link, which is
  179. // authoritative. client_traffics.inbound_id goes stale when an inbound is
  180. // deleted and recreated, which would leave the negative expiry unconverted.
  181. var inboundIds []int
  182. err := tx.Table("client_inbounds").
  183. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  184. Where("clients.email IN (?)", delayedEmails).
  185. Distinct().
  186. Pluck("client_inbounds.inbound_id", &inboundIds).Error
  187. if err != nil {
  188. return nil, err
  189. }
  190. if len(inboundIds) == 0 {
  191. return dbClientTraffics, nil
  192. }
  193. var inbounds []*model.Inbound
  194. err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  195. if err != nil {
  196. return nil, err
  197. }
  198. for inbound_index := range inbounds {
  199. settings := map[string]any{}
  200. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  201. clients, ok := settings["clients"].([]any)
  202. if ok {
  203. var newClients []any
  204. for client_index := range clients {
  205. c := clients[client_index].(map[string]any)
  206. email, _ := c["email"].(string)
  207. if newExpiry, ok := newExpiryByEmail[email]; ok {
  208. c["expiryTime"] = newExpiry
  209. c["updated_at"] = now
  210. }
  211. if _, ok := c["created_at"]; !ok {
  212. c["created_at"] = now
  213. }
  214. if _, ok := c["updated_at"]; !ok {
  215. c["updated_at"] = now
  216. }
  217. newClients = append(newClients, any(c))
  218. }
  219. settings["clients"] = newClients
  220. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  221. if err != nil {
  222. return nil, err
  223. }
  224. inbounds[inbound_index].Settings = string(modifiedSettings)
  225. }
  226. }
  227. for traffic_index := range dbClientTraffics {
  228. if newExpiry, ok := newExpiryByEmail[dbClientTraffics[traffic_index].Email]; ok {
  229. dbClientTraffics[traffic_index].ExpiryTime = newExpiry
  230. }
  231. }
  232. err = tx.Save(inbounds).Error
  233. if err != nil {
  234. logger.Warning("AddClientTraffic update inbounds ", err)
  235. logger.Error(inbounds)
  236. } else {
  237. for _, ib := range inbounds {
  238. if ib == nil {
  239. continue
  240. }
  241. cs, gcErr := s.GetClients(ib)
  242. if gcErr != nil {
  243. logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
  244. continue
  245. }
  246. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  247. logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
  248. }
  249. }
  250. }
  251. return dbClientTraffics, nil
  252. }
  253. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  254. // check for time expired
  255. var traffics []*xray.ClientTraffic
  256. now := time.Now().Unix() * 1000
  257. var err, err1 error
  258. // Filter to clients that have at least one local inbound. Using
  259. // client_traffics.inbound_id is wrong: it goes stale after an inbound is
  260. // deleted/recreated and always points to the first inbound the client was
  261. // attached to, so it could be a node inbound even when the client also has
  262. // local inbounds. The email-based join through client_inbounds is authoritative.
  263. err = tx.Model(xray.ClientTraffic{}).
  264. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  265. Where("email IN (?)", tx.Table("client_inbounds ci").
  266. Select("c.email").
  267. Joins("JOIN clients c ON c.id = ci.client_id").
  268. Joins("JOIN inbounds i ON i.id = ci.inbound_id").
  269. Where("i.node_id IS NULL")).
  270. Find(&traffics).Error
  271. if err != nil {
  272. return false, 0, err
  273. }
  274. // return if there is no client to renew
  275. if len(traffics) == 0 {
  276. return false, 0, nil
  277. }
  278. var inbound_ids []int
  279. var inbounds []*model.Inbound
  280. needRestart := false
  281. var clientsToAdd []struct {
  282. protocol string
  283. tag string
  284. client map[string]any
  285. }
  286. // Resolve the inbounds to renew through the client_inbounds link rather than
  287. // client_traffics.inbound_id, which goes stale after an inbound is deleted and
  288. // recreated and would otherwise skip the renew entirely.
  289. renewEmails := make([]string, 0, len(traffics))
  290. for _, traffic := range traffics {
  291. renewEmails = append(renewEmails, traffic.Email)
  292. }
  293. for _, batch := range chunkStrings(renewEmails, sqliteMaxVars) {
  294. var ids []int
  295. if err = tx.Table("client_inbounds").
  296. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  297. Where("clients.email IN ?", batch).
  298. Distinct().
  299. Pluck("client_inbounds.inbound_id", &ids).Error; err != nil {
  300. return false, 0, err
  301. }
  302. inbound_ids = append(inbound_ids, ids...)
  303. }
  304. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  305. // per tick instead of N times across chunk boundaries.
  306. inbound_ids = uniqueInts(inbound_ids)
  307. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  308. // are touched in a single tick.
  309. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  310. var page []*model.Inbound
  311. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  312. return false, 0, err
  313. }
  314. inbounds = append(inbounds, page...)
  315. }
  316. // Index the expired traffics by email so each client is an O(1) lookup
  317. // instead of a linear scan of every expired row (O(clients × expired) per
  318. // inbound, quadratic at scale). Pointers keep the in-place mutation below.
  319. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  320. for i := range traffics {
  321. trafficByEmail[traffics[i].Email] = traffics[i]
  322. }
  323. for inbound_index := range inbounds {
  324. settings := map[string]any{}
  325. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  326. clients, _ := settings["clients"].([]any)
  327. if len(clients) == 0 {
  328. continue
  329. }
  330. for client_index := range clients {
  331. c := clients[client_index].(map[string]any)
  332. email, _ := c["email"].(string)
  333. traffic, ok := trafficByEmail[email]
  334. if !ok {
  335. continue
  336. }
  337. newExpiryTime := traffic.ExpiryTime
  338. for newExpiryTime < now {
  339. newExpiryTime += (int64(traffic.Reset) * 86400000)
  340. }
  341. c["expiryTime"] = newExpiryTime
  342. traffic.ExpiryTime = newExpiryTime
  343. traffic.Down = 0
  344. traffic.Up = 0
  345. if !traffic.Enable {
  346. traffic.Enable = true
  347. c["enable"] = true
  348. clientsToAdd = append(clientsToAdd,
  349. struct {
  350. protocol string
  351. tag string
  352. client map[string]any
  353. }{
  354. protocol: string(inbounds[inbound_index].Protocol),
  355. tag: inbounds[inbound_index].Tag,
  356. client: c,
  357. })
  358. }
  359. clients[client_index] = any(c)
  360. }
  361. settings["clients"] = clients
  362. newSettings, err := json.MarshalIndent(settings, "", " ")
  363. if err != nil {
  364. return false, 0, err
  365. }
  366. inbounds[inbound_index].Settings = string(newSettings)
  367. }
  368. err = tx.Save(inbounds).Error
  369. if err != nil {
  370. return false, 0, err
  371. }
  372. for _, ib := range inbounds {
  373. if ib == nil {
  374. continue
  375. }
  376. cs, gcErr := s.GetClients(ib)
  377. if gcErr != nil {
  378. logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
  379. continue
  380. }
  381. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  382. logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
  383. }
  384. }
  385. err = tx.Save(traffics).Error
  386. if err != nil {
  387. return false, 0, err
  388. }
  389. // A renewed client starts a fresh quota window: drop the cross-panel rows
  390. // too, or the stale pushed totals would re-deplete it immediately.
  391. if err = clearGlobalTraffic(tx, renewEmails...); err != nil {
  392. return false, 0, err
  393. }
  394. if p != nil {
  395. err1 = s.xrayApi.Init(p.GetAPIPort())
  396. if err1 != nil {
  397. return true, int64(len(traffics)), nil
  398. }
  399. for _, clientToAdd := range clientsToAdd {
  400. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  401. if err1 != nil {
  402. needRestart = true
  403. }
  404. }
  405. s.xrayApi.Close()
  406. }
  407. return needRestart, int64(len(traffics)), nil
  408. }
  409. // AddClientStat inserts a per-client accounting row, no-op on email
  410. // conflict. Xray reports traffic per email, so the surviving row acts as
  411. // the shared accumulator for inbounds that re-use the same identity.
  412. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  413. clientTraffic := xray.ClientTraffic{
  414. InboundId: inboundId,
  415. Email: client.Email,
  416. Total: client.TotalGB,
  417. ExpiryTime: client.ExpiryTime,
  418. Enable: client.Enable,
  419. Reset: client.Reset,
  420. }
  421. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  422. Create(&clientTraffic).Error
  423. }
  424. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  425. result := tx.Model(xray.ClientTraffic{}).
  426. Where("email = ?", email).
  427. Updates(map[string]any{
  428. "enable": client.Enable,
  429. "email": client.Email,
  430. "total": client.TotalGB,
  431. "expiry_time": client.ExpiryTime,
  432. "reset": client.Reset,
  433. })
  434. err := result.Error
  435. return err
  436. }
  437. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  438. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{email}); err != nil {
  439. return err
  440. }
  441. if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
  442. return err
  443. }
  444. if err := clearGlobalTraffic(tx, email); err != nil {
  445. return err
  446. }
  447. return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
  448. }
  449. func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error {
  450. if err := adjustGroupBaselinesForRemovedTraffic(tx, emails); err != nil {
  451. return err
  452. }
  453. const chunk = 400
  454. for start := 0; start < len(emails); start += chunk {
  455. end := min(start+chunk, len(emails))
  456. batch := emails[start:end]
  457. if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
  458. return err
  459. }
  460. if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
  461. return err
  462. }
  463. if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  464. return err
  465. }
  466. }
  467. return nil
  468. }
  469. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  470. return submitTrafficWrite(func() error {
  471. return database.GetDB().Transaction(func(tx *gorm.DB) error {
  472. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{clientEmail}); err != nil {
  473. return err
  474. }
  475. if err := clearGlobalTraffic(tx, clientEmail); err != nil {
  476. return err
  477. }
  478. if err := tx.Model(xray.ClientTraffic{}).
  479. Where("email = ?", clientEmail).
  480. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error; err != nil {
  481. return err
  482. }
  483. return tx.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error
  484. })
  485. })
  486. }
  487. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  488. err = submitTrafficWrite(func() error {
  489. var inner error
  490. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  491. return inner
  492. })
  493. return
  494. }
  495. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  496. needRestart := false
  497. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  498. if err != nil {
  499. return false, err
  500. }
  501. if !traffic.Enable {
  502. inbound, err := s.GetInbound(id)
  503. if err != nil {
  504. return false, err
  505. }
  506. clients, err := s.GetClients(inbound)
  507. if err != nil {
  508. return false, err
  509. }
  510. for _, client := range clients {
  511. if client.Email == clientEmail && client.Enable {
  512. rt, push, _, perr := s.nodePushPlan(inbound)
  513. if perr != nil {
  514. return false, perr
  515. }
  516. if !push {
  517. if inbound.NodeID == nil {
  518. needRestart = true
  519. }
  520. break
  521. }
  522. cipher := ""
  523. if string(inbound.Protocol) == "shadowsocks" {
  524. var oldSettings map[string]any
  525. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  526. if err != nil {
  527. return false, err
  528. }
  529. cipher = oldSettings["method"].(string)
  530. }
  531. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  532. "email": client.Email,
  533. "id": client.ID,
  534. "auth": client.Auth,
  535. "security": client.Security,
  536. "flow": client.Flow,
  537. "password": client.Password,
  538. "cipher": cipher,
  539. })
  540. if err1 == nil {
  541. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  542. } else if inbound.NodeID != nil {
  543. logger.Warning("Error in enabling client on", rt.Name(), ":", err1)
  544. } else {
  545. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  546. needRestart = true
  547. }
  548. break
  549. }
  550. }
  551. }
  552. traffic.Up = 0
  553. traffic.Down = 0
  554. traffic.Enable = true
  555. db := database.GetDB()
  556. now := time.Now().UnixMilli()
  557. inbound, err := s.GetInbound(id)
  558. if err != nil {
  559. return false, err
  560. }
  561. if err := db.Transaction(func(tx *gorm.DB) error {
  562. if err := adjustGroupBaselinesForRemovedTraffic(tx, []string{clientEmail}); err != nil {
  563. return err
  564. }
  565. if err := tx.Save(traffic).Error; err != nil {
  566. return err
  567. }
  568. if err := clearGlobalTraffic(tx, clientEmail); err != nil {
  569. return err
  570. }
  571. if err := tx.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  572. return err
  573. }
  574. if err := tx.Model(model.Inbound{}).
  575. Where("id = ?", id).
  576. Update("last_traffic_reset_time", now).Error; err != nil {
  577. return err
  578. }
  579. if inbound != nil && inbound.NodeID != nil {
  580. return (&NodeService{}).MarkNodeDirtyTx(tx, *inbound.NodeID)
  581. }
  582. return nil
  583. }); err != nil {
  584. return false, err
  585. }
  586. if inbound != nil && inbound.NodeID != nil {
  587. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  588. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  589. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  590. }
  591. } else {
  592. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  593. }
  594. }
  595. return needRestart, nil
  596. }
  597. func (s *InboundService) ResetAllTraffics() error {
  598. return submitTrafficWrite(func() error {
  599. return s.resetAllTrafficsLocked()
  600. })
  601. }
  602. func (s *InboundService) resetAllTrafficsLocked() error {
  603. db := database.GetDB()
  604. now := time.Now().UnixMilli()
  605. if err := db.Model(model.Inbound{}).
  606. Where("user_id > ?", 0).
  607. Updates(map[string]any{
  608. "up": 0,
  609. "down": 0,
  610. "last_traffic_reset_time": now,
  611. }).Error; err != nil {
  612. return err
  613. }
  614. nodes, err := (&NodeService{}).GetAll()
  615. if err == nil {
  616. for _, node := range nodes {
  617. if rt, err := runtime.GetManager().RuntimeFor(&node.Id); err == nil {
  618. if e := rt.ResetAllTraffics(context.Background()); e != nil {
  619. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  620. }
  621. }
  622. }
  623. }
  624. return nil
  625. }
  626. func (s *InboundService) ResetInboundTraffic(id int) error {
  627. return submitTrafficWrite(func() error {
  628. db := database.GetDB()
  629. if err := db.Model(model.Inbound{}).
  630. Where("id = ?", id).
  631. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  632. return err
  633. }
  634. inbound, err := s.GetInbound(id)
  635. if err == nil && inbound != nil && inbound.NodeID != nil {
  636. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  637. if e := rt.ResetInboundTraffic(context.Background(), inbound); e != nil {
  638. logger.Warning("ResetInboundTraffic: remote propagation to", rt.Name(), "failed:", e)
  639. }
  640. } else {
  641. logger.Warning("ResetInboundTraffic: runtime lookup failed:", rterr)
  642. }
  643. }
  644. return nil
  645. })
  646. }
  647. func (s *InboundService) DelDepletedClients(id int) (err error) {
  648. db := database.GetDB()
  649. tx := db.Begin()
  650. defer func() {
  651. if err == nil {
  652. tx.Commit()
  653. } else {
  654. tx.Rollback()
  655. }
  656. }()
  657. // Collect depleted emails globally — a shared-email row owned by one
  658. // inbound depletes every sibling that lists the email.
  659. now := time.Now().Unix() * 1000
  660. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  661. var depletedRows []xray.ClientTraffic
  662. err = db.Model(xray.ClientTraffic{}).
  663. Where(depletedClause, now).
  664. Find(&depletedRows).Error
  665. if err != nil {
  666. return err
  667. }
  668. if len(depletedRows) == 0 {
  669. return nil
  670. }
  671. depletedEmails := make(map[string]struct{}, len(depletedRows))
  672. for _, r := range depletedRows {
  673. if r.Email == "" {
  674. continue
  675. }
  676. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  677. }
  678. if len(depletedEmails) == 0 {
  679. return nil
  680. }
  681. var inbounds []*model.Inbound
  682. inboundQuery := db.Model(model.Inbound{})
  683. if id >= 0 {
  684. inboundQuery = inboundQuery.Where("id = ?", id)
  685. }
  686. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  687. return err
  688. }
  689. for _, inbound := range inbounds {
  690. var settings map[string]any
  691. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  692. return err
  693. }
  694. rawClients, ok := settings["clients"].([]any)
  695. if !ok {
  696. continue
  697. }
  698. newClients := make([]any, 0, len(rawClients))
  699. removed := 0
  700. for _, client := range rawClients {
  701. c, ok := client.(map[string]any)
  702. if !ok {
  703. newClients = append(newClients, client)
  704. continue
  705. }
  706. email, _ := c["email"].(string)
  707. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  708. removed++
  709. continue
  710. }
  711. newClients = append(newClients, client)
  712. }
  713. if removed == 0 {
  714. continue
  715. }
  716. if len(newClients) == 0 {
  717. _, _ = s.DelInbound(inbound.Id)
  718. continue
  719. }
  720. settings["clients"] = newClients
  721. ns, mErr := json.MarshalIndent(settings, "", " ")
  722. if mErr != nil {
  723. return mErr
  724. }
  725. inbound.Settings = string(ns)
  726. if err = tx.Save(inbound).Error; err != nil {
  727. return err
  728. }
  729. survivingClients, gcErr := s.GetClients(inbound)
  730. if gcErr != nil {
  731. err = gcErr
  732. return err
  733. }
  734. if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
  735. return err
  736. }
  737. }
  738. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  739. // no out-of-scope inbound still references the email.
  740. if id < 0 {
  741. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  742. return err
  743. }
  744. emails := make([]string, 0, len(depletedEmails))
  745. for e := range depletedEmails {
  746. emails = append(emails, e)
  747. }
  748. var stillReferenced []string
  749. emailExpr := database.JSONFieldText("client.value", "email")
  750. stillQuery := fmt.Sprintf(
  751. "SELECT DISTINCT LOWER(%s) %s WHERE LOWER(%s) IN ?",
  752. emailExpr,
  753. database.JSONClientsFromInbound(),
  754. emailExpr,
  755. )
  756. if err = tx.Raw(stillQuery, emails).Scan(&stillReferenced).Error; err != nil {
  757. return err
  758. }
  759. stillSet := make(map[string]struct{}, len(stillReferenced))
  760. for _, e := range stillReferenced {
  761. stillSet[e] = struct{}{}
  762. }
  763. toDelete := make([]string, 0, len(emails))
  764. for _, e := range emails {
  765. if _, kept := stillSet[e]; !kept {
  766. toDelete = append(toDelete, e)
  767. }
  768. }
  769. if len(toDelete) > 0 {
  770. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  771. return err
  772. }
  773. }
  774. return nil
  775. }
  776. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  777. db := database.GetDB()
  778. var inbounds []*model.Inbound
  779. // Retrieve inbounds where settings contain the given tgId
  780. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  781. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  782. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  783. return nil, err
  784. }
  785. var emails []string
  786. for _, inbound := range inbounds {
  787. clients, err := s.GetClients(inbound)
  788. if err != nil {
  789. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  790. continue
  791. }
  792. for _, client := range clients {
  793. if client.TgID == tgId {
  794. emails = append(emails, client.Email)
  795. }
  796. }
  797. }
  798. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  799. // account owns thousands of clients across inbounds.
  800. uniqEmails := uniqueNonEmptyStrings(emails)
  801. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  802. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  803. var page []*xray.ClientTraffic
  804. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  805. if errors.Is(err, gorm.ErrRecordNotFound) {
  806. continue
  807. }
  808. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  809. return nil, err
  810. }
  811. traffics = append(traffics, page...)
  812. }
  813. if len(traffics) == 0 {
  814. logger.Warning("No ClientTraffic records found for emails:", emails)
  815. return nil, nil
  816. }
  817. // Populate UUID and other client data for each traffic record
  818. for i := range traffics {
  819. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  820. traffics[i].Enable = client.Enable
  821. traffics[i].UUID = client.ID
  822. traffics[i].SubId = client.SubID
  823. }
  824. }
  825. return traffics, nil
  826. }
  827. // BumpClientsLastOnline sets client_traffics.last_online to now for the given
  828. // emails. Used in online-API mode for clients that hold a live connection but
  829. // moved no bytes this poll — the traffic path (addClientTraffic) only bumps
  830. // last_online on a non-zero delta, so idle-but-connected clients would
  831. // otherwise show a stale "last online" while being reported online.
  832. func (s *InboundService) BumpClientsLastOnline(emails []string) error {
  833. uniq := uniqueNonEmptyStrings(emails)
  834. if len(uniq) == 0 {
  835. return nil
  836. }
  837. now := time.Now().UnixMilli()
  838. return submitTrafficWrite(func() error {
  839. db := database.GetDB()
  840. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  841. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("last_online", now).Error; err != nil {
  842. return err
  843. }
  844. }
  845. return nil
  846. })
  847. }
  848. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  849. uniq := uniqueNonEmptyStrings(emails)
  850. if len(uniq) == 0 {
  851. return nil, nil
  852. }
  853. db := database.GetDB()
  854. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  855. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  856. var page []*xray.ClientTraffic
  857. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  858. return nil, err
  859. }
  860. traffics = append(traffics, page...)
  861. }
  862. return traffics, nil
  863. }
  864. // GetAllClientTraffics returns the full set of client_traffics rows so the
  865. // websocket broadcasters can ship a complete snapshot every cycle. The old
  866. // delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
  867. // the per-client section whenever no client moved bytes in the cycle or a
  868. // node sync failed, leaving client rows in the UI stuck at stale numbers.
  869. func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
  870. db := database.GetDB()
  871. var traffics []*xray.ClientTraffic
  872. if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
  873. return nil, err
  874. }
  875. overlayGlobalTraffic(db, traffics)
  876. return traffics, nil
  877. }
  878. type InboundTrafficSummary struct {
  879. Id int `json:"id"`
  880. Up int64 `json:"up"`
  881. Down int64 `json:"down"`
  882. Total int64 `json:"total"`
  883. Enable bool `json:"enable"`
  884. }
  885. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  886. db := database.GetDB()
  887. var summaries []InboundTrafficSummary
  888. if err := db.Model(&model.Inbound{}).
  889. Select("id, up, down, total, enable").
  890. Find(&summaries).Error; err != nil {
  891. return nil, err
  892. }
  893. return summaries, nil
  894. }
  895. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  896. db := database.GetDB()
  897. var traffics []*xray.ClientTraffic
  898. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error; err != nil {
  899. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  900. return nil, err
  901. }
  902. if len(traffics) == 0 {
  903. return nil, nil
  904. }
  905. overlayGlobalTraffic(db, traffics)
  906. t := traffics[0]
  907. if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil {
  908. c := rec.ToClient()
  909. t.UUID = c.ID
  910. t.SubId = c.SubID
  911. return t, nil
  912. }
  913. t2, client, err := s.GetClientByEmail(email)
  914. if err != nil {
  915. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  916. return nil, err
  917. }
  918. if t2 != nil && client != nil {
  919. t2.UUID = client.ID
  920. t2.SubId = client.SubID
  921. return t2, nil
  922. }
  923. return nil, nil
  924. }
  925. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  926. return submitTrafficWrite(func() error {
  927. db := database.GetDB()
  928. err := db.Model(xray.ClientTraffic{}).
  929. Where("email = ?", email).
  930. Updates(map[string]any{
  931. "up": upload,
  932. "down": download,
  933. }).Error
  934. if err != nil {
  935. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  936. }
  937. return err
  938. })
  939. }
  940. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  941. db := database.GetDB()
  942. inbound := &model.Inbound{}
  943. traffic = &xray.ClientTraffic{}
  944. // Search for inbound settings that contain the query
  945. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  946. if err != nil {
  947. if errors.Is(err, gorm.ErrRecordNotFound) {
  948. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  949. return nil, err
  950. }
  951. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  952. return nil, err
  953. }
  954. traffic.InboundId = inbound.Id
  955. // Unmarshal settings to get clients
  956. settings := map[string][]model.Client{}
  957. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  958. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  959. return nil, err
  960. }
  961. clients := settings["clients"]
  962. for _, client := range clients {
  963. if (client.ID == query || client.Password == query) && client.Email != "" {
  964. traffic.Email = client.Email
  965. break
  966. }
  967. }
  968. if traffic.Email == "" {
  969. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  970. return nil, gorm.ErrRecordNotFound
  971. }
  972. // Retrieve ClientTraffic based on the found email
  973. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  974. if err != nil {
  975. if errors.Is(err, gorm.ErrRecordNotFound) {
  976. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  977. return nil, err
  978. }
  979. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  980. return nil, err
  981. }
  982. return traffic, nil
  983. }