diff --git a/vhost-device-vsock/CHANGELOG.md b/vhost-device-vsock/CHANGELOG.md index a2aa52a9..2eba5cd0 100644 --- a/vhost-device-vsock/CHANGELOG.md +++ b/vhost-device-vsock/CHANGELOG.md @@ -3,10 +3,14 @@ ### Added +- [#936](https://github.com/rust-vmm/vhost-device/pull/936) vsock: Enable live migrations (snapshot-restore) + ### Changed ### Fixed +- [#936](https://github.com/rust-vmm/vhost-device/pull/936) vsock: Access virtqueues only if they are ready + ### Deprecated ## v0.3.0 diff --git a/vhost-device-vsock/README.md b/vhost-device-vsock/README.md index 1dc35cf2..91cdd93e 100644 --- a/vhost-device-vsock/README.md +++ b/vhost-device-vsock/README.md @@ -129,6 +129,18 @@ In this configuration: - The host must have vsock support (e.g., `vsock_loopback` kernel module loaded) - For testing, you can load the module with: `modprobe vsock_loopback` +## Live migration + +This device implementation advertises support for live migrations by offering the VHOST_USER_PROTOCOL_F_DEVICE_STATE protocol feature, however this doesn't work with QEMU yet as it marks its vsock frontend as "unmigratable". This feature does work with crosvm and potentially other virtual machine managers. + +The device itself doesn't save or restore any state during a live migration. It relies instead on the frontend to save the vring's states and negotiated features. It also expects the frontend to "kick" the queues that have pending buffers in them since the driver probably kicked those queues before the migration and won't do it again. + +The state saving flow is trivial as the device doesn't save any state as mentioned. + +The state loading flow is a bit more complicated because the virtio-vsock spec mandates that the device must send a VIRTIO_VSOCK_EVENT_TRANSPORT_RESET event to the driver. During a restore the backend is started no differently than during a regular boot. 
When the frontend sends the VHOST_USER_SET_DEVICE_STATE_FD command with LOAD direction the backend doesn't load anything, but it takes note that a transport reset event needs to be sent to the driver via the event vring when possible. In order to make sure this event is sent when the queue is ready, the backend waits for the event queue to be kicked before sending the event. While these kicks usually come from the driver, this particular one is actually sent by the vhost-user frontend. This implementation depends on the frontend to kick all queues with pending buffers after a restore because the driver is unlikely to do so as it probably did it before the snapshot was taken. + +In response to the transport reset event the driver drops any existing connections and reads the configuration space again. To prevent the driver from dropping any new connections established after the restore the backend doesn't forward any packets from outside the VM to the driver until it has read the configuration space. In fact, because the backend doesn't know at start time whether this is a restore or a clean boot, it always waits until after the driver has read the configuration space to start forwarding packets between the outside world and the driver. 
+ ## Usage Run the vhost-device-vsock device with unix domain socket backend: diff --git a/vhost-device-vsock/src/thread_backend.rs b/vhost-device-vsock/src/thread_backend.rs index 44a1f1e1..700aaca2 100644 --- a/vhost-device-vsock/src/thread_backend.rs +++ b/vhost-device-vsock/src/thread_backend.rs @@ -332,7 +332,7 @@ impl VsockThreadBackend { if dst_cid != VSOCK_HOST_CID { let cid_map = self.cid_map.read().unwrap(); if cid_map.contains_key(&dst_cid) { - let (sibling_raw_pkts_queue, sibling_groups_set, sibling_event_fd) = + let (sibling_raw_pkts_queue_opt, sibling_groups_set, sibling_event_fd) = cid_map.get(&dst_cid).unwrap(); if self @@ -345,11 +345,18 @@ impl VsockThreadBackend { return Ok(()); } - sibling_raw_pkts_queue - .write() - .unwrap() - .push_back(RawVsockPacket::from_vsock_packet(pkt)?); - let _ = sibling_event_fd.write(1); + match sibling_raw_pkts_queue_opt { + Some(queue) => { + queue + .write() + .unwrap() + .push_back(RawVsockPacket::from_vsock_packet(pkt)?); + let _ = sibling_event_fd.write(1); + } + None => { + info!("vsock: dropping packet for cid: {dst_cid:?} due to inactive device"); + } + } } else { warn!("vsock: dropping packet for unknown cid: {dst_cid:?}"); } @@ -525,6 +532,7 @@ mod tests { #[cfg(feature = "backend_vsock")] use crate::vhu_vsock::VsockProxyInfo; use crate::vhu_vsock::{BackendType, VhostUserVsockBackend, VsockConfig, VSOCK_OP_RW}; + use vhost_user_backend::VhostUserBackend; const DATA_LEN: usize = 16; const CONN_TX_BUF_SIZE: u32 = 64 * 1024; @@ -698,11 +706,28 @@ mod tests { // SAFETY: Safe as hdr_raw and data_raw are guaranteed to be valid. 
let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() }; + packet.set_type(VSOCK_TYPE_STREAM); + packet.set_src_cid(CID); + packet.set_dst_cid(SIBLING_CID); + packet.set_dst_port(SIBLING_LISTENING_PORT); + packet.set_op(VSOCK_OP_RW); + packet.set_len(DATA_LEN as u32); + packet + .data_slice() + .unwrap() + .copy_from(&[0x01u8, 0x12u8, 0x23u8, 0x34u8]); + + // The packet will be dropped silently because the thread won't activate until the config + // is read. + vtp.send_pkt(&packet).unwrap(); assert_eq!( vtp.recv_raw_pkt(&mut packet).unwrap_err().to_string(), Error::EmptyRawPktsQueue.to_string() ); + sibling_backend.get_config(0, 8); + sibling2_backend.get_config(0, 8); + packet.set_type(VSOCK_TYPE_STREAM); packet.set_src_cid(CID); packet.set_dst_cid(SIBLING_CID); diff --git a/vhost-device-vsock/src/vhu_vsock.rs b/vhost-device-vsock/src/vhu_vsock.rs index d5d236dd..d6e0f0ad 100644 --- a/vhost-device-vsock/src/vhu_vsock.rs +++ b/vhost-device-vsock/src/vhu_vsock.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashMap, HashSet}, + fs::File, io::Result as IoResult, path::PathBuf, sync::{Arc, Mutex, RwLock}, @@ -9,7 +10,10 @@ use std::{ use log::warn; use thiserror::Error as ThisError; -use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures}; +use vhost::vhost_user::message::{ + VhostTransferStateDirection, VhostTransferStatePhase, VhostUserProtocolFeatures, + VhostUserVirtioFeatures, +}; use vhost_user_backend::{VhostUserBackend, VringRwLock}; use virtio_bindings::bindings::{ virtio_config::{VIRTIO_F_NOTIFY_ON_EMPTY, VIRTIO_F_VERSION_1}, @@ -24,8 +28,14 @@ use vmm_sys_util::{ use crate::{thread_backend::RawPktsQ, vhu_vsock_thread::*}; -pub(crate) type CidMap = - HashMap>, Arc>>, EventFd)>; +pub(crate) type CidMap = HashMap< + u64, + ( + Option>>, + Arc>>, + EventFd, + ), +>; const NUM_QUEUES: usize = 3; @@ -72,8 +82,11 @@ pub(crate) const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1; /// data pub(crate) const 
VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2; +/// Vsock events - `VSOCK_EVENT_TRANSPORT_RESET`: Communication has been interrupted +pub(crate) const VSOCK_EVENT_TRANSPORT_RESET: u32 = 0; + // Queue mask to select vrings. -const QUEUE_MASK: u64 = 0b11; +const QUEUE_MASK: u64 = 0b111; pub(crate) type Result = std::result::Result; @@ -141,6 +154,8 @@ pub(crate) enum Error { EmptyRawPktsQueue, #[error("CID already in use by another vsock device")] CidAlreadyInUse, + #[error("Failed to write to event virtqueue")] + EventQueueWrite, } impl std::convert::From for std::io::Error { @@ -261,6 +276,7 @@ pub(crate) struct VhostUserVsockBackend { queues_per_thread: Vec, exit_consumer: EventConsumer, exit_notifier: EventNotifier, + transport_reset_pending: Arc>, } impl VhostUserVsockBackend { @@ -286,6 +302,7 @@ impl VhostUserVsockBackend { queues_per_thread, exit_consumer, exit_notifier, + transport_reset_pending: Arc::new(Mutex::new(false)), }) } } @@ -310,7 +327,9 @@ impl VhostUserBackend for VhostUserVsockBackend { } fn protocol_features(&self) -> VhostUserProtocolFeatures { - VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::CONFIG + VhostUserProtocolFeatures::MQ + | VhostUserProtocolFeatures::CONFIG + | VhostUserProtocolFeatures::DEVICE_STATE } fn set_event_idx(&self, enabled: bool) { @@ -335,6 +354,7 @@ impl VhostUserBackend for VhostUserVsockBackend { ) -> IoResult<()> { let vring_rx = &vrings[0]; let vring_tx = &vrings[1]; + let vring_evt = &vrings[2]; if evset != EventSet::IN { return Err(Error::HandleEventNotEpollIn.into()); @@ -349,7 +369,11 @@ impl VhostUserBackend for VhostUserVsockBackend { thread.process_tx(vring_tx, evt_idx)?; } EVT_QUEUE_EVENT => { - warn!("Received an unexpected EVT_QUEUE_EVENT"); + let reset_pending = &mut *self.transport_reset_pending.lock().unwrap(); + if *reset_pending { + thread.reset_transport(vring_evt, evt_idx)?; + *reset_pending = false; + } } BACKEND_EVENT => { thread.process_backend_evt(evset); @@ -389,6 +413,15 @@ impl 
VhostUserBackend for VhostUserVsockBackend { return Vec::new(); } + if offset + size == buf.len() { + // The last byte of the config is read when the driver is initializing or after it has + // processed a transport reset event. Either way, no transport reset will be pending + // after this. Activate all threads once it's known a reset event is not pending. + for thread in self.threads.iter() { + thread.lock().unwrap().activate(); + } + } + buf[offset..offset + size].to_vec() } @@ -401,6 +434,23 @@ impl VhostUserBackend for VhostUserVsockBackend { let notifier = self.exit_notifier.try_clone().ok()?; Some((consumer, notifier)) } + + fn set_device_state_fd( + &self, + direction: VhostTransferStateDirection, + _phase: VhostTransferStatePhase, + _file: File, + ) -> std::result::Result, std::io::Error> { + if let VhostTransferStateDirection::LOAD = direction { + *self.transport_reset_pending.lock().unwrap() = true; + } + Ok(None) + } + + fn check_device_state(&self) -> std::result::Result<(), std::io::Error> { + // We had nothing to read/write to the fd, so always return Ok. 
+ Ok(()) + } } #[cfg(test)] @@ -436,17 +486,20 @@ mod tests { let vrings = [ VringRwLock::new(mem.clone(), 0x1000).unwrap(), VringRwLock::new(mem.clone(), 0x2000).unwrap(), + VringRwLock::new(mem.clone(), 0x1000).unwrap(), ]; vrings[0].set_queue_info(0x100, 0x200, 0x300).unwrap(); vrings[0].set_queue_ready(true); vrings[1].set_queue_info(0x1100, 0x1200, 0x1300).unwrap(); vrings[1].set_queue_ready(true); + vrings[2].set_queue_info(0x2100, 0x2200, 0x2300).unwrap(); + vrings[2].set_queue_ready(true); backend.update_memory(mem).unwrap(); let queues_per_thread = backend.queues_per_thread(); assert_eq!(queues_per_thread.len(), 1); - assert_eq!(queues_per_thread[0], 0b11); + assert_eq!(queues_per_thread[0], 0b111); let config = backend.get_config(0, 8); assert_eq!(config.len(), 8); @@ -569,6 +622,7 @@ mod tests { let vrings = [ VringRwLock::new(mem.clone(), 0x1000).unwrap(), VringRwLock::new(mem.clone(), 0x2000).unwrap(), + VringRwLock::new(mem.clone(), 0x1000).unwrap(), ]; backend.update_memory(mem).unwrap(); diff --git a/vhost-device-vsock/src/vhu_vsock_thread.rs b/vhost-device-vsock/src/vhu_vsock_thread.rs index 07997282..6f2e7f69 100644 --- a/vhost-device-vsock/src/vhu_vsock_thread.rs +++ b/vhost-device-vsock/src/vhu_vsock_thread.rs @@ -3,7 +3,7 @@ use std::{ collections::{HashMap, HashSet}, fs::File, - io::{self, BufRead, BufReader}, + io::{self, BufRead, BufReader, Write}, iter::FromIterator, num::Wrapping, ops::Deref, @@ -20,7 +20,7 @@ use std::{ use log::{error, warn}; use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT}; -use virtio_queue::QueueOwnedT; +use virtio_queue::{QueueOwnedT, QueueT}; use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; use vmm_sys_util::{ @@ -35,7 +35,7 @@ use crate::{ thread_backend::*, vhu_vsock::{ BackendType, CidMap, ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, - SIBLING_VM_EVENT, VSOCK_HOST_CID, + SIBLING_VM_EVENT, 
VSOCK_EVENT_TRANSPORT_RESET, VSOCK_HOST_CID, }, vsock_conn::*, }; @@ -89,6 +89,9 @@ pub(crate) struct VhostUserVsockThread { /// Used to alternate between the RX queues to prevent the starvation of one /// by the other. last_processed: RxQueueType, + /// Backend and sibling vm connections will be dropped until this is set to true, typically + /// when communication with the driver has been established. + active: bool, } impl VhostUserVsockThread { @@ -150,11 +153,7 @@ impl VhostUserVsockThread { cid_map.insert( guest_cid, - ( - thread_backend.raw_pkts_queue.clone(), - groups_set, - sibling_event_fd.try_clone().unwrap(), - ), + (None, groups_set, sibling_event_fd.try_clone().unwrap()), ); } let (sender, receiver) = mpsc::channel::(); @@ -180,6 +179,7 @@ impl VhostUserVsockThread { tx_buffer_size, sibling_event_fd, last_processed: RxQueueType::Standard, + active: false, }; for host_raw_fd in thread.host_listeners_map.keys() { @@ -259,6 +259,19 @@ impl VhostUserVsockThread { Ok(()) } + /// Start processing requests from the host and sibling VMs + pub fn activate(&mut self) { + if self.active { + return; + } + self.active = true; + + // Add the raw_pkts queue to the map to begin accepting packets from sibling VMs + let mut cid_map = self.thread_backend.cid_map.write().unwrap(); + cid_map.get_mut(&self.guest_cid).unwrap().0 = + Some(self.thread_backend.raw_pkts_queue.clone()); + } + /// Return raw file descriptor of the epoll file. 
fn get_epoll_fd(&self) -> RawFd { self.epoll_file.as_raw_fd() @@ -310,7 +323,7 @@ impl VhostUserVsockThread { match listener { ListenerType::Unix(unix_listener) => { let conn = unix_listener.accept().map_err(Error::UnixAccept); - if self.mem.is_some() { + if self.active && self.mem.is_some() { conn.and_then(|(stream, _)| { stream .set_nonblocking(true) @@ -332,7 +345,7 @@ impl VhostUserVsockThread { #[cfg(feature = "backend_vsock")] ListenerType::Vsock(vsock_listener) => { let conn = vsock_listener.accept().map_err(Error::VsockAccept); - if self.mem.is_some() { + if self.active && self.mem.is_some() { match conn { Ok((stream, addr)) => { if let Err(err) = stream.set_nonblocking(true) { @@ -587,7 +600,6 @@ impl VhostUserVsockThread { }; let mut vring_mut = vring.get_mut(); - let queue = vring_mut.get_queue_mut(); while let Some(mut avail_desc) = queue @@ -696,6 +708,11 @@ impl VhostUserVsockThread { } pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<()> { + if !vring.get_ref().get_queue().ready() { + // A VHOST_USER_GET_VRING_BASE request can cause the vring to not be ready. + return Ok(()); + } + match self.last_processed { RxQueueType::Standard => { if self.thread_backend.pending_raw_pkts() { @@ -748,14 +765,20 @@ impl VhostUserVsockThread { } }; - if self.thread_backend.send_pkt(&pkt).is_err() { - vring - .get_mut() - .get_queue_mut() - .iter(mem) - .unwrap() - .go_to_previous_position(); - break; + if self.active { + if self.thread_backend.send_pkt(&pkt).is_err() { + vring + .get_mut() + .get_queue_mut() + .iter(mem) + .unwrap() + .go_to_previous_position(); + break; + } + } else { + // A tx packet can only arrive before we're active if it was sent before a + // transport reset completed. 
+ warn!("vsock: Dropping packet received before reset completes"); } // TODO: Check if the protocol requires read length to be correct @@ -779,6 +802,11 @@ /// Wrapper to process tx queue based on whether event idx is enabled or /// not. pub fn process_tx(&mut self, vring_lock: &VringRwLock, event_idx: bool) -> Result<()> { + if !vring_lock.get_ref().get_queue().ready() { + // A VHOST_USER_GET_VRING_BASE request can cause the vring to not be ready. + return Ok(()); + } + if event_idx { // To properly handle EVENT_IDX we need to keep calling // process_rx_queue until it stops finding new requests @@ -796,6 +824,44 @@ } Ok(()) } + + /// Sends a TRANSPORT_RESET event to the guest driver. If no buffer is available in the + /// event vring, the event is skipped and `Ok(())` is returned. + pub fn reset_transport(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<()> { + let mut vring_mut = vring.get_mut(); + let queue = vring_mut.get_queue_mut(); + + let atomic_mem = match &self.mem { + Some(m) => m, + None => return Err(Error::NoMemoryConfigured), + }; + + let avail_desc = match queue.pop_descriptor_chain(atomic_mem.memory()) { + Some(d) => d, + None => { + return Ok(()); + } + }; + + let mem = atomic_mem.clone().memory(); + let head_idx = avail_desc.head_index(); + let mut writer = avail_desc + .writer(&mem) + .map_err(|_| Error::EventQueueWrite)?; + writer + .write_all(&VSOCK_EVENT_TRANSPORT_RESET.to_le_bytes()) + .map_err(|_| Error::EventQueueWrite)?; + self.sender + .send(EventData { + vring: vring.clone(), + event_idx, + head_idx, + used_len: std::mem::size_of::<u32>(), + }) + .unwrap(); + + Ok(()) + } } impl Drop for VhostUserVsockThread { @@ -955,6 +1021,7 @@ mod tests { cid_map.clone(), ) .unwrap(); + t.activate(); assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err()); assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err()); 
assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err()); @@ -964,6 +1031,7 @@ mod tests { ); let vring = VringRwLock::new(mem, 0x1000).unwrap(); + vring.get_mut().get_queue_mut().set_ready(true); // memory is not configured, so processing TX should fail assert!(t.process_tx(&vring, false).is_err()); @@ -1007,6 +1075,7 @@ mod tests { ); let mut t = t.unwrap(); + t.activate(); let mem = GuestMemoryAtomic::new( GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(), @@ -1061,6 +1130,7 @@ mod tests { ); t.mem = Some(mem.clone()); + t.activate(); let mut vs1 = VsockStream::connect_with_cid_port(VMADDR_CID_LOCAL, 9003).unwrap(); let mut vs2 = VsockStream::connect_with_cid_port(VMADDR_CID_LOCAL, 9004).unwrap();