| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- package tgbot
- import (
- "context"
- "crypto/rand"
- "embed"
- "math/big"
- "net/http"
- "net/url"
- "os"
- "regexp"
- "slices"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- "github.com/mhsanaei/3x-ui/v3/internal/util/common"
- "github.com/mhsanaei/3x-ui/v3/internal/web/global"
- "github.com/mhsanaei/3x-ui/v3/internal/web/locale"
- "github.com/mhsanaei/3x-ui/v3/internal/web/service"
- "github.com/mymmrac/telego"
- th "github.com/mymmrac/telego/telegohandler"
- "github.com/valyala/fasthttp"
- "github.com/valyala/fasthttp/fasthttpproxy"
- )
- var (
- bot *telego.Bot
- // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
- botCancel context.CancelFunc
- // tgBotMutex protects concurrent access to botCancel variable
- tgBotMutex sync.Mutex
- // botWG waits for the OnReceive Long Polling goroutine to finish.
- botWG sync.WaitGroup
- botHandler *th.BotHandler
- adminIds []int64
- isRunning bool
- hostname string
- hashStorage *global.HashStorage
- // Performance improvements
- messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
- optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
- // Simple cache for frequently accessed data
- statusCache struct {
- data *service.Status
- timestamp time.Time
- mutex sync.RWMutex
- }
- serverStatsCache struct {
- data string
- timestamp time.Time
- mutex sync.RWMutex
- }
- // clients data to adding new client. receiver_inbound_IDs is the set of
- // inbounds the new client will be attached to; receiver_inbound_ID mirrors
- // the primary pick for the legacy attach-picker entry point. Per-protocol
- // secrets (UUID, password, flow, method) are filled per-inbound on submit
- // by ClientService.fillProtocolDefaults, so the bot only tracks universal
- // client fields here.
- receiver_inbound_ID int
- receiver_inbound_IDs []int
- client_Email string
- client_LimitIP int
- client_TotalGB int64
- client_ExpiryTime int64
- client_Enable bool
- client_TgID string
- client_SubID string
- client_Comment string
- client_Reset int
- )
- var userStates = make(map[int64]string)
- // LoginStatus represents the result of a login attempt.
- type LoginStatus byte
- // Login status constants
- const (
- LoginSuccess LoginStatus = 1 // Login was successful
- LoginFail LoginStatus = 0 // Login failed
- EmptyTelegramUserID = int64(0) // Default value for empty Telegram user ID
- )
- // LoginAttempt contains safe metadata for panel login notifications.
- // It intentionally does not include attempted passwords.
- type LoginAttempt struct {
- Username string
- IP string
- Time string
- Status LoginStatus
- Reason string
- }
- // Tgbot provides business logic for Telegram bot integration.
- // It handles bot commands, user interactions, and status reporting via Telegram.
- type Tgbot struct {
- inboundService service.InboundService
- clientService service.ClientService
- settingService service.SettingService
- serverService service.ServerService
- xrayService service.XrayService
- lastStatus *service.Status
- }
- // NewTgbot creates a new Tgbot instance.
- func (t *Tgbot) NewTgbot() *Tgbot {
- return new(Tgbot)
- }
- // I18nBot retrieves a localized message for the bot interface.
- func (t *Tgbot) I18nBot(name string, params ...string) string {
- return locale.I18n(locale.Bot, name, params...)
- }
- // GetHashStorage returns the hash storage instance for callback queries.
- func (t *Tgbot) GetHashStorage() *global.HashStorage {
- return hashStorage
- }
- // getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
- func (t *Tgbot) getCachedStatus() (*service.Status, bool) {
- statusCache.mutex.RLock()
- defer statusCache.mutex.RUnlock()
- if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
- return statusCache.data, true
- }
- return nil, false
- }
- // setCachedStatus updates the status cache
- func (t *Tgbot) setCachedStatus(status *service.Status) {
- statusCache.mutex.Lock()
- defer statusCache.mutex.Unlock()
- statusCache.data = status
- statusCache.timestamp = time.Now()
- }
- // getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
- func (t *Tgbot) getCachedServerStats() (string, bool) {
- serverStatsCache.mutex.RLock()
- defer serverStatsCache.mutex.RUnlock()
- if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
- return serverStatsCache.data, true
- }
- return "", false
- }
- // setCachedServerStats updates the server stats cache
- func (t *Tgbot) setCachedServerStats(stats string) {
- serverStatsCache.mutex.Lock()
- defer serverStatsCache.mutex.Unlock()
- serverStatsCache.data = stats
- serverStatsCache.timestamp = time.Now()
- }
- // Start initializes and starts the Telegram bot with the provided translation files.
- func (t *Tgbot) Start(i18nFS embed.FS) error {
- // Initialize localizer
- err := locale.InitLocalizer(i18nFS, &t.settingService)
- if err != nil {
- return err
- }
- // If Start is called again (e.g. during reload), ensure any previous long-polling
- // loop is stopped before creating a new bot / receiver.
- StopBot()
- // Initialize hash storage to store callback queries
- hashStorage = global.NewHashStorage(20 * time.Minute)
- // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
- messageWorkerPool = make(chan struct{}, 10)
- // Initialize optimized HTTP client with connection pooling
- optimizedHTTPClient = &http.Client{
- Timeout: 15 * time.Second,
- Transport: &http.Transport{
- MaxIdleConns: 100,
- MaxIdleConnsPerHost: 10,
- IdleConnTimeout: 30 * time.Second,
- DisableKeepAlives: false,
- },
- }
- t.SetHostname()
- // Get Telegram bot token
- tgBotToken, err := t.settingService.GetTgBotToken()
- if err != nil || tgBotToken == "" {
- logger.Warning("Failed to get Telegram bot token:", err)
- return err
- }
- // Get Telegram bot chat ID(s)
- tgBotID, err := t.settingService.GetTgBotChatId()
- if err != nil {
- logger.Warning("Failed to get Telegram bot chat ID:", err)
- return err
- }
- parsedAdminIds := make([]int64, 0)
- // Parse admin IDs from comma-separated string
- if tgBotID != "" {
- for adminID := range strings.SplitSeq(tgBotID, ",") {
- id, err := strconv.ParseInt(adminID, 10, 64)
- if err != nil {
- logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
- return err
- }
- parsedAdminIds = append(parsedAdminIds, int64(id))
- }
- }
- tgBotMutex.Lock()
- adminIds = parsedAdminIds
- tgBotMutex.Unlock()
- // Get Telegram bot proxy URL
- tgBotProxy, err := t.settingService.GetTgBotProxy()
- if err != nil {
- logger.Warning("Failed to get Telegram bot proxy URL:", err)
- }
- // Fall back to the panel-wide proxy when no dedicated bot proxy is set.
- if tgBotProxy == "" {
- panelProxy, perr := t.settingService.GetPanelProxy()
- if perr != nil {
- logger.Warning("Failed to get panel proxy URL:", perr)
- } else if isSupportedBotProxyScheme(panelProxy) {
- tgBotProxy = panelProxy
- }
- }
- // Get Telegram bot API server URL
- tgBotAPIServer, err := t.settingService.GetTgBotAPIServer()
- if err != nil {
- logger.Warning("Failed to get Telegram bot API server URL:", err)
- }
- // Create new Telegram bot instance
- bot, err = t.NewBot(tgBotToken, tgBotProxy, tgBotAPIServer)
- if err != nil {
- logger.Error("Failed to initialize Telegram bot API:", err)
- return err
- }
- t.trySetBotCommands(bot)
- // Start receiving Telegram bot messages
- tgBotMutex.Lock()
- alreadyRunning := isRunning || botCancel != nil
- tgBotMutex.Unlock()
- if !alreadyRunning {
- logger.Info("Telegram bot receiver started")
- go t.OnReceive()
- }
- return nil
- }
- func (t *Tgbot) trySetBotCommands(bot *telego.Bot) {
- defer func() {
- if r := recover(); r != nil {
- logger.Warning("Failed to register bot commands (Telegram may be rate-limiting); bot will continue without them:", r)
- }
- }()
- err := bot.SetMyCommands(context.Background(), &telego.SetMyCommandsParams{
- Commands: []telego.BotCommand{
- {Command: "start", Description: t.I18nBot("tgbot.commands.startDesc")},
- {Command: "help", Description: t.I18nBot("tgbot.commands.helpDesc")},
- {Command: "status", Description: t.I18nBot("tgbot.commands.statusDesc")},
- {Command: "id", Description: t.I18nBot("tgbot.commands.idDesc")},
- },
- })
- if err != nil {
- logger.Warning("Failed to set bot commands:", err)
- }
- }
- func isSupportedBotProxyScheme(proxyUrl string) bool {
- return strings.HasPrefix(proxyUrl, "socks5://") ||
- strings.HasPrefix(proxyUrl, "http://") ||
- strings.HasPrefix(proxyUrl, "https://")
- }
- // createRobustFastHTTPClient creates a fasthttp.Client with proper connection handling
- func (t *Tgbot) createRobustFastHTTPClient(proxyUrl string) *fasthttp.Client {
- client := &fasthttp.Client{
- // Connection timeouts
- ReadTimeout: 30 * time.Second,
- WriteTimeout: 30 * time.Second,
- MaxIdleConnDuration: 60 * time.Second,
- MaxConnDuration: 0, // unlimited, but controlled by MaxIdleConnDuration
- MaxIdemponentCallAttempts: 3,
- ReadBufferSize: 4096,
- WriteBufferSize: 4096,
- MaxConnsPerHost: 100,
- MaxConnWaitTimeout: 10 * time.Second,
- DisableHeaderNamesNormalizing: false,
- DisablePathNormalizing: false,
- // Retry on connection errors
- RetryIf: func(request *fasthttp.Request) bool {
- // Retry on connection errors for GET requests
- return string(request.Header.Method()) == "GET" || string(request.Header.Method()) == "POST"
- },
- }
- if proxyUrl != "" {
- if strings.HasPrefix(proxyUrl, "socks5://") {
- client.Dial = fasthttpproxy.FasthttpSocksDialer(proxyUrl)
- } else {
- client.Dial = fasthttpproxy.FasthttpHTTPDialer(proxyUrl)
- }
- }
- return client
- }
- // NewBot creates a new Telegram bot instance with optional proxy and API server settings.
- func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*telego.Bot, error) {
- // Validate proxy URL if provided
- if proxyUrl != "" {
- if !isSupportedBotProxyScheme(proxyUrl) {
- logger.Warning("Unsupported proxy scheme (want socks5:// or http(s)://), ignoring proxy")
- proxyUrl = "" // Clear invalid proxy
- } else if _, err := url.Parse(proxyUrl); err != nil {
- logger.Warningf("Can't parse proxy URL, ignoring proxy: %v", err)
- proxyUrl = ""
- }
- }
- // Validate API server URL if provided
- if apiServerUrl != "" {
- safeURL, err := service.SanitizePublicHTTPURL(apiServerUrl, false)
- if err != nil {
- logger.Warningf("Invalid or blocked API server URL, using default: %v", err)
- apiServerUrl = ""
- } else {
- apiServerUrl = safeURL
- }
- }
- // Create robust fasthttp client
- client := t.createRobustFastHTTPClient(proxyUrl)
- // Build bot options
- var options []telego.BotOption
- options = append(options, telego.WithFastHTTPClient(client))
- if apiServerUrl != "" {
- options = append(options, telego.WithAPIServer(apiServerUrl))
- }
- return telego.NewBot(token, options...)
- }
- // IsRunning checks if the Telegram bot is currently running.
- func (t *Tgbot) IsRunning() bool {
- tgBotMutex.Lock()
- defer tgBotMutex.Unlock()
- return isRunning
- }
- // SetHostname sets the hostname for the bot.
- func (t *Tgbot) SetHostname() {
- host, err := os.Hostname()
- if err != nil {
- logger.Error("get hostname error:", err)
- hostname = ""
- return
- }
- hostname = host
- }
- // Stop safely stops the Telegram bot's Long Polling operation.
- // This method now calls the global StopBot function and cleans up other resources.
- func (t *Tgbot) Stop() {
- StopBot()
- logger.Info("Stop Telegram receiver ...")
- tgBotMutex.Lock()
- adminIds = nil
- tgBotMutex.Unlock()
- }
- // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
- // This is the global function called from main.go's signal handler and t.Stop().
- func StopBot() {
- // Don't hold the mutex while cancelling/waiting.
- tgBotMutex.Lock()
- cancel := botCancel
- botCancel = nil
- handler := botHandler
- botHandler = nil
- isRunning = false
- tgBotMutex.Unlock()
- if handler != nil {
- handler.Stop()
- }
- if cancel != nil {
- logger.Info("Sending cancellation signal to Telegram bot...")
- // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
- // and lets botHandler.Start() exit cleanly.
- cancel()
- botWG.Wait()
- logger.Info("Telegram bot successfully stopped.")
- }
- }
- // encodeQuery encodes the query string if it's longer than 64 characters.
- func (t *Tgbot) encodeQuery(query string) string {
- // NOTE: we only need to hash for more than 64 chars
- if len(query) <= 64 {
- return query
- }
- return hashStorage.SaveHash(query)
- }
- // decodeQuery decodes a hashed query string back to its original form.
- func (t *Tgbot) decodeQuery(query string) (string, error) {
- if !hashStorage.IsMD5(query) {
- return query, nil
- }
- decoded, exists := hashStorage.GetValue(query)
- if !exists {
- return "", common.NewError("hash not found in storage!")
- }
- return decoded, nil
- }
- // randomLowerAndNum generates a random string of lowercase letters and numbers.
- func (t *Tgbot) randomLowerAndNum(length int) string {
- charset := "abcdefghijklmnopqrstuvwxyz0123456789"
- bytes := make([]byte, length)
- for i := range bytes {
- randomIndex, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
- bytes[i] = charset[randomIndex.Int64()]
- }
- return string(bytes)
- }
- // int64Contains checks if an int64 slice contains a specific item.
- func int64Contains(slice []int64, item int64) bool {
- return slices.Contains(slice, item)
- }
- // isSingleWord checks if the text contains only a single word.
- func (t *Tgbot) isSingleWord(text string) bool {
- text = strings.TrimSpace(text)
- re := regexp.MustCompile(`\s+`)
- return re.MatchString(text)
- }
|