// Command-line agent that periodically samples host (and optionally Docker)
// metrics and POSTs them as JSON to an OPHION server.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"runtime"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/shirou/gopsutil/v3/cpu"
	"github.com/shirou/gopsutil/v3/disk"
	"github.com/shirou/gopsutil/v3/host"
	"github.com/shirou/gopsutil/v3/load"
	"github.com/shirou/gopsutil/v3/mem"
	psnet "github.com/shirou/gopsutil/v3/net"
)

// ═══════════════════════════════════════════════════════════
// 🐍 OPHION Agent - Observability Collector
// ═══════════════════════════════════════════════════════════

const (
	// defaultCollectInterval is used when OPHION_INTERVAL is unset or unusable.
	defaultCollectInterval = 30 * time.Second

	// dockerCommandTimeout bounds every docker CLI invocation so that a hung
	// CLI or daemon cannot stall the collection loop indefinitely.
	dockerCommandTimeout = 15 * time.Second

	// dockerStatsFormat is the tab-separated template passed to `docker stats`.
	dockerStatsFormat = "{{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}"

	// maxDrainBytes caps how much of a response body is read before closing.
	maxDrainBytes = 64 << 10
)

// httpClient is shared by all sendMetrics calls so that connections can be
// reused between collection cycles.
var httpClient = &http.Client{Timeout: 10 * time.Second}

// pseudoMountRoots lists directory trees whose mounts are excluded from disk
// metrics.
var pseudoMountRoots = []string{"/snap", "/sys", "/proc", "/dev", "/run"}

// Config holds the agent settings resolved by loadConfig.
type Config struct {
	ServerURL       string        // base URL of the server receiving metrics
	APIKey          string        // sent as a Bearer token when non-empty
	Hostname        string        // value reported in Metric.Host
	CollectInterval time.Duration // period between collection cycles; always > 0
	DockerEnabled   bool          // whether Docker metrics are collected
}

// Metric is a single sample as serialized in the request body.
type Metric struct {
	Timestamp  time.Time         `json:"timestamp"`
	Service    string            `json:"service"`
	Host       string            `json:"host"`
	Name       string            `json:"name"`
	Value      float64           `json:"value"`
	MetricType string            `json:"metric_type"`
	Tags       map[string]string `json:"tags,omitempty"`
}

// ContainerStats describes per-container statistics. It is not referenced by
// the collectors in this file.
type ContainerStats struct {
	ID            string  `json:"id"`
	Name          string  `json:"name"`
	CPUPercent    float64 `json:"cpu_percent"`
	MemoryUsage   uint64  `json:"memory_usage"`
	MemoryLimit   uint64  `json:"memory_limit"`
	MemoryPercent float64 `json:"memory_percent"`
	NetRx         uint64  `json:"net_rx"`
	NetTx         uint64  `json:"net_tx"`
	State         string  `json:"state"`
}

// main loads the configuration, collects once immediately, and then collects
// on every tick until SIGINT or SIGTERM is received.
func main() {
	config := loadConfig()

	log.Printf("🐍 OPHION Agent starting")
	log.Printf("   Server: %s", config.ServerURL)
	log.Printf("   Host: %s", config.Hostname)
	log.Printf("   Interval: %s", config.CollectInterval)
	log.Printf("   Docker: %v", config.DockerEnabled)

	// Handle shutdown
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	ticker := time.NewTicker(config.CollectInterval)
	defer ticker.Stop()

	// Collect immediately
	collect(config)

	for {
		select {
		case <-sigCh:
			log.Println("🛑 Shutting down agent...")
			return
		case <-ticker.C:
			collect(config)
		}
	}
}

