Volt VMM (Neutron Stardust): source-available under AGPSL v5.0

KVM-based microVMM for the Volt platform:
- Sub-second VM boot times
- Minimal memory footprint
- Landlock LSM + seccomp security
- Virtio device support
- Custom kernel management

Copyright (c) Armored Gates LLC. All rights reserved.
Licensed under AGPSL v5.0
This commit is contained in:
Karl Clinger
2026-03-21 01:04:35 -05:00
commit 40ed108dd5
143 changed files with 50300 additions and 0 deletions

27
vmm/api-test/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
name = "volt-vmm-api-test"
version = "0.1.0"
edition = "2021"
[dependencies]
# Async runtime
tokio = { version = "1", features = ["full"] }
# HTTP server
hyper = { version = "1", features = ["server", "http1"] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto"] }
http-body-util = "0.1"
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Error handling
thiserror = "2"
anyhow = "1"
# Logging
tracing = "0.1"
# Metrics
prometheus = "0.13"

View File

@@ -0,0 +1,291 @@
//! API Request Handlers
//!
//! Handles the business logic for each API endpoint.
use super::types::{
ApiError, ApiResponse, VmConfig, VmState, VmStateAction, VmStateRequest, VmStateResponse,
};
use prometheus::{Encoder, TextEncoder};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Shared VM state managed by the API
#[derive(Debug)]
pub struct VmContext {
pub config: Option<VmConfig>,
pub state: VmState,
pub boot_time_ms: Option<u64>,
}
impl Default for VmContext {
fn default() -> Self {
VmContext {
config: None,
state: VmState::NotConfigured,
boot_time_ms: None,
}
}
}
/// API handler with shared state
#[derive(Clone)]
pub struct ApiHandler {
context: Arc<RwLock<VmContext>>,
// Metrics
requests_total: prometheus::IntCounter,
request_duration: prometheus::Histogram,
vm_state_gauge: prometheus::IntGauge,
}
impl ApiHandler {
pub fn new() -> Self {
// Register Prometheus metrics
let requests_total = prometheus::IntCounter::new(
"volt-vmm_api_requests_total",
"Total number of API requests",
)
.expect("metric creation failed");
let request_duration = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"volt-vmm_api_request_duration_seconds",
"API request duration in seconds",
)
.buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]),
)
.expect("metric creation failed");
let vm_state_gauge =
prometheus::IntGauge::new("volt-vmm_vm_state", "Current VM state (0=not_configured, 1=configured, 2=starting, 3=running, 4=paused, 5=shutting_down, 6=stopped, 7=error)")
.expect("metric creation failed");
// Register with default registry
let _ = prometheus::register(Box::new(requests_total.clone()));
let _ = prometheus::register(Box::new(request_duration.clone()));
let _ = prometheus::register(Box::new(vm_state_gauge.clone()));
ApiHandler {
context: Arc::new(RwLock::new(VmContext::default())),
requests_total,
request_duration,
vm_state_gauge,
}
}
/// PUT /v1/vm/config - Set VM configuration before boot
pub async fn put_config(&self, config: VmConfig) -> Result<ApiResponse<VmConfig>, ApiError> {
let mut ctx = self.context.write().await;
// Only allow config changes when VM is not running
match ctx.state {
VmState::NotConfigured | VmState::Configured | VmState::Stopped => {
info!(
vcpus = config.vcpu_count,
mem_mib = config.mem_size_mib,
"VM configuration updated"
);
ctx.config = Some(config.clone());
ctx.state = VmState::Configured;
self.update_state_gauge(VmState::Configured);
Ok(ApiResponse::ok(config))
}
state => {
warn!(?state, "Cannot change config while VM is in this state");
Err(ApiError::InvalidStateTransition {
current_state: state,
action: "configure".to_string(),
})
}
}
}
/// GET /v1/vm/config - Get current VM configuration
pub async fn get_config(&self) -> Result<ApiResponse<VmConfig>, ApiError> {
let ctx = self.context.read().await;
match &ctx.config {
Some(config) => Ok(ApiResponse::ok(config.clone())),
None => Err(ApiError::NotConfigured),
}
}
/// PUT /v1/vm/state - Change VM state (start/stop/pause/resume)
pub async fn put_state(
&self,
request: VmStateRequest,
) -> Result<ApiResponse<VmStateResponse>, ApiError> {
let mut ctx = self.context.write().await;
let new_state = match (&ctx.state, &request.action) {
// Start transitions
(VmState::Configured, VmStateAction::Start) => {
info!("Starting VM...");
// In real implementation, this would trigger VM boot
VmState::Running
}
(VmState::Stopped, VmStateAction::Start) => {
info!("Restarting VM...");
VmState::Running
}
// Pause/Resume transitions
(VmState::Running, VmStateAction::Pause) => {
info!("Pausing VM...");
VmState::Paused
}
(VmState::Paused, VmStateAction::Resume) => {
info!("Resuming VM...");
VmState::Running
}
// Shutdown transitions
(VmState::Running | VmState::Paused, VmStateAction::Shutdown) => {
info!("Graceful shutdown initiated...");
VmState::ShuttingDown
}
(VmState::Running | VmState::Paused, VmStateAction::Stop) => {
info!("Force stopping VM...");
VmState::Stopped
}
(VmState::ShuttingDown, VmStateAction::Stop) => {
info!("Force stopping during shutdown...");
VmState::Stopped
}
// Invalid transitions
(state, action) => {
warn!(?state, ?action, "Invalid state transition requested");
return Err(ApiError::InvalidStateTransition {
current_state: *state,
action: format!("{:?}", action),
});
}
};
ctx.state = new_state;
self.update_state_gauge(new_state);
debug!(?new_state, "VM state changed");
Ok(ApiResponse::ok(VmStateResponse {
state: new_state,
message: None,
}))
}
/// GET /v1/vm/state - Get current VM state
pub async fn get_state(&self) -> Result<ApiResponse<VmStateResponse>, ApiError> {
let ctx = self.context.read().await;
Ok(ApiResponse::ok(VmStateResponse {
state: ctx.state,
message: None,
}))
}
/// GET /v1/metrics - Prometheus metrics
pub async fn get_metrics(&self) -> Result<String, ApiError> {
self.requests_total.inc();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = Vec::new();
encoder
.encode(&metric_families, &mut buffer)
.map_err(|e| ApiError::Internal(e.to_string()))?;
String::from_utf8(buffer).map_err(|e| ApiError::Internal(e.to_string()))
}
/// Record request metrics
pub fn record_request(&self, duration_secs: f64) {
self.requests_total.inc();
self.request_duration.observe(duration_secs);
}
fn update_state_gauge(&self, state: VmState) {
let value = match state {
VmState::NotConfigured => 0,
VmState::Configured => 1,
VmState::Starting => 2,
VmState::Running => 3,
VmState::Paused => 4,
VmState::ShuttingDown => 5,
VmState::Stopped => 6,
VmState::Error => 7,
};
self.vm_state_gauge.set(value);
}
}
impl Default for ApiHandler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_config_workflow() {
let handler = ApiHandler::new();
// Get config should fail initially
let result = handler.get_config().await;
assert!(result.is_err());
// Set config
let config = VmConfig {
vcpu_count: 2,
mem_size_mib: 256,
..Default::default()
};
let result = handler.put_config(config).await;
assert!(result.is_ok());
// Get config should work now
let result = handler.get_config().await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.data.unwrap().vcpu_count, 2);
}
#[tokio::test]
async fn test_state_transitions() {
let handler = ApiHandler::new();
// Configure VM first
let config = VmConfig::default();
handler.put_config(config).await.unwrap();
// Start VM
let request = VmStateRequest {
action: VmStateAction::Start,
};
let result = handler.put_state(request).await;
assert!(result.is_ok());
assert_eq!(result.unwrap().data.unwrap().state, VmState::Running);
// Pause VM
let request = VmStateRequest {
action: VmStateAction::Pause,
};
let result = handler.put_state(request).await;
assert!(result.is_ok());
assert_eq!(result.unwrap().data.unwrap().state, VmState::Paused);
// Resume VM
let request = VmStateRequest {
action: VmStateAction::Resume,
};
let result = handler.put_state(request).await;
assert!(result.is_ok());
assert_eq!(result.unwrap().data.unwrap().state, VmState::Running);
}
}

