mtproto_job.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package job
  2. import (
  3. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  4. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  5. "github.com/mhsanaei/3x-ui/v3/internal/mtproto"
  6. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  7. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  8. )
  9. // MtprotoJob reconciles the running mtg sidecar processes against the enabled
  10. // mtproto inbounds in the database, restarts any that crashed, and folds the
  11. // per-inbound traffic scraped from each mtg metrics endpoint into the usual
  12. // inbound traffic accounting.
  13. type MtprotoJob struct {
  14. inboundService service.InboundService
  15. }
  16. // NewMtprotoJob creates a new mtproto reconcile/traffic job instance.
  17. func NewMtprotoJob() *MtprotoJob {
  18. return new(MtprotoJob)
  19. }
  20. // Run reconciles desired mtproto inbounds with running mtg processes and
  21. // records traffic deltas.
  22. func (j *MtprotoJob) Run() {
  23. inbounds, err := j.inboundService.GetAllInbounds()
  24. if err != nil {
  25. logger.Warning("mtproto job: get inbounds failed:", err)
  26. return
  27. }
  28. var desired []mtproto.Instance
  29. routedTags := make(map[string]bool)
  30. for _, ib := range inbounds {
  31. if ib.Protocol != model.MTProto || !ib.Enable || ib.NodeID != nil {
  32. continue
  33. }
  34. if inst, ok := mtproto.InstanceFromInbound(ib); ok {
  35. desired = append(desired, inst)
  36. if inst.RouteThroughXray {
  37. routedTags[inst.Tag] = true
  38. }
  39. }
  40. }
  41. mgr := mtproto.GetManager()
  42. mgr.Reconcile(desired)
  43. deltas := mgr.CollectTraffic()
  44. if len(deltas) == 0 {
  45. return
  46. }
  47. traffics := make([]*xray.Traffic, 0, len(deltas))
  48. for _, d := range deltas {
  49. // Routed inbounds egress through the Xray SOCKS bridge, which carries the
  50. // inbound's tag and is metered by xray_traffic_job. Folding mtg's own
  51. // metrics in too would double-count, so skip them here.
  52. if routedTags[d.Tag] {
  53. continue
  54. }
  55. traffics = append(traffics, &xray.Traffic{
  56. IsInbound: true,
  57. Tag: d.Tag,
  58. Up: d.Up,
  59. Down: d.Down,
  60. })
  61. }
  62. if len(traffics) == 0 {
  63. return
  64. }
  65. if _, _, err := j.inboundService.AddTraffic(traffics, nil); err != nil {
  66. logger.Warning("mtproto job: add traffic failed:", err)
  67. }
  68. }