Complete infrastructure platform CLI: - Container runtime (systemd-nspawn) - VoltVisor VMs (Neutron Stardust / QEMU) - Stellarium CAS (content-addressed storage) - ORAS Registry - GitOps integration - Landlock LSM security - Compose orchestration - Mesh networking Copyright (c) Armored Gates LLC. All rights reserved. Licensed under AGPSL v5.0
562 lines
17 KiB
Go
562 lines
17 KiB
Go
/*
|
|
Volt Cluster — Native control plane for multi-node orchestration.
|
|
|
|
Replaces the thin kubectl wrapper with a native clustering system built
|
|
specifically for Volt's workload model (containers, hybrid-native, VMs).
|
|
|
|
Architecture:
|
|
- Control plane: single leader node running volt-control daemon
|
|
- Workers: nodes that register via `volt cluster join`
|
|
- Communication: gRPC-over-mesh (WireGuard) or plain HTTPS
|
|
- State: JSON-based on-disk store (no etcd dependency)
|
|
- Health: heartbeat-based with configurable failure detection
|
|
|
|
The control plane is responsible for:
|
|
- Node registration and deregistration
|
|
- Health monitoring (heartbeat processing)
|
|
- Workload scheduling (resource-based, label selectors)
|
|
- Workload state sync across nodes
|
|
|
|
Copyright (c) Armored Gates LLC. All rights reserved.
|
|
AGPSL v5 — Source-available. Anti-competition clauses apply.
|
|
*/
|
|
package cluster
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ── Constants ────────────────────────────────────────────────────────────────
|
|
|
|
// Defaults and on-disk state locations for cluster persistence.
const (
	// DefaultHeartbeatInterval is how often workers are expected to report
	// health to the control plane.
	DefaultHeartbeatInterval = 10 * time.Second
	// DefaultFailureThreshold is the number of missed heartbeats before a
	// node is marked unhealthy (see CheckHealth).
	DefaultFailureThreshold = 3 // missed heartbeats before marking unhealthy
	// DefaultAPIPort is used when InitCluster is called with apiPort == 0.
	DefaultAPIPort = 9443
	// State files live under ClusterStateDir; each is loaded and saved
	// independently (membership, node table, workload schedule).
	ClusterStateDir   = "/var/lib/volt/cluster"
	ClusterStateFile  = "/var/lib/volt/cluster/state.json"
	NodesStateFile    = "/var/lib/volt/cluster/nodes.json"
	ScheduleStateFile = "/var/lib/volt/cluster/schedule.json"
)
|
|
|
|
// ── Node ─────────────────────────────────────────────────────────────────────
|
|
|
|
// NodeStatus represents the health state of a cluster node.
type NodeStatus string

