прикрутили базу данных

This commit is contained in:
vrubelroman 2025-12-10 15:36:27 +03:00
parent f0e7e93e17
commit 39bf9d1933
3 changed files with 125 additions and 46 deletions

3
.gitignore vendored
View file

@ -11,4 +11,7 @@ env/
venv/ venv/
stats.json stats.json
instagram_cookies.txt instagram_cookies.txt
*.db
*.db-journal
data/

166
bot.py
View file

@ -3,8 +3,10 @@ import re
import json import json
import logging import logging
import asyncio import asyncio
import sqlite3
from pathlib import Path from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
from datetime import datetime
import yt_dlp import yt_dlp
from telegram import Update from telegram import Update
@ -21,54 +23,123 @@ logger = logging.getLogger(__name__)
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot') TELEGRAM_BOT_USERNAME = os.getenv('TELEGRAM_BOT_USERNAME', 'vrubelVideoDownload_bot')
# Базовая директория проекта (абсолютный путь), чтобы не зависеть от рабочей директории процесса
BASE_DIR = Path(__file__).resolve().parent
# Директория для временных файлов # Директория для временных файлов
DOWNLOADS_DIR = Path('video') DOWNLOADS_DIR = BASE_DIR / 'video'
DOWNLOADS_DIR.mkdir(exist_ok=True) DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)
# Файл для хранения статистики # База данных (внутри папки data)
STATS_FILE = Path('stats.json') DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_FILE = DATA_DIR / 'bot.db'
def load_stats() -> dict: def init_database():
"""Загружает статистику из файла""" """Инициализирует базу данных и создает таблицы если их нет"""
if STATS_FILE.exists():
try:
with open(STATS_FILE, 'r', encoding='utf-8') as f:
stats = json.load(f)
# Обеспечиваем обратную совместимость
if 'users' not in stats:
stats['users'] = []
return stats
except Exception as e:
logger.error(f"Ошибка при загрузке статистики: {e}")
return {'total_downloads': 0, 'users': []}
return {'total_downloads': 0, 'users': []}
def save_stats(stats: dict):
"""Сохраняет статистику в файл"""
try: try:
with open(STATS_FILE, 'w', encoding='utf-8') as f: conn = sqlite3.connect(str(DB_FILE))
json.dump(stats, f, ensure_ascii=False, indent=2) cursor = conn.cursor()
# Таблица пользователей
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
chat_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL
)
''')
# Таблица статистики
cursor.execute('''
CREATE TABLE IF NOT EXISTS stats (
id INTEGER PRIMARY KEY CHECK (id = 1),
total_downloads INTEGER DEFAULT 0
)
''')
# Инициализируем stats если его нет
cursor.execute('SELECT COUNT(*) FROM stats')
if cursor.fetchone()[0] == 0:
cursor.execute('INSERT INTO stats (id, total_downloads) VALUES (1, 0)')
conn.commit()
conn.close()
logger.info("База данных инициализирована")
except Exception as e: except Exception as e:
logger.error(f"Ошибка при сохранении статистики: {e}") logger.error(f"Ошибка при инициализации базы данных: {e}")
def get_total_downloads() -> int:
"""Возвращает общее количество скачанных видео"""
try:
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
cursor.execute('SELECT total_downloads FROM stats WHERE id = 1')
result = cursor.fetchone()
conn.close()
return result[0] if result else 0
except Exception as e:
logger.error(f"Ошибка при получении количества скачанных видео: {e}")
return 0
def increment_downloads(): def increment_downloads():
"""Увеличивает счетчик скачанных видео""" """Увеличивает счетчик скачанных видео"""
stats = load_stats() try:
stats['total_downloads'] = stats.get('total_downloads', 0) + 1 conn = sqlite3.connect(str(DB_FILE))
save_stats(stats) cursor = conn.cursor()
logger.info(f"Общее количество скачанных видео: {stats['total_downloads']}") cursor.execute('UPDATE stats SET total_downloads = total_downloads + 1 WHERE id = 1')
conn.commit()
new_total = get_total_downloads()
conn.close()
logger.info(f"Общее количество скачанных видео: {new_total}")
except Exception as e:
logger.error(f"Ошибка при увеличении счетчика скачанных видео: {e}")
def add_user(chat_id: int): def get_total_users() -> int:
"""Добавляет пользователя в список уникальных пользователей""" """Возвращает общее количество уникальных пользователей"""
stats = load_stats() try:
users = stats.get('users', []) conn = sqlite3.connect(str(DB_FILE))
# Преобразуем в set для уникальности, затем обратно в list cursor = conn.cursor()
users_set = set(users) cursor.execute('SELECT COUNT(*) FROM users')
if chat_id not in users_set: result = cursor.fetchone()
users_set.add(chat_id) conn.close()
stats['users'] = list(users_set) return result[0] if result else 0
save_stats(stats) except Exception as e:
logger.info(f"Добавлен новый пользователь. Всего пользователей: {len(users_set)}") logger.error(f"Ошибка при получении количества пользователей: {e}")
return 0
def add_user(chat_id: int, username: str = None, first_name: str = None):
"""Добавляет пользователя в базу данных или обновляет информацию о нем"""
try:
now = datetime.now().isoformat()
conn = sqlite3.connect(str(DB_FILE))
cursor = conn.cursor()
# Проверяем, существует ли пользователь
cursor.execute('SELECT chat_id FROM users WHERE chat_id = ?', (chat_id,))
exists = cursor.fetchone()
if exists:
# Обновляем last_seen
cursor.execute(
'UPDATE users SET last_seen = ?, username = ?, first_name = ? WHERE chat_id = ?',
(now, username, first_name, chat_id)
)
else:
# Добавляем нового пользователя
cursor.execute(
'INSERT INTO users (chat_id, username, first_name, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)',
(chat_id, username, first_name, now, now)
)
total_users = get_total_users()
logger.info(f"Добавлен новый пользователь (chat_id: {chat_id}). Всего пользователей: {total_users}")
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Ошибка при добавлении пользователя: {e}")
def detect_video_source(url: str) -> str: def detect_video_source(url: str) -> str:
@ -350,9 +421,11 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
url = update.message.text.strip() url = update.message.text.strip()
chat_id = update.message.chat_id chat_id = update.message.chat_id
username = update.message.from_user.username if update.message.from_user else None
first_name = update.message.from_user.first_name if update.message.from_user else None
# Добавляем пользователя в статистику при первом взаимодействии # Добавляем пользователя в статистику при первом взаимодействии
add_user(chat_id) add_user(chat_id, username, first_name)
# Проверяем, является ли сообщение URL # Проверяем, является ли сообщение URL
if not (url.startswith('http://') or url.startswith('https://')): if not (url.startswith('http://') or url.startswith('https://')):
@ -414,7 +487,9 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /start""" """Обрабатывает команду /start"""
# Добавляем пользователя в статистику # Добавляем пользователя в статистику
chat_id = update.message.chat_id chat_id = update.message.chat_id
add_user(chat_id) username = update.message.from_user.username if update.message.from_user else None
first_name = update.message.from_user.first_name if update.message.from_user else None
add_user(chat_id, username, first_name)
await update.message.reply_text( await update.message.reply_text(
"👋 Привет! Я бот для скачивания видео.\n\n" "👋 Привет! Я бот для скачивания видео.\n\n"
@ -432,10 +507,8 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def stat_command(update: Update, context: ContextTypes.DEFAULT_TYPE): async def stat_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Обрабатывает команду /stat""" """Обрабатывает команду /stat"""
stats = load_stats() total_downloads = get_total_downloads()
total_downloads = stats.get('total_downloads', 0) total_users = get_total_users()
users = stats.get('users', [])
total_users = len(users)
await update.message.reply_text( await update.message.reply_text(
f"📊 Статистика бота:\n\n" f"📊 Статистика бота:\n\n"
@ -450,6 +523,9 @@ def main():
logger.error("TELEGRAM_BOT_TOKEN не установлен!") logger.error("TELEGRAM_BOT_TOKEN не установлен!")
return return
# Инициализируем базу данных
init_database()
# Создаем приложение # Создаем приложение
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

View file

@ -11,7 +11,7 @@ services:
volumes: volumes:
- ./video:/app/video - ./video:/app/video
- ./instagram_cookies.txt:/app/instagram_cookies.txt - ./instagram_cookies.txt:/app/instagram_cookies.txt
- ./stats.json:/app/stats.json - ./data:/app/data:Z
networks: networks:
- bot_network - bot_network