From c9e68c5048535035d7c86ba9954f8db0b0d0df43 Mon Sep 17 00:00:00 2001 From: Rainbow Date: Sun, 8 Feb 2026 05:13:38 -0300 Subject: [PATCH] feat: add AI APM module for AI/LLM call telemetry - internal/aiapm/types.go: AICallRecord, filter, summary, and stats types - internal/aiapm/pricing.go: vendor pricing tables (Anthropic, OpenAI, Google, Mistral, DeepSeek, Groq) - internal/aiapm/store.go: PostgreSQL storage with batch insert, filtered queries, aggregations, timeseries - internal/aiapm/collector.go: async collector with buffered channel and background batch writer - internal/api/aiapm_handlers.go: Fiber route handlers for ingest, summary, models, vendors, costs, calls, pricing - cmd/server/main.go: register AI APM routes and create ai_calls table at startup --- cmd/server/main.go | 11 ++ internal/aiapm/collector.go | 109 ++++++++++ internal/aiapm/pricing.go | 68 +++++++ internal/aiapm/store.go | 349 +++++++++++++++++++++++++++++++++ internal/aiapm/types.go | 102 ++++++++++ internal/api/aiapm_handlers.go | 142 ++++++++++++++ 6 files changed, 781 insertions(+) create mode 100644 internal/aiapm/collector.go create mode 100644 internal/aiapm/pricing.go create mode 100644 internal/aiapm/store.go create mode 100644 internal/aiapm/types.go create mode 100644 internal/api/aiapm_handlers.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 21952ec..239bb11 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,6 +12,8 @@ import ( "syscall" "time" + "github.com/bigtux/ophion/internal/aiapm" + aiapmapi "github.com/bigtux/ophion/internal/api" "github.com/bigtux/ophion/internal/auth" "github.com/bigtux/ophion/internal/otel" "github.com/bigtux/ophion/internal/security" @@ -107,6 +109,12 @@ func main() { } else { log.Println("✓ Connected to PostgreSQL") initSchema(db) + // Initialize AI APM table + if err := aiapm.CreateTable(db); err != nil { + log.Printf("⚠ Failed to create AI APM table: %v", err) + } else { + log.Println("✓ AI APM table initialized") + } // 
Create default admin user if err := auth.CreateDefaultAdmin(db); err != nil { log.Printf("⚠ Failed to create default admin: %v", err) @@ -335,6 +343,9 @@ func (s *Server) setupRoutes() { // Dashboard protected.Get("/dashboard/overview", s.getDashboardOverview) + // AI APM routes + aiapmapi.RegisterAIAPMRoutes(protected, s.db) + // User info protected.Get("/me", s.authHandler.Me) diff --git a/internal/aiapm/collector.go b/internal/aiapm/collector.go new file mode 100644 index 0000000..582107c --- /dev/null +++ b/internal/aiapm/collector.go @@ -0,0 +1,109 @@ +package aiapm + +import ( + "database/sql" + "log" + "time" + + "github.com/google/uuid" +) + +// Collector receives AI call records and writes them to the database asynchronously +type Collector struct { + db *sql.DB + ch chan AICallRecord + done chan struct{} +} + +// NewCollector creates a new Collector with a buffered channel and background writer +func NewCollector(db *sql.DB, bufferSize int) *Collector { + if bufferSize <= 0 { + bufferSize = 1000 + } + c := &Collector{ + db: db, + ch: make(chan AICallRecord, bufferSize), + done: make(chan struct{}), + } + go c.backgroundWriter() + return c +} + +// Collect validates and enqueues a record for async storage +func (c *Collector) Collect(r AICallRecord) { + if r.ID == "" { + r.ID = uuid.New().String() + } + if r.Timestamp.IsZero() { + r.Timestamp = time.Now() + } + if r.Status == "" { + r.Status = "success" + } + // Estimate cost if not provided + if r.EstimatedCost == 0 && (r.TokensIn > 0 || r.TokensOut > 0) { + r.EstimatedCost = EstimateCost(r.Vendor, r.Model, r.TokensIn, r.TokensOut, r.TokensCacheRead, r.TokensCacheWrite) + } + + select { + case c.ch <- r: + default: + // Channel full — write synchronously to avoid data loss + if err := InsertCall(c.db, r); err != nil { + log.Printf("ai-apm: sync insert error: %v", err) + } + } +} + +// CollectBatch validates and enqueues multiple records +func (c *Collector) CollectBatch(records []AICallRecord) { + for i 
:= range records { + c.Collect(records[i]) + } +} + +// Stop gracefully stops the background writer +func (c *Collector) Stop() { + close(c.ch) + <-c.done +} + +func (c *Collector) backgroundWriter() { + defer close(c.done) + + batch := make([]AICallRecord, 0, 100) + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + flush := func() { + if len(batch) == 0 { + return + } + if err := InsertCallBatch(c.db, batch); err != nil { + log.Printf("ai-apm: batch insert error (%d records): %v", len(batch), err) + // Fallback: insert one by one + for _, r := range batch { + if err := InsertCall(c.db, r); err != nil { + log.Printf("ai-apm: single insert error: %v", err) + } + } + } + batch = batch[:0] + } + + for { + select { + case r, ok := <-c.ch: + if !ok { + flush() + return + } + batch = append(batch, r) + if len(batch) >= 100 { + flush() + } + case <-ticker.C: + flush() + } + } +} diff --git a/internal/aiapm/pricing.go b/internal/aiapm/pricing.go new file mode 100644 index 0000000..6ee3279 --- /dev/null +++ b/internal/aiapm/pricing.go @@ -0,0 +1,68 @@ +package aiapm + +// ModelPricing holds per-1M-token pricing for a model +type ModelPricing struct { + InputPer1M float64 `json:"input_per_1m"` + OutputPer1M float64 `json:"output_per_1m"` + CacheReadPer1M float64 `json:"cache_read_per_1m"` + CacheWritePer1M float64 `json:"cache_write_per_1m"` +} + +// PricingTable maps "vendor/model" to pricing. Prices in USD per 1M tokens. 
// ModelPricing holds per-1M-token pricing for a model, in USD.
type ModelPricing struct {
	InputPer1M      float64 `json:"input_per_1m"`
	OutputPer1M     float64 `json:"output_per_1m"`
	CacheReadPer1M  float64 `json:"cache_read_per_1m"`
	CacheWritePer1M float64 `json:"cache_write_per_1m"`
}

// PricingTable maps "vendor/model" to pricing. Prices in USD per 1M tokens.
var PricingTable = map[string]ModelPricing{
	// Anthropic
	"anthropic/claude-opus-4":     {InputPer1M: 15.0, OutputPer1M: 75.0, CacheReadPer1M: 1.5, CacheWritePer1M: 18.75},
	"anthropic/claude-sonnet-4":   {InputPer1M: 3.0, OutputPer1M: 15.0, CacheReadPer1M: 0.3, CacheWritePer1M: 3.75},
	"anthropic/claude-3.5-sonnet": {InputPer1M: 3.0, OutputPer1M: 15.0, CacheReadPer1M: 0.3, CacheWritePer1M: 3.75},
	"anthropic/claude-3.5-haiku":  {InputPer1M: 0.8, OutputPer1M: 4.0, CacheReadPer1M: 0.08, CacheWritePer1M: 1.0},
	"anthropic/claude-3-haiku":    {InputPer1M: 0.25, OutputPer1M: 1.25, CacheReadPer1M: 0.03, CacheWritePer1M: 0.3},

	// OpenAI
	"openai/gpt-4o":      {InputPer1M: 2.5, OutputPer1M: 10.0, CacheReadPer1M: 1.25, CacheWritePer1M: 2.5},
	"openai/gpt-4o-mini": {InputPer1M: 0.15, OutputPer1M: 0.6, CacheReadPer1M: 0.075, CacheWritePer1M: 0.15},
	"openai/o1":          {InputPer1M: 15.0, OutputPer1M: 60.0, CacheReadPer1M: 7.5, CacheWritePer1M: 15.0},
	"openai/o1-mini":     {InputPer1M: 3.0, OutputPer1M: 12.0, CacheReadPer1M: 1.5, CacheWritePer1M: 3.0},
	"openai/o3":          {InputPer1M: 10.0, OutputPer1M: 40.0, CacheReadPer1M: 5.0, CacheWritePer1M: 10.0},
	"openai/o3-mini":     {InputPer1M: 1.1, OutputPer1M: 4.4, CacheReadPer1M: 0.55, CacheWritePer1M: 1.1},

	// Google
	"google/gemini-2.5-pro":   {InputPer1M: 1.25, OutputPer1M: 10.0, CacheReadPer1M: 0.315, CacheWritePer1M: 1.25},
	"google/gemini-2.5-flash": {InputPer1M: 0.15, OutputPer1M: 0.6, CacheReadPer1M: 0.0375, CacheWritePer1M: 0.15},
	"google/gemini-2.0-flash": {InputPer1M: 0.1, OutputPer1M: 0.4, CacheReadPer1M: 0.025, CacheWritePer1M: 0.1},

	// Mistral
	"mistral/mistral-large": {InputPer1M: 2.0, OutputPer1M: 6.0, CacheReadPer1M: 2.0, CacheWritePer1M: 2.0},
	"mistral/mistral-small": {InputPer1M: 0.1, OutputPer1M: 0.3, CacheReadPer1M: 0.1, CacheWritePer1M: 0.1},
	"mistral/codestral":     {InputPer1M: 0.3, OutputPer1M: 0.9, CacheReadPer1M: 0.3, CacheWritePer1M: 0.3},

	// DeepSeek
	"deepseek/deepseek-chat":     {InputPer1M: 0.14, OutputPer1M: 0.28, CacheReadPer1M: 0.014, CacheWritePer1M: 0.14},
	"deepseek/deepseek-reasoner": {InputPer1M: 0.55, OutputPer1M: 2.19, CacheReadPer1M: 0.055, CacheWritePer1M: 0.55},

	// Groq (hosted models — pricing approximate)
	"groq/llama-3.3-70b": {InputPer1M: 0.59, OutputPer1M: 0.79, CacheReadPer1M: 0.59, CacheWritePer1M: 0.59},
	"groq/llama-3.1-8b":  {InputPer1M: 0.05, OutputPer1M: 0.08, CacheReadPer1M: 0.05, CacheWritePer1M: 0.05},
	"groq/gemma2-9b":     {InputPer1M: 0.2, OutputPer1M: 0.2, CacheReadPer1M: 0.2, CacheWritePer1M: 0.2},
}

// EstimateCost calculates the estimated cost in USD for an AI call.
// The model is looked up as "vendor/model"; unknown vendor/model pairs
// return 0 (the caller treats that as "no estimate available").
func EstimateCost(vendor, model string, tokensIn, tokensOut, cacheRead, cacheWrite int) float64 {
	pricing, ok := PricingTable[vendor+"/"+model]
	if !ok {
		return 0
	}

	cost := float64(tokensIn) * pricing.InputPer1M / 1_000_000
	cost += float64(tokensOut) * pricing.OutputPer1M / 1_000_000
	cost += float64(cacheRead) * pricing.CacheReadPer1M / 1_000_000
	cost += float64(cacheWrite) * pricing.CacheWritePer1M / 1_000_000

	return cost
}

// GetPricingTable returns a copy of the pricing table (for the API endpoint).
// A copy is returned so that callers — including JSON-encoding handlers and
// anyone mutating the result — cannot modify the package-level PricingTable.
func GetPricingTable() map[string]ModelPricing {
	out := make(map[string]ModelPricing, len(PricingTable))
	for k, v := range PricingTable {
		out[k] = v
	}
	return out
}
NOT NULL, + tokens_in INT NOT NULL DEFAULT 0, + tokens_out INT NOT NULL DEFAULT 0, + tokens_cache_read INT NOT NULL DEFAULT 0, + tokens_cache_write INT NOT NULL DEFAULT 0, + estimated_cost DOUBLE PRECISION NOT NULL DEFAULT 0, + latency_ms INT NOT NULL DEFAULT 0, + ttfb_ms INT NOT NULL DEFAULT 0, + status VARCHAR(20) NOT NULL DEFAULT 'success', + error_message TEXT, + stream BOOLEAN NOT NULL DEFAULT FALSE, + cached BOOLEAN NOT NULL DEFAULT FALSE, + tags JSONB + ); + + CREATE INDEX IF NOT EXISTS idx_ai_calls_timestamp ON ai_calls(timestamp DESC); + CREATE INDEX IF NOT EXISTS idx_ai_calls_service ON ai_calls(service_name); + CREATE INDEX IF NOT EXISTS idx_ai_calls_vendor ON ai_calls(vendor); + CREATE INDEX IF NOT EXISTS idx_ai_calls_model ON ai_calls(model); + CREATE INDEX IF NOT EXISTS idx_ai_calls_project ON ai_calls(project_id); + CREATE INDEX IF NOT EXISTS idx_ai_calls_status ON ai_calls(status); + CREATE INDEX IF NOT EXISTS idx_ai_calls_vendor_model ON ai_calls(vendor, model); + ` + _, err := db.Exec(schema) + return err +} + +// InsertCall inserts a single AI call record +func InsertCall(db *sql.DB, r AICallRecord) error { + if r.ID == "" { + r.ID = uuid.New().String() + } + if r.Timestamp.IsZero() { + r.Timestamp = time.Now() + } + + tags, _ := json.Marshal(r.Tags) + + _, err := db.Exec(` + INSERT INTO ai_calls (id, timestamp, service_name, project_id, vendor, model, + tokens_in, tokens_out, tokens_cache_read, tokens_cache_write, + estimated_cost, latency_ms, ttfb_ms, status, error_message, stream, cached, tags) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18)`, + r.ID, r.Timestamp, r.ServiceName, r.ProjectID, r.Vendor, r.Model, + r.TokensIn, r.TokensOut, r.TokensCacheRead, r.TokensCacheWrite, + r.EstimatedCost, r.LatencyMs, r.TTFBMs, r.Status, r.ErrorMessage, + r.Stream, r.Cached, tags) + return err +} + +// InsertCallBatch inserts multiple AI call records in a single transaction +func InsertCallBatch(db *sql.DB, records 
[]AICallRecord) error { + if len(records) == 0 { + return nil + } + + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + stmt, err := tx.Prepare(` + INSERT INTO ai_calls (id, timestamp, service_name, project_id, vendor, model, + tokens_in, tokens_out, tokens_cache_read, tokens_cache_write, + estimated_cost, latency_ms, ttfb_ms, status, error_message, stream, cached, tags) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18)`) + if err != nil { + return err + } + defer stmt.Close() + + for _, r := range records { + if r.ID == "" { + r.ID = uuid.New().String() + } + if r.Timestamp.IsZero() { + r.Timestamp = time.Now() + } + tags, _ := json.Marshal(r.Tags) + + _, err := stmt.Exec( + r.ID, r.Timestamp, r.ServiceName, r.ProjectID, r.Vendor, r.Model, + r.TokensIn, r.TokensOut, r.TokensCacheRead, r.TokensCacheWrite, + r.EstimatedCost, r.LatencyMs, r.TTFBMs, r.Status, r.ErrorMessage, + r.Stream, r.Cached, tags) + if err != nil { + return err + } + } + + return tx.Commit() +} + +// buildWhereClause constructs WHERE clause from filter +func buildWhereClause(f AICallFilter, startArg int) (string, []any) { + var conditions []string + var args []any + n := startArg + + if !f.From.IsZero() { + conditions = append(conditions, "timestamp >= $"+strconv.Itoa(n)) + args = append(args, f.From) + n++ + } + if !f.To.IsZero() { + conditions = append(conditions, "timestamp <= $"+strconv.Itoa(n)) + args = append(args, f.To) + n++ + } + if f.ServiceName != "" { + conditions = append(conditions, "service_name = $"+strconv.Itoa(n)) + args = append(args, f.ServiceName) + n++ + } + if f.ProjectID != "" { + conditions = append(conditions, "project_id = $"+strconv.Itoa(n)) + args = append(args, f.ProjectID) + n++ + } + if f.Vendor != "" { + conditions = append(conditions, "vendor = $"+strconv.Itoa(n)) + args = append(args, f.Vendor) + n++ + } + if f.Model != "" { + conditions = append(conditions, "model = $"+strconv.Itoa(n)) + args = 
append(args, f.Model) + n++ + } + if f.Status != "" { + conditions = append(conditions, "status = $"+strconv.Itoa(n)) + args = append(args, f.Status) + n++ + } + + if len(conditions) == 0 { + return "", args + } + return " WHERE " + strings.Join(conditions, " AND "), args +} + +// QueryCalls queries AI call records with filters +func QueryCalls(db *sql.DB, filter AICallFilter) ([]AICallRecord, error) { + where, args := buildWhereClause(filter, 1) + + limit := filter.Limit + if limit <= 0 { + limit = 100 + } + offset := filter.Offset + + q := `SELECT id, timestamp, service_name, project_id, vendor, model, + tokens_in, tokens_out, tokens_cache_read, tokens_cache_write, + estimated_cost, latency_ms, ttfb_ms, status, COALESCE(error_message,''), + stream, cached, COALESCE(tags::text,'{}') + FROM ai_calls` + where + ` ORDER BY timestamp DESC LIMIT ` + + strconv.Itoa(limit) + ` OFFSET ` + strconv.Itoa(offset) + + rows, err := db.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []AICallRecord + for rows.Next() { + var r AICallRecord + var tagsJSON string + if err := rows.Scan(&r.ID, &r.Timestamp, &r.ServiceName, &r.ProjectID, + &r.Vendor, &r.Model, &r.TokensIn, &r.TokensOut, + &r.TokensCacheRead, &r.TokensCacheWrite, &r.EstimatedCost, + &r.LatencyMs, &r.TTFBMs, &r.Status, &r.ErrorMessage, + &r.Stream, &r.Cached, &tagsJSON); err != nil { + continue + } + _ = json.Unmarshal([]byte(tagsJSON), &r.Tags) + records = append(records, r) + } + return records, rows.Err() +} + +// GetUsageSummary returns aggregated usage statistics +func GetUsageSummary(db *sql.DB, filter AICallFilter) (*AIUsageSummary, error) { + where, args := buildWhereClause(filter, 1) + + q := `SELECT + COUNT(*), + COALESCE(SUM(tokens_in),0), + COALESCE(SUM(tokens_out),0), + COALESCE(SUM(tokens_cache_read),0), + COALESCE(SUM(tokens_cache_write),0), + COALESCE(SUM(estimated_cost),0), + COALESCE(AVG(latency_ms),0), + COALESCE(AVG(ttfb_ms),0), + COUNT(*) FILTER (WHERE 
status = 'error'), + COUNT(DISTINCT model), + COUNT(DISTINCT vendor), + COUNT(DISTINCT service_name) + FROM ai_calls` + where + + s := &AIUsageSummary{} + err := db.QueryRow(q, args...).Scan( + &s.TotalCalls, &s.TotalTokensIn, &s.TotalTokensOut, + &s.TotalCacheRead, &s.TotalCacheWrite, &s.TotalCost, + &s.AvgLatencyMs, &s.AvgTTFBMs, &s.ErrorCount, + &s.UniqueModels, &s.UniqueVendors, &s.UniqueServices) + if err != nil { + return nil, err + } + if s.TotalCalls > 0 { + s.ErrorRate = float64(s.ErrorCount) / float64(s.TotalCalls) + } + // Cache hit rate + var cachedCount int + cq := `SELECT COUNT(*) FILTER (WHERE cached = true) FROM ai_calls` + where + if err := db.QueryRow(cq, args...).Scan(&cachedCount); err == nil && s.TotalCalls > 0 { + s.CacheHitRate = float64(cachedCount) / float64(s.TotalCalls) + } + return s, nil +} + +// GetModelStats returns per-model statistics +func GetModelStats(db *sql.DB, filter AICallFilter) ([]AIModelStats, error) { + where, args := buildWhereClause(filter, 1) + + q := `SELECT vendor, model, COUNT(*), + COALESCE(SUM(tokens_in + tokens_out),0), + COALESCE(SUM(estimated_cost),0), + COALESCE(AVG(latency_ms),0), + COUNT(*) FILTER (WHERE status = 'error') + FROM ai_calls` + where + ` + GROUP BY vendor, model ORDER BY SUM(estimated_cost) DESC` + + rows, err := db.Query(q, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + var stats []AIModelStats + for rows.Next() { + var s AIModelStats + if err := rows.Scan(&s.Vendor, &s.Model, &s.TotalCalls, + &s.TotalTokens, &s.TotalCost, &s.AvgLatencyMs, &s.ErrorCount); err != nil { + continue + } + if s.TotalCalls > 0 { + s.ErrorRate = float64(s.ErrorCount) / float64(s.TotalCalls) + } + stats = append(stats, s) + } + return stats, rows.Err() +} + +// GetVendorStats returns per-vendor statistics +func GetVendorStats(db *sql.DB, filter AICallFilter) ([]AIVendorStats, error) { + where, args := buildWhereClause(filter, 1) + + q := `SELECT vendor, COUNT(*), + COALESCE(SUM(tokens_in + tokens_out),0), + COALESCE(SUM(estimated_cost),0), + COALESCE(AVG(latency_ms),0), + COUNT(DISTINCT model), + COUNT(*) FILTER (WHERE status = 'error') + FROM ai_calls` + where + ` + GROUP BY vendor ORDER BY SUM(estimated_cost) DESC` + + rows, err := db.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var stats []AIVendorStats + for rows.Next() { + var s AIVendorStats + if err := rows.Scan(&s.Vendor, &s.TotalCalls, &s.TotalTokens, + &s.TotalCost, &s.AvgLatencyMs, &s.ModelCount, &s.ErrorCount); err != nil { + continue + } + if s.TotalCalls > 0 { + s.ErrorRate = float64(s.ErrorCount) / float64(s.TotalCalls) + } + stats = append(stats, s) + } + return stats, rows.Err() +} + +// GetCostTimeseries returns cost aggregated over time intervals +func GetCostTimeseries(db *sql.DB, filter AICallFilter, interval string) ([]TimeseriesPoint, error) { + // Validate interval + validIntervals := map[string]bool{"1h": true, "6h": true, "1d": true, "7d": true, "1m": true} + if !validIntervals[interval] { + interval = "1d" + } + + // Map to PostgreSQL interval + pgInterval := map[string]string{ + "1h": "1 hour", "6h": "6 hours", "1d": "1 day", "7d": "7 days", "1m": "1 month", + }[interval] + + where, args := buildWhereClause(filter, 1) + + q := fmt.Sprintf(`SELECT date_trunc('hour', timestamp) 
- + (EXTRACT(EPOCH FROM date_trunc('hour', timestamp))::int %%%% EXTRACT(EPOCH FROM interval '%s')::int) * interval '1 second' AS bucket, + COALESCE(SUM(estimated_cost),0), + COUNT(*) + FROM ai_calls%s + GROUP BY bucket ORDER BY bucket ASC`, pgInterval, where) + + rows, err := db.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var points []TimeseriesPoint + for rows.Next() { + var p TimeseriesPoint + if err := rows.Scan(&p.Timestamp, &p.Value, &p.Count); err != nil { + continue + } + points = append(points, p) + } + return points, rows.Err() +} diff --git a/internal/aiapm/types.go b/internal/aiapm/types.go new file mode 100644 index 0000000..4d04614 --- /dev/null +++ b/internal/aiapm/types.go @@ -0,0 +1,102 @@ +package aiapm + +import "time" + +// AICallRecord represents a single AI/LLM API call +type AICallRecord struct { + ID string `json:"id"` + Timestamp time.Time `json:"timestamp"` + ServiceName string `json:"service_name"` + ProjectID string `json:"project_id"` + Vendor string `json:"vendor"` + Model string `json:"model"` + TokensIn int `json:"tokens_in"` + TokensOut int `json:"tokens_out"` + TokensCacheRead int `json:"tokens_cache_read"` + TokensCacheWrite int `json:"tokens_cache_write"` + EstimatedCost float64 `json:"estimated_cost"` + LatencyMs int `json:"latency_ms"` + TTFBMs int `json:"ttfb_ms"` + Status string `json:"status"` + ErrorMessage string `json:"error_message,omitempty"` + Stream bool `json:"stream"` + Cached bool `json:"cached"` + Tags map[string]string `json:"tags,omitempty"` +} + +// AICallFilter defines query filters for AI call records +type AICallFilter struct { + ServiceName string `json:"service_name"` + ProjectID string `json:"project_id"` + Vendor string `json:"vendor"` + Model string `json:"model"` + Status string `json:"status"` + From time.Time `json:"from"` + To time.Time `json:"to"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} + +// AIUsageSummary aggregated usage statistics +type 
// AIUsageSummary holds aggregated usage statistics over a filtered set of
// calls, as produced by GetUsageSummary. Rates are fractions in [0,1].
AIUsageSummary struct {
	TotalCalls     int     `json:"total_calls"`
	TotalTokensIn  int64   `json:"total_tokens_in"`
	TotalTokensOut int64   `json:"total_tokens_out"`
	TotalCacheRead int64   `json:"total_cache_read"`
	TotalCacheWrite int64  `json:"total_cache_write"`
	TotalCost      float64 `json:"total_cost"`
	AvgLatencyMs   float64 `json:"avg_latency_ms"`
	AvgTTFBMs      float64 `json:"avg_ttfb_ms"`
	ErrorCount     int     `json:"error_count"`
	ErrorRate      float64 `json:"error_rate"`      // ErrorCount / TotalCalls
	CacheHitRate   float64 `json:"cache_hit_rate"`  // cached calls / TotalCalls
	UniqueModels   int     `json:"unique_models"`
	UniqueVendors  int     `json:"unique_vendors"`
	UniqueServices int     `json:"unique_services"`
}

