Files
ophion/cmd/agent/main.go
2026-02-06 14:26:15 -03:00

346 lines
9.1 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"
psnet "github.com/shirou/gopsutil/v3/net"
)
// ═══════════════════════════════════════════════════════════
// 🐍 OPHION Agent - Observability Collector
// ═══════════════════════════════════════════════════════════
// Config holds the agent settings that loadConfig resolves from OPHION_*
// environment variables.
type Config struct {
	// ServerURL is the base URL metrics are POSTed to (sendMetrics appends
	// "/api/v1/metrics"), APIKey is sent as a bearer token when non-empty, and
	// Hostname is stamped on every metric as its host.
	ServerURL, APIKey, Hostname string

	// CollectInterval is the delay between collection rounds.
	CollectInterval time.Duration

	// DockerEnabled toggles container metrics gathered through the docker CLI.
	DockerEnabled bool
}
// Metric is a single sample reported to the OPHION server. sendMetrics posts
// a batch of them wrapped in a JSON object as {"metrics": [...]}. Field order
// determines the JSON key order on the wire.
type Metric struct {
	// Timestamp is when the sample was taken; collect passes one shared value
	// to every metric of a round.
	Timestamp time.Time `json:"timestamp"`
	// Service groups the metric by source; this file uses "system" and "docker".
	Service string `json:"service"`
	// Host is the reporting host name (Config.Hostname).
	Host string `json:"host"`
	// Name is the dotted metric name, e.g. "cpu.usage_percent".
	Name string `json:"name"`
	// Value is the sampled value.
	Value float64 `json:"value"`
	// MetricType is "gauge" or "counter" in this file.
	MetricType string `json:"metric_type"`
	// Tags holds optional dimensions (disk path/device, container name/id);
	// omitted from the JSON when nil or empty.
	Tags map[string]string `json:"tags,omitempty"`
}
// ContainerStats appears to describe the resource usage of one container.
//
// NOTE(review): this type is not referenced anywhere in this file —
// collectDockerMetrics emits Metric values directly. Confirm whether other
// code uses it before removing it.
type ContainerStats struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	// CPUPercent is the container's CPU usage as a percentage.
	CPUPercent float64 `json:"cpu_percent"`
	// MemoryUsage / MemoryLimit: presumably bytes — TODO confirm units.
	MemoryUsage   uint64  `json:"memory_usage"`
	MemoryLimit   uint64  `json:"memory_limit"`
	MemoryPercent float64 `json:"memory_percent"`
	// NetRx / NetTx: presumably cumulative received/transmitted bytes — TODO confirm.
	NetRx uint64 `json:"net_rx"`
	NetTx uint64 `json:"net_tx"`
	// State is the container state string (e.g. as reported by docker) — TODO confirm source.
	State string `json:"state"`
}
func main() {
config := loadConfig()
log.Printf("🐍 OPHION Agent starting")
log.Printf(" Server: %s", config.ServerURL)
log.Printf(" Host: %s", config.Hostname)
log.Printf(" Interval: %s", config.CollectInterval)
log.Printf(" Docker: %v", config.DockerEnabled)
// Handle shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
ticker := time.NewTicker(config.CollectInterval)
defer ticker.Stop()
// Collect immediately
collect(config)
for {
select {
case <-sigCh:
log.Println("🛑 Shutting down agent...")
return
case <-ticker.C:
collect(config)
}
}
}
// loadConfig builds the agent configuration from environment variables:
//
//   - OPHION_SERVER:   server base URL (default "http://localhost:8080")
//   - OPHION_API_KEY:  bearer token for requests (default: none)
//   - OPHION_HOSTNAME: host name reported with metrics (default: os.Hostname)
//   - OPHION_INTERVAL: collection interval as a Go duration such as "15s"
//     (default 30s)
//   - OPHION_DOCKER:   "false" or "0" disables Docker metrics (default: enabled)
//
// An OPHION_INTERVAL that does not parse, or that is zero or negative, is
// ignored with a warning and the default is kept: main hands the interval to
// time.NewTicker, which panics on a non-positive duration.
func loadConfig() *Config {
	hostname, err := os.Hostname()
	if err != nil {
		// Not fatal: OPHION_HOSTNAME may still provide a value below.
		log.Printf("Could not determine hostname: %v", err)
	}

	interval := 30 * time.Second
	if v := os.Getenv("OPHION_INTERVAL"); v != "" {
		d, parseErr := time.ParseDuration(v)
		switch {
		case parseErr != nil:
			log.Printf("Ignoring invalid OPHION_INTERVAL %q: %v (using %s)", v, parseErr, interval)
		case d <= 0:
			log.Printf("Ignoring non-positive OPHION_INTERVAL %q (using %s)", v, interval)
		default:
			interval = d
		}
	}

	dockerEnabled := true
	if v := os.Getenv("OPHION_DOCKER"); v == "false" || v == "0" {
		dockerEnabled = false
	}

	return &Config{
		ServerURL:       getEnv("OPHION_SERVER", "http://localhost:8080"),
		APIKey:          getEnv("OPHION_API_KEY", ""),
		Hostname:        getEnv("OPHION_HOSTNAME", hostname),
		CollectInterval: interval,
		DockerEnabled:   dockerEnabled,
	}
}
// getEnv returns the value of environment variable key, falling back to def
// when the variable is unset or set to the empty string.
func getEnv(key, def string) string {
	val, present := os.LookupEnv(key)
	if !present || val == "" {
		return def
	}
	return val
}
// collect runs one collection round. Host metrics are always gathered, and
// Docker metrics are added when config.DockerEnabled is set. All samples share
// one timestamp. The batch is sent only if it is non-empty.
func collect(config *Config) {
	ts := time.Now()

	batch := collectSystemMetrics(ts, config.Hostname)
	if config.DockerEnabled {
		batch = append(batch, collectDockerMetrics(ts, config.Hostname)...)
	}

	if len(batch) == 0 {
		return
	}
	sendMetrics(config, batch)
}
// pseudoMountRoots are mount roots excluded from disk metrics. On Linux these
// typically hold virtual/tmpfs filesystems (/sys, /proc, /dev, /run) or snap
// package images (/snap), whose usage figures are not meaningful.
var pseudoMountRoots = []string{"/snap", "/sys", "/proc", "/dev", "/run"}

