Volt CLI: source-available under AGPSL v5.0
Complete infrastructure platform CLI: - Container runtime (systemd-nspawn) - VoltVisor VMs (Neutron Stardust / QEMU) - Stellarium CAS (content-addressed storage) - ORAS Registry - GitOps integration - Landlock LSM security - Compose orchestration - Mesh networking Copyright (c) Armored Gates LLC. All rights reserved. Licensed under AGPSL v5.0
This commit is contained in:
594
pkg/healthd/healthd.go
Normal file
594
pkg/healthd/healthd.go
Normal file
@@ -0,0 +1,594 @@
|
||||
/*
|
||||
Health Daemon — Continuous health monitoring for Volt workloads.
|
||||
|
||||
Unlike deploy-time health checks (which verify a single instance during
|
||||
deployment), the health daemon runs continuously, monitoring all
|
||||
configured workloads and taking action when they become unhealthy.
|
||||
|
||||
Features:
|
||||
- HTTP, TCP, and exec health checks
|
||||
- Configurable intervals and thresholds
|
||||
- Auto-restart on sustained unhealthy state
|
||||
- Health status API for monitoring integrations
|
||||
- Event emission for webhook/notification systems
|
||||
|
||||
Configuration is stored in /etc/volt/health/ as YAML files, one per
|
||||
workload.
|
||||
|
||||
Copyright (c) Armored Gates LLC. All rights reserved.
|
||||
*/
|
||||
package healthd
|
||||
|
||||
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"gopkg.in/yaml.v3"
)
|
||||
|
||||
// ── Constants ────────────────────────────────────────────────────────────────
|
||||
|
||||
const (
	// DefaultHealthDir stores health check configurations,
	// one YAML file per workload.
	DefaultHealthDir = "/etc/volt/health"

	// DefaultStatusDir stores runtime health status
	// (persisted as statuses.json on shutdown).
	DefaultStatusDir = "/var/lib/volt/health"
)
|
||||
|
||||
// ── Health Check Config ──────────────────────────────────────────────────────
|
||||
|
||||
// CheckType defines the type of health check.
type CheckType string

