audio_from_youtube/app/youtube_downloader.py

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Модуль для скачивания и конвертации YouTube видео в MP3."""
import asyncio
import logging
import re
import subprocess
from collections import deque
from urllib.parse import parse_qs, urlparse
from pathlib import Path
from typing import Optional
from app.config import Config
logger = logging.getLogger(__name__)
def sanitize_filename(filename: str, max_length: int = 150, max_bytes: int = 120) -> str:
"""
Очистка имени файла от запрещённых символов.
Args:
filename: Исходное имя файла
max_length: Максимальная длина имени файла
max_bytes: Максимальная длина имени файла в байтах (UTF-8)
Returns:
Безопасное имя файла
"""
# Заменяем запрещённые символы на подчёркивание
# Windows: < > : " / \ | ? *
# Linux: /
forbidden_chars = r'[<>:"/\\|?*\x00-\x1f]'
sanitized = re.sub(forbidden_chars, '_', filename)
# Удаляем пробелы в начале и конце
sanitized = sanitized.strip()
# Ограничиваем длину
if len(sanitized) > max_length:
sanitized = sanitized[:max_length]
# Ограничиваем длину в байтах (на случай UTF-8 и лимита FS ~255 байт)
if len(sanitized.encode('utf-8')) > max_bytes:
trimmed = []
total_bytes = 0
for ch in sanitized:
ch_bytes = len(ch.encode('utf-8'))
if total_bytes + ch_bytes > max_bytes:
break
trimmed.append(ch)
total_bytes += ch_bytes
sanitized = ''.join(trimmed)
# Если имя пустое, используем дефолтное
if not sanitized:
sanitized = "audio"
return sanitized
def is_youtube_url(url: str) -> bool:
"""Проверка, является ли ссылка YouTube."""
patterns = [
r'https?://(www\.)?youtube\.com/',
r'https?://youtu\.be/',
]
return any(re.search(pattern, url) for pattern in patterns)
def normalize_youtube_url(url: str) -> str:
"""Свести URL к одиночному видео (убрать list/index и т.п.)."""
try:
parsed = urlparse(url)
host = parsed.netloc.lower()
path = parsed.path or ""
# youtu.be/<id>
if "youtu.be" in host:
video_id = path.strip("/").split("/")[0]
if video_id:
qs = parse_qs(parsed.query)
t = qs.get("t") or qs.get("start")
t_suffix = f"&t={t[0]}" if t else ""
return f"https://www.youtube.com/watch?v={video_id}{t_suffix}"
return url
# youtube.com/watch?v=<id> or /shorts/<id>
qs = parse_qs(parsed.query)
video_id = None
if "v" in qs and qs["v"]:
video_id = qs["v"][0]
elif path.startswith("/shorts/"):
video_id = path.split("/")[2] if len(path.split("/")) > 2 else None
if video_id:
t = qs.get("t") or qs.get("start")
t_suffix = f"&t={t[0]}" if t else ""
return f"https://www.youtube.com/watch?v={video_id}{t_suffix}"
except Exception:
return url
return url
async def get_video_title(url: str, config: Optional[Config] = None) -> Optional[str]:
"""
Получить название видео через yt-dlp.
Args:
url: URL видео на YouTube
Returns:
Название видео или None в случае ошибки
"""
try:
cmd = [
'yt-dlp',
'--no-download',
'--skip-download',
'--get-title',
'--no-warnings',
'--no-playlist',
url
]
if config:
if config.ytdlp_user_agent:
cmd.extend(['--user-agent', config.ytdlp_user_agent])
if config.ytdlp_cookies_file:
cmd.extend(['--cookies', config.ytdlp_cookies_file])
if config.ytdlp_player_client:
cmd.extend(['--extractor-args', f'youtube:player_client={config.ytdlp_player_client}'])
if config.ytdlp_force_ipv4:
cmd.append('--force-ipv4')
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
title = stdout.decode('utf-8', errors='ignore').strip()
if title:
logger.info(f"Got title for {url}: {title[:50]}...")
return title
else:
error = stderr.decode('utf-8', errors='ignore')
logger.warning(f"Failed to get title for {url}: {error}")
return None
except Exception as e:
logger.error(f"Error getting video title: {e}", exc_info=True)
return None
async def download_and_convert(
url: str,
output_path: Path,
custom_title: Optional[str] = None,
config: Optional[Config] = None
) -> Path:
"""
Скачать видео и сконвертировать в MP3.
Args:
url: URL видео на YouTube
output_path: Путь для сохранения файла (без расширения)
custom_title: Кастомное название файла (опционально)
Returns:
Путь к созданному MP3 файлу
Raises:
Exception: При ошибке скачивания или конвертации
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
# Если задано кастомное название, используем его
if custom_title:
final_path = output_path.parent / f"{sanitize_filename(custom_title)}.mp3"
else:
final_path = output_path.with_suffix('.mp3')
# Временный файл для скачивания (с шаблоном для yt-dlp)
temp_template = output_path.parent / f"temp_{output_path.name}.%(ext)s"
try:
cmd = [
'yt-dlp',
'-x', # Извлечь аудио
'-f', 'bestaudio[ext=m4a]/bestaudio[ext=webm]/bestaudio/best',
'--hls-prefer-ffmpeg',
'--audio-format', 'mp3',
'--audio-quality', '0', # Лучшее качество
'-o', str(temp_template),
'--no-warnings',
'--progress',
'--newline',
'--no-playlist',
url
]
if config:
if config.ytdlp_user_agent:
cmd.extend(['--user-agent', config.ytdlp_user_agent])
if config.ytdlp_cookies_file:
cmd.extend(['--cookies', config.ytdlp_cookies_file])
if config.ytdlp_player_client:
cmd.extend(['--extractor-args', f'youtube:player_client={config.ytdlp_player_client}'])
if config.ytdlp_force_ipv4:
cmd.append('--force-ipv4')
logger.info(f"Downloading {url}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stderr_tail = deque(maxlen=12)
stdout_tail = deque(maxlen=12)
async def _log_stream(stream, level: str, tail: deque[str]):
while True:
line = await stream.readline()
if not line:
break
text = line.decode('utf-8', errors='ignore').strip()
if text:
tail.append(text)
if level == "info":
logger.info(f"yt-dlp: {text}")
else:
logger.warning(f"yt-dlp: {text}")
stderr_task = asyncio.create_task(_log_stream(process.stderr, "warn", stderr_tail))
stdout_task = asyncio.create_task(_log_stream(process.stdout, "info", stdout_tail))
await process.wait()
await stderr_task
await stdout_task
if process.returncode != 0:
logger.error("yt-dlp failed")
tail_lines = list(stderr_tail) + list(stdout_tail)
if tail_lines:
tail_text = "\n".join(tail_lines[-12:])
raise Exception(f"Ошибка скачивания: yt-dlp завершился с ошибкой\n{tail_text}")
raise Exception("Ошибка скачивания: yt-dlp завершился с ошибкой")
# Находим скачанный файл (yt-dlp создаст файл с расширением)
# Ищем файлы, начинающиеся с temp_ и соответствующие нашему шаблону
temp_base = f"temp_{output_path.name}"
# Не используем glob-шаблоны, потому что в названии могут быть спецсимволы вроде [].
temp_files = [
f for f in output_path.parent.iterdir()
if f.is_file()
and f.name.startswith(f"{temp_base}.")
and f.suffix in ['.mp3', '.m4a', '.webm', '.ogg']
]
if not temp_files:
raise Exception("Скачанный файл не найден")
temp_file = temp_files[0]
# Если файл не MP3, конвертируем через ffmpeg
if temp_file.suffix != '.mp3':
logger.info(f"Converting {temp_file.suffix} to MP3")
await _convert_to_mp3(temp_file, final_path)
temp_file.unlink() # Удаляем исходный файл
else:
# Просто переименовываем
if temp_file != final_path:
temp_file.rename(final_path)
logger.info(f"Renamed {temp_file.name} to {final_path.name}")
logger.info(f"Successfully downloaded and converted: {final_path}")
return final_path
except subprocess.CalledProcessError as e:
logger.error(f"Subprocess error: {e}", exc_info=True)
raise Exception(f"Ошибка при скачивании: {str(e)}")
except Exception as e:
logger.error(f"Error in download_and_convert: {e}", exc_info=True)
raise
async def _convert_to_mp3(input_file: Path, output_file: Path):
"""Конвертировать аудио файл в MP3 через ffmpeg."""
cmd = [
'ffmpeg',
'-i', str(input_file),
'-codec:a', 'libmp3lame',
'-qscale:a', '0', # Лучшее качество
'-y', # Перезаписать выходной файл
str(output_file)
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error = stderr.decode('utf-8', errors='ignore')
logger.error(f"ffmpeg conversion failed: {error}")
raise Exception(f"Ошибка конвертации: {error[:200]}")