// loadConfig builds the agent configuration from environment variables:
//
//   - OPHION_SERVER: server base URL (default "http://localhost:8080")
//   - OPHION_API_KEY, falling back to AGENT_KEY: API key (default empty)
//   - OPHION_HOSTNAME: reported host name (default: os.Hostname)
//   - OPHION_INTERVAL: collection period as a Go duration (default 30s)
//   - OPHION_DOCKER: "false" or "0" disables Docker collection
//
// An unparsable or non-positive OPHION_INTERVAL is rejected with a warning and
// the default is kept, because time.NewTicker panics on a non-positive period.
func loadConfig() *Config {
	hostname, err := os.Hostname()
	if err != nil {
		log.Printf("Warning: could not determine hostname: %v", err)
	}

	interval := defaultCollectInterval
	if v := os.Getenv("OPHION_INTERVAL"); v != "" {
		d, err := time.ParseDuration(v)
		switch {
		case err != nil:
			log.Printf("Warning: invalid OPHION_INTERVAL %q (%v); using %s", v, err, interval)
		case d <= 0:
			log.Printf("Warning: OPHION_INTERVAL %q must be positive; using %s", v, interval)
		default:
			interval = d
		}
	}

	dockerEnabled := true
	switch os.Getenv("OPHION_DOCKER") {
	case "false", "0":
		dockerEnabled = false
	}

	return &Config{
		ServerURL:       getEnv("OPHION_SERVER", "http://localhost:8080"),
		APIKey:          getEnv("OPHION_API_KEY", getEnv("AGENT_KEY", "")),
		Hostname:        getEnv("OPHION_HOSTNAME", hostname),
		CollectInterval: interval,
		DockerEnabled:   dockerEnabled,
	}
}

// getEnv returns the value of the environment variable key, or def when the
// variable is unset or empty.
func getEnv(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// collect runs one collection cycle: it gathers system metrics, Docker metrics
// when enabled, and sends the combined batch if it is non-empty.
func collect(config *Config) {
	now := time.Now()
	hostname := config.Hostname

	// System metrics
	metrics := collectSystemMetrics(now, hostname)

	// Docker metrics
	if config.DockerEnabled {
		metrics = append(metrics, collectDockerMetrics(now, hostname)...)
	}

	// Send to server
	if len(metrics) > 0 {
		sendMetrics(config, metrics)
	}
}

// isPseudoMount reports whether mountpoint is one of pseudoMountRoots or lies
// beneath one of them. Matching is done on whole path components, so "/dev/shm"
// matches while "/devel" or "/sysroot" do not.
func isPseudoMount(mountpoint string) bool {
	for _, root := range pseudoMountRoots {
		if mountpoint == root || strings.HasPrefix(mountpoint, root+"/") {
			return true
		}
	}
	return false
}

// collectSystemMetrics samples CPU, load average, memory, disk, network,
// uptime and core count, stamping every metric with now and hostname under the
// "system" service. A probe that fails contributes no metrics; the remaining
// probes still run. The CPU probe is called with a one-second sampling
// interval.
func collectSystemMetrics(now time.Time, hostname string) []Metric {
	var metrics []Metric
	add := func(name string, value float64, metricType string, tags map[string]string) {
		metrics = append(metrics, Metric{
			Timestamp:  now,
			Service:    "system",
			Host:       hostname,
			Name:       name,
			Value:      value,
			MetricType: metricType,
			Tags:       tags,
		})
	}

	// CPU
	if cpuPercent, err := cpu.Percent(time.Second, false); err == nil && len(cpuPercent) > 0 {
		add("cpu.usage_percent", cpuPercent[0], "gauge", nil)
	}

	// Load average
	if loadAvg, err := load.Avg(); err == nil {
		add("cpu.load_avg_1", loadAvg.Load1, "gauge", nil)
		add("cpu.load_avg_5", loadAvg.Load5, "gauge", nil)
	}

	// Memory
	if memInfo, err := mem.VirtualMemory(); err == nil {
		add("memory.used_percent", memInfo.UsedPercent, "gauge", nil)
		add("memory.used_bytes", float64(memInfo.Used), "gauge", nil)
		add("memory.available_bytes", float64(memInfo.Available), "gauge", nil)
	}

	// Disk. Deliberately best-effort: on error, whatever partitions were
	// returned (possibly none) are still processed.
	partitions, _ := disk.Partitions(false)
	for _, p := range partitions {
		if isPseudoMount(p.Mountpoint) {
			continue
		}
		usage, err := disk.Usage(p.Mountpoint)
		if err != nil {
			continue
		}
		tags := map[string]string{"path": p.Mountpoint, "device": p.Device}
		add("disk.used_percent", usage.UsedPercent, "gauge", tags)
		add("disk.used_bytes", float64(usage.Used), "gauge", tags)
	}

	// Network
	if netIO, err := psnet.IOCounters(false); err == nil && len(netIO) > 0 {
		add("network.bytes_sent", float64(netIO[0].BytesSent), "counter", nil)
		add("network.bytes_recv", float64(netIO[0].BytesRecv), "counter", nil)
	}

	// Uptime
	if hostInfo, err := host.Info(); err == nil {
		add("host.uptime_seconds", float64(hostInfo.Uptime), "gauge", nil)
	}

	// Cores
	add("host.cpu_cores", float64(runtime.NumCPU()), "gauge", nil)

	return metrics
}

// parsePercent parses a value such as "12.34%" into 12.34. The boolean is
// false when the text is not a number, so callers can skip the sample instead
// of reporting a misleading zero.
func parsePercent(s string) (float64, bool) {
	s = strings.TrimSuffix(strings.TrimSpace(s), "%")
	v, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return 0, false
	}
	return v, true
}

