findFilms/telegram_bot.py

625 lines
27 KiB
Python
Raw Normal View History

Подключили Telegram бот с полной функциональностью - Создан полнофункциональный Telegram бот для поиска фильмов и торрентов - Бот дублирует всю функциональность веб-интерфейса - Реализован поиск фильмов через TMDB API с постерами - Добавлен поиск торрентов на всех трекерах (RuTracker, Kinozal, RuTor, NoNameClub) - Автоматическое добавление торрентов в qBittorrent - Интерактивные кнопки для выбора фильмов и торрентов - Обработка ошибок и пользовательских состояний - Docker контейнеризация с правильной сетевой конфигурацией - Увеличен таймаут HTTP запросов до 60 секунд - Добавлена документация и скрипты запуска Команды бота: - /start, /help - справка - /find - начать поиск фильма Файлы: - telegram_bot.py - основной код бота - run_telegram_bot.py - скрипт запуска - Dockerfile.telegram - Docker образ для бота - docker-compose.yml - обновлен с сервисом бота - requirements.txt - добавлена зависимость python-telegram-bot - README.md - обновлена документация - PROJECT_SUMMARY.md - полная сводка проекта
2025-10-09 12:39:19 +03:00
#!/usr/bin/env python3
"""
Telegram Bot для поиска и загрузки фильмов через торренты
Дублирует функциональность веб-интерфейса в Telegram
"""
import os
import asyncio
import httpx
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes
from telegram.constants import ParseMode
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Конфигурация
TELEGRAM_BOT_TOKEN = "7662650066:AAFgsfYJNYgpcSHaSe6fspsjqmhMkOBT1s4"
TMDB_API_KEY = os.getenv("TMDB_API_KEY", "6d58225585fb77af5945a964de41849f")
TMDB_BASE_URL = "https://api.themoviedb.org/3"
TORRENT_SEARCH_URL = os.getenv("TORRENT_SEARCH_URL", "http://localhost:8443")
TORRENT_ADD_URL = os.getenv("TORRENT_ADD_URL", "http://localhost:8444")
QBITTORRENT_HOST = os.getenv("QBITTORRENT_HOST", "localhost")
QBITTORRENT_PORT = os.getenv("QBITTORRENT_PORT", "8080")
QBITTORRENT_USERNAME = os.getenv("QBITTORRENT_USERNAME", "admin")
QBITTORRENT_PASSWORD = os.getenv("QBITTORRENT_PASSWORD", "vrubel07")
@dataclass
class Movie:
"""Структура данных для фильма"""
id: int
title: str
original_title: str
overview: str
release_date: str
vote_average: float
poster_path: str
backdrop_path: str
genre_ids: List[int]
@dataclass
class Torrent:
"""Структура данных для торрента"""
id: str
title: str
size_bytes: int
size_readable: str
resolution: str
quality: str
seeds: int
peers: int
magnet: str
provider: str
class MovieSearchBot:
"""Основной класс Telegram бота"""
def __init__(self):
self.application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
self.user_states = {} # Состояния пользователей
self.setup_handlers()
def setup_handlers(self):
"""Настройка обработчиков команд"""
# Команды
self.application.add_handler(CommandHandler("start", self.start_command))
self.application.add_handler(CommandHandler("help", self.help_command))
self.application.add_handler(CommandHandler("find", self.find_command))
# Обработчики сообщений
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
# Обработчики callback запросов
self.application.add_handler(CallbackQueryHandler(self.handle_callback))
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик команды /start"""
user = update.effective_user
welcome_text = f"""
🎬 <b>Добро пожаловать в Movie Search Bot!</b>
Привет, {user.first_name}! 👋
Этот бот поможет вам найти и скачать фильмы через торренты.
<b>Доступные команды:</b>
/find - Найти фильм
/help - Помощь
<b>Как использовать:</b>
1. Нажмите /find или введите название фильма
2. Выберите нужный фильм из результатов
3. Выберите торрент для скачивания
4. Фильм автоматически добавится в qBittorrent
Начнем поиск? 🚀
"""
await update.message.reply_text(
welcome_text,
parse_mode=ParseMode.HTML
)
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик команды /help"""
help_text = """
📖 <b>Справка по использованию бота</b>
<b>Основные команды:</b>
/find - Начать поиск фильма
/help - Показать эту справку
/start - Перезапустить бота
<b>Пошаговая инструкция:</b>
1 <b>Поиск фильма</b>
Нажмите /find или просто введите название фильма
Бот найдет фильмы через TMDB API
2 <b>Выбор фильма</b>
Выберите нужный фильм из списка
Бот покажет постер и информацию о фильме
3 <b>Поиск торрентов</b>
Бот автоматически найдет доступные торренты
Результаты будут отсортированы по качеству и количеству сидов
4 <b>Скачивание</b>
Выберите нужный торрент
Он автоматически добавится в qBittorrent
Вы получите уведомление о начале загрузки
<b>Поддерживаемые трекеры:</b>
RuTracker
Kinozal
RuTor
NoNameClub
<b>Проблемы?</b>
Если что-то не работает, попробуйте команду /start
"""
await update.message.reply_text(
help_text,
parse_mode=ParseMode.HTML
)
async def find_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик команды /find"""
user_id = update.effective_user.id
# Устанавливаем состояние ожидания названия фильма
self.user_states[user_id] = "waiting_movie_title"
await update.message.reply_text(
"🔍 <b>Поиск фильма</b>\n\nВведите название фильма, который хотите найти:",
parse_mode=ParseMode.HTML
)
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик текстовых сообщений"""
user_id = update.effective_user.id
text = update.message.text
# Проверяем состояние пользователя
if user_id not in self.user_states:
# Если пользователь не в состоянии, но отправил текст, считаем это поиском
await self.search_movies(update, context, text)
return
state = self.user_states[user_id]
if state == "waiting_movie_title":
await self.search_movies(update, context, text)
else:
# Неизвестное состояние
await update.message.reply_text(
"❌ Неизвестная команда. Используйте /find для поиска фильма или /help для справки."
)
async def search_movies(self, update: Update, context: ContextTypes.DEFAULT_TYPE, query: str):
"""Поиск фильмов через TMDB API"""
user_id = update.effective_user.id
try:
# Показываем индикатор загрузки
loading_msg = await update.message.reply_text("🔍 Ищу фильмы...")
# Поиск через TMDB API
movies = await self.tmdb_search_movies(query)
if not movies:
await loading_msg.edit_text("❌ Фильмы не найдены. Попробуйте другое название.")
return
# Ограничиваем количество результатов
movies = movies[:10]
# Создаем клавиатуру с результатами
keyboard = []
for i, movie in enumerate(movies):
year = movie.release_date[:4] if movie.release_date else "N/A"
button_text = f"{movie.title} ({year})"
if len(button_text) > 50:
button_text = button_text[:47] + "..."
keyboard.append([InlineKeyboardButton(
button_text,
callback_data=f"movie_{movie.id}"
)])
# Добавляем кнопку отмены
keyboard.append([InlineKeyboardButton("❌ Отмена", callback_data="cancel")])
reply_markup = InlineKeyboardMarkup(keyboard)
# Отправляем результаты
results_text = f"🎬 <b>Найдено фильмов: {len(movies)}</b>\n\nВыберите нужный фильм:"
await loading_msg.edit_text(
results_text,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup
)
# Сохраняем результаты в контексте
context.user_data['search_results'] = movies
self.user_states[user_id] = "movie_selected"
except Exception as e:
logger.error(f"Error searching movies: {e}")
await update.message.reply_text(
f"❌ Ошибка при поиске фильмов: {str(e)}\n\nПопробуйте еще раз или используйте /help"
)
async def tmdb_search_movies(self, query: str) -> List[Movie]:
"""Поиск фильмов через TMDB API"""
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.get(
f"{TMDB_BASE_URL}/search/movie",
params={
"api_key": TMDB_API_KEY,
"query": query,
"language": "ru-RU",
"include_adult": False
}
)
response.raise_for_status()
data = response.json()
movies = []
for movie_data in data.get("results", []):
movie = Movie(
id=movie_data["id"],
title=movie_data.get("title", ""),
original_title=movie_data.get("original_title", ""),
overview=movie_data.get("overview", ""),
release_date=movie_data.get("release_date", ""),
vote_average=movie_data.get("vote_average", 0.0),
poster_path=movie_data.get("poster_path", ""),
backdrop_path=movie_data.get("backdrop_path", ""),
genre_ids=movie_data.get("genre_ids", [])
)
movies.append(movie)
return movies
except Exception as e:
logger.error(f"TMDB API error: {e}")
return []
async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обработчик callback запросов"""
query = update.callback_query
await query.answer()
data = query.data
user_id = update.effective_user.id
if data == "cancel":
await self.cancel_operation(update, context)
return
if data.startswith("movie_"):
movie_id = int(data.split("_")[1])
await self.show_movie_details(update, context, movie_id)
elif data.startswith("torrent_"):
torrent_id = data.split("_")[1]
await self.add_torrent_to_client(update, context, torrent_id)
elif data == "search_torrents":
await self.search_torrents_for_movie(update, context)
elif data == "new_search":
await self.start_new_search(update, context)
async def cancel_operation(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Отмена операции"""
user_id = update.effective_user.id
self.user_states[user_id] = None
await update.callback_query.edit_message_text(
"❌ Операция отменена.\n\nИспользуйте /find для нового поиска."
)
async def start_new_search(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Начать новый поиск"""
user_id = update.effective_user.id
self.user_states[user_id] = "waiting_movie_title"
# Очищаем данные пользователя
context.user_data.clear()
await update.callback_query.edit_message_text(
"🔍 <b>Новый поиск фильма</b>\n\nВведите название фильма, который хотите найти:",
parse_mode=ParseMode.HTML
)
async def show_movie_details(self, update: Update, context: ContextTypes.DEFAULT_TYPE, movie_id: int):
"""Показать детали фильма"""
try:
# Находим фильм в сохраненных результатах
movies = context.user_data.get('search_results', [])
movie = next((m for m in movies if m.id == movie_id), None)
if not movie:
await update.callback_query.edit_message_text("❌ Фильм не найден.")
return
# Получаем детальную информацию о фильме
movie_details = await self.get_movie_details(movie_id)
if movie_details:
movie = movie_details
# Формируем текст сообщения
year = movie.release_date[:4] if movie.release_date else "N/A"
rating = f"{movie.vote_average:.1f}/10" if movie.vote_average > 0 else "⭐ N/A"
text = f"""
🎬 <b>{movie.title}</b>
📅 <b>Год:</b> {year}
{rating}
📝 <b>Описание:</b>
{movie.overview[:500]}{'...' if len(movie.overview) > 500 else ''}
"""
# Создаем клавиатуру
keyboard = [
[InlineKeyboardButton("🔍 Найти торренты", callback_data="search_torrents")],
[InlineKeyboardButton("❌ Отмена", callback_data="cancel")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
# Сохраняем выбранный фильм
context.user_data['selected_movie'] = movie
# Отправляем сообщение с постером
if movie.poster_path:
poster_url = f"https://image.tmdb.org/t/p/w500{movie.poster_path}"
try:
await update.callback_query.edit_message_media(
InputMediaPhoto(
media=poster_url,
caption=text,
parse_mode=ParseMode.HTML
),
reply_markup=reply_markup
)
except:
# Если не удалось отправить с постером, отправляем текст
await update.callback_query.edit_message_text(
text,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup
)
else:
await update.callback_query.edit_message_text(
text,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup
)
except Exception as e:
logger.error(f"Error showing movie details: {e}")
await update.callback_query.edit_message_text(
f"❌ Ошибка при получении информации о фильме: {str(e)}"
)
async def get_movie_details(self, movie_id: int) -> Optional[Movie]:
"""Получение детальной информации о фильме"""
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.get(
f"{TMDB_BASE_URL}/movie/{movie_id}",
params={
"api_key": TMDB_API_KEY,
"language": "ru-RU",
"append_to_response": "external_ids"
}
)
response.raise_for_status()
data = response.json()
return Movie(
id=data["id"],
title=data.get("title", ""),
original_title=data.get("original_title", ""),
overview=data.get("overview", ""),
release_date=data.get("release_date", ""),
vote_average=data.get("vote_average", 0.0),
poster_path=data.get("poster_path", ""),
backdrop_path=data.get("backdrop_path", ""),
genre_ids=data.get("genre_ids", [])
)
except Exception as e:
logger.error(f"Error getting movie details: {e}")
return None
async def search_torrents_for_movie(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Поиск торрентов для выбранного фильма"""
try:
movie = context.user_data.get('selected_movie')
if not movie:
await update.callback_query.edit_message_text("❌ Фильм не выбран.")
return
# Показываем индикатор загрузки
try:
await update.callback_query.edit_message_text("🔍 Ищу торренты...")
except:
# Если не можем отредактировать (например, сообщение с изображением), отправляем новое
await update.callback_query.message.reply_text("🔍 Ищу торренты...")
# Поиск торрентов
torrents = await self.search_torrents(movie)
if not torrents:
try:
await update.callback_query.edit_message_text(
f"❌ Торренты для фильма '{movie.title}' не найдены.\n\nПопробуйте другой фильм."
)
except:
await update.callback_query.message.reply_text(
f"❌ Торренты для фильма '{movie.title}' не найдены.\n\nПопробуйте другой фильм."
)
return
# Ограничиваем количество результатов
torrents = torrents[:15]
# Создаем клавиатуру с торрентами
keyboard = []
for i, torrent in enumerate(torrents):
# Формируем текст кнопки
button_text = f"{torrent.quality} {torrent.resolution} - {torrent.size_readable}"
if torrent.seeds > 0:
button_text += f" (👥 {torrent.seeds})"
if len(button_text) > 50:
button_text = button_text[:47] + "..."
keyboard.append([InlineKeyboardButton(
button_text,
callback_data=f"torrent_{torrent.id}"
)])
# Добавляем кнопку отмены
keyboard.append([InlineKeyboardButton("❌ Отмена", callback_data="cancel")])
reply_markup = InlineKeyboardMarkup(keyboard)
# Формируем текст с результатами
text = f"""
🎬 <b>{movie.title}</b>
🔍 <b>Найдено торрентов: {len(torrents)}</b>
Выберите торрент для скачивания:
"""
try:
await update.callback_query.edit_message_text(
text,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup
)
except:
# Если не можем отредактировать, отправляем новое сообщение
await update.callback_query.message.reply_text(
text,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup
)
# Сохраняем торренты в контексте
context.user_data['torrents'] = torrents
except Exception as e:
logger.error(f"Error searching torrents: {e}")
try:
await update.callback_query.edit_message_text(
f"❌ Ошибка при поиске торрентов: {str(e)}"
)
except:
await update.callback_query.message.reply_text(
f"❌ Ошибка при поиске торрентов: {str(e)}"
)
async def search_torrents(self, movie: Movie) -> List[Torrent]:
"""Поиск торрентов для фильма"""
try:
logger.info(f"Searching torrents for movie: {movie.title}")
# Используем существующий API endpoint
async with httpx.AsyncClient(timeout=60.0) as client:
# URL-кодируем название фильма
import urllib.parse
encoded_title = urllib.parse.quote(movie.title)
url = f"http://movie-search:8000/api/torrents/{encoded_title}"
params = {
"year": movie.release_date[:4] if movie.release_date else None,
"original_title": movie.original_title if movie.original_title != movie.title else None
}
logger.info(f"Making request to: {url} with params: {params}")
response = await client.get(url, params=params)
logger.info(f"Response status: {response.status_code}")
if response.status_code == 200:
data = response.json()
torrents_data = data.get("torrents", [])
logger.info(f"Found {len(torrents_data)} torrents")
torrents = []
for torrent_data in torrents_data:
torrent = Torrent(
id=torrent_data.get("id", ""),
title=torrent_data.get("title", ""),
size_bytes=torrent_data.get("size_bytes", 0),
size_readable=torrent_data.get("size_readable", ""),
resolution=torrent_data.get("resolution", ""),
quality=torrent_data.get("quality", ""),
seeds=torrent_data.get("seeds", 0),
peers=torrent_data.get("peers", 0),
magnet=torrent_data.get("magnet", ""),
provider=torrent_data.get("provider", "")
)
torrents.append(torrent)
logger.info(f"Processed {len(torrents)} torrents")
return torrents
else:
logger.error(f"Torrent search API error: {response.status_code} - {response.text}")
return []
except Exception as e:
logger.error(f"Error searching torrents: {e}")
import traceback
traceback.print_exc()
return []
async def add_torrent_to_client(self, update: Update, context: ContextTypes.DEFAULT_TYPE, torrent_id: str):
"""Добавление торрента в qBittorrent"""
try:
# Показываем индикатор загрузки
await update.callback_query.edit_message_text("⬇️ Добавляю торрент в qBittorrent...")
# Используем существующий API endpoint
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
"http://movie-search:8000/api/add-torrent",
data={"torrent_id": torrent_id}
)
if response.status_code == 200:
data = response.json()
if data.get("status") == "success":
message = f"{data.get('message', 'Торрент успешно добавлен!')}"
else:
message = f"{data.get('message', 'Ошибка при добавлении торрента')}"
else:
message = f"❌ Ошибка API: {response.status_code}"
# Создаем клавиатуру для возврата к поиску
keyboard = [
[InlineKeyboardButton("🔍 Найти другой фильм", callback_data="new_search")],
[InlineKeyboardButton("❌ Закрыть", callback_data="cancel")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.callback_query.edit_message_text(
message,
reply_markup=reply_markup
)
except Exception as e:
logger.error(f"Error adding torrent: {e}")
await update.callback_query.edit_message_text(
f"❌ Ошибка при добавлении торрента: {str(e)}"
)
def run(self):
"""Запуск бота"""
logger.info("Starting Movie Search Bot...")
self.application.run_polling()
def main():
"""Главная функция"""
bot = MovieSearchBot()
bot.run()
if __name__ == "__main__":
main()