"""Модуль для скачивания и конвертации YouTube видео в MP3.""" import asyncio import logging import re import subprocess from collections import deque from urllib.parse import parse_qs, urlparse from pathlib import Path from typing import Optional from app.config import Config logger = logging.getLogger(__name__) def sanitize_filename(filename: str, max_length: int = 150, max_bytes: int = 120) -> str: """ Очистка имени файла от запрещённых символов. Args: filename: Исходное имя файла max_length: Максимальная длина имени файла max_bytes: Максимальная длина имени файла в байтах (UTF-8) Returns: Безопасное имя файла """ # Заменяем запрещённые символы на подчёркивание # Windows: < > : " / \ | ? * # Linux: / forbidden_chars = r'[<>:"/\\|?*\x00-\x1f]' sanitized = re.sub(forbidden_chars, '_', filename) # Удаляем пробелы в начале и конце sanitized = sanitized.strip() # Ограничиваем длину if len(sanitized) > max_length: sanitized = sanitized[:max_length] # Ограничиваем длину в байтах (на случай UTF-8 и лимита FS ~255 байт) if len(sanitized.encode('utf-8')) > max_bytes: trimmed = [] total_bytes = 0 for ch in sanitized: ch_bytes = len(ch.encode('utf-8')) if total_bytes + ch_bytes > max_bytes: break trimmed.append(ch) total_bytes += ch_bytes sanitized = ''.join(trimmed) # Если имя пустое, используем дефолтное if not sanitized: sanitized = "audio" return sanitized def is_youtube_url(url: str) -> bool: """Проверка, является ли ссылка YouTube.""" patterns = [ r'https?://(www\.)?youtube\.com/', r'https?://youtu\.be/', ] return any(re.search(pattern, url) for pattern in patterns) def normalize_youtube_url(url: str) -> str: """Свести URL к одиночному видео (убрать list/index и т.п.).""" try: parsed = urlparse(url) host = parsed.netloc.lower() path = parsed.path or "" # youtu.be/ if "youtu.be" in host: video_id = path.strip("/").split("/")[0] if video_id: qs = parse_qs(parsed.query) t = qs.get("t") or qs.get("start") t_suffix = f"&t={t[0]}" if t else "" return f"https://www.youtube.com/watch?v={video_id}{t_suffix}" return url # youtube.com/watch?v= or /shorts/ qs = parse_qs(parsed.query) video_id = None if "v" in qs and qs["v"]: video_id = qs["v"][0] elif path.startswith("/shorts/"): video_id = path.split("/")[2] if len(path.split("/")) > 2 else None if video_id: t = qs.get("t") or qs.get("start") t_suffix = f"&t={t[0]}" if t else "" return f"https://www.youtube.com/watch?v={video_id}{t_suffix}" except Exception: return url return url async def get_video_title(url: str, config: Optional[Config] = None) -> Optional[str]: """ Получить название видео через yt-dlp. Args: url: URL видео на YouTube Returns: Название видео или None в случае ошибки """ try: async def _run_get_title(use_player_client: bool) -> tuple[int, str, str]: cmd = [ 'yt-dlp', '--no-download', '--skip-download', '--get-title', '--no-warnings', '--no-playlist', url ] if config: if config.ytdlp_user_agent: cmd.extend(['--user-agent', config.ytdlp_user_agent]) if config.ytdlp_cookies_file: cmd.extend(['--cookies', config.ytdlp_cookies_file]) if use_player_client and config.ytdlp_player_client: cmd.extend(['--extractor-args', f'youtube:player_client={config.ytdlp_player_client}']) if config.ytdlp_force_ipv4: cmd.append('--force-ipv4') process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() return process.returncode, stdout.decode('utf-8', errors='ignore'), stderr.decode('utf-8', errors='ignore') # Первый проход — как обычно (с player_client, если задан) returncode, stdout, stderr = await _run_get_title(use_player_client=True) if returncode == 0: title = stdout.strip() if title: logger.info(f"Got title for {url}: {title[:50]}...") return title else: logger.warning(f"Failed to get title for {url}: {stderr}") # Фолбэк — без player_client if config and config.ytdlp_player_client: returncode, stdout, stderr = await _run_get_title(use_player_client=False) if returncode == 0: title = stdout.strip() if title: logger.info(f"Got title for {url} without player_client: {title[:50]}...") return title else: logger.warning(f"Failed to get title for {url} without player_client: {stderr}") return None except Exception as e: logger.error(f"Error getting video title: {e}", exc_info=True) return None async def download_and_convert( url: str, output_path: Path, custom_title: Optional[str] = None, config: Optional[Config] = None ) -> Path: """ Скачать видео и сконвертировать в MP3. Args: url: URL видео на YouTube output_path: Путь для сохранения файла (без расширения) custom_title: Кастомное название файла (опционально) Returns: Путь к созданному MP3 файлу Raises: Exception: При ошибке скачивания или конвертации """ output_path.parent.mkdir(parents=True, exist_ok=True) # Если задано кастомное название, используем его if custom_title: final_path = output_path.parent / f"{sanitize_filename(custom_title)}.mp3" else: final_path = output_path.with_suffix('.mp3') # Временный файл для скачивания (с шаблоном для yt-dlp) temp_template = output_path.parent / f"temp_{output_path.name}.%(ext)s" try: formats_to_try = [ # Избегаем HLS/m3u8 (ffmpeg периодически падает на сегментах) 'bestaudio[protocol!=m3u8][protocol!=m3u8_native][ext=m4a]/' 'bestaudio[protocol!=m3u8][protocol!=m3u8_native][ext=webm]/' 'bestaudio[protocol!=m3u8][protocol!=m3u8_native]/' 'bestaudio', 'bestaudio/best', ] async def _run_yt_dlp(format_selector: str, use_player_client: bool, hls_prefer_native: bool): cmd = [ 'yt-dlp', '-x', # Извлечь аудио '-f', format_selector, '--hls-prefer-native' if hls_prefer_native else '--hls-prefer-ffmpeg', '--audio-format', 'mp3', '--audio-quality', '0', # Лучшее качество '-o', str(temp_template), '--no-warnings', '--progress', '--newline', '--no-playlist', url ] if config: if config.ytdlp_user_agent: cmd.extend(['--user-agent', config.ytdlp_user_agent]) if config.ytdlp_cookies_file: cmd.extend(['--cookies', config.ytdlp_cookies_file]) if use_player_client and config.ytdlp_player_client: cmd.extend(['--extractor-args', f'youtube:player_client={config.ytdlp_player_client}']) if config.ytdlp_force_ipv4: cmd.append('--force-ipv4') logger.info(f"Downloading {url} with format: {format_selector}") process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stderr_tail = deque(maxlen=12) stdout_tail = deque(maxlen=12) async def _log_stream(stream, level: str, tail: deque[str]): while True: line = await stream.readline() if not line: break text = line.decode('utf-8', errors='ignore').strip() if text: tail.append(text) if level == "info": logger.info(f"yt-dlp: {text}") else: logger.warning(f"yt-dlp: {text}") stderr_task = asyncio.create_task(_log_stream(process.stderr, "warn", stderr_tail)) stdout_task = asyncio.create_task(_log_stream(process.stdout, "info", stdout_tail)) await process.wait() await stderr_task await stdout_task return process.returncode, list(stderr_tail) + list(stdout_tail) last_tail: list[str] = [] attempts = [] for fmt in formats_to_try: attempts.append((fmt, True, False)) # prefer ffmpeg for HLS by default if config and config.ytdlp_player_client: for fmt in formats_to_try: attempts.append((fmt, False, False)) for fmt, use_player_client, hls_prefer_native in attempts: returncode, tail_lines = await _run_yt_dlp(fmt, use_player_client, hls_prefer_native) last_tail = tail_lines if returncode == 0: break tail_text = "\n".join(tail_lines[-12:]) if tail_lines else "" if "Requested format is not available" in tail_text: logger.warning("yt-dlp format unavailable, retrying with fallback") continue if "Error when loading first segment" in tail_text or "m3u8" in tail_text or "ffmpeg exited with code 183" in tail_text: logger.warning("yt-dlp HLS failed, retrying with native HLS downloader") returncode, tail_lines = await _run_yt_dlp(fmt, use_player_client, True) last_tail = tail_lines if returncode == 0: break tail_text = "\n".join(tail_lines[-12:]) if tail_lines else "" logger.error("yt-dlp failed") if tail_text: raise Exception(f"Ошибка скачивания: yt-dlp завершился с ошибкой\n{tail_text}") raise Exception("Ошибка скачивания: yt-dlp завершился с ошибкой") if returncode != 0: logger.error("yt-dlp failed") if last_tail: tail_text = "\n".join(last_tail[-12:]) raise Exception(f"Ошибка скачивания: yt-dlp завершился с ошибкой\n{tail_text}") raise Exception("Ошибка скачивания: yt-dlp завершился с ошибкой") # Находим скачанный файл (yt-dlp создаст файл с расширением) # Ищем файлы, начинающиеся с temp_ и соответствующие нашему шаблону temp_base = f"temp_{output_path.name}" # Не используем glob-шаблоны, потому что в названии могут быть спецсимволы вроде []. temp_files = [ f for f in output_path.parent.iterdir() if f.is_file() and f.name.startswith(f"{temp_base}.") and f.suffix in ['.mp3', '.m4a', '.webm', '.ogg'] ] if not temp_files: raise Exception("Скачанный файл не найден") temp_file = temp_files[0] # Если файл не MP3, конвертируем через ffmpeg if temp_file.suffix != '.mp3': logger.info(f"Converting {temp_file.suffix} to MP3") await _convert_to_mp3(temp_file, final_path) temp_file.unlink() # Удаляем исходный файл else: # Просто переименовываем if temp_file != final_path: temp_file.rename(final_path) logger.info(f"Renamed {temp_file.name} to {final_path.name}") logger.info(f"Successfully downloaded and converted: {final_path}") return final_path except subprocess.CalledProcessError as e: logger.error(f"Subprocess error: {e}", exc_info=True) raise Exception(f"Ошибка при скачивании: {str(e)}") except Exception as e: logger.error(f"Error in download_and_convert: {e}", exc_info=True) raise async def _convert_to_mp3(input_file: Path, output_file: Path): """Конвертировать аудио файл в MP3 через ffmpeg.""" cmd = [ 'ffmpeg', '-i', str(input_file), '-codec:a', 'libmp3lame', '-qscale:a', '0', # Лучшее качество '-y', # Перезаписать выходной файл str(output_file) ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error = stderr.decode('utf-8', errors='ignore') logger.error(f"ffmpeg conversion failed: {error}") raise Exception(f"Ошибка конвертации: {error[:200]}")