1
0

inbound.go 106 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
7377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897
  1. // Package service provides business logic services for the 3x-ui web panel,
  2. // including inbound/outbound management, user administration, settings, and Xray integration.
  3. package service
  4. import (
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/google/uuid"
  13. "github.com/mhsanaei/3x-ui/v3/database"
  14. "github.com/mhsanaei/3x-ui/v3/database/model"
  15. "github.com/mhsanaei/3x-ui/v3/logger"
  16. "github.com/mhsanaei/3x-ui/v3/util/common"
  17. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  18. "github.com/mhsanaei/3x-ui/v3/xray"
  19. "gorm.io/gorm"
  20. "gorm.io/gorm/clause"
  21. )
  22. type InboundService struct {
  23. xrayApi xray.XrayAPI
  24. }
  25. func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
  26. mgr := runtime.GetManager()
  27. if mgr == nil {
  28. return nil, fmt.Errorf("runtime manager not initialised")
  29. }
  30. return mgr.RuntimeFor(ib.NodeID)
  31. }
  // CopyClientsResult is the JSON-serialisable outcome of a client copy
  // operation, split into three string lists.
  //
  // NOTE(review): the producer of this struct is not visible in this part of
  // the file; the per-field descriptions below are inferred from the names.
  type CopyClientsResult struct {
  	// Added lists the entries that were copied (presumably client emails).
  	Added []string `json:"added"`
  	// Skipped lists the entries that were deliberately not copied.
  	Skipped []string `json:"skipped"`
  	// Errors lists human-readable failure messages.
  	Errors []string `json:"errors"`
  }
  37. // enrichClientStats parses each inbound's clients once, fills in the
  38. // UUID/SubId fields on the preloaded ClientStats, and tops up rows owned by
  39. // a sibling inbound (shared-email mode — the row is keyed on email so it
  40. // only preloads on its owning inbound).
  41. func (s *InboundService) enrichClientStats(db *gorm.DB, inbounds []*model.Inbound) {
  42. if len(inbounds) == 0 {
  43. return
  44. }
  45. clientsByInbound := make([][]model.Client, len(inbounds))
  46. seenByInbound := make([]map[string]struct{}, len(inbounds))
  47. missing := make(map[string]struct{})
  48. for i, inbound := range inbounds {
  49. clients, _ := s.GetClients(inbound)
  50. clientsByInbound[i] = clients
  51. seen := make(map[string]struct{}, len(inbound.ClientStats))
  52. for _, st := range inbound.ClientStats {
  53. if st.Email != "" {
  54. seen[strings.ToLower(st.Email)] = struct{}{}
  55. }
  56. }
  57. seenByInbound[i] = seen
  58. for _, c := range clients {
  59. if c.Email == "" {
  60. continue
  61. }
  62. if _, ok := seen[strings.ToLower(c.Email)]; !ok {
  63. missing[c.Email] = struct{}{}
  64. }
  65. }
  66. }
  67. if len(missing) > 0 {
  68. emails := make([]string, 0, len(missing))
  69. for e := range missing {
  70. emails = append(emails, e)
  71. }
  72. var extra []xray.ClientTraffic
  73. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&extra).Error; err != nil {
  74. logger.Warning("enrichClientStats:", err)
  75. } else {
  76. byEmail := make(map[string]xray.ClientTraffic, len(extra))
  77. for _, st := range extra {
  78. byEmail[strings.ToLower(st.Email)] = st
  79. }
  80. for i, inbound := range inbounds {
  81. for _, c := range clientsByInbound[i] {
  82. if c.Email == "" {
  83. continue
  84. }
  85. key := strings.ToLower(c.Email)
  86. if _, ok := seenByInbound[i][key]; ok {
  87. continue
  88. }
  89. if st, ok := byEmail[key]; ok {
  90. inbound.ClientStats = append(inbound.ClientStats, st)
  91. seenByInbound[i][key] = struct{}{}
  92. }
  93. }
  94. }
  95. }
  96. }
  97. for i, inbound := range inbounds {
  98. clients := clientsByInbound[i]
  99. if len(clients) == 0 || len(inbound.ClientStats) == 0 {
  100. continue
  101. }
  102. cMap := make(map[string]model.Client, len(clients))
  103. for _, c := range clients {
  104. cMap[strings.ToLower(c.Email)] = c
  105. }
  106. for j := range inbound.ClientStats {
  107. email := strings.ToLower(inbound.ClientStats[j].Email)
  108. if c, ok := cMap[email]; ok {
  109. inbound.ClientStats[j].UUID = c.ID
  110. inbound.ClientStats[j].SubId = c.SubID
  111. }
  112. }
  113. }
  114. }
  115. // GetInbounds retrieves all inbounds for a specific user with client stats.
  116. func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) {
  117. db := database.GetDB()
  118. var inbounds []*model.Inbound
  119. err := db.Model(model.Inbound{}).Preload("ClientStats").Where("user_id = ?", userId).Find(&inbounds).Error
  120. if err != nil && err != gorm.ErrRecordNotFound {
  121. return nil, err
  122. }
  123. s.enrichClientStats(db, inbounds)
  124. return inbounds, nil
  125. }
  126. // GetAllInbounds retrieves all inbounds with client stats.
  127. func (s *InboundService) GetAllInbounds() ([]*model.Inbound, error) {
  128. db := database.GetDB()
  129. var inbounds []*model.Inbound
  130. err := db.Model(model.Inbound{}).Preload("ClientStats").Find(&inbounds).Error
  131. if err != nil && err != gorm.ErrRecordNotFound {
  132. return nil, err
  133. }
  134. s.enrichClientStats(db, inbounds)
  135. return inbounds, nil
  136. }
  137. func (s *InboundService) GetInboundsByTrafficReset(period string) ([]*model.Inbound, error) {
  138. db := database.GetDB()
  139. var inbounds []*model.Inbound
  140. err := db.Model(model.Inbound{}).Where("traffic_reset = ?", period).Find(&inbounds).Error
  141. if err != nil && err != gorm.ErrRecordNotFound {
  142. return nil, err
  143. }
  144. return inbounds, nil
  145. }
  146. func (s *InboundService) GetClients(inbound *model.Inbound) ([]model.Client, error) {
  147. settings := map[string][]model.Client{}
  148. json.Unmarshal([]byte(inbound.Settings), &settings)
  149. if settings == nil {
  150. return nil, fmt.Errorf("setting is null")
  151. }
  152. clients := settings["clients"]
  153. if clients == nil {
  154. return nil, nil
  155. }
  156. return clients, nil
  157. }
  158. func (s *InboundService) getAllEmails() ([]string, error) {
  159. db := database.GetDB()
  160. var emails []string
  161. err := db.Raw(`
  162. SELECT DISTINCT JSON_EXTRACT(client.value, '$.email')
  163. FROM inbounds,
  164. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  165. `).Scan(&emails).Error
  166. if err != nil {
  167. return nil, err
  168. }
  169. return emails, nil
  170. }
  171. // getAllEmailSubIDs returns email→subId. An email seen with two different
  172. // non-empty subIds is locked (mapped to "") so neither identity can claim it.
  173. func (s *InboundService) getAllEmailSubIDs() (map[string]string, error) {
  174. db := database.GetDB()
  175. var rows []struct {
  176. Email string
  177. SubID string
  178. }
  179. err := db.Raw(`
  180. SELECT JSON_EXTRACT(client.value, '$.email') AS email,
  181. JSON_EXTRACT(client.value, '$.subId') AS sub_id
  182. FROM inbounds,
  183. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  184. `).Scan(&rows).Error
  185. if err != nil {
  186. return nil, err
  187. }
  188. result := make(map[string]string, len(rows))
  189. for _, r := range rows {
  190. email := strings.ToLower(strings.Trim(r.Email, "\""))
  191. if email == "" {
  192. continue
  193. }
  194. subID := strings.Trim(r.SubID, "\"")
  195. if existing, ok := result[email]; ok {
  196. if existing != subID {
  197. result[email] = ""
  198. }
  199. continue
  200. }
  201. result[email] = subID
  202. }
  203. return result, nil
  204. }
  // lowerAll returns a new slice holding the lower-cased form of every string
  // in in, in the same order. The input is not modified and the result is
  // never nil, even for a nil or empty input.
  func lowerAll(in []string) []string {
  	lowered := make([]string, 0, len(in))
  	for _, item := range in {
  		lowered = append(lowered, strings.ToLower(item))
  	}
  	return lowered
  }
  212. // emailUsedByOtherInbounds reports whether email lives in any inbound other
  213. // than exceptInboundId. Empty email returns false.
  214. func (s *InboundService) emailUsedByOtherInbounds(email string, exceptInboundId int) (bool, error) {
  215. if email == "" {
  216. return false, nil
  217. }
  218. db := database.GetDB()
  219. var count int64
  220. err := db.Raw(`
  221. SELECT COUNT(*)
  222. FROM inbounds,
  223. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  224. WHERE inbounds.id != ?
  225. AND LOWER(JSON_EXTRACT(client.value, '$.email')) = LOWER(?)
  226. `, exceptInboundId, email).Scan(&count).Error
  227. if err != nil {
  228. return false, err
  229. }
  230. return count > 0, nil
  231. }
  232. // checkEmailsExistForClients validates a batch of incoming clients. An email
  233. // collides only when the existing holder has a different (or empty) subId —
  234. // matching non-empty subIds let multiple inbounds share one identity.
  235. func (s *InboundService) checkEmailsExistForClients(clients []model.Client) (string, error) {
  236. emailSubIDs, err := s.getAllEmailSubIDs()
  237. if err != nil {
  238. return "", err
  239. }
  240. seen := make(map[string]string, len(clients))
  241. for _, client := range clients {
  242. if client.Email == "" {
  243. continue
  244. }
  245. key := strings.ToLower(client.Email)
  246. // Within the same payload, the same email must carry the same subId;
  247. // otherwise we would silently merge two distinct identities.
  248. if prev, ok := seen[key]; ok {
  249. if prev != client.SubID || client.SubID == "" {
  250. return client.Email, nil
  251. }
  252. continue
  253. }
  254. seen[key] = client.SubID
  255. if existingSub, ok := emailSubIDs[key]; ok {
  256. if client.SubID == "" || existingSub == "" || existingSub != client.SubID {
  257. return client.Email, nil
  258. }
  259. }
  260. }
  261. return "", nil
  262. }
  263. // AddInbound creates a new inbound configuration.
  264. // It validates port uniqueness, client email uniqueness, and required fields,
  265. // then saves the inbound to the database and optionally adds it to the running Xray instance.
  266. // Returns the created inbound, whether Xray needs restart, and any error.
  267. func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
  268. exist, err := s.checkPortConflict(inbound, 0)
  269. if err != nil {
  270. return inbound, false, err
  271. }
  272. if exist {
  273. return inbound, false, common.NewError("Port already exists:", inbound.Port)
  274. }
  275. inbound.Tag, err = s.resolveInboundTag(inbound, 0)
  276. if err != nil {
  277. return inbound, false, err
  278. }
  279. clients, err := s.GetClients(inbound)
  280. if err != nil {
  281. return inbound, false, err
  282. }
  283. existEmail, err := s.checkEmailsExistForClients(clients)
  284. if err != nil {
  285. return inbound, false, err
  286. }
  287. if existEmail != "" {
  288. return inbound, false, common.NewError("Duplicate email:", existEmail)
  289. }
  290. // Ensure created_at and updated_at on clients in settings
  291. if len(clients) > 0 {
  292. var settings map[string]any
  293. if err2 := json.Unmarshal([]byte(inbound.Settings), &settings); err2 == nil && settings != nil {
  294. now := time.Now().Unix() * 1000
  295. updatedClients := make([]model.Client, 0, len(clients))
  296. for _, c := range clients {
  297. if c.CreatedAt == 0 {
  298. c.CreatedAt = now
  299. }
  300. c.UpdatedAt = now
  301. updatedClients = append(updatedClients, c)
  302. }
  303. settings["clients"] = updatedClients
  304. if bs, err3 := json.MarshalIndent(settings, "", " "); err3 == nil {
  305. inbound.Settings = string(bs)
  306. } else {
  307. logger.Debug("Unable to marshal inbound settings with timestamps:", err3)
  308. }
  309. } else if err2 != nil {
  310. logger.Debug("Unable to parse inbound settings for timestamps:", err2)
  311. }
  312. }
  313. // Secure client ID
  314. for _, client := range clients {
  315. switch inbound.Protocol {
  316. case "trojan":
  317. if client.Password == "" {
  318. return inbound, false, common.NewError("empty client ID")
  319. }
  320. case "shadowsocks":
  321. if client.Email == "" {
  322. return inbound, false, common.NewError("empty client ID")
  323. }
  324. case "hysteria", "hysteria2":
  325. if client.Auth == "" {
  326. return inbound, false, common.NewError("empty client ID")
  327. }
  328. default:
  329. if client.ID == "" {
  330. return inbound, false, common.NewError("empty client ID")
  331. }
  332. }
  333. }
  334. db := database.GetDB()
  335. tx := db.Begin()
  336. defer func() {
  337. if err == nil {
  338. tx.Commit()
  339. } else {
  340. tx.Rollback()
  341. }
  342. }()
  343. err = tx.Save(inbound).Error
  344. if err == nil {
  345. if len(inbound.ClientStats) == 0 {
  346. for _, client := range clients {
  347. s.AddClientStat(tx, inbound.Id, &client)
  348. }
  349. }
  350. } else {
  351. return inbound, false, err
  352. }
  353. needRestart := false
  354. if inbound.Enable {
  355. rt, rterr := s.runtimeFor(inbound)
  356. if rterr != nil {
  357. err = rterr
  358. return inbound, false, err
  359. }
  360. if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil {
  361. logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag)
  362. } else {
  363. logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
  364. if inbound.NodeID != nil {
  365. err = err1
  366. return inbound, false, err
  367. }
  368. needRestart = true
  369. }
  370. }
  371. return inbound, needRestart, err
  372. }
  373. func (s *InboundService) DelInbound(id int) (bool, error) {
  374. db := database.GetDB()
  375. needRestart := false
  376. var ib model.Inbound
  377. loadErr := db.Model(model.Inbound{}).Where("id = ? and enable = ?", id, true).First(&ib).Error
  378. if loadErr == nil {
  379. rt, rterr := s.runtimeFor(&ib)
  380. if rterr != nil {
  381. logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr)
  382. if ib.NodeID == nil {
  383. needRestart = true
  384. }
  385. } else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil {
  386. logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag)
  387. } else {
  388. logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1)
  389. if ib.NodeID == nil {
  390. needRestart = true
  391. }
  392. }
  393. } else {
  394. logger.Debug("No enabled inbound found to remove by api, id:", id)
  395. }
  396. // Delete client traffics of inbounds
  397. err := db.Where("inbound_id = ?", id).Delete(xray.ClientTraffic{}).Error
  398. if err != nil {
  399. return false, err
  400. }
  401. inbound, err := s.GetInbound(id)
  402. if err != nil {
  403. return false, err
  404. }
  405. clients, err := s.GetClients(inbound)
  406. if err != nil {
  407. return false, err
  408. }
  409. // Bulk-delete client IPs for every email in this inbound. The previous
  410. // per-client loop fired one DELETE per row — at 7k+ clients that meant
  411. // thousands of synchronous SQL roundtrips and a multi-second freeze.
  412. // Chunked to stay under SQLite's bind-variable limit on huge inbounds.
  413. if len(clients) > 0 {
  414. emails := make([]string, 0, len(clients))
  415. for i := range clients {
  416. if clients[i].Email != "" {
  417. emails = append(emails, clients[i].Email)
  418. }
  419. }
  420. for _, batch := range chunkStrings(uniqueNonEmptyStrings(emails), sqliteMaxVars) {
  421. if err := db.Where("client_email IN ?", batch).
  422. Delete(model.InboundClientIps{}).Error; err != nil {
  423. return false, err
  424. }
  425. }
  426. }
  427. return needRestart, db.Delete(model.Inbound{}, id).Error
  428. }
  429. func (s *InboundService) GetInbound(id int) (*model.Inbound, error) {
  430. db := database.GetDB()
  431. inbound := &model.Inbound{}
  432. err := db.Model(model.Inbound{}).First(inbound, id).Error
  433. if err != nil {
  434. return nil, err
  435. }
  436. return inbound, nil
  437. }
  438. // SetInboundEnable toggles only the enable flag of an inbound, without
  439. // rewriting the (potentially multi-MB) settings JSON. Used by the UI's
  440. // per-row enable switch — for inbounds with thousands of clients the full
  441. // UpdateInbound path is an order of magnitude too slow for an interactive
  442. // toggle (parses + reserialises every client, runs O(N) traffic diff).
  443. //
  444. // Returns (needRestart, error). needRestart is true when the xray runtime
  445. // could not be re-synced from the cached config and a full restart is
  446. // required to pick up the change.
  447. func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
  448. inbound, err := s.GetInbound(id)
  449. if err != nil {
  450. return false, err
  451. }
  452. if inbound.Enable == enable {
  453. return false, nil
  454. }
  455. db := database.GetDB()
  456. if err := db.Model(model.Inbound{}).Where("id = ?", id).
  457. Update("enable", enable).Error; err != nil {
  458. return false, err
  459. }
  460. inbound.Enable = enable
  461. needRestart := false
  462. rt, rterr := s.runtimeFor(inbound)
  463. if rterr != nil {
  464. if inbound.NodeID != nil {
  465. return false, rterr
  466. }
  467. return true, nil
  468. }
  469. if err := rt.DelInbound(context.Background(), inbound); err != nil &&
  470. !strings.Contains(err.Error(), "not found") {
  471. logger.Debug("SetInboundEnable: DelInbound on", rt.Name(), "failed:", err)
  472. needRestart = true
  473. }
  474. if !enable {
  475. return needRestart, nil
  476. }
  477. addTarget := inbound
  478. if inbound.NodeID == nil {
  479. runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound)
  480. if err != nil {
  481. logger.Debug("SetInboundEnable: build runtime config failed:", err)
  482. return true, nil
  483. }
  484. addTarget = runtimeInbound
  485. }
  486. if err := rt.AddInbound(context.Background(), addTarget); err != nil {
  487. logger.Debug("SetInboundEnable: AddInbound on", rt.Name(), "failed:", err)
  488. if inbound.NodeID != nil {
  489. return false, err
  490. }
  491. needRestart = true
  492. }
  493. return needRestart, nil
  494. }
  495. func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
  496. exist, err := s.checkPortConflict(inbound, inbound.Id)
  497. if err != nil {
  498. return inbound, false, err
  499. }
  500. if exist {
  501. return inbound, false, common.NewError("Port already exists:", inbound.Port)
  502. }
  503. oldInbound, err := s.GetInbound(inbound.Id)
  504. if err != nil {
  505. return inbound, false, err
  506. }
  507. tag := oldInbound.Tag
  508. db := database.GetDB()
  509. tx := db.Begin()
  510. defer func() {
  511. if err != nil {
  512. tx.Rollback()
  513. } else {
  514. tx.Commit()
  515. }
  516. }()
  517. err = s.updateClientTraffics(tx, oldInbound, inbound)
  518. if err != nil {
  519. return inbound, false, err
  520. }
  521. // Ensure created_at and updated_at exist in inbound.Settings clients
  522. {
  523. var oldSettings map[string]any
  524. _ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
  525. emailToCreated := map[string]int64{}
  526. emailToUpdated := map[string]int64{}
  527. if oldSettings != nil {
  528. if oc, ok := oldSettings["clients"].([]any); ok {
  529. for _, it := range oc {
  530. if m, ok2 := it.(map[string]any); ok2 {
  531. if email, ok3 := m["email"].(string); ok3 {
  532. switch v := m["created_at"].(type) {
  533. case float64:
  534. emailToCreated[email] = int64(v)
  535. case int64:
  536. emailToCreated[email] = v
  537. }
  538. switch v := m["updated_at"].(type) {
  539. case float64:
  540. emailToUpdated[email] = int64(v)
  541. case int64:
  542. emailToUpdated[email] = v
  543. }
  544. }
  545. }
  546. }
  547. }
  548. }
  549. var newSettings map[string]any
  550. if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil {
  551. now := time.Now().Unix() * 1000
  552. if nSlice, ok := newSettings["clients"].([]any); ok {
  553. for i := range nSlice {
  554. if m, ok2 := nSlice[i].(map[string]any); ok2 {
  555. email, _ := m["email"].(string)
  556. if _, ok3 := m["created_at"]; !ok3 {
  557. if v, ok4 := emailToCreated[email]; ok4 && v > 0 {
  558. m["created_at"] = v
  559. } else {
  560. m["created_at"] = now
  561. }
  562. }
  563. // Preserve client's updated_at if present; do not bump on parent inbound update
  564. if _, hasUpdated := m["updated_at"]; !hasUpdated {
  565. if v, ok4 := emailToUpdated[email]; ok4 && v > 0 {
  566. m["updated_at"] = v
  567. }
  568. }
  569. nSlice[i] = m
  570. }
  571. }
  572. newSettings["clients"] = nSlice
  573. if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil {
  574. inbound.Settings = string(bs)
  575. }
  576. }
  577. }
  578. }
  579. oldInbound.Total = inbound.Total
  580. oldInbound.Remark = inbound.Remark
  581. oldInbound.Enable = inbound.Enable
  582. oldInbound.ExpiryTime = inbound.ExpiryTime
  583. oldInbound.TrafficReset = inbound.TrafficReset
  584. oldInbound.Listen = inbound.Listen
  585. oldInbound.Port = inbound.Port
  586. oldInbound.Protocol = inbound.Protocol
  587. oldInbound.Settings = inbound.Settings
  588. oldInbound.StreamSettings = inbound.StreamSettings
  589. oldInbound.Sniffing = inbound.Sniffing
  590. oldInbound.Tag, err = s.resolveInboundTag(inbound, inbound.Id)
  591. if err != nil {
  592. return inbound, false, err
  593. }
  594. needRestart := false
  595. rt, rterr := s.runtimeFor(oldInbound)
  596. if rterr != nil {
  597. if oldInbound.NodeID != nil {
  598. err = rterr
  599. return inbound, false, err
  600. }
  601. needRestart = true
  602. } else {
  603. oldSnapshot := *oldInbound
  604. oldSnapshot.Tag = tag
  605. if oldInbound.NodeID == nil {
  606. if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
  607. logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
  608. }
  609. if inbound.Enable {
  610. runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound)
  611. if err2 != nil {
  612. logger.Debug("Unable to prepare runtime inbound config:", err2)
  613. needRestart = true
  614. } else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil {
  615. logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag)
  616. } else {
  617. logger.Debug("Unable to update inbound on", rt.Name(), ":", err2)
  618. needRestart = true
  619. }
  620. }
  621. } else {
  622. if !inbound.Enable {
  623. if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
  624. err = err2
  625. return inbound, false, err
  626. }
  627. } else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
  628. err = err2
  629. return inbound, false, err
  630. }
  631. }
  632. }
  633. return inbound, needRestart, tx.Save(oldInbound).Error
  634. }
  635. func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.Inbound) (*model.Inbound, error) {
  636. if inbound == nil {
  637. return nil, fmt.Errorf("inbound is nil")
  638. }
  639. runtimeInbound := *inbound
  640. settings := map[string]any{}
  641. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  642. return nil, err
  643. }
  644. clients, ok := settings["clients"].([]any)
  645. if !ok {
  646. return &runtimeInbound, nil
  647. }
  648. var clientStats []xray.ClientTraffic
  649. err := tx.Model(xray.ClientTraffic{}).
  650. Where("inbound_id = ?", inbound.Id).
  651. Select("email", "enable").
  652. Find(&clientStats).Error
  653. if err != nil {
  654. return nil, err
  655. }
  656. enableMap := make(map[string]bool, len(clientStats))
  657. for _, clientTraffic := range clientStats {
  658. enableMap[clientTraffic.Email] = clientTraffic.Enable
  659. }
  660. finalClients := make([]any, 0, len(clients))
  661. for _, client := range clients {
  662. c, ok := client.(map[string]any)
  663. if !ok {
  664. continue
  665. }
  666. email, _ := c["email"].(string)
  667. if enable, exists := enableMap[email]; exists && !enable {
  668. continue
  669. }
  670. if manualEnable, ok := c["enable"].(bool); ok && !manualEnable {
  671. continue
  672. }
  673. finalClients = append(finalClients, c)
  674. }
  675. settings["clients"] = finalClients
  676. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  677. if err != nil {
  678. return nil, err
  679. }
  680. runtimeInbound.Settings = string(modifiedSettings)
  681. return &runtimeInbound, nil
  682. }
  683. // updateClientTraffics syncs the ClientTraffic rows with the inbound's clients
  684. // list: removes rows for emails that disappeared, inserts rows for newly-added
  685. // emails. Uses sets for O(N) lookup — the previous nested-loop implementation
  686. // was O(N²) and degraded into multi-second pauses on inbounds with thousands
  687. // of clients (toggling, saving, or deleting any such inbound felt frozen).
  688. func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error {
  689. oldClients, err := s.GetClients(oldInbound)
  690. if err != nil {
  691. return err
  692. }
  693. newClients, err := s.GetClients(newInbound)
  694. if err != nil {
  695. return err
  696. }
  697. // Email is the unique key for ClientTraffic rows. Clients without an
  698. // email have no stats row to sync — skip them on both sides instead of
  699. // risking a unique-constraint hit or accidental delete of an unrelated row.
  700. oldEmails := make(map[string]struct{}, len(oldClients))
  701. for i := range oldClients {
  702. if oldClients[i].Email == "" {
  703. continue
  704. }
  705. oldEmails[oldClients[i].Email] = struct{}{}
  706. }
  707. newEmails := make(map[string]struct{}, len(newClients))
  708. for i := range newClients {
  709. if newClients[i].Email == "" {
  710. continue
  711. }
  712. newEmails[newClients[i].Email] = struct{}{}
  713. }
  714. // Drop stats rows for removed emails — but not when a sibling inbound
  715. // still references the email, since the row is the shared accumulator.
  716. for i := range oldClients {
  717. email := oldClients[i].Email
  718. if email == "" {
  719. continue
  720. }
  721. if _, kept := newEmails[email]; kept {
  722. continue
  723. }
  724. stillUsed, err := s.emailUsedByOtherInbounds(email, oldInbound.Id)
  725. if err != nil {
  726. return err
  727. }
  728. if stillUsed {
  729. continue
  730. }
  731. if err := s.DelClientStat(tx, email); err != nil {
  732. return err
  733. }
  734. }
  735. for i := range newClients {
  736. email := newClients[i].Email
  737. if email == "" {
  738. continue
  739. }
  740. if _, existed := oldEmails[email]; existed {
  741. if err := s.UpdateClientStat(tx, email, &newClients[i]); err != nil {
  742. return err
  743. }
  744. continue
  745. }
  746. if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
  747. return err
  748. }
  749. }
  750. return nil
  751. }
  752. func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) {
  753. clients, err := s.GetClients(data)
  754. if err != nil {
  755. return false, err
  756. }
  757. var settings map[string]any
  758. err = json.Unmarshal([]byte(data.Settings), &settings)
  759. if err != nil {
  760. return false, err
  761. }
  762. interfaceClients := settings["clients"].([]any)
  763. // Add timestamps for new clients being appended
  764. nowTs := time.Now().Unix() * 1000
  765. for i := range interfaceClients {
  766. if cm, ok := interfaceClients[i].(map[string]any); ok {
  767. if _, ok2 := cm["created_at"]; !ok2 {
  768. cm["created_at"] = nowTs
  769. }
  770. cm["updated_at"] = nowTs
  771. interfaceClients[i] = cm
  772. }
  773. }
  774. existEmail, err := s.checkEmailsExistForClients(clients)
  775. if err != nil {
  776. return false, err
  777. }
  778. if existEmail != "" {
  779. return false, common.NewError("Duplicate email:", existEmail)
  780. }
  781. oldInbound, err := s.GetInbound(data.Id)
  782. if err != nil {
  783. return false, err
  784. }
  785. // Secure client ID
  786. for _, client := range clients {
  787. switch oldInbound.Protocol {
  788. case "trojan":
  789. if client.Password == "" {
  790. return false, common.NewError("empty client ID")
  791. }
  792. case "shadowsocks":
  793. if client.Email == "" {
  794. return false, common.NewError("empty client ID")
  795. }
  796. case "hysteria", "hysteria2":
  797. if client.Auth == "" {
  798. return false, common.NewError("empty client ID")
  799. }
  800. default:
  801. if client.ID == "" {
  802. return false, common.NewError("empty client ID")
  803. }
  804. }
  805. }
  806. var oldSettings map[string]any
  807. err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
  808. if err != nil {
  809. return false, err
  810. }
  811. oldClients := oldSettings["clients"].([]any)
  812. oldClients = append(oldClients, interfaceClients...)
  813. oldSettings["clients"] = oldClients
  814. newSettings, err := json.MarshalIndent(oldSettings, "", " ")
  815. if err != nil {
  816. return false, err
  817. }
  818. oldInbound.Settings = string(newSettings)
  819. db := database.GetDB()
  820. tx := db.Begin()
  821. defer func() {
  822. if err != nil {
  823. tx.Rollback()
  824. } else {
  825. tx.Commit()
  826. }
  827. }()
  828. needRestart := false
  829. rt, rterr := s.runtimeFor(oldInbound)
  830. if rterr != nil {
  831. if oldInbound.NodeID != nil {
  832. err = rterr
  833. return false, err
  834. }
  835. needRestart = true
  836. } else if oldInbound.NodeID == nil {
  837. for _, client := range clients {
  838. if len(client.Email) == 0 {
  839. needRestart = true
  840. continue
  841. }
  842. s.AddClientStat(tx, data.Id, &client)
  843. if !client.Enable {
  844. continue
  845. }
  846. cipher := ""
  847. if oldInbound.Protocol == "shadowsocks" {
  848. cipher = oldSettings["method"].(string)
  849. }
  850. err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
  851. "email": client.Email,
  852. "id": client.ID,
  853. "auth": client.Auth,
  854. "security": client.Security,
  855. "flow": client.Flow,
  856. "password": client.Password,
  857. "cipher": cipher,
  858. })
  859. if err1 == nil {
  860. logger.Debug("Client added on", rt.Name(), ":", client.Email)
  861. } else {
  862. logger.Debug("Error in adding client on", rt.Name(), ":", err1)
  863. needRestart = true
  864. }
  865. }
  866. } else {
  867. for _, client := range clients {
  868. if len(client.Email) > 0 {
  869. s.AddClientStat(tx, data.Id, &client)
  870. }
  871. }
  872. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  873. err = err1
  874. return false, err
  875. }
  876. }
  877. return needRestart, tx.Save(oldInbound).Error
  878. }
  879. func (s *InboundService) getClientPrimaryKey(protocol model.Protocol, client model.Client) string {
  880. switch protocol {
  881. case model.Trojan:
  882. return client.Password
  883. case model.Shadowsocks:
  884. return client.Email
  885. case model.Hysteria:
  886. return client.Auth
  887. default:
  888. return client.ID
  889. }
  890. }
  891. func (s *InboundService) writeBackClientSubID(sourceInboundID int, sourceProtocol model.Protocol, client model.Client, subID string) (bool, error) {
  892. client.SubID = subID
  893. client.UpdatedAt = time.Now().UnixMilli()
  894. clientID := s.getClientPrimaryKey(sourceProtocol, client)
  895. if clientID == "" {
  896. return false, common.NewError("empty client ID")
  897. }
  898. settingsBytes, err := json.Marshal(map[string][]model.Client{
  899. "clients": {client},
  900. })
  901. if err != nil {
  902. return false, err
  903. }
  904. updatePayload := &model.Inbound{
  905. Id: sourceInboundID,
  906. Settings: string(settingsBytes),
  907. }
  908. return s.UpdateInboundClient(updatePayload, clientID)
  909. }
  910. func (s *InboundService) generateRandomCredential(targetProtocol model.Protocol) string {
  911. switch targetProtocol {
  912. case model.VMESS, model.VLESS:
  913. return uuid.NewString()
  914. default:
  915. return strings.ReplaceAll(uuid.NewString(), "-", "")
  916. }
  917. }
  918. func (s *InboundService) buildTargetClientFromSource(source model.Client, targetProtocol model.Protocol, email string, flow string) (model.Client, error) {
  919. nowTs := time.Now().UnixMilli()
  920. target := source
  921. target.Email = email
  922. target.CreatedAt = nowTs
  923. target.UpdatedAt = nowTs
  924. target.ID = ""
  925. target.Password = ""
  926. target.Auth = ""
  927. target.Flow = ""
  928. switch targetProtocol {
  929. case model.VMESS:
  930. target.ID = s.generateRandomCredential(targetProtocol)
  931. case model.VLESS:
  932. target.ID = s.generateRandomCredential(targetProtocol)
  933. if flow == "xtls-rprx-vision" || flow == "xtls-rprx-vision-udp443" {
  934. target.Flow = flow
  935. }
  936. case model.Trojan, model.Shadowsocks:
  937. target.Password = s.generateRandomCredential(targetProtocol)
  938. case model.Hysteria:
  939. target.Auth = s.generateRandomCredential(targetProtocol)
  940. default:
  941. target.ID = s.generateRandomCredential(targetProtocol)
  942. }
  943. return target, nil
  944. }
  945. func (s *InboundService) nextAvailableCopiedEmail(originalEmail string, targetID int, occupied map[string]struct{}) string {
  946. base := fmt.Sprintf("%s_%d", originalEmail, targetID)
  947. candidate := base
  948. suffix := 0
  949. for {
  950. if _, exists := occupied[strings.ToLower(candidate)]; !exists {
  951. occupied[strings.ToLower(candidate)] = struct{}{}
  952. return candidate
  953. }
  954. suffix++
  955. candidate = fmt.Sprintf("%s_%d", base, suffix)
  956. }
  957. }
  958. func (s *InboundService) CopyInboundClients(targetInboundID int, sourceInboundID int, clientEmails []string, flow string) (*CopyClientsResult, bool, error) {
  959. result := &CopyClientsResult{
  960. Added: []string{},
  961. Skipped: []string{},
  962. Errors: []string{},
  963. }
  964. if targetInboundID == sourceInboundID {
  965. return result, false, common.NewError("source and target inbounds must be different")
  966. }
  967. targetInbound, err := s.GetInbound(targetInboundID)
  968. if err != nil {
  969. return result, false, err
  970. }
  971. sourceInbound, err := s.GetInbound(sourceInboundID)
  972. if err != nil {
  973. return result, false, err
  974. }
  975. sourceClients, err := s.GetClients(sourceInbound)
  976. if err != nil {
  977. return result, false, err
  978. }
  979. if len(sourceClients) == 0 {
  980. return result, false, nil
  981. }
  982. allowedEmails := map[string]struct{}{}
  983. if len(clientEmails) > 0 {
  984. for _, email := range clientEmails {
  985. allowedEmails[strings.ToLower(strings.TrimSpace(email))] = struct{}{}
  986. }
  987. }
  988. occupiedEmails := map[string]struct{}{}
  989. allEmails, err := s.getAllEmails()
  990. if err != nil {
  991. return result, false, err
  992. }
  993. for _, email := range allEmails {
  994. clean := strings.Trim(email, "\"")
  995. if clean != "" {
  996. occupiedEmails[strings.ToLower(clean)] = struct{}{}
  997. }
  998. }
  999. newClients := make([]model.Client, 0)
  1000. needRestart := false
  1001. for _, sourceClient := range sourceClients {
  1002. originalEmail := strings.TrimSpace(sourceClient.Email)
  1003. if originalEmail == "" {
  1004. continue
  1005. }
  1006. if len(allowedEmails) > 0 {
  1007. if _, ok := allowedEmails[strings.ToLower(originalEmail)]; !ok {
  1008. continue
  1009. }
  1010. }
  1011. if sourceClient.SubID == "" {
  1012. newSubID := uuid.NewString()
  1013. subNeedRestart, subErr := s.writeBackClientSubID(sourceInbound.Id, sourceInbound.Protocol, sourceClient, newSubID)
  1014. if subErr != nil {
  1015. result.Errors = append(result.Errors, fmt.Sprintf("%s: failed to write source subId: %v", originalEmail, subErr))
  1016. continue
  1017. }
  1018. if subNeedRestart {
  1019. needRestart = true
  1020. }
  1021. sourceClient.SubID = newSubID
  1022. }
  1023. targetEmail := s.nextAvailableCopiedEmail(originalEmail, targetInboundID, occupiedEmails)
  1024. targetClient, buildErr := s.buildTargetClientFromSource(sourceClient, targetInbound.Protocol, targetEmail, flow)
  1025. if buildErr != nil {
  1026. result.Errors = append(result.Errors, fmt.Sprintf("%s: %v", originalEmail, buildErr))
  1027. continue
  1028. }
  1029. newClients = append(newClients, targetClient)
  1030. result.Added = append(result.Added, targetEmail)
  1031. }
  1032. if len(newClients) == 0 {
  1033. return result, needRestart, nil
  1034. }
  1035. settingsPayload, err := json.Marshal(map[string][]model.Client{
  1036. "clients": newClients,
  1037. })
  1038. if err != nil {
  1039. return result, needRestart, err
  1040. }
  1041. addNeedRestart, err := s.AddInboundClient(&model.Inbound{
  1042. Id: targetInboundID,
  1043. Settings: string(settingsPayload),
  1044. })
  1045. if err != nil {
  1046. return result, needRestart, err
  1047. }
  1048. if addNeedRestart {
  1049. needRestart = true
  1050. }
  1051. return result, needRestart, nil
  1052. }
  1053. func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool, error) {
  1054. oldInbound, err := s.GetInbound(inboundId)
  1055. if err != nil {
  1056. logger.Error("Load Old Data Error")
  1057. return false, err
  1058. }
  1059. var settings map[string]any
  1060. err = json.Unmarshal([]byte(oldInbound.Settings), &settings)
  1061. if err != nil {
  1062. return false, err
  1063. }
  1064. email := ""
  1065. client_key := "id"
  1066. switch oldInbound.Protocol {
  1067. case "trojan":
  1068. client_key = "password"
  1069. case "shadowsocks":
  1070. client_key = "email"
  1071. case "hysteria", "hysteria2":
  1072. client_key = "auth"
  1073. }
  1074. interfaceClients := settings["clients"].([]any)
  1075. var newClients []any
  1076. needApiDel := false
  1077. clientFound := false
  1078. for _, client := range interfaceClients {
  1079. c := client.(map[string]any)
  1080. c_id := c[client_key].(string)
  1081. if c_id == clientId {
  1082. clientFound = true
  1083. email, _ = c["email"].(string)
  1084. needApiDel, _ = c["enable"].(bool)
  1085. } else {
  1086. newClients = append(newClients, client)
  1087. }
  1088. }
  1089. if !clientFound {
  1090. return false, common.NewError("Client Not Found In Inbound For ID:", clientId)
  1091. }
  1092. if len(newClients) == 0 {
  1093. return false, common.NewError("no client remained in Inbound")
  1094. }
  1095. settings["clients"] = newClients
  1096. newSettings, err := json.MarshalIndent(settings, "", " ")
  1097. if err != nil {
  1098. return false, err
  1099. }
  1100. oldInbound.Settings = string(newSettings)
  1101. db := database.GetDB()
  1102. // Keep the client_traffics row and IPs alive when another inbound still
  1103. // references this email — siblings depend on the shared accounting state.
  1104. emailShared, err := s.emailUsedByOtherInbounds(email, inboundId)
  1105. if err != nil {
  1106. return false, err
  1107. }
  1108. if !emailShared {
  1109. err = s.DelClientIPs(db, email)
  1110. if err != nil {
  1111. logger.Error("Error in delete client IPs")
  1112. return false, err
  1113. }
  1114. }
  1115. needRestart := false
  1116. if len(email) > 0 {
  1117. notDepleted := true
  1118. err = db.Model(xray.ClientTraffic{}).Select("enable").Where("email = ?", email).First(&notDepleted).Error
  1119. if err != nil {
  1120. logger.Error("Get stats error")
  1121. return false, err
  1122. }
  1123. if !emailShared {
  1124. err = s.DelClientStat(db, email)
  1125. if err != nil {
  1126. logger.Error("Delete stats Data Error")
  1127. return false, err
  1128. }
  1129. }
  1130. if needApiDel && notDepleted {
  1131. rt, rterr := s.runtimeFor(oldInbound)
  1132. if rterr != nil {
  1133. if oldInbound.NodeID != nil {
  1134. return false, rterr
  1135. }
  1136. needRestart = true
  1137. } else if oldInbound.NodeID == nil {
  1138. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  1139. if err1 == nil {
  1140. logger.Debug("Client deleted on", rt.Name(), ":", email)
  1141. needRestart = false
  1142. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  1143. logger.Debug("User is already deleted. Nothing to do more...")
  1144. } else {
  1145. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  1146. needRestart = true
  1147. }
  1148. } else {
  1149. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  1150. return false, err1
  1151. }
  1152. }
  1153. }
  1154. }
  1155. return needRestart, db.Save(oldInbound).Error
  1156. }
  1157. func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) (bool, error) {
  1158. // TODO: check if TrafficReset field is updating
  1159. clients, err := s.GetClients(data)
  1160. if err != nil {
  1161. return false, err
  1162. }
  1163. var settings map[string]any
  1164. err = json.Unmarshal([]byte(data.Settings), &settings)
  1165. if err != nil {
  1166. return false, err
  1167. }
  1168. interfaceClients := settings["clients"].([]any)
  1169. oldInbound, err := s.GetInbound(data.Id)
  1170. if err != nil {
  1171. return false, err
  1172. }
  1173. oldClients, err := s.GetClients(oldInbound)
  1174. if err != nil {
  1175. return false, err
  1176. }
  1177. oldEmail := ""
  1178. newClientId := ""
  1179. clientIndex := -1
  1180. for index, oldClient := range oldClients {
  1181. oldClientId := ""
  1182. switch oldInbound.Protocol {
  1183. case "trojan":
  1184. oldClientId = oldClient.Password
  1185. newClientId = clients[0].Password
  1186. case "shadowsocks":
  1187. oldClientId = oldClient.Email
  1188. newClientId = clients[0].Email
  1189. case "hysteria", "hysteria2":
  1190. oldClientId = oldClient.Auth
  1191. newClientId = clients[0].Auth
  1192. default:
  1193. oldClientId = oldClient.ID
  1194. newClientId = clients[0].ID
  1195. }
  1196. if clientId == oldClientId {
  1197. oldEmail = oldClient.Email
  1198. clientIndex = index
  1199. break
  1200. }
  1201. }
  1202. // Validate new client ID
  1203. if newClientId == "" || clientIndex == -1 {
  1204. return false, common.NewError("empty client ID")
  1205. }
  1206. if len(clients[0].Email) > 0 && clients[0].Email != oldEmail {
  1207. existEmail, err := s.checkEmailsExistForClients(clients)
  1208. if err != nil {
  1209. return false, err
  1210. }
  1211. if existEmail != "" {
  1212. return false, common.NewError("Duplicate email:", existEmail)
  1213. }
  1214. }
  1215. var oldSettings map[string]any
  1216. err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
  1217. if err != nil {
  1218. return false, err
  1219. }
  1220. settingsClients := oldSettings["clients"].([]any)
  1221. // Preserve created_at and set updated_at for the replacing client
  1222. var preservedCreated any
  1223. if clientIndex >= 0 && clientIndex < len(settingsClients) {
  1224. if oldMap, ok := settingsClients[clientIndex].(map[string]any); ok {
  1225. if v, ok2 := oldMap["created_at"]; ok2 {
  1226. preservedCreated = v
  1227. }
  1228. }
  1229. }
  1230. if len(interfaceClients) > 0 {
  1231. if newMap, ok := interfaceClients[0].(map[string]any); ok {
  1232. if preservedCreated == nil {
  1233. preservedCreated = time.Now().Unix() * 1000
  1234. }
  1235. newMap["created_at"] = preservedCreated
  1236. newMap["updated_at"] = time.Now().Unix() * 1000
  1237. interfaceClients[0] = newMap
  1238. }
  1239. }
  1240. settingsClients[clientIndex] = interfaceClients[0]
  1241. oldSettings["clients"] = settingsClients
  1242. // testseed is only meaningful when at least one VLESS client uses the exact
  1243. // xtls-rprx-vision flow. The client-edit path only rewrites a single client,
  1244. // so re-check the flow set here and strip a stale testseed when nothing in the
  1245. // inbound still warrants it. The full-inbound update path already handles this
  1246. // on the JS side via VLESSSettings.toJson().
  1247. if oldInbound.Protocol == model.VLESS {
  1248. hasVisionFlow := false
  1249. for _, c := range settingsClients {
  1250. cm, ok := c.(map[string]any)
  1251. if !ok {
  1252. continue
  1253. }
  1254. if flow, _ := cm["flow"].(string); flow == "xtls-rprx-vision" {
  1255. hasVisionFlow = true
  1256. break
  1257. }
  1258. }
  1259. if !hasVisionFlow {
  1260. delete(oldSettings, "testseed")
  1261. }
  1262. }
  1263. newSettings, err := json.MarshalIndent(oldSettings, "", " ")
  1264. if err != nil {
  1265. return false, err
  1266. }
  1267. oldInbound.Settings = string(newSettings)
  1268. db := database.GetDB()
  1269. tx := db.Begin()
  1270. defer func() {
  1271. if err != nil {
  1272. tx.Rollback()
  1273. } else {
  1274. tx.Commit()
  1275. }
  1276. }()
  1277. if len(clients[0].Email) > 0 {
  1278. if len(oldEmail) > 0 {
  1279. // Repointing onto an email that already has a row would collide on
  1280. // the unique constraint, so retire the donor and let the surviving
  1281. // row carry the merged identity.
  1282. emailUnchanged := strings.EqualFold(oldEmail, clients[0].Email)
  1283. targetExists := int64(0)
  1284. if !emailUnchanged {
  1285. if err = tx.Model(xray.ClientTraffic{}).Where("email = ?", clients[0].Email).Count(&targetExists).Error; err != nil {
  1286. return false, err
  1287. }
  1288. }
  1289. if emailUnchanged || targetExists == 0 {
  1290. err = s.UpdateClientStat(tx, oldEmail, &clients[0])
  1291. if err != nil {
  1292. return false, err
  1293. }
  1294. err = s.UpdateClientIPs(tx, oldEmail, clients[0].Email)
  1295. if err != nil {
  1296. return false, err
  1297. }
  1298. } else {
  1299. stillUsed, sErr := s.emailUsedByOtherInbounds(oldEmail, data.Id)
  1300. if sErr != nil {
  1301. return false, sErr
  1302. }
  1303. if !stillUsed {
  1304. if err = s.DelClientStat(tx, oldEmail); err != nil {
  1305. return false, err
  1306. }
  1307. if err = s.DelClientIPs(tx, oldEmail); err != nil {
  1308. return false, err
  1309. }
  1310. }
  1311. // Refresh the surviving row with the new client's limits/expiry.
  1312. if err = s.UpdateClientStat(tx, clients[0].Email, &clients[0]); err != nil {
  1313. return false, err
  1314. }
  1315. }
  1316. } else {
  1317. s.AddClientStat(tx, data.Id, &clients[0])
  1318. }
  1319. } else {
  1320. stillUsed, err := s.emailUsedByOtherInbounds(oldEmail, data.Id)
  1321. if err != nil {
  1322. return false, err
  1323. }
  1324. if !stillUsed {
  1325. err = s.DelClientStat(tx, oldEmail)
  1326. if err != nil {
  1327. return false, err
  1328. }
  1329. err = s.DelClientIPs(tx, oldEmail)
  1330. if err != nil {
  1331. return false, err
  1332. }
  1333. }
  1334. }
  1335. needRestart := false
  1336. if len(oldEmail) > 0 {
  1337. rt, rterr := s.runtimeFor(oldInbound)
  1338. if rterr != nil {
  1339. if oldInbound.NodeID != nil {
  1340. err = rterr
  1341. return false, err
  1342. }
  1343. needRestart = true
  1344. } else if oldInbound.NodeID == nil {
  1345. if oldClients[clientIndex].Enable {
  1346. err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail)
  1347. if err1 == nil {
  1348. logger.Debug("Old client deleted on", rt.Name(), ":", oldEmail)
  1349. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", oldEmail)) {
  1350. logger.Debug("User is already deleted. Nothing to do more...")
  1351. } else {
  1352. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  1353. needRestart = true
  1354. }
  1355. }
  1356. if clients[0].Enable {
  1357. cipher := ""
  1358. if oldInbound.Protocol == "shadowsocks" {
  1359. cipher = oldSettings["method"].(string)
  1360. }
  1361. err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
  1362. "email": clients[0].Email,
  1363. "id": clients[0].ID,
  1364. "security": clients[0].Security,
  1365. "flow": clients[0].Flow,
  1366. "auth": clients[0].Auth,
  1367. "password": clients[0].Password,
  1368. "cipher": cipher,
  1369. })
  1370. if err1 == nil {
  1371. logger.Debug("Client edited on", rt.Name(), ":", clients[0].Email)
  1372. } else {
  1373. logger.Debug("Error in adding client on", rt.Name(), ":", err1)
  1374. needRestart = true
  1375. }
  1376. }
  1377. } else {
  1378. if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
  1379. err = err1
  1380. return false, err
  1381. }
  1382. }
  1383. } else {
  1384. logger.Debug("Client old email not found")
  1385. needRestart = true
  1386. }
  1387. return needRestart, tx.Save(oldInbound).Error
  1388. }
  // resetGracePeriodMs is a grace window of 30000 milliseconds (30 seconds).
