remote.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package runtime
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "net/url"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/mhsanaei/3x-ui/v2/database/model"
  16. "github.com/mhsanaei/3x-ui/v2/logger"
  17. )
  18. // remoteHTTPTimeout bounds a single remote API call. Generous enough for
  19. // a slow node under load, short enough that a wedged remote doesn't
  20. // block the central panel's UI thread for the user.
  21. const remoteHTTPTimeout = 10 * time.Second
  22. // remoteHTTPClient is shared so repeated calls to the same node reuse
  23. // connections. Per-request timeouts are set via context.
  24. var remoteHTTPClient = &http.Client{
  25. Transport: &http.Transport{
  26. MaxIdleConns: 64,
  27. MaxIdleConnsPerHost: 4,
  28. IdleConnTimeout: 60 * time.Second,
  29. },
  30. }
  31. // envelope mirrors entity.Msg without depending on the entity package
  32. // (avoids a cycle on the controller side that pulls in this runtime).
  33. type envelope struct {
  34. Success bool `json:"success"`
  35. Msg string `json:"msg"`
  36. Obj json.RawMessage `json:"obj"`
  37. }
  38. // Remote implements Runtime by calling the existing /panel/api/inbounds/*
  39. // endpoints on a remote 3x-ui panel. The remote is authenticated as
  40. // the central panel via its per-node Bearer token.
  41. //
  42. // remoteIDByTag caches the {tag → remote inbound id} mapping so the
  43. // hot path (update/delete/addClient) avoids /list lookups. The cache
  44. // is in-memory and rebuilt lazily on first miss after a process restart
  45. // or InvalidateNode call.
  46. type Remote struct {
  47. node *model.Node
  48. mu sync.RWMutex
  49. remoteIDByTag map[string]int
  50. }
  51. // NewRemote constructs a Remote runtime for one node. The node pointer
  52. // is cached; callers that mutate node config (via NodeService.Update)
  53. // must drop the runtime through Manager.InvalidateNode so a fresh one
  54. // picks up the new fields.
  55. func NewRemote(n *model.Node) *Remote {
  56. return &Remote{
  57. node: n,
  58. remoteIDByTag: make(map[string]int),
  59. }
  60. }
  61. func (r *Remote) Name() string { return "node:" + r.node.Name }
  62. // baseURL composes the panel root for r.node, e.g. https://1.2.3.4:2053/
  63. // Always ends in '/' so callers can append "panel/api/...".
  64. func (r *Remote) baseURL() string {
  65. bp := r.node.BasePath
  66. if bp == "" {
  67. bp = "/"
  68. }
  69. if !strings.HasSuffix(bp, "/") {
  70. bp += "/"
  71. }
  72. return fmt.Sprintf("%s://%s:%d%s", r.node.Scheme, r.node.Address, r.node.Port, bp)
  73. }
  74. // do issues an HTTP request against the remote panel and decodes the
  75. // entity.Msg envelope. Returns an error for transport failures, non-2xx
  76. // responses, or {success:false} bodies.
  77. //
  78. // body may be nil. For application/x-www-form-urlencoded calls (the
  79. // existing controllers bind via c.ShouldBind which prefers form-encoded)
  80. // pass url.Values; for JSON pass any other type and we'll marshal it.
  81. func (r *Remote) do(ctx context.Context, method, path string, body any) (*envelope, error) {
  82. if r.node.ApiToken == "" {
  83. return nil, errors.New("node has no API token configured")
  84. }
  85. target := r.baseURL() + strings.TrimPrefix(path, "/")
  86. var (
  87. reqBody io.Reader
  88. contentType string
  89. )
  90. switch b := body.(type) {
  91. case nil:
  92. // nothing
  93. case url.Values:
  94. reqBody = strings.NewReader(b.Encode())
  95. contentType = "application/x-www-form-urlencoded"
  96. default:
  97. buf, err := json.Marshal(b)
  98. if err != nil {
  99. return nil, fmt.Errorf("marshal body: %w", err)
  100. }
  101. reqBody = bytes.NewReader(buf)
  102. contentType = "application/json"
  103. }
  104. cctx, cancel := context.WithTimeout(ctx, remoteHTTPTimeout)
  105. defer cancel()
  106. req, err := http.NewRequestWithContext(cctx, method, target, reqBody)
  107. if err != nil {
  108. return nil, err
  109. }
  110. req.Header.Set("Authorization", "Bearer "+r.node.ApiToken)
  111. req.Header.Set("Accept", "application/json")
  112. if contentType != "" {
  113. req.Header.Set("Content-Type", contentType)
  114. }
  115. resp, err := remoteHTTPClient.Do(req)
  116. if err != nil {
  117. return nil, fmt.Errorf("%s %s: %w", method, path, err)
  118. }
  119. defer resp.Body.Close()
  120. raw, err := io.ReadAll(resp.Body)
  121. if err != nil {
  122. return nil, fmt.Errorf("read body: %w", err)
  123. }
  124. if resp.StatusCode != http.StatusOK {
  125. return nil, fmt.Errorf("%s %s: HTTP %d", method, path, resp.StatusCode)
  126. }
  127. var env envelope
  128. if err := json.Unmarshal(raw, &env); err != nil {
  129. return nil, fmt.Errorf("decode envelope: %w", err)
  130. }
  131. if !env.Success {
  132. return &env, fmt.Errorf("remote: %s", env.Msg)
  133. }
  134. return &env, nil
  135. }
  136. // resolveRemoteID returns the remote panel's local inbound ID for the
  137. // given tag. Cache-backed; on miss it hits /panel/api/inbounds/list and
  138. // repopulates the whole map (one-shot list is cheaper than per-tag
  139. // lookups when several inbounds need resolving in sequence).
  140. func (r *Remote) resolveRemoteID(ctx context.Context, tag string) (int, error) {
  141. if id, ok := r.cacheGet(tag); ok {
  142. return id, nil
  143. }
  144. if err := r.refreshRemoteIDs(ctx); err != nil {
  145. return 0, err
  146. }
  147. if id, ok := r.cacheGet(tag); ok {
  148. return id, nil
  149. }
  150. return 0, fmt.Errorf("remote inbound with tag %q not found on node %s", tag, r.node.Name)
  151. }
  152. func (r *Remote) cacheGet(tag string) (int, bool) {
  153. r.mu.RLock()
  154. defer r.mu.RUnlock()
  155. id, ok := r.remoteIDByTag[tag]
  156. return id, ok
  157. }
  158. func (r *Remote) cacheSet(tag string, id int) {
  159. r.mu.Lock()
  160. defer r.mu.Unlock()
  161. r.remoteIDByTag[tag] = id
  162. }
  163. func (r *Remote) cacheDel(tag string) {
  164. r.mu.Lock()
  165. defer r.mu.Unlock()
  166. delete(r.remoteIDByTag, tag)
  167. }
  168. // refreshRemoteIDs replaces the in-memory tag→id map with whatever the
  169. // node currently has. Called on cache miss; also a useful recovery path
  170. // when the remote panel is rebuilt or we get a "not found" on update.
  171. func (r *Remote) refreshRemoteIDs(ctx context.Context) error {
  172. env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  173. if err != nil {
  174. return err
  175. }
  176. var list []struct {
  177. Id int `json:"id"`
  178. Tag string `json:"tag"`
  179. }
  180. if err := json.Unmarshal(env.Obj, &list); err != nil {
  181. return fmt.Errorf("decode inbound list: %w", err)
  182. }
  183. next := make(map[string]int, len(list))
  184. for _, ib := range list {
  185. if ib.Tag == "" {
  186. continue
  187. }
  188. next[ib.Tag] = ib.Id
  189. }
  190. r.mu.Lock()
  191. r.remoteIDByTag = next
  192. r.mu.Unlock()
  193. return nil
  194. }
  195. func (r *Remote) AddInbound(ctx context.Context, ib *model.Inbound) error {
  196. // Strip NodeID from the wire payload so the remote stores a "local"
  197. // row from its own perspective. We also ship the full model.Inbound
  198. // minus runtime metadata. Tag is preserved so central + remote agree
  199. // on the identifier — relies on InboundController being patched to
  200. // not overwrite a non-empty Tag.
  201. payload := wireInbound(ib)
  202. env, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/add", payload)
  203. if err != nil {
  204. return err
  205. }
  206. // Response body contains the saved inbound (with the remote's Id).
  207. var created struct {
  208. Id int `json:"id"`
  209. Tag string `json:"tag"`
  210. }
  211. if len(env.Obj) > 0 {
  212. if err := json.Unmarshal(env.Obj, &created); err == nil && created.Id > 0 && created.Tag != "" {
  213. r.cacheSet(created.Tag, created.Id)
  214. }
  215. }
  216. return nil
  217. }
  218. func (r *Remote) DelInbound(ctx context.Context, ib *model.Inbound) error {
  219. id, err := r.resolveRemoteID(ctx, ib.Tag)
  220. if err != nil {
  221. // Already gone on remote — treat as success so a sync after a
  222. // remote panel reset doesn't strand the central panel.
  223. logger.Warning("remote DelInbound: tag", ib.Tag, "not found on", r.node.Name, "— treating as success")
  224. return nil
  225. }
  226. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/del/"+strconv.Itoa(id), nil); err != nil {
  227. return err
  228. }
  229. r.cacheDel(ib.Tag)
  230. return nil
  231. }
  232. func (r *Remote) UpdateInbound(ctx context.Context, oldIb, newIb *model.Inbound) error {
  233. // The remote's old row is keyed by oldIb.Tag (tags can change on
  234. // edit if listen/port changed). We update by remote-id so the row
  235. // keeps its identity even when its tag flips.
  236. id, err := r.resolveRemoteID(ctx, oldIb.Tag)
  237. if err != nil {
  238. // Remote lost the row — fall back to add. This can happen if
  239. // the node panel was reset; we'd rather end up with the inbound
  240. // existing than fail the user's update.
  241. return r.AddInbound(ctx, newIb)
  242. }
  243. payload := wireInbound(newIb)
  244. if _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/update/"+strconv.Itoa(id), payload); err != nil {
  245. return err
  246. }
  247. // Tag may have changed — remap the cache.
  248. if oldIb.Tag != newIb.Tag {
  249. r.cacheDel(oldIb.Tag)
  250. }
  251. r.cacheSet(newIb.Tag, id)
  252. return nil
  253. }
  254. // AddUser pushes a single client into the remote inbound's settings JSON.
  255. // We can't reuse the central panel's xrayApi.AddUser shape directly
  256. // because the remote's HTTP endpoint expects {id, settings} where
  257. // settings is a JSON string with a "clients":[...] array. The central
  258. // panel's InboundService has already updated its own settings JSON
  259. // before calling us, so we just ship the new full settings to the
  260. // remote via /update — simpler than reconstructing the partial AddUser
  261. // payload remote-side.
  262. //
  263. // Caller passes the full updated *model.Inbound on the same code path
  264. // AddUser is called from in InboundService. To avoid changing the
  265. // Runtime interface for that, AddUser/RemoveUser delegate to UpdateInbound.
  266. func (r *Remote) AddUser(ctx context.Context, ib *model.Inbound, _ map[string]any) error {
  267. return r.UpdateInbound(ctx, ib, ib)
  268. }
  269. func (r *Remote) RemoveUser(ctx context.Context, ib *model.Inbound, _ string) error {
  270. return r.UpdateInbound(ctx, ib, ib)
  271. }
  272. func (r *Remote) RestartXray(ctx context.Context) error {
  273. _, err := r.do(ctx, http.MethodPost, "panel/api/server/restartXrayService", nil)
  274. return err
  275. }
  276. func (r *Remote) ResetClientTraffic(ctx context.Context, ib *model.Inbound, email string) error {
  277. id, err := r.resolveRemoteID(ctx, ib.Tag)
  278. if err != nil {
  279. // Already gone on remote — central reset is enough.
  280. logger.Warning("remote ResetClientTraffic: tag", ib.Tag, "not found on", r.node.Name, "— treating as success")
  281. return nil
  282. }
  283. _, err = r.do(ctx, http.MethodPost,
  284. fmt.Sprintf("panel/api/inbounds/%d/resetClientTraffic/%s", id, url.PathEscape(email)),
  285. nil)
  286. return err
  287. }
  288. func (r *Remote) ResetInboundClientTraffics(ctx context.Context, ib *model.Inbound) error {
  289. id, err := r.resolveRemoteID(ctx, ib.Tag)
  290. if err != nil {
  291. logger.Warning("remote ResetInboundClientTraffics: tag", ib.Tag, "not found on", r.node.Name, "— treating as success")
  292. return nil
  293. }
  294. _, err = r.do(ctx, http.MethodPost,
  295. fmt.Sprintf("panel/api/inbounds/resetAllClientTraffics/%d", id), nil)
  296. return err
  297. }
  298. func (r *Remote) ResetAllTraffics(ctx context.Context) error {
  299. _, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/resetAllTraffics", nil)
  300. return err
  301. }
  302. // TrafficSnapshot is what NodeTrafficSyncJob pulls from a remote node
  303. // every cron tick. Inbounds carry absolute up/down/all_time + ClientStats
  304. // (the same shape /panel/api/inbounds/list returns); the two map fields
  305. // come from the dedicated /onlines and /lastOnline endpoints.
  306. type TrafficSnapshot struct {
  307. Inbounds []*model.Inbound
  308. OnlineEmails []string
  309. LastOnlineMap map[string]int64
  310. }
  311. // FetchTrafficSnapshot pulls the three pieces in series. Sequential is
  312. // fine because the cron job already fans out across nodes — adding
  313. // per-node parallelism on top would just thrash the remote.
  314. //
  315. // Not on the Runtime interface: only the sync job needs it, and Local
  316. // has no equivalent (XrayTrafficJob already covers the local engine).
  317. func (r *Remote) FetchTrafficSnapshot(ctx context.Context) (*TrafficSnapshot, error) {
  318. snap := &TrafficSnapshot{LastOnlineMap: map[string]int64{}}
  319. envList, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
  320. if err != nil {
  321. return nil, err
  322. }
  323. if err := json.Unmarshal(envList.Obj, &snap.Inbounds); err != nil {
  324. return nil, fmt.Errorf("decode inbound list: %w", err)
  325. }
  326. envOnlines, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/onlines", nil)
  327. if err != nil {
  328. // Onlines/lastOnline are nice-to-have. A failure here shouldn't
  329. // invalidate the inbound counter merge — log and continue with
  330. // empty values, the next tick may succeed.
  331. logger.Warning("remote", r.node.Name, "onlines fetch failed:", err)
  332. } else if len(envOnlines.Obj) > 0 {
  333. _ = json.Unmarshal(envOnlines.Obj, &snap.OnlineEmails)
  334. }
  335. envLastOnline, err := r.do(ctx, http.MethodPost, "panel/api/inbounds/lastOnline", nil)
  336. if err != nil {
  337. logger.Warning("remote", r.node.Name, "lastOnline fetch failed:", err)
  338. } else if len(envLastOnline.Obj) > 0 {
  339. _ = json.Unmarshal(envLastOnline.Obj, &snap.LastOnlineMap)
  340. }
  341. return snap, nil
  342. }
  343. // wireInbound builds the request body for /panel/api/inbounds/add and
  344. // /update. Mirrors the form fields the existing InboundController
  345. // expects via c.ShouldBind — we use form-encoded to match exactly.
  346. //
  347. // We deliberately omit Id (remote assigns its own), UserId (remote's
  348. // fallback user takes over), NodeID (the remote sees itself as local),
  349. // and ClientStats (those are joined-table data the remote rebuilds).
  350. func wireInbound(ib *model.Inbound) url.Values {
  351. v := url.Values{}
  352. v.Set("up", strconv.FormatInt(ib.Up, 10))
  353. v.Set("down", strconv.FormatInt(ib.Down, 10))
  354. v.Set("total", strconv.FormatInt(ib.Total, 10))
  355. v.Set("remark", ib.Remark)
  356. v.Set("enable", strconv.FormatBool(ib.Enable))
  357. v.Set("expiryTime", strconv.FormatInt(ib.ExpiryTime, 10))
  358. v.Set("listen", ib.Listen)
  359. v.Set("port", strconv.Itoa(ib.Port))
  360. v.Set("protocol", string(ib.Protocol))
  361. v.Set("settings", ib.Settings)
  362. v.Set("streamSettings", ib.StreamSettings)
  363. v.Set("tag", ib.Tag)
  364. v.Set("sniffing", ib.Sniffing)
  365. if ib.TrafficReset != "" {
  366. v.Set("trafficReset", ib.TrafficReset)
  367. }
  368. return v
  369. }