LichessStatTgWeb/LichessWebServices/lichess_client.py

266 lines
13 KiB
Python
Raw Permalink Normal View History

"""
Lichess Statistics API - Клиент для работы с Lichess API
Этот модуль содержит класс LichessClient для взаимодействия с официальным API Lichess.
Обеспечивает:
- Получение активности пользователей
- Получение игр за период
- Получение активности по решению задач (пазлов)
- Обработку ошибок и таймаутов
- Парсинг NDJSON формата
Автор: Lichess Web Services Team
Версия: 1.0.0
"""
import httpx
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging
import json
2025-11-18 15:10:19 +03:00
from rate_limiter import get_rate_limiter
# Настройка логирования для модуля
logger = logging.getLogger(__name__)
class LichessClient:
"""
Клиент для взаимодействия с Lichess API.
Предоставляет методы для получения различных данных от Lichess:
- Активность пользователей
- Игры за период
- Статистика решения задач
Все методы асинхронные и используют httpx для HTTP запросов.
"""
def __init__(self):
"""
Инициализация клиента Lichess API.
Создает HTTP клиент с таймаутом 30 секунд для всех запросов.
"""
self.base_url = "https://lichess.org/api" # Базовый URL Lichess API
self.client = httpx.AsyncClient(timeout=30.0) # HTTP клиент с таймаутом
2025-11-18 15:10:19 +03:00
self.rate_limiter = get_rate_limiter()
async def get_user_activity(self, username: str) -> Optional[List[Dict[str, Any]]]:
"""
Получает активность пользователя за последние 7 активных дней.
Args:
username: Имя пользователя на Lichess
Returns:
Список активностей пользователя или None, если пользователь не найден
Raises:
httpx.HTTPStatusError: При ошибках HTTP (кроме 404)
Exception: При других ошибках
"""
2025-11-20 03:14:06 +03:00
logger.info(f"🔍 LichessClient.get_user_activity: username={username}")
try:
2025-11-18 15:10:19 +03:00
# Rate limiting: ждем если нужно
await self.rate_limiter.wait_if_needed()
# Формируем URL для получения активности пользователя
url = f"{self.base_url}/user/{username}/activity"
2025-11-20 03:14:06 +03:00
logger.info(f"🔍 Making request to Lichess API: {url}")
# Выполняем HTTP GET запрос
response = await self.client.get(url)
2025-11-20 03:14:06 +03:00
logger.info(f"🔍 Lichess API response status: {response.status_code} for {username}")
response.raise_for_status() # Проверяем статус ответа
# Возвращаем JSON данные
2025-11-20 03:14:06 +03:00
result = response.json()
logger.info(f"🔍 Received activity data for {username}: {len(result) if isinstance(result, list) else 'not a list'} items")
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
# Пользователь не найден - это нормальная ситуация
2025-11-20 03:14:06 +03:00
logger.warning(f"⚠️ Пользователь {username} не найден (404)")
return None
else:
# Другие HTTP ошибки - логируем и пробрасываем
2025-11-20 03:14:06 +03:00
logger.error(f"❌ HTTP ошибка при получении активности пользователя {username}: status={e.response.status_code}, error={e}")
raise
except Exception as e:
# Обрабатываем все остальные ошибки
2025-11-20 03:14:06 +03:00
logger.error(f"❌ Ошибка при получении активности пользователя {username}: {e}")
import traceback
logger.error(traceback.format_exc())
raise
async def get_games_of_period(self, username: str, since_ms: int, until_ms: int, rated_only: bool = True) -> Optional[List[Dict[str, Any]]]:
"""
Получает игры пользователя за определенный период.
Lichess API возвращает игры в формате NDJSON (Newline Delimited JSON),
где каждая строка содержит JSON объект с информацией об игре.
Args:
username: Имя пользователя на Lichess
since_ms: Начало периода в миллисекундах (Unix timestamp * 1000)
until_ms: Конец периода в миллисекундах (Unix timestamp * 1000)
rated_only: Только рейтинговые игры (по умолчанию True)
Returns:
Список игр в формате JSON или None при ошибке
Raises:
httpx.HTTPStatusError: При ошибках HTTP
Exception: При других ошибках
"""
try:
2025-11-18 15:10:19 +03:00
# Rate limiting: ждем если нужно
await self.rate_limiter.wait_if_needed()
# Формируем URL для получения игр пользователя
url = f"{self.base_url}/games/user/{username}"
2026-02-05 01:38:33 +03:00
# Параметры запроса. Параметр 'rated' в API Lichess не передаём:
# при rated=true API часто возвращает 0 игр даже для рейтинговых партий.
# Фильтрация по рейтинговости делается в stats_service после получения списка.
params = {
'since': since_ms, # Начало периода
'until': until_ms, # Конец периода
'max': 1000 # Максимум игр за запрос (лимит Lichess API)
}
# Заголовки для получения NDJSON формата
headers = {
'Accept': 'application/x-ndjson' # Запрашиваем NDJSON формат
}
logger.info(f"Запрос игр для {username} с {since_ms} по {until_ms}")
# Выполняем HTTP GET запрос
response = await self.client.get(url, params=params, headers=headers)
response.raise_for_status() # Проверяем статус ответа
# Парсим NDJSON (Newline Delimited JSON)
# Каждая строка содержит отдельный JSON объект
games = []
content = response.text.strip()
if content:
for line in content.split('\n'):
if line.strip():
try:
# Парсим каждую строку как отдельный JSON объект
game = json.loads(line)
games.append(game)
except json.JSONDecodeError as e:
# Логируем ошибки парсинга, но продолжаем обработку
logger.warning(f"Ошибка парсинга JSON строки: {e}")
continue
logger.info(f"Получено {len(games)} игр для пользователя {username}")
return games
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
# Пользователь не найден - это нормальная ситуация
logger.warning(f"Пользователь {username} не найден")
return None
else:
# Другие HTTP ошибки - логируем и пробрасываем
logger.error(f"HTTP ошибка при получении игр пользователя {username}: {e}")
raise
except Exception as e:
# Обрабатываем все остальные ошибки
logger.error(f"Ошибка при получении игр пользователя {username}: {e}")
raise
async def get_puzzle_activity(self, token: str, max_puzzles: int = 50) -> Optional[List[Dict[str, Any]]]:
"""
Получает активность пользователя по решению задач (пазлов).
Требует авторизации через Bearer токен. Lichess API возвращает данные
в формате NDJSON (Newline Delimited JSON).
Args:
token: Bearer токен авторизации от Lichess
max_puzzles: Максимальное количество задач для получения (по умолчанию 50)
Returns:
Список активностей по задачам в формате JSON или None при ошибке
Raises:
httpx.HTTPStatusError: При ошибках HTTP
Exception: При других ошибках
"""
try:
2025-11-18 15:10:19 +03:00
# Rate limiting: ждем если нужно
await self.rate_limiter.wait_if_needed()
# Формируем URL для получения активности по задачам
url = f"{self.base_url}/puzzle/activity"
# Параметры запроса
params = {
'max': max_puzzles # Максимальное количество задач
}
# Заголовки с авторизацией и форматом данных
headers = {
'Authorization': f'Bearer {token}', # Bearer токен авторизации
'Accept': 'application/x-ndjson' # Запрашиваем NDJSON формат
}
logger.info(f"Запрос активности по задачам, max={max_puzzles}")
# Выполняем HTTP GET запрос
response = await self.client.get(url, params=params, headers=headers)
response.raise_for_status() # Проверяем статус ответа
# Парсим NDJSON (Newline Delimited JSON)
# Каждая строка содержит отдельный JSON объект с активностью
activities = []
content = response.text.strip()
if content:
for line in content.split('\n'):
if line.strip():
try:
# Парсим каждую строку как отдельный JSON объект
activity = json.loads(line)
activities.append(activity)
except json.JSONDecodeError as e:
# Логируем ошибки парсинга, но продолжаем обработку
logger.warning(f"Ошибка парсинга JSON строки: {e}")
continue
logger.info(f"Получено {len(activities)} активностей по задачам")
return activities
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
# Неверный токен авторизации
logger.warning("Неверный токен авторизации")
return None
elif e.response.status_code == 403:
# Доступ запрещен (недостаточно прав)
logger.warning("Доступ запрещен")
return None
else:
# Другие HTTP ошибки - логируем и пробрасываем
logger.error(f"HTTP ошибка при получении активности по задачам: {e}")
raise
except Exception as e:
# Обрабатываем все остальные ошибки
logger.error(f"Ошибка при получении активности по задачам: {e}")
raise
async def close(self):
"""
Закрывает HTTP клиент.
Освобождает ресурсы и корректно закрывает соединения.
Должен вызываться при завершении работы с клиентом.
"""
await self.client.aclose()