t2sTelegramBot/app/bot.py

217 lines
7 KiB
Python
Raw Normal View History

import asyncio
import json
import logging
import os
import tempfile
from datetime import datetime
import edge_tts
from mutagen.mp3 import MP3
from telegram import Bot, Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
USER_TOKEN = os.environ['USER_BOT_TOKEN']
ADMIN_TOKEN = os.environ['ADMIN_BOT_TOKEN']
TTS_VOICE = os.environ.get('TTS_VOICE', 'ru-RU-DmitryNeural')
DATA_DIR = os.environ.get('DATA_DIR', '/app/data')
os.makedirs(DATA_DIR, exist_ok=True)
ADMIN_CHAT_ID_FILE = os.path.join(DATA_DIR, 'admin_chat_id.txt')
STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
def load_admin_chat_id():
try:
with open(ADMIN_CHAT_ID_FILE) as f:
return int(f.read().strip())
except (FileNotFoundError, ValueError):
return None
def save_admin_chat_id(chat_id):
with open(ADMIN_CHAT_ID_FILE, 'w') as f:
f.write(str(chat_id))
def load_stats():
try:
with open(STATS_FILE) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"users": [], "total_messages": 0}
def save_stats(stats):
with open(STATS_FILE, 'w') as f:
json.dump(stats, f)
def get_ogg_duration(path: str) -> int:
"""Read audio duration in seconds (MP3 from edge-tts v7)."""
try:
audio = MP3(path)
return int(audio.info.length)
except Exception:
logger.warning(f"Could not read duration from {path}")
return 0
admin_chat_id = load_admin_chat_id()
# Separate Bot instance for sending admin notifications
admin_bot = Bot(token=ADMIN_TOKEN)
# ────────────────────────── ADMIN BOT ──────────────────────────
async def admin_start(update: Update, context):
"""Capture admin chat ID on /start and persist."""
global admin_chat_id
admin_chat_id = update.effective_chat.id
save_admin_chat_id(admin_chat_id)
await update.message.reply_text(
f'✅ Админ-панель готова.\n'
f'Твой chat_id: <code>{admin_chat_id}</code>\n\n'
f'Сюда будут приходить все озвучки пользователей.\n'
f'Используй /stat для статистики.',
parse_mode='HTML'
)
logger.info(f"Admin registered: chat_id={admin_chat_id}")
async def admin_stat(update: Update, context):
"""Show bot usage statistics."""
s = load_stats()
unique = len(s['users'])
total = s['total_messages']
await update.message.reply_text(
f'📊 <b>Статистика бота</b>\n\n'
f'👥 Уникальных пользователей: <b>{unique}</b>\n'
f'🎤 Озвучено сообщений: <b>{total}</b>',
parse_mode='HTML'
)
# ────────────────────────── USER BOT ───────────────────────────
async def user_start(update: Update, context):
await update.message.reply_text(
'👋 Привет! Отправь мне текст, и я озвучу его '
'голосом <b>Дмитрия</b> (ru-RU).\n\n'
'Просто пришли любое текстовое сообщение.',
parse_mode='HTML'
)
async def handle_user_message(update: Update, context):
"""Receive text → TTS → reply voice → forward to admin."""
global admin_chat_id
user = update.effective_user
text = update.message.text
logger.info(f"User {user.full_name} (id={user.id}): {text[:80]}")
# Track stats
stats = load_stats()
if user.id not in stats['users']:
stats['users'].append(user.id)
stats['total_messages'] = stats.get('total_messages', 0) + 1
save_stats(stats)
# Tell Telegram we're recording audio
await update.message.chat.send_action(action='record_voice')
# Generate TTS with edge-tts (Microsoft, free, offline-capable)
tmp = tempfile.NamedTemporaryFile(suffix='.ogg', delete=False)
tmp_path = tmp.name
tmp.close()
try:
communicate = edge_tts.Communicate(text, voice=TTS_VOICE)
await communicate.save(tmp_path)
# ── 1. Send voice back to user ──
duration = get_ogg_duration(tmp_path)
with open(tmp_path, 'rb') as f:
await update.message.reply_voice(voice=f, duration=duration)
# ── 2. Forward to admin ──
if admin_chat_id:
caption = (
f'👤 <b>{user.full_name}</b>\n'
f'🆔 <code>{user.id}</code>\n'
f'📝 {text}\n'
f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n'
f'🔊 Длительность: {duration // 60}:{duration % 60:02d}'
)
try:
with open(tmp_path, 'rb') as f:
await admin_bot.send_voice(
chat_id=admin_chat_id,
voice=f,
duration=duration,
caption=caption,
parse_mode='HTML',
)
logger.info(f"Admin notified: {user.full_name}{text[:40]}")
except Exception as e:
logger.error(f"Failed to notify admin: {e}")
else:
logger.warning("Admin not registered — run /start in admin bot")
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
# ────────────────────────── SETUP ──────────────────────────────
def build_admin_app() -> Application:
app = Application.builder().token(ADMIN_TOKEN).build()
app.add_handler(CommandHandler('start', admin_start))
app.add_handler(CommandHandler('stat', admin_stat))
return app
def build_user_app() -> Application:
app = Application.builder().token(USER_TOKEN).build()
app.add_handler(CommandHandler('start', user_start))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_user_message))
return app
async def main():
admin_app = build_admin_app()
user_app = build_user_app()
# Run both bots concurrently
async with admin_app:
await admin_app.initialize()
await admin_app.start()
await admin_app.updater.start_polling()
logger.info("Admin bot polling started")
async with user_app:
await user_app.initialize()
await user_app.start()
await user_app.updater.start_polling()
logger.info("User bot polling started")
# Keep alive
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
try:
asyncio.run(main())
except (KeyboardInterrupt, SystemExit):
logger.info("Shutting down gracefully...")