Files

267 lines
12 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, File, UploadFile
import json
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.database import get_db
from app.models.user import User
from app.models.product import Product
from app.models.scan import Scan
from app.schemas.scan import ScanRequest, ScanResult, ScanHistoryItem
from app.utils.security import get_current_user
from app.integrations.open_food_facts import fetch_product
from app.integrations.openai_client import analyze_product, analyze_photo
from app.config import settings
from app.services.seed import SEED_PRODUCTS
from app.services.achievements import check_achievements
# Router for the scan endpoints: every route registered on it is served under
# the "/api" prefix and carries the "scan" tag.
router = APIRouter(prefix="/api", tags=["scan"])
def get_user_context(user: User) -> dict:
    """Build the user context passed to the AI analysis.

    ``user.allergies`` is read as a JSON document and normalized to a list of
    non-blank strings; ``user.health_profile`` falls back to ``"normal"`` when
    empty.

    Args:
        user: Object exposing ``allergies`` (JSON text or ``None``) and
            ``health_profile`` attributes.

    Returns:
        ``{"allergies": list[str], "health_profile": str}``.
    """
    try:
        parsed = json.loads(user.allergies or "[]")
    except (TypeError, ValueError):
        # A corrupt stored value must not turn every scan into a 500; report it
        # and continue without allergy data.
        import logging

        logging.getLogger(__name__).warning(
            "Ignoring malformed allergies value for user %r", getattr(user, "id", None)
        )
        parsed = []

    if isinstance(parsed, str):
        # A bare JSON string would otherwise be iterated character by character
        # by callers that loop over the allergies.
        parsed = [parsed]
    elif not isinstance(parsed, list):
        parsed = []

    # Blank entries are dropped: an empty string is a substring of every
    # ingredient name and would flag everything as an allergen.
    allergies = [item for item in parsed if isinstance(item, str) and item.strip()]
    health_profile = user.health_profile or "normal"
    return {"allergies": allergies, "health_profile": health_profile}
# Maps a declared allergy (lower-case) to the substrings that reveal it in an
# ingredient name. Allergies missing from this table are matched literally.
_ALLERGY_KEYWORDS = {
    "glúten": ["glúten", "trigo", "centeio", "cevada", "aveia", "farinha de trigo", "wheat", "gluten"],
    "lactose": ["lactose", "leite", "soro de leite", "whey", "caseína", "lácteo", "milk", "dairy"],
    "amendoim": ["amendoim", "peanut"],
    "soja": ["soja", "lecitina de soja", "soy"],
    "ovo": ["ovo", "albumina", "egg"],
    "frutos do mar": ["camarão", "peixe", "lagosta", "caranguejo", "marisco", "fish", "shrimp"],
    "nozes": ["nozes", "castanha", "amêndoa", "avelã", "nuts", "almond"],
}


def _collect_allergen_alerts(ingredients, allergies):
    """Return one alert per (ingredient, allergy) match and flag matching ingredients.

    Each matching ingredient dict is mutated in place with ``is_allergen = True``.
    Non-dict ingredients and non-string or blank allergies are skipped.
    """
    alerts = []
    for ing in ingredients or []:
        if not isinstance(ing, dict):
            continue
        # ``or ""`` also covers keys that are present but null in the AI output.
        name = ing.get("name") or ""
        searchable = f"{name} {ing.get('popular_name') or ''}".lower()
        for allergy in allergies or []:
            if not isinstance(allergy, str) or not allergy.strip():
                # A blank keyword is a substring of everything; never match on it.
                continue
            key = allergy.strip().lower()
            keywords = _ALLERGY_KEYWORDS.get(key, [key])
            if any(kw in searchable for kw in keywords):
                alerts.append({"ingredient": name, "allergy": allergy})
                ing["is_allergen"] = True
    return alerts