// Node states. Ready/NotReady flip based on heartbeats (ProcessHeartbeat,
// CheckHealth); Draining is set by DrainNode. Joining/Removed are part of
// the lifecycle contract but are not assigned in this file.
const (
	NodeStatusReady    NodeStatus = "ready"     // healthy and schedulable
	NodeStatusNotReady NodeStatus = "not-ready" // missed too many heartbeats
	NodeStatusJoining  NodeStatus = "joining"
	NodeStatusDraining NodeStatus = "draining" // no new workloads; existing ones rescheduled
	NodeStatusRemoved  NodeStatus = "removed"  // ignored by health checks
)
|
|
|
|
// NodeResources describes the capacity and usage of a node.
// It is reported by workers on every heartbeat and consumed by the
// scheduler (findBestNode) to compute available memory.
type NodeResources struct {
	CPUCores      int   `json:"cpu_cores"`
	MemoryTotalMB int64 `json:"memory_total_mb"`
	MemoryUsedMB  int64 `json:"memory_used_mb"`
	DiskTotalGB   int64 `json:"disk_total_gb"`
	DiskUsedGB    int64 `json:"disk_used_gb"`
	// ContainerCount and WorkloadCount are informational counters; the
	// visible scheduler does not read them.
	ContainerCount int `json:"container_count"`
	WorkloadCount  int `json:"workload_count"`
}
|
|
|
|
// NodeInfo represents a registered cluster node.
//
// The control plane persists the node table (a map of these) to
// NodesStateFile, hence the JSON tags.
type NodeInfo struct {
	NodeID string `json:"node_id"`
	Name   string `json:"name"`
	// MeshIP is the mesh (overlay) address used for cluster traffic.
	MeshIP   string     `json:"mesh_ip"`
	PublicIP string     `json:"public_ip,omitempty"`
	Status   NodeStatus `json:"status"`
	// Labels are matched against scheduler selectors (see matchLabels).
	Labels map[string]string `json:"labels,omitempty"`
	// Resources is refreshed from each heartbeat (ProcessHeartbeat).
	Resources     NodeResources `json:"resources"`
	LastHeartbeat time.Time     `json:"last_heartbeat"`
	JoinedAt      time.Time     `json:"joined_at"`
	// MissedBeats is incremented by CheckHealth when a heartbeat is overdue
	// and reset to zero by ProcessHeartbeat.
	MissedBeats   int    `json:"missed_beats"`
	VoltVersion   string `json:"volt_version,omitempty"`
	KernelVersion string `json:"kernel_version,omitempty"`
	OS            string `json:"os,omitempty"`
	Region        string `json:"region,omitempty"`
}
|
|
|
|
// IsHealthy returns true if the node is responding to heartbeats.
|
|
func (n *NodeInfo) IsHealthy() bool {
|
|
return n.Status == NodeStatusReady && n.MissedBeats < DefaultFailureThreshold
|
|
}
|
|
|
|
// ── Cluster State ────────────────────────────────────────────────────────────
|
|
|
|
// ClusterRole indicates this node's role in the cluster.
type ClusterRole string

