remote.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763
  1. package runtime
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  17. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/netsafe"
  19. "github.com/mhsanaei/3x-ui/v3/internal/util/wirecodec"
  20. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  21. )
  22. const remoteHTTPTimeout = 10 * time.Second
  23. // zstdMinBodyBytes is the smallest body worth compressing; below it the framing
  24. // overhead can outweigh the savings.
  25. const zstdMinBodyBytes = 1024
  26. // maxRemoteResponseBytes caps a single node RPC's response body. It bounds the
  27. // wire/decompressed size of one response — the real guard against a broken or
  28. // hostile node streaming an unbounded body. It is NOT a process-wide memory
  29. // bound: concurrent RPCs and the decoded JSON can each exceed it, so
  30. // endpoint-specific caps and a concurrency budget remain follow-ups. Node
  31. // responses (traffic snapshots, client-IP lists, inbound options) are JSON and
  32. // stay well under it.
  33. const maxRemoteResponseBytes = 64 << 20 // 64 MiB
  34. // errBodyDiagBytes bounds how much of a non-OK error body we read for a
  35. // diagnostic snippet (and to let small-error connections be reused) without
  36. // buffering a potentially huge or hostile error payload.
  37. const errBodyDiagBytes = 8 << 10 // 8 KiB
  38. // errRemoteResponseTooLarge is returned when a node response exceeds the cap.
  39. var errRemoteResponseTooLarge = errors.New("remote response exceeds size limit")
  40. // readCappedBody reads all of r but rejects bodies larger than limit, returning
  41. // errRemoteResponseTooLarge. It reads at most limit+1 bytes so a body of exactly
  42. // limit is accepted and the first oversize byte is detected without buffering
  43. // more.
  44. func readCappedBody(r io.Reader, limit int64) ([]byte, error) {
  45. raw, err := io.ReadAll(io.LimitReader(r, limit+1))
  46. if err != nil {
  47. return nil, err
  48. }
  49. if int64(len(raw)) > limit {
  50. return nil, errRemoteResponseTooLarge
  51. }
  52. return raw, nil
  53. }
  54. type envelope struct {
  55. Success bool `json:"success"`
  56. Msg string `json:"msg"`
  57. Obj json.RawMessage `json:"obj"`
  58. }
  59. type Remote struct {
  60. node *model.Node
  61. mu sync.RWMutex
  62. remoteIDByTag map[string]int
  63. // supportsZstd is learned from the node's X-3x-Node-Caps response header; once
  64. // seen, config pushes to this node are zstd-compressed. Old nodes never set
  65. // it, so they keep receiving plain bodies (mixed-version safe).
  66. supportsZstd bool
  67. // Per-node client honoring the TLS verify mode, built once and reused; a
  68. // node config change drops the cached Remote so the next one rebuilds it.
  69. clientOnce sync.Once
  70. client *http.Client
  71. clientErr error
  72. egressResolver NodeEgressResolver
  73. }
  74. type RemoteInboundOption struct {
  75. Tag string `json:"tag"`
  76. Remark string `json:"remark"`
  77. Protocol model.Protocol `json:"protocol"`
  78. Port int `json:"port"`
  79. }
  80. func NewRemote(n *model.Node, r NodeEgressResolver) *Remote {
  81. return &Remote{
  82. node: n,
  83. remoteIDByTag: make(map[string]int),
  84. egressResolver: r,
  85. }
  86. }
  87. func (r *Remote) Name() string { return "node:" + r.node.Name }
  88. func (r *Remote) nodeSupportsZstd() bool {
  89. r.mu.RLock()
  90. defer r.mu.RUnlock()
  91. return r.supportsZstd
  92. }
  93. // recordCaps learns the node's capabilities from a response header so later
  94. // pushes can use the negotiated envelope.
  95. func (r *Remote) recordCaps(h http.Header) {
  96. if !strings.Contains(h.Get(wirecodec.CapsHeader), wirecodec.CapZstd) {
  97. return
  98. }
  99. r.mu.Lock()
  100. r.supportsZstd = true
  101. r.mu.Unlock()
  102. }
  103. // httpClient lazily builds and caches the per-node client honoring the TLS
  104. // verify mode, so Remote ops don't fall back to system CA on skip/pin (#5264).
  105. func (r *Remote) httpClient() (*http.Client, error) {
  106. r.clientOnce.Do(func() {
  107. proxyURL := ""
  108. if r.node.OutboundTag != "" && r.egressResolver != nil {
  109. proxyURL = r.egressResolver.NodeEgressProxyURL(r.node.Id)
  110. }
  111. r.client, r.clientErr = HTTPClientForNode(r.node, proxyURL)
  112. })
  113. return r.client, r.clientErr
  114. }
  115. func (r *Remote) baseURL() (string, error) {
  116. addr, err := netsafe.NormalizeHost(r.node.Address)
  117. if err != nil {
  118. return "", err
  119. }
  120. scheme := r.node.Scheme
  121. if scheme != "http" && scheme != "https" {
  122. scheme = "https"
  123. }
  124. if r.node.Port <= 0 || r.node.Port > 65535 {
  125. return "", fmt.Errorf("invalid node port %d", r.node.Port)
  126. }
  127. bp := r.node.BasePath
  128. if !strings.HasSuffix(bp, "/") {
  129. bp += "/"
  130. }
  131. u := &url.URL{
  132. Scheme: scheme,
  133. Host: net.JoinHostPort(addr, strconv.Itoa(r.node.Port)),
  134. Path: bp,
  135. }
  136. return u.String(), nil
  137. }
  138. func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelope, error) {
  139. // mtls nodes authenticate via the client certificate, so a bearer token is
  140. // optional for them; every other mode still requires one.
  141. if r.node.ApiToken == "" && r.node.TlsVerifyMode != "mtls" {
  142. return nil, errors.New("node has no API token configured")
  143. }
  144. base, err := r.baseURL()
  145. if err != nil {
  146. return nil, err
  147. }
  148. target := base + strings.TrimPrefix(path, "/")
  149. var (
  150. bodyBytes []byte
  151. contentType string
  152. )
  153. switch b := body.(type) {
  154. case nil:
  155. case url.Values:
  156. bodyBytes = []byte(b.Encode())
  157. contentType = "application/x-www-form-urlencoded"
  158. default:
  159. buf, jerr := json.Marshal(b)
  160. if jerr != nil {
  161. return nil, fmt.Errorf("marshal body: %w", jerr)
  162. }
  163. bodyBytes = buf
  164. contentType = "application/json"
  165. }
  166. // Attach the integrity hash of the uncompressed body unconditionally (a new
  167. // node verifies it, an old one ignores it), and zstd-compress only when the
  168. // node advertised support and the body is worth it.
  169. var (
  170. reqBody io.Reader
  171. hashHex string
  172. zstdEncoded bool
  173. )
  174. if bodyBytes != nil {
  175. hashHex = wirecodec.Sha256Hex(bodyBytes)
  176. if len(bodyBytes) >= zstdMinBodyBytes && r.nodeSupportsZstd() {
  177. bodyBytes = wirecodec.Compress(bodyBytes)
  178. zstdEncoded = true
  179. }
  180. reqBody = bytes.NewReader(bodyBytes)
  181. }
  182. cctx, cancel := context.WithTimeout(netsafe.ContextWithAllowPrivate(ctx, r.node.AllowPrivateAddress), remoteHTTPTimeout)
  183. defer cancel()
  184. req, err := http.NewRequestWithContext(cctx, method, target, reqBody)
  185. if err != nil {
  186. return nil, err
  187. }
  188. if r.node.ApiToken != "" {
  189. req.Header.Set("Authorization", "Bearer "+r.node.ApiToken)
  190. }
  191. req.Header.Set("Accept", "application/json")
  192. if contentType != "" {
  193. req.Header.Set("Content-Type", contentType)
  194. }
  195. if hashHex != "" {
  196. req.Header.Set(wirecodec.HashHeader, hashHex)
  197. }
  198. if zstdEncoded {
  199. req.Header.Set("Content-Encoding", wirecodec.EncodingZstd)
  200. }
  201. client, err := r.httpClient()
  202. if err != nil {
  203. return nil, err
  204. }
  205. resp, err := client.Do(req)
  206. if err != nil {
  207. return nil, fmt.Errorf("%s %s: %w", method, path, err)
  208. }
  209. defer resp.Body.Close()
  210. r.recordCaps(resp.Header)
  211. // Validate status before reading a success payload: a non-OK response's
  212. // body is never used beyond a short diagnostic, so don't let a node force us
  213. // to buffer a large body just to return an HTTP error.
  214. if resp.StatusCode != http.StatusOK {
  215. snippet, _ := io.ReadAll(io.LimitReader(resp.Body, errBodyDiagBytes))
  216. if msg := bytes.TrimSpace(snippet); len(msg) > 0 {
  217. // %q quotes/escapes the untrusted node body so control characters or
  218. // newlines in it can't garble or inject into the error/log output.
  219. return nil, fmt.Errorf("%s %s: HTTP %d: %q", method, path, resp.StatusCode, msg)
  220. }
  221. return nil, fmt.Errorf("%s %s: HTTP %d", method, path, resp.StatusCode)
  222. }
  223. // Fast-fail on an honestly-declared oversize body; the LimitReader below is
  224. // the real guard since Content-Length is untrusted, may be absent, or is -1
  225. // under transparent decompression.
  226. if resp.ContentLength > maxRemoteResponseBytes {
  227. return nil, fmt.Errorf("%s %s: %w (content-length %d, cap %d)", method, path, errRemoteResponseTooLarge, resp.ContentLength, maxRemoteResponseBytes)
  228. }
  229. raw, err := readCappedBody(resp.Body, maxRemoteResponseBytes)
  230. if err != nil {
  231. if errors.Is(err, errRemoteResponseTooLarge) {
  232. return nil, fmt.Errorf("%s %s: %w (cap %d bytes)", method, path, err, maxRemoteResponseBytes)
  233. }
  234. return nil, fmt.Errorf("read body: %w", err)
  235. }
  236. var env envelope
  237. if err := json.Unmarshal(raw, &env); err != nil {
  238. return nil, fmt.Errorf("decode envelope: %w", err)
  239. }
  240. if !env.Success {
  241. return &env, fmt.Errorf("remote: %s", env.Msg)
  242. }
  243. return &env, nil
  244. }
  245. func (r *Remote) resolveRemoteID(ctx context.Context, tag string) (int, error) {
  246. if id, ok := r.cacheGetTag(tag); ok {
  247. return id, nil
  248. }
  249. if err := r.refreshRemoteIDs(ctx); err != nil {
  250. return 0, err
  251. }
  252. if id, ok := r.cacheGetTag(tag); ok {
  253. return id, nil
  254. }
  255. return 0, fmt.Errorf("remote inbound with tag %q not found on node %s", tag, r.node.Name)
  256. }
  257. // nodeInboundTagPrefix is the central-panel alias for an inbound on nodeID.
  258. // Kept in sync with service.nodeTagPrefix (port_conflict.go); duplicated here
  259. // so runtime does not import service.
  260. func nodeInboundTagPrefix(nodeID int) string {
  261. return fmt.Sprintf("n%d-", nodeID)
  262. }
  263. // stripNodeInboundTagPrefix removes the central-only n<id>- prefix before
  264. // pushing an inbound to the node so Xray keeps its original tag and routing.
  265. func stripNodeInboundTagPrefix(nodeID int, tag string) string {
  266. if stripped, ok := strings.CutPrefix(tag, nodeInboundTagPrefix(nodeID)); ok {
  267. return stripped
  268. }
  269. return tag
  270. }
  271. // cacheGetTag looks up a remote inbound id by tag, tolerating an n<id>- prefix
  272. // that lives on only one of the two panels: the node may carry the bare tag
  273. // while the central panel stores the prefixed form, or vice versa.
  274. func (r *Remote) cacheGetTag(tag string) (int, bool) {
  275. if id, ok := r.cacheGet(tag); ok {
  276. return id, true
  277. }
  278. prefix := nodeInboundTagPrefix(r.node.Id)
  279. if stripped, found := strings.CutPrefix(tag, prefix); found {
  280. return r.cacheGet(stripped)
  281. }
  282. return r.cacheGet(prefix + tag)
  283. }
  284. func (r *Remote) cacheGet(tag string) (int, bool) {
  285. r.mu.RLock()
  286. defer r.mu.RUnlock()
  287. id, ok := r.remoteIDByTag[tag]
  288. return id, ok
  289. }
  290. func (r *Remote) cacheSet(tag string, id int) {
  291. r.mu.Lock()
  292. defer r.mu.Unlock()
  293. r.remoteIDByTag[tag] = id
  294. }
  295. func (r *Remote) cacheDel(tag string) {
  296. r.mu.Lock()
  297. defer r.mu.Unlock()
  298. delete(r.remoteIDByTag, tag)
  299. }
  300. func (r *Remote) ListRemoteTags(ctx context.Context) ([]string, error) {
  301. if err := r.refreshRemoteIDs(ctx); err != nil {
  302. return nil, err
  303. }
  304. r.mu.RLock()
  305. defer r.mu.RUnlock()
  306. tags := make([]string, 0, len(r.remoteIDByTag))
  307. for tag := range r.remoteIDByTag {
  308. tags = append(tags, tag)
  309. }
  310. return tags, nil
  311. }
  312. func (r *Remote) ListInboundOptions(ctx context.Context) ([]RemoteInboundOption, error) {
  313. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  314. if err != nil {
  315. return nil, err
  316. }
  317. var list []RemoteInboundOption
  318. if err := json.Unmarshal(env.Obj, &list); err != nil {
  319. return nil, fmt.Errorf("decode inbound list: %w", err)
  320. }
  321. return list, nil
  322. }
  323. func (r *Remote) refreshRemoteIDs(ctx context.Context) error {
  324. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  325. if err != nil {
  326. return err
  327. }
  328. var list []struct {
  329. Id int `json:"id"`
  330. Tag string `json:"tag"`
  331. }
  332. if err := json.Unmarshal(env.Obj, &list); err != nil {
  333. return fmt.Errorf("decode inbound list: %w", err)
  334. }
  335. next := make(map[string]int, len(list))
  336. for _, ib := range list {
  337. if ib.Tag == "" {
  338. continue
  339. }
  340. next[ib.Tag] = ib.Id
  341. }
  342. r.mu.Lock()
  343. r.remoteIDByTag = next
  344. r.mu.Unlock()
  345. return nil
  346. }
  347. func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error {
  348. payload := wireInbound(ib, r.node.Id)
  349. env, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/add", payload)
  350. if err != nil {
  351. return err
  352. }
  353. var created struct {
  354. Id int `json:"id"`
  355. Tag string `json:"tag"`
  356. }
  357. if len(env.Obj) > 0 {
  358. if err := json.Unmarshal(env.Obj, &created); err == nil && created.Id > 0 && created.Tag != "" {
  359. r.cacheSet(created.Tag, created.Id)
  360. }
  361. }
  362. return nil
  363. }
  364. func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error {
  365. id, err := r.resolveRemoteID(ctx, ib.Tag)
  366. if err != nil {
  367. logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name)
  368. return nil
  369. }
  370. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/del/"+strconv.Itoa(id), nil); err != nil {
  371. return err
  372. }
  373. r.cacheDel(ib.Tag)
  374. return nil
  375. }
  376. func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error {
  377. id, err := r.resolveRemoteID(ctx, oldIb.Tag)
  378. if err != nil {
  379. return r.AddInbound(ctx, newIb)
  380. }
  381. payload := wireInbound(newIb, r.node.Id)
  382. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/update/"+strconv.Itoa(id), payload); err != nil {
  383. return err
  384. }
  385. if oldIb.Tag != newIb.Tag {
  386. r.cacheDel(oldIb.Tag)
  387. }
  388. r.cacheSet(newIb.Tag, id)
  389. return nil
  390. }
  391. func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error {
  392. return r.UpdateInbound(ctx, ib, ib)
  393. }
  394. func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) error {
  395. return r.UpdateInbound(ctx, ib, ib)
  396. }
  397. func (r *Remote) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error {
  398. id, err := r.resolveRemoteID(ctx, ib.Tag)
  399. if err != nil {
  400. return fmt.Errorf("remote AddClient: resolve tag %q: %w", ib.Tag, err)
  401. }
  402. payload := map[string]any{
  403. "client": client,
  404. "inboundIds": []int{id},
  405. }
  406. if _, err := r.do(ctx, http.MethodPost, "panel/api/clients/add", payload); err != nil {
  407. return err
  408. }
  409. return nil
  410. }
  411. func (r *Remote) DeleteUser(ctx context.Context, ib *model.Inbound, email string) error {
  412. if email == "" {
  413. return nil
  414. }
  415. id, err := r.resolveRemoteID(ctx, ib.Tag)
  416. if err != nil {
  417. // Can't confirm the delete reached the node — surface it so the caller
  418. // marks the node dirty and a reconcile converges, instead of silently
  419. // dropping the delete and letting the next snapshot resurrect the client.
  420. return fmt.Errorf("remote DeleteUser: resolve tag %q: %w", ib.Tag, err)
  421. }
  422. body := map[string]any{"inboundIds": []int{id}}
  423. _, err = r.do(ctx, http.MethodPost,
  424. "panel/api/clients/"+url.PathEscape(email)+"/detach", body)
  425. if err == nil {
  426. return nil
  427. }
  428. if strings.Contains(strings.ToLower(err.Error()), "not found") {
  429. return nil
  430. }
  431. return err
  432. }
  433. func (r *Remote) UpdateUser(ctx context.Context, ib *model.Inbound, oldEmail string, payload model.Client) error {
  434. if oldEmail == "" {
  435. oldEmail = payload.Email
  436. }
  437. id, err := r.resolveRemoteID(ctx, ib.Tag)
  438. if err != nil {
  439. return err
  440. }
  441. path := "panel/api/clients/update/" + url.PathEscape(oldEmail) +
  442. "?inboundIds=" + strconv.Itoa(id)
  443. if _, err := r.do(ctx, http.MethodPost, path, payload); err != nil {
  444. return err
  445. }
  446. return nil
  447. }
  448. func (r *Remote) RestartXray(ctx context.Context) error {
  449. _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil)
  450. return err
  451. }
  452. // UpdatePanel asks the node to run its own official self-updater (update.sh)
  453. // and restart onto the latest release. The node returns as soon as the job is
  454. // launched; the new version surfaces on the next heartbeat.
  455. func (r *Remote) UpdatePanel(ctx context.Context) error {
  456. _, err := r.do(ctx, http.MethodPost, "panel/api/server/updatePanel", nil)
  457. return err
  458. }
  459. // WebCertFiles holds a node's own web TLS certificate and key file paths.
  460. type WebCertFiles struct {
  461. WebCertFile string `json:"webCertFile"`
  462. WebKeyFile string `json:"webKeyFile"`
  463. }
  464. // GetWebCertFiles fetches the node's own web TLS certificate/key file paths so
  465. // the central panel can offer them as the "Set Cert from Panel" default for a
  466. // node-assigned inbound — those paths exist on the node, the central panel's
  467. // don't. See issue #4854.
  468. func (r *Remote) GetWebCertFiles(ctx context.Context) (*WebCertFiles, error) {
  469. env, err := r.do(ctx, http.MethodGet, "panel/api/server/getWebCertFiles", nil)
  470. if err != nil {
  471. return nil, err
  472. }
  473. var files WebCertFiles
  474. if err := json.Unmarshal(env.Obj, &files); err != nil {
  475. return nil, fmt.Errorf("decode web cert files: %w", err)
  476. }
  477. return &files, nil
  478. }
  479. // GetDescendants fetches the node's read-only summaries of the nodes IT
  480. // manages, so this panel can surface them as transitive sub-nodes in a chained
  481. // topology (#4983). Best-effort: an old-build node without the endpoint returns
  482. // an error the caller ignores.
  483. func (r *Remote) GetDescendants(ctx context.Context) ([]model.NodeSummary, error) {
  484. env, err := r.do(ctx, http.MethodGet, "panel/api/server/descendants", nil)
  485. if err != nil {
  486. return nil, err
  487. }
  488. var out []model.NodeSummary
  489. if len(env.Obj) > 0 {
  490. if err := json.Unmarshal(env.Obj, &out); err != nil {
  491. return nil, fmt.Errorf("decode descendants: %w", err)
  492. }
  493. }
  494. return out, nil
  495. }
  496. func (r *Remote) ResetClientTraffic(ctx context.Context, _ *model.Inbound, email string) error {
  497. _, err := r.do(ctx, http.MethodPost,
  498. "panel/api/clients/resetTraffic/"+url.PathEscape(email), nil)
  499. return err
  500. }
  501. func (r *Remote) ResetAllTraffics(ctx context.Context) error {
  502. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/resetAllTraffics", nil)
  503. return err
  504. }
  505. func (r *Remote) ResetInboundTraffic(ctx context.Context, ib *model.Inbound) error {
  506. _, err := r.do(ctx, http.MethodPost, fmt.Sprintf("panel/api/inbounds/%d/resetTraffic", ib.Id), nil)
  507. return err
  508. }
  509. type TrafficSnapshot struct {
  510. Inbounds []*model.Inbound
  511. OnlineEmails []string
  512. // OnlineTree is the node's GUID-keyed online subtree (its own clients under
  513. // its panelGuid plus every descendant under theirs). Preferred over the flat
  514. // OnlineEmails so the master can attribute deeply nested clients to the real
  515. // node across a chain (#4983). Empty when the node is an old build without
  516. // the per-GUID endpoint — OnlineEmails is the fallback then.
  517. OnlineTree map[string][]string
  518. LastOnlineMap map[string]int64
  519. }
  520. func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, error) {
  521. snap := &TrafficSnapshot{LastOnlineMap: map[string]int64{}}
  522. envList, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  523. if err != nil {
  524. return nil, err
  525. }
  526. if err := json.Unmarshal(envList.Obj, &snap.Inbounds); err != nil {
  527. return nil, fmt.Errorf("decode inbound list: %w", err)
  528. }
  529. // Prefer the GUID-keyed subtree; fall back to the flat list only when the
  530. // node is an old build without the per-GUID endpoint (#4983).
  531. envTree, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlinesByGuid", nil)
  532. if err == nil && len(envTree.Obj) > 0 {
  533. _ = json.Unmarshal(envTree.Obj, &snap.OnlineTree)
  534. }
  535. if len(snap.OnlineTree) == 0 {
  536. envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlines", nil)
  537. if err != nil {
  538. logger.Warning("remote", r.node.Name, "onlines fetch failed:", err)
  539. } else if len(envOnlines.Obj) > 0 {
  540. _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails)
  541. }
  542. }
  543. envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/clients/lastOnline", nil)
  544. if err != nil {
  545. logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err)
  546. } else if len(envLastOnline.Obj) > 0 {
  547. _ = json.Unmarshal(envLastOnline.Obj, &snap.LastOnlineMap)
  548. }
  549. return snap, nil
  550. }
  551. // PushGlobalClientTraffics sends this panel's aggregated per-client usage to
  552. // the node, tagged with this panel's GUID so the node keeps one row per
  553. // pushing master. Display/enforcement input on the node only — the node never
  554. // folds these into the counters it reports back, so this panel's (and any
  555. // other master's) delta accounting over the node snapshot stays intact.
  556. func (r *Remote) PushGlobalClientTraffics(ctx context.Context, masterGuid string, traffics []*xray.ClientTraffic) error {
  557. payload := map[string]any{
  558. "masterGuid": masterGuid,
  559. "traffics": traffics,
  560. }
  561. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/pushClientTraffics", payload)
  562. return err
  563. }
  564. func wireInbound(ib *model.Inbound, remoteNodeID int) url.Values {
  565. v := url.Values{}
  566. v.Set("total", strconv.FormatInt(ib.Total, 10))
  567. v.Set("remark", ib.Remark)
  568. v.Set("subSortIndex", strconv.Itoa(ib.SubSortIndex))
  569. v.Set("enable", strconv.FormatBool(ib.Enable))
  570. v.Set("expiryTime", strconv.FormatInt(ib.ExpiryTime, 10))
  571. v.Set("listen", ib.Listen)
  572. v.Set("port", strconv.Itoa(ib.Port))
  573. v.Set("protocol", string(ib.Protocol))
  574. v.Set("settings", ib.Settings)
  575. v.Set("streamSettings", sanitizeStreamSettingsForRemote(ib.StreamSettings))
  576. tag := ib.Tag
  577. if remoteNodeID > 0 {
  578. tag = stripNodeInboundTagPrefix(remoteNodeID, tag)
  579. }
  580. v.Set("tag", tag)
  581. v.Set("sniffing", ib.Sniffing)
  582. shareAddrStrategy := strings.TrimSpace(ib.ShareAddrStrategy)
  583. switch shareAddrStrategy {
  584. case "listen", "custom":
  585. default:
  586. shareAddrStrategy = "node"
  587. }
  588. v.Set("shareAddrStrategy", shareAddrStrategy)
  589. v.Set("shareAddr", ib.ShareAddr)
  590. if ib.TrafficReset != "" {
  591. v.Set("trafficReset", ib.TrafficReset)
  592. }
  593. return v
  594. }
  595. // sanitizeStreamSettingsForRemote strips file-based TLS certificate paths
  596. // from the StreamSettings before sending to a remote node, but ONLY when
  597. // inline certificate content (certificate / key) is also present in the same
  598. // entry. In that case the file paths are redundant and stripping them avoids
  599. // confusion when the central panel's local paths don't exist on the remote.
  600. //
  601. // When a certificate entry contains ONLY file paths (no inline content) the
  602. // paths are left untouched: the user explicitly entered paths that exist on
  603. // the remote node's filesystem, and removing them would leave Xray with TLS
  604. // configured but no certificate, causing Xray to crash on the remote node.
  605. func sanitizeStreamSettingsForRemote(streamSettings string) string {
  606. if streamSettings == "" {
  607. return streamSettings
  608. }
  609. var stream map[string]any
  610. if err := json.Unmarshal([]byte(streamSettings), &stream); err != nil {
  611. return streamSettings
  612. }
  613. tlsSettings, ok := stream["tlsSettings"].(map[string]any)
  614. if !ok {
  615. return streamSettings
  616. }
  617. certificates, ok := tlsSettings["certificates"].([]any)
  618. if !ok {
  619. return streamSettings
  620. }
  621. changed := false
  622. for _, cert := range certificates {
  623. c, ok := cert.(map[string]any)
  624. if !ok {
  625. continue
  626. }
  627. // Only strip file paths when inline content is present so that the
  628. // remote Xray still has a valid certificate to use.
  629. hasCertFile := c["certificateFile"] != nil && c["certificateFile"] != ""
  630. hasKeyFile := c["keyFile"] != nil && c["keyFile"] != ""
  631. hasCertInline := isNonEmptySlice(c["certificate"])
  632. hasKeyInline := isNonEmptySlice(c["key"])
  633. if hasCertFile && hasCertInline {
  634. delete(c, "certificateFile")
  635. changed = true
  636. }
  637. if hasKeyFile && hasKeyInline {
  638. delete(c, "keyFile")
  639. changed = true
  640. }
  641. }
  642. if !changed {
  643. return streamSettings
  644. }
  645. out, err := json.Marshal(stream)
  646. if err != nil {
  647. return streamSettings
  648. }
  649. return string(out)
  650. }
  651. // isNonEmptySlice reports whether v is a non-nil, non-empty JSON array value.
  652. func isNonEmptySlice(v any) bool {
  653. s, ok := v.([]any)
  654. return ok && len(s) > 0
  655. }
  656. func (r *Remote) FetchAllClientIps(ctx context.Context) ([]model.InboundClientIps, error) {
  657. env, err := r.do(ctx, http.MethodGet, "panel/api/server/clientIps", nil)
  658. if err != nil {
  659. return nil, err
  660. }
  661. var ips []model.InboundClientIps
  662. if len(env.Obj) > 0 {
  663. if err := json.Unmarshal(env.Obj, &ips); err != nil {
  664. return nil, fmt.Errorf("decode client ips: %w", err)
  665. }
  666. }
  667. return ips, nil
  668. }
  669. func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClientIps) error {
  670. _, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips)
  671. return err
  672. }
  673. // FetchClientIpsByGuid pulls the node's per-node IP attribution subtree
  674. // (guid -> email -> observed IPs). Unlike FetchAllClientIps (the flat union the
  675. // master also pushes back), this preserves which physical node each IP is on.
  676. // Returns an empty map for older nodes that lack the endpoint.
  677. func (r *Remote) FetchClientIpsByGuid(ctx context.Context) (map[string]map[string][]model.ClientIpEntry, error) {
  678. env, err := r.do(ctx, http.MethodPost, "panel/api/clients/clientIpsByGuid", nil)
  679. if err != nil {
  680. return nil, err
  681. }
  682. out := map[string]map[string][]model.ClientIpEntry{}
  683. if len(env.Obj) > 0 {
  684. if err := json.Unmarshal(env.Obj, &out); err != nil {
  685. return nil, fmt.Errorf("decode client ips by guid: %w", err)
  686. }
  687. }
  688. return out, nil
  689. }