inbound_traffic.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  11. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  12. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  13. "gorm.io/gorm"
  14. "gorm.io/gorm/clause"
  15. )
  16. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  17. var disabledNodeIDs []int
  18. err = submitTrafficWrite(func() error {
  19. var inner error
  20. needRestart, clientsDisabled, disabledNodeIDs, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  21. return inner
  22. })
  23. if err == nil && len(disabledNodeIDs) > 0 {
  24. s.restartRemoteNodesOnDisable(disabledNodeIDs)
  25. }
  26. return
  27. }
  28. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, []int, error) {
  29. var err error
  30. db := database.GetDB()
  31. tx := db.Begin()
  32. defer func() {
  33. if err != nil {
  34. tx.Rollback()
  35. } else {
  36. tx.Commit()
  37. }
  38. }()
  39. err = s.addInboundTraffic(tx, inboundTraffics)
  40. if err != nil {
  41. return false, false, nil, err
  42. }
  43. err = s.addClientTraffic(tx, clientTraffics)
  44. if err != nil {
  45. return false, false, nil, err
  46. }
  47. needRestart0, count, err := s.autoRenewClients(tx)
  48. if err != nil {
  49. logger.Warning("Error in renew clients:", err)
  50. } else if count > 0 {
  51. logger.Debugf("%v clients renewed", count)
  52. }
  53. disabledClientsCount := int64(0)
  54. needRestart1, count, disabledNodeIDs, err := s.disableInvalidClients(tx)
  55. if err != nil {
  56. logger.Warning("Error in disabling invalid clients:", err)
  57. } else if count > 0 {
  58. logger.Debugf("%v clients disabled", count)
  59. disabledClientsCount = count
  60. }
  61. needRestart2, count, err := s.disableInvalidInbounds(tx)
  62. if err != nil {
  63. logger.Warning("Error in disabling invalid inbounds:", err)
  64. } else if count > 0 {
  65. logger.Debugf("%v inbounds disabled", count)
  66. }
  67. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, disabledNodeIDs, nil
  68. }
  69. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  70. if len(traffics) == 0 {
  71. return nil
  72. }
  73. var err error
  74. for _, traffic := range traffics {
  75. if traffic.IsInbound {
  76. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  77. Updates(map[string]any{
  78. "up": gorm.Expr("up + ?", traffic.Up),
  79. "down": gorm.Expr("down + ?", traffic.Down),
  80. }).Error
  81. if err != nil {
  82. return err
  83. }
  84. }
  85. }
  86. return nil
  87. }
  88. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  89. if len(traffics) == 0 {
  90. return nil
  91. }
  92. emails := make([]string, 0, len(traffics))
  93. for _, traffic := range traffics {
  94. emails = append(emails, traffic.Email)
  95. }
  96. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  97. // Match purely by email. client_traffics is email-keyed (one shared row per
  98. // email regardless of how many inbounds the client is attached to), and these
  99. // emails come from the local xray's report, so they always belong to a client
  100. // attached to a local inbound. The old `inbound_id NOT IN (node inbounds)`
  101. // filter dropped the local traffic of a client attached to both a node and the
  102. // mother inbound whenever the node inbound happened to be attached first — its
  103. // shared row then carried the node inbound's id (AddClientStat uses OnConflict
  104. // DoNothing and never refreshes it), so the local poll skipped it entirely.
  105. err = tx.Model(xray.ClientTraffic{}).
  106. Where("email IN (?)", emails).
  107. Find(&dbClientTraffics).Error
  108. if err != nil {
  109. return err
  110. }
  111. // Avoid empty slice error
  112. if len(dbClientTraffics) == 0 {
  113. return nil
  114. }
  115. dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
  116. if err != nil {
  117. return err
  118. }
  119. // Index by email for O(N) merge.
  120. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  121. for i := range traffics {
  122. if traffics[i] != nil {
  123. trafficByEmail[traffics[i].Email] = traffics[i]
  124. }
  125. }
  126. now := time.Now().UnixMilli()
  127. // Use atomic per-row UPDATE instead of read-modify-write Save. tx.Save
  128. // issues UPDATEs in slice order, which varies between concurrent callers;
  129. // on PostgreSQL two transactions locking the same rows in opposite order
  130. // deadlock. An atomic "SET up = up + ?" never holds a row lock across a
  131. // subsequent lock acquisition, so concurrent writers cannot deadlock.
  132. for _, ct := range dbClientTraffics {
  133. t, ok := trafficByEmail[ct.Email]
  134. if !ok || (t.Up == 0 && t.Down == 0) {
  135. continue
  136. }
  137. if err = tx.Exec(
  138. fmt.Sprintf(
  139. `UPDATE client_traffics SET up = up + ?, down = down + ?, last_online = %s WHERE email = ?`,
  140. database.GreatestExpr("last_online", "?"),
  141. ),
  142. t.Up, t.Down, now, ct.Email,
  143. ).Error; err != nil {
  144. logger.Warning("AddClientTraffic update data ", err)
  145. }
  146. }
  147. // adjustTraffics converts delayed-start rows (negative ExpiryTime → absolute
  148. // deadline) in-memory. Persist that conversion now since the traffic UPDATE
  149. // above only touches up/down/last_online.
  150. for _, ct := range dbClientTraffics {
  151. if ct.ExpiryTime > 0 {
  152. if err = tx.Exec(
  153. `UPDATE client_traffics SET expiry_time = ? WHERE email = ? AND expiry_time < 0`,
  154. ct.ExpiryTime, ct.Email,
  155. ).Error; err != nil {
  156. logger.Warning("AddClientTraffic update expiry_time ", err)
  157. }
  158. }
  159. }
  160. return nil
  161. }
  162. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
  163. now := time.Now().UnixMilli()
  164. // "Start After First Use" stores a negative expiry (the duration). On the
  165. // first traffic tick it becomes an absolute deadline of now+duration. Compute
  166. // it once per email so every inbound the client is attached to lands on the
  167. // same value (recomputing per inbound would skip all but the first one).
  168. newExpiryByEmail := make(map[string]int64, len(dbClientTraffics))
  169. for traffic_index := range dbClientTraffics {
  170. if dbClientTraffics[traffic_index].ExpiryTime < 0 {
  171. newExpiryByEmail[dbClientTraffics[traffic_index].Email] = now - dbClientTraffics[traffic_index].ExpiryTime
  172. }
  173. }
  174. if len(newExpiryByEmail) == 0 {
  175. return dbClientTraffics, nil
  176. }
  177. delayedEmails := make([]string, 0, len(newExpiryByEmail))
  178. for email := range newExpiryByEmail {
  179. delayedEmails = append(delayedEmails, email)
  180. }
  181. // Resolve the owning inbounds through the client_inbounds link, which is
  182. // authoritative. client_traffics.inbound_id goes stale when an inbound is
  183. // deleted and recreated, which would leave the negative expiry unconverted.
  184. var inboundIds []int
  185. err := tx.Table("client_inbounds").
  186. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  187. Where("clients.email IN (?)", delayedEmails).
  188. Distinct().
  189. Pluck("client_inbounds.inbound_id", &inboundIds).Error
  190. if err != nil {
  191. return nil, err
  192. }
  193. if len(inboundIds) == 0 {
  194. return dbClientTraffics, nil
  195. }
  196. var inbounds []*model.Inbound
  197. err = tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  198. if err != nil {
  199. return nil, err
  200. }
  201. for inbound_index := range inbounds {
  202. settings := map[string]any{}
  203. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  204. clients, ok := settings["clients"].([]any)
  205. if ok {
  206. var newClients []any
  207. for client_index := range clients {
  208. c := clients[client_index].(map[string]any)
  209. email, _ := c["email"].(string)
  210. if newExpiry, ok := newExpiryByEmail[email]; ok {
  211. c["expiryTime"] = newExpiry
  212. c["updated_at"] = now
  213. }
  214. if _, ok := c["created_at"]; !ok {
  215. c["created_at"] = now
  216. }
  217. if _, ok := c["updated_at"]; !ok {
  218. c["updated_at"] = now
  219. }
  220. newClients = append(newClients, any(c))
  221. }
  222. settings["clients"] = newClients
  223. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  224. if err != nil {
  225. return nil, err
  226. }
  227. inbounds[inbound_index].Settings = string(modifiedSettings)
  228. }
  229. }
  230. for traffic_index := range dbClientTraffics {
  231. if newExpiry, ok := newExpiryByEmail[dbClientTraffics[traffic_index].Email]; ok {
  232. dbClientTraffics[traffic_index].ExpiryTime = newExpiry
  233. }
  234. }
  235. err = tx.Save(inbounds).Error
  236. if err != nil {
  237. logger.Warning("AddClientTraffic update inbounds ", err)
  238. logger.Error(inbounds)
  239. } else {
  240. for _, ib := range inbounds {
  241. if ib == nil {
  242. continue
  243. }
  244. cs, gcErr := s.GetClients(ib)
  245. if gcErr != nil {
  246. logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
  247. continue
  248. }
  249. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  250. logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
  251. }
  252. }
  253. }
  254. return dbClientTraffics, nil
  255. }
  256. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  257. // check for time expired
  258. var traffics []*xray.ClientTraffic
  259. now := time.Now().Unix() * 1000
  260. var err, err1 error
  261. // Filter to clients that have at least one local inbound. Using
  262. // client_traffics.inbound_id is wrong: it goes stale after an inbound is
  263. // deleted/recreated and always points to the first inbound the client was
  264. // attached to, so it could be a node inbound even when the client also has
  265. // local inbounds. The email-based join through client_inbounds is authoritative.
  266. err = tx.Model(xray.ClientTraffic{}).
  267. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  268. Where("email IN (?)", tx.Table("client_inbounds ci").
  269. Select("c.email").
  270. Joins("JOIN clients c ON c.id = ci.client_id").
  271. Joins("JOIN inbounds i ON i.id = ci.inbound_id").
  272. Where("i.node_id IS NULL")).
  273. Find(&traffics).Error
  274. if err != nil {
  275. return false, 0, err
  276. }
  277. // return if there is no client to renew
  278. if len(traffics) == 0 {
  279. return false, 0, nil
  280. }
  281. var inbound_ids []int
  282. var inbounds []*model.Inbound
  283. needRestart := false
  284. var clientsToAdd []struct {
  285. protocol string
  286. tag string
  287. client map[string]any
  288. }
  289. // Resolve the inbounds to renew through the client_inbounds link rather than
  290. // client_traffics.inbound_id, which goes stale after an inbound is deleted and
  291. // recreated and would otherwise skip the renew entirely.
  292. renewEmails := make([]string, 0, len(traffics))
  293. for _, traffic := range traffics {
  294. renewEmails = append(renewEmails, traffic.Email)
  295. }
  296. for _, batch := range chunkStrings(renewEmails, sqliteMaxVars) {
  297. var ids []int
  298. if err = tx.Table("client_inbounds").
  299. Joins("JOIN clients ON clients.id = client_inbounds.client_id").
  300. Where("clients.email IN ?", batch).
  301. Distinct().
  302. Pluck("client_inbounds.inbound_id", &ids).Error; err != nil {
  303. return false, 0, err
  304. }
  305. inbound_ids = append(inbound_ids, ids...)
  306. }
  307. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  308. // per tick instead of N times across chunk boundaries.
  309. inbound_ids = uniqueInts(inbound_ids)
  310. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  311. // are touched in a single tick.
  312. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  313. var page []*model.Inbound
  314. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  315. return false, 0, err
  316. }
  317. inbounds = append(inbounds, page...)
  318. }
  319. // Index the expired traffics by email so each client is an O(1) lookup
  320. // instead of a linear scan of every expired row (O(clients × expired) per
  321. // inbound, quadratic at scale). Pointers keep the in-place mutation below.
  322. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  323. for i := range traffics {
  324. trafficByEmail[traffics[i].Email] = traffics[i]
  325. }
  326. for inbound_index := range inbounds {
  327. settings := map[string]any{}
  328. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  329. clients, _ := settings["clients"].([]any)
  330. if len(clients) == 0 {
  331. continue
  332. }
  333. for client_index := range clients {
  334. c := clients[client_index].(map[string]any)
  335. email, _ := c["email"].(string)
  336. traffic, ok := trafficByEmail[email]
  337. if !ok {
  338. continue
  339. }
  340. newExpiryTime := traffic.ExpiryTime
  341. for newExpiryTime < now {
  342. newExpiryTime += (int64(traffic.Reset) * 86400000)
  343. }
  344. c["expiryTime"] = newExpiryTime
  345. traffic.ExpiryTime = newExpiryTime
  346. traffic.Down = 0
  347. traffic.Up = 0
  348. if !traffic.Enable {
  349. traffic.Enable = true
  350. c["enable"] = true
  351. clientsToAdd = append(clientsToAdd,
  352. struct {
  353. protocol string
  354. tag string
  355. client map[string]any
  356. }{
  357. protocol: string(inbounds[inbound_index].Protocol),
  358. tag: inbounds[inbound_index].Tag,
  359. client: c,
  360. })
  361. }
  362. clients[client_index] = any(c)
  363. }
  364. settings["clients"] = clients
  365. newSettings, err := json.MarshalIndent(settings, "", " ")
  366. if err != nil {
  367. return false, 0, err
  368. }
  369. inbounds[inbound_index].Settings = string(newSettings)
  370. }
  371. err = tx.Save(inbounds).Error
  372. if err != nil {
  373. return false, 0, err
  374. }
  375. for _, ib := range inbounds {
  376. if ib == nil {
  377. continue
  378. }
  379. cs, gcErr := s.GetClients(ib)
  380. if gcErr != nil {
  381. logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
  382. continue
  383. }
  384. if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
  385. logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
  386. }
  387. }
  388. err = tx.Save(traffics).Error
  389. if err != nil {
  390. return false, 0, err
  391. }
  392. // A renewed client starts a fresh quota window: drop the cross-panel rows
  393. // too, or the stale pushed totals would re-deplete it immediately.
  394. if err = clearGlobalTraffic(tx, renewEmails...); err != nil {
  395. return false, 0, err
  396. }
  397. if p != nil {
  398. err1 = s.xrayApi.Init(p.GetAPIPort())
  399. if err1 != nil {
  400. return true, int64(len(traffics)), nil
  401. }
  402. for _, clientToAdd := range clientsToAdd {
  403. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  404. if err1 != nil {
  405. needRestart = true
  406. }
  407. }
  408. s.xrayApi.Close()
  409. }
  410. return needRestart, int64(len(traffics)), nil
  411. }
  412. // AddClientStat inserts a per-client accounting row, no-op on email
  413. // conflict. Xray reports traffic per email, so the surviving row acts as
  414. // the shared accumulator for inbounds that re-use the same identity.
  415. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  416. clientTraffic := xray.ClientTraffic{
  417. InboundId: inboundId,
  418. Email: client.Email,
  419. Total: client.TotalGB,
  420. ExpiryTime: client.ExpiryTime,
  421. Enable: client.Enable,
  422. Reset: client.Reset,
  423. }
  424. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  425. Create(&clientTraffic).Error
  426. }
  427. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  428. result := tx.Model(xray.ClientTraffic{}).
  429. Where("email = ?", email).
  430. Updates(map[string]any{
  431. "enable": client.Enable,
  432. "email": client.Email,
  433. "total": client.TotalGB,
  434. "expiry_time": client.ExpiryTime,
  435. "reset": client.Reset,
  436. })
  437. err := result.Error
  438. return err
  439. }
  440. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  441. if err := tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error; err != nil {
  442. return err
  443. }
  444. if err := clearGlobalTraffic(tx, email); err != nil {
  445. return err
  446. }
  447. return tx.Where("email = ?", email).Delete(&model.NodeClientTraffic{}).Error
  448. }
  449. func (s *InboundService) delClientStatsByEmails(tx *gorm.DB, emails []string) error {
  450. const chunk = 400
  451. for start := 0; start < len(emails); start += chunk {
  452. end := min(start+chunk, len(emails))
  453. batch := emails[start:end]
  454. if err := tx.Where("email IN ?", batch).Delete(xray.ClientTraffic{}).Error; err != nil {
  455. return err
  456. }
  457. if err := tx.Where("email IN ?", batch).Delete(&model.ClientGlobalTraffic{}).Error; err != nil {
  458. return err
  459. }
  460. if err := tx.Where("email IN ?", batch).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  461. return err
  462. }
  463. }
  464. return nil
  465. }
  466. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  467. return submitTrafficWrite(func() error {
  468. db := database.GetDB()
  469. if err := clearGlobalTraffic(db, clientEmail); err != nil {
  470. return err
  471. }
  472. if err := db.Model(xray.ClientTraffic{}).
  473. Where("email = ?", clientEmail).
  474. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error; err != nil {
  475. return err
  476. }
  477. return db.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error
  478. })
  479. }
  480. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  481. err = submitTrafficWrite(func() error {
  482. var inner error
  483. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  484. return inner
  485. })
  486. return
  487. }
  488. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  489. needRestart := false
  490. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  491. if err != nil {
  492. return false, err
  493. }
  494. if !traffic.Enable {
  495. inbound, err := s.GetInbound(id)
  496. if err != nil {
  497. return false, err
  498. }
  499. clients, err := s.GetClients(inbound)
  500. if err != nil {
  501. return false, err
  502. }
  503. for _, client := range clients {
  504. if client.Email == clientEmail && client.Enable {
  505. rt, push, dirty, perr := s.nodePushPlan(inbound)
  506. if perr != nil {
  507. return false, perr
  508. }
  509. if !push {
  510. if inbound.NodeID != nil {
  511. if dirty {
  512. if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
  513. logger.Warning("mark node dirty failed:", dErr)
  514. }
  515. }
  516. } else {
  517. needRestart = true
  518. }
  519. break
  520. }
  521. cipher := ""
  522. if string(inbound.Protocol) == "shadowsocks" {
  523. var oldSettings map[string]any
  524. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  525. if err != nil {
  526. return false, err
  527. }
  528. cipher = oldSettings["method"].(string)
  529. }
  530. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  531. "email": client.Email,
  532. "id": client.ID,
  533. "auth": client.Auth,
  534. "security": client.Security,
  535. "flow": client.Flow,
  536. "password": client.Password,
  537. "cipher": cipher,
  538. })
  539. if err1 == nil {
  540. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  541. } else if inbound.NodeID != nil {
  542. logger.Warning("Error in enabling client on", rt.Name(), ":", err1)
  543. if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
  544. logger.Warning("mark node dirty failed:", dErr)
  545. }
  546. } else {
  547. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  548. needRestart = true
  549. }
  550. break
  551. }
  552. }
  553. }
  554. traffic.Up = 0
  555. traffic.Down = 0
  556. traffic.Enable = true
  557. db := database.GetDB()
  558. err = db.Save(traffic).Error
  559. if err != nil {
  560. return false, err
  561. }
  562. if err := clearGlobalTraffic(db, clientEmail); err != nil {
  563. return false, err
  564. }
  565. if err := db.Where("email = ?", clientEmail).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  566. return false, err
  567. }
  568. now := time.Now().UnixMilli()
  569. _ = db.Model(model.Inbound{}).
  570. Where("id = ?", id).
  571. Update("last_traffic_reset_time", now).Error
  572. inbound, err := s.GetInbound(id)
  573. if err == nil && inbound != nil && inbound.NodeID != nil {
  574. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  575. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  576. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  577. }
  578. } else {
  579. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  580. }
  581. }
  582. return needRestart, nil
  583. }
  584. func (s *InboundService) ResetAllTraffics() error {
  585. return submitTrafficWrite(func() error {
  586. return s.resetAllTrafficsLocked()
  587. })
  588. }
  589. func (s *InboundService) resetAllTrafficsLocked() error {
  590. db := database.GetDB()
  591. now := time.Now().UnixMilli()
  592. if err := db.Model(model.Inbound{}).
  593. Where("user_id > ?", 0).
  594. Updates(map[string]any{
  595. "up": 0,
  596. "down": 0,
  597. "last_traffic_reset_time": now,
  598. }).Error; err != nil {
  599. return err
  600. }
  601. nodes, err := (&NodeService{}).GetAll()
  602. if err == nil {
  603. for _, node := range nodes {
  604. if rt, err := runtime.GetManager().RuntimeFor(&node.Id); err == nil {
  605. if e := rt.ResetAllTraffics(context.Background()); e != nil {
  606. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  607. }
  608. }
  609. }
  610. }
  611. return nil
  612. }
  613. func (s *InboundService) ResetInboundTraffic(id int) error {
  614. return submitTrafficWrite(func() error {
  615. db := database.GetDB()
  616. if err := db.Model(model.Inbound{}).
  617. Where("id = ?", id).
  618. Updates(map[string]any{"up": 0, "down": 0}).Error; err != nil {
  619. return err
  620. }
  621. inbound, err := s.GetInbound(id)
  622. if err == nil && inbound != nil && inbound.NodeID != nil {
  623. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  624. if e := rt.ResetInboundTraffic(context.Background(), inbound); e != nil {
  625. logger.Warning("ResetInboundTraffic: remote propagation to", rt.Name(), "failed:", e)
  626. }
  627. } else {
  628. logger.Warning("ResetInboundTraffic: runtime lookup failed:", rterr)
  629. }
  630. }
  631. return nil
  632. })
  633. }
  634. func (s *InboundService) DelDepletedClients(id int) (err error) {
  635. db := database.GetDB()
  636. tx := db.Begin()
  637. defer func() {
  638. if err == nil {
  639. tx.Commit()
  640. } else {
  641. tx.Rollback()
  642. }
  643. }()
  644. // Collect depleted emails globally — a shared-email row owned by one
  645. // inbound depletes every sibling that lists the email.
  646. now := time.Now().Unix() * 1000
  647. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  648. var depletedRows []xray.ClientTraffic
  649. err = db.Model(xray.ClientTraffic{}).
  650. Where(depletedClause, now).
  651. Find(&depletedRows).Error
  652. if err != nil {
  653. return err
  654. }
  655. if len(depletedRows) == 0 {
  656. return nil
  657. }
  658. depletedEmails := make(map[string]struct{}, len(depletedRows))
  659. for _, r := range depletedRows {
  660. if r.Email == "" {
  661. continue
  662. }
  663. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  664. }
  665. if len(depletedEmails) == 0 {
  666. return nil
  667. }
  668. var inbounds []*model.Inbound
  669. inboundQuery := db.Model(model.Inbound{})
  670. if id >= 0 {
  671. inboundQuery = inboundQuery.Where("id = ?", id)
  672. }
  673. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  674. return err
  675. }
  676. for _, inbound := range inbounds {
  677. var settings map[string]any
  678. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  679. return err
  680. }
  681. rawClients, ok := settings["clients"].([]any)
  682. if !ok {
  683. continue
  684. }
  685. newClients := make([]any, 0, len(rawClients))
  686. removed := 0
  687. for _, client := range rawClients {
  688. c, ok := client.(map[string]any)
  689. if !ok {
  690. newClients = append(newClients, client)
  691. continue
  692. }
  693. email, _ := c["email"].(string)
  694. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  695. removed++
  696. continue
  697. }
  698. newClients = append(newClients, client)
  699. }
  700. if removed == 0 {
  701. continue
  702. }
  703. if len(newClients) == 0 {
  704. s.DelInbound(inbound.Id)
  705. continue
  706. }
  707. settings["clients"] = newClients
  708. ns, mErr := json.MarshalIndent(settings, "", " ")
  709. if mErr != nil {
  710. return mErr
  711. }
  712. inbound.Settings = string(ns)
  713. if err = tx.Save(inbound).Error; err != nil {
  714. return err
  715. }
  716. survivingClients, gcErr := s.GetClients(inbound)
  717. if gcErr != nil {
  718. err = gcErr
  719. return err
  720. }
  721. if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
  722. return err
  723. }
  724. }
  725. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  726. // no out-of-scope inbound still references the email.
  727. if id < 0 {
  728. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  729. return err
  730. }
  731. emails := make([]string, 0, len(depletedEmails))
  732. for e := range depletedEmails {
  733. emails = append(emails, e)
  734. }
  735. var stillReferenced []string
  736. emailExpr := database.JSONFieldText("client.value", "email")
  737. stillQuery := fmt.Sprintf(
  738. "SELECT DISTINCT LOWER(%s) %s WHERE LOWER(%s) IN ?",
  739. emailExpr,
  740. database.JSONClientsFromInbound(),
  741. emailExpr,
  742. )
  743. if err = tx.Raw(stillQuery, emails).Scan(&stillReferenced).Error; err != nil {
  744. return err
  745. }
  746. stillSet := make(map[string]struct{}, len(stillReferenced))
  747. for _, e := range stillReferenced {
  748. stillSet[e] = struct{}{}
  749. }
  750. toDelete := make([]string, 0, len(emails))
  751. for _, e := range emails {
  752. if _, kept := stillSet[e]; !kept {
  753. toDelete = append(toDelete, e)
  754. }
  755. }
  756. if len(toDelete) > 0 {
  757. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  758. return err
  759. }
  760. }
  761. return nil
  762. }
  763. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  764. db := database.GetDB()
  765. var inbounds []*model.Inbound
  766. // Retrieve inbounds where settings contain the given tgId
  767. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  768. if err != nil && err != gorm.ErrRecordNotFound {
  769. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  770. return nil, err
  771. }
  772. var emails []string
  773. for _, inbound := range inbounds {
  774. clients, err := s.GetClients(inbound)
  775. if err != nil {
  776. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  777. continue
  778. }
  779. for _, client := range clients {
  780. if client.TgID == tgId {
  781. emails = append(emails, client.Email)
  782. }
  783. }
  784. }
  785. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  786. // account owns thousands of clients across inbounds.
  787. uniqEmails := uniqueNonEmptyStrings(emails)
  788. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  789. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  790. var page []*xray.ClientTraffic
  791. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  792. if err == gorm.ErrRecordNotFound {
  793. continue
  794. }
  795. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  796. return nil, err
  797. }
  798. traffics = append(traffics, page...)
  799. }
  800. if len(traffics) == 0 {
  801. logger.Warning("No ClientTraffic records found for emails:", emails)
  802. return nil, nil
  803. }
  804. // Populate UUID and other client data for each traffic record
  805. for i := range traffics {
  806. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  807. traffics[i].Enable = client.Enable
  808. traffics[i].UUID = client.ID
  809. traffics[i].SubId = client.SubID
  810. }
  811. }
  812. return traffics, nil
  813. }
  814. // BumpClientsLastOnline sets client_traffics.last_online to now for the given
  815. // emails. Used in online-API mode for clients that hold a live connection but
  816. // moved no bytes this poll — the traffic path (addClientTraffic) only bumps
  817. // last_online on a non-zero delta, so idle-but-connected clients would
  818. // otherwise show a stale "last online" while being reported online.
  819. func (s *InboundService) BumpClientsLastOnline(emails []string) error {
  820. uniq := uniqueNonEmptyStrings(emails)
  821. if len(uniq) == 0 {
  822. return nil
  823. }
  824. now := time.Now().UnixMilli()
  825. return submitTrafficWrite(func() error {
  826. db := database.GetDB()
  827. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  828. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("last_online", now).Error; err != nil {
  829. return err
  830. }
  831. }
  832. return nil
  833. })
  834. }
  835. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  836. uniq := uniqueNonEmptyStrings(emails)
  837. if len(uniq) == 0 {
  838. return nil, nil
  839. }
  840. db := database.GetDB()
  841. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  842. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  843. var page []*xray.ClientTraffic
  844. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  845. return nil, err
  846. }
  847. traffics = append(traffics, page...)
  848. }
  849. return traffics, nil
  850. }
  851. // GetAllClientTraffics returns the full set of client_traffics rows so the
  852. // websocket broadcasters can ship a complete snapshot every cycle. The old
  853. // delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
  854. // the per-client section whenever no client moved bytes in the cycle or a
  855. // node sync failed, leaving client rows in the UI stuck at stale numbers.
  856. func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
  857. db := database.GetDB()
  858. var traffics []*xray.ClientTraffic
  859. if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
  860. return nil, err
  861. }
  862. overlayGlobalTraffic(db, traffics)
  863. return traffics, nil
  864. }
  865. type InboundTrafficSummary struct {
  866. Id int `json:"id"`
  867. Up int64 `json:"up"`
  868. Down int64 `json:"down"`
  869. Total int64 `json:"total"`
  870. Enable bool `json:"enable"`
  871. }
  872. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  873. db := database.GetDB()
  874. var summaries []InboundTrafficSummary
  875. if err := db.Model(&model.Inbound{}).
  876. Select("id, up, down, total, enable").
  877. Find(&summaries).Error; err != nil {
  878. return nil, err
  879. }
  880. return summaries, nil
  881. }
  882. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  883. db := database.GetDB()
  884. var traffics []*xray.ClientTraffic
  885. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error; err != nil {
  886. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  887. return nil, err
  888. }
  889. if len(traffics) == 0 {
  890. return nil, nil
  891. }
  892. overlayGlobalTraffic(db, traffics)
  893. t := traffics[0]
  894. if rec, rErr := s.clientService.GetRecordByEmail(db, email); rErr == nil && rec != nil {
  895. c := rec.ToClient()
  896. t.UUID = c.ID
  897. t.SubId = c.SubID
  898. return t, nil
  899. }
  900. t2, client, err := s.GetClientByEmail(email)
  901. if err != nil {
  902. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  903. return nil, err
  904. }
  905. if t2 != nil && client != nil {
  906. t2.UUID = client.ID
  907. t2.SubId = client.SubID
  908. return t2, nil
  909. }
  910. return nil, nil
  911. }
  912. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  913. return submitTrafficWrite(func() error {
  914. db := database.GetDB()
  915. err := db.Model(xray.ClientTraffic{}).
  916. Where("email = ?", email).
  917. Updates(map[string]any{
  918. "up": upload,
  919. "down": download,
  920. }).Error
  921. if err != nil {
  922. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  923. }
  924. return err
  925. })
  926. }
  927. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  928. db := database.GetDB()
  929. inbound := &model.Inbound{}
  930. traffic = &xray.ClientTraffic{}
  931. // Search for inbound settings that contain the query
  932. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  933. if err != nil {
  934. if err == gorm.ErrRecordNotFound {
  935. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  936. return nil, err
  937. }
  938. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  939. return nil, err
  940. }
  941. traffic.InboundId = inbound.Id
  942. // Unmarshal settings to get clients
  943. settings := map[string][]model.Client{}
  944. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  945. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  946. return nil, err
  947. }
  948. clients := settings["clients"]
  949. for _, client := range clients {
  950. if (client.ID == query || client.Password == query) && client.Email != "" {
  951. traffic.Email = client.Email
  952. break
  953. }
  954. }
  955. if traffic.Email == "" {
  956. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  957. return nil, gorm.ErrRecordNotFound
  958. }
  959. // Retrieve ClientTraffic based on the found email
  960. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  961. if err != nil {
  962. if err == gorm.ErrRecordNotFound {
  963. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  964. return nil, err
  965. }
  966. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  967. return nil, err
  968. }
  969. return traffic, nil
  970. }