CLIO v1.0 — Scanner Inteligente com IA (MVP)

This commit is contained in:
Jarvis Deploy
2026-02-10 23:05:41 +00:00
commit 8e903d9222
41 changed files with 3190 additions and 0 deletions

0
backend/app/__init__.py Normal file
View File

17
backend/app/config.py Normal file
View File

@@ -0,0 +1,17 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    # Central application configuration. Values are loaded by
    # pydantic-settings, with ".env" configured as the env file below.

    # NOTE(security): the database password and the JWT signing key are
    # committed defaults. Override both via the environment / .env before
    # running anywhere that is reachable by others.
    DATABASE_URL: str = "postgresql+asyncpg://clio:Clio2026!@localhost:5432/clio"
    SECRET_KEY: str = "clio-secret-key-2026-musa-da-historia"
    ALGORITHM: str = "HS256"

    # Access-token lifetime in minutes (seven days).
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 7 * 24 * 60

    # OpenAI credentials and model names.
    OPENAI_API_KEY: str = ""
    OPENAI_MODEL_TEXT: str = "gpt-4o-mini"
    OPENAI_MODEL_VISION: str = "gpt-4o"

    # Scan quota enforced by the scan endpoint for users on the "free" plan.
    FREE_SCAN_LIMIT: int = 5
    UPLOAD_DIR: str = "/opt/clio/uploads"

    class Config:
        env_file = ".env"


# Module-level singleton imported by the rest of the application.
settings = Settings()

17
backend/app/database.py Normal file
View File

@@ -0,0 +1,17 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
# Single async engine for the whole application; SQL echo is disabled.
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=False,
)

# Session factory bound to the engine. expire_on_commit=False keeps already
# loaded attributes usable after a commit.
async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
class Base(DeclarativeBase):
    """Declarative base shared by every ORM model of the application."""
async def get_db():
    """Yield one ``AsyncSession`` and release it once the consumer is done.

    The routers consume this generator through ``Depends(get_db)``.
    """
    async with async_session() as db_session:
        yield db_session
async def init_db():
    """Run ``Base.metadata.create_all`` inside a single engine transaction."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        # create_all is a synchronous API, so it goes through run_sync.
        await connection.run_sync(create_tables)

42
backend/app/main.py Normal file
View File

@@ -0,0 +1,42 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from sqlalchemy import select
from app.database import init_db, async_session
from app.models.user import User
from app.routers import auth, documents
from app.utils.security import hash_password
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Prepare the database on startup, then hand control to the application.

    Creates the tables and makes sure the ``admin@clio.com`` account exists
    with the ``premium`` plan.

    The seed password defaults to the historical value but can be overridden
    with the ``CLIO_ADMIN_PASSWORD`` environment variable. It is read from
    the process environment, not from ``.env``. The password is only applied
    when the account is created; an existing admin is left untouched.
    """
    # Function-scope imports keep this block self-contained.
    import os

    from sqlalchemy.exc import IntegrityError

    await init_db()

    admin_email = "admin@clio.com"
    # NOTE(security): the fallback password is committed to the repository;
    # set CLIO_ADMIN_PASSWORD in any real deployment. An empty value falls
    # back to the default rather than seeding a blank password.
    admin_password = os.environ.get("CLIO_ADMIN_PASSWORD") or "Clio@2026"

    async with async_session() as db:
        result = await db.execute(select(User).where(User.email == admin_email))
        if result.scalar_one_or_none() is None:
            db.add(
                User(
                    email=admin_email,
                    name="Admin CLIO",
                    password_hash=hash_password(admin_password),
                    plan="premium",
                )
            )
            try:
                await db.commit()
            except IntegrityError:
                # Another worker inserted the admin between our SELECT and
                # INSERT (users.email is unique). The account exists, which
                # is the state we wanted, so startup must not fail.
                await db.rollback()
    yield
app = FastAPI(title="CLIO API", version="1.0.0", lifespan=lifespan)

# CORS: any origin may call the API. Credentialed CORS (cookies / HTTP auth)
# is disabled because a wildcard origin combined with allow_credentials=True
# lets every website issue credentialed requests. The API authenticates with
# an "Authorization: Bearer" header, which needs no CORS credentials.
# To enable credentials, list the trusted origins explicitly first.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(auth.router)
app.include_router(documents.router)
@app.get("/api/health")
async def health():
    """Return a static payload so callers can check that the API is up."""
    payload = {
        "status": "ok",
        "app": "CLIO API v1.0 — Scanner Inteligente com IA",
    }
    return payload

