LichessStatTgWeb/LichessClientTG_bot/database.py
2025-11-12 23:20:01 +03:00

331 lines
13 KiB
Python

import logging
import sqlite3
from contextlib import closing
from typing import Optional, List, Dict, Any

from config import DATABASE_PATH
logger = logging.getLogger(__name__)
class Database:
def __init__(self, db_path: str = DATABASE_PATH):
    """Remember where the SQLite file lives and prepare its schema.

    Args:
        db_path: Path of the SQLite database file. Defaults to
            ``DATABASE_PATH`` from the project configuration.
    """
    self.db_path = db_path
    # Build (or upgrade) the tables right away, before any other method
    # gets a chance to query them.
    self.init_database()
def init_database(self):
    """Create the schema if it is missing and apply column migrations.

    Creates the ``telegram_users``, ``gamers`` and ``user_gamers`` tables,
    adds the columns that database files created by earlier versions may
    lack, and finally copies legacy tokens into ``user_gamers`` via
    ``_migrate_tokens_from_gamers``.

    Raises:
        sqlite3.Error: If any schema statement fails. Unlike a blanket
            ``except OperationalError: pass`` around ``ALTER TABLE``,
            genuine failures (for example a locked database) are not
            silently ignored.
    """
    def ensure_column(cursor, table, column, declaration):
        """Add ``column`` to ``table`` unless the table already has it."""
        # The identifiers are hard-coded by the calls below, never user
        # input, so interpolating them into the statement is safe.
        cursor.execute(f"PRAGMA table_info({table})")
        # Each row is (cid, name, type, notnull, dflt_value, pk).
        present = {row[1] for row in cursor.fetchall()}
        if column not in present:
            cursor.execute(
                f"ALTER TABLE {table} ADD COLUMN {column} {declaration}"
            )

    # ``closing`` releases the connection; the bare ``conn`` item keeps the
    # commit-on-success / rollback-on-error transaction handling.
    with closing(sqlite3.connect(self.db_path)) as conn, conn:
        cursor = conn.cursor()

        # Telegram bot users.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS telegram_users (
                user_id INTEGER PRIMARY KEY,
                username TEXT,
                first_name TEXT,
                last_name TEXT,
                language_code TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Tables created before language_code existed need the column added.
        ensure_column(cursor, "telegram_users", "language_code", "TEXT")

        # Lichess players only.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS gamers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                token TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Relationship between Telegram users and the gamers they track.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_gamers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                gamer_id INTEGER NOT NULL,
                token TEXT,
                is_active BOOLEAN DEFAULT FALSE,
                period_minutes INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES telegram_users(user_id),
                FOREIGN KEY (gamer_id) REFERENCES gamers(id),
                UNIQUE(user_id, gamer_id)
            )
        ''')
        # Tables created before per-user tokens existed need the column added.
        ensure_column(cursor, "user_gamers", "token", "TEXT")

        conn.commit()

    # Runs on its own connection, after the schema changes are committed.
    self._migrate_tokens_from_gamers()

    logger.info("Database initialized successfully")
def _migrate_tokens_from_gamers(self):
"""Migrate tokens from old gamers table to user_gamers table if needed"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Check if there are tokens in gamers table that need migration
cursor.execute("SELECT COUNT(*) FROM gamers WHERE token IS NOT NULL")
gamers_with_tokens = cursor.fetchone()[0]
if gamers_with_tokens == 0:
return # No tokens to migrate
# Check if user_gamers already has tokens
cursor.execute("SELECT COUNT(*) FROM user_gamers WHERE token IS NOT NULL")
user_gamers_with_tokens = cursor.fetchone()[0]
if user_gamers_with_tokens > 0:
return # Migration already done
# Migrate tokens from gamers to user_gamers
cursor.execute("SELECT id, token FROM gamers WHERE token IS NOT NULL")
gamers_tokens = cursor.fetchall()
migrated = 0
for gamer_id, token in gamers_tokens:
# Update all user-gamer relationships for this gamer
cursor.execute(
"UPDATE user_gamers SET token = ? WHERE gamer_id = ? AND token IS NULL",
(token, gamer_id)
)
migrated += cursor.rowcount
conn.commit()
if migrated > 0:
logger.info(f"Migrated {migrated} tokens from gamers to user_gamers")
def add_or_get_telegram_user(self, user_id: int, username: Optional[str] = None,
first_name: Optional[str] = None, last_name: Optional[str] = None,
language_code: Optional[str] = None) -> bool:
"""Add or update Telegram user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT user_id FROM telegram_users WHERE user_id = ?", (user_id,))
existing = cursor.fetchone()
if not existing:
cursor.execute(
"INSERT INTO telegram_users (user_id, username, first_name, last_name, language_code) VALUES (?, ?, ?, ?, ?)",
(user_id, username, first_name, last_name, language_code)
)
conn.commit()
return True
else:
# Update language_code if provided (always update, even if None to clear old value)
cursor.execute(
"UPDATE telegram_users SET language_code = ? WHERE user_id = ?",
(language_code, user_id)
)
conn.commit()
return False
def get_user_language(self, user_id: int) -> str:
"""Always return English language"""
return 'en'
def add_gamer(self, username: str) -> int:
"""Add a new gamer to the database (return gamer_id)"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Check if gamer already exists
cursor.execute("SELECT id FROM gamers WHERE username = ?", (username,))
existing = cursor.fetchone()
if existing:
gamer_id = existing[0]
else:
# Add new gamer (without token - tokens are stored in user_gamers)
cursor.execute(
"INSERT INTO gamers (username) VALUES (?)",
(username,)
)
gamer_id = cursor.lastrowid
conn.commit()
return gamer_id
def add_user_gamer(self, user_id: int, gamer_id: int, token: Optional[str] = None) -> bool:
"""Add relationship between user and gamer with optional token"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO user_gamers (user_id, gamer_id, token) VALUES (?, ?, ?)",
(user_id, gamer_id, token)
)
conn.commit()
return True
except sqlite3.IntegrityError:
# Already exists - update token if provided
if token:
cursor.execute(
"UPDATE user_gamers SET token = ? WHERE user_id = ? AND gamer_id = ?",
(token, user_id, gamer_id)
)
conn.commit()
return False
def get_user_gamers(self, user_id: int) -> List[Dict[str, Any]]:
"""Get all gamers for a specific user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT g.id, g.username, ug.token, ug.is_active, ug.period_minutes
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE ug.user_id = ?
ORDER BY ug.id
''', (user_id,))
gamers = []
for row in cursor.fetchall():
gamers.append({
'id': row[0],
'username': row[1],
'token': row[2], # Token from user_gamers, not gamers
'is_active': bool(row[3]),
'period_minutes': row[4]
})
return gamers
def get_user_active_gamer(self, user_id: int) -> Optional[Dict[str, Any]]:
"""Get the active gamer for a specific user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT g.id, g.username, ug.token
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE ug.user_id = ? AND ug.is_active = TRUE
LIMIT 1
''', (user_id,))
row = cursor.fetchone()
if row:
return {
'id': row[0],
'username': row[1],
'token': row[2] # Token from user_gamers
}
return None
def set_user_active_gamer(self, user_id: int, gamer_id: int):
"""Set active gamer for a specific user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Deactivate all gamers for this user
cursor.execute(
"UPDATE user_gamers SET is_active = FALSE WHERE user_id = ?",
(user_id,)
)
# Activate the selected gamer
cursor.execute(
"UPDATE user_gamers SET is_active = TRUE WHERE user_id = ? AND gamer_id = ?",
(user_id, gamer_id)
)
conn.commit()
def set_user_gamer_period(self, user_id: int, gamer_id: int, period_minutes: int):
"""Set period for a gamer for a specific user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE user_gamers SET period_minutes = ? WHERE user_id = ? AND gamer_id = ?",
(period_minutes, user_id, gamer_id)
)
conn.commit()
def remove_user_gamer(self, user_id: int, gamer_id: int) -> bool:
"""Remove gamer from user's tracked list"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Delete the user-gamer relationship
cursor.execute(
"DELETE FROM user_gamers WHERE user_id = ? AND gamer_id = ?",
(user_id, gamer_id)
)
deleted = cursor.rowcount > 0
# Check if this gamer is still tracked by any other user
cursor.execute(
"SELECT COUNT(*) FROM user_gamers WHERE gamer_id = ?",
(gamer_id,)
)
other_users_count = cursor.fetchone()[0]
# If no other users track this gamer, remove from gamers table
if other_users_count == 0:
cursor.execute("DELETE FROM gamers WHERE id = ?", (gamer_id,))
conn.commit()
return deleted
def get_user_gamers_with_periods(self, user_id: int) -> List[Dict[str, Any]]:
"""Get all gamers for a user that have periods set"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT g.id, g.username, ug.token, ug.period_minutes
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE ug.user_id = ? AND ug.period_minutes > 0
''', (user_id,))
gamers = []
for row in cursor.fetchall():
gamers.append({
'id': row[0],
'username': row[1],
'token': row[2], # Token from user_gamers
'period_minutes': row[3]
})
return gamers
def get_all_gamers_with_periods(self) -> List[Dict[str, Any]]:
"""Get all user-gamer pairs that have periods set (for periodic checks across all users)"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ug.user_id, g.id, g.username, ug.token, ug.period_minutes
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE ug.period_minutes > 0
''')
gamers = []
for row in cursor.fetchall():
gamers.append({
'user_id': row[0],
'id': row[1],
'username': row[2],
'token': row[3], # Token from user_gamers
'period_minutes': row[4]
})
return gamers