videoDownloadTGbot/youtube-downloader/app.py

928 lines
48 KiB
Python
Raw Normal View History

"""
YouTube Video Downloader Service
Отдельный микросервис для скачивания видео с YouTube
"""
import os
import time
import logging
import traceback
from pathlib import Path
from flask import Flask, request, jsonify
from flask_cors import CORS
import yt_dlp
import uuid
import re
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app) # Разрешаем CORS для взаимодействия с основным ботом
# Директория для временных файлов
DOWNLOADS_DIR = Path('downloads')
DOWNLOADS_DIR.mkdir(exist_ok=True)
def _safe_filename(title: str) -> str:
"""Создает безопасное имя файла"""
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100]
return str(DOWNLOADS_DIR / f'{uuid.uuid4()}_{safe_title}.%(ext)s')
def _cleanup_downloads():
"""Удаляет все файлы из папки загрузок"""
for f in DOWNLOADS_DIR.glob('*'):
try:
f.unlink()
except Exception:
pass
def _is_valid_cookies_file(cookies_path: Path) -> bool:
"""Проверяет, что файл cookies существует и содержит данные (не только заголовки)"""
logger.info(f"[COOKIES CHECK] Проверка файла cookies: {cookies_path.absolute()}")
if not cookies_path.exists():
logger.warning(f"[COOKIES CHECK] Файл не существует: {cookies_path.absolute()}")
return False
try:
file_size = cookies_path.stat().st_size
logger.info(f"[COOKIES CHECK] Размер файла: {file_size} байт")
with open(cookies_path, 'r', encoding='utf-8', errors='ignore') as f:
all_lines = f.readlines()
lines = [line.strip() for line in all_lines if line.strip() and not line.strip().startswith('#')]
logger.info(f"[COOKIES CHECK] Всего строк в файле: {len(all_lines)}, валидных строк (не комментариев): {len(lines)}")
# Логируем первые 3 строки для диагностики (без чувствительных данных)
if lines:
preview_lines = []
for i, line in enumerate(lines[:3]):
# Маскируем значения cookie для безопасности
if '\t' in line:
parts = line.split('\t')
if len(parts) > 6:
masked_line = '\t'.join(parts[:6]) + '\t***MASKED***'
preview_lines.append(f" Строка {i+1}: {masked_line[:100]}")
else:
preview_lines.append(f" Строка {i+1}: {line[:100]}")
logger.info(f"[COOKIES CHECK] Превью первых строк:\n" + "\n".join(preview_lines))
# Проверяем, что есть хотя бы одна строка с данными cookie
is_valid = len(lines) > 0
logger.info(f"[COOKIES CHECK] Результат проверки: {'VALID' if is_valid else 'INVALID'}")
return is_valid
except Exception as e:
logger.error(f"[COOKIES CHECK] Ошибка при проверке файла cookies: {e}")
logger.error(f"[COOKIES CHECK] Traceback:\n{traceback.format_exc()}")
return False
def _parse_height(format_dict: dict) -> int:
"""Извлекает реальную высоту из формата: height/width/format_note/resolution"""
h = format_dict.get('height')
w = format_dict.get('width')
# Для вертикальных видео (Shorts) height и width могут быть перепутаны —
# берём меньшее значение как честный показатель разрешения
if h and w and isinstance(h, (int, float)) and isinstance(w, (int, float)):
real_h = min(int(h), int(w))
if real_h > 0:
return real_h
if h and isinstance(h, (int, float)) and h > 0:
return int(h)
if w and isinstance(w, (int, float)) and w > 0:
return int(w)
# Если вообще нет размеров — парсим format_note (например "360p" или "1080x1920")
note = str(format_dict.get('format_note', '') or '')
match = re.search(r'(\d+)\s*p', note)
if match:
return int(match.group(1))
match = re.search(r'(\d+)\s*x\s*(\d+)', note, re.IGNORECASE)
if match:
return min(int(match.group(1)), int(match.group(2)))
# Парсим поле resolution (например "1920x1080")
res = str(format_dict.get('resolution', '') or '')
match = re.search(r'(\d+)\s*x\s*(\d+)', res, re.IGNORECASE)
if match:
return min(int(match.group(1)), int(match.group(2)))
return 0
def _extract_height_from_format_id(format_id: str) -> int | None:
"""Извлекает ограничение по высоте из format_id (например 'best[height<=360]' -> 360)"""
match = re.search(r'height<[=]?\s*(\d+)', format_id)
if match:
return int(match.group(1))
return None
def _make_base_ydl_opts(user_agent: str, cookies_file_path: Path | None = None) -> dict:
"""Формирует базовые опции yt-dlp, общие для info и download
Стратегия выбора player_client:
- Cookies есть используем web клиенты (требуют n-challenge решения),
включаем js_runtimes + remote_components для решения n-challenge через Node.js.
- Cookies нет используем android клиент (не поддерживает cookies,
но не требует n-challenge, хотя даёт меньше форматов).
"""
extractor_args = {}
cookies_available = cookies_file_path is not None and cookies_file_path.exists()
if not cookies_available:
# Без cookies используем android — он не требует n-challenge
extractor_args = {
'youtube': {
'player_client': ['android'],
'skip': ['translated_subs', 'hls'],
},
}
opts = {
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 60,
'extractor_retries': 3,
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': 'https://www.youtube.com/',
},
}
if extractor_args:
opts['extractor_args'] = extractor_args
if cookies_available:
opts['cookiefile'] = str(cookies_file_path.absolute())
# Включаем n-challenge решение через Node.js + EJS скрипт с GitHub
# yt-dlp скачает challenge solver скрипт из официального репозитория
opts['js_runtimes'] = {'node': {}}
opts['remote_components'] = ['ejs:github']
return opts
def _find_latest_downloaded() -> Path | None:
"""Возвращает самый свежий файл в папке загрузок."""
files = list(DOWNLOADS_DIR.glob('*'))
if not files:
return None
files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return files[0]
def _file_has_video_stream(filepath: Path) -> bool:
"""Проверяет через ffprobe, содержит ли файл видео-поток."""
import subprocess
try:
result = subprocess.run(
['ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=codec_type', '-of', 'csv=p=0',
str(filepath)],
capture_output=True, text=True, timeout=15
)
return result.stdout.strip() == 'video'
except Exception as e:
logger.warning(f"[VALIDATE] Не удалось проверить видео-поток в {filepath.name}: {e}")
return True # в случае ошибки считаем, что видео есть
def download_youtube_video(url: str, max_retries: int = 3, format_id: str | None = None) -> Path:
"""Скачивает видео с YouTube - используем cookies для обхода блокировок"""
logger.info(f"[DOWNLOAD] Начало скачивания: {url}")
cookies_file = os.getenv('YOUTUBE_COOKIES_FILE', 'youtube_cookies.txt')
cookies_file_path = Path(cookies_file)
logger.info(f"[DOWNLOAD] Путь к файлу cookies из env: {cookies_file}, абсолютный: {cookies_file_path.absolute()}")
cookies_valid = _is_valid_cookies_file(cookies_file_path)
if not cookies_valid:
logger.warning(f"[DOWNLOAD] Файл cookies не найден или невалиден ({cookies_file_path}). "
f"Работаем без cookies. Для лучшей работы рекомендуется обновить cookies через скрипт get_youtube_cookies.sh")
else:
logger.info(f"[DOWNLOAD] Cookies файл валиден, будет использован: {cookies_file_path.absolute()}")
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
last_download_errors = [] # собираем ошибки по форматам для диагностики
for attempt in range(max_retries):
try:
is_shorts = '/shorts/' in url
# Базовые настройки для получения информации
ydl_opts_info = _make_base_ydl_opts(user_agent, cookies_file_path if cookies_valid else None)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: {'cookies включены' if cookies_valid else 'работаем без cookies'}")
# Пробуем получить информацию о видео
info = None
try:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: извлечение информации о видео с URL: {url}")
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: информация о видео успешно получена")
except Exception as info_error:
error_str = str(info_error)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при получении информации о видео: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback:\n{traceback.format_exc()}")
# Если не получилось с cookies, пробуем без них
# Проверяем различные признаки проблем с cookies:
# - явные ошибки с cookies/bot/sign in
# - ошибки формата (могут быть из-за блокировки YouTube при неполных cookies)
# - "Only images are available" (признак блокировки YouTube)
# - "Missing required Data Sync ID" (неполные cookies)
should_retry_without_cookies = cookies_valid and (
'cookies' in error_lower or
'bot' in error_lower or
'sign in' in error_lower or
'authentication' in error_lower or
'format is not available' in error_lower or
'only images are available' in error_lower or
'missing required data sync id' in error_lower or
'challenge solving failed' in error_lower
)
if should_retry_without_cookies:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка, возможно связанная с cookies: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: пробуем без cookies...")
ydl_opts_info.pop('cookiefile', None)
try:
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно получена информация без cookies!")
cookies_valid = False # Отключаем cookies для скачивания тоже
except Exception as retry_error:
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка даже без cookies: {retry_error}")
logger.error(f"[DOWNLOAD] Полный traceback без cookies:\n{traceback.format_exc()}")
raise
else:
raise
video_title = info.get('title', 'video') if info else 'video'
logger.info(f"YouTube: получена информация о видео: {video_title}")
# Настройки для скачивания
# Если передан format_id — это может быть:
# 1) Конкретный format code (число, например "18" или "137+140") — точный выбор качества
# 2) Format selector (например "bestvideo[height<=240]+bestaudio/best") — старый формат
#
# Для конкретных format codes: если формат недоступен, НЕ падаем на best,
# а пробуем format selector для того же разрешения (извлекаем height из запроса пользователя).
# Это важно, т.к. format_id из get_youtube_formats() может не совпадать
# с format_id при повторном extract_info() в download_youtube_video().
default_format_options = [
'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'best[ext=mp4]/best',
'bestvideo+bestaudio/best',
'best',
]
# Добавляем fallback на combined форматы (например 18), которые всегда доступны
combined_fallback = ['best[ext=mp4]/best', 'best']
requested_height = None # высота, запрошенная пользователем
if format_id:
is_specific_code = not ('[' in format_id or ']' in format_id)
requested_height = _extract_height_from_format_id(format_id)
if requested_height is not None:
# Конкретный format_id (из /formats) ставим ПЕРВЫМ —
# он точно указывает выбранные пользователем format codes.
# Height-ограниченный селектор идёт как fallback
# (c исключением av01, чтобы yt-dlp не выбрал unplayable формат 400).
format_options = [
format_id,
f"bestvideo[height<={requested_height}][vcodec!=av01]+bestaudio/best[height<={requested_height}]",
]
logger.info(f"[DOWNLOAD] Размерное ограничение: {requested_height}p, format_id: {format_id}")
format_options.extend(default_format_options)
format_options.extend(combined_fallback)
else:
format_options = [format_id]
format_options.extend(default_format_options)
logger.info(f"[DOWNLOAD] Итоговый список format_options ({len(format_options)} шт.): {format_options}")
else:
format_options = default_format_options
download_success = False
for format_option in format_options:
ydl_opts_download = _make_base_ydl_opts(user_agent, cookies_file_path if cookies_valid else None)
ydl_opts_download.update({
'format': format_option,
'outtmpl': _safe_filename(video_title),
# fragment_retries — для DASH форматов (видео без аудио),
# YouTube может разрывать фрагменты; увеличиваем retries
'fragment_retries': 3,
# allow_unplayable_formats — позволяет скачивать форматы,
# которые YouTube помечает как "недоступные" для сторонних клиентов
'allow_unplayable_formats': True,
})
use_cookies_this_attempt = cookies_valid
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}/{max_retries}: начинаем скачивание (Shorts: {is_shorts}, формат: {format_option}, cookies: {use_cookies_this_attempt})")
try:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: запуск yt-dlp для скачивания с форматом {format_option}")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
result_info = ydl.download([url])
# Логируем информацию о том, что реально скачалось
if result_info:
for entry in result_info:
if entry:
actual_format_id = entry.get('format_id', 'unknown')
actual_height = entry.get('height', 'unknown')
actual_ext = entry.get('ext', 'unknown')
actual_filesize = entry.get('filesize') or entry.get('filesize_approx') or 'unknown'
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: реально скачан формат: id={actual_format_id}, height={actual_height}, ext={actual_ext}, size={actual_filesize}")
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно скачано с форматом {format_option}")
# Проверяем, что файл содержит видео-поток, а не только аудио
# (yt-dlp c allow_unplayable_formats может скачать av01 формат
# и отказаться от мержа, вернув только аудио)
downloaded = _find_latest_downloaded()
if downloaded and not _file_has_video_stream(downloaded):
logger.warning(f"[DOWNLOAD] Файл {downloaded.name} не содержит видео-потока (только аудио). Удаляем и пробуем следующий формат...")
try:
downloaded.unlink()
except Exception:
pass
continue
download_success = True
break
except Exception as download_error:
error_str = str(download_error)
error_lower = error_str.lower()
last_download_errors.append(f"[{format_option}] {error_str[:300]}")
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при скачивании формата {format_option}: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback:\n{traceback.format_exc()}")
# Если ошибка с cookies, пробуем без них
if use_cookies_this_attempt and ('cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower):
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка связанная с cookies для формата {format_option}: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: пробуем без cookies...")
# Пересоздаём opts без cookies
ydl_opts_download_no_cookies = _make_base_ydl_opts(user_agent, None)
ydl_opts_download_no_cookies.update({
'format': format_option,
'outtmpl': _safe_filename(video_title),
'fragment_retries': 3,
'allow_unplayable_formats': True,
})
try:
with yt_dlp.YoutubeDL(ydl_opts_download_no_cookies) as ydl:
ydl.download([url])
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно скачано без cookies")
downloaded = _find_latest_downloaded()
if downloaded and not _file_has_video_stream(downloaded):
logger.warning(f"[DOWNLOAD] Файл {downloaded.name} без видео-потока. Удаляем и пробуем следующий формат...")
try:
downloaded.unlink()
except Exception:
pass
continue
download_success = True
cookies_valid = False # Отключаем cookies для следующих попыток
break
except Exception as retry_error:
retry_str = str(retry_error)
last_download_errors.append(f"[{format_option} без cookies] {retry_str[:300]}")
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка даже без cookies: {retry_error}")
logger.error(f"[DOWNLOAD] Полный traceback без cookies:\n{traceback.format_exc()}")
# Если и без cookies не получилось, пробуем следующий формат
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: не удалось скачать без cookies, пробуем следующий формат...")
continue
# Если ошибка формата, пробуем следующий формат
elif 'format is not available' in error_lower or 'requested format' in error_lower:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: формат {format_option} недоступен, пробуем следующий...")
continue
else:
# Другая ошибка - пробуем следующий формат
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при скачивании формата {format_option}: {error_str[:200]}, пробуем следующий...")
continue
if not download_success:
# Собираем детальный отчёт об ошибках
errors_summary = "; ".join(last_download_errors[-10:]) # последние 10 ошибок
raise Exception(f"Не удалось скачать видео ни с одним из доступных форматов. Ошибки: {errors_summary}")
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob('*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return downloaded_files[0]
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
error_str = str(e)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}/{max_retries} не удалась: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback попытки {attempt + 1}:\n{traceback.format_exc()}")
# Если ошибка связана с форматом, пробуем другие настройки
if 'format is not available' in error_lower or 'requested format' in error_lower:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: проблема с форматом, пробуем другие настройки на следующей попытке")
if attempt < max_retries - 1:
sleep_time = (attempt + 1) * 2
logger.info(f"[DOWNLOAD] Ожидание {sleep_time} секунд перед следующей попыткой...")
time.sleep(sleep_time)
continue
# Если ошибка связана с cookies и они были использованы, попробуем без cookies на следующей попытке
if 'cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower:
if cookies_valid and attempt == 0:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка связанная с cookies: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: на следующей попытке попробуем без cookies")
cookies_valid = False
if attempt < max_retries - 1:
sleep_time = (attempt + 1) * 2
logger.info(f"[DOWNLOAD] Ожидание {sleep_time} секунд перед следующей попыткой...")
time.sleep(sleep_time)
# Включаем в итоговую ошибку сводку по форматам
errors_summary = "; ".join(last_download_errors[-10:]) if last_download_errors else ""
error_msg = str(last_error) if last_error else "Неизвестная ошибка при скачивании с YouTube"
if errors_summary:
error_msg += f" | Ошибки форматов: {errors_summary}"
raise Exception(error_msg)
def get_youtube_formats(url: str) -> list[dict]:
"""Получает список доступных форматов видео с YouTube"""
logger.info(f"[FORMATS] Получение списка форматов для: {url}")
cookies_file = os.getenv('YOUTUBE_COOKIES_FILE', 'youtube_cookies.txt')
cookies_file_path = Path(cookies_file)
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
cookies_valid = _is_valid_cookies_file(cookies_file_path)
if not cookies_valid:
logger.warning(f"[FORMATS] Cookies файл не найден или невалиден. Работаем без cookies.")
# Пробуем сначала с cookies (если есть), потом без
attempts_configs = []
if cookies_valid:
attempts_configs.append({
'use_cookies': True,
'label': 'с cookies'
})
attempts_configs.append({
'use_cookies': False,
'label': 'без cookies'
})
last_error = None
info = None
for config in attempts_configs:
try:
logger.info(f"[FORMATS] Попытка: {config['label']}")
# Для /formats используем те же улучшенные опции (player_client, retries и т.д.),
# но с quiet=True чтобы не засорять логи
ydl_opts = _make_base_ydl_opts(
user_agent,
cookies_file_path if config['use_cookies'] else None
)
ydl_opts['quiet'] = True
ydl_opts['no_warnings'] = True
ydl_opts['socket_timeout'] = 30
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[FORMATS] Успешно получена информация {config['label']}")
break # Успех - выходим из цикла
except Exception as e:
error_str = str(e)
last_error = e
logger.warning(f"[FORMATS] Ошибка {config['label']}: {error_str[:200]}")
# Если это была попытка с cookies, и ошибка похожа на проблему с cookies -
# продолжаем дальше (следующая попытка будет без cookies)
if config['use_cookies'] and (
'cookiefile' in error_str.lower()
or 'requested format' in error_str.lower()
or 'http error' in error_str.lower()
or 'only images are available' in error_str.lower()
or 'n challenge' in error_str.lower()
or 'challenge solving' in error_str.lower()
):
logger.info(f"[FORMATS] Ошибка с cookies, пробуем без cookies...")
continue
continue
if info is None:
logger.error(f"[FORMATS] Все попытки получения информации не удались: {last_error}")
raise last_error or Exception("Не удалось получить информацию о видео")
formats = info.get('formats', [])
logger.info(f"[FORMATS] Всего форматов: {len(formats)}")
duration = info.get('duration') # длительность видео в секундах
logger.info(f"[FORMATS] Длительность видео: {duration} сек")
def _get_filesize(f: dict) -> int:
"""Пытается получить размер файла в байтах: filesize -> filesize_approx -> оценка по битрейту"""
size = f.get('filesize') or f.get('filesize_approx') or 0
if size:
return size
# Если размер неизвестен, оцениваем по битрейту и длительности
if duration:
# Для форматов, которые содержат и видео и аудио, используем tbr
tbr = f.get('tbr') or 0
if tbr:
return int(tbr * 1024 / 8 * duration)
# Для видео-без-аудио: vbr видео + abr аудио
vbr = f.get('vbr') or 0
abr = f.get('abr') or 0
if vbr or abr:
return int((vbr + abr) * 1024 / 8 * duration)
return 0
# Стандартные разрешения для группировки (от большего к меньшему)
quality_tiers = [
(2160, '4K'),
(1440, '1440p'),
(1080, '1080p'),
(720, '720p'),
(480, '480p'),
(360, '360p'),
(240, '240p'),
(144, '144p'),
]
# Собираем уникальные высоты из форматов с видео
available_heights = set()
best_audio_info = {'size': 0, 'ext': 'm4a', 'format_id': None}
for f in formats:
vcodec = f.get('vcodec', 'none')
acodec = f.get('acodec', 'none')
height = _parse_height(f)
format_id = f.get('format_id', '')
if vcodec != 'none' and height > 0:
available_heights.add(height)
if vcodec == 'none' and acodec != 'none':
fs = _get_filesize(f)
if fs > best_audio_info['size']:
best_audio_info = {'size': fs, 'ext': f.get('ext', 'm4a'), 'format_id': format_id}
logger.info(f"[FORMATS] Доступные разрешения: {sorted(available_heights)}")
logger.info(f"[FORMATS] Лучший аудиопоток: {best_audio_info['size']} bytes, {best_audio_info['ext']}, format_id={best_audio_info['format_id']}")
result = []
used_heights = set()
# Определяем реальную максимальную высоту видео
max_actual_height = max(available_heights) if available_heights else 2160
for max_height, label in quality_tiers:
if max_height > max_actual_height:
continue # не показываем 4K для видео с макс высотой 1080p
best_video = None
best_video_height = 0
for f in formats:
vcodec = f.get('vcodec', 'none')
height = _parse_height(f)
if vcodec == 'none' or height <= 0:
continue
if height <= max_height and height > best_video_height:
best_video = f
best_video_height = height
if not best_video:
continue
if best_video_height in used_heights:
continue
used_heights.add(best_video_height)
video_size = _get_filesize(best_video)
has_audio = best_video.get('acodec', 'none') != 'none'
total_size = video_size + (best_audio_info['size'] if not has_audio else 0)
video_ext = best_video.get('ext', 'mp4')
video_format_id = best_video.get('format_id', '')
# Честный лейбл из реальной высоты
format_note = best_video.get('format_note', '') or ''
if format_note and str(best_video_height) in format_note:
display_label = format_note
else:
display_label = f"{best_video_height}p"
logger.info(f"[FORMATS] {display_label} (height={best_video_height}): video_size={video_size}, has_audio={has_audio}, total={total_size}, format_id={video_format_id}")
# format_selector без /best в конце — чтобы yt-dlp не молча скатывался на другой размер
if has_audio:
format_selector = f"{video_format_id}/bestvideo[height<={best_video_height}]+bestaudio/best[height<={best_video_height}]"
elif best_audio_info['format_id']:
format_selector = (
f"{video_format_id}+{best_audio_info['format_id']}/"
f"bestvideo[height<={best_video_height}]+bestaudio/"
f"best[height<={best_video_height}]"
)
else:
format_selector = f"{video_format_id}+bestaudio/bestvideo[height<={best_video_height}]+bestaudio/best[height<={best_video_height}]"
result.append({
'format_id': format_selector,
'label': f"{display_label} ({video_ext})",
'quality': display_label,
'ext': video_ext,
'filesize_mb': round(total_size / 1024 / 1024, 1) if total_size else None,
})
# Добавляем аудиодорожку
if best_audio_info['size']:
result.append({
'format_id': 'bestaudio/best',
'label': f"Audio only ({best_audio_info['ext']})",
'quality': 'audio',
'ext': best_audio_info['ext'],
'filesize_mb': round(best_audio_info['size'] / 1024 / 1024, 1) if best_audio_info['size'] else None,
})
# ---------------------------------------------------------------
# Если реальных форматов совсем нет — генерируем оценочные
# (бывает при очень плохих cookies, когда даже format_note пустой)
# ---------------------------------------------------------------
if len(result) == 0:
logger.info(f"[FORMATS] Реальных форматов не найдено, генерируем оценочные")
# Пытаемся определить реальную максимальную высоту из всех полей
max_possible_height = 0
for f in formats:
height = _parse_height(f)
if height > max_possible_height:
max_possible_height = height
if max_possible_height == 0:
# Если ничего не смогли определить — используем format_note напрямую
for f in formats:
note = str(f.get('format_note', '') or '')
numbers = re.findall(r'(\d+)', note)
for num in numbers:
n = int(num)
if 100 < n < 10000 and n > max_possible_height:
max_possible_height = n
if max_possible_height == 0:
max_possible_height = 2160
available_tiers = [(h, l) for h, l in quality_tiers if h <= max_possible_height]
TYPICAL_VIDEO_BITRATES: dict[int, int] = {
2160: 40000, 1440: 20000, 1080: 10000, 720: 5000,
480: 2500, 360: 1200, 240: 600, 144: 300,
}
AUDIO_BITRATE = 128
result = []
if duration:
for max_height, label in available_tiers:
video_kbps = TYPICAL_VIDEO_BITRATES.get(max_height, 1000)
total_kbps = video_kbps + AUDIO_BITRATE
estimated_bytes = total_kbps * 1000 / 8 * duration
estimated_mb = round(estimated_bytes / 1024 / 1024, 1)
format_selector = f"bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]"
result.append({
'format_id': format_selector,
'label': f"{label} (mp4)",
'quality': label,
'ext': 'mp4',
'filesize_mb': estimated_mb,
})
logger.info(f"[FORMATS] Оценка: {label}: ~{estimated_mb} МБ (битрейт {video_kbps} кбит/с)")
audio_bytes = AUDIO_BITRATE * 1000 / 8 * duration
audio_mb = round(audio_bytes / 1024 / 1024, 1)
result.append({
'format_id': 'bestaudio/best',
'label': f"Audio only (m4a)",
'quality': 'audio',
'ext': 'm4a',
'filesize_mb': audio_mb,
})
else:
for max_height, label in available_tiers:
format_selector = f"bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]"
result.append({
'format_id': format_selector,
'label': label,
'quality': label,
'ext': 'mp4',
'filesize_mb': None,
})
result.append({
'format_id': 'bestaudio/best',
'label': 'Audio only (m4a)',
'quality': 'audio',
'ext': 'm4a',
'filesize_mb': None,
})
logger.info(f"[FORMATS] Возвращаем {len(result)} форматов")
return result
# Простой кэш форматов: {normalized_url: (timestamp, list_of_formats)}
# Форматы YouTube не меняются часто, кэшируем на 30 минут
_formats_cache: dict[str, tuple[float, list[dict]]] = {}
_FORMATS_CACHE_TTL = 30 * 60 # 30 минут в секундах
def _normalize_youtube_url(url: str) -> str:
"""Нормализует YouTube URL: убирает tracking параметры (?si=...),
приводит к единому виду для кэширования."""
import re
# Оставляем только video ID из youtu.be или youtube.com
# youtu.be/VIDEO_ID?si=... -> youtu.be/VIDEO_ID
m = re.search(r'(youtu\.be/|youtube\.com/watch\?v=)([a-zA-Z0-9_-]{11})', url)
if m:
prefix, video_id = m.group(1), m.group(2)
return f"https://www.youtube.com/watch?v={video_id}"
return url # если не распознали, кэшируем как есть
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'ok', 'service': 'youtube-downloader'}), 200
@app.route('/formats', methods=['POST'])
def formats():
"""Возвращает список доступных форматов для YouTube URL"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[FORMATS {request_id}] ========== ЗАПРОС ФОРМАТОВ ==========")
try:
data = request.get_json()
if not data or 'url' not in data:
return jsonify({'error': 'URL is required'}), 400
url = data['url']
if 'youtube.com' not in url and 'youtu.be' not in url:
return jsonify({'error': 'Only YouTube URLs are supported'}), 400
# Нормализуем URL и проверяем кэш
cache_key = _normalize_youtube_url(url)
now = time.time()
if cache_key in _formats_cache:
cached_time, cached_formats = _formats_cache[cache_key]
if now - cached_time < _FORMATS_CACHE_TTL:
logger.info(f"[FORMATS {request_id}] Возвращаем из кэша ({len(cached_formats)} форматов, возраст {now - cached_time:.0f}с)")
return jsonify({'formats': cached_formats}), 200
else:
logger.info(f"[FORMATS {request_id}] Кэш устарел ({now - cached_time:.0f}с > {_FORMATS_CACHE_TTL}с), обновляем...")
del _formats_cache[cache_key]
format_list = get_youtube_formats(url)
# Сохраняем в кэш
_formats_cache[cache_key] = (time.time(), format_list)
logger.info(f"[FORMATS {request_id}] Сохранено в кэш {len(format_list)} форматов для {cache_key}")
return jsonify({'formats': format_list}), 200
except Exception as e:
logger.error(f"[FORMATS {request_id}] Ошибка: {e}")
logger.error(traceback.format_exc())
return jsonify({'error': str(e)}), 500
@app.route('/download/stream', methods=['POST'])
def download_stream():
"""Скачивает видео с YouTube и возвращает бинарные данные"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[REQUEST {request_id}] ========== НОВЫЙ ЗАПРОС ==========")
logger.info(f"[REQUEST {request_id}] Метод: {request.method}")
logger.info(f"[REQUEST {request_id}] URL: {request.url}")
logger.info(f"[REQUEST {request_id}] Remote Address: {request.remote_addr}")
logger.info(f"[REQUEST {request_id}] Headers: {dict(request.headers)}")
try:
data = request.get_json()
logger.info(f"[REQUEST {request_id}] Body (JSON): {data}")
if not data or 'url' not in data:
logger.warning(f"[REQUEST {request_id}] Ошибка: URL не предоставлен в запросе")
return jsonify({'error': 'URL is required'}), 400
url = data['url']
format_id = data.get('format_id') # Опциональный параметр
logger.info(f"[REQUEST {request_id}] Получен запрос на скачивание (stream): {url}, format_id: {format_id}")
# Проверяем, что это YouTube URL
if 'youtube.com' not in url and 'youtu.be' not in url:
logger.warning(f"[REQUEST {request_id}] Ошибка: URL не является YouTube URL: {url}")
return jsonify({'error': 'Only YouTube URLs are supported'}), 400
# Скачиваем видео
logger.info(f"[REQUEST {request_id}] Начинаем скачивание видео...")
video_path = download_youtube_video(url, format_id=format_id)
logger.info(f"[REQUEST {request_id}] Видео успешно скачано: {video_path}")
# Читаем файл и отправляем
file_size = video_path.stat().st_size
logger.info(f"[REQUEST {request_id}] Размер файла: {file_size} байт")
with open(video_path, 'rb') as f:
video_data = f.read()
# Безопасное имя файла без кириллицы для заголовка
safe_filename = video_path.name.encode('ascii', 'ignore').decode('ascii') or 'youtube_video.mp4'
if not safe_filename.endswith(('.mp4', '.webm', '.mkv')):
safe_filename = 'youtube_video.mp4'
# Определяем content-type
content_type = 'video/mp4'
if video_path.suffix == '.webm':
content_type = 'video/webm'
elif video_path.suffix == '.mkv':
content_type = 'video/x-matroska'
logger.info(f"[REQUEST {request_id}] Отправляем файл: {safe_filename}, Content-Type: {content_type}, размер: {len(video_data)} байт")
# Удаляем временный файл
video_path.unlink()
logger.info(f"[REQUEST {request_id}] Временный файл удален")
logger.info(f"[REQUEST {request_id}] ========== ЗАПРОС УСПЕШНО ЗАВЕРШЕН ==========")
return video_data, 200, {
'Content-Type': content_type,
'Content-Disposition': f'attachment; filename="{safe_filename}"'
}
except Exception as e:
error_str = str(e)
error_lower = error_str.lower()
logger.error(f"[REQUEST {request_id}] ========== ОШИБКА В ЗАПРОСЕ ==========")
logger.error(f"[REQUEST {request_id}] Ошибка при скачивании: {error_str}")
logger.error(f"[REQUEST {request_id}] Полный traceback:\n{traceback.format_exc()}")
# Улучшаем сообщение об ошибке, если проблема с cookies
if 'cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower:
logger.error(f"[REQUEST {request_id}] Обнаружена ошибка связанная с cookies!")
error_msg = (
f"{error_str}\n\n"
"💡 Совет: Cookies устарели или недействительны. "
"Обновите cookies, запустив скрипт:\n"
" ./youtube-downloader/get_youtube_cookies.sh\n"
"Затем перезапустите сервис."
)
else:
error_msg = error_str
logger.error(f"[REQUEST {request_id}] Возвращаем 500 ошибку клиенту")
logger.error(f"[REQUEST {request_id}] ========== КОНЕЦ ОБРАБОТКИ ОШИБКИ ==========")
return jsonify({'error': error_msg}), 500
if __name__ == '__main__':
port = int(os.getenv('PORT', 5000)) # Внутренний порт контейнера
host = os.getenv('HOST', '0.0.0.0')
logger.info(f"Запуск YouTube Downloader сервиса на {host}:{port}")
app.run(host=host, port=port, debug=False)