очередь запросов и интервал 7 сек

This commit is contained in:
vrubelroman 2025-11-20 03:23:38 +03:00
parent 4dc5539da2
commit c16a11cf63
2 changed files with 203 additions and 32 deletions

View file

@ -21,6 +21,7 @@ from formatters import StatsFormatter
from i18n import t from i18n import t
from admin_bot import get_admin_bot, init_admin_bot from admin_bot import get_admin_bot, init_admin_bot
from message_counters import MessageCounters from message_counters import MessageCounters
from request_queue import get_request_queue
import time import time
import aiohttp import aiohttp
@ -42,6 +43,7 @@ class LichessBot:
self.period_start_times = {} # Store start times for each gamer self.period_start_times = {} # Store start times for each gamer
self.application = None # Will be set when application is created self.application = None # Will be set when application is created
self.counters = MessageCounters() # Message counters self.counters = MessageCounters() # Message counters
self.request_queue = get_request_queue() # Request queue for rate limiting
async def _notify_admin_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str], is_new_gamer: bool = False): async def _notify_admin_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str], is_new_gamer: bool = False):
"""Notify admin about newly linked player (always try to send).""" """Notify admin about newly linked player (always try to send)."""
@ -164,12 +166,20 @@ class LichessBot:
if len(gamers_with_periods) == 0: if len(gamers_with_periods) == 0:
logger.warning("⚠️ No periodic notifications configured! Users need to set periods using /setperiod") logger.warning("⚠️ No periodic notifications configured! Users need to set periods using /setperiod")
# Start request queue processor
self.request_queue._start_processor()
logger.info("✅ Request queue processor started")
for gamer in gamers_with_periods: for gamer in gamers_with_periods:
if gamer['period_minutes'] > 0: if gamer['period_minutes'] > 0:
user_id = gamer['user_id'] user_id = gamer['user_id']
username = gamer['username']
period = gamer['period_minutes']
# Start periodic task with user_id and gamer # Start periodic task with user_id and gamer
await self.start_periodic_task(gamer, user_id, gamer['period_minutes']) await self.start_periodic_task(gamer, user_id, period)
logger.info(f"✅ Started periodic task for {gamer['username']} (user {user_id}) with period {gamer['period_minutes']} minutes") logger.info(f"✅ Started periodic task for {username} (user {user_id}) with period {period} minutes")
logger.info(f"✅ All periodic tasks started. Total: {len([g for g in gamers_with_periods if g['period_minutes'] > 0])}")
# Start daily counter reset task # Start daily counter reset task
asyncio.create_task(self.daily_counter_reset_task()) asyncio.create_task(self.daily_counter_reset_task())
@ -1186,11 +1196,13 @@ class LichessBot:
async def periodic_check(self, gamer: Dict[str, Any], user_id: int, period_minutes: int): async def periodic_check(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Periodic check for gamer activity""" """Periodic check for gamer activity"""
task_key = f"{gamer['id']}_{user_id}" task_key = f"{gamer['id']}_{user_id}"
username = gamer['username']
# Запоминаем время начала отслеживания (текущее время минус период, чтобы сразу проверить последний период) # Инициализируем время начала отслеживания как текущее время
start_time = datetime.now() - timedelta(minutes=period_minutes) # Первая проверка произойдет через period_minutes минут
start_time = datetime.now()
self.period_start_times[task_key] = start_time self.period_start_times[task_key] = start_time
logger.info(f"Started periodic monitoring for {gamer['username']} with {period_minutes} minute intervals") logger.info(f"🔄 Started periodic monitoring for {username} (user {user_id}) with {period_minutes} minute intervals")
consecutive_errors = 0 consecutive_errors = 0
max_consecutive_errors = 5 max_consecutive_errors = 5
@ -1221,33 +1233,36 @@ class LichessBot:
logger.info(f"Period changed for {gamer['username']} from {period_minutes} to {current_period} minutes") logger.info(f"Period changed for {gamer['username']} from {period_minutes} to {current_period} minutes")
period_minutes = current_period period_minutes = current_period
# Ждем заданное количество минут # Ждем заданное количество минут перед следующей проверкой
logger.info(f"⏳ Waiting {period_minutes} minutes before next check for {username}")
await asyncio.sleep(period_minutes * 60) await asyncio.sleep(period_minutes * 60)
# Получаем время начала периода (время последней проверки или время старта задачи) # Получаем текущее время
period_start = self.period_start_times.get(task_key, start_time)
now = datetime.now() now = datetime.now()
# Рассчитываем период: от (текущее время - период) до текущего времени # Рассчитываем период: от (текущее время - период) до текущего времени
# Это гарантирует, что мы проверяем последний час активности # Это гарантирует, что мы проверяем последний период активности
since_time = now - timedelta(minutes=period_minutes) since_time = now - timedelta(minutes=period_minutes)
since_timestamp = int(since_time.timestamp() * 1000) since_timestamp = int(since_time.timestamp() * 1000)
until_timestamp = int(now.timestamp() * 1000) until_timestamp = int(now.timestamp() * 1000)
logger.info(f"Checking period for {gamer['username']}: {since_time} to {now} (last {period_minutes} minutes)") logger.info(f"🔍 Checking activity for {username} (user {user_id}): period from {since_time} to {now} (last {period_minutes} minutes)")
logger.info(f"Unix timestamps: since={since_timestamp}, until={until_timestamp}") logger.info(f"📅 Unix timestamps: since={since_timestamp}, until={until_timestamp}")
# Делаем запросы к API с обработкой ошибок # Делаем запросы к API через очередь с обработкой ошибок
games_data = None games_data = None
puzzles_data = None puzzles_data = None
try: try:
games_data = await self.lichess_api.get_games_period( # Добавляем запрос в очередь (будет выполнен с задержкой 7 секунд)
logger.info(f"📥 Adding games request to queue for {gamer['username']}")
games_data = await self.request_queue.add_request(
self.lichess_api.get_games_period,
gamer['username'], since_timestamp, until_timestamp gamer['username'], since_timestamp, until_timestamp
) )
logger.info(f"Games API response received for {gamer['username']}") logger.info(f"Games API response received for {gamer['username']}")
except Exception as e: except Exception as e:
logger.error(f"Error getting games data for {gamer['username']}: {e}") logger.error(f"Error getting games data for {gamer['username']}: {e}")
consecutive_errors += 1 consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors: if consecutive_errors >= max_consecutive_errors:
logger.error(f"Too many consecutive errors for {gamer['username']}, stopping periodic check") logger.error(f"Too many consecutive errors for {gamer['username']}, stopping periodic check")
@ -1259,12 +1274,15 @@ class LichessBot:
if gamer.get('token'): if gamer.get('token'):
try: try:
puzzles_data = await self.lichess_api.get_puzzles_period( # Добавляем запрос в очередь (будет выполнен с задержкой 7 секунд)
gamer['token'], since_timestamp, until_timestamp, max_puzzles=150 logger.info(f"📥 Adding puzzles request to queue for {gamer['username']}")
puzzles_data = await self.request_queue.add_request(
self.lichess_api.get_puzzles_period,
gamer['token'], since_timestamp, until_timestamp, 150
) )
logger.info(f"Puzzles API response received for {gamer['username']}") logger.info(f"Puzzles API response received for {gamer['username']}")
except Exception as e: except Exception as e:
logger.warning(f"Error getting puzzles data for {gamer['username']}: {e}") logger.warning(f"⚠️ Error getting puzzles data for {gamer['username']}: {e}")
# Продолжаем без данных по пазлам # Продолжаем без данных по пазлам
# Сбрасываем счетчик ошибок при успешном запросе # Сбрасываем счетчик ошибок при успешном запросе
@ -1275,29 +1293,38 @@ class LichessBot:
total_games = 0 total_games = 0
if games_data: if games_data:
# Логируем структуру ответа для отладки # Логируем структуру ответа для отладки
logger.debug(f"Games data structure for {gamer['username']}: {games_data}") logger.info(f"📊 Games data structure for {username}: {games_data}")
if games_data.get('data'):
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0) # Проверяем games_count на верхнем уровне (приоритет)
has_games = total_games > 0 if games_data.get('games_count', 0) > 0:
# Также проверяем games_count на верхнем уровне
elif games_data.get('games_count', 0) > 0:
total_games = games_data.get('games_count', 0) total_games = games_data.get('games_count', 0)
has_games = True has_games = True
logger.info(f"✅ Found {total_games} games via games_count field")
# Также проверяем data.total.games_played
elif games_data.get('data') and games_data.get('data', {}).get('total', {}).get('games_played', 0) > 0:
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0)
has_games = True
logger.info(f"✅ Found {total_games} games via data.total.games_played field")
else: else:
logger.warning(f"No games_data returned for {gamer['username']}") logger.warning(f"⚠️ No games_data returned for {username}")
has_puzzles = False has_puzzles = False
total_puzzles = 0 total_puzzles = 0
if puzzles_data: if puzzles_data:
if puzzles_data.get('data'): logger.info(f"📊 Puzzles data structure for {username}: {puzzles_data}")
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
has_puzzles = total_puzzles > 0 # Проверяем puzzles_in_period на верхнем уровне (приоритет)
# Также проверяем puzzles_in_period на верхнем уровне if puzzles_data.get('puzzles_in_period', 0) > 0:
elif puzzles_data.get('puzzles_in_period', 0) > 0:
total_puzzles = puzzles_data.get('puzzles_in_period', 0) total_puzzles = puzzles_data.get('puzzles_in_period', 0)
has_puzzles = True has_puzzles = True
logger.info(f"✅ Found {total_puzzles} puzzles via puzzles_in_period field")
# Также проверяем data.total_attempts
elif puzzles_data.get('data') and puzzles_data.get('data', {}).get('total_attempts', 0) > 0:
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
has_puzzles = True
logger.info(f"✅ Found {total_puzzles} puzzles via data.total_attempts field")
logger.info(f"Activity check for {gamer['username']}: has_games={has_games} (total={total_games}), has_puzzles={has_puzzles} (total={total_puzzles})") logger.info(f"🔍 Activity check result for {username}: has_games={has_games} (total={total_games}), has_puzzles={has_puzzles} (total={total_puzzles})")
# Отправляем уведомление только если есть реальная активность # Отправляем уведомление только если есть реальная активность
if has_games or has_puzzles: if has_games or has_puzzles:

View file

@ -0,0 +1,144 @@
"""
Request Queue for managing API requests with rate limiting
Ensures minimum delay between requests to avoid DDoS and rate limiting
"""
import asyncio
import logging
from typing import Callable, Any, Optional, Dict
from datetime import datetime
logger = logging.getLogger(__name__)
class RequestQueue:
"""
Queue for managing API requests with rate limiting.
Ensures minimum delay between requests.
"""
def __init__(self, min_delay: float = 7.0):
"""
Initialize request queue.
Args:
min_delay: Minimum delay in seconds between requests (default: 7.0)
"""
self.min_delay = min_delay
self.queue = asyncio.Queue()
self.is_processing = False
self.last_request_time: Optional[float] = None
self.lock = asyncio.Lock()
self._processor_task: Optional[asyncio.Task] = None
async def add_request(self, request_func: Callable, *args, **kwargs) -> Any:
"""
Add a request to the queue and wait for its result.
Args:
request_func: Async function to call
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
Returns:
Result of the request function
"""
# Create a future to wait for the result
future = asyncio.Future()
# Add request to queue
await self.queue.put({
'func': request_func,
'args': args,
'kwargs': kwargs,
'future': future
})
# Start processor if not already running
if not self.is_processing:
self._start_processor()
# Wait for result
return await future
def _start_processor(self):
"""Start the queue processor task"""
if self._processor_task is None or self._processor_task.done():
self.is_processing = True
self._processor_task = asyncio.create_task(self._process_queue())
logger.info(f"🚀 Started request queue processor (delay: {self.min_delay}s)")
async def _process_queue(self):
"""Process requests from the queue with rate limiting"""
logger.info("📋 Request queue processor started")
while True:
try:
# Get next request from queue (wait indefinitely)
request_item = await self.queue.get()
# Wait if needed to maintain minimum delay
await self._wait_if_needed()
# Execute the request
func = request_item['func']
args = request_item['args']
kwargs = request_item['kwargs']
future = request_item['future']
try:
logger.debug(f"🔄 Executing request: {func.__name__}")
result = await func(*args, **kwargs)
future.set_result(result)
logger.debug(f"✅ Request completed: {func.__name__}")
except Exception as e:
logger.error(f"❌ Request failed: {func.__name__}: {e}")
future.set_exception(e)
# Mark task as done
self.queue.task_done()
except asyncio.CancelledError:
logger.info("🛑 Request queue processor cancelled")
break
except Exception as e:
logger.error(f"❌ Error in request queue processor: {e}")
import traceback
logger.error(traceback.format_exc())
# Continue processing
await asyncio.sleep(1)
async def _wait_if_needed(self):
"""Wait if necessary to maintain minimum delay between requests"""
async with self.lock:
now = datetime.now().timestamp()
if self.last_request_time is not None:
elapsed = now - self.last_request_time
if elapsed < self.min_delay:
wait_time = self.min_delay - elapsed
logger.debug(f"⏳ Rate limiter: waiting {wait_time:.2f} seconds")
await asyncio.sleep(wait_time)
now = datetime.now().timestamp()
self.last_request_time = now
async def stop(self):
"""Stop the queue processor"""
if self._processor_task and not self._processor_task.done():
self._processor_task.cancel()
try:
await self._processor_task
except asyncio.CancelledError:
pass
self.is_processing = False
logger.info("🛑 Request queue processor stopped")
# Global request queue instance
_request_queue: Optional[RequestQueue] = None
def get_request_queue() -> RequestQueue:
"""Get the global request queue instance"""
global _request_queue
if _request_queue is None:
_request_queue = RequestQueue(min_delay=7.0)
return _request_queue