/*
Health Daemon — Continuous health monitoring for Volt workloads.

Unlike deploy-time health checks (which verify a single instance during
deployment), the health daemon runs continuously, monitoring all configured
workloads and taking action when they become unhealthy.

Features:
  - HTTP, TCP, and exec health checks
  - Configurable intervals and thresholds
  - Auto-restart on sustained unhealthy state
  - Health status API for monitoring integrations
  - Event emission for webhook/notification systems

Configuration is stored in /etc/volt/health/ as YAML files, one per workload.

Copyright (c) Armored Gates LLC. All rights reserved.
*/
package healthd

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"gopkg.in/yaml.v3"
)

// ── Constants ────────────────────────────────────────────────────────────────

const (
	// DefaultHealthDir stores health check configurations (one YAML per workload).
	DefaultHealthDir = "/etc/volt/health"

	// DefaultStatusDir stores runtime health status (statuses.json snapshot).
	DefaultStatusDir = "/var/lib/volt/health"
)

// ── Health Check Config ──────────────────────────────────────────────────────

// CheckType defines the type of health check.
type CheckType string

const (
	CheckHTTP CheckType = "http" // GET an HTTP endpoint on the workload's IP
	CheckTCP  CheckType = "tcp"  // open a TCP connection to a port
	CheckExec CheckType = "exec" // run a shell command; non-zero exit means unhealthy
)

