""" Admin Panel Telegram Bot Sends notifications about new users and new tracked players """ import logging import sqlite3 import aiohttp import asyncio from typing import Optional from telegram import Update from telegram.ext import ( Application, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters ) from config import ADMINPANEL_TELEGRAM_BOT_TOKEN, DATABASE_PATH, TELEGRAM_BOT_TOKEN from database import Database from message_counters import MessageCounters # Configure logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Conversation states for system message WAITING_FOR_SYSTEM_MESSAGE = range(1) class AdminBot: def __init__(self): self.db = Database() self.application = None self.counters = MessageCounters() async def send_notification(self, message: str): """Send notification to admin chat""" admin_chat_id = self.get_admin_chat_id() if not admin_chat_id: logger.warning("Admin chat ID not set, cannot send notification") return # Try to use application if available (when running as separate service) if self.application: try: await self.application.bot.send_message( chat_id=admin_chat_id, text=message, parse_mode='HTML' ) logger.debug(f"Notification sent via application to chat {admin_chat_id}") return except Exception as e: logger.warning(f"Failed to send via application: {e}, trying direct API call") # Fallback: use direct Telegram API call try: import aiohttp url = f"https://api.telegram.org/bot{ADMINPANEL_TELEGRAM_BOT_TOKEN}/sendMessage" async with aiohttp.ClientSession() as session: async with session.post(url, json={ "chat_id": admin_chat_id, "text": message, "parse_mode": "HTML" }) as response: if response.status == 200: logger.debug(f"Notification sent via direct API to chat {admin_chat_id}") else: error_text = await response.text() logger.error(f"Failed to send notification: {response.status} - {error_text}") except Exception as e: logger.error(f"Failed to send admin notification via API: {e}") import traceback logger.error(traceback.format_exc()) def get_admin_chat_id(self) -> Optional[int]: """Get admin chat ID from database""" return self.db.get_admin_chat_id() def set_admin_chat_id(self, chat_id: int): """Set admin chat ID in database""" self.db.set_admin_chat_id(chat_id) async def notify_new_user(self, user_id: int, username: Optional[str], first_name: Optional[str]): """Send notification about new Telegram user""" username_text = f"@{username}" if username else "без username" name_text = first_name if first_name else "без имени" message = ( f"🆕 Новый пользователь Telegram\n\n" f"ID: {user_id}\n" f"Username: {username_text}\n" f"Имя: {name_text}" ) await self.send_notification(message) async def notify_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str]): """Send notification about new tracked player""" added_by_text = f"@{added_by_username}" if added_by_username else f"ID: {added_by_user_id}" lichess_url = f"https://lichess.org/@/{player_username}" message = ( f"🎮 Добавлен новый игрок для отслеживания\n\n" f"Игрок: {player_username}\n" f"Добавил: {added_by_text}" ) await self.send_notification(message) async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Status command - show statistics""" try: import sqlite3 from datetime import date conn = sqlite3.connect(self.db.db_path) cursor = conn.cursor() # Count users cursor.execute("SELECT COUNT(*) FROM telegram_users") users_count = cursor.fetchone()[0] # Count new users today today = date.today().isoformat() cursor.execute("SELECT COUNT(*) FROM telegram_users WHERE DATE(created_at) = ?", (today,)) users_today = cursor.fetchone()[0] # Count unique gamers cursor.execute("SELECT COUNT(DISTINCT username) FROM gamers") gamers_count = cursor.fetchone()[0] # Count new gamers today (from user_gamers table) cursor.execute(""" SELECT COUNT(DISTINCT g.id) FROM user_gamers ug JOIN gamers g ON ug.gamer_id = g.id WHERE DATE(ug.created_at) = ? """, (today,)) gamers_today = cursor.fetchone()[0] conn.close() # Get message counters statistics stats = self.counters.get_stats_summary() # Filter out commands that should not be displayed excluded_commands = {'lang', 'resetlang', 'start'} filtered_stats = { cmd: data for cmd, data in stats['by_command'].items() if cmd not in excluded_commands } # Format message counters counters_text = "\n".join([ f" • {cmd}: {data['total']} (сегодня: {data['today']})" for cmd, data in sorted(filtered_stats.items()) ]) message = ( f"📊 Статистика базы данных\n\n" f"👥 Пользователей Telegram: {users_count} (сегодня: {users_today})\n" f"🎮 Отслеживаемых игроков: {gamers_count} (сегодня: {gamers_today})\n\n" f"📨 Счетчики сообщений\n\n" f"Всего отправлено: {stats['total_all_time']}\n" f"Сегодня отправлено: {stats['total_today']}\n\n" f"По командам:\n{counters_text}" ) await update.message.reply_text(message, parse_mode='HTML') except Exception as e: logger.error(f"Error getting status: {e}") await update.message.reply_text(f"❌ Ошибка получения статистики: {e}") async def system_mes_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Start system message command - ask for message text""" await update.message.reply_text( "📨 Введите текст сообщения для отправки всем пользователям:\n\n" "Используйте /cancel для отмены." ) return WAITING_FOR_SYSTEM_MESSAGE async def handle_system_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle system message text input and send to all users""" message_text = update.message.text if not message_text or not message_text.strip(): await update.message.reply_text("❌ Сообщение не может быть пустым. Попробуйте снова или используйте /cancel для отмены.") return WAITING_FOR_SYSTEM_MESSAGE # Get all users from database try: conn = sqlite3.connect(self.db.db_path) cursor = conn.cursor() cursor.execute("SELECT user_id FROM telegram_users") users = cursor.fetchall() conn.close() total_users = len(users) if total_users == 0: await update.message.reply_text("❌ В базе данных нет пользователей для отправки сообщения.") return ConversationHandler.END # Send status message status_msg = await update.message.reply_text( f"📤 Отправка сообщения {total_users} пользователям...\n\n" f"Сообщение:\n{message_text}" ) # Send message to all users success_count = 0 failed_count = 0 # Format message with system message header formatted_message = f"🔔 System Message\n\n{message_text}" for i, (user_id,) in enumerate(users): try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" async with aiohttp.ClientSession() as session: async with session.post(url, json={ "chat_id": user_id, "text": formatted_message, "parse_mode": "HTML" }, timeout=aiohttp.ClientTimeout(total=5)) as response: if response.status == 200: success_count += 1 else: failed_count += 1 error_text = await response.text() logger.warning(f"Failed to send to user {user_id}: {error_text}") except Exception as e: failed_count += 1 logger.error(f"Error sending to user {user_id}: {e}") # Add small delay to avoid rate limiting (except for the last user) if i < len(users) - 1: await asyncio.sleep(0.05) # 50ms delay between messages # Update status message result_text = ( f"✅ Рассылка завершена\n\n" f"📊 Статистика:\n" f"• Всего пользователей: {total_users}\n" f"• Успешно отправлено: {success_count}\n" f"• Ошибок: {failed_count}\n\n" f"Отправленное сообщение:\n{message_text}" ) await status_msg.edit_text(result_text, parse_mode='HTML') logger.info(f"System message sent: {success_count} successful, {failed_count} failed out of {total_users} users") except Exception as e: logger.error(f"Error in system message sending: {e}") import traceback logger.error(traceback.format_exc()) await update.message.reply_text(f"❌ Ошибка при отправке сообщений: {e}") return ConversationHandler.END async def cancel_system_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Cancel system message command""" await update.message.reply_text("❌ Отправка сообщения отменена.") return ConversationHandler.END async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Start command - register admin chat""" try: chat_id = update.effective_chat.id self.set_admin_chat_id(chat_id) await update.message.reply_text( "✅ Админ-панель активирована!\n\n" "Доступные команды:\n" "/status - Показать статистику\n" "/system_mes - Отправить сообщение всем пользователям" ) except Exception as e: logger.error(f"Error in start command: {e}") import traceback logger.error(traceback.format_exc()) try: await update.message.reply_text(f"Ошибка: {e}") except: pass def setup_handlers(self, application: Application): """Setup all handlers""" self.application = application # Store application reference # Conversation handler for system message system_mes_conv = ConversationHandler( entry_points=[CommandHandler("system_mes", self.system_mes_start)], states={ WAITING_FOR_SYSTEM_MESSAGE: [ MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_system_message) ], }, fallbacks=[CommandHandler("cancel", self.cancel_system_message)] ) # Add conversation handler application.add_handler(system_mes_conv) # Add start command handler application.add_handler(CommandHandler("start", self.start)) # Add status command handler application.add_handler(CommandHandler("status", self.status)) # Global admin bot instance _admin_bot_instance: Optional[AdminBot] = None def get_admin_bot() -> Optional[AdminBot]: """Get global admin bot instance""" return _admin_bot_instance def init_admin_bot(): """Initialize admin bot""" global _admin_bot_instance if not _admin_bot_instance: _admin_bot_instance = AdminBot() return _admin_bot_instance def main(): """Main function""" admin_bot = init_admin_bot() # Create application with Long Polling configuration application = Application.builder().token(ADMINPANEL_TELEGRAM_BOT_TOKEN).build() # Setup handlers admin_bot.setup_handlers(application) # Set application reference admin_bot.application = application # Add error handler async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: """Log the error and send a telegram message to notify the developer.""" logger.error(f"Exception while handling an update: {context.error}") import traceback logger.error(traceback.format_exc()) application.add_error_handler(error_handler) # Add post_init hook to log bot info async def post_init(app: Application) -> None: bot_info = await app.bot.get_me() logger.info(f"Admin bot initialized: @{bot_info.username} (ID: {bot_info.id})") application.post_init = post_init # Start the bot with Long Polling logger.info("Starting Admin Panel Bot with Long Polling...") try: application.run_polling( poll_interval=1.0, timeout=30, drop_pending_updates=True, allowed_updates=["message", "callback_query"] ) except Exception as e: logger.error(f"Fatal error in run_polling: {e}") import traceback logger.error(traceback.format_exc()) raise if __name__ == '__main__': main()