tgbot.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. package tgbot
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "embed"
  6. "math/big"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "slices"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/eventbus"
  17. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  19. "github.com/mhsanaei/3x-ui/v3/internal/web/global"
  20. "github.com/mhsanaei/3x-ui/v3/internal/web/locale"
  21. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  22. "github.com/mymmrac/telego"
  23. th "github.com/mymmrac/telego/telegohandler"
  24. "github.com/valyala/fasthttp"
  25. "github.com/valyala/fasthttp/fasthttpproxy"
  26. )
  27. var (
  28. bot *telego.Bot
  29. // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
  30. botCancel context.CancelFunc
  31. // tgBotMutex protects concurrent access to botCancel variable
  32. tgBotMutex sync.Mutex
  33. // botWG waits for the OnReceive Long Polling goroutine to finish.
  34. botWG sync.WaitGroup
  35. botHandler *th.BotHandler
  36. adminIds []int64
  37. isRunning bool
  38. hostname string
  39. hashStorage *global.HashStorage
  40. // EventBus is set from web layer to publish login/security events.
  41. EventBus *eventbus.Bus
  42. // Performance improvements
  43. messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
  44. optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
  45. // Simple cache for frequently accessed data
  46. statusCache struct {
  47. data *service.Status
  48. timestamp time.Time
  49. mutex sync.RWMutex
  50. }
  51. serverStatsCache struct {
  52. data string
  53. timestamp time.Time
  54. mutex sync.RWMutex
  55. }
  56. // clients data to adding new client. receiver_inbound_IDs is the set of
  57. // inbounds the new client will be attached to; receiver_inbound_ID mirrors
  58. // the primary pick for the legacy attach-picker entry point. Per-protocol
  59. // secrets (UUID, password, flow, method) are filled per-inbound on submit
  60. // by ClientService.fillProtocolDefaults, so the bot only tracks universal
  61. // client fields here.
  62. receiver_inbound_ID int
  63. receiver_inbound_IDs []int
  64. client_Email string
  65. client_LimitIP int
  66. client_TotalGB int64
  67. client_ExpiryTime int64
  68. client_Enable bool
  69. client_TgID string
  70. client_SubID string
  71. client_Comment string
  72. client_Reset int
  73. )
  74. // userStateStore guards the per-chat conversation states. The Telegram command
  75. // and callback handlers run on a worker-pool goroutine while the message handler
  76. // runs on the dispatch goroutine, so a bare map would be a concurrent-map-write
  77. // crash. It also expires abandoned conversations so a user who starts a flow and
  78. // goes silent doesn't leave an entry forever.
  79. type userStateStore struct {
  80. mu sync.Mutex
  81. states map[int64]userStateEntry
  82. lastPrune time.Time
  83. }
  84. type userStateEntry struct {
  85. state string
  86. at time.Time
  87. }
  88. var userStateMgr = &userStateStore{states: make(map[int64]userStateEntry)}
  89. func (s *userStateStore) set(chatID int64, state string) {
  90. s.mu.Lock()
  91. s.states[chatID] = userStateEntry{state: state, at: time.Now()}
  92. s.mu.Unlock()
  93. }
  94. func (s *userStateStore) get(chatID int64) (string, bool) {
  95. s.mu.Lock()
  96. defer s.mu.Unlock()
  97. e, ok := s.states[chatID]
  98. return e.state, ok
  99. }
  100. func (s *userStateStore) clear(chatID int64) {
  101. s.mu.Lock()
  102. delete(s.states, chatID)
  103. s.mu.Unlock()
  104. }
  105. func (s *userStateStore) reset() {
  106. s.mu.Lock()
  107. s.states = make(map[int64]userStateEntry)
  108. s.mu.Unlock()
  109. }
  110. // maybePrune drops conversations older than maxAge, at most once per maxAge so a
  111. // busy bot doesn't sweep the whole map on every message.
  112. func (s *userStateStore) maybePrune(maxAge time.Duration) {
  113. s.mu.Lock()
  114. defer s.mu.Unlock()
  115. now := time.Now()
  116. if now.Sub(s.lastPrune) < maxAge {
  117. return
  118. }
  119. s.lastPrune = now
  120. for id, e := range s.states {
  121. if now.Sub(e.at) > maxAge {
  122. delete(s.states, id)
  123. }
  124. }
  125. }
  126. // LoginStatus represents the result of a login attempt.
  127. type LoginStatus byte
  128. // Login status constants
  129. const (
  130. LoginSuccess LoginStatus = 1 // Login was successful
  131. LoginFail LoginStatus = 0 // Login failed
  132. EmptyTelegramUserID = int64(0) // Default value for empty Telegram user ID
  133. )
  134. // LoginAttempt contains safe metadata for panel login notifications.
  135. // It intentionally does not include attempted passwords.
  136. type LoginAttempt struct {
  137. Username string
  138. IP string
  139. Time string
  140. Status LoginStatus
  141. Reason string
  142. }
  143. // Tgbot provides business logic for Telegram bot integration.
  144. // It handles bot commands, user interactions, and status reporting via Telegram.
  145. type Tgbot struct {
  146. inboundService service.InboundService
  147. clientService service.ClientService
  148. settingService service.SettingService
  149. serverService service.ServerService
  150. xrayService service.XrayService
  151. lastStatus *service.Status
  152. }
  153. // NewTgbot creates a new Tgbot instance.
  154. func (t *Tgbot) NewTgbot() *Tgbot {
  155. return new(Tgbot)
  156. }
  157. // I18nBot retrieves a localized message for the bot interface.
  158. func (t *Tgbot) I18nBot(name string, params ...string) string {
  159. return locale.I18n(locale.Bot, name, params...)
  160. }
  161. // GetHashStorage returns the hash storage instance for callback queries.
  162. func (t *Tgbot) GetHashStorage() *global.HashStorage {
  163. return hashStorage
  164. }
  165. // getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
  166. func (t *Tgbot) getCachedStatus() (*service.Status, bool) {
  167. statusCache.mutex.RLock()
  168. defer statusCache.mutex.RUnlock()
  169. if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
  170. return statusCache.data, true
  171. }
  172. return nil, false
  173. }
  174. // setCachedStatus updates the status cache
  175. func (t *Tgbot) setCachedStatus(status *service.Status) {
  176. statusCache.mutex.Lock()
  177. defer statusCache.mutex.Unlock()
  178. statusCache.data = status
  179. statusCache.timestamp = time.Now()
  180. }
  181. // getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
  182. func (t *Tgbot) getCachedServerStats() (string, bool) {
  183. serverStatsCache.mutex.RLock()
  184. defer serverStatsCache.mutex.RUnlock()
  185. if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
  186. return serverStatsCache.data, true
  187. }
  188. return "", false
  189. }
  190. // setCachedServerStats updates the server stats cache
  191. func (t *Tgbot) setCachedServerStats(stats string) {
  192. serverStatsCache.mutex.Lock()
  193. defer serverStatsCache.mutex.Unlock()
  194. serverStatsCache.data = stats
  195. serverStatsCache.timestamp = time.Now()
  196. }
  197. // Start initializes and starts the Telegram bot with the provided translation files.
  198. func (t *Tgbot) Start(i18nFS embed.FS) error {
  199. // Initialize localizer
  200. err := locale.InitLocalizer(i18nFS, &t.settingService)
  201. if err != nil {
  202. return err
  203. }
  204. // If Start is called again (e.g. during reload), ensure any previous long-polling
  205. // loop is stopped before creating a new bot / receiver.
  206. StopBot()
  207. // Initialize hash storage to store callback queries
  208. hashStorage = global.NewHashStorage(20 * time.Minute)
  209. // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
  210. messageWorkerPool = make(chan struct{}, 10)
  211. // Initialize optimized HTTP client with connection pooling
  212. optimizedHTTPClient = &http.Client{
  213. Timeout: 15 * time.Second,
  214. Transport: &http.Transport{
  215. MaxIdleConns: 100,
  216. MaxIdleConnsPerHost: 10,
  217. IdleConnTimeout: 30 * time.Second,
  218. DisableKeepAlives: false,
  219. },
  220. }
  221. t.SetHostname()
  222. // Get Telegram bot token
  223. tgBotToken, err := t.settingService.GetTgBotToken()
  224. if err != nil || tgBotToken == "" {
  225. logger.Warning("Failed to get Telegram bot token:", err)
  226. return err
  227. }
  228. // Get Telegram bot chat ID(s)
  229. tgBotID, err := t.settingService.GetTgBotChatId()
  230. if err != nil {
  231. logger.Warning("Failed to get Telegram bot chat ID:", err)
  232. return err
  233. }
  234. parsedAdminIds := make([]int64, 0)
  235. // Parse admin IDs from comma-separated string
  236. if tgBotID != "" {
  237. for adminID := range strings.SplitSeq(tgBotID, ",") {
  238. id, err := strconv.ParseInt(adminID, 10, 64)
  239. if err != nil {
  240. logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
  241. return err
  242. }
  243. parsedAdminIds = append(parsedAdminIds, id)
  244. }
  245. }
  246. tgBotMutex.Lock()
  247. adminIds = parsedAdminIds
  248. tgBotMutex.Unlock()
  249. // Get Telegram bot proxy URL
  250. tgBotProxy, err := t.settingService.GetTgBotProxy()
  251. if err != nil {
  252. logger.Warning("Failed to get Telegram bot proxy URL:", err)
  253. }
  254. // Fall back to the panel-wide egress bridge when no dedicated bot proxy is
  255. // set. Resolved once at bot start: if Xray comes up later, the bot keeps
  256. // its direct connection until it is restarted.
  257. if tgBotProxy == "" {
  258. if egress := t.settingService.PanelEgressProxyURL(); egress != "" && isSupportedBotProxyScheme(egress) {
  259. tgBotProxy = egress
  260. }
  261. }
  262. // Get Telegram bot API server URL
  263. tgBotAPIServer, err := t.settingService.GetTgBotAPIServer()
  264. if err != nil {
  265. logger.Warning("Failed to get Telegram bot API server URL:", err)
  266. }
  267. // Create new Telegram bot instance
  268. bot, err = t.NewBot(tgBotToken, tgBotProxy, tgBotAPIServer)
  269. if err != nil {
  270. logger.Error("Failed to initialize Telegram bot API:", err)
  271. return err
  272. }
  273. t.trySetBotCommands(bot)
  274. // Start receiving Telegram bot messages
  275. tgBotMutex.Lock()
  276. alreadyRunning := isRunning || botCancel != nil
  277. tgBotMutex.Unlock()
  278. if !alreadyRunning {
  279. logger.Info("Telegram bot receiver started")
  280. go t.OnReceive()
  281. }
  282. return nil
  283. }
  284. func (t *Tgbot) trySetBotCommands(bot *telego.Bot) {
  285. defer func() {
  286. if r := recover(); r != nil {
  287. logger.Warning("Failed to register bot commands (Telegram may be rate-limiting); bot will continue without them:", r)
  288. }
  289. }()
  290. err := bot.SetMyCommands(context.Background(), &telego.SetMyCommandsParams{
  291. Commands: []telego.BotCommand{
  292. {Command: "start", Description: t.I18nBot("tgbot.commands.startDesc")},
  293. {Command: "help", Description: t.I18nBot("tgbot.commands.helpDesc")},
  294. {Command: "status", Description: t.I18nBot("tgbot.commands.statusDesc")},
  295. {Command: "id", Description: t.I18nBot("tgbot.commands.idDesc")},
  296. {Command: "usage", Description: t.I18nBot("tgbot.commands.usageDesc")},
  297. {Command: "inbound", Description: t.I18nBot("tgbot.commands.inboundDesc")},
  298. {Command: "restart", Description: t.I18nBot("tgbot.commands.restartDesc")},
  299. {Command: "clearall", Description: t.I18nBot("tgbot.commands.clearallDesc")},
  300. },
  301. })
  302. if err != nil {
  303. logger.Warning("Failed to set bot commands:", err)
  304. }
  305. }
  306. func isSupportedBotProxyScheme(proxyUrl string) bool {
  307. return strings.HasPrefix(proxyUrl, "socks5://") ||
  308. strings.HasPrefix(proxyUrl, "http://") ||
  309. strings.HasPrefix(proxyUrl, "https://")
  310. }
  311. // createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
  312. func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
  313. client := &fasthttp.Client{
  314. // Connection timeouts
  315. ReadTimeout: 30 * time.Second,
  316. WriteTimeout: 30 * time.Second,
  317. MaxIdleConnDuration: 60 * time.Second,
  318. MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
  319. MaxIdemponentCallAttempts: 3,
  320. ReadBufferSize: 4096,
  321. WriteBufferSize: 4096,
  322. MaxConnsPerHost: 100,
  323. MaxConnWaitTimeout: 10 * time.Second,
  324. DisableHeaderNamesNormalizing: false,
  325. DisablePathNormalizing: false,
  326. // Retry on connection errors
  327. RetryIf: func(request *fasthttp.Request) bool {
  328. // Retry on connection errors for GET requests
  329. return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
  330. },
  331. }
  332. if proxyUrl != "" {
  333. if strings.HasPrefix(proxyUrl, "socks5://") {
  334. client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
  335. } else {
  336. client.Dial = fasthttpproxy.FasthttpHTTPDialer(proxyUrl)
  337. }
  338. }
  339. return client
  340. }
  341. // NewBot creates a new Telegram bot instance with optional proxy and API server settings.
  342. func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
  343. // Validate proxy URL if provided
  344. if proxyUrl != "" {
  345. if !isSupportedBotProxyScheme(proxyUrl) {
  346. logger.Warning("Unsupported proxy scheme (want socks5:// or http(s)://), ignoring proxy")
  347. proxyUrl = "" // Clear invalid proxy
  348. } else if _, err := url.Parse(proxyUrl); err != nil {
  349. logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
  350. proxyUrl = ""
  351. }
  352. }
  353. // Validate API server URL if provided
  354. if apiServerUrl != "" {
  355. safeURL, err := service.SanitizePublicHTTPURL(apiServerUrl, false)
  356. if err != nil {
  357. logger.Warningf("Invalid or blocked API server URL, using default: %v", err)
  358. apiServerUrl = ""
  359. } else {
  360. apiServerUrl = safeURL
  361. }
  362. }
  363. // Create robust fasthttp client
  364. client := t.createRobustFastHTTPClient(proxyUrl)
  365. // Build bot options
  366. var options []telego.BotOption
  367. options = append(options, telego.WithFastHTTPClient(client))
  368. if apiServerUrl != "" {
  369. options = append(options, telego.WithAPIServer(apiServerUrl))
  370. }
  371. return telego.NewBot(token, options...)
  372. }
  373. // IsRunning checks if the Telegram bot is currently running.
  374. func (t *Tgbot) IsRunning() bool {
  375. tgBotMutex.Lock()
  376. defer tgBotMutex.Unlock()
  377. return isRunning
  378. }
  379. // SetHostname sets the hostname for the bot.
  380. func (t *Tgbot) SetHostname() {
  381. host, err := os.Hostname()
  382. if err != nil {
  383. logger.Error("get hostname error:", err)
  384. hostname = ""
  385. return
  386. }
  387. hostname = host
  388. }
  389. // Stop safely stops the Telegram bot's Long Polling operation.
  390. // This method now calls the global StopBot function and cleans up other resources.
  391. func (t *Tgbot) Stop() {
  392. StopBot()
  393. logger.Info("Stop Telegram receiver ...")
  394. tgBotMutex.Lock()
  395. adminIds = nil
  396. tgBotMutex.Unlock()
  397. }
  398. // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
  399. // This is the global function called from main.go's signal handler and t.Stop().
  400. func StopBot() {
  401. // Don't hold the mutex while cancelling/waiting.
  402. tgBotMutex.Lock()
  403. cancel := botCancel
  404. botCancel = nil
  405. handler := botHandler
  406. botHandler = nil
  407. isRunning = false
  408. tgBotMutex.Unlock()
  409. userStateMgr.reset()
  410. if handler != nil {
  411. _ = handler.Stop()
  412. }
  413. if cancel != nil {
  414. logger.Info("Sending cancellation signal to Telegram bot...")
  415. // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
  416. // and lets botHandler.Start() exit cleanly.
  417. cancel()
  418. botWG.Wait()
  419. logger.Info("Telegram bot successfully stopped.")
  420. }
  421. }
  422. // encodeQuery encodes the query string if it's longer than 64 characters.
  423. func (t *Tgbot) encodeQuery(query string) string {
  424. // NOTE: we only need to hash for more than 64 chars
  425. if len(query) <= 64 {
  426. return query
  427. }
  428. return hashStorage.SaveHash(query)
  429. }
  430. // decodeQuery decodes a hashed query string back to its original form.
  431. func (t *Tgbot) decodeQuery(query string) (string, error) {
  432. if !hashStorage.IsMD5(query) {
  433. return query, nil
  434. }
  435. decoded, exists := hashStorage.GetValue(query)
  436. if !exists {
  437. return "", common.NewError("hash not found in storage!")
  438. }
  439. return decoded, nil
  440. }
  441. // randomLowerAndNum generates a random string of lowercase letters and numbers.
  442. func (t *Tgbot) randomLowerAndNum(length int) string {
  443. charset := "abcdefghijklmnopqrstuvwxyz0123456789"
  444. bytes := make([]byte, length)
  445. for i := range bytes {
  446. randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
  447. bytes[i] = charset[randomIndex.Int64()]
  448. }
  449. return string(bytes)
  450. }
  451. // int64Contains checks if an int64 slice contains a specific item.
  452. func int64Contains(slice []int64, item int64) bool {
  453. return slices.Contains(slice, item)
  454. }
  455. // isSingleWord checks if the text contains only a single word.
  456. func (t *Tgbot) isSingleWord(text string) bool {
  457. text = strings.TrimSpace(text)
  458. re := regexp.MustCompile(`\s+`)
  459. return re.MatchString(text)
  460. }