LichessStatTgWeb/LichessClientTG_bot/admin_bot.py
2025-11-13 15:07:53 +03:00

360 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Admin Panel Telegram Bot
Sends notifications about new users and new tracked players
"""
import logging
import sqlite3
import aiohttp
import asyncio
from typing import Optional
from telegram import Update
from telegram.ext import (
Application, CommandHandler, ContextTypes,
ConversationHandler, MessageHandler, filters
)
from config import ADMINPANEL_TELEGRAM_BOT_TOKEN, DATABASE_PATH, TELEGRAM_BOT_TOKEN
from database import Database
from message_counters import MessageCounters
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Conversation states for system message
WAITING_FOR_SYSTEM_MESSAGE = range(1)
class AdminBot:
def __init__(self):
self.db = Database()
self.application = None
self.counters = MessageCounters()
async def send_notification(self, message: str):
"""Send notification to admin chat"""
admin_chat_id = self.get_admin_chat_id()
if not admin_chat_id:
logger.warning("Admin chat ID not set, cannot send notification")
return
# Try to use application if available (when running as separate service)
if self.application:
try:
await self.application.bot.send_message(
chat_id=admin_chat_id,
text=message,
parse_mode='HTML'
)
logger.debug(f"Notification sent via application to chat {admin_chat_id}")
return
except Exception as e:
logger.warning(f"Failed to send via application: {e}, trying direct API call")
# Fallback: use direct Telegram API call
try:
import aiohttp
url = f"https://api.telegram.org/bot{ADMINPANEL_TELEGRAM_BOT_TOKEN}/sendMessage"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": admin_chat_id,
"text": message,
"parse_mode": "HTML"
}) as response:
if response.status == 200:
logger.debug(f"Notification sent via direct API to chat {admin_chat_id}")
else:
error_text = await response.text()
logger.error(f"Failed to send notification: {response.status} - {error_text}")
except Exception as e:
logger.error(f"Failed to send admin notification via API: {e}")
import traceback
logger.error(traceback.format_exc())
def get_admin_chat_id(self) -> Optional[int]:
"""Get admin chat ID from database"""
return self.db.get_admin_chat_id()
def set_admin_chat_id(self, chat_id: int):
"""Set admin chat ID in database"""
self.db.set_admin_chat_id(chat_id)
async def notify_new_user(self, user_id: int, username: Optional[str], first_name: Optional[str]):
"""Send notification about new Telegram user"""
username_text = f"@{username}" if username else "без username"
name_text = first_name if first_name else "без имени"
message = (
f"🆕 <b>Новый пользователь Telegram</b>\n\n"
f"ID: {user_id}\n"
f"Username: {username_text}\n"
f"Имя: {name_text}"
)
await self.send_notification(message)
async def notify_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str]):
"""Send notification about new tracked player"""
added_by_text = f"@{added_by_username}" if added_by_username else f"ID: {added_by_user_id}"
lichess_url = f"https://lichess.org/@/{player_username}"
message = (
f"🎮 <b>Добавлен новый игрок для отслеживания</b>\n\n"
f"Игрок: <a href=\"{lichess_url}\">{player_username}</a>\n"
f"Добавил: {added_by_text}"
)
await self.send_notification(message)
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Status command - show statistics"""
try:
import sqlite3
conn = sqlite3.connect(self.db.db_path)
cursor = conn.cursor()
# Count users
cursor.execute("SELECT COUNT(*) FROM telegram_users")
users_count = cursor.fetchone()[0]
# Count unique gamers
cursor.execute("SELECT COUNT(DISTINCT username) FROM gamers")
gamers_count = cursor.fetchone()[0]
conn.close()
# Get message counters statistics
stats = self.counters.get_stats_summary()
# Filter out commands that should not be displayed
excluded_commands = {'lang', 'resetlang', 'start'}
filtered_stats = {
cmd: data for cmd, data in stats['by_command'].items()
if cmd not in excluded_commands
}
# Format message counters
counters_text = "\n".join([
f"{cmd}: {data['total']} (сегодня: {data['today']})"
for cmd, data in sorted(filtered_stats.items())
])
message = (
f"📊 <b>Статистика базы данных</b>\n\n"
f"👥 Пользователей Telegram: {users_count}\n"
f"🎮 Отслеживаемых игроков: {gamers_count}\n\n"
f"📨 <b>Счетчики сообщений</b>\n\n"
f"Всего отправлено: <b>{stats['total_all_time']}</b>\n"
f"Сегодня отправлено: <b>{stats['total_today']}</b>\n\n"
f"<b>По командам:</b>\n{counters_text}"
)
await update.message.reply_text(message, parse_mode='HTML')
except Exception as e:
logger.error(f"Error getting status: {e}")
await update.message.reply_text(f"❌ Ошибка получения статистики: {e}")
async def system_mes_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start system message command - ask for message text"""
await update.message.reply_text(
"📨 Введите текст сообщения для отправки всем пользователям:\n\n"
"Используйте /cancel для отмены."
)
return WAITING_FOR_SYSTEM_MESSAGE
async def handle_system_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle system message text input and send to all users"""
message_text = update.message.text
if not message_text or not message_text.strip():
await update.message.reply_text("❌ Сообщение не может быть пустым. Попробуйте снова или используйте /cancel для отмены.")
return WAITING_FOR_SYSTEM_MESSAGE
# Get all users from database
try:
conn = sqlite3.connect(self.db.db_path)
cursor = conn.cursor()
cursor.execute("SELECT user_id FROM telegram_users")
users = cursor.fetchall()
conn.close()
total_users = len(users)
if total_users == 0:
await update.message.reply_text("В базе данных нет пользователей для отправки сообщения.")
return ConversationHandler.END
# Send status message
status_msg = await update.message.reply_text(
f"📤 Отправка сообщения {total_users} пользователям...\n\n"
f"Сообщение:\n{message_text}"
)
# Send message to all users
success_count = 0
failed_count = 0
# Format message with system message header
formatted_message = f"🔔 <b>System Message</b>\n\n{message_text}"
for i, (user_id,) in enumerate(users):
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": user_id,
"text": formatted_message,
"parse_mode": "HTML"
}, timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
success_count += 1
else:
failed_count += 1
error_text = await response.text()
logger.warning(f"Failed to send to user {user_id}: {error_text}")
except Exception as e:
failed_count += 1
logger.error(f"Error sending to user {user_id}: {e}")
# Add small delay to avoid rate limiting (except for the last user)
if i < len(users) - 1:
await asyncio.sleep(0.05) # 50ms delay between messages
# Update status message
result_text = (
f"✅ <b>Рассылка завершена</b>\n\n"
f"📊 Статистика:\n"
f"Всего пользователей: {total_users}\n"
f"• Успешно отправлено: {success_count}\n"
f"• Ошибок: {failed_count}\n\n"
f"<b>Отправленное сообщение:</b>\n{message_text}"
)
await status_msg.edit_text(result_text, parse_mode='HTML')
logger.info(f"System message sent: {success_count} successful, {failed_count} failed out of {total_users} users")
except Exception as e:
logger.error(f"Error in system message sending: {e}")
import traceback
logger.error(traceback.format_exc())
await update.message.reply_text(f"❌ Ошибка при отправке сообщений: {e}")
return ConversationHandler.END
async def cancel_system_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Cancel system message command"""
await update.message.reply_text("❌ Отправка сообщения отменена.")
return ConversationHandler.END
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command - register admin chat"""
try:
chat_id = update.effective_chat.id
self.set_admin_chat_id(chat_id)
await update.message.reply_text(
"✅ Админ-панель активирована!\n\n"
"Доступные команды:\n"
"/status - Показать статистику\n"
"/system_mes - Отправить сообщение всем пользователям"
)
except Exception as e:
logger.error(f"Error in start command: {e}")
import traceback
logger.error(traceback.format_exc())
try:
await update.message.reply_text(f"Ошибка: {e}")
except:
pass
def setup_handlers(self, application: Application):
"""Setup all handlers"""
self.application = application # Store application reference
# Conversation handler for system message
system_mes_conv = ConversationHandler(
entry_points=[CommandHandler("system_mes", self.system_mes_start)],
states={
WAITING_FOR_SYSTEM_MESSAGE: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_system_message)
],
},
fallbacks=[CommandHandler("cancel", self.cancel_system_message)]
)
# Add conversation handler
application.add_handler(system_mes_conv)
# Add start command handler
application.add_handler(CommandHandler("start", self.start))
# Add status command handler
application.add_handler(CommandHandler("status", self.status))
# Global admin bot instance
_admin_bot_instance: Optional[AdminBot] = None
def get_admin_bot() -> Optional[AdminBot]:
"""Get global admin bot instance"""
return _admin_bot_instance
def init_admin_bot():
"""Initialize admin bot"""
global _admin_bot_instance
if not _admin_bot_instance:
_admin_bot_instance = AdminBot()
return _admin_bot_instance
def main():
"""Main function"""
admin_bot = init_admin_bot()
# Create application with Long Polling configuration
application = Application.builder().token(ADMINPANEL_TELEGRAM_BOT_TOKEN).build()
# Setup handlers
admin_bot.setup_handlers(application)
# Set application reference
admin_bot.application = application
# Add error handler
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Log the error and send a telegram message to notify the developer."""
logger.error(f"Exception while handling an update: {context.error}")
import traceback
logger.error(traceback.format_exc())
application.add_error_handler(error_handler)
# Add post_init hook to log bot info
async def post_init(app: Application) -> None:
bot_info = await app.bot.get_me()
logger.info(f"Admin bot initialized: @{bot_info.username} (ID: {bot_info.id})")
application.post_init = post_init
# Start the bot with Long Polling
logger.info("Starting Admin Panel Bot with Long Polling...")
try:
application.run_polling(
poll_interval=1.0,
timeout=30,
drop_pending_updates=True,
allowed_updates=["message", "callback_query"]
)
except Exception as e:
logger.error(f"Fatal error in run_polling: {e}")
import traceback
logger.error(traceback.format_exc())
raise
if __name__ == '__main__':
main()