inbound_migration.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  11. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  12. "gorm.io/gorm"
  13. )
  14. func (s *InboundService) MigrationRemoveOrphanedTraffics() {
  15. db := database.GetDB()
  16. query := fmt.Sprintf(
  17. "DELETE FROM client_traffics WHERE email NOT IN (SELECT %s %s)",
  18. database.JSONFieldText("client.value", "email"),
  19. database.JSONClientsFromInbound(),
  20. )
  21. db.Exec(query)
  22. }
  23. func (s *InboundService) MigrationRequirements() {
  24. db := database.GetDB()
  25. tx := db.Begin()
  26. var err error
  27. defer func() {
  28. if err == nil {
  29. tx.Commit()
  30. if !database.IsPostgres() {
  31. if dbErr := db.Exec(`VACUUM "main"`).Error; dbErr != nil {
  32. logger.Warningf("VACUUM failed: %v", dbErr)
  33. }
  34. }
  35. } else {
  36. tx.Rollback()
  37. }
  38. }()
  39. if tx.Migrator().HasColumn(&model.Inbound{}, "all_time") {
  40. if err = tx.Migrator().DropColumn(&model.Inbound{}, "all_time"); err != nil {
  41. return
  42. }
  43. }
  44. if tx.Migrator().HasColumn(&xray.ClientTraffic{}, "all_time") {
  45. if err = tx.Migrator().DropColumn(&xray.ClientTraffic{}, "all_time"); err != nil {
  46. return
  47. }
  48. }
  49. // Normalize "enable" columns to boolean on Postgres. Legacy SQLite data
  50. // (0/1 integers), partial migrations, or mixed write paths (public API
  51. // inbound updates that flow through UpdateClientStat + client syncs, plus
  52. // node traffic merge deltas) can leave the column as integer or with mixed
  53. // interpretation. This (combined with the dialect-aware
  54. // ClientTrafficEnableMergeExpr) prevents type problems in the node traffic
  55. // sync merge (SetRemoteTraffic) and makes the sync robust even when
  56. // inbounds are updated via the public API (incl. ones carrying
  57. // externalProxy in streamSettings). The same expression is also safe on
  58. // SQLite (no PG :: casts).
  59. if database.IsPostgres() {
  60. // Use DO block so it is idempotent and doesn't fail if already boolean.
  61. normalizeBool := func(table, col string) {
  62. tx.Exec(fmt.Sprintf(`
  63. DO $$
  64. BEGIN
  65. IF EXISTS (
  66. SELECT 1 FROM information_schema.columns
  67. WHERE table_name = '%s' AND column_name = '%s'
  68. AND data_type <> 'boolean'
  69. ) THEN
  70. ALTER TABLE %s ALTER COLUMN %s
  71. TYPE boolean USING (CASE WHEN %s::text IN ('1','true','t','yes') THEN true ELSE false END);
  72. END IF;
  73. END $$;`, table, col, table, col, col))
  74. }
  75. normalizeBool("inbounds", "enable")
  76. normalizeBool("client_traffics", "enable")
  77. normalizeBool("nodes", "enable")
  78. normalizeBool("clients", "enable")
  79. normalizeBool("api_tokens", "enabled")
  80. normalizeBool("outbound_subscriptions", "enabled")
  81. }
  82. // Fix inbounds based problems
  83. var inbounds []*model.Inbound
  84. err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan", "shadowsocks", "hysteria"}).Find(&inbounds).Error
  85. if err != nil && err != gorm.ErrRecordNotFound {
  86. return
  87. }
  88. for inbound_index := range inbounds {
  89. settings := map[string]any{}
  90. json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
  91. if raw, exists := settings["clients"]; exists && raw == nil {
  92. settings["clients"] = []any{}
  93. }
  94. clients, ok := settings["clients"].([]any)
  95. if ok {
  96. // Fix Client configuration problems
  97. newClients := make([]any, 0, len(clients))
  98. hasVisionFlow := false
  99. for client_index := range clients {
  100. c := clients[client_index].(map[string]any)
  101. // Add email='' if it is not exists
  102. if _, ok := c["email"]; !ok {
  103. c["email"] = ""
  104. }
  105. // Convert string tgId to int64
  106. if _, ok := c["tgId"]; ok {
  107. var tgId any = c["tgId"]
  108. if tgIdStr, ok2 := tgId.(string); ok2 {
  109. tgIdInt64, err := strconv.ParseInt(strings.ReplaceAll(tgIdStr, " ", ""), 10, 64)
  110. if err == nil {
  111. c["tgId"] = tgIdInt64
  112. }
  113. }
  114. }
  115. // Remove "flow": "xtls-rprx-direct"
  116. if _, ok := c["flow"]; ok {
  117. if c["flow"] == "xtls-rprx-direct" {
  118. c["flow"] = ""
  119. }
  120. }
  121. if flow, _ := c["flow"].(string); flow == "xtls-rprx-vision" {
  122. hasVisionFlow = true
  123. }
  124. // Backfill created_at and updated_at
  125. if _, ok := c["created_at"]; !ok {
  126. c["created_at"] = time.Now().Unix() * 1000
  127. }
  128. c["updated_at"] = time.Now().Unix() * 1000
  129. newClients = append(newClients, any(c))
  130. }
  131. settings["clients"] = newClients
  132. // Drop orphaned testseed: VLESS-only field, only meaningful when at least
  133. // one client uses the exact xtls-rprx-vision flow. Older versions saved it
  134. // for any non-empty flow (including the UDP variant) or kept it after the
  135. // flow was cleared from the client modal — clean those up here.
  136. if inbounds[inbound_index].Protocol == model.VLESS && !hasVisionFlow {
  137. delete(settings, "testseed")
  138. }
  139. modifiedSettings, err := json.MarshalIndent(settings, "", " ")
  140. if err != nil {
  141. return
  142. }
  143. inbounds[inbound_index].Settings = string(modifiedSettings)
  144. }
  145. // Add client traffic row for all clients which has email
  146. modelClients, err := s.GetClients(inbounds[inbound_index])
  147. if err != nil {
  148. return
  149. }
  150. for _, modelClient := range modelClients {
  151. if len(modelClient.Email) > 0 {
  152. var count int64
  153. tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count)
  154. if count == 0 {
  155. s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient)
  156. }
  157. }
  158. }
  159. // Heal clients table for installs where the one-shot seeder
  160. // skipped clients due to a tgId-string unmarshal error.
  161. if syncErr := s.clientService.SyncInbound(tx, inbounds[inbound_index].Id, modelClients); syncErr != nil {
  162. logger.Warning("MigrationRequirements sync clients failed:", syncErr)
  163. }
  164. }
  165. tx.Save(inbounds)
  166. // Remove orphaned traffics
  167. tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{})
  168. // Migrate old MultiDomain to External Proxy
  169. var externalProxy []struct {
  170. Id int
  171. Port int
  172. StreamSettings string // text column on both DBs; safer than []byte for cross-DB scan
  173. }
  174. externalProxyQuery := `select id, port, stream_settings
  175. from inbounds
  176. WHERE protocol in ('vmess','vless','trojan')
  177. AND json_extract(stream_settings, '$.security') = 'tls'
  178. AND json_extract(stream_settings, '$.tlsSettings.settings.domains') IS NOT NULL`
  179. if database.IsPostgres() {
  180. externalProxyQuery = `select id, port, stream_settings
  181. from inbounds
  182. WHERE protocol in ('vmess','vless','trojan')
  183. AND NULLIF(stream_settings, '')::jsonb #>> '{security}' = 'tls'
  184. AND NULLIF(stream_settings, '')::jsonb #> '{tlsSettings,settings,domains}' IS NOT NULL`
  185. }
  186. err = tx.Raw(externalProxyQuery).Scan(&externalProxy).Error
  187. if err != nil || len(externalProxy) == 0 {
  188. return
  189. }
  190. for _, ep := range externalProxy {
  191. var reverses any
  192. var stream map[string]any
  193. json.Unmarshal([]byte(ep.StreamSettings), &stream)
  194. if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
  195. if settings, ok := tlsSettings["settings"].(map[string]any); ok {
  196. if domains, ok := settings["domains"].([]any); ok {
  197. for _, domain := range domains {
  198. if domainMap, ok := domain.(map[string]any); ok {
  199. domainMap["forceTls"] = "same"
  200. domainMap["port"] = ep.Port
  201. domainMap["dest"] = domainMap["domain"].(string)
  202. delete(domainMap, "domain")
  203. }
  204. }
  205. }
  206. reverses = settings["domains"]
  207. delete(settings, "domains")
  208. }
  209. }
  210. stream["externalProxy"] = reverses
  211. newStream, _ := json.MarshalIndent(stream, " ", " ")
  212. tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
  213. }
  214. // Legacy tag cleanup for old auto-generated tags (e.g. "0.0.0.0:443-...").
  215. // Must be cross-DB: INSTR/REPLACE work on SQLite; Postgres needs position().
  216. tagCleanup := `UPDATE inbounds
  217. SET tag = REPLACE(tag, '0.0.0.0:', '')
  218. WHERE INSTR(tag, '0.0.0.0:') > 0;`
  219. if database.IsPostgres() {
  220. tagCleanup = `UPDATE inbounds
  221. SET tag = REPLACE(tag, '0.0.0.0:', '')
  222. WHERE position('0.0.0.0:' in tag) > 0;`
  223. }
  224. err = tx.Raw(tagCleanup).Error
  225. if err != nil {
  226. return
  227. }
  228. }
  229. func (s *InboundService) MigrateDB() {
  230. s.MigrationRequirements()
  231. s.MigrationRemoveOrphanedTraffics()
  232. }