// Config defines a health check configuration for a workload.
type Config struct { Workload string `yaml:"workload" json:"workload"` Type CheckType `yaml:"type" json:"type"` Target string `yaml:"target" json:"target"` // URL path for HTTP, port for TCP, command for exec Port int `yaml:"port,omitempty" json:"port,omitempty"` Interval time.Duration `yaml:"interval" json:"interval"` Timeout time.Duration `yaml:"timeout" json:"timeout"` Retries int `yaml:"retries" json:"retries"` // Failures before unhealthy AutoRestart bool `yaml:"auto_restart" json:"auto_restart"` MaxRestarts int `yaml:"max_restarts" json:"max_restarts"` // 0 = unlimited RestartDelay time.Duration `yaml:"restart_delay" json:"restart_delay"` Enabled bool `yaml:"enabled" json:"enabled"` } // Validate checks that a health config is valid and fills defaults. func (c *Config) Validate() error { if c.Workload == "" { return fmt.Errorf("healthd: workload name required") } switch c.Type { case CheckHTTP: if c.Target == "" { c.Target = "/healthz" } if c.Port == 0 { c.Port = 8080 } case CheckTCP: if c.Port == 0 { return fmt.Errorf("healthd: TCP check requires port") } case CheckExec: if c.Target == "" { return fmt.Errorf("healthd: exec check requires command") } default: return fmt.Errorf("healthd: unknown check type %q", c.Type) } if c.Interval <= 0 { c.Interval = 30 * time.Second } if c.Timeout <= 0 { c.Timeout = 5 * time.Second } if c.Retries <= 0 { c.Retries = 3 } if c.RestartDelay <= 0 { c.RestartDelay = 10 * time.Second } return nil } // ── Health Status ──────────────────────────────────────────────────────────── // Status represents the current health state of a workload. 
type Status struct {
	Workload         string    `json:"workload" yaml:"workload"`
	Healthy          bool      `json:"healthy" yaml:"healthy"`
	LastCheck        time.Time `json:"last_check" yaml:"last_check"`
	LastHealthy      time.Time `json:"last_healthy,omitempty" yaml:"last_healthy,omitempty"`
	ConsecutiveFails int       `json:"consecutive_fails" yaml:"consecutive_fails"`
	TotalChecks      int64     `json:"total_checks" yaml:"total_checks"`
	TotalFails       int64     `json:"total_fails" yaml:"total_fails"`
	RestartCount     int       `json:"restart_count" yaml:"restart_count"`
	LastError        string    `json:"last_error,omitempty" yaml:"last_error,omitempty"`
	LastRestart      time.Time `json:"last_restart,omitempty" yaml:"last_restart,omitempty"`
}

// ── IP Resolver ──────────────────────────────────────────────────────────────

// IPResolver maps a workload name to its IP address.
type IPResolver func(workload string) (string, error)

// DefaultIPResolver tries to resolve via machinectl show.
//
// Resolution is best-effort: when machinectl is unavailable, errors out, or
// reports no addresses, it falls back to 127.0.0.1 and never returns an error.
func DefaultIPResolver(workload string) (string, error) {
	const fallback = "127.0.0.1"

	raw, err := exec.Command("machinectl", "show", workload, "-p", "Addresses").CombinedOutput()
	if err != nil {
		return fallback, nil // Fall back to localhost on any machinectl failure.
	}

	// Expected output shape: "Addresses=<ip> [<ip>...]"; use the first entry.
	line := strings.TrimSpace(string(raw))
	if addrs := strings.TrimPrefix(line, "Addresses="); addrs != line {
		if fields := strings.Fields(addrs); len(fields) > 0 {
			return fields[0], nil
		}
	}
	return fallback, nil
}

// ── Restart Handler ──────────────────────────────────────────────────────────

// RestartFunc defines how to restart a workload.
type RestartFunc func(workload string) error

// DefaultRestartFunc restarts via systemctl, using the volt-container@
// template unit named after the workload.
func DefaultRestartFunc(workload string) error {
	return exec.Command("systemctl", "restart", "volt-container@"+workload+".service").Run()
}

// ── Event Handler ────────────────────────────────────────────────────────────

// EventType describes health daemon events.
type EventType string const ( EventHealthy EventType = "healthy" EventUnhealthy EventType = "unhealthy" EventRestart EventType = "restart" EventCheckFail EventType = "check_fail" ) // Event is emitted when health state changes. type Event struct { Type EventType `json:"type"` Workload string `json:"workload"` Timestamp time.Time `json:"timestamp"` Message string `json:"message"` } // EventHandler is called when health events occur. type EventHandler func(event Event) // ── Health Daemon ──────────────────────────────────────────────────────────── // Daemon manages continuous health monitoring for multiple workloads. type Daemon struct { configDir string statusDir string ipResolver IPResolver restartFunc RestartFunc eventHandler EventHandler configs map[string]*Config statuses map[string]*Status mu sync.RWMutex cancel context.CancelFunc wg sync.WaitGroup } // NewDaemon creates a health monitoring daemon. func NewDaemon(configDir, statusDir string) *Daemon { if configDir == "" { configDir = DefaultHealthDir } if statusDir == "" { statusDir = DefaultStatusDir } return &Daemon{ configDir: configDir, statusDir: statusDir, ipResolver: DefaultIPResolver, restartFunc: DefaultRestartFunc, configs: make(map[string]*Config), statuses: make(map[string]*Status), } } // SetIPResolver sets a custom IP resolver. func (d *Daemon) SetIPResolver(resolver IPResolver) { d.ipResolver = resolver } // SetRestartFunc sets a custom restart function. func (d *Daemon) SetRestartFunc(fn RestartFunc) { d.restartFunc = fn } // SetEventHandler sets the event callback. func (d *Daemon) SetEventHandler(handler EventHandler) { d.eventHandler = handler } // LoadConfigs reads all health check configurations from disk. 
func (d *Daemon) LoadConfigs() error { d.mu.Lock() defer d.mu.Unlock() files, err := filepath.Glob(filepath.Join(d.configDir, "*.yaml")) if err != nil { return fmt.Errorf("healthd: glob configs: %w", err) } for _, f := range files { data, err := os.ReadFile(f) if err != nil { continue } var cfg Config if err := yaml.Unmarshal(data, &cfg); err != nil { continue } if err := cfg.Validate(); err != nil { fmt.Fprintf(os.Stderr, "healthd: invalid config %s: %v\n", f, err) continue } if cfg.Enabled { d.configs[cfg.Workload] = &cfg } } return nil } // Start begins monitoring all configured workloads. func (d *Daemon) Start(ctx context.Context) error { if err := d.LoadConfigs(); err != nil { return err } ctx, d.cancel = context.WithCancel(ctx) d.mu.RLock() configs := make([]*Config, 0, len(d.configs)) for _, cfg := range d.configs { configs = append(configs, cfg) } d.mu.RUnlock() for _, cfg := range configs { d.wg.Add(1) go d.monitorLoop(ctx, cfg) } return nil } // Stop gracefully stops the health daemon. func (d *Daemon) Stop() { if d.cancel != nil { d.cancel() } d.wg.Wait() d.saveStatuses() } // GetStatus returns the health status of a workload. func (d *Daemon) GetStatus(workload string) *Status { d.mu.RLock() defer d.mu.RUnlock() if s, ok := d.statuses[workload]; ok { cp := *s return &cp } return nil } // GetAllStatuses returns health status of all monitored workloads. func (d *Daemon) GetAllStatuses() []Status { d.mu.RLock() defer d.mu.RUnlock() result := make([]Status, 0, len(d.statuses)) for _, s := range d.statuses { result = append(result, *s) } return result } // ── Configuration Management (CLI) ────────────────────────────────────────── // ConfigureCheck writes or updates a health check configuration. 
func ConfigureCheck(configDir string, cfg Config) error { if configDir == "" { configDir = DefaultHealthDir } if err := cfg.Validate(); err != nil { return err } if err := os.MkdirAll(configDir, 0755); err != nil { return fmt.Errorf("healthd: create config dir: %w", err) } data, err := yaml.Marshal(cfg) if err != nil { return fmt.Errorf("healthd: marshal config: %w", err) } path := filepath.Join(configDir, cfg.Workload+".yaml") return os.WriteFile(path, data, 0644) } // RemoveCheck removes a health check configuration. func RemoveCheck(configDir string, workload string) error { if configDir == "" { configDir = DefaultHealthDir } path := filepath.Join(configDir, workload+".yaml") if err := os.Remove(path); err != nil && !os.IsNotExist(err) { return fmt.Errorf("healthd: remove config: %w", err) } return nil } // ListConfigs returns all configured health checks. func ListConfigs(configDir string) ([]Config, error) { if configDir == "" { configDir = DefaultHealthDir } files, err := filepath.Glob(filepath.Join(configDir, "*.yaml")) if err != nil { return nil, err } var configs []Config for _, f := range files { data, err := os.ReadFile(f) if err != nil { continue } var cfg Config if err := yaml.Unmarshal(data, &cfg); err != nil { continue } configs = append(configs, cfg) } return configs, nil } // LoadStatuses reads saved health statuses from disk. 
func LoadStatuses(statusDir string) ([]Status, error) { if statusDir == "" { statusDir = DefaultStatusDir } path := filepath.Join(statusDir, "statuses.json") data, err := os.ReadFile(path) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, err } var statuses []Status if err := json.Unmarshal(data, &statuses); err != nil { return nil, err } return statuses, nil } // ── Monitor Loop ───────────────────────────────────────────────────────────── func (d *Daemon) monitorLoop(ctx context.Context, cfg *Config) { defer d.wg.Done() // Initialize status d.mu.Lock() d.statuses[cfg.Workload] = &Status{ Workload: cfg.Workload, Healthy: true, // Assume healthy until proven otherwise } d.mu.Unlock() ticker := time.NewTicker(cfg.Interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: d.runCheck(cfg) } } } func (d *Daemon) runCheck(cfg *Config) { d.mu.Lock() status := d.statuses[cfg.Workload] d.mu.Unlock() status.TotalChecks++ status.LastCheck = time.Now() var err error switch cfg.Type { case CheckHTTP: err = d.checkHTTP(cfg) case CheckTCP: err = d.checkTCP(cfg) case CheckExec: err = d.checkExec(cfg) } if err != nil { status.TotalFails++ status.ConsecutiveFails++ status.LastError = err.Error() d.emitEvent(Event{ Type: EventCheckFail, Workload: cfg.Workload, Timestamp: time.Now(), Message: err.Error(), }) // Check if we've exceeded the failure threshold if status.ConsecutiveFails >= cfg.Retries { wasHealthy := status.Healthy status.Healthy = false if wasHealthy { d.emitEvent(Event{ Type: EventUnhealthy, Workload: cfg.Workload, Timestamp: time.Now(), Message: fmt.Sprintf("health check failed %d times: %s", status.ConsecutiveFails, err.Error()), }) } // Auto-restart if configured if cfg.AutoRestart { if cfg.MaxRestarts == 0 || status.RestartCount < cfg.MaxRestarts { d.handleRestart(cfg, status) } } } } else { wasUnhealthy := !status.Healthy status.Healthy = true status.ConsecutiveFails = 0 status.LastHealthy = time.Now() 
status.LastError = "" if wasUnhealthy { d.emitEvent(Event{ Type: EventHealthy, Workload: cfg.Workload, Timestamp: time.Now(), Message: "health check recovered", }) } } } func (d *Daemon) checkHTTP(cfg *Config) error { ip, err := d.ipResolver(cfg.Workload) if err != nil { return fmt.Errorf("resolve IP: %w", err) } url := fmt.Sprintf("http://%s:%d%s", ip, cfg.Port, cfg.Target) client := &http.Client{Timeout: cfg.Timeout} resp, err := client.Get(url) if err != nil { return fmt.Errorf("HTTP check failed: %w", err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 400 { return fmt.Errorf("HTTP %d from %s", resp.StatusCode, url) } return nil } func (d *Daemon) checkTCP(cfg *Config) error { ip, err := d.ipResolver(cfg.Workload) if err != nil { return fmt.Errorf("resolve IP: %w", err) } addr := fmt.Sprintf("%s:%d", ip, cfg.Port) conn, err := net.DialTimeout("tcp", addr, cfg.Timeout) if err != nil { return fmt.Errorf("TCP check failed: %w", err) } conn.Close() return nil } func (d *Daemon) checkExec(cfg *Config) error { ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) defer cancel() cmd := exec.CommandContext(ctx, "sh", "-c", cfg.Target) if err := cmd.Run(); err != nil { return fmt.Errorf("exec check failed: %w", err) } return nil } func (d *Daemon) handleRestart(cfg *Config, status *Status) { // Respect restart delay if !status.LastRestart.IsZero() && time.Since(status.LastRestart) < cfg.RestartDelay { return } d.emitEvent(Event{ Type: EventRestart, Workload: cfg.Workload, Timestamp: time.Now(), Message: fmt.Sprintf("auto-restarting (attempt %d)", status.RestartCount+1), }) if err := d.restartFunc(cfg.Workload); err != nil { fmt.Fprintf(os.Stderr, "healthd: restart %s failed: %v\n", cfg.Workload, err) return } status.RestartCount++ status.LastRestart = time.Now() status.ConsecutiveFails = 0 // Reset after restart, let it prove healthy } func (d *Daemon) emitEvent(event Event) { if d.eventHandler != nil { d.eventHandler(event) } } 
func (d *Daemon) saveStatuses() { d.mu.RLock() statuses := make([]Status, 0, len(d.statuses)) for _, s := range d.statuses { statuses = append(statuses, *s) } d.mu.RUnlock() os.MkdirAll(d.statusDir, 0755) data, err := json.MarshalIndent(statuses, "", " ") if err != nil { return } os.WriteFile(filepath.Join(d.statusDir, "statuses.json"), data, 0644) }