Рефакторинг: микросервисная архитектура

- Разделение на микросервисы: youtube-downloader, instagram-downloader, vk-downloader
- Основной бот в корне проекта, работает через HTTP API с сервисами
- Каждый сервис запускается отдельно в своей папке
- Видео сохраняются в папке video/ и не удаляются
- Обновлена документация и архитектура
- Скрипты для Instagram cookies перенесены в instagram-downloader/
This commit is contained in:
vrubelroman 2025-12-11 01:07:04 +03:00
parent 8024eea868
commit 436e0cd541
41 changed files with 1348 additions and 693 deletions

550
bot.py
View file

@ -1,17 +1,13 @@
import os
import re
import json
import logging
import asyncio
import sqlite3
import time
import subprocess
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import yt_dlp
import httpx
from telegram import Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler
@ -26,7 +22,10 @@ logger = logging.getLogger(__name__)
# Токен бота и имя бота из переменных окружения
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot')
# URL VK сервиса для скачивания видео
# URL сервисов для скачивания видео
YOUTUBE_DOWNLOADER_URL = os.getenv('YOUTUBE_DOWNLOADER_URL', 'http://localhost:5557')
INSTAGRAM_DOWNLOADER_URL = os.getenv('INSTAGRAM_DOWNLOADER_URL', 'http://localhost:5556')
VK_DOWNLOADER_URL = os.getenv('VK_DOWNLOADER_URL', 'http://localhost:5555')
# Базовая директория проекта (абсолютный путь), чтобы не зависеть от рабочей директории процесса
@ -41,9 +40,6 @@ DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_FILE = DATA_DIR / 'bot.db'
# ThreadPoolExecutor для выполнения блокирующих операций (скачивание видео)
# Позволяет обрабатывать несколько запросов параллельно
DOWNLOAD_EXECUTOR = ThreadPoolExecutor(max_workers=5, thread_name_prefix="download")
def init_database():
"""Инициализирует базу данных и создает таблицы если их нет"""
@ -174,458 +170,144 @@ def extract_urls_from_text(text: str) -> list[str]:
return urls
def cleanup_old_files(max_age_hours: int = 24):
"""Удаляет старые файлы и .part файлы из папки загрузок"""
def cleanup_old_files():
"""Удаляет только .part файлы (недокачанные) из папки загрузок"""
try:
current_time = time.time()
max_age_seconds = max_age_hours * 3600
for file_path in DOWNLOADS_DIR.glob('*'):
if not file_path.is_file():
continue
# Удаляем все .part файлы (недокачанные)
# Удаляем только .part файлы (недокачанные)
if file_path.suffix == '.part':
try:
file_path.unlink()
logger.info(f"Удален .part файл: {file_path.name}")
except Exception as e:
logger.warning(f"Не удалось удалить .part файл {file_path.name}: {e}")
continue
# Удаляем старые файлы (старше max_age_hours)
try:
file_age = current_time - file_path.stat().st_mtime
if file_age > max_age_seconds:
file_path.unlink()
logger.info(f"Удален старый файл: {file_path.name} (возраст: {file_age/3600:.1f} часов)")
except Exception as e:
logger.warning(f"Не удалось проверить/удалить файл {file_path.name}: {e}")
except Exception as e:
logger.error(f"Ошибка при очистке старых файлов: {e}")
def _safe_filename(title: str, chat_id: int) -> str:
"""Создает безопасное имя файла"""
safe_title = re.sub(r'[<>:"/\\|?*]', '', title)[:100]
return str(DOWNLOADS_DIR / f'{chat_id}_{safe_title}.%(ext)s')
logger.error(f"Ошибка при очистке .part файлов: {e}")
async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с YouTube"""
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
"""Скачивает видео с YouTube через внешний сервис"""
logger.info(f"YouTube: отправка запроса на внешний сервис {YOUTUBE_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
# Получаем информацию о видео
ydl_opts_info = {
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': ['android', 'web'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
# Получаем информацию о видео в executor (неблокирующе)
def extract_info_sync():
with yt_dlp.YoutubeDL(ydl_opts_info) as ydl:
return ydl.extract_info(url, download=False)
loop = asyncio.get_event_loop()
info = await loop.run_in_executor(DOWNLOAD_EXECUTOR, extract_info_sync)
video_title = info.get('title', 'video')
logger.info(f"YouTube: получена информация о видео: {video_title}")
# Скачиваем видео
ydl_opts_download = {
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'outtmpl': _safe_filename(video_title, chat_id),
'quiet': False,
'no_warnings': False,
'user_agent': user_agent,
'socket_timeout': 30,
'extractor_args': {
'youtube': {
'player_client': ['android', 'web'],
'player_skip': ['webpage'],
},
},
'http_headers': {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
},
}
logger.info(f"YouTube: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
# Скачиваем в executor (неблокирующе)
def download_sync():
with yt_dlp.YoutubeDL(ydl_opts_download) as ydl:
ydl.download([url])
await loop.run_in_executor(DOWNLOAD_EXECUTOR, download_sync)
# Находим скачанный файл (тоже в executor для консистентности)
def find_downloaded_file():
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
return None
video_path = await loop.run_in_executor(DOWNLOAD_EXECUTOR, find_downloaded_file)
if video_path:
return video_path
else:
raise Exception("Файл не был найден после скачивания")
async with httpx.AsyncClient(timeout=600.0) as client: # Увеличенный таймаут для YouTube
# Отправляем запрос на YouTube сервис
response = await client.post(
f"{YOUTUBE_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"YouTube сервис вернул ошибку {response.status_code}: {error_text}")
# Сохраняем видео во временный файл
video_data = response.content
video_ext = 'mp4' # По умолчанию mp4
# Пробуем определить расширение из заголовков
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
# Получаем имя файла из заголовка или создаем случайное
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_youtube_video.{video_ext}'
# Сохраняем файл
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"YouTube: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к YouTube сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"YouTube: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"YouTube: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube")
raise last_error or Exception("Неизвестная ошибка при скачивании с YouTube через внешний сервис")
async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3) -> str:
"""Скачивает видео с Instagram - используем cookies с правильными заголовками"""
cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt')
cookies_file_path = Path(cookies_file)
# Проверяем срок действия cookies перед использованием
if cookies_file_path.exists():
is_valid, days_left = check_instagram_cookies_expiry()
if not is_valid:
logger.error("Instagram cookies истекли! Необходимо обновить cookies.")
raise Exception("Instagram cookies истекли. Пожалуйста, обновите cookies в файле instagram_cookies.txt")
elif days_left < 7:
logger.warning(f"Instagram cookies истекают через {days_left} дней. Рекомендуется обновить.")
# Парсим cookies для получения csrf token (формат Netscape)
csrf_token = None
sessionid = None
if cookies_file_path.exists():
try:
with open(cookies_file_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('\t')
if len(parts) >= 7:
domain = parts[0]
# Ищем только cookies от instagram.com
if 'instagram' in domain.lower():
cookie_name = parts[5] # Имя cookie
cookie_value = parts[6] # Значение cookie
if cookie_name == 'csrftoken':
csrf_token = cookie_value
elif cookie_name == 'sessionid':
sessionid = cookie_value
# Если нашли оба - можно выходить
if csrf_token and sessionid:
break
except Exception as e:
logger.warning(f"Не удалось прочитать cookies: {e}")
"""Скачивает видео с Instagram через внешний сервис"""
logger.info(f"Instagram: отправка запроса на внешний сервис {INSTAGRAM_DOWNLOADER_URL}")
last_error = None
for attempt in range(max_retries):
try:
# Базовые настройки
ydl_opts = {
'format': 'best',
'outtmpl': str(DOWNLOADS_DIR / f'{chat_id}_%(title)s.%(ext)s'),
'quiet': False,
'no_warnings': False,
'socket_timeout': 30,
}
# Если есть файл с cookies, используем его
if cookies_file_path.exists():
# Используем абсолютный путь к cookies
ydl_opts['cookiefile'] = str(cookies_file_path.absolute())
logger.info(f"Instagram: используем cookies из {cookies_file_path}")
async with httpx.AsyncClient(timeout=600.0) as client: # Увеличенный таймаут для Instagram
# Отправляем запрос на Instagram сервис
response = await client.post(
f"{INSTAGRAM_DOWNLOADER_URL}/download/stream",
json={"url": url},
headers={"Content-Type": "application/json"}
)
# Добавляем заголовки с csrf token если есть
headers = {
'Referer': 'https://www.instagram.com/',
'X-Requested-With': 'XMLHttpRequest',
}
if csrf_token:
headers['X-CSRFToken'] = csrf_token
logger.info(f"Instagram: добавлен csrf token в заголовки")
if sessionid:
logger.info(f"Instagram: sessionid найден (длина: {len(sessionid)})")
if response.status_code != 200:
error_text = response.text
try:
error_json = response.json()
error_text = error_json.get('error', error_text)
except:
pass
raise Exception(f"Instagram сервис вернул ошибку {response.status_code}: {error_text}")
ydl_opts['http_headers'] = headers
logger.info(f"Instagram: начинаем скачивание (попытка {attempt + 1}/{max_retries})")
# Скачиваем в executor (неблокирующе)
def download_sync():
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
loop = asyncio.get_event_loop()
await loop.run_in_executor(DOWNLOAD_EXECUTOR, download_sync)
# Находим скачанный файл (тоже в executor для консистентности)
def find_downloaded_file():
downloaded_files = list(DOWNLOADS_DIR.glob(f'{chat_id}_*'))
if downloaded_files:
downloaded_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
return str(downloaded_files[0])
return None
video_path = await loop.run_in_executor(DOWNLOAD_EXECUTOR, find_downloaded_file)
if video_path:
return video_path
else:
raise Exception("Файл не был найден после скачивания")
# Сохраняем видео во временный файл
video_data = response.content
video_ext = 'mp4' # По умолчанию mp4
# Пробуем определить расширение из заголовков
content_type = response.headers.get('Content-Type', '')
if 'video/' in content_type:
video_ext = content_type.split('/')[-1].split(';')[0]
# Получаем имя файла из заголовка или создаем случайное
filename = response.headers.get('Content-Disposition', '')
if filename and 'filename=' in filename:
video_filename = filename.split('filename=')[1].strip('"\'')
else:
video_filename = f'{chat_id}_instagram_video.{video_ext}'
# Сохраняем файл
video_path = DOWNLOADS_DIR / video_filename
with open(video_path, 'wb') as f:
f.write(video_data)
logger.info(f"Instagram: видео скачано через внешний сервис: {video_path}")
return str(video_path)
except httpx.TimeoutException:
last_error = Exception(f"Таймаут при запросе к Instagram сервису (попытка {attempt + 1}/{max_retries})")
logger.warning(f"Instagram: таймаут при запросе к сервису: {last_error}")
except Exception as e:
last_error = e
logger.warning(f"Instagram: попытка {attempt + 1}/{max_retries} не удалась: {e}")
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram. Возможно, нужно обновить cookies.")
async def update_instagram_cookies_from_browser(browser: str = 'chrome') -> bool:
"""
Автоматически обновляет Instagram cookies из браузера
Returns: True если успешно, False если ошибка
"""
cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt')
cookies_file_path = Path(cookies_file)
try:
logger.info(f"Попытка обновления Instagram cookies из браузера {browser}...")
# Пробуем обновить cookies из браузера
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: subprocess.run(
[
'yt-dlp',
'--cookies-from-browser', browser,
'--cookies', str(cookies_file_path.absolute()),
'--no-download',
'https://www.instagram.com/'
],
capture_output=True,
timeout=30,
text=True
)
)
if result.returncode == 0:
logger.info(f"✅ Instagram cookies успешно обновлены из браузера {browser}")
return True
else:
logger.warning(f"Не удалось обновить cookies из {browser}: {result.stderr[:200]}")
return False
except subprocess.TimeoutExpired:
logger.warning(f"Таймаут при обновлении cookies из {browser}")
return False
except FileNotFoundError:
logger.warning(f"yt-dlp не найден для обновления cookies")
return False
except Exception as e:
logger.error(f"Ошибка при обновлении cookies из браузера: {e}")
return False
def check_instagram_cookies_expiry() -> tuple[bool, int]:
"""
Проверяет срок действия Instagram cookies
Returns: (is_valid, days_until_expiry)
"""
cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt')
cookies_file_path = Path(cookies_file)
if attempt < max_retries - 1:
await asyncio.sleep((attempt + 1) * 2)
if not cookies_file_path.exists():
return False, 0
try:
current_time = time.time()
valid_expiries = []
# Важные cookies для Instagram (проверяем их в первую очередь)
important_cookies = ['sessionid', 'csrftoken', 'ds_user_id']
with open(cookies_file_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('\t')
if len(parts) >= 7:
domain = parts[0]
if 'instagram' in domain.lower():
try:
expiry = int(parts[4]) # Unix timestamp
cookie_name = parts[5] if len(parts) > 5 else ''
# Игнорируем невалидные expiry (0, отрицательные, или слишком старые)
# Session cookies (expiry = 0) также игнорируем для проверки срока
if expiry > 0 and expiry > 946684800: # Фильтр: после 2000-01-01 (избегаем epoch 0)
# Для важных cookies проверяем строже
if cookie_name in important_cookies:
if expiry > current_time:
valid_expiries.append(expiry)
else:
valid_expiries.append(expiry)
except (ValueError, IndexError):
continue
if not valid_expiries:
logger.warning("Не найдено валидных Instagram cookies с нормальным сроком действия")
# Если нет валидных expiry, но есть cookies - считаем их действительными
# (возможно, это session cookies)
return True, 30 # Возвращаем разумное значение по умолчанию
# Берем минимальный валидный expiry
min_expiry = min(valid_expiries)
days_until_expiry = (min_expiry - current_time) / 86400
is_valid = min_expiry > current_time
return is_valid, int(days_until_expiry)
except Exception as e:
logger.error(f"Ошибка при проверке срока действия cookies: {e}")
# В случае ошибки считаем cookies действительными (не блокируем работу)
return True, 30
async def keep_instagram_session_alive():
"""Поддерживает сессию Instagram активной через периодические запросы и автоматически обновляет cookies"""
cookies_file = os.getenv('INSTAGRAM_COOKIES_FILE', 'instagram_cookies.txt')
cookies_file_path = Path(cookies_file)
AUTO_UPDATE_DAYS_BEFORE_EXPIRY = int(os.getenv('INSTAGRAM_AUTO_UPDATE_DAYS', '3')) # Обновлять за 3 дня до истечения
if not cookies_file_path.exists():
logger.info("Instagram cookies не найдены, пропускаем поддержание сессии")
return
# Список браузеров для попытки обновления (по приоритету)
browsers_to_try = ['chrome', 'firefox', 'edge', 'opera']
# Проверяем cookies при старте
is_valid, days_left = check_instagram_cookies_expiry()
if not is_valid:
logger.warning("Instagram cookies истекли! Пытаемся автоматически обновить...")
# Пытаемся обновить из браузера
updated = False
for browser in browsers_to_try:
if await update_instagram_cookies_from_browser(browser):
updated = True
break
if not updated:
logger.error("Не удалось автоматически обновить cookies! Необходимо обновить вручную.")
return
else:
# Перепроверяем после обновления
is_valid, days_left = check_instagram_cookies_expiry()
if days_left < AUTO_UPDATE_DAYS_BEFORE_EXPIRY:
logger.warning(f"Instagram cookies истекают через {days_left} дней! Пытаемся автоматически обновить...")
# Пытаемся обновить заранее
for browser in browsers_to_try:
if await update_instagram_cookies_from_browser(browser):
# Перепроверяем
_, days_left = check_instagram_cookies_expiry()
logger.info(f"Cookies обновлены! Новый срок: {days_left} дней")
break
else:
logger.info(f"Instagram cookies действительны еще {days_left} дней")
# Интервал проверки: 24 часа (86400 секунд)
check_interval = 86400
while True:
try:
await asyncio.sleep(check_interval)
# Проверяем срок действия перед каждым запросом
is_valid, days_left = check_instagram_cookies_expiry()
# Автоматическое обновление за N дней до истечения
if days_left < AUTO_UPDATE_DAYS_BEFORE_EXPIRY and days_left > 0:
logger.warning(f"Instagram cookies истекают через {days_left} дней. Автоматическое обновление...")
updated = False
for browser in browsers_to_try:
if await update_instagram_cookies_from_browser(browser):
updated = True
_, days_left = check_instagram_cookies_expiry()
logger.info(f"✅ Cookies обновлены автоматически! Новый срок: {days_left} дней")
break
if not updated:
logger.warning("Не удалось автоматически обновить cookies. Попробуйте обновить вручную.")
if not is_valid:
logger.error("Instagram cookies истекли! Пытаемся автоматически обновить...")
updated = False
for browser in browsers_to_try:
if await update_instagram_cookies_from_browser(browser):
updated = True
is_valid, days_left = check_instagram_cookies_expiry()
break
if not updated:
logger.error("Не удалось автоматически обновить cookies! Остановка поддержания сессии.")
break
# Делаем легкий запрос к Instagram для поддержания активности
logger.info("Поддерживаем активность сессии Instagram...")
try:
ydl_opts = {
'cookiefile': str(cookies_file_path.absolute()),
'quiet': True,
'no_warnings': True,
'socket_timeout': 10,
}
def extract_info_sync():
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
return ydl.extract_info('https://www.instagram.com/', download=False)
loop = asyncio.get_event_loop()
await loop.run_in_executor(DOWNLOAD_EXECUTOR, extract_info_sync)
logger.info(f"Сессия Instagram успешно обновлена. Cookies действительны еще {days_left} дней")
except Exception as e:
logger.warning(f"Не удалось обновить сессию Instagram: {e}")
# Продолжаем работу, попробуем в следующий раз
except asyncio.CancelledError:
logger.info("Поддержание сессии Instagram остановлено")
break
except Exception as e:
logger.error(f"Ошибка в задаче поддержания сессии Instagram: {e}")
# Ждем перед следующей попыткой
await asyncio.sleep(3600) # 1 час
raise last_error or Exception("Неизвестная ошибка при скачивании с Instagram через внешний сервис")
async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str:
@ -771,12 +453,8 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Увеличиваем счетчик скачанных видео
increment_downloads()
# Удаляем временный файл
try:
os.remove(video_path)
logger.info(f"Удален временный файл: {video_path}")
except Exception as e:
logger.warning(f"Не удалось удалить файл {video_path}: {e}")
# Сохраняем видео в папку video (не удаляем)
logger.info(f"Видео сохранено: {video_path}")
# Удаляем статусное сообщение и исходное сообщение со ссылкой
try:
@ -853,9 +531,9 @@ def main():
# Инициализируем базу данных
init_database()
# Очищаем старые файлы при старте
logger.info("Очистка старых файлов при старте...")
cleanup_old_files(max_age_hours=1) # Удаляем файлы старше 1 часа
# Очищаем .part файлы при старте
logger.info("Очистка .part файлов при старте...")
cleanup_old_files() # Удаляем только недокачанные .part файлы
# Создаем приложение
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
@ -865,19 +543,15 @@ def main():
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("stat", stat_command))
# Запускаем фоновую задачу для поддержания сессии Instagram
# Запускаем периодическую очистку файлов
async def post_init(application: Application):
"""Выполняется после инициализации приложения"""
# Запускаем задачу поддержания сессии Instagram в фоне
asyncio.create_task(keep_instagram_session_alive())
logger.info("Фоновая задача поддержания сессии Instagram запущена")
# Запускаем периодическую очистку файлов (каждые 6 часов)
# Запускаем периодическую очистку .part файлов (каждые 6 часов)
async def periodic_cleanup():
while True:
await asyncio.sleep(6 * 3600) # 6 часов
cleanup_old_files(max_age_hours=1)
logger.info("Периодическая очистка старых файлов выполнена")
cleanup_old_files()
logger.info("Периодическая очистка .part файлов выполнена")
asyncio.create_task(periodic_cleanup())
logger.info("Фоновая задача периодической очистки файлов запущена")