|
@@ -16,6 +16,7 @@ import (
|
|
|
"regexp"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/mhsanaei/3x-ui/v2/config"
|
|
@@ -44,6 +45,23 @@ var (
|
|
|
hostname string
|
|
|
hashStorage *global.HashStorage
|
|
|
|
|
|
+ // Performance improvements
|
|
|
+ messageWorkerPool chan struct{} // Semaphore for limiting concurrent message processing
|
|
|
+ optimizedHTTPClient *http.Client // HTTP client with connection pooling and timeouts
|
|
|
+
|
|
|
+ // Simple cache for frequently accessed data
|
|
|
+ statusCache struct {
|
|
|
+ data *Status
|
|
|
+ timestamp time.Time
|
|
|
+ mutex sync.RWMutex
|
|
|
+ }
|
|
|
+
|
|
|
+ serverStatsCache struct {
|
|
|
+ data string
|
|
|
+ timestamp time.Time
|
|
|
+ mutex sync.RWMutex
|
|
|
+ }
|
|
|
+
|
|
|
// clients data to adding new client
|
|
|
receiver_inbound_ID int
|
|
|
client_Id string
|
|
@@ -100,6 +118,46 @@ func (t *Tgbot) GetHashStorage() *global.HashStorage {
|
|
|
return hashStorage
|
|
|
}
|
|
|
|
|
|
+// getCachedStatus returns cached server status if it's fresh enough (less than 5 seconds old)
|
|
|
+func (t *Tgbot) getCachedStatus() (*Status, bool) {
|
|
|
+ statusCache.mutex.RLock()
|
|
|
+ defer statusCache.mutex.RUnlock()
|
|
|
+
|
|
|
+ if statusCache.data != nil && time.Since(statusCache.timestamp) < 5*time.Second {
|
|
|
+ return statusCache.data, true
|
|
|
+ }
|
|
|
+ return nil, false
|
|
|
+}
|
|
|
+
|
|
|
+// setCachedStatus updates the status cache
|
|
|
+func (t *Tgbot) setCachedStatus(status *Status) {
|
|
|
+ statusCache.mutex.Lock()
|
|
|
+ defer statusCache.mutex.Unlock()
|
|
|
+
|
|
|
+ statusCache.data = status
|
|
|
+ statusCache.timestamp = time.Now()
|
|
|
+}
|
|
|
+
|
|
|
+// getCachedServerStats returns cached server stats if it's fresh enough (less than 10 seconds old)
|
|
|
+func (t *Tgbot) getCachedServerStats() (string, bool) {
|
|
|
+ serverStatsCache.mutex.RLock()
|
|
|
+ defer serverStatsCache.mutex.RUnlock()
|
|
|
+
|
|
|
+ if serverStatsCache.data != "" && time.Since(serverStatsCache.timestamp) < 10*time.Second {
|
|
|
+ return serverStatsCache.data, true
|
|
|
+ }
|
|
|
+ return "", false
|
|
|
+}
|
|
|
+
|
|
|
+// setCachedServerStats updates the server stats cache
|
|
|
+func (t *Tgbot) setCachedServerStats(stats string) {
|
|
|
+ serverStatsCache.mutex.Lock()
|
|
|
+ defer serverStatsCache.mutex.Unlock()
|
|
|
+
|
|
|
+ serverStatsCache.data = stats
|
|
|
+ serverStatsCache.timestamp = time.Now()
|
|
|
+}
|
|
|
+
|
|
|
// Start initializes and starts the Telegram bot with the provided translation files.
|
|
|
func (t *Tgbot) Start(i18nFS embed.FS) error {
|
|
|
// Initialize localizer
|
|
@@ -111,6 +169,20 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
|
|
|
// Initialize hash storage to store callback queries
|
|
|
hashStorage = global.NewHashStorage(20 * time.Minute)
|
|
|
|
|
|
+ // Initialize worker pool for concurrent message processing (max 10 concurrent handlers)
|
|
|
+ messageWorkerPool = make(chan struct{}, 10)
|
|
|
+
|
|
|
+ // Initialize optimized HTTP client with connection pooling
|
|
|
+ optimizedHTTPClient = &http.Client{
|
|
|
+ Timeout: 15 * time.Second,
|
|
|
+ Transport: &http.Transport{
|
|
|
+ MaxIdleConns: 100,
|
|
|
+ MaxIdleConnsPerHost: 10,
|
|
|
+ IdleConnTimeout: 30 * time.Second,
|
|
|
+ DisableKeepAlives: false,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
t.SetHostname()
|
|
|
|
|
|
// Get Telegram bot token
|
|
@@ -271,7 +343,7 @@ func (t *Tgbot) decodeQuery(query string) (string, error) {
|
|
|
// OnReceive starts the message receiving loop for the Telegram bot.
|
|
|
func (t *Tgbot) OnReceive() {
|
|
|
params := telego.GetUpdatesParams{
|
|
|
- Timeout: 10,
|
|
|
+ Timeout: 30, // Increased timeout to reduce API calls
|
|
|
}
|
|
|
|
|
|
updates, _ := bot.UpdatesViaLongPolling(context.Background(), ¶ms)
|
|
@@ -285,14 +357,26 @@ func (t *Tgbot) OnReceive() {
|
|
|
}, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard")))
|
|
|
|
|
|
botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
|
|
|
- delete(userStates, message.Chat.ID)
|
|
|
- t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID))
|
|
|
+ // Use goroutine with worker pool for concurrent command processing
|
|
|
+ go func() {
|
|
|
+ messageWorkerPool <- struct{}{} // Acquire worker
|
|
|
+ defer func() { <-messageWorkerPool }() // Release worker
|
|
|
+
|
|
|
+ delete(userStates, message.Chat.ID)
|
|
|
+ t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID))
|
|
|
+ }()
|
|
|
return nil
|
|
|
}, th.AnyCommand())
|
|
|
|
|
|
botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
|
|
|
- delete(userStates, query.Message.GetChat().ID)
|
|
|
- t.answerCallback(&query, checkAdmin(query.From.ID))
|
|
|
+ // Use goroutine with worker pool for concurrent callback processing
|
|
|
+ go func() {
|
|
|
+ messageWorkerPool <- struct{}{} // Acquire worker
|
|
|
+ defer func() { <-messageWorkerPool }() // Release worker
|
|
|
+
|
|
|
+ delete(userStates, query.Message.GetChat().ID)
|
|
|
+ t.answerCallback(&query, checkAdmin(query.From.ID))
|
|
|
+ }()
|
|
|
return nil
|
|
|
}, th.AnyCallbackQueryWithMessage())
|
|
|
|
|
@@ -2099,7 +2183,10 @@ func (t *Tgbot) SendMsgToTgbot(chatId int64, msg string, replyMarkup ...telego.R
|
|
|
if err != nil {
|
|
|
logger.Warning("Error sending telegram message :", err)
|
|
|
}
|
|
|
- time.Sleep(500 * time.Millisecond)
|
|
|
+ // Reduced delay to improve performance (only needed for rate limiting)
|
|
|
+ if n < len(allMessages)-1 { // Only delay between messages, not after the last one
|
|
|
+ time.Sleep(100 * time.Millisecond)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2208,12 +2295,12 @@ func (t *Tgbot) sendClientIndividualLinks(chatId int64, email string) {
|
|
|
// Force plain text to avoid HTML page; controller respects Accept header
|
|
|
req.Header.Set("Accept", "text/plain, */*;q=0.1")
|
|
|
|
|
|
- // Use default client with reasonable timeout via context
|
|
|
+ // Use optimized client with connection pooling
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
|
defer cancel()
|
|
|
req = req.WithContext(ctx)
|
|
|
|
|
|
- resp, err := http.DefaultClient.Do(req)
|
|
|
+ resp, err := optimizedHTTPClient.Do(req)
|
|
|
if err != nil {
|
|
|
t.SendMsgToTgbot(chatId, t.I18nBot("tgbot.answers.errorOperation")+"\r\n"+err.Error())
|
|
|
return
|
|
@@ -2323,7 +2410,7 @@ func (t *Tgbot) sendClientQRLinks(chatId int64, email string) {
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
|
defer cancel()
|
|
|
req = req.WithContext(ctx)
|
|
|
- if resp, err := http.DefaultClient.Do(req); err == nil {
|
|
|
+ if resp, err := optimizedHTTPClient.Do(req); err == nil {
|
|
|
body, _ := io.ReadAll(resp.Body)
|
|
|
_ = resp.Body.Close()
|
|
|
encoded, _ := t.settingService.GetSubEncrypt()
|
|
@@ -2356,7 +2443,10 @@ func (t *Tgbot) sendClientQRLinks(chatId int64, email string) {
|
|
|
tu.FileFromBytes(png, filename),
|
|
|
)
|
|
|
_, _ = bot.SendDocument(context.Background(), document)
|
|
|
- time.Sleep(200 * time.Millisecond)
|
|
|
+ // Reduced delay for better performance
|
|
|
+ if i < max-1 { // Only delay between documents, not after the last one
|
|
|
+ time.Sleep(50 * time.Millisecond)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2443,10 +2533,20 @@ func (t *Tgbot) sendServerUsage() string {
|
|
|
|
|
|
// prepareServerUsageInfo prepares the server usage information string.
|
|
|
func (t *Tgbot) prepareServerUsageInfo() string {
|
|
|
+ // Check if we have cached data first
|
|
|
+ if cachedStats, found := t.getCachedServerStats(); found {
|
|
|
+ return cachedStats
|
|
|
+ }
|
|
|
+
|
|
|
info, ipv4, ipv6 := "", "", ""
|
|
|
|
|
|
- // get latest status of server
|
|
|
- t.lastStatus = t.serverService.GetStatus(t.lastStatus)
|
|
|
+ // get latest status of server with caching
|
|
|
+ if cachedStatus, found := t.getCachedStatus(); found {
|
|
|
+ t.lastStatus = cachedStatus
|
|
|
+ } else {
|
|
|
+ t.lastStatus = t.serverService.GetStatus(t.lastStatus)
|
|
|
+ t.setCachedStatus(t.lastStatus)
|
|
|
+ }
|
|
|
onlines := p.GetOnlineClients()
|
|
|
|
|
|
info += t.I18nBot("tgbot.messages.hostname", "Hostname=="+hostname)
|
|
@@ -2488,6 +2588,10 @@ func (t *Tgbot) prepareServerUsageInfo() string {
|
|
|
info += t.I18nBot("tgbot.messages.udpCount", "Count=="+strconv.Itoa(t.lastStatus.UdpCount))
|
|
|
info += t.I18nBot("tgbot.messages.traffic", "Total=="+common.FormatTraffic(int64(t.lastStatus.NetTraffic.Sent+t.lastStatus.NetTraffic.Recv)), "Upload=="+common.FormatTraffic(int64(t.lastStatus.NetTraffic.Sent)), "Download=="+common.FormatTraffic(int64(t.lastStatus.NetTraffic.Recv)))
|
|
|
info += t.I18nBot("tgbot.messages.xrayStatus", "State=="+fmt.Sprint(t.lastStatus.Xray.State))
|
|
|
+
|
|
|
+ // Cache the complete server stats
|
|
|
+ t.setCachedServerStats(info)
|
|
|
+
|
|
|
return info
|
|
|
}
|
|
|
|