Files
ophion/internal/aiapm/collector.go
Rainbow c9e68c5048 feat: add AI APM module for AI/LLM call telemetry
- internal/aiapm/types.go: AICallRecord, filter, summary, and stats types
- internal/aiapm/pricing.go: vendor pricing tables (Anthropic, OpenAI, Google, Mistral, DeepSeek, Groq)
- internal/aiapm/store.go: PostgreSQL storage with batch insert, filtered queries, aggregations, timeseries
- internal/aiapm/collector.go: async collector with buffered channel and background batch writer
- internal/api/aiapm_handlers.go: Fiber route handlers for ingest, summary, models, vendors, costs, calls, pricing
- cmd/server/main.go: register AI APM routes and create ai_calls table at startup
2026-02-08 05:13:38 -03:00

110 lines
2.3 KiB
Go

package aiapm
import (
"database/sql"
"log"
"time"
"github.com/google/uuid"
)
// Collector accepts AI call records and persists them off the caller's
// goroutine: records are queued on a buffered channel and written to the
// database by a background writer.
type Collector struct {
	// db is the database handle handed to the insert helpers.
	db *sql.DB

	// ch queues records between Collect and the background writer.
	ch chan AICallRecord

	// done is closed by the background writer when it exits.
	done chan struct{}
}
// NewCollector builds a Collector backed by db and launches its background
// writer goroutine before returning. bufferSize is the capacity of the intake
// channel; any value of zero or less is replaced by 1000.
func NewCollector(db *sql.DB, bufferSize int) *Collector {
	capacity := 1000
	if bufferSize > 0 {
		capacity = bufferSize
	}

	collector := new(Collector)
	collector.db = db
	collector.ch = make(chan AICallRecord, capacity)
	collector.done = make(chan struct{})

	go collector.backgroundWriter()
	return collector
}
// Collect fills in defaults on r and hands it off for storage.
//
// Defaults applied to the local copy of r:
//   - ID: a freshly generated UUID string when empty.
//   - Timestamp: time.Now() when zero.
//   - Status: "success" when empty.
//   - EstimatedCost: the result of EstimateCost when it is zero and at least
//     one of TokensIn/TokensOut is positive.
//
// The record is then offered to the background writer without blocking. If
// the queue is full, or the writer has already exited because Stop was
// called, the record is written synchronously with InsertCall instead so it
// is not lost; an insert error is logged, not returned.
//
// Collect is safe to call after Stop has returned. A call that races with an
// in-progress Stop is still unsupported: the queue may be closed between the
// check below and the send.
func (c *Collector) Collect(r AICallRecord) {
	if r.ID == "" {
		r.ID = uuid.New().String()
	}
	if r.Timestamp.IsZero() {
		r.Timestamp = time.Now()
	}
	if r.Status == "" {
		r.Status = "success"
	}
	// Estimate cost if not provided.
	if r.EstimatedCost == 0 && (r.TokensIn > 0 || r.TokensOut > 0) {
		r.EstimatedCost = EstimateCost(r.Vendor, r.Model, r.TokensIn, r.TokensOut, r.TokensCacheRead, r.TokensCacheWrite)
	}

	// A send on a closed channel panics even inside a select with a default
	// arm, so find out whether the writer has exited before attempting it.
	// This must be a separate select: if it shared one with the send, both
	// cases would be ready after Stop and the send could still be chosen.
	stopped := false
	select {
	case <-c.done:
		stopped = true
	default:
	}

	if !stopped {
		select {
		case c.ch <- r:
			return
		default:
			// Queue full: fall through to the synchronous write below.
		}
	}

	// Write synchronously to avoid data loss.
	if err := InsertCall(c.db, r); err != nil {
		log.Printf("ai-apm: sync insert error: %v", err)
	}
}
// CollectBatch hands every record in records to Collect, one at a time and in
// slice order. Each element is passed by value, so the defaults Collect fills
// in are not written back into the caller's slice.
func (c *Collector) CollectBatch(records []AICallRecord) {
	for _, rec := range records {
		c.Collect(rec)
	}
}
// Stop shuts the collector down: it closes the intake channel, which makes
// the background writer flush whatever is still queued, and then blocks until
// that goroutine has exited.
//
// Calling Stop again after it has returned is a no-op rather than a
// "close of closed channel" panic. Concurrent calls to Stop are not
// synchronized and remain unsupported.
func (c *Collector) Stop() {
	select {
	case <-c.done:
		// The writer only exits once the intake channel has been closed, so
		// there is nothing left to close or wait for.
		return
	default:
	}

	close(c.ch)
	<-c.done
}
// backgroundWriter is the goroutine body started by NewCollector. It drains
// c.ch into an in-memory batch and writes the batch to the database when it
// reaches 100 records, every 500ms, and one final time when c.ch is closed.
// c.done is closed as the goroutine exits.
func (c *Collector) backgroundWriter() {
	const (
		maxBatch      = 100
		flushInterval = 500 * time.Millisecond
	)

	defer close(c.done)

	pending := make([]AICallRecord, 0, maxBatch)

	tick := time.NewTicker(flushInterval)
	defer tick.Stop()

	// drain writes out everything in pending and empties it, keeping the
	// backing array for reuse.
	drain := func() {
		if len(pending) > 0 {
			if batchErr := InsertCallBatch(c.db, pending); batchErr != nil {
				log.Printf("ai-apm: batch insert error (%d records): %v", len(pending), batchErr)
				// The batch write failed; retry each record on its own so one
				// failing record does not take the others down with it.
				for i := range pending {
					if rowErr := InsertCall(c.db, pending[i]); rowErr != nil {
						log.Printf("ai-apm: single insert error: %v", rowErr)
					}
				}
			}
			pending = pending[:0]
		}
	}

	for {
		select {
		case <-tick.C:
			drain()
		case rec, open := <-c.ch:
			if !open {
				// Channel closed: write what is left and exit.
				drain()
				return
			}
			pending = append(pending, rec)
			if len(pending) >= maxBatch {
				drain()
			}
		}
	}
}