Files
volt/pkg/cas/distributed.go
Karl Clinger 81ad0b597c Volt CLI: source-available under AGPSL v5.0
Complete infrastructure platform CLI:
- Container runtime (systemd-nspawn)
- VoltVisor VMs (Neutron Stardust / QEMU)
- Stellarium CAS (content-addressed storage)
- ORAS Registry
- GitOps integration
- Landlock LSM security
- Compose orchestration
- Mesh networking

Copyright (c) Armored Gates LLC. All rights reserved.
Licensed under AGPSL v5.0
2026-03-21 00:31:12 -05:00

614 lines
17 KiB
Go

/*
Distributed CAS — Cross-node blob exchange and manifest synchronization.
Extends the single-node CAS store with cluster-aware operations:
- Peer discovery (static config or mDNS)
- HTTP API for blob get/head and manifest list/push
- Pull-through cache: local CAS → peers → CDN fallback
- Manifest registry: cluster-wide awareness of available manifests
Each node in a Volt cluster runs a lightweight HTTP server that exposes
its local CAS store to peers. When a node needs a blob, it checks peers
before falling back to the CDN, saving bandwidth and latency.
Architecture:
┌──────────┐    HTTP     ┌──────────┐
│  Node A  │◄───────────▶│  Node B  │
│   CAS    │             │   CAS    │
└────┬─────┘             └────┬─────┘
     │                        │
     └──── CDN fallback ──────┘
Feature gate: "cas-distributed" (Pro tier)
Copyright (c) Armored Gates LLC. All rights reserved.
*/
package cas
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/armoredgate/volt/pkg/cdn"
	"github.com/armoredgate/volt/pkg/storage"
)
// ── Configuration ────────────────────────────────────────────────────────────