// Supported check types.
const (
	CheckHTTP CheckType = "http" // GET http://<ip>:<port><target>
	CheckTCP  CheckType = "tcp"  // TCP dial to <ip>:<port>
	CheckExec CheckType = "exec" // run <target> via `sh -c`
)
|
||||
|
||||
// Config defines a health check configuration for a workload.
//
// NOTE(review): Interval/Timeout/RestartDelay are time.Duration fields;
// yaml.v3 decodes those from integer nanoseconds, not strings like "30s" —
// confirm the on-disk format written by callers matches.
type Config struct {
	Workload     string        `yaml:"workload" json:"workload"`
	Type         CheckType     `yaml:"type" json:"type"`
	Target       string        `yaml:"target" json:"target"` // URL path for HTTP, port for TCP, command for exec
	Port         int           `yaml:"port,omitempty" json:"port,omitempty"`
	Interval     time.Duration `yaml:"interval" json:"interval"`
	Timeout      time.Duration `yaml:"timeout" json:"timeout"`
	Retries      int           `yaml:"retries" json:"retries"` // Failures before unhealthy
	AutoRestart  bool          `yaml:"auto_restart" json:"auto_restart"`
	MaxRestarts  int           `yaml:"max_restarts" json:"max_restarts"` // 0 = unlimited
	RestartDelay time.Duration `yaml:"restart_delay" json:"restart_delay"`
	Enabled      bool          `yaml:"enabled" json:"enabled"` // only enabled configs are monitored
}
|
||||
|
||||
// Validate checks that a health config is valid and fills defaults.
|
||||
func (c *Config) Validate() error {
|
||||
if c.Workload == "" {
|
||||
return fmt.Errorf("healthd: workload name required")
|
||||
}
|
||||
switch c.Type {
|
||||
case CheckHTTP:
|
||||
if c.Target == "" {
|
||||
c.Target = "/healthz"
|
||||
}
|
||||
if c.Port == 0 {
|
||||
c.Port = 8080
|
||||
}
|
||||
case CheckTCP:
|
||||
if c.Port == 0 {
|
||||
return fmt.Errorf("healthd: TCP check requires port")
|
||||
}
|
||||
case CheckExec:
|
||||
if c.Target == "" {
|
||||
return fmt.Errorf("healthd: exec check requires command")
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("healthd: unknown check type %q", c.Type)
|
||||
}
|
||||
|
||||
if c.Interval <= 0 {
|
||||
c.Interval = 30 * time.Second
|
||||
}
|
||||
if c.Timeout <= 0 {
|
||||
c.Timeout = 5 * time.Second
|
||||
}
|
||||
if c.Retries <= 0 {
|
||||
c.Retries = 3
|
||||
}
|
||||
if c.RestartDelay <= 0 {
|
||||
c.RestartDelay = 10 * time.Second
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ── Health Status ────────────────────────────────────────────────────────────
|
||||
|
||||
// Status represents the current health state of a workload.
//
// One Status exists per monitored workload; it is updated by that
// workload's monitor goroutine and read (as a copy) by the status API.
type Status struct {
	Workload         string    `json:"workload" yaml:"workload"`
	Healthy          bool      `json:"healthy" yaml:"healthy"`
	LastCheck        time.Time `json:"last_check" yaml:"last_check"`
	LastHealthy      time.Time `json:"last_healthy,omitempty" yaml:"last_healthy,omitempty"`
	ConsecutiveFails int       `json:"consecutive_fails" yaml:"consecutive_fails"` // resets on success or restart
	TotalChecks      int64     `json:"total_checks" yaml:"total_checks"`
	TotalFails       int64     `json:"total_fails" yaml:"total_fails"`
	RestartCount     int       `json:"restart_count" yaml:"restart_count"` // compared against Config.MaxRestarts
	LastError        string    `json:"last_error,omitempty" yaml:"last_error,omitempty"`
	LastRestart      time.Time `json:"last_restart,omitempty" yaml:"last_restart,omitempty"`
}
|
||||
|
||||
// ── IP Resolver ──────────────────────────────────────────────────────────────
|
||||
|
||||
// IPResolver maps a workload name to its IP address. Used by HTTP and TCP
// checks to locate the workload's endpoint.
type IPResolver func(workload string) (string, error)
|
||||
|
||||
// DefaultIPResolver resolves a workload's IP via `machinectl show`.
//
// It never returns an error: when machinectl is unavailable, fails, or
// reports no addresses, it falls back to 127.0.0.1.
func DefaultIPResolver(workload string) (string, error) {
	const fallback = "127.0.0.1"

	raw, err := exec.Command("machinectl", "show", workload, "-p", "Addresses").CombinedOutput()
	if err != nil {
		return fallback, nil
	}

	line := strings.TrimSpace(string(raw))
	if !strings.HasPrefix(line, "Addresses=") {
		return fallback, nil
	}

	// machinectl prints space-separated addresses; use the first one.
	fields := strings.Fields(strings.TrimPrefix(line, "Addresses="))
	if len(fields) == 0 {
		return fallback, nil
	}
	return fields[0], nil
}
|
||||
|
||||
// ── Restart Handler ──────────────────────────────────────────────────────────
|
||||
|
||||
// RestartFunc defines how to restart a workload. Invoked by the daemon
// when auto-restart is configured and the workload turns unhealthy.
type RestartFunc func(workload string) error
|
||||
|
||||
// DefaultRestartFunc restarts a workload via its volt-container systemd
// template unit (volt-container@<workload>.service).
func DefaultRestartFunc(workload string) error {
	unit := "volt-container@" + workload + ".service"
	return exec.Command("systemctl", "restart", unit).Run()
}
|
||||
|
||||
// ── Event Handler ────────────────────────────────────────────────────────────
|
||||
|
||||
// EventType describes health daemon events.
type EventType string

// Event types emitted by the daemon.
const (
	EventHealthy   EventType = "healthy"    // workload recovered after being unhealthy
	EventUnhealthy EventType = "unhealthy"  // failure threshold crossed
	EventRestart   EventType = "restart"    // auto-restart attempted
	EventCheckFail EventType = "check_fail" // a single check failed (below threshold)
)
|
||||
|
||||
// Event is emitted when health state changes or a check fails.
type Event struct {
	Type      EventType `json:"type"`
	Workload  string    `json:"workload"`
	Timestamp time.Time `json:"timestamp"`
	Message   string    `json:"message"` // human-readable detail (often the check error)
}
|
||||
|
||||
// EventHandler is called when health events occur. It runs synchronously
// on the monitor goroutine, so it should return quickly.
type EventHandler func(event Event)
|
||||
|
||||
// ── Health Daemon ────────────────────────────────────────────────────────────
|
||||
|
||||
// Daemon manages continuous health monitoring for multiple workloads.
//
// configs and statuses are guarded by mu. The Set* methods write their
// fields without synchronization, so configure the daemon before Start.
type Daemon struct {
	configDir    string       // directory of per-workload YAML check configs
	statusDir    string       // directory for persisted status snapshots
	ipResolver   IPResolver   // maps workload name -> IP for HTTP/TCP checks
	restartFunc  RestartFunc  // invoked when auto-restart triggers
	eventHandler EventHandler // optional callback for health events

	configs  map[string]*Config // enabled check configs, keyed by workload
	statuses map[string]*Status // runtime status, keyed by workload
	mu       sync.RWMutex       // guards configs and statuses
	cancel   context.CancelFunc // cancels all monitor goroutines
	wg       sync.WaitGroup     // tracks monitor goroutines for Stop
}
|
||||
|
||||
// NewDaemon creates a health monitoring daemon.
|
||||
func NewDaemon(configDir, statusDir string) *Daemon {
|
||||
if configDir == "" {
|
||||
configDir = DefaultHealthDir
|
||||
}
|
||||
if statusDir == "" {
|
||||
statusDir = DefaultStatusDir
|
||||
}
|
||||
return &Daemon{
|
||||
configDir: configDir,
|
||||
statusDir: statusDir,
|
||||
ipResolver: DefaultIPResolver,
|
||||
restartFunc: DefaultRestartFunc,
|
||||
configs: make(map[string]*Config),
|
||||
statuses: make(map[string]*Status),
|
||||
}
|
||||
}
|
||||
|
||||
// SetIPResolver sets a custom IP resolver. The write is not synchronized
// with running monitors, so call this before Start.
func (d *Daemon) SetIPResolver(resolver IPResolver) {
	d.ipResolver = resolver
}
|
||||
|
||||
// SetRestartFunc sets a custom restart function. The write is not
// synchronized with running monitors, so call this before Start.
func (d *Daemon) SetRestartFunc(fn RestartFunc) {
	d.restartFunc = fn
}
|
||||
|
||||
// SetEventHandler sets the event callback. The write is not synchronized
// with running monitors, so call this before Start.
func (d *Daemon) SetEventHandler(handler EventHandler) {
	d.eventHandler = handler
}
|
||||
|
||||
// LoadConfigs reads all health check configurations from disk.
|
||||
func (d *Daemon) LoadConfigs() error {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(d.configDir, "*.yaml"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("healthd: glob configs: %w", err)
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
data, err := os.ReadFile(f)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var cfg Config
|
||||
if err := yaml.Unmarshal(data, &cfg); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := cfg.Validate(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "healthd: invalid config %s: %v\n", f, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if cfg.Enabled {
|
||||
d.configs[cfg.Workload] = &cfg
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start begins monitoring all configured workloads.
|
||||
func (d *Daemon) Start(ctx context.Context) error {
|
||||
if err := d.LoadConfigs(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, d.cancel = context.WithCancel(ctx)
|
||||
|
||||
d.mu.RLock()
|
||||
configs := make([]*Config, 0, len(d.configs))
|
||||
for _, cfg := range d.configs {
|
||||
configs = append(configs, cfg)
|
||||
}
|
||||
d.mu.RUnlock()
|
||||
|
||||
for _, cfg := range configs {
|
||||
d.wg.Add(1)
|
||||
go d.monitorLoop(ctx, cfg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop gracefully stops the health daemon.
|
||||
func (d *Daemon) Stop() {
|
||||
if d.cancel != nil {
|
||||
d.cancel()
|
||||
}
|
||||
d.wg.Wait()
|
||||
d.saveStatuses()
|
||||
}
|
||||
|
||||
// GetStatus returns the health status of a workload.
|
||||
func (d *Daemon) GetStatus(workload string) *Status {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
if s, ok := d.statuses[workload]; ok {
|
||||
cp := *s
|
||||
return &cp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAllStatuses returns health status of all monitored workloads.
|
||||
func (d *Daemon) GetAllStatuses() []Status {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
result := make([]Status, 0, len(d.statuses))
|
||||
for _, s := range d.statuses {
|
||||
result = append(result, *s)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ── Configuration Management (CLI) ──────────────────────────────────────────
|
||||
|
||||
// ConfigureCheck writes or updates a health check configuration.
|
||||
func ConfigureCheck(configDir string, cfg Config) error {
|
||||
if configDir == "" {
|
||||
configDir = DefaultHealthDir
|
||||
}
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(configDir, 0755); err != nil {
|
||||
return fmt.Errorf("healthd: create config dir: %w", err)
|
||||
}
|
||||
|
||||
data, err := yaml.Marshal(cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("healthd: marshal config: %w", err)
|
||||
}
|
||||
|
||||
path := filepath.Join(configDir, cfg.Workload+".yaml")
|
||||
return os.WriteFile(path, data, 0644)
|
||||
}
|
||||
|
||||
// RemoveCheck removes a health check configuration.
|
||||
func RemoveCheck(configDir string, workload string) error {
|
||||
if configDir == "" {
|
||||
configDir = DefaultHealthDir
|
||||
}
|
||||
path := filepath.Join(configDir, workload+".yaml")
|
||||
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("healthd: remove config: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListConfigs returns all configured health checks.
|
||||
func ListConfigs(configDir string) ([]Config, error) {
|
||||
if configDir == "" {
|
||||
configDir = DefaultHealthDir
|
||||
}
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(configDir, "*.yaml"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var configs []Config
|
||||
for _, f := range files {
|
||||
data, err := os.ReadFile(f)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var cfg Config
|
||||
if err := yaml.Unmarshal(data, &cfg); err != nil {
|
||||
continue
|
||||
}
|
||||
configs = append(configs, cfg)
|
||||
}
|
||||
return configs, nil
|
||||
}
|
||||
|
||||
// LoadStatuses reads saved health statuses from disk.
|
||||
func LoadStatuses(statusDir string) ([]Status, error) {
|
||||
if statusDir == "" {
|
||||
statusDir = DefaultStatusDir
|
||||
}
|
||||
|
||||
path := filepath.Join(statusDir, "statuses.json")
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var statuses []Status
|
||||
if err := json.Unmarshal(data, &statuses); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return statuses, nil
|
||||
}
|
||||
|
||||
// ── Monitor Loop ─────────────────────────────────────────────────────────────
|
||||
|
||||
func (d *Daemon) monitorLoop(ctx context.Context, cfg *Config) {
|
||||
defer d.wg.Done()
|
||||
|
||||
// Initialize status
|
||||
d.mu.Lock()
|
||||
d.statuses[cfg.Workload] = &Status{
|
||||
Workload: cfg.Workload,
|
||||
Healthy: true, // Assume healthy until proven otherwise
|
||||
}
|
||||
d.mu.Unlock()
|
||||
|
||||
ticker := time.NewTicker(cfg.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
d.runCheck(cfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) runCheck(cfg *Config) {
|
||||
d.mu.Lock()
|
||||
status := d.statuses[cfg.Workload]
|
||||
d.mu.Unlock()
|
||||
|
||||
status.TotalChecks++
|
||||
status.LastCheck = time.Now()
|
||||
|
||||
var err error
|
||||
switch cfg.Type {
|
||||
case CheckHTTP:
|
||||
err = d.checkHTTP(cfg)
|
||||
case CheckTCP:
|
||||
err = d.checkTCP(cfg)
|
||||
case CheckExec:
|
||||
err = d.checkExec(cfg)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
status.TotalFails++
|
||||
status.ConsecutiveFails++
|
||||
status.LastError = err.Error()
|
||||
|
||||
d.emitEvent(Event{
|
||||
Type: EventCheckFail,
|
||||
Workload: cfg.Workload,
|
||||
Timestamp: time.Now(),
|
||||
Message: err.Error(),
|
||||
})
|
||||
|
||||
// Check if we've exceeded the failure threshold
|
||||
if status.ConsecutiveFails >= cfg.Retries {
|
||||
wasHealthy := status.Healthy
|
||||
status.Healthy = false
|
||||
|
||||
if wasHealthy {
|
||||
d.emitEvent(Event{
|
||||
Type: EventUnhealthy,
|
||||
Workload: cfg.Workload,
|
||||
Timestamp: time.Now(),
|
||||
Message: fmt.Sprintf("health check failed %d times: %s", status.ConsecutiveFails, err.Error()),
|
||||
})
|
||||
}
|
||||
|
||||
// Auto-restart if configured
|
||||
if cfg.AutoRestart {
|
||||
if cfg.MaxRestarts == 0 || status.RestartCount < cfg.MaxRestarts {
|
||||
d.handleRestart(cfg, status)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
wasUnhealthy := !status.Healthy
|
||||
status.Healthy = true
|
||||
status.ConsecutiveFails = 0
|
||||
status.LastHealthy = time.Now()
|
||||
status.LastError = ""
|
||||
|
||||
if wasUnhealthy {
|
||||
d.emitEvent(Event{
|
||||
Type: EventHealthy,
|
||||
Workload: cfg.Workload,
|
||||
Timestamp: time.Now(),
|
||||
Message: "health check recovered",
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) checkHTTP(cfg *Config) error {
|
||||
ip, err := d.ipResolver(cfg.Workload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve IP: %w", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://%s:%d%s", ip, cfg.Port, cfg.Target)
|
||||
client := &http.Client{Timeout: cfg.Timeout}
|
||||
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("HTTP check failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
|
||||
return fmt.Errorf("HTTP %d from %s", resp.StatusCode, url)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) checkTCP(cfg *Config) error {
|
||||
ip, err := d.ipResolver(cfg.Workload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve IP: %w", err)
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", ip, cfg.Port)
|
||||
conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("TCP check failed: %w", err)
|
||||
}
|
||||
conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) checkExec(cfg *Config) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "sh", "-c", cfg.Target)
|
||||
if err := cmd.Run(); err != nil {
|
||||
return fmt.Errorf("exec check failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) handleRestart(cfg *Config, status *Status) {
|
||||
// Respect restart delay
|
||||
if !status.LastRestart.IsZero() && time.Since(status.LastRestart) < cfg.RestartDelay {
|
||||
return
|
||||
}
|
||||
|
||||
d.emitEvent(Event{
|
||||
Type: EventRestart,
|
||||
Workload: cfg.Workload,
|
||||
Timestamp: time.Now(),
|
||||
Message: fmt.Sprintf("auto-restarting (attempt %d)", status.RestartCount+1),
|
||||
})
|
||||
|
||||
if err := d.restartFunc(cfg.Workload); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "healthd: restart %s failed: %v\n", cfg.Workload, err)
|
||||
return
|
||||
}
|
||||
|
||||
status.RestartCount++
|
||||
status.LastRestart = time.Now()
|
||||
status.ConsecutiveFails = 0 // Reset after restart, let it prove healthy
|
||||
}
|
||||
|
||||
// emitEvent forwards event to the configured handler, if any. The handler
// runs synchronously on the calling (monitor) goroutine.
func (d *Daemon) emitEvent(event Event) {
	if d.eventHandler != nil {
		d.eventHandler(event)
	}
}
|
||||
|
||||
func (d *Daemon) saveStatuses() {
|
||||
d.mu.RLock()
|
||||
statuses := make([]Status, 0, len(d.statuses))
|
||||
for _, s := range d.statuses {
|
||||
statuses = append(statuses, *s)
|
||||
}
|
||||
d.mu.RUnlock()
|
||||
|
||||
os.MkdirAll(d.statusDir, 0755)
|
||||
data, err := json.MarshalIndent(statuses, "", " ")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
os.WriteFile(filepath.Join(d.statusDir, "statuses.json"), data, 0644)
|
||||
}
|
||||
Reference in New Issue
Block a user