videoDownloadTGbot/bot.py

1348 lines
60 KiB
Python
Raw Normal View History

import os
import re
import logging
import asyncio
import sqlite3
import time
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime
import httpx
from telegram import Update, Message, Bot, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery
from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler, Defaults, CallbackQueryHandler
from telegram.request import HTTPXRequest
from dataclasses import dataclass
from typing import Optional
# Таймаут для HTTP запросов
# Все таймауты убраны - видео может качаться и отправляться очень долго
HTTP_TIMEOUT = httpx.Timeout(connect=None, read=None, write=None, pool=None)
# Таймаут для запроса форматов (не такой критичный, но не должен висеть вечно)
FORMATS_TIMEOUT = httpx.Timeout(connect=15, read=30, write=15, pool=15)
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Токен бота и имя бота из переменных окружения
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot')
2025-12-20 22:17:20 +03:00
# Токен админ бота
ADMIN_BOT_TOKEN = os.getenv('ADMIN_BOT_TOKEN')
# URL сервисов для скачивания видео
YOUTUBE_DOWNLOADER_URL = os.getenv('YOUTUBE_DOWNLOADER_URL', 'http://localhost:5557')
INSTAGRAM_DOWNLOADER_URL = os.getenv('INSTAGRAM_DOWNLOADER_URL', 'http://localhost:5556')
VK_DOWNLOADER_URL = os.getenv('VK_DOWNLOADER_URL', 'http://localhost:5555')
2025-12-12 10:32:06 +03:00
YAPFILES_DOWNLOADER_URL = os.getenv('YAPFILES_DOWNLOADER_URL', 'http://localhost:5558')
TIKTOK_DOWNLOADER_URL = os.getenv('TIKTOK_DOWNLOADER_URL', 'http://localhost:5559')
# Базовая директория проекта (абсолютный путь), чтобы не зависеть от рабочей директории процесса
BASE_DIR = Path(__file__).resolve().parent
# Директория для временных файлов
DOWNLOADS_DIR = BASE_DIR / 'video'
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)
# База данных (внутри папки data)
DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_FILE = DATA_DIR / 'bot.db'
# ============================================================================
# ЛОКАЛИЗАЦИЯ
# ============================================================================
TEXTS = {
'ru': {
'start': (
"👋 Привет! Я бот для скачивания видео.\n\n"
"Просто отправь мне ссылку на видео, и я скачаю его для тебя.\n\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• TikTok (tiktok.com)\n"
"• VK (vk.com)\n"
"• Yapfiles (yapfiles.ru)\n\n"
"👥 Работа в группах:\n"
"Добавь меня в группу и дай права администратора (нужно право на удаление сообщений). "
"После этого я буду автоматически находить ссылки на видео в сообщениях участников, "
"скачивать их и отправлять прямо в группу, заменяя исходное сообщение со ссылкой.\n\n"
"Отправь ссылку на видео:"
),
'support': (
" <b>О боте</b>\n\n"
"Этот бот позволяет скачивать видео из популярных источников:\n"
"• YouTube — видео и shorts\n"
"• Instagram — reels и посты с видео\n"
"• TikTok — видео\n"
"• VK — видеозаписи\n"
"• Yapfiles — видеофайлы\n\n"
"🔧 <b>Как использовать:</b>\n"
"1. Отправьте ссылку на видео в личный чат с ботом\n"
"2. Дождитесь скачивания\n"
"3. Получите видео прямо в Telegram!\n\n"
"👥 <b>В группах:</b>\n"
"Добавьте бота в группу с правами администратора — "
2025-12-12 17:00:31 +03:00
"он будет автоматически скачивать видео из сообщений участников."
),
2025-12-20 05:22:55 +03:00
'stat': "📊 Статистика бота:\n\n👥 Всего пользователей: {users}\n📹 Всего скачано видео: {downloads}\n\n❌ Ошибки по сервисам:\n{error_stats}",
'send_link': (
"Пожалуйста, отправьте ссылку на видео.\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• TikTok (tiktok.com)\n"
"• VK (vk.com)\n"
"• Yapfiles (yapfiles.ru)\n\n"
"Для других источников: Пардон, не умеем 😅"
),
'unsupported_source': "Пардон, не умеем работать с этим источником 😅",
'processing': "🔍 Обрабатываю ссылку...",
'downloading': "⬇️ Скачиваю видео...",
'sending': "📤 Отправляю видео...",
'caption': "Видео скачано с @{bot_username}",
'error': "❌ Произошла ошибка при обработке видео:\n{error}",
'error_unknown_source': "Пардон, не умеем работать с этим источником",
'error_file_too_large': "❌ Видео слишком большое ({size_mb:.1f} МБ, max = 50)",
'queue_position': "🕐 Ваше видео #{position} в очереди\nВаш запрос очень важен для нас!",
'queue_first': "⬇️ Скачиваю видео...",
2026-05-03 03:27:21 +03:00
'select_quality': "Выберите качество видео:\n(через 10 сек — автоскачивание)",
'quality_cancelled': "❌ Выбор отменён",
'fetching_formats': "🔍 Получаю доступные форматы...",
},
'en': {
'start': (
"👋 Hi! I'm a video download bot.\n\n"
"Just send me a video link, and I'll download it for you.\n\n"
"Supported sources:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• TikTok (tiktok.com)\n"
"• VK (vk.com)\n"
"• Yapfiles (yapfiles.ru)\n\n"
"👥 Group usage:\n"
"Add me to a group with admin rights (message deletion required). "
"I'll automatically find video links in messages, "
"download them and send directly to the group.\n\n"
"Send a video link:"
),
'support': (
" <b>About the bot</b>\n\n"
"This bot allows you to download videos from popular sources:\n"
"• YouTube — videos and shorts\n"
"• Instagram — reels and video posts\n"
"• TikTok — videos\n"
"• VK — video recordings\n"
"• Yapfiles — video files\n\n"
"🔧 <b>How to use:</b>\n"
"1. Send a video link in a private chat with the bot\n"
"2. Wait for the download\n"
"3. Get the video right in Telegram!\n\n"
"👥 <b>In groups:</b>\n"
"Add the bot to a group with admin rights — "
2025-12-12 17:00:31 +03:00
"it will automatically download videos from participants' messages."
),
2025-12-20 05:22:55 +03:00
'stat': "📊 Bot statistics:\n\n👥 Total users: {users}\n📹 Total downloads: {downloads}\n\n❌ Errors by service:\n{error_stats}",
'send_link': (
"Please send a video link.\n"
"Supported sources:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• TikTok (tiktok.com)\n"
"• VK (vk.com)\n"
"• Yapfiles (yapfiles.ru)\n\n"
"Other sources: Sorry, not supported 😅"
),
'unsupported_source': "Sorry, this source is not supported 😅",
'processing': "🔍 Processing link...",
'downloading': "⬇️ Downloading video...",
'sending': "📤 Sending video...",
'caption': "Video downloaded via @{bot_username}",
'error': "❌ Error processing video:\n{error}",
'error_unknown_source': "Sorry, this source is not supported",
'error_file_too_large': "❌ Video is too large ({size_mb:.1f} MB, max = 50)",
'queue_position': "🕐 Your video is #{position} in queue\nYour request is very important to us!",
'queue_first': "⬇️ Downloading video...",
2026-05-03 03:27:21 +03:00
'select_quality': "Select video quality:\n(10 sec auto-download)",
'quality_cancelled': "❌ Cancelled",
'fetching_formats': "🔍 Fetching available formats...",
}
}
def get_locale_from_language_code(language_code: str | None) -> str:
"""Определяет локаль на основе language_code из Telegram"""
if language_code and language_code.lower().startswith('ru'):
return 'ru'
return 'en'
def get_text(locale: str, key: str, **kwargs) -> str:
"""Возвращает локализованный текст"""
if locale not in TEXTS:
locale = 'en'
text = TEXTS[locale].get(key, TEXTS['en'].get(key, key))
if kwargs:
text = text.format(**kwargs)
return text
# ============================================================================
# БАЗА ДАННЫХ
# ============================================================================
def init_database():
"""Инициализирует базу данных и создает таблицы если их нет"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
# Таблица пользователей
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
chat_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL
)
''')
# Проверяем, есть ли колонка locale (миграция для существующей базы)
cursor.execute("PRAGMA table_info(users)")
columns = [col[1] for col in cursor.fetchall()]
if 'locale' not in columns:
cursor.execute("ALTER TABLE users ADD COLUMN locale TEXT DEFAULT 'en'")
logger.info("Добавлена колонка locale в таблицу users")
# Таблица статистики
cursor.execute('''
CREATE TABLE IF NOT EXISTS stats (
id INTEGER PRIMARY KEY CHECK (id = 1),
total_downloads INTEGER DEFAULT 0
)
''')
# Инициализируем stats если его нет
cursor.execute('SELECT COUNT(*) FROM stats')
if cursor.fetchone()[0] == 0:
cursor.execute('INSERT INTO stats (id, total_downloads) VALUES (1, 0)')
2025-12-20 05:22:55 +03:00
# Таблица статистики ошибок по сервисам
cursor.execute('''
CREATE TABLE IF NOT EXISTS error_stats (
service TEXT PRIMARY KEY,
error_count INTEGER DEFAULT 0
)
''')
# Инициализируем счетчики ошибок для всех сервисов
services = ['youtube', 'instagram', 'tiktok', 'vk', 'yapfiles', 'unknown']
for service in services:
cursor.execute('INSERT OR IGNORE INTO error_stats (service, error_count) VALUES (?, 0)', (service,))
conn.commit()
conn.close()
logger.info("База данных инициализирована")
except Exception as e:
logger.error(f"Ошибка при инициализации базы данных: {e}")
def get_total_downloads() -> int:
"""Возвращает общее количество скачанных видео"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT total_downloads FROM stats WHERE id = 1')
result = cursor.fetchone()
conn.close()
return result[0] if result else 0
except Exception as e:
logger.error(f"Ошибка при получении количества скачанных видео: {e}")
return 0
def increment_downloads():
"""Увеличивает счетчик скачанных видео"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('UPDATE stats SET total_downloads = total_downloads + 1 WHERE id = 1')
conn.commit()
new_total = get_total_downloads()
conn.close()
logger.info(f"Общее количество скачанных видео: {new_total}")
except Exception as e:
logger.error(f"Ошибка при увеличении счетчика скачанных видео: {e}")
def get_total_users() -> int:
"""Возвращает общее количество уникальных пользователей"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM users')
result = cursor.fetchone()
conn.close()
return result[0] if result else 0
except Exception as e:
logger.error(f"Ошибка при получении количества пользователей: {e}")
return 0
2025-12-20 05:22:55 +03:00
def increment_error_count(service: str):
"""Увеличивает счетчик ошибок для указанного сервиса"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('''
INSERT INTO error_stats (service, error_count)
VALUES (?, 1)
ON CONFLICT(service) DO UPDATE SET error_count = error_count + 1
''', (service,))
conn.commit()
conn.close()
logger.info(f"Увеличено количество ошибок для сервиса {service}")
except Exception as e:
logger.error(f"Ошибка при увеличении счетчика ошибок для {service}: {e}")
def get_error_stats() -> dict[str, int]:
"""Возвращает статистику ошибок по сервисам"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT service, error_count FROM error_stats ORDER BY service')
results = cursor.fetchall()
conn.close()
return {service: count for service, count in results}
except Exception as e:
logger.error(f"Ошибка при получении статистики ошибок: {e}")
return {}
def get_user_locale(chat_id: int) -> str:
"""Возвращает локаль пользователя из базы данных"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT locale FROM users WHERE chat_id = ?', (chat_id,))
result = cursor.fetchone()
conn.close()
return result[0] if result and result[0] else 'en'
except Exception as e:
logger.error(f"Ошибка при получении локали пользователя: {e}")
return 'en'
def add_user(chat_id: int, username: str = None, first_name: str = None, locale: str = 'en'):
"""Добавляет пользователя в базу данных или обновляет информацию о нем"""
try:
now = datetime.now().isoformat()
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
# Проверяем, существует ли пользователь
cursor.execute('SELECT chat_id FROM users WHERE chat_id = ?', (chat_id,))
exists = cursor.fetchone()
if exists:
# Обновляем last_seen и locale
cursor.execute(
'UPDATE users SET last_seen = ?, username = ?, first_name = ?, locale = ? WHERE chat_id = ?',
(now, username, first_name, locale, chat_id)
)
else:
# Добавляем нового пользователя
cursor.execute(
'INSERT INTO users (chat_id, username, first_name, first_seen, last_seen, locale) VALUES (?, ?, ?, ?, ?, ?)',
(chat_id, username, first_name, now, now, locale)
)
total_users = get_total_users()
logger.info(f"Добавлен новый пользователь (chat_id: {chat_id}, locale: {locale}). Всего пользователей: {total_users}")
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Ошибка при добавлении пользователя: {e}")
# ============================================================================
# УТИЛИТЫ
# ============================================================================
def detect_video_source(url: str) -> str:
"""Определяет источник видео по URL"""
domain = urlparse(url).netloc.lower()
if 'youtube.com' in domain or 'youtu.be' in domain:
return 'youtube'
elif 'instagram.com' in domain:
return 'instagram'
elif 'vk.com' in domain or 'vk.ru' in domain or 'vkontakte.ru' in domain:
return 'vk'
elif 'yapfiles.ru' in domain:
2025-12-12 10:32:06 +03:00
return 'yapfiles'
elif 'tiktok.com' in domain:
return 'tiktok'
else:
return 'unknown'
def extract_urls_from_text(text: str) -> list[str]:
"""Извлекает все URL из текста сообщения"""
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
urls = re.findall(url_pattern, text)
return urls
def cleanup_old_files():
"""Удаляет только .part файлы (недокачанные) из папки загрузок"""
try:
for file_path in DOWNLOADS_DIR.glob('*'):
if not file_path.is_file():
continue
if file_path.suffix == '.part':
try:
file_path.unlink()
logger.info(f"Удален .part файл: {file_path.name}")
except Exception as e:
logger.warning(f"Не удалось удалить .part файл {file_path.name}: {e}")
except Exception as e:
logger.error(f"Ошибка при очистке .part файлов: {e}")
2025-12-20 22:17:20 +03:00
def get_admin_chat_id() -> int | None:
"""Получает сохраненный chat_id админа из файла"""
try:
admin_chat_id_file = DATA_DIR / 'admin_chat_id.txt'
if admin_chat_id_file.exists():
with open(admin_chat_id_file, 'r') as f:
chat_id = f.read().strip()
if chat_id:
return int(chat_id)
except Exception as e:
logger.error(f"Ошибка при чтении chat_id админа: {e}")
return None
async def send_video_to_admin_bot(video_path: str, url: str, from_user=None):
"""Отправляет копию видео админ боту"""
if not ADMIN_BOT_TOKEN:
return # Админ бот не настроен, пропускаем
# Получаем chat_id админа из файла
admin_chat_id = get_admin_chat_id()
if not admin_chat_id:
logger.debug("Админ chat_id не найден, пропускаю отправку видео")
return
try:
# Создаем бота для отправки
request = HTTPXRequest(
read_timeout=600,
write_timeout=600,
connect_timeout=60,
pool_timeout=60
)
admin_bot = Bot(token=ADMIN_BOT_TOKEN, request=request)
# Формируем информацию о пользователе
user_info = ""
if from_user:
username = f"@{from_user.username}" if from_user.username else "без username"
user_info = f"Пользователь: {username} (ID: {from_user.id})"
if from_user.first_name:
user_info += f", {from_user.first_name}"
# Формируем подпись
caption = f"📥 Видео скачано пользователем\n\n🔗 URL: {url}"
if user_info:
caption += f"\n👤 {user_info}"
# Отправляем видео
with open(video_path, 'rb') as video_file:
await admin_bot.send_video(
chat_id=admin_chat_id,
video=video_file,
caption=caption,
supports_streaming=True,
read_timeout=600,
write_timeout=600,
connect_timeout=60,
pool_timeout=60
)
logger.info(f"Видео отправлено админ боту: {video_path}")
except Exception as e:
logger.error(f"Ошибка при отправке видео админ боту: {e}")
# ============================================================================
# СИСТЕМА ОЧЕРЕДЕЙ
# ============================================================================
@dataclass
class QueueItem:
"""Элемент очереди для скачивания видео"""
original_message: Message
status_message: Message
url: str
chat_id: int
chat_type: str
locale: str
format_id: str | None = None
# Глобальная очередь и список элементов для отслеживания позиций
download_queue: asyncio.Queue = None
queue_items: list[QueueItem] = []
queue_lock = asyncio.Lock()
async def update_queue_positions():
"""Обновляет статусы позиций в очереди для всех ожидающих"""
async with queue_lock:
for i, item in enumerate(queue_items):
position = i + 1
try:
if position == 1:
# Первый в очереди - сейчас качается
await item.status_message.edit_text(get_text(item.locale, 'queue_first'))
else:
# Остальные в очереди
await item.status_message.edit_text(
get_text(item.locale, 'queue_position', position=position)
)
except Exception as e:
logger.warning(f"Не удалось обновить статус очереди: {e}")
async def process_queue_item(item: QueueItem):
"""Обрабатывает один элемент очереди"""
try:
# Скачиваем видео
video_path = await download_video(item.url, item.chat_id, item.locale, format_id=item.format_id)
# Проверяем размер файла (лимит Telegram Bot API - 50 МБ)
file_size = Path(video_path).stat().st_size
max_size = 50 * 1024 * 1024 # 50 MB
if file_size > max_size:
2025-12-20 05:22:55 +03:00
# Определяем источник и увеличиваем счетчик ошибок
source = detect_video_source(item.url)
increment_error_count(source)
size_mb = file_size / (1024 * 1024)
error_msg = get_text(item.locale, 'error_file_too_large', size_mb=size_mb)
await item.status_message.edit_text(error_msg)
# Удаляем слишком большой файл
try:
Path(video_path).unlink()
logger.info(f"Удалён слишком большой файл: {video_path} ({size_mb:.1f} MB)")
except:
pass
return
# Отправляем файл пользователю
await item.status_message.edit_text(get_text(item.locale, 'sending'))
video_file = open(video_path, 'rb')
caption = get_text(item.locale, 'caption', bot_username=TELEGRAM_BOT_USERNAME)
caption += f"\n\n{item.url}"
# Определяем имя файла для отправки
video_filename = Path(video_path).name
# Отправляем как видео со streaming — встроенный плеер Telegram
await item.original_message.reply_video(
video=video_file,
filename=video_filename,
caption=caption,
supports_streaming=True,
read_timeout=600,
write_timeout=600,
connect_timeout=60,
pool_timeout=60
)
video_file.close()
# Увеличиваем счетчик скачанных видео
increment_downloads()
logger.info(f"Видео сохранено: {video_path}")
2025-12-20 22:17:20 +03:00
# Отправляем копию видео админ боту
await send_video_to_admin_bot(video_path, item.url, item.original_message.from_user)
# Удаляем файл после успешной отправки пользователю и админ боту
try:
Path(video_path).unlink()
logger.info(f"Файл удален после успешной отправки: {video_path}")
except Exception as delete_error:
logger.warning(f"Не удалось удалить файл {video_path}: {delete_error}")
# Удаляем статусное сообщение и исходное сообщение со ссылкой
try:
await item.status_message.delete()
await item.original_message.delete()
logger.info(f"Удалено сообщение пользователя с ссылкой (chat_id: {item.chat_id})")
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
except Exception as e:
logger.error(f"Ошибка при обработке {item.url}: {e}")
2025-12-20 05:22:55 +03:00
# Определяем источник и увеличиваем счетчик ошибок
source = detect_video_source(item.url)
increment_error_count(source)
error_msg = get_text(item.locale, 'error', error=str(e))
try:
await item.status_message.edit_text(error_msg)
except:
try:
await item.original_message.reply_text(error_msg)
except:
pass
try:
for part_file in DOWNLOADS_DIR.glob(f'{item.chat_id}_*.part'):
part_file.unlink()
logger.info(f"Удален .part файл после ошибки: {part_file.name}")
except Exception as cleanup_error:
logger.warning(f"Не удалось удалить .part файлы после ошибки: {cleanup_error}")
async def queue_worker():
"""Воркер, обрабатывающий очередь последовательно"""
global download_queue, queue_items
logger.info("Воркер очереди запущен")
while True:
try:
# Ждём элемент из очереди
item = await download_queue.get()
logger.info(f"Начинаю обработку: {item.url} (в очереди: {len(queue_items)})")
# Обновляем статусы - первый теперь качается
await update_queue_positions()
# Обрабатываем элемент
await process_queue_item(item)
# Удаляем из списка отслеживания
async with queue_lock:
if item in queue_items:
queue_items.remove(item)
# Обновляем позиции оставшихся
await update_queue_positions()
# Сообщаем что задача выполнена
download_queue.task_done()
logger.info(f"Обработка завершена. Осталось в очереди: {len(queue_items)}")
except Exception as e:
logger.error(f"Ошибка в воркере очереди: {e}")
async def add_to_queue(item: QueueItem) -> int:
"""Добавляет элемент в очередь и возвращает позицию"""
global queue_items
async with queue_lock:
queue_items.append(item)
position = len(queue_items)
await download_queue.put(item)
logger.info(f"Добавлено в очередь: {item.url}, позиция: {position}")
return position
# ============================================================================
# ФУНКЦИИ СКАЧИВАНИЯ
# ============================================================================
async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3, format_id: str | None = None) -> str:
"""Скачивает видео с YouTube через внешний сервис"""
logger.info(f"YouTube: отправка запроса на внешний сервис {YOUTUBE_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
# Формируем тело запроса, опционально с format_id
body = {"url": url}
if format_id:
body["format_id"] = format_id
response = await client.post(
f"{YOUTUBE_DOWNLOADER_URL}/download/stream",
json=body,
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"YouTube сервис вернул ошибку {response.status_code}: {error_text}")
video_data = response.content
video_ext = 'mp4'
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_youtube_video.{video_ext}'
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"YouTube: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к YouTube сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"YouTube: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"YouTube: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube через внешний сервис")
async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Instagram через внешний сервис"""
logger.info(f"Instagram: отправка запроса на внешний сервис {INSTAGRAM_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post(
f"{INSTAGRAM_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"Instagram сервис вернул ошибку {response.status_code}: {error_text}")
video_data = response.content
video_ext = 'mp4'
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_instagram_video.{video_ext}'
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"Instagram: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к Instagram сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"Instagram: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"Instagram: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram через внешний сервис")
async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с VK через внешний сервис"""
logger.info(f"VK: отправка запроса на внешний сервис {VK_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post(
f"{VK_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"VK сервис вернул ошибку {response.status_code}: {error_text}")
video_data = response.content
video_ext = 'mp4'
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_vk_video.{video_ext}'
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"VK: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к VK сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"VK: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"VK: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с VK через внешний сервис")
2025-12-12 10:32:06 +03:00
async def download_yapfiles_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Yapfiles через внешний сервис"""
logger.info(f"Yapfiles: отправка запроса на внешний сервис {YAPFILES_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
2025-12-12 10:32:06 +03:00
response = await client.post(
f"{YAPFILES_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"Yapfiles сервис вернул ошибку {response.status_code}: {error_text}")
video_data = response.content
video_ext = 'mp4'
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_yapfiles_video.{video_ext}'
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"Yapfiles: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к Yapfiles сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"Yapfiles: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"Yapfiles: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Yapfiles через внешний сервис")
async def download_tiktok_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с TikTok через внешний сервис"""
logger.info(f"TikTok: отправка запроса на внешний сервис {TIKTOK_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post(
f"{TIKTOK_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"TikTok сервис вернул ошибку {response.status_code}: {error_text}")
video_data = response.content
video_ext = 'mp4'
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_tiktok_video.{video_ext}'
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"TikTok: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к TikTok сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"TikTok: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"TikTok: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с TikTok через внешний сервис")
# ============================================================================
# ВЫБОР КАЧЕСТВА (только для YouTube)
# ============================================================================
async def get_formats_from_service(url: str) -> list[dict] | None:
"""Получает список доступных форматов для YouTube URL через сервис youtube-downloader"""
logger.info(f"Получение форматов для YouTube: {url}")
try:
async with httpx.AsyncClient(timeout=FORMATS_TIMEOUT) as client:
response = await client.post(
f"{YOUTUBE_DOWNLOADER_URL}/formats",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
data = response.json()
return data.get('formats', [])
logger.warning(f"Не удалось получить форматы: {response.status_code}")
return None
except Exception as e:
logger.error(f"Ошибка при получении форматов: {e}")
return None
async def show_quality_selection(status_message: Message, formats: list[dict], locale: str):
"""Показывает inline клавиатуру с выбором качества видео
Используем короткий индекс (quality:0, quality:1, ...) вместо полного format_id,
т.к. Telegram ограничивает callback_data 64 байтами, а format_id может быть длинным
(например "308+251-drc/bestvideo[height<=1080]+bestaudio/best[height<=1080]").
"""
keyboard = []
for idx, fmt in enumerate(formats):
button_text = fmt.get('label', fmt.get('quality', 'Unknown'))
keyboard.append([InlineKeyboardButton(
text=button_text,
callback_data=f"quality:{idx}"
)])
# Кнопка отмены
keyboard.append([InlineKeyboardButton(
text=get_text(locale, 'quality_cancelled'),
callback_data="quality:cancel"
)])
reply_markup = InlineKeyboardMarkup(keyboard)
await status_message.edit_text(
get_text(locale, 'select_quality'),
reply_markup=reply_markup
)
async def handle_format_selection(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает выбор качества пользователем через callback query"""
query = update.callback_query
await query.answer()
chat_id = query.message.chat_id
callback_data = query.data
# Получаем сохраненные данные
data = context.user_data.pop(f'quality_{chat_id}', None)
# Отменяем авто-выбор качества
auto_task = context.user_data.pop(f'quality_auto_{chat_id}', None)
if auto_task:
auto_task.cancel()
if not data:
await query.edit_message_text("Session expired, please send the link again")
return
locale = data['locale']
status_message = data['status_message']
if callback_data == "quality:cancel":
await status_message.edit_text(get_text(locale, 'quality_cancelled'))
return
# Извлекаем индекс формата и получаем format_id из сохранённого списка
try:
format_index = int(callback_data.replace('quality:', ''))
formats_list = data.get('formats_list', [])
if format_index < 0 or format_index >= len(formats_list):
raise ValueError(f"Index {format_index} out of range")
format_id = formats_list[format_index].get('format_id', '')
except (ValueError, IndexError) as e:
logger.error(f"Invalid format selection: {e}")
await status_message.edit_text(get_text(locale, 'processing'))
format_id = None # Скачиваем без выбора качества
# Обновляем сообщение - добавляем в очередь
await status_message.edit_text(get_text(locale, 'processing'))
# Создаём элемент очереди с выбранным format_id
item = QueueItem(
original_message=data['original_message'],
status_message=status_message,
url=data['url'],
chat_id=chat_id,
chat_type=data['chat_type'],
locale=locale,
format_id=format_id
)
# Добавляем в очередь
position = await add_to_queue(item)
# Показываем позицию в очереди
if position == 1:
await status_message.edit_text(get_text(locale, 'queue_first'))
else:
await status_message.edit_text(
get_text(locale, 'queue_position', position=position)
)
async def _auto_select_after_delay(context: ContextTypes.DEFAULT_TYPE, chat_id: int, delay: int = 10):
"""Автовыбор лучшего качества через delay секунд, если пользователь не выбрал"""
try:
await asyncio.sleep(delay)
except asyncio.CancelledError:
return # пользователь выбрал вручную
data = context.user_data.pop(f'quality_{chat_id}', None)
if not data:
return # уже обработано
context.user_data.pop(f'quality_auto_{chat_id}', None) # чистим
formats_list = data.get('formats_list', [])
if not formats_list:
return
locale = data['locale']
status_message = data['status_message']
# Автовыбор: ищем 480p или ближайшее ниже
preferred_qualities = ['480p', '360p', '240p', '144p']
selected = None
for pq in preferred_qualities:
for fmt in formats_list:
if fmt.get('quality', '') == pq:
selected = fmt
break
if selected:
break
if not selected:
selected = formats_list[0] # fallback на лучшее
format_id = selected.get('format_id', '')
quality_label = selected.get('quality', selected.get('label', 'best'))
logger.info(f"Автовыбор качества для chat_id={chat_id}: {quality_label}")
await status_message.edit_text(get_text(locale, 'processing'))
item = QueueItem(
original_message=data['original_message'],
status_message=status_message,
url=data['url'],
chat_id=chat_id,
chat_type=data['chat_type'],
locale=locale,
format_id=format_id
)
position = await add_to_queue(item)
if position == 1:
await status_message.edit_text(get_text(locale, 'queue_first'))
else:
await status_message.edit_text(
get_text(locale, 'queue_position', position=position)
)
async def download_video(url: str, chat_id: int, locale: str, max_retries: int = 3, format_id: str | None = None) -> str:
"""Главная функция скачивания - вызывает нужную функцию в зависимости от источника"""
source = detect_video_source(url)
logger.info(f"Определен источник: {source} для URL: {url}")
if source == 'youtube':
return await download_youtube_video(url, chat_id, max_retries, format_id=format_id)
elif source == 'instagram':
return await download_instagram_video(url, chat_id, max_retries)
elif source == 'vk':
return await download_vk_video(url, chat_id, max_retries)
2025-12-12 10:32:06 +03:00
elif source == 'yapfiles':
return await download_yapfiles_video(url, chat_id, max_retries)
elif source == 'tiktok':
return await download_tiktok_video(url, chat_id, max_retries)
else:
raise Exception(get_text(locale, 'error_unknown_source'))
# ============================================================================
# ОБРАБОТЧИКИ КОМАНД И СООБЩЕНИЙ
# ============================================================================
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает сообщения от пользователей"""
if not update.message or not update.message.text:
return
text = update.message.text.strip()
chat_id = update.message.chat_id
chat_type = update.message.chat.type
username = update.message.from_user.username if update.message.from_user else None
first_name = update.message.from_user.first_name if update.message.from_user else None
language_code = update.message.from_user.language_code if update.message.from_user else None
# Определяем локаль
locale = get_locale_from_language_code(language_code)
# Добавляем пользователя в статистику
add_user(chat_id, username, first_name, locale)
# Извлекаем все URL из текста
urls = extract_urls_from_text(text)
# Если это личный чат и нет ссылок, отправляем инструкцию
if not urls and chat_type == 'private':
await update.message.reply_text(get_text(locale, 'send_link'))
return
# Если нет ссылок в группе, просто игнорируем сообщение
if not urls:
return
# Обрабатываем первую найденную ссылку
url = urls[0]
# Проверяем источник до начала обработки
source = detect_video_source(url)
if source == 'unknown':
if chat_type == 'private':
await update.message.reply_text(get_text(locale, 'unsupported_source'))
return
# Отправляем сообщение о начале обработки
status_message = await update.message.reply_text(get_text(locale, 'processing'))
# Для YouTube - показываем выбор качества перед добавлением в очередь
if source == 'youtube':
await status_message.edit_text(get_text(locale, 'fetching_formats'))
formats = await get_formats_from_service(url)
if formats:
# Сохраняем данные для обработки в колбэке
context.user_data[f'quality_{chat_id}'] = {
'url': url,
'locale': locale,
'chat_id': chat_id,
'chat_type': chat_type,
'original_message': update.message,
'status_message': status_message,
'formats_list': formats, # для lookup по индексу в callback
}
await show_quality_selection(status_message, formats, locale)
# Автовыбор лучшего качества через 10 сек
auto_task = asyncio.create_task(_auto_select_after_delay(context, chat_id, 10))
context.user_data[f'quality_auto_{chat_id}'] = auto_task
return
# Если не удалось получить форматы, скачиваем как обычно (без выбора качества)
await status_message.edit_text(get_text(locale, 'processing'))
# Создаём элемент очереди
item = QueueItem(
original_message=update.message,
status_message=status_message,
url=url,
chat_id=chat_id,
chat_type=chat_type,
locale=locale
)
# Добавляем в очередь
position = await add_to_queue(item)
# Показываем позицию в очереди
if position == 1:
# Первый - сразу начинаем качать
await status_message.edit_text(get_text(locale, 'queue_first'))
else:
# В очереди - показываем позицию
await status_message.edit_text(
get_text(locale, 'queue_position', position=position)
)
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /start"""
chat_id = update.message.chat_id
username = update.message.from_user.username if update.message.from_user else None
first_name = update.message.from_user.first_name if update.message.from_user else None
language_code = update.message.from_user.language_code if update.message.from_user else None
locale = get_locale_from_language_code(language_code)
add_user(chat_id, username, first_name, locale)
await update.message.reply_text(get_text(locale, 'start'))
async def support_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /support"""
language_code = update.message.from_user.language_code if update.message.from_user else None
locale = get_locale_from_language_code(language_code)
await update.message.reply_text(
get_text(locale, 'support'),
parse_mode='HTML'
)
# ============================================================================
# MAIN
# ============================================================================
def main():
"""Главная функция для запуска бота"""
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN не установлен!")
return
# Инициализируем базу данных
init_database()
# Очищаем .part файлы при старте
logger.info("Очистка .part файлов при старте...")
cleanup_old_files()
# Создаем приложение с максимальными таймаутами для больших файлов
request = HTTPXRequest(
read_timeout=3600, # 1 час на получение ответа
write_timeout=3600, # 1 час на отправку видео
connect_timeout=300, # 5 минут на соединение
pool_timeout=300 # 5 минут на получение соединения из пула
)
application = (
Application.builder()
.token(TELEGRAM_BOT_TOKEN)
.request(request)
.get_updates_request(HTTPXRequest(read_timeout=120, connect_timeout=60))
.build()
)
# Регистрируем обработчики
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("support", support_command))
application.add_handler(CallbackQueryHandler(handle_format_selection, pattern=r'^quality:'))
# Инициализация очереди и запуск воркера
async def post_init(application: Application):
"""Выполняется после инициализации приложения"""
global download_queue
# Инициализируем очередь
download_queue = asyncio.Queue()
# Запускаем воркер очереди
asyncio.create_task(queue_worker())
logger.info("Воркер очереди запущен")
# Запускаем периодическую очистку .part файлов (каждые 6 часов)
async def periodic_cleanup():
while True:
await asyncio.sleep(6 * 3600)
cleanup_old_files()
logger.info("Периодическая очистка .part файлов выполнена")
asyncio.create_task(periodic_cleanup())
logger.info("Фоновая задача периодической очистки файлов запущена")
application.post_init = post_init
# Запускаем бота
logger.info("Бот запущен")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()