tgbot.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. package tgbot
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "embed"
  6. "math/big"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "slices"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  17. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  18. "github.com/mhsanaei/3x-ui/v3/internal/web/global"
  19. "github.com/mhsanaei/3x-ui/v3/internal/web/locale"
  20. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  21. "github.com/mymmrac/telego"
  22. th "github.com/mymmrac/telego/telegohandler"
  23. "github.com/valyala/fasthttp"
  24. "github.com/valyala/fasthttp/fasthttpproxy"
  25. )
  26. var (
  27. bot *telego.Bot
  28. // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
  29. botCancel context.CancelFunc
  30. // tgBotMutex protects concurrent access to botCancel variable
  31. tgBotMutex sync.Mutex
  32. // botWG waits for the OnReceive Long Polling goroutine to finish.
  33. botWG sync.WaitGroup
  34. botHandler *th.BotHandler
  35. adminIds []int64
  36. isRunning bool
  37. hostname string
  38. hashStorage *global.HashStorage
  39. // Performance improvements
  40. messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
  41. optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
  42. // Simple cache for frequently accessed data
  43. statusCache struct {
  44. data *service.Status
  45. timestamp time.Time
  46. mutex sync.RWMutex
  47. }
  48. serverStatsCache struct {
  49. data string
  50. timestamp time.Time
  51. mutex sync.RWMutex
  52. }
  53. // clients data to adding new client. receiver_inbound_IDs is the set of
  54. // inbounds the new client will be attached to; receiver_inbound_ID mirrors
  55. // the primary pick for the legacy attach-picker entry point. Per-protocol
  56. // secrets (UUID, password, flow, method) are filled per-inbound on submit
  57. // by ClientService.fillProtocolDefaults, so the bot only tracks universal
  58. // client fields here.
  59. receiver_inbound_ID int
  60. receiver_inbound_IDs []int
  61. client_Email string
  62. client_LimitIP int
  63. client_TotalGB int64
  64. client_ExpiryTime int64
  65. client_Enable bool
  66. client_TgID string
  67. client_SubID string
  68. client_Comment string
  69. client_Reset int
  70. )
  71. var userStates = make(map[int64]string)
  72. // LoginStatus represents the result of a login attempt.
  73. type LoginStatus byte
  74. // Login status constants
  75. const (
  76. LoginSuccess LoginStatus = 1 // Login was successful
  77. LoginFail LoginStatus = 0 // Login failed
  78. EmptyTelegramUserID = int64(0) // Default value for empty Telegram user ID
  79. )
  80. // LoginAttempt contains safe metadata for panel login notifications.
  81. // It intentionally does not include attempted passwords.
  82. type LoginAttempt struct {
  83. Username string
  84. IP string
  85. Time string
  86. Status LoginStatus
  87. Reason string
  88. }
  89. // Tgbot provides business logic for Telegram bot integration.
  90. // It handles bot commands, user interactions, and status reporting via Telegram.
  91. type Tgbot struct {
  92. inboundService service.InboundService
  93. clientService service.ClientService
  94. settingService service.SettingService
  95. serverService service.ServerService
  96. xrayService service.XrayService
  97. lastStatus *service.Status
  98. }
  99. // NewTgbot creates a new Tgbot instance.
  100. func (t *Tgbot) NewTgbot() *Tgbot {
  101. return new(Tgbot)
  102. }
  103. // I18nBot retrieves a localized message for the bot interface.
  104. func (t *Tgbot) I18nBot(name string, params ...string) string {
  105. return locale.I18n(locale.Bot, name, params...)
  106. }
  107. // GetHashStorage returns the hash storage instance for callback queries.
  108. func (t *Tgbot) GetHashStorage() *global.HashStorage {
  109. return hashStorage
  110. }
  111. // getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
  112. func (t *Tgbot) getCachedStatus() (*service.Status, bool) {
  113. statusCache.mutex.RLock()
  114. defer statusCache.mutex.RUnlock()
  115. if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
  116. return statusCache.data, true
  117. }
  118. return nil, false
  119. }
  120. // setCachedStatus updates the status cache
  121. func (t *Tgbot) setCachedStatus(status *service.Status) {
  122. statusCache.mutex.Lock()
  123. defer statusCache.mutex.Unlock()
  124. statusCache.data = status
  125. statusCache.timestamp = time.Now()
  126. }
  127. // getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
  128. func (t *Tgbot) getCachedServerStats() (string, bool) {
  129. serverStatsCache.mutex.RLock()
  130. defer serverStatsCache.mutex.RUnlock()
  131. if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
  132. return serverStatsCache.data, true
  133. }
  134. return "", false
  135. }
  136. // setCachedServerStats updates the server stats cache
  137. func (t *Tgbot) setCachedServerStats(stats string) {
  138. serverStatsCache.mutex.Lock()
  139. defer serverStatsCache.mutex.Unlock()
  140. serverStatsCache.data = stats
  141. serverStatsCache.timestamp = time.Now()
  142. }
  143. // Start initializes and starts the Telegram bot with the provided translation files.
  144. func (t *Tgbot) Start(i18nFS embed.FS) error {
  145. // Initialize localizer
  146. err := locale.InitLocalizer(i18nFS, &t.settingService)
  147. if err != nil {
  148. return err
  149. }
  150. // If Start is called again (e.g. during reload), ensure any previous long-polling
  151. // loop is stopped before creating a new bot / receiver.
  152. StopBot()
  153. // Initialize hash storage to store callback queries
  154. hashStorage = global.NewHashStorage(20 * time.Minute)
  155. // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
  156. messageWorkerPool = make(chan struct{}, 10)
  157. // Initialize optimized HTTP client with connection pooling
  158. optimizedHTTPClient = &http.Client{
  159. Timeout: 15 * time.Second,
  160. Transport: &http.Transport{
  161. MaxIdleConns: 100,
  162. MaxIdleConnsPerHost: 10,
  163. IdleConnTimeout: 30 * time.Second,
  164. DisableKeepAlives: false,
  165. },
  166. }
  167. t.SetHostname()
  168. // Get Telegram bot token
  169. tgBotToken, err := t.settingService.GetTgBotToken()
  170. if err != nil || tgBotToken == "" {
  171. logger.Warning("Failed to get Telegram bot token:", err)
  172. return err
  173. }
  174. // Get Telegram bot chat ID(s)
  175. tgBotID, err := t.settingService.GetTgBotChatId()
  176. if err != nil {
  177. logger.Warning("Failed to get Telegram bot chat ID:", err)
  178. return err
  179. }
  180. parsedAdminIds := make([]int64, 0)
  181. // Parse admin IDs from comma-separated string
  182. if tgBotID != "" {
  183. for adminID := range strings.SplitSeq(tgBotID, ",") {
  184. id, err := strconv.ParseInt(adminID, 10, 64)
  185. if err != nil {
  186. logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
  187. return err
  188. }
  189. parsedAdminIds = append(parsedAdminIds, int64(id))
  190. }
  191. }
  192. tgBotMutex.Lock()
  193. adminIds = parsedAdminIds
  194. tgBotMutex.Unlock()
  195. // Get Telegram bot proxy URL
  196. tgBotProxy, err := t.settingService.GetTgBotProxy()
  197. if err != nil {
  198. logger.Warning("Failed to get Telegram bot proxy URL:", err)
  199. }
  200. // Fall back to the panel-wide proxy when no dedicated bot proxy is set.
  201. if tgBotProxy == "" {
  202. panelProxy, perr := t.settingService.GetPanelProxy()
  203. if perr != nil {
  204. logger.Warning("Failed to get panel proxy URL:", perr)
  205. } else if isSupportedBotProxyScheme(panelProxy) {
  206. tgBotProxy = panelProxy
  207. }
  208. }
  209. // Get Telegram bot API server URL
  210. tgBotAPIServer, err := t.settingService.GetTgBotAPIServer()
  211. if err != nil {
  212. logger.Warning("Failed to get Telegram bot API server URL:", err)
  213. }
  214. // Create new Telegram bot instance
  215. bot, err = t.NewBot(tgBotToken, tgBotProxy, tgBotAPIServer)
  216. if err != nil {
  217. logger.Error("Failed to initialize Telegram bot API:", err)
  218. return err
  219. }
  220. t.trySetBotCommands(bot)
  221. // Start receiving Telegram bot messages
  222. tgBotMutex.Lock()
  223. alreadyRunning := isRunning || botCancel != nil
  224. tgBotMutex.Unlock()
  225. if !alreadyRunning {
  226. logger.Info("Telegram bot receiver started")
  227. go t.OnReceive()
  228. }
  229. return nil
  230. }
  231. func (t *Tgbot) trySetBotCommands(bot *telego.Bot) {
  232. defer func() {
  233. if r := recover(); r != nil {
  234. logger.Warning("Failed to register bot commands (Telegram may be rate-limiting); bot will continue without them:", r)
  235. }
  236. }()
  237. err := bot.SetMyCommands(context.Background(), &telego.SetMyCommandsParams{
  238. Commands: []telego.BotCommand{
  239. {Command: "start", Description: t.I18nBot("tgbot.commands.startDesc")},
  240. {Command: "help", Description: t.I18nBot("tgbot.commands.helpDesc")},
  241. {Command: "status", Description: t.I18nBot("tgbot.commands.statusDesc")},
  242. {Command: "id", Description: t.I18nBot("tgbot.commands.idDesc")},
  243. },
  244. })
  245. if err != nil {
  246. logger.Warning("Failed to set bot commands:", err)
  247. }
  248. }
  249. func isSupportedBotProxyScheme(proxyUrl string) bool {
  250. return strings.HasPrefix(proxyUrl, "socks5://") ||
  251. strings.HasPrefix(proxyUrl, "http://") ||
  252. strings.HasPrefix(proxyUrl, "https://")
  253. }
  254. // createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
  255. func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
  256. client := &fasthttp.Client{
  257. // Connection timeouts
  258. ReadTimeout: 30 * time.Second,
  259. WriteTimeout: 30 * time.Second,
  260. MaxIdleConnDuration: 60 * time.Second,
  261. MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
  262. MaxIdemponentCallAttempts: 3,
  263. ReadBufferSize: 4096,
  264. WriteBufferSize: 4096,
  265. MaxConnsPerHost: 100,
  266. MaxConnWaitTimeout: 10 * time.Second,
  267. DisableHeaderNamesNormalizing: false,
  268. DisablePathNormalizing: false,
  269. // Retry on connection errors
  270. RetryIf: func(request *fasthttp.Request) bool {
  271. // Retry on connection errors for GET requests
  272. return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
  273. },
  274. }
  275. if proxyUrl != "" {
  276. if strings.HasPrefix(proxyUrl, "socks5://") {
  277. client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
  278. } else {
  279. client.Dial = fasthttpproxy.FasthttpHTTPDialer(proxyUrl)
  280. }
  281. }
  282. return client
  283. }
  284. // NewBot creates a new Telegram bot instance with optional proxy and API server settings.
  285. func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
  286. // Validate proxy URL if provided
  287. if proxyUrl != "" {
  288. if !isSupportedBotProxyScheme(proxyUrl) {
  289. logger.Warning("Unsupported proxy scheme (want socks5:// or http(s)://), ignoring proxy")
  290. proxyUrl = "" // Clear invalid proxy
  291. } else if _, err := url.Parse(proxyUrl); err != nil {
  292. logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
  293. proxyUrl = ""
  294. }
  295. }
  296. // Validate API server URL if provided
  297. if apiServerUrl != "" {
  298. safeURL, err := service.SanitizePublicHTTPURL(apiServerUrl, false)
  299. if err != nil {
  300. logger.Warningf("Invalid or blocked API server URL, using default: %v", err)
  301. apiServerUrl = ""
  302. } else {
  303. apiServerUrl = safeURL
  304. }
  305. }
  306. // Create robust fasthttp client
  307. client := t.createRobustFastHTTPClient(proxyUrl)
  308. // Build bot options
  309. var options []telego.BotOption
  310. options = append(options, telego.WithFastHTTPClient(client))
  311. if apiServerUrl != "" {
  312. options = append(options, telego.WithAPIServer(apiServerUrl))
  313. }
  314. return telego.NewBot(token, options...)
  315. }
  316. // IsRunning checks if the Telegram bot is currently running.
  317. func (t *Tgbot) IsRunning() bool {
  318. tgBotMutex.Lock()
  319. defer tgBotMutex.Unlock()
  320. return isRunning
  321. }
  322. // SetHostname sets the hostname for the bot.
  323. func (t *Tgbot) SetHostname() {
  324. host, err := os.Hostname()
  325. if err != nil {
  326. logger.Error("get hostname error:", err)
  327. hostname = ""
  328. return
  329. }
  330. hostname = host
  331. }
  332. // Stop safely stops the Telegram bot's Long Polling operation.
  333. // This method now calls the global StopBot function and cleans up other resources.
  334. func (t *Tgbot) Stop() {
  335. StopBot()
  336. logger.Info("Stop Telegram receiver ...")
  337. tgBotMutex.Lock()
  338. adminIds = nil
  339. tgBotMutex.Unlock()
  340. }
  341. // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
  342. // This is the global function called from main.go's signal handler and t.Stop().
  343. func StopBot() {
  344. // Don't hold the mutex while cancelling/waiting.
  345. tgBotMutex.Lock()
  346. cancel := botCancel
  347. botCancel = nil
  348. handler := botHandler
  349. botHandler = nil
  350. isRunning = false
  351. tgBotMutex.Unlock()
  352. if handler != nil {
  353. handler.Stop()
  354. }
  355. if cancel != nil {
  356. logger.Info("Sending cancellation signal to Telegram bot...")
  357. // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
  358. // and lets botHandler.Start() exit cleanly.
  359. cancel()
  360. botWG.Wait()
  361. logger.Info("Telegram bot successfully stopped.")
  362. }
  363. }
  364. // encodeQuery encodes the query string if it's longer than 64 characters.
  365. func (t *Tgbot) encodeQuery(query string) string {
  366. // NOTE: we only need to hash for more than 64 chars
  367. if len(query) <= 64 {
  368. return query
  369. }
  370. return hashStorage.SaveHash(query)
  371. }
  372. // decodeQuery decodes a hashed query string back to its original form.
  373. func (t *Tgbot) decodeQuery(query string) (string, error) {
  374. if !hashStorage.IsMD5(query) {
  375. return query, nil
  376. }
  377. decoded, exists := hashStorage.GetValue(query)
  378. if !exists {
  379. return "", common.NewError("hash not found in storage!")
  380. }
  381. return decoded, nil
  382. }
  383. // randomLowerAndNum generates a random string of lowercase letters and numbers.
  384. func (t *Tgbot) randomLowerAndNum(length int) string {
  385. charset := "abcdefghijklmnopqrstuvwxyz0123456789"
  386. bytes := make([]byte, length)
  387. for i := range bytes {
  388. randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
  389. bytes[i] = charset[randomIndex.Int64()]
  390. }
  391. return string(bytes)
  392. }
  393. // int64Contains checks if an int64 slice contains a specific item.
  394. func int64Contains(slice []int64, item int64) bool {
  395. return slices.Contains(slice, item)
  396. }
  397. // isSingleWord checks if the text contains only a single word.
  398. func (t *Tgbot) isSingleWord(text string) bool {
  399. text = strings.TrimSpace(text)
  400. re := regexp.MustCompile(`\s+`)
  401. return re.MatchString(text)
  402. }