Files
ophion/internal/otel/otlp_receiver.go
bigtux 771cf6cf50 feat: Add OpenTelemetry OTLP HTTP receiver
- Add POST /v1/traces endpoint for OTLP JSON trace ingestion
- Convert OTLP spans to internal format and save to PostgreSQL
- Manual JSON parsing (no Go 1.24 dependencies)
- Add Node.js instrumentation example with Express
- Add Python instrumentation example with Flask
- Auto-instrumentation support for both languages
2026-02-06 14:59:29 -03:00

394 lines
12 KiB
Go

package otel
import (
	"database/sql"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"log"
	"strconv"
	"strings"
	"time"

	"github.com/gofiber/fiber/v2"
)
// ═══════════════════════════════════════════════════════════
// 🔭 OTLP HTTP Receiver - OpenTelemetry Protocol Support
// ═══════════════════════════════════════════════════════════
//
// Implements OTLP HTTP JSON receiver for traces
// Based on: https://opentelemetry.io/docs/specs/otlp/#otlphttp
//
// No protobuf or OpenTelemetry SDK dependency: payloads are decoded with encoding/json
// OTLPReceiver handles OTLP HTTP requests.
//
// It carries the database handle that HandleTraces uses to insert converted
// spans into the spans table.
type OTLPReceiver struct {
// db is the SQL handle used for span inserts; it is set by NewOTLPReceiver.
db *sql.DB
}
// NewOTLPReceiver builds an OTLPReceiver that writes ingested spans through
// the given database handle. The handle is stored as-is; it is not opened,
// pinged or validated here.
func NewOTLPReceiver(db *sql.DB) *OTLPReceiver {
	receiver := new(OTLPReceiver)
	receiver.db = db
	return receiver
}
// ─────────────────────────────────────────────────────────────
// OTLP JSON Structures (manual parsing, no protobuf)
// ─────────────────────────────────────────────────────────────
// ExportTraceServiceRequest is the top-level OTLP trace request: the JSON
// document decoded from the body of POST /v1/traces.
type ExportTraceServiceRequest struct {
// ResourceSpans is decoded from the "resourceSpans" array; each entry groups
// the spans emitted by one resource.
ResourceSpans []ResourceSpans `json:"resourceSpans"`
}
// ResourceSpans groups spans by resource.
type ResourceSpans struct {
// Resource carries the resource attributes; the service name of every span
// in this group is read from it during conversion.
Resource Resource `json:"resource"`
// ScopeSpans holds the spans of this resource, grouped by instrumentation scope.
ScopeSpans []ScopeSpans `json:"scopeSpans"`
}
// Resource describes the source of telemetry.
type Resource struct {
// Attributes are the resource-level key/value pairs. Only "service.name" is
// consulted by the conversion code in this file.
Attributes []KeyValue `json:"attributes"`
}
// ScopeSpans groups spans by instrumentation scope.
type ScopeSpans struct {
// Scope identifies the instrumentation library; its name and version are
// copied into each converted span's attributes when the name is non-empty.
Scope InstrumentationScope `json:"scope"`
// Spans are the spans recorded under this scope.
Spans []OTLPSpan `json:"spans"`
}
// InstrumentationScope identifies the instrumentation library that produced a
// group of spans.
type InstrumentationScope struct {
// Name is stored on converted spans as the "otel.library.name" attribute.
Name string `json:"name"`
// Version is stored as "otel.library.version", but only when Name is also set.
Version string `json:"version"`
}
// OTLPSpan represents a span in OTLP JSON format, exactly as decoded from the
// request. IDs and timestamps are kept as strings here and are normalized or
// parsed when the span is converted to InternalSpan.
type OTLPSpan struct {
// TraceID is the trace identifier as sent by the client.
TraceID string `json:"traceId"`
// SpanID is the span identifier as sent by the client.
SpanID string `json:"spanId"`
// ParentSpanID is empty when the client sent no parent.
ParentSpanID string `json:"parentSpanId,omitempty"`
// Name becomes the internal span's Operation.
Name string `json:"name"`
// Kind is the numeric span kind; convertSpanKind maps it to a label.
Kind int `json:"kind"`
// StartTimeUnixNano is a decimal string of nanoseconds since the Unix epoch.
StartTimeUnixNano string `json:"startTimeUnixNano"`
// EndTimeUnixNano is a decimal string of nanoseconds since the Unix epoch.
EndTimeUnixNano string `json:"endTimeUnixNano"`
// Attributes are the span-level key/value pairs.
Attributes []KeyValue `json:"attributes,omitempty"`
// Status holds the numeric status code and optional message.
Status SpanStatus `json:"status,omitempty"`
// Events are stored under the "span.events" attribute after conversion.
Events []Event `json:"events,omitempty"`
}
// SpanStatus represents the status of a span.
type SpanStatus struct {
// Code is the numeric status; convertStatusCode maps 1 to "OK", 2 to "ERROR"
// and everything else to "UNSET".
Code int `json:"code"`
// Message is copied verbatim into the internal span's status message.
Message string `json:"message,omitempty"`
}
// KeyValue is an OTLP key-value pair, used for resource, span and event
// attributes.
type KeyValue struct {
// Key is the attribute name; it becomes the map key after conversion.
Key string `json:"key"`
// Value is the typed value wrapper; see AnyValue.
Value AnyValue `json:"value"`
}
// AnyValue can hold different types of values. Each field corresponds to one
// JSON key of the OTLP value wrapper; a well-formed value sets exactly one.
//
// Because the scalar fields are not pointers, a decoded zero value ("", 0,
// false) cannot be told apart from an absent field; getAnyValue reports such
// values as nil.
// NOTE(review): OTLP also defines kvlistValue and bytesValue; there are no
// fields for them here, so those values would decode as empty — confirm
// whether that is intended.
type AnyValue struct {
// StringValue holds a string attribute.
StringValue string `json:"stringValue,omitempty"`
// IntValue holds an integer attribute in its decimal string form; a JSON
// number in this position fails to decode into a string field.
IntValue string `json:"intValue,omitempty"`
// DoubleValue holds a floating-point attribute.
DoubleValue float64 `json:"doubleValue,omitempty"`
// BoolValue holds a boolean attribute.
BoolValue bool `json:"boolValue,omitempty"`
// ArrayValue holds a nested list of values; nil when absent.
ArrayValue *ArrayValue `json:"arrayValue,omitempty"`
}
// ArrayValue holds an array of values; elements may themselves be arrays.
type ArrayValue struct {
// Values are the elements, converted one by one by getAnyValue.
Values []AnyValue `json:"values"`
}
// Event represents a span event: a named, timestamped annotation on a span.
type Event struct {
// Name is the event name.
Name string `json:"name"`
// TimeUnixNano is a decimal string of nanoseconds since the Unix epoch.
TimeUnixNano string `json:"timeUnixNano"`
// Attributes are the event-level key/value pairs.
Attributes []KeyValue `json:"attributes,omitempty"`
}
// ─────────────────────────────────────────────────────────────
// Internal Span Format (matches existing Ophion format)
// ─────────────────────────────────────────────────────────────
// InternalSpan is the flattened span representation produced from an OTLP
// request. HandleTraces writes one row to the spans table per InternalSpan,
// one column per field.
type InternalSpan struct {
// TraceID is the normalized trace identifier.
TraceID string `json:"trace_id"`
// SpanID is the normalized span identifier.
SpanID string `json:"span_id"`
// ParentSpanID is the normalized parent identifier; empty when there is none.
ParentSpanID string `json:"parent_span_id,omitempty"`
// Service is the resource's "service.name", or "unknown" when it is absent.
Service string `json:"service"`
// Operation is the OTLP span name.
Operation string `json:"operation"`
// StartTime is parsed from the OTLP startTimeUnixNano string.
StartTime time.Time `json:"start_time"`
// EndTime is parsed from the OTLP endTimeUnixNano string.
EndTime time.Time `json:"end_time"`
// DurationNs is EndTime minus StartTime, in nanoseconds.
DurationNs int64 `json:"duration_ns"`
// StatusCode is "UNSET", "OK" or "ERROR".
StatusCode string `json:"status_code"`
// StatusMsg is the OTLP status message, unchanged.
StatusMsg string `json:"status_message,omitempty"`
// Kind is the span-kind label produced by convertSpanKind.
Kind string `json:"kind"`
// Attributes holds the span attributes plus the synthetic keys
// "otel.library.name", "otel.library.version" and "span.events"; it is
// JSON-encoded before being inserted.
Attributes map[string]any `json:"attributes,omitempty"`
}
// ─────────────────────────────────────────────────────────────
// HTTP Handler
// ─────────────────────────────────────────────────────────────
// HandleTraces handles POST /v1/traces for OTLP trace ingestion.
//
// The request body is decoded as OTLP/JSON, every span is converted to the
// internal format and inserted into the spans table one row at a time. The
// response body is an OTLP-style partialSuccess object whose rejectedSpans
// field counts the spans that could not be stored.
//
// Status codes:
//   - 415 when a Content-Type is present and is not application/json
//   - 400 when the body cannot be decoded into ExportTraceServiceRequest
//   - 200 otherwise, including when individual inserts fail (those are logged
//     and reported through rejectedSpans)
func (r *OTLPReceiver) HandleTraces(c *fiber.Ctx) error {
	// OTLP supports both JSON and protobuf; only JSON is implemented. A missing
	// Content-Type is tolerated and treated as JSON. Media types are
	// case-insensitive, so the comparison is done in lower case.
	contentType := strings.ToLower(c.Get("Content-Type"))
	if contentType != "" && !strings.Contains(contentType, "application/json") {
		return c.Status(415).JSON(fiber.Map{
			"error": "Unsupported media type. Use application/json",
		})
	}

	// Decode the raw body directly instead of using c.BodyParser: BodyParser
	// picks a decoder from the Content-Type header and fails when the header is
	// missing, which would reject the header-less requests accepted above.
	var req ExportTraceServiceRequest
	if err := json.Unmarshal(c.Body(), &req); err != nil {
		log.Printf("OTLP parse error: %v", err)
		return c.Status(400).JSON(fiber.Map{
			"error": "Failed to parse OTLP request: " + err.Error(),
		})
	}

	spans := r.convertOTLPToInternal(req)
	if len(spans) == 0 {
		return c.JSON(fiber.Map{
			"partialSuccess": fiber.Map{
				"rejectedSpans": 0,
			},
		})
	}

	const insertSpan = `
INSERT INTO spans (trace_id, span_id, parent_span_id, service, operation, start_time, end_time, duration_ns, status_code, status_message, kind, attributes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`

	// Save to database. Each span is inserted independently so that one bad
	// row does not prevent the rest of the batch from being stored.
	savedCount := 0
	for _, sp := range spans {
		attrs, err := json.Marshal(sp.Attributes)
		if err != nil {
			// A span whose attributes cannot be encoded is rejected rather than
			// stored with a missing or truncated attributes column.
			log.Printf("Error encoding OTLP span attributes: %v", err)
			continue
		}
		if _, err := r.db.Exec(insertSpan,
			sp.TraceID, sp.SpanID, sp.ParentSpanID, sp.Service, sp.Operation,
			sp.StartTime, sp.EndTime, sp.DurationNs, sp.StatusCode, sp.StatusMsg,
			sp.Kind, attrs); err != nil {
			log.Printf("Error inserting OTLP span: %v", err)
			continue
		}
		savedCount++
	}
	log.Printf("📡 OTLP: Received %d spans, saved %d", len(spans), savedCount)

	// OTLP response format
	return c.JSON(fiber.Map{
		"partialSuccess": fiber.Map{
			"rejectedSpans": len(spans) - savedCount,
		},
	})
}
// ─────────────────────────────────────────────────────────────
// Conversion Functions
// ─────────────────────────────────────────────────────────────
// convertOTLPToInternal flattens an OTLP request (resource -> scope -> span)
// into a list of InternalSpan values, in request order.
//
// For every span it normalizes the IDs, parses the timestamps, derives the
// duration from them, and maps status and kind to their string labels. The
// service name comes from the enclosing resource. Scope name/version and span
// events are folded into the attribute map under "otel.library.name",
// "otel.library.version" and "span.events". The result is nil when the
// request contains no spans.
func (r *OTLPReceiver) convertOTLPToInternal(req ExportTraceServiceRequest) []InternalSpan {
	var converted []InternalSpan

	for i := range req.ResourceSpans {
		group := &req.ResourceSpans[i]
		service := extractServiceName(group.Resource)

		for j := range group.ScopeSpans {
			scoped := &group.ScopeSpans[j]

			for k := range scoped.Spans {
				src := &scoped.Spans[k]

				startedAt := parseNanoTimestamp(src.StartTimeUnixNano)
				endedAt := parseNanoTimestamp(src.EndTimeUnixNano)
				attrs := convertAttributes(src.Attributes)

				// Scope metadata is only recorded when the scope is named.
				if scope := scoped.Scope; scope.Name != "" {
					if attrs == nil {
						attrs = make(map[string]any)
					}
					attrs["otel.library.name"] = scope.Name
					if scope.Version != "" {
						attrs["otel.library.version"] = scope.Version
					}
				}

				// Events have no column of their own; keep them as an attribute.
				if len(src.Events) > 0 {
					if attrs == nil {
						attrs = make(map[string]any)
					}
					attrs["span.events"] = convertEvents(src.Events)
				}

				converted = append(converted, InternalSpan{
					TraceID:      normalizeTraceID(src.TraceID),
					SpanID:       normalizeSpanID(src.SpanID),
					ParentSpanID: normalizeSpanID(src.ParentSpanID),
					Service:      service,
					Operation:    src.Name,
					StartTime:    startedAt,
					EndTime:      endedAt,
					DurationNs:   endedAt.Sub(startedAt).Nanoseconds(),
					StatusCode:   convertStatusCode(src.Status.Code),
					StatusMsg:    src.Status.Message,
					Kind:         convertSpanKind(src.Kind),
					Attributes:   attrs,
				})
			}
		}
	}

	return converted
}
// extractServiceName returns the resource's "service.name" attribute.
//
// It returns "unknown" when the attribute is missing and also when it is
// present but carries no usable value (empty string, or a value type that
// getStringValue does not render), so that spans are never stored with an
// empty service name.
func extractServiceName(resource Resource) string {
	for _, attr := range resource.Attributes {
		if attr.Key != "service.name" {
			continue
		}
		if name := getStringValue(attr.Value); name != "" {
			return name
		}
	}
	return "unknown"
}
// normalizeTraceID converts a trace ID to a consistent format: 32 lowercase
// hex characters (16 bytes).
//
// Accepted inputs, after trimming surrounding whitespace:
//   - 32 hex characters in any case (the OTLP/JSON encoding): lowercased
//   - standard padded base64 that decodes to exactly 16 bytes (the generic
//     protobuf JSON encoding of a bytes field): re-encoded as hex
//
// Anything else is returned trimmed and lowercased, unvalidated. An empty or
// all-whitespace ID yields "".
func normalizeTraceID(id string) string {
	return normalizeOTLPID(id, 16)
}

