1
0

remote.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800
  1. package runtime
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "encoding/hex"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  19. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  20. "github.com/mhsanaei/3x-ui/v3/internal/util/netsafe"
  21. "github.com/mhsanaei/3x-ui/v3/internal/util/wirecodec"
  22. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  23. )
  24. const remoteHTTPTimeout = 10 * time.Second
  25. // zstdMinBodyBytes is the smallest body worth compressing; below it the framing
  26. // overhead can outweigh the savings.
  27. const zstdMinBodyBytes = 1024
  28. // maxRemoteResponseBytes caps a single node RPC's response body. It bounds the
  29. // wire/decompressed size of one response — the real guard against a broken or
  30. // hostile node streaming an unbounded body. It is NOT a process-wide memory
  31. // bound: concurrent RPCs and the decoded JSON can each exceed it, so
  32. // endpoint-specific caps and a concurrency budget remain follow-ups. Node
  33. // responses (traffic snapshots, client-IP lists, inbound options) are JSON and
  34. // stay well under it.
  35. const maxRemoteResponseBytes = 64 << 20 // 64 MiB
  36. // errBodyDiagBytes bounds how much of a non-OK error body we read for a
  37. // diagnostic snippet (and to let small-error connections be reused) without
  38. // buffering a potentially huge or hostile error payload.
  39. const errBodyDiagBytes = 8 << 10 // 8 KiB
  40. // errRemoteResponseTooLarge is returned when a node response exceeds the cap.
  41. var errRemoteResponseTooLarge = errors.New("remote response exceeds size limit")
  42. // readCappedBody reads all of r but rejects bodies larger than limit, returning
  43. // errRemoteResponseTooLarge. It reads at most limit+1 bytes so a body of exactly
  44. // limit is accepted and the first oversize byte is detected without buffering
  45. // more.
  46. func readCappedBody(r io.Reader, limit int64) ([]byte, error) {
  47. raw, err := io.ReadAll(io.LimitReader(r, limit+1))
  48. if err != nil {
  49. return nil, err
  50. }
  51. if int64(len(raw)) > limit {
  52. return nil, errRemoteResponseTooLarge
  53. }
  54. return raw, nil
  55. }
  56. type envelope struct {
  57. Success bool `json:"success"`
  58. Msg string `json:"msg"`
  59. Obj json.RawMessage `json:"obj"`
  60. }
  61. type Remote struct {
  62. node *model.Node
  63. mu sync.RWMutex
  64. remoteIDByTag map[string]int
  65. // pushedFP holds the fingerprint of the last inbound wire payload successfully
  66. // pushed, keyed by panel-side tag, so reconcile can skip re-sending an
  67. // unchanged inbound. Guarded by mu; dropped with the Remote on node config change.
  68. pushedFP map[string]string
  69. // supportsZstd is learned from the node's X-3x-Node-Caps response header; once
  70. // seen, config pushes to this node are zstd-compressed. Old nodes never set
  71. // it, so they keep receiving plain bodies (mixed-version safe).
  72. supportsZstd bool
  73. // Per-node client honoring the TLS verify mode, built once and reused; a
  74. // node config change drops the cached Remote so the next one rebuilds it.
  75. clientOnce sync.Once
  76. client *http.Client
  77. clientErr error
  78. egressResolver NodeEgressResolver
  79. }
  80. type RemoteInboundOption struct {
  81. Tag string `json:"tag"`
  82. Remark string `json:"remark"`
  83. Protocol model.Protocol `json:"protocol"`
  84. Port int `json:"port"`
  85. }
  86. func NewRemote(n *model.Node, r NodeEgressResolver) *Remote {
  87. return &Remote{
  88. node: n,
  89. remoteIDByTag: make(map[string]int),
  90. pushedFP: make(map[string]string),
  91. egressResolver: r,
  92. }
  93. }
  94. func (r *Remote) Name() string { return "node:" + r.node.Name }
  95. func (r *Remote) nodeSupportsZstd() bool {
  96. r.mu.RLock()
  97. defer r.mu.RUnlock()
  98. return r.supportsZstd
  99. }
  100. // recordCaps learns the node's capabilities from a response header so later
  101. // pushes can use the negotiated envelope.
  102. func (r *Remote) recordCaps(h http.Header) {
  103. if !strings.Contains(h.Get(wirecodec.CapsHeader), wirecodec.CapZstd) {
  104. return
  105. }
  106. r.mu.Lock()
  107. r.supportsZstd = true
  108. r.mu.Unlock()
  109. }
  110. // httpClient lazily builds and caches the per-node client honoring the TLS
  111. // verify mode, so Remote ops don't fall back to system CA on skip/pin (#5264).
  112. func (r *Remote) httpClient() (*http.Client, error) {
  113. r.clientOnce.Do(func() {
  114. proxyURL := ""
  115. if r.node.OutboundTag != "" && r.egressResolver != nil {
  116. proxyURL = r.egressResolver.NodeEgressProxyURL(r.node.Id)
  117. }
  118. r.client, r.clientErr = HTTPClientForNode(r.node, proxyURL)
  119. })
  120. return r.client, r.clientErr
  121. }
  122. func (r *Remote) baseURL() (string, error) {
  123. addr, err := netsafe.NormalizeHost(r.node.Address)
  124. if err != nil {
  125. return "", err
  126. }
  127. scheme := r.node.Scheme
  128. if scheme != "http" && scheme != "https" {
  129. scheme = "https"
  130. }
  131. if r.node.Port <= 0 || r.node.Port > 65535 {
  132. return "", fmt.Errorf("invalid node port %d", r.node.Port)
  133. }
  134. bp := r.node.BasePath
  135. if !strings.HasSuffix(bp, "/") {
  136. bp += "/"
  137. }
  138. u := &url.URL{
  139. Scheme: scheme,
  140. Host: net.JoinHostPort(addr, strconv.Itoa(r.node.Port)),
  141. Path: bp,
  142. }
  143. return u.String(), nil
  144. }
  145. func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelope, error) {
  146. // mtls nodes authenticate via the client certificate, so a bearer token is
  147. // optional for them; every other mode still requires one.
  148. if r.node.ApiToken == "" && r.node.TlsVerifyMode != "mtls" {
  149. return nil, errors.New("node has no API token configured")
  150. }
  151. base, err := r.baseURL()
  152. if err != nil {
  153. return nil, err
  154. }
  155. target := base + strings.TrimPrefix(path, "/")
  156. var (
  157. bodyBytes []byte
  158. contentType string
  159. )
  160. switch b := body.(type) {
  161. case nil:
  162. case url.Values:
  163. bodyBytes = []byte(b.Encode())
  164. contentType = "application/x-www-form-urlencoded"
  165. default:
  166. buf, jerr := json.Marshal(b)
  167. if jerr != nil {
  168. return nil, fmt.Errorf("marshal body: %w", jerr)
  169. }
  170. bodyBytes = buf
  171. contentType = "application/json"
  172. }
  173. // Attach the integrity hash of the uncompressed body unconditionally (a new
  174. // node verifies it, an old one ignores it), and zstd-compress only when the
  175. // node advertised support and the body is worth it.
  176. var (
  177. reqBody io.Reader
  178. hashHex string
  179. zstdEncoded bool
  180. )
  181. if bodyBytes != nil {
  182. hashHex = wirecodec.Sha256Hex(bodyBytes)
  183. if len(bodyBytes) >= zstdMinBodyBytes && r.nodeSupportsZstd() {
  184. bodyBytes = wirecodec.Compress(bodyBytes)
  185. zstdEncoded = true
  186. }
  187. reqBody = bytes.NewReader(bodyBytes)
  188. }
  189. cctx, cancel := context.WithTimeout(netsafe.ContextWithAllowPrivate(ctx, r.node.AllowPrivateAddress), remoteHTTPTimeout)
  190. defer cancel()
  191. req, err := http.NewRequestWithContext(cctx, method, target, reqBody)
  192. if err != nil {
  193. return nil, err
  194. }
  195. if r.node.ApiToken != "" {
  196. req.Header.Set("Authorization", "Bearer "+r.node.ApiToken)
  197. }
  198. req.Header.Set("Accept", "application/json")
  199. if contentType != "" {
  200. req.Header.Set("Content-Type", contentType)
  201. }
  202. if hashHex != "" {
  203. req.Header.Set(wirecodec.HashHeader, hashHex)
  204. }
  205. if zstdEncoded {
  206. req.Header.Set("Content-Encoding", wirecodec.EncodingZstd)
  207. }
  208. client, err := r.httpClient()
  209. if err != nil {
  210. return nil, err
  211. }
  212. resp, err := client.Do(req)
  213. if err != nil {
  214. return nil, fmt.Errorf("%s %s: %w", method, path, err)
  215. }
  216. defer resp.Body.Close()
  217. r.recordCaps(resp.Header)
  218. // Validate status before reading a success payload: a non-OK response's
  219. // body is never used beyond a short diagnostic, so don't let a node force us
  220. // to buffer a large body just to return an HTTP error.
  221. if resp.StatusCode != http.StatusOK {
  222. snippet, _ := io.ReadAll(io.LimitReader(resp.Body, errBodyDiagBytes))
  223. if msg := bytes.TrimSpace(snippet); len(msg) > 0 {
  224. // %q quotes/escapes the untrusted node body so control characters or
  225. // newlines in it can't garble or inject into the error/log output.
  226. return nil, fmt.Errorf("%s %s: HTTP %d: %q", method, path, resp.StatusCode, msg)
  227. }
  228. return nil, fmt.Errorf("%s %s: HTTP %d", method, path, resp.StatusCode)
  229. }
  230. // Fast-fail on an honestly-declared oversize body; the LimitReader below is
  231. // the real guard since Content-Length is untrusted, may be absent, or is -1
  232. // under transparent decompression.
  233. if resp.ContentLength > maxRemoteResponseBytes {
  234. return nil, fmt.Errorf("%s %s: %w (content-length %d, cap %d)", method, path, errRemoteResponseTooLarge, resp.ContentLength, maxRemoteResponseBytes)
  235. }
  236. raw, err := readCappedBody(resp.Body, maxRemoteResponseBytes)
  237. if err != nil {
  238. if errors.Is(err, errRemoteResponseTooLarge) {
  239. return nil, fmt.Errorf("%s %s: %w (cap %d bytes)", method, path, err, maxRemoteResponseBytes)
  240. }
  241. return nil, fmt.Errorf("read body: %w", err)
  242. }
  243. var env envelope
  244. if err := json.Unmarshal(raw, &env); err != nil {
  245. return nil, fmt.Errorf("decode envelope: %w", err)
  246. }
  247. if !env.Success {
  248. return &env, fmt.Errorf("remote: %s", env.Msg)
  249. }
  250. return &env, nil
  251. }
  252. func (r *Remote) resolveRemoteID(ctx context.Context, tag string) (int, error) {
  253. if id, ok := r.cacheGetTag(tag); ok {
  254. return id, nil
  255. }
  256. if err := r.refreshRemoteIDs(ctx); err != nil {
  257. return 0, err
  258. }
  259. if id, ok := r.cacheGetTag(tag); ok {
  260. return id, nil
  261. }
  262. return 0, fmt.Errorf("remote inbound with tag %q not found on node %s", tag, r.node.Name)
  263. }
  264. // nodeInboundTagPrefix is the central-panel alias for an inbound on nodeID.
  265. // Kept in sync with service.nodeTagPrefix (port_conflict.go); duplicated here
  266. // so runtime does not import service.
  267. func nodeInboundTagPrefix(nodeID int) string {
  268. return fmt.Sprintf("n%d-", nodeID)
  269. }
  270. // stripNodeInboundTagPrefix removes the central-only n<id>- prefix before
  271. // pushing an inbound to the node so Xray keeps its original tag and routing.
  272. func stripNodeInboundTagPrefix(nodeID int, tag string) string {
  273. if stripped, ok := strings.CutPrefix(tag, nodeInboundTagPrefix(nodeID)); ok {
  274. return stripped
  275. }
  276. return tag
  277. }
  278. // cacheGetTag looks up a remote inbound id by tag, tolerating an n<id>- prefix
  279. // that lives on only one of the two panels: the node may carry the bare tag
  280. // while the central panel stores the prefixed form, or vice versa.
  281. func (r *Remote) cacheGetTag(tag string) (int, bool) {
  282. if id, ok := r.cacheGet(tag); ok {
  283. return id, true
  284. }
  285. prefix := nodeInboundTagPrefix(r.node.Id)
  286. if stripped, found := strings.CutPrefix(tag, prefix); found {
  287. return r.cacheGet(stripped)
  288. }
  289. return r.cacheGet(prefix + tag)
  290. }
  291. func (r *Remote) cacheGet(tag string) (int, bool) {
  292. r.mu.RLock()
  293. defer r.mu.RUnlock()
  294. id, ok := r.remoteIDByTag[tag]
  295. return id, ok
  296. }
  297. func (r *Remote) cacheSet(tag string, id int) {
  298. r.mu.Lock()
  299. defer r.mu.Unlock()
  300. r.remoteIDByTag[tag] = id
  301. }
  302. func (r *Remote) cacheDel(tag string) {
  303. r.mu.Lock()
  304. defer r.mu.Unlock()
  305. delete(r.remoteIDByTag, tag)
  306. }
  307. func (r *Remote) ListRemoteTags(ctx context.Context) ([]string, error) {
  308. if err := r.refreshRemoteIDs(ctx); err != nil {
  309. return nil, err
  310. }
  311. r.mu.RLock()
  312. defer r.mu.RUnlock()
  313. tags := make([]string, 0, len(r.remoteIDByTag))
  314. for tag := range r.remoteIDByTag {
  315. tags = append(tags, tag)
  316. }
  317. return tags, nil
  318. }
  319. func (r *Remote) ListInboundOptions(ctx context.Context) ([]RemoteInboundOption, error) {
  320. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  321. if err != nil {
  322. return nil, err
  323. }
  324. var list []RemoteInboundOption
  325. if err := json.Unmarshal(env.Obj, &list); err != nil {
  326. return nil, fmt.Errorf("decode inbound list: %w", err)
  327. }
  328. return list, nil
  329. }
  330. func (r *Remote) refreshRemoteIDs(ctx context.Context) error {
  331. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  332. if err != nil {
  333. return err
  334. }
  335. var list []struct {
  336. Id int `json:"id"`
  337. Tag string `json:"tag"`
  338. }
  339. if err := json.Unmarshal(env.Obj, &list); err != nil {
  340. return fmt.Errorf("decode inbound list: %w", err)
  341. }
  342. next := make(map[string]int, len(list))
  343. for _, ib := range list {
  344. if ib.Tag == "" {
  345. continue
  346. }
  347. next[ib.Tag] = ib.Id
  348. }
  349. r.mu.Lock()
  350. r.remoteIDByTag = next
  351. r.mu.Unlock()
  352. return nil
  353. }
  354. func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error {
  355. payload := wireInbound(ib, r.node.Id)
  356. env, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/add", payload)
  357. if err != nil {
  358. return err
  359. }
  360. var created struct {
  361. Id int `json:"id"`
  362. Tag string `json:"tag"`
  363. }
  364. if len(env.Obj) > 0 {
  365. if err := json.Unmarshal(env.Obj, &created); err == nil && created.Id > 0 && created.Tag != "" {
  366. r.cacheSet(created.Tag, created.Id)
  367. }
  368. }
  369. return nil
  370. }
  371. func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error {
  372. id, err := r.resolveRemoteID(ctx, ib.Tag)
  373. if err != nil {
  374. logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name)
  375. return nil
  376. }
  377. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/del/"+strconv.Itoa(id), nil); err != nil {
  378. return err
  379. }
  380. r.cacheDel(ib.Tag)
  381. return nil
  382. }
  383. func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error {
  384. id, err := r.resolveRemoteID(ctx, oldIb.Tag)
  385. if err != nil {
  386. return r.AddInbound(ctx, newIb)
  387. }
  388. payload := wireInbound(newIb, r.node.Id)
  389. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/update/"+strconv.Itoa(id), payload); err != nil {
  390. return err
  391. }
  392. if oldIb.Tag != newIb.Tag {
  393. r.cacheDel(oldIb.Tag)
  394. }
  395. r.cacheSet(newIb.Tag, id)
  396. return nil
  397. }
  398. // ReconcileInbound pushes ib only when its wire payload differs from the last
  399. // successful push, or when the node no longer reports the tag (existsOnNode
  400. // false) — a node that dropped/restarted must still be re-seeded. Returns
  401. // whether a push actually happened. This turns a full-fleet reconcile from "send
  402. // every inbound's full settings" into "send only what changed".
  403. func (r *Remote) ReconcileInbound(ctx context.Context, ib *model.Inbound, existsOnNode bool) (bool, error) {
  404. fp := wireFingerprint(wireInbound(ib, r.node.Id))
  405. if existsOnNode {
  406. r.mu.RLock()
  407. prev, ok := r.pushedFP[ib.Tag]
  408. r.mu.RUnlock()
  409. if ok && prev == fp {
  410. return false, nil
  411. }
  412. }
  413. if err := r.UpdateInbound(ctx, ib, ib); err != nil {
  414. return false, err
  415. }
  416. r.mu.Lock()
  417. r.pushedFP[ib.Tag] = fp
  418. r.mu.Unlock()
  419. return true, nil
  420. }
  421. // wireFingerprint hashes a wire payload so an unchanged inbound is cheap to detect.
  422. func wireFingerprint(v url.Values) string {
  423. sum := sha256.Sum256([]byte(v.Encode()))
  424. return hex.EncodeToString(sum[:])
  425. }
  426. func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error {
  427. return r.UpdateInbound(ctx, ib, ib)
  428. }
  429. func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) error {
  430. return r.UpdateInbound(ctx, ib, ib)
  431. }
  432. func (r *Remote) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error {
  433. id, err := r.resolveRemoteID(ctx, ib.Tag)
  434. if err != nil {
  435. return fmt.Errorf("remote AddClient: resolve tag %q: %w", ib.Tag, err)
  436. }
  437. payload := map[string]any{
  438. "client": client,
  439. "inboundIds": []int{id},
  440. }
  441. if _, err := r.do(ctx, http.MethodPost, "panel/api/clients/add", payload); err != nil {
  442. return err
  443. }
  444. return nil
  445. }
  446. func (r *Remote) DeleteUser(ctx context.Context, ib *model.Inbound, email string) error {
  447. if email == "" {
  448. return nil
  449. }
  450. id, err := r.resolveRemoteID(ctx, ib.Tag)
  451. if err != nil {
  452. // Can't confirm the delete reached the node — surface it so the caller
  453. // marks the node dirty and a reconcile converges, instead of silently
  454. // dropping the delete and letting the next snapshot resurrect the client.
  455. return fmt.Errorf("remote DeleteUser: resolve tag %q: %w", ib.Tag, err)
  456. }
  457. body := map[string]any{"inboundIds": []int{id}}
  458. _, err = r.do(ctx, http.MethodPost,
  459. "panel/api/clients/"+url.PathEscape(email)+"/detach", body)
  460. if err == nil {
  461. return nil
  462. }
  463. if strings.Contains(strings.ToLower(err.Error()), "not found") {
  464. return nil
  465. }
  466. return err
  467. }
  468. func (r *Remote) UpdateUser(ctx context.Context, ib *model.Inbound, oldEmail string, payload model.Client) error {
  469. if oldEmail == "" {
  470. oldEmail = payload.Email
  471. }
  472. id, err := r.resolveRemoteID(ctx, ib.Tag)
  473. if err != nil {
  474. return err
  475. }
  476. path := "panel/api/clients/update/" + url.PathEscape(oldEmail) +
  477. "?inboundIds=" + strconv.Itoa(id)
  478. if _, err := r.do(ctx, http.MethodPost, path, payload); err != nil {
  479. return err
  480. }
  481. return nil
  482. }
  483. func (r *Remote) RestartXray(ctx context.Context) error {
  484. _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil)
  485. return err
  486. }
  487. // UpdatePanel asks the node to run its own official self-updater (update.sh)
  488. // and restart onto the latest release. The node returns as soon as the job is
  489. // launched; the new version surfaces on the next heartbeat.
  490. func (r *Remote) UpdatePanel(ctx context.Context) error {
  491. _, err := r.do(ctx, http.MethodPost, "panel/api/server/updatePanel", nil)
  492. return err
  493. }
  494. // WebCertFiles holds a node's own web TLS certificate and key file paths.
  495. type WebCertFiles struct {
  496. WebCertFile string `json:"webCertFile"`
  497. WebKeyFile string `json:"webKeyFile"`
  498. }
  499. // GetWebCertFiles fetches the node's own web TLS certificate/key file paths so
  500. // the central panel can offer them as the "Set Cert from Panel" default for a
  501. // node-assigned inbound — those paths exist on the node, the central panel's
  502. // don't. See issue #4854.
  503. func (r *Remote) GetWebCertFiles(ctx context.Context) (*WebCertFiles, error) {
  504. env, err := r.do(ctx, http.MethodGet, "panel/api/server/getWebCertFiles", nil)
  505. if err != nil {
  506. return nil, err
  507. }
  508. var files WebCertFiles
  509. if err := json.Unmarshal(env.Obj, &files); err != nil {
  510. return nil, fmt.Errorf("decode web cert files: %w", err)
  511. }
  512. return &files, nil
  513. }
  514. // GetDescendants fetches the node's read-only summaries of the nodes IT
  515. // manages, so this panel can surface them as transitive sub-nodes in a chained
  516. // topology (#4983). Best-effort: an old-build node without the endpoint returns
  517. // an error the caller ignores.
  518. func (r *Remote) GetDescendants(ctx context.Context) ([]model.NodeSummary, error) {
  519. env, err := r.do(ctx, http.MethodGet, "panel/api/server/descendants", nil)
  520. if err != nil {
  521. return nil, err
  522. }
  523. var out []model.NodeSummary
  524. if len(env.Obj) > 0 {
  525. if err := json.Unmarshal(env.Obj, &out); err != nil {
  526. return nil, fmt.Errorf("decode descendants: %w", err)
  527. }
  528. }
  529. return out, nil
  530. }
  531. func (r *Remote) ResetClientTraffic(ctx context.Context, _ *model.Inbound, email string) error {
  532. _, err := r.do(ctx, http.MethodPost,
  533. "panel/api/clients/resetTraffic/"+url.PathEscape(email), nil)
  534. return err
  535. }
  536. func (r *Remote) ResetAllTraffics(ctx context.Context) error {
  537. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/resetAllTraffics", nil)
  538. return err
  539. }
  540. func (r *Remote) ResetInboundTraffic(ctx context.Context, ib *model.Inbound) error {
  541. _, err := r.do(ctx, http.MethodPost, fmt.Sprintf("panel/api/inbounds/%d/resetTraffic", ib.Id), nil)
  542. return err
  543. }
  544. type TrafficSnapshot struct {
  545. Inbounds []*model.Inbound
  546. OnlineEmails []string
  547. // OnlineTree is the node's GUID-keyed online subtree (its own clients under
  548. // its panelGuid plus every descendant under theirs). Preferred over the flat
  549. // OnlineEmails so the master can attribute deeply nested clients to the real
  550. // node across a chain (#4983). Empty when the node is an old build without
  551. // the per-GUID endpoint — OnlineEmails is the fallback then.
  552. OnlineTree map[string][]string
  553. LastOnlineMap map[string]int64
  554. }
  555. func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, error) {
  556. snap := &TrafficSnapshot{LastOnlineMap: map[string]int64{}}
  557. envList, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  558. if err != nil {
  559. return nil, err
  560. }
  561. if err := json.Unmarshal(envList.Obj, &snap.Inbounds); err != nil {
  562. return nil, fmt.Errorf("decode inbound list: %w", err)
  563. }
  564. // Prefer the GUID-keyed subtree; fall back to the flat list only when the
  565. // node is an old build without the per-GUID endpoint (#4983).
  566. envTree, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlinesByGuid", nil)
  567. if err == nil && len(envTree.Obj) > 0 {
  568. _ = json.Unmarshal(envTree.Obj, &snap.OnlineTree)
  569. }
  570. if len(snap.OnlineTree) == 0 {
  571. envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlines", nil)
  572. if err != nil {
  573. logger.Warning("remote", r.node.Name, "onlines fetch failed:", err)
  574. } else if len(envOnlines.Obj) > 0 {
  575. _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails)
  576. }
  577. }
  578. envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/clients/lastOnline", nil)
  579. if err != nil {
  580. logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err)
  581. } else if len(envLastOnline.Obj) > 0 {
  582. _ = json.Unmarshal(envLastOnline.Obj, &snap.LastOnlineMap)
  583. }
  584. return snap, nil
  585. }
  586. // PushGlobalClientTraffics sends this panel's aggregated per-client usage to
  587. // the node, tagged with this panel's GUID so the node keeps one row per
  588. // pushing master. Display/enforcement input on the node only — the node never
  589. // folds these into the counters it reports back, so this panel's (and any
  590. // other master's) delta accounting over the node snapshot stays intact.
  591. func (r *Remote) PushGlobalClientTraffics(ctx context.Context, masterGuid string, traffics []*xray.ClientTraffic) error {
  592. payload := map[string]any{
  593. "masterGuid": masterGuid,
  594. "traffics": traffics,
  595. }
  596. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/pushClientTraffics", payload)
  597. return err
  598. }
  599. func wireInbound(ib *model.Inbound, remoteNodeID int) url.Values {
  600. v := url.Values{}
  601. v.Set("total", strconv.FormatInt(ib.Total, 10))
  602. v.Set("remark", ib.Remark)
  603. v.Set("subSortIndex", strconv.Itoa(ib.SubSortIndex))
  604. v.Set("enable", strconv.FormatBool(ib.Enable))
  605. v.Set("expiryTime", strconv.FormatInt(ib.ExpiryTime, 10))
  606. v.Set("listen", ib.Listen)
  607. v.Set("port", strconv.Itoa(ib.Port))
  608. v.Set("protocol", string(ib.Protocol))
  609. v.Set("settings", ib.Settings)
  610. v.Set("streamSettings", sanitizeStreamSettingsForRemote(ib.StreamSettings))
  611. tag := ib.Tag
  612. if remoteNodeID > 0 {
  613. tag = stripNodeInboundTagPrefix(remoteNodeID, tag)
  614. }
  615. v.Set("tag", tag)
  616. v.Set("sniffing", ib.Sniffing)
  617. shareAddrStrategy := strings.TrimSpace(ib.ShareAddrStrategy)
  618. switch shareAddrStrategy {
  619. case "listen", "custom":
  620. default:
  621. shareAddrStrategy = "node"
  622. }
  623. v.Set("shareAddrStrategy", shareAddrStrategy)
  624. v.Set("shareAddr", ib.ShareAddr)
  625. if ib.TrafficReset != "" {
  626. v.Set("trafficReset", ib.TrafficReset)
  627. }
  628. return v
  629. }
  630. // sanitizeStreamSettingsForRemote strips file-based TLS certificate paths
  631. // from the StreamSettings before sending to a remote node, but ONLY when
  632. // inline certificate content (certificate / key) is also present in the same
  633. // entry. In that case the file paths are redundant and stripping them avoids
  634. // confusion when the central panel's local paths don't exist on the remote.
  635. //
  636. // When a certificate entry contains ONLY file paths (no inline content) the
  637. // paths are left untouched: the user explicitly entered paths that exist on
  638. // the remote node's filesystem, and removing them would leave Xray with TLS
  639. // configured but no certificate, causing Xray to crash on the remote node.
  640. func sanitizeStreamSettingsForRemote(streamSettings string) string {
  641. if streamSettings == "" {
  642. return streamSettings
  643. }
  644. var stream map[string]any
  645. if err := json.Unmarshal([]byte(streamSettings), &stream); err != nil {
  646. return streamSettings
  647. }
  648. tlsSettings, ok := stream["tlsSettings"].(map[string]any)
  649. if !ok {
  650. return streamSettings
  651. }
  652. certificates, ok := tlsSettings["certificates"].([]any)
  653. if !ok {
  654. return streamSettings
  655. }
  656. changed := false
  657. for _, cert := range certificates {
  658. c, ok := cert.(map[string]any)
  659. if !ok {
  660. continue
  661. }
  662. // Only strip file paths when inline content is present so that the
  663. // remote Xray still has a valid certificate to use.
  664. hasCertFile := c["certificateFile"] != nil && c["certificateFile"] != ""
  665. hasKeyFile := c["keyFile"] != nil && c["keyFile"] != ""
  666. hasCertInline := isNonEmptySlice(c["certificate"])
  667. hasKeyInline := isNonEmptySlice(c["key"])
  668. if hasCertFile && hasCertInline {
  669. delete(c, "certificateFile")
  670. changed = true
  671. }
  672. if hasKeyFile && hasKeyInline {
  673. delete(c, "keyFile")
  674. changed = true
  675. }
  676. }
  677. if !changed {
  678. return streamSettings
  679. }
  680. out, err := json.Marshal(stream)
  681. if err != nil {
  682. return streamSettings
  683. }
  684. return string(out)
  685. }
  686. // isNonEmptySlice reports whether v is a non-nil, non-empty JSON array value.
  687. func isNonEmptySlice(v any) bool {
  688. s, ok := v.([]any)
  689. return ok && len(s) > 0
  690. }
  691. func (r *Remote) FetchAllClientIps(ctx context.Context) ([]model.InboundClientIps, error) {
  692. env, err := r.do(ctx, http.MethodGet, "panel/api/server/clientIps", nil)
  693. if err != nil {
  694. return nil, err
  695. }
  696. var ips []model.InboundClientIps
  697. if len(env.Obj) > 0 {
  698. if err := json.Unmarshal(env.Obj, &ips); err != nil {
  699. return nil, fmt.Errorf("decode client ips: %w", err)
  700. }
  701. }
  702. return ips, nil
  703. }
  704. func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClientIps) error {
  705. _, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips)
  706. return err
  707. }
  708. // FetchClientIpsByGuid pulls the node's per-node IP attribution subtree
  709. // (guid -> email -> observed IPs). Unlike FetchAllClientIps (the flat union the
  710. // master also pushes back), this preserves which physical node each IP is on.
  711. // Returns an empty map for older nodes that lack the endpoint.
  712. func (r *Remote) FetchClientIpsByGuid(ctx context.Context) (map[string]map[string][]model.ClientIpEntry, error) {
  713. env, err := r.do(ctx, http.MethodPost, "panel/api/clients/clientIpsByGuid", nil)
  714. if err != nil {
  715. return nil, err
  716. }
  717. out := map[string]map[string][]model.ClientIpEntry{}
  718. if len(env.Obj) > 0 {
  719. if err := json.Unmarshal(env.Obj, &out); err != nil {
  720. return nil, fmt.Errorf("decode client ips by guid: %w", err)
  721. }
  722. }
  723. return out, nil
  724. }