| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576 |
- package service
- import (
- "context"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/http/httptrace"
- "net/url"
- "os"
- "strconv"
- "sync"
- "time"
- "github.com/mhsanaei/3x-ui/v3/config"
- "github.com/mhsanaei/3x-ui/v3/database"
- "github.com/mhsanaei/3x-ui/v3/database/model"
- "github.com/mhsanaei/3x-ui/v3/logger"
- "github.com/mhsanaei/3x-ui/v3/util/json_util"
- "github.com/mhsanaei/3x-ui/v3/xray"
- "gorm.io/gorm"
- )
// OutboundService provides business logic for managing Xray outbound configurations.
// It records and resets per-outbound traffic counters and runs connectivity
// probes (plain TCP dial, or an HTTP request routed through a temporary xray
// instance) against outbound definitions.
//
// The type has no fields, so its zero value is ready to use.
type OutboundService struct{}
// httpTestSemaphore serialises HTTP-mode probes (each one spawns a temp xray
// instance, which is too expensive to run in parallel). TCP-mode probes are
// dial-only and don't need the semaphore.
//
// It is acquired with TryLock, so a probe that arrives while another one is
// running is rejected immediately instead of queueing behind it.
var httpTestSemaphore sync.Mutex
- func (s *OutboundService) AddTraffic(traffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (error, bool) {
- var err error
- db := database.GetDB()
- tx := db.Begin()
- defer func() {
- if err != nil {
- tx.Rollback()
- } else {
- tx.Commit()
- }
- }()
- err = s.addOutboundTraffic(tx, traffics)
- if err != nil {
- return err, false
- }
- return nil, false
- }
- func (s *OutboundService) addOutboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
- if len(traffics) == 0 {
- return nil
- }
- var err error
- for _, traffic := range traffics {
- if traffic.IsOutbound {
- var outbound model.OutboundTraffics
- err = tx.Model(&model.OutboundTraffics{}).Where("tag = ?", traffic.Tag).
- FirstOrCreate(&outbound).Error
- if err != nil {
- return err
- }
- outbound.Tag = traffic.Tag
- outbound.Up = outbound.Up + traffic.Up
- outbound.Down = outbound.Down + traffic.Down
- outbound.Total = outbound.Up + outbound.Down
- err = tx.Save(&outbound).Error
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- func (s *OutboundService) GetOutboundsTraffic() ([]*model.OutboundTraffics, error) {
- db := database.GetDB()
- var traffics []*model.OutboundTraffics
- err := db.Model(model.OutboundTraffics{}).Find(&traffics).Error
- if err != nil {
- logger.Warning("Error retrieving OutboundTraffics: ", err)
- return nil, err
- }
- return traffics, nil
- }
- func (s *OutboundService) ResetOutboundTraffic(tag string) error {
- db := database.GetDB()
- whereText := "tag "
- if tag == "-alltags-" {
- whereText += " <> ?"
- } else {
- whereText += " = ?"
- }
- result := db.Model(model.OutboundTraffics{}).
- Where(whereText, tag).
- Updates(map[string]any{"up": 0, "down": 0, "total": 0})
- err := result.Error
- if err != nil {
- return err
- }
- return nil
- }
// TestOutboundResult represents the result of testing an outbound.
// Delay/timing fields are in milliseconds. Endpoints is only populated for
// TCP-mode probes; the HTTP-mode timing breakdown lives in DNSMs/ConnectMs/
// TLSMs/TTFBMs (any of these can be 0 if the underlying step was skipped —
// e.g. a non-TLS target leaves TLSMs at 0).
//
// Apart from Success and Delay, zero-valued fields are omitted from the JSON
// encoding.
type TestOutboundResult struct {
	// Success is true when the probe succeeded: in TCP mode at least one
	// endpoint accepted the dial, in HTTP mode the measured request got a
	// response.
	Success bool `json:"success"`

	// Delay is the headline latency: the lowest dial time among reachable
	// endpoints in TCP mode, or the duration of the measured request in
	// HTTP mode.
	Delay int64 `json:"delay"`

	// Error describes why the probe failed; it is left empty on success.
	Error string `json:"error,omitempty"`

	// StatusCode is the HTTP status of the measured response (HTTP mode).
	StatusCode int `json:"statusCode,omitempty"`

	// Mode records which probe produced the result: "tcp" or "http".
	Mode string `json:"mode,omitempty"`

	// DNSMs is the time between the DNS start and done trace hooks.
	DNSMs int64 `json:"dnsMs,omitempty"`

	// ConnectMs is the time between the connect start and done trace hooks.
	ConnectMs int64 `json:"connectMs,omitempty"`

	// TLSMs is the time between the TLS handshake start and done trace hooks.
	TLSMs int64 `json:"tlsMs,omitempty"`

	// TTFBMs is the time from sending the measured request until the first
	// response byte was seen.
	TTFBMs int64 `json:"ttfbMs,omitempty"`

	// Endpoints holds the per-endpoint dial outcomes of a TCP-mode probe.
	Endpoints []TestEndpointResult `json:"endpoints,omitempty"`
}
// TestEndpointResult is one entry in a TCP-mode probe — the per-endpoint
// dial outcome for outbounds that expose multiple servers/peers.
type TestEndpointResult struct {
	// Address is the endpoint exactly as it was passed to the dialer.
	Address string `json:"address"`

	// Success is true when the TCP connection was established.
	Success bool `json:"success"`

	// Delay is the duration of the dial attempt in milliseconds; it is
	// recorded for failed attempts as well.
	Delay int64 `json:"delay"`

	// Error carries the dial error text; it is empty (and omitted from JSON)
	// on success.
	Error string `json:"error,omitempty"`
}
- // TestOutbound dispatches to the chosen probe mode:
- // - mode="tcp": dial the outbound's host:port directly. No xray spin-up,
- // parallel-safe, ~100ms per endpoint. Doesn't validate the proxy
- // protocol — only that the remote is reachable on TCP.
- // - mode="" or "http": spin a temp xray instance, route a real HTTP
- // request through it, return delay + a DNS/Connect/TLS/TTFB breakdown.
- // Authoritative but expensive and serialised by httpTestSemaphore.
- //
- // allOutboundsJSON is only consulted in HTTP mode (it backs
- // sockopt.dialerProxy chains during test).
- func (s *OutboundService) TestOutbound(outboundJSON string, testURL string, allOutboundsJSON string, mode string) (*TestOutboundResult, error) {
- if mode == "tcp" {
- return s.testOutboundTCP(outboundJSON)
- }
- return s.testOutboundHTTP(outboundJSON, testURL, allOutboundsJSON)
- }
- func (s *OutboundService) testOutboundTCP(outboundJSON string) (*TestOutboundResult, error) {
- var ob map[string]any
- if err := json.Unmarshal([]byte(outboundJSON), &ob); err != nil {
- return &TestOutboundResult{Mode: "tcp", Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}, nil
- }
- tag, _ := ob["tag"].(string)
- protocol, _ := ob["protocol"].(string)
- if protocol == "blackhole" || protocol == "freedom" || tag == "blocked" {
- return &TestOutboundResult{Mode: "tcp", Success: false, Error: "Outbound has no testable endpoint"}, nil
- }
- endpoints := extractOutboundEndpoints(ob)
- if len(endpoints) == 0 {
- return &TestOutboundResult{Mode: "tcp", Success: false, Error: "No testable endpoint"}, nil
- }
- results := make([]TestEndpointResult, len(endpoints))
- var wg sync.WaitGroup
- for i := range endpoints {
- wg.Add(1)
- go func(i int) {
- defer wg.Done()
- results[i] = probeTCPEndpoint(endpoints[i], 5*time.Second)
- }(i)
- }
- wg.Wait()
- var bestDelay int64 = -1
- var firstErr string
- for _, r := range results {
- if r.Success {
- if bestDelay < 0 || r.Delay < bestDelay {
- bestDelay = r.Delay
- }
- } else if firstErr == "" {
- firstErr = r.Error
- }
- }
- out := &TestOutboundResult{Mode: "tcp", Endpoints: results}
- if bestDelay >= 0 {
- out.Success = true
- out.Delay = bestDelay
- } else {
- out.Error = firstErr
- if out.Error == "" {
- out.Error = "All endpoints unreachable"
- }
- }
- return out, nil
- }
- func probeTCPEndpoint(endpoint string, timeout time.Duration) TestEndpointResult {
- r := TestEndpointResult{Address: endpoint}
- start := time.Now()
- conn, err := net.DialTimeout("tcp", endpoint, timeout)
- r.Delay = time.Since(start).Milliseconds()
- if err != nil {
- r.Error = err.Error()
- return r
- }
- conn.Close()
- r.Success = true
- return r
- }
- func extractOutboundEndpoints(ob map[string]any) []string {
- protocol, _ := ob["protocol"].(string)
- settings, _ := ob["settings"].(map[string]any)
- if settings == nil {
- return nil
- }
- var out []string
- addServer := func(addr any, port any) {
- host, _ := addr.(string)
- p := numAsInt(port)
- if host != "" && p > 0 {
- out = append(out, fmt.Sprintf("%s:%d", host, p))
- }
- }
- switch protocol {
- case "vmess":
- if vnext, ok := settings["vnext"].([]any); ok {
- for _, v := range vnext {
- if vm, ok := v.(map[string]any); ok {
- addServer(vm["address"], vm["port"])
- }
- }
- }
- case "vless":
- addServer(settings["address"], settings["port"])
- case "trojan", "shadowsocks", "http", "socks":
- if servers, ok := settings["servers"].([]any); ok {
- for _, sv := range servers {
- if sm, ok := sv.(map[string]any); ok {
- addServer(sm["address"], sm["port"])
- }
- }
- }
- case "wireguard":
- if peers, ok := settings["peers"].([]any); ok {
- for _, p := range peers {
- if pm, ok := p.(map[string]any); ok {
- if ep, _ := pm["endpoint"].(string); ep != "" {
- out = append(out, ep)
- }
- }
- }
- }
- }
- return out
- }
// numAsInt converts a loosely typed numeric value to int.
//
// float64, int and int64 values are converted directly (floats are truncated
// toward zero); strings are parsed as base-10 integers. Unparsable strings and
// every other type, including nil, yield 0.
func numAsInt(v any) int {
	if text, isString := v.(string); isString {
		parsed, err := strconv.Atoi(text)
		if err != nil {
			return 0
		}
		return parsed
	}

	switch num := v.(type) {
	case int:
		return num
	case int64:
		return int(num)
	case float64:
		return int(num)
	default:
		return 0
	}
}
- func (s *OutboundService) testOutboundHTTP(outboundJSON string, testURL string, allOutboundsJSON string) (*TestOutboundResult, error) {
- if testURL == "" {
- testURL = "https://www.google.com/generate_204"
- }
- if !httpTestSemaphore.TryLock() {
- return &TestOutboundResult{
- Mode: "http",
- Success: false,
- Error: "Another outbound test is already running, please wait",
- }, nil
- }
- defer httpTestSemaphore.Unlock()
- var testOutbound map[string]any
- if err := json.Unmarshal([]byte(outboundJSON), &testOutbound); err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Invalid outbound JSON: %v", err)}, nil
- }
- outboundTag, _ := testOutbound["tag"].(string)
- if outboundTag == "" {
- return &TestOutboundResult{Mode: "http", Success: false, Error: "Outbound has no tag"}, nil
- }
- if protocol, _ := testOutbound["protocol"].(string); protocol == "blackhole" || outboundTag == "blocked" {
- return &TestOutboundResult{Mode: "http", Success: false, Error: "Blocked/blackhole outbound cannot be tested"}, nil
- }
- var allOutbounds []any
- if allOutboundsJSON != "" {
- if err := json.Unmarshal([]byte(allOutboundsJSON), &allOutbounds); err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Invalid allOutbounds JSON: %v", err)}, nil
- }
- }
- if len(allOutbounds) == 0 {
- allOutbounds = []any{testOutbound}
- }
- testPort, err := findAvailablePort()
- if err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Failed to find available port: %v", err)}, nil
- }
- testConfig := s.createTestConfig(outboundTag, allOutbounds, testPort)
- testConfigPath, err := createTestConfigPath()
- if err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Failed to create test config path: %v", err)}, nil
- }
- defer os.Remove(testConfigPath)
- testProcess := xray.NewTestProcess(testConfig, testConfigPath)
- defer func() {
- if testProcess.IsRunning() {
- testProcess.Stop()
- }
- }()
- if err := testProcess.Start(); err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Failed to start test xray instance: %v", err)}, nil
- }
- if err := waitForPort(testPort, 3*time.Second); err != nil {
- if !testProcess.IsRunning() {
- result := testProcess.GetResult()
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray process exited: %s", result)}, nil
- }
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray failed to start listening: %v", err)}, nil
- }
- if !testProcess.IsRunning() {
- result := testProcess.GetResult()
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Xray process exited: %s", result)}, nil
- }
- return s.testConnection(testPort, testURL)
- }
- // createTestConfig creates a test config by copying all outbounds unchanged and adding
- // only the test inbound (SOCKS) and a route rule that sends traffic to the given outbound tag.
- func (s *OutboundService) createTestConfig(outboundTag string, allOutbounds []any, testPort int) *xray.Config {
- // Test inbound (SOCKS proxy) - only addition to inbounds
- testInbound := xray.InboundConfig{
- Tag: "test-inbound",
- Listen: json_util.RawMessage(`"127.0.0.1"`),
- Port: testPort,
- Protocol: "socks",
- Settings: json_util.RawMessage(`{"auth":"noauth","udp":true}`),
- }
- // Outbounds: copy all, but set noKernelTun=true for WireGuard outbounds
- processedOutbounds := make([]any, len(allOutbounds))
- for i, ob := range allOutbounds {
- outbound, ok := ob.(map[string]any)
- if !ok {
- processedOutbounds[i] = ob
- continue
- }
- if protocol, ok := outbound["protocol"].(string); ok && protocol == "wireguard" {
- // Set noKernelTun to true for WireGuard outbounds
- if settings, ok := outbound["settings"].(map[string]any); ok {
- settings["noKernelTun"] = true
- } else {
- // Create settings if it doesn't exist
- outbound["settings"] = map[string]any{
- "noKernelTun": true,
- }
- }
- }
- processedOutbounds[i] = outbound
- }
- outboundsJSON, _ := json.Marshal(processedOutbounds)
- // Create routing rule to route all traffic through test outbound
- routingRules := []map[string]any{
- {
- "type": "field",
- "outboundTag": outboundTag,
- "network": "tcp,udp",
- },
- }
- routingJSON, _ := json.Marshal(map[string]any{
- "domainStrategy": "AsIs",
- "rules": routingRules,
- })
- // Disable logging for test process to avoid creating orphaned log files
- logConfig := map[string]any{
- "loglevel": "warning",
- "access": "none",
- "error": "none",
- "dnsLog": false,
- }
- logJSON, _ := json.Marshal(logConfig)
- // Create minimal config
- cfg := &xray.Config{
- LogConfig: json_util.RawMessage(logJSON),
- InboundConfigs: []xray.InboundConfig{
- testInbound,
- },
- OutboundConfigs: json_util.RawMessage(string(outboundsJSON)),
- RouterConfig: json_util.RawMessage(string(routingJSON)),
- Policy: json_util.RawMessage(`{}`),
- Stats: json_util.RawMessage(`{}`),
- }
- return cfg
- }
- // testConnection runs the actual HTTP probe through the local SOCKS proxy.
- // A warmup request seeds xray's DNS cache / handshake; then a fresh
- // transport runs the measured request so httptrace sees a real cold
- // connection and reports DNS/Connect/TLS/TTFB. Note that DNS and Connect
- // reflect *client → SOCKS-on-loopback*, not the remote target — those
- // happen inside xray and aren't visible to net/http. TLS and TTFB are
- // the meaningful breakdown values for a SOCKS-proxied HTTPS probe.
- func (s *OutboundService) testConnection(proxyPort int, testURL string) (*TestOutboundResult, error) {
- proxyURLStr := fmt.Sprintf("socks5://127.0.0.1:%d", proxyPort)
- proxyURLParsed, err := url.Parse(proxyURLStr)
- if err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Invalid proxy URL: %v", err)}, nil
- }
- mkClient := func() *http.Client {
- return &http.Client{
- Timeout: 10 * time.Second,
- Transport: &http.Transport{
- Proxy: http.ProxyURL(proxyURLParsed),
- DialContext: (&net.Dialer{
- Timeout: 5 * time.Second,
- KeepAlive: 30 * time.Second,
- }).DialContext,
- MaxIdleConns: 1,
- IdleConnTimeout: 1 * time.Second,
- DisableCompression: true,
- },
- }
- }
- warmup := mkClient()
- warmupResp, err := warmup.Get(testURL)
- if err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Request failed: %v", err)}, nil
- }
- io.Copy(io.Discard, warmupResp.Body)
- warmupResp.Body.Close()
- warmup.CloseIdleConnections()
- var dnsStart, dnsDone, connectStart, connectDone, tlsStart, tlsDone, firstByte time.Time
- trace := &httptrace.ClientTrace{
- DNSStart: func(_ httptrace.DNSStartInfo) { dnsStart = time.Now() },
- DNSDone: func(_ httptrace.DNSDoneInfo) { dnsDone = time.Now() },
- ConnectStart: func(_, _ string) { connectStart = time.Now() },
- ConnectDone: func(_, _ string, _ error) { connectDone = time.Now() },
- TLSHandshakeStart: func() { tlsStart = time.Now() },
- TLSHandshakeDone: func(_ tls.ConnectionState, _ error) { tlsDone = time.Now() },
- GotFirstResponseByte: func() { firstByte = time.Now() },
- }
- client := mkClient()
- defer client.CloseIdleConnections()
- ctx := httptrace.WithClientTrace(context.Background(), trace)
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, testURL, nil)
- if err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Request build failed: %v", err)}, nil
- }
- startTime := time.Now()
- resp, err := client.Do(req)
- delay := time.Since(startTime).Milliseconds()
- if err != nil {
- return &TestOutboundResult{Mode: "http", Success: false, Error: fmt.Sprintf("Request failed: %v", err)}, nil
- }
- io.Copy(io.Discard, resp.Body)
- resp.Body.Close()
- out := &TestOutboundResult{
- Mode: "http",
- Success: true,
- Delay: delay,
- StatusCode: resp.StatusCode,
- }
- if !dnsStart.IsZero() && !dnsDone.IsZero() {
- out.DNSMs = dnsDone.Sub(dnsStart).Milliseconds()
- }
- if !connectStart.IsZero() && !connectDone.IsZero() {
- out.ConnectMs = connectDone.Sub(connectStart).Milliseconds()
- }
- if !tlsStart.IsZero() && !tlsDone.IsZero() {
- out.TLSMs = tlsDone.Sub(tlsStart).Milliseconds()
- }
- if !firstByte.IsZero() {
- out.TTFBMs = firstByte.Sub(startTime).Milliseconds()
- }
- return out, nil
- }
// waitForPort repeatedly dials 127.0.0.1:port until a connection succeeds or
// timeout has elapsed.
//
// Each attempt has a 100ms dial timeout and failed attempts are followed by a
// 50ms pause. It returns nil as soon as a dial succeeds, otherwise an error
// naming the port and the timeout. With a non-positive timeout no dial is
// attempted at all.
func waitForPort(port int, timeout time.Duration) error {
	target := fmt.Sprintf("127.0.0.1:%d", port)
	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(50 * time.Millisecond) {
		conn, err := net.DialTimeout("tcp", target, 100*time.Millisecond)
		if err != nil {
			continue
		}
		conn.Close()
		return nil
	}
	return fmt.Errorf("port %d not ready after %v", port, timeout)
}
// findAvailablePort asks the operating system for a currently free TCP port
// and returns its number.
//
// The probe listener is bound to 127.0.0.1 — the same address the test inbound
// listens on and waitForPort dials — rather than to the wildcard address, so
// the check matches how the port is used afterwards. The listener is closed
// before returning, which means the port is free only at the moment of the
// call; another process could still claim it before the caller binds it.
//
// An error is returned when listening fails or the listener reports an
// address that is not a TCP address.
func findAvailablePort() (int, error) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer listener.Close()

	addr, ok := listener.Addr().(*net.TCPAddr)
	if !ok {
		return 0, fmt.Errorf("unexpected listener address type %T", listener.Addr())
	}
	return addr.Port, nil
}
- // createTestConfigPath returns a unique path for a temporary xray config file in the bin folder.
- // The temp file is created and closed so the path is reserved; Start() will overwrite it.
- func createTestConfigPath() (string, error) {
- tmpFile, err := os.CreateTemp(config.GetBinFolderPath(), "xray_test_*.json")
- if err != nil {
- return "", err
- }
- path := tmpFile.Name()
- if err := tmpFile.Close(); err != nil {
- os.Remove(path)
- return "", err
- }
- return path, nil
- }
|