from fastapi import APIRouter, Depends, HTTPException, File, UploadFile import json from datetime import datetime, timezone from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from app.database import get_db from app.models.user import User from app.models.product import Product from app.models.scan import Scan from app.schemas.scan import ScanRequest, ScanResult, ScanHistoryItem from app.utils.security import get_current_user from app.integrations.open_food_facts import fetch_product from app.integrations.openai_client import analyze_product, analyze_photo from app.config import settings from app.services.seed import SEED_PRODUCTS from app.services.achievements import check_achievements router = APIRouter(prefix="/api", tags=["scan"]) def get_user_context(user: User) -> dict: """Build user context for AI analysis.""" allergies = json.loads(user.allergies or "[]") health_profile = user.health_profile or "normal" return {"allergies": allergies, "health_profile": health_profile} @router.post("/scan", response_model=ScanResult) async def scan_product(req: ScanRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): if not user.is_premium: today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) result = await db.execute( select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start) ) count = result.scalar() if count >= settings.FREE_SCAN_LIMIT: raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido. Faça upgrade para Premium!") result = await db.execute(select(Product).where(Product.barcode == req.barcode)) product = result.scalar_one_or_none() product_data = None source = "cache" if product: product_data = { "name": product.name, "brand": product.brand, "category": product.category, "ingredients_text": product.ingredients_text, "nutri_score": product.nutri_score, "nova_group": product.nova_group, "nutrition": json.loads(product.nutrition_json or "{}"), "image_url": product.image_url, } else: if req.barcode in SEED_PRODUCTS: product_data = SEED_PRODUCTS[req.barcode].copy() source = "seed" else: product_data = await fetch_product(req.barcode) source = "open_food_facts" if product_data: new_product = Product( barcode=req.barcode, name=product_data.get("name"), brand=product_data.get("brand"), category=product_data.get("category"), ingredients_text=product_data.get("ingredients_text"), nutri_score=product_data.get("nutri_score"), nova_group=product_data.get("nova_group"), nutrition_json=json.dumps(product_data.get("nutrition", {})), image_url=product_data.get("image_url", ""), ) db.add(new_product) await db.commit() if not product_data: raise HTTPException(status_code=404, detail="Produto não encontrado. Tente inserir manualmente.") user_context = get_user_context(user) analysis = await analyze_product(product_data, user_context=user_context) # Add allergen alerts allergies = json.loads(user.allergies or "[]") allergen_alerts = [] if allergies and analysis.get("ingredients"): for ing in analysis["ingredients"]: ing_name = (ing.get("name", "") + " " + ing.get("popular_name", "")).lower() for allergy in allergies: allergy_lower = allergy.lower() # Map common allergy names to ingredient keywords allergy_keywords = { "glúten": ["glúten", "trigo", "centeio", "cevada", "aveia", "farinha de trigo", "wheat", "gluten"], "lactose": ["lactose", "leite", "soro de leite", "whey", "caseína", "lácteo", "milk", "dairy"], "amendoim": ["amendoim", "peanut"], "soja": ["soja", "lecitina de soja", "soy"], "ovo": ["ovo", "albumina", "egg"], "frutos do mar": ["camarão", "peixe", "lagosta", "caranguejo", "marisco", "fish", "shrimp"], "nozes": ["nozes", "castanha", "amêndoa", "avelã", "nuts", "almond"], } keywords = allergy_keywords.get(allergy_lower, [allergy_lower]) for kw in keywords: if kw in ing_name: allergen_alerts.append({ "ingredient": ing.get("name", ""), "allergy": allergy, }) ing["is_allergen"] = True break analysis["allergen_alerts"] = allergen_alerts # Save scan scan = Scan( user_id=user.id, barcode=req.barcode, product_name=product_data.get("name"), brand=product_data.get("brand"), score=analysis.get("score", 50), summary=analysis.get("summary", ""), analysis_json=json.dumps(analysis), ) db.add(scan) await db.commit() # Check achievements new_badges = await check_achievements(user.id, db, action="scan") return ScanResult( id=scan.id, barcode=req.barcode, product_name=product_data.get("name"), brand=product_data.get("brand"), category=product_data.get("category"), image_url=product_data.get("image_url"), score=analysis.get("score", 50), summary=analysis.get("summary", ""), positives=analysis.get("positives", []), negatives=analysis.get("negatives", []), ingredients=analysis.get("ingredients", []), nutri_score=product_data.get("nutri_score"), nova_group=product_data.get("nova_group"), nutrition=analysis.get("nutrition"), nutrition_verdict=analysis.get("nutrition_verdict"), recipe=analysis.get("recipe"), substitutions=analysis.get("substitutions"), allergen_alerts=allergen_alerts, new_badges=new_badges, source=source, ) @router.get("/history", response_model=list[ScanHistoryItem]) async def get_history(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): result = await db.execute( select(Scan).where(Scan.user_id == user.id).order_by(Scan.scanned_at.desc()).limit(50) ) scans = result.scalars().all() return [ScanHistoryItem( id=s.id, barcode=s.barcode, product_name=s.product_name, brand=s.brand, score=s.score, scanned_at=s.scanned_at ) for s in scans] @router.get("/history/{scan_id}") async def get_scan_detail(scan_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): result = await db.execute( select(Scan).where(Scan.id == scan_id, Scan.user_id == user.id) ) scan = result.scalar_one_or_none() if not scan: raise HTTPException(status_code=404, detail="Scan não encontrado") analysis = json.loads(scan.analysis_json or '{}') prod_result = await db.execute(select(Product).where(Product.barcode == scan.barcode)) product = prod_result.scalar_one_or_none() return { "id": scan.id, "barcode": scan.barcode, "product_name": scan.product_name, "brand": scan.brand, "score": scan.score, "summary": scan.summary, "scanned_at": scan.scanned_at.isoformat() if scan.scanned_at else None, "category": product.category if product else None, "image_url": product.image_url if product else None, "nutri_score": product.nutri_score if product else None, "nova_group": product.nova_group if product else None, "positives": analysis.get("positives", []), "negatives": analysis.get("negatives", []), "ingredients": analysis.get("ingredients", []), "nutrition": analysis.get("nutrition", {}), "nutrition_verdict": analysis.get("nutrition_verdict", ""), "recipe": analysis.get("recipe"), "substitutions": analysis.get("substitutions"), "allergen_alerts": analysis.get("allergen_alerts", []), } @router.post("/scan/photo") async def scan_photo(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), file: UploadFile = File(...)): if not user.is_premium: today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) result = await db.execute( select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start) ) count = result.scalar() if count >= settings.FREE_SCAN_LIMIT: raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido.") contents = await file.read() if len(contents) > 10 * 1024 * 1024: raise HTTPException(status_code=400, detail="Imagem muito grande. Máximo 10MB.") import base64 from PIL import Image import io try: img = Image.open(io.BytesIO(contents)) img = img.convert("RGB") max_dim = 1024 if max(img.size) > max_dim: ratio = max_dim / max(img.size) img = img.resize((int(img.size[0]*ratio), int(img.size[1]*ratio)), Image.LANCZOS) buf = io.BytesIO() img.save(buf, format="JPEG", quality=85) b64 = base64.b64encode(buf.getvalue()).decode() except Exception as e: raise HTTPException(status_code=400, detail=f"Imagem inválida: {str(e)}") user_context = get_user_context(user) analysis = await analyze_photo(b64, user_context=user_context) if not analysis: raise HTTPException(status_code=422, detail="Não foi possível analisar a imagem. Tente uma foto mais nítida do rótulo.") scan = Scan( user_id=user.id, barcode="PHOTO", product_name=analysis.get("product_name", "Produto (foto)"), brand=analysis.get("brand", ""), score=analysis.get("score", 50), summary=analysis.get("summary", ""), analysis_json=json.dumps(analysis), ) db.add(scan) if analysis.get("product_name"): new_product = Product( barcode="PHOTO_" + str(hash(b64[:100]))[-8:], name=analysis.get("product_name"), brand=analysis.get("brand", ""), category=analysis.get("category", ""), ingredients_text=analysis.get("ingredients_text", ""), nutri_score=analysis.get("nutri_score"), nova_group=analysis.get("nova_group"), nutrition_json=json.dumps(analysis.get("nutrition", {})), ) db.add(new_product) await db.commit() new_badges = await check_achievements(user.id, db, action="scan") return { "id": scan.id, "barcode": "PHOTO", "product_name": analysis.get("product_name", "Produto (foto)"), "brand": analysis.get("brand", ""), "category": analysis.get("category", ""), "image_url": None, "score": analysis.get("score", 50), "summary": analysis.get("summary", ""), "positives": analysis.get("positives", []), "negatives": analysis.get("negatives", []), "ingredients": analysis.get("ingredients", []), "nutrition": analysis.get("nutrition", {}), "nutrition_verdict": analysis.get("nutrition_verdict", ""), "recipe": analysis.get("recipe"), "substitutions": analysis.get("substitutions"), "allergen_alerts": analysis.get("allergen_alerts", []), "nutri_score": analysis.get("nutri_score"), "nova_group": analysis.get("nova_group"), "new_badges": new_badges, "source": "photo", }