probe_http.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. package outbound
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/fs"
  9. "net"
  10. "net/http"
  11. "net/http/httptrace"
  12. "net/url"
  13. "os"
  14. "strconv"
  15. "sync"
  16. "time"
  17. "github.com/mhsanaei/3x-ui/v3/internal/config"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/json_util"
  19. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  20. )
  21. // HTTP-mode probing works by spinning up ONE temporary xray instance per
  22. // batch: every outbound under test gets its own loopback SOCKS inbound plus
  23. // an inboundTag→outboundTag routing rule, and the panel then issues a real,
  24. // individually-timed HTTP request through each inbound. Measuring the request
  25. // client-side (instead of polling xray's observatory) returns the moment the
  26. // response lands, yields the actual HTTP status, and allows an httptrace
  27. // timing breakdown — while the shared process keeps "Test All" at one xray
  28. // spawn per batch instead of one per outbound.
  29. const (
  30. // httpProbeTimeout bounds one probe request end-to-end.
  31. httpProbeTimeout = 10 * time.Second
  32. // httpProbeConcurrency caps parallel probe requests within a batch —
  33. // enough to keep a batch fast, low enough not to spike CPU with TLS
  34. // handshakes on small VPSes.
  35. httpProbeConcurrency = 16
  36. // batchPortsReadyTimeout bounds the wait for the temp instance to open
  37. // its test inbounds.
  38. batchPortsReadyTimeout = 10 * time.Second
  39. // maxBatchItems caps one batch request; the frontend chunks below this.
  40. maxBatchItems = 50
  41. // tcpBatchConcurrency caps parallel TCP-mode items in a batch (each item
  42. // already dials its endpoints concurrently).
  43. tcpBatchConcurrency = 8
  44. defaultTestURL = "https://www.google.com/generate_204"
  45. )