// normalizeSpanID converts a span ID to a consistent format: 16 lowercase hex
// characters (8 bytes). It follows the same rules as normalizeTraceID, with
// an 8-byte identifier size.
func normalizeSpanID(id string) string {
	return normalizeOTLPID(id, 8)
}

// normalizeOTLPID renders an identifier of size bytes as lowercase hex.
func normalizeOTLPID(id string, size int) string {
	id = strings.TrimSpace(id)
	if id == "" {
		return ""
	}

	// Hex is checked first: a hex string of the expected length is also made
	// of valid base64 characters, but it never decodes to exactly size bytes,
	// and the hex reading is the one the OTLP/JSON encoding mandates.
	if len(id) == hex.EncodedLen(size) {
		if _, err := hex.DecodeString(id); err == nil {
			return strings.ToLower(id)
		}
	}

	// Base64 is case-sensitive, so it must be decoded before any lowercasing.
	// Requiring the exact byte length keeps unrelated strings from being
	// reinterpreted.
	if raw, err := base64.StdEncoding.DecodeString(id); err == nil && len(raw) == size {
		return hex.EncodeToString(raw)
	}

	// Unknown format: keep the best-effort normalization.
	return strings.ToLower(id)
}
// parseNanoTimestamp converts a decimal string of nanoseconds since the Unix
// epoch (the OTLP/JSON encoding of a 64-bit timestamp) to a time.Time.
//
// Surrounding whitespace is ignored. The zero time.Time is returned when the
// string is empty, is not a base-10 integer, is negative, or does not fit in
// an int64, so callers can detect a missing or invalid timestamp with IsZero
// instead of silently receiving a value built from a partially parsed string.
func parseNanoTimestamp(ns string) time.Time {
	ns = strings.TrimSpace(ns)
	if ns == "" {
		return time.Time{}
	}

	nanos, err := strconv.ParseInt(ns, 10, 64)
	if err != nil || nanos < 0 {
		return time.Time{}
	}
	return time.Unix(0, nanos)
}
// convertStatusCode maps a numeric OTLP status code to its label: 1 is "OK",
// 2 is "ERROR", and every other value (including 0) is "UNSET".
func convertStatusCode(code int) string {
	if code == 1 {
		return "OK"
	}
	if code == 2 {
		return "ERROR"
	}
	return "UNSET"
}
// convertSpanKind maps a numeric OTLP span kind to its label. Values 0-5 map
// to UNSPECIFIED, INTERNAL, SERVER, CLIENT, PRODUCER and CONSUMER; any value
// outside that range is reported as "INTERNAL".
func convertSpanKind(kind int) string {
	// Indexed by the numeric kind.
	labels := [...]string{
		"UNSPECIFIED",
		"INTERNAL",
		"SERVER",
		"CLIENT",
		"PRODUCER",
		"CONSUMER",
	}
	if kind < 0 || kind >= len(labels) {
		return "INTERNAL"
	}
	return labels[kind]
}
// convertAttributes turns a list of OTLP key/value pairs into a map keyed by
// attribute name, with each value unwrapped by getAnyValue. It returns nil
// for an empty list. When a key appears more than once, the last occurrence
// wins.
func convertAttributes(attrs []KeyValue) map[string]any {
	count := len(attrs)
	if count == 0 {
		return nil
	}

	converted := make(map[string]any, count)
	for i := 0; i < count; i++ {
		pair := attrs[i]
		converted[pair.Key] = getAnyValue(pair.Value)
	}
	return converted
}
// getStringValue renders an AnyValue as a string: the string value when it is
// non-empty, otherwise the integer value in its decimal string form. Every
// other value type yields "".
func getStringValue(v AnyValue) string {
	switch {
	case v.StringValue != "":
		return v.StringValue
	default:
		// IntValue is "" when unset, which is also the "no value" result.
		return v.IntValue
	}
}
// getAnyValue unwraps an AnyValue into a plain Go value. The first populated
// field wins, checked in this order: string, int (kept as its decimal
// string), double, bool, array (converted recursively to []any).
//
// It returns nil when no field is populated. Since fields are tested against
// their zero value, an explicit "", 0 or false is also reported as nil.
func getAnyValue(v AnyValue) any {
	switch {
	case v.StringValue != "":
		return v.StringValue
	case v.IntValue != "":
		return v.IntValue
	case v.DoubleValue != 0:
		return v.DoubleValue
	case v.BoolValue:
		return true
	case v.ArrayValue != nil:
		// Left nil for an empty array, so it still encodes as JSON null.
		var items []any
		for i := range v.ArrayValue.Values {
			items = append(items, getAnyValue(v.ArrayValue.Values[i]))
		}
		return items
	}
	return nil
}
// convertEvents turns OTLP span events into JSON-serializable maps, in input
// order. Each map has "name" and "time" (the parsed event timestamp) and, when
// the event carries any, "attributes". The result is nil for an empty input.
func convertEvents(events []Event) []map[string]any {
	var converted []map[string]any
	for i := range events {
		event := &events[i]

		entry := make(map[string]any)
		entry["name"] = event.Name
		entry["time"] = parseNanoTimestamp(event.TimeUnixNano)
		if len(event.Attributes) != 0 {
			entry["attributes"] = convertAttributes(event.Attributes)
		}

		converted = append(converted, entry)
	}
	return converted
}