LichessStatTgWeb/LichessWebView/app.py

317 lines
12 KiB
Python

import os
import sqlite3
from contextlib import closing
from datetime import datetime, date
from functools import wraps

from flask import Flask, jsonify, render_template, request, session, redirect, url_for
from flask_cors import CORS

import auth_config
app = Flask(__name__)
# Secret key for sessions, read from the FLASK_SECRET_KEY environment variable.
# NOTE(review): the fallback below is a hard-coded placeholder string; set
# FLASK_SECRET_KEY in production instead of relying on it.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your-secret-key-change-in-production-please')
# Enable CORS for the application with the flask_cors default settings.
CORS(app)
# Path to the bot's SQLite database file
DB_PATH = "/app/data/lichess_bot.db"
def login_required(f):
    """Decorator that restricts a route to authenticated sessions.

    When the session carries a truthy ``authenticated`` flag the wrapped view
    is called with its original arguments; otherwise the request is redirected
    to the ``login`` endpoint.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('authenticated'):
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view
def get_message_counters_stats(db_path):
    """Return message counter statistics read directly from the database.

    Before reading, the per-command daily counters are rolled over:

    * a row whose ``last_reset_date`` is earlier than today, or cannot be
      parsed as ``YYYY-MM-DD``, gets ``today_count = 0`` and today's date;
    * a row with an empty ``last_reset_date`` only gets today's date stamped.

    Args:
        db_path: Path of the SQLite database file to open.

    Returns:
        A dict with the keys ``total_all_time`` (sum of ``total_count``),
        ``total_today`` (sum of ``today_count``) and ``by_command`` (mapping
        of command name to ``{'total': ..., 'today': ...}``). NULL counters
        are reported as 0. If the ``message_counters`` table does not exist,
        or any error occurs, all totals are 0 and ``by_command`` is empty;
        errors are printed together with their traceback, never raised.
    """
    def _empty_stats():
        # Fresh dict on every call so callers can mutate the result safely.
        return {
            'total_all_time': 0,
            'total_today': 0,
            'by_command': {}
        }

    reset_daily_sql = '''
        UPDATE message_counters
        SET today_count = 0, last_reset_date = ?
        WHERE command = ?
    '''
    stamp_date_sql = '''
        UPDATE message_counters
        SET last_reset_date = ?
        WHERE command = ?
    '''

    try:
        # sqlite3's own context manager only commits/rolls back; closing()
        # guarantees the connection itself is released on every exit path.
        with closing(sqlite3.connect(db_path)) as conn:
            cursor = conn.cursor()

            # Check if table exists
            cursor.execute('''
                SELECT name FROM sqlite_master
                WHERE type='table' AND name='message_counters'
            ''')
            if cursor.fetchone() is None:
                return _empty_stats()

            # Reset daily counters if needed
            current_day = date.today()
            current_day_iso = current_day.isoformat()
            cursor.execute('''
                SELECT command, last_reset_date FROM message_counters
            ''')
            for cmd, last_reset in cursor.fetchall():
                if not last_reset:
                    # Never stamped: record the date but keep today's count.
                    cursor.execute(stamp_date_sql, (current_day_iso, cmd))
                    continue
                try:
                    is_stale = datetime.strptime(last_reset, '%Y-%m-%d').date() < current_day
                except (ValueError, TypeError):
                    # Unparseable date: treat the counter as stale.
                    is_stale = True
                if is_stale:
                    cursor.execute(reset_daily_sql, (current_day_iso, cmd))
            conn.commit()

            # Get all counters
            cursor.execute('''
                SELECT command, total_count, today_count
                FROM message_counters
                ORDER BY command
            ''')
            by_command = {}
            total_all_time = 0
            total_today = 0
            for cmd, total_count, today_count in cursor.fetchall():
                # A NULL counter must not abort the whole report.
                total_count = total_count or 0
                today_count = today_count or 0
                by_command[cmd] = {
                    'total': total_count,
                    'today': today_count
                }
                total_all_time += total_count
                total_today += today_count

            return {
                'total_all_time': total_all_time,
                'total_today': total_today,
                'by_command': by_command
            }
    except Exception as e:
        # Best-effort statistics: report the failure and fall back to zeros.
        print(f"Error getting message counters: {e}")
        import traceback
        print(traceback.format_exc())
        return _empty_stats()
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page: show the form on GET, check the credentials on POST."""
    if request.method != 'POST':
        # GET request: show the login form, optionally with an error message
        # taken from the query string.
        error = request.args.get('error', '')
        # An already authenticated visitor is sent to the main page.
        if session.get('authenticated'):
            return redirect(url_for('index'))
        return render_template('login.html', error=error)

    username = request.form.get('username', '').strip()
    password = request.form.get('password', '')
    if not auth_config.validate_credentials(username, password):
        # Rejected: bounce back to the form with the error in the query string.
        return redirect(url_for('login', error='Неверный логин или пароль'))

    session['authenticated'] = True
    session['username'] = username
    return redirect(url_for('index'))
