267 lines
12 KiB
Python
267 lines
12 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, File, UploadFile
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.models.product import Product
|
|
from app.models.scan import Scan
|
|
from app.schemas.scan import ScanRequest, ScanResult, ScanHistoryItem
|
|
from app.utils.security import get_current_user
|
|
from app.integrations.open_food_facts import fetch_product
|
|
from app.integrations.openai_client import analyze_product, analyze_photo
|
|
from app.config import settings
|
|
from app.services.seed import SEED_PRODUCTS
|
|
from app.services.achievements import check_achievements
|
|
|
|
router = APIRouter(prefix="/api", tags=["scan"])
|
|
|
|
def get_user_context(user: User) -> dict:
    """Assemble the per-user context handed to the AI analysis helpers.

    Args:
        user: The user whose ``allergies`` (a JSON-encoded string) and
            ``health_profile`` attributes are read.

    Returns:
        A dict with two keys: ``"allergies"`` (the decoded JSON value, decoded
        from ``"[]"`` when the attribute is empty or missing) and
        ``"health_profile"`` (``"normal"`` when the attribute is falsy).
    """
    context = {}
    # A falsy column value (None / "") is decoded as an empty JSON list.
    context["allergies"] = json.loads(user.allergies or "[]")
    context["health_profile"] = user.health_profile or "normal"
    return context
|
|
|
|
# Lowercase keywords that reveal each known allergy inside an ingredient name.
# An allergy that is not a key of this table is matched by its own lowercase name.
_ALLERGY_KEYWORDS = {
    "glúten": ["glúten", "trigo", "centeio", "cevada", "aveia", "farinha de trigo", "wheat", "gluten"],
    "lactose": ["lactose", "leite", "soro de leite", "whey", "caseína", "lácteo", "milk", "dairy"],
    "amendoim": ["amendoim", "peanut"],
    "soja": ["soja", "lecitina de soja", "soy"],
    "ovo": ["ovo", "albumina", "egg"],
    "frutos do mar": ["camarão", "peixe", "lagosta", "caranguejo", "marisco", "fish", "shrimp"],
    "nozes": ["nozes", "castanha", "amêndoa", "avelã", "nuts", "almond"],
}


def _find_allergen_alerts(ingredients, allergies):
    """Return one alert per (ingredient, allergy) pair whose keywords match.

    Matching is a case-insensitive substring test of each allergy keyword
    against the ingredient's ``name`` and ``popular_name``. Every matching
    ingredient dict is also marked in place with ``is_allergen = True``.

    Args:
        ingredients: Iterable of ingredient dicts from the analysis (may be
            ``None`` or empty).
        allergies: Iterable of allergy names declared by the user.

    Returns:
        A list of ``{"ingredient": <name>, "allergy": <allergy>}`` dicts.
    """
    alerts = []
    if not ingredients or not allergies:
        return alerts

    for ing in ingredients:
        if not isinstance(ing, dict):
            continue
        # ``or ""`` guards against explicit nulls in the analysis payload,
        # which would otherwise break the string concatenation.
        name = ing.get("name") or ""
        popular_name = ing.get("popular_name") or ""
        ing_name = f"{name} {popular_name}".lower()

        for allergy in allergies:
            allergy_lower = str(allergy).strip().lower()
            if not allergy_lower:
                # An empty keyword is a substring of every string and would
                # flag every ingredient as an allergen.
                continue
            keywords = _ALLERGY_KEYWORDS.get(allergy_lower, [allergy_lower])
            if any(kw in ing_name for kw in keywords):
                alerts.append({"ingredient": name, "allergy": allergy})
                ing["is_allergen"] = True
    return alerts