@router.post("/scan", response_model=ScanResult)
async def scan_product(
    req: ScanRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Scan a product by barcode, analyze it and record the scan.

    The product is looked up in the local ``Product`` table first, then in
    ``SEED_PRODUCTS``, then via ``fetch_product``; products found outside the
    table are stored in it. The analysis is enriched with allergen alerts
    based on the user's declared allergies, persisted as a ``Scan`` row, and
    achievements are checked.

    Raises:
        HTTPException 429: non-premium user has reached ``FREE_SCAN_LIMIT``
            scans since 00:00 UTC.
        HTTPException 404: no product data could be found for the barcode.
        HTTPException 502: the analysis step did not return a usable result.
    """
    if not user.is_premium:
        today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        result = await db.execute(
            select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start)
        )
        count = result.scalar()
        if count >= settings.FREE_SCAN_LIMIT:
            raise HTTPException(
                status_code=429,
                detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido. Faça upgrade para Premium!",
            )

    result = await db.execute(select(Product).where(Product.barcode == req.barcode))
    product = result.scalar_one_or_none()
    product_data = None
    source = "cache"
    if product:
        product_data = {
            "name": product.name,
            "brand": product.brand,
            "category": product.category,
            "ingredients_text": product.ingredients_text,
            "nutri_score": product.nutri_score,
            "nova_group": product.nova_group,
            "nutrition": json.loads(product.nutrition_json or "{}"),
            "image_url": product.image_url,
        }
    else:
        if req.barcode in SEED_PRODUCTS:
            product_data = SEED_PRODUCTS[req.barcode].copy()
            source = "seed"
        else:
            product_data = await fetch_product(req.barcode)
            source = "open_food_facts"
        if product_data:
            # Store the product so the next scan of this barcode is served
            # from the local table.
            new_product = Product(
                barcode=req.barcode,
                name=product_data.get("name"),
                brand=product_data.get("brand"),
                category=product_data.get("category"),
                ingredients_text=product_data.get("ingredients_text"),
                nutri_score=product_data.get("nutri_score"),
                nova_group=product_data.get("nova_group"),
                nutrition_json=json.dumps(product_data.get("nutrition", {})),
                image_url=product_data.get("image_url", ""),
            )
            db.add(new_product)
            await db.commit()

    if not product_data:
        raise HTTPException(status_code=404, detail="Produto não encontrado. Tente inserir manualmente.")

    user_context = get_user_context(user)
    analysis = await analyze_product(product_data, user_context=user_context)
    if not isinstance(analysis, dict):
        # NOTE(review): assumes analyze_product can fail the same way
        # analyze_photo does (falsy result) — confirm against the integration.
        raise HTTPException(status_code=502, detail="Não foi possível analisar o produto. Tente novamente.")

    # Add allergen alerts (reuses the allergies already parsed for the AI context).
    allergen_alerts = _collect_allergen_alerts(analysis.get("ingredients"), user_context.get("allergies"))
    analysis["allergen_alerts"] = allergen_alerts

    # Save scan
    scan = Scan(
        user_id=user.id,
        barcode=req.barcode,
        product_name=product_data.get("name"),
        brand=product_data.get("brand"),
        score=analysis.get("score", 50),
        summary=analysis.get("summary", ""),
        analysis_json=json.dumps(analysis),
    )
    db.add(scan)
    # Flush first so the generated primary key can be read before commit();
    # depending on the session's expire_on_commit setting, attributes may be
    # expired afterwards and cannot be lazily reloaded from async code.
    await db.flush()
    scan_id = scan.id
    await db.commit()

    # Check achievements
    new_badges = await check_achievements(user.id, db, action="scan")

    return ScanResult(
        id=scan_id,
        barcode=req.barcode,
        product_name=product_data.get("name"),
        brand=product_data.get("brand"),
        category=product_data.get("category"),
        image_url=product_data.get("image_url"),
        score=analysis.get("score", 50),
        summary=analysis.get("summary", ""),
        positives=analysis.get("positives", []),
        negatives=analysis.get("negatives", []),
        ingredients=analysis.get("ingredients", []),
        nutri_score=product_data.get("nutri_score"),
        nova_group=product_data.get("nova_group"),
        nutrition=analysis.get("nutrition"),
        nutrition_verdict=analysis.get("nutrition_verdict"),
        recipe=analysis.get("recipe"),
        substitutions=analysis.get("substitutions"),
        allergen_alerts=allergen_alerts,
        new_badges=new_badges,
        source=source,
    )
@router.get("/history", response_model=list[ScanHistoryItem])
async def get_history(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the authenticated user's 50 most recent scans, newest first."""
    stmt = (
        select(Scan)
        .where(Scan.user_id == user.id)
        .order_by(Scan.scanned_at.desc())
        .limit(50)
    )
    rows = (await db.execute(stmt)).scalars().all()

    history = []
    for row in rows:
        history.append(
            ScanHistoryItem(
                id=row.id,
                barcode=row.barcode,
                product_name=row.product_name,
                brand=row.brand,
                score=row.score,
                scanned_at=row.scanned_at,
            )
        )
    return history
@router.get("/history/{scan_id}")
async def get_scan_detail(
    scan_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the full stored result of one of the caller's scans.

    Product-level fields (category, image URL, Nutri-Score, NOVA group) come
    from the ``Product`` row matching the scan's barcode. When no such row
    exists, they fall back to the values stored in the scan's analysis JSON.

    Raises:
        HTTPException 404: the scan does not exist or belongs to another user.
    """
    result = await db.execute(
        select(Scan).where(Scan.id == scan_id, Scan.user_id == user.id)
    )
    scan = result.scalar_one_or_none()
    if not scan:
        raise HTTPException(status_code=404, detail="Scan não encontrado")

    try:
        analysis = json.loads(scan.analysis_json or "{}")
    except (TypeError, ValueError):
        # A corrupt stored analysis should not hide the scan: the columns
        # saved on the row itself are still returned.
        analysis = {}
    if not isinstance(analysis, dict):
        analysis = {}

    prod_result = await db.execute(select(Product).where(Product.barcode == scan.barcode))
    product = prod_result.scalar_one_or_none()

    def product_field(attr):
        # Photo scans are saved with barcode "PHOTO" while their Product row
        # uses a "PHOTO_<hash>" barcode, so the lookup above finds nothing for
        # them; their product-level fields live in the analysis JSON instead.
        if product:
            return getattr(product, attr)
        return analysis.get(attr)

    return {
        "id": scan.id,
        "barcode": scan.barcode,
        "product_name": scan.product_name,
        "brand": scan.brand,
        "score": scan.score,
        "summary": scan.summary,
        "scanned_at": scan.scanned_at.isoformat() if scan.scanned_at else None,
        "category": product_field("category"),
        "image_url": product_field("image_url"),
        "nutri_score": product_field("nutri_score"),
        "nova_group": product_field("nova_group"),
        "positives": analysis.get("positives", []),
        "negatives": analysis.get("negatives", []),
        "ingredients": analysis.get("ingredients", []),
        "nutrition": analysis.get("nutrition", {}),
        "nutrition_verdict": analysis.get("nutrition_verdict", ""),
        "recipe": analysis.get("recipe"),
        "substitutions": analysis.get("substitutions"),
        "allergen_alerts": analysis.get("allergen_alerts", []),
    }
@router.post("/scan/photo")
async def scan_photo(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    file: UploadFile = File(...),
):
    """Analyze a photo of a product label and record the scan.

    The upload is normalized to an RGB JPEG whose longest side is at most
    1024 px, sent to ``analyze_photo`` as base64, and the result is stored as
    a ``Scan`` with barcode ``"PHOTO"``. When the analysis names a product, a
    ``Product`` row keyed by a digest of the normalized image is stored too.

    Raises:
        HTTPException 429: non-premium user has reached ``FREE_SCAN_LIMIT``
            scans since 00:00 UTC.
        HTTPException 400: the upload exceeds 10MB or is not a readable image.
        HTTPException 422: the analysis returned no result.
    """
    # Kept function-local, as in the original, so the module imports without Pillow.
    import base64
    import hashlib
    import io

    from PIL import Image

    if not user.is_premium:
        today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        result = await db.execute(
            select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start)
        )
        count = result.scalar()
        if count >= settings.FREE_SCAN_LIMIT:
            raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido.")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large body into memory.
    max_bytes = 10 * 1024 * 1024
    contents = await file.read(max_bytes + 1)
    if len(contents) > max_bytes:
        raise HTTPException(status_code=400, detail="Imagem muito grande. Máximo 10MB.")

    # NOTE(review): the Pillow work below is CPU-bound and runs on the event
    # loop; consider a worker thread if photo traffic grows.
    try:
        img = Image.open(io.BytesIO(contents)).convert("RGB")
        max_dim = 1024
        longest_side = max(img.size)
        if longest_side > max_dim:
            ratio = max_dim / longest_side
            # Clamp to 1 px so extreme aspect ratios do not round a side to 0.
            new_size = (max(1, int(img.size[0] * ratio)), max(1, int(img.size[1] * ratio)))
            img = img.resize(new_size, Image.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=85)
    except Exception as e:
        # Deliberately broad: Pillow signals unreadable input through several
        # unrelated exception types, and all of them mean "bad image" here.
        raise HTTPException(status_code=400, detail=f"Imagem inválida: {str(e)}") from e
    jpeg_bytes = buf.getvalue()
    b64 = base64.b64encode(jpeg_bytes).decode()

    user_context = get_user_context(user)
    analysis = await analyze_photo(b64, user_context=user_context)
    if not analysis:
        raise HTTPException(status_code=422, detail="Não foi possível analisar a imagem. Tente uma foto mais nítida do rótulo.")

    # ``or`` also covers a product_name that is present but null/empty.
    product_name = analysis.get("product_name") or "Produto (foto)"
    scan = Scan(
        user_id=user.id,
        barcode="PHOTO",
        product_name=product_name,
        brand=analysis.get("brand", ""),
        score=analysis.get("score", 50),
        summary=analysis.get("summary", ""),
        analysis_json=json.dumps(analysis),
    )
    db.add(scan)
    # Flush first so the generated primary key can be read before commit();
    # depending on the session's expire_on_commit setting, attributes may be
    # expired afterwards and cannot be lazily reloaded from async code.
    await db.flush()
    scan_id = scan.id

    if analysis.get("product_name"):
        # Key the product by a digest of the whole normalized image. The
        # built-in hash() of a str is salted per process, and the first 100
        # base64 characters cover only the JPEG header, which is largely
        # identical across images re-encoded with the same settings.
        photo_barcode = "PHOTO_" + hashlib.sha256(jpeg_bytes).hexdigest()[:8]
        existing = await db.execute(select(Product).where(Product.barcode == photo_barcode))
        if existing.scalars().first() is None:
            new_product = Product(
                barcode=photo_barcode,
                name=analysis.get("product_name"),
                brand=analysis.get("brand", ""),
                category=analysis.get("category", ""),
                ingredients_text=analysis.get("ingredients_text", ""),
                nutri_score=analysis.get("nutri_score"),
                nova_group=analysis.get("nova_group"),
                nutrition_json=json.dumps(analysis.get("nutrition", {})),
            )
            db.add(new_product)
    await db.commit()

    new_badges = await check_achievements(user.id, db, action="scan")

    return {
        "id": scan_id,
        "barcode": "PHOTO",
        "product_name": product_name,
        "brand": analysis.get("brand", ""),
        "category": analysis.get("category", ""),
        "image_url": None,
        "score": analysis.get("score", 50),
        "summary": analysis.get("summary", ""),
        "positives": analysis.get("positives", []),
        "negatives": analysis.get("negatives", []),
        "ingredients": analysis.get("ingredients", []),
        "nutrition": analysis.get("nutrition", {}),
        "nutrition_verdict": analysis.get("nutrition_verdict", ""),
        "recipe": analysis.get("recipe"),
        "substitutions": analysis.get("substitutions"),
        "allergen_alerts": analysis.get("allergen_alerts", []),
        "nutri_score": analysis.get("nutri_score"),
        "nova_group": analysis.get("nova_group"),
        "new_badges": new_badges,
        "source": "photo",
    }