// NOTE(review): the code that consumes it is outside this excerpt; judging by
// the name it presumably guards traffic handling right after a reset — confirm
// against its usage.
const resetGracePeriodMs int64 = 30000
  1390. func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
  1391. var structuralChange bool
  1392. err := submitTrafficWrite(func() error {
  1393. var inner error
  1394. structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap)
  1395. return inner
  1396. })
  1397. return structuralChange, err
  1398. }
// setRemoteTrafficLocked reconciles the central database rows that belong to
// node nodeID with the state reported in snap.
//
// Inside a single transaction it:
//  1. creates a central inbound for every snapshot tag that has no row yet,
//  2. updates the existing inbounds from the snapshot,
//  3. deletes central inbounds (and their client_traffics rows) whose tag is
//     absent from the snapshot,
//  4. creates, updates or deletes per-client traffic rows so they match the
//     snapshot's ClientStats.
//
// After a successful commit the node's online e-mails are pushed to p when p
// is non-nil.
//
// It returns true when anything other than plain traffic counters changed
// (rows created or deleted, or one of the compared configuration fields
// differs). A nil snap or a non-positive nodeID is a no-op.
//
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// traffic-write serialisation (it is invoked through submitTrafficWrite) —
// confirm.
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
	if snap == nil || nodeID <= 0 {
		return false, nil
	}
	db := database.GetDB()
	now := time.Now().UnixMilli()
	// Load every central inbound currently attributed to this node.
	var central []model.Inbound
	if err := db.Model(model.Inbound{}).
		Where("node_id = ?", nodeID).
		Find(&central).Error; err != nil {
		return false, err
	}
	// Index the central inbounds by tag; entries point into the central slice.
	tagToCentral := make(map[string]*model.Inbound, len(central))
	for i := range central {
		tagToCentral[central[i].Tag] = &central[i]
	}
	// Load the client traffic rows of those inbounds in one query.
	var centralClientStats []xray.ClientTraffic
	if len(central) > 0 {
		ids := make([]int, 0, len(central))
		for i := range central {
			ids = append(ids, central[i].Id)
		}
		if err := db.Model(xray.ClientTraffic{}).
			Where("inbound_id IN ?", ids).
			Find(&centralClientStats).Error; err != nil {
			return false, err
		}
	}
	// csKey identifies a client traffic row by (inbound, email).
	type csKey struct {
		inboundID int
		email     string
	}
	centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
	for i := range centralClientStats {
		centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = &centralClientStats[i]
	}
	// Owner for newly created inbounds: reuse the owner of an existing inbound
	// of this node, else the user with the lowest id, else fall back to 1.
	var defaultUserId int
	if len(central) > 0 {
		defaultUserId = central[0].UserId
	} else {
		var u model.User
		if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
			defaultUserId = u.Id
		} else {
			defaultUserId = 1
		}
	}
	tx := db.Begin()
	committed := false
	// Roll back on every return path that did not reach the commit below.
	defer func() {
		if !committed {
			tx.Rollback()
		}
	}()
	structuralChange := false
	// Pass 1: create missing inbounds and update existing ones.
	snapTags := make(map[string]struct{}, len(snap.Inbounds))
	for _, snapIb := range snap.Inbounds {
		if snapIb == nil {
			continue
		}
		snapTags[snapIb.Tag] = struct{}{}
		c, ok := tagToCentral[snapIb.Tag]
		if !ok {
			newIb := model.Inbound{
				UserId:         defaultUserId,
				NodeID:         &nodeID,
				Tag:            snapIb.Tag,
				Listen:         snapIb.Listen,
				Port:           snapIb.Port,
				Protocol:       snapIb.Protocol,
				Settings:       snapIb.Settings,
				StreamSettings: snapIb.StreamSettings,
				Sniffing:       snapIb.Sniffing,
				TrafficReset:   snapIb.TrafficReset,
				Enable:         snapIb.Enable,
				Remark:         snapIb.Remark,
				Total:          snapIb.Total,
				ExpiryTime:     snapIb.ExpiryTime,
				Up:             snapIb.Up,
				Down:           snapIb.Down,
				AllTime:        snapIb.AllTime,
			}
			// A failed create is logged and skipped; it does not abort the sync.
			if err := tx.Create(&newIb).Error; err != nil {
				logger.Warning("setRemoteTraffic: create central inbound for tag", snapIb.Tag, "failed:", err)
				continue
			}
			// Register the new row so pass 3 can attach its client stats.
			tagToCentral[snapIb.Tag] = &newIb
			structuralChange = true
			continue
		}
		// The inbound is "in grace" when its traffic was reset less than
		// resetGracePeriodMs ago.
		inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
		updates := map[string]any{
			"enable":          snapIb.Enable,
			"remark":          snapIb.Remark,
			"listen":          snapIb.Listen,
			"port":            snapIb.Port,
			"protocol":        snapIb.Protocol,
			"total":           snapIb.Total,
			"expiry_time":     snapIb.ExpiryTime,
			"settings":        snapIb.Settings,
			"stream_settings": snapIb.StreamSettings,
			"sniffing":        snapIb.Sniffing,
			"traffic_reset":   snapIb.TrafficReset,
		}
		// During the grace window the counters are only overwritten when the
		// snapshot total does not exceed the central total.
		if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
			updates["up"] = snapIb.Up
			updates["down"] = snapIb.Down
		}
		// all_time is only ever raised, never lowered.
		if snapIb.AllTime > c.AllTime {
			updates["all_time"] = snapIb.AllTime
		}
		// Any difference in these fields counts as a structural change.
		if c.Settings != snapIb.Settings ||
			c.Remark != snapIb.Remark ||
			c.Listen != snapIb.Listen ||
			c.Port != snapIb.Port ||
			c.Total != snapIb.Total ||
			c.ExpiryTime != snapIb.ExpiryTime ||
			c.Enable != snapIb.Enable {
			structuralChange = true
		}
		if err := tx.Model(model.Inbound{}).
			Where("id = ?", c.Id).
			Updates(updates).Error; err != nil {
			return false, err
		}
	}
	// Pass 2: drop central inbounds that the snapshot no longer contains,
	// together with their client traffic rows.
	for _, c := range central {
		if _, kept := snapTags[c.Tag]; kept {
			continue
		}
		if err := tx.Where("inbound_id = ?", c.Id).
			Delete(&xray.ClientTraffic{}).Error; err != nil {
			return false, err
		}
		if err := tx.Where("id = ?", c.Id).
			Delete(&model.Inbound{}).Error; err != nil {
			return false, err
		}
		delete(tagToCentral, c.Tag)
		structuralChange = true
	}
	// Pass 3: reconcile per-client traffic rows of every surviving inbound.
	for _, snapIb := range snap.Inbounds {
		if snapIb == nil {
			continue
		}
		c, ok := tagToCentral[snapIb.Tag]
		if !ok {
			continue
		}
		inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
		snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
		for _, cs := range snapIb.ClientStats {
			snapEmails[cs.Email] = struct{}{}
			existing := centralCS[csKey{c.Id, cs.Email}]
			if existing == nil {
				// No central row yet for this (inbound, email): create it.
				if err := tx.Create(&xray.ClientTraffic{
					InboundId:  c.Id,
					Email:      cs.Email,
					Enable:     cs.Enable,
					Total:      cs.Total,
					ExpiryTime: cs.ExpiryTime,
					Reset:      cs.Reset,
					Up:         cs.Up,
					Down:       cs.Down,
					AllTime:    cs.AllTime,
					LastOnline: cs.LastOnline,
				}).Error; err != nil {
					return false, err
				}
				structuralChange = true
				continue
			}
			if existing.Enable != cs.Enable ||
				existing.Total != cs.Total ||
				existing.ExpiryTime != cs.ExpiryTime ||
				existing.Reset != cs.Reset {
				structuralChange = true
			}
			// Keep the larger of the central and reported all-time totals.
			allTime := existing.AllTime
			if cs.AllTime > allTime {
				allTime = cs.AllTime
			}
			// In the grace window a non-zero reported counter is not written
			// back: only the non-counter columns and all_time are updated.
			if inGrace && cs.Up+cs.Down > 0 {
				if err := tx.Exec(
					`UPDATE client_traffics
					SET enable = ?, total = ?, expiry_time = ?, reset = ?, all_time = ?
					WHERE inbound_id = ? AND email = ?`,
					cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime, c.Id, cs.Email,
				).Error; err != nil {
					return false, err
				}
				continue
			}
			// Normal path: overwrite counters; last_online never moves backwards.
			if err := tx.Exec(
				`UPDATE client_traffics
				SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
				all_time = ?, last_online = MAX(last_online, ?)
				WHERE inbound_id = ? AND email = ?`,
				cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, allTime,
				cs.LastOnline, c.Id, cs.Email,
			).Error; err != nil {
				return false, err
			}
		}
		// Remove central rows of this inbound whose email is not in the snapshot.
		for k, existing := range centralCS {
			if k.inboundID != c.Id {
				continue
			}
			if _, kept := snapEmails[k.email]; kept {
				continue
			}
			if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
				Delete(&xray.ClientTraffic{}).Error; err != nil {
				return false, err
			}
			structuralChange = true
		}
	}
	if err := tx.Commit().Error; err != nil {
		return false, err
	}
	committed = true
	// Publish the node's online clients only after the data is durable.
	if p != nil {
		p.SetNodeOnlineClients(nodeID, snap.OnlineEmails)
	}
	return structuralChange, nil
}
  1626. func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
  1627. err = submitTrafficWrite(func() error {
  1628. var inner error
  1629. needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
  1630. return inner
  1631. })
  1632. return
  1633. }
  1634. func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
  1635. var err error
  1636. db := database.GetDB()
  1637. tx := db.Begin()
  1638. defer func() {
  1639. if err != nil {
  1640. tx.Rollback()
  1641. } else {
  1642. tx.Commit()
  1643. }
  1644. }()
  1645. err = s.addInboundTraffic(tx, inboundTraffics)
  1646. if err != nil {
  1647. return false, false, err
  1648. }
  1649. err = s.addClientTraffic(tx, clientTraffics)
  1650. if err != nil {
  1651. return false, false, err
  1652. }
  1653. needRestart0, count, err := s.autoRenewClients(tx)
  1654. if err != nil {
  1655. logger.Warning("Error in renew clients:", err)
  1656. } else if count > 0 {
  1657. logger.Debugf("%v clients renewed", count)
  1658. }
  1659. disabledClientsCount := int64(0)
  1660. needRestart1, count, err := s.disableInvalidClients(tx)
  1661. if err != nil {
  1662. logger.Warning("Error in disabling invalid clients:", err)
  1663. } else if count > 0 {
  1664. logger.Debugf("%v clients disabled", count)
  1665. disabledClientsCount = count
  1666. }
  1667. needRestart2, count, err := s.disableInvalidInbounds(tx)
  1668. if err != nil {
  1669. logger.Warning("Error in disabling invalid inbounds:", err)
  1670. } else if count > 0 {
  1671. logger.Debugf("%v inbounds disabled", count)
  1672. }
  1673. return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
  1674. }
  1675. func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  1676. if len(traffics) == 0 {
  1677. return nil
  1678. }
  1679. var err error
  1680. for _, traffic := range traffics {
  1681. if traffic.IsInbound {
  1682. err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
  1683. Updates(map[string]any{
  1684. "up": gorm.Expr("up + ?", traffic.Up),
  1685. "down": gorm.Expr("down + ?", traffic.Down),
  1686. "all_time": gorm.Expr("COALESCE(all_time, 0) + ?", traffic.Up+traffic.Down),
  1687. }).Error
  1688. if err != nil {
  1689. return err
  1690. }
  1691. }
  1692. }
  1693. return nil
  1694. }
  1695. func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
  1696. if len(traffics) == 0 {
  1697. // Empty onlineUsers
  1698. if p != nil {
  1699. p.SetOnlineClients(make([]string, 0))
  1700. }
  1701. return nil
  1702. }
  1703. onlineClients := make([]string, 0)
  1704. emails := make([]string, 0, len(traffics))
  1705. for _, traffic := range traffics {
  1706. emails = append(emails, traffic.Email)
  1707. }
  1708. dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
  1709. err = tx.Model(xray.ClientTraffic{}).
  1710. Where("email IN (?) AND inbound_id IN (?)", emails,
  1711. tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
  1712. Find(&dbClientTraffics).Error
  1713. if err != nil {
  1714. return err
  1715. }
  1716. // Avoid empty slice error
  1717. if len(dbClientTraffics) == 0 {
  1718. return nil
  1719. }
  1720. dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
  1721. if err != nil {
  1722. return err
  1723. }
  1724. // Index by email for O(N) merge — the previous nested loop was O(N²)
  1725. // and dominated each cron tick on inbounds with thousands of active
  1726. // clients (7500 × 7500 = 56M string comparisons every 10 seconds).
  1727. trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
  1728. for i := range traffics {
  1729. if traffics[i] != nil {
  1730. trafficByEmail[traffics[i].Email] = traffics[i]
  1731. }
  1732. }
  1733. now := time.Now().UnixMilli()
  1734. for dbTraffic_index := range dbClientTraffics {
  1735. t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
  1736. if !ok {
  1737. continue
  1738. }
  1739. dbClientTraffics[dbTraffic_index].Up += t.Up
  1740. dbClientTraffics[dbTraffic_index].Down += t.Down
  1741. dbClientTraffics[dbTraffic_index].AllTime += t.Up + t.Down
  1742. if t.Up+t.Down > 0 {
  1743. onlineClients = append(onlineClients, t.Email)
  1744. dbClientTraffics[dbTraffic_index].LastOnline = now
  1745. }
  1746. }
  1747. // Set onlineUsers
  1748. p.SetOnlineClients(onlineClients)
  1749. err = tx.Save(dbClientTraffics).Error
  1750. if err != nil {
  1751. logger.Warning("AddClientTraffic update data ", err)
  1752. }
  1753. return nil
  1754. }
  1755. func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
  1756. inboundIds := make([]int, 0, len(dbClientTraffics))
  1757. for _, dbClientTraffic := range dbClientTraffics {
  1758. if dbClientTraffic.ExpiryTime < 0 {
  1759. inboundIds = append(inboundIds, dbClientTraffic.InboundId)
  1760. }
  1761. }
  1762. if len(inboundIds) > 0 {
  1763. var inbounds []*model.Inbound
  1764. err := tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
  1765. if err != nil {
  1766. return nil, err
  1767. }
  1768. for inbound_index := range inbounds {
  1769. settings := map[string]any{}
  1770. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  1771. clients, ok := settings["clients"].([]any)
  1772. if ok {
  1773. var newClients []any
  1774. for client_index := range clients {
  1775. c := clients[client_index].(map[string]any)
  1776. for traffic_index := range dbClientTraffics {
  1777. if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email {
  1778. oldExpiryTime := c["expiryTime"].(float64)
  1779. newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime)
  1780. c["expiryTime"] = newExpiryTime
  1781. c["updated_at"] = time.Now().Unix() * 1000
  1782. dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime
  1783. break
  1784. }
  1785. }
  1786. // Backfill created_at and updated_at
  1787. if _, ok := c["created_at"]; !ok {
  1788. c["created_at"] = time.Now().Unix() * 1000
  1789. }
  1790. c["updated_at"] = time.Now().Unix() * 1000
  1791. newClients = append(newClients, any(c))
  1792. }
  1793. settings["clients"] = newClients
  1794. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  1795. if err != nil {
  1796. return nil, err
  1797. }
  1798. inbounds[inbound_index].Settings = string(modifiedSettings)
  1799. }
  1800. }
  1801. err = tx.Save(inbounds).Error
  1802. if err != nil {
  1803. logger.Warning("AddClientTraffic update inbounds ", err)
  1804. logger.Error(inbounds)
  1805. }
  1806. }
  1807. return dbClientTraffics, nil
  1808. }
  1809. func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
  1810. // check for time expired
  1811. var traffics []*xray.ClientTraffic
  1812. now := time.Now().Unix() * 1000
  1813. var err, err1 error
  1814. err = tx.Model(xray.ClientTraffic{}).
  1815. Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
  1816. Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
  1817. Find(&traffics).Error
  1818. if err != nil {
  1819. return false, 0, err
  1820. }
  1821. // return if there is no client to renew
  1822. if len(traffics) == 0 {
  1823. return false, 0, nil
  1824. }
  1825. var inbound_ids []int
  1826. var inbounds []*model.Inbound
  1827. needRestart := false
  1828. var clientsToAdd []struct {
  1829. protocol string
  1830. tag string
  1831. client map[string]any
  1832. }
  1833. for _, traffic := range traffics {
  1834. inbound_ids = append(inbound_ids, traffic.InboundId)
  1835. }
  1836. // Dedupe so an inbound hosting N expired clients is fetched and saved once
  1837. // per tick instead of N times across chunk boundaries.
  1838. inbound_ids = uniqueInts(inbound_ids)
  1839. // Chunked to stay under SQLite's bind-variable limit when many inbounds
  1840. // are touched in a single tick.
  1841. for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
  1842. var page []*model.Inbound
  1843. if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
  1844. return false, 0, err
  1845. }
  1846. inbounds = append(inbounds, page...)
  1847. }
  1848. for inbound_index := range inbounds {
  1849. settings := map[string]any{}
  1850. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  1851. clients := settings["clients"].([]any)
  1852. for client_index := range clients {
  1853. c := clients[client_index].(map[string]any)
  1854. for traffic_index, traffic := range traffics {
  1855. if traffic.Email == c["email"].(string) {
  1856. newExpiryTime := traffic.ExpiryTime
  1857. for newExpiryTime < now {
  1858. newExpiryTime += (int64(traffic.Reset) * 86400000)
  1859. }
  1860. c["expiryTime"] = newExpiryTime
  1861. traffics[traffic_index].ExpiryTime = newExpiryTime
  1862. traffics[traffic_index].Down = 0
  1863. traffics[traffic_index].Up = 0
  1864. if !traffic.Enable {
  1865. traffics[traffic_index].Enable = true
  1866. c["enable"] = true
  1867. clientsToAdd = append(clientsToAdd,
  1868. struct {
  1869. protocol string
  1870. tag string
  1871. client map[string]any
  1872. }{
  1873. protocol: string(inbounds[inbound_index].Protocol),
  1874. tag: inbounds[inbound_index].Tag,
  1875. client: c,
  1876. })
  1877. }
  1878. clients[client_index] = any(c)
  1879. break
  1880. }
  1881. }
  1882. }
  1883. settings["clients"] = clients
  1884. newSettings, err := json.MarshalIndent(settings, "", " ")
  1885. if err != nil {
  1886. return false, 0, err
  1887. }
  1888. inbounds[inbound_index].Settings = string(newSettings)
  1889. }
  1890. err = tx.Save(inbounds).Error
  1891. if err != nil {
  1892. return false, 0, err
  1893. }
  1894. err = tx.Save(traffics).Error
  1895. if err != nil {
  1896. return false, 0, err
  1897. }
  1898. if p != nil {
  1899. err1 = s.xrayApi.Init(p.GetAPIPort())
  1900. if err1 != nil {
  1901. return true, int64(len(traffics)), nil
  1902. }
  1903. for _, clientToAdd := range clientsToAdd {
  1904. err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
  1905. if err1 != nil {
  1906. needRestart = true
  1907. }
  1908. }
  1909. s.xrayApi.Close()
  1910. }
  1911. return needRestart, int64(len(traffics)), nil
  1912. }
  1913. func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
  1914. now := time.Now().Unix() * 1000
  1915. needRestart := false
  1916. if p != nil {
  1917. var tags []string
  1918. err := tx.Table("inbounds").
  1919. Select("inbounds.tag").
  1920. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  1921. Scan(&tags).Error
  1922. if err != nil {
  1923. return false, 0, err
  1924. }
  1925. s.xrayApi.Init(p.GetAPIPort())
  1926. for _, tag := range tags {
  1927. err1 := s.xrayApi.DelInbound(tag)
  1928. if err1 == nil {
  1929. logger.Debug("Inbound disabled by api:", tag)
  1930. } else {
  1931. logger.Debug("Error in disabling inbound by api:", err1)
  1932. needRestart = true
  1933. }
  1934. }
  1935. s.xrayApi.Close()
  1936. }
  1937. result := tx.Model(model.Inbound{}).
  1938. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  1939. Update("enable", false)
  1940. err := result.Error
  1941. count := result.RowsAffected
  1942. return needRestart, count, err
  1943. }
