videoDownloadTGbot/bot.py

468 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import json
import logging
import asyncio
from pathlib import Path
from urllib.parse import urlparse
import yt_dlp
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Токен бота и имя бота из переменных окружения
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot')
# Директория для временных файлов
DOWNLOADS_DIR = Path('video')
DOWNLOADS_DIR.mkdir(exist_ok=True)
# Файл для хранения статистики
STATS_FILE = Path('stats.json')
def load_stats() -> dict:
"""Загружает статистику из файла"""
if STATS_FILE.exists():
try:
with open(STATS_FILE, 'r', encoding='utf-8') as f:
stats = json.load(f)
# Обеспечиваем обратную совместимость
if 'users' not in stats:
stats['users'] = []
return stats
except Exception as e:
logger.error(f"Ошибка при загрузке статистики: {e}")
return {'total_downloads': 0, 'users': []}
return {'total_downloads': 0, 'users': []}
def save_stats(stats: dict):
"""Сохраняет статистику в файл"""
try:
with open(STATS_FILE, 'w', encoding='utf-8') as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Ошибка при сохранении статистики: {e}")
def increment_downloads():
"""Увеличивает счетчик скачанных видео"""
stats = load_stats()
stats['total_downloads'] = stats.get('total_downloads', 0) + 1
save_stats(stats)
logger.info(f"Общее количество скачанных видео: {stats['total_downloads']}")
def add_user(chat_id: int):
"""Добавляет пользователя в список уникальных пользователей"""
stats = load_stats()
users = stats.get('users', [])
# Преобразуем в set для уникальности, затем обратно в list
users_set = set(users)
if chat_id not in users_set:
users_set.add(chat_id)
stats['users'] = list(users_set)
save_stats(stats)
logger.info(f"Добавлен новый пользователь. Всего пользователей: {len(users_set)}")
def detect_video_source(url: str) -> str:
"""Определяет источник видео по URL"""
domain = urlparse(url).netloc.lower()
if 'youtube.com' in domain or 'youtu.be' in domain:
return 'youtube'
elif 'instagram.com' in domain:
return 'instagram'
elif 'vk.com' in domain or 'vkontakte.ru' in domain:
return 'vk'
else:
return 'unknown'
def _safe_filename(title: str, chat_id: int) -> str:
"""Создает безопасное имя файла"""
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100]
return str(DOWNLOADS_DIR / f'{chat_id}_{safe_title}.%(ext)s')
async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с YouTube"""
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
for attempt in range(max_retries):
try:
# Получаем информацию о видео
ydl_opts_info = {
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': ['android', 'web'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
video_title = info.get('title', 'video')
logger.info(f"YouTube: получена информация о видео: {video_title}")
# Скачиваем видео
ydl_opts_download = {
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'outtmpl': _safe_filename(video_title, chat_id),
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': ['android', 'web'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
logger.info(f"YouTube: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: ydl.download([url]))
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
logger.warning(f"YouTube: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube")
async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Instagram - используем cookies с правильными заголовками"""
cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt')
cookies_file_path = Path(cookies_file)
# Парсим cookies для получения csrf token (формат Netscape)
csrf_token = None
sessionid = None
if cookies_file_path.exists():
try:
with open(cookies_file_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('\t')
if len(parts) >= 7:
domain = parts[0]
# Ищем только cookies от instagram.com
if 'instagram' in domain.lower():
cookie_name = parts[5] # Имя cookie
cookie_value = parts[6] # Значение cookie
if cookie_name == 'csrftoken':
csrf_token = cookie_value
elif cookie_name == 'sessionid':
sessionid = cookie_value
# Если нашли оба - можно выходить
if csrf_token and sessionid:
break
except Exception as e:
logger.warning(f"Не удалось прочитать cookies: {e}")
last_error = None
for attempt in range(max_retries):
try:
# Базовые настройки
ydl_opts = {
'format': 'best',
'outtmpl': str(DOWNLOADS_DIR / f'{chat_id}_%(title)s.%(ext)s'),
'quiet': False,
'no_warnings': False,
'socket_timeout': 30,
}
# Если есть файл с cookies, используем его
if cookies_file_path.exists():
# Используем абсолютный путь к cookies
ydl_opts['cookiefile'] = str(cookies_file_path.absolute())
logger.info(f"Instagram: используем cookies из {cookies_file_path}")
# Добавляем заголовки с csrf token если есть
headers = {
'Referer': 'https://www.instagram.com/',
'X-Requested-With': 'XMLHttpRequest',
}
if csrf_token:
headers['X-CSRFToken'] = csrf_token
logger.info(f"Instagram: добавлен csrf token в заголовки")
if sessionid:
logger.info(f"Instagram: sessionid найден (длина: {len(sessionid)})")
ydl_opts['http_headers'] = headers
logger.info(f"Instagram: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: ydl.download([url]))
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
logger.warning(f"Instagram: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram. Возможно, нужно обновить cookies.")
async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с VK"""
vk_user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
last_error = None
for attempt in range(max_retries):
try:
# Получаем информацию о видео
ydl_opts_info = {
'quiet': False,
'no_warnings': False,
'user_agent': vk_user_agent,
'socket_timeout': 60, # Увеличенный таймаут для VK
'extractor_args': {
'vk': {},
},
'http_headers': {
'User-Agent': vk_user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.9',
'Referer': 'https://vk.com/',
'Connection': 'keep-alive',
},
# Пробуем использовать более надежные настройки SSL
'nocheckcertificate': False,
}
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
info = ydl.extract_info(url, download=False)
video_title = info.get('title', 'vk_video')
logger.info(f"VK: получена информация о видео: {video_title}")
# Скачиваем видео
ydl_opts_download = {
'format': 'best',
'outtmpl': _safe_filename(video_title, chat_id),
'quiet': False,
'no_warnings': False,
'user_agent': vk_user_agent,
'socket_timeout': 60, # Увеличенный таймаут для VK
'extractor_args': {
'vk': {},
},
'http_headers': {
'User-Agent': vk_user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.9',
'Referer': 'https://vk.com/',
},
}
logger.info(f"VK: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: ydl.download([url]))
# Находим скачанный файл
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
else:
raise Exception("Файл не был найден после скачивания")
except Exception as e:
last_error = e
logger.warning(f"VK: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с VK")
async def download_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Главная функция скачивания - вызывает нужную функцию в зависимости от источника"""
source = detect_video_source(url)
logger.info(f"Определен источник: {source} для URL: {url}")
if source == 'youtube':
return await download_youtube_video(url, chat_id, max_retries)
elif source == 'instagram':
return await download_instagram_video(url, chat_id, max_retries)
elif source == 'vk':
return await download_vk_video(url, chat_id, max_retries)
else:
raise Exception("Пардон, не умеем работать с этим источником")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает сообщения от пользователей"""
if not update.message or not update.message.text:
return
url = update.message.text.strip()
chat_id = update.message.chat_id
# Добавляем пользователя в статистику при первом взаимодействии
add_user(chat_id)
# Проверяем, является ли сообщение URL
if not (url.startswith('http://') or url.startswith('https://')):
await update.message.reply_text(
"Пожалуйста, отправьте ссылку на видео.\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• VK (vk.com)\n\n"
"Для других источников: Пардон, не умеем 😅"
)
return
# Проверяем источник до начала обработки
source = detect_video_source(url)
if source == 'unknown':
await update.message.reply_text("Пардон, не умеем работать с этим источником 😅")
return
# Отправляем сообщение о начале обработки
status_message = await update.message.reply_text("🔍 Обрабатываю ссылку...")
try:
# Скачиваем видео
await status_message.edit_text("⬇️ Скачиваю видео...")
video_path = await download_video(url, chat_id)
# Отправляем файл пользователю
await status_message.edit_text("📤 Отправляю видео...")
video_file = open(video_path, 'rb')
caption = f"Видео скачано с @{TELEGRAM_BOT_USERNAME}"
await update.message.reply_video(
video=video_file,
caption=caption,
supports_streaming=True
)
video_file.close()
# Увеличиваем счетчик скачанных видео
increment_downloads()
# Удаляем временный файл
try:
os.remove(video_path)
logger.info(f"Удален временный файл: {video_path}")
except Exception as e:
logger.warning(f"Не удалось удалить файл {video_path}: {e}")
await status_message.delete()
except Exception as e:
logger.error(f"Ошибка: {e}")
error_msg = f"❌ Произошла ошибка при обработке видео:\n{str(e)}"
await status_message.edit_text(error_msg)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /start"""
# Добавляем пользователя в статистику
chat_id = update.message.chat_id
add_user(chat_id)
await update.message.reply_text(
"👋 Привет! Я бот для скачивания видео.\n\n"
"Просто отправь мне ссылку на видео, и я скачаю его для тебя.\n\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• VK (vk.com)\n\n"
"Команды:\n"
"/start - Начать работу\n"
"/stat - Статистика скачанных видео\n\n"
"Отправь ссылку на видео:"
)
async def stat_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /stat"""
stats = load_stats()
total_downloads = stats.get('total_downloads', 0)
users = stats.get('users', [])
total_users = len(users)
await update.message.reply_text(
f"📊 Статистика бота:\n\n"
f"👥 Всего пользователей: {total_users}\n"
f"📹 Всего скачано видео: {total_downloads}"
)
def main():
"""Главная функция для запуска бота"""
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN не установлен!")
return
# Создаем приложение
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Регистрируем обработчики
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("stat", stat_command))
# Запускаем бота
logger.info("Бот запущен")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()