import sqlite3 import logging from typing import Optional, List, Dict, Any from config import DATABASE_PATH logger = logging.getLogger(__name__) class Database: def __init__(self, db_path: str = DATABASE_PATH): self.db_path = db_path self.init_database() def init_database(self): """Initialize database tables""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() logger.info(f"Using database at: {self.db_path}") # Create telegram_users table (Telegram bot users) cursor.execute(''' CREATE TABLE IF NOT EXISTS telegram_users ( user_id INTEGER PRIMARY KEY, username TEXT, first_name TEXT, last_name TEXT, language_code TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Add language_code column if it doesn't exist (Telegram language) try: cursor.execute("ALTER TABLE telegram_users ADD COLUMN language_code TEXT") except sqlite3.OperationalError: # Column already exists pass # Add bot_language column if it doesn't exist (user-selected bot language) try: cursor.execute("ALTER TABLE telegram_users ADD COLUMN bot_language TEXT DEFAULT 'en'") # Set default value for existing users cursor.execute("UPDATE telegram_users SET bot_language = 'en' WHERE bot_language IS NULL") conn.commit() except sqlite3.OperationalError: # Column already exists pass # Create gamers table (Lichess players only) cursor.execute(''' CREATE TABLE IF NOT EXISTS gamers ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, token TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create user_gamers table (relationship between users and gamers) cursor.execute(''' CREATE TABLE IF NOT EXISTS user_gamers ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, gamer_id INTEGER NOT NULL, token TEXT, is_active BOOLEAN DEFAULT FALSE, period_minutes INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES telegram_users(user_id), FOREIGN KEY (gamer_id) REFERENCES gamers(id), UNIQUE(user_id, gamer_id) ) ''') # Add token column to user_gamers if it doesn't exist try: cursor.execute("ALTER TABLE user_gamers ADD COLUMN token TEXT") except sqlite3.OperationalError: # Column already exists pass # Create admin_settings table for admin bot configuration cursor.execute(''' CREATE TABLE IF NOT EXISTS admin_settings ( key TEXT PRIMARY KEY, value TEXT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # Create message_counters table for tracking sent messages cursor.execute(''' CREATE TABLE IF NOT EXISTS message_counters ( command TEXT PRIMARY KEY, total_count INTEGER DEFAULT 0, today_count INTEGER DEFAULT 0, last_reset_date DATE DEFAULT CURRENT_DATE ) ''') # Initialize counters for all commands commands = ['start', 'addgamer', 'addtoken', 'getgamers', 'delgamer', 'today', 'yesterday', 'week', 'setperiod', 'lang', 'resetlang', 'periodic_notification'] for cmd in commands: cursor.execute(''' INSERT OR IGNORE INTO message_counters (command, total_count, today_count, last_reset_date) VALUES (?, 0, 0, CURRENT_DATE) ''', (cmd,)) conn.commit() # Migrate tokens from gamers to user_gamers if needed self._migrate_tokens_from_gamers() logger.info("Database initialized successfully") def _migrate_tokens_from_gamers(self): """Migrate tokens from old gamers table to user_gamers table if needed""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Check if there are tokens in gamers table that need migration cursor.execute("SELECT COUNT(*) FROM gamers WHERE token IS NOT NULL") gamers_with_tokens = cursor.fetchone()[0] if gamers_with_tokens == 0: return # No tokens to migrate # Check if user_gamers already has tokens cursor.execute("SELECT COUNT(*) FROM user_gamers WHERE token IS NOT NULL") user_gamers_with_tokens = cursor.fetchone()[0] if user_gamers_with_tokens > 0: return # Migration already done # Migrate tokens from gamers to user_gamers cursor.execute("SELECT id, token FROM gamers WHERE token IS NOT NULL") gamers_tokens = cursor.fetchall() migrated = 0 for gamer_id, token in gamers_tokens: # Update all user-gamer relationships for this gamer cursor.execute( "UPDATE user_gamers SET token = ? WHERE gamer_id = ? AND token IS NULL", (token, gamer_id) ) migrated += cursor.rowcount conn.commit() if migrated > 0: logger.info(f"Migrated {migrated} tokens from gamers to user_gamers") def add_or_get_telegram_user(self, user_id: int, username: Optional[str] = None, first_name: Optional[str] = None, last_name: Optional[str] = None, language_code: Optional[str] = None) -> bool: """ Add or update Telegram user. For new users, automatically sets bot_language based on Telegram language_code: - 'ru' -> 'ru' (Russian) - anything else -> 'en' (English) """ with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT user_id, bot_language FROM telegram_users WHERE user_id = ?", (user_id,)) existing = cursor.fetchone() if not existing: # New user - determine bot language from Telegram language_code bot_language = 'ru' if language_code and language_code.lower().startswith('ru') else 'en' cursor.execute( "INSERT INTO telegram_users (user_id, username, first_name, last_name, language_code, bot_language) VALUES (?, ?, ?, ?, ?, ?)", (user_id, username, first_name, last_name, language_code, bot_language) ) conn.commit() logger.info(f"New user {user_id} added with bot_language={bot_language} (from Telegram language_code={language_code})") return True else: # Existing user - update language_code but keep bot_language unchanged cursor.execute( "UPDATE telegram_users SET language_code = ? WHERE user_id = ?", (language_code, user_id) ) conn.commit() return False def get_user_language(self, user_id: int) -> str: """Get user's selected bot language from database""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT bot_language FROM telegram_users WHERE user_id = ?", (user_id,)) result = cursor.fetchone() if result and result[0]: return result[0] # Default to English if not set return 'en' def set_user_language(self, user_id: int, language: str) -> bool: """Set user's selected bot language""" if language not in ['en', 'ru']: return False with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "UPDATE telegram_users SET bot_language = ? WHERE user_id = ?", (language, user_id) ) conn.commit() return cursor.rowcount > 0 def add_gamer(self, username: str) -> int: """Add a new gamer to the database (return gamer_id)""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Check if gamer already exists cursor.execute("SELECT id FROM gamers WHERE username = ?", (username,)) existing = cursor.fetchone() if existing: gamer_id = existing[0] else: # Add new gamer (without token - tokens are stored in user_gamers) cursor.execute( "INSERT INTO gamers (username) VALUES (?)", (username,) ) gamer_id = cursor.lastrowid conn.commit() return gamer_id def add_user_gamer(self, user_id: int, gamer_id: int, token: Optional[str] = None) -> bool: """Add relationship between user and gamer with optional token""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() try: cursor.execute( "INSERT INTO user_gamers (user_id, gamer_id, token) VALUES (?, ?, ?)", (user_id, gamer_id, token) ) conn.commit() return True except sqlite3.IntegrityError: # Already exists - update token if provided if token: cursor.execute( "UPDATE user_gamers SET token = ? WHERE user_id = ? AND gamer_id = ?", (token, user_id, gamer_id) ) conn.commit() return False def get_user_gamers(self, user_id: int) -> List[Dict[str, Any]]: """Get all gamers for a specific user""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' SELECT g.id, g.username, ug.token, ug.is_active, ug.period_minutes FROM user_gamers ug JOIN gamers g ON ug.gamer_id = g.id WHERE ug.user_id = ? ORDER BY ug.id ''', (user_id,)) gamers = [] for row in cursor.fetchall(): gamers.append({ 'id': row[0], 'username': row[1], 'token': row[2], # Token from user_gamers, not gamers 'is_active': bool(row[3]), 'period_minutes': row[4] }) return gamers def get_user_active_gamer(self, user_id: int) -> Optional[Dict[str, Any]]: """Get the active gamer for a specific user""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' SELECT g.id, g.username, ug.token FROM user_gamers ug JOIN gamers g ON ug.gamer_id = g.id WHERE ug.user_id = ? AND ug.is_active = TRUE LIMIT 1 ''', (user_id,)) row = cursor.fetchone() if row: return { 'id': row[0], 'username': row[1], 'token': row[2] # Token from user_gamers } return None def set_user_active_gamer(self, user_id: int, gamer_id: int): """Set active gamer for a specific user""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Deactivate all gamers for this user cursor.execute( "UPDATE user_gamers SET is_active = FALSE WHERE user_id = ?", (user_id,) ) # Activate the selected gamer cursor.execute( "UPDATE user_gamers SET is_active = TRUE WHERE user_id = ? AND gamer_id = ?", (user_id, gamer_id) ) conn.commit() def set_user_gamer_period(self, user_id: int, gamer_id: int, period_minutes: int): """Set period for a gamer for a specific user""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( "UPDATE user_gamers SET period_minutes = ? WHERE user_id = ? AND gamer_id = ?", (period_minutes, user_id, gamer_id) ) conn.commit() def remove_user_gamer(self, user_id: int, gamer_id: int) -> bool: """Remove gamer from user's tracked list""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Delete the user-gamer relationship cursor.execute( "DELETE FROM user_gamers WHERE user_id = ? AND gamer_id = ?", (user_id, gamer_id) ) deleted = cursor.rowcount > 0 # Check if this gamer is still tracked by any other user cursor.execute( "SELECT COUNT(*) FROM user_gamers WHERE gamer_id = ?", (gamer_id,) ) other_users_count = cursor.fetchone()[0] # If no other users track this gamer, remove from gamers table if other_users_count == 0: cursor.execute("DELETE FROM gamers WHERE id = ?", (gamer_id,)) conn.commit() return deleted def get_user_gamers_with_periods(self, user_id: int) -> List[Dict[str, Any]]: """Get all gamers for a user that have periods set""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' SELECT g.id, g.username, ug.token, ug.period_minutes FROM user_gamers ug JOIN gamers g ON ug.gamer_id = g.id WHERE ug.user_id = ? AND ug.period_minutes > 0 ''', (user_id,)) gamers = [] for row in cursor.fetchall(): gamers.append({ 'id': row[0], 'username': row[1], 'token': row[2], # Token from user_gamers 'period_minutes': row[3] }) return gamers def get_all_gamers_with_periods(self) -> List[Dict[str, Any]]: """Get all user-gamer pairs that have periods set (for periodic checks across all users)""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' SELECT ug.user_id, g.id, g.username, ug.token, ug.period_minutes FROM user_gamers ug JOIN gamers g ON ug.gamer_id = g.id WHERE ug.period_minutes > 0 ''') gamers = [] for row in cursor.fetchall(): gamers.append({ 'user_id': row[0], 'id': row[1], 'username': row[2], 'token': row[3], # Token from user_gamers 'period_minutes': row[4] }) return gamers def get_admin_chat_id(self) -> Optional[int]: """Get admin chat ID from database""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT value FROM admin_settings WHERE key = 'admin_chat_id'") result = cursor.fetchone() if result: try: return int(result[0]) except (ValueError, TypeError): return None return None def set_admin_chat_id(self, chat_id: int): """Set admin chat ID in database""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO admin_settings (key, value, updated_at) VALUES ('admin_chat_id', ?, CURRENT_TIMESTAMP) ''', (str(chat_id),)) conn.commit() logger.info(f"Admin chat ID saved to database: {chat_id}")