videoDownloadTGbot/bot.py

440 lines
19 KiB
Python
Raw Normal View History

import os
import re
import json
import logging
import asyncio
from pathlib import Path
from urllib.parse import urlparse
import yt_dlp
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Токен бота из переменной окружения
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
# Директория для временных файлов
DOWNLOADS_DIR = Path('video')
DOWNLOADS_DIR.mkdir(exist_ok=True)
# Файл для хранения статистики
STATS_FILE = Path('stats.json')
def load_stats() -> dict:
"""Загружает статистику из файла"""
if STATS_FILE.exists():
try:
with open(STATS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Ошибка при загрузке статистики: {e}")
return {'total_downloads': 0}
return {'total_downloads': 0}
def save_stats(stats: dict):
"""Сохраняет статистику в файл"""
try:
with open(STATS_FILE, 'w', encoding='utf-8') as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Ошибка при сохранении статистики: {e}")
def increment_downloads():
"""Увеличивает счетчик скачанных видео"""
stats = load_stats()
stats['total_downloads'] = stats.get('total_downloads', 0) + 1
save_stats(stats)
logger.info(f"Общее количество скачанных видео: {stats['total_downloads']}")
def detect_video_source(url: str) -> str:
"""Определяет источник видео по URL"""
domain = urlparse(url).netloc.lower()
if 'youtube.com' in domain or 'youtu.be' in domain:
return 'youtube'
elif 'instagram.com' in domain:
return 'instagram'
elif 'vk.com' in domain or 'vkontakte.ru' in domain:
return 'vk'
else:
return 'unknown'
def _safe_filename(title: str, chat_id: int) -> str:
"""Создает безопасное имя файла"""
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100]
return str(DOWNLOADS_DIR / f'{chat_id}_{safe_title}.%(ext)s')
async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с YouTube"""
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
for attempt in range(max_retries):
try:
# Получаем информацию о видео
ydl_opts_info = {
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': ['android', 'web'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
video_title = info.get('title', 'video')
logger.info(f"YouTube: получена информация о видео: {video_title}")
# Скачиваем видео
ydl_opts_download = {
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'outtmpl': _safe_filename(video_title, chat_id),
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': ['android', 'web'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
logger.info(f"YouTube: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: ydl.download([url]))
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
logger.warning(f"YouTube: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube")
async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Instagram - используем cookies с правильными заголовками"""
cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt')
cookies_file_path = Path(cookies_file)
# Парсим cookies для получения csrf token (формат Netscape)
csrf_token = None
sessionid = None
if cookies_file_path.exists():
try:
with open(cookies_file_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('\t')
if len(parts) >= 7:
domain = parts[0]
# Ищем только cookies от instagram.com
if 'instagram' in domain.lower():
cookie_name = parts[5] # Имя cookie
cookie_value = parts[6] # Значение cookie
if cookie_name == 'csrftoken':
csrf_token = cookie_value
elif cookie_name == 'sessionid':
sessionid = cookie_value
# Если нашли оба - можно выходить
if csrf_token and sessionid:
break
except Exception as e:
logger.warning(f"Не удалось прочитать cookies: {e}")
last_error = None
for attempt in range(max_retries):
try:
# Базовые настройки
ydl_opts = {
'format': 'best',
'outtmpl': str(DOWNLOADS_DIR / f'{chat_id}_%(title)s.%(ext)s'),
'quiet': False,
'no_warnings': False,
'socket_timeout': 30,
}
# Если есть файл с cookies, используем его
if cookies_file_path.exists():
# Используем абсолютный путь к cookies
ydl_opts['cookiefile'] = str(cookies_file_path.absolute())
logger.info(f"Instagram: используем cookies из {cookies_file_path}")
# Добавляем заголовки с csrf token если есть
headers = {
'Referer': 'https://www.instagram.com/',
'X-Requested-With': 'XMLHttpRequest',
}
if csrf_token:
headers['X-CSRFToken'] = csrf_token
logger.info(f"Instagram: добавлен csrf token в заголовки")
if sessionid:
logger.info(f"Instagram: sessionid найден (длина: {len(sessionid)})")
ydl_opts['http_headers'] = headers
logger.info(f"Instagram: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: ydl.download([url]))
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
logger.warning(f"Instagram: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram. Возможно, нужно обновить cookies.")
async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с VK"""
vk_user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
for attempt in range(max_retries):
try:
# Получаем информацию о видео
ydl_opts_info = {
'quiet': False,
'no_warnings': False,
'user_agent': vk_user_agent,
'socket_timeout': 60, # Увеличенный таймаут для VK
'extractor_args': {
'vk': {},
},
'http_headers': {
'User-Agent': vk_user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.9',
'Referer': 'https://vk.com/',
'Connection': 'keep-alive',
},
# Пробуем использовать более надежные настройки SSL
'nocheckcertificate': False,
}
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
video_title = info.get('title', 'vk_video')
logger.info(f"VK: получена информация о видео: {video_title}")
# Скачиваем видео
ydl_opts_download = {
'format': 'best',
'outtmpl': _safe_filename(video_title, chat_id),
'quiet': False,
'no_warnings': False,
'user_agent': vk_user_agent,
'socket_timeout': 60, # Увеличенный таймаут для VK
'extractor_args': {
'vk': {},
},
'http_headers': {
'User-Agent': vk_user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.9',
'Referer': 'https://vk.com/',
},
}
logger.info(f"VK: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: ydl.download([url]))
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
logger.warning(f"VK: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с VK")
async def download_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Главная функция скачивания - вызывает нужную функцию в зависимости от источника"""
source = detect_video_source(url)
logger.info(f"Определен источник: {source} для URL: {url}")
if source == 'youtube':
return await download_youtube_video(url, chat_id, max_retries)
elif source == 'instagram':
return await download_instagram_video(url, chat_id, max_retries)
elif source == 'vk':
return await download_vk_video(url, chat_id, max_retries)
else:
raise Exception("Пардон, не умеем работать с этим источником")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает сообщения от пользователей"""
if not update.message or not update.message.text:
return
url = update.message.text.strip()
chat_id = update.message.chat_id
# Проверяем, является ли сообщение URL
if not (url.startswith('http://') or url.startswith('https://')):
await update.message.reply_text(
"Пожалуйста, отправьте ссылку на видео.\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• VK (vk.com)\n\n"
"Для других источников: Пардон, не умеем 😅"
)
return
# Проверяем источник до начала обработки
source = detect_video_source(url)
if source == 'unknown':
await update.message.reply_text("Пардон, не умеем работать с этим источником 😅")
return
# Отправляем сообщение о начале обработки
status_message = await update.message.reply_text("🔍 Обрабатываю ссылку...")
try:
# Скачиваем видео
await status_message.edit_text("⬇️ Скачиваю видео...")
video_path = await download_video(url, chat_id)
# Отправляем файл пользователю
await status_message.edit_text("📤 Отправляю видео...")
video_file = open(video_path, 'rb')
await update.message.reply_video(
video=video_file,
caption="✅ Видео готово!",
supports_streaming=True
)
video_file.close()
# Увеличиваем счетчик скачанных видео
increment_downloads()
# Удаляем временный файл
try:
os.remove(video_path)
logger.info(f"Удален временный файл: {video_path}")
except Exception as e:
logger.warning(f"Не удалось удалить файл {video_path}: {e}")
await status_message.delete()
except Exception as e:
logger.error(f"Ошибка: {e}")
error_msg = f"❌ Произошла ошибка при обработке видео:\n{str(e)}"
await status_message.edit_text(error_msg)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /start"""
await update.message.reply_text(
"👋 Привет! Я бот для скачивания видео.\n\n"
"Просто отправь мне ссылку на видео, и я скачаю его для тебя.\n\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• VK (vk.com)\n\n"
"Команды:\n"
"/start - Начать работу\n"
"/stat - Статистика скачанных видео\n\n"
"Отправь ссылку на видео:"
)
async def stat_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /stat"""
stats = load_stats()
total = stats.get('total_downloads', 0)
await update.message.reply_text(
f"📊 Статистика скачанных видео:\n\n"
f"Всего скачано: {total} видео"
)
def main():
"""Главная функция для запуска бота"""
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN не установлен!")
return
# Создаем приложение
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Регистрируем обработчики
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("stat", stat_command))
# Запускаем бота
logger.info("Бот запущен")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()