- Add POST /v1/traces endpoint for OTLP JSON trace ingestion - Convert OTLP spans to internal format and save to PostgreSQL - Manual JSON parsing (no Go 1.24 dependencies) - Add Node.js instrumentation example with Express - Add Python instrumentation example with Flask - Auto-instrumentation support for both languages
394 lines
12 KiB
Go
394 lines
12 KiB
Go
package otel
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2"
|
|
)
|
|
|
|
// ═══════════════════════════════════════════════════════════
|
|
// 🔭 OTLP HTTP Receiver - OpenTelemetry Protocol Support
|
|
// ═══════════════════════════════════════════════════════════
|
|
//
|
|
// Implements OTLP HTTP JSON receiver for traces
|
|
// Based on: https://opentelemetry.io/docs/specs/otlp/#otlphttp
|
|
//
|
|
// No external dependencies - manual JSON parsing
|
|
|
|
// OTLPReceiver handles OTLP HTTP requests
|
|
type OTLPReceiver struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
// NewOTLPReceiver creates a new OTLP receiver
|
|
func NewOTLPReceiver(db *sql.DB) *OTLPReceiver {
|
|
return &OTLPReceiver{db: db}
|
|
}
|
|
|
|
// ─────────────────────────────────────────────────────────────
|
|
// OTLP JSON Structures (manual parsing, no protobuf)
|
|
// ─────────────────────────────────────────────────────────────
|
|
|
|
// ExportTraceServiceRequest is the top-level OTLP trace request
|
|
type ExportTraceServiceRequest struct {
|
|
ResourceSpans []ResourceSpans `json:"resourceSpans"`
|
|
}
|
|
|
|
// ResourceSpans groups spans by resource
|
|
type ResourceSpans struct {
|
|
Resource Resource `json:"resource"`
|
|
ScopeSpans []ScopeSpans `json:"scopeSpans"`
|
|
}
|
|
|
|
// Resource describes the source of telemetry
|
|
type Resource struct {
|
|
Attributes []KeyValue `json:"attributes"`
|
|
}
|
|
|
|
// ScopeSpans groups spans by instrumentation scope
|
|
type ScopeSpans struct {
|
|
Scope InstrumentationScope `json:"scope"`
|
|
Spans []OTLPSpan `json:"spans"`
|
|
}
|
|
|
|
// InstrumentationScope identifies the instrumentation library
|
|
type InstrumentationScope struct {
|
|
Name string `json:"name"`
|
|
Version string `json:"version"`
|
|
}
|
|
|
|
// OTLPSpan represents a span in OTLP format
|
|
type OTLPSpan struct {
|
|
TraceID string `json:"traceId"`
|
|
SpanID string `json:"spanId"`
|
|
ParentSpanID string `json:"parentSpanId,omitempty"`
|
|
Name string `json:"name"`
|
|
Kind int `json:"kind"`
|
|
StartTimeUnixNano string `json:"startTimeUnixNano"`
|
|
EndTimeUnixNano string `json:"endTimeUnixNano"`
|
|
Attributes []KeyValue `json:"attributes,omitempty"`
|
|
Status SpanStatus `json:"status,omitempty"`
|
|
Events []Event `json:"events,omitempty"`
|
|
}
|
|
|
|
// SpanStatus represents the status of a span
|
|
type SpanStatus struct {
|
|
Code int `json:"code"`
|
|
Message string `json:"message,omitempty"`
|
|
}
|
|
|
|
// KeyValue is an OTLP key-value pair
|
|
type KeyValue struct {
|
|
Key string `json:"key"`
|
|
Value AnyValue `json:"value"`
|
|
}
|
|
|
|
// AnyValue can hold different types of values
|
|
type AnyValue struct {
|
|
StringValue string `json:"stringValue,omitempty"`
|
|
IntValue string `json:"intValue,omitempty"`
|
|
DoubleValue float64 `json:"doubleValue,omitempty"`
|
|
BoolValue bool `json:"boolValue,omitempty"`
|
|
ArrayValue *ArrayValue `json:"arrayValue,omitempty"`
|
|
}
|
|
|
|
// ArrayValue holds an array of values
|
|
type ArrayValue struct {
|
|
Values []AnyValue `json:"values"`
|
|
}
|
|
|
|
// Event represents a span event
|
|
type Event struct {
|
|
Name string `json:"name"`
|
|
TimeUnixNano string `json:"timeUnixNano"`
|
|
Attributes []KeyValue `json:"attributes,omitempty"`
|
|
}
|
|
|
|
// ─────────────────────────────────────────────────────────────
|
|
// Internal Span Format (matches existing Ophion format)
|
|
// ─────────────────────────────────────────────────────────────
|
|
|
|
type InternalSpan struct {
|
|
TraceID string `json:"trace_id"`
|
|
SpanID string `json:"span_id"`
|
|
ParentSpanID string `json:"parent_span_id,omitempty"`
|
|
Service string `json:"service"`
|
|
Operation string `json:"operation"`
|
|
StartTime time.Time `json:"start_time"`
|
|
EndTime time.Time `json:"end_time"`
|
|
DurationNs int64 `json:"duration_ns"`
|
|
StatusCode string `json:"status_code"`
|
|
StatusMsg string `json:"status_message,omitempty"`
|
|
Kind string `json:"kind"`
|
|
Attributes map[string]any `json:"attributes,omitempty"`
|
|
}
|
|
|
|
// ─────────────────────────────────────────────────────────────
|
|
// HTTP Handler
|
|
// ─────────────────────────────────────────────────────────────
|
|
|
|
// HandleTraces handles POST /v1/traces for OTLP trace ingestion
|
|
func (r *OTLPReceiver) HandleTraces(c *fiber.Ctx) error {
|
|
contentType := c.Get("Content-Type")
|
|
|
|
// OTLP supports both JSON and protobuf, we only support JSON
|
|
if !strings.Contains(contentType, "application/json") && contentType != "" {
|
|
return c.Status(415).JSON(fiber.Map{
|
|
"error": "Unsupported media type. Use application/json",
|
|
})
|
|
}
|
|
|
|
var req ExportTraceServiceRequest
|
|
if err := c.BodyParser(&req); err != nil {
|
|
log.Printf("OTLP parse error: %v", err)
|
|
return c.Status(400).JSON(fiber.Map{
|
|
"error": "Failed to parse OTLP request: " + err.Error(),
|
|
})
|
|
}
|
|
|
|
spans := r.convertOTLPToInternal(req)
|
|
|
|
if len(spans) == 0 {
|
|
return c.JSON(fiber.Map{
|
|
"partialSuccess": fiber.Map{
|
|
"rejectedSpans": 0,
|
|
},
|
|
})
|
|
}
|
|
|
|
// Save to database
|
|
savedCount := 0
|
|
for _, sp := range spans {
|
|
attrs, _ := json.Marshal(sp.Attributes)
|
|
_, err := r.db.Exec(`
|
|
INSERT INTO spans (trace_id, span_id, parent_span_id, service, operation, start_time, end_time, duration_ns, status_code, status_message, kind, attributes)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
|
|
sp.TraceID, sp.SpanID, sp.ParentSpanID, sp.Service, sp.Operation, sp.StartTime, sp.EndTime, sp.DurationNs, sp.StatusCode, sp.StatusMsg, sp.Kind, attrs)
|
|
if err != nil {
|
|
log.Printf("Error inserting OTLP span: %v", err)
|
|
} else {
|
|
savedCount++
|
|
}
|
|
}
|
|
|
|
log.Printf("📡 OTLP: Received %d spans, saved %d", len(spans), savedCount)
|
|
|
|
// OTLP response format
|
|
return c.JSON(fiber.Map{
|
|
"partialSuccess": fiber.Map{
|
|
"rejectedSpans": len(spans) - savedCount,
|
|
},
|
|
})
|
|
}
|
|
|
|
// ─────────────────────────────────────────────────────────────
|
|
// Conversion Functions
|
|
// ─────────────────────────────────────────────────────────────
|
|
|
|
func (r *OTLPReceiver) convertOTLPToInternal(req ExportTraceServiceRequest) []InternalSpan {
|
|
var result []InternalSpan
|
|
|
|
for _, rs := range req.ResourceSpans {
|
|
serviceName := extractServiceName(rs.Resource)
|
|
|
|
for _, ss := range rs.ScopeSpans {
|
|
for _, span := range ss.Spans {
|
|
internal := InternalSpan{
|
|
TraceID: normalizeTraceID(span.TraceID),
|
|
SpanID: normalizeSpanID(span.SpanID),
|
|
ParentSpanID: normalizeSpanID(span.ParentSpanID),
|
|
Service: serviceName,
|
|
Operation: span.Name,
|
|
StartTime: parseNanoTimestamp(span.StartTimeUnixNano),
|
|
EndTime: parseNanoTimestamp(span.EndTimeUnixNano),
|
|
StatusCode: convertStatusCode(span.Status.Code),
|
|
StatusMsg: span.Status.Message,
|
|
Kind: convertSpanKind(span.Kind),
|
|
Attributes: convertAttributes(span.Attributes),
|
|
}
|
|
|
|
// Calculate duration
|
|
internal.DurationNs = internal.EndTime.Sub(internal.StartTime).Nanoseconds()
|
|
|
|
// Add scope info to attributes
|
|
if ss.Scope.Name != "" {
|
|
if internal.Attributes == nil {
|
|
internal.Attributes = make(map[string]any)
|
|
}
|
|
internal.Attributes["otel.library.name"] = ss.Scope.Name
|
|
if ss.Scope.Version != "" {
|
|
internal.Attributes["otel.library.version"] = ss.Scope.Version
|
|
}
|
|
}
|
|
|
|
// Add events as attribute if present
|
|
if len(span.Events) > 0 {
|
|
if internal.Attributes == nil {
|
|
internal.Attributes = make(map[string]any)
|
|
}
|
|
internal.Attributes["span.events"] = convertEvents(span.Events)
|
|
}
|
|
|
|
result = append(result, internal)
|
|
}
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// extractServiceName gets service.name from resource attributes
|
|
func extractServiceName(resource Resource) string {
|
|
for _, attr := range resource.Attributes {
|
|
if attr.Key == "service.name" {
|
|
return getStringValue(attr.Value)
|
|
}
|
|
}
|
|
return "unknown"
|
|
}
|
|
|
|
// normalizeTraceID converts trace ID to consistent format (32 hex chars)
|
|
func normalizeTraceID(id string) string {
|
|
if id == "" {
|
|
return ""
|
|
}
|
|
// If it's base64, decode it
|
|
if len(id) > 32 {
|
|
if decoded, err := hex.DecodeString(id); err == nil {
|
|
return hex.EncodeToString(decoded)
|
|
}
|
|
}
|
|
// Remove any non-hex characters and ensure lowercase
|
|
return strings.ToLower(strings.TrimSpace(id))
|
|
}
|
|
|
|
// normalizeSpanID converts span ID to consistent format (16 hex chars)
|
|
func normalizeSpanID(id string) string {
|
|
if id == "" {
|
|
return ""
|
|
}
|
|
return strings.ToLower(strings.TrimSpace(id))
|
|
}
|
|
|
|
// parseNanoTimestamp converts nanosecond timestamp string to time.Time
|
|
func parseNanoTimestamp(ns string) time.Time {
|
|
if ns == "" {
|
|
return time.Time{}
|
|
}
|
|
// Parse as int64 - try direct string parsing first
|
|
var nanos int64
|
|
for _, c := range ns {
|
|
if c >= '0' && c <= '9' {
|
|
nanos = nanos*10 + int64(c-'0')
|
|
}
|
|
}
|
|
if nanos == 0 {
|
|
// Fallback: try JSON unmarshal
|
|
json.Unmarshal([]byte(ns), &nanos)
|
|
}
|
|
return time.Unix(0, nanos)
|
|
}
|
|
|
|
// convertStatusCode converts OTLP status code to string
|
|
func convertStatusCode(code int) string {
|
|
switch code {
|
|
case 0:
|
|
return "UNSET"
|
|
case 1:
|
|
return "OK"
|
|
case 2:
|
|
return "ERROR"
|
|
default:
|
|
return "UNSET"
|
|
}
|
|
}
|
|
|
|
// convertSpanKind converts OTLP span kind to string
|
|
func convertSpanKind(kind int) string {
|
|
switch kind {
|
|
case 0:
|
|
return "UNSPECIFIED"
|
|
case 1:
|
|
return "INTERNAL"
|
|
case 2:
|
|
return "SERVER"
|
|
case 3:
|
|
return "CLIENT"
|
|
case 4:
|
|
return "PRODUCER"
|
|
case 5:
|
|
return "CONSUMER"
|
|
default:
|
|
return "INTERNAL"
|
|
}
|
|
}
|
|
|
|
// convertAttributes converts OTLP KeyValue array to map
|
|
func convertAttributes(attrs []KeyValue) map[string]any {
|
|
if len(attrs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
result := make(map[string]any, len(attrs))
|
|
for _, attr := range attrs {
|
|
result[attr.Key] = getAnyValue(attr.Value)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// getStringValue extracts string value from AnyValue
|
|
func getStringValue(v AnyValue) string {
|
|
if v.StringValue != "" {
|
|
return v.StringValue
|
|
}
|
|
if v.IntValue != "" {
|
|
return v.IntValue
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// getAnyValue extracts the actual value from AnyValue
|
|
func getAnyValue(v AnyValue) any {
|
|
if v.StringValue != "" {
|
|
return v.StringValue
|
|
}
|
|
if v.IntValue != "" {
|
|
return v.IntValue
|
|
}
|
|
if v.DoubleValue != 0 {
|
|
return v.DoubleValue
|
|
}
|
|
if v.BoolValue {
|
|
return v.BoolValue
|
|
}
|
|
if v.ArrayValue != nil {
|
|
var arr []any
|
|
for _, av := range v.ArrayValue.Values {
|
|
arr = append(arr, getAnyValue(av))
|
|
}
|
|
return arr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// convertEvents converts OTLP events to a serializable format
|
|
func convertEvents(events []Event) []map[string]any {
|
|
var result []map[string]any
|
|
for _, e := range events {
|
|
ev := map[string]any{
|
|
"name": e.Name,
|
|
"time": parseNanoTimestamp(e.TimeUnixNano),
|
|
}
|
|
if len(e.Attributes) > 0 {
|
|
ev["attributes"] = convertAttributes(e.Attributes)
|
|
}
|
|
result = append(result, ev)
|
|
}
|
|
return result
|
|
}
|