scheduleSon/backend/api.py

293 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_
from datetime import datetime, timedelta
import pytz
from typing import List, Union
from backend.database import get_db, Task, Event, WeeklyTaskException
from backend.models import (
TaskCreate, EventCreate, ScheduleResponse, ScheduleItem,
TaskResponse, EventResponse, UpdateRequest
)
from backend.utils import (
get_week_range, check_event_overlap, get_weekday_from_date,
materialize_weekly_tasks, format_date_russian
)
# Router object on which the endpoints below are registered via its decorators.
router = APIRouter()
# Europe/Moscow time zone; not referenced in the visible part of this file.
TZ = pytz.timezone("Europe/Moscow")
@router.get("/schedule", response_model=ScheduleResponse)
async def get_schedule(
    from_date: str,
    to_date: str,
    db: AsyncSession = Depends(get_db)
):
    """Return the schedule items dated within an inclusive date range.

    The result combines one-off tasks, events and the weekly tasks produced
    by ``materialize_weekly_tasks`` for the same range.

    Args:
        from_date: First day of the range, ``YYYY-MM-DD``.
        to_date: Last day of the range, ``YYYY-MM-DD``.
        db: Database session injected by FastAPI.

    Returns:
        A ``ScheduleResponse`` whose ``items`` list holds one-off tasks first,
        then events, then the materialized weekly tasks.

    Raises:
        HTTPException: 400 when either date cannot be parsed.
    """
    try:
        from_dt = datetime.strptime(from_date, "%Y-%m-%d").date()
        to_dt = datetime.strptime(to_date, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(
            status_code=400, detail="Invalid date format. Use YYYY-MM-DD"
        ) from None

    # Dates are compared as strings in SQL.  strptime also accepts non-padded
    # input such as "2024-1-5", which would sort incorrectly against
    # zero-padded "YYYY-MM-DD" values, so query with the normalized ISO form
    # instead of the raw request strings.
    range_start = from_dt.isoformat()
    range_end = to_dt.isoformat()

    # One-off tasks (weekly tasks are materialized separately below).
    result = await db.execute(
        select(Task).where(
            and_(
                Task.date >= range_start,
                Task.date <= range_end,
                # SQLAlchemy builds the SQL comparison from `==`; a Python
                # `not`/`is` would be evaluated eagerly instead.
                Task.repeat_weekly == False,  # noqa: E712
            )
        )
    )
    items = [
        ScheduleItem(
            kind="task",
            id=task.id,
            date=task.date,
            title=task.title,
            repeat_weekly=False,
        )
        for task in result.scalars().all()
    ]

    # Events.
    result = await db.execute(
        select(Event).where(
            and_(
                Event.date >= range_start,
                Event.date <= range_end,
            )
        )
    )
    items.extend(
        ScheduleItem(
            kind="event",
            id=event.id,
            date=event.date,
            title=event.title,
            start_time=event.start_time,
            duration_min=event.duration_min,
        )
        for event in result.scalars().all()
    )

    # Weekly tasks expanded into concrete occurrences by the helper.
    items.extend(await materialize_weekly_tasks(db, range_start, range_end))
    return ScheduleResponse(items=items)
async def _create_task(data: dict, db: AsyncSession) -> dict:
    """Validate *data* as a task and store it together with its weekday copies."""
    try:
        task = TaskCreate(**data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid task data: {e}")

    # Work out the copy dates BEFORE touching the database, so that an
    # unparsable date is rejected with a 400 instead of failing after the
    # main task has already been committed.
    copy_dates = []
    if task.copy_to_weekdays and not task.repeat_weekly:
        try:
            base_date = datetime.strptime(task.date, "%Y-%m-%d").date()
        except (TypeError, ValueError) as e:
            raise HTTPException(status_code=400, detail=f"Invalid task data: {e}")
        base_weekday = base_date.weekday()
        # dict.fromkeys de-duplicates repeated target weekdays while keeping
        # the order in which they were requested.
        offsets = dict.fromkeys(
            (target_weekday - base_weekday) % 7
            for target_weekday in task.copy_to_weekdays
        )
        # An offset of 0 is the base day itself, which the main task covers.
        # Because of the modulo, a weekday earlier in the week than the base
        # date lands in the FOLLOWING week.
        # NOTE(review): the original comment said "same week, future days
        # only" -- confirm whether wrapping into next week is intended.
        copy_dates = [
            (base_date + timedelta(days=days_diff)).strftime("%Y-%m-%d")
            for days_diff in offsets
            if days_diff > 0
        ]

    new_task = Task(
        date=task.date,
        title=task.title,
        repeat_weekly=task.repeat_weekly
    )
    if task.repeat_weekly:
        new_task.weekday = get_weekday_from_date(task.date)
    db.add(new_task)
    db.add_all(
        Task(date=copy_date, title=task.title, repeat_weekly=False)
        for copy_date in copy_dates
    )
    # Single commit: the main task and its copies are saved together or not
    # at all.
    await db.commit()
    await db.refresh(new_task)
    return {"id": new_task.id, "kind": "task"}


async def _create_event(data: dict, db: AsyncSession) -> dict:
    """Validate *data* as an event and store it unless it overlaps another one."""
    try:
        event = EventCreate(**data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid event data: {e}")

    # None: there is no existing event id to pass for a brand-new event.
    overlap = await check_event_overlap(
        db, event.date, event.start_time, event.duration_min, None
    )
    if overlap:
        raise HTTPException(status_code=400, detail="Нельзя добавить: пересечение по времени")

    new_event = Event(
        date=event.date,
        start_time=event.start_time,
        duration_min=event.duration_min,
        title=event.title
    )
    db.add(new_event)
    await db.commit()
    await db.refresh(new_event)
    return {"id": new_event.id, "kind": "event"}


@router.post("/events")
async def create_item(
    kind: str,
    data: dict = Body(...),
    db: AsyncSession = Depends(get_db)
):
    """Create a task or an event.

    Args:
        kind: ``"task"`` or ``"event"``; selects how *data* is interpreted.
        data: Request body, validated with ``TaskCreate`` or ``EventCreate``.
        db: Database session injected by FastAPI.

    Returns:
        ``{"id": <new id>, "kind": <kind>}`` for the created item.  For a
        task with ``copy_to_weekdays`` the id is that of the main task.

    Raises:
        HTTPException: 400 for an unknown *kind*, invalid *data*, or an event
            that overlaps an existing one.
    """
    if kind == "task":
        return await _create_task(data, db)
    if kind == "event":
        return await _create_event(data, db)
    raise HTTPException(status_code=400, detail="Invalid kind. Use 'task' or 'event'")
@router.put("/events/{item_id}")
async def update_item(
    item_id: int,
    update: UpdateRequest,
    db: AsyncSession = Depends(get_db),
    kind: Union[str, None] = None,
):
    """Update a task or an event.

    Args:
        item_id: Id of the task or event.
        update: Requested changes.  Falsy fields (``None``, empty string,
            ``0``) mean "keep the current value".
        db: Database session injected by FastAPI.
        kind: Optional ``"task"`` or ``"event"``.  Tasks and events are
            looked up separately by the same id, so without it a task with
            this id always takes precedence over an event with the same id.

    Returns:
        ``{"success": True}``, plus ``"action": "exception_created"`` when a
        single occurrence of a weekly task was changed.

    Raises:
        HTTPException: 400 for an unknown *kind* or an overlapping event,
            404 when nothing matches *item_id*.
    """
    if kind not in (None, "task", "event"):
        raise HTTPException(status_code=400, detail="Invalid kind. Use 'task' or 'event'")

    task = None
    if kind != "event":
        result = await db.execute(select(Task).where(Task.id == item_id))
        task = result.scalar_one_or_none()

    if task:
        if task.repeat_weekly and update.scope == "one_date":
            # Record an exception for one occurrence.  Use the date sent with
            # the request when there is one; falling back to the series' own
            # date keeps the previous behaviour for clients that send none.
            exception = WeeklyTaskException(
                weekday=get_weekday_from_date(task.date),
                date=update.date or task.date,
                # No new title means the occurrence is removed, not renamed.
                action="replace" if update.title else "delete",
                replacement_title=update.title
            )
            db.add(exception)
            await db.commit()
            return {"success": True, "action": "exception_created"}

        if update.title:
            if task.repeat_weekly and update.scope == "series":
                # Rename every weekly task on this weekday.  The
                # repeat_weekly filter keeps one-off rows out of the bulk
                # update even if their weekday column happens to match.
                await db.execute(
                    Task.__table__.update()
                    .where(
                        and_(
                            Task.weekday == task.weekday,
                            Task.repeat_weekly == True,  # noqa: E712
                        )
                    )
                    .values(title=update.title)
                )
            else:
                task.title = update.title
        await db.commit()
        return {"success": True}

    event = None
    if kind != "task":
        result = await db.execute(select(Event).where(Event.id == item_id))
        event = result.scalar_one_or_none()

    if event:
        new_date = update.date or event.date
        new_start_time = update.start_time or event.start_time
        new_duration = update.duration_min or event.duration_min
        # item_id is passed so the helper can tell this event apart from the
        # others -- presumably to exclude it from the check; verify in
        # check_event_overlap.
        overlap = await check_event_overlap(
            db, new_date, new_start_time, new_duration, item_id
        )
        if overlap:
            raise HTTPException(status_code=400, detail="Нельзя изменить: пересечение по времени")

        if update.title:
            event.title = update.title
        if update.date:
            event.date = update.date
        if update.start_time:
            event.start_time = update.start_time
        if update.duration_min:
            event.duration_min = update.duration_min
        await db.commit()
        return {"success": True}

    raise HTTPException(status_code=404, detail="Item not found")
@router.delete("/events/{item_id}")
async def delete_item(
    item_id: int,
    scope: Union[str, None] = None,
    db: AsyncSession = Depends(get_db),
    kind: Union[str, None] = None,
    date: Union[str, None] = None,
):
    """Delete a task or an event.

    Args:
        item_id: Id of the task or event.
        scope: For weekly tasks only: ``"one_date"`` records a "delete"
            exception for one occurrence, ``"series"`` removes every weekly
            task on the same weekday.  Any other value deletes just the task
            row itself.
        db: Database session injected by FastAPI.
        kind: Optional ``"task"`` or ``"event"``.  Tasks and events are
            looked up separately by the same id, so without it a task with
            this id always takes precedence over an event with the same id.
        date: Optional ``YYYY-MM-DD`` date of the occurrence to skip when
            *scope* is ``"one_date"``; defaults to the task's own date.

    Returns:
        ``{"success": True}``, plus ``"action": "exception_created"`` when a
        single occurrence of a weekly task was skipped.

    Raises:
        HTTPException: 400 for an unknown *kind* or an unparsable *date*,
            404 when nothing matches *item_id*.
    """
    if kind not in (None, "task", "event"):
        raise HTTPException(status_code=400, detail="Invalid kind. Use 'task' or 'event'")

    task = None
    if kind != "event":
        result = await db.execute(select(Task).where(Task.id == item_id))
        task = result.scalar_one_or_none()

    if task:
        if task.repeat_weekly and scope == "one_date":
            occurrence_date = task.date
            if date is not None:
                # Validate and normalize only where the value is used, so
                # the parameter stays harmless for every other request.
                try:
                    occurrence_date = (
                        datetime.strptime(date, "%Y-%m-%d").date().isoformat()
                    )
                except ValueError:
                    raise HTTPException(
                        status_code=400,
                        detail="Invalid date format. Use YYYY-MM-DD",
                    ) from None
            exception = WeeklyTaskException(
                weekday=get_weekday_from_date(task.date),
                date=occurrence_date,
                action="delete"
            )
            db.add(exception)
            await db.commit()
            return {"success": True, "action": "exception_created"}

        if task.repeat_weekly and scope == "series":
            # Remove every weekly task on this weekday.  The repeat_weekly
            # filter matters: if the weekday were NULL, comparing on weekday
            # alone would match every one-off task as well.
            await db.execute(
                Task.__table__.delete().where(
                    and_(
                        Task.weekday == task.weekday,
                        Task.repeat_weekly == True,  # noqa: E712
                    )
                )
            )
            await db.commit()
            return {"success": True}

        # One-off task, or a weekly task without a recognized scope.
        await db.delete(task)
        await db.commit()
        return {"success": True}

    event = None
    if kind != "task":
        result = await db.execute(select(Event).where(Event.id == item_id))
        event = result.scalar_one_or_none()

    if event:
        await db.delete(event)
        await db.commit()
        return {"success": True}

    raise HTTPException(status_code=404, detail="Item not found")
@router.get("/backup")
async def backup_database():
    """Copy the database file to a timestamped backup next to it.

    The source path comes from the ``DATABASE_PATH`` environment variable
    (default ``data/schedule.db``); the backup is named
    ``<path>.backup.<YYYYmmdd_HHMMSS>``.

    Returns:
        ``{"success": True, "backup_path": ...}`` after a successful copy, or
        ``{"success": False, "error": "Database not found"}`` when the source
        file does not exist.
    """
    import asyncio
    import os
    import shutil

    db_path = os.getenv("DATABASE_PATH", "data/schedule.db")
    backup_path = f"{db_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # NOTE(review): this is a plain file copy.  If the database is SQLite and
    # is being written to at the same moment, the copy may be inconsistent --
    # consider the sqlite3 backup API.
    loop = asyncio.get_running_loop()
    try:
        # The copy is blocking I/O; run it in the default executor so other
        # requests are not stalled on the event loop.
        await loop.run_in_executor(None, shutil.copy2, db_path, backup_path)
    except FileNotFoundError:
        # Attempting the copy and handling the failure avoids the gap between
        # an existence check and the copy itself.
        return {"success": False, "error": "Database not found"}
    return {"success": True, "backup_path": backup_path}