Добавлена система очередей для обработки загрузки видео, улучшена обработка ошибок и добавлены новые текстовые сообщения для пользователей. Обновлены таймауты HTTP-запросов для поддержки больших файлов. Обновлены конфигурации Docker для всех загрузчиков с использованием Gunicorn.

This commit is contained in:
vrubelroman 2025-12-12 15:41:46 +03:00
parent e441f53760
commit 76ce3feecc
11 changed files with 237 additions and 58 deletions

271
bot.py
View file

@ -9,8 +9,15 @@ from urllib.parse import urlparse
from datetime import datetime from datetime import datetime
import httpx import httpx
from telegram import Update from telegram import Update, Message
from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler from telegram.ext import Application, MessageHandler, filters, ContextTypes, CommandHandler, Defaults
from telegram.request import HTTPXRequest
from dataclasses import dataclass
from typing import Optional
# Таймаут для HTTP запросов
# Все таймауты убраны - видео может качаться и отправляться очень долго
HTTP_TIMEOUT = httpx.Timeout(connect=None, read=None, write=None, pool=None)
# Настройка логирования # Настройка логирования
logging.basicConfig( logging.basicConfig(
@ -103,6 +110,9 @@ TEXTS = {
'caption': "Видео скачано с @{bot_username}", 'caption': "Видео скачано с @{bot_username}",
'error': "❌ Произошла ошибка при обработке видео:\n{error}", 'error': "❌ Произошла ошибка при обработке видео:\n{error}",
'error_unknown_source': "Пардон, не умеем работать с этим источником", 'error_unknown_source': "Пардон, не умеем работать с этим источником",
'error_file_too_large': "❌ Видео слишком большое ({size_mb:.1f} МБ)\n\nTelegram Bot API позволяет отправлять файлы до 50 МБ.\n\nПопробуйте выбрать видео покороче или в меньшем качестве.",
'queue_position': "🕐 Ваше видео #{position} в очереди\nВаш запрос очень важен для нас!",
'queue_first': "⬇️ Скачиваю видео...",
}, },
'en': { 'en': {
'start': ( 'start': (
@ -160,6 +170,9 @@ TEXTS = {
'caption': "Video downloaded via @{bot_username}", 'caption': "Video downloaded via @{bot_username}",
'error': "❌ Error processing video:\n{error}", 'error': "❌ Error processing video:\n{error}",
'error_unknown_source': "Sorry, this source is not supported", 'error_unknown_source': "Sorry, this source is not supported",
'error_file_too_large': "❌ Video is too large ({size_mb:.1f} MB)\n\nTelegram Bot API allows files up to 50 MB.\n\nTry a shorter video or lower quality.",
'queue_position': "🕐 Your video is #{position} in queue\nYour request is very important to us!",
'queue_first': "⬇️ Downloading video...",
} }
} }
@ -363,6 +376,162 @@ def cleanup_old_files():
logger.error(f"Ошибка при очистке .part файлов: {e}") logger.error(f"Ошибка при очистке .part файлов: {e}")
# ============================================================================
# СИСТЕМА ОЧЕРЕДЕЙ
# ============================================================================
@dataclass
class QueueItem:
"""Элемент очереди для скачивания видео"""
original_message: Message
status_message: Message
url: str
chat_id: int
chat_type: str
locale: str
# Глобальная очередь и список элементов для отслеживания позиций
download_queue: asyncio.Queue = None
queue_items: list[QueueItem] = []
queue_lock = asyncio.Lock()
async def update_queue_positions():
"""Обновляет статусы позиций в очереди для всех ожидающих"""
async with queue_lock:
for i, item in enumerate(queue_items):
position = i + 1
try:
if position == 1:
# Первый в очереди - сейчас качается
await item.status_message.edit_text(get_text(item.locale, 'queue_first'))
else:
# Остальные в очереди
await item.status_message.edit_text(
get_text(item.locale, 'queue_position', position=position)
)
except Exception as e:
logger.warning(f"Не удалось обновить статус очереди: {e}")
async def process_queue_item(item: QueueItem):
"""Обрабатывает один элемент очереди"""
try:
# Скачиваем видео
video_path = await download_video(item.url, item.chat_id, item.locale)
# Проверяем размер файла (лимит Telegram Bot API - 50 МБ)
file_size = Path(video_path).stat().st_size
max_size = 50 * 1024 * 1024 # 50 MB
if file_size > max_size:
size_mb = file_size / (1024 * 1024)
error_msg = get_text(item.locale, 'error_file_too_large', size_mb=size_mb)
await item.status_message.edit_text(error_msg)
# Удаляем слишком большой файл
try:
Path(video_path).unlink()
logger.info(f"Удалён слишком большой файл: {video_path} ({size_mb:.1f} MB)")
except:
pass
return
# Отправляем файл пользователю
await item.status_message.edit_text(get_text(item.locale, 'sending'))
video_file = open(video_path, 'rb')
caption = get_text(item.locale, 'caption', bot_username=TELEGRAM_BOT_USERNAME)
await item.original_message.reply_video(
video=video_file,
caption=caption,
supports_streaming=True
)
video_file.close()
# Увеличиваем счетчик скачанных видео
increment_downloads()
logger.info(f"Видео сохранено: {video_path}")
# Удаляем статусное сообщение и исходное сообщение со ссылкой
try:
await item.status_message.delete()
await item.original_message.delete()
logger.info(f"Удалено сообщение пользователя с ссылкой (chat_id: {item.chat_id})")
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
except Exception as e:
logger.error(f"Ошибка при обработке {item.url}: {e}")
error_msg = get_text(item.locale, 'error', error=str(e))
try:
await item.status_message.edit_text(error_msg)
except:
try:
await item.original_message.reply_text(error_msg)
except:
pass
try:
for part_file in DOWNLOADS_DIR.glob(f'{item.chat_id}_*.part'):
part_file.unlink()
logger.info(f"Удален .part файл после ошибки: {part_file.name}")
except Exception as cleanup_error:
logger.warning(f"Не удалось удалить .part файлы после ошибки: {cleanup_error}")
async def queue_worker():
"""Воркер, обрабатывающий очередь последовательно"""
global download_queue, queue_items
logger.info("Воркер очереди запущен")
while True:
try:
# Ждём элемент из очереди
item = await download_queue.get()
logger.info(f"Начинаю обработку: {item.url} (в очереди: {len(queue_items)})")
# Обновляем статусы - первый теперь качается
await update_queue_positions()
# Обрабатываем элемент
await process_queue_item(item)
# Удаляем из списка отслеживания
async with queue_lock:
if item in queue_items:
queue_items.remove(item)
# Обновляем позиции оставшихся
await update_queue_positions()
# Сообщаем что задача выполнена
download_queue.task_done()
logger.info(f"Обработка завершена. Осталось в очереди: {len(queue_items)}")
except Exception as e:
logger.error(f"Ошибка в воркере очереди: {e}")
async def add_to_queue(item: QueueItem) -> int:
"""Добавляет элемент в очередь и возвращает позицию"""
global queue_items
async with queue_lock:
queue_items.append(item)
position = len(queue_items)
await download_queue.put(item)
logger.info(f"Добавлено в очередь: {item.url}, позиция: {position}")
return position
# ============================================================================ # ============================================================================
# ФУНКЦИИ СКАЧИВАНИЯ # ФУНКЦИИ СКАЧИВАНИЯ
# ============================================================================ # ============================================================================
@ -374,7 +543,7 @@ async def download_youtube_video(url: str, chat_id: int, max_retries: int = 3) -
last_error = None last_error = None
for attempt in range(max_retries): for attempt in range(max_retries):
try: try:
async with httpx.AsyncClient(timeout=600.0) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post( response = await client.post(
f"{YOUTUBE_DOWNLOADER_URL}/download/stream", f"{YOUTUBE_DOWNLOADER_URL}/download/stream",
json={"url": url}, json={"url": url},
@ -430,7 +599,7 @@ async def download_instagram_video(url: str, chat_id: int, max_retries: int = 3)
last_error = None last_error = None
for attempt in range(max_retries): for attempt in range(max_retries):
try: try:
async with httpx.AsyncClient(timeout=600.0) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post( response = await client.post(
f"{INSTAGRAM_DOWNLOADER_URL}/download/stream", f"{INSTAGRAM_DOWNLOADER_URL}/download/stream",
json={"url": url}, json={"url": url},
@ -486,7 +655,7 @@ async def download_vk_video(url: str, chat_id: int, max_retries: int = 3) -> str
last_error = None last_error = None
for attempt in range(max_retries): for attempt in range(max_retries):
try: try:
async with httpx.AsyncClient(timeout=600.0) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post( response = await client.post(
f"{VK_DOWNLOADER_URL}/download/stream", f"{VK_DOWNLOADER_URL}/download/stream",
json={"url": url}, json={"url": url},
@ -542,7 +711,7 @@ async def download_yapfiles_video(url: str, chat_id: int, max_retries: int = 3)
last_error = None last_error = None
for attempt in range(max_retries): for attempt in range(max_retries):
try: try:
async with httpx.AsyncClient(timeout=600.0) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post( response = await client.post(
f"{YAPFILES_DOWNLOADER_URL}/download/stream", f"{YAPFILES_DOWNLOADER_URL}/download/stream",
json={"url": url}, json={"url": url},
@ -598,7 +767,7 @@ async def download_tiktok_video(url: str, chat_id: int, max_retries: int = 3) ->
last_error = None last_error = None
for attempt in range(max_retries): for attempt in range(max_retries):
try: try:
async with httpx.AsyncClient(timeout=600.0) as client: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.post( response = await client.post(
f"{TIKTOK_DOWNLOADER_URL}/download/stream", f"{TIKTOK_DOWNLOADER_URL}/download/stream",
json={"url": url}, json={"url": url},
@ -713,50 +882,28 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Отправляем сообщение о начале обработки # Отправляем сообщение о начале обработки
status_message = await update.message.reply_text(get_text(locale, 'processing')) status_message = await update.message.reply_text(get_text(locale, 'processing'))
try: # Создаём элемент очереди
# Скачиваем видео item = QueueItem(
await status_message.edit_text(get_text(locale, 'downloading')) original_message=update.message,
video_path = await download_video(url, chat_id, locale) status_message=status_message,
url=url,
chat_id=chat_id,
chat_type=chat_type,
locale=locale
)
# Отправляем файл пользователю # Добавляем в очередь
await status_message.edit_text(get_text(locale, 'sending')) position = await add_to_queue(item)
video_file = open(video_path, 'rb') # Показываем позицию в очереди
caption = get_text(locale, 'caption', bot_username=TELEGRAM_BOT_USERNAME) if position == 1:
await update.message.reply_video( # Первый - сразу начинаем качать
video=video_file, await status_message.edit_text(get_text(locale, 'queue_first'))
caption=caption, else:
supports_streaming=True # В очереди - показываем позицию
await status_message.edit_text(
get_text(locale, 'queue_position', position=position)
) )
video_file.close()
# Увеличиваем счетчик скачанных видео
increment_downloads()
logger.info(f"Видео сохранено: {video_path}")
# Удаляем статусное сообщение и исходное сообщение со ссылкой
try:
await status_message.delete()
await update.message.delete()
logger.info(f"Удалено сообщение пользователя с ссылкой (chat_id: {chat_id}, тип чата: {chat_type})")
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
except Exception as e:
logger.error(f"Ошибка: {e}")
error_msg = get_text(locale, 'error', error=str(e))
try:
await status_message.edit_text(error_msg)
except:
await update.message.reply_text(error_msg)
try:
for part_file in DOWNLOADS_DIR.glob(f'{chat_id}_*.part'):
part_file.unlink()
logger.info(f"Удален .part файл после ошибки: {part_file.name}")
except Exception as cleanup_error:
logger.warning(f"Не удалось удалить .part файлы после ошибки: {cleanup_error}")
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
@ -813,8 +960,20 @@ def main():
logger.info("Очистка .part файлов при старте...") logger.info("Очистка .part файлов при старте...")
cleanup_old_files() cleanup_old_files()
# Создаем приложение # Создаем приложение с максимальными таймаутами для больших файлов
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() request = HTTPXRequest(
read_timeout=3600, # 1 час на получение ответа
write_timeout=3600, # 1 час на отправку видео
connect_timeout=300, # 5 минут на соединение
pool_timeout=300 # 5 минут на получение соединения из пула
)
application = (
Application.builder()
.token(TELEGRAM_BOT_TOKEN)
.request(request)
.get_updates_request(HTTPXRequest(read_timeout=120, connect_timeout=60))
.build()
)
# Регистрируем обработчики # Регистрируем обработчики
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
@ -822,9 +981,19 @@ def main():
application.add_handler(CommandHandler("stat", stat_command)) application.add_handler(CommandHandler("stat", stat_command))
application.add_handler(CommandHandler("support", support_command)) application.add_handler(CommandHandler("support", support_command))
# Запускаем периодическую очистку файлов # Инициализация очереди и запуск воркера
async def post_init(application: Application): async def post_init(application: Application):
"""Выполняется после инициализации приложения""" """Выполняется после инициализации приложения"""
global download_queue
# Инициализируем очередь
download_queue = asyncio.Queue()
# Запускаем воркер очереди
asyncio.create_task(queue_worker())
logger.info("Воркер очереди запущен")
# Запускаем периодическую очистку .part файлов (каждые 6 часов)
async def periodic_cleanup(): async def periodic_cleanup():
while True: while True:
await asyncio.sleep(6 * 3600) await asyncio.sleep(6 * 3600)

View file

@ -20,5 +20,6 @@ RUN mkdir -p downloads
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
CMD ["python", "app.py"] # Gunicorn: 1 worker (последовательная обработка), без таймаута
CMD ["gunicorn", "--workers=1", "--timeout=0", "--bind=0.0.0.0:5000", "app:app"]

View file

@ -2,3 +2,4 @@ Flask==3.0.0
flask-cors==4.0.0 flask-cors==4.0.0
yt-dlp>=2024.12.13 yt-dlp>=2024.12.13
gunicorn==21.2.0

View file

@ -18,5 +18,6 @@ COPY app.py .
RUN mkdir -p downloads RUN mkdir -p downloads
# Запуск приложения # Запуск приложения
CMD ["python", "app.py"] # Gunicorn: 1 worker (последовательная обработка), без таймаута
CMD ["gunicorn", "--workers=1", "--timeout=0", "--bind=0.0.0.0:5000", "app:app"]

View file

@ -3,3 +3,4 @@ flask-cors==4.0.0
yt-dlp>=2024.12.13 yt-dlp>=2024.12.13
requests==2.31.0 requests==2.31.0
gunicorn==21.2.0

View file

@ -20,5 +20,6 @@ RUN mkdir -p downloads
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
CMD ["python", "app.py"] # Gunicorn: 1 worker (последовательная обработка), без таймаута
CMD ["gunicorn", "--workers=1", "--timeout=0", "--bind=0.0.0.0:5000", "app:app"]

View file

@ -3,3 +3,4 @@ flask-cors==4.0.0
yt-dlp>=2024.12.13 yt-dlp>=2024.12.13
requests==2.31.0 requests==2.31.0
gunicorn==21.2.0

View file

@ -13,5 +13,6 @@ COPY app.py .
RUN mkdir -p downloads RUN mkdir -p downloads
# Запуск приложения # Запуск приложения
CMD ["python", "app.py"] # Gunicorn: 1 worker (последовательная обработка), без таймаута
CMD ["gunicorn", "--workers=1", "--timeout=0", "--bind=0.0.0.0:5000", "app:app"]

View file

@ -3,3 +3,4 @@ flask-cors==4.0.0
requests==2.31.0 requests==2.31.0
beautifulsoup4==4.12.2 beautifulsoup4==4.12.2
gunicorn==21.2.0

View file

@ -20,5 +20,6 @@ RUN mkdir -p downloads
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
CMD ["python", "app.py"] # Gunicorn: 1 worker (последовательная обработка), без таймаута
CMD ["gunicorn", "--workers=1", "--timeout=0", "--bind=0.0.0.0:5000", "app:app"]

View file

@ -1,4 +1,5 @@
Flask==3.0.0 Flask==3.0.0
flask-cors==4.0.0 flask-cors==4.0.0
yt-dlp>=2024.12.13 yt-dlp>=2024.12.13
gunicorn==21.2.0