- internal/aiapm/types.go: AICallRecord, filter, summary, and stats types
- internal/aiapm/pricing.go: vendor pricing tables (Anthropic, OpenAI, Google, Mistral, DeepSeek, Groq)
- internal/aiapm/store.go: PostgreSQL storage with batch insert, filtered queries, aggregations, timeseries
- internal/aiapm/collector.go: async collector with buffered channel and background batch writer
- internal/api/aiapm_handlers.go: Fiber route handlers for ingest, summary, models, vendors, costs, calls, pricing
- cmd/server/main.go: register AI APM routes and create ai_calls table at startup
110 lines
2.3 KiB
Go
110 lines
2.3 KiB
Go
package aiapm
|
|
|
|
import (
|
|
"database/sql"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Collector receives AI call records and writes them to the database asynchronously
|
|
type Collector struct {
|
|
db *sql.DB
|
|
ch chan AICallRecord
|
|
done chan struct{}
|
|
}
|
|
|
|
// NewCollector creates a new Collector with a buffered channel and background writer
|
|
func NewCollector(db *sql.DB, bufferSize int) *Collector {
|
|
if bufferSize <= 0 {
|
|
bufferSize = 1000
|
|
}
|
|
c := &Collector{
|
|
db: db,
|
|
ch: make(chan AICallRecord, bufferSize),
|
|
done: make(chan struct{}),
|
|
}
|
|
go c.backgroundWriter()
|
|
return c
|
|
}
|
|
|
|
// Collect validates and enqueues a record for async storage
|
|
func (c *Collector) Collect(r AICallRecord) {
|
|
if r.ID == "" {
|
|
r.ID = uuid.New().String()
|
|
}
|
|
if r.Timestamp.IsZero() {
|
|
r.Timestamp = time.Now()
|
|
}
|
|
if r.Status == "" {
|
|
r.Status = "success"
|
|
}
|
|
// Estimate cost if not provided
|
|
if r.EstimatedCost == 0 && (r.TokensIn > 0 || r.TokensOut > 0) {
|
|
r.EstimatedCost = EstimateCost(r.Vendor, r.Model, r.TokensIn, r.TokensOut, r.TokensCacheRead, r.TokensCacheWrite)
|
|
}
|
|
|
|
select {
|
|
case c.ch <- r:
|
|
default:
|
|
// Channel full — write synchronously to avoid data loss
|
|
if err := InsertCall(c.db, r); err != nil {
|
|
log.Printf("ai-apm: sync insert error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// CollectBatch validates and enqueues multiple records
|
|
func (c *Collector) CollectBatch(records []AICallRecord) {
|
|
for i := range records {
|
|
c.Collect(records[i])
|
|
}
|
|
}
|
|
|
|
// Stop gracefully stops the background writer
|
|
func (c *Collector) Stop() {
|
|
close(c.ch)
|
|
<-c.done
|
|
}
|
|
|
|
func (c *Collector) backgroundWriter() {
|
|
defer close(c.done)
|
|
|
|
batch := make([]AICallRecord, 0, 100)
|
|
ticker := time.NewTicker(500 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
flush := func() {
|
|
if len(batch) == 0 {
|
|
return
|
|
}
|
|
if err := InsertCallBatch(c.db, batch); err != nil {
|
|
log.Printf("ai-apm: batch insert error (%d records): %v", len(batch), err)
|
|
// Fallback: insert one by one
|
|
for _, r := range batch {
|
|
if err := InsertCall(c.db, r); err != nil {
|
|
log.Printf("ai-apm: single insert error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case r, ok := <-c.ch:
|
|
if !ok {
|
|
flush()
|
|
return
|
|
}
|
|
batch = append(batch, r)
|
|
if len(batch) >= 100 {
|
|
flush()
|
|
}
|
|
case <-ticker.C:
|
|
flush()
|
|
}
|
|
}
|
|
}
|