// disableInvalidClients disables enabled clients of local inbounds (node_id
// IS NULL) whose traffic quota is used up or whose expiry time has passed.
//
// It works in four stages:
//  1. load the depleted client_traffics rows,
//  2. resolve which inbound settings entries ("memberships") carry those
//     e-mails and pick the ones to disable ("targets"),
//  3. remove the targets from the running xray instance through the API
//     (only when a process handle p exists),
//  4. set enable = false on the traffic rows and mirror the change into the
//     settings JSON of the affected inbounds.
//
// Returns needRestart (an API removal failed for a reason other than "user
// not found"), the number of traffic rows updated, and a database error from
// stages 1, 2 or the row update. Failures while rewriting the inbound
// settings are only logged.
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) {
	now := time.Now().Unix() * 1000
	needRestart := false
	// Stage 1: depleted rows of local inbounds.
	var depletedRows []xray.ClientTraffic
	err := tx.Model(xray.ClientTraffic{}).
		Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
		Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
		Find(&depletedRows).Error
	if err != nil {
		return false, 0, err
	}
	if len(depletedRows) == 0 {
		return false, 0, nil
	}
	// rowByEmail is keyed by the lower-cased e-mail; rows without an e-mail
	// are ignored.
	rowByEmail := make(map[string]*xray.ClientTraffic, len(depletedRows))
	depletedEmails := make([]string, 0, len(depletedRows))
	for i := range depletedRows {
		if depletedRows[i].Email == "" {
			continue
		}
		rowByEmail[strings.ToLower(depletedRows[i].Email)] = &depletedRows[i]
		depletedEmails = append(depletedEmails, depletedRows[i].Email)
	}
	// Resolve inbound membership only for the depleted emails — pushing the
	// filter into SQLite avoids dragging every panel client through Go for
	// the common case where most clients are healthy.
	var memberships []struct {
		InboundId int
		Tag       string
		Email     string
		SubID     string `gorm:"column:sub_id"`
	}
	if len(depletedEmails) > 0 {
		err = tx.Raw(`
			SELECT inbounds.id AS inbound_id,
			inbounds.tag AS tag,
			JSON_EXTRACT(client.value, '$.email') AS email,
			JSON_EXTRACT(client.value, '$.subId') AS sub_id
			FROM inbounds,
			JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
			WHERE LOWER(JSON_EXTRACT(client.value, '$.email')) IN ?
			`, lowerAll(depletedEmails)).Scan(&memberships).Error
		if err != nil {
			return false, 0, err
		}
	}
	// Discover the row holder's subId per email. Only siblings sharing it
	// get cascaded; legacy data where two identities reuse the same email
	// stays isolated to the row owner.
	holderSub := make(map[string]string, len(rowByEmail))
	for _, m := range memberships {
		// Surrounding double quotes are trimmed from the extracted values.
		email := strings.ToLower(strings.Trim(m.Email, "\""))
		row, ok := rowByEmail[email]
		if !ok || m.InboundId != row.InboundId {
			continue
		}
		holderSub[email] = strings.Trim(m.SubID, "\"")
	}
	// target is one (inbound, client) pair that has to be disabled.
	type target struct {
		InboundId int
		Tag       string
		Email     string
	}
	var targets []target
	for _, m := range memberships {
		email := strings.ToLower(strings.Trim(m.Email, "\""))
		row, ok := rowByEmail[email]
		if !ok {
			continue
		}
		expected, hasSub := holderSub[email]
		mSub := strings.Trim(m.SubID, "\"")
		switch {
		case !hasSub || expected == "":
			// No usable subId for the holder: only the holder's own inbound
			// is targeted.
			if m.InboundId != row.InboundId {
				continue
			}
		case mSub != expected:
			// Same e-mail but a different subId: leave this membership alone.
			continue
		}
		targets = append(targets, target{
			InboundId: m.InboundId,
			Tag:       m.Tag,
			Email:     strings.Trim(m.Email, "\""),
		})
	}
	// Stage 3: remove the targets from the running xray instance.
	if p != nil && len(targets) > 0 {
		s.xrayApi.Init(p.GetAPIPort())
		for _, t := range targets {
			err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
			if err1 == nil {
				logger.Debug("Client disabled by api:", t.Email)
			} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
				// An already-absent user is not treated as a failure.
				logger.Debug("User is already disabled. Nothing to do more...")
			} else {
				logger.Debug("Error in disabling client by api:", err1)
				needRestart = true
			}
		}
		s.xrayApi.Close()
	}
	// Stage 4a: flip enable on the depleted traffic rows.
	result := tx.Model(xray.ClientTraffic{}).
		Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
		Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
		Update("enable", false)
	err = result.Error
	count := result.RowsAffected
	if err != nil {
		return needRestart, count, err
	}
	if len(targets) == 0 {
		return needRestart, count, nil
	}
	// Stage 4b: mirror the change into the settings JSON of each affected
	// inbound. Group the target e-mails per inbound first.
	inboundEmailMap := make(map[int]map[string]struct{})
	for _, t := range targets {
		if inboundEmailMap[t.InboundId] == nil {
			inboundEmailMap[t.InboundId] = make(map[string]struct{})
		}
		inboundEmailMap[t.InboundId][t.Email] = struct{}{}
	}
	inboundIds := make([]int, 0, len(inboundEmailMap))
	for id := range inboundEmailMap {
		inboundIds = append(inboundIds, id)
	}
	var inbounds []*model.Inbound
	if err = tx.Model(model.Inbound{}).Where("id IN ?", inboundIds).Find(&inbounds).Error; err != nil {
		// Not fatal: the traffic rows are already disabled.
		logger.Warning("disableInvalidClients fetch inbounds:", err)
		return needRestart, count, nil
	}
	// dirty collects only the inbounds whose settings actually changed.
	dirty := make([]*model.Inbound, 0, len(inbounds))
	for _, inbound := range inbounds {
		settings := map[string]any{}
		if jsonErr := json.Unmarshal([]byte(inbound.Settings), &settings); jsonErr != nil {
			continue
		}
		clientsRaw, ok := settings["clients"].([]any)
		if !ok {
			continue
		}
		emailSet := inboundEmailMap[inbound.Id]
		changed := false
		for i := range clientsRaw {
			c, ok := clientsRaw[i].(map[string]any)
			if !ok {
				continue
			}
			email, _ := c["email"].(string)
			if _, shouldDisable := emailSet[email]; !shouldDisable {
				continue
			}
			c["enable"] = false
			// Copy quota and expiry from the depleted traffic row into the
			// client entry.
			if row, ok := rowByEmail[strings.ToLower(email)]; ok {
				c["totalGB"] = row.Total
				c["expiryTime"] = row.ExpiryTime
			}
			c["updated_at"] = now
			clientsRaw[i] = c
			changed = true
		}
		if !changed {
			continue
		}
		settings["clients"] = clientsRaw
		modifiedSettings, jsonErr := json.MarshalIndent(settings, "", " ")
		if jsonErr != nil {
			continue
		}
		inbound.Settings = string(modifiedSettings)
		dirty = append(dirty, inbound)
	}
	if len(dirty) > 0 {
		if err = tx.Save(dirty).Error; err != nil {
			logger.Warning("disableInvalidClients update inbound settings:", err)
		}
	}
	return needRestart, count, nil
}
  2121. func (s *InboundService) GetInboundTags() (string, error) {
  2122. db := database.GetDB()
  2123. var inboundTags []string
  2124. err := db.Model(model.Inbound{}).Select("tag").Find(&inboundTags).Error
  2125. if err != nil && err != gorm.ErrRecordNotFound {
  2126. return "", err
  2127. }
  2128. tags, _ := json.Marshal(inboundTags)
  2129. return string(tags), nil
  2130. }
  2131. func (s *InboundService) GetClientReverseTags() (string, error) {
  2132. db := database.GetDB()
  2133. var inbounds []model.Inbound
  2134. err := db.Model(model.Inbound{}).Select("settings").Where("protocol = ?", "vless").Find(&inbounds).Error
  2135. if err != nil && err != gorm.ErrRecordNotFound {
  2136. return "[]", err
  2137. }
  2138. tagSet := make(map[string]struct{})
  2139. for _, inbound := range inbounds {
  2140. var settings map[string]any
  2141. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  2142. continue
  2143. }
  2144. clients, ok := settings["clients"].([]any)
  2145. if !ok {
  2146. continue
  2147. }
  2148. for _, client := range clients {
  2149. clientMap, ok := client.(map[string]any)
  2150. if !ok {
  2151. continue
  2152. }
  2153. reverse, ok := clientMap["reverse"].(map[string]any)
  2154. if !ok {
  2155. continue
  2156. }
  2157. tag, _ := reverse["tag"].(string)
  2158. tag = strings.TrimSpace(tag)
  2159. if tag != "" {
  2160. tagSet[tag] = struct{}{}
  2161. }
  2162. }
  2163. }
  2164. rawTags := make([]string, 0, len(tagSet))
  2165. for tag := range tagSet {
  2166. rawTags = append(rawTags, tag)
  2167. }
  2168. sort.Strings(rawTags)
  2169. result, _ := json.Marshal(rawTags)
  2170. return string(result), nil
  2171. }
  2172. func (s *InboundService) MigrationRemoveOrphanedTraffics() {
  2173. db := database.GetDB()
  2174. db.Exec(`
  2175. DELETE FROM client_traffics
  2176. WHERE email NOT IN (
  2177. SELECT JSON_EXTRACT(client.value, '$.email')
  2178. FROM inbounds,
  2179. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  2180. )
  2181. `)
  2182. }
  2183. // AddClientStat inserts a per-client accounting row, no-op on email
  2184. // conflict. Xray reports traffic per email, so the surviving row acts as
  2185. // the shared accumulator for inbounds that re-use the same identity.
  2186. func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
  2187. clientTraffic := xray.ClientTraffic{
  2188. InboundId: inboundId,
  2189. Email: client.Email,
  2190. Total: client.TotalGB,
  2191. ExpiryTime: client.ExpiryTime,
  2192. Enable: client.Enable,
  2193. Reset: client.Reset,
  2194. }
  2195. return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
  2196. Create(&clientTraffic).Error
  2197. }
  2198. func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
  2199. result := tx.Model(xray.ClientTraffic{}).
  2200. Where("email = ?", email).
  2201. Updates(map[string]any{
  2202. "enable": client.Enable,
  2203. "email": client.Email,
  2204. "total": client.TotalGB,
  2205. "expiry_time": client.ExpiryTime,
  2206. "reset": client.Reset,
  2207. })
  2208. err := result.Error
  2209. return err
  2210. }
  2211. func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail string) error {
  2212. return tx.Model(model.InboundClientIps{}).Where("client_email = ?", oldEmail).Update("client_email", newEmail).Error
  2213. }
  2214. func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
  2215. return tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error
  2216. }
  2217. func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {
  2218. return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error
  2219. }
  2220. func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
  2221. db := database.GetDB()
  2222. var traffics []*xray.ClientTraffic
  2223. err = db.Model(xray.ClientTraffic{}).Where("id = ?", trafficId).Find(&traffics).Error
  2224. if err != nil {
  2225. logger.Warningf("Error retrieving ClientTraffic with trafficId %d: %v", trafficId, err)
  2226. return nil, nil, err
  2227. }
  2228. if len(traffics) > 0 {
  2229. inbound, err = s.GetInbound(traffics[0].InboundId)
  2230. return traffics[0], inbound, err
  2231. }
  2232. return nil, nil, nil
  2233. }
  2234. func (s *InboundService) GetClientInboundByEmail(email string) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
  2235. db := database.GetDB()
  2236. var traffics []*xray.ClientTraffic
  2237. err = db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error
  2238. if err != nil {
  2239. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  2240. return nil, nil, err
  2241. }
  2242. if len(traffics) > 0 {
  2243. inbound, err = s.GetInbound(traffics[0].InboundId)
  2244. return traffics[0], inbound, err
  2245. }
  2246. return nil, nil, nil
  2247. }
  2248. func (s *InboundService) GetClientByEmail(clientEmail string) (*xray.ClientTraffic, *model.Client, error) {
  2249. traffic, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2250. if err != nil {
  2251. return nil, nil, err
  2252. }
  2253. if inbound == nil {
  2254. return nil, nil, common.NewError("Inbound Not Found For Email:", clientEmail)
  2255. }
  2256. clients, err := s.GetClients(inbound)
  2257. if err != nil {
  2258. return nil, nil, err
  2259. }
  2260. for _, client := range clients {
  2261. if client.Email == clientEmail {
  2262. return traffic, &client, nil
  2263. }
  2264. }
  2265. return nil, nil, common.NewError("Client Not Found In Inbound For Email:", clientEmail)
  2266. }
  2267. func (s *InboundService) SetClientTelegramUserID(trafficId int, tgId int64) (bool, error) {
  2268. traffic, inbound, err := s.GetClientInboundByTrafficID(trafficId)
  2269. if err != nil {
  2270. return false, err
  2271. }
  2272. if inbound == nil {
  2273. return false, common.NewError("Inbound Not Found For Traffic ID:", trafficId)
  2274. }
  2275. clientEmail := traffic.Email
  2276. oldClients, err := s.GetClients(inbound)
  2277. if err != nil {
  2278. return false, err
  2279. }
  2280. clientId := ""
  2281. for _, oldClient := range oldClients {
  2282. if oldClient.Email == clientEmail {
  2283. switch inbound.Protocol {
  2284. case "trojan":
  2285. clientId = oldClient.Password
  2286. case "shadowsocks":
  2287. clientId = oldClient.Email
  2288. default:
  2289. clientId = oldClient.ID
  2290. }
  2291. break
  2292. }
  2293. }
  2294. if len(clientId) == 0 {
  2295. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2296. }
  2297. var settings map[string]any
  2298. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2299. if err != nil {
  2300. return false, err
  2301. }
  2302. clients := settings["clients"].([]any)
  2303. var newClients []any
  2304. for client_index := range clients {
  2305. c := clients[client_index].(map[string]any)
  2306. if c["email"] == clientEmail {
  2307. c["tgId"] = tgId
  2308. c["updated_at"] = time.Now().Unix() * 1000
  2309. newClients = append(newClients, any(c))
  2310. }
  2311. }
  2312. settings["clients"] = newClients
  2313. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2314. if err != nil {
  2315. return false, err
  2316. }
  2317. inbound.Settings = string(modifiedSettings)
  2318. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2319. return needRestart, err
  2320. }
  2321. func (s *InboundService) checkIsEnabledByEmail(clientEmail string) (bool, error) {
  2322. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2323. if err != nil {
  2324. return false, err
  2325. }
  2326. if inbound == nil {
  2327. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2328. }
  2329. clients, err := s.GetClients(inbound)
  2330. if err != nil {
  2331. return false, err
  2332. }
  2333. isEnable := false
  2334. for _, client := range clients {
  2335. if client.Email == clientEmail {
  2336. isEnable = client.Enable
  2337. break
  2338. }
  2339. }
  2340. return isEnable, err
  2341. }
  2342. func (s *InboundService) ToggleClientEnableByEmail(clientEmail string) (bool, bool, error) {
  2343. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2344. if err != nil {
  2345. return false, false, err
  2346. }
  2347. if inbound == nil {
  2348. return false, false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2349. }
  2350. oldClients, err := s.GetClients(inbound)
  2351. if err != nil {
  2352. return false, false, err
  2353. }
  2354. clientId := ""
  2355. clientOldEnabled := false
  2356. for _, oldClient := range oldClients {
  2357. if oldClient.Email == clientEmail {
  2358. switch inbound.Protocol {
  2359. case "trojan":
  2360. clientId = oldClient.Password
  2361. case "shadowsocks":
  2362. clientId = oldClient.Email
  2363. default:
  2364. clientId = oldClient.ID
  2365. }
  2366. clientOldEnabled = oldClient.Enable
  2367. break
  2368. }
  2369. }
  2370. if len(clientId) == 0 {
  2371. return false, false, common.NewError("Client Not Found For Email:", clientEmail)
  2372. }
  2373. var settings map[string]any
  2374. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2375. if err != nil {
  2376. return false, false, err
  2377. }
  2378. clients := settings["clients"].([]any)
  2379. var newClients []any
  2380. for client_index := range clients {
  2381. c := clients[client_index].(map[string]any)
  2382. if c["email"] == clientEmail {
  2383. c["enable"] = !clientOldEnabled
  2384. c["updated_at"] = time.Now().Unix() * 1000
  2385. newClients = append(newClients, any(c))
  2386. }
  2387. }
  2388. settings["clients"] = newClients
  2389. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2390. if err != nil {
  2391. return false, false, err
  2392. }
  2393. inbound.Settings = string(modifiedSettings)
  2394. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2395. if err != nil {
  2396. return false, needRestart, err
  2397. }
  2398. return !clientOldEnabled, needRestart, nil
  2399. }
  2400. // SetClientEnableByEmail sets client enable state to desired value; returns (changed, needRestart, error)
  2401. func (s *InboundService) SetClientEnableByEmail(clientEmail string, enable bool) (bool, bool, error) {
  2402. current, err := s.checkIsEnabledByEmail(clientEmail)
  2403. if err != nil {
  2404. return false, false, err
  2405. }
  2406. if current == enable {
  2407. return false, false, nil
  2408. }
  2409. newEnabled, needRestart, err := s.ToggleClientEnableByEmail(clientEmail)
  2410. if err != nil {
  2411. return false, needRestart, err
  2412. }
  2413. return newEnabled == enable, needRestart, nil
  2414. }
  2415. func (s *InboundService) ResetClientIpLimitByEmail(clientEmail string, count int) (bool, error) {
  2416. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2417. if err != nil {
  2418. return false, err
  2419. }
  2420. if inbound == nil {
  2421. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2422. }
  2423. oldClients, err := s.GetClients(inbound)
  2424. if err != nil {
  2425. return false, err
  2426. }
  2427. clientId := ""
  2428. for _, oldClient := range oldClients {
  2429. if oldClient.Email == clientEmail {
  2430. switch inbound.Protocol {
  2431. case "trojan":
  2432. clientId = oldClient.Password
  2433. case "shadowsocks":
  2434. clientId = oldClient.Email
  2435. default:
  2436. clientId = oldClient.ID
  2437. }
  2438. break
  2439. }
  2440. }
  2441. if len(clientId) == 0 {
  2442. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2443. }
  2444. var settings map[string]any
  2445. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2446. if err != nil {
  2447. return false, err
  2448. }
  2449. clients := settings["clients"].([]any)
  2450. var newClients []any
  2451. for client_index := range clients {
  2452. c := clients[client_index].(map[string]any)
  2453. if c["email"] == clientEmail {
  2454. c["limitIp"] = count
  2455. c["updated_at"] = time.Now().Unix() * 1000
  2456. newClients = append(newClients, any(c))
  2457. }
  2458. }
  2459. settings["clients"] = newClients
  2460. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2461. if err != nil {
  2462. return false, err
  2463. }
  2464. inbound.Settings = string(modifiedSettings)
  2465. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2466. return needRestart, err
  2467. }
  2468. func (s *InboundService) ResetClientExpiryTimeByEmail(clientEmail string, expiry_time int64) (bool, error) {
  2469. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2470. if err != nil {
  2471. return false, err
  2472. }
  2473. if inbound == nil {
  2474. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2475. }
  2476. oldClients, err := s.GetClients(inbound)
  2477. if err != nil {
  2478. return false, err
  2479. }
  2480. clientId := ""
  2481. for _, oldClient := range oldClients {
  2482. if oldClient.Email == clientEmail {
  2483. switch inbound.Protocol {
  2484. case "trojan":
  2485. clientId = oldClient.Password
  2486. case "shadowsocks":
  2487. clientId = oldClient.Email
  2488. default:
  2489. clientId = oldClient.ID
  2490. }
  2491. break
  2492. }
  2493. }
  2494. if len(clientId) == 0 {
  2495. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2496. }
  2497. var settings map[string]any
  2498. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2499. if err != nil {
  2500. return false, err
  2501. }
  2502. clients := settings["clients"].([]any)
  2503. var newClients []any
  2504. for client_index := range clients {
  2505. c := clients[client_index].(map[string]any)
  2506. if c["email"] == clientEmail {
  2507. c["expiryTime"] = expiry_time
  2508. c["updated_at"] = time.Now().Unix() * 1000
  2509. newClients = append(newClients, any(c))
  2510. }
  2511. }
  2512. settings["clients"] = newClients
  2513. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2514. if err != nil {
  2515. return false, err
  2516. }
  2517. inbound.Settings = string(modifiedSettings)
  2518. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2519. return needRestart, err
  2520. }
  2521. func (s *InboundService) ResetClientTrafficLimitByEmail(clientEmail string, totalGB int) (bool, error) {
  2522. if totalGB < 0 {
  2523. return false, common.NewError("totalGB must be >= 0")
  2524. }
  2525. _, inbound, err := s.GetClientInboundByEmail(clientEmail)
  2526. if err != nil {
  2527. return false, err
  2528. }
  2529. if inbound == nil {
  2530. return false, common.NewError("Inbound Not Found For Email:", clientEmail)
  2531. }
  2532. oldClients, err := s.GetClients(inbound)
  2533. if err != nil {
  2534. return false, err
  2535. }
  2536. clientId := ""
  2537. for _, oldClient := range oldClients {
  2538. if oldClient.Email == clientEmail {
  2539. switch inbound.Protocol {
  2540. case "trojan":
  2541. clientId = oldClient.Password
  2542. case "shadowsocks":
  2543. clientId = oldClient.Email
  2544. default:
  2545. clientId = oldClient.ID
  2546. }
  2547. break
  2548. }
  2549. }
  2550. if len(clientId) == 0 {
  2551. return false, common.NewError("Client Not Found For Email:", clientEmail)
  2552. }
  2553. var settings map[string]any
  2554. err = json.Unmarshal([]byte(inbound.Settings), &settings)
  2555. if err != nil {
  2556. return false, err
  2557. }
  2558. clients := settings["clients"].([]any)
  2559. var newClients []any
  2560. for client_index := range clients {
  2561. c := clients[client_index].(map[string]any)
  2562. if c["email"] == clientEmail {
  2563. c["totalGB"] = totalGB * 1024 * 1024 * 1024
  2564. c["updated_at"] = time.Now().Unix() * 1000
  2565. newClients = append(newClients, any(c))
  2566. }
  2567. }
  2568. settings["clients"] = newClients
  2569. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  2570. if err != nil {
  2571. return false, err
  2572. }
  2573. inbound.Settings = string(modifiedSettings)
  2574. needRestart, err := s.UpdateInboundClient(inbound, clientId)
  2575. return needRestart, err
  2576. }
  2577. func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
  2578. return submitTrafficWrite(func() error {
  2579. db := database.GetDB()
  2580. return db.Model(xray.ClientTraffic{}).
  2581. Where("email = ?", clientEmail).
  2582. Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
  2583. })
  2584. }
  2585. func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
  2586. err = submitTrafficWrite(func() error {
  2587. var inner error
  2588. needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
  2589. return inner
  2590. })
  2591. return
  2592. }
  2593. func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
  2594. needRestart := false
  2595. traffic, err := s.GetClientTrafficByEmail(clientEmail)
  2596. if err != nil {
  2597. return false, err
  2598. }
  2599. if !traffic.Enable {
  2600. inbound, err := s.GetInbound(id)
  2601. if err != nil {
  2602. return false, err
  2603. }
  2604. clients, err := s.GetClients(inbound)
  2605. if err != nil {
  2606. return false, err
  2607. }
  2608. for _, client := range clients {
  2609. if client.Email == clientEmail && client.Enable {
  2610. rt, rterr := s.runtimeFor(inbound)
  2611. if rterr != nil {
  2612. if inbound.NodeID != nil {
  2613. return false, rterr
  2614. }
  2615. needRestart = true
  2616. break
  2617. }
  2618. cipher := ""
  2619. if string(inbound.Protocol) == "shadowsocks" {
  2620. var oldSettings map[string]any
  2621. err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
  2622. if err != nil {
  2623. return false, err
  2624. }
  2625. cipher = oldSettings["method"].(string)
  2626. }
  2627. err1 := rt.AddUser(context.Background(), inbound, map[string]any{
  2628. "email": client.Email,
  2629. "id": client.ID,
  2630. "auth": client.Auth,
  2631. "security": client.Security,
  2632. "flow": client.Flow,
  2633. "password": client.Password,
  2634. "cipher": cipher,
  2635. })
  2636. if err1 == nil {
  2637. logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
  2638. } else {
  2639. logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
  2640. needRestart = true
  2641. }
  2642. break
  2643. }
  2644. }
  2645. }
  2646. traffic.Up = 0
  2647. traffic.Down = 0
  2648. traffic.Enable = true
  2649. db := database.GetDB()
  2650. err = db.Save(traffic).Error
  2651. if err != nil {
  2652. return false, err
  2653. }
  2654. now := time.Now().UnixMilli()
  2655. _ = db.Model(model.Inbound{}).
  2656. Where("id = ?", id).
  2657. Update("last_traffic_reset_time", now).Error
  2658. inbound, err := s.GetInbound(id)
  2659. if err == nil && inbound != nil && inbound.NodeID != nil {
  2660. if rt, rterr := s.runtimeFor(inbound); rterr == nil {
  2661. if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
  2662. logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
  2663. }
  2664. } else {
  2665. logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
  2666. }
  2667. }
  2668. return needRestart, nil
  2669. }
  2670. func (s *InboundService) ResetAllClientTraffics(id int) error {
  2671. return submitTrafficWrite(func() error {
  2672. return s.resetAllClientTrafficsLocked(id)
  2673. })
  2674. }
  2675. func (s *InboundService) resetAllClientTrafficsLocked(id int) error {
  2676. db := database.GetDB()
  2677. now := time.Now().Unix() * 1000
  2678. if err := db.Transaction(func(tx *gorm.DB) error {
  2679. whereText := "inbound_id "
  2680. if id == -1 {
  2681. whereText += " > ?"
  2682. } else {
  2683. whereText += " = ?"
  2684. }
  2685. // Reset client traffics
  2686. result := tx.Model(xray.ClientTraffic{}).
  2687. Where(whereText, id).
  2688. Updates(map[string]any{"enable": true, "up": 0, "down": 0})
  2689. if result.Error != nil {
  2690. return result.Error
  2691. }
  2692. // Update lastTrafficResetTime for the inbound(s)
  2693. inboundWhereText := "id "
  2694. if id == -1 {
  2695. inboundWhereText += " > ?"
  2696. } else {
  2697. inboundWhereText += " = ?"
  2698. }
  2699. result = tx.Model(model.Inbound{}).
  2700. Where(inboundWhereText, id).
  2701. Update("last_traffic_reset_time", now)
  2702. return result.Error
  2703. }); err != nil {
  2704. return err
  2705. }
  2706. var inbounds []model.Inbound
  2707. q := db.Model(model.Inbound{}).Where("node_id IS NOT NULL")
  2708. if id != -1 {
  2709. q = q.Where("id = ?", id)
  2710. }
  2711. if err := q.Find(&inbounds).Error; err != nil {
  2712. logger.Warning("ResetAllClientTraffics: discover node inbounds failed:", err)
  2713. return nil
  2714. }
  2715. for i := range inbounds {
  2716. ib := &inbounds[i]
  2717. rt, rterr := s.runtimeFor(ib)
  2718. if rterr != nil {
  2719. logger.Warning("ResetAllClientTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr)
  2720. continue
  2721. }
  2722. if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil {
  2723. logger.Warning("ResetAllClientTraffics: remote propagation to", rt.Name(), "failed:", e)
  2724. }
  2725. }
  2726. return nil
  2727. }
  2728. func (s *InboundService) ResetAllTraffics() error {
  2729. return submitTrafficWrite(func() error {
  2730. return s.resetAllTrafficsLocked()
  2731. })
  2732. }
  2733. func (s *InboundService) resetAllTrafficsLocked() error {
  2734. db := database.GetDB()
  2735. now := time.Now().UnixMilli()
  2736. if err := db.Model(model.Inbound{}).
  2737. Where("user_id > ?", 0).
  2738. Updates(map[string]any{
  2739. "up": 0,
  2740. "down": 0,
  2741. "last_traffic_reset_time": now,
  2742. }).Error; err != nil {
  2743. return err
  2744. }
  2745. var inbounds []model.Inbound
  2746. if err := db.Model(model.Inbound{}).
  2747. Where("node_id IS NOT NULL").
  2748. Find(&inbounds).Error; err != nil {
  2749. logger.Warning("ResetAllTraffics: discover node inbounds failed:", err)
  2750. return nil
  2751. }
  2752. for i := range inbounds {
  2753. ib := &inbounds[i]
  2754. rt, rterr := s.runtimeFor(ib)
  2755. if rterr != nil {
  2756. logger.Warning("ResetAllTraffics: runtime lookup for inbound", ib.Id, "failed:", rterr)
  2757. continue
  2758. }
  2759. if e := rt.ResetInboundClientTraffics(context.Background(), ib); e != nil {
  2760. logger.Warning("ResetAllTraffics: remote propagation to", rt.Name(), "failed:", e)
  2761. }
  2762. }
  2763. return nil
  2764. }
  2765. func (s *InboundService) ResetInboundTraffic(id int) error {
  2766. return submitTrafficWrite(func() error {
  2767. db := database.GetDB()
  2768. return db.Model(model.Inbound{}).
  2769. Where("id = ?", id).
  2770. Updates(map[string]any{"up": 0, "down": 0}).Error
  2771. })
  2772. }
  2773. func (s *InboundService) DelDepletedClients(id int) (err error) {
  2774. db := database.GetDB()
  2775. tx := db.Begin()
  2776. defer func() {
  2777. if err == nil {
  2778. tx.Commit()
  2779. } else {
  2780. tx.Rollback()
  2781. }
  2782. }()
  2783. // Collect depleted emails globally — a shared-email row owned by one
  2784. // inbound depletes every sibling that lists the email.
  2785. now := time.Now().Unix() * 1000
  2786. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  2787. var depletedRows []xray.ClientTraffic
  2788. err = db.Model(xray.ClientTraffic{}).
  2789. Where(depletedClause, now).
  2790. Find(&depletedRows).Error
  2791. if err != nil {
  2792. return err
  2793. }
  2794. if len(depletedRows) == 0 {
  2795. return nil
  2796. }
  2797. depletedEmails := make(map[string]struct{}, len(depletedRows))
  2798. for _, r := range depletedRows {
  2799. if r.Email == "" {
  2800. continue
  2801. }
  2802. depletedEmails[strings.ToLower(r.Email)] = struct{}{}
  2803. }
  2804. if len(depletedEmails) == 0 {
  2805. return nil
  2806. }
  2807. var inbounds []*model.Inbound
  2808. inboundQuery := db.Model(model.Inbound{})
  2809. if id >= 0 {
  2810. inboundQuery = inboundQuery.Where("id = ?", id)
  2811. }
  2812. if err = inboundQuery.Find(&inbounds).Error; err != nil {
  2813. return err
  2814. }
  2815. for _, inbound := range inbounds {
  2816. var settings map[string]any
  2817. if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  2818. return err
  2819. }
  2820. rawClients, ok := settings["clients"].([]any)
  2821. if !ok {
  2822. continue
  2823. }
  2824. newClients := make([]any, 0, len(rawClients))
  2825. removed := 0
  2826. for _, client := range rawClients {
  2827. c, ok := client.(map[string]any)
  2828. if !ok {
  2829. newClients = append(newClients, client)
  2830. continue
  2831. }
  2832. email, _ := c["email"].(string)
  2833. if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
  2834. removed++
  2835. continue
  2836. }
  2837. newClients = append(newClients, client)
  2838. }
  2839. if removed == 0 {
  2840. continue
  2841. }
  2842. if len(newClients) == 0 {
  2843. s.DelInbound(inbound.Id)
  2844. continue
  2845. }
  2846. settings["clients"] = newClients
  2847. ns, mErr := json.MarshalIndent(settings, "", " ")
  2848. if mErr != nil {
  2849. return mErr
  2850. }
  2851. inbound.Settings = string(ns)
  2852. if err = tx.Save(inbound).Error; err != nil {
  2853. return err
  2854. }
  2855. }
  2856. // Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
  2857. // no out-of-scope inbound still references the email.
  2858. if id < 0 {
  2859. err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
  2860. return err
  2861. }
  2862. emails := make([]string, 0, len(depletedEmails))
  2863. for e := range depletedEmails {
  2864. emails = append(emails, e)
  2865. }
  2866. var stillReferenced []string
  2867. if err = tx.Raw(`
  2868. SELECT DISTINCT LOWER(JSON_EXTRACT(client.value, '$.email'))
  2869. FROM inbounds,
  2870. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  2871. WHERE LOWER(JSON_EXTRACT(client.value, '$.email')) IN ?
  2872. `, emails).Scan(&stillReferenced).Error; err != nil {
  2873. return err
  2874. }
  2875. stillSet := make(map[string]struct{}, len(stillReferenced))
  2876. for _, e := range stillReferenced {
  2877. stillSet[e] = struct{}{}
  2878. }
  2879. toDelete := make([]string, 0, len(emails))
  2880. for _, e := range emails {
  2881. if _, kept := stillSet[e]; !kept {
  2882. toDelete = append(toDelete, e)
  2883. }
  2884. }
  2885. if len(toDelete) > 0 {
  2886. if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
  2887. return err
  2888. }
  2889. }
  2890. return nil
  2891. }
  2892. func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
  2893. db := database.GetDB()
  2894. var inbounds []*model.Inbound
  2895. // Retrieve inbounds where settings contain the given tgId
  2896. err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
  2897. if err != nil && err != gorm.ErrRecordNotFound {
  2898. logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
  2899. return nil, err
  2900. }
  2901. var emails []string
  2902. for _, inbound := range inbounds {
  2903. clients, err := s.GetClients(inbound)
  2904. if err != nil {
  2905. logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
  2906. continue
  2907. }
  2908. for _, client := range clients {
  2909. if client.TgID == tgId {
  2910. emails = append(emails, client.Email)
  2911. }
  2912. }
  2913. }
  2914. // Chunked to stay under SQLite's bind-variable limit when a single Telegram
  2915. // account owns thousands of clients across inbounds.
  2916. uniqEmails := uniqueNonEmptyStrings(emails)
  2917. traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
  2918. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  2919. var page []*xray.ClientTraffic
  2920. if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  2921. if err == gorm.ErrRecordNotFound {
  2922. continue
  2923. }
  2924. logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
  2925. return nil, err
  2926. }
  2927. traffics = append(traffics, page...)
  2928. }
  2929. if len(traffics) == 0 {
  2930. logger.Warning("No ClientTraffic records found for emails:", emails)
  2931. return nil, nil
  2932. }
  2933. // Populate UUID and other client data for each traffic record
  2934. for i := range traffics {
  2935. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  2936. traffics[i].Enable = client.Enable
  2937. traffics[i].UUID = client.ID
  2938. traffics[i].SubId = client.SubID
  2939. }
  2940. }
  2941. return traffics, nil
  2942. }