@app.route('/logout')
def logout():
    """Log out: remove the auth keys from the session, then go to the login page."""
    for key in ('authenticated', 'username'):
        session.pop(key, None)
    return redirect(url_for('login'))
@app.route('/')
@login_required
def index():
    """Main page: render the ``index.html`` template (requires login)."""
    return render_template('index.html')
@app.route('/api/users')
@login_required
def get_users():
    """Return every Telegram user plus aggregate statistics as JSON.

    Reads from the database at ``DB_PATH``. On success the response contains
    ``success: True`` and:

    * ``users`` - one entry per ``telegram_users`` row (newest first) with the
      number of linked gamers and how many have ``period_minutes > 0``;
    * ``total_users``, ``users_today``;
    * ``users_without_gamers`` / ``users_with_ru_language`` and their
      rounded percentages of ``total_users``;
    * ``total_gamers``, ``gamers_today``;
    * ``message_stats`` - result of ``get_message_counters_stats`` with the
      excluded commands removed from ``by_command`` (the totals are left
      as returned).

    On any exception the response is ``{'success': False, 'error': ...}``
    with HTTP status 500.
    """
    try:
        # sqlite3's connection context manager only scopes the transaction;
        # closing() makes sure the connection is released after each request.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            cursor = conn.cursor()

            # Fetch all users together with their gamer counts
            cursor.execute('''
                SELECT
                    tu.user_id,
                    tu.username,
                    tu.first_name,
                    tu.last_name,
                    tu.created_at,
                    tu.bot_language,
                    COUNT(ug.id) as gamer_count,
                    SUM(CASE WHEN ug.period_minutes > 0 THEN 1 ELSE 0 END) as monitored_gamers
                FROM telegram_users tu
                LEFT JOIN user_gamers ug ON tu.user_id = ug.user_id
                GROUP BY tu.user_id, tu.username, tu.first_name, tu.last_name, tu.created_at, tu.bot_language
                ORDER BY tu.created_at DESC
            ''')
            users = [
                {
                    'user_id': uid,
                    'username': username or '-',
                    'first_name': first_name or '-',
                    'last_name': last_name,
                    'created_at': created_at,
                    'bot_language': bot_language or 'en',  # Default to 'en' if not set
                    'gamer_count': gamer_count,
                    'monitored_gamers': monitored_gamers
                }
                for (uid, username, first_name, last_name, created_at,
                     bot_language, gamer_count, monitored_gamers) in cursor.fetchall()
            ]

            # Total number of (unique) gamers
            cursor.execute('''
                SELECT COUNT(DISTINCT g.id)
                FROM gamers g
            ''')
            total_gamers = cursor.fetchone()[0]

            # Count new users today
            today = date.today().isoformat()
            cursor.execute("SELECT COUNT(*) FROM telegram_users WHERE DATE(created_at) = ?", (today,))
            users_today = cursor.fetchone()[0]

            # Count users without gamers
            cursor.execute("""
                SELECT COUNT(DISTINCT tu.user_id)
                FROM telegram_users tu
                LEFT JOIN user_gamers ug ON tu.user_id = ug.user_id
                WHERE ug.id IS NULL
            """)
            users_without_gamers = cursor.fetchone()[0]

            # Count users with Russian language
            cursor.execute("""
                SELECT COUNT(DISTINCT tu.user_id)
                FROM telegram_users tu
                WHERE tu.bot_language = 'ru'
            """)
            users_with_ru_language = cursor.fetchone()[0]

            # Count new gamers today (from user_gamers table)
            cursor.execute("""
                SELECT COUNT(DISTINCT g.id)
                FROM user_gamers ug
                JOIN gamers g ON ug.gamer_id = g.id
                WHERE DATE(ug.created_at) = ?
            """, (today,))
            gamers_today = cursor.fetchone()[0]

        # Calculate percentages (0 when there are no users at all)
        total_users = len(users)

        def _percent_of_users(count):
            return round(count / total_users * 100) if total_users > 0 else 0

        users_without_gamers_percent = _percent_of_users(users_without_gamers)
        users_ru_language_percent = _percent_of_users(users_with_ru_language)

        # Get message counters statistics. The helper opens its own
        # connection, so it runs after the one above has been closed.
        message_stats = get_message_counters_stats(DB_PATH)

        # Filter out excluded commands (same as in admin_bot)
        excluded_commands = {'lang', 'resetlang', 'start'}
        if message_stats.get('by_command'):
            message_stats['by_command'] = {
                cmd: data for cmd, data in message_stats['by_command'].items()
                if cmd not in excluded_commands
            }

        return jsonify({
            'success': True,
            'users': users,
            'total_users': total_users,
            'users_today': users_today,
            'users_without_gamers': users_without_gamers,
            'users_without_gamers_percent': users_without_gamers_percent,
            'users_with_ru_language': users_with_ru_language,
            'users_ru_language_percent': users_ru_language_percent,
            'total_gamers': total_gamers,
            'gamers_today': gamers_today,
            'message_stats': message_stats
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@app.route('/api/users/<int:user_id>/gamers')
@login_required
def get_user_gamers(user_id):
    """Return the gamers linked to one user as JSON.

    Args:
        user_id: Integer taken from the URL; matched against
            ``user_gamers.user_id``.

    Returns:
        ``{'success': True, 'gamers': [...]}`` where each entry holds the
        gamer ``id`` and ``username``, a ``has_token`` flag, ``period_minutes``
        and ``created_at``, ordered by gamer username. On any exception the
        response is ``{'success': False, 'error': ...}`` with HTTP status 500.
    """
    try:
        # sqlite3's connection context manager only scopes the transaction;
        # closing() makes sure the connection is released after each request.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            cursor = conn.cursor()
            # Fetch the gamers of this user
            cursor.execute('''
                SELECT
                    g.id,
                    g.username,
                    ug.token,
                    ug.period_minutes,
                    ug.created_at
                FROM user_gamers ug
                JOIN gamers g ON ug.gamer_id = g.id
                WHERE ug.user_id = ?
                ORDER BY g.username
            ''', (user_id,))
            gamers = [
                {
                    'id': gamer_id,
                    'username': username,
                    # Token comes from user_gamers, not gamers; only its
                    # presence is exposed, never the value itself.
                    'has_token': bool(token),
                    'period_minutes': period_minutes,
                    'created_at': created_at
                }
                for gamer_id, username, token, period_minutes, created_at in cursor.fetchall()
            ]

        return jsonify({
            'success': True,
            'gamers': gamers
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be switched on unconditionally: it lets anyone who can reach it run
    # arbitrary code. Debug mode is opt-in via FLASK_DEBUG=1 for local use.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)