| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- package job
- import (
- "github.com/mhsanaei/3x-ui/v3/internal/database/model"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- "github.com/mhsanaei/3x-ui/v3/internal/mtproto"
- "github.com/mhsanaei/3x-ui/v3/internal/web/service"
- "github.com/mhsanaei/3x-ui/v3/internal/xray"
- )
- // MtprotoJob reconciles the running mtg sidecar processes against the enabled
- // mtproto inbounds in the database, restarts any that crashed, and folds the
- // per-inbound traffic scraped from each mtg metrics endpoint into the usual
- // inbound traffic accounting.
- type MtprotoJob struct {
- inboundService service.InboundService
- }
- // NewMtprotoJob creates a new mtproto reconcile/traffic job instance.
- func NewMtprotoJob() *MtprotoJob {
- return new(MtprotoJob)
- }
- // Run reconciles desired mtproto inbounds with running mtg processes and
- // records traffic deltas.
- func (j *MtprotoJob) Run() {
- inbounds, err := j.inboundService.GetAllInbounds()
- if err != nil {
- logger.Warning("mtproto job: get inbounds failed:", err)
- return
- }
- var desired []mtproto.Instance
- routedTags := make(map[string]bool)
- for _, ib := range inbounds {
- if ib.Protocol != model.MTProto || !ib.Enable || ib.NodeID != nil {
- continue
- }
- if inst, ok := mtproto.InstanceFromInbound(ib); ok {
- desired = append(desired, inst)
- if inst.RouteThroughXray {
- routedTags[inst.Tag] = true
- }
- }
- }
- mgr := mtproto.GetManager()
- mgr.Reconcile(desired)
- deltas := mgr.CollectTraffic()
- if len(deltas) == 0 {
- return
- }
- traffics := make([]*xray.Traffic, 0, len(deltas))
- for _, d := range deltas {
- // Routed inbounds egress through the Xray SOCKS bridge, which carries the
- // inbound's tag and is metered by xray_traffic_job. Folding mtg's own
- // metrics in too would double-count, so skip them here.
- if routedTags[d.Tag] {
- continue
- }
- traffics = append(traffics, &xray.Traffic{
- IsInbound: true,
- Tag: d.Tag,
- Up: d.Up,
- Down: d.Down,
- })
- }
- if len(traffics) == 0 {
- return
- }
- if _, _, err := j.inboundService.AddTraffic(traffics, nil); err != nil {
- logger.Warning("mtproto job: add traffic failed:", err)
- }
- }
|