videoDownloadTGbot/youtube-downloader/app.py

850 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
YouTube Video Downloader Service
Отдельный микросервис для скачивания видео с YouTube
"""
import os
import time
import logging
import traceback
from pathlib import Path
from flask import Flask, request, jsonify
from flask_cors import CORS
import yt_dlp
import uuid
import re
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app) # Разрешаем CORS для взаимодействия с основным ботом
# Директория для временных файлов
DOWNLOADS_DIR = Path('downloads')
DOWNLOADS_DIR.mkdir(exist_ok=True)
def _safe_filename(title: str) -> str:
"""Создает безопасное имя файла"""
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100]
return str(DOWNLOADS_DIR / f'{uuid.uuid4()}_{safe_title}.%(ext)s')
def _cleanup_downloads():
"""Удаляет все файлы из папки загрузок"""
for f in DOWNLOADS_DIR.glob('*'):
try:
f.unlink()
except Exception:
pass
def _is_valid_cookies_file(cookies_path: Path) -> bool:
"""Проверяет, что файл cookies существует и содержит данные (не только заголовки)"""
logger.info(f"[COOKIES CHECK] Проверка файла cookies: {cookies_path.absolute()}")
if not cookies_path.exists():
logger.warning(f"[COOKIES CHECK] Файл не существует: {cookies_path.absolute()}")
return False
try:
file_size = cookies_path.stat().st_size
logger.info(f"[COOKIES CHECK] Размер файла: {file_size} байт")
with open(cookies_path, 'r', encoding='utf-8', errors='ignore') as f:
all_lines = f.readlines()
lines = [line.strip() for line in all_lines if line.strip() and not line.strip().startswith('#')]
logger.info(f"[COOKIES CHECK] Всего строк в файле: {len(all_lines)}, валидных строк (не комментариев): {len(lines)}")
# Логируем первые 3 строки для диагностики (без чувствительных данных)
if lines:
preview_lines = []
for i, line in enumerate(lines[:3]):
# Маскируем значения cookie для безопасности
if '\t' in line:
parts = line.split('\t')
if len(parts) > 6:
masked_line = '\t'.join(parts[:6]) + '\t***MASKED***'
preview_lines.append(f" Строка {i+1}: {masked_line[:100]}")
else:
preview_lines.append(f" Строка {i+1}: {line[:100]}")
logger.info(f"[COOKIES CHECK] Превью первых строк:\n" + "\n".join(preview_lines))
# Проверяем, что есть хотя бы одна строка с данными cookie
is_valid = len(lines) > 0
logger.info(f"[COOKIES CHECK] Результат проверки: {'VALID' if is_valid else 'INVALID'}")
return is_valid
except Exception as e:
logger.error(f"[COOKIES CHECK] Ошибка при проверке файла cookies: {e}")
logger.error(f"[COOKIES CHECK] Traceback:\n{traceback.format_exc()}")
return False
def _parse_height(format_dict: dict) -> int:
"""Извлекает реальную высоту из формата: height/width/format_note"""
h = format_dict.get('height')
w = format_dict.get('width')
# Для вертикальных видео (Shorts) height и width могут быть перепутаны —
# берём меньшее значение как честный показатель разрешения
if h and w and isinstance(h, (int, float)) and isinstance(w, (int, float)):
real_h = min(int(h), int(w))
if real_h > 0:
return real_h
if h and isinstance(h, (int, float)) and h > 0:
return int(h)
# Если вообще нет размеров — парсим format_note (например "360p")
note = format_dict.get('format_note', '') or ''
match = re.search(r'(\d+)p', str(note))
if match:
return int(match.group(1))
return 0
def _extract_height_from_format_id(format_id: str) -> int | None:
"""Извлекает ограничение по высоте из format_id (например 'best[height<=360]' -> 360)"""
match = re.search(r'height<[=]?\s*(\d+)', format_id)
if match:
return int(match.group(1))
return None
def _make_base_ydl_opts(user_agent: str, cookies_file_path: Path | None = None) -> dict:
"""Формирует базовые опции yt-dlp, общие для info и download
Стратегия выбора player_client:
- Cookies есть → используем web клиенты (требуют n-challenge решения),
включаем js_runtimes + remote_components для решения n-challenge через Node.js.
- Cookies нет → используем android клиент (не поддерживает cookies,
но не требует n-challenge, хотя даёт меньше форматов).
"""
extractor_args = {}
cookies_available = cookies_file_path is not None and cookies_file_path.exists()
if not cookies_available:
# Без cookies используем android — он не требует n-challenge
extractor_args = {
'youtube': {
'player_client': ['android'],
'skip': ['translated_subs', 'hls'],
},
}
opts = {
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 60,
'extractor_retries': 3,
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': 'https://www.youtube.com/',
},
}
if extractor_args:
opts['extractor_args'] = extractor_args
if cookies_available:
opts['cookiefile'] = str(cookies_file_path.absolute())
# Включаем n-challenge решение через Node.js + EJS скрипт с GitHub
# yt-dlp скачает challenge solver скрипт из официального репозитория
opts['js_runtimes'] = {'node': {}}
opts['remote_components'] = ['ejs:github']
return opts
def download_youtube_video(url: str, max_retries: int = 3, format_id: str | None = None) -> Path:
"""Скачивает видео с YouTube - используем cookies для обхода блокировок"""
logger.info(f"[DOWNLOAD] Начало скачивания: {url}")
cookies_file = os.getenv('YOUTUBE_COOKIES_FILE', 'youtube_cookies.txt')
cookies_file_path = Path(cookies_file)
logger.info(f"[DOWNLOAD] Путь к файлу cookies из env: {cookies_file}, абсолютный: {cookies_file_path.absolute()}")
cookies_valid = _is_valid_cookies_file(cookies_file_path)
if not cookies_valid:
logger.warning(f"[DOWNLOAD] Файл cookies не найден или невалиден ({cookies_file_path}). "
f"Работаем без cookies. Для лучшей работы рекомендуется обновить cookies через скрипт get_youtube_cookies.sh")
else:
logger.info(f"[DOWNLOAD] Cookies файл валиден, будет использован: {cookies_file_path.absolute()}")
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
last_download_errors = [] # собираем ошибки по форматам для диагностики
for attempt in range(max_retries):
try:
is_shorts = '/shorts/' in url
# Базовые настройки для получения информации
ydl_opts_info = _make_base_ydl_opts(user_agent, cookies_file_path if cookies_valid else None)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: {'cookies включены' if cookies_valid else 'работаем без cookies'}")
# Пробуем получить информацию о видео
info = None
try:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: извлечение информации о видео с URL: {url}")
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: информация о видео успешно получена")
except Exception as info_error:
error_str = str(info_error)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при получении информации о видео: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback:\n{traceback.format_exc()}")
# Если не получилось с cookies, пробуем без них
# Проверяем различные признаки проблем с cookies:
# - явные ошибки с cookies/bot/sign in
# - ошибки формата (могут быть из-за блокировки YouTube при неполных cookies)
# - "Only images are available" (признак блокировки YouTube)
# - "Missing required Data Sync ID" (неполные cookies)
should_retry_without_cookies = cookies_valid and (
'cookies' in error_lower or
'bot' in error_lower or
'sign in' in error_lower or
'authentication' in error_lower or
'format is not available' in error_lower or
'only images are available' in error_lower or
'missing required data sync id' in error_lower or
'challenge solving failed' in error_lower
)
if should_retry_without_cookies:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка, возможно связанная с cookies: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: пробуем без cookies...")
ydl_opts_info.pop('cookiefile', None)
try:
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно получена информация без cookies!")
cookies_valid = False # Отключаем cookies для скачивания тоже
except Exception as retry_error:
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка даже без cookies: {retry_error}")
logger.error(f"[DOWNLOAD] Полный traceback без cookies:\n{traceback.format_exc()}")
raise
else:
raise
video_title = info.get('title', 'video') if info else 'video'
logger.info(f"YouTube: получена информация о видео: {video_title}")
# Настройки для скачивания
# Если передан format_id — это может быть:
# 1) Конкретный format code (число, например "18" или "137+140") — точный выбор качества
# 2) Format selector (например "bestvideo[height<=240]+bestaudio/best") — старый формат
#
# Для конкретных format codes: если формат недоступен, НЕ падаем на best,
# а пробуем format selector для того же разрешения (извлекаем height из запроса пользователя).
# Это важно, т.к. format_id из get_youtube_formats() может не совпадать
# с format_id при повторном extract_info() в download_youtube_video().
default_format_options = [
'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'best[ext=mp4]/best',
'bestvideo+bestaudio/best',
'best',
]
# Добавляем fallback на combined форматы (например 18), которые всегда доступны
combined_fallback = ['best[ext=mp4]/best', 'best']
requested_height = None # высота, запрошенная пользователем
if format_id:
is_specific_code = not ('[' in format_id or ']' in format_id)
requested_height = _extract_height_from_format_id(format_id)
format_options = [format_id]
if requested_height is not None:
if is_specific_code:
logger.info(f"[DOWNLOAD] Конкретный format code: {format_id}")
else:
logger.info(f"[DOWNLOAD] Format selector: {format_id}")
logger.info(f"[DOWNLOAD] Добавляем качество-сохраняющий fallback для height<={requested_height}")
format_options.append(f"bestvideo[height<={requested_height}]+bestaudio/best[height<={requested_height}]")
format_options.extend(default_format_options)
format_options.extend(combined_fallback)
else:
format_options.extend(default_format_options)
logger.info(f"[DOWNLOAD] Итоговый список format_options ({len(format_options)} шт.): {format_options}")
else:
format_options = default_format_options
download_success = False
for format_option in format_options:
ydl_opts_download = _make_base_ydl_opts(user_agent, cookies_file_path if cookies_valid else None)
ydl_opts_download.update({
'format': format_option,
'outtmpl': _safe_filename(video_title),
# fragment_retries — для DASH форматов (видео без аудио),
# YouTube может разрывать фрагменты; увеличиваем retries
'fragment_retries': 3,
# allow_unplayable_formats — позволяет скачивать форматы,
# которые YouTube помечает как "недоступные" для сторонних клиентов
'allow_unplayable_formats': True,
})
use_cookies_this_attempt = cookies_valid
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}/{max_retries}: начинаем скачивание (Shorts: {is_shorts}, формат: {format_option}, cookies: {use_cookies_this_attempt})")
try:
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: запуск yt-dlp для скачивания с форматом {format_option}")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
result_info = ydl.download([url])
# Логируем информацию о том, что реально скачалось
if result_info:
for entry in result_info:
if entry:
actual_format_id = entry.get('format_id', 'unknown')
actual_height = entry.get('height', 'unknown')
actual_ext = entry.get('ext', 'unknown')
actual_filesize = entry.get('filesize') or entry.get('filesize_approx') or 'unknown'
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: реально скачан формат: id={actual_format_id}, height={actual_height}, ext={actual_ext}, size={actual_filesize}")
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно скачано с форматом {format_option}")
download_success = True
break
except Exception as download_error:
error_str = str(download_error)
error_lower = error_str.lower()
last_download_errors.append(f"[{format_option}] {error_str[:300]}")
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при скачивании формата {format_option}: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback:\n{traceback.format_exc()}")
# Если ошибка с cookies, пробуем без них
if use_cookies_this_attempt and ('cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower):
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка связанная с cookies для формата {format_option}: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: пробуем без cookies...")
# Пересоздаём opts без cookies
ydl_opts_download_no_cookies = _make_base_ydl_opts(user_agent, None)
ydl_opts_download_no_cookies.update({
'format': format_option,
'outtmpl': _safe_filename(video_title),
'fragment_retries': 3,
'allow_unplayable_formats': True,
})
try:
with yt_dlp.YoutubeDL(ydl_opts_download_no_cookies) as ydl:
ydl.download([url])
logger.info(f"[DOWNLOAD] Попытка {attempt + 1}: успешно скачано без cookies")
download_success = True
cookies_valid = False # Отключаем cookies для следующих попыток
break
except Exception as retry_error:
retry_str = str(retry_error)
last_download_errors.append(f"[{format_option} без cookies] {retry_str[:300]}")
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка даже без cookies: {retry_error}")
logger.error(f"[DOWNLOAD] Полный traceback без cookies:\n{traceback.format_exc()}")
# Если и без cookies не получилось, пробуем следующий формат
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: не удалось скачать без cookies, пробуем следующий формат...")
continue
# Если ошибка формата, пробуем следующий формат
elif 'format is not available' in error_lower or 'requested format' in error_lower:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: формат {format_option} недоступен, пробуем следующий...")
continue
else:
# Другая ошибка - пробуем следующий формат
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: ошибка при скачивании формата {format_option}: {error_str[:200]}, пробуем следующий...")
continue
if not download_success:
# Собираем детальный отчёт об ошибках
errors_summary = "; ".join(last_download_errors[-10:]) # последние 10 ошибок
raise Exception(f"Не удалось скачать видео ни с одним из доступных форматов. Ошибки: {errors_summary}")
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob('*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return downloaded_files[0]
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
error_str = str(e)
error_lower = error_str.lower()
logger.error(f"[DOWNLOAD] Попытка {attempt + 1}/{max_retries} не удалась: {error_str}")
logger.error(f"[DOWNLOAD] Полный traceback попытки {attempt + 1}:\n{traceback.format_exc()}")
# Если ошибка связана с форматом, пробуем другие настройки
if 'format is not available' in error_lower or 'requested format' in error_lower:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: проблема с форматом, пробуем другие настройки на следующей попытке")
if attempt < max_retries - 1:
sleep_time = (attempt + 1) * 2
logger.info(f"[DOWNLOAD] Ожидание {sleep_time} секунд перед следующей попыткой...")
time.sleep(sleep_time)
continue
# Если ошибка связана с cookies и они были использованы, попробуем без cookies на следующей попытке
if 'cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower:
if cookies_valid and attempt == 0:
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: обнаружена ошибка связанная с cookies: {error_str[:200]}")
logger.warning(f"[DOWNLOAD] Попытка {attempt + 1}: на следующей попытке попробуем без cookies")
cookies_valid = False
if attempt < max_retries - 1:
sleep_time = (attempt + 1) * 2
logger.info(f"[DOWNLOAD] Ожидание {sleep_time} секунд перед следующей попыткой...")
time.sleep(sleep_time)
# Включаем в итоговую ошибку сводку по форматам
errors_summary = "; ".join(last_download_errors[-10:]) if last_download_errors else ""
error_msg = str(last_error) if last_error else "Неизвестная ошибка при скачивании с YouTube"
if errors_summary:
error_msg += f" | Ошибки форматов: {errors_summary}"
raise Exception(error_msg)
def get_youtube_formats(url: str) -> list[dict]:
"""Получает список доступных форматов видео с YouTube"""
logger.info(f"[FORMATS] Получение списка форматов для: {url}")
cookies_file = os.getenv('YOUTUBE_COOKIES_FILE', 'youtube_cookies.txt')
cookies_file_path = Path(cookies_file)
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
cookies_valid = _is_valid_cookies_file(cookies_file_path)
if not cookies_valid:
logger.warning(f"[FORMATS] Cookies файл не найден или невалиден. Работаем без cookies.")
# Пробуем сначала с cookies (если есть), потом без
attempts_configs = []
if cookies_valid:
attempts_configs.append({
'use_cookies': True,
'label': 'с cookies'
})
attempts_configs.append({
'use_cookies': False,
'label': 'без cookies'
})
last_error = None
info = None
for config in attempts_configs:
try:
logger.info(f"[FORMATS] Попытка: {config['label']}")
# Для /formats используем те же улучшенные опции (player_client, retries и т.д.),
# но с quiet=True чтобы не засорять логи
ydl_opts = _make_base_ydl_opts(
user_agent,
cookies_file_path if config['use_cookies'] else None
)
ydl_opts['quiet'] = True
ydl_opts['no_warnings'] = True
ydl_opts['socket_timeout'] = 30
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
logger.info(f"[FORMATS] Успешно получена информация {config['label']}")
break # Успех - выходим из цикла
except Exception as e:
error_str = str(e)
last_error = e
logger.warning(f"[FORMATS] Ошибка {config['label']}: {error_str[:200]}")
# Если это была попытка с cookies, и ошибка похожа на проблему с cookies -
# продолжаем дальше (следующая попытка будет без cookies)
if config['use_cookies'] and (
'cookiefile' in error_str.lower()
or 'requested format' in error_str.lower()
or 'http error' in error_str.lower()
or 'only images are available' in error_str.lower()
or 'n challenge' in error_str.lower()
or 'challenge solving' in error_str.lower()
):
logger.info(f"[FORMATS] Ошибка с cookies, пробуем без cookies...")
continue
continue
if info is None:
logger.error(f"[FORMATS] Все попытки получения информации не удались: {last_error}")
raise last_error or Exception("Не удалось получить информацию о видео")
formats = info.get('formats', [])
logger.info(f"[FORMATS] Всего форматов: {len(formats)}")
duration = info.get('duration') # длительность видео в секундах
logger.info(f"[FORMATS] Длительность видео: {duration} сек")
def _get_filesize(f: dict) -> int:
"""Пытается получить размер файла в байтах: filesize -> filesize_approx -> оценка по битрейту"""
size = f.get('filesize') or f.get('filesize_approx') or 0
if size:
return size
# Если размер неизвестен, оцениваем по битрейту и длительности
if duration:
# Для форматов, которые содержат и видео и аудио, используем tbr
tbr = f.get('tbr') or 0
if tbr:
return int(tbr * 1024 / 8 * duration)
# Для видео-без-аудио: vbr видео + abr аудио
vbr = f.get('vbr') or 0
abr = f.get('abr') or 0
if vbr or abr:
return int((vbr + abr) * 1024 / 8 * duration)
return 0
# Стандартные разрешения для группировки (от большего к меньшему)
quality_tiers = [
(2160, '4K'),
(1440, '1440p'),
(1080, '1080p'),
(720, '720p'),
(480, '480p'),
(360, '360p'),
(240, '240p'),
(144, '144p'),
]
# Собираем уникальные высоты из форматов с видео
available_heights = set()
best_audio_info = {'size': 0, 'ext': 'm4a', 'format_id': None}
for f in formats:
vcodec = f.get('vcodec', 'none')
acodec = f.get('acodec', 'none')
height = _parse_height(f)
format_id = f.get('format_id', '')
if vcodec != 'none' and height > 0:
available_heights.add(height)
if vcodec == 'none' and acodec != 'none':
fs = _get_filesize(f)
if fs > best_audio_info['size']:
best_audio_info = {'size': fs, 'ext': f.get('ext', 'm4a'), 'format_id': format_id}
logger.info(f"[FORMATS] Доступные разрешения: {sorted(available_heights)}")
logger.info(f"[FORMATS] Лучший аудиопоток: {best_audio_info['size']} bytes, {best_audio_info['ext']}, format_id={best_audio_info['format_id']}")
result = []
used_heights = set()
# Определяем реальную максимальную высоту видео
max_actual_height = max(available_heights) if available_heights else 2160
for max_height, label in quality_tiers:
if max_height > max_actual_height:
continue # не показываем 4K для видео с макс высотой 1080p
best_video = None
best_video_height = 0
for f in formats:
vcodec = f.get('vcodec', 'none')
height = _parse_height(f)
if vcodec == 'none' or height <= 0:
continue
if height <= max_height and height > best_video_height:
best_video = f
best_video_height = height
if not best_video:
continue
if best_video_height in used_heights:
continue
used_heights.add(best_video_height)
video_size = _get_filesize(best_video)
has_audio = best_video.get('acodec', 'none') != 'none'
total_size = video_size + (best_audio_info['size'] if not has_audio else 0)
video_ext = best_video.get('ext', 'mp4')
video_format_id = best_video.get('format_id', '')
# Честный лейбл из реальной высоты
format_note = best_video.get('format_note', '') or ''
if format_note and str(best_video_height) in format_note:
display_label = format_note
else:
display_label = f"{best_video_height}p"
logger.info(f"[FORMATS] {display_label} (height={best_video_height}): video_size={video_size}, has_audio={has_audio}, total={total_size}, format_id={video_format_id}")
# format_selector без /best в конце — чтобы yt-dlp не молча скатывался на другой размер
if has_audio:
format_selector = f"{video_format_id}/bestvideo[height<={best_video_height}]+bestaudio/best[height<={best_video_height}]"
elif best_audio_info['format_id']:
format_selector = (
f"{video_format_id}+{best_audio_info['format_id']}/"
f"bestvideo[height<={best_video_height}]+bestaudio/"
f"best[height<={best_video_height}]"
)
else:
format_selector = f"{video_format_id}+bestaudio/bestvideo[height<={best_video_height}]+bestaudio/best[height<={best_video_height}]"
result.append({
'format_id': format_selector,
'label': f"{display_label} ({video_ext})",
'quality': display_label,
'ext': video_ext,
'filesize_mb': round(total_size / 1024 / 1024, 1) if total_size else None,
})
# Добавляем аудиодорожку
if best_audio_info['size']:
result.append({
'format_id': 'bestaudio/best',
'label': f"Audio only ({best_audio_info['ext']})",
'quality': 'audio',
'ext': best_audio_info['ext'],
'filesize_mb': round(best_audio_info['size'] / 1024 / 1024, 1) if best_audio_info['size'] else None,
})
# ---------------------------------------------------------------
# Если реальных форматов совсем нет — генерируем оценочные
# (бывает при очень плохих cookies, когда даже format_note пустой)
# ---------------------------------------------------------------
if len(result) == 0:
logger.info(f"[FORMATS] Реальных форматов не найдено, генерируем оценочные")
max_available_height = max(available_heights) if available_heights else 2160
available_tiers = [(h, l) for h, l in quality_tiers if h <= max_available_height]
TYPICAL_VIDEO_BITRATES: dict[int, int] = {
2160: 40000, 1440: 20000, 1080: 10000, 720: 5000,
480: 2500, 360: 1200, 240: 600, 144: 300,
}
AUDIO_BITRATE = 128
result = []
if duration:
for max_height, label in available_tiers:
video_kbps = TYPICAL_VIDEO_BITRATES.get(max_height, 1000)
total_kbps = video_kbps + AUDIO_BITRATE
estimated_bytes = total_kbps * 1000 / 8 * duration
estimated_mb = round(estimated_bytes / 1024 / 1024, 1)
format_selector = f"bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]"
result.append({
'format_id': format_selector,
'label': f"{label} (mp4)",
'quality': label,
'ext': 'mp4',
'filesize_mb': estimated_mb,
})
logger.info(f"[FORMATS] Оценка: {label}: ~{estimated_mb} МБ (битрейт {video_kbps} кбит/с)")
audio_bytes = AUDIO_BITRATE * 1000 / 8 * duration
audio_mb = round(audio_bytes / 1024 / 1024, 1)
result.append({
'format_id': 'bestaudio/best',
'label': f"Audio only (m4a)",
'quality': 'audio',
'ext': 'm4a',
'filesize_mb': audio_mb,
})
else:
for max_height, label in available_tiers:
format_selector = f"bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]"
result.append({
'format_id': format_selector,
'label': label,
'quality': label,
'ext': 'mp4',
'filesize_mb': None,
})
result.append({
'format_id': 'bestaudio/best',
'label': 'Audio only (m4a)',
'quality': 'audio',
'ext': 'm4a',
'filesize_mb': None,
})
logger.info(f"[FORMATS] Возвращаем {len(result)} форматов")
return result
# Простой кэш форматов: {normalized_url: (timestamp, list_of_formats)}
# Форматы YouTube не меняются часто, кэшируем на 30 минут
_formats_cache: dict[str, tuple[float, list[dict]]] = {}
_FORMATS_CACHE_TTL = 30 * 60 # 30 минут в секундах
def _normalize_youtube_url(url: str) -> str:
"""Нормализует YouTube URL: убирает tracking параметры (?si=...),
приводит к единому виду для кэширования."""
import re
# Оставляем только video ID из youtu.be или youtube.com
# youtu.be/VIDEO_ID?si=... -> youtu.be/VIDEO_ID
m = re.search(r'(youtu\.be/|youtube\.com/watch\?v=)([a-zA-Z0-9_-]{11})', url)
if m:
prefix, video_id = m.group(1), m.group(2)
return f"https://www.youtube.com/watch?v={video_id}"
return url # если не распознали, кэшируем как есть
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'ok', 'service': 'youtube-downloader'}), 200
@app.route('/formats', methods=['POST'])
def formats():
"""Возвращает список доступных форматов для YouTube URL"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[FORMATS {request_id}] ========== ЗАПРОС ФОРМАТОВ ==========")
try:
data = request.get_json()
if not data or 'url' not in data:
return jsonify({'error': 'URL is required'}), 400
url = data['url']
if 'youtube.com' not in url and 'youtu.be' not in url:
return jsonify({'error': 'Only YouTube URLs are supported'}), 400
# Нормализуем URL и проверяем кэш
cache_key = _normalize_youtube_url(url)
now = time.time()
if cache_key in _formats_cache:
cached_time, cached_formats = _formats_cache[cache_key]
if now - cached_time < _FORMATS_CACHE_TTL:
logger.info(f"[FORMATS {request_id}] Возвращаем из кэша ({len(cached_formats)} форматов, возраст {now - cached_time:.0f}с)")
return jsonify({'formats': cached_formats}), 200
else:
logger.info(f"[FORMATS {request_id}] Кэш устарел ({now - cached_time:.0f}с > {_FORMATS_CACHE_TTL}с), обновляем...")
del _formats_cache[cache_key]
format_list = get_youtube_formats(url)
# Сохраняем в кэш
_formats_cache[cache_key] = (time.time(), format_list)
logger.info(f"[FORMATS {request_id}] Сохранено в кэш {len(format_list)} форматов для {cache_key}")
return jsonify({'formats': format_list}), 200
except Exception as e:
logger.error(f"[FORMATS {request_id}] Ошибка: {e}")
logger.error(traceback.format_exc())
return jsonify({'error': str(e)}), 500
@app.route('/download/stream', methods=['POST'])
def download_stream():
"""Скачивает видео с YouTube и возвращает бинарные данные"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[REQUEST {request_id}] ========== НОВЫЙ ЗАПРОС ==========")
logger.info(f"[REQUEST {request_id}] Метод: {request.method}")
logger.info(f"[REQUEST {request_id}] URL: {request.url}")
logger.info(f"[REQUEST {request_id}] Remote Address: {request.remote_addr}")
logger.info(f"[REQUEST {request_id}] Headers: {dict(request.headers)}")
try:
data = request.get_json()
logger.info(f"[REQUEST {request_id}] Body (JSON): {data}")
if not data or 'url' not in data:
logger.warning(f"[REQUEST {request_id}] Ошибка: URL не предоставлен в запросе")
return jsonify({'error': 'URL is required'}), 400
url = data['url']
format_id = data.get('format_id') # Опциональный параметр
logger.info(f"[REQUEST {request_id}] Получен запрос на скачивание (stream): {url}, format_id: {format_id}")
# Проверяем, что это YouTube URL
if 'youtube.com' not in url and 'youtu.be' not in url:
logger.warning(f"[REQUEST {request_id}] Ошибка: URL не является YouTube URL: {url}")
return jsonify({'error': 'Only YouTube URLs are supported'}), 400
# Скачиваем видео
logger.info(f"[REQUEST {request_id}] Начинаем скачивание видео...")
video_path = download_youtube_video(url, format_id=format_id)
logger.info(f"[REQUEST {request_id}] Видео успешно скачано: {video_path}")
# Читаем файл и отправляем
file_size = video_path.stat().st_size
logger.info(f"[REQUEST {request_id}] Размер файла: {file_size} байт")
with open(video_path, 'rb') as f:
video_data = f.read()
# Безопасное имя файла без кириллицы для заголовка
safe_filename = video_path.name.encode('ascii', 'ignore').decode('ascii') or 'youtube_video.mp4'
if not safe_filename.endswith(('.mp4', '.webm', '.mkv')):
safe_filename = 'youtube_video.mp4'
# Определяем content-type
content_type = 'video/mp4'
if video_path.suffix == '.webm':
content_type = 'video/webm'
elif video_path.suffix == '.mkv':
content_type = 'video/x-matroska'
logger.info(f"[REQUEST {request_id}] Отправляем файл: {safe_filename}, Content-Type: {content_type}, размер: {len(video_data)} байт")
# Удаляем временный файл
video_path.unlink()
logger.info(f"[REQUEST {request_id}] Временный файл удален")
logger.info(f"[REQUEST {request_id}] ========== ЗАПРОС УСПЕШНО ЗАВЕРШЕН ==========")
return video_data, 200, {
'Content-Type': content_type,
'Content-Disposition': f'attachment; filename="{safe_filename}"'
}
except Exception as e:
error_str = str(e)
error_lower = error_str.lower()
logger.error(f"[REQUEST {request_id}] ========== ОШИБКА В ЗАПРОСЕ ==========")
logger.error(f"[REQUEST {request_id}] Ошибка при скачивании: {error_str}")
logger.error(f"[REQUEST {request_id}] Полный traceback:\n{traceback.format_exc()}")
# Улучшаем сообщение об ошибке, если проблема с cookies
if 'cookies' in error_lower or 'bot' in error_lower or 'sign in' in error_lower or 'authentication' in error_lower:
logger.error(f"[REQUEST {request_id}] Обнаружена ошибка связанная с cookies!")
error_msg = (
f"{error_str}\n\n"
"💡 Совет: Cookies устарели или недействительны. "
"Обновите cookies, запустив скрипт:\n"
" ./youtube-downloader/get_youtube_cookies.sh\n"
"Затем перезапустите сервис."
)
else:
error_msg = error_str
logger.error(f"[REQUEST {request_id}] Возвращаем 500 ошибку клиенту")
logger.error(f"[REQUEST {request_id}] ========== КОНЕЦ ОБРАБОТКИ ОШИБКИ ==========")
return jsonify({'error': error_msg}), 500
if __name__ == '__main__':
port = int(os.getenv('PORT', 5000)) # Внутренний порт контейнера
host = os.getenv('HOST', '0.0.0.0')
logger.info(f"Запуск YouTube Downloader сервиса на {host}:{port}")
app.run(host=host, port=port, debug=False)