audio_from_youtube/app/queue_manager.py

127 lines
4.7 KiB
Python

"""Менеджер очереди задач для последовательной обработки."""
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable, Awaitable
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class Task:
"""Задача на скачивание и конвертацию."""
task_id: int
user_id: int
username: Optional[str]
url: str
chat_id: int
message_id: int
created_at: datetime
callback: Optional[Callable[[str], Awaitable[None]]] = None # callback для отправки статуса
status_message_ids: list[int] = field(default_factory=list)
class QueueManager:
"""Менеджер очереди для последовательной обработки задач."""
def __init__(self, worker: Callable[[Task], Awaitable[str]]):
"""
Args:
worker: Асинхронная функция для обработки задачи, возвращает путь к файлу
"""
self.queue: asyncio.Queue = asyncio.Queue()
self.worker = worker
self.task_counter = 0
self._worker_task: Optional[asyncio.Task] = None
self._lock = asyncio.Lock()
async def start(self):
"""Запуск воркера для обработки очереди."""
if self._worker_task is None or self._worker_task.done():
self._worker_task = asyncio.create_task(self._process_queue())
logger.info("Queue manager started")
async def stop(self):
"""Остановка воркера."""
if self._worker_task and not self._worker_task.done():
self._worker_task.cancel()
try:
await self._worker_task
except asyncio.CancelledError:
pass
logger.info("Queue manager stopped")
async def add_task(
self,
user_id: int,
username: Optional[str],
url: str,
chat_id: int,
message_id: int,
callback: Optional[Callable[[str], Awaitable[None]]] = None,
status_message_ids: Optional[list[int]] = None
) -> int:
"""Добавить задачу в очередь."""
async with self._lock:
self.task_counter += 1
task_id = self.task_counter
task = Task(
task_id=task_id,
user_id=user_id,
username=username,
url=url,
chat_id=chat_id,
message_id=message_id,
created_at=datetime.now(),
callback=callback,
status_message_ids=status_message_ids if status_message_ids is not None else []
)
await self.queue.put(task)
position = self.queue.qsize()
logger.info(f"Task {task_id} added to queue. Position: {position}, User: {user_id} (@{username}), URL: {url}")
if callback:
await callback(f"Принято в очередь, позиция: {position}")
return task_id
async def _process_queue(self):
"""Обработка очереди задач (FIFO, последовательно)."""
logger.info("Queue processor started")
while True:
try:
# Получаем задачу из очереди (блокирующая операция)
task = await self.queue.get()
logger.info(f"Processing task {task.task_id} for user {task.user_id}")
if task.callback:
await task.callback("Начинаю обработку")
try:
# Обрабатываем задачу
result = await self.worker(task)
logger.info(f"Task {task.task_id} completed successfully")
except Exception as e:
error_msg = f"Ошибка при обработке: {str(e)}"
logger.error(f"Task {task.task_id} failed: {e}", exc_info=True)
if task.callback:
await task.callback(error_msg)
finally:
self.queue.task_done()
except asyncio.CancelledError:
logger.info("Queue processor cancelled")
break
except Exception as e:
logger.error(f"Error in queue processor: {e}", exc_info=True)
await asyncio.sleep(1) # Небольшая задержка перед следующей попыткой