2025-11-20 03:23:38 +03:00
|
|
|
"""
|
|
|
|
|
Request Queue for managing API requests with rate limiting
|
|
|
|
|
Ensures minimum delay between requests to avoid DDoS and rate limiting
|
|
|
|
|
"""
|
|
|
|
|
import asyncio
|
|
|
|
|
import logging
|
|
|
|
|
from typing import Callable, Any, Optional, Dict
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
2026-02-04 23:51:32 +03:00
|
|
|
import config
|
|
|
|
|
|
2025-11-20 03:23:38 +03:00
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
class RequestQueue:
|
|
|
|
|
"""
|
|
|
|
|
Queue for managing API requests with rate limiting.
|
|
|
|
|
Ensures minimum delay between requests.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, min_delay: float = 7.0):
|
|
|
|
|
"""
|
|
|
|
|
Initialize request queue.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
min_delay: Minimum delay in seconds between requests (default: 7.0)
|
|
|
|
|
"""
|
|
|
|
|
self.min_delay = min_delay
|
|
|
|
|
self.queue = asyncio.Queue()
|
|
|
|
|
self.is_processing = False
|
|
|
|
|
self.last_request_time: Optional[float] = None
|
|
|
|
|
self.lock = asyncio.Lock()
|
|
|
|
|
self._processor_task: Optional[asyncio.Task] = None
|
|
|
|
|
|
|
|
|
|
async def add_request(self, request_func: Callable, *args, **kwargs) -> Any:
|
|
|
|
|
"""
|
|
|
|
|
Add a request to the queue and wait for its result.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
request_func: Async function to call
|
|
|
|
|
*args: Positional arguments for the function
|
|
|
|
|
**kwargs: Keyword arguments for the function
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Result of the request function
|
|
|
|
|
"""
|
|
|
|
|
# Create a future to wait for the result
|
|
|
|
|
future = asyncio.Future()
|
|
|
|
|
|
|
|
|
|
# Add request to queue
|
|
|
|
|
await self.queue.put({
|
|
|
|
|
'func': request_func,
|
|
|
|
|
'args': args,
|
|
|
|
|
'kwargs': kwargs,
|
|
|
|
|
'future': future
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
# Start processor if not already running
|
|
|
|
|
if not self.is_processing:
|
|
|
|
|
self._start_processor()
|
|
|
|
|
|
|
|
|
|
# Wait for result
|
|
|
|
|
return await future
|
|
|
|
|
|
|
|
|
|
def _start_processor(self):
|
|
|
|
|
"""Start the queue processor task"""
|
|
|
|
|
if self._processor_task is None or self._processor_task.done():
|
|
|
|
|
self.is_processing = True
|
|
|
|
|
self._processor_task = asyncio.create_task(self._process_queue())
|
|
|
|
|
logger.info(f"🚀 Started request queue processor (delay: {self.min_delay}s)")
|
|
|
|
|
|
|
|
|
|
async def _process_queue(self):
|
|
|
|
|
"""Process requests from the queue with rate limiting"""
|
|
|
|
|
logger.info("📋 Request queue processor started")
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
# Get next request from queue (wait indefinitely)
|
|
|
|
|
request_item = await self.queue.get()
|
|
|
|
|
|
|
|
|
|
# Wait if needed to maintain minimum delay
|
|
|
|
|
await self._wait_if_needed()
|
|
|
|
|
|
|
|
|
|
# Execute the request
|
|
|
|
|
func = request_item['func']
|
|
|
|
|
args = request_item['args']
|
|
|
|
|
kwargs = request_item['kwargs']
|
|
|
|
|
future = request_item['future']
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
logger.debug(f"🔄 Executing request: {func.__name__}")
|
|
|
|
|
result = await func(*args, **kwargs)
|
|
|
|
|
future.set_result(result)
|
|
|
|
|
logger.debug(f"✅ Request completed: {func.__name__}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Request failed: {func.__name__}: {e}")
|
|
|
|
|
future.set_exception(e)
|
|
|
|
|
|
|
|
|
|
# Mark task as done
|
|
|
|
|
self.queue.task_done()
|
|
|
|
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
logger.info("🛑 Request queue processor cancelled")
|
|
|
|
|
break
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"❌ Error in request queue processor: {e}")
|
|
|
|
|
import traceback
|
|
|
|
|
logger.error(traceback.format_exc())
|
|
|
|
|
# Continue processing
|
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
|
|
|
|
async def _wait_if_needed(self):
|
|
|
|
|
"""Wait if necessary to maintain minimum delay between requests"""
|
|
|
|
|
async with self.lock:
|
|
|
|
|
now = datetime.now().timestamp()
|
|
|
|
|
|
|
|
|
|
if self.last_request_time is not None:
|
|
|
|
|
elapsed = now - self.last_request_time
|
|
|
|
|
if elapsed < self.min_delay:
|
|
|
|
|
wait_time = self.min_delay - elapsed
|
|
|
|
|
logger.debug(f"⏳ Rate limiter: waiting {wait_time:.2f} seconds")
|
|
|
|
|
await asyncio.sleep(wait_time)
|
|
|
|
|
now = datetime.now().timestamp()
|
|
|
|
|
|
|
|
|
|
self.last_request_time = now
|
|
|
|
|
|
|
|
|
|
async def stop(self):
|
|
|
|
|
"""Stop the queue processor"""
|
|
|
|
|
if self._processor_task and not self._processor_task.done():
|
|
|
|
|
self._processor_task.cancel()
|
|
|
|
|
try:
|
|
|
|
|
await self._processor_task
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
pass
|
|
|
|
|
self.is_processing = False
|
|
|
|
|
logger.info("🛑 Request queue processor stopped")
|
|
|
|
|
|
|
|
|
|
# Global request queue instance
|
|
|
|
|
_request_queue: Optional[RequestQueue] = None
|
|
|
|
|
|
|
|
|
|
def get_request_queue() -> RequestQueue:
|
|
|
|
|
"""Get the global request queue instance"""
|
|
|
|
|
global _request_queue
|
|
|
|
|
if _request_queue is None:
|
2026-02-04 23:51:32 +03:00
|
|
|
_request_queue = RequestQueue(min_delay=config.LICHESS_REQUEST_QUEUE_MIN_DELAY)
|
2025-11-20 03:23:38 +03:00
|
|
|
return _request_queue
|
|
|
|
|
|