node.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015
  1. package service
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "crypto/tls"
  6. "encoding/base64"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "slices"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "github.com/mhsanaei/3x-ui/v3/internal/database"
  19. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  20. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  21. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  22. "github.com/mhsanaei/3x-ui/v3/internal/util/json_util"
  23. "github.com/mhsanaei/3x-ui/v3/internal/util/netsafe"
  24. "github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
  25. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  26. "gorm.io/gorm"
  27. )
  28. type HeartbeatPatch struct {
  29. Status string
  30. LastHeartbeat int64
  31. LatencyMs int
  32. XrayVersion string
  33. PanelVersion string
  34. Guid string
  35. CpuPct float64
  36. MemPct float64
  37. UptimeSecs uint64
  38. // NetUp/NetDown are the node's current interface throughput (bytes/sec),
  39. // summed over non-virtual interfaces, read from its status response.
  40. NetUp uint64
  41. NetDown uint64
  42. LastError string
  43. // XrayState and XrayError come from the remote /panel/api/server/status when the
  44. // panel API is reachable. They allow distinguishing panel connectivity from
  45. // Xray core health on the node.
  46. XrayState string
  47. XrayError string
  48. }
// NodeService is the receiver for the node-management operations defined in
// this file. It has no fields, so a zero value is ready to use.
type NodeService struct{}
  50. // FetchCertFingerprint connects to the node over HTTPS without verifying the
  51. // certificate and returns the leaf certificate's SHA-256 as base64, so the UI
  52. // can offer a "fetch and pin current certificate" action.
  53. func (s *NodeService) FetchCertFingerprint(ctx context.Context, n *model.Node) (string, error) {
  54. addr, err := netsafe.NormalizeHost(n.Address)
  55. if err != nil {
  56. return "", err
  57. }
  58. scheme := n.Scheme
  59. if scheme != "http" && scheme != "https" {
  60. scheme = "https"
  61. }
  62. if scheme != "https" {
  63. return "", common.NewError("certificate pinning is only available for https nodes")
  64. }
  65. if n.Port <= 0 || n.Port > 65535 {
  66. return "", common.NewError("node port must be 1-65535")
  67. }
  68. probeURL := &url.URL{
  69. Scheme: scheme,
  70. Host: net.JoinHostPort(addr, strconv.Itoa(n.Port)),
  71. Path: normalizeBasePath(n.BasePath) + "panel/api/server/status",
  72. }
  73. req, err := http.NewRequestWithContext(
  74. netsafe.ContextWithAllowPrivate(ctx, n.AllowPrivateAddress),
  75. http.MethodGet, probeURL.String(), nil)
  76. if err != nil {
  77. return "", err
  78. }
  79. client := &http.Client{
  80. Transport: &http.Transport{
  81. DialContext: netsafe.SSRFGuardedDialContext,
  82. TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // lgtm[go/disabled-certificate-check]
  83. },
  84. }
  85. resp, err := client.Do(req)
  86. if err != nil {
  87. return "", err
  88. }
  89. defer resp.Body.Close()
  90. if resp.TLS == nil || len(resp.TLS.PeerCertificates) == 0 {
  91. return "", common.NewError("node did not present a TLS certificate")
  92. }
  93. sum := sha256.Sum256(resp.TLS.PeerCertificates[0].Raw)
  94. return base64.StdEncoding.EncodeToString(sum[:]), nil
  95. }
  96. func (s *NodeService) GetAll() ([]*model.Node, error) {
  97. db := database.GetDB()
  98. var nodes []*model.Node
  99. err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error
  100. if err != nil || len(nodes) == 0 {
  101. return nodes, err
  102. }
  103. type inboundRow struct {
  104. Id int
  105. NodeID int `gorm:"column:node_id"`
  106. }
  107. var inboundRows []inboundRow
  108. if err := db.Table("inbounds").
  109. Select("id, node_id").
  110. Where("node_id IS NOT NULL").
  111. Scan(&inboundRows).Error; err != nil {
  112. return nodes, nil
  113. }
  114. if len(inboundRows) == 0 {
  115. return nodes, nil
  116. }
  117. inboundsByNode := make(map[int][]int, len(nodes))
  118. nodeByInbound := make(map[int]int, len(inboundRows))
  119. for _, row := range inboundRows {
  120. inboundsByNode[row.NodeID] = append(inboundsByNode[row.NodeID], row.Id)
  121. nodeByInbound[row.Id] = row.NodeID
  122. }
  123. type clientCountRow struct {
  124. NodeID int `gorm:"column:node_id"`
  125. Count int `gorm:"column:count"`
  126. }
  127. var clientCounts []clientCountRow
  128. if err := db.Raw(`
  129. SELECT inbounds.node_id AS node_id, COUNT(DISTINCT client_inbounds.client_id) AS count
  130. FROM inbounds
  131. JOIN client_inbounds ON client_inbounds.inbound_id = inbounds.id
  132. WHERE inbounds.node_id IS NOT NULL
  133. GROUP BY inbounds.node_id
  134. `).Scan(&clientCounts).Error; err == nil {
  135. for _, row := range clientCounts {
  136. for _, n := range nodes {
  137. if n.Id == row.NodeID {
  138. n.ClientCount = row.Count
  139. break
  140. }
  141. }
  142. }
  143. }
  144. now := time.Now().UnixMilli()
  145. type trafficRow struct {
  146. InboundID int `gorm:"column:inbound_id"`
  147. Email string
  148. Enable bool
  149. Total int64
  150. Up int64
  151. Down int64
  152. ExpiryTime int64 `gorm:"column:expiry_time"`
  153. }
  154. var trafficRows []trafficRow
  155. inboundIDs := make([]int, 0, len(nodeByInbound))
  156. for id := range nodeByInbound {
  157. inboundIDs = append(inboundIDs, id)
  158. }
  159. // Chunk the IN clause to avoid "too many SQL variables" on SQLite
  160. // when there are many node-owned inbounds (common with many nodes).
  161. // sqliteMaxVars is defined in this package (inbound.go).
  162. for _, batch := range chunkInts(inboundIDs, sqliteMaxVars) {
  163. var page []trafficRow
  164. if err := db.Table("client_traffics").
  165. Select("inbound_id, email, enable, total, up, down, expiry_time").
  166. Where("inbound_id IN ?", batch).
  167. Scan(&page).Error; err == nil {
  168. trafficRows = append(trafficRows, page...)
  169. }
  170. }
  171. depletedByNode := make(map[int]int)
  172. if len(trafficRows) > 0 {
  173. for _, row := range trafficRows {
  174. nodeID, ok := nodeByInbound[row.InboundID]
  175. if !ok {
  176. continue
  177. }
  178. expired := row.ExpiryTime > 0 && row.ExpiryTime <= now
  179. exhausted := row.Total > 0 && row.Up+row.Down >= row.Total
  180. if expired || exhausted || !row.Enable {
  181. depletedByNode[nodeID]++
  182. }
  183. }
  184. }
  185. onlineByGuid := s.onlineEmailsByGuid()
  186. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  187. ambiguous := ambiguousNodeGuids(nodes, selfGuid)
  188. for _, n := range nodes {
  189. n.InboundCount = len(inboundsByNode[n.Id])
  190. n.DepletedCount = depletedByNode[n.Id]
  191. // Online is attributed to the node that physically hosts the client
  192. // (by GUID): a client on a sub-node counts under the sub-node, not
  193. // the intermediate node it syncs through (#4983).
  194. n.OnlineCount = len(onlineByGuid[effectiveNodeGuid(n, ambiguous)])
  195. }
  196. return nodes, nil
  197. }
  198. func (s *NodeService) onlineEmailsByGuid() map[string]map[string]struct{} {
  199. svc := InboundService{}
  200. byGuid := svc.GetOnlineClientsByGuid()
  201. out := make(map[string]map[string]struct{}, len(byGuid))
  202. for guid, emails := range byGuid {
  203. set := make(map[string]struct{}, len(emails))
  204. for _, email := range emails {
  205. set[email] = struct{}{}
  206. }
  207. out[guid] = set
  208. }
  209. return out
  210. }
  211. // effectiveNodeGuid is a node's stable online/inbound attribution key: its
  212. // reported panelGuid, or a master-local synthetic node-id fallback when the node
  213. // has no GUID yet (old build) or its GUID is ambiguous. ambiguous comes from
  214. // ambiguousNodeGuids.
  215. func effectiveNodeGuid(n *model.Node, ambiguous map[string]struct{}) string {
  216. if n.Guid == "" {
  217. return synthNodeGuid(n.Id)
  218. }
  219. if n.Id > 0 {
  220. if _, bad := ambiguous[n.Guid]; bad {
  221. return synthNodeGuid(n.Id)
  222. }
  223. }
  224. return n.Guid
  225. }
  226. // ambiguousNodeGuids returns the panelGuids a node must not be attributed under
  227. // directly, because doing so would merge two distinct identities: a GUID
  228. // reported by more than one of this master's direct nodes (cloned node servers
  229. // ship the same panelGuid in their copied settings), or a GUID equal to the
  230. // master's own panelGuid (a node cloned from the master). A node holding such a
  231. // GUID falls back to its node-unique synthNodeGuid. Transitive sub-nodes (Id 0)
  232. // carry distinct descendant GUIDs by construction and are excluded.
  233. func ambiguousNodeGuids(nodes []*model.Node, selfGuid string) map[string]struct{} {
  234. counts := make(map[string]int, len(nodes))
  235. for _, n := range nodes {
  236. if n.Id > 0 && n.Guid != "" {
  237. counts[n.Guid]++
  238. }
  239. }
  240. ambiguous := make(map[string]struct{})
  241. for guid, c := range counts {
  242. if c > 1 {
  243. ambiguous[guid] = struct{}{}
  244. }
  245. }
  246. if selfGuid != "" {
  247. if _, ok := counts[selfGuid]; ok {
  248. ambiguous[selfGuid] = struct{}{}
  249. }
  250. }
  251. return ambiguous
  252. }
  253. // effectiveNodeKey returns one node's attribution key without a preloaded node
  254. // list — its panelGuid when that GUID uniquely identifies it among the master's
  255. // nodes and differs from the master's own, otherwise its node-unique
  256. // synthNodeGuid. Same rule as effectiveNodeGuid + ambiguousNodeGuids, for the
  257. // write paths that handle a single node (online tree, IP attribution).
  258. func effectiveNodeKey(node *model.Node) string {
  259. if node == nil {
  260. return ""
  261. }
  262. if node.Guid == "" {
  263. return synthNodeGuid(node.Id)
  264. }
  265. var sameGuid int64
  266. database.GetDB().Model(&model.Node{}).Where("guid = ?", node.Guid).Count(&sameGuid)
  267. masterGuid, _ := (&SettingService{}).GetPanelGuid()
  268. if sameGuid > 1 || node.Guid == masterGuid {
  269. return synthNodeGuid(node.Id)
  270. }
  271. return node.Guid
  272. }
  273. func (s *NodeService) GetById(id int) (*model.Node, error) {
  274. db := database.GetDB()
  275. n := &model.Node{}
  276. if err := db.Model(model.Node{}).Where("id = ?", id).First(n).Error; err != nil {
  277. return nil, err
  278. }
  279. return n, nil
  280. }
  281. // NodeExists reports whether a node with the given id exists on this panel.
  282. // Used to drop stale, cross-panel node references on inbound import. A Count
  283. // query distinguishes "no such node" (count 0, no error) from a real DB error.
  284. func (s *NodeService) NodeExists(id int) (bool, error) {
  285. if id <= 0 {
  286. return false, nil
  287. }
  288. var count int64
  289. if err := database.GetDB().Model(model.Node{}).Where("id = ?", id).Count(&count).Error; err != nil {
  290. return false, err
  291. }
  292. return count > 0, nil
  293. }
  294. func normalizeBasePath(p string) string {
  295. p = strings.TrimSpace(p)
  296. if p == "" {
  297. return "/"
  298. }
  299. if !strings.HasPrefix(p, "/") {
  300. p = "/" + p
  301. }
  302. if !strings.HasSuffix(p, "/") {
  303. p = p + "/"
  304. }
  305. return p
  306. }
  307. func (s *NodeService) normalize(n *model.Node) error {
  308. n.Name = strings.TrimSpace(n.Name)
  309. n.ApiToken = strings.TrimSpace(n.ApiToken)
  310. if n.Name == "" {
  311. return common.NewError("node name is required")
  312. }
  313. addr, err := netsafe.NormalizeHost(n.Address)
  314. if err != nil {
  315. return common.NewError(err.Error())
  316. }
  317. n.Address = addr
  318. if n.Port <= 0 || n.Port > 65535 {
  319. return common.NewError("node port must be 1-65535")
  320. }
  321. if n.Scheme != "http" && n.Scheme != "https" {
  322. n.Scheme = "https"
  323. }
  324. if n.TlsVerifyMode != "skip" && n.TlsVerifyMode != "pin" && n.TlsVerifyMode != "mtls" {
  325. n.TlsVerifyMode = "verify"
  326. }
  327. if n.TlsVerifyMode == "mtls" && n.Scheme != "https" {
  328. return common.NewError("mtls requires the node scheme to be https")
  329. }
  330. n.PinnedCertSha256 = strings.TrimSpace(n.PinnedCertSha256)
  331. if n.InboundSyncMode != "selected" {
  332. n.InboundSyncMode = "all"
  333. n.InboundTags = nil
  334. } else {
  335. seen := make(map[string]struct{}, len(n.InboundTags))
  336. tags := make([]string, 0, len(n.InboundTags))
  337. for _, tag := range n.InboundTags {
  338. tag = strings.TrimSpace(tag)
  339. if tag == "" {
  340. continue
  341. }
  342. if _, ok := seen[tag]; ok {
  343. continue
  344. }
  345. seen[tag] = struct{}{}
  346. tags = append(tags, tag)
  347. }
  348. n.InboundTags = tags
  349. }
  350. if n.TlsVerifyMode == "pin" {
  351. if _, err := runtime.DecodeCertPin(n.PinnedCertSha256); err != nil {
  352. return common.NewError(err.Error())
  353. }
  354. }
  355. n.BasePath = normalizeBasePath(n.BasePath)
  356. return nil
  357. }
  358. func (s *NodeService) Create(n *model.Node) error {
  359. if err := s.normalize(n); err != nil {
  360. return err
  361. }
  362. db := database.GetDB()
  363. return db.Create(n).Error
  364. }
  365. func (s *NodeService) Update(id int, in *model.Node) error {
  366. if err := s.normalize(in); err != nil {
  367. return err
  368. }
  369. inboundTagsJSON, err := json.Marshal(in.InboundTags)
  370. if err != nil {
  371. return err
  372. }
  373. db := database.GetDB()
  374. existing := &model.Node{}
  375. if err := db.Where("id = ?", id).First(existing).Error; err != nil {
  376. return err
  377. }
  378. updates := map[string]any{
  379. "name": in.Name,
  380. "remark": in.Remark,
  381. "scheme": in.Scheme,
  382. "address": in.Address,
  383. "port": in.Port,
  384. "base_path": in.BasePath,
  385. "api_token": in.ApiToken,
  386. "enable": in.Enable,
  387. "allow_private_address": in.AllowPrivateAddress,
  388. "tls_verify_mode": in.TlsVerifyMode,
  389. "pinned_cert_sha256": in.PinnedCertSha256,
  390. "inbound_sync_mode": in.InboundSyncMode,
  391. "inbound_tags": string(inboundTagsJSON),
  392. "outbound_tag": in.OutboundTag,
  393. }
  394. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  395. return err
  396. }
  397. if dErr := s.MarkNodeDirty(id); dErr != nil {
  398. logger.Warning("mark node dirty after update failed:", dErr)
  399. }
  400. if mgr := runtime.GetManager(); mgr != nil {
  401. mgr.InvalidateNode(id)
  402. }
  403. return nil
  404. }
  405. func (s *NodeService) GetRemoteInboundOptions(ctx context.Context, n *model.Node) ([]runtime.RemoteInboundOption, error) {
  406. if err := s.normalize(n); err != nil {
  407. return nil, err
  408. }
  409. if n.OutboundTag == "" {
  410. return runtime.NewRemote(n, nil).ListInboundOptions(ctx)
  411. }
  412. // Mirror ProbeWithOutbound: a node being added/edited has no persistent
  413. // egress bridge yet, so route the list call through a temporary one or the
  414. // remote panel stays unreachable and the request times out.
  415. var options []runtime.RemoteInboundOption
  416. var err error
  417. s.withOutboundBridge(n.Id, n.OutboundTag, func(proxyURL string) {
  418. options, err = runtime.NewRemote(n, staticEgressResolver(proxyURL)).ListInboundOptions(ctx)
  419. })
  420. return options, err
  421. }
  422. // staticEgressResolver hands a fixed proxy URL to runtime.NewRemote. An empty
  423. // string yields a direct connection, so it doubles as the graceful fallback
  424. // when a temporary bridge can't be built.
  425. type staticEgressResolver string
  426. func (r staticEgressResolver) NodeEgressProxyURL(int) string { return string(r) }
  427. // EnsureInboundTagAllowed adds a panel-managed inbound's tag to the node's
  428. // selection when the node syncs in "selected" mode. Without it, the next
  429. // traffic sync would filter the tag out of the snapshot and the orphan sweep
  430. // would silently delete the central row the panel just created or renamed.
  431. // Tags are only ever added (never removed): on a rename the node may keep
  432. // reporting the old tag until the remote update lands, and a leftover entry
  433. // that matches nothing is harmless.
  434. func (s *NodeService) EnsureInboundTagAllowed(nodeID int, tag string) error {
  435. tag = strings.TrimSpace(tag)
  436. if nodeID <= 0 || tag == "" {
  437. return nil
  438. }
  439. db := database.GetDB()
  440. node := &model.Node{}
  441. if err := db.Where("id = ?", nodeID).First(node).Error; err != nil {
  442. return err
  443. }
  444. if node.InboundSyncMode != "selected" {
  445. return nil
  446. }
  447. if slices.Contains(node.InboundTags, tag) {
  448. return nil
  449. }
  450. buf, err := json.Marshal(append(node.InboundTags, tag))
  451. if err != nil {
  452. return err
  453. }
  454. return db.Model(model.Node{}).Where("id = ?", nodeID).
  455. Updates(map[string]any{"inbound_tags": string(buf)}).Error
  456. }
  457. func FilterNodeSnapshot(n *model.Node, snap *runtime.TrafficSnapshot) {
  458. if n == nil || snap == nil || n.InboundSyncMode != "selected" {
  459. return
  460. }
  461. allowed := make(map[string]struct{}, len(n.InboundTags))
  462. for _, tag := range n.InboundTags {
  463. allowed[tag] = struct{}{}
  464. }
  465. filtered := make([]*model.Inbound, 0, len(snap.Inbounds))
  466. for _, inbound := range snap.Inbounds {
  467. if inbound == nil {
  468. continue
  469. }
  470. if _, ok := allowed[inbound.Tag]; ok {
  471. filtered = append(filtered, inbound)
  472. }
  473. }
  474. snap.Inbounds = filtered
  475. }
  476. func (s *NodeService) Delete(id int) error {
  477. db := database.GetDB()
  478. // Refuse to delete a node that still owns inbounds: dropping the node row
  479. // while inbounds keep its node_id leaves orphaned, dangling references that
  480. // confuse node sync, subscriptions and cleanup. The operator must detach or
  481. // remove those inbounds first. (DB-002)
  482. var attached int64
  483. if err := db.Model(&model.Inbound{}).Where("node_id = ?", id).Count(&attached).Error; err != nil {
  484. return err
  485. }
  486. if attached > 0 {
  487. return common.NewError(fmt.Sprintf("cannot delete node: %d inbound(s) still attached to it; detach or delete them first", attached))
  488. }
  489. // Capture the node's guid before deleting the row so we can drop its per-node
  490. // IP attribution. NodeClientIp is keyed by the node's attribution key, which
  491. // is its guid normally but its node-unique key for a cloned/ambiguous-guid
  492. // node (see effectiveNodeKey) — so we purge both below.
  493. var guid string
  494. var n model.Node
  495. if err := db.Select("guid").Where("id = ?", id).First(&n).Error; err == nil {
  496. guid = n.Guid
  497. }
  498. // Delete the node row and its per-node child rows atomically. Remove the
  499. // children (traffic baselines, IP attribution) before the parent node row so
  500. // the ordering already matches a future ON DELETE constraint. Delete stays
  501. // tolerant of a missing node row so it can still clean up orphaned baselines.
  502. if err := db.Transaction(func(tx *gorm.DB) error {
  503. if err := tx.Where("node_id = ?", id).Delete(&model.NodeClientTraffic{}).Error; err != nil {
  504. return err
  505. }
  506. guids := []string{synthNodeGuid(id)}
  507. if guid != "" {
  508. guids = append(guids, guid)
  509. }
  510. if err := tx.Where("node_guid IN ?", guids).Delete(&model.NodeClientIp{}).Error; err != nil {
  511. return err
  512. }
  513. return tx.Where("id = ?", id).Delete(&model.Node{}).Error
  514. }); err != nil {
  515. return err
  516. }
  517. if mgr := runtime.GetManager(); mgr != nil {
  518. mgr.InvalidateNode(id)
  519. }
  520. nodeMetrics.drop(nodeMetricKey(id, "cpu"))
  521. nodeMetrics.drop(nodeMetricKey(id, "mem"))
  522. return nil
  523. }
  524. func (s *NodeService) SetEnable(id int, enable bool) error {
  525. db := database.GetDB()
  526. if err := db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error; err != nil {
  527. return err
  528. }
  529. if mgr := runtime.GetManager(); mgr != nil {
  530. mgr.InvalidateNode(id)
  531. }
  532. return nil
  533. }
  534. // GetWebCertFiles asks a node for its own web TLS certificate/key file paths,
  535. // used by "Set Cert from Panel" so a node-assigned inbound gets paths that
  536. // exist on the node rather than the central panel. See issue #4854.
  537. func (s *NodeService) GetWebCertFiles(id int) (*runtime.WebCertFiles, error) {
  538. n, err := s.GetById(id)
  539. if err != nil || n == nil {
  540. return nil, fmt.Errorf("node not found")
  541. }
  542. if !n.Enable {
  543. return nil, fmt.Errorf("node is disabled")
  544. }
  545. mgr := runtime.GetManager()
  546. if mgr == nil {
  547. return nil, fmt.Errorf("runtime manager unavailable")
  548. }
  549. remote, err := mgr.RemoteFor(n)
  550. if err != nil {
  551. return nil, err
  552. }
  553. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  554. defer cancel()
  555. return remote.GetWebCertFiles(ctx)
  556. }
  557. // NodeUpdateResult reports the outcome of triggering a panel self-update on one
  558. // node so the UI can show per-node success/failure for a bulk request.
  559. type NodeUpdateResult struct {
  560. Id int `json:"id"`
  561. Name string `json:"name"`
  562. OK bool `json:"ok"`
  563. Error string `json:"error,omitempty"`
  564. }
  565. // UpdatePanels triggers the official self-updater on each given node. Only
  566. // enabled, online nodes are eligible — an offline node can't be reached, so it
  567. // is reported as skipped rather than silently dropped.
  568. func (s *NodeService) UpdatePanels(ids []int) ([]NodeUpdateResult, error) {
  569. mgr := runtime.GetManager()
  570. if mgr == nil {
  571. return nil, fmt.Errorf("runtime manager unavailable")
  572. }
  573. results := make([]NodeUpdateResult, 0, len(ids))
  574. for _, id := range ids {
  575. n, err := s.GetById(id)
  576. if err != nil || n == nil {
  577. results = append(results, NodeUpdateResult{Id: id, OK: false, Error: "node not found"})
  578. continue
  579. }
  580. res := NodeUpdateResult{Id: id, Name: n.Name}
  581. switch {
  582. case !n.Enable:
  583. res.Error = "node is disabled"
  584. case n.Status != "online":
  585. res.Error = "node is offline"
  586. default:
  587. remote, remoteErr := mgr.RemoteFor(n)
  588. if remoteErr != nil {
  589. res.Error = remoteErr.Error()
  590. break
  591. }
  592. ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
  593. updErr := remote.UpdatePanel(ctx)
  594. cancel()
  595. if updErr != nil {
  596. res.Error = updErr.Error()
  597. } else {
  598. res.OK = true
  599. }
  600. }
  601. results = append(results, res)
  602. }
  603. return results, nil
  604. }
  605. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
  606. db := database.GetDB()
  607. updates := map[string]any{
  608. "status": p.Status,
  609. "last_heartbeat": p.LastHeartbeat,
  610. "latency_ms": p.LatencyMs,
  611. "xray_version": p.XrayVersion,
  612. "panel_version": p.PanelVersion,
  613. "cpu_pct": p.CpuPct,
  614. "mem_pct": p.MemPct,
  615. "uptime_secs": p.UptimeSecs,
  616. "net_up": p.NetUp,
  617. "net_down": p.NetDown,
  618. "last_error": p.LastError,
  619. "xray_state": p.XrayState,
  620. "xray_error": p.XrayError,
  621. }
  622. // Only learn the GUID; never clear a known one if an old-build node (or a
  623. // failed probe) reports none, so the stable identity survives blips.
  624. if p.Guid != "" {
  625. updates["guid"] = p.Guid
  626. s.warnOnDuplicateGuid(id, p.Guid)
  627. }
  628. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  629. return err
  630. }
  631. if p.Status == "online" {
  632. now := time.Unix(p.LastHeartbeat, 0)
  633. nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct)
  634. nodeMetrics.append(nodeMetricKey(id, "mem"), now, p.MemPct)
  635. nodeMetrics.append(nodeMetricKey(id, "netUp"), now, float64(p.NetUp))
  636. nodeMetrics.append(nodeMetricKey(id, "netDown"), now, float64(p.NetDown))
  637. }
  638. return nil
  639. }
  640. // warnedDupGuid remembers the (nodeID -> guid) pairs already warned about so a
  641. // cloned-server collision is logged once, not every heartbeat.
  642. var warnedDupGuid sync.Map
  643. // warnOnDuplicateGuid logs once when a node reports a panelGuid already held by
  644. // another node or by the master itself (the cloned-server footgun). Attribution
  645. // still works — it falls back to node-unique keys — but the operator should
  646. // regenerate the duplicate panelGuid to restore real identity and per-node IP
  647. // attribution. Re-arms if the collision later clears.
  648. func (s *NodeService) warnOnDuplicateGuid(id int, guid string) {
  649. var clash int64
  650. database.GetDB().Model(&model.Node{}).Where("guid = ? AND id <> ?", guid, id).Count(&clash)
  651. masterGuid, _ := (&SettingService{}).GetPanelGuid()
  652. if clash == 0 && guid != masterGuid {
  653. warnedDupGuid.Delete(id)
  654. return
  655. }
  656. if prev, ok := warnedDupGuid.Load(id); ok && prev == guid {
  657. return
  658. }
  659. warnedDupGuid.Store(id, guid)
  660. logger.Warningf("node %d reports panelGuid %s already used by another node or the master (cloned server?) — regenerate it on that node so online and IP attribution stay per-node", id, guid)
  661. }
  662. func (s *NodeService) MarkNodeDirty(id int) error {
  663. if id <= 0 {
  664. return nil
  665. }
  666. return database.GetDB().Model(model.Node{}).
  667. Where("id = ?", id).
  668. Updates(map[string]any{
  669. "config_dirty": true,
  670. "config_dirty_at": time.Now().UnixMilli(),
  671. }).Error
  672. }
  673. func (s *NodeService) ClearNodeDirty(id int, dirtyAt int64) error {
  674. if id <= 0 {
  675. return nil
  676. }
  677. return database.GetDB().Model(model.Node{}).
  678. Where("id = ? AND config_dirty_at = ?", id, dirtyAt).
  679. Update("config_dirty", false).Error
  680. }
  681. func (s *NodeService) NodeSyncState(id int) (enabled bool, status string, dirty bool, dirtyAt int64, err error) {
  682. if id <= 0 {
  683. return false, "", false, 0, errors.New("invalid node id")
  684. }
  685. var row model.Node
  686. err = database.GetDB().Model(model.Node{}).
  687. Select("enable", "status", "config_dirty", "config_dirty_at").
  688. Where("id = ?", id).
  689. First(&row).Error
  690. if err != nil {
  691. return false, "", false, 0, err
  692. }
  693. return row.Enable, row.Status, row.ConfigDirty, row.ConfigDirtyAt, nil
  694. }
  695. func (s *NodeService) IsNodePending(id int) bool {
  696. enabled, status, dirty, _, err := s.NodeSyncState(id)
  697. if err != nil {
  698. return false
  699. }
  700. return !enabled || status != "online" || dirty
  701. }
  702. func nodeMetricKey(id int, metric string) string {
  703. return "node:" + strconv.Itoa(id) + ":" + metric
  704. }
  705. func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any {
  706. return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints)
  707. }
  708. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) {
  709. proxyURL := ""
  710. if n.OutboundTag != "" {
  711. if mgr := runtime.GetManager(); mgr != nil {
  712. proxyURL = mgr.NodeEgressProxyURL(n.Id)
  713. }
  714. }
  715. return s.probe(ctx, n, proxyURL)
  716. }
  717. func (s *NodeService) ProbeWithOutbound(ctx context.Context, n *model.Node, outboundTag string) (HeartbeatPatch, error) {
  718. if outboundTag == "" {
  719. return s.Probe(ctx, n)
  720. }
  721. var patch HeartbeatPatch
  722. var err error
  723. s.withOutboundBridge(n.Id, outboundTag, func(proxyURL string) {
  724. if proxyURL == "" {
  725. patch, err = s.Probe(ctx, n)
  726. return
  727. }
  728. patch, err = s.probe(ctx, n, proxyURL)
  729. })
  730. return patch, err
  731. }
  732. // withOutboundBridge stands up a temporary loopback SOCKS5 inbound in the
  733. // running Xray, routes it through outboundTag, and runs fn with the bridge's
  734. // proxy URL before tearing it down. It is used to reach a node through its
  735. // connection outbound before the persistent egress bridge has been injected
  736. // into the config (e.g. while the node is still being added or edited). When
  737. // Xray isn't running or the bridge can't be built, fn runs with an empty
  738. // proxyURL so callers fall back to a direct connection.
  739. func (s *NodeService) withOutboundBridge(nodeID int, outboundTag string, fn func(proxyURL string)) {
  740. proc := XrayProcess()
  741. if proc == nil || !proc.IsRunning() {
  742. fn("")
  743. return
  744. }
  745. apiPort := proc.GetAPIPort()
  746. if apiPort <= 0 {
  747. fn("")
  748. return
  749. }
  750. listener, err := net.Listen("tcp", "127.0.0.1:0")
  751. if err != nil {
  752. fn("")
  753. return
  754. }
  755. port := listener.Addr().(*net.TCPAddr).Port
  756. listener.Close()
  757. tag := fmt.Sprintf("node-test-%d-%d", nodeID, time.Now().UnixNano())
  758. proxyURL := fmt.Sprintf("socks5://127.0.0.1:%d", port)
  759. inboundJSON, err := json.Marshal(xray.InboundConfig{
  760. Listen: json_util.RawMessage(`"127.0.0.1"`),
  761. Port: port,
  762. Protocol: "socks",
  763. Settings: json_util.RawMessage(`{"auth":"noauth","udp":false}`),
  764. Tag: tag,
  765. })
  766. if err != nil {
  767. fn("")
  768. return
  769. }
  770. cfg := proc.GetConfig()
  771. routing := map[string]any{}
  772. if len(cfg.RouterConfig) > 0 {
  773. _ = json.Unmarshal(cfg.RouterConfig, &routing)
  774. }
  775. rules, _ := routing["rules"].([]any)
  776. rule := map[string]any{
  777. "type": "field",
  778. "inboundTag": []any{tag},
  779. }
  780. if routingTagIsBalancer(routing, outboundTag) {
  781. rule["balancerTag"] = outboundTag
  782. } else {
  783. rule["outboundTag"] = outboundTag
  784. }
  785. routing["rules"] = append([]any{rule}, rules...)
  786. routingJSON, err := json.Marshal(routing)
  787. if err != nil {
  788. fn("")
  789. return
  790. }
  791. originalRoutingJSON := cfg.RouterConfig
  792. api := xray.XrayAPI{}
  793. if err := api.Init(apiPort); err != nil {
  794. fn("")
  795. return
  796. }
  797. defer api.Close()
  798. if err := api.AddInbound(inboundJSON); err != nil {
  799. fn("")
  800. return
  801. }
  802. defer func() {
  803. if err := api.DelInbound(tag); err != nil {
  804. logger.Warning("remove temp node bridge inbound failed:", err)
  805. }
  806. }()
  807. if err := api.ApplyRoutingConfig(routingJSON); err != nil {
  808. fn("")
  809. return
  810. }
  811. defer func() {
  812. restore := originalRoutingJSON
  813. if len(restore) == 0 {
  814. restore = []byte("{}")
  815. }
  816. if err := api.ApplyRoutingConfig(restore); err != nil {
  817. logger.Warning("restore routing after node bridge failed:", err)
  818. }
  819. }()
  820. fn(proxyURL)
  821. }
  822. func (s *NodeService) probe(ctx context.Context, n *model.Node, proxyURL string) (HeartbeatPatch, error) {
  823. patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()}
  824. addr, err := netsafe.NormalizeHost(n.Address)
  825. if err != nil {
  826. patch.LastError = err.Error()
  827. return patch, err
  828. }
  829. scheme := n.Scheme
  830. if scheme != "http" && scheme != "https" {
  831. scheme = "https"
  832. }
  833. if n.Port <= 0 || n.Port > 65535 {
  834. patch.LastError = "node port must be 1-65535"
  835. return patch, errors.New(patch.LastError)
  836. }
  837. probeURL := &url.URL{
  838. Scheme: scheme,
  839. Host: net.JoinHostPort(addr, strconv.Itoa(n.Port)),
  840. Path: normalizeBasePath(n.BasePath) + "panel/api/server/status",
  841. }
  842. req, err := http.NewRequestWithContext(
  843. netsafe.ContextWithAllowPrivate(ctx, n.AllowPrivateAddress),
  844. http.MethodGet, probeURL.String(), nil)
  845. if err != nil {
  846. patch.LastError = err.Error()
  847. return patch, err
  848. }
  849. if n.ApiToken != "" {
  850. req.Header.Set("Authorization", "Bearer "+n.ApiToken)
  851. }
  852. req.Header.Set("Accept", "application/json")
  853. client, err := runtime.HTTPClientForNode(n, proxyURL)
  854. if err != nil {
  855. patch.LastError = err.Error()
  856. return patch, err
  857. }
  858. start := time.Now()
  859. resp, err := client.Do(req)
  860. if err != nil {
  861. patch.LastError = err.Error()
  862. return patch, err
  863. }
  864. defer resp.Body.Close()
  865. patch.LatencyMs = int(time.Since(start) / time.Millisecond)
  866. if resp.StatusCode != http.StatusOK {
  867. patch.LastError = fmt.Sprintf("HTTP %d from remote panel", resp.StatusCode)
  868. return patch, errors.New(patch.LastError)
  869. }
  870. var envelope struct {
  871. Success bool `json:"success"`
  872. Msg string `json:"msg"`
  873. Obj *struct {
  874. CpuPct float64 `json:"cpu"`
  875. Mem struct {
  876. Current uint64 `json:"current"`
  877. Total uint64 `json:"total"`
  878. } `json:"mem"`
  879. Xray struct {
  880. Version string `json:"version"`
  881. State string `json:"state"`
  882. ErrorMsg string `json:"errorMsg"`
  883. } `json:"xray"`
  884. PanelVersion string `json:"panelVersion"`
  885. PanelGuid string `json:"panelGuid"`
  886. Uptime uint64 `json:"uptime"`
  887. NetIO struct {
  888. Up uint64 `json:"up"`
  889. Down uint64 `json:"down"`
  890. } `json:"netIO"`
  891. } `json:"obj"`
  892. }
  893. if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
  894. patch.LastError = "decode response: " + err.Error()
  895. return patch, err
  896. }
  897. if !envelope.Success || envelope.Obj == nil {
  898. patch.LastError = "remote returned success=false: " + envelope.Msg
  899. return patch, errors.New(patch.LastError)
  900. }
  901. o := envelope.Obj
  902. patch.CpuPct = o.CpuPct
  903. if o.Mem.Total > 0 {
  904. patch.MemPct = float64(o.Mem.Current) * 100.0 / float64(o.Mem.Total)
  905. }
  906. patch.XrayVersion = o.Xray.Version
  907. patch.XrayState = o.Xray.State
  908. patch.XrayError = o.Xray.ErrorMsg
  909. patch.PanelVersion = o.PanelVersion
  910. patch.Guid = o.PanelGuid
  911. patch.UptimeSecs = o.Uptime
  912. patch.NetUp = o.NetIO.Up
  913. patch.NetDown = o.NetIO.Down
  914. return patch, nil
  915. }
  916. type ProbeResultUI struct {
  917. Status string `json:"status" example:"online"`
  918. LatencyMs int `json:"latencyMs" example:"42"`
  919. XrayVersion string `json:"xrayVersion" example:"25.10.31"`
  920. PanelVersion string `json:"panelVersion" example:"v3.x.x"`
  921. CpuPct float64 `json:"cpuPct" example:"12.5"`
  922. MemPct float64 `json:"memPct" example:"45.2"`
  923. UptimeSecs uint64 `json:"uptimeSecs" example:"86400"`
  924. Error string `json:"error"`
  925. // XrayState/XrayError are populated on successful probes even when the node's
  926. // Xray core is not healthy. The UI uses them for a distinct "panel ok, xray failed" indicator.
  927. XrayState string `json:"xrayState"`
  928. XrayError string `json:"xrayError"`
  929. }
  930. func (p HeartbeatPatch) ToUI(ok bool) ProbeResultUI {
  931. r := ProbeResultUI{
  932. LatencyMs: p.LatencyMs,
  933. XrayVersion: p.XrayVersion,
  934. PanelVersion: p.PanelVersion,
  935. CpuPct: p.CpuPct,
  936. MemPct: p.MemPct,
  937. UptimeSecs: p.UptimeSecs,
  938. Error: FriendlyProbeError(p.LastError),
  939. XrayState: p.XrayState,
  940. XrayError: p.XrayError,
  941. }
  942. if ok {
  943. r.Status = "online"
  944. } else {
  945. r.Status = "offline"
  946. }
  947. return r
  948. }
  949. func FriendlyProbeError(msg string) string {
  950. if strings.Contains(msg, "server gave HTTP response to HTTPS client") {
  951. return "the server speaks HTTP, not HTTPS; set the node scheme to http"
  952. }
  953. return msg
  954. }