import os import re import json import logging import asyncio import sqlite3 import time import subprocess from pathlib import Path from urllib.parse import urlparse from datetime import datetime import yt_dlp import httpx from telegram import Update from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler # Настройка логирования logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Токен бота и имя бота из переменных окружения TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot') # URL VK сервиса для скачивания видео VK_DOWNLOADER_URL = os.getenv('VK_DOWNLOADER_URL', 'http://localhost:5555') # Базовая директория проекта (абсолютный путь), чтобы не зависеть от рабочей директории процесса BASE_DIR = Path(__file__).resolve().parent # Директория для временных файлов DOWNLOADS_DIR = BASE_DIR / 'video' DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True) # База данных (внутри папки data) DATA_DIR = BASE_DIR / 'data' DATA_DIR.mkdir(parents=True, exist_ok=True) DB_FILE = DATA_DIR / 'bot.db' def init_database(): """Инициализирует базу данных и создает таблицы если их нет""" try: conn = sqlite3.connect(str(DB_FILE)) cursor = conn.cursor() # Таблица пользователей cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( chat_id INTEGER PRIMARY KEY, username TEXT, first_name TEXT, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL ) ''') # Таблица статистики cursor.execute(''' CREATE TABLE IF NOT EXISTS stats ( id INTEGER PRIMARY KEY CHECK (id = 1), total_downloads INTEGER DEFAULT 0 ) ''') # Инициализируем stats если его нет cursor.execute('SELECT COUNT(*) FROM stats') if cursor.fetchone()[0] == 0: cursor.execute('INSERT INTO stats (id, total_downloads) VALUES (1, 0)') conn.commit() conn.close() logger.info("База данных инициализирована") except Exception as e: logger.error(f"Ошибка при инициализации базы данных: {e}") def get_total_downloads() -> int: """Возвращает общее количество скачанных видео""" try: conn = sqlite3.connect(str(DB_FILE)) cursor = conn.cursor() cursor.execute('SELECT total_downloads FROM stats WHERE id = 1') result = cursor.fetchone() conn.close() return result[0] if result else 0 except Exception as e: logger.error(f"Ошибка при получении количества скачанных видео: {e}") return 0 def increment_downloads(): """Увеличивает счетчик скачанных видео""" try: conn = sqlite3.connect(str(DB_FILE)) cursor = conn.cursor() cursor.execute('UPDATE stats SET total_downloads = total_downloads + 1 WHERE id = 1') conn.commit() new_total = get_total_downloads() conn.close() logger.info(f"Общее количество скачанных видео: {new_total}") except Exception as e: logger.error(f"Ошибка при увеличении счетчика скачанных видео: {e}") def get_total_users() -> int: """Возвращает общее количество уникальных пользователей""" try: conn = sqlite3.connect(str(DB_FILE)) cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM users') result = cursor.fetchone() conn.close() return result[0] if result else 0 except Exception as e: logger.error(f"Ошибка при получении количества пользователей: {e}") return 0 def add_user(chat_id: int, username: str = None, first_name: str = None): """Добавляет пользователя в базу данных или обновляет информацию о нем""" try: now = datetime.now().isoformat() conn = sqlite3.connect(str(DB_FILE)) cursor = conn.cursor() # Проверяем, существует ли пользователь cursor.execute('SELECT chat_id FROM users WHERE chat_id = ?', (chat_id,)) exists = cursor.fetchone() if exists: # Обновляем last_seen cursor.execute( 'UPDATE users SET last_seen = ?, username = ?, first_name = ? WHERE chat_id = ?', (now, username, first_name, chat_id) ) else: # Добавляем нового пользователя cursor.execute( 'INSERT INTO users (chat_id, username, first_name, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)', (chat_id, username, first_name, now, now) ) total_users = get_total_users() logger.info(f"Добавлен новый пользователь (chat_id: {chat_id}). Всего пользователей: {total_users}") conn.commit() conn.close() except Exception as e: logger.error(f"Ошибка при добавлении пользователя: {e}") def detect_video_source(url: str) -> str: """Определяет источник видео по URL""" domain = urlparse(url).netloc.lower() if 'youtube.com' in domain or 'youtu.be' in domain: return 'youtube' elif 'instagram.com' in domain: return 'instagram' elif 'vk.com' in domain or 'vkontakte.ru' in domain: return 'vk' else: return 'unknown' def _safe_filename(title: str, chat_id: int) -> str: """Создает безопасное имя файла""" safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100] return str(DOWNLOADS_DIR / f'{chat_id}_{safe_title}.%(ext)s') async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3) -> str: """Скачивает видео с YouTube""" user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' last_error = None for attempt in range(max_retries): try: # Получаем информацию о видео ydl_opts_info = { 'quiet': False, 'no_warnings': False, 'user_agent': user_agent, 'socket_timeout': 30, 'extractor_args': { 'youtube': { 'player_client': ['android', 'web'], 'player_skip': ['webpage'], }, }, 'http_headers': { 'User-Agent': user_agent, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', }, } with yt_dlp.YoutubeDL(ydl_opts_info) as ydl: info = ydl.extract_info(url, download=False) video_title = info.get('title', 'video') logger.info(f"YouTube: получена информация о видео: {video_title}") # Скачиваем видео ydl_opts_download = { 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', 'outtmpl': _safe_filename(video_title, chat_id), 'quiet': False, 'no_warnings': False, 'user_agent': user_agent, 'socket_timeout': 30, 'extractor_args': { 'youtube': { 'player_client': ['android', 'web'], 'player_skip': ['webpage'], }, }, 'http_headers': { 'User-Agent': user_agent, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', }, } logger.info(f"YouTube: начинаем скачивание (попытка {attempt + 1}/{max_retries})") with yt_dlp.YoutubeDL(ydl_opts_download) as ydl: loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: ydl.download([url])) # Находим скачанный файл downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*')) if downloaded_files: downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) return str(downloaded_files[0]) else: raise Exception("Файл не был найден после скачивания") except Exception as e: last_error = e logger.warning(f"YouTube: попытка {attempt + 1}/{max_retries} не удалась: {e}") if attempt < max_retries - 1: await asyncio.sleep((attempt + 1) * 2) raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube") async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3) -> str: """Скачивает видео с Instagram - используем cookies с правильными заголовками""" cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt') cookies_file_path = Path(cookies_file) # Проверяем срок действия cookies перед использованием if cookies_file_path.exists(): is_valid, days_left = check_instagram_cookies_expiry() if not is_valid: logger.error("Instagram cookies истекли! Необходимо обновить cookies.") raise Exception("Instagram cookies истекли. Пожалуйста, обновите cookies в файле instagram_cookies.txt") elif days_left < 7: logger.warning(f"Instagram cookies истекают через {days_left} дней. Рекомендуется обновить.") # Парсим cookies для получения csrf token (формат Netscape) csrf_token = None sessionid = None if cookies_file_path.exists(): try: with open(cookies_file_path, 'r') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) >= 7: domain = parts[0] # Ищем только cookies от instagram.com if 'instagram' in domain.lower(): cookie_name = parts[5] # Имя cookie cookie_value = parts[6] # Значение cookie if cookie_name == 'csrftoken': csrf_token = cookie_value elif cookie_name == 'sessionid': sessionid = cookie_value # Если нашли оба - можно выходить if csrf_token and sessionid: break except Exception as e: logger.warning(f"Не удалось прочитать cookies: {e}") last_error = None for attempt in range(max_retries): try: # Базовые настройки ydl_opts = { 'format': 'best', 'outtmpl': str(DOWNLOADS_DIR / f'{chat_id}_%(title)s.%(ext)s'), 'quiet': False, 'no_warnings': False, 'socket_timeout': 30, } # Если есть файл с cookies, используем его if cookies_file_path.exists(): # Используем абсолютный путь к cookies ydl_opts['cookiefile'] = str(cookies_file_path.absolute()) logger.info(f"Instagram: используем cookies из {cookies_file_path}") # Добавляем заголовки с csrf token если есть headers = { 'Referer': 'https://www.instagram.com/', 'X-Requested-With': 'XMLHttpRequest', } if csrf_token: headers['X-CSRFToken'] = csrf_token logger.info(f"Instagram: добавлен csrf token в заголовки") if sessionid: logger.info(f"Instagram: sessionid найден (длина: {len(sessionid)})") ydl_opts['http_headers'] = headers logger.info(f"Instagram: начинаем скачивание (попытка {attempt + 1}/{max_retries})") with yt_dlp.YoutubeDL(ydl_opts) as ydl: loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: ydl.download([url])) # Находим скачанный файл downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*')) if downloaded_files: downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) return str(downloaded_files[0]) else: raise Exception("Файл не был найден после скачивания") except Exception as e: last_error = e logger.warning(f"Instagram: попытка {attempt + 1}/{max_retries} не удалась: {e}") if attempt < max_retries - 1: await asyncio.sleep((attempt + 1) * 2) raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram. Возможно, нужно обновить cookies.") async def update_instagram_cookies_from_browser(browser: str = 'chrome') -> bool: """ Автоматически обновляет Instagram cookies из браузера Returns: True если успешно, False если ошибка """ cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt') cookies_file_path = Path(cookies_file) try: logger.info(f"Попытка обновления Instagram cookies из браузера {browser}...") # Пробуем обновить cookies из браузера loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: subprocess.run( [ 'yt-dlp', '--cookies-from-browser', browser, '--cookies', str(cookies_file_path.absolute()), '--no-download', 'https://www.instagram.com/' ], capture_output=True, timeout=30, text=True ) ) if result.returncode == 0: logger.info(f"✅ Instagram cookies успешно обновлены из браузера {browser}") return True else: logger.warning(f"Не удалось обновить cookies из {browser}: {result.stderr[:200]}") return False except subprocess.TimeoutExpired: logger.warning(f"Таймаут при обновлении cookies из {browser}") return False except FileNotFoundError: logger.warning(f"yt-dlp не найден для обновления cookies") return False except Exception as e: logger.error(f"Ошибка при обновлении cookies из браузера: {e}") return False def check_instagram_cookies_expiry() -> tuple[bool, int]: """ Проверяет срок действия Instagram cookies Returns: (is_valid, days_until_expiry) """ cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt') cookies_file_path = Path(cookies_file) if not cookies_file_path.exists(): return False, 0 try: current_time = time.time() min_expiry = None with open(cookies_file_path, 'r') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue parts = line.split('\t') if len(parts) >= 7: domain = parts[0] if 'instagram' in domain.lower(): try: expiry = int(parts[4]) # Unix timestamp if min_expiry is None or expiry < min_expiry: min_expiry = expiry except (ValueError, IndexError): continue if min_expiry is None: return False, 0 days_until_expiry = (min_expiry - current_time) / 86400 is_valid = min_expiry > current_time return is_valid, int(days_until_expiry) except Exception as e: logger.error(f"Ошибка при проверке срока действия cookies: {e}") return False, 0 async def keep_instagram_session_alive(): """Поддерживает сессию Instagram активной через периодические запросы и автоматически обновляет cookies""" cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt') cookies_file_path = Path(cookies_file) AUTO_UPDATE_DAYS_BEFORE_EXPIRY = int(os.getenv('INSTAGRAM_AUTO_UPDATE_DAYS', '3')) # Обновлять за 3 дня до истечения if not cookies_file_path.exists(): logger.info("Instagram cookies не найдены, пропускаем поддержание сессии") return # Список браузеров для попытки обновления (по приоритету) browsers_to_try = ['chrome', 'firefox', 'edge', 'opera'] # Проверяем cookies при старте is_valid, days_left = check_instagram_cookies_expiry() if not is_valid: logger.warning("Instagram cookies истекли! Пытаемся автоматически обновить...") # Пытаемся обновить из браузера updated = False for browser in browsers_to_try: if await update_instagram_cookies_from_browser(browser): updated = True break if not updated: logger.error("Не удалось автоматически обновить cookies! Необходимо обновить вручную.") return else: # Перепроверяем после обновления is_valid, days_left = check_instagram_cookies_expiry() if days_left < AUTO_UPDATE_DAYS_BEFORE_EXPIRY: logger.warning(f"Instagram cookies истекают через {days_left} дней! Пытаемся автоматически обновить...") # Пытаемся обновить заранее for browser in browsers_to_try: if await update_instagram_cookies_from_browser(browser): # Перепроверяем _, days_left = check_instagram_cookies_expiry() logger.info(f"Cookies обновлены! Новый срок: {days_left} дней") break else: logger.info(f"Instagram cookies действительны еще {days_left} дней") # Интервал проверки: 24 часа (86400 секунд) check_interval = 86400 while True: try: await asyncio.sleep(check_interval) # Проверяем срок действия перед каждым запросом is_valid, days_left = check_instagram_cookies_expiry() # Автоматическое обновление за N дней до истечения if days_left < AUTO_UPDATE_DAYS_BEFORE_EXPIRY and days_left > 0: logger.warning(f"Instagram cookies истекают через {days_left} дней. Автоматическое обновление...") updated = False for browser in browsers_to_try: if await update_instagram_cookies_from_browser(browser): updated = True _, days_left = check_instagram_cookies_expiry() logger.info(f"✅ Cookies обновлены автоматически! Новый срок: {days_left} дней") break if not updated: logger.warning("Не удалось автоматически обновить cookies. Попробуйте обновить вручную.") if not is_valid: logger.error("Instagram cookies истекли! Пытаемся автоматически обновить...") updated = False for browser in browsers_to_try: if await update_instagram_cookies_from_browser(browser): updated = True is_valid, days_left = check_instagram_cookies_expiry() break if not updated: logger.error("Не удалось автоматически обновить cookies! Остановка поддержания сессии.") break # Делаем легкий запрос к Instagram для поддержания активности logger.info("Поддерживаем активность сессии Instagram...") try: ydl_opts = { 'cookiefile': str(cookies_file_path.absolute()), 'quiet': True, 'no_warnings': True, 'socket_timeout': 10, } loop = asyncio.get_event_loop() await loop.run_in_executor( None, lambda: yt_dlp.YoutubeDL(ydl_opts).extract_info( 'https://www.instagram.com/', download=False ) ) logger.info(f"Сессия Instagram успешно обновлена. Cookies действительны еще {days_left} дней") except Exception as e: logger.warning(f"Не удалось обновить сессию Instagram: {e}") # Продолжаем работу, попробуем в следующий раз except asyncio.CancelledError: logger.info("Поддержание сессии Instagram остановлено") break except Exception as e: logger.error(f"Ошибка в задаче поддержания сессии Instagram: {e}") # Ждем перед следующей попыткой await asyncio.sleep(3600) # 1 час async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str: """Скачивает видео с VK через внешний сервис""" logger.info(f"VK: отправка запроса на внешний сервис {VK_DOWNLOADER_URL}") last_error = None for attempt in range(max_retries): try: async with httpx.AsyncClient(timeout=600.0) as client: # Увеличенный таймаут для VK # Отправляем запрос на VK сервис response = await client.post( f"{VK_DOWNLOADER_URL}/download/stream", json={"url": url}, headers={"Content-Type": "application/json"} ) if response.status_code != 200: error_text = response.text try: error_json = response.json() error_text = error_json.get('error', error_text) except: pass raise Exception(f"VK сервис вернул ошибку {response.status_code}: {error_text}") # Сохраняем видео во временный файл video_data = response.content video_ext = 'mp4' # По умолчанию mp4 # Пробуем определить расширение из заголовков content_type = response.headers.get('Content-Type', '') if 'video/' in content_type: video_ext = content_type.split('/')[-1].split(';')[0] # Получаем имя файла из заголовка или создаем случайное filename = response.headers.get('Content-Disposition', '') if filename and 'filename=' in filename: video_filename = filename.split('filename=')[1].strip('"\'') else: video_filename = f'{chat_id}_vk_video.{video_ext}' # Сохраняем файл video_path = DOWNLOADS_DIR / video_filename with open(video_path, 'wb') as f: f.write(video_data) logger.info(f"VK: видео скачано через внешний сервис: {video_path}") return str(video_path) except httpx.TimeoutException: last_error = Exception(f"Таймаут при запросе к VK сервису (попытка {attempt + 1}/{max_retries})") logger.warning(f"VK: таймаут при запросе к сервису: {last_error}") except Exception as e: last_error = e logger.warning(f"VK: попытка {attempt + 1}/{max_retries} не удалась: {e}") if attempt < max_retries - 1: await asyncio.sleep((attempt + 1) * 2) raise last_error or Exception("Неизвестная ошибка при скачивании с VK через внешний сервис") async def download_video(url: str, chat_id: int, max_retries: int = 3) -> str: """Главная функция скачивания - вызывает нужную функцию в зависимости от источника""" source = detect_video_source(url) logger.info(f"Определен источник: {source} для URL: {url}") if source == 'youtube': return await download_youtube_video(url, chat_id, max_retries) elif source == 'instagram': return await download_instagram_video(url, chat_id, max_retries) elif source == 'vk': return await download_vk_video(url, chat_id, max_retries) else: raise Exception("Пардон, не умеем работать с этим источником") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обрабатывает сообщения от пользователей""" if not update.message or not update.message.text: return url = update.message.text.strip() chat_id = update.message.chat_id username = update.message.from_user.username if update.message.from_user else None first_name = update.message.from_user.first_name if update.message.from_user else None # Добавляем пользователя в статистику при первом взаимодействии add_user(chat_id, username, first_name) # Проверяем, является ли сообщение URL if not (url.startswith('http://') or url.startswith('https://')): await update.message.reply_text( "Пожалуйста, отправьте ссылку на видео.\n" "Поддерживаемые источники:\n" "• YouTube (youtube.com, youtu.be)\n" "• Instagram (instagram.com)\n" "• VK (vk.com)\n\n" "Для других источников: Пардон, не умеем 😅" ) return # Проверяем источник до начала обработки source = detect_video_source(url) if source == 'unknown': await update.message.reply_text("Пардон, не умеем работать с этим источником 😅") return # Отправляем сообщение о начале обработки status_message = await update.message.reply_text("🔍 Обрабатываю ссылку...") try: # Скачиваем видео await status_message.edit_text("⬇️ Скачиваю видео...") video_path = await download_video(url, chat_id) # Отправляем файл пользователю await status_message.edit_text("📤 Отправляю видео...") video_file = open(video_path, 'rb') caption = f"Видео скачано с @{TELEGRAM_BOT_USERNAME}" await update.message.reply_video( video=video_file, caption=caption, supports_streaming=True ) video_file.close() # Увеличиваем счетчик скачанных видео increment_downloads() # Удаляем временный файл try: os.remove(video_path) logger.info(f"Удален временный файл: {video_path}") except Exception as e: logger.warning(f"Не удалось удалить файл {video_path}: {e}") await status_message.delete() except Exception as e: logger.error(f"Ошибка: {e}") error_msg = f"❌ Произошла ошибка при обработке видео:\n{str(e)}" await status_message.edit_text(error_msg) async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обрабатывает команду /start""" # Добавляем пользователя в статистику chat_id = update.message.chat_id username = update.message.from_user.username if update.message.from_user else None first_name = update.message.from_user.first_name if update.message.from_user else None add_user(chat_id, username, first_name) await update.message.reply_text( "👋 Привет! Я бот для скачивания видео.\n\n" "Просто отправь мне ссылку на видео, и я скачаю его для тебя.\n\n" "Поддерживаемые источники:\n" "• YouTube (youtube.com, youtu.be)\n" "• Instagram (instagram.com)\n" "• VK (vk.com)\n\n" "Команды:\n" "/start - Начать работу\n" "/stat - Статистика скачанных видео\n\n" "Отправь ссылку на видео:" ) async def stat_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обрабатывает команду /stat""" total_downloads = get_total_downloads() total_users = get_total_users() await update.message.reply_text( f"📊 Статистика бота:\n\n" f"👥 Всего пользователей: {total_users}\n" f"📹 Всего скачано видео: {total_downloads}" ) def main(): """Главная функция для запуска бота""" if not TELEGRAM_BOT_TOKEN: logger.error("TELEGRAM_BOT_TOKEN не установлен!") return # Инициализируем базу данных init_database() # Создаем приложение application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() # Регистрируем обработчики application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) application.add_handler(CommandHandler("start", start_command)) application.add_handler(CommandHandler("stat", stat_command)) # Запускаем фоновую задачу для поддержания сессии Instagram async def post_init(application: Application): """Выполняется после инициализации приложения""" # Запускаем задачу поддержания сессии Instagram в фоне asyncio.create_task(keep_instagram_session_alive()) logger.info("Фоновая задача поддержания сессии Instagram запущена") application.post_init = post_init # Запускаем бота logger.info("Бот запущен") application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == '__main__': main()