Создание единого проекта Lichess Statistics Ecosystem

- Объединены три проекта в один репозиторий
- LichessWebServices - REST API для статистики
- LichessClientTG_bot - Telegram бот с поддержкой множества пользователей
- LichessWebView - Веб-интерфейс для просмотра пользователей и игроков
- Добавлен общий docker-compose.yml для запуска всех сервисов
- Добавлен скрипт start.sh для удобного запуска
- Добавлен README с полным описанием проекта
This commit is contained in:
vrubelroman 2025-10-26 20:23:26 +03:00
commit a08fc8c962
32 changed files with 4990 additions and 0 deletions

520
LichessClientTG_bot/bot.py Normal file
View file

@ -0,0 +1,520 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application, CommandHandler, CallbackQueryHandler,
MessageHandler, filters, ContextTypes, ConversationHandler
)
from config import (
TELEGRAM_BOT_TOKEN, PERIOD_OPTIONS, POLL_INTERVAL,
POLL_TIMEOUT, DROP_PENDING_UPDATES, ALLOWED_UPDATES,
LICHESS_STATS_API_BASE_URL
)
from database import Database
from lichess_api import LichessAPI
from formatters import StatsFormatter
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Conversation states
WAITING_FOR_TOKEN, WAITING_FOR_USERNAME = range(2)
class LichessBot:
def __init__(self):
self.db = Database()
self.lichess_api = LichessAPI()
self.periodic_tasks = {} # Store periodic tasks
self.period_start_times = {} # Store start times for each gamer
self.application = None # Will be set when application is created
async def start_existing_periodic_tasks(self):
"""Start periodic tasks for all user-gamer pairs that have periods set"""
try:
gamers_with_periods = self.db.get_all_gamers_with_periods()
logger.info(f"Found {len(gamers_with_periods)} user-gamer pairs with periodic notifications")
for gamer in gamers_with_periods:
if gamer['period_minutes'] > 0:
user_id = gamer['user_id']
# Start periodic task with user_id and gamer
await self.start_periodic_task(gamer, user_id, gamer['period_minutes'])
logger.info(f"Started periodic task for {gamer['username']} (user {user_id}) with period {gamer['period_minutes']} minutes")
except Exception as e:
logger.error(f"Error starting existing periodic tasks: {e}")
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command handler"""
# Register user in database
user = update.effective_user
self.db.add_or_get_telegram_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name
)
await update.message.reply_text(
"🎯 Добро пожаловать в Lichess Statistics Bot!\n\n"
"Доступные команды:\n"
"/adduser - Добавить игрока Lichess для отслеживания\n"
"/getgamers - Выбрать активного игрока\n"
"/today - Статистика за сегодня\n"
"/yesterday - Статистика за вчера\n"
"/week - Статистика за неделю\n"
"/setperiod - Настроить периодические уведомления"
)
async def adduser_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start adduser command"""
await update.message.reply_text(
"🔑 Введите ваш Lichess API token.\n"
"Если у вас нет токена, введите 0"
)
return WAITING_FOR_TOKEN
async def handle_token(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle token input"""
token = update.message.text.strip()
user_id = update.effective_user.id
if token == "0":
await update.message.reply_text("👤 Введите ваш Lichess username:")
return WAITING_FOR_USERNAME
else:
# Get username from token
profile = await self.lichess_api.get_user_profile(token)
if profile:
username = profile.get('username')
if username:
# Add gamer to database
gamer_id = self.db.add_gamer(username, token)
# Link user to gamer
self.db.add_user_gamer(user_id, gamer_id)
# If this is the first gamer for this user, make it active
user_gamers = self.db.get_user_gamers(user_id)
if len(user_gamers) == 1:
self.db.set_user_active_gamer(user_id, gamer_id)
await update.message.reply_text(
f"✅ Игрок {username} успешно добавлен!"
)
return ConversationHandler.END
else:
await update.message.reply_text(
"Не удалось получить username из токена. Попробуйте еще раз или введите 0 для ввода username вручную."
)
return WAITING_FOR_TOKEN
else:
await update.message.reply_text(
"❌ Неверный токен. Попробуйте еще раз или введите 0 для ввода username вручную."
)
return WAITING_FOR_TOKEN
async def handle_username(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle username input"""
username = update.message.text.strip()
user_id = update.effective_user.id
if username:
# Add gamer to database
gamer_id = self.db.add_gamer(username)
# Link user to gamer
self.db.add_user_gamer(user_id, gamer_id)
# If this is the first gamer for this user, make it active
user_gamers = self.db.get_user_gamers(user_id)
if len(user_gamers) == 1:
self.db.set_user_active_gamer(user_id, gamer_id)
await update.message.reply_text(
f"✅ Игрок {username} успешно добавлен!"
)
else:
await update.message.reply_text(
"❌ Username не может быть пустым. Попробуйте еще раз."
)
return WAITING_FOR_USERNAME
return ConversationHandler.END
async def getgamers(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Get all gamers for the current user and allow selection"""
user_id = update.effective_user.id
gamers = self.db.get_user_gamers(user_id)
if not gamers:
await update.message.reply_text("📭 В базе нет игроков. Используйте /adduser для добавления.")
return
# Show loading message
loading_msg = await update.message.reply_text("🔄 Загружаем рейтинги игроков...")
# Prepare data for each gamer
gamers_data = []
for gamer in gamers:
status = "🟢" if gamer['is_active'] else ""
username = gamer['username']
# Get user ratings from Lichess API
ratings_data = await self.lichess_api.get_user_ratings(username)
if ratings_data and 'perfs' in ratings_data:
perfs = ratings_data['perfs']
bullet_rating = perfs.get('bullet', {}).get('rating', 'N/A')
blitz_rating = perfs.get('blitz', {}).get('rating', 'N/A')
rapid_rating = perfs.get('rapid', {}).get('rating', 'N/A')
else:
bullet_rating = blitz_rating = rapid_rating = 'N/A'
# Add period information if period > 0
period_minutes = gamer.get('period_minutes', 0)
period_text = f" · {period_minutes}м" if period_minutes > 0 else ""
gamers_data.append({
'id': gamer['id'],
'status': status,
'username': username,
'bullet': bullet_rating,
'blitz': blitz_rating,
'rapid': rapid_rating,
'period': period_text
})
# Create text message with stats
text_lines = []
for gamer in gamers_data:
text_lines.append(
f"{gamer['status']} <b>{gamer['username']}</b> "
f"{gamer['bullet']} 🔥 {gamer['blitz']} 🐇 {gamer['rapid']}{gamer['period']}"
)
gamers_text = "👥 <b>Выберите активного игрока:</b>\n\n" + "\n".join(text_lines)
# Create simple keyboard with just usernames
keyboard = []
for gamer in gamers_data:
keyboard.append([InlineKeyboardButton(
text=f"{gamer['status']} {gamer['username']}",
callback_data=f"select_{gamer['id']}"
)])
reply_markup = InlineKeyboardMarkup(keyboard)
# Edit the loading message with the results
await loading_msg.edit_text(
gamers_text,
parse_mode='HTML',
reply_markup=reply_markup
)
async def select_gamer(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle gamer selection"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
gamer_id = int(query.data.split('_')[1])
# Get user gamers to find the selected one
gamers = self.db.get_user_gamers(user_id)
selected_gamer = next((g for g in gamers if g['id'] == gamer_id), None)
if selected_gamer:
# Set active gamer for this user
self.db.set_user_active_gamer(user_id, gamer_id)
await query.edit_message_text(
f"✅ Активный игрок: {selected_gamer['username']}"
)
else:
await query.edit_message_text("❌ Игрок не найден")
async def get_stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE, period: str):
"""Get statistics for a period"""
user_id = update.effective_user.id
# Get active gamer for this user
active_gamer = self.db.get_user_active_gamer(user_id)
if not active_gamer:
await update.message.reply_text(
"❌ Нет активного игрока. Используйте /getgamers для выбора."
)
return
username = active_gamer['username']
# Get stats based on period
if period == "today":
data = await self.lichess_api.get_today_stats(username)
elif period == "yesterday":
data = await self.lichess_api.get_yesterday_stats(username)
elif period == "week":
data = await self.lichess_api.get_week_stats(username)
else:
await update.message.reply_text("❌ Неизвестный период")
return
# Format and send response
formatted_response = StatsFormatter.format_stats_response(data, username, period)
await update.message.reply_text(formatted_response)
async def today(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Today command"""
await self.get_stats(update, context, "today")
async def yesterday(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Yesterday command"""
await self.get_stats(update, context, "yesterday")
async def week(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Week command"""
await self.get_stats(update, context, "week")
async def setperiod(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Set period command"""
user_id = update.effective_user.id
# Get active gamer for this user
active_gamer = self.db.get_user_active_gamer(user_id)
if not active_gamer:
await update.message.reply_text(
"❌ Нет активного игрока. Используйте /getgamers для выбора."
)
return
keyboard = []
for period in PERIOD_OPTIONS:
if period == 0:
button_text = "❌ Отключить уведомления"
else:
button_text = f"{period} минут"
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"period_{period}")])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"⏱️ Выберите период для игрока {active_gamer['username']}:\n"
f"📱 Уведомления будут приходить в личные сообщения",
reply_markup=reply_markup
)
async def select_period(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle period selection"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
period = int(query.data.split('_')[1])
# Get active gamer for this user
active_gamer = self.db.get_user_active_gamer(user_id)
if active_gamer:
# Set period for this user-gamer pair
self.db.set_user_gamer_period(user_id, active_gamer['id'], period)
if period == 0:
await query.edit_message_text(
f"✅ Уведомления для {active_gamer['username']} отключены"
)
else:
await query.edit_message_text(
f"✅ Период {period} минут установлен для {active_gamer['username']}\n"
f"📱 Уведомления будут приходить в личные сообщения"
)
# Start periodic task for this gamer (send to user's personal messages)
await self.start_periodic_task(active_gamer, user_id, period)
async def start_periodic_task(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Start periodic task for a gamer"""
task_key = f"{gamer['id']}_{user_id}"
# Cancel existing task if any
if task_key in self.periodic_tasks:
self.periodic_tasks[task_key].cancel()
logger.info(f"Cancelled existing periodic task for {gamer['username']}")
# Remove old start time
if task_key in self.period_start_times:
del self.period_start_times[task_key]
# Create new periodic task
task = asyncio.create_task(
self.periodic_check(gamer, user_id, period_minutes)
)
self.periodic_tasks[task_key] = task
async def periodic_check(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Periodic check for gamer activity"""
task_key = f"{gamer['id']}_{user_id}"
# Запоминаем время начала отслеживания
start_time = datetime.now()
self.period_start_times[task_key] = start_time
logger.info(f"Started periodic monitoring for {gamer['username']} with {period_minutes} minute intervals")
while True:
try:
# Ждем заданное количество минут
await asyncio.sleep(period_minutes * 60)
# Получаем время начала периода
period_start = self.period_start_times.get(task_key, start_time)
now = datetime.now()
# Рассчитываем timestamps в миллисекундах
since_timestamp = int(period_start.timestamp() * 1000)
until_timestamp = int(now.timestamp() * 1000)
logger.info(f"Checking period for {gamer['username']}: {period_start} to {now}")
logger.info(f"Unix timestamps: since={since_timestamp}, until={until_timestamp}")
# Делаем запросы к API
games_url = f"{LICHESS_STATS_API_BASE_URL}/games/{gamer['username']}/period?since={since_timestamp}&until={until_timestamp}"
logger.info(f"🎮 GAMES API REQUEST: {games_url}")
games_data = await self.lichess_api.get_games_period(
gamer['username'], since_timestamp, until_timestamp
)
logger.info(f"Games API response: {games_data}")
puzzles_data = None
if gamer['token']:
puzzles_url = f"{LICHESS_STATS_API_BASE_URL}/puzzle/period?since={since_timestamp}&until={until_timestamp}&max=150"
logger.info(f"🧩 PUZZLES API REQUEST: {puzzles_url}")
puzzles_data = await self.lichess_api.get_puzzles_period(
gamer['token'], since_timestamp, until_timestamp, max_puzzles=150
)
logger.info(f"Puzzles API response: {puzzles_data}")
else:
logger.info(f"No token for {gamer['username']}, skipping puzzles API call")
# Проверяем наличие реальной активности
has_games = False
total_games = 0
total_losses = 0
if games_data and games_data.get('data'):
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0)
total_losses = games_data.get('data', {}).get('total', {}).get('losses', 0)
has_games = total_games > 0
has_puzzles = False
if puzzles_data and puzzles_data.get('data'):
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
has_puzzles = total_puzzles > 0
# Детальное логирование для отладки
logger.info(f"Activity check for {gamer['username']}: has_games={has_games}, has_puzzles={has_puzzles}")
if games_data and games_data.get('data'):
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0)
total_losses = games_data.get('data', {}).get('total', {}).get('losses', 0)
logger.info(f"Games data: total_games={total_games}, total_losses={total_losses}")
if puzzles_data and puzzles_data.get('data'):
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
logger.info(f"Puzzles data: total_attempts={total_puzzles}")
# Отправляем уведомление только если есть реальная активность
if has_games or has_puzzles:
try:
notification = StatsFormatter.format_period_notification(
gamer['username'], games_data, puzzles_data, period_minutes
)
if self.application:
try:
await self.application.bot.send_message(
chat_id=user_id,
text=notification
)
logger.info(f"Sent periodic notification for {gamer['username']} to user {user_id}")
# Обновляем время начала только после успешной отправки уведомления
self.period_start_times[task_key] = now
except Exception as e:
logger.error(f"Failed to send notification to user {user_id}: {e}")
# Не обновляем время начала при ошибке отправки
except Exception as e:
logger.error(f"Error formatting notification for {gamer['username']}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Не обновляем время начала при ошибке форматирования
else:
logger.info(f"No activity found for {gamer['username']} in the last {period_minutes} minutes")
# Обновляем время начала даже если нет активности, чтобы не зацикливаться
self.period_start_times[task_key] = now
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in periodic check: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
def setup_handlers(self, application: Application):
"""Setup all handlers"""
self.application = application # Store application reference
# Conversation handler for adduser
adduser_conv = ConversationHandler(
entry_points=[CommandHandler("adduser", self.adduser_start)],
states={
WAITING_FOR_TOKEN: [MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_token)],
WAITING_FOR_USERNAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_username)],
},
fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)]
)
# Add all handlers
application.add_handler(CommandHandler("start", self.start))
application.add_handler(adduser_conv)
application.add_handler(CommandHandler("getgamers", self.getgamers))
application.add_handler(CommandHandler("today", self.today))
application.add_handler(CommandHandler("yesterday", self.yesterday))
application.add_handler(CommandHandler("week", self.week))
application.add_handler(CommandHandler("setperiod", self.setperiod))
# Callback handlers
application.add_handler(CallbackQueryHandler(self.select_gamer, pattern="^select_"))
application.add_handler(CallbackQueryHandler(self.select_period, pattern="^period_"))
def main():
"""Main function"""
bot = LichessBot()
# Create application with Long Polling configuration
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Setup handlers
bot.setup_handlers(application)
# Set application reference for periodic tasks
bot.application = application
# Start periodic tasks for existing gamers (will be called after application starts)
async def post_init(app):
await bot.start_existing_periodic_tasks()
application.post_init = post_init
# Start the bot with Long Polling
logger.info("Starting Lichess Statistics Bot with Long Polling...")
application.run_polling(
poll_interval=POLL_INTERVAL,
timeout=POLL_TIMEOUT,
drop_pending_updates=DROP_PENDING_UPDATES,
allowed_updates=ALLOWED_UPDATES
)
if __name__ == '__main__':
main()