const (
	RoleControl ClusterRole = "control" // runs the control plane (registration, scheduling)
	RoleWorker  ClusterRole = "worker"  // joined an existing cluster via JoinCluster
	RoleNone    ClusterRole = "none"    // not part of any cluster
)
|
|
|
|
// ClusterState is the persistent on-disk cluster membership state for this
// node, stored as JSON at ClusterStateFile.
type ClusterState struct {
	ClusterID string      `json:"cluster_id"`
	Role      ClusterRole `json:"role"`
	NodeID    string      `json:"node_id"`
	NodeName  string      `json:"node_name"`
	// ControlURL is where workers reach the control plane API
	// (e.g. https://<mesh-ip>:<port>, see InitCluster).
	ControlURL string `json:"control_url"`
	APIPort    int    `json:"api_port"`
	JoinedAt   time.Time `json:"joined_at"`
	// HeartbeatInterval serializes as an int64 nanosecond count in JSON
	// (standard time.Duration marshaling).
	HeartbeatInterval time.Duration `json:"heartbeat_interval"`
}
|
|
|
|
// ── Scheduled Workload ───────────────────────────────────────────────────────
|
|
|
|
// ScheduledWorkload represents a workload assigned to a node by the scheduler.
// The full schedule is persisted as JSON at ScheduleStateFile.
type ScheduledWorkload struct {
	WorkloadID string `json:"workload_id"`
	// NodeID/NodeName identify the placement; both are cleared when the
	// workload is released for rescheduling (see DrainNode).
	NodeID       string            `json:"node_id"`
	NodeName     string            `json:"node_name"`
	Mode         string            `json:"mode"` // container, hybrid-native, etc.
	ManifestPath string            `json:"manifest_path,omitempty"`
	Labels       map[string]string `json:"labels,omitempty"`
	Resources    WorkloadResources `json:"resources"`
	Status       string            `json:"status"` // pending, running, stopped, failed
	ScheduledAt  time.Time         `json:"scheduled_at"`
}
|
|
|
|
// WorkloadResources describes the resource requirements for a workload.
// Zero values mean "no requirement" to the scheduler.
type WorkloadResources struct {
	CPUCores int   `json:"cpu_cores"`
	MemoryMB int64 `json:"memory_mb"`
	DiskMB   int64 `json:"disk_mb,omitempty"`
}
|
|
|
|
// ── Control Plane ────────────────────────────────────────────────────────────
|
|
|
|
// ControlPlane manages cluster state, node registration, and scheduling.
//
// All fields are guarded by mu. Methods take the lock, mutate the in-memory
// state, and persist changes to the JSON state files.
type ControlPlane struct {
	state    *ClusterState        // this node's membership record; nil when not in a cluster
	nodes    map[string]*NodeInfo // registered nodes, keyed by NodeID
	schedule []*ScheduledWorkload // workload placements (control plane only)
	mu       sync.RWMutex         // guards state, nodes, and schedule
}
|
|
|
|
// NewControlPlane creates or loads a control plane instance.
|
|
func NewControlPlane() *ControlPlane {
|
|
cp := &ControlPlane{
|
|
nodes: make(map[string]*NodeInfo),
|
|
}
|
|
cp.loadState()
|
|
cp.loadNodes()
|
|
cp.loadSchedule()
|
|
return cp
|
|
}
|
|
|
|
// IsInitialized returns true if the cluster has been initialized.
|
|
func (cp *ControlPlane) IsInitialized() bool {
|
|
cp.mu.RLock()
|
|
defer cp.mu.RUnlock()
|
|
return cp.state != nil && cp.state.ClusterID != ""
|
|
}
|
|
|
|
// State returns a copy of the cluster state.
|
|
func (cp *ControlPlane) State() *ClusterState {
|
|
cp.mu.RLock()
|
|
defer cp.mu.RUnlock()
|
|
if cp.state == nil {
|
|
return nil
|
|
}
|
|
copy := *cp.state
|
|
return ©
|
|
}
|
|
|
|
// Role returns this node's cluster role.
|
|
func (cp *ControlPlane) Role() ClusterRole {
|
|
cp.mu.RLock()
|
|
defer cp.mu.RUnlock()
|
|
if cp.state == nil {
|
|
return RoleNone
|
|
}
|
|
return cp.state.Role
|
|
}
|
|
|
|
// Nodes returns all registered nodes.
|
|
func (cp *ControlPlane) Nodes() []*NodeInfo {
|
|
cp.mu.RLock()
|
|
defer cp.mu.RUnlock()
|
|
result := make([]*NodeInfo, 0, len(cp.nodes))
|
|
for _, n := range cp.nodes {
|
|
copy := *n
|
|
result = append(result, ©)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// GetNode returns a node by ID or name.
|
|
func (cp *ControlPlane) GetNode(idOrName string) *NodeInfo {
|
|
cp.mu.RLock()
|
|
defer cp.mu.RUnlock()
|
|
if n, ok := cp.nodes[idOrName]; ok {
|
|
copy := *n
|
|
return ©
|
|
}
|
|
// Try by name
|
|
for _, n := range cp.nodes {
|
|
if n.Name == idOrName {
|
|
copy := *n
|
|
return ©
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Schedule returns the current workload schedule.
|
|
func (cp *ControlPlane) Schedule() []*ScheduledWorkload {
|
|
cp.mu.RLock()
|
|
defer cp.mu.RUnlock()
|
|
result := make([]*ScheduledWorkload, len(cp.schedule))
|
|
for i, sw := range cp.schedule {
|
|
copy := *sw
|
|
result[i] = ©
|
|
}
|
|
return result
|
|
}
|
|
|
|
// ── Init ─────────────────────────────────────────────────────────────────────
|
|
|
|
// InitCluster initializes this node as the cluster control plane.
|
|
func (cp *ControlPlane) InitCluster(clusterID, nodeName, meshIP string, apiPort int) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if cp.state != nil && cp.state.ClusterID != "" {
|
|
return fmt.Errorf("already part of cluster %q", cp.state.ClusterID)
|
|
}
|
|
|
|
if apiPort == 0 {
|
|
apiPort = DefaultAPIPort
|
|
}
|
|
|
|
cp.state = &ClusterState{
|
|
ClusterID: clusterID,
|
|
Role: RoleControl,
|
|
NodeID: clusterID + "-control",
|
|
NodeName: nodeName,
|
|
ControlURL: fmt.Sprintf("https://%s:%d", meshIP, apiPort),
|
|
APIPort: apiPort,
|
|
JoinedAt: time.Now().UTC(),
|
|
HeartbeatInterval: DefaultHeartbeatInterval,
|
|
}
|
|
|
|
// Register self as a node
|
|
cp.nodes[cp.state.NodeID] = &NodeInfo{
|
|
NodeID: cp.state.NodeID,
|
|
Name: nodeName,
|
|
MeshIP: meshIP,
|
|
Status: NodeStatusReady,
|
|
Labels: map[string]string{"role": "control"},
|
|
LastHeartbeat: time.Now().UTC(),
|
|
JoinedAt: time.Now().UTC(),
|
|
}
|
|
|
|
if err := cp.saveState(); err != nil {
|
|
return err
|
|
}
|
|
return cp.saveNodes()
|
|
}
|
|
|
|
// ── Join ─────────────────────────────────────────────────────────────────────
|
|
|
|
// JoinCluster registers this node as a worker in an existing cluster.
|
|
func (cp *ControlPlane) JoinCluster(clusterID, controlURL, nodeID, nodeName, meshIP string) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if cp.state != nil && cp.state.ClusterID != "" {
|
|
return fmt.Errorf("already part of cluster %q — run 'volt cluster leave' first", cp.state.ClusterID)
|
|
}
|
|
|
|
cp.state = &ClusterState{
|
|
ClusterID: clusterID,
|
|
Role: RoleWorker,
|
|
NodeID: nodeID,
|
|
NodeName: nodeName,
|
|
ControlURL: controlURL,
|
|
JoinedAt: time.Now().UTC(),
|
|
HeartbeatInterval: DefaultHeartbeatInterval,
|
|
}
|
|
|
|
return cp.saveState()
|
|
}
|
|
|
|
// ── Node Registration ────────────────────────────────────────────────────────
|
|
|
|
// RegisterNode adds a new worker node to the cluster (control plane only).
|
|
func (cp *ControlPlane) RegisterNode(node *NodeInfo) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if cp.state == nil || cp.state.Role != RoleControl {
|
|
return fmt.Errorf("not the control plane — cannot register nodes")
|
|
}
|
|
|
|
node.Status = NodeStatusReady
|
|
node.JoinedAt = time.Now().UTC()
|
|
node.LastHeartbeat = time.Now().UTC()
|
|
cp.nodes[node.NodeID] = node
|
|
|
|
return cp.saveNodes()
|
|
}
|
|
|
|
// DeregisterNode removes a node from the cluster.
|
|
func (cp *ControlPlane) DeregisterNode(nodeID string) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if _, exists := cp.nodes[nodeID]; !exists {
|
|
return fmt.Errorf("node %q not found", nodeID)
|
|
}
|
|
|
|
delete(cp.nodes, nodeID)
|
|
return cp.saveNodes()
|
|
}
|
|
|
|
// ── Heartbeat ────────────────────────────────────────────────────────────────
|
|
|
|
// ProcessHeartbeat updates a node's health status.
|
|
func (cp *ControlPlane) ProcessHeartbeat(nodeID string, resources NodeResources) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
node, exists := cp.nodes[nodeID]
|
|
if !exists {
|
|
return fmt.Errorf("node %q not registered", nodeID)
|
|
}
|
|
|
|
node.LastHeartbeat = time.Now().UTC()
|
|
node.MissedBeats = 0
|
|
node.Resources = resources
|
|
if node.Status == NodeStatusNotReady {
|
|
node.Status = NodeStatusReady
|
|
}
|
|
|
|
return cp.saveNodes()
|
|
}
|
|
|
|
// CheckHealth evaluates all nodes and marks those with missed heartbeats.
|
|
func (cp *ControlPlane) CheckHealth() []string {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
var unhealthy []string
|
|
threshold := time.Duration(DefaultFailureThreshold) * DefaultHeartbeatInterval
|
|
|
|
for _, node := range cp.nodes {
|
|
if node.Status == NodeStatusRemoved || node.Status == NodeStatusDraining {
|
|
continue
|
|
}
|
|
if time.Since(node.LastHeartbeat) > threshold {
|
|
node.MissedBeats++
|
|
if node.MissedBeats >= DefaultFailureThreshold {
|
|
node.Status = NodeStatusNotReady
|
|
unhealthy = append(unhealthy, node.NodeID)
|
|
}
|
|
}
|
|
}
|
|
|
|
cp.saveNodes()
|
|
return unhealthy
|
|
}
|
|
|
|
// ── Drain ────────────────────────────────────────────────────────────────────
|
|
|
|
// DrainNode marks a node for draining (no new workloads, existing ones rescheduled).
|
|
func (cp *ControlPlane) DrainNode(nodeID string) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
node, exists := cp.nodes[nodeID]
|
|
if !exists {
|
|
return fmt.Errorf("node %q not found", nodeID)
|
|
}
|
|
|
|
node.Status = NodeStatusDraining
|
|
|
|
// Find workloads on this node and mark for rescheduling
|
|
for _, sw := range cp.schedule {
|
|
if sw.NodeID == nodeID && sw.Status == "running" {
|
|
sw.Status = "pending" // will be rescheduled
|
|
sw.NodeID = ""
|
|
sw.NodeName = ""
|
|
}
|
|
}
|
|
|
|
cp.saveNodes()
|
|
return cp.saveSchedule()
|
|
}
|
|
|
|
// ── Leave ────────────────────────────────────────────────────────────────────
|
|
|
|
// LeaveCluster removes this node from the cluster.
|
|
func (cp *ControlPlane) LeaveCluster() error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if cp.state == nil {
|
|
return fmt.Errorf("not part of any cluster")
|
|
}
|
|
|
|
// If control plane, clean up
|
|
if cp.state.Role == RoleControl {
|
|
cp.nodes = make(map[string]*NodeInfo)
|
|
cp.schedule = nil
|
|
os.Remove(NodesStateFile)
|
|
os.Remove(ScheduleStateFile)
|
|
}
|
|
|
|
cp.state = nil
|
|
os.Remove(ClusterStateFile)
|
|
return nil
|
|
}
|
|
|
|
// ── Scheduling ───────────────────────────────────────────────────────────────
|
|
|
|
// ScheduleWorkload assigns a workload to a node based on resource availability
|
|
// and label selectors.
|
|
func (cp *ControlPlane) ScheduleWorkload(workload *ScheduledWorkload, nodeSelector map[string]string) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if cp.state == nil || cp.state.Role != RoleControl {
|
|
return fmt.Errorf("not the control plane — cannot schedule workloads")
|
|
}
|
|
|
|
// Find best node
|
|
bestNode := cp.findBestNode(workload.Resources, nodeSelector)
|
|
if bestNode == nil {
|
|
return fmt.Errorf("no suitable node found for workload %q (required: %dMB RAM, %d CPU cores)",
|
|
workload.WorkloadID, workload.Resources.MemoryMB, workload.Resources.CPUCores)
|
|
}
|
|
|
|
workload.NodeID = bestNode.NodeID
|
|
workload.NodeName = bestNode.Name
|
|
workload.Status = "pending"
|
|
workload.ScheduledAt = time.Now().UTC()
|
|
|
|
cp.schedule = append(cp.schedule, workload)
|
|
|
|
return cp.saveSchedule()
|
|
}
|
|
|
|
// findBestNode selects the best available node for a workload based on
|
|
// resource availability and label matching. Uses a simple "least loaded" strategy.
|
|
func (cp *ControlPlane) findBestNode(required WorkloadResources, selector map[string]string) *NodeInfo {
|
|
var best *NodeInfo
|
|
var bestScore int64 = -1
|
|
|
|
for _, node := range cp.nodes {
|
|
// Skip unhealthy/draining nodes
|
|
if node.Status != NodeStatusReady {
|
|
continue
|
|
}
|
|
|
|
// Check label selector
|
|
if !matchLabels(node.Labels, selector) {
|
|
continue
|
|
}
|
|
|
|
// Check resource availability
|
|
availMem := node.Resources.MemoryTotalMB - node.Resources.MemoryUsedMB
|
|
if required.MemoryMB > 0 && availMem < required.MemoryMB {
|
|
continue
|
|
}
|
|
|
|
// Score: prefer nodes with more available resources (simple bin-packing)
|
|
score := availMem
|
|
if best == nil || score > bestScore {
|
|
best = node
|
|
bestScore = score
|
|
}
|
|
}
|
|
|
|
return best
|
|
}
|
|
|
|
// matchLabels reports whether nodeLabels satisfies every key/value pair in
// selector. An empty (or nil) selector matches any node. A selector entry
// with an empty value matches both an empty label and a missing key, since
// a missing key reads as the zero value.
func matchLabels(nodeLabels, selector map[string]string) bool {
	for key, want := range selector {
		if got := nodeLabels[key]; got != want {
			return false
		}
	}
	return true
}
|
|
|
|
// ── Persistence ──────────────────────────────────────────────────────────────
|
|
|
|
func (cp *ControlPlane) loadState() {
|
|
data, err := os.ReadFile(ClusterStateFile)
|
|
if err != nil {
|
|
return
|
|
}
|
|
var state ClusterState
|
|
if err := json.Unmarshal(data, &state); err != nil {
|
|
return
|
|
}
|
|
cp.state = &state
|
|
}
|
|
|
|
func (cp *ControlPlane) saveState() error {
|
|
os.MkdirAll(ClusterStateDir, 0755)
|
|
data, err := json.MarshalIndent(cp.state, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(ClusterStateFile, data, 0644)
|
|
}
|
|
|
|
func (cp *ControlPlane) loadNodes() {
|
|
data, err := os.ReadFile(NodesStateFile)
|
|
if err != nil {
|
|
return
|
|
}
|
|
var nodes map[string]*NodeInfo
|
|
if err := json.Unmarshal(data, &nodes); err != nil {
|
|
return
|
|
}
|
|
cp.nodes = nodes
|
|
}
|
|
|
|
func (cp *ControlPlane) saveNodes() error {
|
|
os.MkdirAll(ClusterStateDir, 0755)
|
|
data, err := json.MarshalIndent(cp.nodes, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(NodesStateFile, data, 0644)
|
|
}
|
|
|
|
func (cp *ControlPlane) loadSchedule() {
|
|
data, err := os.ReadFile(ScheduleStateFile)
|
|
if err != nil {
|
|
return
|
|
}
|
|
var schedule []*ScheduledWorkload
|
|
if err := json.Unmarshal(data, &schedule); err != nil {
|
|
return
|
|
}
|
|
cp.schedule = schedule
|
|
}
|
|
|
|
func (cp *ControlPlane) saveSchedule() error {
|
|
os.MkdirAll(ClusterStateDir, 0755)
|
|
data, err := json.MarshalIndent(cp.schedule, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(ScheduleStateFile, data, 0644)
|
|
}
|