// sqliteMaxVars is a safe ceiling for the number of bind parameters in a
// single SQL statement, used as the batch size when splitting IN-lists.
// SQLite's SQLITE_MAX_VARIABLE_NUMBER is 999 on builds before 3.32 and 32766
// after; staying under 999 keeps queries portable across forks/old binaries
// and also bounds per-query memory on truly large installs (>32k clients)
// where even modern SQLite would refuse a single IN.
const sqliteMaxVars = 900
  2949. // uniqueNonEmptyStrings returns a deduplicated copy of in with empty strings
  2950. // removed, preserving the order of first occurrence.
  2951. func uniqueNonEmptyStrings(in []string) []string {
  2952. if len(in) == 0 {
  2953. return nil
  2954. }
  2955. seen := make(map[string]struct{}, len(in))
  2956. out := make([]string, 0, len(in))
  2957. for _, v := range in {
  2958. if v == "" {
  2959. continue
  2960. }
  2961. if _, ok := seen[v]; ok {
  2962. continue
  2963. }
  2964. seen[v] = struct{}{}
  2965. out = append(out, v)
  2966. }
  2967. return out
  2968. }
  2969. // uniqueInts returns a deduplicated copy of in, preserving order of first occurrence.
  2970. func uniqueInts(in []int) []int {
  2971. if len(in) == 0 {
  2972. return nil
  2973. }
  2974. seen := make(map[int]struct{}, len(in))
  2975. out := make([]int, 0, len(in))
  2976. for _, v := range in {
  2977. if _, ok := seen[v]; ok {
  2978. continue
  2979. }
  2980. seen[v] = struct{}{}
  2981. out = append(out, v)
  2982. }
  2983. return out
  2984. }
  2985. // chunkStrings splits s into consecutive sub-slices of at most size elements.
  2986. // Returns nil for an empty input or non-positive size.
  2987. func chunkStrings(s []string, size int) [][]string {
  2988. if size <= 0 || len(s) == 0 {
  2989. return nil
  2990. }
  2991. out := make([][]string, 0, (len(s)+size-1)/size)
  2992. for i := 0; i < len(s); i += size {
  2993. end := i + size
  2994. if end > len(s) {
  2995. end = len(s)
  2996. }
  2997. out = append(out, s[i:end])
  2998. }
  2999. return out
  3000. }
  3001. // chunkInts splits s into consecutive sub-slices of at most size elements.
  3002. // Returns nil for an empty input or non-positive size.
  3003. func chunkInts(s []int, size int) [][]int {
  3004. if size <= 0 || len(s) == 0 {
  3005. return nil
  3006. }
  3007. out := make([][]int, 0, (len(s)+size-1)/size)
  3008. for i := 0; i < len(s); i += size {
  3009. end := i + size
  3010. if end > len(s) {
  3011. end = len(s)
  3012. }
  3013. out = append(out, s[i:end])
  3014. }
  3015. return out
  3016. }
  3017. func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
  3018. uniq := uniqueNonEmptyStrings(emails)
  3019. if len(uniq) == 0 {
  3020. return nil, nil
  3021. }
  3022. db := database.GetDB()
  3023. traffics := make([]*xray.ClientTraffic, 0, len(uniq))
  3024. for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
  3025. var page []*xray.ClientTraffic
  3026. if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
  3027. return nil, err
  3028. }
  3029. traffics = append(traffics, page...)
  3030. }
  3031. return traffics, nil
  3032. }
