1
0

outbound.go 12 KB


  1. package service
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "sync"
  12. "time"
  13. "github.com/mhsanaei/3x-ui/v2/config"
  14. "github.com/mhsanaei/3x-ui/v2/database"
  15. "github.com/mhsanaei/3x-ui/v2/database/model"
  16. "github.com/mhsanaei/3x-ui/v2/logger"
  17. "github.com/mhsanaei/3x-ui/v2/util/common"
  18. "github.com/mhsanaei/3x-ui/v2/util/json_util"
  19. "github.com/mhsanaei/3x-ui/v2/xray"
  20. "gorm.io/gorm"
  21. )
  22. // OutboundService provides business logic for managing Xray outbound configurations.
  23. // It handles outbound traffic monitoring and statistics.
  24. type OutboundService struct{}
  25. // testSemaphore limits concurrent outbound tests to prevent resource exhaustion.
  26. var testSemaphore sync.Mutex
  27. func (s *OutboundService) AddTraffic(traffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (error, bool) {
  28. var err error
  29. db := database.GetDB()
  30. tx := db.Begin()
  31. defer func() {
  32. if err != nil {
  33. tx.Rollback()
  34. } else {
  35. tx.Commit()
  36. }
  37. }()
  38. err = s.addOutboundTraffic(tx, traffics)
  39. if err != nil {
  40. return err, false
  41. }
  42. return nil, false
  43. }
  44. func (s *OutboundService) addOutboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  45. if len(traffics) == 0 {
  46. return nil
  47. }
  48. var err error
  49. for _, traffic := range traffics {
  50. if traffic.IsOutbound {
  51. var outbound model.OutboundTraffics
  52. err = tx.Model(&model.OutboundTraffics{}).Where("tag = ?", traffic.Tag).
  53. FirstOrCreate(&outbound).Error
  54. if err != nil {
  55. return err
  56. }
  57. outbound.Tag = traffic.Tag
  58. outbound.Up = outbound.Up + traffic.Up
  59. outbound.Down = outbound.Down + traffic.Down
  60. outbound.Total = outbound.Up + outbound.Down
  61. err = tx.Save(&outbound).Error
  62. if err != nil {
  63. return err
  64. }
  65. }
  66. }
  67. return nil
  68. }
  69. func (s *OutboundService) GetOutboundsTraffic() ([]*model.OutboundTraffics, error) {
  70. db := database.GetDB()
  71. var traffics []*model.OutboundTraffics
  72. err := db.Model(model.OutboundTraffics{}).Find(&traffics).Error
  73. if err != nil {
  74. logger.Warning("Error retrieving OutboundTraffics: ", err)
  75. return nil, err
  76. }
  77. return traffics, nil
  78. }
  79. func (s *OutboundService) ResetOutboundTraffic(tag string) error {
  80. db := database.GetDB()
  81. whereText := "tag "
  82. if tag == "-alltags-" {
  83. whereText += " <> ?"
  84. } else {
  85. whereText += " = ?"
  86. }
  87. result := db.Model(model.OutboundTraffics{}).
  88. Where(whereText, tag).
  89. Updates(map[string]any{"up": 0, "down": 0, "total": 0})
  90. err := result.Error
  91. if err != nil {
  92. return err
  93. }
  94. return nil
  95. }
  96. // TestOutboundResult represents the result of testing an outbound
  97. type TestOutboundResult struct {
  98. Success bool `json:"success"`
  99. Delay int64 `json:"delay"` // Delay in milliseconds
  100. Error string `json:"error,omitempty"`
  101. StatusCode int `json:"statusCode,omitempty"`
  102. }
  103. // TestOutbound tests an outbound by creating a temporary xray instance and measuring response time.
  104. // allOutboundsJSON must be a JSON array of all outbounds; they are copied into the test config unchanged.
  105. // Only the test inbound and a route rule (to the tested outbound tag) are added.
  106. func (s *OutboundService) TestOutbound(outboundJSON string, testURL string, allOutboundsJSON string) (*TestOutboundResult, error) {
  107. if testURL == "" {
  108. testURL = "https://www.google.com/generate_204"
  109. }
  110. // Limit to one concurrent test at a time
  111. if !testSemaphore.TryLock() {
  112. return &TestOutboundResult{
  113. Success: false,
  114. Error: "Another outbound test is already running, please wait",
  115. }, nil
  116. }
  117. defer testSemaphore.Unlock()
  118. // Parse the outbound being tested to get its tag
  119. var testOutbound map[string]any
  120. if err := json.Unmarshal([]byte(outboundJSON), &testOutbound); err != nil {
  121. return &TestOutboundResult{
  122. Success: false,
  123. Error: fmt.Sprintf("Invalid outbound JSON: %v", err),
  124. }, nil
  125. }
  126. outboundTag, _ := testOutbound["tag"].(string)
  127. if outboundTag == "" {
  128. return &TestOutboundResult{
  129. Success: false,
  130. Error: "Outbound has no tag",
  131. }, nil
  132. }
  133. if protocol, _ := testOutbound["protocol"].(string); protocol == "blackhole" || outboundTag == "blocked" {
  134. return &TestOutboundResult{
  135. Success: false,
  136. Error: "Blocked/blackhole outbound cannot be tested",
  137. }, nil
  138. }
  139. // Use all outbounds when provided; otherwise fall back to single outbound
  140. var allOutbounds []any
  141. if allOutboundsJSON != "" {
  142. if err := json.Unmarshal([]byte(allOutboundsJSON), &allOutbounds); err != nil {
  143. return &TestOutboundResult{
  144. Success: false,
  145. Error: fmt.Sprintf("Invalid allOutbounds JSON: %v", err),
  146. }, nil
  147. }
  148. }
  149. if len(allOutbounds) == 0 {
  150. allOutbounds = []any{testOutbound}
  151. }
  152. // Find an available port for test inbound
  153. testPort, err := findAvailablePort()
  154. if err != nil {
  155. return &TestOutboundResult{
  156. Success: false,
  157. Error: fmt.Sprintf("Failed to find available port: %v", err),
  158. }, nil
  159. }
  160. // Copy all outbounds as-is, add only test inbound and route rule
  161. testConfig := s.createTestConfig(outboundTag, allOutbounds, testPort)
  162. // Use a temporary config file so the main config.json is never overwritten
  163. testConfigPath, err := createTestConfigPath()
  164. if err != nil {
  165. return &TestOutboundResult{
  166. Success: false,
  167. Error: fmt.Sprintf("Failed to create test config path: %v", err),
  168. }, nil
  169. }
  170. defer os.Remove(testConfigPath) // ensure temp file is removed even if process is not stopped
  171. // Create temporary xray process with its own config file
  172. testProcess := xray.NewTestProcess(testConfig, testConfigPath)
  173. defer func() {
  174. if testProcess.IsRunning() {
  175. testProcess.Stop()
  176. }
  177. }()
  178. // Start the test process
  179. if err := testProcess.Start(); err != nil {
  180. return &TestOutboundResult{
  181. Success: false,
  182. Error: fmt.Sprintf("Failed to start test xray instance: %v", err),
  183. }, nil
  184. }
  185. // Wait for xray to start listening on the test port
  186. if err := waitForPort(testPort, 3*time.Second); err != nil {
  187. if !testProcess.IsRunning() {
  188. result := testProcess.GetResult()
  189. return &TestOutboundResult{
  190. Success: false,
  191. Error: fmt.Sprintf("Xray process exited: %s", result),
  192. }, nil
  193. }
  194. return &TestOutboundResult{
  195. Success: false,
  196. Error: fmt.Sprintf("Xray failed to start listening: %v", err),
  197. }, nil
  198. }
  199. // Check if process is still running
  200. if !testProcess.IsRunning() {
  201. result := testProcess.GetResult()
  202. return &TestOutboundResult{
  203. Success: false,
  204. Error: fmt.Sprintf("Xray process exited: %s", result),
  205. }, nil
  206. }
  207. // Test the connection through proxy
  208. delay, statusCode, err := s.testConnection(testPort, testURL)
  209. if err != nil {
  210. return &TestOutboundResult{
  211. Success: false,
  212. Error: err.Error(),
  213. }, nil
  214. }
  215. return &TestOutboundResult{
  216. Success: true,
  217. Delay: delay,
  218. StatusCode: statusCode,
  219. }, nil
  220. }
  221. // createTestConfig creates a test config by copying all outbounds unchanged and adding
  222. // only the test inbound (SOCKS) and a route rule that sends traffic to the given outbound tag.
  223. func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []any, testPort int) *xray.Config {
  224. // Test inbound (SOCKS proxy) - only addition to inbounds
  225. testInbound := xray.InboundConfig{
  226. Tag: "test-inbound",
  227. Listen: json_util.RawMessage(`"127.0.0.1"`),
  228. Port: testPort,
  229. Protocol: "socks",
  230. Settings: json_util.RawMessage(`{"auth":"noauth","udp":true}`),
  231. }
  232. // Outbounds: copy all, but set noKernelTun=true for WireGuard outbounds
  233. processedOutbounds := make([]any, len(allOutbounds))
  234. for i, ob := range allOutbounds {
  235. outbound, ok := ob.(map[string]any)
  236. if !ok {
  237. processedOutbounds[i] = ob
  238. continue
  239. }
  240. if protocol, ok := outbound["protocol"].(string); ok && protocol == "wireguard" {
  241. // Set noKernelTun to true for WireGuard outbounds
  242. if settings, ok := outbound["settings"].(map[string]any); ok {
  243. settings["noKernelTun"] = true
  244. } else {
  245. // Create settings if it doesn't exist
  246. outbound["settings"] = map[string]any{
  247. "noKernelTun": true,
  248. }
  249. }
  250. }
  251. processedOutbounds[i] = outbound
  252. }
  253. outboundsJSON, _ := json.Marshal(processedOutbounds)
  254. // Create routing rule to route all traffic through test outbound
  255. routingRules := []map[string]any{
  256. {
  257. "type": "field",
  258. "outboundTag": outboundTag,
  259. "network": "tcp,udp",
  260. },
  261. }
  262. routingJSON, _ := json.Marshal(map[string]any{
  263. "domainStrategy": "AsIs",
  264. "rules": routingRules,
  265. })
  266. // Disable logging for test process to avoid creating orphaned log files
  267. logConfig := map[string]any{
  268. "loglevel": "warning",
  269. "access": "none",
  270. "error": "none",
  271. "dnsLog": false,
  272. }
  273. logJSON, _ := json.Marshal(logConfig)
  274. // Create minimal config
  275. cfg := &xray.Config{
  276. LogConfig: json_util.RawMessage(logJSON),
  277. InboundConfigs: []xray.InboundConfig{
  278. testInbound,
  279. },
  280. OutboundConfigs: json_util.RawMessage(string(outboundsJSON)),
  281. RouterConfig: json_util.RawMessage(string(routingJSON)),
  282. Policy: json_util.RawMessage(`{}`),
  283. Stats: json_util.RawMessage(`{}`),
  284. }
  285. return cfg
  286. }
  287. // testConnection tests the connection through the proxy and measures delay.
  288. // It performs a warmup request first to establish the SOCKS connection and populate DNS caches,
  289. // then measures the second request for a more accurate latency reading.
  290. func (s *OutboundService) testConnection(proxyPort int, testURL string) (int64, int, error) {
  291. // Create SOCKS5 proxy URL
  292. proxyURL := fmt.Sprintf("socks5://127.0.0.1:%d", proxyPort)
  293. // Parse proxy URL
  294. proxyURLParsed, err := url.Parse(proxyURL)
  295. if err != nil {
  296. return 0, 0, common.NewErrorf("Invalid proxy URL: %v", err)
  297. }
  298. // Create HTTP client with proxy and keep-alive for connection reuse
  299. client := &http.Client{
  300. Timeout: 10 * time.Second,
  301. Transport: &http.Transport{
  302. Proxy: http.ProxyURL(proxyURLParsed),
  303. DialContext: (&net.Dialer{
  304. Timeout: 5 * time.Second,
  305. KeepAlive: 30 * time.Second,
  306. }).DialContext,
  307. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  308. MaxIdleConns: 1,
  309. IdleConnTimeout: 10 * time.Second,
  310. DisableCompression: true,
  311. },
  312. }
  313. // Warmup request: establishes SOCKS/TLS connection, DNS, and TCP to the target.
  314. // This mirrors real-world usage where connections are reused.
  315. warmupResp, err := client.Get(testURL)
  316. if err != nil {
  317. return 0, 0, common.NewErrorf("Request failed: %v", err)
  318. }
  319. io.Copy(io.Discard, warmupResp.Body)
  320. warmupResp.Body.Close()
  321. // Measure the actual request on the warm connection
  322. startTime := time.Now()
  323. resp, err := client.Get(testURL)
  324. delay := time.Since(startTime).Milliseconds()
  325. if err != nil {
  326. return 0, 0, common.NewErrorf("Request failed: %v", err)
  327. }
  328. io.Copy(io.Discard, resp.Body)
  329. resp.Body.Close()
  330. return delay, resp.StatusCode, nil
  331. }
  332. // waitForPort polls until the given TCP port is accepting connections or the timeout expires.
  333. func waitForPort(port int, timeout time.Duration) error {
  334. deadline := time.Now().Add(timeout)
  335. for time.Now().Before(deadline) {
  336. conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 100*time.Millisecond)
  337. if err == nil {
  338. conn.Close()
  339. return nil
  340. }
  341. time.Sleep(50 * time.Millisecond)
  342. }
  343. return fmt.Errorf("port %d not ready after %v", port, timeout)
  344. }
  345. // findAvailablePort finds an available port for testing
  346. func findAvailablePort() (int, error) {
  347. listener, err := net.Listen("tcp", ":0")
  348. if err != nil {
  349. return 0, err
  350. }
  351. defer listener.Close()
  352. addr := listener.Addr().(*net.TCPAddr)
  353. return addr.Port, nil
  354. }
  355. // createTestConfigPath returns a unique path for a temporary xray config file in the bin folder.
  356. // The temp file is created and closed so the path is reserved; Start() will overwrite it.
  357. func createTestConfigPath() (string, error) {
  358. tmpFile, err := os.CreateTemp(config.GetBinFolderPath(), "xray_test_*.json")
  359. if err != nil {
  360. return "", err
  361. }
  362. path := tmpFile.Name()
  363. if err := tmpFile.Close(); err != nil {
  364. os.Remove(path)
  365. return "", err
  366. }
  367. return path, nil
  368. }