videoDownloadTGbot/bot.py
2025-12-12 10:32:06 +03:00

631 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import logging
import asyncio
import sqlite3
import time
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime
import httpx
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Токен бота и имя бота из переменных окружения
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot')
# URL сервисов для скачивания видео
YOUTUBE_DOWNLOADER_URL = os.getenv('YOUTUBE_DOWNLOADER_URL', 'http://localhost:5557')
INSTAGRAM_DOWNLOADER_URL = os.getenv('INSTAGRAM_DOWNLOADER_URL', 'http://localhost:5556')
VK_DOWNLOADER_URL = os.getenv('VK_DOWNLOADER_URL', 'http://localhost:5555')
YAPFILES_DOWNLOADER_URL = os.getenv('YAPFILES_DOWNLOADER_URL', 'http://localhost:5558')
# Базовая директория проекта (абсолютный путь), чтобы не зависеть от рабочей директории процесса
BASE_DIR = Path(__file__).resolve().parent
# Директория для временных файлов
DOWNLOADS_DIR = BASE_DIR / 'video'
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)
# База данных (внутри папки data)
DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_FILE = DATA_DIR / 'bot.db'
def init_database():
"""Инициализирует базу данных и создает таблицы если их нет"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
# Таблица пользователей
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
chat_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL
)
''')
# Таблица статистики
cursor.execute('''
CREATE TABLE IF NOT EXISTS stats (
id INTEGER PRIMARY KEY CHECK (id = 1),
total_downloads INTEGER DEFAULT 0
)
''')
# Инициализируем stats если его нет
cursor.execute('SELECT COUNT(*) FROM stats')
if cursor.fetchone()[0] == 0:
cursor.execute('INSERT INTO stats (id, total_downloads) VALUES (1, 0)')
conn.commit()
conn.close()
logger.info("База данных инициализирована")
except Exception as e:
logger.error(f"Ошибка при инициализации базы данных: {e}")
def get_total_downloads() -> int:
"""Возвращает общее количество скачанных видео"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT total_downloads FROM stats WHERE id = 1')
result = cursor.fetchone()
conn.close()
return result[0] if result else 0
except Exception as e:
logger.error(f"Ошибка при получении количества скачанных видео: {e}")
return 0
def increment_downloads():
"""Увеличивает счетчик скачанных видео"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('UPDATE stats SET total_downloads = total_downloads + 1 WHERE id = 1')
conn.commit()
new_total = get_total_downloads()
conn.close()
logger.info(f"Общее количество скачанных видео: {new_total}")
except Exception as e:
logger.error(f"Ошибка при увеличении счетчика скачанных видео: {e}")
def get_total_users() -> int:
"""Возвращает общее количество уникальных пользователей"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM users')
result = cursor.fetchone()
conn.close()
return result[0] if result else 0
except Exception as e:
logger.error(f"Ошибка при получении количества пользователей: {e}")
return 0
def add_user(chat_id: int, username: str = None, first_name: str = None):
"""Добавляет пользователя в базу данных или обновляет информацию о нем"""
try:
now = datetime.now().isoformat()
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
# Проверяем, существует ли пользователь
cursor.execute('SELECT chat_id FROM users WHERE chat_id = ?', (chat_id,))
exists = cursor.fetchone()
if exists:
# Обновляем last_seen
cursor.execute(
'UPDATE users SET last_seen = ?, username = ?, first_name = ? WHERE chat_id = ?',
(now, username, first_name, chat_id)
)
else:
# Добавляем нового пользователя
cursor.execute(
'INSERT INTO users (chat_id, username, first_name, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)',
(chat_id, username, first_name, now, now)
)
total_users = get_total_users()
logger.info(f"Добавлен новый пользователь (chat_id: {chat_id}). Всего пользователей: {total_users}")
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Ошибка при добавлении пользователя: {e}")
def detect_video_source(url: str) -> str:
"""Определяет источник видео по URL"""
domain = urlparse(url).netloc.lower()
if 'youtube.com' in domain or 'youtu.be' in domain:
return 'youtube'
elif 'instagram.com' in domain:
return 'instagram'
elif 'vk.com' in domain or 'vkontakte.ru' in domain:
return 'vk'
elif 'yapfiles.ru' in domain:
return 'yapfiles'
else:
return 'unknown'
def extract_urls_from_text(text: str) -> list[str]:
"""Извлекает все URL из текста сообщения"""
# Регулярное выражение для поиска URL (http/https)
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
urls = re.findall(url_pattern, text)
return urls
def cleanup_old_files():
"""Удаляет только .part файлы (недокачанные) из папки загрузок"""
try:
for file_path in DOWNLOADS_DIR.glob('*'):
if not file_path.is_file():
continue
# Удаляем только .part файлы (недокачанные)
if file_path.suffix == '.part':
try:
file_path.unlink()
logger.info(f"Удален .part файл: {file_path.name}")
except Exception as e:
logger.warning(f"Не удалось удалить .part файл {file_path.name}: {e}")
except Exception as e:
logger.error(f"Ошибка при очистке .part файлов: {e}")
async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с YouTube через внешний сервис"""
logger.info(f"YouTube: отправка запроса на внешний сервис {YOUTUBE_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=600.0) as client: # Увеличенный таймаут для YouTube
# Отправляем запрос на YouTube сервис
response = await client.post(
f"{YOUTUBE_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"YouTube сервис вернул ошибку {response.status_code}: {error_text}")
# Сохраняем видео во временный файл
video_data = response.content
video_ext = 'mp4' # По умолчанию mp4
# Пробуем определить расширение из заголовков
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
# Получаем имя файла из заголовка или создаем случайное
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_youtube_video.{video_ext}'
# Сохраняем файл
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"YouTube: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к YouTube сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"YouTube: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"YouTube: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube через внешний сервис")
async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Instagram через внешний сервис"""
logger.info(f"Instagram: отправка запроса на внешний сервис {INSTAGRAM_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=600.0) as client: # Увеличенный таймаут для Instagram
# Отправляем запрос на Instagram сервис
response = await client.post(
f"{INSTAGRAM_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"Instagram сервис вернул ошибку {response.status_code}: {error_text}")
# Сохраняем видео во временный файл
video_data = response.content
video_ext = 'mp4' # По умолчанию mp4
# Пробуем определить расширение из заголовков
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
# Получаем имя файла из заголовка или создаем случайное
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_instagram_video.{video_ext}'
# Сохраняем файл
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"Instagram: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к Instagram сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"Instagram: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"Instagram: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram через внешний сервис")
async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с VK через внешний сервис"""
logger.info(f"VK: отправка запроса на внешний сервис {VK_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=600.0) as client: # Увеличенный таймаут для VK
# Отправляем запрос на VK сервис
response = await client.post(
f"{VK_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"VK сервис вернул ошибку {response.status_code}: {error_text}")
# Сохраняем видео во временный файл
video_data = response.content
video_ext = 'mp4' # По умолчанию mp4
# Пробуем определить расширение из заголовков
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
# Получаем имя файла из заголовка или создаем случайное
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_vk_video.{video_ext}'
# Сохраняем файл
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"VK: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к VK сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"VK: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"VK: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с VK через внешний сервис")
async def download_yapfiles_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Yapfiles через внешний сервис"""
logger.info(f"Yapfiles: отправка запроса на внешний сервис {YAPFILES_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=600.0) as client:
response = await client.post(
f"{YAPFILES_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"Yapfiles сервис вернул ошибку {response.status_code}: {error_text}")
video_data = response.content
video_ext = 'mp4'
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_yapfiles_video.{video_ext}'
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"Yapfiles: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к Yapfiles сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"Yapfiles: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"Yapfiles: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Yapfiles через внешний сервис")
async def download_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Главная функция скачивания - вызывает нужную функцию в зависимости от источника"""
source = detect_video_source(url)
logger.info(f"Определен источник: {source} для URL: {url}")
if source == 'youtube':
return await download_youtube_video(url, chat_id, max_retries)
elif source == 'instagram':
return await download_instagram_video(url, chat_id, max_retries)
elif source == 'vk':
return await download_vk_video(url, chat_id, max_retries)
elif source == 'yapfiles':
return await download_yapfiles_video(url, chat_id, max_retries)
else:
raise Exception("Пардон, не умеем работать с этим источником")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает сообщения от пользователей"""
if not update.message or not update.message.text:
return
text = update.message.text.strip()
chat_id = update.message.chat_id
chat_type = update.message.chat.type # 'private', 'group', 'supergroup'
username = update.message.from_user.username if update.message.from_user else None
first_name = update.message.from_user.first_name if update.message.from_user else None
# Добавляем пользователя в статистику при первом взаимодействии
add_user(chat_id, username, first_name)
# Извлекаем все URL из текста
urls = extract_urls_from_text(text)
# Если это личный чат и нет ссылок, отправляем инструкцию
if not urls and chat_type == 'private':
await update.message.reply_text(
"Пожалуйста, отправьте ссылку на видео.\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• VK (vk.com)\n"
"• Yapfiles (yapfiles.ru)\n\n"
"Для других источников: Пардон, не умеем 😅"
)
return
# Если нет ссылок в группе, просто игнорируем сообщение
if not urls:
return
# Обрабатываем первую найденную ссылку
url = urls[0]
# Проверяем источник до начала обработки
source = detect_video_source(url)
if source == 'unknown':
# В группах не отвечаем на неподдерживаемые источники, чтобы не спамить
if chat_type == 'private':
await update.message.reply_text("Пардон, не умеем работать с этим источником 😅")
return
# Отправляем сообщение о начале обработки
status_message = await update.message.reply_text("🔍 Обрабатываю ссылку...")
try:
# Скачиваем видео
await status_message.edit_text("⬇️ Скачиваю видео...")
video_path = await download_video(url, chat_id)
# Отправляем файл пользователю
await status_message.edit_text("📤 Отправляю видео...")
video_file = open(video_path, 'rb')
caption = f"Видео скачано с @{TELEGRAM_BOT_USERNAME}"
await update.message.reply_video(
video=video_file,
caption=caption,
supports_streaming=True
)
video_file.close()
# Увеличиваем счетчик скачанных видео
increment_downloads()
# Сохраняем видео в папку video (не удаляем)
logger.info(f"Видео сохранено: {video_path}")
# Удаляем статусное сообщение и исходное сообщение со ссылкой
try:
await status_message.delete()
await update.message.delete()
logger.info(f"Удалено сообщение пользователя с ссылкой (chat_id: {chat_id}, тип чата: {chat_type})")
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
# Если не удалось удалить (нет прав), просто логируем
except Exception as e:
logger.error(f"Ошибка: {e}")
error_msg = f"❌ Произошла ошибка при обработке видео:\n{str(e)}"
try:
await status_message.edit_text(error_msg)
except:
# Если status_message не существует, создаем новое сообщение
await update.message.reply_text(error_msg)
# При ошибке тоже пытаемся удалить временные файлы
try:
# Удаляем все .part файлы для этого chat_id
for part_file in DOWNLOADS_DIR.glob(f'{chat_id}_*.part'):
part_file.unlink()
logger.info(f"Удален .part файл после ошибки: {part_file.name}")
except Exception as cleanup_error:
logger.warning(f"Не удалось удалить .part файлы после ошибки: {cleanup_error}")
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /start"""
# Добавляем пользователя в статистику
chat_id = update.message.chat_id
username = update.message.from_user.username if update.message.from_user else None
first_name = update.message.from_user.first_name if update.message.from_user else None
add_user(chat_id, username, first_name)
await update.message.reply_text(
"👋 Привет! Я бот для скачивания видео.\n\n"
"Просто отправь мне ссылку на видео, и я скачаю его для тебя.\n\n"
"Поддерживаемые источники:\n"
"• YouTube (youtube.com, youtu.be)\n"
"• Instagram (instagram.com)\n"
"• VK (vk.com)\n"
"• Yapfiles (yapfiles.ru)\n\n"
"👥 Работа в группах:\n"
"Добавь меня в группу и дай права администратора (нужно право на удаление сообщений). "
"После этого я буду автоматически находить ссылки на видео в сообщениях участников, "
"скачивать их и отправлять прямо в группу, заменяя исходное сообщение со ссылкой.\n\n"
"Команды:\n"
"/start - Начать работу\n"
"/stat - Статистика скачанных видео\n\n"
"Отправь ссылку на видео:"
)
async def stat_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /stat"""
total_downloads = get_total_downloads()
total_users = get_total_users()
await update.message.reply_text(
f"📊 Статистика бота:\n\n"
f"👥 Всего пользователей: {total_users}\n"
f"📹 Всего скачано видео: {total_downloads}"
)
def main():
"""Главная функция для запуска бота"""
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN не установлен!")
return
# Инициализируем базу данных
init_database()
# Очищаем .part файлы при старте
logger.info("Очистка .part файлов при старте...")
cleanup_old_files() # Удаляем только недокачанные .part файлы
# Создаем приложение
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Регистрируем обработчики
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("stat", stat_command))
# Запускаем периодическую очистку файлов
async def post_init(application: Application):
"""Выполняется после инициализации приложения"""
# Запускаем периодическую очистку .part файлов (каждые 6 часов)
async def periodic_cleanup():
while True:
await asyncio.sleep(6 * 3600) # 6 часов
cleanup_old_files()
logger.info("Периодическая очистка .part файлов выполнена")
asyncio.create_task(periodic_cleanup())
logger.info("Фоновая задача периодической очистки файлов запущена")
application.post_init = post_init
# Запускаем бота
logger.info("Бот запущен")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()