// httpTestSemaphore guards the HTTP lane so that only one HTTP-mode batch
// runs at a time (each batch spawns a temp xray instance, which is too
// expensive to run in parallel). testOutboundsParsed takes it with TryLock,
// so a batch that arrives while another is running is rejected with an
// error result rather than queued. TCP-mode probes are dial-only and don't
// take the lock.
var httpTestSemaphore sync.Mutex
// batchProcess is the slice of xray.Process the batch engine needs; a seam
// so unit tests can stub the process without an xray binary.
type batchProcess interface {
	// Start launches the process; a non-nil error means it never came up.
	Start() error
	// Stop terminates the process. The batch engine calls it only while
	// IsRunning reports true.
	Stop() error
	// IsRunning reports whether the process is still alive.
	IsRunning() bool
	// GetResult returns the process's result text; the batch engine appends
	// it to "Xray process exited: " messages as the exit detail.
	GetResult() string
}
// newBatchProcess builds the process used for one batch attempt from the
// generated config and the path of its temp config file. It is a package
// variable (defaulting to xray.NewTestProcess) so it can be swapped for a
// stub.
var newBatchProcess = func(cfg *xray.Config, configPath string) batchProcess {
	return xray.NewTestProcess(cfg, configPath)
}
// httpBatchItem is one outbound inside an HTTP-mode batch. result is the
// pre-allocated entry in the caller's result slice, filled in place.
type httpBatchItem struct {
	// index is the item's position in the caller's input slice.
	index int
	// tag is the outbound's tag; it is the routing target of the item's
	// test inbound.
	tag string
	// outbound is the parsed outbound JSON object.
	outbound map[string]any
	// result points at the caller's result entry for this item.
	result *TestOutboundResult
}
  69. // TestOutbound probes a single outbound; legacy single-test API kept for the
  70. // /testOutbound endpoint. Dispatch matches TestOutbounds: mode "tcp" dials
  71. // the outbound's endpoints directly, anything else routes a real HTTP request
  72. // through a temp xray instance (UDP-transport outbounds are always forced to
  73. // the HTTP probe — a raw dial can't measure them).
  74. func (s *OutboundService) TestOutbound(outboundJSON string, testURL string, allOutboundsJSON string, mode string) (*TestOutboundResult, error) {
  75. var ob map[string]any
  76. if err := json.Unmarshal([]byte(outboundJSON), &ob); err != nil {
  77. m := "http"
  78. if mode == "tcp" {
  79. m = "tcp"
  80. }
  81. return &TestOutboundResult{Mode: m, Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}, nil
  82. }
  83. results := s.testOutboundsParsed([]map[string]any{ob}, testURL, allOutboundsJSON, mode)
  84. return results[0], nil
  85. }
  86. // TestOutbounds probes a JSON array of outbounds and returns one result per
  87. // input, in input order, each carrying the outbound's tag. allOutboundsJSON
  88. // supplies the config context (sockopt.dialerProxy chains); testURL falls
  89. // back to the default probe URL when empty.
  90. func (s *OutboundService) TestOutbounds(outboundsJSON string, testURL string, allOutboundsJSON string, mode string) ([]*TestOutboundResult, error) {
  91. var raw []json.RawMessage
  92. if err := json.Unmarshal([]byte(outboundsJSON), &raw); err != nil {
  93. return nil, fmt.Errorf("invalid outbounds JSON: %v", err)
  94. }
  95. if len(raw) > maxBatchItems {
  96. return nil, fmt.Errorf("too many outbounds in one request (max %d)", maxBatchItems)
  97. }
  98. items := make([]map[string]any, len(raw))
  99. for i, r := range raw {
  100. var ob map[string]any
  101. if err := json.Unmarshal(r, &ob); err == nil {
  102. items[i] = ob
  103. }
  104. }
  105. return s.testOutboundsParsed(items, testURL, allOutboundsJSON, mode), nil
  106. }
  107. // testOutboundsParsed splits items into the TCP lane (direct dials, bounded
  108. // worker pool) and the HTTP lane (one shared temp xray instance), runs both,
  109. // and returns results aligned with items. A nil item marks unparseable input.
  110. func (s *OutboundService) testOutboundsParsed(items []map[string]any, testURL string, allOutboundsJSON string, mode string) []*TestOutboundResult {
  111. results := make([]*TestOutboundResult, len(items))
  112. modeLabel := "http"
  113. if mode == "tcp" {
  114. modeLabel = "tcp"
  115. }
  116. type tcpEntry struct {
  117. idx int
  118. ob map[string]any
  119. }
  120. var tcpLane []tcpEntry
  121. var httpItems []*httpBatchItem
  122. seenTags := make(map[string]bool)
  123. for i, ob := range items {
  124. if ob == nil {
  125. results[i] = &TestOutboundResult{Mode: modeLabel, Success: false, Error: "Invalid outbound JSON"}
  126. continue
  127. }
  128. // A bare TCP dial only proves reachability for TCP-based proxies.
  129. // UDP protocols (wireguard, hysteria, kcp/quic transports) ignore
  130. // unauthenticated packets, so a raw dial can't tell "reachable" from
  131. // "dead" — route them through the real xray probe.
  132. if mode == "tcp" && !outboundTransportIsUDP(ob) {
  133. tcpLane = append(tcpLane, tcpEntry{idx: i, ob: ob})
  134. continue
  135. }
  136. tag, _ := ob["tag"].(string)
  137. r := &TestOutboundResult{Tag: tag, Mode: "http"}
  138. results[i] = r
  139. protocol, _ := ob["protocol"].(string)
  140. switch {
  141. case tag == "":
  142. r.Error = "Outbound has no tag"
  143. case protocol == "blackhole" || tag == "blocked":
  144. r.Error = "Blocked/blackhole outbound cannot be tested"
  145. case protocol == "loopback":
  146. r.Error = "Loopback outbound cannot be tested"
  147. case protocol == "freedom" || protocol == "dns":
  148. // Direct/DNS outbounds aren't proxies — an HTTP probe through them
  149. // would only measure the host's own reachability, not a tunnel.
  150. r.Error = "Direct/DNS outbound cannot be tested"
  151. case seenTags[tag]:
  152. r.Error = fmt.Sprintf("Duplicate outbound tag in batch: %s", tag)
  153. default:
  154. seenTags[tag] = true
  155. httpItems = append(httpItems, &httpBatchItem{index: i, tag: tag, outbound: ob, result: r})
  156. }
  157. }
  158. if len(tcpLane) > 0 {
  159. var wg sync.WaitGroup
  160. sem := make(chan struct{}, tcpBatchConcurrency)
  161. for _, e := range tcpLane {
  162. wg.Add(1)
  163. go func(e tcpEntry) {
  164. defer wg.Done()
  165. sem <- struct{}{}
  166. defer func() { <-sem }()
  167. obJSON, err := json.Marshal(e.ob)
  168. if err != nil {
  169. tag, _ := e.ob["tag"].(string)
  170. results[e.idx] = &TestOutboundResult{Tag: tag, Mode: "tcp", Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}
  171. return
  172. }
  173. r, _ := s.testOutboundTCP(string(obJSON))
  174. results[e.idx] = r
  175. }(e)
  176. }
  177. wg.Wait()
  178. }
  179. if len(httpItems) == 0 {
  180. return results
  181. }
  182. failAll := func(msg string) {
  183. for _, it := range httpItems {
  184. it.result.Success = false
  185. it.result.Error = msg
  186. }
  187. }
  188. var allOutbounds []any
  189. if allOutboundsJSON != "" {
  190. if err := json.Unmarshal([]byte(allOutboundsJSON), &allOutbounds); err != nil {
  191. failAll(fmt.Sprintf("Invalid allOutbounds JSON: %v", err))
  192. return results
  193. }
  194. }
  195. if testURL == "" {
  196. testURL = defaultTestURL
  197. }
  198. if !httpTestSemaphore.TryLock() {
  199. failAll("Another outbound test is already running, please wait")
  200. return results
  201. }
  202. defer httpTestSemaphore.Unlock()
  203. retryPerItem, err := runHTTPProbeBatch(httpItems, allOutbounds, testURL)
  204. if err == nil {
  205. return results
  206. }
  207. if !retryPerItem || len(httpItems) == 1 {
  208. failAll(err.Error())
  209. return results
  210. }
  211. // The shared process never came up — one structurally-bad outbound can
  212. // poison the whole batch config. Retry each item in its own isolated
  213. // instance so the broken outbound reports xray's real error and the
  214. // rest still get tested. Serial: the poisoned case fails fast (~1s).
  215. for _, it := range httpItems {
  216. if _, ferr := runHTTPProbeBatch([]*httpBatchItem{it}, allOutbounds, testURL); ferr != nil {
  217. it.result.Success = false
  218. it.result.Error = ferr.Error()
  219. }
  220. }
  221. return results
  222. }
  223. // runHTTPProbeBatch makes one shared-process attempt for the given items,
  224. // writing per-request outcomes into the items' results. It returns a non-nil
  225. // error only when the process never became usable; retryPerItem reports
  226. // whether splitting the batch into per-item instances could help (true for
  227. // start failures / early exits that a poisoned config would explain, false
  228. // for environmental failures like a missing binary or no free ports).
  229. func runHTTPProbeBatch(items []*httpBatchItem, allOutbounds []any, testURL string) (retryPerItem bool, err error) {
  230. ports, release, err := reserveLoopbackPorts(len(items))
  231. if err != nil {
  232. return false, fmt.Errorf("Failed to reserve test ports: %v", err)
  233. }
  234. defer release()
  235. cfg := buildBatchTestConfig(items, allOutbounds, ports)
  236. configPath, err := createTestConfigPath()
  237. if err != nil {
  238. return false, fmt.Errorf("Failed to create test config path: %v", err)
  239. }
  240. defer os.Remove(configPath)
  241. proc := newBatchProcess(cfg, configPath)
  242. defer func() {
  243. if proc.IsRunning() {
  244. proc.Stop()
  245. }
  246. }()
  247. // Free the reserved ports just before xray binds them; the window is
  248. // milliseconds, and a lost race makes xray exit fast, which surfaces
  249. // below and triggers the per-item retry with fresh ports.
  250. release()
  251. if err := proc.Start(); err != nil {
  252. if errors.Is(err, fs.ErrNotExist) {
  253. // Binary missing — per-item retries would all fail the same way.
  254. return false, fmt.Errorf("Failed to start test xray instance: %v", err)
  255. }
  256. return true, fmt.Errorf("Failed to start test xray instance: %v", err)
  257. }
  258. if err := waitForPortsReady(proc, ports, batchPortsReadyTimeout); err != nil {
  259. return err.exited, err
  260. }
  261. sem := make(chan struct{}, httpProbeConcurrency)
  262. var wg sync.WaitGroup
  263. for i := range items {
  264. wg.Add(1)
  265. go func(it *httpBatchItem, port int) {
  266. defer wg.Done()
  267. sem <- struct{}{}
  268. defer func() { <-sem }()
  269. probeThroughSocks(port, testURL, httpProbeTimeout, it.result)
  270. }(items[i], ports[i])
  271. }
  272. wg.Wait()
  273. if !proc.IsRunning() {
  274. detail := proc.GetResult()
  275. for _, it := range items {
  276. if !it.result.Success {
  277. it.result.Error = "Xray process exited: " + detail
  278. }
  279. }
  280. }
  281. return false, nil
  282. }
