|
| 1 | +//! A custom kubelet backend that can run [WASI](https://wasi.dev/) based workloads |
| 2 | +//! |
| 3 | +//! The crate provides the [`WasiProvider`] type which can be used |
| 4 | +//! as a provider with [`kubelet`]. |
| 5 | +//! |
| 6 | +//! # Example |
| 7 | +//! ```rust,no_run |
| 8 | +//! use kubelet::{Kubelet, config::Config}; |
| 9 | +//! use kubelet::store::oci::FileStore; |
| 10 | +//! use std::sync::Arc; |
| 11 | +//! use wasi_provider::WasiProvider; |
| 12 | +//! |
| 13 | +//! async { |
| 14 | +//! // Get a configuration for the Kubelet |
| 15 | +//! let kubelet_config = Config::default(); |
| 16 | +//! let client = oci_distribution::Client::default(); |
| 17 | +//! let store = Arc::new(FileStore::new(client, &std::path::PathBuf::from(""))); |
| 18 | +//! |
| 19 | +//! // Load a kubernetes configuration |
| 20 | +//! let kubeconfig = kube::Config::infer().await.unwrap(); |
| 21 | +//! |
| 22 | +//! // Instantiate the provider type |
| 23 | +//! let provider = WasiProvider::new(store, &kubelet_config, kubeconfig.clone()).await.unwrap(); |
| 24 | +//! |
| 25 | +//! // Instantiate the Kubelet |
| 26 | +//! let kubelet = Kubelet::new(provider, kubeconfig, kubelet_config).await.unwrap(); |
| 27 | +//! // Start the Kubelet and block on it |
| 28 | +//! kubelet.start().await.unwrap(); |
| 29 | +//! }; |
| 30 | +//! ``` |
| 31 | +
|
| 32 | +#![deny(missing_docs)] |
| 33 | + |
| 34 | +mod wasi_runtime; |
| 35 | + |
| 36 | +use std::collections::HashMap; |
| 37 | +use std::path::PathBuf; |
| 38 | +use std::sync::Arc; |
| 39 | + |
| 40 | +use async_trait::async_trait; |
| 41 | +use kubelet::node::Builder; |
| 42 | +use kubelet::pod::{key_from_pod, pod_key, Handle, Pod}; |
| 43 | +use kubelet::provider::{Provider, ProviderError}; |
| 44 | +use kubelet::store::Store; |
| 45 | +use kubelet::volume::Ref; |
| 46 | +use tokio::sync::mpsc::{self, Receiver, Sender}; |
| 47 | +use tokio::sync::RwLock; |
| 48 | +use wasi_runtime::Runtime; |
| 49 | + |
| 50 | +mod states; |
| 51 | + |
| 52 | +use states::registered::Registered; |
| 53 | +use states::terminated::Terminated; |
| 54 | + |
| 55 | +const TARGET_WASM32_WASI: &str = "wasm32-wasi"; |
| 56 | +const LOG_DIR_NAME: &str = "wasi-logs"; |
| 57 | +const VOLUME_DIR: &str = "volumes"; |
| 58 | + |
| 59 | +/// WasiProvider provides a Kubelet runtime implementation that executes WASM |
| 60 | +/// binaries conforming to the WASI spec. |
| 61 | +#[derive(Clone)] |
| 62 | +pub struct WasiProvider { |
| 63 | + shared: SharedPodState, |
| 64 | +} |
| 65 | + |
| 66 | +#[derive(Clone)] |
| 67 | +struct SharedPodState { |
| 68 | + handles: Arc<RwLock<HashMap<String, Handle<Runtime, wasi_runtime::HandleFactory>>>>, |
| 69 | + store: Arc<dyn Store + Sync + Send>, |
| 70 | + log_path: PathBuf, |
| 71 | + kubeconfig: kube::Config, |
| 72 | + volume_path: PathBuf, |
| 73 | +} |
| 74 | + |
| 75 | +impl WasiProvider { |
| 76 | + /// Create a new wasi provider from a module store and a kubelet config |
| 77 | + pub async fn new( |
| 78 | + store: Arc<dyn Store + Sync + Send>, |
| 79 | + config: &kubelet::config::Config, |
| 80 | + kubeconfig: kube::Config, |
| 81 | + ) -> anyhow::Result<Self> { |
| 82 | + let log_path = config.data_dir.join(LOG_DIR_NAME); |
| 83 | + let volume_path = config.data_dir.join(VOLUME_DIR); |
| 84 | + tokio::fs::create_dir_all(&log_path).await?; |
| 85 | + tokio::fs::create_dir_all(&volume_path).await?; |
| 86 | + Ok(Self { |
| 87 | + shared: SharedPodState { |
| 88 | + handles: Default::default(), |
| 89 | + store, |
| 90 | + log_path, |
| 91 | + volume_path, |
| 92 | + kubeconfig, |
| 93 | + }, |
| 94 | + }) |
| 95 | + } |
| 96 | +} |
| 97 | + |
| 98 | +struct ModuleRunContext { |
| 99 | + modules: HashMap<String, Vec<u8>>, |
| 100 | + volumes: HashMap<String, Ref>, |
| 101 | + status_sender: Sender<(String, kubelet::container::Status)>, |
| 102 | + status_recv: Receiver<(String, kubelet::container::Status)>, |
| 103 | +} |
| 104 | + |
| 105 | +/// State that is shared between pod state handlers. |
| 106 | +pub struct PodState { |
| 107 | + key: String, |
| 108 | + run_context: ModuleRunContext, |
| 109 | + errors: usize, |
| 110 | + shared: SharedPodState, |
| 111 | +} |
| 112 | + |
| 113 | +// No cleanup state needed, we clean up when dropping PodState. |
| 114 | +#[async_trait] |
| 115 | +impl kubelet::state::AsyncDrop for PodState { |
| 116 | + async fn async_drop(self) { |
| 117 | + { |
| 118 | + let mut handles = self.shared.handles.write().await; |
| 119 | + handles.remove(&self.key); |
| 120 | + } |
| 121 | + } |
| 122 | +} |
| 123 | + |
| 124 | +#[async_trait::async_trait] |
| 125 | +impl Provider for WasiProvider { |
| 126 | + type InitialState = Registered; |
| 127 | + type TerminatedState = Terminated; |
| 128 | + type PodState = PodState; |
| 129 | + |
| 130 | + const ARCH: &'static str = TARGET_WASM32_WASI; |
| 131 | + |
| 132 | + async fn node(&self, builder: &mut Builder) -> anyhow::Result<()> { |
| 133 | + builder.set_architecture("wasm-wasi"); |
| 134 | + builder.add_taint("NoExecute", "kubernetes.io/arch", Self::ARCH); |
| 135 | + Ok(()) |
| 136 | + } |
| 137 | + |
| 138 | + async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result<Self::PodState> { |
| 139 | + let (tx, rx) = mpsc::channel(pod.all_containers().len()); |
| 140 | + let run_context = ModuleRunContext { |
| 141 | + modules: Default::default(), |
| 142 | + volumes: Default::default(), |
| 143 | + status_sender: tx, |
| 144 | + status_recv: rx, |
| 145 | + }; |
| 146 | + let key = key_from_pod(pod); |
| 147 | + Ok(PodState { |
| 148 | + key, |
| 149 | + run_context, |
| 150 | + errors: 0, |
| 151 | + shared: self.shared.clone(), |
| 152 | + }) |
| 153 | + } |
| 154 | + |
| 155 | + async fn logs( |
| 156 | + &self, |
| 157 | + namespace: String, |
| 158 | + pod_name: String, |
| 159 | + container_name: String, |
| 160 | + sender: kubelet::log::Sender, |
| 161 | + ) -> anyhow::Result<()> { |
| 162 | + let mut handles = self.shared.handles.write().await; |
| 163 | + let handle = handles |
| 164 | + .get_mut(&pod_key(&namespace, &pod_name)) |
| 165 | + .ok_or_else(|| ProviderError::PodNotFound { |
| 166 | + pod_name: pod_name.clone(), |
| 167 | + })?; |
| 168 | + handle.output(&container_name, sender).await |
| 169 | + } |
| 170 | +} |
0 commit comments