// AIModelStats is the per-model breakdown produced by GetModelStats.
type AIModelStats struct {
	Vendor       string  `json:"vendor"`
	Model        string  `json:"model"`
	TotalCalls   int     `json:"total_calls"`
	TotalTokens  int64   `json:"total_tokens"` // tokens_in + tokens_out
	TotalCost    float64 `json:"total_cost"`
	AvgLatencyMs float64 `json:"avg_latency_ms"`
	ErrorCount   int     `json:"error_count"`
	ErrorRate    float64 `json:"error_rate"`
}

// AIVendorStats is the per-vendor breakdown produced by GetVendorStats.
type AIVendorStats struct {
	Vendor       string  `json:"vendor"`
	TotalCalls   int     `json:"total_calls"`
	TotalTokens  int64   `json:"total_tokens"`
	TotalCost    float64 `json:"total_cost"`
	AvgLatencyMs float64 `json:"avg_latency_ms"`
	ModelCount   int     `json:"model_count"` // distinct models seen for this vendor
	ErrorCount   int     `json:"error_count"`
	ErrorRate    float64 `json:"error_rate"`
}

// AICostBreakdown is a cost breakdown entry along one dimension.
// NOTE(review): no visible code in this chunk populates this type — confirm
// it is consumed elsewhere or by a planned endpoint.
type AICostBreakdown struct {
	Dimension string  `json:"dimension"` // vendor, model, service, project
	Key       string  `json:"key"`
	Cost      float64 `json:"cost"`
	Calls     int     `json:"calls"`
	Tokens    int64   `json:"tokens"`
}

