LichessStatTgWeb/LichessClientTG_bot/rate_limiter.py

54 lines
1.5 KiB
Python
Raw Permalink Normal View History

2025-11-18 15:10:19 +03:00
"""
Rate limiter for Lichess API requests
Ensures minimum delay between requests
"""
import asyncio
import time
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class RateLimiter:
"""
Rate limiter that ensures minimum delay between requests.
Thread-safe and async-safe.
"""
def __init__(self, min_delay: float = 0.2):
"""
Initialize rate limiter.
Args:
min_delay: Minimum delay in seconds between requests (default: 0.2)
"""
self.min_delay = min_delay
self.last_request_time: Optional[float] = None
self.lock = asyncio.Lock()
async def wait_if_needed(self):
"""
Wait if necessary to maintain minimum delay between requests.
Should be called before each API request.
"""
async with self.lock:
now = time.time()
if self.last_request_time is not None:
elapsed = now - self.last_request_time
if elapsed < self.min_delay:
wait_time = self.min_delay - elapsed
logger.debug(f"Rate limiter: waiting {wait_time:.3f} seconds")
await asyncio.sleep(wait_time)
now = time.time()
self.last_request_time = now
# Global rate limiter instance
_rate_limiter = RateLimiter(min_delay=0.2)
def get_rate_limiter() -> RateLimiter:
"""Get the global rate limiter instance"""
return _rate_limiter