// isPseudoMount reports whether mountpoint equals one of pseudoMountRoots or
// lies beneath one of them. Matching is done on whole path components, so
// real mounts such as "/devdata" or "/running" are not mistaken for "/dev" or
// "/run".
func isPseudoMount(mountpoint string) bool {
	for _, root := range pseudoMountRoots {
		if mountpoint == root || strings.HasPrefix(mountpoint, root+"/") {
			return true
		}
	}
	return false
}

// collectSystemMetrics samples host-level metrics (CPU, load, memory, disk,
// network, uptime, core count). Every sample is stamped with now, service
// "system" and hostname. Each probe is best-effort: a probe that fails is
// simply left out of the result. The core count is always present.
//
// cpu.Percent samples over one second, so this call blocks for at least that
// long.
func collectSystemMetrics(now time.Time, hostname string) []Metric {
	const svc = "system"
	var metrics []Metric

	// add appends one sample sharing this round's timestamp, service and host.
	add := func(name string, value float64, metricType string, tags map[string]string) {
		metrics = append(metrics, Metric{
			Timestamp:  now,
			Service:    svc,
			Host:       hostname,
			Name:       name,
			Value:      value,
			MetricType: metricType,
			Tags:       tags,
		})
	}

	// CPU: percpu=false requests a single combined value.
	if pct, err := cpu.Percent(time.Second, false); err == nil && len(pct) > 0 {
		add("cpu.usage_percent", pct[0], "gauge", nil)
	}

	// Load average.
	if avg, err := load.Avg(); err == nil {
		add("cpu.load_avg_1", avg.Load1, "gauge", nil)
		add("cpu.load_avg_5", avg.Load5, "gauge", nil)
	}

	// Memory.
	if vm, err := mem.VirtualMemory(); err == nil {
		add("memory.used_percent", vm.UsedPercent, "gauge", nil)
		add("memory.used_bytes", float64(vm.Used), "gauge", nil)
		add("memory.available_bytes", float64(vm.Available), "gauge", nil)
	}

	// Disk usage per mount. The listing error is logged rather than discarded.
	// Any partitions returned alongside the error are still used.
	partitions, err := disk.Partitions(false)
	if err != nil {
		log.Printf("Error listing disk partitions: %v", err)
	}
	for _, p := range partitions {
		if isPseudoMount(p.Mountpoint) {
			continue
		}
		usage, err := disk.Usage(p.Mountpoint)
		if err != nil {
			continue
		}
		tags := map[string]string{"path": p.Mountpoint, "device": p.Device}
		add("disk.used_percent", usage.UsedPercent, "gauge", tags)
		add("disk.used_bytes", float64(usage.Used), "gauge", tags)
	}

	// Network: pernic=false requests a single aggregate entry. The values are
	// cumulative byte counters, hence "counter".
	if netIO, err := psnet.IOCounters(false); err == nil && len(netIO) > 0 {
		add("network.bytes_sent", float64(netIO[0].BytesSent), "counter", nil)
		add("network.bytes_recv", float64(netIO[0].BytesRecv), "counter", nil)
	}

	// Uptime.
	if info, err := host.Info(); err == nil {
		add("host.uptime_seconds", float64(info.Uptime), "gauge", nil)
	}

	// Logical CPUs usable by this process.
	add("host.cpu_cores", float64(runtime.NumCPU()), "gauge", nil)

	return metrics
}
// dockerCommandTimeout caps how long a single docker CLI invocation may run
// before it is killed. Without a cap, an unresponsive Docker daemon would block
// collect indefinitely, which would stop all metric shipping.
const dockerCommandTimeout = 20 * time.Second