// runDocker executes `docker <subcommand> <args...>` with a timeout of
// dockerCommandTimeout and returns its standard output.
func runDocker(subcommand string, args ...string) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), dockerCommandTimeout)
	defer cancel()

	argv := append([]string{subcommand}, args...)
	out, err := exec.CommandContext(ctx, "docker", argv...).Output()
	if ctxErr := ctx.Err(); ctxErr != nil {
		// Report the timeout itself rather than the resulting kill error.
		return nil, fmt.Errorf("docker %s: %w", subcommand, ctxErr)
	}
	if err != nil {
		return nil, fmt.Errorf("docker %s: %w", subcommand, err)
	}
	return out, nil
}

// collectDockerMetrics shells out to the docker CLI and returns, under the
// "docker" service, per-container CPU and memory percentages plus the
// "containers.running" and "containers.total" gauges.
//
// It returns no metrics when the docker binary is not on PATH or when
// `docker stats` fails. Percentages that cannot be parsed are omitted.
func collectDockerMetrics(now time.Time, hostname string) []Metric {
	var metrics []Metric
	add := func(name string, value float64, tags map[string]string) {
		metrics = append(metrics, Metric{
			Timestamp:  now,
			Service:    "docker",
			Host:       hostname,
			Name:       name,
			Value:      value,
			MetricType: "gauge",
			Tags:       tags,
		})
	}

	// Check if docker is available
	if _, err := exec.LookPath("docker"); err != nil {
		return metrics
	}

	// Get container stats using docker CLI (simpler, no SDK needed)
	out, err := runDocker("stats", "--no-stream", "--format", dockerStatsFormat)
	if err != nil {
		log.Printf("Warning: %v", err)
		return metrics
	}

	runningCount := 0
	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		if line == "" {
			continue
		}
		// Every non-empty line counts as a container, even if its fields
		// turn out to be malformed below.
		runningCount++

		parts := strings.Split(line, "\t")
		if len(parts) < 6 {
			continue
		}

		tags := map[string]string{"container": parts[1], "container_id": parts[0]}
		if v, ok := parsePercent(parts[2]); ok {
			add("container.cpu_percent", v, tags)
		}
		if v, ok := parsePercent(parts[4]); ok {
			add("container.memory_percent", v, tags)
		}
	}

	// Total containers
	add("containers.running", float64(runningCount), nil)

	// Get all containers count; Fields yields zero entries for empty output.
	out, err = runDocker("ps", "-a", "-q")
	if err != nil {
		log.Printf("Warning: %v", err)
		return metrics
	}
	add("containers.total", float64(len(strings.Fields(string(out)))), nil)

	return metrics
}

// sendMetrics POSTs metrics as {"metrics": [...]} to
// <ServerURL>/api/v1/metrics, adding a Bearer Authorization header when an API
// key is configured. Failures (including HTTP status >= 400) are logged and
// otherwise ignored; nothing is retried.
func sendMetrics(config *Config, metrics []Metric) {
	data, err := json.Marshal(map[string]any{"metrics": metrics})
	if err != nil {
		log.Printf("Error marshaling metrics: %v", err)
		return
	}

	// Tolerate a trailing slash in the configured base URL.
	url := strings.TrimRight(config.ServerURL, "/") + "/api/v1/metrics"
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(data))
	if err != nil {
		log.Printf("Error creating request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	if config.APIKey != "" {
		req.Header.Set("Authorization", "Bearer "+config.APIKey)
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		log.Printf("Error sending metrics: %v", err)
		return
	}
	defer func() {
		// Drain (bounded) before closing so the connection can be reused.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
		resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		log.Printf("Server returned error: %d", resp.StatusCode)
		return
	}

	log.Printf("📤 Sent %d metrics", len(metrics))
}