// TimeseriesPoint is a single bucket in a time series (see GetCostTimeseries).
type TimeseriesPoint struct {
	Timestamp time.Time `json:"timestamp"`
	Value     float64   `json:"value"`
	Count     int       `json:"count"`
}

// IngestRequest is the payload for the ingest endpoint: a single record in
// "call", a batch in "calls", or both (both are accepted and counted).
type IngestRequest struct {
	Call  *AICallRecord  `json:"call,omitempty"`
	Calls []AICallRecord `json:"calls,omitempty"`
}

// AIAPMHandlers holds dependencies for AI APM route handlers
type AIAPMHandlers struct {
	db        *sql.DB
	collector *aiapm.Collector
}

// RegisterAIAPMRoutes registers all AI APM routes on the given router.
// The returned Collector is the caller's to Stop on shutdown; note the
// call site in main.go discards it, so the writer goroutine runs for the
// life of the process.
func RegisterAIAPMRoutes(api fiber.Router, db *sql.DB) *aiapm.Collector {
	collector := aiapm.NewCollector(db, 5000)
	h := &AIAPMHandlers{db: db, collector: collector}

	g := api.Group("/ai-apm")
	g.Post("/ingest", h.Ingest)
	g.Get("/summary", h.Summary)
	g.Get("/models", h.Models)
	g.Get("/vendors", h.Vendors)
	g.Get("/costs", h.Costs)
	g.Get("/calls", h.Calls)
	g.Get("/pricing", h.Pricing)

	return collector
}

