/* Distributed CAS — Cross-node blob exchange and manifest synchronization. Extends the single-node CAS store with cluster-aware operations: - Peer discovery (static config or mDNS) - HTTP API for blob get/head and manifest list/push - Pull-through cache: local CAS → peers → CDN fallback - Manifest registry: cluster-wide awareness of available manifests Each node in a Volt cluster runs a lightweight HTTP server that exposes its local CAS store to peers. When a node needs a blob, it checks peers before falling back to the CDN, saving bandwidth and latency. Architecture: ┌─────────┐ HTTP ┌─────────┐ │ Node A │◄───────────▶│ Node B │ │ CAS │ │ CAS │ └────┬─────┘ └────┬─────┘ │ │ └──── CDN fallback ──────┘ Feature gate: "cas-distributed" (Pro tier) Copyright (c) Armored Gates LLC. All rights reserved. */ package cas import ( "context" "encoding/json" "fmt" "io" "net" "net/http" "os" "path/filepath" "strings" "sync" "time" "github.com/armoredgate/volt/pkg/cdn" "github.com/armoredgate/volt/pkg/storage" ) // ── Configuration ──────────────────────────────────────────────────────────── const ( // DefaultPort is the default port for the distributed CAS HTTP API. DefaultPort = 7420 // DefaultTimeout is the timeout for peer requests. DefaultTimeout = 10 * time.Second ) // ClusterConfig holds the configuration for distributed CAS operations. type ClusterConfig struct { // NodeID identifies this node in the cluster. NodeID string `yaml:"node_id" json:"node_id"` // ListenAddr is the address to listen on (e.g., ":7420" or "0.0.0.0:7420"). ListenAddr string `yaml:"listen_addr" json:"listen_addr"` // Peers is the list of known peer addresses (e.g., ["192.168.1.10:7420"]). Peers []string `yaml:"peers" json:"peers"` // AdvertiseAddr is the address this node advertises to peers. // If empty, auto-detected from the first non-loopback interface. AdvertiseAddr string `yaml:"advertise_addr" json:"advertise_addr"` // PeerTimeout is the timeout for peer requests. PeerTimeout time.Duration `yaml:"peer_timeout" json:"peer_timeout"` // EnableCDNFallback controls whether to fall back to CDN when peers // don't have a blob. Default: true. EnableCDNFallback bool `yaml:"enable_cdn_fallback" json:"enable_cdn_fallback"` } // DefaultConfig returns a ClusterConfig with sensible defaults. func DefaultConfig() ClusterConfig { hostname, _ := os.Hostname() return ClusterConfig{ NodeID: hostname, ListenAddr: fmt.Sprintf(":%d", DefaultPort), PeerTimeout: DefaultTimeout, EnableCDNFallback: true, } } // ── Distributed CAS ────────────────────────────────────────────────────────── // DistributedCAS wraps a local CASStore with cluster-aware operations. type DistributedCAS struct { local *storage.CASStore config ClusterConfig cdnClient *cdn.Client httpClient *http.Client server *http.Server // peerHealth tracks which peers are currently reachable. peerHealth map[string]bool mu sync.RWMutex } // New creates a DistributedCAS instance. func New(cas *storage.CASStore, cfg ClusterConfig) *DistributedCAS { if cfg.PeerTimeout <= 0 { cfg.PeerTimeout = DefaultTimeout } return &DistributedCAS{ local: cas, config: cfg, httpClient: &http.Client{ Timeout: cfg.PeerTimeout, }, peerHealth: make(map[string]bool), } } // NewWithCDN creates a DistributedCAS with CDN fallback support. func NewWithCDN(cas *storage.CASStore, cfg ClusterConfig, cdnClient *cdn.Client) *DistributedCAS { d := New(cas, cfg) d.cdnClient = cdnClient return d } // ── Blob Operations (Pull-Through) ─────────────────────────────────────────── // GetBlob retrieves a blob using the pull-through strategy: // 1. Check local CAS // 2. Check peers // 3. Fall back to CDN // // If the blob is found on a peer or CDN, it is stored in the local CAS // for future requests (pull-through caching). func (d *DistributedCAS) GetBlob(digest string) (io.ReadCloser, error) { // 1. Check local CAS. if d.local.Exists(digest) { return d.local.Get(digest) } // 2. Check peers. data, peerAddr, err := d.getFromPeers(digest) if err == nil { // Store locally for future requests. if _, _, putErr := d.local.Put(strings.NewReader(string(data))); putErr != nil { // Non-fatal: blob still usable from memory. fmt.Fprintf(os.Stderr, "distributed-cas: warning: failed to cache blob from peer %s: %v\n", peerAddr, putErr) } return io.NopCloser(strings.NewReader(string(data))), nil } // 3. CDN fallback. if d.config.EnableCDNFallback && d.cdnClient != nil { data, err := d.cdnClient.PullBlob(digest) if err != nil { return nil, fmt.Errorf("distributed-cas: blob %s not found (checked local, %d peers, CDN): %w", digest[:12], len(d.config.Peers), err) } // Cache locally. d.local.Put(strings.NewReader(string(data))) //nolint:errcheck return io.NopCloser(strings.NewReader(string(data))), nil } return nil, fmt.Errorf("distributed-cas: blob %s not found (checked local and %d peers)", digest[:12], len(d.config.Peers)) } // BlobExists checks if a blob exists anywhere in the cluster. func (d *DistributedCAS) BlobExists(digest string) (bool, string) { // Check local. if d.local.Exists(digest) { return true, "local" } // Check peers. for _, peer := range d.config.Peers { url := fmt.Sprintf("http://%s/v1/blobs/%s", peer, digest) req, err := http.NewRequest(http.MethodHead, url, nil) if err != nil { continue } resp, err := d.httpClient.Do(req) if err != nil { continue } resp.Body.Close() if resp.StatusCode == http.StatusOK { return true, peer } } return false, "" } // getFromPeers tries to download a blob from any reachable peer. func (d *DistributedCAS) getFromPeers(digest string) ([]byte, string, error) { for _, peer := range d.config.Peers { d.mu.RLock() healthy := d.peerHealth[peer] d.mu.RUnlock() // Skip peers known to be unhealthy (but still try if health is unknown). if d.peerHealth[peer] == false && healthy { continue } url := fmt.Sprintf("http://%s/v1/blobs/%s", peer, digest) resp, err := d.httpClient.Get(url) if err != nil { d.markPeerUnhealthy(peer) continue } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { continue // Peer doesn't have this blob. } if resp.StatusCode != http.StatusOK { continue } data, err := io.ReadAll(resp.Body) if err != nil { continue } d.markPeerHealthy(peer) return data, peer, nil } return nil, "", fmt.Errorf("no peer has blob %s", digest[:12]) } // ── Manifest Operations ────────────────────────────────────────────────────── // ManifestInfo describes a manifest available on a node. type ManifestInfo struct { Name string `json:"name"` RefFile string `json:"ref_file"` BlobCount int `json:"blob_count"` NodeID string `json:"node_id"` } // ListClusterManifests aggregates manifest lists from all peers and local. func (d *DistributedCAS) ListClusterManifests() ([]ManifestInfo, error) { var all []ManifestInfo // Local manifests. localManifests, err := d.listLocalManifests() if err != nil { return nil, err } all = append(all, localManifests...) // Peer manifests. for _, peer := range d.config.Peers { url := fmt.Sprintf("http://%s/v1/manifests", peer) resp, err := d.httpClient.Get(url) if err != nil { continue } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { continue } var peerManifests []ManifestInfo if err := json.NewDecoder(resp.Body).Decode(&peerManifests); err != nil { continue } all = append(all, peerManifests...) } return all, nil } func (d *DistributedCAS) listLocalManifests() ([]ManifestInfo, error) { refsDir := filepath.Join(d.local.BaseDir(), "refs") entries, err := os.ReadDir(refsDir) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, err } var manifests []ManifestInfo for _, entry := range entries { if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { continue } bm, err := d.local.LoadManifest(entry.Name()) if err != nil { continue } manifests = append(manifests, ManifestInfo{ Name: bm.Name, RefFile: entry.Name(), BlobCount: len(bm.Objects), NodeID: d.config.NodeID, }) } return manifests, nil } // SyncManifest pulls a manifest and all its blobs from a peer. func (d *DistributedCAS) SyncManifest(peerAddr, refFile string) error { // Download the manifest. url := fmt.Sprintf("http://%s/v1/manifests/%s", peerAddr, refFile) resp, err := d.httpClient.Get(url) if err != nil { return fmt.Errorf("sync manifest: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("sync manifest: peer returned HTTP %d", resp.StatusCode) } var bm storage.BlobManifest if err := json.NewDecoder(resp.Body).Decode(&bm); err != nil { return fmt.Errorf("sync manifest: decode: %w", err) } // Pull missing blobs. missing := 0 for _, digest := range bm.Objects { if d.local.Exists(digest) { continue } missing++ if _, err := d.GetBlob(digest); err != nil { return fmt.Errorf("sync manifest: pull blob %s: %w", digest[:12], err) } } // Save manifest locally. if _, err := d.local.SaveManifest(&bm); err != nil { return fmt.Errorf("sync manifest: save: %w", err) } return nil } // ── HTTP Server ────────────────────────────────────────────────────────────── // StartServer starts the HTTP API server for peer communication. func (d *DistributedCAS) StartServer(ctx context.Context) error { mux := http.NewServeMux() // Blob endpoints. mux.HandleFunc("/v1/blobs/", d.handleBlob) // Manifest endpoints. mux.HandleFunc("/v1/manifests", d.handleManifestList) mux.HandleFunc("/v1/manifests/", d.handleManifestGet) // Health endpoint. mux.HandleFunc("/v1/health", d.handleHealth) // Peer info. mux.HandleFunc("/v1/info", d.handleInfo) d.server = &http.Server{ Addr: d.config.ListenAddr, Handler: mux, } // Start health checker. go d.healthCheckLoop(ctx) // Start server. ln, err := net.Listen("tcp", d.config.ListenAddr) if err != nil { return fmt.Errorf("distributed-cas: listen %s: %w", d.config.ListenAddr, err) } go func() { <-ctx.Done() d.server.Shutdown(context.Background()) //nolint:errcheck }() return d.server.Serve(ln) } // ── HTTP Handlers ──────────────────────────────────────────────────────────── func (d *DistributedCAS) handleBlob(w http.ResponseWriter, r *http.Request) { // Extract digest from path: /v1/blobs/{digest} parts := strings.Split(r.URL.Path, "/") if len(parts) < 4 { http.Error(w, "invalid path", http.StatusBadRequest) return } digest := parts[3] switch r.Method { case http.MethodHead: if d.local.Exists(digest) { blobPath := d.local.GetPath(digest) info, _ := os.Stat(blobPath) if info != nil { w.Header().Set("Content-Length", fmt.Sprintf("%d", info.Size())) } w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusNotFound) } case http.MethodGet: reader, err := d.local.Get(digest) if err != nil { http.Error(w, "not found", http.StatusNotFound) return } defer reader.Close() w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("X-Volt-Node", d.config.NodeID) io.Copy(w, reader) //nolint:errcheck default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } } func (d *DistributedCAS) handleManifestList(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } manifests, err := d.listLocalManifests() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(manifests) //nolint:errcheck } func (d *DistributedCAS) handleManifestGet(w http.ResponseWriter, r *http.Request) { // Extract ref file from path: /v1/manifests/{ref-file} parts := strings.Split(r.URL.Path, "/") if len(parts) < 4 { http.Error(w, "invalid path", http.StatusBadRequest) return } refFile := parts[3] bm, err := d.local.LoadManifest(refFile) if err != nil { http.Error(w, "not found", http.StatusNotFound) return } w.Header().Set("Content-Type", "application/json") w.Header().Set("X-Volt-Node", d.config.NodeID) json.NewEncoder(w).Encode(bm) //nolint:errcheck } func (d *DistributedCAS) handleHealth(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "status": "ok", "node_id": d.config.NodeID, "time": time.Now().UTC().Format(time.RFC3339), }) //nolint:errcheck } func (d *DistributedCAS) handleInfo(w http.ResponseWriter, r *http.Request) { info := map[string]interface{}{ "node_id": d.config.NodeID, "listen_addr": d.config.ListenAddr, "peers": d.config.Peers, "cas_base": d.local.BaseDir(), } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(info) //nolint:errcheck } // ── Health Checking ────────────────────────────────────────────────────────── func (d *DistributedCAS) healthCheckLoop(ctx context.Context) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() // Initial check. d.checkPeerHealth() for { select { case <-ctx.Done(): return case <-ticker.C: d.checkPeerHealth() } } } func (d *DistributedCAS) checkPeerHealth() { for _, peer := range d.config.Peers { url := fmt.Sprintf("http://%s/v1/health", peer) resp, err := d.httpClient.Get(url) if err != nil { d.markPeerUnhealthy(peer) continue } resp.Body.Close() if resp.StatusCode == http.StatusOK { d.markPeerHealthy(peer) } else { d.markPeerUnhealthy(peer) } } } func (d *DistributedCAS) markPeerHealthy(peer string) { d.mu.Lock() defer d.mu.Unlock() d.peerHealth[peer] = true } func (d *DistributedCAS) markPeerUnhealthy(peer string) { d.mu.Lock() defer d.mu.Unlock() d.peerHealth[peer] = false } // ── Peer Status ────────────────────────────────────────────────────────────── // PeerStatus describes the current state of a peer node. type PeerStatus struct { Address string `json:"address"` NodeID string `json:"node_id,omitempty"` Healthy bool `json:"healthy"` Latency time.Duration `json:"latency,omitempty"` } // PeerStatuses returns the health status of all configured peers. func (d *DistributedCAS) PeerStatuses() []PeerStatus { var statuses []PeerStatus for _, peer := range d.config.Peers { ps := PeerStatus{Address: peer} start := time.Now() url := fmt.Sprintf("http://%s/v1/health", peer) resp, err := d.httpClient.Get(url) if err != nil { ps.Healthy = false } else { ps.Latency = time.Since(start) ps.Healthy = resp.StatusCode == http.StatusOK // Try to extract node ID from health response. var healthResp map[string]interface{} if json.NewDecoder(resp.Body).Decode(&healthResp) == nil { if nodeID, ok := healthResp["node_id"].(string); ok { ps.NodeID = nodeID } } resp.Body.Close() } statuses = append(statuses, ps) } return statuses } // ── Cluster Stats ──────────────────────────────────────────────────────────── // ClusterStats provides aggregate statistics across the cluster. type ClusterStats struct { TotalNodes int `json:"total_nodes"` HealthyNodes int `json:"healthy_nodes"` TotalManifests int `json:"total_manifests"` UniqueManifests int `json:"unique_manifests"` } // Stats returns aggregate cluster statistics. func (d *DistributedCAS) Stats() ClusterStats { stats := ClusterStats{ TotalNodes: 1 + len(d.config.Peers), // self + peers } // Count healthy peers. stats.HealthyNodes = 1 // self is always healthy d.mu.RLock() for _, healthy := range d.peerHealth { if healthy { stats.HealthyNodes++ } } d.mu.RUnlock() // Count manifests. manifests, _ := d.ListClusterManifests() stats.TotalManifests = len(manifests) seen := make(map[string]bool) for _, m := range manifests { seen[m.Name] = true } stats.UniqueManifests = len(seen) return stats }