Initial commit: Torrent search and download application

This commit is contained in:
vrubelroman 2025-10-05 22:20:49 +03:00
commit e38be704ff
4313 changed files with 791544 additions and 0 deletions

669
app.py Normal file
View file

@ -0,0 +1,669 @@
import os
import re
import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, Form, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(title="Movie Search API", version="1.0.0")
# Настройка CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # В продакшене лучше указать конкретные домены
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Настройка шаблонов
templates = Jinja2Templates(directory="templates")
# API ключ TMDB
TMDB_API_KEY = os.getenv("TMDB_API_KEY", "6d58225585fb77af5945a964de41849f")
TMDB_BASE_URL = "https://api.themoviedb.org/3"
# URL torAPI
TORAPI_URL = os.getenv("TORAPI_URL", "http://localhost:8088")
# URL Torrent Search API
TORRENT_SEARCH_URL = os.getenv("TORRENT_SEARCH_URL", "http://localhost:8443")
TORRENT_ADD_URL = os.getenv("TORRENT_ADD_URL", "http://localhost:8444")
async def search_movies(query: str) -> dict:
"""Поиск фильмов через TMDB API"""
async with httpx.AsyncClient() as client:
try:
response = await client.get(
f"{TMDB_BASE_URL}/search/movie",
params={
"api_key": TMDB_API_KEY,
"query": query,
"language": "ru-RU",
"include_adult": False
}
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
raise HTTPException(status_code=500, detail=f"TMDB API error: {str(e)}")
async def get_movie_details(movie_id: int) -> dict:
"""Получение детальной информации о фильме из TMDB"""
async with httpx.AsyncClient() as client:
try:
response = await client.get(
f"{TMDB_BASE_URL}/movie/{movie_id}",
params={
"api_key": TMDB_API_KEY,
"language": "ru-RU",
"append_to_response": "external_ids"
}
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
raise HTTPException(status_code=500, detail=f"TMDB API error: {str(e)}")
def parse_size(size_str: str) -> tuple:
"""Парсинг размера файла и возврат в байтах и читаемом виде"""
if not size_str:
return 0, "Неизвестно"
size_str = size_str.upper().strip()
multipliers = {
'KB': 1024,
'MB': 1024**2,
'GB': 1024**3,
'TB': 1024**4
}
for unit, multiplier in multipliers.items():
if unit in size_str:
try:
number = float(re.findall(r'[\d.]+', size_str)[0])
bytes_size = int(number * multiplier)
return bytes_size, size_str
except (ValueError, IndexError):
pass
return 0, size_str
def extract_resolution(title: str) -> str:
"""Извлечение разрешения из названия торрента"""
title_upper = title.upper()
# Поиск разрешений
resolutions = ['2160P', '4K', '1080P', '720P', '480P', '360P']
for res in resolutions:
if res in title_upper:
if res == '4K':
return '2160p'
return res.lower()
# Поиск по паттернам
if re.search(r'\b(2160|4k)\b', title_upper):
return '2160p'
elif re.search(r'\b1080\b', title_upper):
return '1080p'
elif re.search(r'\b720\b', title_upper):
return '720p'
elif re.search(r'\b480\b', title_upper):
return '480p'
return 'Неизвестно'
def normalize_search_term(term: str) -> str:
"""Нормализация поискового термина для сцены"""
if not term:
return ""
# Убираем артикли
term = re.sub(r'\b(the|a|an)\b', '', term, flags=re.IGNORECASE)
# Заменяем пробелы на точки и дефисы
term = re.sub(r'\s+', '.', term.strip())
# Убираем лишние точки
term = re.sub(r'\.+', '.', term)
# Убираем точки в начале и конце
term = term.strip('.')
return term
def generate_search_variants(title: str, original_title: str = None, year: str = None) -> list:
"""Генерация вариантов названий для поиска"""
variants = []
# Базовые варианты
variants.extend([title, original_title] if original_title else [title])
# Варианты с годом
if year:
variants.extend([f"{title} {year}", f"{original_title} {year}"] if original_title else [f"{title} {year}"])
# Нормализованные варианты
normalized_variants = [normalize_search_term(v) for v in variants if v]
variants.extend(normalized_variants)
# Специальные варианты для известных фильмов
title_lower = title.lower()
if 'терминатор' in title_lower or 'terminator' in title_lower:
variants.extend(['T2', 'T2.Judgment.Day', 'T2.Judgement.Day'])
if '2' in title_lower or 'второй' in title_lower:
variants.extend(['Terminator.2', 'Терминатор.2'])
# Убираем дубликаты и пустые значения
return list(set([v for v in variants if v and v.strip()]))
def score_torrent(torrent: dict, movie_title: str, original_title: str = None, year: str = None) -> float:
"""Скоринг торрента по релевантности"""
score = 0.0
torrent_name = torrent.get('title', '').lower()
movie_title_lower = movie_title.lower()
original_title_lower = original_title.lower() if original_title else ""
# Скоринг по названию (0-1)
if movie_title_lower in torrent_name:
score += 0.8
if original_title_lower and original_title_lower in torrent_name:
score += 0.9
# Скоринг по году (0-0.5)
if year and year in torrent_name:
score += 0.5
# Скоринг по качеству (0-0.3)
quality = torrent.get('quality', '').lower()
if 'bluray' in quality or 'remux' in quality:
score += 0.3
elif 'web-dl' in quality or 'webdl' in quality:
score += 0.2
elif 'hdtv' in quality:
score += 0.1
# Скоринг по разрешению (0-0.2)
resolution = torrent.get('resolution', '').lower()
if '2160p' in resolution or '4k' in resolution:
score += 0.2
elif '1080p' in resolution:
score += 0.15
elif '720p' in resolution:
score += 0.1
# Скоринг по сидам (0-0.1)
seeds = torrent.get('seeds', 0)
if seeds > 100:
score += 0.1
elif seeds > 50:
score += 0.05
return min(score, 1.0) # Ограничиваем максимум 1.0
def generate_demo_torrents(movie_title: str, year: str = None, original_title: str = None) -> list:
"""Генерация демо-данных для тестирования (когда API не работает)"""
torrents = []
# Создаем реалистичные варианты торрентов
quality_variants = [
{"quality": "BluRay", "resolution": "1080p", "size": "8.5 GB", "seeds": 150, "peers": 25},
{"quality": "WEB-DL", "resolution": "1080p", "size": "6.2 GB", "seeds": 120, "peers": 20},
{"quality": "BluRay", "resolution": "2160p", "size": "25.3 GB", "seeds": 200, "peers": 35},
{"quality": "WEBRip", "resolution": "720p", "size": "3.8 GB", "seeds": 80, "peers": 15},
{"quality": "HDTV", "resolution": "720p", "size": "2.1 GB", "seeds": 50, "peers": 8}
]
# Генерируем варианты названий
title_variants = [movie_title]
if original_title and original_title != movie_title:
title_variants.append(original_title)
for i, variant in enumerate(quality_variants):
# Выбираем случайное название
base_title = title_variants[i % len(title_variants)]
# Создаем реалистичное название торрента
torrent_title = f"{base_title} ({year or '2020'}) - {variant['resolution']} - {variant['quality']}"
# Создаем fake magnet hash
fake_hash = f"08ada5a7a6183aae1e09d831df6748d566095a10{i:02d}"
torrent = {
"title": torrent_title,
"size_bytes": parse_size_to_bytes(variant['size']),
"size_readable": variant['size'],
"resolution": variant['resolution'],
"quality": variant['quality'],
"seeds": variant['seeds'],
"peers": variant['peers'],
"magnet": f"magnet:?xt=urn:btih:{fake_hash}&dn={base_title.replace(' ', '+')}&tr=udp://tracker.openbittorrent.com:80",
"download_url": f"https://example.com/torrent/{fake_hash}/",
"category": "Фильмы",
"date": "2025-01-05",
"provider": "Demo",
"relevance_score": 0.9 - (i * 0.1) # Убывающий скор
}
torrents.append(torrent)
return torrents
async def search_torrents(movie_title: str, year: str = None, original_title: str = None, genres: list = None, imdb_id: str = None) -> list:
"""Поиск торрентов для фильма через Torrent Search API"""
torrents = []
try:
# Генерируем варианты поисковых запросов
search_queries = generate_search_variants(movie_title, original_title, year)
print(f"Generated search variants: {search_queries}")
# Добавляем IMDB ID если есть
if imdb_id:
search_queries.append(imdb_id)
# Получаем список доступных провайдеров
async with httpx.AsyncClient() as client:
providers_response = await client.get(f"{TORRENT_SEARCH_URL}/api/provider/list")
if providers_response.status_code != 200:
print(f"Failed to get providers: {providers_response.status_code}")
return torrents
providers = providers_response.json()
print(f"Available providers: {[p['Provider'] for p in providers]}")
# Ищем торренты на всех доступных провайдерах
for provider in providers:
provider_name = provider['Provider'].lower()
try:
# Пробуем каждый вариант поискового запроса
for search_query in search_queries:
print(f"Searching '{search_query}' on {provider_name}")
# Поиск на конкретном провайдере - исправленный эндпоинт
search_response = await client.get(
f"{TORRENT_SEARCH_URL}/api/search/title/{provider_name}",
params={"query": search_query}
)
if search_response.status_code == 200:
results = search_response.json()
if not isinstance(results, list):
print(f"Unexpected response format from {provider_name}: {type(results)}")
continue
print(f"Found {len(results)} results for '{search_query}' on {provider_name}")
# Обрабатываем все результаты и скорим их
for result in results[:20]: # Берем больше результатов для скоринга
print(f"Processing torrent: {result['Name'][:100]}...")
torrent = parse_torrent_result(result, movie_title, year)
if torrent:
# Скорим торрент
score = score_torrent(torrent, movie_title, original_title, year)
torrent['relevance_score'] = score
size_mb = torrent.get('size_bytes', 0) / (1024 * 1024)
print(f"Torrent score: {score:.2f}, size: {size_mb:.1f}MB, seeds: {torrent.get('seeds', 0)} - {result['Name'][:100]}...")
# Добавляем только если скор больше 0.1 и размер больше 100MB
if score > 0.1 and torrent.get('size_bytes', 0) > 100 * 1024 * 1024:
torrents.append(torrent)
print(f" ✅ Added to results")
else:
print(f" ❌ Filtered out (score: {score:.2f}, size: {size_mb:.1f}MB)")
# Если нашли достаточно результатов, переходим к следующему провайдеру
if len(torrents) >= 20:
break
except Exception as e:
print(f"Error searching on {provider_name}: {e}")
continue
# Сортируем по количеству сидов и ограничиваем общее количество
torrents.sort(key=lambda x: x.get('seeds', 0), reverse=True)
return torrents[:30] # Возвращаем топ-30 результатов
except Exception as e:
print(f"Error searching torrents: {e}")
import traceback
traceback.print_exc()
return torrents
async def search_torrent_by_id(torrent_id: str) -> dict:
"""Поиск торрента по ID через Torrent Search API"""
try:
async with httpx.AsyncClient() as client:
print(f"Searching torrent by ID: {torrent_id}")
# Используем правильный эндпоинт для поиска по ID
response = await client.get(
f"{TORRENT_SEARCH_URL}/api/search/id/rutracker",
params={"query": torrent_id}
)
if response.status_code == 200:
results = response.json()
if results and len(results) > 0:
# Берем первый результат
result = results[0]
print(f"Found torrent by ID: {result.get('Name', 'Unknown')[:100]}...")
# Получаем magnet-ссылку и добавляем публичные трекеры
magnet = result.get('Magnet', '')
if magnet and not magnet.startswith('magnet:'):
# Если нет magnet-ссылки, генерируем из хэша
hash_value = result.get('Hash', '')
if hash_value:
magnet = f"magnet:?xt=urn:btih:{hash_value}"
# Создаем чистую magnet-ссылку только с хэшем для использования DHT
if magnet and magnet.startswith('magnet:'):
# Извлекаем только хэш из magnet-ссылки
import re
hash_match = re.search(r'xt=urn:btih:([A-F0-9]+)', magnet)
if hash_match:
hash_value = hash_match.group(1)
# Создаем чистую magnet-ссылку только с хэшем (DHT будет искать пиров)
magnet = f"magnet:?xt=urn:btih:{hash_value}"
# Парсим результат в стандартный формат
torrent = {
"title": result.get('Name', ''),
"url": result.get('Url', ''),
"hash": result.get('Hash', ''),
"magnet": magnet,
"torrent_url": result.get('Torrent', ''),
"imdb_url": result.get('IMDb_link', ''),
"kinopoisk_url": result.get('Kinopoisk_link', ''),
"poster": result.get('Poster', ''),
"year": result.get('Year', ''),
"release": result.get('Release', ''),
"type": result.get('Type', ''),
"duration": result.get('Duration', ''),
"audio": result.get('Audio', ''),
"director": result.get('Directer', ''),
"actors": result.get('Actors', ''),
"description": result.get('Description', ''),
"quality": result.get('Quality', ''),
"video": result.get('Video', ''),
"files": result.get('Files', []),
"provider": "rutracker",
"id": torrent_id
}
return torrent
else:
print(f"No results found for ID: {torrent_id}")
return None
else:
print(f"Error searching by ID: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"Error searching torrent by ID: {e}")
import traceback
traceback.print_exc()
return None
def is_movie_torrent(torrent_name: str, movie_title: str, original_title: str = None) -> bool:
"""Проверяет, является ли торрент фильмом"""
torrent_name_lower = torrent_name.lower()
movie_title_lower = movie_title.lower()
original_title_lower = original_title.lower() if original_title else ""
# Исключаем только явно не-фильмы
exclude_keywords = ['игра', 'game', 'музыка', 'music', 'программа', 'program', 'софт', 'software']
for keyword in exclude_keywords:
if keyword in torrent_name_lower:
print(f"Excluding due to keyword '{keyword}': {torrent_name[:100]}...")
return False
# Проверяем, что название содержит название фильма (русское или оригинальное)
contains_title = movie_title_lower in torrent_name_lower
if original_title_lower and not contains_title:
contains_title = original_title_lower in torrent_name_lower
print(f"Title '{movie_title_lower}' or '{original_title_lower}' in torrent: {contains_title}")
return contains_title
def parse_torrent_result(result: dict, movie_title: str, year: str = None) -> dict:
"""Парсит результат поиска торрентов"""
try:
# Извлекаем разрешение из названия
resolution = extract_resolution(result['Name'])
# Извлекаем качество из названия
quality = extract_quality(result['Name'])
# Парсим размер
size_bytes = parse_size_to_bytes(result['Size'])
# Создаем magnet-ссылку (используем поле Magnet если есть, иначе генерируем из хэша)
magnet = None
if 'Magnet' in result and result['Magnet']:
magnet = result['Magnet']
elif 'Hash' in result and result['Hash']:
# Генерируем magnet-ссылку из хэша
hash_value = result['Hash']
magnet = f"magnet:?xt=urn:btih:{hash_value}"
elif 'Torrent' in result and result['Torrent']:
# Используем URL на .torrent файл как fallback
magnet = result['Torrent']
# Определяем провайдера по URL
provider = 'Unknown'
torrent_url = result.get('Torrent', '')
if 'rutracker.org' in torrent_url:
provider = 'rutracker'
elif 'kinozal.tv' in torrent_url:
provider = 'kinozal'
elif 'rutor.info' in torrent_url:
provider = 'rutor'
elif 'nnmclub.to' in torrent_url:
provider = 'nonameclub'
return {
"title": result['Name'],
"size_bytes": size_bytes,
"size_readable": result['Size'],
"resolution": resolution,
"quality": quality,
"seeds": int(result.get('Seeds', 0)),
"peers": int(result.get('Peers', 0)),
"magnet": magnet,
"download_url": result.get('Torrent', ''),
"category": result.get('Category', ''),
"date": result.get('Date', ''),
"provider": provider,
"id": result.get('Id', '')
}
except Exception as e:
print(f"Error parsing torrent result: {e}")
return None
def extract_quality(title: str) -> str:
"""Извлекает качество из названия торрента"""
title_upper = title.upper()
if 'BLURAY' in title_upper or 'BDRIP' in title_upper:
return 'BluRay'
elif 'WEB-DL' in title_upper or 'WEBDL' in title_upper:
return 'WEB-DL'
elif 'WEBRIP' in title_upper or 'WEB-RIP' in title_upper:
return 'WEBRip'
elif 'HDTV' in title_upper:
return 'HDTV'
elif 'DVDRIP' in title_upper or 'DVD-RIP' in title_upper:
return 'DVDRip'
else:
return 'Unknown'
def parse_size_to_bytes(size_str: str) -> int:
"""Конвертирует строку размера в байты"""
size_str = size_str.upper().strip()
multipliers = {
'KB': 1024,
'MB': 1024**2,
'GB': 1024**3,
'TB': 1024**4
}
for unit, multiplier in multipliers.items():
if unit in size_str:
try:
number = float(re.findall(r'[\d.]+', size_str)[0])
return int(number * multiplier)
except (ValueError, IndexError):
pass
return 0
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Главная страница с формой поиска"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/search", response_class=HTMLResponse)
async def search(request: Request, movie_title: str = Form(...)):
"""Обработка поиска фильмов"""
try:
search_results = await search_movies(movie_title)
return templates.TemplateResponse(
"results.html",
{
"request": request,
"query": movie_title,
"movies": search_results.get("results", []),
"total_results": search_results.get("total_results", 0)
}
)
except HTTPException as e:
return templates.TemplateResponse(
"error.html",
{
"request": request,
"error": str(e.detail)
}
)
@app.get("/api/search/{movie_title}")
async def api_search(movie_title: str):
"""API endpoint для поиска фильмов (возвращает JSON)"""
try:
search_results = await search_movies(movie_title)
return search_results
except HTTPException as e:
raise e
@app.get("/api/torrents/{movie_title}")
async def api_search_torrents(movie_title: str, year: str = None, original_title: str = None, genres: str = None, imdb_id: str = None):
"""API endpoint для поиска торрентов фильма"""
try:
# Парсим жанры из строки
genres_list = None
if genres:
genres_list = [g.strip() for g in genres.split(',')]
torrents = await search_torrents(movie_title, year, original_title, genres_list, imdb_id)
return {"torrents": torrents, "movie_title": movie_title, "year": year}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Torrent search error: {str(e)}")
@app.get("/api/torrent/id/{torrent_id}")
async def api_search_torrent_by_id(torrent_id: str):
"""API endpoint для поиска торрента по ID"""
try:
torrent = await search_torrent_by_id(torrent_id)
if torrent:
return {"status": "success", "torrent": torrent}
else:
return {"status": "error", "message": f"Торрент с ID {torrent_id} не найден"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Torrent ID search error: {str(e)}")
@app.get("/torrents/{movie_title}", response_class=HTMLResponse)
async def torrents_page(request: Request, movie_title: str, year: str = None):
"""Страница с результатами поиска торрентов"""
try:
torrents = await search_torrents(movie_title, year)
return templates.TemplateResponse(
"torrents.html",
{
"request": request,
"movie_title": movie_title,
"year": year,
"torrents": torrents
}
)
except Exception as e:
return templates.TemplateResponse(
"error.html",
{
"request": request,
"error": f"Ошибка поиска торрентов: {str(e)}"
}
)
@app.post("/api/add-torrent")
async def add_torrent_to_client(torrent_id: str = Form(...)):
"""Добавление торрента в qBittorrent через прямое API"""
try:
print(f"Attempting to add torrent with ID: {torrent_id}")
# Получаем magnet-ссылку по ID
torrent_info = await search_torrent_by_id(torrent_id)
if not torrent_info:
return {"status": "error", "message": f"Торрент с ID {torrent_id} не найден"}
magnet = torrent_info.get('magnet')
if not magnet:
return {"status": "error", "message": f"Magnet-ссылка не найдена для торрента {torrent_id}"}
print(f"Found magnet link: {magnet[:100]}...")
# Получаем учетные данные из переменных окружения
qb_username = os.getenv("QBITTORRENT_USERNAME", "admin")
qb_password = os.getenv("QBITTORRENT_PASSWORD", "vrubel07")
qb_host = os.getenv("QBITTORRENT_HOST", "172.17.0.1")
qb_port = os.getenv("QBITTORRENT_PORT", "8080")
qb_url = f"http://{qb_host}:{qb_port}"
async with httpx.AsyncClient(timeout=60.0) as client:
# Аутентификация в qBittorrent
auth_response = await client.post(
f"{qb_url}/api/v2/auth/login",
data={"username": qb_username, "password": qb_password}
)
if auth_response.status_code != 200 or auth_response.text.strip() != "Ok.":
return {"status": "error", "message": f"Ошибка аутентификации в qBittorrent: {auth_response.text}"}
print("Successfully authenticated with qBittorrent")
# Добавляем торрент по magnet-ссылке
add_response = await client.post(
f"{qb_url}/api/v2/torrents/add",
data={"urls": magnet}
)
print(f"Add response status: {add_response.status_code}")
print(f"Add response text: {add_response.text}")
if add_response.status_code == 200:
return {"status": "success", "message": f"Торрент '{torrent_info.get('title', 'Unknown')[:50]}...' добавлен в qBittorrent!"}
else:
return {"status": "error", "message": f"Ошибка добавления торрента (HTTP {add_response.status_code}): {add_response.text}"}
except httpx.ConnectError as e:
print(f"Connection error: {e}")
raise HTTPException(status_code=503, detail=f"qBittorrent недоступен по адресу {qb_url}. Убедитесь, что сервис запущен")
except Exception as e:
print(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=f"Ошибка при добавлении торрента: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)