probe_http.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. package outbound
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/fs"
  9. "net"
  10. "net/http"
  11. "net/http/httptrace"
  12. "net/url"
  13. "os"
  14. "strconv"
  15. "sync"
  16. "time"
  17. "github.com/mhsanaei/3x-ui/v3/internal/config"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/json_util"
  19. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  20. )
  21. // HTTP-mode probing works by spinning up ONE temporary xray instance per
  22. // batch: every outbound under test gets its own loopback SOCKS inbound plus
  23. // an inboundTag→outboundTag routing rule, and the panel then issues a real,
  24. // individually-timed HTTP request through each inbound. Measuring the request
  25. // client-side (instead of polling xray's observatory) returns the moment the
  26. // response lands, yields the actual HTTP status, and allows an httptrace
  27. // timing breakdown — while the shared process keeps "Test All" at one xray
  28. // spawn per batch instead of one per outbound.
  29. const (
  30. // httpProbeTimeout bounds one probe request end-to-end.
  31. httpProbeTimeout = 10 * time.Second
  32. // httpProbeConcurrency caps parallel probe requests within a batch —
  33. // enough to keep a batch fast, low enough not to spike CPU with TLS
  34. // handshakes on small VPSes.
  35. httpProbeConcurrency = 16
  36. // batchPortsReadyTimeout bounds the wait for the temp instance to open
  37. // its test inbounds.
  38. batchPortsReadyTimeout = 10 * time.Second
  39. // maxBatchItems caps one batch request; the frontend chunks below this.
  40. maxBatchItems = 50
  41. // tcpBatchConcurrency caps parallel TCP-mode items in a batch (each item
  42. // already dials its endpoints concurrently).
  43. tcpBatchConcurrency = 8
  44. defaultTestURL = "https://www.google.com/generate_204"
  45. )
  46. // httpTestSemaphore serialises HTTP-mode batches (each spawns a temp xray
  47. // instance, which is too expensive to run in parallel). TCP-mode probes are
  48. // dial-only and don't need the semaphore.
  49. var httpTestSemaphore sync.Mutex
  50. // batchProcess is the slice of xray.Process the batch engine needs; a seam
  51. // so unit tests can stub the process without an xray binary.
  52. type batchProcess interface {
  53. Start() error
  54. Stop() error
  55. IsRunning() bool
  56. GetResult() string
  57. }
  58. var newBatchProcess = func(cfg *xray.Config, configPath string) batchProcess {
  59. return xray.NewTestProcess(cfg, configPath)
  60. }
  61. // httpBatchItem is one outbound inside an HTTP-mode batch. result is the
  62. // pre-allocated entry in the caller's result slice, filled in place.
  63. type httpBatchItem struct {
  64. index int
  65. tag string
  66. outbound map[string]any
  67. result *TestOutboundResult
  68. }
  69. // TestOutbound probes a single outbound; legacy single-test API kept for the
  70. // /testOutbound endpoint. Dispatch matches TestOutbounds: mode "tcp" dials
  71. // the outbound's endpoints directly, anything else routes a real HTTP request
  72. // through a temp xray instance (UDP-transport outbounds are always forced to
  73. // the HTTP probe — a raw dial can't measure them).
  74. func (s *OutboundService) TestOutbound(outboundJSON string, testURL string, allOutboundsJSON string, mode string) (*TestOutboundResult, error) {
  75. var ob map[string]any
  76. if err := json.Unmarshal([]byte(outboundJSON), &ob); err != nil {
  77. m := "http"
  78. if mode == "tcp" {
  79. m = "tcp"
  80. }
  81. return &TestOutboundResult{Mode: m, Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}, nil
  82. }
  83. results := s.testOutboundsParsed([]map[string]any{ob}, testURL, allOutboundsJSON, mode)
  84. return results[0], nil
  85. }
  86. // TestOutbounds probes a JSON array of outbounds and returns one result per
  87. // input, in input order, each carrying the outbound's tag. allOutboundsJSON
  88. // supplies the config context (sockopt.dialerProxy chains); testURL falls
  89. // back to the default probe URL when empty.
  90. func (s *OutboundService) TestOutbounds(outboundsJSON string, testURL string, allOutboundsJSON string, mode string) ([]*TestOutboundResult, error) {
  91. var raw []json.RawMessage
  92. if err := json.Unmarshal([]byte(outboundsJSON), &raw); err != nil {
  93. return nil, fmt.Errorf("invalid outbounds JSON: %v", err)
  94. }
  95. if len(raw) > maxBatchItems {
  96. return nil, fmt.Errorf("too many outbounds in one request (max %d)", maxBatchItems)
  97. }
  98. items := make([]map[string]any, len(raw))
  99. for i, r := range raw {
  100. var ob map[string]any
  101. if err := json.Unmarshal(r, &ob); err == nil {
  102. items[i] = ob
  103. }
  104. }
  105. return s.testOutboundsParsed(items, testURL, allOutboundsJSON, mode), nil
  106. }
  107. // testOutboundsParsed splits items into the TCP lane (direct dials, bounded
  108. // worker pool) and the HTTP lane (one shared temp xray instance), runs both,
  109. // and returns results aligned with items. A nil item marks unparseable input.
  110. func (s *OutboundService) testOutboundsParsed(items []map[string]any, testURL string, allOutboundsJSON string, mode string) []*TestOutboundResult {
  111. results := make([]*TestOutboundResult, len(items))
  112. modeLabel := "http"
  113. if mode == "tcp" {
  114. modeLabel = "tcp"
  115. }
  116. type tcpEntry struct {
  117. idx int
  118. ob map[string]any
  119. }
  120. var tcpLane []tcpEntry
  121. var httpItems []*httpBatchItem
  122. seenTags := make(map[string]bool)
  123. for i, ob := range items {
  124. if ob == nil {
  125. results[i] = &TestOutboundResult{Mode: modeLabel, Success: false, Error: "Invalid outbound JSON"}
  126. continue
  127. }
  128. // A bare TCP dial only proves reachability for TCP-based proxies.
  129. // UDP protocols (wireguard, hysteria, kcp/quic transports) ignore
  130. // unauthenticated packets, so a raw dial can't tell "reachable" from
  131. // "dead" — route them through the real xray probe.
  132. if mode == "tcp" && !outboundTransportIsUDP(ob) {
  133. tcpLane = append(tcpLane, tcpEntry{idx: i, ob: ob})
  134. continue
  135. }
  136. tag, _ := ob["tag"].(string)
  137. r := &TestOutboundResult{Tag: tag, Mode: "http"}
  138. results[i] = r
  139. protocol, _ := ob["protocol"].(string)
  140. switch {
  141. case tag == "":
  142. r.Error = "Outbound has no tag"
  143. case protocol == "blackhole" || tag == "blocked":
  144. r.Error = "Blocked/blackhole outbound cannot be tested"
  145. case protocol == "loopback":
  146. r.Error = "Loopback outbound cannot be tested"
  147. case seenTags[tag]:
  148. r.Error = fmt.Sprintf("Duplicate outbound tag in batch: %s", tag)
  149. default:
  150. seenTags[tag] = true
  151. httpItems = append(httpItems, &httpBatchItem{index: i, tag: tag, outbound: ob, result: r})
  152. }
  153. }
  154. if len(tcpLane) > 0 {
  155. var wg sync.WaitGroup
  156. sem := make(chan struct{}, tcpBatchConcurrency)
  157. for _, e := range tcpLane {
  158. wg.Add(1)
  159. go func(e tcpEntry) {
  160. defer wg.Done()
  161. sem <- struct{}{}
  162. defer func() { <-sem }()
  163. obJSON, err := json.Marshal(e.ob)
  164. if err != nil {
  165. tag, _ := e.ob["tag"].(string)
  166. results[e.idx] = &TestOutboundResult{Tag: tag, Mode: "tcp", Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}
  167. return
  168. }
  169. r, _ := s.testOutboundTCP(string(obJSON))
  170. results[e.idx] = r
  171. }(e)
  172. }
  173. wg.Wait()
  174. }
  175. if len(httpItems) == 0 {
  176. return results
  177. }
  178. failAll := func(msg string) {
  179. for _, it := range httpItems {
  180. it.result.Success = false
  181. it.result.Error = msg
  182. }
  183. }
  184. var allOutbounds []any
  185. if allOutboundsJSON != "" {
  186. if err := json.Unmarshal([]byte(allOutboundsJSON), &allOutbounds); err != nil {
  187. failAll(fmt.Sprintf("Invalid allOutbounds JSON: %v", err))
  188. return results
  189. }
  190. }
  191. if testURL == "" {
  192. testURL = defaultTestURL
  193. }
  194. if !httpTestSemaphore.TryLock() {
  195. failAll("Another outbound test is already running, please wait")
  196. return results
  197. }
  198. defer httpTestSemaphore.Unlock()
  199. retryPerItem, err := runHTTPProbeBatch(httpItems, allOutbounds, testURL)
  200. if err == nil {
  201. return results
  202. }
  203. if !retryPerItem || len(httpItems) == 1 {
  204. failAll(err.Error())
  205. return results
  206. }
  207. // The shared process never came up — one structurally-bad outbound can
  208. // poison the whole batch config. Retry each item in its own isolated
  209. // instance so the broken outbound reports xray's real error and the
  210. // rest still get tested. Serial: the poisoned case fails fast (~1s).
  211. for _, it := range httpItems {
  212. if _, ferr := runHTTPProbeBatch([]*httpBatchItem{it}, allOutbounds, testURL); ferr != nil {
  213. it.result.Success = false
  214. it.result.Error = ferr.Error()
  215. }
  216. }
  217. return results
  218. }
  219. // runHTTPProbeBatch makes one shared-process attempt for the given items,
  220. // writing per-request outcomes into the items' results. It returns a non-nil
  221. // error only when the process never became usable; retryPerItem reports
  222. // whether splitting the batch into per-item instances could help (true for
  223. // start failures / early exits that a poisoned config would explain, false
  224. // for environmental failures like a missing binary or no free ports).
  225. func runHTTPProbeBatch(items []*httpBatchItem, allOutbounds []any, testURL string) (retryPerItem bool, err error) {
  226. ports, release, err := reserveLoopbackPorts(len(items))
  227. if err != nil {
  228. return false, fmt.Errorf("Failed to reserve test ports: %v", err)
  229. }
  230. defer release()
  231. cfg := buildBatchTestConfig(items, allOutbounds, ports)
  232. configPath, err := createTestConfigPath()
  233. if err != nil {
  234. return false, fmt.Errorf("Failed to create test config path: %v", err)
  235. }
  236. defer os.Remove(configPath)
  237. proc := newBatchProcess(cfg, configPath)
  238. defer func() {
  239. if proc.IsRunning() {
  240. proc.Stop()
  241. }
  242. }()
  243. // Free the reserved ports just before xray binds them; the window is
  244. // milliseconds, and a lost race makes xray exit fast, which surfaces
  245. // below and triggers the per-item retry with fresh ports.
  246. release()
  247. if err := proc.Start(); err != nil {
  248. if errors.Is(err, fs.ErrNotExist) {
  249. // Binary missing — per-item retries would all fail the same way.
  250. return false, fmt.Errorf("Failed to start test xray instance: %v", err)
  251. }
  252. return true, fmt.Errorf("Failed to start test xray instance: %v", err)
  253. }
  254. if err := waitForPortsReady(proc, ports, batchPortsReadyTimeout); err != nil {
  255. return err.exited, err
  256. }
  257. sem := make(chan struct{}, httpProbeConcurrency)
  258. var wg sync.WaitGroup
  259. for i := range items {
  260. wg.Add(1)
  261. go func(it *httpBatchItem, port int) {
  262. defer wg.Done()
  263. sem <- struct{}{}
  264. defer func() { <-sem }()
  265. probeThroughSocks(port, testURL, httpProbeTimeout, it.result)
  266. }(items[i], ports[i])
  267. }
  268. wg.Wait()
  269. if !proc.IsRunning() {
  270. detail := proc.GetResult()
  271. for _, it := range items {
  272. if !it.result.Success {
  273. it.result.Error = "Xray process exited: " + detail
  274. }
  275. }
  276. }
  277. return false, nil
  278. }
  279. // portsReadyError distinguishes "process died" (a poisoned config — worth a
  280. // per-item retry) from "ports never opened while alive" (environmental).
  281. type portsReadyError struct {
  282. msg string
  283. exited bool
  284. }
  285. func (e *portsReadyError) Error() string { return e.msg }
  286. // waitForPortsReady polls until every test inbound accepts connections,
  287. // aborting as soon as the process exits.
  288. func waitForPortsReady(proc batchProcess, ports []int, timeout time.Duration) *portsReadyError {
  289. deadline := time.Now().Add(timeout)
  290. for _, port := range ports {
  291. for {
  292. if !proc.IsRunning() {
  293. return &portsReadyError{msg: "Xray process exited: " + proc.GetResult(), exited: true}
  294. }
  295. conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 100*time.Millisecond)
  296. if err == nil {
  297. conn.Close()
  298. break
  299. }
  300. if time.Now().After(deadline) {
  301. return &portsReadyError{msg: fmt.Sprintf("Xray failed to open test inbounds: port %d not ready after %v", port, timeout)}
  302. }
  303. time.Sleep(50 * time.Millisecond)
  304. }
  305. }
  306. return nil
  307. }
  308. // buildBatchTestConfig assembles the temp instance config: one loopback SOCKS
  309. // inbound per tested outbound, a routing rule binding each inbound to its
  310. // outbound tag, and the full outbound context so dialerProxy chains resolve.
  311. func buildBatchTestConfig(items []*httpBatchItem, allOutbounds []any, ports []int) *xray.Config {
  312. // allOutbounds is the template's outbound list; subscription outbounds
  313. // are injected at runtime and aren't part of it, so append any tested
  314. // outbound whose tag is missing. When a tested outbound's tag collides
  315. // with a template outbound, the template version wins — same semantics
  316. // as the pre-batch tester.
  317. outbounds := make([]any, 0, len(allOutbounds)+len(items))
  318. outbounds = append(outbounds, allOutbounds...)
  319. for _, it := range items {
  320. if !outboundsContainTag(outbounds, it.tag) {
  321. outbounds = append(outbounds, it.outbound)
  322. }
  323. }
  324. for _, ob := range outbounds {
  325. outbound, ok := ob.(map[string]any)
  326. if !ok {
  327. continue
  328. }
  329. // The temp instance must not touch kernel WireGuard devices.
  330. if protocol, ok := outbound["protocol"].(string); ok && protocol == "wireguard" {
  331. if settings, ok := outbound["settings"].(map[string]any); ok {
  332. settings["noKernelTun"] = true
  333. } else {
  334. outbound["settings"] = map[string]any{"noKernelTun": true}
  335. }
  336. }
  337. }
  338. outboundsJSON, _ := json.Marshal(outbounds)
  339. inbounds := make([]xray.InboundConfig, len(items))
  340. rules := make([]any, len(items))
  341. for i, it := range items {
  342. inTag := fmt.Sprintf("test-in-%d", i)
  343. inbounds[i] = xray.InboundConfig{
  344. Listen: json_util.RawMessage(`"127.0.0.1"`),
  345. Port: ports[i],
  346. Protocol: "socks",
  347. Settings: json_util.RawMessage(`{"auth":"noauth","udp":false}`),
  348. Tag: inTag,
  349. }
  350. rules[i] = map[string]any{
  351. "type": "field",
  352. "inboundTag": []string{inTag},
  353. "outboundTag": it.tag,
  354. }
  355. }
  356. routingJSON, _ := json.Marshal(map[string]any{
  357. "domainStrategy": "AsIs",
  358. "rules": rules,
  359. })
  360. logJSON, _ := json.Marshal(map[string]any{
  361. "loglevel": "warning",
  362. "access": "none",
  363. "error": "",
  364. "dnsLog": false,
  365. })
  366. return &xray.Config{
  367. LogConfig: json_util.RawMessage(logJSON),
  368. InboundConfigs: inbounds,
  369. OutboundConfigs: json_util.RawMessage(outboundsJSON),
  370. RouterConfig: json_util.RawMessage(routingJSON),
  371. Policy: json_util.RawMessage(`{}`),
  372. Stats: json_util.RawMessage(`{}`),
  373. }
  374. }
  375. // outboundsContainTag reports whether any outbound in the slice has the given tag.
  376. func outboundsContainTag(outbounds []any, tag string) bool {
  377. for _, ob := range outbounds {
  378. if m, ok := ob.(map[string]any); ok {
  379. if t, _ := m["tag"].(string); t == tag {
  380. return true
  381. }
  382. }
  383. }
  384. return false
  385. }
  386. // probeThroughSocks issues one timed GET through the local SOCKS inbound at
  387. // the given port and fills result. Any HTTP response — including 4xx/5xx and
  388. // unfollowed redirects — counts as reachable; only transport-level failures
  389. // (refused, reset, timeout, proxy errors) are failures. Delay is request
  390. // start → response headers; the test URL's hostname is resolved by xray
  391. // (Go's SOCKS5 client sends the domain to the proxy), so DNS goes through
  392. // the outbound too.
  393. func probeThroughSocks(port int, testURL string, timeout time.Duration, result *TestOutboundResult) {
  394. proxyURL := &url.URL{Scheme: "socks5", Host: net.JoinHostPort("127.0.0.1", strconv.Itoa(port))}
  395. tr := &http.Transport{
  396. Proxy: http.ProxyURL(proxyURL),
  397. DisableKeepAlives: true,
  398. }
  399. defer tr.CloseIdleConnections()
  400. client := &http.Client{
  401. Transport: tr,
  402. Timeout: timeout,
  403. // A redirect would re-dial through the proxy and skew the timing;
  404. // the 3xx itself already proves the outbound works.
  405. CheckRedirect: func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse },
  406. }
  407. // Timing breakdown. ConnectStart/Done wrap the TCP dial to the local
  408. // inbound (the SOCKS handshake isn't traced, and xray ACKs CONNECT
  409. // before dialing upstream — so the real outbound establishment lands in
  410. // the TLS phase for https URLs, or inside TTFB for plain http).
  411. var (
  412. connStart, tlsStart time.Time
  413. connDur, tlsDur, ttfbDur time.Duration
  414. connDone, tlsDone, gotFirstRB bool
  415. )
  416. start := time.Now()
  417. trace := &httptrace.ClientTrace{
  418. ConnectStart: func(network, addr string) {
  419. if connStart.IsZero() {
  420. connStart = time.Now()
  421. }
  422. },
  423. ConnectDone: func(network, addr string, err error) {
  424. if err == nil && !connDone && !connStart.IsZero() {
  425. connDone = true
  426. connDur = time.Since(connStart)
  427. }
  428. },
  429. TLSHandshakeStart: func() {
  430. if tlsStart.IsZero() {
  431. tlsStart = time.Now()
  432. }
  433. },
  434. TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
  435. if err == nil && !tlsDone && !tlsStart.IsZero() {
  436. tlsDone = true
  437. tlsDur = time.Since(tlsStart)
  438. }
  439. },
  440. GotFirstResponseByte: func() {
  441. if !gotFirstRB {
  442. gotFirstRB = true
  443. ttfbDur = time.Since(start)
  444. }
  445. },
  446. }
  447. req, err := http.NewRequestWithContext(httptrace.WithClientTrace(context.Background(), trace), http.MethodGet, testURL, nil)
  448. if err != nil {
  449. result.Error = err.Error()
  450. return
  451. }
  452. resp, err := client.Do(req)
  453. delay := time.Since(start).Milliseconds()
  454. if err != nil {
  455. result.Error = err.Error()
  456. return
  457. }
  458. resp.Body.Close()
  459. result.Success = true
  460. result.Delay = max(delay, 1)
  461. result.HTTPStatus = resp.StatusCode
  462. if connDone {
  463. result.ConnectMs = max(connDur.Milliseconds(), 1)
  464. }
  465. if tlsDone {
  466. result.TLSMs = max(tlsDur.Milliseconds(), 1)
  467. }
  468. if gotFirstRB {
  469. result.TTFBMs = max(ttfbDur.Milliseconds(), 1)
  470. }
  471. }
  472. // reserveLoopbackPorts grabs n free loopback ports and keeps the listeners
  473. // open so nothing else claims them; release() frees them (idempotent — the
  474. // caller releases right before starting xray and again via defer).
  475. func reserveLoopbackPorts(n int) ([]int, func(), error) {
  476. listeners := make([]net.Listener, 0, n)
  477. release := func() {
  478. for _, l := range listeners {
  479. l.Close()
  480. }
  481. }
  482. ports := make([]int, 0, n)
  483. for range n {
  484. l, err := net.Listen("tcp", "127.0.0.1:0")
  485. if err != nil {
  486. release()
  487. return nil, nil, err
  488. }
  489. listeners = append(listeners, l)
  490. ports = append(ports, l.Addr().(*net.TCPAddr).Port)
  491. }
  492. return ports, release, nil
  493. }
  494. // createTestConfigPath returns a unique path for a temporary xray config file in the bin folder.
  495. // The temp file is created and closed so the path is reserved; Start() will overwrite it.
  496. func createTestConfigPath() (string, error) {
  497. tmpFile, err := os.CreateTemp(config.GetBinFolderPath(), "xray_test_*.json")
  498. if err != nil {
  499. return "", err
  500. }
  501. path := tmpFile.Name()
  502. if err := tmpFile.Close(); err != nil {
  503. os.Remove(path)
  504. return "", err
  505. }
  506. return path, nil
  507. }