"""SQLite persistence layer for Telegram users and their linked Lichess gamers."""

import logging
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional

from config import DATABASE_PATH

logger = logging.getLogger(__name__)


class Database:
    """Thin wrapper around a SQLite file holding three tables.

    * ``telegram_users`` - Telegram bot users.
    * ``gamers`` - Lichess players.
    * ``user_gamers`` - link rows between a user and a gamer, carrying the
      per-link token, the active flag and the check period in minutes.

    Every public method opens its own short-lived connection, so no
    connection is kept on the instance.
    """

    def __init__(self, db_path: str = DATABASE_PATH):
        """Store the database path and make sure the schema exists.

        Args:
            db_path: Path passed to ``sqlite3.connect``.
        """
        self.db_path = db_path
        self.init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed (or rolled back) and then closed.

        ``sqlite3.Connection`` used directly as a context manager only ends
        the transaction; it does not close the connection. This helper adds
        the missing ``close()`` so connections are not leaked.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # Commits on normal exit (including an early ``return`` in the
            # caller's ``with`` body) and rolls back if an exception escapes.
            with conn:
                yield conn
        finally:
            conn.close()

    def init_database(self) -> None:
        """Create the tables if missing, upgrade the schema, migrate tokens."""
        with self._connect() as conn:
            cursor = conn.cursor()

            # Telegram bot users.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS telegram_users (
                    user_id INTEGER PRIMARY KEY,
                    username TEXT,
                    first_name TEXT,
                    last_name TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Lichess players only.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS gamers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    token TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Relationship between users and gamers.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS user_gamers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    gamer_id INTEGER NOT NULL,
                    token TEXT,
                    is_active BOOLEAN DEFAULT FALSE,
                    period_minutes INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES telegram_users(user_id),
                    FOREIGN KEY (gamer_id) REFERENCES gamers(id),
                    UNIQUE(user_id, gamer_id)
                )
            ''')

            # A user_gamers table created by an older schema may lack the
            # token column. Check for it explicitly instead of swallowing
            # every OperationalError, which would also hide real failures
            # such as a locked database.
            cursor.execute("PRAGMA table_info(user_gamers)")
            existing_columns = {row[1] for row in cursor.fetchall()}
            if "token" not in existing_columns:
                cursor.execute("ALTER TABLE user_gamers ADD COLUMN token TEXT")

        # Runs on its own connection, after the schema work is committed.
        self._migrate_tokens_from_gamers()

        logger.info("Database initialized successfully")

    def _migrate_tokens_from_gamers(self) -> None:
        """Copy tokens from ``gamers`` into ``user_gamers`` links, once.

        Does nothing when no gamer has a token, or when any ``user_gamers``
        row already has a token (treated as "migration already done").
        Only links whose token is NULL are filled in.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("SELECT id, token FROM gamers WHERE token IS NOT NULL")
            gamers_tokens = cursor.fetchall()
            if not gamers_tokens:
                return  # No tokens to migrate

            cursor.execute("SELECT COUNT(*) FROM user_gamers WHERE token IS NOT NULL")
            if cursor.fetchone()[0] > 0:
                return  # Migration already done

            migrated = 0
            for gamer_id, token in gamers_tokens:
                # Update all user-gamer relationships for this gamer.
                cursor.execute(
                    "UPDATE user_gamers SET token = ? WHERE gamer_id = ? AND token IS NULL",
                    (token, gamer_id)
                )
                migrated += cursor.rowcount

        if migrated > 0:
            logger.info("Migrated %s tokens from gamers to user_gamers", migrated)

    def add_or_get_telegram_user(self, user_id: int, username: Optional[str] = None,
                                 first_name: Optional[str] = None,
                                 last_name: Optional[str] = None) -> bool:
        """Insert the Telegram user if it is not stored yet.

        An existing row is left untouched; its name fields are not updated.

        Args:
            user_id: Telegram user id, the table's primary key.
            username: Optional username stored on insert.
            first_name: Optional first name stored on insert.
            last_name: Optional last name stored on insert.

        Returns:
            True if a new row was inserted, False if the user already existed.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("SELECT user_id FROM telegram_users WHERE user_id = ?", (user_id,))
            if cursor.fetchone():
                return False

            cursor.execute(
                "INSERT INTO telegram_users (user_id, username, first_name, last_name) VALUES (?, ?, ?, ?)",
                (user_id, username, first_name, last_name)
            )
            return True

    def add_gamer(self, username: str) -> int:
        """Return the id of the gamer named ``username``, creating it if needed.

        New gamers are inserted without a token; tokens live in ``user_gamers``.

        Args:
            username: Gamer username (unique in the ``gamers`` table).

        Returns:
            The ``gamers.id`` of the existing or newly inserted row.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("SELECT id FROM gamers WHERE username = ?", (username,))
            existing = cursor.fetchone()
            if existing:
                return existing[0]

            cursor.execute(
                "INSERT INTO gamers (username) VALUES (?)",
                (username,)
            )
            return cursor.lastrowid

    def add_user_gamer(self, user_id: int, gamer_id: int, token: Optional[str] = None) -> bool:
        """Link a user to a gamer, optionally storing a token on the link.

        If the insert is rejected with ``sqlite3.IntegrityError`` (for
        example because the ``(user_id, gamer_id)`` pair already exists),
        the existing link's token is overwritten when ``token`` is truthy.

        Args:
            user_id: Telegram user id.
            gamer_id: Id of the gamer row.
            token: Optional token stored on the link.

        Returns:
            True if a new link was inserted, False if the insert was rejected.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            try:
                cursor.execute(
                    "INSERT INTO user_gamers (user_id, gamer_id, token) VALUES (?, ?, ?)",
                    (user_id, gamer_id, token)
                )
            except sqlite3.IntegrityError:
                # Already exists - update token if provided.
                if token:
                    cursor.execute(
                        "UPDATE user_gamers SET token = ? WHERE user_id = ? AND gamer_id = ?",
                        (token, user_id, gamer_id)
                    )
                return False
            return True

    def get_user_gamers(self, user_id: int) -> List[Dict[str, Any]]:
        """Return all gamers linked to ``user_id``, ordered by link id.

        Returns:
            A list of dicts with keys ``id``, ``username``, ``token``,
            ``is_active`` (as bool) and ``period_minutes``. The token comes
            from ``user_gamers``, not from ``gamers``.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT g.id, g.username, ug.token, ug.is_active, ug.period_minutes
                FROM user_gamers ug
                JOIN gamers g ON ug.gamer_id = g.id
                WHERE ug.user_id = ?
                ORDER BY ug.id
            ''', (user_id,))

            return [
                {
                    'id': row[0],
                    'username': row[1],
                    'token': row[2],
                    'is_active': bool(row[3]),
                    'period_minutes': row[4],
                }
                for row in cursor.fetchall()
            ]

    def get_user_active_gamer(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Return the active gamer for ``user_id``, or None if there is none.

        Returns:
            A dict with keys ``id``, ``username`` and ``token`` (the token is
            taken from ``user_gamers``), or None when no link is active.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT g.id, g.username, ug.token
                FROM user_gamers ug
                JOIN gamers g ON ug.gamer_id = g.id
                WHERE ug.user_id = ? AND ug.is_active = TRUE
                LIMIT 1
            ''', (user_id,))

            row = cursor.fetchone()
            if row is None:
                return None
            return {
                'id': row[0],
                'username': row[1],
                'token': row[2],
            }

    def set_user_active_gamer(self, user_id: int, gamer_id: int) -> None:
        """Make ``gamer_id`` the only active gamer for ``user_id``.

        All of the user's links are deactivated first, so if the
        ``(user_id, gamer_id)`` link does not exist the user ends up with no
        active gamer.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Deactivate all gamers for this user.
            cursor.execute(
                "UPDATE user_gamers SET is_active = FALSE WHERE user_id = ?",
                (user_id,)
            )

            # Activate the selected gamer.
            cursor.execute(
                "UPDATE user_gamers SET is_active = TRUE WHERE user_id = ? AND gamer_id = ?",
                (user_id, gamer_id)
            )

    def set_user_gamer_period(self, user_id: int, gamer_id: int, period_minutes: int) -> None:
        """Store ``period_minutes`` on the ``(user_id, gamer_id)`` link.

        No row is changed when the link does not exist.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE user_gamers SET period_minutes = ? WHERE user_id = ? AND gamer_id = ?",
                (period_minutes, user_id, gamer_id)
            )

    def get_user_gamers_with_periods(self, user_id: int) -> List[Dict[str, Any]]:
        """Return the gamers of ``user_id`` whose ``period_minutes`` is above 0.

        Returns:
            A list of dicts with keys ``id``, ``username``, ``token`` (from
            ``user_gamers``) and ``period_minutes``.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT g.id, g.username, ug.token, ug.period_minutes
                FROM user_gamers ug
                JOIN gamers g ON ug.gamer_id = g.id
                WHERE ug.user_id = ? AND ug.period_minutes > 0
            ''', (user_id,))

            return [
                {
                    'id': row[0],
                    'username': row[1],
                    'token': row[2],
                    'period_minutes': row[3],
                }
                for row in cursor.fetchall()
            ]

    def get_all_gamers_with_periods(self) -> List[Dict[str, Any]]:
        """Return every user-gamer link whose ``period_minutes`` is above 0.

        Intended for periodic checks across all users.

        Returns:
            A list of dicts with keys ``user_id``, ``id`` (the gamer id),
            ``username``, ``token`` (from ``user_gamers``) and
            ``period_minutes``.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ug.user_id, g.id, g.username, ug.token, ug.period_minutes
                FROM user_gamers ug
                JOIN gamers g ON ug.gamer_id = g.id
                WHERE ug.period_minutes > 0
            ''')

            return [
                {
                    'user_id': row[0],
                    'id': row[1],
                    'username': row[2],
                    'token': row[3],
                    'period_minutes': row[4],
                }
                for row in cursor.fetchall()
            ]