1
0

remote.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743
  1. package runtime
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  17. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/netsafe"
  19. "github.com/mhsanaei/3x-ui/v3/internal/util/wirecodec"
  20. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  21. )
  // Tunables for RPCs issued to a node.
  const (
  	// remoteHTTPTimeout is the deadline applied to every single node request,
  	// layered on top of whatever deadline the caller's context already has.
  	remoteHTTPTimeout = 10 * time.Second

  	// zstdMinBodyBytes is the request-body size from which compression is
  	// attempted. Smaller bodies go out plain because the framing overhead can
  	// outweigh the savings.
  	zstdMinBodyBytes = 1 << 10 // 1 KiB

  	// maxRemoteResponseBytes caps the body of one node RPC response. It limits
  	// the wire/decompressed size of a single response, which is the actual
  	// protection against a broken or hostile node streaming without end. It
  	// does NOT bound process memory as a whole: several RPCs may run at once
  	// and decoded JSON can be larger than its encoding, so per-endpoint caps
  	// and a concurrency budget are still open follow-ups. Regular node
  	// responses (traffic snapshots, client-IP lists, inbound options) are JSON
  	// and remain far below this value.
  	maxRemoteResponseBytes = 64 * 1024 * 1024 // 64 MiB

  	// errBodyDiagBytes limits how much of a non-OK response body is read for
  	// the diagnostic snippet (which also lets connections with small error
  	// bodies be reused) so a huge or hostile error payload is never buffered.
  	errBodyDiagBytes = 8 * 1024 // 8 KiB
  )
  // errRemoteResponseTooLarge reports that a node response body was longer than
  // the limit the reader was given.
  var errRemoteResponseTooLarge = errors.New("remote response exceeds size limit")

  // readCappedBody buffers all of r, refusing bodies longer than limit bytes.
  //
  // The underlying read is capped at limit+1 bytes: a body of exactly limit
  // bytes is returned in full, while a longer one is recognised by its first
  // surplus byte and rejected with errRemoteResponseTooLarge without buffering
  // anything beyond that byte. A read error from r is returned unchanged.
  func readCappedBody(r io.Reader, limit int64) ([]byte, error) {
  	capped := &io.LimitedReader{R: r, N: limit + 1}
  	data, readErr := io.ReadAll(capped)
  	switch {
  	case readErr != nil:
  		return nil, readErr
  	case int64(len(data)) > limit:
  		return nil, errRemoteResponseTooLarge
  	default:
  		return data, nil
  	}
  }
  // envelope is the JSON wrapper that node API responses are decoded into.
  type envelope struct {
  	// Success is the node's own verdict; do reports a false value as an error
  	// built from Msg.
  	Success bool `json:"success"`
  	// Msg is the human-readable message that accompanies the verdict.
  	Msg string `json:"msg"`
  	// Obj is the endpoint-specific payload, left undecoded so each caller can
  	// unmarshal it into its own type.
  	Obj json.RawMessage `json:"obj"`
  }
  59. type Remote struct {
  60. node *model.Node
  61. mu sync.RWMutex
  62. remoteIDByTag map[string]int
  63. // supportsZstd is learned from the node's X-3x-Node-Caps response header; once
  64. // seen, config pushes to this node are zstd-compressed. Old nodes never set
  65. // it, so they keep receiving plain bodies (mixed-version safe).
  66. supportsZstd bool
  67. // Per-node client honoring the TLS verify mode, built once and reused; a
  68. // node config change drops the cached Remote so the next one rebuilds it.
  69. clientOnce sync.Once
  70. client *http.Client
  71. clientErr error
  72. egressResolver NodeEgressResolver
  73. }
  74. type RemoteInboundOption struct {
  75. Tag string `json:"tag"`
  76. Remark string `json:"remark"`
  77. Protocol model.Protocol `json:"protocol"`
  78. Port int `json:"port"`
  79. }
  80. func NewRemote(n *model.Node, r NodeEgressResolver) *Remote {
  81. return &Remote{
  82. node: n,
  83. remoteIDByTag: make(map[string]int),
  84. egressResolver: r,
  85. }
  86. }
  87. func (r *Remote) Name() string { return "node:" + r.node.Name }
  88. func (r *Remote) nodeSupportsZstd() bool {
  89. r.mu.RLock()
  90. defer r.mu.RUnlock()
  91. return r.supportsZstd
  92. }
  93. // recordCaps learns the node's capabilities from a response header so later
  94. // pushes can use the negotiated envelope.
  95. func (r *Remote) recordCaps(h http.Header) {
  96. if !strings.Contains(h.Get(wirecodec.CapsHeader), wirecodec.CapZstd) {
  97. return
  98. }
  99. r.mu.Lock()
  100. r.supportsZstd = true
  101. r.mu.Unlock()
  102. }
  103. // httpClient lazily builds and caches the per-node client honoring the TLS
  104. // verify mode, so Remote ops don't fall back to system CA on skip/pin (#5264).
  105. func (r *Remote) httpClient() (*http.Client, error) {
  106. r.clientOnce.Do(func() {
  107. proxyURL := ""
  108. if r.node.OutboundTag != "" && r.egressResolver != nil {
  109. proxyURL = r.egressResolver.NodeEgressProxyURL(r.node.Id)
  110. }
  111. r.client, r.clientErr = HTTPClientForNode(r.node, proxyURL)
  112. })
  113. return r.client, r.clientErr
  114. }
  115. func (r *Remote) baseURL() (string, error) {
  116. addr, err := netsafe.NormalizeHost(r.node.Address)
  117. if err != nil {
  118. return "", err
  119. }
  120. scheme := r.node.Scheme
  121. if scheme != "http" && scheme != "https" {
  122. scheme = "https"
  123. }
  124. if r.node.Port <= 0 || r.node.Port > 65535 {
  125. return "", fmt.Errorf("invalid node port %d", r.node.Port)
  126. }
  127. bp := r.node.BasePath
  128. if !strings.HasSuffix(bp, "/") {
  129. bp += "/"
  130. }
  131. u := &url.URL{
  132. Scheme: scheme,
  133. Host: net.JoinHostPort(addr, strconv.Itoa(r.node.Port)),
  134. Path: bp,
  135. }
  136. return u.String(), nil
  137. }
  138. func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelope, error) {
  139. // mtls nodes authenticate via the client certificate, so a bearer token is
  140. // optional for them; every other mode still requires one.
  141. if r.node.ApiToken == "" && r.node.TlsVerifyMode != "mtls" {
  142. return nil, errors.New("node has no API token configured")
  143. }
  144. base, err := r.baseURL()
  145. if err != nil {
  146. return nil, err
  147. }
  148. target := base + strings.TrimPrefix(path, "/")
  149. var (
  150. bodyBytes []byte
  151. contentType string
  152. )
  153. switch b := body.(type) {
  154. case nil:
  155. case url.Values:
  156. bodyBytes = []byte(b.Encode())
  157. contentType = "application/x-www-form-urlencoded"
  158. default:
  159. buf, jerr := json.Marshal(b)
  160. if jerr != nil {
  161. return nil, fmt.Errorf("marshal body: %w", jerr)
  162. }
  163. bodyBytes = buf
  164. contentType = "application/json"
  165. }
  166. // Attach the integrity hash of the uncompressed body unconditionally (a new
  167. // node verifies it, an old one ignores it), and zstd-compress only when the
  168. // node advertised support and the body is worth it.
  169. var (
  170. reqBody io.Reader
  171. hashHex string
  172. zstdEncoded bool
  173. )
  174. if bodyBytes != nil {
  175. hashHex = wirecodec.Sha256Hex(bodyBytes)
  176. if len(bodyBytes) >= zstdMinBodyBytes && r.nodeSupportsZstd() {
  177. bodyBytes = wirecodec.Compress(bodyBytes)
  178. zstdEncoded = true
  179. }
  180. reqBody = bytes.NewReader(bodyBytes)
  181. }
  182. cctx, cancel := context.WithTimeout(netsafe.ContextWithAllowPrivate(ctx, r.node.AllowPrivateAddress), remoteHTTPTimeout)
  183. defer cancel()
  184. req, err := http.NewRequestWithContext(cctx, method, target, reqBody)
  185. if err != nil {
  186. return nil, err
  187. }
  188. if r.node.ApiToken != "" {
  189. req.Header.Set("Authorization", "Bearer "+r.node.ApiToken)
  190. }
  191. req.Header.Set("Accept", "application/json")
  192. if contentType != "" {
  193. req.Header.Set("Content-Type", contentType)
  194. }
  195. if hashHex != "" {
  196. req.Header.Set(wirecodec.HashHeader, hashHex)
  197. }
  198. if zstdEncoded {
  199. req.Header.Set("Content-Encoding", wirecodec.EncodingZstd)
  200. }
  201. client, err := r.httpClient()
  202. if err != nil {
  203. return nil, err
  204. }
  205. resp, err := client.Do(req)
  206. if err != nil {
  207. return nil, fmt.Errorf("%s %s: %w", method, path, err)
  208. }
  209. defer resp.Body.Close()
  210. r.recordCaps(resp.Header)
  211. // Validate status before reading a success payload: a non-OK response's
  212. // body is never used beyond a short diagnostic, so don't let a node force us
  213. // to buffer a large body just to return an HTTP error.
  214. if resp.StatusCode != http.StatusOK {
  215. snippet, _ := io.ReadAll(io.LimitReader(resp.Body, errBodyDiagBytes))
  216. if msg := bytes.TrimSpace(snippet); len(msg) > 0 {
  217. // %q quotes/escapes the untrusted node body so control characters or
  218. // newlines in it can't garble or inject into the error/log output.
  219. return nil, fmt.Errorf("%s %s: HTTP %d: %q", method, path, resp.StatusCode, msg)
  220. }
  221. return nil, fmt.Errorf("%s %s: HTTP %d", method, path, resp.StatusCode)
  222. }
  223. // Fast-fail on an honestly-declared oversize body; the LimitReader below is
  224. // the real guard since Content-Length is untrusted, may be absent, or is -1
  225. // under transparent decompression.
  226. if resp.ContentLength > maxRemoteResponseBytes {
  227. return nil, fmt.Errorf("%s %s: %w (content-length %d, cap %d)", method, path, errRemoteResponseTooLarge, resp.ContentLength, maxRemoteResponseBytes)
  228. }
  229. raw, err := readCappedBody(resp.Body, maxRemoteResponseBytes)
  230. if err != nil {
  231. if errors.Is(err, errRemoteResponseTooLarge) {
  232. return nil, fmt.Errorf("%s %s: %w (cap %d bytes)", method, path, err, maxRemoteResponseBytes)
  233. }
  234. return nil, fmt.Errorf("read body: %w", err)
  235. }
  236. var env envelope
  237. if err := json.Unmarshal(raw, &env); err != nil {
  238. return nil, fmt.Errorf("decode envelope: %w", err)
  239. }
  240. if !env.Success {
  241. return &env, fmt.Errorf("remote: %s", env.Msg)
  242. }
  243. return &env, nil
  244. }
  245. func (r *Remote) resolveRemoteID(ctx context.Context, tag string) (int, error) {
  246. if id, ok := r.cacheGetTag(tag); ok {
  247. return id, nil
  248. }
  249. if err := r.refreshRemoteIDs(ctx); err != nil {
  250. return 0, err
  251. }
  252. if id, ok := r.cacheGetTag(tag); ok {
  253. return id, nil
  254. }
  255. return 0, fmt.Errorf("remote inbound with tag %q not found on node %s", tag, r.node.Name)
  256. }
  257. // cacheGetTag looks up a remote inbound id by tag, tolerating an n<id>- prefix
  258. // that lives on only one of the two panels: the node may carry the bare tag
  259. // while the central panel stores the prefixed form, or vice versa.
  260. func (r *Remote) cacheGetTag(tag string) (int, bool) {
  261. if id, ok := r.cacheGet(tag); ok {
  262. return id, true
  263. }
  264. prefix := fmt.Sprintf("n%d-", r.node.Id)
  265. if stripped, found := strings.CutPrefix(tag, prefix); found {
  266. return r.cacheGet(stripped)
  267. }
  268. return r.cacheGet(prefix + tag)
  269. }
  270. func (r *Remote) cacheGet(tag string) (int, bool) {
  271. r.mu.RLock()
  272. defer r.mu.RUnlock()
  273. id, ok := r.remoteIDByTag[tag]
  274. return id, ok
  275. }
  276. func (r *Remote) cacheSet(tag string, id int) {
  277. r.mu.Lock()
  278. defer r.mu.Unlock()
  279. r.remoteIDByTag[tag] = id
  280. }
  281. func (r *Remote) cacheDel(tag string) {
  282. r.mu.Lock()
  283. defer r.mu.Unlock()
  284. delete(r.remoteIDByTag, tag)
  285. }
  286. func (r *Remote) ListRemoteTags(ctx context.Context) ([]string, error) {
  287. if err := r.refreshRemoteIDs(ctx); err != nil {
  288. return nil, err
  289. }
  290. r.mu.RLock()
  291. defer r.mu.RUnlock()
  292. tags := make([]string, 0, len(r.remoteIDByTag))
  293. for tag := range r.remoteIDByTag {
  294. tags = append(tags, tag)
  295. }
  296. return tags, nil
  297. }
  298. func (r *Remote) ListInboundOptions(ctx context.Context) ([]RemoteInboundOption, error) {
  299. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  300. if err != nil {
  301. return nil, err
  302. }
  303. var list []RemoteInboundOption
  304. if err := json.Unmarshal(env.Obj, &list); err != nil {
  305. return nil, fmt.Errorf("decode inbound list: %w", err)
  306. }
  307. return list, nil
  308. }
  309. func (r *Remote) refreshRemoteIDs(ctx context.Context) error {
  310. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  311. if err != nil {
  312. return err
  313. }
  314. var list []struct {
  315. Id int `json:"id"`
  316. Tag string `json:"tag"`
  317. }
  318. if err := json.Unmarshal(env.Obj, &list); err != nil {
  319. return fmt.Errorf("decode inbound list: %w", err)
  320. }
  321. next := make(map[string]int, len(list))
  322. for _, ib := range list {
  323. if ib.Tag == "" {
  324. continue
  325. }
  326. next[ib.Tag] = ib.Id
  327. }
  328. r.mu.Lock()
  329. r.remoteIDByTag = next
  330. r.mu.Unlock()
  331. return nil
  332. }
  333. func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error {
  334. payload := wireInbound(ib)
  335. env, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/add", payload)
  336. if err != nil {
  337. return err
  338. }
  339. var created struct {
  340. Id int `json:"id"`
  341. Tag string `json:"tag"`
  342. }
  343. if len(env.Obj) > 0 {
  344. if err := json.Unmarshal(env.Obj, &created); err == nil && created.Id > 0 && created.Tag != "" {
  345. r.cacheSet(created.Tag, created.Id)
  346. }
  347. }
  348. return nil
  349. }
  350. func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error {
  351. id, err := r.resolveRemoteID(ctx, ib.Tag)
  352. if err != nil {
  353. logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name)
  354. return nil
  355. }
  356. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/del/"+strconv.Itoa(id), nil); err != nil {
  357. return err
  358. }
  359. r.cacheDel(ib.Tag)
  360. return nil
  361. }
  362. func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error {
  363. id, err := r.resolveRemoteID(ctx, oldIb.Tag)
  364. if err != nil {
  365. return r.AddInbound(ctx, newIb)
  366. }
  367. payload := wireInbound(newIb)
  368. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/update/"+strconv.Itoa(id), payload); err != nil {
  369. return err
  370. }
  371. if oldIb.Tag != newIb.Tag {
  372. r.cacheDel(oldIb.Tag)
  373. }
  374. r.cacheSet(newIb.Tag, id)
  375. return nil
  376. }
  377. func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error {
  378. return r.UpdateInbound(ctx, ib, ib)
  379. }
  380. func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) error {
  381. return r.UpdateInbound(ctx, ib, ib)
  382. }
  383. func (r *Remote) AddClient(ctx context.Context, ib *model.Inbound, client model.Client) error {
  384. id, err := r.resolveRemoteID(ctx, ib.Tag)
  385. if err != nil {
  386. return fmt.Errorf("remote AddClient: resolve tag %q: %w", ib.Tag, err)
  387. }
  388. payload := map[string]any{
  389. "client": client,
  390. "inboundIds": []int{id},
  391. }
  392. if _, err := r.do(ctx, http.MethodPost, "panel/api/clients/add", payload); err != nil {
  393. return err
  394. }
  395. return nil
  396. }
  397. func (r *Remote) DeleteUser(ctx context.Context, ib *model.Inbound, email string) error {
  398. if email == "" {
  399. return nil
  400. }
  401. id, err := r.resolveRemoteID(ctx, ib.Tag)
  402. if err != nil {
  403. // Can't confirm the delete reached the node — surface it so the caller
  404. // marks the node dirty and a reconcile converges, instead of silently
  405. // dropping the delete and letting the next snapshot resurrect the client.
  406. return fmt.Errorf("remote DeleteUser: resolve tag %q: %w", ib.Tag, err)
  407. }
  408. body := map[string]any{"inboundIds": []int{id}}
  409. _, err = r.do(ctx, http.MethodPost,
  410. "panel/api/clients/"+url.PathEscape(email)+"/detach", body)
  411. if err == nil {
  412. return nil
  413. }
  414. if strings.Contains(strings.ToLower(err.Error()), "not found") {
  415. return nil
  416. }
  417. return err
  418. }
  419. func (r *Remote) UpdateUser(ctx context.Context, ib *model.Inbound, oldEmail string, payload model.Client) error {
  420. if oldEmail == "" {
  421. oldEmail = payload.Email
  422. }
  423. id, err := r.resolveRemoteID(ctx, ib.Tag)
  424. if err != nil {
  425. return err
  426. }
  427. path := "panel/api/clients/update/" + url.PathEscape(oldEmail) +
  428. "?inboundIds=" + strconv.Itoa(id)
  429. if _, err := r.do(ctx, http.MethodPost, path, payload); err != nil {
  430. return err
  431. }
  432. return nil
  433. }
  434. func (r *Remote) RestartXray(ctx context.Context) error {
  435. _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil)
  436. return err
  437. }
  438. // UpdatePanel asks the node to run its own official self-updater (update.sh)
  439. // and restart onto the latest release. The node returns as soon as the job is
  440. // launched; the new version surfaces on the next heartbeat.
  441. func (r *Remote) UpdatePanel(ctx context.Context) error {
  442. _, err := r.do(ctx, http.MethodPost, "panel/api/server/updatePanel", nil)
  443. return err
  444. }
  // WebCertFiles carries the file paths of a node's own web TLS certificate and
  // private key.
  type WebCertFiles struct {
  	// WebCertFile is the path of the certificate file.
  	WebCertFile string `json:"webCertFile"`
  	// WebKeyFile is the path of the key file.
  	WebKeyFile string `json:"webKeyFile"`
  }
  450. // GetWebCertFiles fetches the node's own web TLS certificate/key file paths so
  451. // the central panel can offer them as the "Set Cert from Panel" default for a
  452. // node-assigned inbound — those paths exist on the node, the central panel's
  453. // don't. See issue #4854.
  454. func (r *Remote) GetWebCertFiles(ctx context.Context) (*WebCertFiles, error) {
  455. env, err := r.do(ctx, http.MethodGet, "panel/api/server/getWebCertFiles", nil)
  456. if err != nil {
  457. return nil, err
  458. }
  459. var files WebCertFiles
  460. if err := json.Unmarshal(env.Obj, &files); err != nil {
  461. return nil, fmt.Errorf("decode web cert files: %w", err)
  462. }
  463. return &files, nil
  464. }
  465. // GetDescendants fetches the node's read-only summaries of the nodes IT
  466. // manages, so this panel can surface them as transitive sub-nodes in a chained
  467. // topology (#4983). Best-effort: an old-build node without the endpoint returns
  468. // an error the caller ignores.
  469. func (r *Remote) GetDescendants(ctx context.Context) ([]model.NodeSummary, error) {
  470. env, err := r.do(ctx, http.MethodGet, "panel/api/server/descendants", nil)
  471. if err != nil {
  472. return nil, err
  473. }
  474. var out []model.NodeSummary
  475. if len(env.Obj) > 0 {
  476. if err := json.Unmarshal(env.Obj, &out); err != nil {
  477. return nil, fmt.Errorf("decode descendants: %w", err)
  478. }
  479. }
  480. return out, nil
  481. }
  482. func (r *Remote) ResetClientTraffic(ctx context.Context, _ *model.Inbound, email string) error {
  483. _, err := r.do(ctx, http.MethodPost,
  484. "panel/api/clients/resetTraffic/"+url.PathEscape(email), nil)
  485. return err
  486. }
  487. func (r *Remote) ResetAllTraffics(ctx context.Context) error {
  488. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/resetAllTraffics", nil)
  489. return err
  490. }
  491. func (r *Remote) ResetInboundTraffic(ctx context.Context, ib *model.Inbound) error {
  492. _, err := r.do(ctx, http.MethodPost, fmt.Sprintf("panel/api/inbounds/%d/resetTraffic", ib.Id), nil)
  493. return err
  494. }
  495. type TrafficSnapshot struct {
  496. Inbounds []*model.Inbound
  497. OnlineEmails []string
  498. // OnlineTree is the node's GUID-keyed online subtree (its own clients under
  499. // its panelGuid plus every descendant under theirs). Preferred over the flat
  500. // OnlineEmails so the master can attribute deeply nested clients to the real
  501. // node across a chain (#4983). Empty when the node is an old build without
  502. // the per-GUID endpoint — OnlineEmails is the fallback then.
  503. OnlineTree map[string][]string
  504. LastOnlineMap map[string]int64
  505. }
  506. func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, error) {
  507. snap := &TrafficSnapshot{LastOnlineMap: map[string]int64{}}
  508. envList, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  509. if err != nil {
  510. return nil, err
  511. }
  512. if err := json.Unmarshal(envList.Obj, &snap.Inbounds); err != nil {
  513. return nil, fmt.Errorf("decode inbound list: %w", err)
  514. }
  515. // Prefer the GUID-keyed subtree; fall back to the flat list only when the
  516. // node is an old build without the per-GUID endpoint (#4983).
  517. envTree, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlinesByGuid", nil)
  518. if err == nil && len(envTree.Obj) > 0 {
  519. _ = json.Unmarshal(envTree.Obj, &snap.OnlineTree)
  520. }
  521. if len(snap.OnlineTree) == 0 {
  522. envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/clients/onlines", nil)
  523. if err != nil {
  524. logger.Warning("remote", r.node.Name, "onlines fetch failed:", err)
  525. } else if len(envOnlines.Obj) > 0 {
  526. _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails)
  527. }
  528. }
  529. envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/clients/lastOnline", nil)
  530. if err != nil {
  531. logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err)
  532. } else if len(envLastOnline.Obj) > 0 {
  533. _ = json.Unmarshal(envLastOnline.Obj, &snap.LastOnlineMap)
  534. }
  535. return snap, nil
  536. }
  537. // PushGlobalClientTraffics sends this panel's aggregated per-client usage to
  538. // the node, tagged with this panel's GUID so the node keeps one row per
  539. // pushing master. Display/enforcement input on the node only — the node never
  540. // folds these into the counters it reports back, so this panel's (and any
  541. // other master's) delta accounting over the node snapshot stays intact.
  542. func (r *Remote) PushGlobalClientTraffics(ctx context.Context, masterGuid string, traffics []*xray.ClientTraffic) error {
  543. payload := map[string]any{
  544. "masterGuid": masterGuid,
  545. "traffics": traffics,
  546. }
  547. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/pushClientTraffics", payload)
  548. return err
  549. }
  550. func wireInbound(ib *model.Inbound) url.Values {
  551. v := url.Values{}
  552. v.Set("total", strconv.FormatInt(ib.Total, 10))
  553. v.Set("remark", ib.Remark)
  554. v.Set("subSortIndex", strconv.Itoa(ib.SubSortIndex))
  555. v.Set("enable", strconv.FormatBool(ib.Enable))
  556. v.Set("expiryTime", strconv.FormatInt(ib.ExpiryTime, 10))
  557. v.Set("listen", ib.Listen)
  558. v.Set("port", strconv.Itoa(ib.Port))
  559. v.Set("protocol", string(ib.Protocol))
  560. v.Set("settings", ib.Settings)
  561. v.Set("streamSettings", sanitizeStreamSettingsForRemote(ib.StreamSettings))
  562. v.Set("tag", ib.Tag)
  563. v.Set("sniffing", ib.Sniffing)
  564. shareAddrStrategy := strings.TrimSpace(ib.ShareAddrStrategy)
  565. switch shareAddrStrategy {
  566. case "listen", "custom":
  567. default:
  568. shareAddrStrategy = "node"
  569. }
  570. v.Set("shareAddrStrategy", shareAddrStrategy)
  571. v.Set("shareAddr", ib.ShareAddr)
  572. if ib.TrafficReset != "" {
  573. v.Set("trafficReset", ib.TrafficReset)
  574. }
  575. return v
  576. }
  // sanitizeStreamSettingsForRemote removes file-based TLS certificate paths
  // from a stream-settings JSON document before it is sent to a remote node,
  // but ONLY for entries that also carry the matching inline content
  // (certificate / key). There the paths are redundant, and dropping them
  // avoids confusion when the central panel's local paths do not exist on the
  // remote.
  //
  // An entry that holds ONLY file paths is left alone: the user entered paths
  // that are meant to exist on the remote node, and removing them would leave
  // Xray with TLS configured but no certificate.
  //
  // The input is returned unchanged, byte for byte, when it is empty, is not a
  // single valid JSON value, has no tlsSettings.certificates array, or needs no
  // stripping. Otherwise the document is re-encoded. Numbers are decoded as
  // json.Number so that re-encoding reproduces their original text instead of
  // passing them through float64, which would alter integers above 2^53.
  func sanitizeStreamSettingsForRemote(streamSettings string) string {
  	if streamSettings == "" {
  		return streamSettings
  	}

  	dec := json.NewDecoder(strings.NewReader(streamSettings))
  	dec.UseNumber()
  	var stream map[string]any
  	if err := dec.Decode(&stream); err != nil {
  		return streamSettings
  	}
  	// Decode stops after the first value; anything but end of input after it
  	// means the document is not valid JSON as a whole.
  	if _, err := dec.Token(); !errors.Is(err, io.EOF) {
  		return streamSettings
  	}

  	tlsSettings, ok := stream["tlsSettings"].(map[string]any)
  	if !ok {
  		return streamSettings
  	}
  	certificates, ok := tlsSettings["certificates"].([]any)
  	if !ok {
  		return streamSettings
  	}

  	changed := false
  	for _, cert := range certificates {
  		c, ok := cert.(map[string]any)
  		if !ok {
  			continue
  		}
  		// A path is stripped only when its inline counterpart is present, so
  		// the remote Xray always keeps a usable certificate and key.
  		hasCertFile := c["certificateFile"] != nil && c["certificateFile"] != ""
  		hasKeyFile := c["keyFile"] != nil && c["keyFile"] != ""
  		if hasCertFile && isNonEmptySlice(c["certificate"]) {
  			delete(c, "certificateFile")
  			changed = true
  		}
  		if hasKeyFile && isNonEmptySlice(c["key"]) {
  			delete(c, "keyFile")
  			changed = true
  		}
  	}
  	if !changed {
  		return streamSettings
  	}

  	out, err := json.Marshal(stream)
  	if err != nil {
  		return streamSettings
  	}
  	return string(out)
  }

  // isNonEmptySlice reports whether v is a decoded JSON array with at least one
  // element.
  func isNonEmptySlice(v any) bool {
  	s, ok := v.([]any)
  	return ok && len(s) > 0
  }
  638. func (r *Remote) FetchAllClientIps(ctx context.Context) ([]model.InboundClientIps, error) {
  639. env, err := r.do(ctx, http.MethodGet, "panel/api/server/clientIps", nil)
  640. if err != nil {
  641. return nil, err
  642. }
  643. var ips []model.InboundClientIps
  644. if len(env.Obj) > 0 {
  645. if err := json.Unmarshal(env.Obj, &ips); err != nil {
  646. return nil, fmt.Errorf("decode client ips: %w", err)
  647. }
  648. }
  649. return ips, nil
  650. }
  651. func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClientIps) error {
  652. _, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips)
  653. return err
  654. }
  655. // FetchClientIpsByGuid pulls the node's per-node IP attribution subtree
  656. // (guid -> email -> observed IPs). Unlike FetchAllClientIps (the flat union the
  657. // master also pushes back), this preserves which physical node each IP is on.
  658. // Returns an empty map for older nodes that lack the endpoint.
  659. func (r *Remote) FetchClientIpsByGuid(ctx context.Context) (map[string]map[string][]model.ClientIpEntry, error) {
  660. env, err := r.do(ctx, http.MethodPost, "panel/api/clients/clientIpsByGuid", nil)
  661. if err != nil {
  662. return nil, err
  663. }
  664. out := map[string]map[string][]model.ClientIpEntry{}
  665. if len(env.Obj) > 0 {
  666. if err := json.Unmarshal(env.Obj, &out); err != nil {
  667. return nil, fmt.Errorf("decode client ips by guid: %w", err)
  668. }
  669. }
  670. return out, nil
  671. }