const (
	// DefaultPort is the default port for the distributed CAS HTTP API.
	DefaultPort = 7420

	// DefaultTimeout is the timeout applied to peer HTTP requests when
	// ClusterConfig.PeerTimeout is unset.
	DefaultTimeout = 10 * time.Second
)
// ClusterConfig holds the configuration for distributed CAS operations.
// Use DefaultConfig for sensible defaults.
type ClusterConfig struct {
	// NodeID identifies this node in the cluster.
	NodeID string `yaml:"node_id" json:"node_id"`

	// ListenAddr is the address to listen on (e.g., ":7420" or "0.0.0.0:7420").
	ListenAddr string `yaml:"listen_addr" json:"listen_addr"`

	// Peers is the list of known peer addresses (e.g., ["192.168.1.10:7420"]).
	Peers []string `yaml:"peers" json:"peers"`

	// AdvertiseAddr is the address this node advertises to peers.
	// If empty, auto-detected from the first non-loopback interface.
	AdvertiseAddr string `yaml:"advertise_addr" json:"advertise_addr"`

	// PeerTimeout is the timeout for peer requests. Non-positive values
	// are replaced with DefaultTimeout by New.
	PeerTimeout time.Duration `yaml:"peer_timeout" json:"peer_timeout"`

	// EnableCDNFallback controls whether to fall back to CDN when peers
	// don't have a blob. Default: true (set by DefaultConfig).
	EnableCDNFallback bool `yaml:"enable_cdn_fallback" json:"enable_cdn_fallback"`
}
// DefaultConfig returns a ClusterConfig with sensible defaults: the OS
// hostname as the node ID, the default API port, the default peer timeout,
// and CDN fallback enabled.
func DefaultConfig() ClusterConfig {
	cfg := ClusterConfig{
		ListenAddr:        fmt.Sprintf(":%d", DefaultPort),
		PeerTimeout:       DefaultTimeout,
		EnableCDNFallback: true,
	}
	// Hostname lookup failure is non-fatal; NodeID stays empty in that case.
	cfg.NodeID, _ = os.Hostname()
	return cfg
}
// ── Distributed CAS ──────────────────────────────────────────────────────────
// DistributedCAS wraps a local CASStore with cluster-aware operations:
// pull-through blob fetches, manifest synchronization, and a peer-facing
// HTTP API.
type DistributedCAS struct {
	local      *storage.CASStore
	config     ClusterConfig
	cdnClient  *cdn.Client // optional; set via NewWithCDN
	httpClient *http.Client
	server     *http.Server // set by StartServer

	// peerHealth tracks which peers are currently reachable.
	// Guarded by mu.
	peerHealth map[string]bool
	mu         sync.RWMutex
}
// New creates a DistributedCAS instance backed by the given local CAS store.
// A non-positive PeerTimeout in cfg is replaced with DefaultTimeout.
func New(cas *storage.CASStore, cfg ClusterConfig) *DistributedCAS {
	timeout := cfg.PeerTimeout
	if timeout <= 0 {
		timeout = DefaultTimeout
	}
	cfg.PeerTimeout = timeout
	return &DistributedCAS{
		local:      cas,
		config:     cfg,
		httpClient: &http.Client{Timeout: timeout},
		peerHealth: map[string]bool{},
	}
}
// NewWithCDN creates a DistributedCAS with CDN fallback support.
func NewWithCDN(cas *storage.CASStore, cfg ClusterConfig, cdnClient *cdn.Client) *DistributedCAS {
	dc := New(cas, cfg)
	dc.cdnClient = cdnClient
	return dc
}
// ── Blob Operations (Pull-Through) ───────────────────────────────────────────
// GetBlob retrieves a blob using the pull-through strategy:
//  1. Check local CAS
//  2. Check peers
//  3. Fall back to CDN
//
// If the blob is found on a peer or CDN, it is stored in the local CAS
// for future requests (pull-through caching).
func (d *DistributedCAS) GetBlob(digest string) (io.ReadCloser, error) {
	// 1. Check local CAS.
	if d.local.Exists(digest) {
		return d.local.Get(digest)
	}
	// 2. Check peers.
	data, peerAddr, err := d.getFromPeers(digest)
	if err == nil {
		// Store locally for future requests. bytes.NewReader avoids the
		// []byte → string → Reader round trip, which copied the data twice.
		if _, _, putErr := d.local.Put(bytes.NewReader(data)); putErr != nil {
			// Non-fatal: the blob is still usable from memory.
			fmt.Fprintf(os.Stderr, "distributed-cas: warning: failed to cache blob from peer %s: %v\n", peerAddr, putErr)
		}
		return io.NopCloser(bytes.NewReader(data)), nil
	}
	// 3. CDN fallback.
	if d.config.EnableCDNFallback && d.cdnClient != nil {
		data, cdnErr := d.cdnClient.PullBlob(digest)
		if cdnErr != nil {
			return nil, fmt.Errorf("distributed-cas: blob %s not found (checked local, %d peers, CDN): %w",
				shortDigest(digest), len(d.config.Peers), cdnErr)
		}
		// Cache locally; failure is non-fatal for the same reason as above.
		d.local.Put(bytes.NewReader(data)) //nolint:errcheck
		return io.NopCloser(bytes.NewReader(data)), nil
	}
	return nil, fmt.Errorf("distributed-cas: blob %s not found (checked local and %d peers)",
		shortDigest(digest), len(d.config.Peers))
}

