""" Request Queue for managing API requests with rate limiting Ensures minimum delay between requests to avoid DDoS and rate limiting """ import asyncio import logging from typing import Callable, Any, Optional, Dict from datetime import datetime import config logger = logging.getLogger(__name__) class RequestQueue: """ Queue for managing API requests with rate limiting. Ensures minimum delay between requests. """ def __init__(self, min_delay: float = 7.0): """ Initialize request queue. Args: min_delay: Minimum delay in seconds between requests (default: 7.0) """ self.min_delay = min_delay self.queue = asyncio.Queue() self.is_processing = False self.last_request_time: Optional[float] = None self.lock = asyncio.Lock() self._processor_task: Optional[asyncio.Task] = None async def add_request(self, request_func: Callable, *args, **kwargs) -> Any: """ Add a request to the queue and wait for its result. Args: request_func: Async function to call *args: Positional arguments for the function **kwargs: Keyword arguments for the function Returns: Result of the request function """ # Create a future to wait for the result future = asyncio.Future() # Add request to queue await self.queue.put({ 'func': request_func, 'args': args, 'kwargs': kwargs, 'future': future }) # Start processor if not already running if not self.is_processing: self._start_processor() # Wait for result return await future def _start_processor(self): """Start the queue processor task""" if self._processor_task is None or self._processor_task.done(): self.is_processing = True self._processor_task = asyncio.create_task(self._process_queue()) logger.info(f"🚀 Started request queue processor (delay: {self.min_delay}s)") async def _process_queue(self): """Process requests from the queue with rate limiting""" logger.info("📋 Request queue processor started") while True: try: # Get next request from queue (wait indefinitely) request_item = await self.queue.get() # Wait if needed to maintain minimum delay await self._wait_if_needed() # Execute the request func = request_item['func'] args = request_item['args'] kwargs = request_item['kwargs'] future = request_item['future'] try: logger.debug(f"🔄 Executing request: {func.__name__}") result = await func(*args, **kwargs) future.set_result(result) logger.debug(f"✅ Request completed: {func.__name__}") except Exception as e: logger.error(f"❌ Request failed: {func.__name__}: {e}") future.set_exception(e) # Mark task as done self.queue.task_done() except asyncio.CancelledError: logger.info("🛑 Request queue processor cancelled") break except Exception as e: logger.error(f"❌ Error in request queue processor: {e}") import traceback logger.error(traceback.format_exc()) # Continue processing await asyncio.sleep(1) async def _wait_if_needed(self): """Wait if necessary to maintain minimum delay between requests""" async with self.lock: now = datetime.now().timestamp() if self.last_request_time is not None: elapsed = now - self.last_request_time if elapsed < self.min_delay: wait_time = self.min_delay - elapsed logger.debug(f"⏳ Rate limiter: waiting {wait_time:.2f} seconds") await asyncio.sleep(wait_time) now = datetime.now().timestamp() self.last_request_time = now async def stop(self): """Stop the queue processor""" if self._processor_task and not self._processor_task.done(): self._processor_task.cancel() try: await self._processor_task except asyncio.CancelledError: pass self.is_processing = False logger.info("🛑 Request queue processor stopped") # Global request queue instance _request_queue: Optional[RequestQueue] = None def get_request_queue() -> RequestQueue: """Get the global request queue instance""" global _request_queue if _request_queue is None: _request_queue = RequestQueue(min_delay=config.LICHESS_REQUEST_QUEUE_MIN_DELAY) return _request_queue