"""
Admin Panel Telegram Bot
Sends notifications about new users and new tracked players
"""
import logging
import sqlite3
import aiohttp
import asyncio
from typing import Optional
from telegram import Update
from telegram.ext import (
Application, CommandHandler, ContextTypes,
ConversationHandler, MessageHandler, filters
)
from config import ADMINPANEL_TELEGRAM_BOT_TOKEN, DATABASE_PATH, TELEGRAM_BOT_TOKEN
from database import Database
from message_counters import MessageCounters
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Conversation states for system message
WAITING_FOR_SYSTEM_MESSAGE = range(1)
WAITING_FOR_USERS0_MESSAGE = range(1, 2)
class AdminBot:
def __init__(self):
self.db = Database()
self.application = None
self.counters = MessageCounters()
async def send_notification(self, message: str):
"""Send notification to admin chat"""
admin_chat_id = self.get_admin_chat_id()
if not admin_chat_id:
logger.warning("Admin chat ID not set, cannot send notification")
return
# Try to use application if available (when running as separate service)
if self.application:
try:
await self.application.bot.send_message(
chat_id=admin_chat_id,
text=message,
parse_mode='HTML'
)
logger.debug(f"Notification sent via application to chat {admin_chat_id}")
return
except Exception as e:
logger.warning(f"Failed to send via application: {e}, trying direct API call")
# Fallback: use direct Telegram API call
try:
import aiohttp
url = f"https://api.telegram.org/bot{ADMINPANEL_TELEGRAM_BOT_TOKEN}/sendMessage"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": admin_chat_id,
"text": message,
"parse_mode": "HTML"
}) as response:
if response.status == 200:
logger.debug(f"Notification sent via direct API to chat {admin_chat_id}")
else:
error_text = await response.text()
logger.error(f"Failed to send notification: {response.status} - {error_text}")
except Exception as e:
logger.error(f"Failed to send admin notification via API: {e}")
import traceback
logger.error(traceback.format_exc())
def get_admin_chat_id(self) -> Optional[int]:
"""Get admin chat ID from database"""
return self.db.get_admin_chat_id()
def set_admin_chat_id(self, chat_id: int):
"""Set admin chat ID in database"""
self.db.set_admin_chat_id(chat_id)
async def _call_tts_service(self, text: str):
"""Call TTS service with given text"""
try:
import urllib.parse
import aiohttp
encoded_text = urllib.parse.quote(text)
tts_url = f"http://192.168.8.111:7901/TTS?text={encoded_text}"
async with aiohttp.ClientSession() as session:
async with session.get(tts_url, timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
logger.info(f"TTS service called successfully: {text}")
else:
logger.warning(f"TTS service returned status {response.status}")
except Exception as e:
logger.error(f"Failed to call TTS service: {e}")
async def notify_new_user(self, user_id: int, username: Optional[str], first_name: Optional[str]):
"""Send notification about new Telegram user"""
username_text = f"@{username}" if username else "без username"
name_text = first_name if first_name else "без имени"
message = (
f"🆕 Новый пользователь Telegram\n\n"
f"ID: {user_id}\n"
f"Username: {username_text}\n"
f"Имя: {name_text}"
)
await self.send_notification(message)
# Call TTS service with user count
try:
import sqlite3
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM telegram_users")
total_users = cursor.fetchone()[0]
tts_text = f"Появился новый пользователь {total_users}"
await self._call_tts_service(tts_text)
except Exception as e:
logger.error(f"Failed to call TTS for new user: {e}")
async def notify_new_player(self, player_username: str, added_by_user_id: int, added_by_username: Optional[str], is_new_gamer: bool = False):
"""Send notification about new tracked player"""
added_by_text = f"@{added_by_username}" if added_by_username else f"ID: {added_by_user_id}"
lichess_url = f"https://lichess.org/@/{player_username}"
message = (
f"🎮 Добавлен новый игрок для отслеживания\n\n"
f"Игрок: {player_username}\n"
f"Добавил: {added_by_text}"
)
await self.send_notification(message)
# Call TTS service if this is a new gamer
if is_new_gamer:
try:
import sqlite3
with sqlite3.connect(self.db.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(DISTINCT username) FROM gamers")
total_gamers = cursor.fetchone()[0]
tts_text = f"Добавлен новый игрок {total_gamers}"
await self._call_tts_service(tts_text)
except Exception as e:
logger.error(f"Failed to call TTS for new gamer: {e}")
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Status command - show statistics"""
try:
import sqlite3
from datetime import date
conn = sqlite3.connect(self.db.db_path)
cursor = conn.cursor()
# Count users
cursor.execute("SELECT COUNT(*) FROM telegram_users")
users_count = cursor.fetchone()[0]
# Count new users today
today = date.today().isoformat()
cursor.execute("SELECT COUNT(*) FROM telegram_users WHERE DATE(created_at) = ?", (today,))
users_today = cursor.fetchone()[0]
# Count users without gamers
cursor.execute("""
SELECT COUNT(DISTINCT tu.user_id)
FROM telegram_users tu
LEFT JOIN user_gamers ug ON tu.user_id = ug.user_id
WHERE ug.id IS NULL
""")
users_without_gamers = cursor.fetchone()[0]
# Calculate percentage
users_without_gamers_percent = round((users_without_gamers / users_count * 100)) if users_count > 0 else 0
# Count unique gamers
cursor.execute("SELECT COUNT(DISTINCT username) FROM gamers")
gamers_count = cursor.fetchone()[0]
# Count new gamers today (from user_gamers table)
cursor.execute("""
SELECT COUNT(DISTINCT g.id)
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE DATE(ug.created_at) = ?
""", (today,))
gamers_today = cursor.fetchone()[0]
conn.close()
# Get message counters statistics
stats = self.counters.get_stats_summary()
# Filter out commands that should not be displayed
excluded_commands = {'lang', 'resetlang', 'start'}
filtered_stats = {
cmd: data for cmd, data in stats['by_command'].items()
if cmd not in excluded_commands
}
# Format message counters
counters_text = "\n".join([
f" • {cmd}: {data['total']} (сегодня: {data['today']})"
for cmd, data in sorted(filtered_stats.items())
])
message = (
f"📊 Статистика базы данных\n\n"
f"👥 Пользователей Telegram: {users_count} (сегодня: {users_today})\n"
f"👤 Пользователей без игроков: {users_without_gamers} ({users_without_gamers_percent}%)\n"
f"🎮 Отслеживаемых игроков: {gamers_count} (сегодня: {gamers_today})\n\n"
f"📨 Счетчики сообщений\n\n"
f"Всего отправлено: {stats['total_all_time']}\n"
f"Сегодня отправлено: {stats['total_today']}\n\n"
f"По командам:\n{counters_text}"
)
await update.message.reply_text(message, parse_mode='HTML')
except Exception as e:
logger.error(f"Error getting status: {e}")
await update.message.reply_text(f"❌ Ошибка получения статистики: {e}")
async def system_mes_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start system message command - ask for message text"""
await update.message.reply_text(
"📨 Введите текст сообщения для отправки всем пользователям:\n\n"
"Используйте /cancel для отмены."
)
return WAITING_FOR_SYSTEM_MESSAGE
async def handle_system_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle system message text input and send to all users"""
message_text = update.message.text
if not message_text or not message_text.strip():
await update.message.reply_text("❌ Сообщение не может быть пустым. Попробуйте снова или используйте /cancel для отмены.")
return WAITING_FOR_SYSTEM_MESSAGE
# Get all users from database
try:
conn = sqlite3.connect(self.db.db_path)
cursor = conn.cursor()
cursor.execute("SELECT user_id FROM telegram_users")
users = cursor.fetchall()
conn.close()
total_users = len(users)
if total_users == 0:
await update.message.reply_text("❌ В базе данных нет пользователей для отправки сообщения.")
return ConversationHandler.END
# Send status message
status_msg = await update.message.reply_text(
f"📤 Отправка сообщения {total_users} пользователям...\n\n"
f"Сообщение:\n{message_text}"
)
# Send message to all users
success_count = 0
failed_count = 0
# Format message with system message header
formatted_message = f"🔔 System Message\n\n{message_text}"
for i, (user_id,) in enumerate(users):
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": user_id,
"text": formatted_message,
"parse_mode": "HTML"
}, timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
success_count += 1
else:
failed_count += 1
error_text = await response.text()
logger.warning(f"Failed to send to user {user_id}: {error_text}")
except Exception as e:
failed_count += 1
logger.error(f"Error sending to user {user_id}: {e}")
# Add small delay to avoid rate limiting (except for the last user)
if i < len(users) - 1:
await asyncio.sleep(0.05) # 50ms delay between messages
# Update status message
result_text = (
f"✅ Рассылка завершена\n\n"
f"📊 Статистика:\n"
f"• Всего пользователей: {total_users}\n"
f"• Успешно отправлено: {success_count}\n"
f"• Ошибок: {failed_count}\n\n"
f"Отправленное сообщение:\n{message_text}"
)
await status_msg.edit_text(result_text, parse_mode='HTML')
logger.info(f"System message sent: {success_count} successful, {failed_count} failed out of {total_users} users")
except Exception as e:
logger.error(f"Error in system message sending: {e}")
import traceback
logger.error(traceback.format_exc())
await update.message.reply_text(f"❌ Ошибка при отправке сообщений: {e}")
return ConversationHandler.END
async def cancel_system_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Cancel system message command"""
await update.message.reply_text("❌ Отправка сообщения отменена.")
return ConversationHandler.END
async def sendusers0_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start sendusers0 command - ask for message text"""
await update.message.reply_text(
"📨 Введите текст сообщения для отправки пользователям без добавленных игроков:\n\n"
"Используйте /cancel для отмены."
)
return WAITING_FOR_USERS0_MESSAGE
async def handle_users0_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle message text input and send to users without gamers"""
message_text = update.message.text
if not message_text or not message_text.strip():
await update.message.reply_text("❌ Сообщение не может быть пустым. Попробуйте снова или используйте /cancel для отмены.")
return WAITING_FOR_USERS0_MESSAGE
# Get users without gamers from database
try:
conn = sqlite3.connect(self.db.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT tu.user_id
FROM telegram_users tu
LEFT JOIN user_gamers ug ON tu.user_id = ug.user_id
WHERE ug.id IS NULL
""")
users = cursor.fetchall()
conn.close()
total_users = len(users)
if total_users == 0:
await update.message.reply_text("❌ В базе данных нет пользователей без игроков для отправки сообщения.")
return ConversationHandler.END
# Send status message
status_msg = await update.message.reply_text(
f"📤 Отправка сообщения {total_users} пользователям без игроков...\n\n"
f"Сообщение:\n{message_text}"
)
# Send message to users without gamers
success_count = 0
failed_count = 0
# Format message with system message header
formatted_message = f"🔔 System Message\n\n{message_text}"
for i, (user_id,) in enumerate(users):
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={
"chat_id": user_id,
"text": formatted_message,
"parse_mode": "HTML"
}, timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
success_count += 1
else:
failed_count += 1
error_text = await response.text()
logger.warning(f"Failed to send to user {user_id}: {error_text}")
except Exception as e:
failed_count += 1
logger.error(f"Error sending to user {user_id}: {e}")
# Add small delay to avoid rate limiting (except for the last user)
if i < len(users) - 1:
await asyncio.sleep(0.05) # 50ms delay between messages
# Update status message
result_text = (
f"✅ Рассылка завершена\n\n"
f"📊 Статистика:\n"
f"• Всего пользователей без игроков: {total_users}\n"
f"• Успешно отправлено: {success_count}\n"
f"• Ошибок: {failed_count}\n\n"
f"Отправленное сообщение:\n{message_text}"
)
await status_msg.edit_text(result_text, parse_mode='HTML')
logger.info(f"Users0 message sent: {success_count} successful, {failed_count} failed out of {total_users} users")
except Exception as e:
logger.error(f"Error in users0 message sending: {e}")
import traceback
logger.error(traceback.format_exc())
await update.message.reply_text(f"❌ Ошибка при отправке сообщений: {e}")
return ConversationHandler.END
async def cancel_users0_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Cancel users0 message command"""
await update.message.reply_text("❌ Отправка сообщения отменена.")
return ConversationHandler.END
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Start command - register admin chat"""
try:
chat_id = update.effective_chat.id
self.set_admin_chat_id(chat_id)
await update.message.reply_text(
"✅ Админ-панель активирована!\n\n"
"Доступные команды:\n"
"/status - Показать статистику\n"
"/system_mes - Отправить сообщение всем пользователям"
)
except Exception as e:
logger.error(f"Error in start command: {e}")
import traceback
logger.error(traceback.format_exc())
try:
await update.message.reply_text(f"Ошибка: {e}")
except:
pass
def setup_handlers(self, application: Application):
"""Setup all handlers"""
self.application = application # Store application reference
# Conversation handler for system message
system_mes_conv = ConversationHandler(
entry_points=[CommandHandler("system_mes", self.system_mes_start)],
states={
WAITING_FOR_SYSTEM_MESSAGE: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_system_message)
],
},
fallbacks=[CommandHandler("cancel", self.cancel_system_message)]
)
# Conversation handler for sendusers0 message
sendusers0_conv = ConversationHandler(
entry_points=[CommandHandler("sendusers0", self.sendusers0_start)],
states={
WAITING_FOR_USERS0_MESSAGE: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_users0_message)
],
},
fallbacks=[CommandHandler("cancel", self.cancel_users0_message)]
)
# Add conversation handlers
application.add_handler(system_mes_conv)
application.add_handler(sendusers0_conv)
# Add start command handler
application.add_handler(CommandHandler("start", self.start))
# Add status command handler
application.add_handler(CommandHandler("status", self.status))
# Global admin bot instance
_admin_bot_instance: Optional[AdminBot] = None
def get_admin_bot() -> Optional[AdminBot]:
"""Get global admin bot instance"""
return _admin_bot_instance
def init_admin_bot():
"""Initialize admin bot"""
global _admin_bot_instance
if not _admin_bot_instance:
_admin_bot_instance = AdminBot()
return _admin_bot_instance
def main():
"""Main function"""
admin_bot = init_admin_bot()
# Create application with Long Polling configuration
application = Application.builder().token(ADMINPANEL_TELEGRAM_BOT_TOKEN).build()
# Setup handlers
admin_bot.setup_handlers(application)
# Set application reference
admin_bot.application = application
# Add error handler
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Log the error and send a telegram message to notify the developer."""
logger.error(f"Exception while handling an update: {context.error}")
import traceback
logger.error(traceback.format_exc())
application.add_error_handler(error_handler)
# Add post_init hook to log bot info
async def post_init(app: Application) -> None:
bot_info = await app.bot.get_me()
logger.info(f"Admin bot initialized: @{bot_info.username} (ID: {bot_info.id})")
application.post_init = post_init
# Start the bot with Long Polling
logger.info("Starting Admin Panel Bot with Long Polling...")
try:
application.run_polling(
poll_interval=1.0,
timeout=30,
drop_pending_updates=True,
allowed_updates=["message", "callback_query"]
)
except Exception as e:
logger.error(f"Fatal error in run_polling: {e}")
import traceback
logger.error(traceback.format_exc())
raise
if __name__ == '__main__':
main()