outbound.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package outbound
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database"
  11. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  12. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. )
  16. // OutboundService provides business logic for managing Xray outbound configurations.
  17. // It handles outbound traffic monitoring and statistics.
  18. type OutboundService struct{}
  19. func (s *OutboundService) AddTraffic(traffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (error, bool) {
  20. var err error
  21. db := database.GetDB()
  22. tx := db.Begin()
  23. defer func() {
  24. if err != nil {
  25. tx.Rollback()
  26. } else {
  27. tx.Commit()
  28. }
  29. }()
  30. err = s.addOutboundTraffic(tx, traffics)
  31. if err != nil {
  32. return err, false
  33. }
  34. return nil, false
  35. }
  36. func (s *OutboundService) addOutboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  37. if len(traffics) == 0 {
  38. return nil
  39. }
  40. var err error
  41. for _, traffic := range traffics {
  42. if traffic.IsOutbound {
  43. var outbound model.OutboundTraffics
  44. err = tx.Model(&model.OutboundTraffics{}).Where("tag = ?", traffic.Tag).
  45. FirstOrCreate(&outbound).Error
  46. if err != nil {
  47. return err
  48. }
  49. outbound.Tag = traffic.Tag
  50. outbound.Up = outbound.Up + traffic.Up
  51. outbound.Down = outbound.Down + traffic.Down
  52. outbound.Total = outbound.Up + outbound.Down
  53. err = tx.Save(&outbound).Error
  54. if err != nil {
  55. return err
  56. }
  57. }
  58. }
  59. return nil
  60. }
  61. func (s *OutboundService) GetOutboundsTraffic() ([]*model.OutboundTraffics, error) {
  62. db := database.GetDB()
  63. var traffics []*model.OutboundTraffics
  64. err := db.Model(model.OutboundTraffics{}).Find(&traffics).Error
  65. if err != nil {
  66. logger.Warning("Error retrieving OutboundTraffics: ", err)
  67. return nil, err
  68. }
  69. return traffics, nil
  70. }
  71. func (s *OutboundService) ResetOutboundTraffic(tag string) error {
  72. db := database.GetDB()
  73. whereText := "tag "
  74. if tag == "-alltags-" {
  75. whereText += " <> ?"
  76. } else {
  77. whereText += " = ?"
  78. }
  79. result := db.Model(model.OutboundTraffics{}).
  80. Where(whereText, tag).
  81. Updates(map[string]any{"up": 0, "down": 0, "total": 0})
  82. err := result.Error
  83. if err != nil {
  84. return err
  85. }
  86. return nil
  87. }
  88. // TestOutboundResult represents the result of testing an outbound.
  89. // Delay is in milliseconds. Endpoints is only populated for TCP-mode
  90. // probes; HTTP mode reports the time of a real HTTP request routed
  91. // through the outbound, with an optional timing breakdown.
  92. type TestOutboundResult struct {
  93. Tag string `json:"tag,omitempty"`
  94. Success bool `json:"success"`
  95. Delay int64 `json:"delay"`
  96. Error string `json:"error,omitempty"`
  97. Mode string `json:"mode,omitempty"`
  98. // HTTP-mode extras. Any HTTP response counts as reachable; HTTPStatus
  99. // records what the test URL answered. ConnectMs is the dial to the local
  100. // test inbound; TLSMs covers outbound-chain establishment + target TLS
  101. // (https URLs only, since xray ACKs the SOCKS CONNECT before dialing
  102. // upstream); TTFBMs is request start → first response byte.
  103. HTTPStatus int `json:"httpStatus,omitempty"`
  104. ConnectMs int64 `json:"connectMs,omitempty"`
  105. TLSMs int64 `json:"tlsMs,omitempty"`
  106. TTFBMs int64 `json:"ttfbMs,omitempty"`
  107. Endpoints []TestEndpointResult `json:"endpoints,omitempty"`
  108. }
  109. // TestEndpointResult is one entry in a TCP-mode probe — the per-endpoint
  110. // dial outcome for outbounds that expose multiple servers/peers.
  111. type TestEndpointResult struct {
  112. Address string `json:"address"`
  113. Success bool `json:"success"`
  114. Delay int64 `json:"delay"`
  115. Error string `json:"error,omitempty"`
  116. }
  117. func (s *OutboundService) testOutboundTCP(outboundJSON string) (*TestOutboundResult, error) {
  118. var ob map[string]any
  119. if err := json.Unmarshal([]byte(outboundJSON), &ob); err != nil {
  120. return &TestOutboundResult{Mode: "tcp", Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}, nil
  121. }
  122. tag, _ := ob["tag"].(string)
  123. protocol, _ := ob["protocol"].(string)
  124. if protocol == "blackhole" || protocol == "freedom" || tag == "blocked" {
  125. return &TestOutboundResult{Tag: tag, Mode: "tcp", Success: false, Error: "Outbound has no testable endpoint"}, nil
  126. }
  127. endpoints := extractOutboundEndpoints(ob)
  128. if len(endpoints) == 0 {
  129. return &TestOutboundResult{Tag: tag, Mode: "tcp", Success: false, Error: "No testable endpoint"}, nil
  130. }
  131. results := make([]TestEndpointResult, len(endpoints))
  132. var wg sync.WaitGroup
  133. for i := range endpoints {
  134. wg.Add(1)
  135. go func(i int) {
  136. defer wg.Done()
  137. results[i] = probeTCPEndpoint(endpoints[i], 5*time.Second)
  138. }(i)
  139. }
  140. wg.Wait()
  141. var bestDelay int64 = -1
  142. var firstErr string
  143. for _, r := range results {
  144. if r.Success {
  145. if bestDelay < 0 || r.Delay < bestDelay {
  146. bestDelay = r.Delay
  147. }
  148. } else if firstErr == "" {
  149. firstErr = r.Error
  150. }
  151. }
  152. out := &TestOutboundResult{Tag: tag, Mode: "tcp", Endpoints: results}
  153. if bestDelay >= 0 {
  154. out.Success = true
  155. out.Delay = bestDelay
  156. } else {
  157. out.Error = firstErr
  158. if out.Error == "" {
  159. out.Error = "All endpoints unreachable"
  160. }
  161. }
  162. return out, nil
  163. }
  164. func probeTCPEndpoint(endpoint string, timeout time.Duration) TestEndpointResult {
  165. r := TestEndpointResult{Address: endpoint}
  166. start := time.Now()
  167. conn, err := (&net.Dialer{Timeout: timeout}).DialContext(context.Background(), "tcp", endpoint)
  168. r.Delay = time.Since(start).Milliseconds()
  169. if err != nil {
  170. r.Error = err.Error()
  171. return r
  172. }
  173. conn.Close()
  174. r.Success = true
  175. return r
  176. }
  177. // outboundTransportIsUDP reports whether the outbound's proxy speaks UDP
  178. // (wireguard, hysteria, or a kcp/quic/hysteria stream transport). A bare
  179. // UDP dial can't probe these — they ignore unauthenticated packets, so a
  180. // dial neither proves reachability nor measures latency. Such outbounds
  181. // must go through the real xray handshake probe instead.
  182. func outboundTransportIsUDP(ob map[string]any) bool {
  183. if protocol, _ := ob["protocol"].(string); protocol == "hysteria" || protocol == "wireguard" {
  184. return true
  185. }
  186. if stream, ok := ob["streamSettings"].(map[string]any); ok {
  187. if n, _ := stream["network"].(string); n == "hysteria" || n == "kcp" || n == "quic" {
  188. return true
  189. }
  190. }
  191. return false
  192. }
  193. func extractOutboundEndpoints(ob map[string]any) []string {
  194. protocol, _ := ob["protocol"].(string)
  195. settings, _ := ob["settings"].(map[string]any)
  196. if settings == nil {
  197. return nil
  198. }
  199. var out []string
  200. addServer := func(addr any, port any) {
  201. host, _ := addr.(string)
  202. p := numAsInt(port)
  203. if host != "" && p > 0 {
  204. out = append(out, fmt.Sprintf("%s:%d", host, p))
  205. }
  206. }
  207. switch protocol {
  208. case "vmess":
  209. if vnext, ok := settings["vnext"].([]any); ok {
  210. for _, v := range vnext {
  211. if vm, ok := v.(map[string]any); ok {
  212. addServer(vm["address"], vm["port"])
  213. }
  214. }
  215. }
  216. case "vless":
  217. addServer(settings["address"], settings["port"])
  218. case "hysteria":
  219. addServer(settings["address"], settings["port"])
  220. case "trojan", "shadowsocks", "http", "socks":
  221. if servers, ok := settings["servers"].([]any); ok {
  222. for _, sv := range servers {
  223. if sm, ok := sv.(map[string]any); ok {
  224. addServer(sm["address"], sm["port"])
  225. }
  226. }
  227. }
  228. case "wireguard":
  229. if peers, ok := settings["peers"].([]any); ok {
  230. for _, p := range peers {
  231. if pm, ok := p.(map[string]any); ok {
  232. if ep, _ := pm["endpoint"].(string); ep != "" {
  233. out = append(out, ep)
  234. }
  235. }
  236. }
  237. }
  238. }
  239. return out
  240. }
  241. func numAsInt(v any) int {
  242. switch n := v.(type) {
  243. case float64:
  244. return int(n)
  245. case int:
  246. return n
  247. case int64:
  248. return int(n)
  249. case string:
  250. if i, err := strconv.Atoi(n); err == nil {
  251. return i
  252. }
  253. }
  254. return 0
  255. }