1
0
Эх сурвалжийг харах

Fix telegram bot issue (#3608)

* fix: improve Telegram bot handling for concurrent starts and graceful shutdown

- Added logic to stop any existing long-polling loop when Start is called again.
- Introduced a mutex to manage access to shared state variables, ensuring thread safety.
- Updated the OnReceive method to prevent multiple concurrent executions.
- Enhanced Stop method to ensure proper cleanup of resources and state management.

* fix: enhance Telegram bot's long-polling management

- Improved handling of concurrent starts by stopping existing long-polling loops.
- Implemented mutex for thread-safe access to shared state variables.
- Updated OnReceive method to prevent multiple executions.
- Enhanced Stop method for better resource cleanup and state management.

* .
Vlad Yaroslavlev 4 өдөр өмнө
parent
commit
278aa1c85c
1 өөрчлөгдсөн 55 нэмэгдсэн , 41 устгасан
  1. 55 41
      web/service/tgbot.go

+ 55 - 41
web/service/tgbot.go

@@ -174,6 +174,10 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
 		return err
 	}
 
+	// If Start is called again (e.g. during reload), ensure any previous long-polling
+	// loop is stopped before creating a new bot / receiver.
+	StopBot()
+
 	// Initialize hash storage to store callback queries
 	hashStorage = global.NewHashStorage(20 * time.Minute)
 
@@ -207,6 +211,7 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
 		return err
 	}
 
+	parsedAdminIds := make([]int64, 0)
 	// Parse admin IDs from comma-separated string
 	if tgBotID != "" {
 		for _, adminID := range strings.Split(tgBotID, ",") {
@@ -215,9 +220,12 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
 				logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
 				return err
 			}
-			adminIds = append(adminIds, int64(id))
+			parsedAdminIds = append(parsedAdminIds, int64(id))
 		}
 	}
+	tgBotMutex.Lock()
+	adminIds = parsedAdminIds
+	tgBotMutex.Unlock()
 
 	// Get Telegram bot proxy URL
 	tgBotProxy, err := t.settingService.GetTgBotProxy()
@@ -252,10 +260,12 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
 	}
 
 	// Start receiving Telegram bot messages
-	if !isRunning {
+	tgBotMutex.Lock()
+	alreadyRunning := isRunning || botCancel != nil
+	tgBotMutex.Unlock()
+	if !alreadyRunning {
 		logger.Info("Telegram bot receiver started")
 		go t.OnReceive()
-		isRunning = true
 	}
 
 	return nil
@@ -300,6 +310,8 @@ func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*tel
 
 // IsRunning checks if the Telegram bot is currently running.
 func (t *Tgbot) IsRunning() bool {
+	tgBotMutex.Lock()
+	defer tgBotMutex.Unlock()
 	return isRunning
 }
 
@@ -317,34 +329,34 @@ func (t *Tgbot) SetHostname() {
 // Stop safely stops the Telegram bot's Long Polling operation.
 // This method now calls the global StopBot function and cleans up other resources.
 func (t *Tgbot) Stop() {
-	// Call the global StopBot function to gracefully shut down Long Polling
 	StopBot()
-
-	// Stop the bot handler (in case the goroutine hasn't exited yet)
-	if botHandler != nil {
-		botHandler.Stop()
-	}
 	logger.Info("Stop Telegram receiver ...")
-	isRunning = false
+	tgBotMutex.Lock()
 	adminIds = nil
+	tgBotMutex.Unlock()
 }
 
 // StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
 // This is the global function called from main.go's signal handler and t.Stop().
 func StopBot() {
+	// Don't hold the mutex while cancelling/waiting.
 	tgBotMutex.Lock()
-	defer tgBotMutex.Unlock()
-
-	if botCancel != nil {
-		logger.Info("Sending cancellation signal to Telegram bot...")
+	cancel := botCancel
+	botCancel = nil
+	handler := botHandler
+	botHandler = nil
+	isRunning = false
+	tgBotMutex.Unlock()
 
-		// Calling botCancel() cancels the context passed to UpdatesViaLongPolling,
-		// which stops the Long Polling operation and closes the updates channel,
-		// allowing the th.Start() goroutine to exit cleanly.
-		botCancel()
+	if handler != nil {
+		handler.Stop()
+	}
 
-		botCancel = nil
-		// Giving the goroutine a small delay to exit cleanly.
+	if cancel != nil {
+		logger.Info("Sending cancellation signal to Telegram bot...")
+		// Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
+		// and lets botHandler.Start() exit cleanly.
+		cancel()
 		botWG.Wait()
 		logger.Info("Telegram bot successfully stopped.")
 	}
@@ -379,36 +391,38 @@ func (t *Tgbot) OnReceive() {
 	params := telego.GetUpdatesParams{
 		Timeout: 30, // Increased timeout to reduce API calls
 	}
-	// --- GRACEFUL SHUTDOWN FIX: Context creation ---
+	// Strict singleton: never start a second long-polling loop.
 	tgBotMutex.Lock()
-
-	// Create a context with cancellation and store the cancel function.
-	var ctx context.Context
-
-	// Check if botCancel is already set (to prevent race condition overwrite and goroutine leak)
-	if botCancel == nil {
-		ctx, botCancel = context.WithCancel(context.Background())
-	} else {
-		// If botCancel is already set, use a non-cancellable context for this redundant call.
-		// This prevents overwriting the active botCancel and causing a goroutine leak from the previous call.
-		logger.Warning("TgBot OnReceive called concurrently. Using background context for redundant call.")
-		ctx = context.Background() // <<< ИЗМЕНЕНИЕ
+	if botCancel != nil || isRunning {
+		tgBotMutex.Unlock()
+		logger.Warning("TgBot OnReceive called while already running; ignoring.")
+		return
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	botCancel = cancel
+	isRunning = true
+	// Add to WaitGroup before releasing the lock so StopBot() can't return
+	// before this receiver goroutine is accounted for.
+	botWG.Add(1)
 	tgBotMutex.Unlock()
 
 	// Get updates channel using the context.
 	updates, _ := bot.UpdatesViaLongPolling(ctx, &params)
-	botWG.Go(func() {
+	go func() {
+		defer botWG.Done()
+		h, _ := th.NewBotHandler(bot, updates)
+		tgBotMutex.Lock()
+		botHandler = h
+		tgBotMutex.Unlock()
 
-		botHandler, _ = th.NewBotHandler(bot, updates)
-		botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+		h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
 			delete(userStates, message.Chat.ID)
 			t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove())
 			return nil
 		}, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard")))
 
-		botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+		h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
 			// Use goroutine with worker pool for concurrent command processing
 			go func() {
 				messageWorkerPool <- struct{}{}        // Acquire worker
@@ -420,7 +434,7 @@ func (t *Tgbot) OnReceive() {
 			return nil
 		}, th.AnyCommand())
 
-		botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
+		h.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
 			// Use goroutine with worker pool for concurrent callback processing
 			go func() {
 				messageWorkerPool <- struct{}{}        // Acquire worker
@@ -432,7 +446,7 @@ func (t *Tgbot) OnReceive() {
 			return nil
 		}, th.AnyCallbackQueryWithMessage())
 
-		botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+		h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
 			if userState, exists := userStates[message.Chat.ID]; exists {
 				switch userState {
 				case "awaiting_id":
@@ -578,8 +592,8 @@ func (t *Tgbot) OnReceive() {
 			return nil
 		}, th.AnyMessage())
 
-		botHandler.Start()
-	})
+		h.Start()
+	}()
 }
 
 // answerCommand processes incoming command messages from Telegram users.