From c16a11cf630117e931ad3241f6c57d1631b77eab Mon Sep 17 00:00:00 2001 From: vrubelroman Date: Thu, 20 Nov 2025 03:23:38 +0300 Subject: [PATCH] =?UTF-8?q?=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D1=8C=20?= =?UTF-8?q?=D0=B7=D0=B0=D0=BF=D1=80=D0=BE=D1=81=D0=BE=D0=B2=20=D0=B8=20?= =?UTF-8?q?=D0=B8=D0=BD=D1=82=D0=B5=D1=80=D0=B2=D0=B0=D0=BB=207=20=D1=81?= =?UTF-8?q?=D0=B5=D0=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LichessClientTG_bot/bot.py | 91 +++++++++++------ LichessClientTG_bot/request_queue.py | 144 +++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 32 deletions(-) create mode 100644 LichessClientTG_bot/request_queue.py diff --git a/LichessClientTG_bot/bot.py b/LichessClientTG_bot/bot.py index 62216cb..f8d1884 100644 --- a/LichessClientTG_bot/bot.py +++ b/LichessClientTG_bot/bot.py @@ -21,6 +21,7 @@ from formatters import StatsFormatter from i18n import t from admin_bot import get_admin_bot, init_admin_bot from message_counters import MessageCounters +from request_queue import get_request_queue import time import aiohttp @@ -42,6 +43,7 @@ class LichessBot: self.period_start_times = {} # Store start times for each gamer self.application = None # Will be set when application is created self.counters = MessageCounters() # Message counters + self.request_queue = get_request_queue() # Request queue for rate limiting async def _notify_admin_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str], is_new_gamer: bool = False): """Notify admin about newly linked player (always try to send).""" @@ -164,12 +166,20 @@ class LichessBot: if len(gamers_with_periods) == 0: logger.warning("⚠️ No periodic notifications configured! Users need to set periods using /setperiod") + # Start request queue processor + self.request_queue._start_processor() + logger.info("✅ Request queue processor started") + for gamer in gamers_with_periods: if gamer['period_minutes'] > 0: user_id = gamer['user_id'] + username = gamer['username'] + period = gamer['period_minutes'] # Start periodic task with user_id and gamer - await self.start_periodic_task(gamer, user_id, gamer['period_minutes']) - logger.info(f"✅ Started periodic task for {gamer['username']} (user {user_id}) with period {gamer['period_minutes']} minutes") + await self.start_periodic_task(gamer, user_id, period) + logger.info(f"✅ Started periodic task for {username} (user {user_id}) with period {period} minutes") + + logger.info(f"✅ All periodic tasks started. Total: {len([g for g in gamers_with_periods if g['period_minutes'] > 0])}") # Start daily counter reset task asyncio.create_task(self.daily_counter_reset_task()) @@ -1186,11 +1196,13 @@ class LichessBot: async def periodic_check(self, gamer: Dict[str, Any], user_id: int, period_minutes: int): """Periodic check for gamer activity""" task_key = f"{gamer['id']}_{user_id}" + username = gamer['username'] - # Запоминаем время начала отслеживания (текущее время минус период, чтобы сразу проверить последний период) - start_time = datetime.now() - timedelta(minutes=period_minutes) + # Инициализируем время начала отслеживания как текущее время + # Первая проверка произойдет через period_minutes минут + start_time = datetime.now() self.period_start_times[task_key] = start_time - logger.info(f"Started periodic monitoring for {gamer['username']} with {period_minutes} minute intervals") + logger.info(f"🔄 Started periodic monitoring for {username} (user {user_id}) with {period_minutes} minute intervals") consecutive_errors = 0 max_consecutive_errors = 5 @@ -1221,33 +1233,36 @@ class LichessBot: logger.info(f"Period changed for {gamer['username']} from {period_minutes} to {current_period} minutes") period_minutes = current_period - # Ждем заданное количество минут + # Ждем заданное количество минут перед следующей проверкой + logger.info(f"⏳ Waiting {period_minutes} minutes before next check for {username}") await asyncio.sleep(period_minutes * 60) - # Получаем время начала периода (время последней проверки или время старта задачи) - period_start = self.period_start_times.get(task_key, start_time) + # Получаем текущее время now = datetime.now() # Рассчитываем период: от (текущее время - период) до текущего времени - # Это гарантирует, что мы проверяем последний час активности + # Это гарантирует, что мы проверяем последний период активности since_time = now - timedelta(minutes=period_minutes) since_timestamp = int(since_time.timestamp() * 1000) until_timestamp = int(now.timestamp() * 1000) - logger.info(f"Checking period for {gamer['username']}: {since_time} to {now} (last {period_minutes} minutes)") - logger.info(f"Unix timestamps: since={since_timestamp}, until={until_timestamp}") + logger.info(f"🔍 Checking activity for {username} (user {user_id}): period from {since_time} to {now} (last {period_minutes} minutes)") + logger.info(f"📅 Unix timestamps: since={since_timestamp}, until={until_timestamp}") - # Делаем запросы к API с обработкой ошибок + # Делаем запросы к API через очередь с обработкой ошибок games_data = None puzzles_data = None try: - games_data = await self.lichess_api.get_games_period( + # Добавляем запрос в очередь (будет выполнен с задержкой 7 секунд) + logger.info(f"📥 Adding games request to queue for {gamer['username']}") + games_data = await self.request_queue.add_request( + self.lichess_api.get_games_period, gamer['username'], since_timestamp, until_timestamp ) - logger.info(f"Games API response received for {gamer['username']}") + logger.info(f"✅ Games API response received for {gamer['username']}") except Exception as e: - logger.error(f"Error getting games data for {gamer['username']}: {e}") + logger.error(f"❌ Error getting games data for {gamer['username']}: {e}") consecutive_errors += 1 if consecutive_errors >= max_consecutive_errors: logger.error(f"Too many consecutive errors for {gamer['username']}, stopping periodic check") @@ -1259,12 +1274,15 @@ class LichessBot: if gamer.get('token'): try: - puzzles_data = await self.lichess_api.get_puzzles_period( - gamer['token'], since_timestamp, until_timestamp, max_puzzles=150 + # Добавляем запрос в очередь (будет выполнен с задержкой 7 секунд) + logger.info(f"📥 Adding puzzles request to queue for {gamer['username']}") + puzzles_data = await self.request_queue.add_request( + self.lichess_api.get_puzzles_period, + gamer['token'], since_timestamp, until_timestamp, 150 ) - logger.info(f"Puzzles API response received for {gamer['username']}") + logger.info(f"✅ Puzzles API response received for {gamer['username']}") except Exception as e: - logger.warning(f"Error getting puzzles data for {gamer['username']}: {e}") + logger.warning(f"⚠️ Error getting puzzles data for {gamer['username']}: {e}") # Продолжаем без данных по пазлам # Сбрасываем счетчик ошибок при успешном запросе @@ -1275,29 +1293,38 @@ class LichessBot: total_games = 0 if games_data: # Логируем структуру ответа для отладки - logger.debug(f"Games data structure for {gamer['username']}: {games_data}") - if games_data.get('data'): - total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0) - has_games = total_games > 0 - # Также проверяем games_count на верхнем уровне - elif games_data.get('games_count', 0) > 0: + logger.info(f"📊 Games data structure for {username}: {games_data}") + + # Проверяем games_count на верхнем уровне (приоритет) + if games_data.get('games_count', 0) > 0: total_games = games_data.get('games_count', 0) has_games = True + logger.info(f"✅ Found {total_games} games via games_count field") + # Также проверяем data.total.games_played + elif games_data.get('data') and games_data.get('data', {}).get('total', {}).get('games_played', 0) > 0: + total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0) + has_games = True + logger.info(f"✅ Found {total_games} games via data.total.games_played field") else: - logger.warning(f"No games_data returned for {gamer['username']}") + logger.warning(f"⚠️ No games_data returned for {username}") has_puzzles = False total_puzzles = 0 if puzzles_data: - if puzzles_data.get('data'): - total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0) - has_puzzles = total_puzzles > 0 - # Также проверяем puzzles_in_period на верхнем уровне - elif puzzles_data.get('puzzles_in_period', 0) > 0: + logger.info(f"📊 Puzzles data structure for {username}: {puzzles_data}") + + # Проверяем puzzles_in_period на верхнем уровне (приоритет) + if puzzles_data.get('puzzles_in_period', 0) > 0: total_puzzles = puzzles_data.get('puzzles_in_period', 0) has_puzzles = True + logger.info(f"✅ Found {total_puzzles} puzzles via puzzles_in_period field") + # Также проверяем data.total_attempts + elif puzzles_data.get('data') and puzzles_data.get('data', {}).get('total_attempts', 0) > 0: + total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0) + has_puzzles = True + logger.info(f"✅ Found {total_puzzles} puzzles via data.total_attempts field") - logger.info(f"Activity check for {gamer['username']}: has_games={has_games} (total={total_games}), has_puzzles={has_puzzles} (total={total_puzzles})") + logger.info(f"🔍 Activity check result for {username}: has_games={has_games} (total={total_games}), has_puzzles={has_puzzles} (total={total_puzzles})") # Отправляем уведомление только если есть реальная активность if has_games or has_puzzles: diff --git a/LichessClientTG_bot/request_queue.py b/LichessClientTG_bot/request_queue.py new file mode 100644 index 0000000..d0c0efc --- /dev/null +++ b/LichessClientTG_bot/request_queue.py @@ -0,0 +1,144 @@ +""" +Request Queue for managing API requests with rate limiting +Ensures minimum delay between requests to avoid DDoS and rate limiting +""" +import asyncio +import logging +from typing import Callable, Any, Optional, Dict +from datetime import datetime + +logger = logging.getLogger(__name__) + +class RequestQueue: + """ + Queue for managing API requests with rate limiting. + Ensures minimum delay between requests. + """ + + def __init__(self, min_delay: float = 7.0): + """ + Initialize request queue. + + Args: + min_delay: Minimum delay in seconds between requests (default: 7.0) + """ + self.min_delay = min_delay + self.queue = asyncio.Queue() + self.is_processing = False + self.last_request_time: Optional[float] = None + self.lock = asyncio.Lock() + self._processor_task: Optional[asyncio.Task] = None + + async def add_request(self, request_func: Callable, *args, **kwargs) -> Any: + """ + Add a request to the queue and wait for its result. + + Args: + request_func: Async function to call + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + + Returns: + Result of the request function + """ + # Create a future to wait for the result + future = asyncio.Future() + + # Add request to queue + await self.queue.put({ + 'func': request_func, + 'args': args, + 'kwargs': kwargs, + 'future': future + }) + + # Start processor if not already running + if not self.is_processing: + self._start_processor() + + # Wait for result + return await future + + def _start_processor(self): + """Start the queue processor task""" + if self._processor_task is None or self._processor_task.done(): + self.is_processing = True + self._processor_task = asyncio.create_task(self._process_queue()) + logger.info(f"🚀 Started request queue processor (delay: {self.min_delay}s)") + + async def _process_queue(self): + """Process requests from the queue with rate limiting""" + logger.info("📋 Request queue processor started") + + while True: + try: + # Get next request from queue (wait indefinitely) + request_item = await self.queue.get() + + # Wait if needed to maintain minimum delay + await self._wait_if_needed() + + # Execute the request + func = request_item['func'] + args = request_item['args'] + kwargs = request_item['kwargs'] + future = request_item['future'] + + try: + logger.debug(f"🔄 Executing request: {func.__name__}") + result = await func(*args, **kwargs) + future.set_result(result) + logger.debug(f"✅ Request completed: {func.__name__}") + except Exception as e: + logger.error(f"❌ Request failed: {func.__name__}: {e}") + future.set_exception(e) + + # Mark task as done + self.queue.task_done() + + except asyncio.CancelledError: + logger.info("🛑 Request queue processor cancelled") + break + except Exception as e: + logger.error(f"❌ Error in request queue processor: {e}") + import traceback + logger.error(traceback.format_exc()) + # Continue processing + await asyncio.sleep(1) + + async def _wait_if_needed(self): + """Wait if necessary to maintain minimum delay between requests""" + async with self.lock: + now = datetime.now().timestamp() + + if self.last_request_time is not None: + elapsed = now - self.last_request_time + if elapsed < self.min_delay: + wait_time = self.min_delay - elapsed + logger.debug(f"⏳ Rate limiter: waiting {wait_time:.2f} seconds") + await asyncio.sleep(wait_time) + now = datetime.now().timestamp() + + self.last_request_time = now + + async def stop(self): + """Stop the queue processor""" + if self._processor_task and not self._processor_task.done(): + self._processor_task.cancel() + try: + await self._processor_task + except asyncio.CancelledError: + pass + self.is_processing = False + logger.info("🛑 Request queue processor stopped") + +# Global request queue instance +_request_queue: Optional[RequestQueue] = None + +def get_request_queue() -> RequestQueue: + """Get the global request queue instance""" + global _request_queue + if _request_queue is None: + _request_queue = RequestQueue(min_delay=7.0) + return _request_queue +