1
0

outbound.go 12 KB


  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "sync"
  11. "time"
  12. "github.com/mhsanaei/3x-ui/v2/config"
  13. "github.com/mhsanaei/3x-ui/v2/database"
  14. "github.com/mhsanaei/3x-ui/v2/database/model"
  15. "github.com/mhsanaei/3x-ui/v2/logger"
  16. "github.com/mhsanaei/3x-ui/v2/util/common"
  17. "github.com/mhsanaei/3x-ui/v2/util/json_util"
  18. "github.com/mhsanaei/3x-ui/v2/xray"
  19. "gorm.io/gorm"
  20. )
  // OutboundService groups the operations on Xray outbounds found in this file:
  // persisting and resetting per-tag traffic counters and running connectivity
  // tests. The type itself carries no fields.
  type OutboundService struct{}

  // testSemaphore serializes outbound tests. TestOutbound takes it with TryLock,
  // so a test requested while another one is in flight is rejected rather than
  // queued.
  var (
  	testSemaphore sync.Mutex
  )
  26. func (s *OutboundService) AddTraffic(traffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (error, bool) {
  27. var err error
  28. db := database.GetDB()
  29. tx := db.Begin()
  30. defer func() {
  31. if err != nil {
  32. tx.Rollback()
  33. } else {
  34. tx.Commit()
  35. }
  36. }()
  37. err = s.addOutboundTraffic(tx, traffics)
  38. if err != nil {
  39. return err, false
  40. }
  41. return nil, false
  42. }
  43. func (s *OutboundService) addOutboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
  44. if len(traffics) == 0 {
  45. return nil
  46. }
  47. var err error
  48. for _, traffic := range traffics {
  49. if traffic.IsOutbound {
  50. var outbound model.OutboundTraffics
  51. err = tx.Model(&model.OutboundTraffics{}).Where("tag = ?", traffic.Tag).
  52. FirstOrCreate(&outbound).Error
  53. if err != nil {
  54. return err
  55. }
  56. outbound.Tag = traffic.Tag
  57. outbound.Up = outbound.Up + traffic.Up
  58. outbound.Down = outbound.Down + traffic.Down
  59. outbound.Total = outbound.Up + outbound.Down
  60. err = tx.Save(&outbound).Error
  61. if err != nil {
  62. return err
  63. }
  64. }
  65. }
  66. return nil
  67. }
  68. func (s *OutboundService) GetOutboundsTraffic() ([]*model.OutboundTraffics, error) {
  69. db := database.GetDB()
  70. var traffics []*model.OutboundTraffics
  71. err := db.Model(model.OutboundTraffics{}).Find(&traffics).Error
  72. if err != nil {
  73. logger.Warning("Error retrieving OutboundTraffics: ", err)
  74. return nil, err
  75. }
  76. return traffics, nil
  77. }
  78. func (s *OutboundService) ResetOutboundTraffic(tag string) error {
  79. db := database.GetDB()
  80. whereText := "tag "
  81. if tag == "-alltags-" {
  82. whereText += " <> ?"
  83. } else {
  84. whereText += " = ?"
  85. }
  86. result := db.Model(model.OutboundTraffics{}).
  87. Where(whereText, tag).
  88. Updates(map[string]any{"up": 0, "down": 0, "total": 0})
  89. err := result.Error
  90. if err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  // TestOutboundResult is the JSON-serializable outcome of one outbound test.
  type TestOutboundResult struct {
  	// Success is true when the test request went through.
  	Success bool `json:"success"`
  	// Delay is the measured request time in milliseconds.
  	Delay int64 `json:"delay"`
  	// Error describes the failure; it is left out of the JSON when empty.
  	Error string `json:"error,omitempty"`
  	// StatusCode is the HTTP status of the test response; it is left out of
  	// the JSON when zero.
  	StatusCode int `json:"statusCode,omitempty"`
  }
  102. // TestOutbound tests an outbound by creating a temporary xray instance and measuring response time.
  103. // allOutboundsJSON must be a JSON array of all outbounds; they are copied into the test config unchanged.
  104. // Only the test inbound and a route rule (to the tested outbound tag) are added.
  105. func (s *OutboundService) TestOutbound(outboundJSON string, testURL string, allOutboundsJSON string) (*TestOutboundResult, error) {
  106. if testURL == "" {
  107. testURL = "https://www.google.com/generate_204"
  108. }
  109. // Limit to one concurrent test at a time
  110. if !testSemaphore.TryLock() {
  111. return &TestOutboundResult{
  112. Success: false,
  113. Error: "Another outbound test is already running, please wait",
  114. }, nil
  115. }
  116. defer testSemaphore.Unlock()
  117. // Parse the outbound being tested to get its tag
  118. var testOutbound map[string]any
  119. if err := json.Unmarshal([]byte(outboundJSON), &testOutbound); err != nil {
  120. return &TestOutboundResult{
  121. Success: false,
  122. Error: fmt.Sprintf("Invalid outbound JSON: %v", err),
  123. }, nil
  124. }
  125. outboundTag, _ := testOutbound["tag"].(string)
  126. if outboundTag == "" {
  127. return &TestOutboundResult{
  128. Success: false,
  129. Error: "Outbound has no tag",
  130. }, nil
  131. }
  132. if protocol, _ := testOutbound["protocol"].(string); protocol == "blackhole" || outboundTag == "blocked" {
  133. return &TestOutboundResult{
  134. Success: false,
  135. Error: "Blocked/blackhole outbound cannot be tested",
  136. }, nil
  137. }
  138. // Use all outbounds when provided; otherwise fall back to single outbound
  139. var allOutbounds []any
  140. if allOutboundsJSON != "" {
  141. if err := json.Unmarshal([]byte(allOutboundsJSON), &allOutbounds); err != nil {
  142. return &TestOutboundResult{
  143. Success: false,
  144. Error: fmt.Sprintf("Invalid allOutbounds JSON: %v", err),
  145. }, nil
  146. }
  147. }
  148. if len(allOutbounds) == 0 {
  149. allOutbounds = []any{testOutbound}
  150. }
  151. // Find an available port for test inbound
  152. testPort, err := findAvailablePort()
  153. if err != nil {
  154. return &TestOutboundResult{
  155. Success: false,
  156. Error: fmt.Sprintf("Failed to find available port: %v", err),
  157. }, nil
  158. }
  159. // Copy all outbounds as-is, add only test inbound and route rule
  160. testConfig := s.createTestConfig(outboundTag, allOutbounds, testPort)
  161. // Use a temporary config file so the main config.json is never overwritten
  162. testConfigPath, err := createTestConfigPath()
  163. if err != nil {
  164. return &TestOutboundResult{
  165. Success: false,
  166. Error: fmt.Sprintf("Failed to create test config path: %v", err),
  167. }, nil
  168. }
  169. defer os.Remove(testConfigPath) // ensure temp file is removed even if process is not stopped
  170. // Create temporary xray process with its own config file
  171. testProcess := xray.NewTestProcess(testConfig, testConfigPath)
  172. defer func() {
  173. if testProcess.IsRunning() {
  174. testProcess.Stop()
  175. }
  176. }()
  177. // Start the test process
  178. if err := testProcess.Start(); err != nil {
  179. return &TestOutboundResult{
  180. Success: false,
  181. Error: fmt.Sprintf("Failed to start test xray instance: %v", err),
  182. }, nil
  183. }
  184. // Wait for xray to start listening on the test port
  185. if err := waitForPort(testPort, 3*time.Second); err != nil {
  186. if !testProcess.IsRunning() {
  187. result := testProcess.GetResult()
  188. return &TestOutboundResult{
  189. Success: false,
  190. Error: fmt.Sprintf("Xray process exited: %s", result),
  191. }, nil
  192. }
  193. return &TestOutboundResult{
  194. Success: false,
  195. Error: fmt.Sprintf("Xray failed to start listening: %v", err),
  196. }, nil
  197. }
  198. // Check if process is still running
  199. if !testProcess.IsRunning() {
  200. result := testProcess.GetResult()
  201. return &TestOutboundResult{
  202. Success: false,
  203. Error: fmt.Sprintf("Xray process exited: %s", result),
  204. }, nil
  205. }
  206. // Test the connection through proxy
  207. delay, statusCode, err := s.testConnection(testPort, testURL)
  208. if err != nil {
  209. return &TestOutboundResult{
  210. Success: false,
  211. Error: err.Error(),
  212. }, nil
  213. }
  214. return &TestOutboundResult{
  215. Success: true,
  216. Delay: delay,
  217. StatusCode: statusCode,
  218. }, nil
  219. }
  220. // createTestConfig creates a test config by copying all outbounds unchanged and adding
  221. // only the test inbound (SOCKS) and a route rule that sends traffic to the given outbound tag.
  222. func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []any, testPort int) *xray.Config {
  223. // Test inbound (SOCKS proxy) - only addition to inbounds
  224. testInbound := xray.InboundConfig{
  225. Tag: "test-inbound",
  226. Listen: json_util.RawMessage(`"127.0.0.1"`),
  227. Port: testPort,
  228. Protocol: "socks",
  229. Settings: json_util.RawMessage(`{"auth":"noauth","udp":true}`),
  230. }
  231. // Outbounds: copy all, but set noKernelTun=true for WireGuard outbounds
  232. processedOutbounds := make([]any, len(allOutbounds))
  233. for i, ob := range allOutbounds {
  234. outbound, ok := ob.(map[string]any)
  235. if !ok {
  236. processedOutbounds[i] = ob
  237. continue
  238. }
  239. if protocol, ok := outbound["protocol"].(string); ok && protocol == "wireguard" {
  240. // Set noKernelTun to true for WireGuard outbounds
  241. if settings, ok := outbound["settings"].(map[string]any); ok {
  242. settings["noKernelTun"] = true
  243. } else {
  244. // Create settings if it doesn't exist
  245. outbound["settings"] = map[string]any{
  246. "noKernelTun": true,
  247. }
  248. }
  249. }
  250. processedOutbounds[i] = outbound
  251. }
  252. outboundsJSON, _ := json.Marshal(processedOutbounds)
  253. // Create routing rule to route all traffic through test outbound
  254. routingRules := []map[string]any{
  255. {
  256. "type": "field",
  257. "outboundTag": outboundTag,
  258. "network": "tcp,udp",
  259. },
  260. }
  261. routingJSON, _ := json.Marshal(map[string]any{
  262. "domainStrategy": "AsIs",
  263. "rules": routingRules,
  264. })
  265. // Disable logging for test process to avoid creating orphaned log files
  266. logConfig := map[string]any{
  267. "loglevel": "warning",
  268. "access": "none",
  269. "error": "none",
  270. "dnsLog": false,
  271. }
  272. logJSON, _ := json.Marshal(logConfig)
  273. // Create minimal config
  274. cfg := &xray.Config{
  275. LogConfig: json_util.RawMessage(logJSON),
  276. InboundConfigs: []xray.InboundConfig{
  277. testInbound,
  278. },
  279. OutboundConfigs: json_util.RawMessage(string(outboundsJSON)),
  280. RouterConfig: json_util.RawMessage(string(routingJSON)),
  281. Policy: json_util.RawMessage(`{}`),
  282. Stats: json_util.RawMessage(`{}`),
  283. }
  284. return cfg
  285. }
  286. // testConnection tests the connection through the proxy and measures delay.
  287. // It performs a warmup request first to establish the SOCKS connection and populate DNS caches,
  288. // then measures the second request for a more accurate latency reading.
  289. func (s *OutboundService) testConnection(proxyPort int, testURL string) (int64, int, error) {
  290. // Create SOCKS5 proxy URL
  291. proxyURL := fmt.Sprintf("socks5://127.0.0.1:%d", proxyPort)
  292. // Parse proxy URL
  293. proxyURLParsed, err := url.Parse(proxyURL)
  294. if err != nil {
  295. return 0, 0, common.NewErrorf("Invalid proxy URL: %v", err)
  296. }
  297. // Create HTTP client with proxy and keep-alive for connection reuse
  298. client := &http.Client{
  299. Timeout: 10 * time.Second,
  300. Transport: &http.Transport{
  301. Proxy: http.ProxyURL(proxyURLParsed),
  302. DialContext: (&net.Dialer{
  303. Timeout: 5 * time.Second,
  304. KeepAlive: 30 * time.Second,
  305. }).DialContext,
  306. MaxIdleConns: 1,
  307. IdleConnTimeout: 10 * time.Second,
  308. DisableCompression: true,
  309. },
  310. }
  311. // Warmup request: establishes SOCKS/TLS connection, DNS, and TCP to the target.
  312. // This mirrors real-world usage where connections are reused.
  313. warmupResp, err := client.Get(testURL)
  314. if err != nil {
  315. return 0, 0, common.NewErrorf("Request failed: %v", err)
  316. }
  317. io.Copy(io.Discard, warmupResp.Body)
  318. warmupResp.Body.Close()
  319. // Measure the actual request on the warm connection
  320. startTime := time.Now()
  321. resp, err := client.Get(testURL)
  322. delay := time.Since(startTime).Milliseconds()
  323. if err != nil {
  324. return 0, 0, common.NewErrorf("Request failed: %v", err)
  325. }
  326. io.Copy(io.Discard, resp.Body)
  327. resp.Body.Close()
  328. return delay, resp.StatusCode, nil
  329. }
  // waitForPort repeatedly dials 127.0.0.1:port over TCP until a connection
  // succeeds or timeout has elapsed. Each dial attempt is limited to 100ms and a
  // failed attempt is followed by a 50ms pause. It returns nil on the first
  // successful dial and an error naming the port and timeout otherwise; with a
  // non-positive timeout no dial is attempted.
  func waitForPort(port int, timeout time.Duration) error {
  	target := fmt.Sprintf("127.0.0.1:%d", port)

  	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(50 * time.Millisecond) {
  		conn, err := net.DialTimeout("tcp", target, 100*time.Millisecond)
  		if err != nil {
  			continue
  		}
  		conn.Close()
  		return nil
  	}

  	return fmt.Errorf("port %d not ready after %v", port, timeout)
  }
  // findAvailablePort asks the operating system for a currently free TCP port on
  // the loopback interface and returns its number.
  //
  // The probe binds 127.0.0.1 rather than the wildcard address because the test
  // inbound listens on 127.0.0.1; this checks the address that will actually be
  // used and avoids briefly opening a listener on every interface. The probe
  // listener is closed before returning, so the port is only likely, not
  // guaranteed, to still be free when the caller binds it.
  func findAvailablePort() (int, error) {
  	listener, err := net.Listen("tcp", "127.0.0.1:0")
  	if err != nil {
  		return 0, err
  	}
  	defer listener.Close()

  	addr, ok := listener.Addr().(*net.TCPAddr)
  	if !ok {
  		// Not expected for a "tcp" listener; report it instead of panicking.
  		return 0, fmt.Errorf("unexpected listener address type %T", listener.Addr())
  	}
  	return addr.Port, nil
  }
  353. // createTestConfigPath returns a unique path for a temporary xray config file in the bin folder.
  354. // The temp file is created and closed so the path is reserved; Start() will overwrite it.
  355. func createTestConfigPath() (string, error) {
  356. tmpFile, err := os.CreateTemp(config.GetBinFolderPath(), "xray_test_*.json")
  357. if err != nil {
  358. return "", err
  359. }
  360. path := tmpFile.Name()
  361. if err := tmpFile.Close(); err != nil {
  362. os.Remove(path)
  363. return "", err
  364. }
  365. return path, nil
  366. }