inbound.go 105 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868
  1. // Package service provides business logic services for the 3x-ui web panel,
  2. // including inbound/outbound management, user administration, settings, and Xray integration.
  3. package service
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/google/uuid"
  13. "github.com/mhsanaei/3x-ui/v3/database"
  14. "github.com/mhsanaei/3x-ui/v3/database/model"
  15. "github.com/mhsanaei/3x-ui/v3/logger"
  16. "github.com/mhsanaei/3x-ui/v3/util/common"
  17. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  18. "github.com/mhsanaei/3x-ui/v3/xray"
  19. "gorm.io/gorm"
  20. "gorm.io/gorm/clause"
  21. )
  22. type InboundService struct {
  23. xrayApi xray.XrayAPI
  24. }
  25. func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
  26. mgr := runtime.GetManager()
  27. if mgr == nil {
  28. return nil, fmt.Errorf("runtime manager not initialised")
  29. }
  30. return mgr.RuntimeFor(ib.NodeID)
  31. }
  32. type CopyClientsResult struct {
  33. Added []string `json:"added"`
  34. Skipped []string `json:"skipped"`
  35. Errors []string `json:"errors"`
  36. }
  37. // enrichClientStats parses each inbound's clients once, fills in the
  38. // UUID/SubId fields on the preloaded ClientStats, and tops up rows owned by
  39. // a sibling inbound (shared-email mode — the row is keyed on email so it
  40. // only preloads on its owning inbound).
  41. func (s *InboundService) enrichClientStats(db *gorm.DB, inbounds []*model.Inbound) {
  42. if len(inbounds) == 0 {
  43. return
  44. }
  45. clientsByInbound := make([][]model.Client, len(inbounds))
  46. seenByInbound := make([]map[string]struct{}, len(inbounds))
  47. missing := make(map[string]struct{})
  48. for i, inbound := range inbounds {
  49. clients, _ := s.GetClients(inbound)
  50. clientsByInbound[i] = clients
  51. seen := make(map[string]struct{}, len(inbound.ClientStats))
  52. for _, st := range inbound.ClientStats {
  53. if st.Email != "" {
  54. seen[strings.ToLower(st.Email)] = struct{}{}
  55. }
  56. }
  57. seenByInbound[i] = seen
  58. for _, c := range clients {
  59. if c.Email == "" {
  60. continue
  61. }
  62. if _, ok := seen[strings.ToLower(c.Email)]; !ok {
  63. missing[c.Email] = struct{}{}
  64. }
  65. }
  66. }
  67. if len(missing) > 0 {
  68. emails := make([]string, 0, len(missing))
  69. for e := range missing {
  70. emails = append(emails, e)
  71. }
  72. var extra []xray.ClientTraffic
  73. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&extra).Error; err != nil {
  74. logger.Warning("enrichClientStats:", err)
  75. } else {
  76. byEmail := make(map[string]xray.ClientTraffic, len(extra))
  77. for _, st := range extra {
  78. byEmail[strings.ToLower(st.Email)] = st
  79. }
  80. for i, inbound := range inbounds {
  81. for _, c := range clientsByInbound[i] {
  82. if c.Email == "" {
  83. continue
  84. }
  85. key := strings.ToLower(c.Email)
  86. if _, ok := seenByInbound[i][key]; ok {
  87. continue
  88. }
  89. if st, ok := byEmail[key]; ok {
  90. inbound.ClientStats = append(inbound.ClientStats, st)
  91. seenByInbound[i][key] = struct{}{}
  92. }
  93. }
  94. }
  95. }
  96. }
  97. for i, inbound := range inbounds {
  98. clients := clientsByInbound[i]
  99. if len(clients) == 0 || len(inbound.ClientStats) == 0 {
  100. continue
  101. }
  102. cMap := make(map[string]model.Client, len(clients))
  103. for _, c := range clients {
  104. cMap[strings.ToLower(c.Email)] = c
  105. }
  106. for j := range inbound.ClientStats {
  107. email := strings.ToLower(inbound.ClientStats[j].Email)
  108. if c, ok := cMap[email]; ok {
  109. inbound.ClientStats[j].UUID = c.ID
  110. inbound.ClientStats[j].SubId = c.SubID
  111. }
  112. }
  113. }
  114. }
  115. // GetInbounds retrieves all inbounds for a specific user with client stats.
  116. func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) {
  117. db := database.GetDB()
  118. var inbounds []*model.Inbound
  119. err := db.Model(model.Inbound{}).Preload("ClientStats").Where("user_id = ?", userId).Find(&inbounds).Error
  120. if err != nil && err != gorm.ErrRecordNotFound {
  121. return nil, err
  122. }
  123. s.enrichClientStats(db, inbounds)
  124. return inbounds, nil
  125. }
  126. // GetAllInbounds retrieves all inbounds with client stats.
  127. func (s *InboundService) GetAllInbounds() ([]*model.Inbound, error) {
  128. db := database.GetDB()
  129. var inbounds []*model.Inbound
  130. err := db.Model(model.Inbound{}).Preload("ClientStats").Find(&inbounds).Error
  131. if err != nil && err != gorm.ErrRecordNotFound {
  132. return nil, err
  133. }
  134. s.enrichClientStats(db, inbounds)
  135. return inbounds, nil
  136. }
  137. func (s *InboundService) GetInboundsByTrafficReset(period string) ([]*model.Inbound, error) {
  138. db := database.GetDB()
  139. var inbounds []*model.Inbound
  140. err := db.Model(model.Inbound{}).Where("traffic_reset = ?", period).Find(&inbounds).Error
  141. if err != nil && err != gorm.ErrRecordNotFound {
  142. return nil, err
  143. }
  144. return inbounds, nil
  145. }
  146. func (s *InboundService) GetClients(inbound *model.Inbound) ([]model.Client, error) {
  147. settings := map[string][]model.Client{}
  148. json.Unmarshal([]byte(inbound.Settings), &settings)
  149. if settings == nil {
  150. return nil, fmt.Errorf("setting is null")
  151. }
  152. clients := settings["clients"]
  153. if clients == nil {
  154. return nil, nil
  155. }
  156. return clients, nil
  157. }
  158. func (s *InboundService) getAllEmails() ([]string, error) {
  159. db := database.GetDB()
  160. var emails []string
  161. err := db.Raw(`
  162. SELECT DISTINCT JSON_EXTRACT(client.value, '$.email')
  163. FROM inbounds,
  164. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  165. `).Scan(&emails).Error
  166. if err != nil {
  167. return nil, err
  168. }
  169. return emails, nil
  170. }
  171. // getAllEmailSubIDs returns email→subId. An email seen with two different
  172. // non-empty subIds is locked (mapped to "") so neither identity can claim it.
  173. func (s *InboundService) getAllEmailSubIDs() (map[string]string, error) {
  174. db := database.GetDB()
  175. var rows []struct {
  176. Email string
  177. SubID string
  178. }
  179. err := db.Raw(`
  180. SELECT JSON_EXTRACT(client.value, '$.email') AS email,
  181. JSON_EXTRACT(client.value, '$.subId') AS sub_id
  182. FROM inbounds,
  183. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  184. `).Scan(&rows).Error
  185. if err != nil {
  186. return nil, err
  187. }
  188. result := make(map[string]string, len(rows))
  189. for _, r := range rows {
  190. email := strings.ToLower(strings.Trim(r.Email, "\""))
  191. if email == "" {
  192. continue
  193. }
  194. subID := strings.Trim(r.SubID, "\"")
  195. if existing, ok := result[email]; ok {
  196. if existing != subID {
  197. result[email] = ""
  198. }
  199. continue
  200. }
  201. result[email] = subID
  202. }
  203. return result, nil
  204. }
  205. func lowerAll(in []string) []string {
  206. out := make([]string, len(in))
  207. for i, s := range in {
  208. out[i] = strings.ToLower(s)
  209. }
  210. return out
  211. }
  212. // emailUsedByOtherInbounds reports whether email lives in any inbound other
  213. // than exceptInboundId. Empty email returns false.
  214. func (s *InboundService) emailUsedByOtherInbounds(email string, exceptInboundId int) (bool, error) {
  215. if email == "" {
  216. return false, nil
  217. }
  218. db := database.GetDB()
  219. var count int64
  220. err := db.Raw(`
  221. SELECT COUNT(*)
  222. FROM inbounds,
  223. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  224. WHERE inbounds.id != ?
  225. AND LOWER(JSON_EXTRACT(client.value, '$.email')) = LOWER(?)
  226. `, exceptInboundId, email).Scan(&count).Error
  227. if err != nil {
  228. return false, err
  229. }
  230. return count > 0, nil
  231. }
  232. // checkEmailsExistForClients validates a batch of incoming clients. An email
  233. // collides only when the existing holder has a different (or empty) subId —
  234. // matching non-empty subIds let multiple inbounds share one identity.
  235. func (s *InboundService) checkEmailsExistForClients(clients []model.Client) (string, error) {
  236. emailSubIDs, err := s.getAllEmailSubIDs()
  237. if err != nil {
  238. return "", err
  239. }
  240. seen := make(map[string]string, len(clients))
  241. for _, client := range clients {
  242. if client.Email == "" {
  243. continue
  244. }
  245. key := strings.ToLower(client.Email)
  246. // Within the same payload, the same email must carry the same subId;
  247. // otherwise we would silently merge two distinct identities.
  248. if prev, ok := seen[key]; ok {
  249. if prev != client.SubID || client.SubID == "" {
  250. return client.Email, nil
  251. }
  252. continue
  253. }
  254. seen[key] = client.SubID
  255. if existingSub, ok := emailSubIDs[key]; ok {
  256. if client.SubID == "" || existingSub == "" || existingSub != client.SubID {
  257. return client.Email, nil
  258. }
  259. }
  260. }
  261. return "", nil
  262. }
  263. // AddInbound creates a new inbound configuration.
  264. // It validates port uniqueness, client email uniqueness, and required fields,
  265. // then saves the inbound to the database and optionally adds it to the running Xray instance.
  266. // Returns the created inbound, whether Xray needs restart, and any error.
  267. func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
  268. exist, err := s.checkPortConflict(inbound, 0)
  269. if err != nil {
  270. return inbound, false, err
  271. }
  272. if exist {
  273. return inbound, false, common.NewError("Port already exists:", inbound.Port)
  274. }
  275. inbound.Tag, err = s.resolveInboundTag(inbound, 0)
  276. if err != nil {
  277. return inbound, false, err
  278. }
  279. clients, err := s.GetClients(inbound)
  280. if err != nil {
  281. return inbound, false, err
  282. }
  283. existEmail, err := s.checkEmailsExistForClients(clients)
  284. if err != nil {
  285. return inbound, false, err
  286. }
  287. if existEmail != "" {
  288. return inbound, false, common.NewError("Duplicate email:", existEmail)
  289. }
  290. // Ensure created_at and updated_at on clients in settings
  291. if len(clients) > 0 {
  292. var settings map[string]any
  293. if err2 := json.Unmarshal([]byte(inbound.Settings), &settings); err2 == nil && settings != nil {
  294. now := time.Now().Unix() * 1000
  295. updatedClients := make([]model.Client, 0, len(clients))
  296. for _, c := range clients {
  297. if c.CreatedAt == 0 {
  298. c.CreatedAt = now
  299. }
  300. c.UpdatedAt = now
  301. updatedClients = append(updatedClients, c)
  302. }
  303. settings["clients"] = updatedClients
  304. if bs, err3 := json.MarshalIndent(settings, "", " "); err3 == nil {
  305. inbound.Settings = string(bs)
  306. } else {
  307. logger.Debug("Unable to marshal inbound settings with timestamps:", err3)
  308. }
  309. } else if err2 != nil {
  310. logger.Debug("Unable to parse inbound settings for timestamps:", err2)
  311. }
  312. }
  313. // Secure client ID
  314. for _, client := range clients {
  315. switch inbound.Protocol {
  316. case "trojan":
  317. if client.Password == "" {
  318. return inbound, false, common.NewError("empty client ID")
  319. }
  320. case "shadowsocks":
  321. if client.Email == "" {
  322. return inbound, false, common.NewError("empty client ID")
  323. }
  324. case "hysteria", "hysteria2":
  325. if client.Auth == "" {
  326. return inbound, false, common.NewError("empty client ID")
  327. }
  328. default:
  329. if client.ID == "" {
  330. return inbound, false, common.NewError("empty client ID")
  331. }
  332. }
  333. }
  334. db := database.GetDB()
  335. tx := db.Begin()
  336. defer func() {
  337. if err == nil {
  338. tx.Commit()
  339. } else {
  340. tx.Rollback()
  341. }
  342. }()
  343. err = tx.Save(inbound).Error
  344. if err == nil {
  345. if len(inbound.ClientStats) == 0 {
  346. for _, client := range clients {
  347. s.AddClientStat(tx, inbound.Id, &client)
  348. }
  349. }
  350. } else {
  351. return inbound, false, err
  352. }
  353. needRestart := false
  354. if inbound.Enable {
  355. rt, rterr := s.runtimeFor(inbound)
  356. if rterr != nil {
  357. err = rterr
  358. return inbound, false, err
  359. }
  360. if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil {
  361. logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag)
  362. } else {
  363. logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
  364. if inbound.NodeID != nil {
  365. err = err1
  366. return inbound, false, err
  367. }
  368. needRestart = true
  369. }
  370. }
  371. return inbound, needRestart, err
  372. }
  373. func (s *InboundService) DelInbound(id int) (bool, error) {
  374. db := database.GetDB()
  375. needRestart := false
  376. var ib model.Inbound
  377. loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error
  378. if loadErr == nil {
  379. rt, rterr := s.runtimeFor(&ib)
  380. if rterr != nil {
  381. logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr)
  382. if ib.NodeID == nil {
  383. needRestart = true
  384. }
  385. } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil {
  386. logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag)
  387. } else {
  388. logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1)
  389. if ib.NodeID == nil {
  390. needRestart = true
  391. }
  392. }
  393. } else {
  394. logger.Debug("No enabled inbound found to remove by api, id:", id)
  395. }
  396. // Delete client traffics of inbounds
  397. err := db.Where("inbound_id = ?", id).Delete(xray.ClientTraffic{}).Error
  398. if err != nil {
  399. return false, err
  400. }
  401. inbound, err := s.GetInbound(id)
  402. if err != nil {
  403. return false, err
  404. }
  405. clients, err := s.GetClients(inbound)
  406. if err != nil {
  407. return false, err
  408. }
  409. // Bulk-delete client IPs for every email in this inbound. The previous
  410. // per-client loop fired one DELETE per row — at 7k+ clients that meant
  411. // thousands of synchronous SQL roundtrips and a multi-second freeze.
  412. // Chunked to stay under SQLite's bind-variable limit on huge inbounds.
  413. if len(clients) > 0 {
  414. emails := make([]string, 0, len(clients))
  415. for i := range clients {
  416. if clients[i].Email != "" {
  417. emails = append(emails, clients[i].Email)
  418. }
  419. }
  420. for _, batch := range chunkStrings(uniqueNonEmptyStrings(emails), sqliteMaxVars) {
  421. if err := db.Where("client_email IN ?", batch).
  422. Delete(model.InboundClientIps{}).Error; err != nil {
  423. return false, err
  424. }
  425. }
  426. }
  427. return needRestart, db.Delete(model.Inbound{}, id).Error
  428. }
  429. func (s *InboundService) GetInbound(id int) (*model.Inbound, error) {
  430. db := database.GetDB()
  431. inbound := &model.Inbound{}
  432. err := db.Model(model.Inbound{}).First(inbound, id).Error
  433. if err != nil {
  434. return nil, err
  435. }
  436. return inbound, nil
  437. }
  438. // SetInboundEnable toggles only the enable flag of an inbound, without
  439. // rewriting the (potentially multi-MB) settings JSON. Used by the UI's
  440. // per-row enable switch — for inbounds with thousands of clients the full
  441. // UpdateInbound path is an order of magnitude too slow for an interactive
  442. // toggle (parses + reserialises every client, runs O(N) traffic diff).
  443. //
  444. // Returns (needRestart, error). needRestart is true when the xray runtime
  445. // could not be re-synced from the cached config and a full restart is
  446. // required to pick up the change.
  447. func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
  448. inbound, err := s.GetInbound(id)
  449. if err != nil {
  450. return false, err
  451. }
  452. if inbound.Enable == enable {
  453. return false, nil
  454. }
  455. db := database.GetDB()
  456. if err := db.Model(model.Inbound{}).Where("id = ?", id).
  457. Update("enable", enable).Error; err != nil {
  458. return false, err
  459. }
  460. inbound.Enable = enable
  461. needRestart := false
  462. rt, rterr := s.runtimeFor(inbound)
  463. if rterr != nil {
  464. if inbound.NodeID != nil {
  465. return false, rterr
  466. }
  467. return true, nil
  468. }
  469. if err := rt.DelInbound(context.Background(), inbound); err != nil &&
  470. !strings.Contains(err.Error(), "not found") {
  471. logger.Debug("SetInboundEnable: DelInbound on", rt.Name(), "failed:", err)
  472. needRestart = true
  473. }
  474. if !enable {
  475. return needRestart, nil
  476. }
  477. addTarget := inbound
  478. if inbound.NodeID == nil {
  479. runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound)
  480. if err != nil {
  481. logger.Debug("SetInboundEnable: build runtime config failed:", err)
  482. return true, nil
  483. }
  484. addTarget = runtimeInbound
  485. }
  486. if err := rt.AddInbound(context.Background(), addTarget); err != nil {
  487. logger.Debug("SetInboundEnable: AddInbound on", rt.Name(), "failed:", err)
  488. if inbound.NodeID != nil {
  489. return false, err
  490. }
  491. needRestart = true
  492. }
  493. return needRestart, nil
  494. }
  495. func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
  496. exist, err := s.checkPortConflict(inbound, inbound.Id)
  497. if err != nil {
  498. return inbound, false, err
  499. }
  500. if exist {
  501. return inbound, false, common.NewError("Port already exists:", inbound.Port)
  502. }
  503. oldInbound, err := s.GetInbound(inbound.Id)
  504. if err != nil {
  505. return inbound, false, err
  506. }
  507. tag := oldInbound.Tag
  508. db := database.GetDB()
  509. tx := db.Begin()
  510. defer func() {
  511. if err != nil {
  512. tx.Rollback()
  513. } else {
  514. tx.Commit()
  515. }
  516. }()
  517. err = s.updateClientTraffics(tx, oldInbound, inbound)
  518. if err != nil {
  519. return inbound, false, err
  520. }
  521. // Ensure created_at and updated_at exist in inbound.Settings clients
  522. {
  523. var oldSettings map[string]any
  524. _ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
  525. emailToCreated := map[string]int64{}
  526. emailToUpdated := map[string]int64{}
  527. if oldSettings != nil {
  528. if oc, ok := oldSettings["clients"].([]any); ok {
  529. for _, it := range oc {
  530. if m, ok2 := it.(map[string]any); ok2 {
  531. if email, ok3 := m["email"].(string); ok3 {
  532. switch v := m["created_at"].(type) {
  533. case float64:
  534. emailToCreated[email] = int64(v)
  535. case int64:
  536. emailToCreated[email] = v
  537. }
  538. switch v := m["updated_at"].(type) {
  539. case float64:
  540. emailToUpdated[email] = int64(v)
  541. case int64:
  542. emailToUpdated[email] = v
  543. }
  544. }
  545. }
  546. }
  547. }
  548. }
  549. var newSettings map[string]any
  550. if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil {
  551. now := time.Now().Unix() * 1000
  552. if nSlice, ok := newSettings["clients"].([]any); ok {
  553. for i := range nSlice {
  554. if m, ok2 := nSlice[i].(map[string]any); ok2 {
  555. email, _ := m["email"].(string)
  556. if _, ok3 := m["created_at"]; !ok3 {
  557. if v, ok4 := emailToCreated[email]; ok4 && v > 0 {
  558. m["created_at"] = v
  559. } else {
  560. m["created_at"] = now
  561. }
  562. }
  563. // Preserve client's updated_at if present; do not bump on parent inbound update
  564. if _, hasUpdated := m["updated_at"]; !hasUpdated {
  565. if v, ok4 := emailToUpdated[email]; ok4 && v > 0 {
  566. m["updated_at"] = v
  567. }
  568. }
  569. nSlice[i] = m
  570. }
  571. }
  572. newSettings["clients"] = nSlice
  573. if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil {
  574. inbound.Settings = string(bs)
  575. }
  576. }
  577. }
  578. }
  579. oldInbound.Total = inbound.Total
  580. oldInbound.Remark = inbound.Remark
  581. oldInbound.Enable = inbound.Enable
  582. oldInbound.ExpiryTime = inbound.ExpiryTime
  583. oldInbound.TrafficReset = inbound.TrafficReset
  584. oldInbound.Listen = inbound.Listen
  585. oldInbound.Port = inbound.Port
  586. oldInbound.Protocol = inbound.Protocol
  587. oldInbound.Settings = inbound.Settings
  588. oldInbound.StreamSettings = inbound.StreamSettings
  589. oldInbound.Sniffing = inbound.Sniffing
  590. oldInbound.Tag, err = s.resolveInboundTag(inbound, inbound.Id)
  591. if err != nil {
  592. return inbound, false, err
  593. }
  594. needRestart := false
  595. rt, rterr := s.runtimeFor(oldInbound)
  596. if rterr != nil {
  597. if oldInbound.NodeID != nil {
  598. err = rterr
  599. return inbound, false, err
  600. }
  601. needRestart = true
  602. } else {
  603. oldSnapshot := *oldInbound
  604. oldSnapshot.Tag = tag
  605. if oldInbound.NodeID == nil {
  606. if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
  607. logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
  608. }
  609. if inbound.Enable {
  610. runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound)
  611. if err2 != nil {
  612. logger.Debug("Unable to prepare runtime inbound config:", err2)
  613. needRestart = true
  614. } else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil {
  615. logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag)
  616. } else {
  617. logger.Debug("Unable to update inbound on", rt.Name(), ":", err2)
  618. needRestart = true
  619. }
  620. }
  621. } else {
  622. if !inbound.Enable {
  623. if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
  624. err = err2
  625. return inbound, false, err
  626. }
  627. } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
  628. err = err2
  629. return inbound, false, err
  630. }
  631. }
  632. }
  633. return inbound, needRestart, tx.Save(oldInbound).Error
  634. }
  635. func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.Inbound) (*model.Inbound, error) {
  636. if inbound == nil {
  637. return nil, fmt.Errorf("inbound is nil")
  638. }
  639. runtimeInbound := *inbound
  640. settings := map[string]any{}
  641. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  642. return nil, err
  643. }
  644. clients, ok := settings["clients"].([]any)
  645. if !ok {
  646. return &runtimeInbound, nil
  647. }
  648. var clientStats []xray.ClientTraffic
  649. err := tx.Model(xray.ClientTraffic{}).
  650. Where("inbound_id = ?", inbound.Id).
  651. Select("email", "enable").
  652. Find(&clientStats).Error
  653. if err != nil {
  654. return nil, err
  655. }
  656. enableMap := make(map[string]bool, len(clientStats))
  657. for _, clientTraffic := range clientStats {
  658. enableMap[clientTraffic.Email] = clientTraffic.Enable
  659. }
  660. finalClients := make([]any, 0, len(clients))
  661. for _, client := range clients {
  662. c, ok := client.(map[string]any)
  663. if !ok {
  664. continue
  665. }
  666. email, _ := c["email"].(string)
  667. if enable, exists := enableMap[email]; exists && !enable {
  668. continue
  669. }
  670. if manualEnable, ok := c["enable"].(bool); ok && !manualEnable {
  671. continue
  672. }
  673. finalClients = append(finalClients, c)
  674. }
  675. settings["clients"] = finalClients
  676. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  677. if err != nil {
  678. return nil, err
  679. }
  680. runtimeInbound.Settings = string(modifiedSettings)
  681. return &runtimeInbound, nil
  682. }
  683. // updateClientTraffics syncs the ClientTraffic rows with the inbound's clients
  684. // list: removes rows for emails that disappeared, inserts rows for newly-added
  685. // emails. Uses sets for O(N) lookup — the previous nested-loop implementation
  686. // was O(N²) and degraded into multi-second pauses on inbounds with thousands
  687. // of clients (toggling, saving, or deleting any such inbound felt frozen).
  688. func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error {
  689. oldClients, err := s.GetClients(oldInbound)
  690. if err != nil {
  691. return err
  692. }
  693. newClients, err := s.GetClients(newInbound)
  694. if err != nil {
  695. return err
  696. }
  697. // Email is the unique key for ClientTraffic rows. Clients without an
  698. // email have no stats row to sync — skip them on both sides instead of
  699. // risking a unique-constraint hit or accidental delete of an unrelated row.
  700. oldEmails := make(map[string]struct{}, len(oldClients))
  701. for i := range oldClients {
  702. if oldClients[i].Email == "" {
  703. continue
  704. }
  705. oldEmails[oldClients[i].Email] = struct{}{}
  706. }
  707. newEmails := make(map[string]struct{}, len(newClients))
  708. for i := range newClients {
  709. if newClients[i].Email == "" {
  710. continue
  711. }
  712. newEmails[newClients[i].Email] = struct{}{}
  713. }
  714. // Drop stats rows for removed emails — but not when a sibling inbound
  715. // still references the email, since the row is the shared accumulator.
  716. for i := range oldClients {
  717. email := oldClients[i].Email
  718. if email == "" {
  719. continue
  720. }
  721. if _, kept := newEmails[email]; kept {
  722. continue
  723. }
  724. stillUsed, err := s.emailUsedByOtherInbounds(email, oldInbound.Id)
  725. if err != nil {
  726. return err
  727. }
  728. if stillUsed {
  729. continue
  730. }
  731. if err := s.DelClientStat(tx, email); err != nil {
  732. return err
  733. }
  734. }
  735. for i := range newClients {
  736. email := newClients[i].Email
  737. if email == "" {
  738. continue
  739. }
  740. if _, existed := oldEmails[email]; existed {
  741. if err := s.UpdateClientStat(tx, email, &newClients[i]); err != nil {
  742. return err
  743. }
  744. continue
  745. }
  746. if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
  747. return err
  748. }
  749. }
  750. return nil
  751. }
  752. func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
  753. clients, err := s.GetClients(data)
  754. if err != nil {
  755. return false, err
  756. }
  757. var settings map[string]any
  758. err = json.Unmarshal([]byte(data.Settings), &settings)
  759. if err != nil {
  760. return false, err
  761. }
  762. interfaceClients := settings["clients"].([]any)
  763. // Add timestamps for new clients being appended
  764. nowTs := time.Now().Unix() * 1000
  765. for i := range interfaceClients {
  766. if cm, ok := interfaceClients[i].(map[string]any); ok {
  767. if _, ok2 := cm["created_at"]; !ok2 {
  768. cm["created_at"] = nowTs
  769. }
  770. cm["updated_at"] = nowTs
  771. interfaceClients[i] = cm
  772. }
  773. }
  774. existEmail, err := s.checkEmailsExistForClients(clients)
  775. if err != nil {
  776. return false, err
  777. }
  778. if existEmail != "" {
  779. return false, common.NewError("Duplicate email:", existEmail)
  780. }
  781. oldInbound, err := s.GetInbound(data.Id)
  782. if err != nil {
  783. return false, err
  784. }
  785. // Secure client ID
  786. for _, client := range clients {
  787. switch oldInbound.Protocol {
  788. case "trojan":
  789. if client.Password == "" {
  790. return false, common.NewError("empty client ID")
  791. }
  792. case "shadowsocks":
  793. if client.Email == "" {
  794. return false, common.NewError("empty client ID")
  795. }
  796. case "hysteria", "hysteria2":
  797. if client.Auth == "" {
  798. return false, common.NewError("empty client ID")
  799. }
  800. default:
  801. if client.ID == "" {
  802. return false, common.NewError("empty client ID")
  803. }
  804. }
  805. }
  806. var oldSettings map[string]any
  807. err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
  808. if err != nil {
  809. return false, err
  810. }
  811. oldClients := oldSettings["clients"].([]any)
  812. oldClients = append(oldClients, interfaceClients...)
  813. oldSettings["clients"] = oldClients
  814. newSettings, err := json.MarshalIndent(oldSettings, "", " ")
  815. if err != nil {
  816. return false, err
  817. }
  818. oldInbound.Settings = string(newSettings)
  819. db := database.GetDB()
  820. tx := db.Begin()
  821. defer func() {
  822. if err != nil {
  823. tx.Rollback()
  824. } else {
  825. tx.Commit()
  826. }
  827. }()
  828. needRestart := false
  829. rt, rterr := s.runtimeFor(oldInbound)
  830. if rterr != nil {
  831. if oldInbound.NodeID != nil {
  832. err = rterr
  833. return false, err
  834. }
  835. needRestart = true
  836. } else if oldInbound.NodeID == nil {
  837. for _, client := range clients {
  838. if len(client.Email) == 0 {
  839. needRestart = true
  840. continue
  841. }
  842. s.AddClientStat(tx, data.Id, &client)
  843. if !client.Enable {
  844. continue
  845. }
  846. cipher := ""
  847. if oldInbound.Protocol == "shadowsocks" {
  848. cipher = oldSettings["method"].(string)
  849. }
  850. err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
  851. "email": client.Email,
  852. "id": client.ID,
  853. "auth": client.Auth,
  854. "security": client.Security,
  855. "flow": client.Flow,
  856. "password": client.Password,
  857. "cipher": cipher,
  858. })
  859. if err1 == nil {
  860. logger.Debug("Client added on", rt.Name(), ":", client.Email)
  861. } else {
  862. logger.Debug("Error in adding client on", rt.Name(), ":", err1)
  863. needRestart = true
  864. }
  865. }
  866. } else {
  867. for _, client := range clients {
  868. if len(client.Email) > 0 {
  869. s.AddClientStat(tx, data.Id, &client)
  870. }
  871. }
  872. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  873. err = err1
  874. return false, err
  875. }
  876. }
  877. return needRestart, tx.Save(oldInbound).Error
  878. }
  879. func (s *InboundService) getClientPrimaryKey(protocol model.Protocol, client model.Client) string {
  880. switch protocol {
  881. case model.Trojan:
  882. return client.Password
  883. case model.Shadowsocks:
  884. return client.Email
  885. case model.Hysteria:
  886. return client.Auth
  887. default:
  888. return client.ID
  889. }
  890. }
  891. func (s *InboundService) writeBackClientSubID(sourceInboundID int, sourceProtocol model.Protocol, client model.Client, subID string) (bool, error) {
  892. client.SubID = subID
  893. client.UpdatedAt = time.Now().UnixMilli()
  894. clientID := s.getClientPrimaryKey(sourceProtocol, client)
  895. if clientID == "" {
  896. return false, common.NewError("empty client ID")
  897. }
  898. settingsBytes, err := json.Marshal(map[string][]model.Client{
  899. "clients": {client},
  900. })
  901. if err != nil {
  902. return false, err
  903. }
  904. updatePayload := &model.Inbound{
  905. Id: sourceInboundID,
  906. Settings: string(settingsBytes),
  907. }
  908. return s.UpdateInboundClient(updatePayload, clientID)
  909. }
  910. func (s *InboundService) generateRandomCredential(targetProtocol model.Protocol) string {
  911. switch targetProtocol {
  912. case model.VMESS, model.VLESS:
  913. return uuid.NewString()
  914. default:
  915. return strings.ReplaceAll(uuid.NewString(), "-", "")
  916. }
  917. }
  918. func (s *InboundService) buildTargetClientFromSource(source model.Client, targetProtocol model.Protocol, email string, flow string) (model.Client, error) {
  919. nowTs := time.Now().UnixMilli()
  920. target := source
  921. target.Email = email
  922. target.CreatedAt = nowTs
  923. target.UpdatedAt = nowTs
  924. target.ID = ""
  925. target.Password = ""
  926. target.Auth = ""
  927. target.Flow = ""
  928. switch targetProtocol {
  929. case model.VMESS:
  930. target.ID = s.generateRandomCredential(targetProtocol)
  931. case model.VLESS:
  932. target.ID = s.generateRandomCredential(targetProtocol)
  933. if flow == "xtls-rprx-vision" || flow == "xtls-rprx-vision-udp443" {
  934. target.Flow = flow
  935. }
  936. case model.Trojan, model.Shadowsocks:
  937. target.Password = s.generateRandomCredential(targetProtocol)
  938. case model.Hysteria:
  939. target.Auth = s.generateRandomCredential(targetProtocol)
  940. default:
  941. target.ID = s.generateRandomCredential(targetProtocol)
  942. }
  943. return target, nil
  944. }
  945. func (s *InboundService) nextAvailableCopiedEmail(originalEmail string, targetID int, occupied map[string]struct{}) string {
  946. base := fmt.Sprintf("%s_%d", originalEmail, targetID)
  947. candidate := base
  948. suffix := 0
  949. for {
  950. if _, exists := occupied[strings.ToLower(candidate)]; !exists {
  951. occupied[strings.ToLower(candidate)] = struct{}{}
  952. return candidate
  953. }
  954. suffix++
  955. candidate = fmt.Sprintf("%s_%d", base, suffix)
  956. }
  957. }
  958. func (s *InboundService) CopyInboundClients(targetInboundID int, sourceInboundID int, clientEmails []string, flow string) (*CopyClientsResult, bool, error) {
  959. result := &CopyClientsResult{
  960. Added: []string{},
  961. Skipped: []string{},
  962. Errors: []string{},
  963. }
  964. if targetInboundID == sourceInboundID {
  965. return result, false, common.NewError("source and target inbounds must be different")
  966. }
  967. targetInbound, err := s.GetInbound(targetInboundID)
  968. if err != nil {
  969. return result, false, err
  970. }
  971. sourceInbound, err := s.GetInbound(sourceInboundID)
  972. if err != nil {
  973. return result, false, err
  974. }
  975. sourceClients, err := s.GetClients(sourceInbound)
  976. if err != nil {
  977. return result, false, err
  978. }
  979. if len(sourceClients) == 0 {
  980. return result, false, nil
  981. }
  982. allowedEmails := map[string]struct{}{}
  983. if len(clientEmails) > 0 {
  984. for _, email := range clientEmails {
  985. allowedEmails[strings.ToLower(strings.TrimSpace(email))] = struct{}{}
  986. }
  987. }
  988. occupiedEmails := map[string]struct{}{}
  989. allEmails, err := s.getAllEmails()
  990. if err != nil {
  991. return result, false, err
  992. }
  993. for _, email := range allEmails {
  994. clean := strings.Trim(email, "\"")
  995. if clean != "" {
  996. occupiedEmails[strings.ToLower(clean)] = struct{}{}
  997. }
  998. }
  999. newClients := make([]model.Client, 0)
  1000. needRestart := false
  1001. for _, sourceClient := range sourceClients {
  1002. originalEmail := strings.TrimSpace(sourceClient.Email)
  1003. if originalEmail == "" {
  1004. continue
  1005. }
  1006. if len(allowedEmails) > 0 {
  1007. if _, ok := allowedEmails[strings.ToLower(originalEmail)]; !ok {
  1008. continue
  1009. }
  1010. }
  1011. if sourceClient.SubID == "" {
  1012. newSubID := uuid.NewString()
  1013. subNeedRestart, subErr := s.writeBackClientSubID(sourceInbound.Id, sourceInbound.Protocol, sourceClient, newSubID)
  1014. if subErr != nil {
  1015. result.Errors = append(result.Errors, fmt.Sprintf("%s: failed to write source subId: %v", originalEmail, subErr))
  1016. continue
  1017. }
  1018. if subNeedRestart {
  1019. needRestart = true
  1020. }
  1021. sourceClient.SubID = newSubID
  1022. }
  1023. targetEmail := s.nextAvailableCopiedEmail(originalEmail, targetInboundID, occupiedEmails)
  1024. targetClient, buildErr := s.buildTargetClientFromSource(sourceClient, targetInbound.Protocol, targetEmail, flow)
  1025. if buildErr != nil {
  1026. result.Errors = append(result.Errors, fmt.Sprintf("%s: %v", originalEmail, buildErr))
  1027. continue
  1028. }
  1029. newClients = append(newClients, targetClient)
  1030. result.Added = append(result.Added, targetEmail)
  1031. }
  1032. if len(newClients) == 0 {
  1033. return result, needRestart, nil
  1034. }
  1035. settingsPayload, err := json.Marshal(map[string][]model.Client{
  1036. "clients": newClients,
  1037. })
  1038. if err != nil {
  1039. return result, needRestart, err
  1040. }
  1041. addNeedRestart, err := s.AddInboundClient(&model.Inbound{
  1042. Id: targetInboundID,
  1043. Settings: string(settingsPayload),
  1044. })
  1045. if err != nil {
  1046. return result, needRestart, err
  1047. }
  1048. if addNeedRestart {
  1049. needRestart = true
  1050. }
  1051. return result, needRestart, nil
  1052. }
  1053. func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool, error) {
  1054. oldInbound, err := s.GetInbound(inboundId)
  1055. if err != nil {
  1056. logger.Error("Load Old Data Error")
  1057. return false, err
  1058. }
  1059. var settings map[string]any
  1060. err = json.Unmarshal([]byte(oldInbound.Settings), &settings)
  1061. if err != nil {
  1062. return false, err
  1063. }
  1064. email := ""
  1065. client_key := "id"
  1066. switch oldInbound.Protocol {
  1067. case "trojan":
  1068. client_key = "password"
  1069. case "shadowsocks":
  1070. client_key = "email"
  1071. case "hysteria", "hysteria2":
  1072. client_key = "auth"
  1073. }
  1074. interfaceClients := settings["clients"].([]any)
  1075. var newClients []any
  1076. needApiDel := false
  1077. clientFound := false
  1078. for _, client := range interfaceClients {
  1079. c := client.(map[string]any)
  1080. c_id := c[client_key].(string)
  1081. if c_id == clientId {
  1082. clientFound = true
  1083. email, _ = c["email"].(string)
  1084. needApiDel, _ = c["enable"].(bool)
  1085. } else {
  1086. newClients = append(newClients, client)
  1087. }
  1088. }
  1089. if !clientFound {
  1090. return false, common.NewError("Client Not Found In Inbound For ID:", clientId)
  1091. }
  1092. if len(newClients) == 0 {
  1093. return false, common.NewError("no client remained in Inbound")
  1094. }
  1095. settings["clients"] = newClients
  1096. newSettings, err := json.MarshalIndent(settings, "", " ")
  1097. if err != nil {
  1098. return false, err
  1099. }
  1100. oldInbound.Settings = string(newSettings)
  1101. db := database.GetDB()
  1102. // Keep the client_traffics row and IPs alive when another inbound still
  1103. // references this email — siblings depend on the shared accounting state.
  1104. emailShared, err := s.emailUsedByOtherInbounds(email, inboundId)
  1105. if err != nil {
  1106. return false, err
  1107. }
  1108. if !emailShared {
  1109. err = s.DelClientIPs(db, email)
  1110. if err != nil {
  1111. logger.Error("Error in delete client IPs")
  1112. return false, err
  1113. }
  1114. }
  1115. needRestart := false
  1116. if len(email) > 0 {
  1117. notDepleted := true
  1118. err = db.Model(xray.ClientTraffic{}).Select("enable").Where("email = ?", email).First(&notDepleted).Error
  1119. if err != nil {
  1120. logger.Error("Get stats error")
  1121. return false, err
  1122. }
  1123. if !emailShared {
  1124. err = s.DelClientStat(db, email)
  1125. if err != nil {
  1126. logger.Error("Delete stats Data Error")
  1127. return false, err
  1128. }
  1129. }
  1130. if needApiDel && notDepleted {
  1131. rt, rterr := s.runtimeFor(oldInbound)
  1132. if rterr != nil {
  1133. if oldInbound.NodeID != nil {
  1134. return false, rterr
  1135. }
  1136. needRestart = true
  1137. } else if oldInbound.NodeID == nil {
  1138. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  1139. if err1 == nil {
  1140. logger.Debug("Client deleted on", rt.Name(), ":", email)
  1141. needRestart = false
  1142. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  1143. logger.Debug("User is already deleted. Nothing to do more...")
  1144. } else {
  1145. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  1146. needRestart = true
  1147. }
  1148. } else {
  1149. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  1150. return false, err1
  1151. }
  1152. }
  1153. }
  1154. }
  1155. return needRestart, db.Save(oldInbound).Error
  1156. }
  1157. func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) (bool, error) {
  1158. // TODO: check if TrafficReset field is updating
  1159. clients, err := s.GetClients(data)
  1160. if err != nil {
  1161. return false, err
  1162. }
  1163. var settings map[string]any
  1164. err = json.Unmarshal([]byte(data.Settings), &settings)
  1165. if err != nil {
  1166. return false, err
  1167. }
  1168. interfaceClients := settings["clients"].([]any)
  1169. oldInbound, err := s.GetInbound(data.Id)
  1170. if err != nil {
  1171. return false, err
  1172. }
  1173. oldClients, err := s.GetClients(oldInbound)
  1174. if err != nil {
  1175. return false, err
  1176. }
  1177. oldEmail := ""
  1178. newClientId := ""
  1179. clientIndex := -1
  1180. for index, oldClient := range oldClients {
  1181. oldClientId := ""
  1182. switch oldInbound.Protocol {
  1183. case "trojan":
  1184. oldClientId = oldClient.Password
  1185. newClientId = clients[0].Password
  1186. case "shadowsocks":
  1187. oldClientId = oldClient.Email
  1188. newClientId = clients[0].Email
  1189. case "hysteria", "hysteria2":
  1190. oldClientId = oldClient.Auth
  1191. newClientId = clients[0].Auth
  1192. default:
  1193. oldClientId = oldClient.ID
  1194. newClientId = clients[0].ID
  1195. }
  1196. if clientId == oldClientId {
  1197. oldEmail = oldClient.Email
  1198. clientIndex = index
  1199. break
  1200. }
  1201. }
  1202. // Validate new client ID
  1203. if newClientId == "" || clientIndex == -1 {
  1204. return false, common.NewError("empty client ID")
  1205. }
  1206. if len(clients[0].Email) > 0 && clients[0].Email != oldEmail {
  1207. existEmail, err := s.checkEmailsExistForClients(clients)
  1208. if err != nil {
  1209. return false, err
  1210. }
  1211. if existEmail != "" {
  1212. return false, common.NewError("Duplicate email:", existEmail)
  1213. }
  1214. }
  1215. var oldSettings map[string]any
  1216. err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
  1217. if err != nil {
  1218. return false, err
  1219. }
  1220. settingsClients := oldSettings["clients"].([]any)
  1221. // Preserve created_at and set updated_at for the replacing client
  1222. var preservedCreated any
  1223. if clientIndex >= 0 && clientIndex < len(settingsClients) {
  1224. if oldMap, ok := settingsClients[clientIndex].(map[string]any); ok {
  1225. if v, ok2 := oldMap["created_at"]; ok2 {
  1226. preservedCreated = v
  1227. }
  1228. }
  1229. }
  1230. if len(interfaceClients) > 0 {
  1231. if newMap, ok := interfaceClients[0].(map[string]any); ok {
  1232. if preservedCreated == nil {
  1233. preservedCreated = time.Now().Unix() * 1000
  1234. }
  1235. newMap["created_at"] = preservedCreated
  1236. newMap["updated_at"] = time.Now().Unix() * 1000
  1237. interfaceClients[0] = newMap
  1238. }
  1239. }
  1240. settingsClients[clientIndex] = interfaceClients[0]
  1241. oldSettings["clients"] = settingsClients
  1242. // testseed is only meaningful when at least one VLESS client uses the exact
  1243. // xtls-rprx-vision flow. The client-edit path only rewrites a single client,
  1244. // so re-check the flow set here and strip a stale testseed when nothing in the
  1245. // inbound still warrants it. The full-inbound update path already handles this
  1246. // on the JS side via VLESSSettings.toJson().
  1247. if oldInbound.Protocol == model.VLESS {
  1248. hasVisionFlow := false
  1249. for _, c := range settingsClients {
  1250. cm, ok := c.(map[string]any)
  1251. if !ok {
  1252. continue
  1253. }
  1254. if flow, _ := cm["flow"].(string); flow == "xtls-rprx-vision" {
  1255. hasVisionFlow = true
  1256. break
  1257. }
  1258. }
  1259. if !hasVisionFlow {
  1260. delete(oldSettings, "testseed")
  1261. }
  1262. }
  1263. newSettings, err := json.MarshalIndent(oldSettings, "", " ")
  1264. if err != nil {
  1265. return false, err
  1266. }
  1267. oldInbound.Settings = string(newSettings)
  1268. db := database.GetDB()
  1269. tx := db.Begin()
  1270. defer func() {
  1271. if err != nil {
  1272. tx.Rollback()
  1273. } else {
  1274. tx.Commit()
  1275. }
  1276. }()
  1277. if len(clients[0].Email) > 0 {
  1278. if len(oldEmail) > 0 {
  1279. // Repointing onto an email that already has a row would collide on
  1280. // the unique constraint, so retire the donor and let the surviving
  1281. // row carry the merged identity.
  1282. emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email)
  1283. targetExists := int64(0)
  1284. if !emailUnchanged {
  1285. if err = tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; err != nil {
  1286. return false, err
  1287. }
  1288. }
  1289. if emailUnchanged || targetExists == 0 {
  1290. err = s.UpdateClientStat(tx, oldEmail, &clients[0])
  1291. if err != nil {
  1292. return false, err
  1293. }
  1294. err = s.UpdateClientIPs(tx, oldEmail, clients[0].Email)
  1295. if err != nil {
  1296. return false, err
  1297. }
  1298. } else {
  1299. stillUsed, sErr := s.emailUsedByOtherInbounds(oldEmail, data.Id)
  1300. if sErr != nil {
  1301. return false, sErr
  1302. }
  1303. if !stillUsed {
  1304. if err = s.DelClientStat(tx, oldEmail); err != nil {
  1305. return false, err
  1306. }
  1307. if err = s.DelClientIPs(tx, oldEmail); err != nil {
  1308. return false, err
  1309. }
  1310. }
  1311. // Refresh the surviving row with the new client's limits/expiry.
  1312. if err = s.UpdateClientStat(tx, clients[0].Email, &clients[0]); err != nil {
  1313. return false, err
  1314. }
  1315. }
  1316. } else {
  1317. s.AddClientStat(tx, data.Id, &clients[0])
  1318. }
  1319. } else {
  1320. stillUsed, err := s.emailUsedByOtherInbounds(oldEmail, data.Id)
  1321. if err != nil {
  1322. return false, err
  1323. }
  1324. if !stillUsed {
  1325. err = s.DelClientStat(tx, oldEmail)
  1326. if err != nil {
  1327. return false, err
  1328. }
  1329. err = s.DelClientIPs(tx, oldEmail)
  1330. if err != nil {
  1331. return false, err
  1332. }
  1333. }
  1334. }
  1335. needRestart := false
  1336. if len(oldEmail) > 0 {
  1337. rt, rterr := s.runtimeFor(oldInbound)
  1338. if rterr != nil {
  1339. if oldInbound.NodeID != nil {
  1340. err = rterr
  1341. return false, err
  1342. }
  1343. needRestart = true
  1344. } else if oldInbound.NodeID == nil {
  1345. if oldClients[clientIndex].Enable {
  1346. err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail)
  1347. if err1 == nil {
  1348. logger.Debug("Old client deleted on", rt.Name(), ":", oldEmail)
  1349. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", oldEmail)) {
  1350. logger.Debug("User is already deleted. Nothing to do more...")
  1351. } else {
  1352. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  1353. needRestart = true
  1354. }
  1355. }
  1356. if clients[0].Enable {
  1357. cipher := ""
  1358. if oldInbound.Protocol == "shadowsocks" {
  1359. cipher = oldSettings["method"].(string)
  1360. }
  1361. err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
  1362. "email": clients[0].Email,
  1363. "id": clients[0].ID,
  1364. "security": clients[0].Security,
  1365. "flow": clients[0].Flow,
  1366. "auth": clients[0].Auth,
  1367. "password": clients[0].Password,
  1368. "cipher": cipher,
  1369. })
  1370. if err1 == nil {
  1371. logger.Debug("Client edited on", rt.Name(), ":", clients[0].Email)
  1372. } else {
  1373. logger.Debug("Error in adding client on", rt.Name(), ":", err1)
  1374. needRestart = true
  1375. }
  1376. }
  1377. } else {
  1378. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  1379. err = err1
  1380. return false, err
  1381. }
  1382. }
  1383. } else {
  1384. logger.Debug("Client old email not found")
  1385. needRestart = true
  1386. }
  1387. return needRestart, tx.Save(oldInbound).Error
  1388. }
  1389. const resetGracePeriodMs int64 = 30000
  1390. func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
  1391. var structuralChange bool
  1392. err := submitTrafficWrite(func() error {
  1393. var inner error
  1394. structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap)
  1395. return inner
  1396. })
  1397. return structuralChange, err
  1398. }
  1399. func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
  1400. if snap == nil || nodeID <= 0 {
  1401. return false, nil
  1402. }
  1403. db := database.GetDB()
  1404. now := time.Now().UnixMilli()
  1405. var central []model.Inbound
  1406. if err := db.Model(model.Inbound{}).
  1407. Where("node_id = ?", nodeID).
  1408. Find(&central).Error; err != nil {
  1409. return false, err
  1410. }
  1411. tagToCentral := make(map[string]*model.Inbound, len(central))
  1412. for i := range central {
  1413. tagToCentral[central[i].Tag] = &central[i]
  1414. }
  1415. var centralClientStats []xray.ClientTraffic
  1416. if len(central) > 0 {
  1417. ids := make([]int, 0, len(central))
  1418. for i := range central {
  1419. ids = append(ids, central[i].Id)
  1420. }
  1421. if err := db.Model(xray.ClientTraffic{}).
  1422. Where("inbound_id IN ?", ids).
  1423. Find(&centralClientStats).Error; err != nil {
  1424. return false, err
  1425. }
  1426. }
  1427. type csKey struct {
  1428. inboundID int
  1429. email string
  1430. }
  1431. centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
  1432. for i := range centralClientStats {
  1433. centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
  1434. }
  1435. var defaultUserId int
  1436. if len(central) > 0 {
  1437. defaultUserId = central[0].UserId
  1438. } else {
  1439. var u model.User
  1440. if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
  1441. defaultUserId = u.Id
  1442. } else {
  1443. defaultUserId = 1
  1444. }
  1445. }
  1446. tx := db.Begin()
  1447. committed := false
  1448. defer func() {
  1449. if !committed {
  1450. tx.Rollback()
  1451. }
  1452. }()
  1453. structuralChange := false
  1454. snapTags := make(map[string]struct{}, len(snap.Inbounds))
  1455. for _, snapIb := range snap.Inbounds {
  1456. if snapIb == nil {
  1457. continue
  1458. }
  1459. snapTags[snapIb.Tag] = struct{}{}
  1460. c, ok := tagToCentral[snapIb.Tag]
  1461. if !ok {
  1462. newIb := model.Inbound{
  1463. UserId: defaultUserId,
  1464. NodeID: &nodeID,
  1465. Tag: snapIb.Tag,
  1466. Listen: snapIb.Listen,
  1467. Port: snapIb.Port,
  1468. Protocol: snapIb.Protocol,
  1469. Settings: snapIb.Settings,
  1470. StreamSettings: snapIb.StreamSettings,
  1471. Sniffing: snapIb.Sniffing,
  1472. TrafficReset: snapIb.TrafficReset,
  1473. Enable: snapIb.Enable,
  1474. Remark: snapIb.Remark,
  1475. Total: snapIb.Total,
  1476. ExpiryTime: snapIb.ExpiryTime,
  1477. Up: snapIb.Up,
  1478. Down: snapIb.Down,
  1479. AllTime: snapIb.AllTime,
  1480. }
  1481. if err := tx.Create(&newIb).Error; err != nil {
  1482. logger.Warning("setRemoteTraffic: create central inbound for tag", snapIb.Tag, "failed:", err)
  1483. continue
  1484. }
  1485. tagToCentral[snapIb.Tag] = &newIb
  1486. structuralChange = true
  1487. continue
  1488. }
  1489. inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
  1490. updates := map[string]any{
  1491. "enable": snapIb.Enable,
  1492. "remark": snapIb.Remark,
  1493. "listen": snapIb.Listen,
  1494. "port": snapIb.Port,
  1495. "protocol": snapIb.Protocol,
  1496. "total": snapIb.Total,
  1497. "expiry_time": snapIb.ExpiryTime,
  1498. "settings": snapIb.Settings,
  1499. "stream_settings": snapIb.StreamSettings,
  1500. "sniffing": snapIb.Sniffing,
  1501. "traffic_reset": snapIb.TrafficReset,
  1502. }
  1503. if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
  1504. updates["up"] = snapIb.Up
  1505. updates["down"] = snapIb.Down
  1506. }
  1507. if snapIb.AllTime > c.AllTime {
  1508. updates["all_time"] = snapIb.AllTime
  1509. }
  1510. if c.Settings != snapIb.Settings ||
  1511. c.Remark != snapIb.Remark ||
  1512. c.Listen != snapIb.Listen ||
  1513. c.Port != snapIb.Port ||
  1514. c.Total != snapIb.Total ||
  1515. c.ExpiryTime != snapIb.ExpiryTime ||
  1516. c.Enable != snapIb.Enable {
  1517. structuralChange = true
  1518. }
  1519. if err := tx.Model(model.Inbound{}).
  1520. Where("id = ?", c.Id).
  1521. Updates(updates).Error; err != nil {
  1522. return false, err
  1523. }
  1524. }
  1525. for _, c := range central {
  1526. if _, kept := snapTags[c.Tag]; kept {
  1527. continue
  1528. }
  1529. if err := tx.Where("inbound_id = ?", c.Id).
  1530. Delete(&xray.ClientTraffic{}).Error; err != nil {
  1531. return false, err
  1532. }
  1533. if err := tx.Where("id = ?", c.Id).
  1534. Delete(&model.Inbound{}).Error; err != nil {
  1535. return false, err
  1536. }
  1537. delete(tagToCentral, c.Tag)
  1538. structuralChange = true
  1539. }
  1540. for _, snapIb := range snap.Inbounds {
  1541. if snapIb == nil {
  1542. continue
  1543. }
  1544. c, ok := tagToCentral[snapIb.Tag]
  1545. if !ok {
  1546. continue
  1547. }
  1548. inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
  1549. snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
  1550. for _, cs := range snapIb.ClientStats {
  1551. snapEmails[cs.Email] = struct{}{}
  1552. existing := centralCS[csKey{c.Id, cs.Email}]
  1553. if existing == nil {
  1554. if err := tx.Create(&xray.ClientTraffic{
  1555. InboundId: c.Id,
  1556. Email: cs.Email,
  1557. Enable: cs.Enable,
  1558. Total: cs.Total,
  1559. ExpiryTime: cs.ExpiryTime,
  1560. Reset: cs.Reset,
  1561. Up: cs.Up,
  1562. Down: cs.Down,
  1563. AllTime: cs.AllTime,
  1564. LastOnline: cs.LastOnline,
  1565. }).Error; err != nil {
  1566. return false, err
  1567. }
  1568. structuralChange = true
  1569. continue
  1570. }
  1571. if existing.Enable != cs.Enable ||
  1572. existing.Total != cs.Total ||
  1573. existing.ExpiryTime != cs.ExpiryTime ||
  1574. existing.Reset != cs.Reset {
  1575. structuralChange = true
  1576. }
  1577. allTime := existing.AllTime
  1578. if cs.AllTime > allTime {
  1579. allTime = cs.AllTime
  1580. }
  1581. if inGrace && cs.Up+cs.Down > 0 {
  1582. if err := tx.Exec(
  1583. `UPDATE client_traffics
  1584. SET enable = ?, total = ?, expiry_time = ?, reset = ?, all_time = ?
  1585. WHERE inbound_id = ? AND email = ?`,
  1586. cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime, c.Id, cs.Email,
  1587. ).Error; err != nil {
  1588. return false, err
  1589. }
  1590. continue
  1591. }
  1592. if err := tx.Exec(
  1593. `UPDATE client_traffics
  1594. SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
  1595. all_time = ?, last_online = MAX(last_online, ?)
  1596. WHERE inbound_id = ? AND email = ?`,
  1597. cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime,
  1598. cs.LastOnline, c.Id, cs.Email,
  1599. ).Error; err != nil {
  1600. return false, err
  1601. }
  1602. }
  1603. for k, existing := range centralCS {
  1604. if k.inboundID != c.Id {
  1605. continue
  1606. }
  1607. if _, kept := snapEmails[k.email]; kept {
  1608. continue
  1609. }
  1610. if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
  1611. Delete(&xray.ClientTraffic{}).Error; err != nil {
  1612. return false, err
  1613. }
  1614. structuralChange = true
  1615. }
  1616. }
  1617. if err := tx.Commit().Error; err != nil {
  1618. return false, err
  1619. }
  1620. committed = true
  1621. if p != nil {
  1622. p.SetNodeOnlineClients(nodeID, snap.OnlineEmails)
  1623. }
  1624. return structuralChange, nil
  1625. }
  1626. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  1627. err = submitTrafficWrite(func() error {
  1628. var inner error
  1629. needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  1630. return inner
  1631. })
  1632. return
  1633. }
  1634. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
  1635. var err error
  1636. db := database.GetDB()
  1637. tx := db.Begin()
  1638. defer func() {
  1639. if err != nil {
  1640. tx.Rollback()
  1641. } else {
  1642. tx.Commit()
  1643. }
  1644. }()
  1645. err = s.addInboundTraffic(tx, inboundTraffics)
  1646. if err != nil {
  1647. return false, false, err
  1648. }
  1649. err = s.addClientTraffic(tx, clientTraffics)
  1650. if err != nil {
  1651. return false, false, err
  1652. }
  1653. needRestart0, count, err := s.autoRenewClients(tx)
  1654. if err != nil {
  1655. logger.Warning("Error in renew clients:", err)
  1656. } else if count > 0 {
  1657. logger.Debugf("%v clients renewed", count)
  1658. }
  1659. disabledClientsCount := int64(0)
  1660. needRestart1, count, err := s.disableInvalidClients(tx)
  1661. if err != nil {
  1662. logger.Warning("Error in disabling invalid clients:", err)
  1663. } else if count > 0 {
  1664. logger.Debugf("%v clients disabled", count)
  1665. disabledClientsCount = count
  1666. }
  1667. needRestart2, count, err := s.disableInvalidInbounds(tx)
  1668. if err != nil {
  1669. logger.Warning("Error in disabling invalid inbounds:", err)
  1670. } else if count > 0 {
  1671. logger.Debugf("%v inbounds disabled", count)
  1672. }
  1673. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
  1674. }
  1675. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  1676. if len(traffics) == 0 {
  1677. return nil
  1678. }
  1679. var err error
  1680. for _, traffic := range traffics {
  1681. if traffic.IsInbound {
  1682. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  1683. Updates(map[string]any{
  1684. "up": gorm.Expr("up + ?", traffic.Up),
  1685. "down": gorm.Expr("down + ?", traffic.Down),
  1686. "all_time": gorm.Expr("COALESCE(all_time, 0) + ?", traffic.Up+traffic.Down),
  1687. }).Error
  1688. if err != nil {
  1689. return err
  1690. }
  1691. }
  1692. }
  1693. return nil
  1694. }
  1695. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  1696. if len(traffics) == 0 {
  1697. // Empty onlineUsers
  1698. if p != nil {
  1699. p.SetOnlineClients(make([]string, 0))
  1700. }
  1701. return nil
  1702. }
  1703. onlineClients := make([]string, 0)
  1704. emails := make([]string, 0, len(traffics))
  1705. for _, traffic := range traffics {
  1706. emails = append(emails, traffic.Email)
  1707. }
  1708. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  1709. err = tx.Model(xray.ClientTraffic{}).
  1710. Where("email IN (?) AND inbound_id IN (?)", emails,
  1711. tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
  1712. Find(&dbClientTraffics).Error
  1713. if err != nil {
  1714. return err
  1715. }
  1716. // Avoid empty slice error
  1717. if len(dbClientTraffics) == 0 {
  1718. return nil
  1719. }
  1720. dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
  1721. if err != nil {
  1722. return err
  1723. }
  1724. // Index by email for O(N) merge — the previous nested loop was O(N²)
  1725. // and dominated each cron tick on inbounds with thousands of active
  1726. // clients (7500 × 7500 = 56M string comparisons every 10 seconds).
  1727. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  1728. for i := range traffics {
  1729. if traffics[i] != nil {
  1730. trafficByEmail[traffics[i].Email] = traffics[i]
  1731. }
  1732. }
  1733. now := time.Now().UnixMilli()
  1734. for dbTraffic_index := range dbClientTraffics {
  1735. t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
  1736. if !ok {
  1737. continue
  1738. }
  1739. dbClientTraffics[dbTraffic_index].Up += t.Up
  1740. dbClientTraffics[dbTraffic_index].Down += t.Down
  1741. dbClientTraffics[dbTraffic_index].AllTime += t.Up + t.Down
  1742. if t.Up+t.Down > 0 {
  1743. onlineClients = append(onlineClients, t.Email)
  1744. dbClientTraffics[dbTraffic_index].LastOnline = now
  1745. }
  1746. }
  1747. // Set onlineUsers
  1748. p.SetOnlineClients(onlineClients)
  1749. err = tx.Save(dbClientTraffics).Error
  1750. if err != nil {
  1751. logger.Warning("AddClientTraffic update data ", err)
  1752. }
  1753. return nil
  1754. }
  1755. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
  1756. inboundIds := make([]int, 0, len(dbClientTraffics))
  1757. for _, dbClientTraffic := range dbClientTraffics {
  1758. if dbClientTraffic.ExpiryTime < 0 {
  1759. inboundIds = append(inboundIds, dbClientTraffic.InboundId)
  1760. }
  1761. }
  1762. if len(inboundIds) > 0 {
  1763. var inbounds []*model.Inbound
  1764. err := tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  1765. if err != nil {
  1766. return nil, err
  1767. }
  1768. for inbound_index := range inbounds {
  1769. settings := map[string]any{}
  1770. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  1771. clients, ok := settings["clients"].([]any)
  1772. if ok {
  1773. var newClients []any
  1774. for client_index := range clients {
  1775. c := clients[client_index].(map[string]any)
  1776. for traffic_index := range dbClientTraffics {
  1777. if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email {
  1778. oldExpiryTime := c["expiryTime"].(float64)
  1779. newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime)
  1780. c["expiryTime"] = newExpiryTime
  1781. c["updated_at"] = time.Now().Unix() * 1000
  1782. dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime
  1783. break
  1784. }
  1785. }
  1786. // Backfill created_at and updated_at
  1787. if _, ok := c["created_at"]; !ok {
  1788. c["created_at"] = time.Now().Unix() * 1000
  1789. }
  1790. c["updated_at"] = time.Now().Unix() * 1000
  1791. newClients = append(newClients, any(c))
  1792. }
  1793. settings["clients"] = newClients
  1794. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  1795. if err != nil {
  1796. return nil, err
  1797. }
  1798. inbounds[inbound_index].Settings = string(modifiedSettings)
  1799. }
  1800. }
  1801. err = tx.Save(inbounds).Error
  1802. if err != nil {
  1803. logger.Warning("AddClientTraffic update inbounds ", err)
  1804. logger.Error(inbounds)
  1805. }
  1806. }
  1807. return dbClientTraffics, nil
  1808. }
  1809. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  1810. // check for time expired
  1811. var traffics []*xray.ClientTraffic
  1812. now := time.Now().Unix() * 1000
  1813. var err, err1 error
  1814. err = tx.Model(xray.ClientTraffic{}).
  1815. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  1816. Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
  1817. Find(&traffics).Error
  1818. if err != nil {
  1819. return false, 0, err
  1820. }
  1821. // return if there is no client to renew
  1822. if len(traffics) == 0 {
  1823. return false, 0, nil
  1824. }
  1825. var inbound_ids []int
  1826. var inbounds []*model.Inbound
  1827. needRestart := false
  1828. var clientsToAdd []struct {
  1829. protocol string
  1830. tag string
  1831. client map[string]any
  1832. }
  1833. for _, traffic := range traffics {
  1834. inbound_ids = append(inbound_ids, traffic.InboundId)
  1835. }
  1836. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  1837. // per tick instead of N times across chunk boundaries.
  1838. inbound_ids = uniqueInts(inbound_ids)
  1839. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  1840. // are touched in a single tick.
  1841. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  1842. var page []*model.Inbound
  1843. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  1844. return false, 0, err
  1845. }
  1846. inbounds = append(inbounds, page...)
  1847. }
  1848. for inbound_index := range inbounds {
  1849. settings := map[string]any{}
  1850. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  1851. clients := settings["clients"].([]any)
  1852. for client_index := range clients {
  1853. c := clients[client_index].(map[string]any)
  1854. for traffic_index, traffic := range traffics {
  1855. if traffic.Email == c["email"].(string) {
  1856. newExpiryTime := traffic.ExpiryTime
  1857. for newExpiryTime < now {
  1858. newExpiryTime += (int64(traffic.Reset) * 86400000)
  1859. }
  1860. c["expiryTime"] = newExpiryTime
  1861. traffics[traffic_index].ExpiryTime = newExpiryTime
  1862. traffics[traffic_index].Down = 0
  1863. traffics[traffic_index].Up = 0
  1864. if !traffic.Enable {
  1865. traffics[traffic_index].Enable = true
  1866. clientsToAdd = append(clientsToAdd,
  1867. struct {
  1868. protocol string
  1869. tag string
  1870. client map[string]any
  1871. }{
  1872. protocol: string(inbounds[inbound_index].Protocol),
  1873. tag: inbounds[inbound_index].Tag,
  1874. client: c,
  1875. })
  1876. }
  1877. clients[client_index] = any(c)
  1878. break
  1879. }
  1880. }
  1881. }
  1882. settings["clients"] = clients
  1883. newSettings, err := json.MarshalIndent(settings, "", " ")
  1884. if err != nil {
  1885. return false, 0, err
  1886. }
  1887. inbounds[inbound_index].Settings = string(newSettings)
  1888. }
  1889. err = tx.Save(inbounds).Error
  1890. if err != nil {
  1891. return false, 0, err
  1892. }
  1893. err = tx.Save(traffics).Error
  1894. if err != nil {
  1895. return false, 0, err
  1896. }
  1897. if p != nil {
  1898. err1 = s.xrayApi.Init(p.GetAPIPort())
  1899. if err1 != nil {
  1900. return true, int64(len(traffics)), nil
  1901. }
  1902. for _, clientToAdd := range clientsToAdd {
  1903. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  1904. if err1 != nil {
  1905. needRestart = true
  1906. }
  1907. }
  1908. s.xrayApi.Close()
  1909. }
  1910. return needRestart, int64(len(traffics)), nil
  1911. }
  1912. func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
  1913. now := time.Now().Unix() * 1000
  1914. needRestart := false
  1915. if p != nil {
  1916. var tags []string
  1917. err := tx.Table("inbounds").
  1918. Select("inbounds.tag").
  1919. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  1920. Scan(&tags).Error
  1921. if err != nil {
  1922. return false, 0, err
  1923. }
  1924. s.xrayApi.Init(p.GetAPIPort())
  1925. for _, tag := range tags {
  1926. err1 := s.xrayApi.DelInbound(tag)
  1927. if err1 == nil {
  1928. logger.Debug("Inbound disabled by api:", tag)
  1929. } else {
  1930. logger.Debug("Error in disabling inbound by api:", err1)
  1931. needRestart = true
  1932. }
  1933. }
  1934. s.xrayApi.Close()
  1935. }
  1936. result := tx.Model(model.Inbound{}).
  1937. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  1938. Update("enable", false)
  1939. err := result.Error
  1940. count := result.RowsAffected
  1941. return needRestart, count, err
  1942. }
  1943. func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) {
  1944. now := time.Now().Unix() * 1000
  1945. needRestart := false
  1946. var depletedRows []xray.ClientTraffic
  1947. err := tx.Model(xray.ClientTraffic{}).
  1948. Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
  1949. Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
  1950. Find(&depletedRows).Error
  1951. if err != nil {
  1952. return false, 0, err
  1953. }
  1954. if len(depletedRows) == 0 {
  1955. return false, 0, nil
  1956. }
  1957. rowByEmail := make(map[string]*xray.ClientTraffic, len(depletedRows))
  1958. depletedEmails := make([]string, 0, len(depletedRows))
  1959. for i := range depletedRows {
  1960. if depletedRows[i].Email == "" {
  1961. continue
  1962. }
  1963. rowByEmail[strings.ToLower(depletedRows[i].Email)] = &depletedRows[i]
  1964. depletedEmails = append(depletedEmails, depletedRows[i].Email)
  1965. }
  1966. // Resolve inbound membership only for the depleted emails — pushing the
  1967. // filter into SQLite avoids dragging every panel client through Go for
  1968. // the common case where most clients are healthy.
  1969. var memberships []struct {
  1970. InboundId int
  1971. Tag string
  1972. Email string
  1973. SubID string `gorm:"column:sub_id"`
  1974. }
  1975. if len(depletedEmails) > 0 {
  1976. err = tx.Raw(`
  1977. SELECT inbounds.id AS inbound_id,
  1978. inbounds.tag AS tag,
  1979. JSON_EXTRACT(client.value, '$.email') AS email,
  1980. JSON_EXTRACT(client.value, '$.subId') AS sub_id
  1981. FROM inbounds,
  1982. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  1983. WHERE LOWER(JSON_EXTRACT(client.value, '$.email')) IN ?
  1984. `, lowerAll(depletedEmails)).Scan(&memberships).Error
  1985. if err != nil {
  1986. return false, 0, err
  1987. }
  1988. }
  1989. // Discover the row holder's subId per email. Only siblings sharing it
  1990. // get cascaded; legacy data where two identities reuse the same email
  1991. // stays isolated to the row owner.
  1992. holderSub := make(map[string]string, len(rowByEmail))
  1993. for _, m := range memberships {
  1994. email := strings.ToLower(strings.Trim(m.Email, "\""))
  1995. row, ok := rowByEmail[email]
  1996. if !ok || m.InboundId != row.InboundId {
  1997. continue
  1998. }
  1999. holderSub[email] = strings.Trim(m.SubID, "\"")
  2000. }
  2001. type target struct {
  2002. InboundId int
  2003. Tag string
  2004. Email string
  2005. }
  2006. var targets []target
  2007. for _, m := range memberships {
  2008. email := strings.ToLower(strings.Trim(m.Email, "\""))
  2009. row, ok := rowByEmail[email]
  2010. if !ok {
  2011. continue
  2012. }
  2013. expected, hasSub := holderSub[email]
  2014. mSub := strings.Trim(m.SubID, "\"")
  2015. switch {
  2016. case !hasSub || expected == "":
  2017. if m.InboundId != row.InboundId {
  2018. continue
  2019. }
  2020. case mSub != expected:
  2021. continue
  2022. }
  2023. targets = append(targets, target{
  2024. InboundId: m.InboundId,
  2025. Tag: m.Tag,
  2026. Email: strings.Trim(m.Email, "\""),
  2027. })
  2028. }
  2029. if p != nil && len(targets) > 0 {
  2030. s.xrayApi.Init(p.GetAPIPort())
  2031. for _, t := range targets {
  2032. err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
  2033. if err1 == nil {
  2034. logger.Debug("Client disabled by api:", t.Email)
  2035. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
  2036. logger.Debug("User is already disabled. Nothing to do more...")
  2037. } else {
  2038. logger.Debug("Error in disabling client by api:", err1)
  2039. needRestart = true
  2040. }
  2041. }
  2042. s.xrayApi.Close()
  2043. }
  2044. result := tx.Model(xray.ClientTraffic{}).
  2045. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
  2046. Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
  2047. Update("enable", false)
  2048. err = result.Error
  2049. count := result.RowsAffected
  2050. if err != nil {
  2051. return needRestart, count, err
  2052. }
  2053. if len(targets) == 0 {
  2054. return needRestart, count, nil
  2055. }
  2056. inboundEmailMap := make(map[int]map[string]struct{})
  2057. for _, t := range targets {
  2058. if inboundEmailMap[t.InboundId] == nil {
  2059. inboundEmailMap[t.InboundId] = make(map[string]struct{})
  2060. }
  2061. inboundEmailMap[t.InboundId][t.Email] = struct{}{}
  2062. }
  2063. inboundIds := make([]int, 0, len(inboundEmailMap))
  2064. for id := range inboundEmailMap {
  2065. inboundIds = append(inboundIds, id)
  2066. }
  2067. var inbounds []*model.Inbound
  2068. if err = tx.Model(model.Inbound{}).Where("id IN ?", inboundIds).Find(&inbounds).Error; err != nil {
  2069. logger.Warning("disableInvalidClients fetch inbounds:", err)
  2070. return needRestart, count, nil
  2071. }
  2072. dirty := make([]*model.Inbound, 0, len(inbounds))
  2073. for _, inbound := range inbounds {
  2074. settings := map[string]any{}
  2075. if jsonErr := json.Unmarshal([]byte(inbound.Settings), &settings); jsonErr != nil {
  2076. continue
  2077. }
  2078. clientsRaw, ok := settings["clients"].([]any)
  2079. if !ok {
  2080. continue
  2081. }
  2082. emailSet := inboundEmailMap[inbound.Id]
  2083. changed := false
  2084. for i := range clientsRaw {
  2085. c, ok := clientsRaw[i].(map[string]any)
  2086. if !ok {
  2087. continue
  2088. }
  2089. email, _ := c["email"].(string)
  2090. if _, shouldDisable := emailSet[email]; !shouldDisable {
  2091. continue
  2092. }
  2093. c["enable"] = false
  2094. if row, ok := rowByEmail[strings.ToLower(email)]; ok {
  2095. c["totalGB"] = row.Total
  2096. c["expiryTime"] = row.ExpiryTime
  2097. }
  2098. c["updated_at"] = now
  2099. clientsRaw[i] = c
  2100. changed = true
  2101. }
  2102. if !changed {
  2103. continue
  2104. }
  2105. settings["clients"] = clientsRaw
  2106. modifiedSettings, jsonErr := json.MarshalIndent(settings, "", " ")
  2107. if jsonErr != nil {
  2108. continue
  2109. }
  2110. inbound.Settings = string(modifiedSettings)
  2111. dirty = append(dirty, inbound)
  2112. }
  2113. if len(dirty) > 0 {
  2114. if err = tx.Save(dirty).Error; err != nil {
  2115. logger.Warning("disableInvalidClients update inbound settings:", err)
  2116. }
  2117. }
  2118. return needRestart, count, nil
  2119. }
  2120. func (s *InboundService) GetInboundTags() (string, error) {
  2121. db := database.GetDB()
  2122. var inboundTags []string
  2123. err := db.Model(model.Inbound{}).Select("tag").Find(&inboundTags).Error
  2124. if err != nil && err != gorm.ErrRecordNotFound {
  2125. return "", err
  2126. }
  2127. tags, _ := json.Marshal(inboundTags)
  2128. return string(tags), nil
  2129. }
  2130. func (s *InboundService) GetClientReverseTags() (string, error) {
  2131. db := database.GetDB()
  2132. var inbounds []model.Inbound
  2133. err := db.Model(model.Inbound{}).Select("settings").Where("protocol = ?", "vless").Find(&inbounds).Error
  2134. if err != nil && err != gorm.ErrRecordNotFound {
  2135. return "[]", err
  2136. }
  2137. tagSet := make(map[string]struct{})
  2138. for _, inbound := range inbounds {
  2139. var settings map[string]any
  2140. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  2141. continue
  2142. }
  2143. clients, ok := settings["clients"].([]any)
  2144. if !ok {
  2145. continue
  2146. }
  2147. for _, client := range clients {
  2148. clientMap, ok := client.(map[string]any)
  2149. if !ok {
  2150. continue
  2151. }
  2152. reverse, ok := clientMap["reverse"].(map[string]any)
  2153. if !ok {
  2154. continue
  2155. }
  2156. tag, _ := reverse["tag"].(string)
  2157. tag = strings.TrimSpace(tag)
  2158. if tag != "" {
  2159. tagSet[tag] = struct{}{}
  2160. }
  2161. }
  2162. }
  2163. rawTags := make([]string, 0, len(tagSet))
  2164. for tag := range tagSet {
  2165. rawTags = append(rawTags, tag)
  2166. }
  2167. sort.Strings(rawTags)
  2168. result, _ := json.Marshal(rawTags)
  2169. return string(result), nil
  2170. }
  2171. func (s *InboundService) MigrationRemoveOrphanedTraffics() {
  2172. db := database.GetDB()
  2173. db.Exec(`
  2174. DELETE FROM client_traffics
  2175. WHERE email NOT IN (
  2176. SELECT JSON_EXTRACT(client.value, '$.email')
  2177. FROM inbounds,
  2178. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  2179. )
  2180. `)
  2181. }
  2182. // AddClientStat inserts a per-client accounting row, no-op on email
  2183. // conflict. Xray reports traffic per email, so the surviving row acts as
  2184. // the shared accumulator for inbounds that re-use the same identity.
  2185. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  2186. clientTraffic := xray.ClientTraffic{
  2187. InboundId: inboundId,
  2188. Email: client.Email,
  2189. Total: client.TotalGB,
  2190. ExpiryTime: client.ExpiryTime,
  2191. Enable: client.Enable,
  2192. Reset: client.Reset,
  2193. }
  2194. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  2195. Create(&clientTraffic).Error
  2196. }
  2197. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  2198. result := tx.Model(xray.ClientTraffic{}).
  2199. Where("email = ?", email).
  2200. Updates(map[string]any{
  2201. "enable": client.Enable,
  2202. "email": client.Email,
  2203. "total": client.TotalGB,
  2204. "expiry_time": client.ExpiryTime,
  2205. "reset": client.Reset,
  2206. })
  2207. err := result.Error
  2208. return err
  2209. }
  2210. func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail string) error {
  2211. return tx.Model(model.InboundClientIps{}).Where("client_email = ?", oldEmail).Update("client_email", newEmail).Error
  2212. }
  2213. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  2214. return tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error
  2215. }
  2216. func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {
  2217. return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error
  2218. }
  2219. func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
  2220. db := database.GetDB()
  2221. var traffics []*xray.ClientTraffic
  2222. err = db.Model(xray.ClientTraffic{}).Where("id = ?", trafficId).Find(&traffics).Error
  2223. if err != nil {
  2224. logger.Warningf("Error retrieving ClientTraffic with trafficId %d: %v", trafficId, err)
  2225. return nil, nil, err
  2226. }
  2227. if len(traffics) > 0 {
  2228. inbound, err = s.GetInbound(traffics[0].InboundId)
  2229. return traffics[0], inbound, err
  2230. }
  2231. return nil, nil, nil
  2232. }
  2233. func (s *InboundService) GetClientInboundByEmail(email string) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
  2234. db := database.GetDB()
  2235. var traffics []*xray.ClientTraffic
  2236. err = db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error
  2237. if err != nil {
  2238. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  2239. return nil, nil, err
  2240. }
  2241. if len(traffics) > 0 {
  2242. inbound, err = s.GetInbound(traffics[0].InboundId)
  2243. return traffics[0], inbound, err
  2244. }
  2245. return nil, nil, nil
  2246. }
  2247. func (s *InboundService) GetClientByEmail(clientEmail string) (*xray.ClientTraffic, *model.Client, error) {
  2248. traffic, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2249. if err != nil {
  2250. return nil, nil, err
  2251. }
  2252. if inbound == nil {
  2253. return nil, nil, common.NewError("Inbound Not Found For Email:", clientEmail)
  2254. }
  2255. clients, err := s.GetClients(inbound)
  2256. if err != nil {
  2257. return nil, nil, err
  2258. }
  2259. for _, client := range clients {
  2260. if client.Email == clientEmail {
  2261. return traffic, &client, nil
  2262. }
  2263. }
  2264. return nil, nil, common.NewError("Client Not Found In Inbound For Email:", clientEmail)
  2265. }
  2266. func (s *InboundService) SetClientTelegramUserID(trafficId int, tgId int64) (bool, error) {
  2267. traffic, inbound, err := s.GetClientInboundByTrafficID(trafficId)
  2268. if err != nil {
  2269. return false, err
  2270. }
  2271. if inbound == nil {
  2272. return false, common.NewError("Inbound Not Found For Traffic ID:", trafficId)
  2273. }
  2274. clientEmail := traffic.Email
  2275. oldClients, err := s.GetClients(inbound)
  2276. if err != nil {
  2277. return false, err
  2278. }
  2279. clientId := ""
  2280. for _, oldClient := range oldClients {
  2281. if oldClient.Email == clientEmail {
  2282. switch inbound.Protocol {
  2283. case "trojan":
  2284. clientId = oldClient.Password
  2285. case "shadowsocks":
  2286. clientId = oldClient.Email
  2287. default:
  2288. clientId = oldClient.ID
  2289. }
  2290. break
  2291. }
  2292. }
  2293. if len(clientId) == 0 {
  2294. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2295. }
  2296. var settings map[string]any
  2297. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2298. if err != nil {
  2299. return false, err
  2300. }
  2301. clients := settings["clients"].([]any)
  2302. var newClients []any
  2303. for client_index := range clients {
  2304. c := clients[client_index].(map[string]any)
  2305. if c["email"] == clientEmail {
  2306. c["tgId"] = tgId
  2307. c["updated_at"] = time.Now().Unix() * 1000
  2308. newClients = append(newClients, any(c))
  2309. }
  2310. }
  2311. settings["clients"] = newClients
  2312. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2313. if err != nil {
  2314. return false, err
  2315. }
  2316. inbound.Settings = string(modifiedSettings)
  2317. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2318. return needRestart, err
  2319. }
  2320. func (s *InboundService) checkIsEnabledByEmail(clientEmail string) (bool, error) {
  2321. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2322. if err != nil {
  2323. return false, err
  2324. }
  2325. if inbound == nil {
  2326. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2327. }
  2328. clients, err := s.GetClients(inbound)
  2329. if err != nil {
  2330. return false, err
  2331. }
  2332. isEnable := false
  2333. for _, client := range clients {
  2334. if client.Email == clientEmail {
  2335. isEnable = client.Enable
  2336. break
  2337. }
  2338. }
  2339. return isEnable, err
  2340. }
  2341. func (s *InboundService) ToggleClientEnableByEmail(clientEmail string) (bool, bool, error) {
  2342. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2343. if err != nil {
  2344. return false, false, err
  2345. }
  2346. if inbound == nil {
  2347. return false, false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2348. }
  2349. oldClients, err := s.GetClients(inbound)
  2350. if err != nil {
  2351. return false, false, err
  2352. }
  2353. clientId := ""
  2354. clientOldEnabled := false
  2355. for _, oldClient := range oldClients {
  2356. if oldClient.Email == clientEmail {
  2357. switch inbound.Protocol {
  2358. case "trojan":
  2359. clientId = oldClient.Password
  2360. case "shadowsocks":
  2361. clientId = oldClient.Email
  2362. default:
  2363. clientId = oldClient.ID
  2364. }
  2365. clientOldEnabled = oldClient.Enable
  2366. break
  2367. }
  2368. }
  2369. if len(clientId) == 0 {
  2370. return false, false, common.NewError("Client Not Found For Email:", clientEmail)
  2371. }
  2372. var settings map[string]any
  2373. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2374. if err != nil {
  2375. return false, false, err
  2376. }
  2377. clients := settings["clients"].([]any)
  2378. var newClients []any
  2379. for client_index := range clients {
  2380. c := clients[client_index].(map[string]any)
  2381. if c["email"] == clientEmail {
  2382. c["enable"] = !clientOldEnabled
  2383. c["updated_at"] = time.Now().Unix() * 1000
  2384. newClients = append(newClients, any(c))
  2385. }
  2386. }
  2387. settings["clients"] = newClients
  2388. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2389. if err != nil {
  2390. return false, false, err
  2391. }
  2392. inbound.Settings = string(modifiedSettings)
  2393. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2394. if err != nil {
  2395. return false, needRestart, err
  2396. }
  2397. return !clientOldEnabled, needRestart, nil
  2398. }
  2399. // SetClientEnableByEmail sets client enable state to desired value; returns (changed, needRestart, error)
  2400. func (s *InboundService) SetClientEnableByEmail(clientEmail string, enable bool) (bool, bool, error) {
  2401. current, err := s.checkIsEnabledByEmail(clientEmail)
  2402. if err != nil {
  2403. return false, false, err
  2404. }
  2405. if current == enable {
  2406. return false, false, nil
  2407. }
  2408. newEnabled, needRestart, err := s.ToggleClientEnableByEmail(clientEmail)
  2409. if err != nil {
  2410. return false, needRestart, err
  2411. }
  2412. return newEnabled == enable, needRestart, nil
  2413. }
  2414. func (s *InboundService) ResetClientIpLimitByEmail(clientEmail string, count int) (bool, error) {
  2415. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2416. if err != nil {
  2417. return false, err
  2418. }
  2419. if inbound == nil {
  2420. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2421. }
  2422. oldClients, err := s.GetClients(inbound)
  2423. if err != nil {
  2424. return false, err
  2425. }
  2426. clientId := ""
  2427. for _, oldClient := range oldClients {
  2428. if oldClient.Email == clientEmail {
  2429. switch inbound.Protocol {
  2430. case "trojan":
  2431. clientId = oldClient.Password
  2432. case "shadowsocks":
  2433. clientId = oldClient.Email
  2434. default:
  2435. clientId = oldClient.ID
  2436. }
  2437. break
  2438. }
  2439. }
  2440. if len(clientId) == 0 {
  2441. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2442. }
  2443. var settings map[string]any
  2444. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2445. if err != nil {
  2446. return false, err
  2447. }
  2448. clients := settings["clients"].([]any)
  2449. var newClients []any
  2450. for client_index := range clients {
  2451. c := clients[client_index].(map[string]any)
  2452. if c["email"] == clientEmail {
  2453. c["limitIp"] = count
  2454. c["updated_at"] = time.Now().Unix() * 1000
  2455. newClients = append(newClients, any(c))
  2456. }
  2457. }
  2458. settings["clients"] = newClients
  2459. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2460. if err != nil {
  2461. return false, err
  2462. }
  2463. inbound.Settings = string(modifiedSettings)
  2464. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2465. return needRestart, err
  2466. }
  2467. func (s *InboundService) ResetClientExpiryTimeByEmail(clientEmail string, expiry_time int64) (bool, error) {
  2468. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2469. if err != nil {
  2470. return false, err
  2471. }
  2472. if inbound == nil {
  2473. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2474. }
  2475. oldClients, err := s.GetClients(inbound)
  2476. if err != nil {
  2477. return false, err
  2478. }
  2479. clientId := ""
  2480. for _, oldClient := range oldClients {
  2481. if oldClient.Email == clientEmail {
  2482. switch inbound.Protocol {
  2483. case "trojan":
  2484. clientId = oldClient.Password
  2485. case "shadowsocks":
  2486. clientId = oldClient.Email
  2487. default:
  2488. clientId = oldClient.ID
  2489. }
  2490. break
  2491. }
  2492. }
  2493. if len(clientId) == 0 {
  2494. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2495. }
  2496. var settings map[string]any
  2497. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2498. if err != nil {
  2499. return false, err
  2500. }
  2501. clients := settings["clients"].([]any)
  2502. var newClients []any
  2503. for client_index := range clients {
  2504. c := clients[client_index].(map[string]any)
  2505. if c["email"] == clientEmail {
  2506. c["expiryTime"] = expiry_time
  2507. c["updated_at"] = time.Now().Unix() * 1000
  2508. newClients = append(newClients, any(c))
  2509. }
  2510. }
  2511. settings["clients"] = newClients
  2512. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2513. if err != nil {
  2514. return false, err
  2515. }
  2516. inbound.Settings = string(modifiedSettings)
  2517. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2518. return needRestart, err
  2519. }
  2520. func (s *InboundService) ResetClientTrafficLimitByEmail(clientEmail string, totalGB int) (bool, error) {
  2521. if totalGB < 0 {
  2522. return false, common.NewError("totalGB must be >= 0")
  2523. }
  2524. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2525. if err != nil {
  2526. return false, err
  2527. }
  2528. if inbound == nil {
  2529. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2530. }
  2531. oldClients, err := s.GetClients(inbound)
  2532. if err != nil {
  2533. return false, err
  2534. }
  2535. clientId := ""
  2536. for _, oldClient := range oldClients {
  2537. if oldClient.Email == clientEmail {
  2538. switch inbound.Protocol {
  2539. case "trojan":
  2540. clientId = oldClient.Password
  2541. case "shadowsocks":
  2542. clientId = oldClient.Email
  2543. default:
  2544. clientId = oldClient.ID
  2545. }
  2546. break
  2547. }
  2548. }
  2549. if len(clientId) == 0 {
  2550. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2551. }
  2552. var settings map[string]any
  2553. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2554. if err != nil {
  2555. return false, err
  2556. }
  2557. clients := settings["clients"].([]any)
  2558. var newClients []any
  2559. for client_index := range clients {
  2560. c := clients[client_index].(map[string]any)
  2561. if c["email"] == clientEmail {
  2562. c["totalGB"] = totalGB * 1024 * 1024 * 1024
  2563. c["updated_at"] = time.Now().Unix() * 1000
  2564. newClients = append(newClients, any(c))
  2565. }
  2566. }
  2567. settings["clients"] = newClients
  2568. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2569. if err != nil {
  2570. return false, err
  2571. }
  2572. inbound.Settings = string(modifiedSettings)
  2573. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2574. return needRestart, err
  2575. }
  2576. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  2577. return submitTrafficWrite(func() error {
  2578. db := database.GetDB()
  2579. return db.Model(xray.ClientTraffic{}).
  2580. Where("email = ?", clientEmail).
  2581. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
  2582. })
  2583. }
  2584. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  2585. err = submitTrafficWrite(func() error {
  2586. var inner error
  2587. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  2588. return inner
  2589. })
  2590. return
  2591. }
  2592. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  2593. needRestart := false
  2594. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  2595. if err != nil {
  2596. return false, err
  2597. }
  2598. if !traffic.Enable {
  2599. inbound, err := s.GetInbound(id)
  2600. if err != nil {
  2601. return false, err
  2602. }
  2603. clients, err := s.GetClients(inbound)
  2604. if err != nil {
  2605. return false, err
  2606. }
  2607. for _, client := range clients {
  2608. if client.Email == clientEmail && client.Enable {
  2609. rt, rterr := s.runtimeFor(inbound)
  2610. if rterr != nil {
  2611. if inbound.NodeID != nil {
  2612. return false, rterr
  2613. }
  2614. needRestart = true
  2615. break
  2616. }
  2617. cipher := ""
  2618. if string(inbound.Protocol) == "shadowsocks" {
  2619. var oldSettings map[string]any
  2620. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  2621. if err != nil {
  2622. return false, err
  2623. }
  2624. cipher = oldSettings["method"].(string)
  2625. }
  2626. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  2627. "email": client.Email,
  2628. "id": client.ID,
  2629. "auth": client.Auth,
  2630. "security": client.Security,
  2631. "flow": client.Flow,
  2632. "password": client.Password,
  2633. "cipher": cipher,
  2634. })
  2635. if err1 == nil {
  2636. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  2637. } else {
  2638. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  2639. needRestart = true
  2640. }
  2641. break
  2642. }
  2643. }
  2644. }
  2645. traffic.Up = 0
  2646. traffic.Down = 0
  2647. traffic.Enable = true
  2648. db := database.GetDB()
  2649. err = db.Save(traffic).Error
  2650. if err != nil {
  2651. return false, err
  2652. }
  2653. now := time.Now().UnixMilli()
  2654. _ = db.Model(model.Inbound{}).
  2655. Where("id = ?", id).
  2656. Update("last_traffic_reset_time", now).Error
  2657. inbound, err := s.GetInbound(id)
  2658. if err == nil && inbound != nil && inbound.NodeID != nil {
  2659. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  2660. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  2661. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  2662. }
  2663. } else {
  2664. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  2665. }
  2666. }
  2667. return needRestart, nil
  2668. }
  2669. func (s *InboundService) ResetAllClientTraffics(id int) error {
  2670. return submitTrafficWrite(func() error {
  2671. return s.resetAllClientTrafficsLocked(id)
  2672. })
  2673. }
  2674. func (s *InboundService) resetAllClientTrafficsLocked(id int) error {
  2675. db := database.GetDB()
  2676. now := time.Now().Unix() * 1000
  2677. if err := db.Transaction(func(tx *gorm.DB) error {
  2678. whereText := "inbound_id "
  2679. if id == -1 {
  2680. whereText += " > ?"
  2681. } else {
  2682. whereText += " = ?"
  2683. }
  2684. // Reset client traffics
  2685. result := tx.Model(xray.ClientTraffic{}).
  2686. Where(whereText, id).
  2687. Updates(map[string]any{"enable": true, "up": 0, "down": 0})
  2688. if result.Error != nil {
  2689. return result.Error
  2690. }
  2691. // Update lastTrafficResetTime for the inbound(s)
  2692. inboundWhereText := "id "
  2693. if id == -1 {
  2694. inboundWhereText += " > ?"
  2695. } else {
  2696. inboundWhereText += " = ?"
  2697. }
  2698. result = tx.Model(model.Inbound{}).
  2699. Where(inboundWhereText, id).
  2700. Update("last_traffic_reset_time", now)
  2701. return result.Error
  2702. }); err != nil {
  2703. return err
  2704. }
  2705. var inbounds []model.Inbound
  2706. q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL")
  2707. if id != -1 {
  2708. q = q.Where("id = ?", id)
  2709. }
  2710. if err := q.Find(&inbounds).Error; err != nil {
  2711. logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err)
  2712. return nil
  2713. }
  2714. for i := range inbounds {
  2715. ib := &inbounds[i]
  2716. rt, rterr := s.runtimeFor(ib)
  2717. if rterr != nil {
  2718. logger.Warning("ResetAllClientTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr)
  2719. continue
  2720. }
  2721. if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil {
  2722. logger.Warning("ResetAllClientTraffics: remote propagation to", rt.Name(), "failed:", e)
  2723. }
  2724. }
  2725. return nil
  2726. }
  2727. func (s *InboundService) ResetAllTraffics() error {
  2728. return submitTrafficWrite(func() error {
  2729. return s.resetAllTrafficsLocked()
  2730. })
  2731. }
  2732. func (s *InboundService) resetAllTrafficsLocked() error {
  2733. db := database.GetDB()
  2734. now := time.Now().UnixMilli()
  2735. if err := db.Model(model.Inbound{}).
  2736. Where("user_id > ?", 0).
  2737. Updates(map[string]any{
  2738. "up": 0,
  2739. "down": 0,
  2740. "last_traffic_reset_time": now,
  2741. }).Error; err != nil {
  2742. return err
  2743. }
  2744. var inbounds []model.Inbound
  2745. if err := db.Model(model.Inbound{}).
  2746. Where("node_id IS NOT NULL").
  2747. Find(&inbounds).Error; err != nil {
  2748. logger.Warning("ResetAllTraffics: discover node inbounds failed:", err)
  2749. return nil
  2750. }
  2751. for i := range inbounds {
  2752. ib := &inbounds[i]
  2753. rt, rterr := s.runtimeFor(ib)
  2754. if rterr != nil {
  2755. logger.Warning("ResetAllTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr)
  2756. continue
  2757. }
  2758. if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil {
  2759. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  2760. }
  2761. }
  2762. return nil
  2763. }
  2764. func (s *InboundService) ResetInboundTraffic(id int) error {
  2765. return submitTrafficWrite(func() error {
  2766. db := database.GetDB()
  2767. return db.Model(model.Inbound{}).
  2768. Where("id = ?", id).
  2769. Updates(map[string]any{"up": 0, "down": 0}).Error
  2770. })
  2771. }
  2772. func (s *InboundService) DelDepletedClients(id int) (err error) {
  2773. db := database.GetDB()
  2774. tx := db.Begin()
  2775. defer func() {
  2776. if err == nil {
  2777. tx.Commit()
  2778. } else {
  2779. tx.Rollback()
  2780. }
  2781. }()
  2782. // Collect depleted emails globally — a shared-email row owned by one
  2783. // inbound depletes every sibling that lists the email.
  2784. now := time.Now().Unix() * 1000
  2785. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  2786. var depletedRows []xray.ClientTraffic
  2787. err = db.Model(xray.ClientTraffic{}).
  2788. Where(depletedClause, now).
  2789. Find(&depletedRows).Error
  2790. if err != nil {
  2791. return err
  2792. }
  2793. if len(depletedRows) == 0 {
  2794. return nil
  2795. }
  2796. depletedEmails := make(map[string]struct{}, len(depletedRows))
  2797. for _, r := range depletedRows {
  2798. if r.Email == "" {
  2799. continue
  2800. }
  2801. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  2802. }
  2803. if len(depletedEmails) == 0 {
  2804. return nil
  2805. }
  2806. var inbounds []*model.Inbound
  2807. inboundQuery := db.Model(model.Inbound{})
  2808. if id >= 0 {
  2809. inboundQuery = inboundQuery.Where("id = ?", id)
  2810. }
  2811. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  2812. return err
  2813. }
  2814. for _, inbound := range inbounds {
  2815. var settings map[string]any
  2816. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  2817. return err
  2818. }
  2819. rawClients, ok := settings["clients"].([]any)
  2820. if !ok {
  2821. continue
  2822. }
  2823. newClients := make([]any, 0, len(rawClients))
  2824. removed := 0
  2825. for _, client := range rawClients {
  2826. c, ok := client.(map[string]any)
  2827. if !ok {
  2828. newClients = append(newClients, client)
  2829. continue
  2830. }
  2831. email, _ := c["email"].(string)
  2832. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  2833. removed++
  2834. continue
  2835. }
  2836. newClients = append(newClients, client)
  2837. }
  2838. if removed == 0 {
  2839. continue
  2840. }
  2841. if len(newClients) == 0 {
  2842. s.DelInbound(inbound.Id)
  2843. continue
  2844. }
  2845. settings["clients"] = newClients
  2846. ns, mErr := json.MarshalIndent(settings, "", " ")
  2847. if mErr != nil {
  2848. return mErr
  2849. }
  2850. inbound.Settings = string(ns)
  2851. if err = tx.Save(inbound).Error; err != nil {
  2852. return err
  2853. }
  2854. }
  2855. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  2856. // no out-of-scope inbound still references the email.
  2857. if id < 0 {
  2858. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  2859. return err
  2860. }
  2861. emails := make([]string, 0, len(depletedEmails))
  2862. for e := range depletedEmails {
  2863. emails = append(emails, e)
  2864. }
  2865. var stillReferenced []string
  2866. if err = tx.Raw(`
  2867. SELECT DISTINCT LOWER(JSON_EXTRACT(client.value, '$.email'))
  2868. FROM inbounds,
  2869. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  2870. WHERE LOWER(JSON_EXTRACT(client.value, '$.email')) IN ?
  2871. `, emails).Scan(&stillReferenced).Error; err != nil {
  2872. return err
  2873. }
  2874. stillSet := make(map[string]struct{}, len(stillReferenced))
  2875. for _, e := range stillReferenced {
  2876. stillSet[e] = struct{}{}
  2877. }
  2878. toDelete := make([]string, 0, len(emails))
  2879. for _, e := range emails {
  2880. if _, kept := stillSet[e]; !kept {
  2881. toDelete = append(toDelete, e)
  2882. }
  2883. }
  2884. if len(toDelete) > 0 {
  2885. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  2886. return err
  2887. }
  2888. }
  2889. return nil
  2890. }
  2891. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  2892. db := database.GetDB()
  2893. var inbounds []*model.Inbound
  2894. // Retrieve inbounds where settings contain the given tgId
  2895. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  2896. if err != nil && err != gorm.ErrRecordNotFound {
  2897. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  2898. return nil, err
  2899. }
  2900. var emails []string
  2901. for _, inbound := range inbounds {
  2902. clients, err := s.GetClients(inbound)
  2903. if err != nil {
  2904. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  2905. continue
  2906. }
  2907. for _, client := range clients {
  2908. if client.TgID == tgId {
  2909. emails = append(emails, client.Email)
  2910. }
  2911. }
  2912. }
  2913. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  2914. // account owns thousands of clients across inbounds.
  2915. uniqEmails := uniqueNonEmptyStrings(emails)
  2916. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  2917. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  2918. var page []*xray.ClientTraffic
  2919. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  2920. if err == gorm.ErrRecordNotFound {
  2921. continue
  2922. }
  2923. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  2924. return nil, err
  2925. }
  2926. traffics = append(traffics, page...)
  2927. }
  2928. if len(traffics) == 0 {
  2929. logger.Warning("No ClientTraffic records found for emails:", emails)
  2930. return nil, nil
  2931. }
  2932. // Populate UUID and other client data for each traffic record
  2933. for i := range traffics {
  2934. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  2935. traffics[i].Enable = client.Enable
  2936. traffics[i].UUID = client.ID
  2937. traffics[i].SubId = client.SubID
  2938. }
  2939. }
  2940. return traffics, nil
  2941. }
  2942. // sqliteMaxVars is a safe ceiling for the number of bind parameters in a
  2943. // single SQL statement. SQLite's SQLITE_MAX_VARIABLE_NUMBER is 999 on builds
  2944. // before 3.32 and 32766 after; staying under 999 keeps queries portable
  2945. // across forks/old binaries and also bounds per-query memory on truly large
  2946. // installs (>32k clients) where even modern SQLite would refuse a single IN.
  2947. const sqliteMaxVars = 900
  2948. // uniqueNonEmptyStrings returns a deduplicated copy of in with empty strings
  2949. // removed, preserving the order of first occurrence.
  2950. func uniqueNonEmptyStrings(in []string) []string {
  2951. if len(in) == 0 {
  2952. return nil
  2953. }
  2954. seen := make(map[string]struct{}, len(in))
  2955. out := make([]string, 0, len(in))
  2956. for _, v := range in {
  2957. if v == "" {
  2958. continue
  2959. }
  2960. if _, ok := seen[v]; ok {
  2961. continue
  2962. }
  2963. seen[v] = struct{}{}
  2964. out = append(out, v)
  2965. }
  2966. return out
  2967. }
  2968. // uniqueInts returns a deduplicated copy of in, preserving order of first occurrence.
  2969. func uniqueInts(in []int) []int {
  2970. if len(in) == 0 {
  2971. return nil
  2972. }
  2973. seen := make(map[int]struct{}, len(in))
  2974. out := make([]int, 0, len(in))
  2975. for _, v := range in {
  2976. if _, ok := seen[v]; ok {
  2977. continue
  2978. }
  2979. seen[v] = struct{}{}
  2980. out = append(out, v)
  2981. }
  2982. return out
  2983. }
  2984. // chunkStrings splits s into consecutive sub-slices of at most size elements.
  2985. // Returns nil for an empty input or non-positive size.
  2986. func chunkStrings(s []string, size int) [][]string {
  2987. if size <= 0 || len(s) == 0 {
  2988. return nil
  2989. }
  2990. out := make([][]string, 0, (len(s)+size-1)/size)
  2991. for i := 0; i < len(s); i += size {
  2992. end := i + size
  2993. if end > len(s) {
  2994. end = len(s)
  2995. }
  2996. out = append(out, s[i:end])
  2997. }
  2998. return out
  2999. }
  3000. // chunkInts splits s into consecutive sub-slices of at most size elements.
  3001. // Returns nil for an empty input or non-positive size.
  3002. func chunkInts(s []int, size int) [][]int {
  3003. if size <= 0 || len(s) == 0 {
  3004. return nil
  3005. }
  3006. out := make([][]int, 0, (len(s)+size-1)/size)
  3007. for i := 0; i < len(s); i += size {
  3008. end := i + size
  3009. if end > len(s) {
  3010. end = len(s)
  3011. }
  3012. out = append(out, s[i:end])
  3013. }
  3014. return out
  3015. }
  3016. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  3017. uniq := uniqueNonEmptyStrings(emails)
  3018. if len(uniq) == 0 {
  3019. return nil, nil
  3020. }
  3021. db := database.GetDB()
  3022. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  3023. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  3024. var page []*xray.ClientTraffic
  3025. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  3026. return nil, err
  3027. }
  3028. traffics = append(traffics, page...)
  3029. }
  3030. return traffics, nil
  3031. }
  3032. type InboundTrafficSummary struct {
  3033. Id int `json:"id"`
  3034. Up int64 `json:"up"`
  3035. Down int64 `json:"down"`
  3036. Total int64 `json:"total"`
  3037. AllTime int64 `json:"allTime"`
  3038. Enable bool `json:"enable"`
  3039. }
  3040. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  3041. db := database.GetDB()
  3042. var summaries []InboundTrafficSummary
  3043. if err := db.Model(&model.Inbound{}).
  3044. Select("id, up, down, total, all_time, enable").
  3045. Find(&summaries).Error; err != nil {
  3046. return nil, err
  3047. }
  3048. return summaries, nil
  3049. }
  3050. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  3051. // Prefer retrieving along with client to reflect actual enabled state from inbound settings
  3052. t, client, err := s.GetClientByEmail(email)
  3053. if err != nil {
  3054. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  3055. return nil, err
  3056. }
  3057. if t != nil && client != nil {
  3058. t.UUID = client.ID
  3059. t.SubId = client.SubID
  3060. return t, nil
  3061. }
  3062. return nil, nil
  3063. }
  3064. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  3065. return submitTrafficWrite(func() error {
  3066. db := database.GetDB()
  3067. err := db.Model(xray.ClientTraffic{}).
  3068. Where("email = ?", email).
  3069. Updates(map[string]any{
  3070. "up": upload,
  3071. "down": download,
  3072. "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
  3073. }).Error
  3074. if err != nil {
  3075. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  3076. }
  3077. return err
  3078. })
  3079. }
  3080. func (s *InboundService) GetClientTrafficByID(id string) ([]xray.ClientTraffic, error) {
  3081. db := database.GetDB()
  3082. var traffics []xray.ClientTraffic
  3083. err := db.Model(xray.ClientTraffic{}).Where(`email IN(
  3084. SELECT JSON_EXTRACT(client.value, '$.email') as email
  3085. FROM inbounds,
  3086. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  3087. WHERE
  3088. JSON_EXTRACT(client.value, '$.id') in (?)
  3089. )`, id).Find(&traffics).Error
  3090. if err != nil {
  3091. logger.Debug(err)
  3092. return nil, err
  3093. }
  3094. // Reconcile enable flag with client settings per email to avoid stale DB value
  3095. for i := range traffics {
  3096. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  3097. traffics[i].Enable = client.Enable
  3098. traffics[i].UUID = client.ID
  3099. traffics[i].SubId = client.SubID
  3100. }
  3101. }
  3102. return traffics, err
  3103. }
  3104. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  3105. db := database.GetDB()
  3106. inbound := &model.Inbound{}
  3107. traffic = &xray.ClientTraffic{}
  3108. // Search for inbound settings that contain the query
  3109. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  3110. if err != nil {
  3111. if err == gorm.ErrRecordNotFound {
  3112. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  3113. return nil, err
  3114. }
  3115. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  3116. return nil, err
  3117. }
  3118. traffic.InboundId = inbound.Id
  3119. // Unmarshal settings to get clients
  3120. settings := map[string][]model.Client{}
  3121. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  3122. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  3123. return nil, err
  3124. }
  3125. clients := settings["clients"]
  3126. for _, client := range clients {
  3127. if (client.ID == query || client.Password == query) && client.Email != "" {
  3128. traffic.Email = client.Email
  3129. break
  3130. }
  3131. }
  3132. if traffic.Email == "" {
  3133. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  3134. return nil, gorm.ErrRecordNotFound
  3135. }
  3136. // Retrieve ClientTraffic based on the found email
  3137. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  3138. if err != nil {
  3139. if err == gorm.ErrRecordNotFound {
  3140. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  3141. return nil, err
  3142. }
  3143. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  3144. return nil, err
  3145. }
  3146. return traffic, nil
  3147. }
  3148. func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error) {
  3149. db := database.GetDB()
  3150. InboundClientIps := &model.InboundClientIps{}
  3151. err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
  3152. if err != nil {
  3153. return "", err
  3154. }
  3155. if InboundClientIps.Ips == "" {
  3156. return "", nil
  3157. }
  3158. // Try to parse as new format (with timestamps)
  3159. type IPWithTimestamp struct {
  3160. IP string `json:"ip"`
  3161. Timestamp int64 `json:"timestamp"`
  3162. }
  3163. var ipsWithTime []IPWithTimestamp
  3164. err = json.Unmarshal([]byte(InboundClientIps.Ips), &ipsWithTime)
  3165. // If successfully parsed as new format, return with timestamps
  3166. if err == nil && len(ipsWithTime) > 0 {
  3167. return InboundClientIps.Ips, nil
  3168. }
  3169. // Otherwise, assume it's old format (simple string array)
  3170. // Try to parse as simple array and convert to new format
  3171. var oldIps []string
  3172. err = json.Unmarshal([]byte(InboundClientIps.Ips), &oldIps)
  3173. if err == nil && len(oldIps) > 0 {
  3174. // Convert old format to new format with current timestamp
  3175. newIpsWithTime := make([]IPWithTimestamp, len(oldIps))
  3176. for i, ip := range oldIps {
  3177. newIpsWithTime[i] = IPWithTimestamp{
  3178. IP: ip,
  3179. Timestamp: time.Now().Unix(),
  3180. }
  3181. }
  3182. result, _ := json.Marshal(newIpsWithTime)
  3183. return string(result), nil
  3184. }
  3185. // Return as-is if parsing fails
  3186. return InboundClientIps.Ips, nil
  3187. }
  3188. func (s *InboundService) ClearClientIps(clientEmail string) error {
  3189. db := database.GetDB()
  3190. result := db.Model(model.InboundClientIps{}).
  3191. Where("client_email = ?", clientEmail).
  3192. Update("ips", "")
  3193. err := result.Error
  3194. if err != nil {
  3195. return err
  3196. }
  3197. return nil
  3198. }
  3199. func (s *InboundService) SearchInbounds(query string) ([]*model.Inbound, error) {
  3200. db := database.GetDB()
  3201. var inbounds []*model.Inbound
  3202. err := db.Model(model.Inbound{}).Preload("ClientStats").Where("remark like ?", "%"+query+"%").Find(&inbounds).Error
  3203. if err != nil && err != gorm.ErrRecordNotFound {
  3204. return nil, err
  3205. }
  3206. return inbounds, nil
  3207. }
  3208. func (s *InboundService) MigrationRequirements() {
  3209. db := database.GetDB()
  3210. tx := db.Begin()
  3211. var err error
  3212. defer func() {
  3213. if err == nil {
  3214. tx.Commit()
  3215. if dbErr := db.Exec(`VACUUM "main"`).Error; dbErr != nil {
  3216. logger.Warningf("VACUUM failed: %v", dbErr)
  3217. }
  3218. } else {
  3219. tx.Rollback()
  3220. }
  3221. }()
  3222. // Calculate and backfill all_time from up+down for inbounds and clients
  3223. err = tx.Exec(`
  3224. UPDATE inbounds
  3225. SET all_time = IFNULL(up, 0) + IFNULL(down, 0)
  3226. WHERE IFNULL(all_time, 0) = 0 AND (IFNULL(up, 0) + IFNULL(down, 0)) > 0
  3227. `).Error
  3228. if err != nil {
  3229. return
  3230. }
  3231. err = tx.Exec(`
  3232. UPDATE client_traffics
  3233. SET all_time = IFNULL(up, 0) + IFNULL(down, 0)
  3234. WHERE IFNULL(all_time, 0) = 0 AND (IFNULL(up, 0) + IFNULL(down, 0)) > 0
  3235. `).Error
  3236. if err != nil {
  3237. return
  3238. }
  3239. // Fix inbounds based problems
  3240. var inbounds []*model.Inbound
  3241. err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan"}).Find(&inbounds).Error
  3242. if err != nil && err != gorm.ErrRecordNotFound {
  3243. return
  3244. }
  3245. for inbound_index := range inbounds {
  3246. settings := map[string]any{}
  3247. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  3248. clients, ok := settings["clients"].([]any)
  3249. if ok {
  3250. // Fix Client configuration problems
  3251. var newClients []any
  3252. hasVisionFlow := false
  3253. for client_index := range clients {
  3254. c := clients[client_index].(map[string]any)
  3255. // Add email='' if it is not exists
  3256. if _, ok := c["email"]; !ok {
  3257. c["email"] = ""
  3258. }
  3259. // Convert string tgId to int64
  3260. if _, ok := c["tgId"]; ok {
  3261. var tgId any = c["tgId"]
  3262. if tgIdStr, ok2 := tgId.(string); ok2 {
  3263. tgIdInt64, err := strconv.ParseInt(strings.ReplaceAll(tgIdStr, " ", ""), 10, 64)
  3264. if err == nil {
  3265. c["tgId"] = tgIdInt64
  3266. }
  3267. }
  3268. }
  3269. // Remove "flow": "xtls-rprx-direct"
  3270. if _, ok := c["flow"]; ok {
  3271. if c["flow"] == "xtls-rprx-direct" {
  3272. c["flow"] = ""
  3273. }
  3274. }
  3275. if flow, _ := c["flow"].(string); flow == "xtls-rprx-vision" {
  3276. hasVisionFlow = true
  3277. }
  3278. // Backfill created_at and updated_at
  3279. if _, ok := c["created_at"]; !ok {
  3280. c["created_at"] = time.Now().Unix() * 1000
  3281. }
  3282. c["updated_at"] = time.Now().Unix() * 1000
  3283. newClients = append(newClients, any(c))
  3284. }
  3285. settings["clients"] = newClients
  3286. // Drop orphaned testseed: VLESS-only field, only meaningful when at least
  3287. // one client uses the exact xtls-rprx-vision flow. Older versions saved it
  3288. // for any non-empty flow (including the UDP variant) or kept it after the
  3289. // flow was cleared from the client modal — clean those up here.
  3290. if inbounds[inbound_index].Protocol == model.VLESS && !hasVisionFlow {
  3291. delete(settings, "testseed")
  3292. }
  3293. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  3294. if err != nil {
  3295. return
  3296. }
  3297. inbounds[inbound_index].Settings = string(modifiedSettings)
  3298. }
  3299. // Add client traffic row for all clients which has email
  3300. modelClients, err := s.GetClients(inbounds[inbound_index])
  3301. if err != nil {
  3302. return
  3303. }
  3304. for _, modelClient := range modelClients {
  3305. if len(modelClient.Email) > 0 {
  3306. var count int64
  3307. tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count)
  3308. if count == 0 {
  3309. s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient)
  3310. }
  3311. }
  3312. }
  3313. }
  3314. tx.Save(inbounds)
  3315. // Remove orphaned traffics
  3316. tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{})
  3317. // Migrate old MultiDomain to External Proxy
  3318. var externalProxy []struct {
  3319. Id int
  3320. Port int
  3321. StreamSettings []byte
  3322. }
  3323. err = tx.Raw(`select id, port, stream_settings
  3324. from inbounds
  3325. WHERE protocol in ('vmess','vless','trojan')
  3326. AND json_extract(stream_settings, '$.security') = 'tls'
  3327. AND json_extract(stream_settings, '$.tlsSettings.settings.domains') IS NOT NULL`).Scan(&externalProxy).Error
  3328. if err != nil || len(externalProxy) == 0 {
  3329. return
  3330. }
  3331. for _, ep := range externalProxy {
  3332. var reverses any
  3333. var stream map[string]any
  3334. json.Unmarshal(ep.StreamSettings, &stream)
  3335. if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
  3336. if settings, ok := tlsSettings["settings"].(map[string]any); ok {
  3337. if domains, ok := settings["domains"].([]any); ok {
  3338. for _, domain := range domains {
  3339. if domainMap, ok := domain.(map[string]any); ok {
  3340. domainMap["forceTls"] = "same"
  3341. domainMap["port"] = ep.Port
  3342. domainMap["dest"] = domainMap["domain"].(string)
  3343. delete(domainMap, "domain")
  3344. }
  3345. }
  3346. }
  3347. reverses = settings["domains"]
  3348. delete(settings, "domains")
  3349. }
  3350. }
  3351. stream["externalProxy"] = reverses
  3352. newStream, _ := json.MarshalIndent(stream, " ", " ")
  3353. tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
  3354. }
  3355. err = tx.Raw(`UPDATE inbounds
  3356. SET tag = REPLACE(tag, '0.0.0.0:', '')
  3357. WHERE INSTR(tag, '0.0.0.0:') > 0;`).Error
  3358. if err != nil {
  3359. return
  3360. }
  3361. }
  3362. func (s *InboundService) MigrateDB() {
  3363. s.MigrationRequirements()
  3364. s.MigrationRemoveOrphanedTraffics()
  3365. }
  3366. func (s *InboundService) GetOnlineClients() []string {
  3367. return p.GetOnlineClients()
  3368. }
  3369. func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
  3370. if p != nil {
  3371. p.SetNodeOnlineClients(nodeID, emails)
  3372. }
  3373. }
  3374. func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
  3375. if p != nil {
  3376. p.ClearNodeOnlineClients(nodeID)
  3377. }
  3378. }
  3379. func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
  3380. db := database.GetDB()
  3381. var rows []xray.ClientTraffic
  3382. err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
  3383. if err != nil && err != gorm.ErrRecordNotFound {
  3384. return nil, err
  3385. }
  3386. result := make(map[string]int64, len(rows))
  3387. for _, r := range rows {
  3388. result[r.Email] = r.LastOnline
  3389. }
  3390. return result, nil
  3391. }
  3392. func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
  3393. db := database.GetDB()
  3394. // Step 1: Get ClientTraffic records for emails in the input list.
  3395. // Chunked to stay under SQLite's bind-variable limit on huge inputs.
  3396. uniqEmails := uniqueNonEmptyStrings(emails)
  3397. clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
  3398. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  3399. var page []xray.ClientTraffic
  3400. if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
  3401. return nil, nil, err
  3402. }
  3403. clients = append(clients, page...)
  3404. }
  3405. // Step 2: Sort clients by (Up + Down) descending
  3406. sort.Slice(clients, func(i, j int) bool {
  3407. return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
  3408. })
  3409. // Step 3: Extract sorted valid emails and track found ones
  3410. validEmails := make([]string, 0, len(clients))
  3411. found := make(map[string]bool)
  3412. for _, client := range clients {
  3413. validEmails = append(validEmails, client.Email)
  3414. found[client.Email] = true
  3415. }
  3416. // Step 4: Identify emails that were not found in the database
  3417. extraEmails := make([]string, 0)
  3418. for _, email := range emails {
  3419. if !found[email] {
  3420. extraEmails = append(extraEmails, email)
  3421. }
  3422. }
  3423. return validEmails, extraEmails, nil
  3424. }
  3425. func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (bool, error) {
  3426. oldInbound, err := s.GetInbound(inboundId)
  3427. if err != nil {
  3428. logger.Error("Load Old Data Error")
  3429. return false, err
  3430. }
  3431. var settings map[string]any
  3432. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  3433. return false, err
  3434. }
  3435. interfaceClients, ok := settings["clients"].([]any)
  3436. if !ok {
  3437. return false, common.NewError("invalid clients format in inbound settings")
  3438. }
  3439. var newClients []any
  3440. needApiDel := false
  3441. found := false
  3442. for _, client := range interfaceClients {
  3443. c, ok := client.(map[string]any)
  3444. if !ok {
  3445. continue
  3446. }
  3447. if cEmail, ok := c["email"].(string); ok && cEmail == email {
  3448. // matched client, drop it
  3449. found = true
  3450. needApiDel, _ = c["enable"].(bool)
  3451. } else {
  3452. newClients = append(newClients, client)
  3453. }
  3454. }
  3455. if !found {
  3456. return false, common.NewError(fmt.Sprintf("client with email %s not found", email))
  3457. }
  3458. if len(newClients) == 0 {
  3459. return false, common.NewError("no client remained in Inbound")
  3460. }
  3461. settings["clients"] = newClients
  3462. newSettings, err := json.MarshalIndent(settings, "", " ")
  3463. if err != nil {
  3464. return false, err
  3465. }
  3466. oldInbound.Settings = string(newSettings)
  3467. db := database.GetDB()
  3468. // Drop the row and IPs only when this was the last inbound referencing
  3469. // the email — siblings still need the shared accounting state.
  3470. emailShared, err := s.emailUsedByOtherInbounds(email, inboundId)
  3471. if err != nil {
  3472. return false, err
  3473. }
  3474. if !emailShared {
  3475. if err := s.DelClientIPs(db, email); err != nil {
  3476. logger.Error("Error in delete client IPs")
  3477. return false, err
  3478. }
  3479. }
  3480. needRestart := false
  3481. // remove stats too
  3482. if len(email) > 0 && !emailShared {
  3483. traffic, err := s.GetClientTrafficByEmail(email)
  3484. if err != nil {
  3485. return false, err
  3486. }
  3487. if traffic != nil {
  3488. if err := s.DelClientStat(db, email); err != nil {
  3489. logger.Error("Delete stats Data Error")
  3490. return false, err
  3491. }
  3492. }
  3493. if needApiDel {
  3494. rt, rterr := s.runtimeFor(oldInbound)
  3495. if rterr != nil {
  3496. if oldInbound.NodeID != nil {
  3497. return false, rterr
  3498. }
  3499. needRestart = true
  3500. } else if oldInbound.NodeID == nil {
  3501. if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil {
  3502. logger.Debug("Client deleted on", rt.Name(), ":", email)
  3503. needRestart = false
  3504. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  3505. logger.Debug("User is already deleted. Nothing to do more...")
  3506. } else {
  3507. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  3508. needRestart = true
  3509. }
  3510. } else {
  3511. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  3512. return false, err1
  3513. }
  3514. }
  3515. }
  3516. }
  3517. return needRestart, db.Save(oldInbound).Error
  3518. }