// shortDigest truncates a digest to 12 characters for error messages.
// Unlike a bare digest[:12], it does not panic on digests shorter than 12.
func shortDigest(digest string) string {
	if len(digest) > 12 {
		return digest[:12]
	}
	return digest
}
// BlobExists checks if a blob exists anywhere in the cluster. It returns
// whether the blob was found and where: "local", a peer address, or ""
// when not found.
func (d *DistributedCAS) BlobExists(digest string) (bool, string) {
	// Local store first — cheapest check.
	if d.local.Exists(digest) {
		return true, "local"
	}
	// Probe each peer with a HEAD request.
	for _, addr := range d.config.Peers {
		endpoint := fmt.Sprintf("http://%s/v1/blobs/%s", addr, digest)
		req, err := http.NewRequest(http.MethodHead, endpoint, nil)
		if err != nil {
			continue
		}
		resp, err := d.httpClient.Do(req)
		if err != nil {
			continue
		}
		found := resp.StatusCode == http.StatusOK
		resp.Body.Close()
		if found {
			return true, addr
		}
	}
	return false, ""
}
// getFromPeers tries to download a blob from any reachable peer. Peers
// currently marked unhealthy are skipped; peers whose health is unknown
// are still tried. Returns the blob bytes and the address of the peer
// that served it.
func (d *DistributedCAS) getFromPeers(digest string) ([]byte, string, error) {
	for _, peer := range d.config.Peers {
		// Read the health map only under the lock. The original code
		// re-read the map unlocked with a condition that could never be
		// true, so unhealthy peers were never actually skipped.
		d.mu.RLock()
		healthy, known := d.peerHealth[peer]
		d.mu.RUnlock()
		// Skip peers known to be unhealthy (but still try if health is unknown).
		if known && !healthy {
			continue
		}
		url := fmt.Sprintf("http://%s/v1/blobs/%s", peer, digest)
		resp, err := d.httpClient.Get(url)
		if err != nil {
			d.markPeerUnhealthy(peer)
			continue
		}
		if resp.StatusCode != http.StatusOK {
			// 404 means the peer doesn't have this blob; other statuses are
			// treated the same way — move on to the next peer.
			resp.Body.Close()
			continue
		}
		data, err := io.ReadAll(resp.Body)
		// Close immediately rather than defer: a defer inside the loop would
		// keep every response body open until the function returns.
		resp.Body.Close()
		if err != nil {
			continue
		}
		d.markPeerHealthy(peer)
		return data, peer, nil
	}
	// Guard against digests shorter than 12 chars (digest[:12] would panic).
	short := digest
	if len(short) > 12 {
		short = short[:12]
	}
	return nil, "", fmt.Errorf("no peer has blob %s", short)
}
// ── Manifest Operations ──────────────────────────────────────────────────────
// ManifestInfo describes a manifest available on a node.
type ManifestInfo struct {
	Name      string `json:"name"`       // logical manifest name
	RefFile   string `json:"ref_file"`   // file name under the CAS refs directory
	BlobCount int    `json:"blob_count"` // number of objects the manifest references
	NodeID    string `json:"node_id"`    // node that reported this manifest
}
// ListClusterManifests aggregates manifest lists from all peers and local.
// Unreachable or misbehaving peers are skipped rather than failing the call;
// only a local listing error is returned.
func (d *DistributedCAS) ListClusterManifests() ([]ManifestInfo, error) {
	var all []ManifestInfo
	// Local manifests.
	localManifests, err := d.listLocalManifests()
	if err != nil {
		return nil, err
	}
	all = append(all, localManifests...)
	// Peer manifests (best-effort).
	for _, peer := range d.config.Peers {
		url := fmt.Sprintf("http://%s/v1/manifests", peer)
		resp, err := d.httpClient.Get(url)
		if err != nil {
			continue
		}
		var peerManifests []ManifestInfo
		var decodeErr error
		if resp.StatusCode == http.StatusOK {
			decodeErr = json.NewDecoder(resp.Body).Decode(&peerManifests)
		}
		// Close per iteration: the original deferred Close inside the loop,
		// which held every response body open until the function returned.
		resp.Body.Close()
		if resp.StatusCode != http.StatusOK || decodeErr != nil {
			continue
		}
		all = append(all, peerManifests...)
	}
	return all, nil
}
// listLocalManifests enumerates the manifests stored in the local CAS refs
// directory. A missing refs directory yields an empty result, not an error;
// individual manifests that fail to load are skipped.
func (d *DistributedCAS) listLocalManifests() ([]ManifestInfo, error) {
	refsDir := filepath.Join(d.local.BaseDir(), "refs")
	entries, err := os.ReadDir(refsDir)
	switch {
	case os.IsNotExist(err):
		return nil, nil
	case err != nil:
		return nil, err
	}
	var manifests []ManifestInfo
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(name, ".json") {
			continue
		}
		bm, loadErr := d.local.LoadManifest(name)
		if loadErr != nil {
			// Unreadable manifest: skip rather than fail the whole listing.
			continue
		}
		manifests = append(manifests, ManifestInfo{
			Name:      bm.Name,
			RefFile:   name,
			BlobCount: len(bm.Objects),
			NodeID:    d.config.NodeID,
		})
	}
	return manifests, nil
}
// SyncManifest pulls a manifest and all its blobs from a peer, then saves
// the manifest locally. Blobs already present in the local CAS are skipped;
// missing blobs are fetched through the pull-through path (peers, then CDN).
func (d *DistributedCAS) SyncManifest(peerAddr, refFile string) error {
	// Download the manifest.
	url := fmt.Sprintf("http://%s/v1/manifests/%s", peerAddr, refFile)
	resp, err := d.httpClient.Get(url)
	if err != nil {
		return fmt.Errorf("sync manifest: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("sync manifest: peer returned HTTP %d", resp.StatusCode)
	}
	var bm storage.BlobManifest
	if err := json.NewDecoder(resp.Body).Decode(&bm); err != nil {
		return fmt.Errorf("sync manifest: decode: %w", err)
	}
	// Pull blobs not already present locally. (The original kept a `missing`
	// counter that was never read; it has been removed.)
	for _, digest := range bm.Objects {
		if d.local.Exists(digest) {
			continue
		}
		rc, err := d.GetBlob(digest)
		if err != nil {
			// Guard against digests shorter than 12 chars before slicing.
			short := digest
			if len(short) > 12 {
				short = short[:12]
			}
			return fmt.Errorf("sync manifest: pull blob %s: %w", short, err)
		}
		// GetBlob has already cached the blob locally; close the reader
		// (the original leaked it).
		rc.Close()
	}
	// Save manifest locally.
	if _, err := d.local.SaveManifest(&bm); err != nil {
		return fmt.Errorf("sync manifest: save: %w", err)
	}
	return nil
}
// ── HTTP Server ──────────────────────────────────────────────────────────────
// StartServer starts the HTTP API server for peer communication and blocks
// until ctx is cancelled or the server fails. A graceful shutdown triggered
// by ctx cancellation returns nil (the original returned http.ErrServerClosed
// as an error to the caller).
func (d *DistributedCAS) StartServer(ctx context.Context) error {
	mux := http.NewServeMux()
	// Blob endpoints.
	mux.HandleFunc("/v1/blobs/", d.handleBlob)
	// Manifest endpoints.
	mux.HandleFunc("/v1/manifests", d.handleManifestList)
	mux.HandleFunc("/v1/manifests/", d.handleManifestGet)
	// Health endpoint.
	mux.HandleFunc("/v1/health", d.handleHealth)
	// Peer info.
	mux.HandleFunc("/v1/info", d.handleInfo)
	d.server = &http.Server{
		Addr:    d.config.ListenAddr,
		Handler: mux,
		// Bound header read time so a stalled peer cannot hold a connection
		// open indefinitely (slowloris mitigation).
		ReadHeaderTimeout: d.config.PeerTimeout,
	}
	ln, err := net.Listen("tcp", d.config.ListenAddr)
	if err != nil {
		return fmt.Errorf("distributed-cas: listen %s: %w", d.config.ListenAddr, err)
	}
	// Start the health checker only once we are actually serving; the
	// original started it before Listen, leaving it running on failure.
	go d.healthCheckLoop(ctx)
	// Shut down gracefully when the context is cancelled.
	go func() {
		<-ctx.Done()
		d.server.Shutdown(context.Background()) //nolint:errcheck
	}()
	if err := d.server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}
// ── HTTP Handlers ────────────────────────────────────────────────────────────
// handleBlob serves HEAD and GET requests for /v1/blobs/{digest} from the
// local CAS store. HEAD reports existence (with Content-Length when the
// blob file can be stat'ed); GET streams the blob bytes.
func (d *DistributedCAS) handleBlob(w http.ResponseWriter, r *http.Request) {
	// Path shape: /v1/blobs/{digest}
	segments := strings.Split(r.URL.Path, "/")
	if len(segments) < 4 {
		http.Error(w, "invalid path", http.StatusBadRequest)
		return
	}
	digest := segments[3]
	switch r.Method {
	case http.MethodHead:
		if !d.local.Exists(digest) {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		if info, statErr := os.Stat(d.local.GetPath(digest)); statErr == nil {
			w.Header().Set("Content-Length", fmt.Sprintf("%d", info.Size()))
		}
		w.WriteHeader(http.StatusOK)
	case http.MethodGet:
		reader, err := d.local.Get(digest)
		if err != nil {
			http.Error(w, "not found", http.StatusNotFound)
			return
		}
		defer reader.Close()
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Header().Set("X-Volt-Node", d.config.NodeID)
		io.Copy(w, reader) //nolint:errcheck
	default:
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}
// handleManifestList serves GET /v1/manifests, returning the local manifest
// list as JSON.
func (d *DistributedCAS) handleManifestList(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	list, err := d.listLocalManifests()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(list) //nolint:errcheck
}
// handleManifestGet serves GET /v1/manifests/{ref-file}, returning the named
// manifest from the local store as JSON.
func (d *DistributedCAS) handleManifestGet(w http.ResponseWriter, r *http.Request) {
	// Extract ref file from path: /v1/manifests/{ref-file}
	parts := strings.Split(r.URL.Path, "/")
	if len(parts) < 4 {
		http.Error(w, "invalid path", http.StatusBadRequest)
		return
	}
	refFile := parts[3]
	// Reject path-traversal names before they reach the filesystem. Because
	// we split on "/", refFile cannot contain a separator, so "." and ".."
	// are the only dangerous values (empty is simply invalid).
	if refFile == "" || refFile == "." || refFile == ".." {
		http.Error(w, "invalid ref", http.StatusBadRequest)
		return
	}
	bm, err := d.local.LoadManifest(refFile)
	if err != nil {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("X-Volt-Node", d.config.NodeID)
	json.NewEncoder(w).Encode(bm) //nolint:errcheck
}
// handleHealth serves /v1/health, reporting node liveness, identity, and the
// current UTC time as JSON.
func (d *DistributedCAS) handleHealth(w http.ResponseWriter, r *http.Request) {
	payload := map[string]interface{}{
		"status":  "ok",
		"node_id": d.config.NodeID,
		"time":    time.Now().UTC().Format(time.RFC3339),
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(payload) //nolint:errcheck
}
// handleInfo serves /v1/info with basic node metadata (identity, addresses,
// peer list, and CAS base directory) as JSON.
func (d *DistributedCAS) handleInfo(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{ //nolint:errcheck
		"node_id":     d.config.NodeID,
		"listen_addr": d.config.ListenAddr,
		"peers":       d.config.Peers,
		"cas_base":    d.local.BaseDir(),
	})
}
// ── Health Checking ──────────────────────────────────────────────────────────
// healthCheckLoop probes all configured peers every 30 seconds until ctx is
// cancelled.
func (d *DistributedCAS) healthCheckLoop(ctx context.Context) {
	const interval = 30 * time.Second
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	// Probe immediately so peer health is populated before the first tick.
	d.checkPeerHealth()
	for {
		select {
		case <-ticker.C:
			d.checkPeerHealth()
		case <-ctx.Done():
			return
		}
	}
}
// checkPeerHealth probes each configured peer's /v1/health endpoint and
// records the result in the peer health map.
func (d *DistributedCAS) checkPeerHealth() {
	for _, peer := range d.config.Peers {
		url := fmt.Sprintf("http://%s/v1/health", peer)
		resp, err := d.httpClient.Get(url)
		if err != nil {
			d.markPeerUnhealthy(peer)
			continue
		}
		// Drain the body before closing so the HTTP transport can reuse the
		// keep-alive connection for the next probe.
		io.Copy(io.Discard, resp.Body) //nolint:errcheck
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			d.markPeerHealthy(peer)
		} else {
			d.markPeerUnhealthy(peer)
		}
	}
}
// markPeerHealthy records that a peer answered a request successfully.
func (d *DistributedCAS) markPeerHealthy(peer string) {
	d.mu.Lock()
	d.peerHealth[peer] = true
	d.mu.Unlock()
}
// markPeerUnhealthy records that a peer failed to answer a request.
func (d *DistributedCAS) markPeerUnhealthy(peer string) {
	d.mu.Lock()
	d.peerHealth[peer] = false
	d.mu.Unlock()
}
// ── Peer Status ──────────────────────────────────────────────────────────────
// PeerStatus describes the current state of a peer node.
type PeerStatus struct {
	Address string        `json:"address"`           // peer address as configured
	NodeID  string        `json:"node_id,omitempty"` // reported by the peer's health endpoint, if decodable
	Healthy bool          `json:"healthy"`           // whether the health probe returned HTTP 200
	Latency time.Duration `json:"latency,omitempty"` // round-trip time of the health probe
}
// PeerStatuses returns the health status of all configured peers by probing
// each peer's /v1/health endpoint sequentially.
func (d *DistributedCAS) PeerStatuses() []PeerStatus {
	var statuses []PeerStatus
	for _, peer := range d.config.Peers {
		status := PeerStatus{Address: peer}
		begin := time.Now()
		resp, err := d.httpClient.Get(fmt.Sprintf("http://%s/v1/health", peer))
		if err != nil {
			status.Healthy = false
			statuses = append(statuses, status)
			continue
		}
		status.Latency = time.Since(begin)
		status.Healthy = resp.StatusCode == http.StatusOK
		// Best-effort: pull the node ID out of the health response body.
		var health map[string]interface{}
		if decodeErr := json.NewDecoder(resp.Body).Decode(&health); decodeErr == nil {
			if id, ok := health["node_id"].(string); ok {
				status.NodeID = id
			}
		}
		resp.Body.Close()
		statuses = append(statuses, status)
	}
	return statuses
}
// ── Cluster Stats ────────────────────────────────────────────────────────────
// ClusterStats provides aggregate statistics across the cluster.
type ClusterStats struct {
	TotalNodes      int `json:"total_nodes"`      // self + configured peers
	HealthyNodes    int `json:"healthy_nodes"`    // self + peers last recorded healthy
	TotalManifests  int `json:"total_manifests"`  // manifests across all nodes, duplicates included
	UniqueManifests int `json:"unique_manifests"` // distinct manifest names across the cluster
}
// Stats returns aggregate cluster statistics: node counts, peer health, and
// manifest totals across the cluster.
func (d *DistributedCAS) Stats() ClusterStats {
	// Count healthy nodes; this node always counts itself as healthy.
	healthy := 1
	d.mu.RLock()
	for _, ok := range d.peerHealth {
		if ok {
			healthy++
		}
	}
	d.mu.RUnlock()

	// Manifest listing errors are deliberately ignored: stats are best-effort.
	manifests, _ := d.ListClusterManifests()
	unique := make(map[string]bool, len(manifests))
	for _, m := range manifests {
		unique[m.Name] = true
	}

	return ClusterStats{
		TotalNodes:      1 + len(d.config.Peers), // self + peers
		HealthyNodes:    healthy,
		TotalManifests:  len(manifests),
		UniqueManifests: len(unique),
	}
}