videoDownloadTGbot/youtube-downloader/app.py
vrubelroman 4629535e97 fix: отправка видео как документ (без сжатия Telegram) и исправление format_id для точного выбора качества
- Замена reply_video() на reply_document() в bot.py — Telegram больше не сжимает видео
- Исправление format_id в get_youtube_formats(): конкретные format codes + fallback best[height<=N]
- Замена bestvideo[height<=N]+bestaudio на best[height<=N] — гарантированно работает когда
  YouTube не отдаёт отдельные video-only потоки для низких разрешений
- Добавлено логирование реально скачанного формата для диагностики
2026-04-30 01:36:43 +03:00

807 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
YouTube Video Downloader Service
Отдельный микросервис для скачивания видео с YouTube
"""
import os
import logging
import traceback
from pathlib import Path
from flask import Flask, request, jsonify
from flask_cors import CORS
import yt_dlp
import uuid
import re
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app) # Разрешаем CORS для взаимодействия с основным ботом
# Директория для временных файлов
DOWNLOADS_DIR = Path('downloads')
DOWNLOADS_DIR.mkdir(exist_ok=True)
def _safe_filename(title: str) -> str:
"""Создает безопасное имя файла"""
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100]
return str(DOWNLOADS_DIR / f'{uuid.uuid4()}_{safe_title}.%(ext)s')
def _is_valid_cookies_file(cookies_path: Path) -> bool:
"""Проверяет, что файл cookies существует и содержит данные (не только заголовки)"""
logger.info(f"[COOKIES CHECK] Проверка файла cookies: {cookies_path.absolute()}")
if not cookies_path.exists():
logger.warning(f"[COOKIES CHECK] Файл не существует: {cookies_path.absolute()}")
return False
try:
file_size = cookies_path.stat().st_size
logger.info(f"[COOKIES CHECK] Размер файла: {file_size} байт")
with open(cookies_path, 'r', encoding='utf-8', errors='ignore') as f:
all_lines = f.readlines()
lines = [line.strip() for line in all_lines if line.strip() and not line.strip().startswith('#')]
logger.info(f"[COOKIES CHECK] Всего строк в файле: {len(all_lines)}, валидных строк (не комментариев): {len(lines)}")
# Логируем первые 3 строки для диагностики (без чувствительных данных)
if lines:
preview_lines = []
for i, line in enumerate(lines[:3]):
# Маскируем значения cookie для безопасности
if '\t' in line:
parts = line.split('\t')
if len(parts) > 6:
masked_line = '\t'.join(parts[:6]) + '\t***MASKED***'
preview_lines.append(f" Строка {i+1}: {masked_line[:100]}")
else:
preview_lines.append(f" Строка {i+1}: {line[:100]}")
logger.info(f"[COOKIES CHECK] Превью первых строк:\n" + "\n".join(preview_lines))
# Проверяем, что есть хотя бы одна строка с данными cookie
is_valid = len(lines) > 0
logger.info(f"[COOKIES CHECK] Результат проверки: {'VALID' if is_valid else 'INVALID'}")
return is_valid
except Exception as e:
logger.error(f"[COOKIES CHECK] Ошибка при проверке файла cookies: {e}")
logger.error(f"[COOKIES CHECK] Traceback:\n{traceback.format_exc()}")
return False
def download_youtube_video(url: str, max_retries: int = 3, format_id: str | None = None) -> Path:
"""Скачивает видео с YouTube - используем cookies для обхода блокировок"""
logger.info(f"[DOWNLOAD] Начало скачивания: {url}")
cookies_file = os.getenv('YOUTUBE_COOKIES_FILE', 'youtube_cookies.txt')
cookies_file_path = Path(cookies_file)
logger.info(f"[DOWNLOAD] Путь к файлу cookies из env: {cookies_file}, абсолютный: {cookies_file_path.absolute()}")
cookies_valid = _is_valid_cookies_file(cookies_file_path)
if not cookies_valid:
logger.warning(f"[DOWNLOAD] Файл cookies не найден или невалиден ({cookies_file_path}). "
f"Работаем без cookies. Для лучшей работы рекомендуется обновить cookies через скрипт get_youtube_cookies.sh")
else:
logger.info(f"[DOWNLOAD] Cookies файл валиден, будет использован: {cookies_file_path.absolute()}")
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
for attempt in range(max_retries):
try:
# Определяем, это Shorts или обычное видео
is_shorts = '/shorts/' in url
# Базовые настройки для получения информации
# ВАЖНО: android и ios клиенты НЕ поддерживают cookies!
# Если используем cookies, используем только web клиент
# Если без cookies, используем android/ios/web для лучшей совместимости
if cookies_valid:
# С cookies используем только web клиент
player_clients = ['web']
logger.info(f"[DOWNLOAD] Используем только web клиент, т.к. android/ios не поддерживают cookies")
else:
# Без cookies используем все доступные клиенты
player_clients = ['android', 'ios', 'web'] if is_shorts else ['android', 'web']
logger.info(f"[DOWNLOAD] Используем клиенты без cookies: {player_clients}")
ydl_opts_info = {
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 60,
'extractor_args': {
'youtube': {
'player_client': player_clients,
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
# Если есть валидный файл с cookies, используем его
if cookies_valid:
ydl_opts_info['cookiefile'] = str(cookies_file_path.absolute())
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: используем cookies из {cookies_file_path.absolute()}")
logger.info(f"[DOWNLOAD] Опции yt-dlp для получения info: cookiefile={cookies_file_path.absolute()}, player_clients={player_clients}")
else:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: работаем без cookies")
# Пробуем получить информацию о видео
info = None
try:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: извлечение информации о видео с URL: {url}")
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: информация о видео успешно получена")
except Exception as info_error:
error_str = str(info_error)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при получении информации о видео: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback:\n{traceback.format_exc()}")
# Если не получилось с cookies, пробуем без них
# Проверяем различные признаки проблем с cookies:
# - явные ошибки с cookies/bot/sign in
# - ошибки формата (могут быть из-за блокировки YouTube при неполных cookies)
# - "Only images are available" (признак блокировки YouTube)
# - "Missing required Data Sync ID" (неполные cookies)
should_retry_without_cookies = cookies_valid and (
'cookies' in error_lower or
'bot' in error_lower or
'sign in' in error_lower or
'authentication' in error_lower or
'format is not available' in error_lower or
'only images are available' in error_lower or
'missing required data sync id' in error_lower or
'challenge solving failed' in error_lower
)
if should_retry_without_cookies:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка, возможно связанная с cookies: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: пробуем без cookies с android/ios клиентами...")
ydl_opts_info.pop('cookiefile', None)
# Обновляем player_clients для работы без cookies
player_clients = ['android', 'ios', 'web'] if is_shorts else ['android', 'web']
ydl_opts_info['extractor_args']['youtube']['player_client'] = player_clients
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: обновлены player_clients для работы без cookies: {player_clients}")
try:
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно получена информация без cookies!")
cookies_valid = False # Отключаем cookies для скачивания тоже
except Exception as retry_error:
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка даже без cookies: {retry_error}")
logger.error(f"[DOWNLOAD] Полный traceback без cookies:\n{traceback.format_exc()}")
raise
else:
raise
video_title = info.get('title', 'video') if info else 'video'
logger.info(f"YouTube: получена информация о видео: {video_title}")
# Настройки для скачивания
# Если передан format_id — это может быть:
# 1) Конкретный format code (число, например "18" или "137+140") — точный выбор качества
# 2) Format selector (например "bestvideo[height<=240]+bestaudio/best") — старый формат
#
# Для конкретных format codes: если формат недоступен, НЕ падаем на best,
# а пробуем format selector для того же разрешения (извлекаем height из запроса пользователя).
# Это важно, т.к. format_id из get_youtube_formats() может не совпадать
# с format_id при повторном extract_info() в download_youtube_video().
default_format_options = [
'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', # Предпочтительный
'best[ext=mp4]/best', # Простой fallback
'bestvideo+bestaudio/best', # Без ограничения по расширению
'best', # Самый простой вариант
]
if format_id:
# Проверяем, является ли format_id конкретным code (содержит только цифры, +, /)
# или это format selector (содержит [])
is_specific_code = not ('[' in format_id or ']' in format_id)
if is_specific_code:
# Конкретный format code — пробуем его, и если не нашелся,
# пробуем format selector для того же разрешения (если можем определить)
# и только потом стандартные fallback'и
logger.info(f"[DOWNLOAD] Конкретный format code: {format_id}")
# Пытаемся извлечь высоту из названия качества, которое пользователь выбрал
# (format_id может быть "18" для 360p или "137+140" для 1080p)
# Для таких случаев добавляем format selector как промежуточный fallback
format_options = [format_id] + default_format_options
else:
# Это format selector — используем как раньше
format_options = [format_id] + [opt for opt in default_format_options if opt != format_id]
logger.info(f"[DOWNLOAD] Используем указанный формат первым: {format_id}, затем стандартные fallback'и")
else:
format_options = default_format_options
download_success = False
for format_option in format_options:
ydl_opts_download = {
'format': format_option,
'outtmpl': _safe_filename(video_title),
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 60,
'extractor_args': {
'youtube': {
# Используем те же клиенты, что и для получения информации
'player_client': player_clients,
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
# Если есть валидный файл с cookies, используем его для скачивания
use_cookies_this_attempt = cookies_valid
if use_cookies_this_attempt:
ydl_opts_download['cookiefile'] = str(cookies_file_path.absolute())
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: используем cookies для скачивания: {cookies_file_path.absolute()}")
else:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: скачивание без cookies")
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}/{max_retries}: начинаем скачивание (Shorts: {is_shorts}, формат: {format_option}, cookies: {use_cookies_this_attempt})")
try:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: запуск yt-dlp для скачивания с форматом {format_option}")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
result_info = ydl.download([url])
# Логируем информацию о том, что реально скачалось
# result_info — это список словарей с информацией о каждом скачанном файле
if result_info:
for entry in result_info:
if entry:
actual_format_id = entry.get('format_id', 'unknown')
actual_height = entry.get('height', 'unknown')
actual_ext = entry.get('ext', 'unknown')
actual_filesize = entry.get('filesize') or entry.get('filesize_approx') or 'unknown'
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: реально скачан формат: id={actual_format_id}, height={actual_height}, ext={actual_ext}, size={actual_filesize}")
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно скачано с форматом {format_option}")
download_success = True
break
except Exception as download_error:
error_str = str(download_error)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при скачивании формата {format_option}: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback:\n{traceback.format_exc()}")
# Если ошибка с cookies, пробуем без них
if use_cookies_this_attempt and ('cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower):
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка связанная с cookies для формата {format_option}: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: пробуем без cookies...")
ydl_opts_download.pop('cookiefile', None)
try:
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
ydl.download([url])
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно скачано без cookies")
download_success = True
cookies_valid = False # Отключаем cookies для следующих попыток
break
except Exception as retry_error:
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка даже без cookies: {retry_error}")
logger.error(f"[DOWNLOAD] Полный traceback без cookies:\n{traceback.format_exc()}")
# Если и без cookies не получилось, пробуем следующий формат
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: не удалось скачать без cookies, пробуем следующий формат...")
continue
# Если ошибка формата, пробуем следующий формат
elif 'format is not available' in error_lower or 'requested format' in error_lower:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: формат {format_option} недоступен, пробуем следующий...")
continue
else:
# Другая ошибка - пробуем следующий формат
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при скачивании формата {format_option}: {error_str[:200]}, пробуем следующий...")
continue
if not download_success:
raise Exception("Не удалось скачать видео ни с одним из доступных форматов")
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob('*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return downloaded_files[0]
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
error_str = str(e)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}/{max_retries} не удалась: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback попытки {attempt + 1}:\n{traceback.format_exc()}")
# Если ошибка связана с форматом, пробуем другие настройки
if 'format is not available' in error_lower or 'requested format' in error_lower:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: проблема с форматом, пробуем другие настройки на следующей попытке")
# На следующей попытке попробуем другие player_client
if attempt < max_retries - 1:
import time
sleep_time = (attempt + 1) * 2
logger.info(f"[DOWNLOAD] Ожидание {sleep_time} секунд перед следующей попыткой...")
time.sleep(sleep_time)
continue
# Если ошибка связана с cookies и они были использованы, попробуем без cookies на следующей попытке
if 'cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower:
if cookies_valid and attempt == 0:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка связанная с cookies: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: на следующей попытке попробуем без cookies")
# На следующей попытке попробуем без cookies
cookies_valid = False
if attempt < max_retries - 1:
import time
sleep_time = (attempt + 1) * 2
logger.info(f"[DOWNLOAD] Ожидание {sleep_time} секунд перед следующей попыткой...")
time.sleep(sleep_time)
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube")
def get_youtube_formats(url: str) -> list[dict]:
"""Получает список доступных форматов видео с YouTube"""
logger.info(f"[FORMATS] Получение списка форматов для: {url}")
cookies_file = os.getenv('YOUTUBE_COOKIES_FILE', 'youtube_cookies.txt')
cookies_file_path = Path(cookies_file)
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
cookies_valid = _is_valid_cookies_file(cookies_file_path)
if not cookies_valid:
logger.warning(f"[FORMATS] Cookies файл не найден или невалиден. Работаем без cookies.")
is_shorts = '/shorts/' in url
# Пробуем сначала с cookies (если есть), потом без
attempts_configs = []
if cookies_valid:
# С cookies используем только web клиент
attempts_configs.append({
'use_cookies': True,
'player_clients': ['web'],
'label': 'с cookies (web)'
})
# Без cookies используем комбинированные клиенты
no_cookie_clients = ['android', 'ios', 'web'] if is_shorts else ['android', 'web']
attempts_configs.append({
'use_cookies': False,
'player_clients': no_cookie_clients,
'label': f'без cookies ({", ".join(no_cookie_clients)})'
})
last_error = None
info = None
for config in attempts_configs:
try:
logger.info(f"[FORMATS] Попытка: {config['label']}")
ydl_opts = {
'quiet': True,
'no_warnings': True,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': config['player_clients'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
},
}
if config['use_cookies']:
ydl_opts['cookiefile'] = str(cookies_file_path.absolute())
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[FORMATS] Успешно получена информация {config['label']}")
break # Успех - выходим из цикла
except Exception as e:
error_str = str(e)
last_error = e
logger.warning(f"[FORMATS] Ошибка {config['label']}: {error_str[:200]}")
# Если это была попытка с cookies, и ошибка похожа на проблему с cookies -
# продолжаем дальше (следующая попытка будет без cookies)
if config['use_cookies'] and ('cookiefile' in error_str.lower() or 'requested format' in error_str.lower() or 'http error' in error_str.lower()):
logger.info(f"[FORMATS] Ошибка с cookies, пробуем без cookies...")
continue
continue
if info is None:
logger.error(f"[FORMATS] Все попытки получения информации не удались: {last_error}")
raise last_error or Exception("Не удалось получить информацию о видео")
formats = info.get('formats', [])
logger.info(f"[FORMATS] Всего форматов: {len(formats)}")
duration = info.get('duration') # длительность видео в секундах
logger.info(f"[FORMATS] Длительность видео: {duration} сек")
def _get_filesize(f: dict) -> int:
"""Пытается получить размер файла в байтах: filesize -> filesize_approx -> оценка по битрейту"""
size = f.get('filesize') or f.get('filesize_approx') or 0
if size:
return size
# Если размер неизвестен, оцениваем по битрейту и длительности
if duration:
# Для форматов, которые содержат и видео и аудио, используем tbr
tbr = f.get('tbr') or 0
if tbr:
return int(tbr * 1024 / 8 * duration)
# Для видео-без-аудио: vbr видео + abr аудио
vbr = f.get('vbr') or 0
abr = f.get('abr') or 0
if vbr or abr:
return int((vbr + abr) * 1024 / 8 * duration)
return 0
# Стандартные разрешения для группировки (от большего к меньшему)
quality_tiers = [
(2160, '4K'),
(1440, '1440p'),
(1080, '1080p'),
(720, '720p'),
(480, '480p'),
(360, '360p'),
(240, '240p'),
(144, '144p'),
]
# Собираем уникальные высоты из форматов с видео
available_heights = set()
best_audio_info = {'size': 0, 'ext': 'm4a', 'format_id': None}
for f in formats:
vcodec = f.get('vcodec', 'none')
acodec = f.get('acodec', 'none')
height = f.get('height') or 0
format_id = f.get('format_id', '')
if vcodec != 'none' and height:
available_heights.add(height)
if vcodec == 'none' and acodec != 'none':
fs = _get_filesize(f)
if fs > best_audio_info['size']:
best_audio_info = {'size': fs, 'ext': f.get('ext', 'm4a'), 'format_id': format_id}
logger.info(f"[FORMATS] Доступные разрешения: {sorted(available_heights)}")
logger.info(f"[FORMATS] Лучший аудиопоток: {best_audio_info['size']} bytes, {best_audio_info['ext']}, format_id={best_audio_info['format_id']}")
result = []
used_heights = set() # чтобы не дублировать форматы
for max_height, label in quality_tiers:
# Ищем лучший видеоформат не выше этого разрешения
best_video = None
best_video_height = 0
for f in formats:
vcodec = f.get('vcodec', 'none')
height = f.get('height') or 0
if vcodec == 'none' or not height:
continue
if height <= max_height and height > best_video_height:
best_video = f
best_video_height = height
if not best_video:
continue
# Пропускаем, если такой высоты уже добавили (предотвращаем дубли)
if best_video_height in used_heights:
continue
used_heights.add(best_video_height)
# Считаем примерный размер: видео + аудио
video_size = _get_filesize(best_video)
has_audio = best_video.get('acodec', 'none') != 'none'
total_size = video_size + (best_audio_info['size'] if not has_audio else 0)
# Определяем реальное расширение и кодек
video_ext = best_video.get('ext', 'mp4')
format_note = best_video.get('format_note', '') or ''
video_format_id = best_video.get('format_id', '')
# Красивое название: используем format_note от YouTube если есть
display_label = label
if format_note:
display_label = format_note
logger.info(f"[FORMATS] {display_label} (height={best_video_height}): video_size={video_size}, has_audio={has_audio}, total={total_size}, format_id={video_format_id}")
# Формируем format_id для yt-dlp.
# Используем ДВА подхода в одном format_id через / (fallback):
# 1. Сначала пробуем конкретный format code (если есть)
# 2. Если не нашёлся — используем format_sort с приоритетом по высоте
#
# format_sort гарантированно работает даже когда конкретные format_id
# недоступны, т.к. yt-dlp сам подберёт подходящий формат.
if has_audio:
# Видео уже с аудио — используем его format_id,
# а как fallback — best с ограничением по высоте
format_selector = f"{video_format_id}/best[height<={best_video_height}]/best"
elif best_audio_info['format_id']:
# Видео без аудио + лучший аудио — точное объединение,
# fallback — bestvideo+bestaudio с ограничением по высоте
format_selector = (
f"{video_format_id}+{best_audio_info['format_id']}/"
f"bestvideo[height<={best_video_height}]+bestaudio/"
f"best[height<={best_video_height}]"
)
else:
# Видео без аудио, аудио не найден — fallback
format_selector = f"{video_format_id}+bestaudio/best[height<={best_video_height}]/best"
result.append({
'format_id': format_selector,
'label': f"{display_label} ({video_ext})",
'quality': display_label,
'ext': video_ext,
'filesize_mb': round(total_size / 1024 / 1024, 1) if total_size else None,
})
# Добавляем аудиодорожку
if best_audio_info['size']:
result.append({
'format_id': 'bestaudio/best',
'label': f"Audio only ({best_audio_info['ext']})",
'quality': 'audio',
'ext': best_audio_info['ext'],
'filesize_mb': round(best_audio_info['size'] / 1024 / 1024, 1) if best_audio_info['size'] else None,
})
# ---------------------------------------------------------------
# Если получено слишком мало уникальных высот (<= 2) —
# значит cookies недействительны и YouTube вернул ограниченные данные.
# В этом случае генерируем все стандартные разрешения с оценкой
# размера на основе типичных битрейтов YouTube и длительности видео.
# Это гарантирует, что пользователь увидит все варианты качества,
# а format_selector будет корректно разрешён yt-dlp при скачивании.
# ---------------------------------------------------------------
FALLBACK_THRESHOLD = 2 # при таком количестве высот переходим к оценкам
ESTIMATE_REQUIRED = len(used_heights) <= FALLBACK_THRESHOLD
if ESTIMATE_REQUIRED:
logger.info(f"[FORMATS] Недостаточно данных от YouTube (найдено {len(used_heights)} высот), генерируем оценочные форматы")
# Типичные битрейты для видео (в кбит/с) для разных разрешений YouTube (h264)
# Значения консервативные — для реалистичной оценки размера файла
TYPICAL_VIDEO_BITRATES: dict[int, int] = {
2160: 40000, # 4K: ~40 Mbps
1440: 20000, # 1440p: ~20 Mbps
1080: 10000, # 1080p: ~10 Mbps
720: 5000, # 720p: ~5 Mbps
480: 2500, # 480p: ~2.5 Mbps
360: 1200, # 360p: ~1.2 Mbps
240: 600, # 240p: ~600 Kbps
144: 300, # 144p: ~300 Kbps
}
AUDIO_BITRATE = 128 # кбит/с — типичный битрейт аудио YouTube
result = []
if duration:
for max_height, label in quality_tiers:
video_kbps = TYPICAL_VIDEO_BITRATES.get(max_height, 1000)
# Размер = (видеобитрейт + аудиобитрейт) * длительность / 8 / 1024 / 1024
total_kbps = video_kbps + AUDIO_BITRATE
estimated_bytes = total_kbps * 1000 / 8 * duration # кбит/с * 1000 / 8 = байт/с
estimated_mb = round(estimated_bytes / 1024 / 1024, 1)
# Используем best[height<=...] вместо bestvideo[height<=...]+bestaudio
# Это гарантированно работает, т.к. yt-dlp сам подберёт подходящий формат
# (с аудио или без) с ограничением по высоте
format_selector = f"best[height<={max_height}]/best"
result.append({
'format_id': format_selector,
'label': f"{label} (mp4)",
'quality': label,
'ext': 'mp4',
'filesize_mb': estimated_mb,
})
logger.info(f"[FORMATS] Оценка: {label}: ~{estimated_mb} МБ (битрейт {video_kbps} кбит/с)")
# Аудиодорожка: только аудио, ~128 kbps
audio_bytes = AUDIO_BITRATE * 1000 / 8 * duration
audio_mb = round(audio_bytes / 1024 / 1024, 1)
result.append({
'format_id': 'bestaudio/best',
'label': f"Audio only (m4a)",
'quality': 'audio',
'ext': 'm4a',
'filesize_mb': audio_mb,
})
logger.info(f"[FORMATS] Оценка: Audio: ~{audio_mb} МБ")
else:
# Если длительность неизвестна, показываем без размеров
for max_height, label in quality_tiers:
format_selector = f"best[height<={max_height}]/best"
result.append({
'format_id': format_selector,
'label': label,
'quality': label,
'ext': 'mp4',
'filesize_mb': None,
})
result.append({
'format_id': 'bestaudio/best',
'label': 'Audio only (m4a)',
'quality': 'audio',
'ext': 'm4a',
'filesize_mb': None,
})
logger.info(f"[FORMATS] Возвращаем {len(result)} форматов")
return result
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'ok', 'service': 'youtube-downloader'}), 200
@app.route('/formats', methods=['POST'])
def formats():
"""Возвращает список доступных форматов для YouTube URL"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[FORMATS {request_id}] ========== ЗАПРОС ФОРМАТОВ ==========")
try:
data = request.get_json()
if not data or 'url' not in data:
return jsonify({'error': 'URL is required'}), 400
url = data['url']
if 'youtube.com' not in url and 'youtu.be' not in url:
return jsonify({'error': 'Only YouTube URLs are supported'}), 400
format_list = get_youtube_formats(url)
logger.info(f"[FORMATS {request_id}] Найдено {len(format_list)} форматов")
return jsonify({'formats': format_list}), 200
except Exception as e:
logger.error(f"[FORMATS {request_id}] Ошибка: {e}")
logger.error(traceback.format_exc())
return jsonify({'error': str(e)}), 500
@app.route('/download/stream', methods=['POST'])
def download_stream():
"""Скачивает видео с YouTube и возвращает бинарные данные"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[REQUEST {request_id}] ========== НОВЫЙ ЗАПРОС ==========")
logger.info(f"[REQUEST {request_id}] Метод: {request.method}")
logger.info(f"[REQUEST {request_id}] URL: {request.url}")
logger.info(f"[REQUEST {request_id}] Remote Address: {request.remote_addr}")
logger.info(f"[REQUEST {request_id}] Headers: {dict(request.headers)}")
try:
data = request.get_json()
logger.info(f"[REQUEST {request_id}] Body (JSON): {data}")
if not data or 'url' not in data:
logger.warning(f"[REQUEST {request_id}] Ошибка: URL не предоставлен в запросе")
return jsonify({'error': 'URL is required'}), 400
url = data['url']
format_id = data.get('format_id') # Опциональный параметр
logger.info(f"[REQUEST {request_id}] Получен запрос на скачивание (stream): {url}, format_id: {format_id}")
# Проверяем, что это YouTube URL
if 'youtube.com' not in url and 'youtu.be' not in url:
logger.warning(f"[REQUEST {request_id}] Ошибка: URL не является YouTube URL: {url}")
return jsonify({'error': 'Only YouTube URLs are supported'}), 400
# Скачиваем видео
logger.info(f"[REQUEST {request_id}] Начинаем скачивание видео...")
video_path = download_youtube_video(url, format_id=format_id)
logger.info(f"[REQUEST {request_id}] Видео успешно скачано: {video_path}")
# Читаем файл и отправляем
file_size = video_path.stat().st_size
logger.info(f"[REQUEST {request_id}] Размер файла: {file_size} байт")
with open(video_path, 'rb') as f:
video_data = f.read()
# Безопасное имя файла без кириллицы для заголовка
safe_filename = video_path.name.encode('ascii', 'ignore').decode('ascii') or 'youtube_video.mp4'
if not safe_filename.endswith(('.mp4', '.webm', '.mkv')):
safe_filename = 'youtube_video.mp4'
# Определяем content-type
content_type = 'video/mp4'
if video_path.suffix == '.webm':
content_type = 'video/webm'
elif video_path.suffix == '.mkv':
content_type = 'video/x-matroska'
logger.info(f"[REQUEST {request_id}] Отправляем файл: {safe_filename}, Content-Type: {content_type}, размер: {len(video_data)} байт")
# Удаляем временный файл
video_path.unlink()
logger.info(f"[REQUEST {request_id}] Временный файл удален")
logger.info(f"[REQUEST {request_id}] ========== ЗАПРОС УСПЕШНО ЗАВЕРШЕН ==========")
return video_data, 200, {
'Content-Type': content_type,
'Content-Disposition': f'attachment; filename="{safe_filename}"'
}
except Exception as e:
error_str = str(e)
error_lower = error_str.lower()
logger.error(f"[REQUEST {request_id}] ========== ОШИБКА В ЗАПРОСЕ ==========")
logger.error(f"[REQUEST {request_id}] Ошибка при скачивании: {error_str}")
logger.error(f"[REQUEST {request_id}] Полный traceback:\n{traceback.format_exc()}")
# Улучшаем сообщение об ошибке, если проблема с cookies
if 'cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower:
logger.error(f"[REQUEST {request_id}] Обнаружена ошибка связанная с cookies!")
error_msg = (
f"{error_str}\n\n"
"💡 Совет: Cookies устарели или недействительны. "
"Обновите cookies, запустив скрипт:\n"
" ./youtube-downloader/get_youtube_cookies.sh\n"
"Затем перезапустите сервис."
)
else:
error_msg = error_str
logger.error(f"[REQUEST {request_id}] Возвращаем 500 ошибку клиенту")
logger.error(f"[REQUEST {request_id}] ========== КОНЕЦ ОБРАБОТКИ ОШИБКИ ==========")
return jsonify({'error': error_msg}), 500
if __name__ == '__main__':
port = int(os.getenv('PORT', 5000)) # Внутренний порт контейнера
host = os.getenv('HOST', '0.0.0.0')
logger.info(f"Запуск YouTube Downloader сервиса на {host}:{port}")
app.run(host=host, port=port, debug=False)