tgbot.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. package tgbot
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "embed"
  6. "math/big"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "slices"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  17. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  18. "github.com/mhsanaei/3x-ui/v3/internal/web/global"
  19. "github.com/mhsanaei/3x-ui/v3/internal/web/locale"
  20. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  21. "github.com/mymmrac/telego"
  22. th "github.com/mymmrac/telego/telegohandler"
  23. "github.com/valyala/fasthttp"
  24. "github.com/valyala/fasthttp/fasthttpproxy"
  25. )
  26. var (
  27. bot *telego.Bot
  28. // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
  29. botCancel context.CancelFunc
  30. // tgBotMutex protects concurrent access to botCancel variable
  31. tgBotMutex sync.Mutex
  32. // botWG waits for the OnReceive Long Polling goroutine to finish.
  33. botWG sync.WaitGroup
  34. botHandler *th.BotHandler
  35. adminIds []int64
  36. isRunning bool
  37. hostname string
  38. hashStorage *global.HashStorage
  39. // Performance improvements
  40. messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
  41. optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
  42. // Simple cache for frequently accessed data
  43. statusCache struct {
  44. data *service.Status
  45. timestamp time.Time
  46. mutex sync.RWMutex
  47. }
  48. serverStatsCache struct {
  49. data string
  50. timestamp time.Time
  51. mutex sync.RWMutex
  52. }
  53. // clients data to adding new client. receiver_inbound_IDs is the set of
  54. // inbounds the new client will be attached to; receiver_inbound_ID mirrors
  55. // the primary pick for the legacy attach-picker entry point. Per-protocol
  56. // secrets (UUID, password, flow, method) are filled per-inbound on submit
  57. // by ClientService.fillProtocolDefaults, so the bot only tracks universal
  58. // client fields here.
  59. receiver_inbound_ID int
  60. receiver_inbound_IDs []int
  61. client_Email string
  62. client_LimitIP int
  63. client_TotalGB int64
  64. client_ExpiryTime int64
  65. client_Enable bool
  66. client_TgID string
  67. client_SubID string
  68. client_Comment string
  69. client_Reset int
  70. )
  71. var userStates = make(map[int64]string)
  // LoginStatus is the outcome of a login attempt.
  type LoginStatus byte

  // Possible LoginStatus values.
  const (
  	LoginFail    LoginStatus = 0 // the login attempt failed
  	LoginSuccess LoginStatus = 1 // the login attempt succeeded
  )

  // EmptyTelegramUserID is the default value standing in for an empty Telegram user ID.
  const EmptyTelegramUserID int64 = 0
  80. // LoginAttempt contains safe metadata for panel login notifications.
  81. // It intentionally does not include attempted passwords.
  82. type LoginAttempt struct {
  83. Username string
  84. IP string
  85. Time string
  86. Status LoginStatus
  87. Reason string
  88. }
  89. // Tgbot provides business logic for Telegram bot integration.
  90. // It handles bot commands, user interactions, and status reporting via Telegram.
  91. type Tgbot struct {
  92. inboundService service.InboundService
  93. clientService service.ClientService
  94. settingService service.SettingService
  95. serverService service.ServerService
  96. xrayService service.XrayService
  97. lastStatus *service.Status
  98. }
  99. // NewTgbot creates a new Tgbot instance.
  100. func (t *Tgbot) NewTgbot() *Tgbot {
  101. return new(Tgbot)
  102. }
  103. // I18nBot retrieves a localized message for the bot interface.
  104. func (t *Tgbot) I18nBot(name string, params ...string) string {
  105. return locale.I18n(locale.Bot, name, params...)
  106. }
  107. // GetHashStorage returns the hash storage instance for callback queries.
  108. func (t *Tgbot) GetHashStorage() *global.HashStorage {
  109. return hashStorage
  110. }
  111. // getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
  112. func (t *Tgbot) getCachedStatus() (*service.Status, bool) {
  113. statusCache.mutex.RLock()
  114. defer statusCache.mutex.RUnlock()
  115. if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
  116. return statusCache.data, true
  117. }
  118. return nil, false
  119. }
  120. // setCachedStatus updates the status cache
  121. func (t *Tgbot) setCachedStatus(status *service.Status) {
  122. statusCache.mutex.Lock()
  123. defer statusCache.mutex.Unlock()
  124. statusCache.data = status
  125. statusCache.timestamp = time.Now()
  126. }
  127. // getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
  128. func (t *Tgbot) getCachedServerStats() (string, bool) {
  129. serverStatsCache.mutex.RLock()
  130. defer serverStatsCache.mutex.RUnlock()
  131. if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
  132. return serverStatsCache.data, true
  133. }
  134. return "", false
  135. }
  136. // setCachedServerStats updates the server stats cache
  137. func (t *Tgbot) setCachedServerStats(stats string) {
  138. serverStatsCache.mutex.Lock()
  139. defer serverStatsCache.mutex.Unlock()
  140. serverStatsCache.data = stats
  141. serverStatsCache.timestamp = time.Now()
  142. }
  143. // Start initializes and starts the Telegram bot with the provided translation files.
  144. func (t *Tgbot) Start(i18nFS embed.FS) error {
  145. // Initialize localizer
  146. err := locale.InitLocalizer(i18nFS, &t.settingService)
  147. if err != nil {
  148. return err
  149. }
  150. // If Start is called again (e.g. during reload), ensure any previous long-polling
  151. // loop is stopped before creating a new bot / receiver.
  152. StopBot()
  153. // Initialize hash storage to store callback queries
  154. hashStorage = global.NewHashStorage(20 * time.Minute)
  155. // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
  156. messageWorkerPool = make(chan struct{}, 10)
  157. // Initialize optimized HTTP client with connection pooling
  158. optimizedHTTPClient = &http.Client{
  159. Timeout: 15 * time.Second,
  160. Transport: &http.Transport{
  161. MaxIdleConns: 100,
  162. MaxIdleConnsPerHost: 10,
  163. IdleConnTimeout: 30 * time.Second,
  164. DisableKeepAlives: false,
  165. },
  166. }
  167. t.SetHostname()
  168. // Get Telegram bot token
  169. tgBotToken, err := t.settingService.GetTgBotToken()
  170. if err != nil || tgBotToken == "" {
  171. logger.Warning("Failed to get Telegram bot token:", err)
  172. return err
  173. }
  174. // Get Telegram bot chat ID(s)
  175. tgBotID, err := t.settingService.GetTgBotChatId()
  176. if err != nil {
  177. logger.Warning("Failed to get Telegram bot chat ID:", err)
  178. return err
  179. }
  180. parsedAdminIds := make([]int64, 0)
  181. // Parse admin IDs from comma-separated string
  182. if tgBotID != "" {
  183. for adminID := range strings.SplitSeq(tgBotID, ",") {
  184. id, err := strconv.ParseInt(adminID, 10, 64)
  185. if err != nil {
  186. logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
  187. return err
  188. }
  189. parsedAdminIds = append(parsedAdminIds, int64(id))
  190. }
  191. }
  192. tgBotMutex.Lock()
  193. adminIds = parsedAdminIds
  194. tgBotMutex.Unlock()
  195. // Get Telegram bot proxy URL
  196. tgBotProxy, err := t.settingService.GetTgBotProxy()
  197. if err != nil {
  198. logger.Warning("Failed to get Telegram bot proxy URL:", err)
  199. }
  200. // Fall back to the panel-wide egress bridge when no dedicated bot proxy is
  201. // set. Resolved once at bot start: if Xray comes up later, the bot keeps
  202. // its direct connection until it is restarted.
  203. if tgBotProxy == "" {
  204. if egress := t.settingService.PanelEgressProxyURL(); egress != "" && isSupportedBotProxyScheme(egress) {
  205. tgBotProxy = egress
  206. }
  207. }
  208. // Get Telegram bot API server URL
  209. tgBotAPIServer, err := t.settingService.GetTgBotAPIServer()
  210. if err != nil {
  211. logger.Warning("Failed to get Telegram bot API server URL:", err)
  212. }
  213. // Create new Telegram bot instance
  214. bot, err = t.NewBot(tgBotToken, tgBotProxy, tgBotAPIServer)
  215. if err != nil {
  216. logger.Error("Failed to initialize Telegram bot API:", err)
  217. return err
  218. }
  219. t.trySetBotCommands(bot)
  220. // Start receiving Telegram bot messages
  221. tgBotMutex.Lock()
  222. alreadyRunning := isRunning || botCancel != nil
  223. tgBotMutex.Unlock()
  224. if !alreadyRunning {
  225. logger.Info("Telegram bot receiver started")
  226. go t.OnReceive()
  227. }
  228. return nil
  229. }
  230. func (t *Tgbot) trySetBotCommands(bot *telego.Bot) {
  231. defer func() {
  232. if r := recover(); r != nil {
  233. logger.Warning("Failed to register bot commands (Telegram may be rate-limiting); bot will continue without them:", r)
  234. }
  235. }()
  236. err := bot.SetMyCommands(context.Background(), &telego.SetMyCommandsParams{
  237. Commands: []telego.BotCommand{
  238. {Command: "start", Description: t.I18nBot("tgbot.commands.startDesc")},
  239. {Command: "help", Description: t.I18nBot("tgbot.commands.helpDesc")},
  240. {Command: "status", Description: t.I18nBot("tgbot.commands.statusDesc")},
  241. {Command: "id", Description: t.I18nBot("tgbot.commands.idDesc")},
  242. },
  243. })
  244. if err != nil {
  245. logger.Warning("Failed to set bot commands:", err)
  246. }
  247. }
  // isSupportedBotProxyScheme reports whether proxyUrl starts with one of the
  // accepted proxy schemes: socks5://, http:// or https://. The comparison is
  // case-sensitive.
  func isSupportedBotProxyScheme(proxyUrl string) bool {
  	for _, scheme := range [...]string{"socks5://", "http://", "https://"} {
  		if strings.HasPrefix(proxyUrl, scheme) {
  			return true
  		}
  	}
  	return false
  }
  253. // createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
  254. func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
  255. client := &fasthttp.Client{
  256. // Connection timeouts
  257. ReadTimeout: 30 * time.Second,
  258. WriteTimeout: 30 * time.Second,
  259. MaxIdleConnDuration: 60 * time.Second,
  260. MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
  261. MaxIdemponentCallAttempts: 3,
  262. ReadBufferSize: 4096,
  263. WriteBufferSize: 4096,
  264. MaxConnsPerHost: 100,
  265. MaxConnWaitTimeout: 10 * time.Second,
  266. DisableHeaderNamesNormalizing: false,
  267. DisablePathNormalizing: false,
  268. // Retry on connection errors
  269. RetryIf: func(request *fasthttp.Request) bool {
  270. // Retry on connection errors for GET requests
  271. return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
  272. },
  273. }
  274. if proxyUrl != "" {
  275. if strings.HasPrefix(proxyUrl, "socks5://") {
  276. client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
  277. } else {
  278. client.Dial = fasthttpproxy.FasthttpHTTPDialer(proxyUrl)
  279. }
  280. }
  281. return client
  282. }
  283. // NewBot creates a new Telegram bot instance with optional proxy and API server settings.
  284. func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
  285. // Validate proxy URL if provided
  286. if proxyUrl != "" {
  287. if !isSupportedBotProxyScheme(proxyUrl) {
  288. logger.Warning("Unsupported proxy scheme (want socks5:// or http(s)://), ignoring proxy")
  289. proxyUrl = "" // Clear invalid proxy
  290. } else if _, err := url.Parse(proxyUrl); err != nil {
  291. logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
  292. proxyUrl = ""
  293. }
  294. }
  295. // Validate API server URL if provided
  296. if apiServerUrl != "" {
  297. safeURL, err := service.SanitizePublicHTTPURL(apiServerUrl, false)
  298. if err != nil {
  299. logger.Warningf("Invalid or blocked API server URL, using default: %v", err)
  300. apiServerUrl = ""
  301. } else {
  302. apiServerUrl = safeURL
  303. }
  304. }
  305. // Create robust fasthttp client
  306. client := t.createRobustFastHTTPClient(proxyUrl)
  307. // Build bot options
  308. var options []telego.BotOption
  309. options = append(options, telego.WithFastHTTPClient(client))
  310. if apiServerUrl != "" {
  311. options = append(options, telego.WithAPIServer(apiServerUrl))
  312. }
  313. return telego.NewBot(token, options...)
  314. }
  315. // IsRunning checks if the Telegram bot is currently running.
  316. func (t *Tgbot) IsRunning() bool {
  317. tgBotMutex.Lock()
  318. defer tgBotMutex.Unlock()
  319. return isRunning
  320. }
  321. // SetHostname sets the hostname for the bot.
  322. func (t *Tgbot) SetHostname() {
  323. host, err := os.Hostname()
  324. if err != nil {
  325. logger.Error("get hostname error:", err)
  326. hostname = ""
  327. return
  328. }
  329. hostname = host
  330. }
  331. // Stop safely stops the Telegram bot's Long Polling operation.
  332. // This method now calls the global StopBot function and cleans up other resources.
  333. func (t *Tgbot) Stop() {
  334. StopBot()
  335. logger.Info("Stop Telegram receiver ...")
  336. tgBotMutex.Lock()
  337. adminIds = nil
  338. tgBotMutex.Unlock()
  339. }
  340. // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
  341. // This is the global function called from main.go's signal handler and t.Stop().
  342. func StopBot() {
  343. // Don't hold the mutex while cancelling/waiting.
  344. tgBotMutex.Lock()
  345. cancel := botCancel
  346. botCancel = nil
  347. handler := botHandler
  348. botHandler = nil
  349. isRunning = false
  350. tgBotMutex.Unlock()
  351. if handler != nil {
  352. handler.Stop()
  353. }
  354. if cancel != nil {
  355. logger.Info("Sending cancellation signal to Telegram bot...")
  356. // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
  357. // and lets botHandler.Start() exit cleanly.
  358. cancel()
  359. botWG.Wait()
  360. logger.Info("Telegram bot successfully stopped.")
  361. }
  362. }
  363. // encodeQuery encodes the query string if it's longer than 64 characters.
  364. func (t *Tgbot) encodeQuery(query string) string {
  365. // NOTE: we only need to hash for more than 64 chars
  366. if len(query) <= 64 {
  367. return query
  368. }
  369. return hashStorage.SaveHash(query)
  370. }
  371. // decodeQuery decodes a hashed query string back to its original form.
  372. func (t *Tgbot) decodeQuery(query string) (string, error) {
  373. if !hashStorage.IsMD5(query) {
  374. return query, nil
  375. }
  376. decoded, exists := hashStorage.GetValue(query)
  377. if !exists {
  378. return "", common.NewError("hash not found in storage!")
  379. }
  380. return decoded, nil
  381. }
  382. // randomLowerAndNum generates a random string of lowercase letters and numbers.
  383. func (t *Tgbot) randomLowerAndNum(length int) string {
  384. charset := "abcdefghijklmnopqrstuvwxyz0123456789"
  385. bytes := make([]byte, length)
  386. for i := range bytes {
  387. randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
  388. bytes[i] = charset[randomIndex.Int64()]
  389. }
  390. return string(bytes)
  391. }
  // int64Contains reports whether item occurs anywhere in slice.
  // A nil or empty slice never contains the item.
  func int64Contains(slice []int64, item int64) bool {
  	return slices.Index(slice, item) >= 0
  }
  396. // isSingleWord checks if the text contains only a single word.
  397. func (t *Tgbot) isSingleWord(text string) bool {
  398. text = strings.TrimSpace(text)
  399. re := regexp.MustCompile(`\s+`)
  400. return re.MatchString(text)
  401. }