// runDocker runs `docker args...` and returns its standard output. The process
// is killed if it has not exited within dockerCommandTimeout. In that case a
// descriptive timeout error is returned.
func runDocker(args ...string) ([]byte, error) {
	cmd := exec.Command("docker", args...)
	var stdout bytes.Buffer
	cmd.Stdout = &stdout
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	// Process.Kill may run concurrently with Wait. If the process has already
	// exited, Kill only returns an error, which is deliberately ignored.
	killer := time.AfterFunc(dockerCommandTimeout, func() { _ = cmd.Process.Kill() })
	err := cmd.Wait()
	timedOut := !killer.Stop() // Stop reports false once the kill timer has fired.
	if err != nil {
		if timedOut {
			return nil, fmt.Errorf("docker %q timed out after %s", args, dockerCommandTimeout)
		}
		return nil, err
	}
	return stdout.Bytes(), nil
}

// parseDockerPercent converts a docker stats percentage such as "12.34%" into
// 12.34. ok is false when the field is not numeric (e.g. a "--" placeholder).
// This lets callers skip the sample instead of reporting a misleading 0.
func parseDockerPercent(field string) (value float64, ok bool) {
	_, err := fmt.Sscanf(strings.TrimSuffix(strings.TrimSpace(field), "%"), "%f", &value)
	return value, err == nil
}

// collectDockerMetrics shells out to the docker CLI. It returns per-container
// CPU and memory percentages plus running and total container counts. Every
// sample is stamped with now, service "docker" and hostname. It returns no
// metrics when the docker binary is not on PATH or `docker stats` fails or
// times out. Failures are logged.
func collectDockerMetrics(now time.Time, hostname string) []Metric {
	var metrics []Metric
	svc := "docker"

	if _, err := exec.LookPath("docker"); err != nil {
		return metrics
	}

	// One snapshot of container stats via the docker CLI (no SDK needed).
	out, err := runDocker("stats", "--no-stream", "--format",
		"{{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}")
	if err != nil {
		log.Printf("Error running docker stats: %v", err)
		return metrics
	}

	runningCount := 0
	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		if line == "" {
			continue
		}
		// Each non-empty line counts toward containers.running, even if its
		// fields fail to parse below.
		runningCount++
		parts := strings.Split(line, "\t")
		if len(parts) < 6 {
			continue
		}
		// parts[3] (MemUsage) and parts[5] (NetIO) are human-readable
		// "a / b" pairs and are not exported yet.
		tags := map[string]string{"container": parts[1], "container_id": parts[0]}
		if cpuPercent, ok := parseDockerPercent(parts[2]); ok {
			metrics = append(metrics, Metric{
				Timestamp: now, Service: svc, Host: hostname,
				Name: "container.cpu_percent", Value: cpuPercent, MetricType: "gauge", Tags: tags,
			})
		}
		if memPercent, ok := parseDockerPercent(parts[4]); ok {
			metrics = append(metrics, Metric{
				Timestamp: now, Service: svc, Host: hostname,
				Name: "container.memory_percent", Value: memPercent, MetricType: "gauge", Tags: tags,
			})
		}
	}

	metrics = append(metrics, Metric{
		Timestamp: now, Service: svc, Host: hostname,
		Name: "containers.running", Value: float64(runningCount), MetricType: "gauge",
	})

	// All containers, including stopped ones: `ps -a -q` prints one ID per line.
	if psOut, err := runDocker("ps", "-a", "-q"); err == nil {
		metrics = append(metrics, Metric{
			Timestamp: now, Service: svc, Host: hostname,
			Name: "containers.total", Value: float64(len(strings.Fields(string(psOut)))), MetricType: "gauge",
		})
	}

	return metrics
}
// metricsHTTPClient is shared by all sendMetrics calls instead of building a
// new client per request; net/http Clients are safe for concurrent use and
// meant to be reused. The timeout bounds each whole request.
var metricsHTTPClient = &http.Client{Timeout: 10 * time.Second}

// sendMetrics POSTs metrics as the JSON object {"metrics": [...]} to
// <config.ServerURL>/api/v1/metrics. It adds a bearer Authorization header
// when config.APIKey is set. Any failure (marshal, transport, or a non-2xx
// final status) is logged and the batch is dropped; there is no retry.
func sendMetrics(config *Config, metrics []Metric) {
	data, err := json.Marshal(map[string]any{"metrics": metrics})
	if err != nil {
		log.Printf("Error marshaling metrics: %v", err)
		return
	}

	// Trim trailing slashes so "http://host:8080/" does not yield "//api/v1/...".
	endpoint := strings.TrimRight(config.ServerURL, "/") + "/api/v1/metrics"
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(data))
	if err != nil {
		log.Printf("Error creating request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	if config.APIKey != "" {
		req.Header.Set("Authorization", "Bearer "+config.APIKey)
	}

	resp, err := metricsHTTPClient.Do(req)
	if err != nil {
		log.Printf("Error sending metrics: %v", err)
		return
	}
	defer resp.Body.Close()

	// Redirects are followed by the client. Any final status outside 2xx
	// (including an unfollowed 3xx) means the batch was not accepted.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		log.Printf("Server returned error: %d", resp.StatusCode)
		return
	}
	log.Printf("📤 Sent %d metrics", len(metrics))
}