// InboundTrafficSummary is the per-inbound projection returned by
// GetInboundsTrafficSummary. It is filled from the id, up, down, total,
// all_time and enable columns of the inbound model.
type InboundTrafficSummary struct {
	Id      int   `json:"id"`      // inbound primary key
	Up      int64 `json:"up"`      // upload counter (presumably bytes — confirm against the model)
	Down    int64 `json:"down"`    // download counter (presumably bytes — confirm against the model)
	Total   int64 `json:"total"`   // value of the inbound's total column
	AllTime int64 `json:"allTime"` // value of the inbound's all_time column
	Enable  bool  `json:"enable"`  // value of the inbound's enable column
}
  3041. func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
  3042. db := database.GetDB()
  3043. var summaries []InboundTrafficSummary
  3044. if err := db.Model(&model.Inbound{}).
  3045. Select("id, up, down, total, all_time, enable").
  3046. Find(&summaries).Error; err != nil {
  3047. return nil, err
  3048. }
  3049. return summaries, nil
  3050. }
  3051. func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
  3052. // Prefer retrieving along with client to reflect actual enabled state from inbound settings
  3053. t, client, err := s.GetClientByEmail(email)
  3054. if err != nil {
  3055. logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
  3056. return nil, err
  3057. }
  3058. if t != nil && client != nil {
  3059. t.UUID = client.ID
  3060. t.SubId = client.SubID
  3061. return t, nil
  3062. }
  3063. return nil, nil
  3064. }
  3065. func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
  3066. return submitTrafficWrite(func() error {
  3067. db := database.GetDB()
  3068. err := db.Model(xray.ClientTraffic{}).
  3069. Where("email = ?", email).
  3070. Updates(map[string]any{
  3071. "up": upload,
  3072. "down": download,
  3073. "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
  3074. }).Error
  3075. if err != nil {
  3076. logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
  3077. }
  3078. return err
  3079. })
  3080. }
  3081. func (s *InboundService) GetClientTrafficByID(id string) ([]xray.ClientTraffic, error) {
  3082. db := database.GetDB()
  3083. var traffics []xray.ClientTraffic
  3084. err := db.Model(xray.ClientTraffic{}).Where(`email IN(
  3085. SELECT JSON_EXTRACT(client.value, '$.email') as email
  3086. FROM inbounds,
  3087. JSON_EACH(JSON_EXTRACT(inbounds.settings, '$.clients')) AS client
  3088. WHERE
  3089. JSON_EXTRACT(client.value, '$.id') in (?)
  3090. )`, id).Find(&traffics).Error
  3091. if err != nil {
  3092. logger.Debug(err)
  3093. return nil, err
  3094. }
  3095. // Reconcile enable flag with client settings per email to avoid stale DB value
  3096. for i := range traffics {
  3097. if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
  3098. traffics[i].Enable = client.Enable
  3099. traffics[i].UUID = client.ID
  3100. traffics[i].SubId = client.SubID
  3101. }
  3102. }
  3103. return traffics, err
  3104. }
  3105. func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
  3106. db := database.GetDB()
  3107. inbound := &model.Inbound{}
  3108. traffic = &xray.ClientTraffic{}
  3109. // Search for inbound settings that contain the query
  3110. err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
  3111. if err != nil {
  3112. if err == gorm.ErrRecordNotFound {
  3113. logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
  3114. return nil, err
  3115. }
  3116. logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
  3117. return nil, err
  3118. }
  3119. traffic.InboundId = inbound.Id
  3120. // Unmarshal settings to get clients
  3121. settings := map[string][]model.Client{}
  3122. if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
  3123. logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
  3124. return nil, err
  3125. }
  3126. clients := settings["clients"]
  3127. for _, client := range clients {
  3128. if (client.ID == query || client.Password == query) && client.Email != "" {
  3129. traffic.Email = client.Email
  3130. break
  3131. }
  3132. }
  3133. if traffic.Email == "" {
  3134. logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
  3135. return nil, gorm.ErrRecordNotFound
  3136. }
  3137. // Retrieve ClientTraffic based on the found email
  3138. err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
  3139. if err != nil {
  3140. if err == gorm.ErrRecordNotFound {
  3141. logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
  3142. return nil, err
  3143. }
  3144. logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
  3145. return nil, err
  3146. }
  3147. return traffic, nil
  3148. }
  3149. func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error) {
  3150. db := database.GetDB()
  3151. InboundClientIps := &model.InboundClientIps{}
  3152. err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
  3153. if err != nil {
  3154. return "", err
  3155. }
  3156. if InboundClientIps.Ips == "" {
  3157. return "", nil
  3158. }
  3159. // Try to parse as new format (with timestamps)
  3160. type IPWithTimestamp struct {
  3161. IP string `json:"ip"`
  3162. Timestamp int64 `json:"timestamp"`
  3163. }
  3164. var ipsWithTime []IPWithTimestamp
  3165. err = json.Unmarshal([]byte(InboundClientIps.Ips), &ipsWithTime)
  3166. // If successfully parsed as new format, return with timestamps
  3167. if err == nil && len(ipsWithTime) > 0 {
  3168. return InboundClientIps.Ips, nil
  3169. }
  3170. // Otherwise, assume it's old format (simple string array)
  3171. // Try to parse as simple array and convert to new format
  3172. var oldIps []string
  3173. err = json.Unmarshal([]byte(InboundClientIps.Ips), &oldIps)
  3174. if err == nil && len(oldIps) > 0 {
  3175. // Convert old format to new format with current timestamp
  3176. newIpsWithTime := make([]IPWithTimestamp, len(oldIps))
  3177. for i, ip := range oldIps {
  3178. newIpsWithTime[i] = IPWithTimestamp{
  3179. IP: ip,
  3180. Timestamp: time.Now().Unix(),
  3181. }
  3182. }
  3183. result, _ := json.Marshal(newIpsWithTime)
  3184. return string(result), nil
  3185. }
  3186. // Return as-is if parsing fails
  3187. return InboundClientIps.Ips, nil
  3188. }
  3189. func (s *InboundService) ClearClientIps(clientEmail string) error {
  3190. db := database.GetDB()
  3191. result := db.Model(model.InboundClientIps{}).
  3192. Where("client_email = ?", clientEmail).
  3193. Update("ips", "")
  3194. err := result.Error
  3195. if err != nil {
  3196. return err
  3197. }
  3198. return nil
  3199. }
  3200. func (s *InboundService) SearchInbounds(query string) ([]*model.Inbound, error) {
  3201. db := database.GetDB()
  3202. var inbounds []*model.Inbound
  3203. err := db.Model(model.Inbound{}).Preload("ClientStats").Where("remark like ?", "%"+query+"%").Find(&inbounds).Error
  3204. if err != nil && err != gorm.ErrRecordNotFound {
  3205. return nil, err
  3206. }
  3207. return inbounds, nil
  3208. }
  3209. func (s *InboundService) MigrationRequirements() {
  3210. db := database.GetDB()
  3211. tx := db.Begin()
  3212. var err error
  3213. defer func() {
  3214. if err == nil {
  3215. tx.Commit()
  3216. if dbErr := db.Exec(`VACUUM "main"`).Error; dbErr != nil {
  3217. logger.Warningf("VACUUM failed: %v", dbErr)
  3218. }
  3219. } else {
  3220. tx.Rollback()
  3221. }
  3222. }()
  3223. // Calculate and backfill all_time from up+down for inbounds and clients
  3224. err = tx.Exec(`
  3225. UPDATE inbounds
  3226. SET all_time = IFNULL(up, 0) + IFNULL(down, 0)
  3227. WHERE IFNULL(all_time, 0) = 0 AND (IFNULL(up, 0) + IFNULL(down, 0)) > 0
  3228. `).Error
  3229. if err != nil {
  3230. return
  3231. }
  3232. err = tx.Exec(`
  3233. UPDATE client_traffics
  3234. SET all_time = IFNULL(up, 0) + IFNULL(down, 0)
  3235. WHERE IFNULL(all_time, 0) = 0 AND (IFNULL(up, 0) + IFNULL(down, 0)) > 0
  3236. `).Error
  3237. if err != nil {
  3238. return
  3239. }
  3240. // Fix inbounds based problems
  3241. var inbounds []*model.Inbound
  3242. err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan"}).Find(&inbounds).Error
  3243. if err != nil && err != gorm.ErrRecordNotFound {
  3244. return
  3245. }
  3246. for inbound_index := range inbounds {
  3247. settings := map[string]any{}
  3248. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  3249. clients, ok := settings["clients"].([]any)
  3250. if ok {
  3251. // Fix Client configuration problems
  3252. var newClients []any
  3253. hasVisionFlow := false
  3254. for client_index := range clients {
  3255. c := clients[client_index].(map[string]any)
  3256. // Add email='' if it is not exists
  3257. if _, ok := c["email"]; !ok {
  3258. c["email"] = ""
  3259. }
  3260. // Convert string tgId to int64
  3261. if _, ok := c["tgId"]; ok {
  3262. var tgId any = c["tgId"]
  3263. if tgIdStr, ok2 := tgId.(string); ok2 {
  3264. tgIdInt64, err := strconv.ParseInt(strings.ReplaceAll(tgIdStr, " ", ""), 10, 64)
  3265. if err == nil {
  3266. c["tgId"] = tgIdInt64
  3267. }
  3268. }
  3269. }
  3270. // Remove "flow": "xtls-rprx-direct"
  3271. if _, ok := c["flow"]; ok {
  3272. if c["flow"] == "xtls-rprx-direct" {
  3273. c["flow"] = ""
  3274. }
  3275. }
  3276. if flow, _ := c["flow"].(string); flow == "xtls-rprx-vision" {
  3277. hasVisionFlow = true
  3278. }
  3279. // Backfill created_at and updated_at
  3280. if _, ok := c["created_at"]; !ok {
  3281. c["created_at"] = time.Now().Unix() * 1000
  3282. }
  3283. c["updated_at"] = time.Now().Unix() * 1000
  3284. newClients = append(newClients, any(c))
  3285. }
  3286. settings["clients"] = newClients
  3287. // Drop orphaned testseed: VLESS-only field, only meaningful when at least
  3288. // one client uses the exact xtls-rprx-vision flow. Older versions saved it
  3289. // for any non-empty flow (including the UDP variant) or kept it after the
  3290. // flow was cleared from the client modal — clean those up here.
  3291. if inbounds[inbound_index].Protocol == model.VLESS && !hasVisionFlow {
  3292. delete(settings, "testseed")
  3293. }
  3294. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  3295. if err != nil {
  3296. return
  3297. }
  3298. inbounds[inbound_index].Settings = string(modifiedSettings)
  3299. }
  3300. // Add client traffic row for all clients which has email
  3301. modelClients, err := s.GetClients(inbounds[inbound_index])
  3302. if err != nil {
  3303. return
  3304. }
  3305. for _, modelClient := range modelClients {
  3306. if len(modelClient.Email) > 0 {
  3307. var count int64
  3308. tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count)
  3309. if count == 0 {
  3310. s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient)
  3311. }
  3312. }
  3313. }
  3314. }
  3315. tx.Save(inbounds)
  3316. // Remove orphaned traffics
  3317. tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{})
  3318. // Migrate old MultiDomain to External Proxy
  3319. var externalProxy []struct {
  3320. Id int
  3321. Port int
  3322. StreamSettings []byte
  3323. }
  3324. err = tx.Raw(`select id, port, stream_settings
  3325. from inbounds
  3326. WHERE protocol in ('vmess','vless','trojan')
  3327. AND json_extract(stream_settings, '$.security') = 'tls'
  3328. AND json_extract(stream_settings, '$.tlsSettings.settings.domains') IS NOT NULL`).Scan(&externalProxy).Error
  3329. if err != nil || len(externalProxy) == 0 {
  3330. return
  3331. }
  3332. for _, ep := range externalProxy {
  3333. var reverses any
  3334. var stream map[string]any
  3335. json.Unmarshal(ep.StreamSettings, &stream)
  3336. if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
  3337. if settings, ok := tlsSettings["settings"].(map[string]any); ok {
  3338. if domains, ok := settings["domains"].([]any); ok {
  3339. for _, domain := range domains {
  3340. if domainMap, ok := domain.(map[string]any); ok {
  3341. domainMap["forceTls"] = "same"
  3342. domainMap["port"] = ep.Port
  3343. domainMap["dest"] = domainMap["domain"].(string)
  3344. delete(domainMap, "domain")
  3345. }
  3346. }
  3347. }
  3348. reverses = settings["domains"]
  3349. delete(settings, "domains")
  3350. }
  3351. }
  3352. stream["externalProxy"] = reverses
  3353. newStream, _ := json.MarshalIndent(stream, " ", " ")
  3354. tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
  3355. }
  3356. err = tx.Raw(`UPDATE inbounds
  3357. SET tag = REPLACE(tag, '0.0.0.0:', '')
  3358. WHERE INSTR(tag, '0.0.0.0:') > 0;`).Error
  3359. if err != nil {
  3360. return
  3361. }
  3362. }
  3363. func (s *InboundService) MigrateDB() {
  3364. s.MigrationRequirements()
  3365. s.MigrationRemoveOrphanedTraffics()
  3366. }
  3367. func (s *InboundService) GetOnlineClients() []string {
  3368. return p.GetOnlineClients()
  3369. }
  3370. func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
  3371. if p != nil {
  3372. p.SetNodeOnlineClients(nodeID, emails)
  3373. }
  3374. }
  3375. func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
  3376. if p != nil {
  3377. p.ClearNodeOnlineClients(nodeID)
  3378. }
  3379. }
  3380. func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
  3381. db := database.GetDB()
  3382. var rows []xray.ClientTraffic
  3383. err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
  3384. if err != nil && err != gorm.ErrRecordNotFound {
  3385. return nil, err
  3386. }
  3387. result := make(map[string]int64, len(rows))
  3388. for _, r := range rows {
  3389. result[r.Email] = r.LastOnline
  3390. }
  3391. return result, nil
  3392. }
  3393. func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
  3394. db := database.GetDB()
  3395. // Step 1: Get ClientTraffic records for emails in the input list.
  3396. // Chunked to stay under SQLite's bind-variable limit on huge inputs.
  3397. uniqEmails := uniqueNonEmptyStrings(emails)
  3398. clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
  3399. for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
  3400. var page []xray.ClientTraffic
  3401. if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
  3402. return nil, nil, err
  3403. }
  3404. clients = append(clients, page...)
  3405. }
  3406. // Step 2: Sort clients by (Up + Down) descending
  3407. sort.Slice(clients, func(i, j int) bool {
  3408. return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
  3409. })
  3410. // Step 3: Extract sorted valid emails and track found ones
  3411. validEmails := make([]string, 0, len(clients))
  3412. found := make(map[string]bool)
  3413. for _, client := range clients {
  3414. validEmails = append(validEmails, client.Email)
  3415. found[client.Email] = true
  3416. }
  3417. // Step 4: Identify emails that were not found in the database
  3418. extraEmails := make([]string, 0)
  3419. for _, email := range emails {
  3420. if !found[email] {
  3421. extraEmails = append(extraEmails, email)
  3422. }
  3423. }
  3424. return validEmails, extraEmails, nil
  3425. }