// Ingest receives AI call records (single via "call" or batch via "calls")
// and enqueues them on the async collector. Responds 202-style "accepted"
// with the number of records taken; 400 when the body has neither field.
func (h *AIAPMHandlers) Ingest(c *fiber.Ctx) error {
	var req aiapm.IngestRequest
	if err := c.BodyParser(&req); err != nil {
		return c.Status(400).JSON(fiber.Map{"error": "invalid request body: " + err.Error()})
	}

	count := 0
	if req.Call != nil {
		h.collector.Collect(*req.Call)
		count = 1
	}
	if len(req.Calls) > 0 {
		h.collector.CollectBatch(req.Calls)
		count += len(req.Calls)
	}

	if count == 0 {
		return c.Status(400).JSON(fiber.Map{"error": "no call records provided; use 'call' or 'calls' field"})
	}

	return c.JSON(fiber.Map{"status": "accepted", "count": count})
}
c.Query("from"); from != "" { + if t, err := time.Parse(time.RFC3339, from); err == nil { + f.From = t + } + } + if to := c.Query("to"); to != "" { + if t, err := time.Parse(time.RFC3339, to); err == nil { + f.To = t + } + } + if f.From.IsZero() { + f.From = time.Now().Add(-24 * time.Hour) + } + if f.To.IsZero() { + f.To = time.Now() + } + f.Limit = c.QueryInt("limit", 100) + f.Offset = c.QueryInt("offset", 0) + return f +} + +// Summary returns aggregated usage statistics +func (h *AIAPMHandlers) Summary(c *fiber.Ctx) error { + filter := parseFilter(c) + summary, err := aiapm.GetUsageSummary(h.db, filter) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + return c.JSON(summary) +} + +// Models returns per-model statistics +func (h *AIAPMHandlers) Models(c *fiber.Ctx) error { + filter := parseFilter(c) + stats, err := aiapm.GetModelStats(h.db, filter) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + return c.JSON(fiber.Map{"models": stats}) +} + +// Vendors returns per-vendor statistics +func (h *AIAPMHandlers) Vendors(c *fiber.Ctx) error { + filter := parseFilter(c) + stats, err := aiapm.GetVendorStats(h.db, filter) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + return c.JSON(fiber.Map{"vendors": stats}) +} + +// Costs returns cost timeseries data +func (h *AIAPMHandlers) Costs(c *fiber.Ctx) error { + filter := parseFilter(c) + interval := c.Query("interval", "1d") + points, err := aiapm.GetCostTimeseries(h.db, filter, interval) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + return c.JSON(fiber.Map{"timeseries": points, "interval": interval}) +} + +// Calls returns recent AI call records (paginated) +func (h *AIAPMHandlers) Calls(c *fiber.Ctx) error { + filter := parseFilter(c) + calls, err := aiapm.QueryCalls(h.db, filter) + if err != nil { + return c.Status(500).JSON(fiber.Map{"error": err.Error()}) + } + return 
c.JSON(fiber.Map{"calls": calls, "count": len(calls)}) +} + +// Pricing returns the current pricing table +func (h *AIAPMHandlers) Pricing(c *fiber.Ctx) error { + return c.JSON(fiber.Map{"pricing": aiapm.GetPricingTable()}) +}