import os import re import asyncio import httpx import requests from bs4 import BeautifulSoup from fastapi import FastAPI, Request, Form, HTTPException, Query from fastapi.responses import HTMLResponse, Response from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from urllib.parse import quote app = FastAPI(title="Movie Search API", version="1.0.0") # Настройка CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], # В продакшене лучше указать конкретные домены allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Настройка шаблонов templates = Jinja2Templates(directory="templates") # Регистрируем фильтр urlencode для шаблонов def urlencode_filter(s): if s: return quote(s, safe='') return '' templates.env.filters['urlencode'] = urlencode_filter # URL прокси-сервиса TMDB TMDB_PROXY_URL = os.getenv("TMDB_PROXY_URL", "http://localhost:8001") # URL torAPI TORAPI_URL = os.getenv("TORAPI_URL", "http://localhost:8088") # URL Torrent Search API TORRENT_SEARCH_URL = os.getenv("TORRENT_SEARCH_URL", "http://localhost:8443") TORRENT_ADD_URL = os.getenv("TORRENT_ADD_URL", "http://localhost:8444") async def search_movies(query: str) -> dict: """Поиск фильмов через TMDB Proxy""" async with httpx.AsyncClient() as client: try: response = await client.get( f"{TMDB_PROXY_URL}/search/movie", params={ "query": query, "language": "ru-RU", "include_adult": False } ) response.raise_for_status() return response.json() except httpx.HTTPError as e: raise HTTPException(status_code=500, detail=f"TMDB Proxy error: {str(e)}") async def get_movie_details(movie_id: int) -> dict: """Получение детальной информации о фильме из TMDB через прокси""" async with httpx.AsyncClient() as client: try: response = await client.get( f"{TMDB_PROXY_URL}/movie/{movie_id}", params={ "language": "ru-RU", "append_to_response": "external_ids" } ) response.raise_for_status() return response.json() except httpx.HTTPError as e: raise HTTPException(status_code=500, detail=f"TMDB Proxy error: {str(e)}") def parse_size(size_str: str) -> tuple: """Парсинг размера файла и возврат в байтах и читаемом виде""" if not size_str: return 0, "Неизвестно" size_str = size_str.upper().strip() multipliers = { 'KB': 1024, 'MB': 1024**2, 'GB': 1024**3, 'TB': 1024**4 } for unit, multiplier in multipliers.items(): if unit in size_str: try: number = float(re.findall(r'[\d.]+', size_str)[0]) bytes_size = int(number * multiplier) return bytes_size, size_str except (ValueError, IndexError): pass return 0, size_str def extract_resolution(title: str) -> str: """Извлечение разрешения из названия торрента""" title_upper = title.upper() # Поиск разрешений resolutions = ['2160P', '4K', '1080P', '720P', '480P', '360P'] for res in resolutions: if res in title_upper: if res == '4K': return '2160p' return res.lower() # Поиск по паттернам if re.search(r'\b(2160|4k)\b', title_upper): return '2160p' elif re.search(r'\b1080\b', title_upper): return '1080p' elif re.search(r'\b720\b', title_upper): return '720p' elif re.search(r'\b480\b', title_upper): return '480p' return 'Неизвестно' def normalize_search_term(term: str) -> str: """Нормализация поискового термина для сцены""" if not term: return "" # Убираем артикли term = re.sub(r'\b(the|a|an)\b', '', term, flags=re.IGNORECASE) # Заменяем пробелы на точки и дефисы term = re.sub(r'\s+', '.', term.strip()) # Убираем лишние точки term = re.sub(r'\.+', '.', term) # Убираем точки в начале и конце term = term.strip('.') return term def generate_search_variants(title: str, original_title: str = None, year: str = None) -> list: """Генерация вариантов названий для поиска""" variants = [] # Базовые варианты variants.extend([title, original_title] if original_title else [title]) # Варианты с годом if year: variants.extend([f"{title} {year}", f"{original_title} {year}"] if original_title else [f"{title} {year}"]) # Нормализованные варианты normalized_variants = [normalize_search_term(v) for v in variants if v] variants.extend(normalized_variants) # Специальные варианты для известных фильмов title_lower = title.lower() if 'терминатор' in title_lower or 'terminator' in title_lower: variants.extend(['T2', 'T2.Judgment.Day', 'T2.Judgement.Day']) if '2' in title_lower or 'второй' in title_lower: variants.extend(['Terminator.2', 'Терминатор.2']) # Убираем дубликаты и пустые значения return list(set([v for v in variants if v and v.strip()])) def score_torrent(torrent: dict, movie_title: str, original_title: str = None, year: str = None) -> float: """Скоринг торрента по релевантности""" score = 0.0 torrent_name = torrent.get('title', '').lower() movie_title_lower = movie_title.lower() original_title_lower = original_title.lower() if original_title else "" # Скоринг по названию (0-1) if movie_title_lower in torrent_name: score += 0.8 if original_title_lower and original_title_lower in torrent_name: score += 0.9 # Скоринг по году (0-0.5) if year and year in torrent_name: score += 0.5 # Скоринг по качеству (0-0.3) quality = torrent.get('quality', '').lower() if 'bluray' in quality or 'remux' in quality: score += 0.3 elif 'web-dl' in quality or 'webdl' in quality: score += 0.2 elif 'hdtv' in quality: score += 0.1 # Скоринг по разрешению (0-0.2) resolution = torrent.get('resolution', '').lower() if '2160p' in resolution or '4k' in resolution: score += 0.2 elif '1080p' in resolution: score += 0.15 elif '720p' in resolution: score += 0.1 # Скоринг по сидам (0-0.1) seeds = torrent.get('seeds', 0) if seeds > 100: score += 0.1 elif seeds > 50: score += 0.05 return min(score, 1.0) # Ограничиваем максимум 1.0 def generate_demo_torrents(movie_title: str, year: str = None, original_title: str = None) -> list: """Генерация демо-данных для тестирования (когда API не работает)""" torrents = [] # Создаем реалистичные варианты торрентов quality_variants = [ {"quality": "BluRay", "resolution": "1080p", "size": "8.5 GB", "seeds": 150, "peers": 25}, {"quality": "WEB-DL", "resolution": "1080p", "size": "6.2 GB", "seeds": 120, "peers": 20}, {"quality": "BluRay", "resolution": "2160p", "size": "25.3 GB", "seeds": 200, "peers": 35}, {"quality": "WEBRip", "resolution": "720p", "size": "3.8 GB", "seeds": 80, "peers": 15}, {"quality": "HDTV", "resolution": "720p", "size": "2.1 GB", "seeds": 50, "peers": 8} ] # Генерируем варианты названий title_variants = [movie_title] if original_title and original_title != movie_title: title_variants.append(original_title) for i, variant in enumerate(quality_variants): # Выбираем случайное название base_title = title_variants[i % len(title_variants)] # Создаем реалистичное название торрента torrent_title = f"{base_title} ({year or '2020'}) - {variant['resolution']} - {variant['quality']}" # Создаем fake magnet hash fake_hash = f"08ada5a7a6183aae1e09d831df6748d566095a10{i:02d}" torrent = { "title": torrent_title, "size_bytes": parse_size_to_bytes(variant['size']), "size_readable": variant['size'], "resolution": variant['resolution'], "quality": variant['quality'], "seeds": variant['seeds'], "peers": variant['peers'], "magnet": f"magnet:?xt=urn:btih:{fake_hash}&dn={base_title.replace(' ', '+')}&tr=udp://tracker.openbittorrent.com:80", "download_url": f"https://example.com/torrent/{fake_hash}/", "category": "Фильмы", "date": "2025-01-05", "provider": "Demo", "relevance_score": 0.9 - (i * 0.1) # Убывающий скор } torrents.append(torrent) return torrents async def search_torrents(movie_title: str, year: str = None, original_title: str = None, genres: list = None, imdb_id: str = None) -> list: """Поиск торрентов для фильма через Torrent Search API""" torrents = [] try: # Генерируем варианты поисковых запросов search_queries = generate_search_variants(movie_title, original_title, year) print(f"Generated search variants: {search_queries}") # Добавляем IMDB ID если есть if imdb_id: search_queries.append(imdb_id) # Получаем список доступных провайдеров async with httpx.AsyncClient() as client: providers_response = await client.get(f"{TORRENT_SEARCH_URL}/api/provider/list") if providers_response.status_code != 200: print(f"Failed to get providers: {providers_response.status_code}") return torrents providers = providers_response.json() print(f"Available providers: {[p['Provider'] for p in providers]}") # Ищем торренты на всех доступных провайдерах for provider in providers: provider_name = provider['Provider'].lower() try: # Пробуем каждый вариант поискового запроса for search_query in search_queries: print(f"Searching '{search_query}' on {provider_name}") # Поиск на конкретном провайдере - исправленный эндпоинт try: search_response = await client.get( f"{TORRENT_SEARCH_URL}/api/search/title/{provider_name}", params={"query": search_query}, timeout=30.0 ) except Exception as request_error: print(f"Request error on {provider_name} for '{search_query}': {request_error}") import traceback traceback.print_exc() continue if search_response.status_code == 200: try: results = search_response.json() except Exception as json_error: print(f"JSON parse error on {provider_name} for '{search_query}': {json_error}") print(f"Response text (first 500 chars): {search_response.text[:500]}") continue if not isinstance(results, list): print(f"Unexpected response format from {provider_name}: {type(results)}") if isinstance(results, dict): print(f"Dict keys: {list(results.keys())}") continue print(f"Found {len(results)} results for '{search_query}' on {provider_name}") # Обрабатываем все результаты и скорим их for result in results[:20]: # Берем больше результатов для скоринга print(f"Processing torrent: {result['Name'][:100]}...") # Логируем доступные поля для отладки torrent_id = result.get('Id') or result.get('id') or result.get('ID') or result.get('Hash', '')[:8] or '' if torrent_id: print(f" Found ID: {torrent_id[:20]}...") torrent = parse_torrent_result(result, movie_title, year) if torrent: # Если ID не был найден, пробуем использовать Hash как ID if not torrent.get('id') or torrent.get('id') == '': torrent_id_from_hash = result.get('Hash', '') if torrent_id_from_hash: torrent['id'] = torrent_id_from_hash print(f" Using Hash as ID: {torrent_id_from_hash[:20]}...") # Скорим торрент score = score_torrent(torrent, movie_title, original_title, year) torrent['relevance_score'] = score size_mb = torrent.get('size_bytes', 0) / (1024 * 1024) print(f"Torrent score: {score:.2f}, size: {size_mb:.1f}MB, seeds: {torrent.get('seeds', 0)}, ID: {torrent.get('id', 'None')[:20]}...") # Добавляем только если скор больше 0.1 и размер больше 100MB if score > 0.1 and torrent.get('size_bytes', 0) > 100 * 1024 * 1024: # Проверяем, что ID есть, иначе используем Hash if not torrent.get('id') or torrent.get('id') == '': if result.get('Hash'): torrent['id'] = result['Hash'] elif result.get('Url'): # Пытаемся извлечь ID из URL url_id_match = re.search(r'/(\d+)/?$', result.get('Url', '')) if url_id_match: torrent['id'] = url_id_match.group(1) else: torrent['id'] = result.get('Url', '')[-20:] # Используем последние 20 символов URL torrents.append(torrent) print(f" ✅ Added to results with ID: {torrent.get('id', 'None')[:20]}...") else: print(f" ❌ Filtered out (score: {score:.2f}, size: {size_mb:.1f}MB)") # Если нашли достаточно результатов, переходим к следующему провайдеру if len(torrents) >= 20: break except Exception as e: print(f"Error searching on {provider_name}: {type(e).__name__}: {e}") import traceback traceback.print_exc() continue # Сортируем по количеству сидов и ограничиваем общее количество torrents.sort(key=lambda x: x.get('seeds', 0), reverse=True) return torrents[:30] # Возвращаем топ-30 результатов except Exception as e: print(f"Error searching torrents: {e}") import traceback traceback.print_exc() return torrents async def search_torrent_by_id(torrent_id: str) -> dict: """Поиск торрента по ID через Torrent Search API""" try: async with httpx.AsyncClient() as client: print(f"Searching torrent by ID: {torrent_id}") # Получаем список доступных провайдеров providers_response = await client.get(f"{TORRENT_SEARCH_URL}/api/provider/list") if providers_response.status_code != 200: print(f"Failed to get providers: {providers_response.status_code}") return None providers = providers_response.json() print(f"Available providers: {[p['Provider'] for p in providers]}") # Пробуем найти торрент на всех доступных провайдерах for provider in providers: provider_name = provider['Provider'].lower() try: print(f"Searching ID {torrent_id} on {provider_name}") # Пробуем разные варианты API для поиска по ID search_urls = [ f"{TORRENT_SEARCH_URL}/api/search/id/{provider_name}", f"{TORRENT_SEARCH_URL}/api/search/{provider_name}", ] response = None results = None for search_url in search_urls: try: response = await client.get(search_url, params={"query": torrent_id}, timeout=30.0) if response.status_code == 200: results = response.json() print(f"Response from {provider_name} ({search_url}): {type(results)} - {len(results) if isinstance(results, list) else 'not a list'}") break except Exception as e: print(f"Error trying {search_url}: {e}") continue if response and response.status_code == 200 and results: if isinstance(results, list) and len(results) > 0: # Берем первый результат result = results[0] # Используем Original_Name если Name пустое torrent_name = result.get('Name', '') or result.get('Original_Name', '') print(f"Found torrent by ID on {provider_name}: {torrent_name[:100]}...") # Получаем хэш — используем единую функцию извлечения hash_value = extract_hash_from_result(result) if hash_value: # Генерируем чистую magnet-ссылку с публичными трекерами magnet = generate_clean_magnet(hash_value, torrent_name) print(f"Generated clean magnet with hash {hash_value[:10]}...: {magnet[:100]}...") else: # Fallback на оригинальную magnet-ссылку если нет хэша magnet = result.get('Magnet', '') if not magnet or not magnet.startswith('magnet:'): print(f"Warning: No hash found and no valid magnet link. Hash fields: Hash={result.get('Hash', 'None')[:20]}, Magnet={result.get('Magnet', 'None')[:50]}") magnet = "" # Пробуем локальный torapi-qbit если хэш пустой или битый if not magnet or not re.search(r'urn:btih:([a-fA-F0-9]{40}|[a-zA-Z0-9]{32})', magnet): try: torapi_add_url = os.getenv("TORAPI_ADD_URL", "http://localhost:8444") fb_resp = await client.get( f"{torapi_add_url}/api/search/id/{provider_name}", params={"query": torrent_id}, timeout=15.0 ) if fb_resp.status_code == 200: fb_data = fb_resp.json() if isinstance(fb_data, list) and len(fb_data) > 0: fb_result = fb_data[0] fb_hash = extract_hash_from_result(fb_result) if fb_hash: magnet = generate_clean_magnet(fb_hash, torrent_name) print(f"torapi-qbit fallback: got hash {fb_hash[:10]}... via extract_hash_from_result") except Exception as fbe: print(f"torapi-qbit fallback failed: {fbe}") # Если хэш всё ещё пустой — пропускаем этого провайдера if not hash_value and not re.search(r'urn:btih:([a-fA-F0-9]{40}|[a-zA-Z0-9]{32})', magnet): print(f"Skipping {provider_name}: no valid magnet hash") continue # Парсим результат в стандартный формат torrent = { "title": torrent_name, "url": result.get('Url', ''), "hash": result.get('Hash', ''), "magnet": magnet, "torrent_url": result.get('Torrent', ''), "imdb_url": result.get('IMDb_link', ''), "kinopoisk_url": result.get('Kinopoisk_link', ''), "poster": result.get('Poster', ''), "year": result.get('Year', ''), "release": result.get('Release', ''), "type": result.get('Type', ''), "duration": result.get('Duration', ''), "audio": result.get('Audio', ''), "director": result.get('Directer', ''), "actors": result.get('Actors', ''), "description": result.get('Description', ''), "quality": result.get('Quality', ''), "video": result.get('Video', ''), "files": result.get('Files', []), "provider": provider_name, "id": torrent_id } return torrent else: print(f"No results found for ID {torrent_id} on {provider_name}") else: print(f"Error searching by ID on {provider_name}: {response.status_code} - {response.text}") except Exception as e: print(f"Error searching on {provider_name}: {type(e).__name__}: {e}") import traceback traceback.print_exc() continue print(f"No results found for ID {torrent_id} on any provider via search API") # Fallback: пробуем локальный torapi-qbit (через qBittorrent) try: torapi_add_url = os.getenv("TORAPI_ADD_URL", "http://localhost:8444") fallback_response = await client.get( f"{torapi_add_url}/api/search/id/rutracker", params={"query": torrent_id}, timeout=30.0 ) if fallback_response.status_code == 200: fallback_results = fallback_response.json() if isinstance(fallback_results, list) and len(fallback_results) > 0: result = fallback_results[0] hash_value = result.get('Hash', '') if not hash_value: orig_magnet = result.get('Magnet', '') hm = re.search(r'urn:btih:([a-fA-F0-9]{40})', orig_magnet) if hm: hash_value = hm.group(1) if hash_value: torrent_name = result.get('Name', '') or result.get('Original_Name', '') magnet = generate_clean_magnet(hash_value, torrent_name) print(f"Fallback: got magnet via torapi-qbit: {magnet[:60]}...") return { "title": torrent_name, "url": result.get('Url', ''), "hash": hash_value, "magnet": magnet, "torrent_url": result.get('Torrent', ''), "provider": "torapi-qbit", "id": torrent_id } except Exception as e: print(f"Fallback to torapi-qbit failed: {e}") print(f"All fallbacks exhausted for ID {torrent_id}") return None except Exception as e: print(f"Error searching torrent by ID: {e}") import traceback traceback.print_exc() return None def is_movie_torrent(torrent_name: str, movie_title: str, original_title: str = None) -> bool: """Проверяет, является ли торрент фильмом""" torrent_name_lower = torrent_name.lower() movie_title_lower = movie_title.lower() original_title_lower = original_title.lower() if original_title else "" # Исключаем только явно не-фильмы exclude_keywords = ['игра', 'game', 'музыка', 'music', 'программа', 'program', 'софт', 'software'] for keyword in exclude_keywords: if keyword in torrent_name_lower: print(f"Excluding due to keyword '{keyword}': {torrent_name[:100]}...") return False # Проверяем, что название содержит название фильма (русское или оригинальное) contains_title = movie_title_lower in torrent_name_lower if original_title_lower and not contains_title: contains_title = original_title_lower in torrent_name_lower print(f"Title '{movie_title_lower}' or '{original_title_lower}' in torrent: {contains_title}") return contains_title def parse_torrent_result(result: dict, movie_title: str, year: str = None) -> dict: """Парсит результат поиска торрентов""" try: # Извлекаем разрешение из названия resolution = extract_resolution(result['Name']) # Извлекаем качество из названия quality = extract_quality(result['Name']) # Парсим размер size_bytes = parse_size_to_bytes(result['Size']) # Создаем чистую magnet-ссылку с публичными трекерами magnet = None if 'Hash' in result and result['Hash']: # Генерируем чистую magnet-ссылку из хэша с публичными трекерами hash_value = result['Hash'] torrent_title = result['Name'] magnet = generate_clean_magnet(hash_value, torrent_title) elif 'Magnet' in result and result['Magnet']: # Если нет хэша, используем оригинальную magnet-ссылку magnet = result['Magnet'] elif 'Torrent' in result and result['Torrent']: # Используем URL на .torrent файл как fallback magnet = result['Torrent'] # Определяем провайдера по URL provider = 'Unknown' torrent_url = result.get('Torrent', '') if 'rutracker.org' in torrent_url: provider = 'rutracker' elif 'kinozal.tv' in torrent_url: provider = 'kinozal' elif 'rutor.info' in torrent_url: provider = 'rutor' elif 'nnmclub.to' in torrent_url: provider = 'nonameclub' # Извлекаем ID из разных возможных полей torrent_id = ( result.get('Id') or result.get('id') or result.get('ID') or result.get('Hash', '')[:8] or # Используем первые 8 символов хэша как fallback '' ) # Если ID не найден, пытаемся извлечь из URL if not torrent_id and result.get('Url'): url_id_match = re.search(r'/(\d+)/?$', result.get('Url', '')) if url_id_match: torrent_id = url_id_match.group(1) # Если все еще нет ID, используем Hash полностью if not torrent_id: torrent_id = result.get('Hash', '') return { "title": result['Name'], "size_bytes": size_bytes, "size_readable": result['Size'], "resolution": resolution, "quality": quality, "seeds": int(result.get('Seeds', 0)), "peers": int(result.get('Peers', 0)), "magnet": magnet, "download_url": result.get('Torrent', ''), "category": result.get('Category', ''), "date": result.get('Date', ''), "provider": provider, "id": torrent_id } except Exception as e: print(f"Error parsing torrent result: {e}") return None def extract_quality(title: str) -> str: """Извлекает качество из названия торрента""" title_upper = title.upper() if 'BLURAY' in title_upper or 'BDRIP' in title_upper: return 'BluRay' elif 'WEB-DL' in title_upper or 'WEBDL' in title_upper: return 'WEB-DL' elif 'WEBRIP' in title_upper or 'WEB-RIP' in title_upper: return 'WEBRip' elif 'HDTV' in title_upper: return 'HDTV' elif 'DVDRIP' in title_upper or 'DVD-RIP' in title_upper: return 'DVDRip' else: return 'Unknown' def parse_size_to_bytes(size_str: str) -> int: """Конвертирует строку размера в байты""" size_str = size_str.upper().strip() multipliers = { 'KB': 1024, 'MB': 1024**2, 'GB': 1024**3, 'TB': 1024**4 } for unit, multiplier in multipliers.items(): if unit in size_str: try: number = float(re.findall(r'[\d.]+', size_str)[0]) return int(number * multiplier) except (ValueError, IndexError): pass return 0 def extract_hash_from_result(result: dict) -> str: """Извлекает хэш торрента из результата API — пробует все возможные поля""" hash_value = "" # 1. Прямое поле Hash hash_value = result.get('Hash', '') or result.get('hash', '') or '' if hash_value and re.match(r'^[a-fA-F0-9]{40}$', hash_value): return hash_value.upper() # 2. Поле InfoHash if not hash_value: hash_value = result.get('InfoHash', '') or result.get('info_hash', '') or '' if hash_value and re.match(r'^[a-fA-F0-9]{40}$', hash_value): return hash_value.upper() # 3. Из Magnet ссылки magnet = result.get('Magnet', '') or result.get('magnet', '') or '' if magnet: hm = re.search(r'urn:btih:([a-fA-F0-9]{40})', magnet) if hm: return hm.group(1).upper() hm32 = re.search(r'urn:btih:([a-zA-Z0-9]{32})', magnet) if hm32: return hm32.group(1) # 4. Из Torrent URL (rutracker: /dl.php?t=XXXXX) torrent_url = result.get('Torrent', '') or result.get('torrent', '') or result.get('Url', '') or '' # Некоторые провайдеры кодируют хэш в URL if 'btih:' in torrent_url: hm = re.search(r'btih:([a-fA-F0-9]{40})', torrent_url) if hm: return hm.group(1).upper() # 5. Из поля Id/ID (некоторые API возвращают хэш как ID) tid = result.get('Id') or result.get('id') or result.get('ID') or '' if tid and re.match(r'^[a-fA-F0-9]{40}$', tid): return tid.upper() return hash_value # Может быть пустым def generate_clean_magnet(hash_value: str, title: str = None) -> str: """Генерирует чистую magnet-ссылку с публичными трекерами""" if not hash_value: return "" # Рабочие публичные трекеры (2025+) public_trackers = [ "udp://tracker.opentrackr.org:1337/announce", "udp://open.stealth.si:80/announce", "udp://tracker.openbittorrent.com:6969/announce", "https://tracker.tamersunion.org:443/announce", "udp://exodus.desync.com:6969/announce", "udp://tracker.moeking.me:6969/announce", ] # Создаем базовую magnet-ссылку с хэшем magnet = f"magnet:?xt=urn:btih:{hash_value}" # Добавляем название в кодировке RFC 3986 для удобства if title: from urllib.parse import quote magnet += f"&dn={quote(title)}" # Добавляем публичные трекеры for tracker in public_trackers: magnet += f"&tr={tracker}" return magnet @app.get("/", response_class=HTMLResponse) async def home(request: Request): """Главная страница с формой поиска""" try: print(f"Home page requested from {request.client.host}") return HTMLResponse("""
Найдите любой фильм или сериал и скачайте его через торрент.
Быстро, удобно и бесплатно!
Ищем фильмы...