View File

@@ -0,0 +1,2 @@
from app.models.user import User
from app.models.document import Document

View File

@@ -0,0 +1,18 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON
from datetime import datetime, timezone
from app.database import Base
def _utc_now():
    """Return the current timezone-aware UTC time (column default)."""
    return datetime.now(timezone.utc)


class Document(Base):
    """A scanned document together with the AI analysis stored for it."""

    __tablename__ = "documents"

    id = Column(Integer, primary_key=True, index=True)
    # Owner of the document; every document query filters on this column.
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)

    title = Column(String(500))
    category = Column(String(50), index=True)

    # The image string exactly as submitted to the scan endpoint.
    original_image = Column(Text)

    # Fields copied from the AI analysis result.
    extracted_text = Column(Text)
    summary = Column(Text)
    extracted_data = Column(JSON)
    risk_alerts = Column(JSON)
    tags = Column(JSON)

    # Size in bytes of the decoded image.
    file_size = Column(Integer)
    created_at = Column(DateTime(timezone=True), default=_utc_now, index=True)

View File

@@ -0,0 +1,13 @@
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime, timezone
from app.database import Base
def _utc_now():
    """Return the current timezone-aware UTC time (column default)."""
    return datetime.now(timezone.utc)


class User(Base):
    """An account that can authenticate and scan documents."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    name = Column(String(200), nullable=True)
    password_hash = Column(String(255), nullable=False)

    # Subscription plan; the scan endpoint applies its quota to "free".
    plan = Column(String(20), default="free")
    # Incremented by the scan endpoint on every successful scan.
    scan_count_today = Column(Integer, default=0)
    created_at = Column(DateTime(timezone=True), default=_utc_now)

View File

View File

@@ -0,0 +1,37 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.database import get_db
from app.models.user import User
from app.schemas.auth import RegisterRequest, LoginRequest, TokenResponse, UserResponse
from app.utils.security import hash_password, verify_password, create_access_token, get_current_user
# Every authentication endpoint is mounted under /api/auth.
router = APIRouter(
    prefix="/api/auth",
    tags=["auth"],
)
def user_to_dict(user: User) -> dict:
    """Return the public fields of ``user``: id, email, name and plan."""
    public_fields = ("id", "email", "name", "plan")
    return {field: getattr(user, field) for field in public_fields}
@router.post("/register", response_model=TokenResponse)
async def register(req: RegisterRequest, db: AsyncSession = Depends(get_db)):
    """Create an account and return an access token for it.

    When no name is supplied, the part of the email before ``@`` is used.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    # Function-scope import keeps this block self-contained.
    from sqlalchemy.exc import IntegrityError

    existing = await db.execute(select(User).where(User.email == req.email))
    if existing.scalar_one_or_none() is not None:
        raise HTTPException(status_code=400, detail="Email já cadastrado")

    user = User(
        email=req.email,
        name=req.name or req.email.split("@")[0],
        password_hash=hash_password(req.password),
    )
    db.add(user)
    try:
        await db.commit()
    except IntegrityError:
        # A concurrent request registered the same email after the check
        # above. The unique constraint caught it, so report it the same way
        # instead of failing with a 500.
        await db.rollback()
        raise HTTPException(status_code=400, detail="Email já cadastrado") from None
    await db.refresh(user)

    token = create_access_token({"sub": str(user.id)})
    return TokenResponse(access_token=token, user=UserResponse(**user_to_dict(user)))
@router.post("/login", response_model=TokenResponse)
async def login(req: LoginRequest, db: AsyncSession = Depends(get_db)):
    """Check the credentials and return an access token with the user profile.

    Raises:
        HTTPException: 401 when the email is unknown or the password is wrong.
    """
    lookup = await db.execute(select(User).where(User.email == req.email))
    account = lookup.scalar_one_or_none()

    # The password is only verified when an account was found.
    authenticated = account and verify_password(req.password, account.password_hash)
    if not authenticated:
        raise HTTPException(status_code=401, detail="Email ou senha incorretos")

    access_token = create_access_token({"sub": str(account.id)})
    profile = UserResponse(**user_to_dict(account))
    return TokenResponse(access_token=access_token, user=profile)
