Files
aletheia/backend/app/routers/stats.py

88 lines
3.3 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, desc, asc
from datetime import datetime, timezone, timedelta
from app.database import get_db
from app.models.user import User
from app.models.scan import Scan
from app.utils.security import get_current_user
router = APIRouter(prefix="/api", tags=["stats"])
@router.get("/stats")
async def get_stats(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
now = datetime.now(timezone.utc)
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
# Total scans
total_res = await db.execute(select(func.count(Scan.id)).where(Scan.user_id == user.id))
total_scans = total_res.scalar() or 0
# Average score
avg_res = await db.execute(select(func.avg(Scan.score)).where(Scan.user_id == user.id))
avg_score = round(avg_res.scalar() or 0, 1)
# Monthly scans
monthly_res = await db.execute(
select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= month_start)
)
monthly_scans = monthly_res.scalar() or 0
# Top 10 best this month
best_res = await db.execute(
select(Scan).where(Scan.user_id == user.id, Scan.scanned_at >= month_start)
.order_by(desc(Scan.score)).limit(10)
)
best = [{"id": s.id, "product_name": s.product_name, "brand": s.brand, "score": s.score,
"scanned_at": s.scanned_at.isoformat() if s.scanned_at else None} for s in best_res.scalars().all()]
# Top 10 worst this month
worst_res = await db.execute(
select(Scan).where(Scan.user_id == user.id, Scan.scanned_at >= month_start)
.order_by(asc(Scan.score)).limit(10)
)
worst = [{"id": s.id, "product_name": s.product_name, "brand": s.brand, "score": s.score,
"scanned_at": s.scanned_at.isoformat() if s.scanned_at else None} for s in worst_res.scalars().all()]
return {
"total_scans": total_scans,
"avg_score": avg_score,
"monthly_scans": monthly_scans,
"best": best,
"worst": worst,
}
@router.get("/stats/evolution")
async def get_evolution(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
now = datetime.now(timezone.utc)
three_months_ago = now - timedelta(days=90)
result = await db.execute(
select(Scan.scanned_at, Scan.score)
.where(Scan.user_id == user.id, Scan.scanned_at >= three_months_ago)
.order_by(Scan.scanned_at)
)
scans = result.all()
# Group by week
weeks = {}
for scanned_at, score in scans:
# ISO week
week_key = scanned_at.strftime("%Y-W%W")
week_start = scanned_at - timedelta(days=scanned_at.weekday())
if week_key not in weeks:
weeks[week_key] = {"week": week_start.strftime("%d/%m"), "scores": [], "count": 0}
weeks[week_key]["scores"].append(score)
weeks[week_key]["count"] += 1
evolution = []
for key in sorted(weeks.keys()):
w = weeks[key]
evolution.append({
"week": w["week"],
"avg_score": round(sum(w["scores"]) / len(w["scores"]), 1),
"count": w["count"],
})
return {"evolution": evolution}