// portsReadyError distinguishes "process died" (a poisoned config — worth a
// per-item retry) from "ports never opened while alive" (environmental).
type portsReadyError struct {
	// msg is the text returned by Error.
	msg string
	// exited is true when the process stopped running before its ports
	// opened; runHTTPProbeBatch returns it as its retryPerItem value.
	exited bool
}

// Error implements the error interface.
func (e *portsReadyError) Error() string { return e.msg }
  290. // waitForPortsReady polls until every test inbound accepts connections,
  291. // aborting as soon as the process exits.
  292. func waitForPortsReady(proc batchProcess, ports []int, timeout time.Duration) *portsReadyError {
  293. deadline := time.Now().Add(timeout)
  294. for _, port := range ports {
  295. for {
  296. if !proc.IsRunning() {
  297. return &portsReadyError{msg: "Xray process exited: " + proc.GetResult(), exited: true}
  298. }
  299. conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 100*time.Millisecond)
  300. if err == nil {
  301. conn.Close()
  302. break
  303. }
  304. if time.Now().After(deadline) {
  305. return &portsReadyError{msg: fmt.Sprintf("Xray failed to open test inbounds: port %d not ready after %v", port, timeout)}
  306. }
  307. time.Sleep(50 * time.Millisecond)
  308. }
  309. }
  310. return nil
  311. }
  312. // buildBatchTestConfig assembles the temp instance config: one loopback SOCKS
  313. // inbound per tested outbound, a routing rule binding each inbound to its
  314. // outbound tag, and the full outbound context so dialerProxy chains resolve.
  315. func buildBatchTestConfig(items []*httpBatchItem, allOutbounds []any, ports []int) *xray.Config {
  316. // allOutbounds is the template's outbound list; subscription outbounds
  317. // are injected at runtime and aren't part of it, so append any tested
  318. // outbound whose tag is missing. When a tested outbound's tag collides
  319. // with a template outbound, the template version wins — same semantics
  320. // as the pre-batch tester.
  321. outbounds := make([]any, 0, len(allOutbounds)+len(items))
  322. outbounds = append(outbounds, allOutbounds...)
  323. for _, it := range items {
  324. if !outboundsContainTag(outbounds, it.tag) {
  325. outbounds = append(outbounds, it.outbound)
  326. }
  327. }
  328. for _, ob := range outbounds {
  329. outbound, ok := ob.(map[string]any)
  330. if !ok {
  331. continue
  332. }
  333. // The temp instance must not touch kernel WireGuard devices.
  334. if protocol, ok := outbound["protocol"].(string); ok && protocol == "wireguard" {
  335. if settings, ok := outbound["settings"].(map[string]any); ok {
  336. settings["noKernelTun"] = true
  337. } else {
  338. outbound["settings"] = map[string]any{"noKernelTun": true}
  339. }
  340. }
  341. }
  342. outboundsJSON, _ := json.Marshal(outbounds)
  343. inbounds := make([]xray.InboundConfig, len(items))
  344. rules := make([]any, len(items))
  345. for i, it := range items {
  346. inTag := fmt.Sprintf("test-in-%d", i)
  347. inbounds[i] = xray.InboundConfig{
  348. Listen: json_util.RawMessage(`"127.0.0.1"`),
  349. Port: ports[i],
  350. Protocol: "socks",
  351. Settings: json_util.RawMessage(`{"auth":"noauth","udp":false}`),
  352. Tag: inTag,
  353. }
  354. rules[i] = map[string]any{
  355. "type": "field",
  356. "inboundTag": []string{inTag},
  357. "outboundTag": it.tag,
  358. }
  359. }
  360. routingJSON, _ := json.Marshal(map[string]any{
  361. "domainStrategy": "AsIs",
  362. "rules": rules,
  363. })
  364. logJSON, _ := json.Marshal(map[string]any{
  365. "loglevel": "warning",
  366. "access": "none",
  367. "error": "",
  368. "dnsLog": false,
  369. })
  370. return &xray.Config{
  371. LogConfig: json_util.RawMessage(logJSON),
  372. InboundConfigs: inbounds,
  373. OutboundConfigs: json_util.RawMessage(outboundsJSON),
  374. RouterConfig: json_util.RawMessage(routingJSON),
  375. Policy: json_util.RawMessage(`{}`),
  376. Stats: json_util.RawMessage(`{}`),
  377. }
  378. }
  379. // outboundsContainTag reports whether any outbound in the slice has the given tag.
  380. func outboundsContainTag(outbounds []any, tag string) bool {
  381. for _, ob := range outbounds {
  382. if m, ok := ob.(map[string]any); ok {
  383. if t, _ := m["tag"].(string); t == tag {
  384. return true
  385. }
  386. }
  387. }
  388. return false
  389. }
  390. // probeThroughSocks issues one timed GET through the local SOCKS inbound at
  391. // the given port and fills result. Any HTTP response — including 4xx/5xx and
  392. // unfollowed redirects — counts as reachable; only transport-level failures
  393. // (refused, reset, timeout, proxy errors) are failures. Delay is request
  394. // start → response headers; the test URL's hostname is resolved by xray
  395. // (Go's SOCKS5 client sends the domain to the proxy), so DNS goes through
  396. // the outbound too.
  397. func probeThroughSocks(port int, testURL string, timeout time.Duration, result *TestOutboundResult) {
  398. proxyURL := &url.URL{Scheme: "socks5", Host: net.JoinHostPort("127.0.0.1", strconv.Itoa(port))}
  399. tr := &http.Transport{
  400. Proxy: http.ProxyURL(proxyURL),
  401. DisableKeepAlives: true,
  402. }
  403. defer tr.CloseIdleConnections()
  404. client := &http.Client{
  405. Transport: tr,
  406. Timeout: timeout,
  407. // A redirect would re-dial through the proxy and skew the timing;
  408. // the 3xx itself already proves the outbound works.
  409. CheckRedirect: func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse },
  410. }
  411. // Timing breakdown. ConnectStart/Done wrap the TCP dial to the local
  412. // inbound (the SOCKS handshake isn't traced, and xray ACKs CONNECT
  413. // before dialing upstream — so the real outbound establishment lands in
  414. // the TLS phase for https URLs, or inside TTFB for plain http).
  415. var (
  416. connStart, tlsStart time.Time
  417. connDur, tlsDur, ttfbDur time.Duration
  418. connDone, tlsDone, gotFirstRB bool
  419. )
  420. start := time.Now()
  421. trace := &httptrace.ClientTrace{
  422. ConnectStart: func(network, addr string) {
  423. if connStart.IsZero() {
  424. connStart = time.Now()
  425. }
  426. },
  427. ConnectDone: func(network, addr string, err error) {
  428. if err == nil && !connDone && !connStart.IsZero() {
  429. connDone = true
  430. connDur = time.Since(connStart)
  431. }
  432. },
  433. TLSHandshakeStart: func() {
  434. if tlsStart.IsZero() {
  435. tlsStart = time.Now()
  436. }
  437. },
  438. TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
  439. if err == nil && !tlsDone && !tlsStart.IsZero() {
  440. tlsDone = true
  441. tlsDur = time.Since(tlsStart)
  442. }
  443. },
  444. GotFirstResponseByte: func() {
  445. if !gotFirstRB {
  446. gotFirstRB = true
  447. ttfbDur = time.Since(start)
  448. }
  449. },
  450. }
  451. req, err := http.NewRequestWithContext(httptrace.WithClientTrace(context.Background(), trace), http.MethodGet, testURL, nil)
  452. if err != nil {
  453. result.Error = err.Error()
  454. return
  455. }
  456. resp, err := client.Do(req)
  457. delay := time.Since(start).Milliseconds()
  458. if err != nil {
  459. result.Error = err.Error()
  460. return
  461. }
  462. resp.Body.Close()
  463. result.Success = true
  464. result.Delay = max(delay, 1)
  465. result.HTTPStatus = resp.StatusCode
  466. if connDone {
  467. result.ConnectMs = max(connDur.Milliseconds(), 1)
  468. }
  469. if tlsDone {
  470. result.TLSMs = max(tlsDur.Milliseconds(), 1)
  471. }
  472. if gotFirstRB {
  473. result.TTFBMs = max(ttfbDur.Milliseconds(), 1)
  474. }
  475. }
  476. // reserveLoopbackPorts grabs n free loopback ports and keeps the listeners
  477. // open so nothing else claims them; release() frees them (idempotent — the
  478. // caller releases right before starting xray and again via defer).
  479. func reserveLoopbackPorts(n int) ([]int, func(), error) {
  480. listeners := make([]net.Listener, 0, n)
  481. release := func() {
  482. for _, l := range listeners {
  483. l.Close()
  484. }
  485. }
  486. ports := make([]int, 0, n)
  487. for range n {
  488. l, err := net.Listen("tcp", "127.0.0.1:0")
  489. if err != nil {
  490. release()
  491. return nil, nil, err
  492. }
  493. listeners = append(listeners, l)
  494. ports = append(ports, l.Addr().(*net.TCPAddr).Port)
  495. }
  496. return ports, release, nil
  497. }
  498. // createTestConfigPath returns a unique path for a temporary xray config file in the bin folder.
  499. // The temp file is created and closed so the path is reserved; Start() will overwrite it.
  500. func createTestConfigPath() (string, error) {
  501. tmpFile, err := os.CreateTemp(config.GetBinFolderPath(), "xray_test_*.json")
  502. if err != nil {
  503. return "", err
  504. }
  505. path := tmpFile.Name()
  506. if err := tmpFile.Close(); err != nil {
  507. os.Remove(path)
  508. return "", err
  509. }
  510. return path, nil
  511. }