audio_from_youtube/app/user_bot.py

489 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""User-bot для обработки запросов пользователей."""
import asyncio
import logging
from pathlib import Path
from typing import Optional
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
ConversationHandler,
)
from telegram.request import HTTPXRequest
from app.config import Config
from app.queue_manager import QueueManager, Task
from app.youtube_downloader import (
is_youtube_url,
get_video_title,
download_and_convert,
sanitize_filename,
normalize_youtube_url,
)
from app.admin_manager import AdminManager
from app.statistics import Statistics
logger = logging.getLogger(__name__)
# Состояния для ConversationHandler (ожидание имени файла)
WAITING_FOR_FILENAME = 1
# Хранилище ожидающих ответа пользователей: user_id -> (url, output_path, event, result_container)
pending_filename_requests = {}
async def send_status_message(context: ContextTypes.DEFAULT_TYPE, chat_id: int, text: str) -> Optional[int]:
"""Отправить сообщение о статусе и вернуть message_id."""
try:
message = await context.bot.send_message(chat_id=chat_id, text=text)
return message.message_id
except Exception as e:
logger.error(f"Error sending status message: {e}")
return None
async def process_task(task: Task, config: Config, admin_manager: AdminManager, admin_bot_app: Application) -> str:
"""
Обработать задачу: скачать видео и сконвертировать в MP3.
Returns:
Путь к созданному MP3 файлу
"""
normalized_url = normalize_youtube_url(task.url)
# Получаем название видео
title = await get_video_title(normalized_url, config=config)
# Если название не получено, запрашиваем у пользователя
if not title:
# Сохраняем информацию о запросе
output_path = config.workdir / f"task_{task.task_id}"
pending_filename_requests[task.user_id] = (normalized_url, output_path)
# Отправляем запрос пользователю
status_callback = task.callback
if status_callback:
await status_callback("Не смог определить название. Введи имя файла (без расширения .mp3).")
# Ждём ответа пользователя (таймаут 5 минут)
try:
custom_title = await asyncio.wait_for(
_wait_for_filename(task.user_id),
timeout=300.0 # 5 минут
)
title = custom_title
except asyncio.TimeoutError:
raise Exception("Таймаут ожидания имени файла (5 минут)")
finally:
# Удаляем из ожидающих
pending_filename_requests.pop(task.user_id, None)
# Формируем безопасное имя файла
safe_title = sanitize_filename(title)
output_path = config.workdir / f"task_{task.task_id}_{safe_title}"
# Скачиваем и конвертируем
mp3_path = await download_and_convert(normalized_url, output_path, custom_title=safe_title, config=config)
return str(mp3_path)
async def _wait_for_filename(user_id: int) -> str:
"""
Ожидание ответа пользователя с именем файла.
Используется asyncio.Event для синхронизации.
"""
event = asyncio.Event()
result_container = {'value': None}
# Сохраняем event в глобальном словаре для доступа из handler
if user_id not in pending_filename_requests:
raise Exception("Request not found")
# Получаем существующую запись и добавляем event
url, output_path = pending_filename_requests[user_id]
pending_filename_requests[user_id] = (url, output_path, event, result_container)
# Ждём события
await event.wait()
if result_container['value'] is None:
raise Exception("No filename received")
return result_container['value']
async def handle_filename_response(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка ответа пользователя с именем файла."""
user_id = update.effective_user.id
text = update.message.text.strip()
language = get_user_language(update)
if user_id in pending_filename_requests:
try:
url, output_path, event, result_container = pending_filename_requests[user_id]
result_container['value'] = text
event.set()
confirm_msg = f"Использую имя: {text}.mp3" if language == 'ru' else f"Using name: {text}.mp3"
await update.message.reply_text(confirm_msg)
except (ValueError, KeyError):
# Если структура не та, что ожидалась
error_msg = "Ошибка обработки имени файла." if language == 'ru' else "Error processing filename."
await update.message.reply_text(error_msg)
return ConversationHandler.END
else:
# Если нет активного запроса, проверяем, не является ли это ссылкой
return await handle_message(update, context)
def get_user_language(update: Update) -> str:
"""Определить язык пользователя для локализации."""
user = update.effective_user
lang_code = user.language_code or 'en'
# Если язык русский или начинается с ru, возвращаем 'ru', иначе 'en'
return 'ru' if lang_code.startswith('ru') else 'en'
def get_start_message(language: str) -> str:
"""Получить приветственное сообщение в зависимости от языка."""
if language == 'ru':
return (
"Привет! 👋\n\n"
"Я бот для скачивания аудио из YouTube видео.\n\n"
"Просто отправь мне ссылку на YouTube видео, и я верну тебе MP3 файл с аудиодорожкой.\n\n"
"Поддерживаются ссылки:\n"
"• https://www.youtube.com/...\n"
"• https://youtu.be/..."
)
else:
return (
"Hello! 👋\n\n"
"I'm a bot for downloading audio from YouTube videos.\n\n"
"Just send me a link to a YouTube video, and I'll return an MP3 file with the audio track.\n\n"
"Supported links:\n"
"• https://www.youtube.com/...\n"
"• https://youtu.be/..."
)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик команды /start для user-bot."""
user = update.effective_user
language = get_user_language(update)
message = get_start_message(language)
# Добавляем пользователя в статистику
statistics = context.bot_data.get('statistics')
if statistics:
statistics.add_user(user.id)
await update.message.reply_text(message)
logger.info(f"User {user.id} (@{user.username}) started the bot (language: {language})")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработка сообщения от пользователя."""
user = update.effective_user
message = update.message
# Если пользователь ожидает ввода имени файла, не обрабатываем как ссылку
if user.id in pending_filename_requests:
# Это обработается в handle_filename_response
return
text = message.text.strip()
language = get_user_language(update)
# Проверяем, является ли это ссылкой на YouTube
if not is_youtube_url(text):
error_msg = "Пришли ссылку на YouTube-видео." if language == 'ru' else "Please send a YouTube video link."
await message.reply_text(error_msg)
return
# Получаем queue_manager из context
queue_manager: QueueManager = context.bot_data.get('queue_manager')
config: Config = context.bot_data.get('config')
admin_manager: AdminManager = context.bot_data.get('admin_manager')
admin_bot_app: Application = context.bot_data.get('admin_bot_app')
if not queue_manager:
error_msg = "Ошибка: очередь не инициализирована." if language == 'ru' else "Error: queue not initialized."
await message.reply_text(error_msg)
return
# Добавляем пользователя в статистику
statistics = context.bot_data.get('statistics')
if statistics:
statistics.add_user(user.id)
status_message_ids: list[int] = []
# Callback для отправки статуса
async def status_callback(status_text: str):
msg_id = await send_status_message(context, message.chat_id, status_text)
if msg_id is not None:
status_message_ids.append(msg_id)
# Добавляем задачу в очередь
await queue_manager.add_task(
user_id=user.id,
username=user.username,
url=text,
chat_id=message.chat_id,
message_id=message.message_id,
callback=status_callback,
status_message_ids=status_message_ids
)
async def _split_audio_to_parts(file_path: Path, max_part_mb: int, bitrate_kbps: int) -> list[Path]:
"""Разбить аудио на части (перекодирование в CBR для стабильного размера)."""
parts_dir = file_path.parent / f"parts_{file_path.stem}"
parts_dir.mkdir(parents=True, exist_ok=True)
part_pattern = parts_dir / f"{file_path.stem}_part%03d.mp3"
max_bytes = max_part_mb * 1024 * 1024
segment_time = max(60, int((max_bytes * 8) / (bitrate_kbps * 1000))) # минимум 60 секунд
cmd = [
'ffmpeg',
'-i', str(file_path),
'-b:a', f'{bitrate_kbps}k',
'-f', 'segment',
'-segment_time', str(segment_time),
'-reset_timestamps', '1',
'-y',
str(part_pattern),
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error = stderr.decode('utf-8', errors='ignore')
raise Exception(f"Ошибка разбиения на части: {error[:200]}")
prefix = f"{file_path.stem}_part"
parts = sorted(
p for p in parts_dir.iterdir()
if p.is_file() and p.name.startswith(prefix) and p.suffix == ".mp3"
)
logger.info(f"Split into {len(parts)} parts in {parts_dir}")
return parts
async def send_file_to_user(bot, chat_id: int, file_path: str, filename: str, config: Config):
"""Отправить MP3 файл пользователю. Если файл большой — отправить частями."""
try:
path = Path(file_path)
max_bytes = config.max_part_mb * 1024 * 1024
if path.stat().st_size > max_bytes:
parts = await _split_audio_to_parts(path, config.max_part_mb, config.audio_bitrate_kbps)
total = len(parts)
if total == 0:
raise Exception("Не удалось разбить файл на части")
for idx, part in enumerate(parts, start=1):
part_name = f"{idx:02d}_{path.stem}.mp3"
with open(part, 'rb') as f:
await bot.send_document(
chat_id=chat_id,
document=f,
filename=part_name
)
logger.info(f"Sent part {idx}/{total} to user {chat_id}")
# Очистка частей
for part in parts:
part.unlink(missing_ok=True)
parts_dir = parts[0].parent if parts else None
if parts_dir:
try:
parts_dir.rmdir()
except OSError:
logger.warning(f"Parts dir not empty: {parts_dir}")
else:
with open(path, 'rb') as f:
await bot.send_document(
chat_id=chat_id,
document=f,
filename=filename
)
logger.info(f"Sent file {filename} to user {chat_id}")
except Exception as e:
logger.error(f"Error sending file to user: {e}", exc_info=True)
raise
async def send_to_admin_bot(
admin_bot_app: Application,
admin_manager: AdminManager,
config: Config,
title: str,
username: Optional[str],
user_id: int,
url: str,
file_path: str
):
"""Отправить уведомление и файл в admin-bot."""
try:
# Формируем сообщение
requested_by = f"@{username}" if username else f"user_id: {user_id}"
message_text = f"title: {title}\nrequested_by: {requested_by}\nurl: {url}"
# Определяем получателей
if config.admin_chat_id:
recipients = [int(config.admin_chat_id)]
else:
recipients = admin_manager.get_all_admins()
if not recipients:
logger.warning("No admin recipients found")
return
# Отправляем всем получателям
filename = Path(file_path).name
for chat_id in recipients:
try:
# Отправляем текст
await admin_bot_app.bot.send_message(
chat_id=chat_id,
text=message_text
)
# Отправляем файл (если большой — частями)
path = Path(file_path)
max_bytes = config.max_part_mb * 1024 * 1024
if path.stat().st_size > max_bytes:
parts = await _split_audio_to_parts(path, config.max_part_mb, config.audio_bitrate_kbps)
total = len(parts)
for idx, part in enumerate(parts, start=1):
part_name = f"{idx:02d}_{path.stem}.mp3"
with open(part, 'rb') as f:
await admin_bot_app.bot.send_document(
chat_id=chat_id,
document=f,
filename=part_name
)
for part in parts:
part.unlink(missing_ok=True)
parts_dir = parts[0].parent if parts else None
if parts_dir:
parts_dir.rmdir()
else:
with open(file_path, 'rb') as f:
await admin_bot_app.bot.send_document(
chat_id=chat_id,
document=f,
filename=filename
)
logger.info(f"Sent notification and file to admin {chat_id}")
except Exception as e:
logger.error(f"Error sending to admin {chat_id}: {e}")
except Exception as e:
logger.error(f"Error in send_to_admin_bot: {e}", exc_info=True)
def create_user_bot_application(
config: Config,
queue_manager: QueueManager,
admin_manager: AdminManager,
admin_bot_app: Application,
statistics: Statistics
) -> Application:
"""Создать и настроить приложение user-bot."""
# Создаём приложение с увеличенными таймаутами для отправки файлов
request = HTTPXRequest(
connect_timeout=config.tg_connect_timeout,
read_timeout=config.tg_read_timeout,
write_timeout=config.tg_write_timeout,
pool_timeout=config.tg_pool_timeout,
)
app = Application.builder().token(config.user_bot_token).request(request).build()
# Сохраняем в bot_data для доступа из handlers
app.bot_data['config'] = config
app.bot_data['queue_manager'] = queue_manager
app.bot_data['admin_manager'] = admin_manager
app.bot_data['admin_bot_app'] = admin_bot_app
app.bot_data['statistics'] = statistics
# Обработчик команды /start
app.add_handler(CommandHandler("start", start_command))
# Обработчик сообщений (сначала проверяем на ожидание имени файла, потом на ссылку)
message_handler = MessageHandler(
filters.TEXT & ~filters.COMMAND,
handle_filename_response # Этот handler проверяет оба случая
)
app.add_handler(message_handler)
return app
async def worker_function(task: Task, config: Config, admin_manager: AdminManager, admin_bot_app: Application, statistics: Statistics, user_bot_app: Optional[Application]) -> str:
"""Функция-воркер для обработки задачи."""
mp3_path = None
try:
# Обрабатываем задачу
mp3_path = await process_task(task, config, admin_manager, admin_bot_app)
# Получаем название файла
filename = Path(mp3_path).name
# Отправляем файл пользователю
if user_bot_app:
await send_file_to_user(
user_bot_app.bot,
task.chat_id,
mp3_path,
filename,
config
)
else:
logger.warning("user_bot_app is not available; skipping send to user")
# Отправляем в admin-bot
await send_to_admin_bot(
admin_bot_app,
admin_manager,
config,
filename.replace('.mp3', ''),
task.username,
task.user_id,
task.url,
mp3_path
)
# Увеличиваем счётчик обработанных ссылок
statistics.increment_processed_urls()
# Удаляем статусные сообщения после завершения
if user_bot_app and task.status_message_ids:
for msg_id in task.status_message_ids:
try:
await user_bot_app.bot.delete_message(chat_id=task.chat_id, message_id=msg_id)
except Exception as e:
logger.warning(f"Failed to delete status message {msg_id}: {e}")
return mp3_path
finally:
# Удаляем временный файл
if mp3_path:
try:
Path(mp3_path).unlink(missing_ok=True)
logger.info(f"Deleted temporary file: {mp3_path}")
except Exception as e:
logger.error(f"Error deleting file {mp3_path}: {e}")