@router.post("/scan", response_model=ScanResult)
async def scan_product(req: ScanRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Analyze a product by barcode and record the scan for the current user.

    Steps:
        1. For non-premium users, count today's scans (since 00:00 UTC) and
           reject the request once ``settings.FREE_SCAN_LIMIT`` is reached.
        2. Resolve product data from the ``Product`` table, then
           ``SEED_PRODUCTS``, then ``fetch_product``; data obtained outside
           the table is stored as a new ``Product`` row.
        3. Run ``analyze_product`` with the user's context and attach
           allergen alerts derived from the user's allergies.
        4. Persist a ``Scan`` row and evaluate achievements.

    Raises:
        HTTPException: 429 when the free daily limit is reached, 404 when no
            product data can be found for the barcode.
    """
    if not user.is_premium:
        today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        result = await db.execute(
            select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start)
        )
        count = result.scalar() or 0
        if count >= settings.FREE_SCAN_LIMIT:
            raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido. Faça upgrade para Premium!")

    result = await db.execute(select(Product).where(Product.barcode == req.barcode))
    product = result.scalar_one_or_none()

    product_data = None
    source = "cache"

    if product:
        product_data = {
            "name": product.name, "brand": product.brand, "category": product.category,
            "ingredients_text": product.ingredients_text, "nutri_score": product.nutri_score,
            "nova_group": product.nova_group, "nutrition": json.loads(product.nutrition_json or "{}"),
            "image_url": product.image_url,
        }
    elif req.barcode in SEED_PRODUCTS:
        product_data = SEED_PRODUCTS[req.barcode].copy()
        source = "seed"
    else:
        product_data = await fetch_product(req.barcode)
        source = "open_food_facts"

    if not product_data:
        raise HTTPException(status_code=404, detail="Produto não encontrado. Tente inserir manualmente.")

    if not product:
        # The data came from the seed table or the remote lookup: store it so
        # the next scan of this barcode is answered from the Product table.
        new_product = Product(
            barcode=req.barcode, name=product_data.get("name"), brand=product_data.get("brand"),
            category=product_data.get("category"), ingredients_text=product_data.get("ingredients_text"),
            nutri_score=product_data.get("nutri_score"), nova_group=product_data.get("nova_group"),
            nutrition_json=json.dumps(product_data.get("nutrition", {})),
            image_url=product_data.get("image_url", ""),
        )
        db.add(new_product)
        await db.commit()

    user_context = get_user_context(user)
    # NOTE(review): scan_photo treats a falsy analysis as a failure; here
    # analyze_product is assumed to always return a dict - confirm.
    analysis = await analyze_product(product_data, user_context=user_context)

    # Add allergen alerts (reuses the allergies already decoded for the context).
    allergen_alerts = _find_allergen_alerts(analysis.get("ingredients"), user_context["allergies"])
    analysis["allergen_alerts"] = allergen_alerts

    # Save scan
    scan = Scan(
        user_id=user.id, barcode=req.barcode, product_name=product_data.get("name"),
        brand=product_data.get("brand"), score=analysis.get("score", 50),
        summary=analysis.get("summary", ""), analysis_json=json.dumps(analysis),
    )
    db.add(scan)
    await db.commit()

    # Check achievements
    new_badges = await check_achievements(user.id, db, action="scan")

    return ScanResult(
        id=scan.id,
        barcode=req.barcode,
        product_name=product_data.get("name"),
        brand=product_data.get("brand"),
        category=product_data.get("category"),
        image_url=product_data.get("image_url"),
        score=analysis.get("score", 50),
        summary=analysis.get("summary", ""),
        positives=analysis.get("positives", []),
        negatives=analysis.get("negatives", []),
        ingredients=analysis.get("ingredients", []),
        nutri_score=product_data.get("nutri_score"),
        nova_group=product_data.get("nova_group"),
        nutrition=analysis.get("nutrition"),
        nutrition_verdict=analysis.get("nutrition_verdict"),
        recipe=analysis.get("recipe"),
        substitutions=analysis.get("substitutions"),
        allergen_alerts=allergen_alerts,
        new_badges=new_badges,
        source=source,
    )
|
|
|
|
@router.get("/history", response_model=list[ScanHistoryItem])
async def get_history(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """List the authenticated user's scans, newest first, capped at 50 entries.

    Returns:
        A list of ``ScanHistoryItem`` built from the matching ``Scan`` rows.
    """
    query = (
        select(Scan)
        .where(Scan.user_id == user.id)
        .order_by(Scan.scanned_at.desc())
        .limit(50)
    )
    rows = (await db.execute(query)).scalars().all()

    history = []
    for row in rows:
        history.append(
            ScanHistoryItem(
                id=row.id,
                barcode=row.barcode,
                product_name=row.product_name,
                brand=row.brand,
                score=row.score,
                scanned_at=row.scanned_at,
            )
        )
    return history
|
|
|
|
@router.get("/history/{scan_id}")
async def get_scan_detail(scan_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Return the stored details of one scan owned by the authenticated user.

    The scan row supplies the identifying fields and the saved analysis JSON;
    the ``Product`` row with the same barcode (when one exists) supplies
    category, image URL, Nutri-Score and NOVA group.

    Raises:
        HTTPException: 404 when no scan with ``scan_id`` belongs to the user.
    """
    scan_query = select(Scan).where(Scan.id == scan_id, Scan.user_id == user.id)
    scan = (await db.execute(scan_query)).scalar_one_or_none()
    if not scan:
        raise HTTPException(status_code=404, detail="Scan não encontrado")

    analysis = json.loads(scan.analysis_json or '{}')

    product_query = select(Product).where(Product.barcode == scan.barcode)
    product = (await db.execute(product_query)).scalar_one_or_none()

    # Product-derived fields are all None when no product row matches.
    if product:
        category = product.category
        image_url = product.image_url
        nutri_score = product.nutri_score
        nova_group = product.nova_group
    else:
        category = image_url = nutri_score = nova_group = None

    if scan.scanned_at:
        scanned_at = scan.scanned_at.isoformat()
    else:
        scanned_at = None

    detail = {
        "id": scan.id,
        "barcode": scan.barcode,
        "product_name": scan.product_name,
        "brand": scan.brand,
        "score": scan.score,
        "summary": scan.summary,
        "scanned_at": scanned_at,
        "category": category,
        "image_url": image_url,
        "nutri_score": nutri_score,
        "nova_group": nova_group,
        "positives": analysis.get("positives", []),
        "negatives": analysis.get("negatives", []),
        "ingredients": analysis.get("ingredients", []),
        "nutrition": analysis.get("nutrition", {}),
        "nutrition_verdict": analysis.get("nutrition_verdict", ""),
        "recipe": analysis.get("recipe"),
        "substitutions": analysis.get("substitutions"),
        "allergen_alerts": analysis.get("allergen_alerts", []),
    }
    return detail
|
|
|
|
@router.post("/scan/photo")
async def scan_photo(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), file: UploadFile = File(...)):
    """Analyze a product from an uploaded label photo and record the scan.

    Steps:
        1. For non-premium users, count today's scans (since 00:00 UTC) and
           reject the request once ``settings.FREE_SCAN_LIMIT`` is reached.
        2. Read the upload (at most 10MB), convert it to RGB, downscale it so
           the longest side is at most 1024px and re-encode it as a
           base64 JPEG.
        3. Run ``analyze_photo`` with the user's context.
        4. Persist a ``Scan`` row (barcode ``"PHOTO"``) and, when the analysis
           names the product, a ``Product`` row keyed by a digest of the
           uploaded bytes.

    Raises:
        HTTPException: 429 when the free daily limit is reached, 400 when the
            upload is too large or is not a readable image, 422 when the
            analysis returns nothing.
    """
    import base64
    import hashlib
    import io

    from PIL import Image

    if not user.is_premium:
        today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        result = await db.execute(
            select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start)
        )
        count = result.scalar() or 0
        if count >= settings.FREE_SCAN_LIMIT:
            raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido.")

    max_bytes = 10 * 1024 * 1024
    # Read one byte past the limit: enough to detect an oversized upload
    # without loading an arbitrarily large file into memory.
    contents = await file.read(max_bytes + 1)
    if len(contents) > max_bytes:
        raise HTTPException(status_code=400, detail="Imagem muito grande. Máximo 10MB.")

    try:
        img = Image.open(io.BytesIO(contents))
        img = img.convert("RGB")
        max_dim = 1024
        longest_side = max(img.size)
        if longest_side > max_dim:
            ratio = max_dim / longest_side
            # Clamp to 1px so an extreme aspect ratio cannot yield a zero-sized side.
            new_size = (max(1, int(img.size[0] * ratio)), max(1, int(img.size[1] * ratio)))
            img = img.resize(new_size, Image.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=85)
        b64 = base64.b64encode(buf.getvalue()).decode()
    except Exception as e:
        # Deliberately broad: any failure to decode or re-encode the upload is
        # reported to the client as an invalid image.
        raise HTTPException(status_code=400, detail=f"Imagem inválida: {str(e)}") from e

    user_context = get_user_context(user)
    analysis = await analyze_photo(b64, user_context=user_context)

    if not analysis:
        raise HTTPException(status_code=422, detail="Não foi possível analisar a imagem. Tente uma foto mais nítida do rótulo.")

    # Fall back to the placeholder when the name is missing, null or empty.
    product_name = analysis.get("product_name") or "Produto (foto)"

    if analysis.get("product_name"):
        # Key the product by a digest of the uploaded bytes. The builtin
        # hash() of a str is salted per process, so it cannot serve as a
        # persistent key. Eight hex characters keep the key the same length
        # as the previous 8-character suffix.
        photo_barcode = "PHOTO_" + hashlib.sha256(contents).hexdigest()[:8]
        existing = await db.execute(select(Product).where(Product.barcode == photo_barcode))
        # Insert only when the key is new, so uploading the same photo twice
        # does not add a second row with the same barcode.
        if existing.scalars().first() is None:
            new_product = Product(
                barcode=photo_barcode,
                name=analysis.get("product_name"), brand=analysis.get("brand", ""),
                category=analysis.get("category", ""), ingredients_text=analysis.get("ingredients_text", ""),
                nutri_score=analysis.get("nutri_score"), nova_group=analysis.get("nova_group"),
                nutrition_json=json.dumps(analysis.get("nutrition", {})),
            )
            db.add(new_product)

    # NOTE(review): the scan is stored under the literal barcode "PHOTO", so
    # get_scan_detail's lookup by scan.barcode will not find the product row
    # created above - confirm whether that link is wanted.
    scan = Scan(
        user_id=user.id, barcode="PHOTO",
        product_name=product_name,
        brand=analysis.get("brand", ""), score=analysis.get("score", 50),
        summary=analysis.get("summary", ""), analysis_json=json.dumps(analysis),
    )
    db.add(scan)

    await db.commit()

    new_badges = await check_achievements(user.id, db, action="scan")

    return {
        "id": scan.id,
        "barcode": "PHOTO",
        "product_name": product_name,
        "brand": analysis.get("brand", ""),
        "category": analysis.get("category", ""),
        "image_url": None,
        "score": analysis.get("score", 50),
        "summary": analysis.get("summary", ""),
        "positives": analysis.get("positives", []),
        "negatives": analysis.get("negatives", []),
        "ingredients": analysis.get("ingredients", []),
        "nutrition": analysis.get("nutrition", {}),
        "nutrition_verdict": analysis.get("nutrition_verdict", ""),
        "recipe": analysis.get("recipe"),
        "substitutions": analysis.get("substitutions"),
        "allergen_alerts": analysis.get("allergen_alerts", []),
        "nutri_score": analysis.get("nutri_score"),
        "nova_group": analysis.get("nova_group"),
        "new_badges": new_badges,
        "source": "photo",
    }
|