LichessStatTgWeb/LichessWebView/app.py
2025-11-20 02:22:44 +03:00

255 lines
8.9 KiB
Python

from flask import Flask, jsonify, render_template
from flask_cors import CORS
import sqlite3
from datetime import datetime, date
app = Flask(__name__)
CORS(app)
# Путь к базе данных бота
DB_PATH = "/app/data/lichess_bot.db"
def get_message_counters_stats(db_path):
"""Get message counters statistics directly from database"""
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Check if table exists
cursor.execute('''
SELECT name FROM sqlite_master
WHERE type='table' AND name='message_counters'
''')
if not cursor.fetchone():
# Table doesn't exist, return empty stats
return {
'total_all_time': 0,
'total_today': 0,
'by_command': {}
}
# Reset daily counters if needed
today = date.today()
cursor.execute('''
SELECT command, last_reset_date FROM message_counters
''')
for row in cursor.fetchall():
cmd, last_reset = row[0], row[1]
if last_reset:
try:
last_reset_date = datetime.strptime(last_reset, '%Y-%m-%d').date()
if last_reset_date < today:
cursor.execute('''
UPDATE message_counters
SET today_count = 0, last_reset_date = ?
WHERE command = ?
''', (today.isoformat(), cmd))
except (ValueError, TypeError):
cursor.execute('''
UPDATE message_counters
SET today_count = 0, last_reset_date = ?
WHERE command = ?
''', (today.isoformat(), cmd))
else:
cursor.execute('''
UPDATE message_counters
SET last_reset_date = ?
WHERE command = ?
''', (today.isoformat(), cmd))
conn.commit()
# Get all counters
cursor.execute('''
SELECT command, total_count, today_count
FROM message_counters
ORDER BY command
''')
stats = {
'by_command': {},
'total_all_time': 0,
'total_today': 0
}
for row in cursor.fetchall():
cmd, total, today = row[0], row[1], row[2]
stats['by_command'][cmd] = {
'total': total,
'today': today
}
stats['total_all_time'] += total
stats['total_today'] += today
return {
'total_all_time': stats['total_all_time'],
'total_today': stats['total_today'],
'by_command': stats['by_command']
}
except Exception as e:
print(f"Error getting message counters: {e}")
import traceback
print(traceback.format_exc())
return {
'total_all_time': 0,
'total_today': 0,
'by_command': {}
}
@app.route('/')
def index():
"""Главная страница"""
return render_template('index.html')
@app.route('/api/users')
def get_users():
"""Получить всех пользователей"""
try:
with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()
# Получаем всех пользователей
cursor.execute('''
SELECT
tu.user_id,
tu.username,
tu.first_name,
tu.last_name,
tu.created_at,
COUNT(ug.id) as gamer_count,
SUM(CASE WHEN ug.period_minutes > 0 THEN 1 ELSE 0 END) as monitored_gamers
FROM telegram_users tu
LEFT JOIN user_gamers ug ON tu.user_id = ug.user_id
GROUP BY tu.user_id, tu.username, tu.first_name, tu.last_name, tu.created_at
ORDER BY tu.created_at DESC
''')
rows = cursor.fetchall()
users = []
for row in rows:
users.append({
'user_id': row[0],
'username': row[1] or '-',
'first_name': row[2] or '-',
'last_name': row[3],
'created_at': row[4],
'gamer_count': row[5],
'monitored_gamers': row[6]
})
# Получаем общее количество игроков (уникальных)
cursor.execute('''
SELECT COUNT(DISTINCT g.id)
FROM gamers g
''')
total_gamers = cursor.fetchone()[0]
# Count new users today
today = date.today().isoformat()
cursor.execute("SELECT COUNT(*) FROM telegram_users WHERE DATE(created_at) = ?", (today,))
users_today = cursor.fetchone()[0]
# Count users without gamers
cursor.execute("""
SELECT COUNT(DISTINCT tu.user_id)
FROM telegram_users tu
LEFT JOIN user_gamers ug ON tu.user_id = ug.user_id
WHERE ug.id IS NULL
""")
users_without_gamers = cursor.fetchone()[0]
# Calculate percentage
total_users = len(users)
users_without_gamers_percent = round((users_without_gamers / total_users * 100)) if total_users > 0 else 0
# Count new gamers today (from user_gamers table)
cursor.execute("""
SELECT COUNT(DISTINCT g.id)
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE DATE(ug.created_at) = ?
""", (today,))
gamers_today = cursor.fetchone()[0]
# Get message counters statistics
message_stats = get_message_counters_stats(DB_PATH)
# Filter out excluded commands (same as in admin_bot)
excluded_commands = {'lang', 'resetlang', 'start'}
if message_stats.get('by_command'):
filtered_by_command = {
cmd: data for cmd, data in message_stats['by_command'].items()
if cmd not in excluded_commands
}
message_stats['by_command'] = filtered_by_command
return jsonify({
'success': True,
'users': users,
'total_users': total_users,
'users_today': users_today,
'users_without_gamers': users_without_gamers,
'users_without_gamers_percent': users_without_gamers_percent,
'total_gamers': total_gamers,
'gamers_today': gamers_today,
'message_stats': message_stats
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@app.route('/api/users/<int:user_id>/gamers')
def get_user_gamers(user_id):
"""Получить игроков конкретного пользователя"""
try:
with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()
# Получаем игроков пользователя
cursor.execute('''
SELECT
g.id,
g.username,
ug.token,
ug.period_minutes,
ug.created_at
FROM user_gamers ug
JOIN gamers g ON ug.gamer_id = g.id
WHERE ug.user_id = ?
ORDER BY g.username
''', (user_id,))
rows = cursor.fetchall()
gamers = []
for row in rows:
gamers.append({
'id': row[0],
'username': row[1],
'has_token': bool(row[2]), # Token from user_gamers, not gamers
'period_minutes': row[3],
'created_at': row[4]
})
return jsonify({
'success': True,
'gamers': gamers
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)