/*
Deploy — Rolling and canary deployment strategies for Volt workloads.

Coordinates zero-downtime updates for containers and workloads by
orchestrating instance creation, health verification, traffic shifting,
and automatic rollback on failure.

Since Volt uses CAS (content-addressed storage) for rootfs assembly,
"updating" a workload means pointing it to a new CAS ref and having
TinyVol reassemble the directory tree from the new blob manifest.

Strategies:

	rolling — Update instances one-by-one (respecting MaxSurge/MaxUnavail)
	canary  — Route a percentage of traffic to a new instance before full rollout

Copyright (c) Armored Gates LLC. All rights reserved.
*/
package deploy

import (
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// ── Strategy ─────────────────────────────────────────────────────────────────

// Strategy defines the deployment approach.
type Strategy string

const (
	// StrategyRolling updates instances one-by-one with health verification.
	StrategyRolling Strategy = "rolling"
	// StrategyCanary routes a percentage of traffic to a new instance first.
	StrategyCanary Strategy = "canary"
)

// ── Configuration ────────────────────────────────────────────────────────────

// DeployConfig holds all parameters for a deployment operation.
// Zero values for MaxSurge, MaxUnavail, Timeout, and HealthCheck are filled
// in by Validate; Strategy, Target, and NewImage are mandatory.
type DeployConfig struct {
	Strategy     Strategy      // Deployment strategy ("rolling" or "canary")
	Target       string        // Container/workload name or pattern
	NewImage     string        // New CAS ref or image path to deploy
	MaxSurge     int           // Max extra instances during rolling (default: 1)
	MaxUnavail   int           // Max unavailable during rolling (default: 0)
	CanaryWeight int           // Canary traffic percentage (1-99)
	HealthCheck  HealthCheck   // How to verify new instance is healthy
	Timeout      time.Duration // Max time for the entire deployment
	AutoRollback bool          // Rollback on failure
}

// Validate checks that the config is usable and fills in defaults.
func (c *DeployConfig) Validate() error {
	if c.Target == "" {
		return fmt.Errorf("deploy: target is required")
	}
	if c.NewImage == "" {
		return fmt.Errorf("deploy: new image (CAS ref) is required")
	}
	switch c.Strategy {
	case StrategyRolling:
		// Defaults: one extra instance allowed, zero unavailable.
		if c.MaxSurge <= 0 {
			c.MaxSurge = 1
		}
		if c.MaxUnavail < 0 {
			c.MaxUnavail = 0
		}
	case StrategyCanary:
		// Weight must leave some traffic on both old and new versions.
		if c.CanaryWeight <= 0 || c.CanaryWeight >= 100 {
			return fmt.Errorf("deploy: canary weight must be between 1 and 99, got %d", c.CanaryWeight)
		}
	default:
		return fmt.Errorf("deploy: unknown strategy %q (use 'rolling' or 'canary')", c.Strategy)
	}
	if c.Timeout <= 0 {
		c.Timeout = 10 * time.Minute
	}
	// Health-check defaults: "none" type with 5s interval and 3 retries.
	if c.HealthCheck.Type == "" {
		c.HealthCheck.Type = "none"
	}
	if c.HealthCheck.Interval <= 0 {
		c.HealthCheck.Interval = 5 * time.Second
	}
	if c.HealthCheck.Retries <= 0 {
		c.HealthCheck.Retries = 3
	}
	return nil
}

// ── Deploy Status ────────────────────────────────────────────────────────────

// Phase represents the current phase of a deployment.
type Phase string

const (
	PhasePreparing   Phase = "preparing"
	PhaseDeploying   Phase = "deploying"
	PhaseVerifying   Phase = "verifying"
	PhaseComplete    Phase = "complete"
	PhaseRollingBack Phase = "rolling-back"
	PhaseFailed      Phase = "failed"
	PhasePaused      Phase = "paused"
)

// DeployStatus tracks the progress of an active deployment.
type DeployStatus struct {
	ID          string    `json:"id" yaml:"id"`
	Phase       Phase     `json:"phase" yaml:"phase"`
	Progress    string    `json:"progress" yaml:"progress"`       // e.g. "2/5 instances updated"
	OldVersion  string    `json:"old_version" yaml:"old_version"` // previous CAS ref
	NewVersion  string    `json:"new_version" yaml:"new_version"` // target CAS ref
	Target      string    `json:"target" yaml:"target"`
	Strategy    Strategy  `json:"strategy" yaml:"strategy"`
	StartedAt   time.Time `json:"started_at" yaml:"started_at"`
	CompletedAt time.Time `json:"completed_at,omitempty" yaml:"completed_at,omitempty"`
	Message     string    `json:"message,omitempty" yaml:"message,omitempty"`
}

// ── Instance abstraction ─────────────────────────────────────────────────────

// Instance represents a single running workload instance that can be deployed to.
type Instance struct {
	Name    string // Instance name (e.g., "web-app-1")
	Image   string // Current CAS ref or image
	Status  string // "running", "stopped", etc.
	Healthy bool   // Last known health state
}

// ── Executor interface ───────────────────────────────────────────────────────

// Executor abstracts the system operations needed for deployments.
// This allows testing without real systemd/nspawn/nftables calls.
type Executor interface {
	// ListInstances returns all instances matching the target pattern.
	ListInstances(target string) ([]Instance, error)
	// CreateInstance creates a new instance with the given image.
	CreateInstance(name, image string) error
	// StartInstance starts a stopped instance.
	StartInstance(name string) error
	// StopInstance stops a running instance.
	StopInstance(name string) error
	// DeleteInstance removes an instance entirely.
	DeleteInstance(name string) error
	// GetInstanceImage returns the current image/CAS ref for an instance.
	GetInstanceImage(name string) (string, error)
	// UpdateInstanceImage updates an instance to use a new image (CAS ref).
	// This reassembles the rootfs via TinyVol and restarts the instance.
	UpdateInstanceImage(name, newImage string) error
	// UpdateTrafficWeight adjusts traffic routing for canary deployments.
	// weight is 0-100 representing percentage to the canary instance.
	UpdateTrafficWeight(target string, canaryName string, weight int) error
}

// ── Active deployments tracking ──────────────────────────────────────────────

// Package-level registry of in-flight deployments, keyed by target.
// Guarded by activeDeploymentsMu; one active deployment per target.
var (
	activeDeployments   = make(map[string]*DeployStatus)
	activeDeploymentsMu sync.RWMutex
)

// GetActiveDeployments returns a snapshot of all active deployments.
// The returned values are copies; mutating them does not affect the registry.
func GetActiveDeployments() []DeployStatus {
	activeDeploymentsMu.RLock()
	defer activeDeploymentsMu.RUnlock()
	result := make([]DeployStatus, 0, len(activeDeployments))
	for _, ds := range activeDeployments {
		result = append(result, *ds)
	}
	return result
}

// GetActiveDeployment returns the active deployment for a target, if any.
// Returns a copy, or nil when no deployment is active for the target.
func GetActiveDeployment(target string) *DeployStatus {
	activeDeploymentsMu.RLock()
	defer activeDeploymentsMu.RUnlock()
	if ds, ok := activeDeployments[target]; ok {
		cp := *ds
		return &cp
	}
	return nil
}

// setActiveDeployment registers (or replaces) the deployment for ds.Target.
func setActiveDeployment(ds *DeployStatus) {
	activeDeploymentsMu.Lock()
	defer activeDeploymentsMu.Unlock()
	activeDeployments[ds.Target] = ds
}

// removeActiveDeployment drops the registry entry for target (no-op if absent).
func removeActiveDeployment(target string) {
	activeDeploymentsMu.Lock()
	defer activeDeploymentsMu.Unlock()
	delete(activeDeployments, target)
}

// ── Progress callback ────────────────────────────────────────────────────────

// ProgressFunc is called with status updates during deployment.
type ProgressFunc func(status DeployStatus)

// ── Rolling Deploy ───────────────────────────────────────────────────────────

// RollingDeploy performs a rolling update of instances matching cfg.Target.
//
// Algorithm:
//  1. List all instances matching the target pattern
//  2. For each instance (respecting MaxSurge / MaxUnavail):
//     a. Update instance image to new CAS ref (reassemble rootfs via TinyVol)
//     b. Start/restart the instance
//     c. Wait for health check to pass
//     d. If health check fails and AutoRollback: revert to old image
//  3. Record deployment in history
//
// On any failure the deployment is removed from the active registry and the
// terminal status is appended to history before returning a non-nil error.
func RollingDeploy(cfg DeployConfig, exec Executor, hc HealthChecker, hist *HistoryStore, progress ProgressFunc) error {
	if err := cfg.Validate(); err != nil {
		return err
	}
	// Generate deployment ID.
	deployID := generateDeployID()
	status := &DeployStatus{
		ID:         deployID,
		Phase:      PhasePreparing,
		Target:     cfg.Target,
		Strategy:   StrategyRolling,
		NewVersion: cfg.NewImage,
		StartedAt:  time.Now().UTC(),
	}
	setActiveDeployment(status)
	notifyProgress(progress, *status)
	// 1. Discover instances.
	instances, err := exec.ListInstances(cfg.Target)
	if err != nil {
		status.Phase = PhaseFailed
		status.Message = fmt.Sprintf("failed to list instances: %v", err)
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	if len(instances) == 0 {
		status.Phase = PhaseFailed
		status.Message = "no instances found matching target"
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	// Record old version from first instance.
	// Error is ignored deliberately: an unknown old version only disables
	// precise rollback bookkeeping, it does not block the deploy.
	if len(instances) > 0 {
		oldImg, _ := exec.GetInstanceImage(instances[0].Name)
		status.OldVersion = oldImg
	}
	total := len(instances)
	updated := 0
	var rollbackTargets []string // instances that were updated (for rollback)
	status.Phase = PhaseDeploying
	status.Progress = fmt.Sprintf("0/%d instances updated", total)
	notifyProgress(progress, *status)
	// Timeout enforcement: a wall-clock deadline checked at the top of each
	// iteration (an in-flight instance update is not interrupted mid-step).
	deadline := time.Now().Add(cfg.Timeout)
	// 2. Rolling update loop.
	for i, inst := range instances {
		if time.Now().After(deadline) {
			err := fmt.Errorf("deployment timed out after %s", cfg.Timeout)
			if cfg.AutoRollback && len(rollbackTargets) > 0 {
				status.Phase = PhaseRollingBack
				status.Message = err.Error()
				notifyProgress(progress, *status)
				rollbackInstances(exec, rollbackTargets, status.OldVersion)
			}
			status.Phase = PhaseFailed
			status.Message = err.Error()
			status.CompletedAt = time.Now().UTC()
			notifyProgress(progress, *status)
			removeActiveDeployment(cfg.Target)
			recordHistory(hist, status, updated)
			return err
		}
		// Respect MaxSurge: we update in-place, so surge is about allowing
		// brief overlap. With MaxUnavail=0 and MaxSurge=1, we update one at a time.
		_ = cfg.MaxSurge // In single-node mode, surge is handled by updating in-place.
		status.Progress = fmt.Sprintf("%d/%d instances updated (updating %s)", i, total, inst.Name)
		notifyProgress(progress, *status)
		// a. Update the instance image.
		// NOTE(review): on failure here the current instance is NOT added to
		// rollbackTargets (unlike the start/health failure paths below), so a
		// partially-updated instance may be left on the new ref — confirm
		// UpdateInstanceImage is atomic enough for this to be safe.
		if err := exec.UpdateInstanceImage(inst.Name, cfg.NewImage); err != nil {
			errMsg := fmt.Sprintf("failed to update instance %s: %v", inst.Name, err)
			if cfg.AutoRollback {
				status.Phase = PhaseRollingBack
				status.Message = errMsg
				notifyProgress(progress, *status)
				rollbackInstances(exec, rollbackTargets, status.OldVersion)
				status.Phase = PhaseFailed
			} else {
				status.Phase = PhaseFailed
			}
			status.Message = errMsg
			status.CompletedAt = time.Now().UTC()
			notifyProgress(progress, *status)
			removeActiveDeployment(cfg.Target)
			recordHistory(hist, status, updated)
			return fmt.Errorf("deploy: %s", errMsg)
		}
		// b. Start the instance.
		if err := exec.StartInstance(inst.Name); err != nil {
			errMsg := fmt.Sprintf("failed to start instance %s: %v", inst.Name, err)
			if cfg.AutoRollback {
				status.Phase = PhaseRollingBack
				status.Message = errMsg
				notifyProgress(progress, *status)
				// Rollback this instance too.
				rollbackTargets = append(rollbackTargets, inst.Name)
				rollbackInstances(exec, rollbackTargets, status.OldVersion)
				status.Phase = PhaseFailed
			} else {
				status.Phase = PhaseFailed
			}
			status.Message = errMsg
			status.CompletedAt = time.Now().UTC()
			notifyProgress(progress, *status)
			removeActiveDeployment(cfg.Target)
			recordHistory(hist, status, updated)
			return fmt.Errorf("deploy: %s", errMsg)
		}
		// c. Health check.
		// WaitHealthy presumably blocks until the instance passes its check or
		// retries are exhausted (HealthChecker is defined elsewhere — verify).
		status.Phase = PhaseVerifying
		notifyProgress(progress, *status)
		if err := hc.WaitHealthy(inst.Name, cfg.HealthCheck); err != nil {
			errMsg := fmt.Sprintf("health check failed for %s: %v", inst.Name, err)
			if cfg.AutoRollback {
				status.Phase = PhaseRollingBack
				status.Message = errMsg
				notifyProgress(progress, *status)
				rollbackTargets = append(rollbackTargets, inst.Name)
				rollbackInstances(exec, rollbackTargets, status.OldVersion)
				status.Phase = PhaseFailed
			} else {
				status.Phase = PhaseFailed
			}
			status.Message = errMsg
			status.CompletedAt = time.Now().UTC()
			notifyProgress(progress, *status)
			removeActiveDeployment(cfg.Target)
			recordHistory(hist, status, updated)
			return fmt.Errorf("deploy: %s", errMsg)
		}
		// Instance successfully updated and verified; remember it for any
		// later rollback sweep.
		rollbackTargets = append(rollbackTargets, inst.Name)
		updated++
		status.Phase = PhaseDeploying
		status.Progress = fmt.Sprintf("%d/%d instances updated", updated, total)
		notifyProgress(progress, *status)
	}
	// 3. Complete.
	status.Phase = PhaseComplete
	status.Progress = fmt.Sprintf("%d/%d instances updated", updated, total)
	status.CompletedAt = time.Now().UTC()
	notifyProgress(progress, *status)
	removeActiveDeployment(cfg.Target)
	recordHistory(hist, status, updated)
	return nil
}

// ── Canary Deploy ────────────────────────────────────────────────────────────

// CanaryDeploy creates a canary instance alongside existing instances and
// routes cfg.CanaryWeight percent of traffic to it.
//
// Algorithm:
//  1. List existing instances
//  2. Create a new canary instance with the new image
//  3. Start the canary and verify health
//  4. Update traffic routing to send CanaryWeight% to canary
//  5. If health fails and AutoRollback: remove canary, restore routing
//
// A successful return leaves the canary running and receiving traffic;
// promoting or retiring it is a separate operation.
func CanaryDeploy(cfg DeployConfig, exec Executor, hc HealthChecker, hist *HistoryStore, progress ProgressFunc) error {
	if err := cfg.Validate(); err != nil {
		return err
	}
	deployID := generateDeployID()
	status := &DeployStatus{
		ID:         deployID,
		Phase:      PhasePreparing,
		Target:     cfg.Target,
		Strategy:   StrategyCanary,
		NewVersion: cfg.NewImage,
		StartedAt:  time.Now().UTC(),
	}
	setActiveDeployment(status)
	notifyProgress(progress, *status)
	// 1. Discover existing instances.
	instances, err := exec.ListInstances(cfg.Target)
	if err != nil {
		status.Phase = PhaseFailed
		status.Message = fmt.Sprintf("failed to list instances: %v", err)
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	if len(instances) == 0 {
		status.Phase = PhaseFailed
		status.Message = "no instances found matching target"
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	// Record old version.
	if oldImg, err := exec.GetInstanceImage(instances[0].Name); err == nil {
		status.OldVersion = oldImg
	}
	// 2. Create canary instance.
	canaryName := canaryInstanceName(cfg.Target)
	status.Phase = PhaseDeploying
	status.Progress = fmt.Sprintf("creating canary instance %s", canaryName)
	notifyProgress(progress, *status)
	if err := exec.CreateInstance(canaryName, cfg.NewImage); err != nil {
		status.Phase = PhaseFailed
		status.Message = fmt.Sprintf("failed to create canary: %v", err)
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	// 3. Start canary and verify health.
	// Start failure always removes the half-created canary, even without
	// AutoRollback, since it never carried traffic.
	if err := exec.StartInstance(canaryName); err != nil {
		cleanupCanary(exec, canaryName)
		status.Phase = PhaseFailed
		status.Message = fmt.Sprintf("failed to start canary: %v", err)
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	status.Phase = PhaseVerifying
	status.Progress = "verifying canary health"
	notifyProgress(progress, *status)
	if err := hc.WaitHealthy(canaryName, cfg.HealthCheck); err != nil {
		if cfg.AutoRollback {
			status.Phase = PhaseRollingBack
			status.Message = fmt.Sprintf("canary health check failed: %v", err)
			notifyProgress(progress, *status)
			cleanupCanary(exec, canaryName)
		}
		status.Phase = PhaseFailed
		status.Message = fmt.Sprintf("canary health check failed: %v", err)
		status.CompletedAt = time.Now().UTC()
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	// 4. Update traffic routing.
	status.Progress = fmt.Sprintf("routing %d%% traffic to canary", cfg.CanaryWeight)
	notifyProgress(progress, *status)
	if err := exec.UpdateTrafficWeight(cfg.Target, canaryName, cfg.CanaryWeight); err != nil {
		if cfg.AutoRollback {
			cleanupCanary(exec, canaryName)
		}
		status.Phase = PhaseFailed
		status.Message = fmt.Sprintf("failed to update traffic routing: %v", err)
		status.CompletedAt = time.Now().UTC()
		notifyProgress(progress, *status)
		removeActiveDeployment(cfg.Target)
		recordHistory(hist, status, 0)
		return fmt.Errorf("deploy: %s", status.Message)
	}
	// 5. Canary is live.
	status.Phase = PhaseComplete
	status.Progress = fmt.Sprintf("canary live with %d%% traffic", cfg.CanaryWeight)
	status.CompletedAt = time.Now().UTC()
	notifyProgress(progress, *status)
	removeActiveDeployment(cfg.Target)
	recordHistory(hist, status, 1)
	return nil
}

// ── Rollback ─────────────────────────────────────────────────────────────────

// Rollback reverts a target to its previous version using deployment history.
// It finds the most recent completed deployment with a recorded old ref and
// replays it as a rolling deploy of that ref (health checks disabled,
// AutoRollback off so a rollback never tries to roll itself back).
func Rollback(target string, exec Executor, hist *HistoryStore, progress ProgressFunc) error {
	if hist == nil {
		return fmt.Errorf("deploy rollback: no history store available")
	}
	entries, err := hist.ListByTarget(target)
	if err != nil {
		return fmt.Errorf("deploy rollback: failed to read history: %w", err)
	}
	// Find the last successful deployment that has a different version.
	// Assumes ListByTarget returns newest-first — TODO confirm ordering.
	var previousRef string
	for _, entry := range entries {
		if entry.Status == string(PhaseComplete) && entry.OldRef != "" {
			previousRef = entry.OldRef
			break
		}
	}
	if previousRef == "" {
		return fmt.Errorf("deploy rollback: no previous version found in history for %q", target)
	}
	status := &DeployStatus{
		ID:         generateDeployID(),
		Phase:      PhaseRollingBack,
		Target:     target,
		Strategy:   StrategyRolling,
		NewVersion: previousRef,
		StartedAt:  time.Now().UTC(),
		Message:    "rollback to previous version",
	}
	notifyProgress(progress, *status)
	// Perform a rolling deploy with the previous ref.
	rollbackCfg := DeployConfig{
		Strategy:     StrategyRolling,
		Target:       target,
		NewImage:     previousRef,
		MaxSurge:     1,
		MaxUnavail:   0,
		HealthCheck:  HealthCheck{Type: "none"},
		Timeout:      5 * time.Minute,
		AutoRollback: false, // Don't auto-rollback a rollback
	}
	return RollingDeploy(rollbackCfg, exec, &NoopHealthChecker{}, hist, progress)
}

// ── Helpers ──────────────────────────────────────────────────────────────────

// rollbackInstances reverts a list of instances to the old image.
func rollbackInstances(exec Executor, names []string, oldImage string) { for _, name := range names { _ = exec.UpdateInstanceImage(name, oldImage) _ = exec.StartInstance(name) } } // cleanupCanary stops and removes a canary instance. func cleanupCanary(exec Executor, canaryName string) { _ = exec.StopInstance(canaryName) _ = exec.DeleteInstance(canaryName) } // canaryInstanceName generates a canary instance name from the target. func canaryInstanceName(target string) string { // Strip any trailing instance numbers and add -canary suffix. base := strings.TrimRight(target, "0123456789-") if base == "" { base = target } return base + "-canary" } // generateDeployID creates a unique deployment ID. func generateDeployID() string { return fmt.Sprintf("deploy-%d", time.Now().UnixNano()/int64(time.Millisecond)) } // notifyProgress safely calls the progress callback if non-nil. func notifyProgress(fn ProgressFunc, status DeployStatus) { if fn != nil { fn(status) } } // recordHistory saves a deployment to the history store if available. func recordHistory(hist *HistoryStore, status *DeployStatus, instancesUpdated int) { if hist == nil { return } entry := HistoryEntry{ ID: status.ID, Target: status.Target, Strategy: string(status.Strategy), OldRef: status.OldVersion, NewRef: status.NewVersion, Status: string(status.Phase), StartedAt: status.StartedAt, CompletedAt: status.CompletedAt, InstancesUpdated: instancesUpdated, Message: status.Message, } _ = hist.Append(entry) } // ── Default executor (real system calls) ───────────────────────────────────── // DefaultCASDir is the default directory for CAS storage. const DefaultCASDir = "/var/lib/volt/cas" // SystemExecutor implements Executor using real system commands. type SystemExecutor struct { ContainerBaseDir string CASBaseDir string } // NewSystemExecutor creates an executor for real system operations. 
func NewSystemExecutor() *SystemExecutor {
	return &SystemExecutor{
		ContainerBaseDir: "/var/lib/volt/containers",
		CASBaseDir:       DefaultCASDir,
	}
}

// ListInstances matches container directories under ContainerBaseDir whose
// names start with target (glob "target*"), falling back to an exact
// directory match when the glob finds nothing. Status is always reported as
// "unknown" — this scan does not query systemd.
// NOTE(review): glob metacharacters in target ([, ?, *) would be interpreted
// by filepath.Glob — confirm targets are always plain names.
func (e *SystemExecutor) ListInstances(target string) ([]Instance, error) {
	// Match instances by prefix or exact name.
	// Scan /var/lib/volt/containers for directories matching the pattern.
	var instances []Instance
	entries, err := filepath.Glob(filepath.Join(e.ContainerBaseDir, target+"*"))
	if err != nil {
		return nil, fmt.Errorf("list instances: %w", err)
	}
	for _, entry := range entries {
		name := filepath.Base(entry)
		instances = append(instances, Instance{
			Name:   name,
			Status: "unknown",
		})
	}
	// If no glob matches, try exact match.
	// fileInfo is a helper defined elsewhere in this package — presumably a
	// thin wrapper over os.Stat.
	if len(instances) == 0 {
		exact := filepath.Join(e.ContainerBaseDir, target)
		if info, err := fileInfo(exact); err == nil && info.IsDir() {
			instances = append(instances, Instance{
				Name:   target,
				Status: "unknown",
			})
		}
	}
	return instances, nil
}

// CreateInstance is not yet implemented and always returns an error.
func (e *SystemExecutor) CreateInstance(name, image string) error {
	// Create container directory and write unit file.
	// In a real implementation this would use the backend.Create flow.
	return fmt.Errorf("SystemExecutor.CreateInstance not yet wired to backend")
}

// StartInstance starts the container's systemd unit via systemctl.
func (e *SystemExecutor) StartInstance(name string) error {
	return runSystemctl("start", voltContainerUnit(name))
}

// StopInstance stops the container's systemd unit via systemctl.
func (e *SystemExecutor) StopInstance(name string) error {
	return runSystemctl("stop", voltContainerUnit(name))
}

// DeleteInstance is not yet implemented and always returns an error.
func (e *SystemExecutor) DeleteInstance(name string) error {
	return fmt.Errorf("SystemExecutor.DeleteInstance not yet wired to backend")
}

// GetInstanceImage returns the CAS ref recorded for an instance.
func (e *SystemExecutor) GetInstanceImage(name string) (string, error) {
	// Read the CAS ref from the instance's metadata,
	// stored in /var/lib/volt/containers/<name>/.volt-cas-ref.
	refPath := filepath.Join(e.ContainerBaseDir, name, ".volt-cas-ref")
	data, err := readFile(refPath)
	if err != nil {
		return "", fmt.Errorf("no CAS ref found for instance %s", name)
	}
	return strings.TrimSpace(string(data)), nil
}

// UpdateInstanceImage stops the instance and records the new CAS ref.
// The instance is left stopped; callers (e.g. RollingDeploy) are expected to
// call StartInstance afterwards, which is when TinyVol reassembles the rootfs.
func (e *SystemExecutor) UpdateInstanceImage(name, newImage string) error {
	// 1. Stop the instance. Stop errors are ignored: the instance may already
	// be stopped, and the ref write below is the authoritative step.
	_ = runSystemctl("stop", voltContainerUnit(name))
	// 2. Write new CAS ref.
	refPath := filepath.Join(e.ContainerBaseDir, name, ".volt-cas-ref")
	if err := writeFile(refPath, []byte(newImage)); err != nil {
		return fmt.Errorf("failed to write CAS ref: %w", err)
	}
	return nil
}

// UpdateTrafficWeight records the desired canary traffic split.
func (e *SystemExecutor) UpdateTrafficWeight(target, canaryName string, weight int) error {
	// In a full implementation this would update nftables rules for load balancing.
	// For now, record the weight in a metadata file (append-only; latest line
	// for a target/canary pair is the effective weight).
	weightPath := filepath.Join(e.ContainerBaseDir, ".traffic-weights")
	data := fmt.Sprintf("%s:%s:%d\n", target, canaryName, weight)
	return appendFile(weightPath, []byte(data))
}

// voltContainerUnit returns the systemd unit name for a container.
func voltContainerUnit(name string) string {
	return fmt.Sprintf("volt-container@%s.service", name)
}