// DelInboundClientByEmail removes the client whose "email" equals email from
// the settings JSON of inbound inboundId and saves the inbound.
//
// It fails when the inbound cannot be loaded, its settings are not valid
// JSON, "clients" is not an array, no client matches, or removing the match
// would leave the inbound without clients.
//
// When no other inbound uses the email (per emailUsedByOtherInbounds), the
// client's stored IPs and traffic stats are deleted too and, if the removed
// client had "enable": true, the change is pushed to the inbound's runtime.
// These deletions run before the inbound itself is saved.
//
// The returned bool reports whether a restart is needed: it is set when the
// runtime cannot be resolved for an inbound with a nil NodeID, or when
// RemoveUser fails with anything other than a "User <email> not found."
// message.
func (s *InboundService) DelInboundClientByEmail(inboundId int, email string) (bool, error) {
	oldInbound, err := s.GetInbound(inboundId)
	if err != nil {
		logger.Error("Load Old Data Error")
		return false, err
	}
	var settings map[string]any
	if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
		return false, err
	}
	interfaceClients, ok := settings["clients"].([]any)
	if !ok {
		return false, common.NewError("invalid clients format in inbound settings")
	}
	// Rebuild the client list without the matching entry.
	var newClients []any
	needApiDel := false
	found := false
	for _, client := range interfaceClients {
		c, ok := client.(map[string]any)
		if !ok {
			// Entries that are not JSON objects are skipped, i.e. they are not
			// carried over into the rebuilt list.
			continue
		}
		if cEmail, ok := c["email"].(string); ok && cEmail == email {
			// matched client, drop it
			found = true
			// Only an enabled client needs removing from the runtime; a missing
			// or non-bool "enable" counts as false.
			needApiDel, _ = c["enable"].(bool)
		} else {
			newClients = append(newClients, client)
		}
	}
	if !found {
		return false, common.NewError(fmt.Sprintf("client with email %s not found", email))
	}
	if len(newClients) == 0 {
		return false, common.NewError("no client remained in Inbound")
	}
	settings["clients"] = newClients
	newSettings, err := json.MarshalIndent(settings, "", " ")
	if err != nil {
		return false, err
	}
	oldInbound.Settings = string(newSettings)
	db := database.GetDB()
	// Drop the row and IPs only when this was the last inbound referencing
	// the email — siblings still need the shared accounting state.
	emailShared, err := s.emailUsedByOtherInbounds(email, inboundId)
	if err != nil {
		return false, err
	}
	if !emailShared {
		if err := s.DelClientIPs(db, email); err != nil {
			logger.Error("Error in delete client IPs")
			return false, err
		}
	}
	needRestart := false
	// remove stats too
	// NOTE(review): the runtime removal below is nested under !emailShared, so
	// a client whose email is also used by another inbound is not removed from
	// this inbound's runtime here — confirm that is intended.
	if len(email) > 0 && !emailShared {
		traffic, err := s.GetClientTrafficByEmail(email)
		if err != nil {
			return false, err
		}
		if traffic != nil {
			if err := s.DelClientStat(db, email); err != nil {
				logger.Error("Delete stats Data Error")
				return false, err
			}
		}
		if needApiDel {
			rt, rterr := s.runtimeFor(oldInbound)
			if rterr != nil {
				// No runtime: an error for an inbound with a NodeID, otherwise
				// fall back to requesting a restart.
				if oldInbound.NodeID != nil {
					return false, rterr
				}
				needRestart = true
			} else if oldInbound.NodeID == nil {
				// Nil NodeID: remove just this user from the runtime.
				if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil {
					logger.Debug("Client deleted on", rt.Name(), ":", email)
					needRestart = false
				} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
					logger.Debug("User is already deleted. Nothing to do more...")
				} else {
					logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
					needRestart = true
				}
			} else {
				// Non-nil NodeID: send the inbound (with the updated settings)
				// through UpdateInbound instead.
				if err1 := rt.UpdateInbound(context.Background(), oldInbound, oldInbound); err1 != nil {
					return false, err1
				}
			}
		}
	}
	return needRestart, db.Save(oldInbound).Error
}
// SubLinkProvider is the hook InboundService uses to obtain links. An
// implementation is installed with RegisterSubLinkProvider and is called by
// GetSubLinks and GetClientLinks.
type SubLinkProvider interface {
	// SubLinksForSubId returns the links for subId as seen from host.
	// Presumably subId is a subscription identifier — confirm with the
	// implementation.
	SubLinksForSubId(host, subId string) ([]string, error)
	// LinksForClient returns the links for the client identified by email on
	// the given inbound, as seen from host.
	LinksForClient(host string, inbound *model.Inbound, email string) []string
}
// registeredSubLinkProvider is the provider set by RegisterSubLinkProvider.
// While it is nil, GetSubLinks and GetClientLinks return an error.
var registeredSubLinkProvider SubLinkProvider
  3525. func RegisterSubLinkProvider(p SubLinkProvider) {
  3526. registeredSubLinkProvider = p
  3527. }
  3528. func (s *InboundService) GetSubLinks(host, subId string) ([]string, error) {
  3529. if registeredSubLinkProvider == nil {
  3530. return nil, common.NewError("sub link provider not registered")
  3531. }
  3532. return registeredSubLinkProvider.SubLinksForSubId(host, subId)
  3533. }
  3534. func (s *InboundService) GetClientLinks(host string, id int, email string) ([]string, error) {
  3535. inbound, err := s.GetInbound(id)
  3536. if err != nil {
  3537. return nil, err
  3538. }
  3539. if registeredSubLinkProvider == nil {
  3540. return nil, common.NewError("sub link provider not registered")
  3541. }
  3542. return registeredSubLinkProvider.LinksForClient(host, inbound, email), nil
  3543. }