package otel import ( "database/sql" "encoding/hex" "encoding/json" "log" "strings" "time" "github.com/gofiber/fiber/v2" ) // ═══════════════════════════════════════════════════════════ // 🔭 OTLP HTTP Receiver - OpenTelemetry Protocol Support // ═══════════════════════════════════════════════════════════ // // Implements OTLP HTTP JSON receiver for traces // Based on: https://opentelemetry.io/docs/specs/otlp/#otlphttp // // No external dependencies - manual JSON parsing // OTLPReceiver handles OTLP HTTP requests type OTLPReceiver struct { db *sql.DB } // NewOTLPReceiver creates a new OTLP receiver func NewOTLPReceiver(db *sql.DB) *OTLPReceiver { return &OTLPReceiver{db: db} } // ───────────────────────────────────────────────────────────── // OTLP JSON Structures (manual parsing, no protobuf) // ───────────────────────────────────────────────────────────── // ExportTraceServiceRequest is the top-level OTLP trace request type ExportTraceServiceRequest struct { ResourceSpans []ResourceSpans `json:"resourceSpans"` } // ResourceSpans groups spans by resource type ResourceSpans struct { Resource Resource `json:"resource"` ScopeSpans []ScopeSpans `json:"scopeSpans"` } // Resource describes the source of telemetry type Resource struct { Attributes []KeyValue `json:"attributes"` } // ScopeSpans groups spans by instrumentation scope type ScopeSpans struct { Scope InstrumentationScope `json:"scope"` Spans []OTLPSpan `json:"spans"` } // InstrumentationScope identifies the instrumentation library type InstrumentationScope struct { Name string `json:"name"` Version string `json:"version"` } // OTLPSpan represents a span in OTLP format type OTLPSpan struct { TraceID string `json:"traceId"` SpanID string `json:"spanId"` ParentSpanID string `json:"parentSpanId,omitempty"` Name string `json:"name"` Kind int `json:"kind"` StartTimeUnixNano string `json:"startTimeUnixNano"` EndTimeUnixNano string `json:"endTimeUnixNano"` Attributes []KeyValue `json:"attributes,omitempty"` Status SpanStatus `json:"status,omitempty"` Events []Event `json:"events,omitempty"` } // SpanStatus represents the status of a span type SpanStatus struct { Code int `json:"code"` Message string `json:"message,omitempty"` } // KeyValue is an OTLP key-value pair type KeyValue struct { Key string `json:"key"` Value AnyValue `json:"value"` } // AnyValue can hold different types of values type AnyValue struct { StringValue string `json:"stringValue,omitempty"` IntValue string `json:"intValue,omitempty"` DoubleValue float64 `json:"doubleValue,omitempty"` BoolValue bool `json:"boolValue,omitempty"` ArrayValue *ArrayValue `json:"arrayValue,omitempty"` } // ArrayValue holds an array of values type ArrayValue struct { Values []AnyValue `json:"values"` } // Event represents a span event type Event struct { Name string `json:"name"` TimeUnixNano string `json:"timeUnixNano"` Attributes []KeyValue `json:"attributes,omitempty"` } // ───────────────────────────────────────────────────────────── // Internal Span Format (matches existing Ophion format) // ───────────────────────────────────────────────────────────── type InternalSpan struct { TraceID string `json:"trace_id"` SpanID string `json:"span_id"` ParentSpanID string `json:"parent_span_id,omitempty"` Service string `json:"service"` Operation string `json:"operation"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` DurationNs int64 `json:"duration_ns"` StatusCode string `json:"status_code"` StatusMsg string `json:"status_message,omitempty"` Kind string `json:"kind"` Attributes map[string]any `json:"attributes,omitempty"` } // ───────────────────────────────────────────────────────────── // HTTP Handler // ───────────────────────────────────────────────────────────── // HandleTraces handles POST /v1/traces for OTLP trace ingestion func (r *OTLPReceiver) HandleTraces(c *fiber.Ctx) error { contentType := c.Get("Content-Type") // OTLP supports both JSON and protobuf, we only support JSON if !strings.Contains(contentType, "application/json") && contentType != "" { return c.Status(415).JSON(fiber.Map{ "error": "Unsupported media type. Use application/json", }) } var req ExportTraceServiceRequest if err := c.BodyParser(&req); err != nil { log.Printf("OTLP parse error: %v", err) return c.Status(400).JSON(fiber.Map{ "error": "Failed to parse OTLP request: " + err.Error(), }) } spans := r.convertOTLPToInternal(req) if len(spans) == 0 { return c.JSON(fiber.Map{ "partialSuccess": fiber.Map{ "rejectedSpans": 0, }, }) } // Save to database savedCount := 0 for _, sp := range spans { attrs, _ := json.Marshal(sp.Attributes) _, err := r.db.Exec(` INSERT INTO spans (trace_id, span_id, parent_span_id, service, operation, start_time, end_time, duration_ns, status_code, status_message, kind, attributes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, sp.TraceID, sp.SpanID, sp.ParentSpanID, sp.Service, sp.Operation, sp.StartTime, sp.EndTime, sp.DurationNs, sp.StatusCode, sp.StatusMsg, sp.Kind, attrs) if err != nil { log.Printf("Error inserting OTLP span: %v", err) } else { savedCount++ } } log.Printf("📡 OTLP: Received %d spans, saved %d", len(spans), savedCount) // OTLP response format return c.JSON(fiber.Map{ "partialSuccess": fiber.Map{ "rejectedSpans": len(spans) - savedCount, }, }) } // ───────────────────────────────────────────────────────────── // Conversion Functions // ───────────────────────────────────────────────────────────── func (r *OTLPReceiver) convertOTLPToInternal(req ExportTraceServiceRequest) []InternalSpan { var result []InternalSpan for _, rs := range req.ResourceSpans { serviceName := extractServiceName(rs.Resource) for _, ss := range rs.ScopeSpans { for _, span := range ss.Spans { internal := InternalSpan{ TraceID: normalizeTraceID(span.TraceID), SpanID: normalizeSpanID(span.SpanID), ParentSpanID: normalizeSpanID(span.ParentSpanID), Service: serviceName, Operation: span.Name, StartTime: parseNanoTimestamp(span.StartTimeUnixNano), EndTime: parseNanoTimestamp(span.EndTimeUnixNano), StatusCode: convertStatusCode(span.Status.Code), StatusMsg: span.Status.Message, Kind: convertSpanKind(span.Kind), Attributes: convertAttributes(span.Attributes), } // Calculate duration internal.DurationNs = internal.EndTime.Sub(internal.StartTime).Nanoseconds() // Add scope info to attributes if ss.Scope.Name != "" { if internal.Attributes == nil { internal.Attributes = make(map[string]any) } internal.Attributes["otel.library.name"] = ss.Scope.Name if ss.Scope.Version != "" { internal.Attributes["otel.library.version"] = ss.Scope.Version } } // Add events as attribute if present if len(span.Events) > 0 { if internal.Attributes == nil { internal.Attributes = make(map[string]any) } internal.Attributes["span.events"] = convertEvents(span.Events) } result = append(result, internal) } } } return result } // extractServiceName gets service.name from resource attributes func extractServiceName(resource Resource) string { for _, attr := range resource.Attributes { if attr.Key == "service.name" { return getStringValue(attr.Value) } } return "unknown" } // normalizeTraceID converts trace ID to consistent format (32 hex chars) func normalizeTraceID(id string) string { if id == "" { return "" } // If it's base64, decode it if len(id) > 32 { if decoded, err := hex.DecodeString(id); err == nil { return hex.EncodeToString(decoded) } } // Remove any non-hex characters and ensure lowercase return strings.ToLower(strings.TrimSpace(id)) } // normalizeSpanID converts span ID to consistent format (16 hex chars) func normalizeSpanID(id string) string { if id == "" { return "" } return strings.ToLower(strings.TrimSpace(id)) } // parseNanoTimestamp converts nanosecond timestamp string to time.Time func parseNanoTimestamp(ns string) time.Time { if ns == "" { return time.Time{} } // Parse as int64 - try direct string parsing first var nanos int64 for _, c := range ns { if c >= '0' && c <= '9' { nanos = nanos*10 + int64(c-'0') } } if nanos == 0 { // Fallback: try JSON unmarshal json.Unmarshal([]byte(ns), &nanos) } return time.Unix(0, nanos) } // convertStatusCode converts OTLP status code to string func convertStatusCode(code int) string { switch code { case 0: return "UNSET" case 1: return "OK" case 2: return "ERROR" default: return "UNSET" } } // convertSpanKind converts OTLP span kind to string func convertSpanKind(kind int) string { switch kind { case 0: return "UNSPECIFIED" case 1: return "INTERNAL" case 2: return "SERVER" case 3: return "CLIENT" case 4: return "PRODUCER" case 5: return "CONSUMER" default: return "INTERNAL" } } // convertAttributes converts OTLP KeyValue array to map func convertAttributes(attrs []KeyValue) map[string]any { if len(attrs) == 0 { return nil } result := make(map[string]any, len(attrs)) for _, attr := range attrs { result[attr.Key] = getAnyValue(attr.Value) } return result } // getStringValue extracts string value from AnyValue func getStringValue(v AnyValue) string { if v.StringValue != "" { return v.StringValue } if v.IntValue != "" { return v.IntValue } return "" } // getAnyValue extracts the actual value from AnyValue func getAnyValue(v AnyValue) any { if v.StringValue != "" { return v.StringValue } if v.IntValue != "" { return v.IntValue } if v.DoubleValue != 0 { return v.DoubleValue } if v.BoolValue { return v.BoolValue } if v.ArrayValue != nil { var arr []any for _, av := range v.ArrayValue.Values { arr = append(arr, getAnyValue(av)) } return arr } return nil } // convertEvents converts OTLP events to a serializable format func convertEvents(events []Event) []map[string]any { var result []map[string]any for _, e := range events { ev := map[string]any{ "name": e.Name, "time": parseNanoTimestamp(e.TimeUnixNano), } if len(e.Attributes) > 0 { ev["attributes"] = convertAttributes(e.Attributes) } result = append(result, ev) } return result }