fix(app): исправление скачивания торрентов

- generate_clean_magnet: убраны мёртвые трекеры (coppersurfer.tk, leechers-paradise.org),
  добавлены рабочие (tamersunion.org, exodus.desync.com, moeking.me),
  включено &dn= с URL-кодированием кириллицы
- extract_hash_from_result: новая единая функция извлечения хэша из 5 источников
  (Hash, InfoHash, Magnet, btih: в URL, Id)
- /api/add-torrent: убран ложный success — после Ok. от qBittorrent идёт реальная
  верификация (торрент появился в списке по хэшу или названию). Если не появился — error.
- /api/proxy-torrent-download: новый endpoint для скачивания .torrent файлов
  через NL-прокси (обходит DPI-блокировку)
- torrents.html: кнопка Копировать magnet (Clipboard API + fallback),
  proxy-ссылки для .torrent, disabled-состояния для пустых magnet/torrent_url
- tmdb-proxy: добавлен /proxy-torrent endpoint
- urlencode filter для Jinja2
- test_app.py: 47 тестов на чистые функции
This commit is contained in:
vrubelroman 2026-06-03 19:27:14 +00:00
parent fb2aa5a60a
commit a5497eef26
4 changed files with 598 additions and 130 deletions

View file

@ -4,11 +4,12 @@ import asyncio
import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, Form, HTTPException
from fastapi.responses import HTMLResponse
from fastapi import FastAPI, Request, Form, HTTPException, Query
from fastapi.responses import HTMLResponse, Response
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from urllib.parse import quote
app = FastAPI(title="Movie Search API", version="1.0.0")
@ -24,6 +25,13 @@ app.add_middleware(
# Настройка шаблонов
templates = Jinja2Templates(directory="templates")
# Регистрируем фильтр urlencode для шаблонов
def urlencode_filter(s):
if s:
return quote(s, safe='')
return ''
templates.env.filters['urlencode'] = urlencode_filter
# URL прокси-сервиса TMDB
TMDB_PROXY_URL = os.getenv("TMDB_PROXY_URL", "http://localhost:8001")
@ -418,33 +426,22 @@ async def search_torrent_by_id(torrent_id: str) -> dict:
torrent_name = result.get('Name', '') or result.get('Original_Name', '')
print(f"Found torrent by ID on {provider_name}: {torrent_name[:100]}...")
# Получаем хэш и создаем чистую magnet-ссылку с публичными трекерами
hash_value = result.get('Hash', '')
torrent_title = torrent_name
# Если хэша нет в Hash, пытаемся извлечь из Magnet ссылки
if not hash_value:
original_magnet = result.get('Magnet', '')
if original_magnet:
# Извлекаем хэш из magnet ссылки
hash_match = re.search(r'urn:btih:([a-fA-F0-9]{40}|[a-zA-Z0-9]{32})', original_magnet)
if hash_match:
hash_value = hash_match.group(1)
print(f"Extracted hash from magnet link: {hash_value[:10]}...")
# Получаем хэш — используем единую функцию извлечения
hash_value = extract_hash_from_result(result)
if hash_value:
# Генерируем чистую magnet-ссылку с публичными трекерами
magnet = generate_clean_magnet(hash_value, torrent_title)
print(f"Generated clean magnet with public trackers: {magnet[:100]}...")
magnet = generate_clean_magnet(hash_value, torrent_name)
print(f"Generated clean magnet with hash {hash_value[:10]}...: {magnet[:100]}...")
else:
# Fallback на оригинальную magnet-ссылку если нет хэша
magnet = result.get('Magnet', '')
if not magnet or not magnet.startswith('magnet:'):
print(f"Warning: No hash found and no valid magnet link. Hash: {hash_value}, Magnet: {result.get('Magnet', 'None')[:50]}")
print(f"Warning: No hash found and no valid magnet link. Hash fields: Hash={result.get('Hash', 'None')[:20]}, Magnet={result.get('Magnet', 'None')[:50]}")
magnet = ""
# Пробуем локальный torapi-qbit если хэш пустой или битый
if not hash_value or not re.search(r'urn:btih:([a-fA-F0-9]{40}|[a-zA-Z0-9]{32})', magnet):
if not magnet or not re.search(r'urn:btih:([a-fA-F0-9]{40}|[a-zA-Z0-9]{32})', magnet):
try:
torapi_add_url = os.getenv("TORAPI_ADD_URL", "http://localhost:8444")
fb_resp = await client.get(
@ -456,16 +453,10 @@ async def search_torrent_by_id(torrent_id: str) -> dict:
fb_data = fb_resp.json()
if isinstance(fb_data, list) and len(fb_data) > 0:
fb_result = fb_data[0]
fb_hash = fb_result.get('Hash', '')
if not fb_hash:
import re as re2
fb_magnet = fb_result.get('Magnet', '')
hm2 = re2.search(r'urn:btih:([a-fA-F0-9]{40})', fb_magnet)
if hm2:
fb_hash = hm2.group(1)
fb_hash = extract_hash_from_result(fb_result)
if fb_hash:
magnet = generate_clean_magnet(fb_hash, torrent_name)
print(f"torapi-qbit fallback: got magnet hash {fb_hash[:10]}...")
print(f"torapi-qbit fallback: got hash {fb_hash[:10]}... via extract_hash_from_result")
except Exception as fbe:
print(f"torapi-qbit fallback failed: {fbe}")
@ -689,25 +680,69 @@ def parse_size_to_bytes(size_str: str) -> int:
return 0
def extract_hash_from_result(result: dict) -> str:
"""Извлекает хэш торрента из результата API — пробует все возможные поля"""
hash_value = ""
# 1. Прямое поле Hash
hash_value = result.get('Hash', '') or result.get('hash', '') or ''
if hash_value and re.match(r'^[a-fA-F0-9]{40}$', hash_value):
return hash_value.upper()
# 2. Поле InfoHash
if not hash_value:
hash_value = result.get('InfoHash', '') or result.get('info_hash', '') or ''
if hash_value and re.match(r'^[a-fA-F0-9]{40}$', hash_value):
return hash_value.upper()
# 3. Из Magnet ссылки
magnet = result.get('Magnet', '') or result.get('magnet', '') or ''
if magnet:
hm = re.search(r'urn:btih:([a-fA-F0-9]{40})', magnet)
if hm:
return hm.group(1).upper()
hm32 = re.search(r'urn:btih:([a-zA-Z0-9]{32})', magnet)
if hm32:
return hm32.group(1)
# 4. Из Torrent URL (rutracker: /dl.php?t=XXXXX)
torrent_url = result.get('Torrent', '') or result.get('torrent', '') or result.get('Url', '') or ''
# Некоторые провайдеры кодируют хэш в URL
if 'btih:' in torrent_url:
hm = re.search(r'btih:([a-fA-F0-9]{40})', torrent_url)
if hm:
return hm.group(1).upper()
# 5. Из поля Id/ID (некоторые API возвращают хэш как ID)
tid = result.get('Id') or result.get('id') or result.get('ID') or ''
if tid and re.match(r'^[a-fA-F0-9]{40}$', tid):
return tid.upper()
return hash_value # Может быть пустым
def generate_clean_magnet(hash_value: str, title: str = None) -> str:
"""Генерирует чистую magnet-ссылку с публичными трекерами"""
if not hash_value:
return ""
# Проверенные рабочие трекеры (минимальный набор)
# Рабочие публичные трекеры (2025+)
public_trackers = [
"udp://tracker.opentrackr.org:1337/announce",
"udp://open.stealth.si:80/announce",
"udp://tracker.openbittorrent.com:6969/announce",
"udp://tracker.coppersurfer.tk:6969/announce",
"udp://tracker.leechers-paradise.org:6969/announce"
"https://tracker.tamersunion.org:443/announce",
"udp://exodus.desync.com:6969/announce",
"udp://tracker.moeking.me:6969/announce",
]
# Создаем базовую magnet-ссылку с хэшем
magnet = f"magnet:?xt=urn:btih:{hash_value}"
# НЕ добавляем название файла - это может вызывать проблемы с кириллицей
# DHT сам найдет название по хэшу
# Добавляем название в кодировке RFC 3986 для удобства
if title:
from urllib.parse import quote
magnet += f"&dn={quote(title)}"
# Добавляем публичные трекеры
for tracker in public_trackers:
@ -1135,7 +1170,61 @@ async def torrents_page(request: Request, movie_title: str, year: str = None):
}
)
@app.post("/api/add-torrent")
@app.get("/api/proxy-torrent-download")
async def proxy_torrent_download(url: str = Query(...)):
"""Прокси скачивание .torrent файла через NL-сервер (обходит DPI)"""
try:
proxy_base = os.getenv("TMDB_PROXY_URL", "http://localhost:8001")
print(f"Proxying .torrent download: {url} via {proxy_base}")
async with httpx.AsyncClient(timeout=30.0) as client:
# Пробуем через NL прокси (tmdb-proxy имеет прямой выход в интернет)
try:
proxy_resp = await client.get(
f"{proxy_base}/proxy-torrent",
params={"url": url},
timeout=30.0
)
if proxy_resp.status_code == 200:
content_type = proxy_resp.headers.get("content-type", "application/x-bittorrent")
return Response(
content=proxy_resp.content,
media_type=content_type,
headers={
"Content-Disposition": f'attachment; filename="{os.path.basename(url)}.torrent"',
"Content-Length": str(len(proxy_resp.content))
}
)
print(f"NL proxy returned {proxy_resp.status_code}, trying direct download...")
except Exception as proxy_err:
print(f"NL proxy error: {proxy_err}, trying direct download...")
# Fallback: пытаемся скачать напрямую из контейнера (может работать если DNS не блокируется)
try:
direct_resp = await client.get(url, timeout=15.0)
if direct_resp.status_code == 200 and len(direct_resp.content) > 100:
content_type = direct_resp.headers.get("content-type", "application/x-bittorrent")
return Response(
content=direct_resp.content,
media_type=content_type,
headers={
"Content-Disposition": f'attachment; filename="{os.path.basename(url)}.torrent"',
"Content-Length": str(len(direct_resp.content))
}
)
except Exception as direct_err:
print(f"Direct download failed: {direct_err}")
raise HTTPException(
status_code=502,
detail=f"Не удалось скачать .torrent файл. NL-прокси и прямой доступ недоступны."
)
except HTTPException:
raise
except Exception as e:
print(f"Error in proxy-torrent-download: {e}")
raise HTTPException(status_code=500, detail=f"Ошибка прокси: {str(e)}")
async def add_torrent_to_client(torrent_id: str = Form(...), magnet: str = Form(""), torrent_title: str = Form("")):
"""Добавление торрента в qBittorrent через прямое API"""
try:
@ -1207,27 +1296,42 @@ async def add_torrent_to_client(torrent_id: str = Form(...), magnet: str = Form(
if hash_match:
torrent_hash = hash_match.group(1).upper()
if torrent_hash:
torrents_response = await client.get(f"{qb_url}/api/v2/torrents/info")
if torrents_response.status_code == 200:
torrents = torrents_response.json()
# Ищем торрент по хэшу
for torrent in torrents:
if torrent.get('hash', '').upper() == torrent_hash:
return {
"status": "success",
"message": f"Торрент '{torrent_info.get('title', 'Unknown')[:50]}...' добавлен в qBittorrent через magnet-ссылку!",
"torrent_hash": torrent.get('hash'),
"torrent_name": torrent_info.get('title', 'Unknown')
}
# Реальная верификация: проверяем, что торрент появился в qBittorrent
verified = False
actual_hash = torrent_hash
torrents_response = await client.get(f"{qb_url}/api/v2/torrents/info")
if torrents_response.status_code == 200:
qb_torrents = torrents_response.json()
if torrent_hash:
for qt in qb_torrents:
if qt.get('hash', '').upper() == torrent_hash:
verified = True
actual_hash = qt.get('hash', '')
print(f"Verified: torrent found by hash in qBittorrent")
break
if not verified:
# Пробуем найти по названию
t_title = torrent_info.get('title', '').lower()
for qt in qb_torrents:
qt_name = qt.get('name', '').lower()
if t_title and qt_name and (t_title[:30] in qt_name or qt_name[:30] in t_title):
verified = True
actual_hash = qt.get('hash', '')
print(f"Verified: torrent found by name in qBittorrent: {qt.get('name', '')}")
break
# Если не нашли по хэшу, но ответ был Ok, считаем успешным
return {
"status": "success",
"message": f"Торрент '{torrent_info.get('title', 'Unknown')[:50]}...' добавлен в qBittorrent через magnet-ссылку!",
"torrent_hash": torrent_hash,
"torrent_name": torrent_info.get('title', 'Unknown')
}
if verified:
return {
"status": "success",
"message": f"Торрент '{torrent_info.get('title', 'Unknown')[:50]}...' добавлен в qBittorrent!",
"torrent_hash": actual_hash,
"torrent_name": torrent_info.get('title', 'Unknown')
}
else:
# qBittorrent сказал Ok., но торрент не появился — ложный успех
print(f"WARNING: qBittorrent returned Ok. but torrent was not added (hash={torrent_hash or 'empty'})")
print("Proceeding to .torrent file fallback...")
# Не возвращаем success, а пробуем .torrent fallback ниже
else:
print(f"Magnet link failed (status: {add_response.status_code}, response: {add_response.text}), trying .torrent file...")
else:
@ -1288,13 +1392,12 @@ async def add_torrent_to_client(torrent_id: str = Form(...), magnet: str = Form(
if torrents_response.status_code == 200:
torrents = torrents_response.json()
# Ищем торрент по названию (первые 20 символов для точного совпадения)
# Ищем торрент по названию (первые 30 символов)
added_torrent = None
for torrent in torrents:
torrent_name = torrent.get('name', '').lower()
# Проверяем совпадение по началу названия
if torrent_title and torrent_name:
if torrent_title[:20] in torrent_name or torrent_name[:20] in torrent_title:
if torrent_title[:30] in torrent_name or torrent_name[:30] in torrent_title:
added_torrent = torrent
break
@ -1306,12 +1409,11 @@ async def add_torrent_to_client(torrent_id: str = Form(...), magnet: str = Form(
"torrent_name": added_torrent.get('name', torrent_info.get('title', 'Unknown'))
}
# Если не нашли по названию, но ответ был Ok, считаем успешным
# Торрент не появился — честный error вместо ложного success
print(f"WARNING: qBittorrent returned Ok. for .torrent but torrent was not found in list (title={torrent_title or 'empty'})")
return {
"status": "success",
"message": f"Торрент '{torrent_info.get('title', 'Unknown')[:50]}...' добавлен в qBittorrent через .torrent файл (проверьте список торрентов в qBittorrent)!",
"torrent_hash": torrent_info.get('hash', ''),
"torrent_name": torrent_info.get('title', 'Unknown')
"status": "error",
"message": f"qBittorrent не добавил торрент. Возможно, файл недоступен или DPI-блокировка. Попробуйте magnet-ссылку."
}
else:
error_msg = add_response.text.strip()

View file

@ -18,10 +18,7 @@
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
color: #333;
margin-bottom: 20px;
}
h1 { color: #333; margin-bottom: 20px; }
.back-link {
display: inline-block;
margin-bottom: 20px;
@ -31,20 +28,14 @@
border: 1px solid #007bff;
border-radius: 5px;
}
.back-link:hover {
background-color: #007bff;
color: white;
}
.back-link:hover { background-color: #007bff; color: white; }
.movie-info {
background-color: #e9ecef;
padding: 15px;
border-radius: 5px;
margin-bottom: 20px;
}
.torrents-list {
display: grid;
gap: 15px;
}
.torrents-list { display: grid; gap: 15px; }
.torrent-item {
border: 1px solid #ddd;
border-radius: 8px;
@ -53,9 +44,7 @@
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
transition: box-shadow 0.3s ease;
}
.torrent-item:hover {
box-shadow: 0 4px 10px rgba(0,0,0,0.15);
}
.torrent-item:hover { box-shadow: 0 4px 10px rgba(0,0,0,0.15); }
.torrent-title {
font-size: 18px;
font-weight: bold;
@ -68,30 +57,24 @@
gap: 15px;
margin-bottom: 15px;
}
.detail-item {
display: flex;
flex-direction: column;
}
.detail-item { display: flex; flex-direction: column; }
.detail-label {
font-size: 12px;
color: #666;
text-transform: uppercase;
margin-bottom: 4px;
}
.detail-value {
font-size: 14px;
font-weight: bold;
color: #333;
}
.detail-value { font-size: 14px; font-weight: bold; color: #333; }
.resolution-1080p { color: #28a745; }
.resolution-720p { color: #ffc107; }
.resolution-480p { color: #dc3545; }
.resolution-2160p { color: #6f42c1; }
.torrent-actions {
display: flex;
gap: 10px;
margin-top: 15px;
flex-wrap: wrap;
}
.btn {
padding: 8px 16px;
@ -99,31 +82,25 @@
border-radius: 5px;
cursor: pointer;
text-decoration: none;
display: inline-block;
display: inline-flex;
align-items: center;
gap: 6px;
font-size: 14px;
transition: background-color 0.3s ease;
white-space: nowrap;
}
.btn-primary {
background-color: #007bff;
color: white;
}
.btn-primary:hover {
background-color: #0056b3;
}
.btn-success {
background-color: #28a745;
color: white;
}
.btn-success:hover {
background-color: #1e7e34;
}
.btn-secondary {
background-color: #6c757d;
color: white;
}
.btn-secondary:hover {
background-color: #545b62;
}
.btn-primary { background-color: #007bff; color: white; }
.btn-primary:hover { background-color: #0056b3; }
.btn-success { background-color: #28a745; color: white; }
.btn-success:hover { background-color: #1e7e34; }
.btn-secondary { background-color: #6c757d; color: white; }
.btn-secondary:hover { background-color: #545b62; }
.btn-warning { background-color: #ffc107; color: #333; }
.btn-warning:hover { background-color: #e0a800; }
.btn-info { background-color: #17a2b8; color: white; }
.btn-info:hover { background-color: #138496; }
.btn:disabled { opacity: 0.6; cursor: not-allowed; }
.no-torrents {
text-align: center;
color: #666;
@ -141,69 +118,93 @@
.quality-bluray { background-color: #007bff; color: white; }
.quality-web-dl { background-color: #28a745; color: white; }
.quality-hdtv { background-color: #ffc107; color: #333; }
.toast {
position: fixed;
bottom: 20px;
left: 50%;
transform: translateX(-50%);
background: #333;
color: #fff;
padding: 12px 24px;
border-radius: 8px;
font-size: 14px;
z-index: 1000;
opacity: 0;
transition: opacity 0.3s ease;
pointer-events: none;
}
.toast.show { opacity: 1; }
.toast.success { background: #28a745; }
.toast.error { background: #dc3545; }
.magnet-link {
word-break: break-all;
font-size: 11px;
color: #666;
margin-top: 8px;
padding: 6px 8px;
background: #f8f9fa;
border-radius: 4px;
display: none;
}
.magnet-link.visible { display: block; }
</style>
</head>
<body>
<div class="container">
<a href="/" class="back-link">← Назад к поиску</a>
<h1>🔍 Торренты для: "{{ movie_title }}"</h1>
<div class="movie-info">
<strong>Фильм:</strong> {{ movie_title }}
{% if year %}
<br><strong>Год:</strong> {{ year }}
{% endif %}
</div>
{% if torrents %}
<div class="torrents-list">
{% for torrent in torrents %}
<div class="torrent-item">
<div class="torrent-title">{{ torrent.title }}</div>
<div class="torrent-details">
<div class="detail-item">
<div class="detail-label">Размер</div>
<div class="detail-value">{{ torrent.size_readable }}</div>
</div>
<div class="detail-item">
<div class="detail-label">Разрешение</div>
<div class="detail-value resolution-{{ torrent.resolution }}">{{ torrent.resolution }}</div>
</div>
<div class="detail-item">
<div class="detail-label">Качество</div>
<div class="detail-value">
<span class="quality-badge quality-{{ torrent.quality.lower() }}">{{ torrent.quality }}</span>
</div>
</div>
<div class="detail-item">
<div class="detail-label">Сиды</div>
<div class="detail-value">{{ torrent.seeds }}</div>
</div>
<div class="detail-item">
<div class="detail-label">Пиры</div>
<div class="detail-value">{{ torrent.peers }}</div>
</div>
{% if torrent.category %}
<div class="detail-item">
<div class="detail-label">Категория</div>
<div class="detail-value">{{ torrent.category }}</div>
</div>
{% endif %}
{% if torrent.date %}
<div class="detail-item">
<div class="detail-label">Дата</div>
<div class="detail-value">{{ torrent.date }}</div>
</div>
{% endif %}
{% if torrent.provider %}
<div class="detail-item">
<div class="detail-label">Провайдер</div>
@ -211,11 +212,26 @@
</div>
{% endif %}
</div>
<div class="torrent-actions">
<a href="{{ torrent.magnet }}" class="btn btn-primary">Magnet</a>
<a href="{{ torrent.download_url }}" class="btn btn-success">Скачать .torrent</a>
<button onclick="addToTorrentClient('{{ torrent.id }}', '{{ torrent.magnet|e }}', '{{ torrent.title|e }}')" class="btn btn-secondary">Добавить в клиент</button>
{% if torrent.magnet and torrent.magnet.startswith('magnet:') %}
<a href="{{ torrent.magnet }}" class="btn btn-primary" title="Открыть magnet-ссылку">🧲 Magnet</a>
<button onclick="copyMagnet('{{ torrent.magnet|e }}', this)" class="btn btn-info" title="Копировать magnet-ссылку">📋 Копировать</button>
{% else %}
<button class="btn btn-primary" disabled title="Magnet-ссылка недоступна">🧲 Magnet</button>
{% endif %}
{% if torrent.download_url and torrent.download_url.startswith('http') %}
<a href="/api/proxy-torrent-download?url={{ torrent.download_url|urlencode }}" class="btn btn-success" target="_blank">💾 Скачать .torrent</a>
{% else %}
<button class="btn btn-success" disabled title=".torrent файл недоступен">💾 Скачать .torrent</button>
{% endif %}
{% if torrent.magnet and torrent.magnet.startswith('magnet:') %}
<button onclick="addToTorrentClient('{{ torrent.id }}', '{{ torrent.magnet|e }}', '{{ torrent.title|e }}')" class="btn btn-secondary"> Добавить в клиент</button>
{% else %}
<button class="btn btn-secondary" disabled title="Нет magnet-ссылки для добавления"> Добавить в клиент</button>
{% endif %}
</div>
</div>
{% endfor %}
@ -227,20 +243,68 @@
{% endif %}
</div>
<div id="toast" class="toast"></div>
<script>
function showToast(message, type) {
const toast = document.getElementById('toast');
toast.textContent = message;
toast.className = 'toast ' + (type || '');
toast.classList.add('show');
setTimeout(() => toast.classList.remove('show'), 3000);
}
function copyMagnet(magnet, button) {
if (!magnet || !magnet.startsWith('magnet:')) {
showToast('❌ Magnet-ссылка недоступна', 'error');
return;
}
// Пробуем Clipboard API
if (navigator.clipboard && navigator.clipboard.writeText) {
navigator.clipboard.writeText(magnet).then(() => {
const originalText = button.textContent;
button.textContent = '✅ Скопировано';
setTimeout(() => button.textContent = originalText, 2000);
showToast('✅ Magnet-ссылка скопирована в буфер', 'success');
}).catch(() => {
fallbackCopyMagnet(magnet, button);
});
} else {
fallbackCopyMagnet(magnet, button);
}
}
function fallbackCopyMagnet(magnet, button) {
const textarea = document.createElement('textarea');
textarea.value = magnet;
textarea.style.position = 'fixed';
textarea.style.opacity = '0';
document.body.appendChild(textarea);
textarea.select();
try {
document.execCommand('copy');
const originalText = button.textContent;
button.textContent = '✅ Скопировано';
setTimeout(() => button.textContent = originalText, 2000);
showToast('✅ Magnet-ссылка скопирована в буфер', 'success');
} catch (e) {
showToast('❌ Не удалось скопировать. Выделите ссылку вручную.', 'error');
}
document.body.removeChild(textarea);
}
function addToTorrentClient(torrentId, magnet, title) {
// Показываем индикатор загрузки
const button = event.target;
const originalText = button.textContent;
button.textContent = '⏳ Добавляем...';
button.disabled = true;
// Отправляем ID + magnet + название
const formData = new FormData();
formData.append('torrent_id', torrentId);
formData.append('magnet', magnet || '');
formData.append('torrent_title', title || '');
fetch('/api/add-torrent', {
method: 'POST',
body: formData
@ -248,17 +312,17 @@
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
alert('✅ ' + data.message);
showToast('✅ ' + data.message, 'success');
button.textContent = '✅ Добавлено';
button.style.backgroundColor = '#28a745';
} else {
alert('❌ ' + data.message);
showToast('❌ ' + data.message, 'error');
button.textContent = originalText;
button.disabled = false;
}
})
.catch(error => {
alert('❌ Ошибка при добавлении торрента: ' + error);
showToast('❌ Ошибка при добавлении торрента: ' + error, 'error');
button.textContent = originalText;
button.disabled = false;
});

273
app/test_app.py Normal file
View file

@ -0,0 +1,273 @@
#!/usr/bin/env python3
"""
Тесты для app.py изолированные, без внешних зависимостей.
Запуск: python3 -m unittest test_app -v
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import unittest
from app import (
extract_hash_from_result,
generate_clean_magnet,
parse_size_to_bytes,
parse_size,
extract_resolution,
extract_quality,
generate_search_variants,
score_torrent,
normalize_search_term,
is_movie_torrent,
)
class TestExtractHash(unittest.TestCase):
def test_direct_hash_field(self):
result = {"Hash": "08ada5a7a6183aae1e09d831df6748d566095a10"}
self.assertEqual(extract_hash_from_result(result), "08ADA5A7A6183AAE1E09D831DF6748D566095A10")
def test_hash_from_magnet(self):
result = {
"Hash": "",
"Magnet": "magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10&dn=test"
}
self.assertEqual(extract_hash_from_result(result), "08ADA5A7A6183AAE1E09D831DF6748D566095A10")
def test_hash_from_info_hash(self):
result = {"InfoHash": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0"}
self.assertEqual(
extract_hash_from_result(result),
"A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6E7F8A9B0"
)
def test_empty_result(self):
self.assertEqual(extract_hash_from_result({}), "")
self.assertEqual(extract_hash_from_result({"Hash": ""}), "")
def test_hash_from_torrent_url_no_btih(self):
result = {"Torrent": "https://rutracker.org/forum/dl.php?t=123456"}
self.assertEqual(extract_hash_from_result(result), "")
def test_case_insensitive_hash(self):
result = {"Hash": "08ada5a7a6183aae1e09d831df6748d566095a10"}
self.assertEqual(extract_hash_from_result(result), "08ADA5A7A6183AAE1E09D831DF6748D566095A10")
def test_id_field_as_hash(self):
result = {"Id": "08ada5a7a6183aae1e09d831df6748d566095a10"}
self.assertEqual(extract_hash_from_result(result), "08ADA5A7A6183AAE1E09D831DF6748D566095A10")
def test_non_hash_id_field(self):
result = {"Id": "12345", "Hash": ""}
self.assertEqual(extract_hash_from_result(result), "")
def test_hash_from_lowercase_field(self):
result = {"hash": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0"}
self.assertEqual(
extract_hash_from_result(result),
"A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6E7F8A9B0"
)
class TestGenerateCleanMagnet(unittest.TestCase):
def test_basic_magnet(self):
magnet = generate_clean_magnet("08ada5a7a6183aae1e09d831df6748d566095a10", "Test Movie")
self.assertTrue(magnet.startswith("magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10"))
self.assertIn("&dn=Test%20Movie", magnet)
self.assertIn("&tr=udp://tracker.opentrackr.org:1337/announce", magnet)
self.assertIn("&tr=udp://tracker.openbittorrent.com:6969/announce", magnet)
def test_empty_hash(self):
self.assertEqual(generate_clean_magnet(""), "")
self.assertEqual(generate_clean_magnet(None), "")
def test_no_title(self):
magnet = generate_clean_magnet("08ada5a7a6183aae1e09d831df6748d566095a10")
self.assertTrue(magnet.startswith("magnet:?xt=urn:btih:"))
self.assertNotIn("&dn=", magnet)
def test_cyrillic_title(self):
magnet = generate_clean_magnet("a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0", "Терминатор 2")
self.assertIn("&dn=%D0%A2%D0%B5%D1%80%D0%BC%D0%B8%D0%BD%D0%B0%D1%82%D0%BE%D1%80%202", magnet)
def test_has_trackers(self):
magnet = generate_clean_magnet("a" * 40)
expected_trackers = [
"tracker.opentrackr.org",
"open.stealth.si",
"tracker.openbittorrent.com",
"tracker.tamersunion.org",
"exodus.desync.com",
"tracker.moeking.me",
]
for tracker in expected_trackers:
self.assertIn(tracker, magnet, f"Missing tracker: {tracker}")
def test_no_dead_trackers(self):
magnet = generate_clean_magnet("a" * 40)
self.assertNotIn("coppersurfer.tk", magnet)
self.assertNotIn("leechers-paradise.org", magnet)
class TestParseSize(unittest.TestCase):
def test_gb(self):
bytes_size, readable = parse_size("25.3 GB")
self.assertEqual(bytes_size, int(25.3 * 1024**3))
self.assertEqual(readable, "25.3 GB")
def test_mb(self):
bytes_size, readable = parse_size("6.2 MB")
self.assertEqual(bytes_size, int(6.2 * 1024**2))
def test_kb(self):
bytes_size, readable = parse_size("512 KB")
self.assertEqual(bytes_size, 512 * 1024)
def test_empty(self):
bytes_size, readable = parse_size("")
self.assertEqual(bytes_size, 0)
self.assertEqual(readable, "Неизвестно")
def test_none(self):
bytes_size, readable = parse_size(None)
self.assertEqual(bytes_size, 0)
self.assertEqual(readable, "Неизвестно")
class TestParseSizeToBytes(unittest.TestCase):
def test_gb(self):
result = parse_size_to_bytes("25.3 GB")
self.assertAlmostEqual(result, 25.3 * 1024**3, delta=1)
def test_mb(self):
result = parse_size_to_bytes("500 MB")
self.assertEqual(result, 500 * 1024**2)
def test_empty(self):
self.assertEqual(parse_size_to_bytes(""), 0)
class TestExtractResolution(unittest.TestCase):
def test_2160p(self):
self.assertEqual(extract_resolution("Movie 2024 UHD 2160p BluRay"), "2160p")
def test_4k(self):
self.assertEqual(extract_resolution("Movie 2024 4K HDR"), "2160p")
def test_1080p(self):
self.assertEqual(extract_resolution("Movie 2024 1080p WEB-DL"), "1080p")
def test_720p(self):
self.assertEqual(extract_resolution("Movie 2024 720p HDTV"), "720p")
def test_unknown(self):
self.assertEqual(extract_resolution("Movie Unknown Quality"), "Неизвестно")
class TestExtractQuality(unittest.TestCase):
def test_bluray(self):
self.assertEqual(extract_quality("Movie 2024 Bluray 1080p"), "BluRay")
def test_bdrip(self):
self.assertEqual(extract_quality("Movie 2024 BDRip 720p"), "BluRay")
def test_webdl(self):
self.assertEqual(extract_quality("Movie 2024 WEB-DL 1080p"), "WEB-DL")
def test_webrip(self):
self.assertEqual(extract_quality("Movie 2024 WEBRip 720p"), "WEBRip")
def test_hdtv(self):
self.assertEqual(extract_quality("Movie 2024 HDTV 720p"), "HDTV")
def test_unknown(self):
self.assertEqual(extract_quality("Movie 2024 Some Quality"), "Unknown")
class TestNormalizeSearchTerm(unittest.TestCase):
def test_basic(self):
self.assertEqual(normalize_search_term("The Terminator"), "Terminator")
def test_with_articles(self):
self.assertEqual(normalize_search_term("The Matrix Revolutions"), "Matrix.Revolutions")
def test_empty(self):
self.assertEqual(normalize_search_term(""), "")
self.assertEqual(normalize_search_term(None), "")
def test_spaces_to_dots(self):
self.assertEqual(normalize_search_term("Harry Potter"), "Harry.Potter")
class TestGenerateSearchVariants(unittest.TestCase):
def test_basic(self):
variants = generate_search_variants("Terminator", "The Terminator", "1984")
self.assertIn("Terminator", variants)
self.assertIn("The Terminator", variants)
self.assertIn("Terminator 1984", variants)
def test_terminator_special(self):
variants = generate_search_variants("Терминатор 2", "Terminator 2", "1991")
self.assertIn("T2", variants)
self.assertIn("T2.Judgment.Day", variants)
def test_no_duplicates(self):
variants = generate_search_variants("Terminator", "Terminator", "1984")
self.assertEqual(len(variants), len(set(variants)))
class TestScoreTorrent(unittest.TestCase):
def test_high_score_exact_match(self):
torrent = {
"title": "The Terminator 1984 1080p BluRay",
"quality": "BluRay",
"resolution": "1080p",
"seeds": 150,
}
score = score_torrent(torrent, "Terminator", "The Terminator", "1984")
self.assertGreater(score, 0.8)
def test_low_score_no_match(self):
torrent = {
"title": "Some Other Movie 2023 720p WEB-DL",
"quality": "WEB-DL",
"resolution": "720p",
"seeds": 5,
}
score = score_torrent(torrent, "Terminator", "The Terminator", "1984")
self.assertAlmostEqual(score, 0.3)
def test_score_capped_at_one(self):
torrent = {
"title": "The Terminator 1984 2160p BluRay",
"quality": "BluRay",
"resolution": "2160p",
"seeds": 200,
}
score = score_torrent(torrent, "The Terminator", "The Terminator", "1984")
self.assertLessEqual(score, 1.0)
class TestIsMovieTorrent(unittest.TestCase):
def test_is_movie(self):
self.assertTrue(is_movie_torrent("The Terminator 1984 BluRay", "Terminator", "The Terminator"))
def test_not_movie_excluded_keyword(self):
self.assertFalse(is_movie_torrent("The Game 1997 1080p BluRay", "The Game", ""))
def test_not_movie_unknown_title(self):
self.assertFalse(is_movie_torrent("Some Other Movie 2023 720p", "Terminator", "The Terminator"))
if __name__ == "__main__":
unittest.main()