1
0

remote.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843
  1. package runtime
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "encoding/hex"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  19. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  20. "github.com/mhsanaei/3x-ui/v3/internal/util/netsafe"
  21. "github.com/mhsanaei/3x-ui/v3/internal/util/wirecodec"
  22. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  23. )
  24. const remoteHTTPTimeout = 10 * time.Second
  25. // zstdMinBodyBytes is the smallest body worth compressing; below it the framing
  26. // overhead can outweigh the savings.
  27. const zstdMinBodyBytes = 1024
  28. // maxRemoteResponseBytes caps a single node RPC's response body. It bounds the
  29. // wire/decompressed size of one response — the real guard against a broken or
  30. // hostile node streaming an unbounded body. It is NOT a process-wide memory
  31. // bound: concurrent RPCs and the decoded JSON can each exceed it, so
  32. // endpoint-specific caps and a concurrency budget remain follow-ups. Node
  33. // responses (traffic snapshots, client-IP lists, inbound options) are JSON and
  34. // stay well under it.
  35. const maxRemoteResponseBytes = 64 << 20 // 64 MiB
  36. // errBodyDiagBytes bounds how much of a non-OK error body we read for a
  37. // diagnostic snippet (and to let small-error connections be reused) without
  38. // buffering a potentially huge or hostile error payload.
  39. const errBodyDiagBytes = 8 << 10 // 8 KiB
  40. // errRemoteResponseTooLarge is returned when a node response exceeds the cap.
  41. var errRemoteResponseTooLarge = errors.New("remote response exceeds size limit")
  42. // readCappedBody reads all of r but rejects bodies larger than limit, returning
  43. // errRemoteResponseTooLarge. It reads at most limit+1 bytes so a body of exactly
  44. // limit is accepted and the first oversize byte is detected without buffering
  45. // more.
  46. func readCappedBody(r io.Reader, limit int64) ([]byte, error) {
  47. raw, err := io.ReadAll(io.LimitReader(r, limit+1))
  48. if err != nil {
  49. return nil, err
  50. }
  51. if int64(len(raw)) > limit {
  52. return nil, errRemoteResponseTooLarge
  53. }
  54. return raw, nil
  55. }
  56. type envelope struct {
  57. Success bool `json:"success"`
  58. Msg string `json:"msg"`
  59. Obj json.RawMessage `json:"obj"`
  60. }
  61. // remoteAPIError is a node-panel envelope failure (HTTP 200, success=false),
  62. // distinct from transport/HTTP-status errors so callers can trust its message.
  63. type remoteAPIError struct{ msg string }
  64. func (e *remoteAPIError) Error() string { return "remote: " + e.msg }
  65. type Remote struct {
  66. node *model.Node
  67. mu sync.RWMutex
  68. remoteIDByTag map[string]int
  69. // pushedFP holds the fingerprint of the last inbound wire payload successfully
  70. // pushed, keyed by panel-side tag, so reconcile can skip re-sending an
  71. // unchanged inbound. Guarded by mu; dropped with the Remote on node config change.
  72. pushedFP map[string]string
  73. // supportsZstd is learned from the node's X-3x-Node-Caps response header; once
  74. // seen, config pushes to this node are zstd-compressed. Old nodes never set
  75. // it, so they keep receiving plain bodies (mixed-version safe).
  76. supportsZstd bool
  77. // Per-node client honoring the TLS verify mode, built once and reused; a
  78. // node config change drops the cached Remote so the next one rebuilds it.
  79. clientOnce sync.Once
  80. client *http.Client
  81. clientErr error
  82. egressResolver NodeEgressResolver
  83. }
  84. type RemoteInboundOption struct {
  85. Tag string `json:"tag"`
  86. Remark string `json:"remark"`
  87. Protocol model.Protocol `json:"protocol"`
  88. Port int `json:"port"`
  89. }
  90. func NewRemote(n *model.Node, r NodeEgressResolver) *Remote {
  91. return &Remote{
  92. node: n,
  93. remoteIDByTag: make(map[string]int),
  94. pushedFP: make(map[string]string),
  95. egressResolver: r,
  96. }
  97. }
  98. func (r *Remote) Name() string { return "node:" + r.node.Name }
  99. func (r *Remote) nodeSupportsZstd() bool {
  100. r.mu.RLock()
  101. defer r.mu.RUnlock()
  102. return r.supportsZstd
  103. }
  104. // recordCaps learns the node's capabilities from a response header so later
  105. // pushes can use the negotiated envelope.
  106. func (r *Remote) recordCaps(h http.Header) {
  107. if !strings.Contains(h.Get(wirecodec.CapsHeader), wirecodec.CapZstd) {
  108. return
  109. }
  110. r.mu.Lock()
  111. r.supportsZstd = true
  112. r.mu.Unlock()
  113. }
  114. // httpClient lazily builds and caches the per-node client honoring the TLS
  115. // verify mode, so Remote ops don't fall back to system CA on skip/pin (#5264).
  116. func (r *Remote) httpClient() (*http.Client, error) {
  117. r.clientOnce.Do(func() {
  118. proxyURL := ""
  119. if r.node.OutboundTag != "" && r.egressResolver != nil {
  120. proxyURL = r.egressResolver.NodeEgressProxyURL(r.node.Id)
  121. }
  122. r.client, r.clientErr = HTTPClientForNode(r.node, proxyURL)
  123. })
  124. return r.client, r.clientErr
  125. }
  126. func (r *Remote) baseURL() (string, error) {
  127. addr, err := netsafe.NormalizeHost(r.node.Address)
  128. if err != nil {
  129. return "", err
  130. }
  131. scheme := r.node.Scheme
  132. if scheme != "http" && scheme != "https" {
  133. scheme = "https"
  134. }
  135. if r.node.Port <= 0 || r.node.Port > 65535 {
  136. return "", fmt.Errorf("invalid node port %d", r.node.Port)
  137. }
  138. bp := r.node.BasePath
  139. if !strings.HasSuffix(bp, "/") {
  140. bp += "/"
  141. }
  142. u := &url.URL{
  143. Scheme: scheme,
  144. Host: net.JoinHostPort(addr, strconv.Itoa(r.node.Port)),
  145. Path: bp,
  146. }
  147. return u.String(), nil
  148. }
  149. func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelope, error) {
  150. // mtls nodes authenticate via the client certificate, so a bearer token is
  151. // optional for them; every other mode still requires one.
  152. if r.node.ApiToken == "" && r.node.TlsVerifyMode != "mtls" {
  153. return nil, errors.New("node has no API token configured")
  154. }
  155. base, err := r.baseURL()
  156. if err != nil {
  157. return nil, err
  158. }
  159. target := base + strings.TrimPrefix(path, "/")
  160. var (
  161. bodyBytes []byte
  162. contentType string
  163. )
  164. switch b := body.(type) {
  165. case nil:
  166. case url.Values:
  167. bodyBytes = []byte(b.Encode())
  168. contentType = "application/x-www-form-urlencoded"
  169. default:
  170. buf, jerr := json.Marshal(b)
  171. if jerr != nil {
  172. return nil, fmt.Errorf("marshal body: %w", jerr)
  173. }
  174. bodyBytes = buf
  175. contentType = "application/json"
  176. }
  177. // Attach the integrity hash of the uncompressed body unconditionally (a new
  178. // node verifies it, an old one ignores it), and zstd-compress only when the
  179. // node advertised support and the body is worth it.
  180. var (
  181. reqBody io.Reader
  182. hashHex string
  183. zstdEncoded bool
  184. )
  185. if bodyBytes != nil {
  186. hashHex = wirecodec.Sha256Hex(bodyBytes)
  187. if len(bodyBytes) >= zstdMinBodyBytes && r.nodeSupportsZstd() {
  188. bodyBytes = wirecodec.Compress(bodyBytes)
  189. zstdEncoded = true
  190. }
  191. reqBody = bytes.NewReader(bodyBytes)
  192. }
  193. cctx, cancel := context.WithTimeout(netsafe.ContextWithAllowPrivate(ctx, r.node.AllowPrivateAddress), remoteHTTPTimeout)
  194. defer cancel()
  195. req, err := http.NewRequestWithContext(cctx, method, target, reqBody)
  196. if err != nil {
  197. return nil, err
  198. }
  199. if r.node.ApiToken != "" {
  200. req.Header.Set("Authorization", "Bearer "+r.node.ApiToken)
  201. }
  202. req.Header.Set("Accept", "application/json")
  203. if contentType != "" {
  204. req.Header.Set("Content-Type", contentType)
  205. }
  206. if hashHex != "" {
  207. req.Header.Set(wirecodec.HashHeader, hashHex)
  208. }
  209. if zstdEncoded {
  210. req.Header.Set("Content-Encoding", wirecodec.EncodingZstd)
  211. }
  212. client, err := r.httpClient()
  213. if err != nil {
  214. return nil, err
  215. }
  216. resp, err := client.Do(req)
  217. if err != nil {
  218. return nil, fmt.Errorf("%s %s: %w", method, path, err)
  219. }
  220. defer resp.Body.Close()
  221. r.recordCaps(resp.Header)
  222. // Validate status before reading a success payload: a non-OK response's
  223. // body is never used beyond a short diagnostic, so don't let a node force us
  224. // to buffer a large body just to return an HTTP error.
  225. if resp.StatusCode != http.StatusOK {
  226. snippet, _ := io.ReadAll(io.LimitReader(resp.Body, errBodyDiagBytes))
  227. if msg := bytes.TrimSpace(snippet); len(msg) > 0 {
  228. // %q quotes/escapes the untrusted node body so control characters or
  229. // newlines in it can't garble or inject into the error/log output.
  230. return nil, fmt.Errorf("%s %s: HTTP %d: %q", method, path, resp.StatusCode, msg)
  231. }
  232. return nil, fmt.Errorf("%s %s: HTTP %d", method, path, resp.StatusCode)
  233. }
  234. // Fast-fail on an honestly-declared oversize body; the LimitReader below is
  235. // the real guard since Content-Length is untrusted, may be absent, or is -1
  236. // under transparent decompression.
  237. if resp.ContentLength > maxRemoteResponseBytes {
  238. return nil, fmt.Errorf("%s %s: %w (content-length %d, cap %d)", method, path, errRemoteResponseTooLarge, resp.ContentLength, maxRemoteResponseBytes)
  239. }
  240. raw, err := readCappedBody(resp.Body, maxRemoteResponseBytes)
  241. if err != nil {
  242. if errors.Is(err, errRemoteResponseTooLarge) {
  243. return nil, fmt.Errorf("%s %s: %w (cap %d bytes)", method, path, err, maxRemoteResponseBytes)
  244. }
  245. return nil, fmt.Errorf("read body: %w", err)
  246. }
  247. var env envelope
  248. if err := json.Unmarshal(raw, &env); err != nil {
  249. return nil, fmt.Errorf("decode envelope: %w", err)
  250. }
  251. if !env.Success {
  252. return &env, &remoteAPIError{msg: env.Msg}
  253. }
  254. return &env, nil
  255. }
  256. func (r *Remote) resolveRemoteID(ctx context.Context, tag string) (int, error) {
  257. if id, ok := r.cacheGetTag(tag); ok {
  258. return id, nil
  259. }
  260. if err := r.refreshRemoteIDs(ctx); err != nil {
  261. return 0, err
  262. }
  263. if id, ok := r.cacheGetTag(tag); ok {
  264. return id, nil
  265. }
  266. return 0, fmt.Errorf("remote inbound with tag %q not found on node %s", tag, r.node.Name)
  267. }
  268. // nodeInboundTagPrefix is the central-panel alias for an inbound on nodeID.
  269. // Kept in sync with service.nodeTagPrefix (port_conflict.go); duplicated here
  270. // so runtime does not import service.
  271. func nodeInboundTagPrefix(nodeID int) string {
  272. return fmt.Sprintf("n%d-", nodeID)
  273. }
  274. // stripNodeInboundTagPrefix removes the central-only n<id>- prefix before
  275. // pushing an inbound to the node so Xray keeps its original tag and routing.
  276. func stripNodeInboundTagPrefix(nodeID int, tag string) string {
  277. if stripped, ok := strings.CutPrefix(tag, nodeInboundTagPrefix(nodeID)); ok {
  278. return stripped
  279. }
  280. return tag
  281. }
  282. // cacheGetTag looks up a remote inbound id by tag, tolerating an n<id>- prefix
  283. // that lives on only one of the two panels: the node may carry the bare tag
  284. // while the central panel stores the prefixed form, or vice versa.
  285. func (r *Remote) cacheGetTag(tag string) (int, bool) {
  286. if id, ok := r.cacheGet(tag); ok {
  287. return id, true
  288. }
  289. prefix := nodeInboundTagPrefix(r.node.Id)
  290. if stripped, found := strings.CutPrefix(tag, prefix); found {
  291. return r.cacheGet(stripped)
  292. }
  293. return r.cacheGet(prefix + tag)
  294. }
  295. func (r *Remote) cacheGet(tag string) (int, bool) {
  296. r.mu.RLock()
  297. defer r.mu.RUnlock()
  298. id, ok := r.remoteIDByTag[tag]
  299. return id, ok
  300. }
  301. func (r *Remote) cacheSet(tag string, id int) {
  302. r.mu.Lock()
  303. defer r.mu.Unlock()
  304. r.remoteIDByTag[tag] = id
  305. }
  306. func (r *Remote) cacheDel(tag string) {
  307. r.mu.Lock()
  308. defer r.mu.Unlock()
  309. delete(r.remoteIDByTag, tag)
  310. delete(r.pushedFP, tag)
  311. }
  312. func (r *Remote) ListRemoteTags(ctx context.Context) ([]string, error) {
  313. if err := r.refreshRemoteIDs(ctx); err != nil {
  314. return nil, err
  315. }
  316. r.mu.RLock()
  317. defer r.mu.RUnlock()
  318. tags := make([]string, 0, len(r.remoteIDByTag))
  319. for tag := range r.remoteIDByTag {
  320. tags = append(tags, tag)
  321. }
  322. return tags, nil
  323. }
  324. func (r *Remote) ListInboundOptions(ctx context.Context) ([]RemoteInboundOption, error) {
  325. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  326. if err != nil {
  327. return nil, err
  328. }
  329. var list []RemoteInboundOption
  330. if err := json.Unmarshal(env.Obj, &list); err != nil {
  331. return nil, fmt.Errorf("decode inbound list: %w", err)
  332. }
  333. return list, nil
  334. }
  335. func (r *Remote) refreshRemoteIDs(ctx context.Context) error {
  336. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  337. if err != nil {
  338. return err
  339. }
  340. var list []struct {
  341. Id int `json:"id"`
  342. Tag string `json:"tag"`
  343. }
  344. if err := json.Unmarshal(env.Obj, &list); err != nil {
  345. return fmt.Errorf("decode inbound list: %w", err)
  346. }
  347. next := make(map[string]int, len(list))
  348. for _, ib := range list {
  349. if ib.Tag == "" {
  350. continue
  351. }
  352. next[ib.Tag] = ib.Id
  353. }
  354. r.mu.Lock()
  355. r.remoteIDByTag = next
  356. r.mu.Unlock()
  357. return nil
  358. }
  359. func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error {
  360. payload := wireInbound(ib, r.node.Id)
  361. env, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/add", payload)
  362. if err != nil {
  363. return err
  364. }
  365. var created struct {
  366. Id int `json:"id"`
  367. Tag string `json:"tag"`
  368. }
  369. if len(env.Obj) > 0 {
  370. if err := json.Unmarshal(env.Obj, &created); err == nil && created.Id > 0 && created.Tag != "" {
  371. r.cacheSet(created.Tag, created.Id)
  372. }
  373. }
  374. r.recordPushedInbound(ib)
  375. return nil
  376. }
  377. func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error {
  378. id, err := r.resolveRemoteID(ctx, ib.Tag)
  379. if err != nil {
  380. logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name)
  381. return nil
  382. }
  383. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/del/"+strconv.Itoa(id), nil); err != nil {
  384. return err
  385. }
  386. r.cacheDel(ib.Tag)
  387. return nil
  388. }
  389. func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error {
  390. id, err := r.resolveRemoteID(ctx, oldIb.Tag)
  391. if err != nil {
  392. return r.AddInbound(ctx, newIb)
  393. }
  394. payload := wireInbound(newIb, r.node.Id)
  395. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/update/"+strconv.Itoa(id), payload); err != nil {
  396. return err
  397. }
  398. if oldIb.Tag != newIb.Tag {
  399. r.cacheDel(oldIb.Tag)
  400. }
  401. r.cacheSet(newIb.Tag, id)
  402. r.recordPushedInbound(newIb)
  403. return nil
  404. }
  405. // ReconcileInbound pushes ib only when its wire payload differs from the last
  406. // successful push, or when the node no longer reports the tag (existsOnNode
  407. // false) — a node that dropped/restarted must still be re-seeded. Returns
  408. // whether a push actually happened. This turns a full-fleet reconcile from "send
  409. // every inbound's full settings" into "send only what changed".
  410. func (r *Remote) ReconcileInbound(ctx context.Context, ib *model.Inbound, existsOnNode bool) (bool, error) {
  411. fp := wireFingerprint(wireInbound(ib, r.node.Id))
  412. if existsOnNode {
  413. r.mu.RLock()
  414. prev, ok := r.pushedFP[ib.Tag]
  415. r.mu.RUnlock()
  416. if ok && prev == fp {
  417. return false, nil
  418. }
  419. }
  420. if err := r.UpdateInbound(ctx, ib, ib); err != nil {
  421. return false, err
  422. }
  423. return true, nil
  424. }
  425. // recordPushedInbound stamps the fingerprint after a full-payload push — the
  426. // only operation that proves the node holds the entire wire payload.
  427. func (r *Remote) recordPushedInbound(ib *model.Inbound) {
  428. fp := wireFingerprint(wireInbound(ib, r.node.Id))
  429. r.mu.Lock()
  430. r.pushedFP[ib.Tag] = fp
  431. r.mu.Unlock()
  432. }
  433. // RecordAdoptedInbound stamps the fingerprint when the master adopts the
  434. // node's own settings serialization into its DB — direct knowledge of the
  435. // exact payload the node holds.
  436. func (r *Remote) RecordAdoptedInbound(ib *model.Inbound) {
  437. r.recordPushedInbound(ib)
  438. }
  439. // AdvancePushedInbound moves the reconcile-skip fingerprint from an inbound's
  440. // pre-edit payload to its post-edit payload once every per-client push for the
  441. // edit succeeded. It advances only when the recorded fingerprint proves the
  442. // node held the exact pre-edit state; otherwise the stale fingerprint stays and
  443. // the next reconcile re-sends the full inbound.
  444. func (r *Remote) AdvancePushedInbound(prevIb, ib *model.Inbound) {
  445. prevFP := wireFingerprint(wireInbound(prevIb, r.node.Id))
  446. nextFP := wireFingerprint(wireInbound(ib, r.node.Id))
  447. r.mu.Lock()
  448. if r.pushedFP[ib.Tag] == prevFP {
  449. r.pushedFP[ib.Tag] = nextFP
  450. }
  451. r.mu.Unlock()
  452. }
  453. // wireFingerprint hashes a wire payload so an unchanged inbound is cheap to detect.
  454. func wireFingerprint(v url.Values) string {
  455. sum := sha256.Sum256([]byte(v.Encode()))
  456. return hex.EncodeToString(sum[:])
  457. }
  458. func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error {
  459. return r.UpdateInbound(ctx, ib, ib)
  460. }
  461. func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) error {
  462. return r.UpdateInbound(ctx, ib, ib)
  463. }
  464. func (r *Remote) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error {
  465. id, err := r.resolveRemoteID(ctx, ib.Tag)
  466. if err != nil {
  467. return fmt.Errorf("remote AddClient: resolve tag %q: %w", ib.Tag, err)
  468. }
  469. payload := map[string]any{
  470. "client": client,
  471. "inboundIds": []int{id},
  472. }
  473. if _, err := r.do(ctx, http.MethodPost, "panel/api/clients/add", payload); err != nil {
  474. return err
  475. }
  476. return nil
  477. }
  478. func (r *Remote) DeleteUser(ctx context.Context, ib *model.Inbound, email string) error {
  479. if email == "" {
  480. return nil
  481. }
  482. id, err := r.resolveRemoteID(ctx, ib.Tag)
  483. if err != nil {
  484. // Can't confirm the delete reached the node — surface it so the caller
  485. // marks the node dirty and a reconcile converges, instead of silently
  486. // dropping the delete and letting the next snapshot resurrect the client.
  487. return fmt.Errorf("remote DeleteUser: resolve tag %q: %w", ib.Tag, err)
  488. }
  489. body := map[string]any{"inboundIds": []int{id}}
  490. _, err = r.do(ctx, http.MethodPost,
  491. "panel/api/clients/"+url.PathEscape(email)+"/detach", body)
  492. if err == nil {
  493. return nil
  494. }
  495. var apiErr *remoteAPIError
  496. if errors.As(err, &apiErr) && strings.Contains(strings.ToLower(apiErr.msg), "not found") {
  497. return nil
  498. }
  499. return err
  500. }
  501. func (r *Remote) UpdateUser(ctx context.Context, ib *model.Inbound, oldEmail string, payload model.Client) error {
  502. if oldEmail == "" {
  503. oldEmail = payload.Email
  504. }
  505. id, err := r.resolveRemoteID(ctx, ib.Tag)
  506. if err != nil {
  507. return err
  508. }
  509. path := "panel/api/clients/update/" + url.PathEscape(oldEmail) +
  510. "?inboundIds=" + strconv.Itoa(id)
  511. if _, err := r.do(ctx, http.MethodPost, path, payload); err != nil {
  512. return err
  513. }
  514. return nil
  515. }
  516. func (r *Remote) RestartXray(ctx context.Context) error {
  517. _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil)
  518. return err
  519. }
  520. // UpdatePanel asks the node to run its own official self-updater (update.sh)
  521. // and restart onto the latest release. The node returns as soon as the job is
  522. // launched; the new version surfaces on the next heartbeat. When dev is true the
  523. // node is moved to the rolling dev channel instead of the latest stable release.
  524. func (r *Remote) UpdatePanel(ctx context.Context, dev bool) error {
  525. var body any
  526. if dev {
  527. body = url.Values{"dev": {"true"}}
  528. }
  529. _, err := r.do(ctx, http.MethodPost, "panel/api/server/updatePanel", body)
  530. return err
  531. }
  532. // WebCertFiles holds a node's own web TLS certificate and key file paths.
  533. type WebCertFiles struct {
  534. WebCertFile string `json:"webCertFile"`
  535. WebKeyFile string `json:"webKeyFile"`
  536. }
  537. // GetWebCertFiles fetches the node's own web TLS certificate/key file paths so
  538. // the central panel can offer them as the "Set Cert from Panel" default for a
  539. // node-assigned inbound — those paths exist on the node, the central panel's
  540. // don't. See issue #4854.
  541. func (r *Remote) GetWebCertFiles(ctx context.Context) (*WebCertFiles, error) {
  542. env, err := r.do(ctx, http.MethodGet, "panel/api/server/getWebCertFiles", nil)
  543. if err != nil {
  544. return nil, err
  545. }
  546. var files WebCertFiles
  547. if err := json.Unmarshal(env.Obj, &files); err != nil {
  548. return nil, fmt.Errorf("decode web cert files: %w", err)
  549. }
  550. return &files, nil
  551. }
  552. // GetDescendants fetches the node's read-only summaries of the nodes IT
  553. // manages, so this panel can surface them as transitive sub-nodes in a chained
  554. // topology (#4983). Best-effort: an old-build node without the endpoint returns
  555. // an error the caller ignores.
  556. func (r *Remote) GetDescendants(ctx context.Context) ([]model.NodeSummary, error) {
  557. env, err := r.do(ctx, http.MethodGet, "panel/api/server/descendants", nil)
  558. if err != nil {
  559. return nil, err
  560. }
  561. var out []model.NodeSummary
  562. if len(env.Obj) > 0 {
  563. if err := json.Unmarshal(env.Obj, &out); err != nil {
  564. return nil, fmt.Errorf("decode descendants: %w", err)
  565. }
  566. }
  567. return out, nil
  568. }
  569. func (r *Remote) ResetClientTraffic(ctx context.Context, _ *model.Inbound, email string) error {
  570. _, err := r.do(ctx, http.MethodPost,
  571. "panel/api/clients/resetTraffic/"+url.PathEscape(email), nil)
  572. return err
  573. }
  574. func (r *Remote) ResetAllTraffics(ctx context.Context) error {
  575. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/resetAllTraffics", nil)
  576. return err
  577. }
  578. func (r *Remote) ResetInboundTraffic(ctx context.Context, ib *model.Inbound) error {
  579. _, err := r.do(ctx, http.MethodPost, fmt.Sprintf("panel/api/inbounds/%d/resetTraffic", ib.Id), nil)
  580. return err
  581. }
  582. type TrafficSnapshot struct {
  583. Inbounds []*model.Inbound
  584. OnlineEmails []string
  585. // OnlineTree is the node's GUID-keyed online subtree (its own clients under
  586. // its panelGuid plus every descendant under theirs). Preferred over the flat
  587. // OnlineEmails so the master can attribute deeply nested clients to the real
  588. // node across a chain (#4983). Empty when the node is an old build without
  589. // the per-GUID endpoint — OnlineEmails is the fallback then.
  590. OnlineTree map[string][]string
  591. LastOnlineMap map[string]int64
  592. }
  593. func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, error) {
  594. snap := &TrafficSnapshot{LastOnlineMap: map[string]int64{}}
  595. envList, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  596. if err != nil {
  597. return nil, err
  598. }
  599. if err := json.Unmarshal(envList.Obj, &snap.Inbounds); err != nil {
  600. return nil, fmt.Errorf("decode inbound list: %w", err)
  601. }
  602. // Prefer the GUID-keyed subtree; fall back to the flat list only when the
  603. // node is an old build without the per-GUID endpoint (#4983).
  604. envTree, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlinesByGuid", nil)
  605. if err == nil && len(envTree.Obj) > 0 {
  606. _ = json.Unmarshal(envTree.Obj, &snap.OnlineTree)
  607. }
  608. if len(snap.OnlineTree) == 0 {
  609. envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlines", nil)
  610. if err != nil {
  611. logger.Warning("remote", r.node.Name, "onlines fetch failed:", err)
  612. } else if len(envOnlines.Obj) > 0 {
  613. _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails)
  614. }
  615. }
  616. envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/clients/lastOnline", nil)
  617. if err != nil {
  618. logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err)
  619. } else if len(envLastOnline.Obj) > 0 {
  620. _ = json.Unmarshal(envLastOnline.Obj, &snap.LastOnlineMap)
  621. }
  622. return snap, nil
  623. }
  624. // PushGlobalClientTraffics sends this panel's aggregated per-client usage to
  625. // the node, tagged with this panel's GUID so the node keeps one row per
  626. // pushing master. Display/enforcement input on the node only — the node never
  627. // folds these into the counters it reports back, so this panel's (and any
  628. // other master's) delta accounting over the node snapshot stays intact.
  629. func (r *Remote) PushGlobalClientTraffics(ctx context.Context, masterGuid string, traffics []*xray.ClientTraffic) error {
  630. payload := map[string]any{
  631. "masterGuid": masterGuid,
  632. "traffics": traffics,
  633. }
  634. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/pushClientTraffics", payload)
  635. return err
  636. }
  637. func wireInbound(ib *model.Inbound, remoteNodeID int) url.Values {
  638. v := url.Values{}
  639. v.Set("total", strconv.FormatInt(ib.Total, 10))
  640. v.Set("remark", ib.Remark)
  641. v.Set("subSortIndex", strconv.Itoa(ib.SubSortIndex))
  642. v.Set("enable", strconv.FormatBool(ib.Enable))
  643. v.Set("expiryTime", strconv.FormatInt(ib.ExpiryTime, 10))
  644. v.Set("listen", ib.Listen)
  645. v.Set("port", strconv.Itoa(ib.Port))
  646. v.Set("protocol", string(ib.Protocol))
  647. v.Set("settings", ib.Settings)
  648. v.Set("streamSettings", sanitizeStreamSettingsForRemote(ib.StreamSettings))
  649. tag := ib.Tag
  650. if remoteNodeID > 0 {
  651. tag = stripNodeInboundTagPrefix(remoteNodeID, tag)
  652. }
  653. v.Set("tag", tag)
  654. v.Set("sniffing", ib.Sniffing)
  655. shareAddrStrategy := strings.TrimSpace(ib.ShareAddrStrategy)
  656. switch shareAddrStrategy {
  657. case "listen", "custom":
  658. default:
  659. shareAddrStrategy = "node"
  660. }
  661. v.Set("shareAddrStrategy", shareAddrStrategy)
  662. v.Set("shareAddr", ib.ShareAddr)
  663. if ib.TrafficReset != "" {
  664. v.Set("trafficReset", ib.TrafficReset)
  665. }
  666. return v
  667. }
  668. // sanitizeStreamSettingsForRemote strips file-based TLS certificate paths
  669. // from the StreamSettings before sending to a remote node, but ONLY when
  670. // inline certificate content (certificate / key) is also present in the same
  671. // entry. In that case the file paths are redundant and stripping them avoids
  672. // confusion when the central panel's local paths don't exist on the remote.
  673. //
  674. // When a certificate entry contains ONLY file paths (no inline content) the
  675. // paths are left untouched: the user explicitly entered paths that exist on
  676. // the remote node's filesystem, and removing them would leave Xray with TLS
  677. // configured but no certificate, causing Xray to crash on the remote node.
  678. func sanitizeStreamSettingsForRemote(streamSettings string) string {
  679. if streamSettings == "" {
  680. return streamSettings
  681. }
  682. var stream map[string]any
  683. if err := json.Unmarshal([]byte(streamSettings), &stream); err != nil {
  684. return streamSettings
  685. }
  686. tlsSettings, ok := stream["tlsSettings"].(map[string]any)
  687. if !ok {
  688. return streamSettings
  689. }
  690. certificates, ok := tlsSettings["certificates"].([]any)
  691. if !ok {
  692. return streamSettings
  693. }
  694. changed := false
  695. for _, cert := range certificates {
  696. c, ok := cert.(map[string]any)
  697. if !ok {
  698. continue
  699. }
  700. // Only strip file paths when inline content is present so that the
  701. // remote Xray still has a valid certificate to use.
  702. hasCertFile := c["certificateFile"] != nil && c["certificateFile"] != ""
  703. hasKeyFile := c["keyFile"] != nil && c["keyFile"] != ""
  704. hasCertInline := isNonEmptySlice(c["certificate"])
  705. hasKeyInline := isNonEmptySlice(c["key"])
  706. if hasCertFile && hasCertInline {
  707. delete(c, "certificateFile")
  708. changed = true
  709. }
  710. if hasKeyFile && hasKeyInline {
  711. delete(c, "keyFile")
  712. changed = true
  713. }
  714. }
  715. if !changed {
  716. return streamSettings
  717. }
  718. out, err := json.Marshal(stream)
  719. if err != nil {
  720. return streamSettings
  721. }
  722. return string(out)
  723. }
  724. // isNonEmptySlice reports whether v is a non-nil, non-empty JSON array value.
  725. func isNonEmptySlice(v any) bool {
  726. s, ok := v.([]any)
  727. return ok && len(s) > 0
  728. }
  729. func (r *Remote) FetchAllClientIps(ctx context.Context) ([]model.InboundClientIps, error) {
  730. env, err := r.do(ctx, http.MethodGet, "panel/api/server/clientIps", nil)
  731. if err != nil {
  732. return nil, err
  733. }
  734. var ips []model.InboundClientIps
  735. if len(env.Obj) > 0 {
  736. if err := json.Unmarshal(env.Obj, &ips); err != nil {
  737. return nil, fmt.Errorf("decode client ips: %w", err)
  738. }
  739. }
  740. return ips, nil
  741. }
  742. func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClientIps) error {
  743. _, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips)
  744. return err
  745. }
  746. // FetchClientIpsByGuid pulls the node's per-node IP attribution subtree
  747. // (guid -> email -> observed IPs). Unlike FetchAllClientIps (the flat union the
  748. // master also pushes back), this preserves which physical node each IP is on.
  749. // Returns an empty map for older nodes that lack the endpoint.
  750. func (r *Remote) FetchClientIpsByGuid(ctx context.Context) (map[string]map[string][]model.ClientIpEntry, error) {
  751. env, err := r.do(ctx, http.MethodPost, "panel/api/clients/clientIpsByGuid", nil)
  752. if err != nil {
  753. return nil, err
  754. }
  755. out := map[string]map[string][]model.ClientIpEntry{}
  756. if len(env.Obj) > 0 {
  757. if err := json.Unmarshal(env.Obj, &out); err != nil {
  758. return nil, fmt.Errorf("decode client ips by guid: %w", err)
  759. }
  760. }
  761. return out, nil
  762. }