@router.get("/me")
async def me(user: User = Depends(get_current_user)):
    """Return the public profile of the authenticated user."""
    profile = user_to_dict(user)
    return profile

View File

@@ -0,0 +1,143 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, desc
from typing import Optional
import base64
import os
from app.database import get_db
from app.models.user import User
from app.models.document import Document
from app.schemas.document import ScanRequest, DocumentResponse, DocumentListResponse
from app.services.ai_service import analyze_document
from app.utils.security import get_current_user
from app.config import settings
# Every document endpoint is mounted under /api/documents.
router = APIRouter(
    prefix="/api/documents",
    tags=["documents"],
)
@router.post("/scan", response_model=DocumentResponse)
async def scan_document(
    req: ScanRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Analyze an uploaded image with the AI service and store the result.

    The image may be bare base64 or a data URL; everything up to the first
    comma is ignored when measuring the decoded size.

    Raises:
        HTTPException: 429 when a free-plan user has used up the scan quota;
            400 when the image is empty or is not valid base64;
            500 when the AI analysis fails.
    """
    # A NULL counter (e.g. a row created outside the ORM default) counts as 0.
    # NOTE(review): nothing in the visible code resets scan_count_today, so
    # the "daily" quota is effectively a lifetime quota — confirm a reset job
    # exists elsewhere.
    scans_used = user.scan_count_today or 0
    if user.plan == "free" and scans_used >= settings.FREE_SCAN_LIMIT:
        raise HTTPException(status_code=429, detail="Limite de scans diários atingido. Faça upgrade para Premium.")

    # Measure the decoded payload, skipping an optional data-URL header.
    _, separator, after_comma = req.image.partition(",")
    encoded = after_comma if separator else req.image
    try:
        file_size = len(base64.b64decode(encoded))
    except ValueError as exc:
        # binascii.Error (a ValueError subclass) signals malformed base64:
        # that is a client error, not a server failure.
        raise HTTPException(status_code=400, detail="Imagem inválida: base64 malformado") from exc
    if file_size == 0:
        raise HTTPException(status_code=400, detail="Imagem vazia")

    # AI analysis. Any failure of the external service is reported as a 500.
    # NOTE(review): the exception text is forwarded to the client; consider
    # logging it and returning a generic message instead.
    try:
        result = await analyze_document(req.image)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erro na análise IA: {str(e)}") from e

    doc = Document(
        user_id=user.id,
        title=result.get("title", "Documento sem título"),
        category=result.get("category", "outro"),
        original_image=req.image,
        extracted_text=result.get("extracted_text", ""),
        summary=result.get("summary", ""),
        extracted_data=result.get("extracted_data", {}),
        risk_alerts=result.get("risk_alerts", []),
        tags=result.get("tags", []),
        file_size=file_size,
    )
    db.add(doc)

    # Count the scan in the same transaction that stores the document.
    user.scan_count_today = scans_used + 1
    await db.commit()
    await db.refresh(doc)

    return DocumentResponse(
        id=doc.id,
        title=doc.title,
        category=doc.category,
        extracted_text=doc.extracted_text,
        summary=doc.summary,
        extracted_data=doc.extracted_data,
        risk_alerts=doc.risk_alerts,
        tags=doc.tags,
        file_size=doc.file_size,
        created_at=doc.created_at,
    )
@router.get("/", response_model=DocumentListResponse)
async def list_documents(
    search: Optional[str] = None,
    category: Optional[str] = None,
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List the caller's documents, newest first, one page at a time.

    Args:
        search: Case-insensitive substring to look for in the extracted text.
        category: Exact category to filter on.
        page: 1-based page number.
        limit: Page size (1-100).

    Returns:
        The requested page plus the total number of matching documents.
    """
    # The same filters feed both the page query and the count query.
    filters = [Document.user_id == user.id]
    if category:
        filters.append(Document.category == category)
    if search:
        # Escape LIKE wildcards so "%" and "_" typed by the user match
        # literally. The escape character itself is doubled first.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        filters.append(Document.extracted_text.ilike(f"%{escaped}%", escape="/"))

    count_query = select(func.count(Document.id)).where(*filters)
    total = (await db.execute(count_query)).scalar_one()

    page_query = (
        select(Document)
        .where(*filters)
        # id breaks ties between equal timestamps so pages never overlap.
        .order_by(desc(Document.created_at), desc(Document.id))
        .offset((page - 1) * limit)
        .limit(limit)
    )
    docs = (await db.execute(page_query)).scalars().all()

    return DocumentListResponse(
        documents=[
            DocumentResponse(
                id=d.id,
                title=d.title,
                category=d.category,
                extracted_text=d.extracted_text,
                summary=d.summary,
                extracted_data=d.extracted_data,
                risk_alerts=d.risk_alerts,
                tags=d.tags,
                file_size=d.file_size,
                created_at=d.created_at,
            )
            for d in docs
        ],
        total=total,
    )
@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(
    doc_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return one document that belongs to the caller.

    Raises:
        HTTPException: 404 when no such document exists for this user.
    """
    owned_doc_query = select(Document).where(Document.id == doc_id, Document.user_id == user.id)
    doc = (await db.execute(owned_doc_query)).scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Documento não encontrado")

    # Copy the response fields straight off the ORM object.
    response_fields = (
        "id",
        "title",
        "category",
        "extracted_text",
        "summary",
        "extracted_data",
        "risk_alerts",
        "tags",
        "file_size",
        "created_at",
    )
    return DocumentResponse(**{name: getattr(doc, name) for name in response_fields})
@router.get("/{doc_id}/image")
async def get_document_image(
    doc_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the stored image of one document that belongs to the caller.

    Raises:
        HTTPException: 404 when no such document exists for this user.
    """
    owned_doc_query = select(Document).where(Document.id == doc_id, Document.user_id == user.id)
    doc = (await db.execute(owned_doc_query)).scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Documento não encontrado")

    image_payload = {"image": doc.original_image}
    return image_payload
@router.delete("/{doc_id}")
async def delete_document(
    doc_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete one document that belongs to the caller.

    Raises:
        HTTPException: 404 when no such document exists for this user.
    """
    owned_doc_query = select(Document).where(Document.id == doc_id, Document.user_id == user.id)
    doc = (await db.execute(owned_doc_query)).scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=404, detail="Documento não encontrado")

    await db.delete(doc)
    await db.commit()

    confirmation = {"message": "Documento excluído"}
    return confirmation

View File

View File

@@ -0,0 +1,21 @@
from pydantic import BaseModel
from typing import Optional
# Request body of POST /api/auth/register. `name` may be omitted; the
# endpoint then derives one from the email address.
class RegisterRequest(BaseModel):
    email: str
    password: str
    name: Optional[str] = None
# Request body of POST /api/auth/login.
class LoginRequest(BaseModel):
    email: str
    password: str
# Public view of a user account, embedded in TokenResponse.
class UserResponse(BaseModel):
    id: int
    email: str
    name: Optional[str]
    plan: str
# Response of the register and login endpoints: the access token together
# with the profile of the user it was issued for.
class TokenResponse(BaseModel):
    access_token: str
    user: UserResponse

View File

@@ -0,0 +1,22 @@
from pydantic import BaseModel
from typing import Optional, List, Any
from datetime import datetime
# Request body of POST /api/documents/scan.
class ScanRequest(BaseModel):
    # Base64 image data; the scan endpoint also accepts a data-URL prefix.
    image: str
# API view of a stored document. The original image is not part of it; it is
# served by the separate /image endpoint.
class DocumentResponse(BaseModel):
    id: int
    title: Optional[str]
    category: Optional[str]
    extracted_text: Optional[str]
    summary: Optional[str]
    # JSON columns filled from the AI analysis result.
    extracted_data: Optional[Any]
    risk_alerts: Optional[Any]
    tags: Optional[Any]
    file_size: Optional[int]
    created_at: datetime
# One page of documents plus the total number of documents that match the
# filters of the list endpoint.
class DocumentListResponse(BaseModel):
    documents: List[DocumentResponse]
    total: int

View File

View File

@@ -0,0 +1,65 @@
import openai
import json
from app.config import settings
# Module-wide async OpenAI client, configured from the application settings.
client = openai.AsyncOpenAI(
    api_key=settings.OPENAI_API_KEY,
)
# System message (in Portuguese) sent with every vision request. It tells the
# model to OCR the document, classify it, extract type-specific structured
# fields, summarize it, flag risks and suggest tags, and to reply with a JSON
# object using the keys listed at the end of the prompt. The scan endpoint
# reads exactly those keys, so keep both in sync when editing.
SYSTEM_PROMPT = """Você é CLIO, assistente de IA especializada em análise de documentos.
Ao receber a imagem de um documento, você deve:
1. Extrair TODO o texto visível (OCR)
2. Identificar a categoria: contrato, nf (nota fiscal), receita (médica), rg, cnh, certidao, boleto, outro
3. Extrair dados estruturados relevantes conforme o tipo:
- CNH: nome, cpf, rg, validade, categoria, registro
- RG: nome, rg, cpf, data_nascimento, naturalidade
- NF: cnpj_emitente, razao_social, valor_total, itens, data_emissao
- Contrato: partes, objeto, valor, prazo, data_assinatura
- Receita: paciente, medico, crm, medicamentos, posologia
- Boleto: beneficiario, valor, vencimento, codigo_barras
- Certidão: tipo, nome, cartorio, data
4. Gerar um resumo em bullets (máx 5 pontos)
5. Identificar alertas de risco (cláusulas abusivas, prazos vencendo, valores suspeitos)
6. Sugerir tags relevantes
Responda SEMPRE em JSON válido com esta estrutura:
{
"title": "título descritivo curto do documento",
"category": "categoria",
"extracted_text": "texto completo extraído",
"extracted_data": { ... dados estruturados ... },
"summary": "• ponto 1\\n• ponto 2\\n• ponto 3",
"risk_alerts": ["alerta 1", "alerta 2"],
"tags": ["tag1", "tag2"]
}"""
async def analyze_document(image_base64: str) -> dict:
    """Analyze a document image with the configured vision model.

    Args:
        image_base64: Base64 image data, either bare or as a data URL
            (``data:<mime>;base64,<payload>``).

    Returns:
        The JSON object produced by the model, parsed into a dict.

    Raises:
        ValueError: If the model returns no content, or content that is not
            a JSON object. ``json.JSONDecodeError`` is a ``ValueError`` too.
    """
    # Split off an optional data-URL header. Bare base64 is labelled
    # image/jpeg; a declared image MIME type (e.g. image/png) is preserved
    # instead of being relabelled as JPEG.
    mime_type = "image/jpeg"
    header, separator, payload = image_base64.partition(",")
    if separator:
        image_base64 = payload
        if header.startswith("data:"):
            declared = header[len("data:"):].split(";", 1)[0].strip().lower()
            if declared.startswith("image/"):
                mime_type = declared

    response = await client.chat.completions.create(
        model=settings.OPENAI_MODEL_VISION,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Analise este documento. Extraia todas as informações e retorne o JSON estruturado."},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{image_base64}",
                            "detail": "high",
                        },
                    },
                ],
            },
        ],
        max_tokens=4096,
        temperature=0.1,
        response_format={"type": "json_object"},
    )

    # The content can be missing; fail with a clear message instead of a
    # TypeError/IndexError from deep inside the parsing.
    content = response.choices[0].message.content if response.choices else None
    if not content:
        raise ValueError("O modelo não retornou conteúdo")

    result = json.loads(content)
    if not isinstance(result, dict):
        # Callers use result.get(...), so anything but an object is unusable.
        raise ValueError("O modelo não retornou um objeto JSON")
    return result

View File

View File

@@ -0,0 +1,42 @@
from passlib.context import CryptContext
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.config import settings
from app.database import get_db
from app.models.user import User
# Password hashing context restricted to bcrypt.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

# Dependency that reads the HTTP Bearer credentials of a request.
security = HTTPBearer()
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's hashing context."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Return whether ``plain`` matches the stored hash ``hashed``."""
    matches = pwd_context.verify(plain, hashed)
    return matches
def create_access_token(data: dict) -> str:
    """Encode ``data`` as a signed JWT with an expiry claim.

    The token expires ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from
    now (UTC). ``data`` itself is not modified.
    """
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the Bearer token of the request to the matching ``User``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no usable
            ``sub`` claim, or refers to a user that does not exist.
    """
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Token inválido") from exc

    subject = payload.get("sub")
    if subject is None:
        raise HTTPException(status_code=401, detail="Token inválido")

    # A non-numeric subject is an invalid token, not a server error.
    try:
        user_id = int(subject)
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=401, detail="Token inválido") from exc

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=401, detail="Usuário não encontrado")
    return user