LichessStatTgWeb/LichessClientTG_bot/bot.py
2025-11-18 19:39:52 +03:00

1398 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application, CommandHandler, CallbackQueryHandler,
MessageHandler, filters, ContextTypes, ConversationHandler
)
from config import (
TELEGRAM_BOT_TOKEN, PERIOD_OPTIONS, POLL_INTERVAL,
POLL_TIMEOUT, DROP_PENDING_UPDATES, ALLOWED_UPDATES,
LICHESS_STATS_API_BASE_URL, ADMINPANEL_TELEGRAM_BOT_TOKEN, BOT_VERSION
)
from database import Database
from lichess_api import LichessAPI
from formatters import StatsFormatter
from i18n import t
from admin_bot import get_admin_bot, init_admin_bot
from message_counters import MessageCounters
import time
import aiohttp
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG
)
logger = logging.getLogger(__name__)
# Conversation states
WAITING_FOR_TOKEN, WAITING_FOR_USERNAME = range(2)
class LichessBot:
def __init__(self):
self.db = Database()
self.lichess_api = LichessAPI()
self.periodic_tasks = {} # Store periodic tasks
self.period_start_times = {} # Store start times for each gamer
self.application = None # Will be set when application is created
self.counters = MessageCounters() # Message counters
async def _notify_admin_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str], is_new_gamer: bool = False):
"""Notify admin about newly linked player (always try to send)."""
try:
admin_bot = get_admin_bot()
if admin_bot:
logger.info("Sending admin notification via admin_bot instance")
await admin_bot.notify_new_player(
player_username=player_username,
added_by_user_id=added_by_user_id,
added_by_username=added_by_username,
is_new_gamer=is_new_gamer
)
return
except Exception as e:
logger.warning(f"notify_new_player via admin_bot failed: {e}")
# Fallback: direct API call using admin bot token and chat id from DB
try:
admin_chat_id = self.db.get_admin_chat_id()
if not admin_chat_id:
logger.warning("Admin chat id is not set; cannot send admin notification.")
return
url = f"https://api.telegram.org/bot{ADMINPANEL_TELEGRAM_BOT_TOKEN}/sendMessage"
added_by_text = f"@{added_by_username}" if added_by_username else f"ID: {added_by_user_id}"
lichess_url = f"https://lichess.org/@/{player_username}"
message = (
f"🎮 <b>Добавлен новый игрок для отслеживания</b>\n\n"
f"Игрок: <a href=\"{lichess_url}\">{player_username}</a>\n"
f"Добавил: {added_by_text}"
)
logger.info(f"Sending admin notification via direct API to chat_id={admin_chat_id}")
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": admin_chat_id,
"text": message,
"parse_mode": "HTML"
}) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Failed to send admin notification (fallback): {response.status} - {error_text}")
else:
logger.info("Admin notification sent successfully via direct API")
except Exception as e:
logger.error(f"Fallback admin notification failed: {e}")
async def _notify_admin_new_user(self, user_id: int, username: Optional[str], first_name: Optional[str]):
"""Notify admin about new Telegram user (fallback method)."""
try:
admin_chat_id = self.db.get_admin_chat_id()
if not admin_chat_id:
logger.warning("Admin chat id is not set; cannot send admin notification.")
return
url = f"https://api.telegram.org/bot{ADMINPANEL_TELEGRAM_BOT_TOKEN}/sendMessage"
username_text = f"@{username}" if username else "без username"
name_text = first_name if first_name else "без имени"
message = (
f"🆕 <b>Новый пользователь Telegram</b>\n\n"
f"ID: {user_id}\n"
f"Username: {username_text}\n"
f"Имя: {name_text}"
)
logger.info(f"Sending admin notification via direct API to chat_id={admin_chat_id}")
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": admin_chat_id,
"text": message,
"parse_mode": "HTML"
}) as response:
if response.status == 200:
logger.info(f"Admin notification sent successfully via direct API")
else:
error_text = await response.text()
logger.error(f"Failed to send admin notification: {response.status} - {error_text}")
except Exception as e:
logger.error(f"Failed to send admin notification via API: {e}")
import traceback
logger.error(traceback.format_exc())
async def test_admin_notify(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Manual test to verify admin notifications delivery path."""
user = update.effective_user
try:
await self._notify_admin_new_player("test_player_notify", user.id, user.username if user else None)
await update.message.reply_text("✅ Admin notification test triggered.")
except Exception as e:
logger.error(f"test_admin_notify failed: {e}")
await update.message.reply_text(f"❌ Failed to trigger admin notification: {e}")
def get_user_language_from_update(self, update: Update) -> str:
"""Always return English language"""
# Update user info in database
user = update.effective_user
if user:
self.db.add_or_get_telegram_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=None
)
return 'en'
async def start_existing_periodic_tasks(self):
"""Start periodic tasks for all user-gamer pairs that have periods set"""
try:
gamers_with_periods = self.db.get_all_gamers_with_periods()
# Get statistics
import sqlite3
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM telegram_users")
total_users = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(DISTINCT username) FROM gamers")
total_gamers = cursor.fetchone()[0]
logger.info(f"📊 Statistics: {total_users} users, {total_gamers} tracked gamers")
logger.info(f"🔔 Found {len(gamers_with_periods)} user-gamer pairs with periodic notifications enabled")
if len(gamers_with_periods) == 0:
logger.warning("⚠️ No periodic notifications configured! Users need to set periods using /setperiod")
for gamer in gamers_with_periods:
if gamer['period_minutes'] > 0:
user_id = gamer['user_id']
# Start periodic task with user_id and gamer
await self.start_periodic_task(gamer, user_id, gamer['period_minutes'])
logger.info(f"✅ Started periodic task for {gamer['username']} (user {user_id}) with period {gamer['period_minutes']} minutes")
# Start daily counter reset task
asyncio.create_task(self.daily_counter_reset_task())
logger.info("✅ Started daily counter reset task")
except Exception as e:
logger.error(f"❌ Error starting existing periodic tasks: {e}")
import traceback
logger.error(traceback.format_exc())
async def daily_counter_reset_task(self):
"""Background task to reset daily counters at midnight"""
while True:
try:
# Calculate seconds until next midnight
now = datetime.now()
next_midnight = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
seconds_until_midnight = (next_midnight - now).total_seconds()
logger.info(f"Daily counter reset task: waiting {seconds_until_midnight} seconds until next midnight")
await asyncio.sleep(seconds_until_midnight)
# Reset daily counters
self.counters._reset_daily_counters_if_needed()
logger.info("Daily counters reset at midnight")
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in daily counter reset task: {e}")
import traceback
logger.error(traceback.format_exc())
# Wait 1 hour before retrying
await asyncio.sleep(3600)
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command handler"""
logger.info(f"📝 start() method called for user {update.effective_user.id}")
# Register user in database
user = update.effective_user
lang_code = user.language_code if user else None
logger.info(f"User info: id={user.id}, username={user.username}, lang_code={lang_code}")
is_new_user = self.db.add_or_get_telegram_user(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=lang_code
)
# Notify admin bot about new user
if is_new_user:
try:
admin_bot = get_admin_bot()
if admin_bot:
await admin_bot.notify_new_user(
user_id=user.id,
username=user.username,
first_name=user.first_name
)
else:
# Fallback: direct API call
await self._notify_admin_new_user(user.id, user.username, user.first_name)
except Exception as e:
logger.error(f"Failed to notify admin about new user: {e}")
import traceback
logger.error(traceback.format_exc())
# Try fallback
try:
await self._notify_admin_new_user(user.id, user.username, user.first_name)
except Exception as e2:
logger.error(f"Fallback notification also failed: {e2}")
lang = self.get_user_language_from_update(update)
start_msg = t('start_message', lang)
await update.message.reply_text(start_msg)
self.counters.increment('start')
async def start_and_addgamer(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command that shows welcome message and starts addgamer conversation"""
try:
# Run the regular start command
await self.start(update, context)
# Start addgamer conversation and return state
return await self.addgamer_start(update, context)
except Exception as e:
logger.error(f"Error in start_and_addgamer: {e}")
import traceback
logger.error(traceback.format_exc())
try:
await update.message.reply_text(f"Error: {e}")
except:
pass
return ConversationHandler.END
async def addgamer_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start addgamer command - simple username only"""
logger.info(f"addgamer_start called for user {update.effective_user.id}")
lang = self.get_user_language_from_update(update)
try:
# Mark that we are awaiting a username reply
if context and hasattr(context, "user_data"):
context.user_data['awaiting_addgamer_username'] = True
await update.message.reply_text(t('addgamer_prompt', lang))
logger.info(f"Addgamer prompt sent to user {update.effective_user.id}")
self.counters.increment('addgamer')
except Exception as e:
logger.error(f"Error sending addgamer prompt: {e}")
import traceback
logger.error(traceback.format_exc())
# No conversation state returned; handler-based flow
return
async def addtoken_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start addtoken command - token required"""
lang = self.get_user_language_from_update(update)
await update.message.reply_text(t('addtoken_prompt', lang))
self.counters.increment('addtoken')
return WAITING_FOR_TOKEN
async def handle_token(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle token input for /addtoken"""
token = update.message.text.strip()
user_id = update.effective_user.id
# Get username from token
profile = await self.lichess_api.get_user_profile(token)
if profile:
username = profile.get('username')
if username:
# Check if this gamer is already tracked by this user
user_gamers = self.db.get_user_gamers(user_id)
existing_gamer = next((g for g in user_gamers if g['username'] == username), None)
lang = self.get_user_language_from_update(update)
if existing_gamer:
# Update token for existing gamer
self.db.add_user_gamer(user_id, existing_gamer['id'], token)
await update.message.reply_text(
t('token_added', lang, username=username)
)
# Always notify admin about link/added player
try:
user_obj = update.effective_user
await self._notify_admin_new_player(username, user_id, user_obj.username if user_obj else None)
except Exception as e:
logger.error(f"Admin notify failed after token update: {e}")
else:
# Add new gamer and link with token
# Check if gamer already exists
import sqlite3
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM gamers WHERE username = ?", (username,))
existing_gamer = cursor.fetchone()
is_new_gamer = existing_gamer is None
gamer_id = self.db.add_gamer(username)
self.db.add_user_gamer(user_id, gamer_id, token)
# Set default period to 1 hour (60 minutes) for new gamer
self.db.set_user_gamer_period(user_id, gamer_id, 60)
# If this is the first gamer for this user, make it active
user_gamers = self.db.get_user_gamers(user_id)
if len(user_gamers) == 1:
self.db.set_user_active_gamer(user_id, gamer_id)
# Start periodic task for this gamer (60 minutes period)
try:
gamer_data = {
'id': gamer_id,
'username': username,
'token': token,
'period_minutes': 60
}
await self.start_periodic_task(gamer_data, user_id, 60)
logger.info(f"Started periodic task for {username} (user {user_id}) with period 60 minutes")
except Exception as e:
logger.error(f"Failed to start periodic task for {username}: {e}")
import traceback
logger.error(traceback.format_exc())
# Notify admin bot about new player (always notify on link)
try:
user_obj = update.effective_user
await self._notify_admin_new_player(
username, user_id, user_obj.username if user_obj else None, is_new_gamer
)
except Exception as e:
logger.error(f"Admin notify failed after adding gamer with token: {e}")
await update.message.reply_text(
t('gamer_added_with_token', lang, username=username)
)
return ConversationHandler.END
else:
lang = self.get_user_language_from_update(update)
await update.message.reply_text(
t('token_username_error', lang)
)
return WAITING_FOR_TOKEN
else:
lang = self.get_user_language_from_update(update)
await update.message.reply_text(
t('invalid_token', lang)
)
return WAITING_FOR_TOKEN
async def handle_username(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle username input for /addgamer"""
# Only handle if we are awaiting an addgamer username
if not (context and hasattr(context, "user_data") and context.user_data.get('awaiting_addgamer_username')):
return
username = update.message.text.strip()
user_id = update.effective_user.id
lang = self.get_user_language_from_update(update)
if not username:
await update.message.reply_text(
t('empty_username', lang)
)
return
# Check if user exists on Lichess
user_exists = await self.lichess_api.check_user_exists(username)
if not user_exists:
await update.message.reply_text(
t('user_not_found', lang, username=username)
)
return WAITING_FOR_USERNAME
# Add gamer to database (without token)
# Check if gamer already exists
import sqlite3
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM gamers WHERE username = ?", (username,))
existing_gamer = cursor.fetchone()
is_new_gamer = existing_gamer is None
gamer_id = self.db.add_gamer(username)
# Link user to gamer (without token)
self.db.add_user_gamer(user_id, gamer_id, None)
# Set default period to 1 hour (60 minutes) for new gamer
self.db.set_user_gamer_period(user_id, gamer_id, 60)
# If this is the first gamer for this user, make it active
user_gamers = self.db.get_user_gamers(user_id)
if len(user_gamers) == 1:
self.db.set_user_active_gamer(user_id, gamer_id)
# Start periodic task for this gamer (60 minutes period)
try:
gamer_data = {
'id': gamer_id,
'username': username,
'token': None,
'period_minutes': 60
}
await self.start_periodic_task(gamer_data, user_id, 60)
logger.info(f"Started periodic task for {username} (user {user_id}) with period 60 minutes")
except Exception as e:
logger.error(f"Failed to start periodic task for {username}: {e}")
import traceback
logger.error(traceback.format_exc())
# Notify admin bot about player link (always notify)
try:
user_obj = update.effective_user
await self._notify_admin_new_player(
username, user_id, user_obj.username if user_obj else None, is_new_gamer
)
logger.info(f"Admin notification processed for player {username}")
except Exception as e:
logger.error(f"Failed to notify admin about new player link: {e}")
lang = self.get_user_language_from_update(update)
await update.message.reply_text(
t('gamer_added', lang, username=username)
)
# Clear awaiting flag
try:
context.user_data['awaiting_addgamer_username'] = False
except Exception:
pass
return
async def getgamers(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Get all gamers for the current user and allow selection"""
user_id = update.effective_user.id
gamers = self.db.get_user_gamers(user_id)
logger.info(f"getgamers: user_id={user_id}, found {len(gamers)} gamers")
lang = self.get_user_language_from_update(update)
if not gamers:
await update.message.reply_text(t('no_gamers', lang))
self.counters.increment('getgamers')
return
self.counters.increment('getgamers')
# Show loading message
loading_msg = await update.message.reply_text(t('loading_ratings', lang))
# Prepare data for each gamer
gamers_data = []
for i, gamer in enumerate(gamers):
try:
logger.info(f"Processing gamer {i+1}/{len(gamers)}: {gamer['username']} (ID: {gamer['id']})")
username = gamer['username']
# Get user ratings from Lichess API
logger.debug(f"Requesting ratings for {username}")
ratings_data = await self.lichess_api.get_user_ratings(username)
logger.debug(f"Received ratings for {username}: {ratings_data is not None}")
# Add delay between requests to avoid rate limiting
if i < len(gamers) - 1: # Don't sleep after the last one
await asyncio.sleep(0.5) # 500ms delay between requests
if ratings_data and 'perfs' in ratings_data:
perfs = ratings_data['perfs']
bullet_rating = perfs.get('bullet', {}).get('rating', 'N/A')
blitz_rating = perfs.get('blitz', {}).get('rating', 'N/A')
rapid_rating = perfs.get('rapid', {}).get('rating', 'N/A')
else:
bullet_rating = blitz_rating = rapid_rating = 'N/A'
# Add period information if period > 0
period_minutes = gamer.get('period_minutes', 0)
period_suffix = t('period_minutes_suffix', lang)
period_text = f" · {period_minutes}{period_suffix}" if period_minutes > 0 else ""
gamers_data.append({
'id': gamer['id'],
'username': username,
'bullet': bullet_rating,
'blitz': blitz_rating,
'rapid': rapid_rating,
'period': period_text
})
logger.info(f"Successfully added gamer {username} to gamers_data (total: {len(gamers_data)})")
except Exception as e:
logger.error(f"Error processing gamer {gamer.get('username', 'unknown')}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Still add the gamer with N/A ratings
period_minutes = gamer.get('period_minutes', 0)
period_suffix = t('period_minutes_suffix', lang)
period_text = f" · {period_minutes}{period_suffix}" if period_minutes > 0 else ""
gamers_data.append({
'id': gamer['id'],
'username': gamer['username'],
'bullet': 'N/A',
'blitz': 'N/A',
'rapid': 'N/A',
'period': period_text
})
logger.info(f"Added gamer {gamer['username']} with N/A ratings due to error")
# Create text message with stats
text_lines = []
for gamer in gamers_data:
text_lines.append(
f"<b>{gamer['username']}</b> "
f"{gamer['bullet']} 🔥 {gamer['blitz']} 🐇 {gamer['rapid']}{gamer['period']}"
)
logger.info(f"getgamers: prepared {len(gamers_data)} gamers for display")
gamers_text = t('select_active_gamer', lang) + "\n".join(text_lines)
logger.info(f"getgamers: message length: {len(gamers_text)} characters")
# Edit the loading message with the results (no keyboard)
try:
await loading_msg.edit_text(
gamers_text,
parse_mode='HTML'
)
except Exception as e:
logger.error(f"Error editing message: {e}")
# If edit fails, delete the loading message and send a new one
try:
await loading_msg.delete()
except:
pass
await update.message.reply_text(
gamers_text,
parse_mode='HTML'
)
async def select_gamer(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle gamer selection"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
gamer_id = int(query.data.split('_')[1])
# Get user gamers to find the selected one
gamers = self.db.get_user_gamers(user_id)
selected_gamer = next((g for g in gamers if g['id'] == gamer_id), None)
# Для callback query обновляем язык пользователя если есть в update
if update.effective_user:
self.db.add_or_get_telegram_user(
user_id=update.effective_user.id,
username=update.effective_user.username,
first_name=update.effective_user.first_name,
last_name=update.effective_user.last_name,
language_code=update.effective_user.language_code
)
lang = self.db.get_user_language(user_id)
if selected_gamer:
# Set active gamer for this user
self.db.set_user_active_gamer(user_id, gamer_id)
await query.edit_message_text(
t('active_gamer_set', lang, username=selected_gamer['username'])
)
else:
await query.edit_message_text(t('gamer_not_found', lang))
async def delgamer(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Show gamers list for deletion"""
user_id = update.effective_user.id
gamers = self.db.get_user_gamers(user_id)
lang = self.get_user_language_from_update(update)
if not gamers:
await update.message.reply_text(t('no_gamers_to_delete', lang))
self.counters.increment('delgamer')
return
self.counters.increment('delgamer')
# Show loading message
loading_msg = await update.message.reply_text(t('loading_gamers', lang))
# Create text message with stats
text_lines = []
keyboard = []
for i, gamer in enumerate(gamers):
status = "🟢" if gamer['is_active'] else ""
username = gamer['username']
# Get user ratings from Lichess API
ratings_data = await self.lichess_api.get_user_ratings(username)
# Add delay between requests to avoid rate limiting
if i < len(gamers) - 1: # Don't sleep after the last one
await asyncio.sleep(0.5) # 500ms delay between requests
if ratings_data and 'perfs' in ratings_data:
perfs = ratings_data['perfs']
bullet_rating = perfs.get('bullet', {}).get('rating', 'N/A')
blitz_rating = perfs.get('blitz', {}).get('rating', 'N/A')
rapid_rating = perfs.get('rapid', {}).get('rating', 'N/A')
else:
bullet_rating = blitz_rating = rapid_rating = 'N/A'
period_minutes = gamer.get('period_minutes', 0)
period_suffix = t('period_minutes_suffix', lang)
period_text = f" · {period_minutes}{period_suffix}" if period_minutes > 0 else ""
text_lines.append(
f"{status} <b>{username}</b> "
f"{bullet_rating} 🔥 {blitz_rating} 🐇 {rapid_rating}{period_text}"
)
# Add delete button
keyboard.append([InlineKeyboardButton(
text=f"🗑️ {username}",
callback_data=f"delete_{gamer['id']}"
)])
gamers_text = t('select_gamer_to_delete', lang) + "\n".join(text_lines)
reply_markup = InlineKeyboardMarkup(keyboard)
# Edit the loading message with the results
try:
await loading_msg.edit_text(
gamers_text,
parse_mode='HTML',
reply_markup=reply_markup
)
except Exception as e:
logger.error(f"Error editing message: {e}")
try:
await loading_msg.delete()
except:
pass
await update.message.reply_text(
gamers_text,
parse_mode='HTML',
reply_markup=reply_markup
)
async def handle_delete_gamer(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle gamer deletion"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
gamer_id = int(query.data.split('_')[1])
# Get gamer info before deletion
gamers = self.db.get_user_gamers(user_id)
gamer_to_delete = next((g for g in gamers if g['id'] == gamer_id), None)
# Для callback query получаем язык из БД
if update.effective_user:
self.db.add_or_get_telegram_user(
user_id=update.effective_user.id,
username=update.effective_user.username,
first_name=update.effective_user.first_name,
last_name=update.effective_user.last_name,
language_code=update.effective_user.language_code
)
lang = self.db.get_user_language(user_id)
if gamer_to_delete:
username = gamer_to_delete['username']
was_active = gamer_to_delete.get('is_active', False)
total_gamers_before = len(gamers)
deleted = self.db.remove_user_gamer(user_id, gamer_id)
if deleted:
# Check how many gamers remain after deletion
remaining_gamers = self.db.get_user_gamers(user_id)
remaining_count = len(remaining_gamers)
# Determine which message to show
if remaining_count == 0:
# Last gamer deleted
message = t('last_gamer_deleted', lang, username=username)
elif was_active:
# Active gamer deleted but there are other gamers
message = t('active_gamer_deleted', lang, username=username)
else:
# Regular deletion
message = t('gamer_deleted', lang, username=username)
await query.edit_message_text(
message,
parse_mode='HTML'
)
else:
await query.edit_message_text(t('delete_failed', lang))
else:
await query.edit_message_text(t('gamer_not_found', lang))
async def get_stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE, period: str):
"""Get statistics for a period - shows stats for all players with activity"""
user_id = update.effective_user.id
# Get all gamers for this user
gamers = self.db.get_user_gamers(user_id)
lang = self.get_user_language_from_update(update)
if not gamers:
await update.message.reply_text(
t('no_gamers', lang)
)
return
# Send initial message about processing
try:
await update.message.reply_text(t('stats_processing', lang), parse_mode='HTML')
except Exception:
pass
# Process each gamer
has_any_activity = False
for i, gamer in enumerate(gamers):
username = gamer['username']
# Send message about processing this player
processing_msg = None
try:
processing_msg = await update.message.reply_text(t('stats_player_processing', lang, username=username), parse_mode='HTML')
except Exception:
pass
# Get stats based on period
if period == "today":
data = await self.lichess_api.get_today_stats(username)
elif period == "yesterday":
data = await self.lichess_api.get_yesterday_stats(username)
elif period == "week":
data = await self.lichess_api.get_week_stats(username)
else:
await update.message.reply_text(t('unknown_period', lang))
return
# Delete processing message
if processing_msg:
try:
await processing_msg.delete()
except Exception:
pass
# Check if there's activity
has_activity = False
if data and data.get('data'):
api_data = data.get('data', {})
tasks = api_data.get('tasks', {})
games = api_data.get('games', {})
# Check for puzzles activity
if tasks and tasks.get('total', 0) > 0:
has_activity = True
# Check for games activity
if games:
for game_type, game_data in games.items():
if game_data and game_data.get('games_played', 0) > 0:
has_activity = True
break
# Only send response if there's activity
if has_activity:
formatted_response = StatsFormatter.format_stats_response(data, username, period, lang)
await update.message.reply_text(formatted_response)
has_any_activity = True
# Add delay between requests to avoid rate limiting
if i < len(gamers) - 1:
await asyncio.sleep(1.0)
# If no activity for any player
if not has_any_activity:
await update.message.reply_text(t('no_activity', lang))
else:
# Send final message that all is done
await update.message.reply_text(t('stats_all_done', lang))
# Increment counter for the period command
if period == "today":
self.counters.increment('today')
elif period == "yesterday":
self.counters.increment('yesterday')
elif period == "week":
self.counters.increment('week')
async def today(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Today command"""
await self.get_stats(update, context, "today")
async def yesterday(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Yesterday command"""
await self.get_stats(update, context, "yesterday")
async def week(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Week command"""
await self.get_stats(update, context, "week")
async def support(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Support command - show contact information"""
lang = self.get_user_language_from_update(update)
support_msg = t('support_message', lang, version=BOT_VERSION)
await update.message.reply_text(support_msg, parse_mode='HTML')
self.counters.increment('support')
async def last_year_or_1000games(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Get last year stats or last 1000 rated games for all players with activity"""
user_id = update.effective_user.id
# Get all gamers for this user
gamers = self.db.get_user_gamers(user_id)
lang = self.get_user_language_from_update(update)
if not gamers:
await update.message.reply_text(
t('no_gamers', lang)
)
return
# Send initial message about processing
try:
await update.message.reply_text(t('last_year_1000_processing', lang), parse_mode='HTML')
except Exception:
pass
now_ms = int(time.time() * 1000)
year_ms = 365 * 24 * 3600 * 1000
since_ms = now_ms - year_ms
has_any_activity = False
# Process each gamer sequentially
for i, gamer in enumerate(gamers):
username = gamer['username']
try:
# Send message about processing this player
processing_msg = None
try:
processing_msg = await update.message.reply_text(t('last_year_1000_player_processing', lang, username=username), parse_mode='HTML')
except Exception:
pass
# Get data for this player
data = await self.lichess_api.get_games_period(username, since_ms, now_ms, rated_only=True)
# Delete processing message
if processing_msg:
try:
await processing_msg.delete()
except Exception:
pass
if data:
# Check if there's activity (games_count > 0)
games_count = data.get('games_count', 0)
if games_count > 0:
# Format and send immediately
text = StatsFormatter.format_last_year_or_1000(data, username, lang)
await update.message.reply_text(text)
has_any_activity = True
# Wait 3 seconds before next request (except after the last one)
if i < len(gamers) - 1:
await asyncio.sleep(3.0)
except Exception as e:
logger.error(f"/lastYear_or_1000games error for {username}: {e}")
await update.message.reply_text(f"Error for {username}: {e}")
# If no activity for any player
if not has_any_activity:
await update.message.reply_text(t('no_activity', lang))
else:
# Send final message that all is done
await update.message.reply_text(t('stats_all_done', lang))
self.counters.increment('last_year_1000')
async def setperiod(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Set period command - first select gamer, then select period"""
user_id = update.effective_user.id
# Get all gamers for this user
gamers = self.db.get_user_gamers(user_id)
lang = self.get_user_language_from_update(update)
if not gamers:
await update.message.reply_text(t('no_gamers', lang))
self.counters.increment('setperiod')
return
# Create keyboard with gamers and their periods
keyboard = []
for gamer in gamers:
username = gamer['username']
period_minutes = gamer.get('period_minutes', 0)
# Format period text
if period_minutes == 0:
period_text = ""
elif period_minutes < 60:
period_text = f"{period_minutes}m"
elif period_minutes == 60:
period_text = "1h"
else:
hours = period_minutes // 60
period_text = f"{hours}h"
keyboard.append([InlineKeyboardButton(
text=f"{username} · {period_text}",
callback_data=f"select_gamer_period_{gamer['id']}"
)])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"⏱️ Select player to set notification period:",
reply_markup=reply_markup
)
self.counters.increment('setperiod')
async def select_gamer_for_period(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle gamer selection for period setting"""
query = update.callback_query
await query.answer()
logger.info(f"select_gamer_for_period called with callback_data: {query.data}")
user_id = query.from_user.id
# Parse callback_data: select_gamer_period_{gamer_id}
try:
gamer_id = int(query.data.split('_')[-1])
logger.info(f"Parsed gamer_id: {gamer_id}")
except (ValueError, IndexError) as e:
logger.error(f"Error parsing gamer_id from callback_data '{query.data}': {e}")
await query.edit_message_text("❌ Error: Invalid player selection")
return
# Get gamer info
gamers = self.db.get_user_gamers(user_id)
selected_gamer = None
for gamer in gamers:
if gamer['id'] == gamer_id:
selected_gamer = gamer
break
if not selected_gamer:
await query.edit_message_text("❌ Player not found")
return
# Для callback query получаем язык из БД
if update.effective_user:
self.db.add_or_get_telegram_user(
user_id=update.effective_user.id,
username=update.effective_user.username,
first_name=update.effective_user.first_name,
last_name=update.effective_user.last_name,
language_code=update.effective_user.language_code
)
lang = self.db.get_user_language(user_id)
# Show period options for selected gamer
keyboard = []
for period in PERIOD_OPTIONS:
if period == 0:
button_text = t('disable_notifications', lang)
else:
# Format period text: minutes for < 60, hours for >= 60
if period < 60:
button_text = f"{period} minutes"
elif period == 60:
button_text = "⏰ 1 hour"
else:
hours = period // 60
button_text = f"{hours} hours"
keyboard.append([InlineKeyboardButton(
button_text,
callback_data=f"period_{gamer_id}_{period}"
)])
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
t('select_period', lang, username=selected_gamer['username']),
reply_markup=reply_markup,
parse_mode='HTML'
)
async def select_period(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle period selection"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
# Parse callback data: period_{gamer_id}_{period}
parts = query.data.split('_')
gamer_id = int(parts[1])
period = int(parts[2])
# Get gamer info
gamers = self.db.get_user_gamers(user_id)
selected_gamer = None
for gamer in gamers:
if gamer['id'] == gamer_id:
selected_gamer = gamer
break
if not selected_gamer:
await query.edit_message_text("❌ Player not found")
return
# Для callback query получаем язык из БД
if update.effective_user:
self.db.add_or_get_telegram_user(
user_id=update.effective_user.id,
username=update.effective_user.username,
first_name=update.effective_user.first_name,
last_name=update.effective_user.last_name,
language_code=update.effective_user.language_code
)
lang = self.db.get_user_language(user_id)
# Set period for this user-gamer pair
self.db.set_user_gamer_period(user_id, gamer_id, period)
if period == 0:
await query.edit_message_text(
t('notifications_disabled', lang, username=selected_gamer['username'])
)
else:
# Format period text for confirmation message
if period < 60:
period_text = f"{period} minutes"
elif period == 60:
period_text = "1 hour"
else:
hours = period // 60
period_text = f"{hours} hours"
await query.edit_message_text(
f"✅ Period {period_text} set for {selected_gamer['username']}\n📱 Notifications will be sent to personal messages"
)
# Start periodic task for this gamer (send to user's personal messages)
await self.start_periodic_task(selected_gamer, user_id, period)
async def check_language(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Check and display current language settings"""
user = update.effective_user
if user:
# Обновляем язык из update
lang = self.get_user_language_from_update(update)
# Получаем язык из БД для отображения
db_lang = self.db.get_user_language(user.id)
# Получаем language_code из БД напрямую
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT language_code FROM telegram_users WHERE user_id = ?", (user.id,))
row = cursor.fetchone()
db_language_code = row[0] if row and row[0] else None
message = (
f"🌐 Language Info:\n"
f"Current language: {lang}\n"
f"DB language: {db_lang}\n"
f"DB language_code: {db_language_code}\n"
f"Update language_code: {user.language_code}\n\n"
f"Language used: {lang}\n\n"
f"Bot uses English language only."
)
await update.message.reply_text(message)
self.counters.increment('lang')
else:
await update.message.reply_text("❌ Failed to get user information")
async def reset_language(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Reset user language in database - will be detected from /start"""
user = update.effective_user
if user:
# Сбрасываем язык в БД (устанавливаем NULL)
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE telegram_users SET language_code = NULL WHERE user_id = ?",
(user.id,)
)
conn.commit()
# Language is always English
lang = 'en'
await update.message.reply_text(
"✅ Language reset! Bot uses English language only."
)
self.counters.increment('resetlang')
else:
await update.message.reply_text("❌ Failed to get user information")
async def start_periodic_task(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Start periodic task for a gamer"""
task_key = f"{gamer['id']}_{user_id}"
# Cancel existing task if any
if task_key in self.periodic_tasks:
self.periodic_tasks[task_key].cancel()
logger.info(f"Cancelled existing periodic task for {gamer['username']}")
# Remove old start time
if task_key in self.period_start_times:
del self.period_start_times[task_key]
# Create new periodic task
task = asyncio.create_task(
self.periodic_check(gamer, user_id, period_minutes)
)
self.periodic_tasks[task_key] = task
async def periodic_check(self, gamer: Dict[str, Any], user_id: int, period_minutes: int):
"""Periodic check for gamer activity"""
task_key = f"{gamer['id']}_{user_id}"
# Запоминаем время начала отслеживания (текущее время минус период, чтобы сразу проверить последний период)
start_time = datetime.now() - timedelta(minutes=period_minutes)
self.period_start_times[task_key] = start_time
logger.info(f"Started periodic monitoring for {gamer['username']} with {period_minutes} minute intervals")
consecutive_errors = 0
max_consecutive_errors = 5
while True:
try:
# Проверяем, что период все еще установлен в БД
current_gamers = self.db.get_user_gamers(user_id)
gamer_still_exists = False
current_period = 0
for g in current_gamers:
if g['id'] == gamer['id']:
gamer_still_exists = True
current_period = g.get('period_minutes', 0)
break
# Если игрок удален или период отключен, прекращаем мониторинг
if not gamer_still_exists or current_period == 0:
logger.info(f"Periodic monitoring stopped for {gamer['username']}: gamer removed or period disabled")
if task_key in self.periodic_tasks:
del self.periodic_tasks[task_key]
if task_key in self.period_start_times:
del self.period_start_times[task_key]
break
# Обновляем период на случай, если он был изменен
if current_period != period_minutes:
logger.info(f"Period changed for {gamer['username']} from {period_minutes} to {current_period} minutes")
period_minutes = current_period
# Ждем заданное количество минут
await asyncio.sleep(period_minutes * 60)
# Получаем время начала периода (время последней проверки или время старта задачи)
period_start = self.period_start_times.get(task_key, start_time)
now = datetime.now()
# Рассчитываем период: от (текущее время - период) до текущего времени
# Это гарантирует, что мы проверяем последний час активности
since_time = now - timedelta(minutes=period_minutes)
since_timestamp = int(since_time.timestamp() * 1000)
until_timestamp = int(now.timestamp() * 1000)
logger.info(f"Checking period for {gamer['username']}: {since_time} to {now} (last {period_minutes} minutes)")
logger.info(f"Unix timestamps: since={since_timestamp}, until={until_timestamp}")
# Делаем запросы к API с обработкой ошибок
games_data = None
puzzles_data = None
try:
games_data = await self.lichess_api.get_games_period(
gamer['username'], since_timestamp, until_timestamp
)
logger.info(f"Games API response received for {gamer['username']}")
except Exception as e:
logger.error(f"Error getting games data for {gamer['username']}: {e}")
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
logger.error(f"Too many consecutive errors for {gamer['username']}, stopping periodic check")
break
# Продолжаем с обновлением времени начала, чтобы не зацикливаться
now = datetime.now()
self.period_start_times[task_key] = now
continue
if gamer.get('token'):
try:
puzzles_data = await self.lichess_api.get_puzzles_period(
gamer['token'], since_timestamp, until_timestamp, max_puzzles=150
)
logger.info(f"Puzzles API response received for {gamer['username']}")
except Exception as e:
logger.warning(f"Error getting puzzles data for {gamer['username']}: {e}")
# Продолжаем без данных по пазлам
# Сбрасываем счетчик ошибок при успешном запросе
consecutive_errors = 0
# Проверяем наличие реальной активности
has_games = False
total_games = 0
if games_data and games_data.get('data'):
total_games = games_data.get('data', {}).get('total', {}).get('games_played', 0)
has_games = total_games > 0
has_puzzles = False
if puzzles_data and puzzles_data.get('data'):
total_puzzles = puzzles_data.get('data', {}).get('total_attempts', 0)
has_puzzles = total_puzzles > 0
logger.info(f"Activity check for {gamer['username']}: has_games={has_games}, has_puzzles={has_puzzles}")
# Отправляем уведомление только если есть реальная активность
if has_games or has_puzzles:
logger.info(f"📊 Activity detected for {gamer['username']}, preparing notification...")
try:
# Get user language from database
user_lang = self.db.get_user_language(user_id)
notification = StatsFormatter.format_period_notification(
gamer['username'], games_data, puzzles_data, period_minutes, lang=user_lang
)
if self.application:
try:
await self.application.bot.send_message(
chat_id=user_id,
text=notification
)
logger.info(f"✅ Sent periodic notification for {gamer['username']} to user {user_id}")
# Обновляем время начала на текущее время после успешной отправки уведомления
self.period_start_times[task_key] = now
# Increment periodic notification counter
self.counters.increment('periodic_notification')
except Exception as e:
logger.error(f"❌ Failed to send notification to user {user_id}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Обновляем время начала даже при ошибке отправки, чтобы не зацикливаться
self.period_start_times[task_key] = now
else:
logger.error(f"❌ Application not initialized, cannot send notification for {gamer['username']} to user {user_id}")
self.period_start_times[task_key] = now
except Exception as e:
logger.error(f"Error formatting notification for {gamer['username']}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Обновляем время начала даже при ошибке форматирования
self.period_start_times[task_key] = now
else:
logger.debug(f"⏭️ No activity found for {gamer['username']} in the last {period_minutes} minutes")
# Обновляем время начала на текущее время даже если нет активности
self.period_start_times[task_key] = now
except asyncio.CancelledError:
logger.info(f"Periodic check cancelled for {gamer['username']}")
break
except Exception as e:
consecutive_errors += 1
logger.error(f"Error in periodic check for {gamer['username']}: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
if consecutive_errors >= max_consecutive_errors:
logger.error(f"Too many consecutive errors for {gamer['username']}, stopping periodic check")
if task_key in self.periodic_tasks:
del self.periodic_tasks[task_key]
if task_key in self.period_start_times:
del self.period_start_times[task_key]
break
# Ждем перед повторной попыткой при ошибке
await asyncio.sleep(60) # 1 minute delay before retry
def setup_handlers(self, application: Application):
"""Setup all handlers"""
self.application = application # Store application reference
# Conversation handler for addtoken (token required)
addtoken_conv = ConversationHandler(
entry_points=[CommandHandler("addtoken", self.addtoken_start)],
states={
WAITING_FOR_TOKEN: [MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_token)],
},
fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)]
)
# Add handlers
application.add_handler(CommandHandler("start", self.start_and_addgamer))
application.add_handler(CommandHandler("addgamer", self.addgamer_start))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_username))
application.add_handler(addtoken_conv)
application.add_handler(CommandHandler("getgamers", self.getgamers))
application.add_handler(CommandHandler("delgamer", self.delgamer))
application.add_handler(CommandHandler("today", self.today))
application.add_handler(CommandHandler("yesterday", self.yesterday))
application.add_handler(CommandHandler("week", self.week))
application.add_handler(CommandHandler("lastYear_or_1000games", self.last_year_or_1000games))
application.add_handler(CommandHandler("support", self.support))
application.add_handler(CommandHandler("setperiod", self.setperiod))
application.add_handler(CommandHandler("lang", self.check_language))
application.add_handler(CommandHandler("resetlang", self.reset_language))
application.add_handler(CommandHandler("test_admin_notify", self.test_admin_notify))
# Callback handlers (order matters - more specific patterns first)
application.add_handler(CallbackQueryHandler(self.select_gamer_for_period, pattern="^select_gamer_period_"))
application.add_handler(CallbackQueryHandler(self.select_gamer, pattern="^select_"))
application.add_handler(CallbackQueryHandler(self.handle_delete_gamer, pattern="^delete_"))
application.add_handler(CallbackQueryHandler(self.select_period, pattern="^period_"))
def main():
"""Main function"""
# Initialize admin bot for notifications (admin bot runs as separate service)
init_admin_bot()
bot = LichessBot()
# Create application with Long Polling configuration
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Setup handlers
bot.setup_handlers(application)
# Set application reference for periodic tasks
bot.application = application
# Start periodic tasks for existing gamers (will be called after application starts)
async def post_init(app):
await bot.start_existing_periodic_tasks()
application.post_init = post_init
# Add error handler
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Log the error and send a telegram message to notify the developer."""
logger.error(f"Exception while handling an update: {context.error}")
import traceback
logger.error(traceback.format_exc())
application.add_error_handler(error_handler)
# Start the bot with Long Polling
logger.info("Starting Lichess Statistics Bot with Long Polling...")
try:
application.run_polling(
poll_interval=POLL_INTERVAL,
timeout=POLL_TIMEOUT,
drop_pending_updates=DROP_PENDING_UPDATES,
allowed_updates=ALLOWED_UPDATES
)
except Exception as e:
logger.error(f"Fatal error in run_polling: {e}")
import traceback
logger.error(traceback.format_exc())
raise
if __name__ == '__main__':
main()