tgbot.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. package tgbot
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "embed"
  6. "math/big"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "slices"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/eventbus"
  17. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  19. "github.com/mhsanaei/3x-ui/v3/internal/web/global"
  20. "github.com/mhsanaei/3x-ui/v3/internal/web/locale"
  21. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  22. "github.com/mymmrac/telego"
  23. th "github.com/mymmrac/telego/telegohandler"
  24. "github.com/valyala/fasthttp"
  25. "github.com/valyala/fasthttp/fasthttpproxy"
  26. )
  27. var (
  28. bot *telego.Bot
  29. // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
  30. botCancel context.CancelFunc
  31. // tgBotMutex protects concurrent access to botCancel variable
  32. tgBotMutex sync.Mutex
  33. // botWG waits for the OnReceive Long Polling goroutine to finish.
  34. botWG sync.WaitGroup
  35. botHandler *th.BotHandler
  36. adminIds []int64
  37. isRunning bool
  38. hostname string
  39. hashStorage *global.HashStorage
  40. // EventBus is set from web layer to publish login/security events.
  41. EventBus *eventbus.Bus
  42. // Performance improvements
  43. messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
  44. optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
  45. // Simple cache for frequently accessed data
  46. statusCache struct {
  47. data *service.Status
  48. timestamp time.Time
  49. mutex sync.RWMutex
  50. }
  51. serverStatsCache struct {
  52. data string
  53. timestamp time.Time
  54. mutex sync.RWMutex
  55. }
  56. // clients data to adding new client. receiver_inbound_IDs is the set of
  57. // inbounds the new client will be attached to; receiver_inbound_ID mirrors
  58. // the primary pick for the legacy attach-picker entry point. Per-protocol
  59. // secrets (UUID, password, flow, method) are filled per-inbound on submit
  60. // by ClientService.fillProtocolDefaults, so the bot only tracks universal
  61. // client fields here.
  62. receiver_inbound_ID int
  63. receiver_inbound_IDs []int
  64. client_Email string
  65. client_LimitIP int
  66. client_TotalGB int64
  67. client_ExpiryTime int64
  68. client_Enable bool
  69. client_TgID string
  70. client_SubID string
  71. client_Comment string
  72. client_Reset int
  73. )
  74. // userStateStore guards the per-chat conversation states. The Telegram command
  75. // and callback handlers run on a worker-pool goroutine while the message handler
  76. // runs on the dispatch goroutine, so a bare map would be a concurrent-map-write
  77. // crash. It also expires abandoned conversations so a user who starts a flow and
  78. // goes silent doesn't leave an entry forever.
  79. type userStateStore struct {
  80. mu sync.Mutex
  81. states map[int64]userStateEntry
  82. lastPrune time.Time
  83. }
  84. type userStateEntry struct {
  85. state string
  86. at time.Time
  87. }
  88. var userStateMgr = &userStateStore{states: make(map[int64]userStateEntry)}
  89. func (s *userStateStore) set(chatID int64, state string) {
  90. s.mu.Lock()
  91. s.states[chatID] = userStateEntry{state: state, at: time.Now()}
  92. s.mu.Unlock()
  93. }
  94. func (s *userStateStore) get(chatID int64) (string, bool) {
  95. s.mu.Lock()
  96. defer s.mu.Unlock()
  97. e, ok := s.states[chatID]
  98. return e.state, ok
  99. }
  100. func (s *userStateStore) clear(chatID int64) {
  101. s.mu.Lock()
  102. delete(s.states, chatID)
  103. s.mu.Unlock()
  104. }
  105. func (s *userStateStore) reset() {
  106. s.mu.Lock()
  107. s.states = make(map[int64]userStateEntry)
  108. s.mu.Unlock()
  109. }
  110. // maybePrune drops conversations older than maxAge, at most once per maxAge so a
  111. // busy bot doesn't sweep the whole map on every message.
  112. func (s *userStateStore) maybePrune(maxAge time.Duration) {
  113. s.mu.Lock()
  114. defer s.mu.Unlock()
  115. now := time.Now()
  116. if now.Sub(s.lastPrune) < maxAge {
  117. return
  118. }
  119. s.lastPrune = now
  120. for id, e := range s.states {
  121. if now.Sub(e.at) > maxAge {
  122. delete(s.states, id)
  123. }
  124. }
  125. }
  126. // LoginStatus represents the result of a login attempt.
  127. type LoginStatus byte
  128. // Login status constants
  129. const (
  130. LoginSuccess LoginStatus = 1 // Login was successful
  131. LoginFail LoginStatus = 0 // Login failed
  132. EmptyTelegramUserID = int64(0) // Default value for empty Telegram user ID
  133. )
  134. // LoginAttempt contains safe metadata for panel login notifications.
  135. // It intentionally does not include attempted passwords.
  136. type LoginAttempt struct {
  137. Username string
  138. IP string
  139. Time string
  140. Status LoginStatus
  141. Reason string
  142. }
  143. // Tgbot provides business logic for Telegram bot integration.
  144. // It handles bot commands, user interactions, and status reporting via Telegram.
  145. type Tgbot struct {
  146. inboundService service.InboundService
  147. clientService service.ClientService
  148. settingService service.SettingService
  149. serverService service.ServerService
  150. xrayService service.XrayService
  151. lastStatus *service.Status
  152. }
  153. // NewTgbot creates a new Tgbot instance.
  154. func (t *Tgbot) NewTgbot() *Tgbot {
  155. return new(Tgbot)
  156. }
  157. // I18nBot retrieves a localized message for the bot interface.
  158. func (t *Tgbot) I18nBot(name string, params ...string) string {
  159. return locale.I18n(locale.Bot, name, params...)
  160. }
  161. // GetHashStorage returns the hash storage instance for callback queries.
  162. func (t *Tgbot) GetHashStorage() *global.HashStorage {
  163. return hashStorage
  164. }
  165. // getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
  166. func (t *Tgbot) getCachedStatus() (*service.Status, bool) {
  167. statusCache.mutex.RLock()
  168. defer statusCache.mutex.RUnlock()
  169. if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
  170. return statusCache.data, true
  171. }
  172. return nil, false
  173. }
  174. // setCachedStatus updates the status cache
  175. func (t *Tgbot) setCachedStatus(status *service.Status) {
  176. statusCache.mutex.Lock()
  177. defer statusCache.mutex.Unlock()
  178. statusCache.data = status
  179. statusCache.timestamp = time.Now()
  180. }
  181. // getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
  182. func (t *Tgbot) getCachedServerStats() (string, bool) {
  183. serverStatsCache.mutex.RLock()
  184. defer serverStatsCache.mutex.RUnlock()
  185. if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
  186. return serverStatsCache.data, true
  187. }
  188. return "", false
  189. }
  190. // setCachedServerStats updates the server stats cache
  191. func (t *Tgbot) setCachedServerStats(stats string) {
  192. serverStatsCache.mutex.Lock()
  193. defer serverStatsCache.mutex.Unlock()
  194. serverStatsCache.data = stats
  195. serverStatsCache.timestamp = time.Now()
  196. }
  197. // Start initializes and starts the Telegram bot with the provided translation files.
  198. func (t *Tgbot) Start(i18nFS embed.FS) error {
  199. // Initialize localizer
  200. err := locale.InitLocalizer(i18nFS, &t.settingService)
  201. if err != nil {
  202. return err
  203. }
  204. // If Start is called again (e.g. during reload), ensure any previous long-polling
  205. // loop is stopped before creating a new bot / receiver.
  206. StopBot()
  207. // Initialize hash storage to store callback queries
  208. hashStorage = global.NewHashStorage(20 * time.Minute)
  209. // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
  210. messageWorkerPool = make(chan struct{}, 10)
  211. // Initialize optimized HTTP client with connection pooling
  212. optimizedHTTPClient = &http.Client{
  213. Timeout: 15 * time.Second,
  214. Transport: &http.Transport{
  215. MaxIdleConns: 100,
  216. MaxIdleConnsPerHost: 10,
  217. IdleConnTimeout: 30 * time.Second,
  218. DisableKeepAlives: false,
  219. },
  220. }
  221. t.SetHostname()
  222. // Get Telegram bot token
  223. tgBotToken, err := t.settingService.GetTgBotToken()
  224. if err != nil || tgBotToken == "" {
  225. logger.Warning("Failed to get Telegram bot token:", err)
  226. return err
  227. }
  228. // Get Telegram bot chat ID(s)
  229. tgBotID, err := t.settingService.GetTgBotChatId()
  230. if err != nil {
  231. logger.Warning("Failed to get Telegram bot chat ID:", err)
  232. return err
  233. }
  234. parsedAdminIds := make([]int64, 0)
  235. // Parse admin IDs from comma-separated string
  236. if tgBotID != "" {
  237. for adminID := range strings.SplitSeq(tgBotID, ",") {
  238. id, err := strconv.ParseInt(adminID, 10, 64)
  239. if err != nil {
  240. logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
  241. return err
  242. }
  243. parsedAdminIds = append(parsedAdminIds, int64(id))
  244. }
  245. }
  246. tgBotMutex.Lock()
  247. adminIds = parsedAdminIds
  248. tgBotMutex.Unlock()
  249. // Get Telegram bot proxy URL
  250. tgBotProxy, err := t.settingService.GetTgBotProxy()
  251. if err != nil {
  252. logger.Warning("Failed to get Telegram bot proxy URL:", err)
  253. }
  254. // Fall back to the panel-wide egress bridge when no dedicated bot proxy is
  255. // set. Resolved once at bot start: if Xray comes up later, the bot keeps
  256. // its direct connection until it is restarted.
  257. if tgBotProxy == "" {
  258. if egress := t.settingService.PanelEgressProxyURL(); egress != "" && isSupportedBotProxyScheme(egress) {
  259. tgBotProxy = egress
  260. }
  261. }
  262. // Get Telegram bot API server URL
  263. tgBotAPIServer, err := t.settingService.GetTgBotAPIServer()
  264. if err != nil {
  265. logger.Warning("Failed to get Telegram bot API server URL:", err)
  266. }
  267. // Create new Telegram bot instance
  268. bot, err = t.NewBot(tgBotToken, tgBotProxy, tgBotAPIServer)
  269. if err != nil {
  270. logger.Error("Failed to initialize Telegram bot API:", err)
  271. return err
  272. }
  273. t.trySetBotCommands(bot)
  274. // Start receiving Telegram bot messages
  275. tgBotMutex.Lock()
  276. alreadyRunning := isRunning || botCancel != nil
  277. tgBotMutex.Unlock()
  278. if !alreadyRunning {
  279. logger.Info("Telegram bot receiver started")
  280. go t.OnReceive()
  281. }
  282. return nil
  283. }
  284. func (t *Tgbot) trySetBotCommands(bot *telego.Bot) {
  285. defer func() {
  286. if r := recover(); r != nil {
  287. logger.Warning("Failed to register bot commands (Telegram may be rate-limiting); bot will continue without them:", r)
  288. }
  289. }()
  290. err := bot.SetMyCommands(context.Background(), &telego.SetMyCommandsParams{
  291. Commands: []telego.BotCommand{
  292. {Command: "start", Description: t.I18nBot("tgbot.commands.startDesc")},
  293. {Command: "help", Description: t.I18nBot("tgbot.commands.helpDesc")},
  294. {Command: "status", Description: t.I18nBot("tgbot.commands.statusDesc")},
  295. {Command: "id", Description: t.I18nBot("tgbot.commands.idDesc")},
  296. },
  297. })
  298. if err != nil {
  299. logger.Warning("Failed to set bot commands:", err)
  300. }
  301. }
  302. func isSupportedBotProxyScheme(proxyUrl string) bool {
  303. return strings.HasPrefix(proxyUrl, "socks5://") ||
  304. strings.HasPrefix(proxyUrl, "http://") ||
  305. strings.HasPrefix(proxyUrl, "https://")
  306. }
  307. // createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
  308. func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
  309. client := &fasthttp.Client{
  310. // Connection timeouts
  311. ReadTimeout: 30 * time.Second,
  312. WriteTimeout: 30 * time.Second,
  313. MaxIdleConnDuration: 60 * time.Second,
  314. MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
  315. MaxIdemponentCallAttempts: 3,
  316. ReadBufferSize: 4096,
  317. WriteBufferSize: 4096,
  318. MaxConnsPerHost: 100,
  319. MaxConnWaitTimeout: 10 * time.Second,
  320. DisableHeaderNamesNormalizing: false,
  321. DisablePathNormalizing: false,
  322. // Retry on connection errors
  323. RetryIf: func(request *fasthttp.Request) bool {
  324. // Retry on connection errors for GET requests
  325. return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
  326. },
  327. }
  328. if proxyUrl != "" {
  329. if strings.HasPrefix(proxyUrl, "socks5://") {
  330. client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
  331. } else {
  332. client.Dial = fasthttpproxy.FasthttpHTTPDialer(proxyUrl)
  333. }
  334. }
  335. return client
  336. }
  337. // NewBot creates a new Telegram bot instance with optional proxy and API server settings.
  338. func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
  339. // Validate proxy URL if provided
  340. if proxyUrl != "" {
  341. if !isSupportedBotProxyScheme(proxyUrl) {
  342. logger.Warning("Unsupported proxy scheme (want socks5:// or http(s)://), ignoring proxy")
  343. proxyUrl = "" // Clear invalid proxy
  344. } else if _, err := url.Parse(proxyUrl); err != nil {
  345. logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
  346. proxyUrl = ""
  347. }
  348. }
  349. // Validate API server URL if provided
  350. if apiServerUrl != "" {
  351. safeURL, err := service.SanitizePublicHTTPURL(apiServerUrl, false)
  352. if err != nil {
  353. logger.Warningf("Invalid or blocked API server URL, using default: %v", err)
  354. apiServerUrl = ""
  355. } else {
  356. apiServerUrl = safeURL
  357. }
  358. }
  359. // Create robust fasthttp client
  360. client := t.createRobustFastHTTPClient(proxyUrl)
  361. // Build bot options
  362. var options []telego.BotOption
  363. options = append(options, telego.WithFastHTTPClient(client))
  364. if apiServerUrl != "" {
  365. options = append(options, telego.WithAPIServer(apiServerUrl))
  366. }
  367. return telego.NewBot(token, options...)
  368. }
  369. // IsRunning checks if the Telegram bot is currently running.
  370. func (t *Tgbot) IsRunning() bool {
  371. tgBotMutex.Lock()
  372. defer tgBotMutex.Unlock()
  373. return isRunning
  374. }
  375. // SetHostname sets the hostname for the bot.
  376. func (t *Tgbot) SetHostname() {
  377. host, err := os.Hostname()
  378. if err != nil {
  379. logger.Error("get hostname error:", err)
  380. hostname = ""
  381. return
  382. }
  383. hostname = host
  384. }
  385. // Stop safely stops the Telegram bot's Long Polling operation.
  386. // This method now calls the global StopBot function and cleans up other resources.
  387. func (t *Tgbot) Stop() {
  388. StopBot()
  389. logger.Info("Stop Telegram receiver ...")
  390. tgBotMutex.Lock()
  391. adminIds = nil
  392. tgBotMutex.Unlock()
  393. }
  394. // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
  395. // This is the global function called from main.go's signal handler and t.Stop().
  396. func StopBot() {
  397. // Don't hold the mutex while cancelling/waiting.
  398. tgBotMutex.Lock()
  399. cancel := botCancel
  400. botCancel = nil
  401. handler := botHandler
  402. botHandler = nil
  403. isRunning = false
  404. tgBotMutex.Unlock()
  405. userStateMgr.reset()
  406. if handler != nil {
  407. handler.Stop()
  408. }
  409. if cancel != nil {
  410. logger.Info("Sending cancellation signal to Telegram bot...")
  411. // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
  412. // and lets botHandler.Start() exit cleanly.
  413. cancel()
  414. botWG.Wait()
  415. logger.Info("Telegram bot successfully stopped.")
  416. }
  417. }
  418. // encodeQuery encodes the query string if it's longer than 64 characters.
  419. func (t *Tgbot) encodeQuery(query string) string {
  420. // NOTE: we only need to hash for more than 64 chars
  421. if len(query) <= 64 {
  422. return query
  423. }
  424. return hashStorage.SaveHash(query)
  425. }
  426. // decodeQuery decodes a hashed query string back to its original form.
  427. func (t *Tgbot) decodeQuery(query string) (string, error) {
  428. if !hashStorage.IsMD5(query) {
  429. return query, nil
  430. }
  431. decoded, exists := hashStorage.GetValue(query)
  432. if !exists {
  433. return "", common.NewError("hash not found in storage!")
  434. }
  435. return decoded, nil
  436. }
  437. // randomLowerAndNum generates a random string of lowercase letters and numbers.
  438. func (t *Tgbot) randomLowerAndNum(length int) string {
  439. charset := "abcdefghijklmnopqrstuvwxyz0123456789"
  440. bytes := make([]byte, length)
  441. for i := range bytes {
  442. randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
  443. bytes[i] = charset[randomIndex.Int64()]
  444. }
  445. return string(bytes)
  446. }
  447. // int64Contains checks if an int64 slice contains a specific item.
  448. func int64Contains(slice []int64, item int64) bool {
  449. return slices.Contains(slice, item)
  450. }
  451. // isSingleWord checks if the text contains only a single word.
  452. func (t *Tgbot) isSingleWord(text string) bool {
  453. text = strings.TrimSpace(text)
  454. re := regexp.MustCompile(`\s+`)
  455. return re.MatchString(text)
  456. }