View File

@@ -0,0 +1,25 @@
//! Volt HTTP API
//!
//! Unix socket HTTP/1.1 API server (Firecracker-compatible style).
//! Provides endpoints for VM configuration and lifecycle management.
//!
//! ## Endpoints
//!
//! - `PUT /v1/vm/config` - Pre-boot VM configuration
//! - `GET /v1/vm/config` - Get current configuration
//! - `PUT /v1/vm/state` - Change VM state (start/stop/pause/resume)
//! - `GET /v1/vm/state` - Get current VM state
//! - `GET /v1/metrics` - Prometheus-format metrics
//! - `GET /health` - Health check
mod handlers;
mod routes;
mod server;
mod types;
pub use handlers::ApiHandler;
pub use server::{run_server, ServerBuilder};
pub use types::{
ApiError, ApiResponse, NetworkConfig, VmConfig, VmState, VmStateAction, VmStateRequest,
VmStateResponse,
};

View File

@@ -0,0 +1,193 @@
//! API Route Definitions
//!
//! Maps HTTP paths and methods to handlers.
use super::handlers::ApiHandler;
use super::types::ApiError;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{Method, Request, Response, StatusCode};
use std::time::Instant;
use tracing::{debug, error};
/// Route an incoming request to the appropriate handler
pub async fn route_request(
handler: ApiHandler,
req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let start = Instant::now();
let method = req.method().clone();
let path = req.uri().path().to_string();
debug!(%method, %path, "Incoming request");
let response = match (method.clone(), path.as_str()) {
// VM Configuration
(Method::PUT, "/v1/vm/config") => handle_put_config(handler.clone(), req).await,
(Method::GET, "/v1/vm/config") => handle_get_config(handler.clone()).await,
// VM State
(Method::PUT, "/v1/vm/state") => handle_put_state(handler.clone(), req).await,
(Method::GET, "/v1/vm/state") => handle_get_state(handler.clone()).await,
// Metrics
(Method::GET, "/v1/metrics") | (Method::GET, "/metrics") => {
handle_metrics(handler.clone()).await
}
// Health check
(Method::GET, "/") | (Method::GET, "/health") => Ok(json_response(
StatusCode::OK,
r#"{"status":"ok","version":"0.1.0"}"#,
)),
// 404 for unknown paths
(_, path) => {
debug!("Unknown path: {}", path);
Ok(error_response(ApiError::NotFound(path.to_string())))
}
};
// Record metrics
let duration = start.elapsed().as_secs_f64();
handler.record_request(duration);
debug!(%method, path = %req.uri().path(), duration_ms = duration * 1000.0, "Request completed");
response
}
async fn handle_put_config(
handler: ApiHandler,
req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
// Read request body
let body = match read_body(req).await {
Ok(b) => b,
Err(e) => return Ok(error_response(e)),
};
// Parse JSON
let config = match serde_json::from_slice(&body) {
Ok(c) => c,
Err(e) => {
return Ok(error_response(ApiError::BadRequest(format!(
"Invalid JSON: {}",
e
))))
}
};
// Handle request
match handler.put_config(config).await {
Ok(response) => Ok(json_response(
StatusCode::OK,
&serde_json::to_string(&response).unwrap(),
)),
Err(e) => Ok(error_response(e)),
}
}
async fn handle_get_config(
handler: ApiHandler,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
match handler.get_config().await {
Ok(response) => Ok(json_response(
StatusCode::OK,
&serde_json::to_string(&response).unwrap(),
)),
Err(e) => Ok(error_response(e)),
}
}
async fn handle_put_state(
handler: ApiHandler,
req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
// Read request body
let body = match read_body(req).await {
Ok(b) => b,
Err(e) => return Ok(error_response(e)),
};
// Parse JSON
let request = match serde_json::from_slice(&body) {
Ok(r) => r,
Err(e) => {
return Ok(error_response(ApiError::BadRequest(format!(
"Invalid JSON: {}",
e
))))
}
};
// Handle request
match handler.put_state(request).await {
Ok(response) => Ok(json_response(
StatusCode::OK,
&serde_json::to_string(&response).unwrap(),
)),
Err(e) => Ok(error_response(e)),
}
}
async fn handle_get_state(
handler: ApiHandler,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
match handler.get_state().await {
Ok(response) => Ok(json_response(
StatusCode::OK,
&serde_json::to_string(&response).unwrap(),
)),
Err(e) => Ok(error_response(e)),
}
}
async fn handle_metrics(
handler: ApiHandler,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
match handler.get_metrics().await {
Ok(metrics) => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; version=0.0.4")
.body(Full::new(Bytes::from(metrics)))
.unwrap()),
Err(e) => Ok(error_response(e)),
}
}
/// Read the full request body into bytes
async fn read_body(req: Request<hyper::body::Incoming>) -> Result<Bytes, ApiError> {
req.into_body()
.collect()
.await
.map(|c| c.to_bytes())
.map_err(|e| ApiError::Internal(format!("Failed to read body: {}", e)))
}
/// Create a JSON response
fn json_response(status: StatusCode, body: &str) -> Response<Full<Bytes>> {
Response::builder()
.status(status)
.header("Content-Type", "application/json")
.body(Full::new(Bytes::from(body.to_string())))
.unwrap()
}
/// Create an error response from an ApiError
fn error_response(error: ApiError) -> Response<Full<Bytes>> {
let status = StatusCode::from_u16(error.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let body = serde_json::json!({
"success": false,
"error": error.to_string()
});
error!(status = %status, error = %error, "API error response");
Response::builder()
.status(status)
.header("Content-Type", "application/json")
.body(Full::new(Bytes::from(body.to_string())))
.unwrap()
}

View File

@@ -0,0 +1,164 @@
//! Unix Socket HTTP Server
//!
//! Listens on a Unix domain socket and handles HTTP/1.1 requests.
//! Inspired by Firecracker's API server design.
use super::handlers::ApiHandler;
use super::routes::route_request;
use anyhow::{Context, Result};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use std::path::Path;
use std::sync::Arc;
use tokio::net::UnixListener;
use tokio::signal;
use tracing::{debug, error, info, warn};
/// Run the HTTP API server on a Unix socket
pub async fn run_server(socket_path: &str) -> Result<()> {
// Remove existing socket file if present
let path = Path::new(socket_path);
if path.exists() {
std::fs::remove_file(path).context("Failed to remove existing socket")?;
}
// Create the Unix listener
let listener = UnixListener::bind(path).context("Failed to bind Unix socket")?;
// Set socket permissions (readable/writable by owner only for security)
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))
.context("Failed to set socket permissions")?;
}
info!(socket = %socket_path, "Volt API server listening");
// Create shared handler
let handler = Arc::new(ApiHandler::new());
// Accept connections in a loop
loop {
tokio::select! {
// Accept new connections
result = listener.accept() => {
match result {
Ok((stream, _addr)) => {
let handler = Arc::clone(&handler);
debug!("New connection accepted");
// Spawn a task to handle this connection
tokio::spawn(async move {
let io = TokioIo::new(stream);
// Create the service function
let service = service_fn(move |req| {
let handler = (*handler).clone();
async move { route_request(handler, req).await }
});
// Serve the connection with HTTP/1
if let Err(e) = http1::Builder::new()
.serve_connection(io, service)
.await
{
// Connection reset by peer is common and not an error
if !e.to_string().contains("connection reset") {
error!("Connection error: {}", e);
}
}
debug!("Connection closed");
});
}
Err(e) => {
error!("Accept failed: {}", e);
}
}
}
// Handle shutdown signals
_ = signal::ctrl_c() => {
info!("Shutdown signal received");
break;
}
}
}
// Cleanup socket file
if path.exists() {
if let Err(e) = std::fs::remove_file(path) {
warn!("Failed to remove socket file: {}", e);
}
}
info!("API server shut down");
Ok(())
}
/// Server builder for more configuration options
pub struct ServerBuilder {
socket_path: String,
socket_permissions: u32,
}
impl ServerBuilder {
pub fn new(socket_path: impl Into<String>) -> Self {
ServerBuilder {
socket_path: socket_path.into(),
socket_permissions: 0o600,
}
}
/// Set socket file permissions (Unix only)
pub fn permissions(mut self, mode: u32) -> Self {
self.socket_permissions = mode;
self
}
/// Build and run the server
pub async fn run(self) -> Result<()> {
run_server(&self.socket_path).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
async fn test_server_starts_and_accepts_connections() {
let socket_path = "/tmp/volt-vmm-test.sock";
// Start server in background
let server_handle = tokio::spawn(async move {
let _ = run_server(socket_path).await;
});
// Give server time to start
tokio::time::sleep(Duration::from_millis(100)).await;
// Connect and send a simple request
if let Ok(mut stream) = tokio::net::UnixStream::connect(socket_path).await {
let request = "GET /health HTTP/1.1\r\nHost: localhost\r\n\r\n";
stream.write_all(request.as_bytes()).await.unwrap();
let mut response = vec![0u8; 1024];
let n = stream.read(&mut response).await.unwrap();
let response_str = String::from_utf8_lossy(&response[..n]);
assert!(response_str.contains("HTTP/1.1 200"));
assert!(response_str.contains("ok"));
}
// Cleanup
server_handle.abort();
let _ = std::fs::remove_file(socket_path);
}
}

View File

@@ -0,0 +1,200 @@
//! API Types and Data Structures
use serde::{Deserialize, Serialize};
use std::fmt;
/// VM configuration for pre-boot setup
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct VmConfig {
/// Number of vCPUs
#[serde(default = "default_vcpu_count")]
pub vcpu_count: u8,
/// Memory size in MiB
#[serde(default = "default_mem_size_mib")]
pub mem_size_mib: u32,
/// Path to kernel image
pub kernel_image_path: Option<String>,
/// Kernel boot arguments
#[serde(default)]
pub boot_args: String,
/// Path to root filesystem
pub rootfs_path: Option<String>,
/// Network configuration
pub network: Option<NetworkConfig>,
/// Enable HugePages for memory
#[serde(default)]
pub hugepages: bool,
}
fn default_vcpu_count() -> u8 {
1
}
fn default_mem_size_mib() -> u32 {
128
}
/// Network configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkConfig {
/// TAP device name
pub tap_device: String,
/// Guest MAC address
pub guest_mac: Option<String>,
/// Host IP for the TAP interface
pub host_ip: Option<String>,
/// Guest IP
pub guest_ip: Option<String>,
}
/// VM runtime state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum VmState {
/// VM is not yet configured
NotConfigured,
/// VM is configured but not started
Configured,
/// VM is starting up
Starting,
/// VM is running
Running,
/// VM is paused
Paused,
/// VM is shutting down
ShuttingDown,
/// VM has stopped
Stopped,
/// VM encountered an error
Error,
}
impl Default for VmState {
fn default() -> Self {
VmState::NotConfigured
}
}
impl fmt::Display for VmState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
VmState::NotConfigured => write!(f, "not_configured"),
VmState::Configured => write!(f, "configured"),
VmState::Starting => write!(f, "starting"),
VmState::Running => write!(f, "running"),
VmState::Paused => write!(f, "paused"),
VmState::ShuttingDown => write!(f, "shutting_down"),
VmState::Stopped => write!(f, "stopped"),
VmState::Error => write!(f, "error"),
}
}
}
/// Action to change VM state
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum VmStateAction {
/// Start the VM
Start,
/// Pause the VM (freeze vCPUs)
Pause,
/// Resume a paused VM
Resume,
/// Graceful shutdown
Shutdown,
/// Force stop
Stop,
}
/// Request body for state changes
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmStateRequest {
pub action: VmStateAction,
}
/// VM state response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmStateResponse {
pub state: VmState,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
/// Generic API response wrapper
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiResponse<T> {
pub success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<T>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl<T> ApiResponse<T> {
pub fn ok(data: T) -> Self {
ApiResponse {
success: true,
data: Some(data),
error: None,
}
}
pub fn error(msg: impl Into<String>) -> Self {
ApiResponse {
success: false,
data: None,
error: Some(msg.into()),
}
}
}
/// API error types
#[derive(Debug, thiserror::Error)]
pub enum ApiError {
#[error("Invalid request: {0}")]
BadRequest(String),
#[error("Not found: {0}")]
NotFound(String),
#[error("Method not allowed")]
MethodNotAllowed,
#[error("Invalid state transition: cannot {action} from {current_state}")]
InvalidStateTransition {
current_state: VmState,
action: String,
},
#[error("VM not configured")]
NotConfigured,
#[error("Internal error: {0}")]
Internal(String),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
}
impl ApiError {
pub fn status_code(&self) -> u16 {
match self {
ApiError::BadRequest(_) => 400,
ApiError::NotFound(_) => 404,
ApiError::MethodNotAllowed => 405,
ApiError::InvalidStateTransition { .. } => 409,
ApiError::NotConfigured => 409,
ApiError::Internal(_) => 500,
ApiError::Json(_) => 400,
}
}
}

5
vmm/api-test/src/lib.rs Normal file
View File

@@ -0,0 +1,5 @@
//! Volt API Test Crate
pub mod api;
pub use api::{run_server, VmConfig, VmState, VmStateAction};