audio_from_youtube/app/youtube_downloader.py

253 lines
9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Модуль для скачивания и конвертации YouTube видео в MP3."""
import asyncio
import logging
import re
import subprocess
from pathlib import Path
from typing import Optional
from app.config import Config
logger = logging.getLogger(__name__)
def sanitize_filename(filename: str, max_length: int = 150) -> str:
"""
Очистка имени файла от запрещённых символов.
Args:
filename: Исходное имя файла
max_length: Максимальная длина имени файла
Returns:
Безопасное имя файла
"""
# Заменяем запрещённые символы на подчёркивание
# Windows: < > : " / \ | ? *
# Linux: /
forbidden_chars = r'[<>:"/\\|?*\x00-\x1f]'
sanitized = re.sub(forbidden_chars, '_', filename)
# Удаляем пробелы в начале и конце
sanitized = sanitized.strip()
# Ограничиваем длину
if len(sanitized) > max_length:
sanitized = sanitized[:max_length]
# Если имя пустое, используем дефолтное
if not sanitized:
sanitized = "audio"
return sanitized
def is_youtube_url(url: str) -> bool:
"""Проверка, является ли ссылка YouTube."""
patterns = [
r'https?://(www\.)?youtube\.com/',
r'https?://youtu\.be/',
]
return any(re.search(pattern, url) for pattern in patterns)
async def get_video_title(url: str, config: Optional[Config] = None) -> Optional[str]:
"""
Получить название видео через yt-dlp.
Args:
url: URL видео на YouTube
Returns:
Название видео или None в случае ошибки
"""
try:
cmd = [
'yt-dlp',
'--no-download',
'--skip-download',
'--get-title',
'--no-warnings',
url
]
if config:
if config.ytdlp_user_agent:
cmd.extend(['--user-agent', config.ytdlp_user_agent])
if config.ytdlp_cookies_file:
cmd.extend(['--cookies', config.ytdlp_cookies_file])
if config.ytdlp_player_client:
cmd.extend(['--extractor-args', f'youtube:player_client={config.ytdlp_player_client}'])
if config.ytdlp_force_ipv4:
cmd.append('--force-ipv4')
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
title = stdout.decode('utf-8', errors='ignore').strip()
if title:
logger.info(f"Got title for {url}: {title[:50]}...")
return title
else:
error = stderr.decode('utf-8', errors='ignore')
logger.warning(f"Failed to get title for {url}: {error}")
return None
except Exception as e:
logger.error(f"Error getting video title: {e}", exc_info=True)
return None
async def download_and_convert(
url: str,
output_path: Path,
custom_title: Optional[str] = None,
config: Optional[Config] = None
) -> Path:
"""
Скачать видео и сконвертировать в MP3.
Args:
url: URL видео на YouTube
output_path: Путь для сохранения файла (без расширения)
custom_title: Кастомное название файла (опционально)
Returns:
Путь к созданному MP3 файлу
Raises:
Exception: При ошибке скачивания или конвертации
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
# Если задано кастомное название, используем его
if custom_title:
final_path = output_path.parent / f"{sanitize_filename(custom_title)}.mp3"
else:
final_path = output_path.with_suffix('.mp3')
# Временный файл для скачивания (с шаблоном для yt-dlp)
temp_template = output_path.parent / f"temp_{output_path.name}.%(ext)s"
try:
cmd = [
'yt-dlp',
'-x', # Извлечь аудио
'-f', 'bestaudio[ext=m4a]/bestaudio[ext=webm]/bestaudio/best',
'--hls-prefer-ffmpeg',
'--audio-format', 'mp3',
'--audio-quality', '0', # Лучшее качество
'-o', str(temp_template),
'--no-warnings',
'--progress',
'--newline',
url
]
if config:
if config.ytdlp_user_agent:
cmd.extend(['--user-agent', config.ytdlp_user_agent])
if config.ytdlp_cookies_file:
cmd.extend(['--cookies', config.ytdlp_cookies_file])
if config.ytdlp_player_client:
cmd.extend(['--extractor-args', f'youtube:player_client={config.ytdlp_player_client}'])
if config.ytdlp_force_ipv4:
cmd.append('--force-ipv4')
logger.info(f"Downloading {url}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
async def _log_stream(stream, level: str):
while True:
line = await stream.readline()
if not line:
break
text = line.decode('utf-8', errors='ignore').strip()
if text:
if level == "info":
logger.info(f"yt-dlp: {text}")
else:
logger.warning(f"yt-dlp: {text}")
stderr_task = asyncio.create_task(_log_stream(process.stderr, "info"))
stdout_task = asyncio.create_task(_log_stream(process.stdout, "info"))
await process.wait()
await stderr_task
await stdout_task
if process.returncode != 0:
logger.error("yt-dlp failed")
raise Exception("Ошибка скачивания: yt-dlp завершился с ошибкой")
# Находим скачанный файл (yt-dlp создаст файл с расширением)
# Ищем файлы, начинающиеся с temp_ и соответствующие нашему шаблону
temp_base = f"temp_{output_path.name}"
# Не используем glob-шаблоны, потому что в названии могут быть спецсимволы вроде [].
temp_files = [
f for f in output_path.parent.iterdir()
if f.is_file()
and f.name.startswith(f"{temp_base}.")
and f.suffix in ['.mp3', '.m4a', '.webm', '.ogg']
]
if not temp_files:
raise Exception("Скачанный файл не найден")
temp_file = temp_files[0]
# Если файл не MP3, конвертируем через ffmpeg
if temp_file.suffix != '.mp3':
logger.info(f"Converting {temp_file.suffix} to MP3")
await _convert_to_mp3(temp_file, final_path)
temp_file.unlink() # Удаляем исходный файл
else:
# Просто переименовываем
if temp_file != final_path:
temp_file.rename(final_path)
logger.info(f"Renamed {temp_file.name} to {final_path.name}")
logger.info(f"Successfully downloaded and converted: {final_path}")
return final_path
except subprocess.CalledProcessError as e:
logger.error(f"Subprocess error: {e}", exc_info=True)
raise Exception(f"Ошибка при скачивании: {str(e)}")
except Exception as e:
logger.error(f"Error in download_and_convert: {e}", exc_info=True)
raise
async def _convert_to_mp3(input_file: Path, output_file: Path):
"""Конвертировать аудио файл в MP3 через ffmpeg."""
cmd = [
'ffmpeg',
'-i', str(input_file),
'-codec:a', 'libmp3lame',
'-qscale:a', '0', # Лучшее качество
'-y', # Перезаписать выходной файл
str(output_file)
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error = stderr.decode('utf-8', errors='ignore')
logger.error(f"ffmpeg conversion failed: {error}")
raise Exception(f"Ошибка конвертации: {error[:200]}")