- MANUAL-PRODUTO.md: Manual do usuário final - MANUAL-VENDAS.md: Estratégia comercial e vendas - MANUAL-TECNICO.md: Infraestrutura e deploy - README.md: Visão geral do projeto
107 lines
4.4 KiB
Python
107 lines
4.4 KiB
Python
import json
|
|
from datetime import datetime, timezone, date
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.models.product import Product
|
|
from app.models.scan import Scan
|
|
from app.schemas.scan import ScanRequest, ScanResult, ScanHistoryItem
|
|
from app.utils.security import get_current_user
|
|
from app.integrations.open_food_facts import fetch_product
|
|
from app.integrations.openai_client import analyze_product
|
|
from app.config import settings
|
|
from app.services.seed import SEED_PRODUCTS
|
|
|
|
router = APIRouter(prefix="/api", tags=["scan"])
|
|
|
|
@router.post("/scan", response_model=ScanResult)
|
|
async def scan_product(req: ScanRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
|
|
# Rate limit check
|
|
if not user.is_premium:
|
|
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
result = await db.execute(
|
|
select(func.count(Scan.id)).where(Scan.user_id == user.id, Scan.scanned_at >= today_start)
|
|
)
|
|
count = result.scalar()
|
|
if count >= settings.FREE_SCAN_LIMIT:
|
|
raise HTTPException(status_code=429, detail=f"Limite de {settings.FREE_SCAN_LIMIT} scans/dia atingido. Faça upgrade para Premium!")
|
|
|
|
# Check local cache
|
|
result = await db.execute(select(Product).where(Product.barcode == req.barcode))
|
|
product = result.scalar_one_or_none()
|
|
|
|
product_data = None
|
|
source = "cache"
|
|
|
|
if product:
|
|
product_data = {
|
|
"name": product.name, "brand": product.brand, "category": product.category,
|
|
"ingredients_text": product.ingredients_text, "nutri_score": product.nutri_score,
|
|
"nova_group": product.nova_group, "nutrition": json.loads(product.nutrition_json or "{}"),
|
|
"image_url": product.image_url,
|
|
}
|
|
else:
|
|
# Check seed data
|
|
if req.barcode in SEED_PRODUCTS:
|
|
product_data = SEED_PRODUCTS[req.barcode].copy()
|
|
source = "seed"
|
|
else:
|
|
# Fetch from Open Food Facts
|
|
product_data = await fetch_product(req.barcode)
|
|
source = "open_food_facts"
|
|
|
|
if product_data:
|
|
new_product = Product(
|
|
barcode=req.barcode, name=product_data.get("name"), brand=product_data.get("brand"),
|
|
category=product_data.get("category"), ingredients_text=product_data.get("ingredients_text"),
|
|
nutri_score=product_data.get("nutri_score"), nova_group=product_data.get("nova_group"),
|
|
nutrition_json=json.dumps(product_data.get("nutrition", {})),
|
|
image_url=product_data.get("image_url", ""),
|
|
)
|
|
db.add(new_product)
|
|
await db.commit()
|
|
|
|
if not product_data:
|
|
raise HTTPException(status_code=404, detail="Produto não encontrado. Tente inserir manualmente.")
|
|
|
|
# AI Analysis
|
|
analysis = await analyze_product(product_data)
|
|
|
|
# Save scan
|
|
scan = Scan(
|
|
user_id=user.id, barcode=req.barcode, product_name=product_data.get("name"),
|
|
brand=product_data.get("brand"), score=analysis.get("score", 50),
|
|
summary=analysis.get("summary", ""), analysis_json=json.dumps(analysis),
|
|
)
|
|
db.add(scan)
|
|
await db.commit()
|
|
|
|
return ScanResult(
|
|
barcode=req.barcode,
|
|
product_name=product_data.get("name"),
|
|
brand=product_data.get("brand"),
|
|
category=product_data.get("category"),
|
|
image_url=product_data.get("image_url"),
|
|
score=analysis.get("score", 50),
|
|
summary=analysis.get("summary", ""),
|
|
positives=analysis.get("positives", []),
|
|
negatives=analysis.get("negatives", []),
|
|
ingredients=analysis.get("ingredients", []),
|
|
nutri_score=product_data.get("nutri_score"),
|
|
nova_group=product_data.get("nova_group"),
|
|
source=source,
|
|
)
|
|
|
|
@router.get("/history", response_model=list[ScanHistoryItem])
|
|
async def get_history(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
|
|
result = await db.execute(
|
|
select(Scan).where(Scan.user_id == user.id).order_by(Scan.scanned_at.desc()).limit(50)
|
|
)
|
|
scans = result.scalars().all()
|
|
return [ScanHistoryItem(
|
|
id=s.id, barcode=s.barcode, product_name=s.product_name,
|
|
brand=s.brand, score=s.score, scanned_at=s.scanned_at
|
|
) for s in scans]
|