t2sTelegramBot/app/bot.py

220 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import tempfile
from datetime import datetime
import edge_tts
from mutagen.mp3 import MP3
from telegram import Bot, Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
USER_TOKEN = os.environ['USER_BOT_TOKEN']
ADMIN_TOKEN = os.environ['ADMIN_BOT_TOKEN']
TTS_VOICE = os.environ.get('TTS_VOICE', 'ru-RU-DmitryNeural')
DATA_DIR = os.environ.get('DATA_DIR', '/app/data')
os.makedirs(DATA_DIR, exist_ok=True)
ADMIN_CHAT_ID_FILE = os.path.join(DATA_DIR, 'admin_chat_id.txt')
STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
def load_admin_chat_id():
try:
with open(ADMIN_CHAT_ID_FILE) as f:
return int(f.read().strip())
except (FileNotFoundError, ValueError):
return None
def save_admin_chat_id(chat_id):
with open(ADMIN_CHAT_ID_FILE, 'w') as f:
f.write(str(chat_id))
def load_stats():
try:
with open(STATS_FILE) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"users": [], "total_messages": 0}
def save_stats(stats):
with open(STATS_FILE, 'w') as f:
json.dump(stats, f)
def get_ogg_duration(path: str) -> int:
"""Read audio duration in seconds (MP3 from edge-tts v7)."""
try:
audio = MP3(path)
return int(audio.info.length)
except Exception:
logger.warning(f"Could not read duration from {path}")
return 0
admin_chat_id = load_admin_chat_id()
# Separate Bot instance for sending admin notifications
admin_bot = Bot(token=ADMIN_TOKEN)
# ────────────────────────── ADMIN BOT ──────────────────────────
async def admin_start(update: Update, context):
"""Capture admin chat ID on /start and persist."""
global admin_chat_id
admin_chat_id = update.effective_chat.id
save_admin_chat_id(admin_chat_id)
await update.message.reply_text(
f'✅ Админ-панель готова.\n'
f'Твой chat_id: <code>{admin_chat_id}</code>\n\n'
f'Сюда будут приходить все озвучки пользователей.\n'
f'Используй /stat для статистики.',
parse_mode='HTML'
)
logger.info(f"Admin registered: chat_id={admin_chat_id}")
async def admin_stat(update: Update, context):
"""Show bot usage statistics."""
s = load_stats()
unique = len(s['users'])
total = s['total_messages']
await update.message.reply_text(
f'📊 <b>Статистика бота</b>\n\n'
f'👥 Уникальных пользователей: <b>{unique}</b>\n'
f'🎤 Озвучено сообщений: <b>{total}</b>',
parse_mode='HTML'
)
# ────────────────────────── USER BOT ───────────────────────────
async def user_start(update: Update, context):
await update.message.reply_text(
'👋 Привет! Отправь мне текст, и я озвучу его '
'голосом <b>Дмитрия</b> (ru-RU).\n\n'
'Просто пришли любое текстовое сообщение.',
parse_mode='HTML'
)
async def handle_user_message(update: Update, context):
"""Receive text/caption → TTS → reply voice → forward to admin."""
global admin_chat_id
user = update.effective_user
text = update.message.text or update.message.caption
if not text:
# Media without caption — silently ignore
return
logger.info(f"User {user.full_name} (id={user.id}): {text[:80]}")
# Track stats
stats = load_stats()
if user.id not in stats['users']:
stats['users'].append(user.id)
stats['total_messages'] = stats.get('total_messages', 0) + 1
save_stats(stats)
# Tell Telegram we're recording audio
await update.message.chat.send_action(action='record_voice')
# Generate TTS with edge-tts (Microsoft, free, offline-capable)
tmp = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
tmp_path = tmp.name
tmp.close()
try:
communicate = edge_tts.Communicate(text, voice=TTS_VOICE)
await communicate.save(tmp_path)
# ── 1. Send voice back to user ──
duration = get_ogg_duration(tmp_path)
with open(tmp_path, 'rb') as f:
await update.message.reply_voice(voice=f, duration=duration)
# ── 2. Forward to admin ──
if admin_chat_id:
caption = (
f'👤 <b>{user.full_name}</b>\n'
f'🆔 <code>{user.id}</code>\n'
f'📝 {text}\n'
f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n'
f'🔊 Длительность: {duration // 60}:{duration % 60:02d}'
)
try:
with open(tmp_path, 'rb') as f:
await admin_bot.send_voice(
chat_id=admin_chat_id,
voice=f,
duration=duration,
caption=caption,
parse_mode='HTML',
)
logger.info(f"Admin notified: {user.full_name}{text[:40]}")
except Exception as e:
logger.error(f"Failed to notify admin: {e}")
else:
logger.warning("Admin not registered — run /start in admin bot")
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
# ────────────────────────── SETUP ──────────────────────────────
def build_admin_app() -> Application:
app = Application.builder().token(ADMIN_TOKEN).build()
app.add_handler(CommandHandler('start', admin_start))
app.add_handler(CommandHandler('stat', admin_stat))
return app
def build_user_app() -> Application:
app = Application.builder().token(USER_TOKEN).build()
app.add_handler(CommandHandler('start', user_start))
app.add_handler(MessageHandler((filters.TEXT | filters.CAPTION) & ~filters.COMMAND, handle_user_message))
return app
async def main():
admin_app = build_admin_app()
user_app = build_user_app()
# Run both bots concurrently
async with admin_app:
await admin_app.initialize()
await admin_app.start()
await admin_app.updater.start_polling()
logger.info("Admin bot polling started")
async with user_app:
await user_app.initialize()
await user_app.start()
await user_app.updater.start_polling()
logger.info("User bot polling started")
# Keep alive
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
try:
asyncio.run(main())
except (KeyboardInterrupt, SystemExit):
logger.info("Shutting down gracefully...")