""" Lichess Statistics API - Сервис обработки статистики Этот модуль содержит класс StatsService для обработки и агрегации данных от Lichess API. Включает в себя: - Парсинг и обработку активности пользователей - Агрегацию статистики игр по режимам - Обработку статистики решения задач (пазлов) - Фильтрацию данных по временным периодам - Расчет рейтинговых изменений Автор: Lichess Web Services Team Версия: 1.0.0 """ from typing import List, Dict, Any, Optional from datetime import datetime, timedelta, date from lichess_client import LichessClient from models import UserStats, TaskStats, GameModeStats, GamesStats, ActivityResponse, GameStats, GamesOfPeriodStats, GamesOfPeriodResponse, PuzzleStats, PuzzleOfPeriodResponse import logging # Настройка логирования для модуля logger = logging.getLogger(__name__) class StatsService: """ Сервис для обработки и агрегации статистики Lichess. Предоставляет методы для: - Получения статистики за разные периоды (сегодня, вчера, неделя) - Обработки игр за произвольный период - Анализа активности по решению задач - Агрегации данных по режимам игр """ def __init__(self): """ Инициализация сервиса статистики. Создает экземпляр LichessClient для взаимодействия с API. """ self.lichess_client = LichessClient() # ============================================================================= # ВСПОМОГАТЕЛЬНЫЕ МЕТОДЫ ДЛЯ ОБРАБОТКИ ДАННЫХ # ============================================================================= def _parse_lichess_interval(self, interval: Dict[str, int]) -> date: """ Парсит дату из временного интервала Lichess. Lichess API возвращает временные интервалы в миллисекундах, этот метод конвертирует их в объект date. Args: interval: Словарь с ключом 'start' содержащим timestamp в миллисекундах Returns: Объект date с датой активности """ # Lichess использует миллисекунды, конвертируем в секунды timestamp = interval['start'] / 1000 return datetime.fromtimestamp(timestamp).date() def _is_date_in_range(self, target_date: date, activity_date: date, days_back: int) -> bool: """ Проверяет, попадает ли дата активности в нужный диапазон. Args: target_date: Целевая дата (обычно сегодня) activity_date: Дата активности из Lichess days_back: Количество дней назад для проверки Returns: True, если дата активности попадает в диапазон """ today = date.today() start_date = today - timedelta(days=days_back-1) return start_date <= activity_date <= today def _calculate_rating_change(self, mode_data: Dict[str, Any]) -> int: """ Вычисляет изменение рейтинга для режима игры. Args: mode_data: Данные режима игры из Lichess API Returns: Изменение рейтинга (может быть отрицательным) """ rp = mode_data.get('rp', {}) before = rp.get('before', 0) # Рейтинг до периода after = rp.get('after', 0) # Рейтинг после периода return after - before def _get_final_rating(self, mode_data: Dict[str, Any]) -> int: """ Получает финальный рейтинг для режима игры. Args: mode_data: Данные режима игры из Lichess API Returns: Финальный рейтинг игрока в данном режиме """ rp = mode_data.get('rp', {}) return rp.get('after', 0) def _count_game_results(self, mode_data: Dict[str, Any]) -> Dict[str, int]: """ Подсчитывает результаты игр для режима (победы, поражения, ничьи). Args: mode_data: Данные режима игры из Lichess API Returns: Словарь с количеством побед, поражений и ничьих """ wins = mode_data.get('win', 0) # Количество побед losses = mode_data.get('loss', 0) # Количество поражений draws = mode_data.get('draw', 0) # Количество ничьих return {"wins": wins, "losses": losses, "draws": draws} def _process_games_by_mode(self, games_data: Dict[str, Any]) -> Dict[str, GameModeStats]: """ Обрабатывает игры по режимам (bullet, blitz, rapid). Преобразует сырые данные от Lichess API в структурированную статистику по каждому режиму игры. Args: games_data: Сырые данные игр от Lichess API Returns: Словарь с статистикой по каждому режиму игры """ result = {} # Инициализируем все режимы нулевыми значениями # Это гарантирует, что все режимы будут присутствовать в результате for mode in ["bullet", "blitz", "rapid", "classical"]: result[mode] = GameModeStats( games_played=0, rating_change=0, final_rating=0, wins=0, losses=0, draws=0 ) # Обрабатываем данные по режимам for mode_name, mode_data in games_data.items(): if mode_name in result: # Извлекаем результаты игр wins = mode_data.get('win', 0) losses = mode_data.get('loss', 0) draws = mode_data.get('draw', 0) games_played = wins + losses + draws # Для недельной статистики используем предвычисленные значения if 'rating_change' in mode_data: rating_change = mode_data['rating_change'] final_rating = mode_data['final_rating'] else: # Для дневной статистики вычисляем как обычно rating_change = self._calculate_rating_change(mode_data) final_rating = self._get_final_rating(mode_data) # Создаем объект статистики для режима result[mode_name] = GameModeStats( games_played=games_played, rating_change=rating_change, final_rating=final_rating, wins=wins, losses=losses, draws=draws ) return result def _process_tasks(self, puzzles_data: Dict[str, Any]) -> TaskStats: """Обрабатывает статистику задач (пазлов)""" score = puzzles_data.get('score', {}) wins = score.get('win', 0) losses = score.get('loss', 0) draws = score.get('draw', 0) total = wins + losses + draws solved = wins unsolved = losses + draws return TaskStats( total=total, solved=solved, unsolved=unsolved ) # ============================================================================= # ПУБЛИЧНЫЕ МЕТОДЫ ДЛЯ ПОЛУЧЕНИЯ СТАТИСТИКИ # ============================================================================= async def get_today_stats(self, username: str) -> ActivityResponse: """ Получает статистику за сегодняшний день. Анализирует активность пользователя и возвращает статистику игр и задач за сегодняшний день. Args: username: Имя пользователя на Lichess Returns: ActivityResponse с данными статистики или сообщением об ошибке """ logger.info(f"🔍 StatsService.get_today_stats: username={username}") try: logger.info(f"🔍 Calling lichess_client.get_user_activity for {username}") activity_data = await self.lichess_client.get_user_activity(username) logger.info(f"🔍 Activity data received: {activity_data is not None}, type={type(activity_data)}") if not activity_data: logger.warning(f"⚠️ No activity data for {username}") return ActivityResponse( message=f"Пользователь {username} не найден или неактивен" ) logger.info(f"🔍 Activity data length: {len(activity_data) if isinstance(activity_data, list) else 'not a list'}") today = date.today() logger.info(f"🔍 Looking for activity for today: {today}") # Ищем активность за сегодня # Lichess возвращает интервалы, которые могут начинаться вчера, но включать активность сегодня # Для "сегодня" нужен интервал, который НАЧИНАЕТСЯ сегодня (start_date == today) # Если интервал начинается вчера и заканчивается сегодня, это интервал за вчера (еще не закрыт) today_activity = None for activity in activity_data: interval = activity['interval'] # Парсим начало и конец интервала start_timestamp = interval['start'] / 1000 end_timestamp = interval['end'] / 1000 start_date = datetime.fromtimestamp(start_timestamp).date() end_date = datetime.fromtimestamp(end_timestamp).date() logger.debug(f"🔍 Checking activity interval: {start_date} to {end_date} vs today: {today}") # Проверяем, что интервал НАЧИНАЕТСЯ СЕГОДНЯ # Это гарантирует, что мы берем интервал за сегодняшний день if start_date == today: today_activity = activity logger.info(f"✅ Found today activity for {username} (interval starts today: {start_date} to {end_date})") break if not today_activity: logger.info(f"ℹ️ No activity found for today ({today}) for {username}") return ActivityResponse( message=f"Активности за сегодняшний день ({today}) не было" ) # Обрабатываем данные logger.info(f"🔍 Processing activity data for {username}") games_stats = self._process_games_by_mode(today_activity.get('games', {})) tasks_stats = self._process_tasks(today_activity.get('puzzles', {})) user_stats = UserStats( username=username, tasks=tasks_stats, games=GamesStats(**games_stats) ) logger.info(f"✅ Successfully processed stats for {username}") return ActivityResponse( message="Статистика за сегодняшний день", data=user_stats ) except Exception as e: logger.error(f"❌ Ошибка при получении статистики за сегодня для {username}: {e}") import traceback logger.error(traceback.format_exc()) return ActivityResponse( message=f"Ошибка при получении статистики: {str(e)}" ) async def get_yesterday_stats(self, username: str) -> ActivityResponse: """ Получает статистику за вчерашний день. Анализирует активность пользователя и возвращает статистику игр и задач за вчерашний день. Args: username: Имя пользователя на Lichess Returns: ActivityResponse с данными статистики или сообщением об ошибке """ try: activity_data = await self.lichess_client.get_user_activity(username) if not activity_data: return ActivityResponse( message=f"Пользователь {username} не найден или неактивен" ) today = date.today() yesterday = today - timedelta(days=1) logger.info(f"🔍 Looking for activity for yesterday: {yesterday} (today: {today})") # Ищем активность за вчера # Lichess возвращает интервалы, которые могут начинаться позавчера, но включать активность вчера # Для "вчера" нужен интервал, который: # 1. ЗАКАНЧИВАЕТСЯ вчера (end_date == yesterday) - закрытый интервал за вчера # 2. ИЛИ начинается вчера и заканчивается сегодня (start_date == yesterday and end_date == today) - открытый интервал за вчера yesterday_activity = None for activity in activity_data: interval = activity['interval'] # Парсим начало и конец интервала start_timestamp = interval['start'] / 1000 end_timestamp = interval['end'] / 1000 start_date = datetime.fromtimestamp(start_timestamp).date() end_date = datetime.fromtimestamp(end_timestamp).date() logger.debug(f"🔍 Checking activity interval: {start_date} to {end_date} vs yesterday: {yesterday}, today: {today}") # Проверяем два случая: # 1. Интервал заканчивается вчера (закрытый интервал за вчера) # 2. Интервал начинается вчера и заканчивается сегодня (открытый интервал за вчера, еще не закрыт) if end_date == yesterday or (start_date == yesterday and end_date == today): yesterday_activity = activity logger.info(f"✅ Found yesterday activity for {username} (interval: {start_date} to {end_date})") break if not yesterday_activity: logger.info(f"ℹ️ No activity found for yesterday ({yesterday}) for {username}") return ActivityResponse( message=f"Активности за вчерашний день ({yesterday}) не было" ) # Обрабатываем данные games_stats = self._process_games_by_mode(yesterday_activity.get('games', {})) tasks_stats = self._process_tasks(yesterday_activity.get('puzzles', {})) user_stats = UserStats( username=username, tasks=tasks_stats, games=GamesStats(**games_stats) ) return ActivityResponse( message="Статистика за вчерашний день", data=user_stats ) except Exception as e: logger.error(f"Ошибка при получении статистики за вчера: {e}") return ActivityResponse( message=f"Ошибка при получении статистики: {str(e)}" ) async def get_week_stats(self, username: str) -> ActivityResponse: """ Получает статистику за последние 7 дней. Анализирует активность пользователя и возвращает агрегированную статистику игр и задач за последние 7 дней. Args: username: Имя пользователя на Lichess Returns: ActivityResponse с данными статистики или сообщением об ошибке """ try: activity_data = await self.lichess_client.get_user_activity(username) if not activity_data: return ActivityResponse( message=f"Пользователь {username} не найден или неактивен" ) today = date.today() week_activities = [] # Фильтруем активности за последние 7 дней for activity in activity_data: activity_date = self._parse_lichess_interval(activity['interval']) if self._is_date_in_range(activity_date, activity_date, 7): week_activities.append(activity) if not week_activities: return ActivityResponse( message="Активности за последние 7 дней не было" ) # Объединяем все игры и задачи за неделю combined_games = {} combined_puzzles = {} for activity in week_activities: # Суммируем игры по режимам for mode, mode_data in activity.get('games', {}).items(): if mode not in combined_games: combined_games[mode] = { 'win': 0, 'loss': 0, 'draw': 0, 'rating_change': 0, # Суммируем изменения рейтинга 'final_rating': 0 # Берем последний рейтинг } combined_games[mode]['win'] += mode_data.get('win', 0) combined_games[mode]['loss'] += mode_data.get('loss', 0) combined_games[mode]['draw'] += mode_data.get('draw', 0) # Суммируем изменения рейтинга (delta = after - before) rp = mode_data.get('rp', {}) before = rp.get('before', 0) after = rp.get('after', 0) delta = after - before combined_games[mode]['rating_change'] += delta # Для финального рейтинга берем последнее значение combined_games[mode]['final_rating'] = after # Суммируем задачи puzzles_score = activity.get('puzzles', {}).get('score', {}) if not combined_puzzles: combined_puzzles = {'score': {'win': 0, 'loss': 0, 'draw': 0}} combined_puzzles['score']['win'] += puzzles_score.get('win', 0) combined_puzzles['score']['loss'] += puzzles_score.get('loss', 0) combined_puzzles['score']['draw'] += puzzles_score.get('draw', 0) # Обрабатываем данные games_stats = self._process_games_by_mode(combined_games) tasks_stats = self._process_tasks(combined_puzzles) user_stats = UserStats( username=username, tasks=tasks_stats, games=GamesStats(**games_stats) ) return ActivityResponse( message="Статистика за последние 7 дней", data=user_stats ) except Exception as e: logger.error(f"Ошибка при получении статистики за неделю: {e}") return ActivityResponse( message=f"Ошибка при получении статистики: {str(e)}" ) def _determine_game_result(self, game: Dict[str, Any], username: str) -> str: """ Определяет результат игры для указанного пользователя Returns: 'win', 'loss', 'draw' или 'unknown' """ winner = game.get('winner') players = game.get('players', {}) # Определяем цвет игрока (сравниваем имена без учета регистра) username_lower = username.lower() user_color = None white_name = players.get('white', {}).get('user', {}).get('name', '') black_name = players.get('black', {}).get('user', {}).get('name', '') if white_name.lower() == username_lower: user_color = 'white' elif black_name.lower() == username_lower: user_color = 'black' if user_color is None: return 'unknown' # Определяем результат if winner is None: return 'draw' elif winner == user_color: return 'win' else: return 'loss' def _get_rating_change(self, game: Dict[str, Any], username: str) -> int: """ Получает изменение рейтинга для указанного пользователя """ players = game.get('players', {}) # Определяем цвет игрока (сравниваем имена без учета регистра) username_lower = username.lower() user_color = None white_name = players.get('white', {}).get('user', {}).get('name', '') black_name = players.get('black', {}).get('user', {}).get('name', '') if white_name.lower() == username_lower: user_color = 'white' elif black_name.lower() == username_lower: user_color = 'black' if user_color is None: return 0 # Получаем изменение рейтинга rating_diff = players.get(user_color, {}).get('ratingDiff') return rating_diff if rating_diff is not None else 0 def _get_rating_info(self, game: Dict[str, Any], username: str) -> tuple[int, int]: """ Получает изменение рейтинга и итоговый рейтинг для указанного пользователя Returns: tuple: (rating_change, final_rating) """ players = game.get('players', {}) # Определяем цвет игрока (без учета регистра) user_color = None white_user = players.get('white', {}).get('user', {}) black_user = players.get('black', {}).get('user', {}) if white_user.get('name', '').lower() == username.lower(): user_color = 'white' elif black_user.get('name', '').lower() == username.lower(): user_color = 'black' if user_color is None: return 0, 0 # Получаем рейтинг до партии и изменение рейтинга player_data = players.get(user_color, {}) rating_before = player_data.get('rating', 0) rating_diff = player_data.get('ratingDiff', 0) # Вычисляем итоговый рейтинг: rating + ratingDiff final_rating = rating_before + rating_diff return rating_diff, final_rating def _process_games_of_period(self, games: List[Dict[str, Any]], username: str) -> GamesOfPeriodStats: """ Обрабатывает игры за период и возвращает статистику """ # Инициализируем статистику для всех типов игр stats = { 'bullet': {'games_played': 0, 'wins': 0, 'losses': 0, 'draws': 0, 'rating_change': 0, 'rating': None}, 'blitz': {'games_played': 0, 'wins': 0, 'losses': 0, 'draws': 0, 'rating_change': 0, 'rating': None}, 'rapid': {'games_played': 0, 'wins': 0, 'losses': 0, 'draws': 0, 'rating_change': 0, 'rating': None}, 'classical': {'games_played': 0, 'wins': 0, 'losses': 0, 'draws': 0, 'rating_change': 0, 'rating': None}, 'correspondence': {'games_played': 0, 'wins': 0, 'losses': 0, 'draws': 0, 'rating_change': 0, 'rating': None}, 'total': {'games_played': 0, 'wins': 0, 'losses': 0, 'draws': 0, 'rating_change': 0, 'rating': None} } # Сортируем игры по времени создания (от старых к новым) для правильного вычисления итогового рейтинга sorted_games = sorted(games, key=lambda x: x.get('createdAt', 0)) logger.info(f"🔍 Processing {len(sorted_games)} games for {username}") for idx, game in enumerate(sorted_games): speed = game.get('speed', 'unknown') game_id = game.get('id', 'unknown') logger.debug(f"🔍 Game {idx+1}/{len(sorted_games)}: id={game_id}, speed={speed}, username={username}") # Пропускаем неизвестные типы игр if speed not in stats: logger.warning(f"⚠️ Skipping game {game_id}: unknown speed '{speed}' (expected: bullet, blitz, rapid, classical, correspondence)") continue # Определяем результат игры result = self._determine_game_result(game, username) if result == 'unknown': players = game.get('players', {}) white_name = players.get('white', {}).get('user', {}).get('name', 'N/A') black_name = players.get('black', {}).get('user', {}).get('name', 'N/A') logger.warning(f"⚠️ Skipping game {game_id}: cannot determine result for username '{username}' (white: '{white_name}', black: '{black_name}')") continue logger.debug(f"✅ Processing game {game_id}: speed={speed}, result={result}") # Получаем изменение рейтинга и итоговый рейтинг rating_change, final_rating = self._get_rating_info(game, username) # Обновляем статистику для конкретного типа stats[speed]['games_played'] += 1 if result == 'win': stats[speed]['wins'] += 1 elif result == 'loss': stats[speed]['losses'] += 1 elif result == 'draw': stats[speed]['draws'] += 1 stats[speed]['rating_change'] += rating_change # Сохраняем итоговый рейтинг после последней игры if final_rating is not None: stats[speed]['rating'] = final_rating # Обновляем общую статистику stats['total']['games_played'] += 1 if result == 'win': stats['total']['wins'] += 1 elif result == 'loss': stats['total']['losses'] += 1 elif result == 'draw': stats['total']['draws'] += 1 stats['total']['rating_change'] += rating_change # Для общей статистики берем рейтинг из последней игры (любого типа) if final_rating is not None: stats['total']['rating'] = final_rating # Создаем объекты GameStats, устанавливая rating только для режимов с играми def create_game_stats(mode_stats): # Устанавливаем rating только если были игры if mode_stats['games_played'] > 0 and mode_stats['rating'] is not None: return GameStats(**mode_stats) else: # Убираем rating для режимов без игр mode_stats_copy = mode_stats.copy() mode_stats_copy['rating'] = None return GameStats(**mode_stats_copy) return GamesOfPeriodStats( bullet=create_game_stats(stats['bullet']), blitz=create_game_stats(stats['blitz']), rapid=create_game_stats(stats['rapid']), classical=create_game_stats(stats['classical']), correspondence=create_game_stats(stats['correspondence']), total=create_game_stats(stats['total']) ) async def get_games_of_period(self, username: str, since_timestamp: int, until_timestamp: int, rated_only: bool = True) -> GamesOfPeriodResponse: """ Получает статистику игр пользователя за определенный период. Получает игры от Lichess API за указанный период, обрабатывает их и возвращает агрегированную статистику по режимам игр. Args: username: Имя пользователя на Lichess since_timestamp: Начало периода (Unix timestamp в секундах) until_timestamp: Конец периода (Unix timestamp в секундах) rated_only: Только рейтинговые игры (по умолчанию True) Returns: GamesOfPeriodResponse с статистикой игр """ try: # Конвертируем timestamp в миллисекунды для API Lichess since_ms = since_timestamp * 1000 until_ms = until_timestamp * 1000 # Получаем игры games = await self.lichess_client.get_games_of_period(username, since_ms, until_ms, rated_only) if games is None: return GamesOfPeriodResponse( message=f"Пользователь {username} не найден", username=username, period_start=since_timestamp, period_end=until_timestamp, games_count=0 ) if not games: return GamesOfPeriodResponse( message=f"Игры за указанный период не найдены", username=username, period_start=since_timestamp, period_end=until_timestamp, games_count=0 ) # Обрабатываем игры games_stats = self._process_games_of_period(games, username) # Определяем время самой старой партии (в секундах) earliest_game_ts = None try: if games: earliest_game_ts = min(g.get('createdAt', 0) for g in games if isinstance(g.get('createdAt', None), int)) if earliest_game_ts: earliest_game_ts = earliest_game_ts // 1000 except Exception: earliest_game_ts = None return GamesOfPeriodResponse( message="Статистика игр за период", username=username, period_start=since_timestamp, period_end=until_timestamp, games_count=len(games), earliest_game_ts=earliest_game_ts, data=games_stats ) except Exception as e: logger.error(f"Ошибка при получении статистики игр за период: {e}") return GamesOfPeriodResponse( message=f"Ошибка при получении статистики: {str(e)}", username=username, period_start=since_timestamp, period_end=until_timestamp, games_count=0 ) def _process_puzzle_activities(self, activities: List[Dict[str, Any]], since_ms: int, until_ms: int) -> PuzzleStats: """ Обрабатывает активности по задачам и возвращает статистику за период """ puzzles_in_period = [] for i, activity in enumerate(activities): # Lichess API использует поле 'date' вместо 'createdAt' created_at = activity.get('date') if created_at is None: if i < 3: # Логируем только первые 3 logger.warning(f"Активность {i} не имеет date: {list(activity.keys())}") continue # Логируем первую активность для отладки if i == 0: logger.info(f"Первая активность: date={created_at}, since={since_ms}, until={until_ms}") # Фильтруем по периоду [since_ms, until_ms) if since_ms <= created_at < until_ms: puzzles_in_period.append(activity) logger.info(f"Найдено {len(puzzles_in_period)} активностей в периоде из {len(activities)}") # Подсчитываем статистику total_attempts = len(puzzles_in_period) solved = sum(1 for activity in puzzles_in_period if activity.get('win', False)) failed = total_attempts - solved success_rate = (solved / total_attempts * 100) if total_attempts > 0 else 0.0 return PuzzleStats( total_attempts=total_attempts, solved=solved, failed=failed, success_rate=round(success_rate, 2) ) async def get_puzzle_of_period(self, token: str, since_ms: int, until_ms: int, max_puzzles: int = 50) -> PuzzleOfPeriodResponse: """ Получает статистику решения задач за определенный период. Получает активность по решению задач от Lichess API, фильтрует по периоду и возвращает агрегированную статистику решения задач. Args: token: Bearer токен авторизации от Lichess since_ms: Начало периода (Unix timestamp в миллисекундах) until_ms: Конец периода (Unix timestamp в миллисекундах) max_puzzles: Максимальное количество задач для получения (по умолчанию 50) Returns: PuzzleOfPeriodResponse с статистикой решения задач """ try: # Получаем активности по задачам activities = await self.lichess_client.get_puzzle_activity(token, max_puzzles) if activities is None: return PuzzleOfPeriodResponse( message="Неверный токен авторизации или доступ запрещен", period_start=since_ms, period_end=until_ms, max_puzzles=max_puzzles, puzzles_in_period=0 ) if not activities: return PuzzleOfPeriodResponse( message="Активности по задачам не найдены", period_start=since_ms, period_end=until_ms, max_puzzles=max_puzzles, puzzles_in_period=0 ) # Обрабатываем активности puzzle_stats = self._process_puzzle_activities(activities, since_ms, until_ms) return PuzzleOfPeriodResponse( message="Статистика решения задач за период", period_start=since_ms, period_end=until_ms, max_puzzles=max_puzzles, puzzles_in_period=puzzle_stats.total_attempts, data=puzzle_stats ) except Exception as e: logger.error(f"Ошибка при получении статистики решения задач за период: {e}") return PuzzleOfPeriodResponse( message=f"Ошибка при получении статистики: {str(e)}", period_start=since_ms, period_end=until_ms, max_puzzles=max_puzzles, puzzles_in_period=0 ) async def close(self): """ Закрывает сервис статистики. Освобождает ресурсы и корректно закрывает HTTP клиент. Должен вызываться при завершении работы с сервисом. """ await self.lichess_client.close()