inbound_migration.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package service
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  13. "gorm.io/gorm"
  14. )
  15. func (s *InboundService) MigrationRemoveOrphanedTraffics() {
  16. db := database.GetDB()
  17. query := fmt.Sprintf(
  18. "DELETE FROM client_traffics WHERE email NOT IN (SELECT %s %s)",
  19. database.JSONFieldText("client.value", "email"),
  20. database.JSONClientsFromInbound(),
  21. )
  22. db.Exec(query)
  23. }
  24. func (s *InboundService) MigrationRequirements() {
  25. db := database.GetDB()
  26. tx := db.Begin()
  27. var err error
  28. defer func() {
  29. if err == nil {
  30. tx.Commit()
  31. if !database.IsPostgres() {
  32. if dbErr := db.Exec(`VACUUM "main"`).Error; dbErr != nil {
  33. logger.Warningf("VACUUM failed: %v", dbErr)
  34. }
  35. }
  36. } else {
  37. tx.Rollback()
  38. }
  39. }()
  40. if tx.Migrator().HasColumn(&model.Inbound{}, "all_time") {
  41. if err = tx.Migrator().DropColumn(&model.Inbound{}, "all_time"); err != nil {
  42. return
  43. }
  44. }
  45. if tx.Migrator().HasColumn(&xray.ClientTraffic{}, "all_time") {
  46. if err = tx.Migrator().DropColumn(&xray.ClientTraffic{}, "all_time"); err != nil {
  47. return
  48. }
  49. }
  50. if err = normalizeInboundShareAddressColumns(tx); err != nil {
  51. return
  52. }
  53. // Normalize "enable" columns to boolean on Postgres. Legacy SQLite data
  54. // (0/1 integers), partial migrations, or mixed write paths (public API
  55. // inbound updates that flow through UpdateClientStat + client syncs, plus
  56. // node traffic merge deltas) can leave the column as integer or with mixed
  57. // interpretation. This (combined with the dialect-aware
  58. // ClientTrafficEnableMergeExpr) prevents type problems in the node traffic
  59. // sync merge (SetRemoteTraffic) and makes the sync robust even when
  60. // inbounds are updated via the public API (incl. ones carrying
  61. // externalProxy in streamSettings). The same expression is also safe on
  62. // SQLite (no PG :: casts).
  63. if database.IsPostgres() {
  64. // Use DO block so it is idempotent and doesn't fail if already boolean.
  65. normalizeBool := func(table, col string) {
  66. tx.Exec(fmt.Sprintf(`
  67. DO $$
  68. BEGIN
  69. IF EXISTS (
  70. SELECT 1 FROM information_schema.columns
  71. WHERE table_name = '%s' AND column_name = '%s'
  72. AND data_type <> 'boolean'
  73. ) THEN
  74. ALTER TABLE %s ALTER COLUMN %s
  75. TYPE boolean USING (CASE WHEN %s::text IN ('1','true','t','yes') THEN true ELSE false END);
  76. END IF;
  77. END $$;`, table, col, table, col, col))
  78. }
  79. normalizeBool("inbounds", "enable")
  80. normalizeBool("client_traffics", "enable")
  81. normalizeBool("nodes", "enable")
  82. normalizeBool("clients", "enable")
  83. normalizeBool("api_tokens", "enabled")
  84. normalizeBool("outbound_subscriptions", "enabled")
  85. }
  86. // Fix inbounds based problems
  87. var inbounds []*model.Inbound
  88. err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan", "shadowsocks", "hysteria"}).Find(&inbounds).Error
  89. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  90. return
  91. }
  92. for inbound_index := range inbounds {
  93. settings := map[string]any{}
  94. _ = json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  95. if raw, exists := settings["clients"]; exists && raw == nil {
  96. settings["clients"] = []any{}
  97. }
  98. clients, ok := settings["clients"].([]any)
  99. if ok {
  100. // Fix Client configuration problems
  101. newClients := make([]any, 0, len(clients))
  102. hasVisionFlow := false
  103. for client_index := range clients {
  104. c := clients[client_index].(map[string]any)
  105. // Add email='' if it is not exists
  106. if _, ok := c["email"]; !ok {
  107. c["email"] = ""
  108. }
  109. // Convert string tgId to int64
  110. if _, ok := c["tgId"]; ok {
  111. tgId := c["tgId"]
  112. if tgIdStr, ok2 := tgId.(string); ok2 {
  113. tgIdInt64, err := strconv.ParseInt(strings.ReplaceAll(tgIdStr, " ", ""), 10, 64)
  114. if err == nil {
  115. c["tgId"] = tgIdInt64
  116. }
  117. }
  118. }
  119. // Remove "flow": "xtls-rprx-direct"
  120. if _, ok := c["flow"]; ok {
  121. if c["flow"] == "xtls-rprx-direct" {
  122. c["flow"] = ""
  123. }
  124. }
  125. if flow, _ := c["flow"].(string); flow == "xtls-rprx-vision" {
  126. hasVisionFlow = true
  127. }
  128. // Backfill created_at and updated_at
  129. if _, ok := c["created_at"]; !ok {
  130. c["created_at"] = time.Now().Unix() * 1000
  131. }
  132. c["updated_at"] = time.Now().Unix() * 1000
  133. newClients = append(newClients, any(c))
  134. }
  135. settings["clients"] = newClients
  136. // Drop orphaned testseed: VLESS-only field, only meaningful when at least
  137. // one client uses the exact xtls-rprx-vision flow. Older versions saved it
  138. // for any non-empty flow (including the UDP variant) or kept it after the
  139. // flow was cleared from the client modal — clean those up here.
  140. if inbounds[inbound_index].Protocol == model.VLESS && !hasVisionFlow {
  141. delete(settings, "testseed")
  142. }
  143. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  144. if err != nil {
  145. return
  146. }
  147. inbounds[inbound_index].Settings = string(modifiedSettings)
  148. }
  149. // Add client traffic row for all clients which has email
  150. modelClients, err := s.GetClients(inbounds[inbound_index])
  151. if err != nil {
  152. return
  153. }
  154. for _, modelClient := range modelClients {
  155. if len(modelClient.Email) > 0 {
  156. var count int64
  157. tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count)
  158. if count == 0 {
  159. _ = s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient)
  160. }
  161. }
  162. }
  163. // Heal clients table for installs where the one-shot seeder
  164. // skipped clients due to a tgId-string unmarshal error.
  165. if syncErr := s.clientService.SyncInbound(tx, inbounds[inbound_index].Id, modelClients); syncErr != nil {
  166. logger.Warning("MigrationRequirements sync clients failed:", syncErr)
  167. }
  168. }
  169. tx.Save(inbounds)
  170. // Remove orphaned traffics
  171. tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{})
  172. // Migrate old MultiDomain to External Proxy
  173. var externalProxy []struct {
  174. Id int
  175. Port int
  176. StreamSettings string // text column on both DBs; safer than []byte for cross-DB scan
  177. }
  178. externalProxyQuery := `select id, port, stream_settings
  179. from inbounds
  180. WHERE protocol in ('vmess','vless','trojan')
  181. AND json_extract(stream_settings, '$.security') = 'tls'
  182. AND json_extract(stream_settings, '$.tlsSettings.settings.domains') IS NOT NULL`
  183. if database.IsPostgres() {
  184. externalProxyQuery = `select id, port, stream_settings
  185. from inbounds
  186. WHERE protocol in ('vmess','vless','trojan')
  187. AND NULLIF(stream_settings, '')::jsonb #>> '{security}' = 'tls'
  188. AND NULLIF(stream_settings, '')::jsonb #> '{tlsSettings,settings,domains}' IS NOT NULL`
  189. }
  190. err = tx.Raw(externalProxyQuery).Scan(&externalProxy).Error
  191. if err != nil || len(externalProxy) == 0 {
  192. return
  193. }
  194. for _, ep := range externalProxy {
  195. var reverses any
  196. var stream map[string]any
  197. _ = json.Unmarshal([]byte(ep.StreamSettings), &stream)
  198. if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
  199. if settings, ok := tlsSettings["settings"].(map[string]any); ok {
  200. if domains, ok := settings["domains"].([]any); ok {
  201. for _, domain := range domains {
  202. if domainMap, ok := domain.(map[string]any); ok {
  203. domainMap["forceTls"] = "same"
  204. domainMap["port"] = ep.Port
  205. domainMap["dest"] = domainMap["domain"].(string)
  206. delete(domainMap, "domain")
  207. }
  208. }
  209. }
  210. reverses = settings["domains"]
  211. delete(settings, "domains")
  212. }
  213. }
  214. stream["externalProxy"] = reverses
  215. newStream, _ := json.MarshalIndent(stream, " ", " ")
  216. tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
  217. }
  218. // Legacy tag cleanup for old auto-generated tags (e.g. "0.0.0.0:443-...").
  219. // Must be cross-DB: INSTR/REPLACE work on SQLite; Postgres needs position().
  220. tagCleanup := `UPDATE inbounds
  221. SET tag = REPLACE(tag, '0.0.0.0:', '')
  222. WHERE INSTR(tag, '0.0.0.0:') > 0;`
  223. if database.IsPostgres() {
  224. tagCleanup = `UPDATE inbounds
  225. SET tag = REPLACE(tag, '0.0.0.0:', '')
  226. WHERE position('0.0.0.0:' in tag) > 0;`
  227. }
  228. err = tx.Exec(tagCleanup).Error
  229. if err != nil {
  230. return
  231. }
  232. }
  233. func (s *InboundService) MigrateDB() {
  234. s.MigrationRequirements()
  235. s.MigrationRemoveOrphanedTraffics()
  236. s.MigrationRestoreVisionFlow()
  237. }
  238. // MigrationRestoreVisionFlow repairs VLESS inbounds whose clients lost their
  239. // XTLS Vision flow because the inbound was not flow-eligible when the client was
  240. // written (e.g. an XHTTP inbound whose vlessenc encryption was enabled only
  241. // later). For each now-eligible inbound it restores flow=xtls-rprx-vision on
  242. // clients whose intended flow (their flow_override on a sibling inbound) is
  243. // Vision. Idempotent: once a client carries the flow it is skipped, so this is a
  244. // no-op on healthy installs and on subsequent boots.
  245. func (s *InboundService) MigrationRestoreVisionFlow() {
  246. db := database.GetDB()
  247. var inbounds []*model.Inbound
  248. if err := db.Model(&model.Inbound{}).
  249. Where("protocol = ?", model.VLESS).
  250. Find(&inbounds).Error; err != nil {
  251. logger.Warning("MigrationRestoreVisionFlow: load inbounds failed:", err)
  252. return
  253. }
  254. for _, ib := range inbounds {
  255. restored, changed := s.restoreVisionFlowForEligibleInbound(nil, ib.Settings, ib.StreamSettings, ib.Protocol)
  256. if !changed {
  257. continue
  258. }
  259. clients, err := s.GetClients(&model.Inbound{Settings: restored})
  260. if err != nil {
  261. logger.Warning("MigrationRestoreVisionFlow: parse clients for inbound", ib.Id, "failed:", err)
  262. continue
  263. }
  264. err = db.Transaction(func(tx *gorm.DB) error {
  265. if e := tx.Model(&model.Inbound{}).Where("id = ?", ib.Id).Update("settings", restored).Error; e != nil {
  266. return e
  267. }
  268. return s.clientService.SyncInbound(tx, ib.Id, clients)
  269. })
  270. if err != nil {
  271. logger.Warning("MigrationRestoreVisionFlow: update inbound", ib.Id, "failed:", err)
  272. continue
  273. }
  274. logger.Info("MigrationRestoreVisionFlow: restored XTLS Vision flow on inbound", ib.Id)
  275. }
  276. }