rate limiter 0.2 sec

This commit is contained in:
vrubelroman 2025-11-18 15:10:19 +03:00
parent e0e8fa8f6b
commit 24d53e731a
4 changed files with 127 additions and 0 deletions

View file

@ -2,6 +2,7 @@ import aiohttp
import logging import logging
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from config import LICHESS_API_BASE_URL, LICHESS_STATS_API_BASE_URL from config import LICHESS_API_BASE_URL, LICHESS_STATS_API_BASE_URL
from rate_limiter import get_rate_limiter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -9,9 +10,11 @@ class LichessAPI:
def __init__(self): def __init__(self):
self.lichess_base_url = LICHESS_API_BASE_URL self.lichess_base_url = LICHESS_API_BASE_URL
self.stats_base_url = LICHESS_STATS_API_BASE_URL self.stats_base_url = LICHESS_STATS_API_BASE_URL
self.rate_limiter = get_rate_limiter()
async def get_user_profile(self, token: str) -> Optional[Dict[str, Any]]: async def get_user_profile(self, token: str) -> Optional[Dict[str, Any]]:
"""Get user profile from Lichess API using token""" """Get user profile from Lichess API using token"""
await self.rate_limiter.wait_if_needed()
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
try: try:
@ -31,6 +34,7 @@ class LichessAPI:
async def get_today_stats(self, username: str) -> Optional[Dict[str, Any]]: async def get_today_stats(self, username: str) -> Optional[Dict[str, Any]]:
"""Get today's statistics from our stats API""" """Get today's statistics from our stats API"""
await self.rate_limiter.wait_if_needed()
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get( async with session.get(
@ -47,6 +51,7 @@ class LichessAPI:
async def get_yesterday_stats(self, username: str) -> Optional[Dict[str, Any]]: async def get_yesterday_stats(self, username: str) -> Optional[Dict[str, Any]]:
"""Get yesterday's statistics from our stats API""" """Get yesterday's statistics from our stats API"""
await self.rate_limiter.wait_if_needed()
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get( async with session.get(
@ -63,6 +68,7 @@ class LichessAPI:
async def get_week_stats(self, username: str) -> Optional[Dict[str, Any]]: async def get_week_stats(self, username: str) -> Optional[Dict[str, Any]]:
"""Get week's statistics from our stats API""" """Get week's statistics from our stats API"""
await self.rate_limiter.wait_if_needed()
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get( async with session.get(
@ -79,6 +85,7 @@ class LichessAPI:
async def get_games_period(self, username: str, since: int, until: int, rated_only: Optional[bool] = None) -> Optional[Dict[str, Any]]: async def get_games_period(self, username: str, since: int, until: int, rated_only: Optional[bool] = None) -> Optional[Dict[str, Any]]:
"""Get games for a specific period""" """Get games for a specific period"""
await self.rate_limiter.wait_if_needed()
try: try:
url = f"{self.stats_base_url}/games/{username}/period" url = f"{self.stats_base_url}/games/{username}/period"
params = {"since": since, "until": until} params = {"since": since, "until": until}
@ -102,6 +109,7 @@ class LichessAPI:
async def get_puzzles_period(self, token: str, since: int, until: int, max_puzzles: int = 150) -> Optional[Dict[str, Any]]: async def get_puzzles_period(self, token: str, since: int, until: int, max_puzzles: int = 150) -> Optional[Dict[str, Any]]:
"""Get puzzles for a specific period""" """Get puzzles for a specific period"""
await self.rate_limiter.wait_if_needed()
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
try: try:
@ -122,6 +130,7 @@ class LichessAPI:
async def check_user_exists(self, username: str) -> bool: async def check_user_exists(self, username: str) -> bool:
"""Check if user exists on Lichess""" """Check if user exists on Lichess"""
await self.rate_limiter.wait_if_needed()
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get( async with session.get(
@ -141,6 +150,7 @@ class LichessAPI:
async def get_user_ratings(self, username: str) -> Optional[Dict[str, Any]]: async def get_user_ratings(self, username: str) -> Optional[Dict[str, Any]]:
"""Get user ratings from Lichess API""" """Get user ratings from Lichess API"""
await self.rate_limiter.wait_if_needed()
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get( async with session.get(

View file

@ -0,0 +1,53 @@
"""
Rate limiter for Lichess API requests
Ensures minimum delay between requests
"""
import asyncio
import time
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class RateLimiter:
"""
Rate limiter that ensures minimum delay between requests.
Thread-safe and async-safe.
"""
def __init__(self, min_delay: float = 0.2):
"""
Initialize rate limiter.
Args:
min_delay: Minimum delay in seconds between requests (default: 0.2)
"""
self.min_delay = min_delay
self.last_request_time: Optional[float] = None
self.lock = asyncio.Lock()
async def wait_if_needed(self):
"""
Wait if necessary to maintain minimum delay between requests.
Should be called before each API request.
"""
async with self.lock:
now = time.time()
if self.last_request_time is not None:
elapsed = now - self.last_request_time
if elapsed < self.min_delay:
wait_time = self.min_delay - elapsed
logger.debug(f"Rate limiter: waiting {wait_time:.3f} seconds")
await asyncio.sleep(wait_time)
now = time.time()
self.last_request_time = now
# Global rate limiter instance
_rate_limiter = RateLimiter(min_delay=0.2)
def get_rate_limiter() -> RateLimiter:
"""Get the global rate limiter instance"""
return _rate_limiter

View file

@ -18,6 +18,7 @@ from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import json import json
from rate_limiter import get_rate_limiter
# Настройка логирования для модуля # Настройка логирования для модуля
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,6 +43,7 @@ class LichessClient:
""" """
self.base_url = "https://lichess.org/api" # Базовый URL Lichess API self.base_url = "https://lichess.org/api" # Базовый URL Lichess API
self.client = httpx.AsyncClient(timeout=30.0) # HTTP клиент с таймаутом self.client = httpx.AsyncClient(timeout=30.0) # HTTP клиент с таймаутом
self.rate_limiter = get_rate_limiter()
async def get_user_activity(self, username: str) -> Optional[List[Dict[str, Any]]]: async def get_user_activity(self, username: str) -> Optional[List[Dict[str, Any]]]:
""" """
@ -58,6 +60,9 @@ class LichessClient:
Exception: При других ошибках Exception: При других ошибках
""" """
try: try:
# Rate limiting: ждем если нужно
await self.rate_limiter.wait_if_needed()
# Формируем URL для получения активности пользователя # Формируем URL для получения активности пользователя
url = f"{self.base_url}/user/{username}/activity" url = f"{self.base_url}/user/{username}/activity"
logger.info(f"Запрос активности пользователя {username}") logger.info(f"Запрос активности пользователя {username}")
@ -104,6 +109,9 @@ class LichessClient:
Exception: При других ошибках Exception: При других ошибках
""" """
try: try:
# Rate limiting: ждем если нужно
await self.rate_limiter.wait_if_needed()
# Формируем URL для получения игр пользователя # Формируем URL для получения игр пользователя
url = f"{self.base_url}/games/user/{username}" url = f"{self.base_url}/games/user/{username}"
@ -182,6 +190,9 @@ class LichessClient:
Exception: При других ошибках Exception: При других ошибках
""" """
try: try:
# Rate limiting: ждем если нужно
await self.rate_limiter.wait_if_needed()
# Формируем URL для получения активности по задачам # Формируем URL для получения активности по задачам
url = f"{self.base_url}/puzzle/activity" url = f"{self.base_url}/puzzle/activity"

View file

@ -0,0 +1,53 @@
"""
Rate limiter for Lichess API requests
Ensures minimum delay between requests
"""
import asyncio
import time
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class RateLimiter:
"""
Rate limiter that ensures minimum delay between requests.
Thread-safe and async-safe.
"""
def __init__(self, min_delay: float = 0.2):
"""
Initialize rate limiter.
Args:
min_delay: Minimum delay in seconds between requests (default: 0.2)
"""
self.min_delay = min_delay
self.last_request_time: Optional[float] = None
self.lock = asyncio.Lock()
async def wait_if_needed(self):
"""
Wait if necessary to maintain minimum delay between requests.
Should be called before each API request.
"""
async with self.lock:
now = time.time()
if self.last_request_time is not None:
elapsed = now - self.last_request_time
if elapsed < self.min_delay:
wait_time = self.min_delay - elapsed
logger.debug(f"Rate limiter: waiting {wait_time:.3f} seconds")
await asyncio.sleep(wait_time)
now = time.time()
self.last_request_time = now
# Global rate limiter instance
_rate_limiter = RateLimiter(min_delay=0.2)
def get_rate_limiter() -> RateLimiter:
"""Get the global rate limiter instance"""
return _rate_limiter