tgbot.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package tgbot
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "embed"
  6. "math/big"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "slices"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/eventbus"
  17. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  18. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  19. "github.com/mhsanaei/3x-ui/v3/internal/web/global"
  20. "github.com/mhsanaei/3x-ui/v3/internal/web/locale"
  21. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  22. "github.com/mymmrac/telego"
  23. th "github.com/mymmrac/telego/telegohandler"
  24. "github.com/valyala/fasthttp"
  25. "github.com/valyala/fasthttp/fasthttpproxy"
  26. )
  27. var (
  28. bot *telego.Bot
  29. // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
  30. botCancel context.CancelFunc
  31. // tgBotMutex protects concurrent access to botCancel variable
  32. tgBotMutex sync.Mutex
  33. // botWG waits for the OnReceive Long Polling goroutine to finish.
  34. botWG sync.WaitGroup
  35. botHandler *th.BotHandler
  36. adminIds []int64
  37. isRunning bool
  38. hostname string
  39. hashStorage *global.HashStorage
  40. // EventBus is set from web layer to publish login/security events.
  41. EventBus *eventbus.Bus
  42. // Performance improvements
  43. messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
  44. optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
  45. // Simple cache for frequently accessed data
  46. statusCache struct {
  47. data *service.Status
  48. timestamp time.Time
  49. mutex sync.RWMutex
  50. }
  51. serverStatsCache struct {
  52. data string
  53. timestamp time.Time
  54. mutex sync.RWMutex
  55. }
  56. // clients data to adding new client. receiver_inbound_IDs is the set of
  57. // inbounds the new client will be attached to; receiver_inbound_ID mirrors
  58. // the primary pick for the legacy attach-picker entry point. Per-protocol
  59. // secrets (UUID, password, flow, method) are filled per-inbound on submit
  60. // by ClientService.fillProtocolDefaults, so the bot only tracks universal
  61. // client fields here.
  62. receiver_inbound_ID int
  63. receiver_inbound_IDs []int
  64. client_Email string
  65. client_LimitIP int
  66. client_TotalGB int64
  67. client_ExpiryTime int64
  68. client_Enable bool
  69. client_TgID string
  70. client_SubID string
  71. client_Comment string
  72. client_Reset int
  73. )
  74. var userStates = make(map[int64]string)
  75. // LoginStatus represents the result of a login attempt.
  76. type LoginStatus byte
  77. // Login status constants
  78. const (
  79. LoginSuccess LoginStatus = 1 // Login was successful
  80. LoginFail LoginStatus = 0 // Login failed
  81. EmptyTelegramUserID = int64(0) // Default value for empty Telegram user ID
  82. )
  83. // LoginAttempt contains safe metadata for panel login notifications.
  84. // It intentionally does not include attempted passwords.
  85. type LoginAttempt struct {
  86. Username string
  87. IP string
  88. Time string
  89. Status LoginStatus
  90. Reason string
  91. }
  92. // Tgbot provides business logic for Telegram bot integration.
  93. // It handles bot commands, user interactions, and status reporting via Telegram.
  94. type Tgbot struct {
  95. inboundService service.InboundService
  96. clientService service.ClientService
  97. settingService service.SettingService
  98. serverService service.ServerService
  99. xrayService service.XrayService
  100. lastStatus *service.Status
  101. }
  102. // NewTgbot creates a new Tgbot instance.
  103. func (t *Tgbot) NewTgbot() *Tgbot {
  104. return new(Tgbot)
  105. }
  106. // I18nBot retrieves a localized message for the bot interface.
  107. func (t *Tgbot) I18nBot(name string, params ...string) string {
  108. return locale.I18n(locale.Bot, name, params...)
  109. }
  110. // GetHashStorage returns the hash storage instance for callback queries.
  111. func (t *Tgbot) GetHashStorage() *global.HashStorage {
  112. return hashStorage
  113. }
  114. // getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
  115. func (t *Tgbot) getCachedStatus() (*service.Status, bool) {
  116. statusCache.mutex.RLock()
  117. defer statusCache.mutex.RUnlock()
  118. if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
  119. return statusCache.data, true
  120. }
  121. return nil, false
  122. }
  123. // setCachedStatus updates the status cache
  124. func (t *Tgbot) setCachedStatus(status *service.Status) {
  125. statusCache.mutex.Lock()
  126. defer statusCache.mutex.Unlock()
  127. statusCache.data = status
  128. statusCache.timestamp = time.Now()
  129. }
  130. // getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
  131. func (t *Tgbot) getCachedServerStats() (string, bool) {
  132. serverStatsCache.mutex.RLock()
  133. defer serverStatsCache.mutex.RUnlock()
  134. if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
  135. return serverStatsCache.data, true
  136. }
  137. return "", false
  138. }
  139. // setCachedServerStats updates the server stats cache
  140. func (t *Tgbot) setCachedServerStats(stats string) {
  141. serverStatsCache.mutex.Lock()
  142. defer serverStatsCache.mutex.Unlock()
  143. serverStatsCache.data = stats
  144. serverStatsCache.timestamp = time.Now()
  145. }
  146. // Start initializes and starts the Telegram bot with the provided translation files.
  147. func (t *Tgbot) Start(i18nFS embed.FS) error {
  148. // Initialize localizer
  149. err := locale.InitLocalizer(i18nFS, &t.settingService)
  150. if err != nil {
  151. return err
  152. }
  153. // If Start is called again (e.g. during reload), ensure any previous long-polling
  154. // loop is stopped before creating a new bot / receiver.
  155. StopBot()
  156. // Initialize hash storage to store callback queries
  157. hashStorage = global.NewHashStorage(20 * time.Minute)
  158. // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
  159. messageWorkerPool = make(chan struct{}, 10)
  160. // Initialize optimized HTTP client with connection pooling
  161. optimizedHTTPClient = &http.Client{
  162. Timeout: 15 * time.Second,
  163. Transport: &http.Transport{
  164. MaxIdleConns: 100,
  165. MaxIdleConnsPerHost: 10,
  166. IdleConnTimeout: 30 * time.Second,
  167. DisableKeepAlives: false,
  168. },
  169. }
  170. t.SetHostname()
  171. // Get Telegram bot token
  172. tgBotToken, err := t.settingService.GetTgBotToken()
  173. if err != nil || tgBotToken == "" {
  174. logger.Warning("Failed to get Telegram bot token:", err)
  175. return err
  176. }
  177. // Get Telegram bot chat ID(s)
  178. tgBotID, err := t.settingService.GetTgBotChatId()
  179. if err != nil {
  180. logger.Warning("Failed to get Telegram bot chat ID:", err)
  181. return err
  182. }
  183. parsedAdminIds := make([]int64, 0)
  184. // Parse admin IDs from comma-separated string
  185. if tgBotID != "" {
  186. for adminID := range strings.SplitSeq(tgBotID, ",") {
  187. id, err := strconv.ParseInt(adminID, 10, 64)
  188. if err != nil {
  189. logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
  190. return err
  191. }
  192. parsedAdminIds = append(parsedAdminIds, int64(id))
  193. }
  194. }
  195. tgBotMutex.Lock()
  196. adminIds = parsedAdminIds
  197. tgBotMutex.Unlock()
  198. // Get Telegram bot proxy URL
  199. tgBotProxy, err := t.settingService.GetTgBotProxy()
  200. if err != nil {
  201. logger.Warning("Failed to get Telegram bot proxy URL:", err)
  202. }
  203. // Fall back to the panel-wide egress bridge when no dedicated bot proxy is
  204. // set. Resolved once at bot start: if Xray comes up later, the bot keeps
  205. // its direct connection until it is restarted.
  206. if tgBotProxy == "" {
  207. if egress := t.settingService.PanelEgressProxyURL(); egress != "" && isSupportedBotProxyScheme(egress) {
  208. tgBotProxy = egress
  209. }
  210. }
  211. // Get Telegram bot API server URL
  212. tgBotAPIServer, err := t.settingService.GetTgBotAPIServer()
  213. if err != nil {
  214. logger.Warning("Failed to get Telegram bot API server URL:", err)
  215. }
  216. // Create new Telegram bot instance
  217. bot, err = t.NewBot(tgBotToken, tgBotProxy, tgBotAPIServer)
  218. if err != nil {
  219. logger.Error("Failed to initialize Telegram bot API:", err)
  220. return err
  221. }
  222. t.trySetBotCommands(bot)
  223. // Start receiving Telegram bot messages
  224. tgBotMutex.Lock()
  225. alreadyRunning := isRunning || botCancel != nil
  226. tgBotMutex.Unlock()
  227. if !alreadyRunning {
  228. logger.Info("Telegram bot receiver started")
  229. go t.OnReceive()
  230. }
  231. return nil
  232. }
  233. func (t *Tgbot) trySetBotCommands(bot *telego.Bot) {
  234. defer func() {
  235. if r := recover(); r != nil {
  236. logger.Warning("Failed to register bot commands (Telegram may be rate-limiting); bot will continue without them:", r)
  237. }
  238. }()
  239. err := bot.SetMyCommands(context.Background(), &telego.SetMyCommandsParams{
  240. Commands: []telego.BotCommand{
  241. {Command: "start", Description: t.I18nBot("tgbot.commands.startDesc")},
  242. {Command: "help", Description: t.I18nBot("tgbot.commands.helpDesc")},
  243. {Command: "status", Description: t.I18nBot("tgbot.commands.statusDesc")},
  244. {Command: "id", Description: t.I18nBot("tgbot.commands.idDesc")},
  245. },
  246. })
  247. if err != nil {
  248. logger.Warning("Failed to set bot commands:", err)
  249. }
  250. }
  251. func isSupportedBotProxyScheme(proxyUrl string) bool {
  252. return strings.HasPrefix(proxyUrl, "socks5://") ||
  253. strings.HasPrefix(proxyUrl, "http://") ||
  254. strings.HasPrefix(proxyUrl, "https://")
  255. }
  256. // createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
  257. func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
  258. client := &fasthttp.Client{
  259. // Connection timeouts
  260. ReadTimeout: 30 * time.Second,
  261. WriteTimeout: 30 * time.Second,
  262. MaxIdleConnDuration: 60 * time.Second,
  263. MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
  264. MaxIdemponentCallAttempts: 3,
  265. ReadBufferSize: 4096,
  266. WriteBufferSize: 4096,
  267. MaxConnsPerHost: 100,
  268. MaxConnWaitTimeout: 10 * time.Second,
  269. DisableHeaderNamesNormalizing: false,
  270. DisablePathNormalizing: false,
  271. // Retry on connection errors
  272. RetryIf: func(request *fasthttp.Request) bool {
  273. // Retry on connection errors for GET requests
  274. return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
  275. },
  276. }
  277. if proxyUrl != "" {
  278. if strings.HasPrefix(proxyUrl, "socks5://") {
  279. client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
  280. } else {
  281. client.Dial = fasthttpproxy.FasthttpHTTPDialer(proxyUrl)
  282. }
  283. }
  284. return client
  285. }
  286. // NewBot creates a new Telegram bot instance with optional proxy and API server settings.
  287. func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
  288. // Validate proxy URL if provided
  289. if proxyUrl != "" {
  290. if !isSupportedBotProxyScheme(proxyUrl) {
  291. logger.Warning("Unsupported proxy scheme (want socks5:// or http(s)://), ignoring proxy")
  292. proxyUrl = "" // Clear invalid proxy
  293. } else if _, err := url.Parse(proxyUrl); err != nil {
  294. logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
  295. proxyUrl = ""
  296. }
  297. }
  298. // Validate API server URL if provided
  299. if apiServerUrl != "" {
  300. safeURL, err := service.SanitizePublicHTTPURL(apiServerUrl, false)
  301. if err != nil {
  302. logger.Warningf("Invalid or blocked API server URL, using default: %v", err)
  303. apiServerUrl = ""
  304. } else {
  305. apiServerUrl = safeURL
  306. }
  307. }
  308. // Create robust fasthttp client
  309. client := t.createRobustFastHTTPClient(proxyUrl)
  310. // Build bot options
  311. var options []telego.BotOption
  312. options = append(options, telego.WithFastHTTPClient(client))
  313. if apiServerUrl != "" {
  314. options = append(options, telego.WithAPIServer(apiServerUrl))
  315. }
  316. return telego.NewBot(token, options...)
  317. }
  318. // IsRunning checks if the Telegram bot is currently running.
  319. func (t *Tgbot) IsRunning() bool {
  320. tgBotMutex.Lock()
  321. defer tgBotMutex.Unlock()
  322. return isRunning
  323. }
  324. // SetHostname sets the hostname for the bot.
  325. func (t *Tgbot) SetHostname() {
  326. host, err := os.Hostname()
  327. if err != nil {
  328. logger.Error("get hostname error:", err)
  329. hostname = ""
  330. return
  331. }
  332. hostname = host
  333. }
  334. // Stop safely stops the Telegram bot's Long Polling operation.
  335. // This method now calls the global StopBot function and cleans up other resources.
  336. func (t *Tgbot) Stop() {
  337. StopBot()
  338. logger.Info("Stop Telegram receiver ...")
  339. tgBotMutex.Lock()
  340. adminIds = nil
  341. tgBotMutex.Unlock()
  342. }
  343. // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
  344. // This is the global function called from main.go's signal handler and t.Stop().
  345. func StopBot() {
  346. // Don't hold the mutex while cancelling/waiting.
  347. tgBotMutex.Lock()
  348. cancel := botCancel
  349. botCancel = nil
  350. handler := botHandler
  351. botHandler = nil
  352. isRunning = false
  353. tgBotMutex.Unlock()
  354. if handler != nil {
  355. handler.Stop()
  356. }
  357. if cancel != nil {
  358. logger.Info("Sending cancellation signal to Telegram bot...")
  359. // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
  360. // and lets botHandler.Start() exit cleanly.
  361. cancel()
  362. botWG.Wait()
  363. logger.Info("Telegram bot successfully stopped.")
  364. }
  365. }
  366. // encodeQuery encodes the query string if it's longer than 64 characters.
  367. func (t *Tgbot) encodeQuery(query string) string {
  368. // NOTE: we only need to hash for more than 64 chars
  369. if len(query) <= 64 {
  370. return query
  371. }
  372. return hashStorage.SaveHash(query)
  373. }
  374. // decodeQuery decodes a hashed query string back to its original form.
  375. func (t *Tgbot) decodeQuery(query string) (string, error) {
  376. if !hashStorage.IsMD5(query) {
  377. return query, nil
  378. }
  379. decoded, exists := hashStorage.GetValue(query)
  380. if !exists {
  381. return "", common.NewError("hash not found in storage!")
  382. }
  383. return decoded, nil
  384. }
  385. // randomLowerAndNum generates a random string of lowercase letters and numbers.
  386. func (t *Tgbot) randomLowerAndNum(length int) string {
  387. charset := "abcdefghijklmnopqrstuvwxyz0123456789"
  388. bytes := make([]byte, length)
  389. for i := range bytes {
  390. randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
  391. bytes[i] = charset[randomIndex.Int64()]
  392. }
  393. return string(bytes)
  394. }
  395. // int64Contains checks if an int64 slice contains a specific item.
  396. func int64Contains(slice []int64, item int64) bool {
  397. return slices.Contains(slice, item)
  398. }
  399. // isSingleWord checks if the text contains only a single word.
  400. func (t *Tgbot) isSingleWord(text string) bool {
  401. text = strings.TrimSpace(text)
  402. re := regexp.MustCompile(`\s+`)
  403. return re.MatchString(text)
  404. }