# Files: aletheia/backend/app/routers/scan.py
# 174 lines, 7.3 KiB, Python

import json
from datetime import datetime, timezone, date
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.database import get_db
from app.models.user import User
from app.models.product import Product
from app.models.scan import Scan
from app.schemas.scan import ScanRequest, ScanResult, ScanHistoryItem
from app.utils.security import get_current_user
from app.integrations.open_food_facts import fetch_product
from app.integrations.openai_client import analyze_product
from app.config import settings
from app.services.seed import SEED_PRODUCTS
# Router for the scan endpoints; it is created with the "/api" prefix and the "scan" tag.
router = APIRouter(prefix="/api", tags=["scan"])
def _product_to_payload(product: Product) -> dict:
    """Convert a stored ``Product`` row into the dict shape used by the scan flow."""
    return {
        "name": product.name,
        "brand": product.brand,
        "category": product.category,
        "ingredients_text": product.ingredients_text,
        "nutri_score": product.nutri_score,
        "nova_group": product.nova_group,
        "nutrition": json.loads(product.nutrition_json or "{}"),
        "image_url": product.image_url,
    }


@router.post("/scan", response_model=ScanResult)
async def scan_product(req: ScanRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Resolve a barcode to a product, analyze it, record the scan and return the result.

    Lookup order: the local ``Product`` table, then ``SEED_PRODUCTS``, then
    ``fetch_product``. A product found outside the table is stored in it
    before the analysis runs.

    Args:
        req: Request body; only ``req.barcode`` is read.
        user: Authenticated user performing the scan.
        db: Async database session.

    Returns:
        A ``ScanResult`` built from the product data and the analysis, with
        ``source`` set to ``"cache"``, ``"seed"`` or ``"open_food_facts"``.

    Raises:
        HTTPException: 429 when a non-premium user already has
            ``settings.FREE_SCAN_LIMIT`` scans since 00:00 UTC today;
            404 when no product data could be found for the barcode.
    """
    # Read the id once, before any commit. A commit may expire ORM attributes,
    # and re-reading an expired attribute through an AsyncSession would need
    # implicit I/O. NOTE(review): only matters if the session expires on
    # commit -- the session configuration is not visible in this file.
    user_id = user.id

    # Rate limit check (premium users are not limited)
    if not user.is_premium:
        today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        result = await db.execute(
            select(func.count(Scan.id)).where(Scan.user_id == user_id, Scan.scanned_at >= today_start)
        )
        count = result.scalar() or 0
        if count >= settings.FREE_SCAN_LIMIT:
            raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido. Faça upgrade para Premium!")

    # Check local cache
    result = await db.execute(select(Product).where(Product.barcode == req.barcode))
    product = result.scalar_one_or_none()

    product_data = None
    source = "cache"
    if product:
        product_data = _product_to_payload(product)
    else:
        if req.barcode in SEED_PRODUCTS:
            # Copy so the shared seed entry is never mutated downstream.
            product_data = SEED_PRODUCTS[req.barcode].copy()
            source = "seed"
        else:
            product_data = await fetch_product(req.barcode)
            source = "open_food_facts"

        # Store the newly found product so the next scan hits the table.
        if product_data:
            db.add(Product(
                barcode=req.barcode,
                name=product_data.get("name"),
                brand=product_data.get("brand"),
                category=product_data.get("category"),
                ingredients_text=product_data.get("ingredients_text"),
                nutri_score=product_data.get("nutri_score"),
                nova_group=product_data.get("nova_group"),
                nutrition_json=json.dumps(product_data.get("nutrition", {})),
                image_url=product_data.get("image_url", ""),
            ))
            await db.commit()

    if not product_data:
        raise HTTPException(status_code=404, detail="Produto não encontrado. Tente inserir manualmente.")

    # AI Analysis
    analysis = await analyze_product(product_data)
    score = analysis.get("score", 50)
    summary = analysis.get("summary", "")

    # Save scan
    db.add(Scan(
        user_id=user_id,
        barcode=req.barcode,
        product_name=product_data.get("name"),
        brand=product_data.get("brand"),
        score=score,
        summary=summary,
        analysis_json=json.dumps(analysis),
    ))
    await db.commit()

    return ScanResult(
        barcode=req.barcode,
        product_name=product_data.get("name"),
        brand=product_data.get("brand"),
        category=product_data.get("category"),
        image_url=product_data.get("image_url"),
        score=score,
        summary=summary,
        positives=analysis.get("positives", []),
        negatives=analysis.get("negatives", []),
        ingredients=analysis.get("ingredients", []),
        nutri_score=product_data.get("nutri_score"),
        nova_group=product_data.get("nova_group"),
        nutrition=analysis.get("nutrition"),
        nutrition_verdict=analysis.get("nutrition_verdict"),
        recipe=analysis.get("recipe"),
        source=source,
    )
@router.get("/history", response_model=list[ScanHistoryItem])
async def get_history(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Return the current user's scans, newest first, capped at 50 entries."""
    stmt = (
        select(Scan)
        .where(Scan.user_id == user.id)
        .order_by(Scan.scanned_at.desc())
        .limit(50)
    )
    rows = (await db.execute(stmt)).scalars().all()

    history = []
    for row in rows:
        item = ScanHistoryItem(
            id=row.id,
            barcode=row.barcode,
            product_name=row.product_name,
            brand=row.brand,
            score=row.score,
            scanned_at=row.scanned_at,
        )
        history.append(item)
    return history
@router.get("/history/{scan_id}")
async def get_scan_detail(scan_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Return the stored detail of one scan belonging to the current user.

    This path is registered exactly once. Declaring the same path and method
    twice makes the first-declared handler serve every request, so the later
    one never runs.

    Args:
        scan_id: Primary key of the scan, taken from the URL path.
        user: Authenticated user; the scan is matched on ``user.id`` as well,
            so another user's scan yields the same 404 as a missing one.
        db: Async database session.

    Returns:
        A dict with the scan's own columns, the matching product's
        category/image/nutri-score/NOVA group (``None`` when no product row
        matches the barcode), and the fields read from the stored analysis
        JSON.

    Raises:
        HTTPException: 404 when no scan with that id exists for this user.
    """
    result = await db.execute(
        select(Scan).where(Scan.id == scan_id, Scan.user_id == user.id)
    )
    scan = result.scalar_one_or_none()
    if not scan:
        raise HTTPException(status_code=404, detail="Scan não encontrado")

    # An empty/NULL analysis column is treated as an empty analysis.
    analysis = json.loads(scan.analysis_json or '{}')

    # Also get product info
    prod_result = await db.execute(select(Product).where(Product.barcode == scan.barcode))
    product = prod_result.scalar_one_or_none()

    return {
        "id": scan.id,
        "barcode": scan.barcode,
        "product_name": scan.product_name,
        "brand": scan.brand,
        "score": scan.score,
        "summary": scan.summary,
        "scanned_at": scan.scanned_at.isoformat() if scan.scanned_at else None,
        "category": product.category if product else None,
        "image_url": product.image_url if product else None,
        "nutri_score": product.nutri_score if product else None,
        "nova_group": product.nova_group if product else None,
        "positives": analysis.get("positives", []),
        "negatives": analysis.get("negatives", []),
        "ingredients": analysis.get("ingredients", []),
        "nutrition": analysis.get("nutrition", {}),
        "nutrition_verdict": analysis.get("nutrition_verdict", ""),
        "recipe": analysis.get("recipe"),
    }