LichessStatTgWeb/LichessClientTG_bot/bot.py
vrubelroman 2c87dc60f7 Исправление хранения токенов: токены теперь в user_gamers, а не в gamers
- Добавлено поле token в таблицу user_gamers
- Токены теперь привязываются к паре пользователь-игрок, а не глобально
- Обновлены методы работы с токенами
- Теперь каждый пользователь может иметь свой токен для одного игрока
2025-10-26 20:35:23 +03:00

520 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application, CommandHandler, CallbackQueryHandler,
MessageHandler, filters, ContextTypes, ConversationHandler
)
from config import (
TELEGRAM_BOT_TOKEN, PERIOD_OPTIONS, POLL_INTERVAL,
POLL_TIMEOUT, DROP_PENDING_UPDATES, ALLOWED_UPDATES,
LICHESS_STATS_API_BASE_URL
)
from database import Database
from lichess_api import LichessAPI
from formatters import StatsFormatter
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Conversation states
WAITING_FOR_TOKEN, WAITING_FOR_USERNAME = range(2)
class LichessBot:
def __init__(self):
self.db = Database()
self.lichess_api = LichessAPI()
self.periodic_tasks = {} # Store periodic tasks
self.period_start_times = {} # Store start times for each gamer
self.application = None # Will be set when application is created
async def start_existing_periodic_tasks(self):
"""Start periodic tasks for all user-gamer pairs that have periods set"""
try:
gamers_with_periods = self.db.get_all_gamers_with_periods()
logger.info(f"Found {len(gamers_with_periods)} user-gamer pairs with periodic notifications")
for gamer in gamers_with_periods:
if gamer['period_minutes'] > 0:
user_id = gamer['user_id']
# Start periodic task with user_id and gamer
await self.start_periodic_task(gamer, user_id, gamer['period_minutes'])
logger.info(f"Started periodic task for {gamer['username']} (user {user_id}) with period {gamer['period_minutes']} minutes")
except Exception as e:
logger.error(f"Error starting existing periodic tasks: {e}")
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command handler"""
# Register user in database
user = update.effective_user
self.db.add_or_get_telegram_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name
)
await update.message.reply_text(
"🎯 Добро пожаловать в Lichess Statistics Bot!\n\n"
"Доступные команды:\n"
"/adduser - Добавить игрока Lichess для отслеживания\n"
"/getgamers - Выбрать активного игрока\n"
"/today - Статистика за сегодня\n"
"/yesterday - Статистика за вчера\n"
"/week - Статистика за неделю\n"
"/setperiod - Настроить периодические уведомления"
)
async def adduser_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start adduser command"""
await update.message.reply_text(
"🔑 Введите ваш Lichess API token.\n"
"Если у вас нет токена, введите 0"
)
return WAITING_FOR_TOKEN
async def handle_token(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle token input"""
token = update.message.text.strip()
user_id = update.effective_user.id
if token == "0":
await update.message.reply_text("👤 Введите ваш Lichess username:")
return WAITING_FOR_USERNAME
else:
# Get username from token
profile = await self.lichess_api.get_user_profile(token)
if profile:
username = profile.get('username')
if username:
# Add gamer to database (without token)
gamer_id = self.db.add_gamer(username)
# Link user to gamer WITH token
self.db.add_user_gamer(user_id, gamer_id, token)
# If this is the first gamer for this user, make it active
user_gamers = self.db.get_user_gamers(user_id)
if len(user_gamers) == 1:
self.db.set_user_active_gamer(user_id, gamer_id)
await update.message.reply_text(
f"✅ Игрок {username} успешно добавлен!"
)
return ConversationHandler.END
else:
await update.message.reply_text(
"Не удалось получить username из токена. Попробуйте еще раз или введите 0 для ввода username вручную."
)
return WAITING_FOR_TOKEN
else:
await update.message.reply_text(
"❌ Неверный токен. Попробуйте еще раз или введите 0 для ввода username вручную."
)
return WAITING_FOR_TOKEN
async def handle_username(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle username input"""
username = update.message.text.strip()
user_id = update.effective_user.id
if username:
# Add gamer to database
gamer_id = self.db.add_gamer(username)
# Link user to gamer
self.db.add_user_gamer(user_id, gamer_id)
# If this is the first gamer for this user, make it active
user_gamers = self.db.get_user_gamers(user_id)
if len(user_gamers) == 1:
self.db.set_user_active_gamer(user_id, gamer_id)
await update.message.reply_text(
f"✅ Игрок {username} успешно добавлен!"
)
else:
await update.message.reply_text(
"❌ Username не может быть пустым. Попробуйте еще раз."
)
return WAITING_FOR_USERNAME
return ConversationHandler.END
async def getgamers(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Get all gamers for the current user and allow selection"""
user_id = update.effective_user.id
gamers = self.db.get_user_gamers(user_id)
if not gamers:
await update.message.reply_text("📭 В базе нет игроков. Используйте /adduser для добавления.")
return
# Show loading message
loading_msg = await update.message.reply_text("🔄 Загружаем рейтинги игроков...")
# Prepare data for each gamer
gamers_data = []
for gamer in gamers:
status = "🟢" if gamer['is_active'] else ""
username = gamer['username']
# Get user ratings from Lichess API
ratings_data = await self.lichess_api.get_user_ratings(username)
if ratings_data and 'perfs' in ratings_data:
perfs = ratings_data['perfs']
bullet_rating = perfs.get('bullet', {}).get('rating', 'N/A')
blitz_rating = perfs.get('blitz', {}).get('rating', 'N/A')
rapid_rating = perfs.get('rapid', {}).get('rating', 'N/A')
else:
bullet_rating = blitz_rating = rapid_rating = 'N/A'
# Add period information if period > 0
period_minutes = gamer.get('period_minutes', 0)
period_text = f" · {period_minutes}м" if period_minutes > 0 else ""
gamers_data.append({
'id': gamer['id'],
'status': status,
'username': username,
'bullet': bullet_rating,
'blitz': blitz_rating,
'rapid': rapid_rating,
'period': period_text
})
# Create text message with stats
text_lines = []
for gamer in gamers_data:
text_lines.append(
f"{gamer['status']} <b>{gamer['username']}</b> "
f"{gamer['bullet']} 🔥 {gamer['blitz']} 🐇 {gamer['rapid']}{gamer['period']}"
)
gamers_text = "👥 <b>Выберите активного игрока:</b>\n\n" + "\n".join(text_lines)
# Create simple keyboard with just usernames
keyboard = []
for gamer in gamers_data:
keyboard.append([InlineKeyboardButton(
text=f"{gamer['status']} {gamer['username']}",
callback_data=f"select_{gamer['id']}"
)])
reply_markup = InlineKeyboardMarkup(keyboard)
# Edit the loading message with the results
await loading_msg.edit_text(
gamers_text,
parse_mode='HTML',
reply_markup=reply_markup
)
async def select_gamer(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle gamer selection"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
gamer_id = int(query.data.split('_')[1])
# Get user gamers to find the selected one
gamers = self.db.get_user_gamers(user_id)
selected_gamer = next((g for g in gamers if g['id'] == gamer_id), None)
if selected_gamer:
# Set active gamer for this user
self.db.set_user_active_gamer(user_id, gamer_id)
await query.edit_message_text(
f"✅ Активный игрок: {selected_gamer['username']}"
)
else:
await query.edit_message_text("❌ Игрок не найден")
async def get_stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE, period: str):
"""Get statistics for a period"""
user_id = update.effective_user.id
# Get active gamer for this user
active_gamer = self.db.get_user_active_gamer(user_id)
if not active_gamer:
await update.message.reply_text(
"❌ Нет активного игрока. Используйте /getgamers для выбора."
)
return
username = active_gamer['username']
# Get stats based on period
if period == "today":
data = await self.lichess_api.get_today_stats(username)
elif period == "yesterday":
data = await self.lichess_api.get_yesterday_stats(username)
elif period == "week":
data = await self.lichess_api.get_week_stats(username)
else:
await update.message.reply_text("❌ Неизвестный период")
return
# Format and send response
formatted_response = StatsFormatter.format_stats_response(data, username, period)
await update.message.reply_text(formatted_response)
async def today(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Today command"""
await self.get_stats(update, context, "today")
async def yesterday(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Yesterday command"""
await self.get_stats(update, context, "yesterday")
async def week(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Week command"""
await self.get_stats(update, context, "week")
async def setperiod(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Set period command"""
user_id = update.effective_user.id
# Get active gamer for this user
active_gamer = self.db.get_user_active_gamer(user_id)
if not active_gamer:
await update.message.reply_text(
"❌ Нет активного игрока. Используйте /getgamers для выбора."
)
return
keyboard = []
for period in PERIOD_OPTIONS:
if period == 0:
button_text = "❌ Отключить уведомления"
else:
button_text = f"{period} минут"
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"period_{period}")])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"⏱️ Выберите период для игрока {active_gamer['username']}:\n"
f"📱 Уведомления будут приходить в личные сообщения",
reply_markup=reply_markup
)
async def select_period(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle period selection"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
period = int(query.data.split('_')[1])
# Get active gamer for this user
active_gamer = self.db.get_user_active_gamer(user_id)
if active_gamer:
# Set period for this user-gamer pair
self.db.set_user_gamer_period(user_id, active_gamer['id'], period)
if period == 0:
await query.edit_message_text(
f"✅ Уведомления для {active_gamer['username']} отключены"
)
else:
await query.edit_message_text(
f"✅ Период {period} минут установлен для {active_gamer['username']}\n"
f"📱 Уведомления будут приходить в личные сообщения"
)
# Start periodic task for this gamer (send to user's personal messages)
await self.start_periodic_task(active_gamer, user_id, period)
async def start_periodic_task(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Start periodic task for a gamer"""
task_key = f"{gamer['id']}_{user_id}"
# Cancel existing task if any
if task_key in self.periodic_tasks:
self.periodic_tasks[task_key].cancel()
logger.info(f"Cancelled existing periodic task for {gamer['username']}")
# Remove old start time
if task_key in self.period_start_times:
del self.period_start_times[task_key]
# Create new periodic task
task = asyncio.create_task(
self.periodic_check(gamer, user_id, period_minutes)
)
self.periodic_tasks[task_key] = task
async def periodic_check(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Periodic check for gamer activity"""
task_key = f"{gamer['id']}_{user_id}"
# Запоминаем время начала отслеживания
start_time = datetime.now()
self.period_start_times[task_key] = start_time
logger.info(f"Started periodic monitoring for {gamer['username']} with {period_minutes} minute intervals")
while True:
try:
# Ждем заданное количество минут
await asyncio.sleep(period_minutes * 60)
# Получаем время начала периода
period_start = self.period_start_times.get(task_key, start_time)
now = datetime.now()
# Рассчитываем timestamps в миллисекундах
since_timestamp = int(period_start.timestamp() * 1000)
until_timestamp = int(now.timestamp() * 1000)
logger.info(f"Checking period for {gamer['username']}: {period_start} to {now}")
logger.info(f"Unix timestamps: since={since_timestamp}, until={until_timestamp}")
# Делаем запросы к API
games_url = f"{LICHESS_STATS_API_BASE_URL}/games/{gamer['username']}/period?since={since_timestamp}&until={until_timestamp}"
logger.info(f"🎮 GAMES API REQUEST: {games_url}")
games_data = await self.lichess_api.get_games_period(
gamer['username'], since_timestamp, until_timestamp
)
logger.info(f"Games API response: {games_data}")
puzzles_data = None
if gamer['token']:
puzzles_url = f"{LICHESS_STATS_API_BASE_URL}/puzzle/period?since={since_timestamp}&until={until_timestamp}&max=150"
logger.info(f"🧩 PUZZLES API REQUEST: {puzzles_url}")
puzzles_data = await self.lichess_api.get_puzzles_period(
gamer['token'], since_timestamp, until_timestamp, max_puzzles=150
)
logger.info(f"Puzzles API response: {puzzles_data}")
else:
logger.info(f"No token for {gamer['username']}, skipping puzzles API call")
# Проверяем наличие реальной активности
has_games = False
total_games = 0
total_losses = 0
if games_data and games_data.get('data'):
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0)
total_losses = games_data.get('data', {}).get('total', {}).get('losses', 0)
has_games = total_games > 0
has_puzzles = False
if puzzles_data and puzzles_data.get('data'):
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
has_puzzles = total_puzzles > 0
# Детальное логирование для отладки
logger.info(f"Activity check for {gamer['username']}: has_games={has_games}, has_puzzles={has_puzzles}")
if games_data and games_data.get('data'):
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0)
total_losses = games_data.get('data', {}).get('total', {}).get('losses', 0)
logger.info(f"Games data: total_games={total_games}, total_losses={total_losses}")
if puzzles_data and puzzles_data.get('data'):
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
logger.info(f"Puzzles data: total_attempts={total_puzzles}")
# Отправляем уведомление только если есть реальная активность
if has_games or has_puzzles:
try:
notification = StatsFormatter.format_period_notification(
gamer['username'], games_data, puzzles_data, period_minutes
)
if self.application:
try:
await self.application.bot.send_message(
chat_id=user_id,
text=notification
)
logger.info(f"Sent periodic notification for {gamer['username']} to user {user_id}")
# Обновляем время начала только после успешной отправки уведомления
self.period_start_times[task_key] = now
except Exception as e:
logger.error(f"Failed to send notification to user {user_id}: {e}")
# Не обновляем время начала при ошибке отправки
except Exception as e:
logger.error(f"Error formatting notification for {gamer['username']}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Не обновляем время начала при ошибке форматирования
else:
logger.info(f"No activity found for {gamer['username']} in the last {period_minutes} minutes")
# Обновляем время начала даже если нет активности, чтобы не зацикливаться
self.period_start_times[task_key] = now
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in periodic check: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
def setup_handlers(self, application: Application):
"""Setup all handlers"""
self.application = application # Store application reference
# Conversation handler for adduser
adduser_conv = ConversationHandler(
entry_points=[CommandHandler("adduser", self.adduser_start)],
states={
WAITING_FOR_TOKEN: [MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_token)],
WAITING_FOR_USERNAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_username)],
},
fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)]
)
# Add all handlers
application.add_handler(CommandHandler("start", self.start))
application.add_handler(adduser_conv)
application.add_handler(CommandHandler("getgamers", self.getgamers))
application.add_handler(CommandHandler("today", self.today))
application.add_handler(CommandHandler("yesterday", self.yesterday))
application.add_handler(CommandHandler("week", self.week))
application.add_handler(CommandHandler("setperiod", self.setperiod))
# Callback handlers
application.add_handler(CallbackQueryHandler(self.select_gamer, pattern="^select_"))
application.add_handler(CallbackQueryHandler(self.select_period, pattern="^period_"))
def main():
"""Main function"""
bot = LichessBot()
# Create application with Long Polling configuration
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Setup handlers
bot.setup_handlers(application)
# Set application reference for periodic tasks
bot.application = application
# Start periodic tasks for existing gamers (will be called after application starts)
async def post_init(app):
await bot.start_existing_periodic_tasks()
application.post_init = post_init
# Start the bot with Long Polling
logger.info("Starting Lichess Statistics Bot with Long Polling...")
application.run_polling(
poll_interval=POLL_INTERVAL,
timeout=POLL_TIMEOUT,
drop_pending_updates=DROP_PENDING_UPDATES,
allowed_updates=ALLOWED_UPDATES
)
if __name__ == '__main__':
main()