Skip to content

Commit ba8a093

Browse files
committed
vsock: Enable live migrations (snapshot-restore)
No device state is stored. When a buffer appears on the event queue for the first time after the frontend requested a load state operation the backend sends a VIRTIO_VSOCK_EVENT_TRANSPORT_RESET to the driver and waits for the driver to read the config again (the signal that the reset completed) before starting normal operations. Signed-off-by: Jorge E. Moreira <jemoreira@google.com>
1 parent 3aaca09 commit ba8a093

4 files changed

Lines changed: 143 additions & 34 deletions

File tree

vhost-device-vsock/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
### Added
55

6+
- [#936](https://github.com/rust-vmm/vhost-device/pull/936) vsock: Enable live migrations (snapshot-restore)
7+
68
### Changed
79

810
### Fixed

vhost-device-vsock/src/thread_backend.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ pub(crate) struct VsockThreadBackend {
217217
pub raw_pkts_queue: Arc<RwLock<RawPktsQ>>,
218218
/// Set of groups assigned to the device which it is allowed to communicate
219219
/// with.
220-
groups_set: Arc<RwLock<HashSet<String>>>,
220+
pub groups_set: Arc<RwLock<HashSet<String>>>,
221221
}
222222

223223
impl VsockThreadBackend {

vhost-device-vsock/src/vhu_vsock.rs

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@
22

33
use std::{
44
collections::{HashMap, HashSet},
5+
fs::File,
56
io::Result as IoResult,
67
path::PathBuf,
78
sync::{Arc, Mutex, RwLock},
89
};
910

1011
use log::warn;
1112
use thiserror::Error as ThisError;
12-
use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
13+
use vhost::vhost_user::message::{
14+
VhostTransferStateDirection, VhostTransferStatePhase, VhostUserProtocolFeatures,
15+
VhostUserVirtioFeatures,
16+
};
1317
use vhost_user_backend::{VhostUserBackend, VringRwLock, VringT};
1418
use virtio_bindings::bindings::{
1519
virtio_config::{VIRTIO_F_NOTIFY_ON_EMPTY, VIRTIO_F_VERSION_1},
@@ -73,8 +77,11 @@ pub(crate) const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1;
7377
/// data
7478
pub(crate) const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2;
7579

80+
/// Vsock events - `VSOCK_EVENT_TRANSPORT_RESET`: Communication has been interrupted
81+
pub(crate) const VSOCK_EVENT_TRANSPORT_RESET: u32 = 0;
82+
7683
// Queue mask to select vrings.
77-
const QUEUE_MASK: u64 = 0b11;
84+
const QUEUE_MASK: u64 = 0b111;
7885

7986
pub(crate) type Result<T> = std::result::Result<T, Error>;
8087

@@ -142,6 +149,10 @@ pub(crate) enum Error {
142149
EmptyRawPktsQueue,
143150
#[error("CID already in use by another vsock device")]
144151
CidAlreadyInUse,
152+
#[error("Event virtqueue is empty")]
153+
EmptyEventQueue,
154+
#[error("Failed to write to event virtqueue")]
155+
EventQueueWrite,
145156
}
146157

147158
impl std::convert::From<Error> for std::io::Error {
@@ -150,6 +161,11 @@ impl std::convert::From<Error> for std::io::Error {
150161
}
151162
}
152163

164+
enum TransportResetState {
165+
Pending,
166+
Sent,
167+
}
168+
153169
#[cfg(feature = "backend_vsock")]
154170
#[derive(Debug, PartialEq, Clone)]
155171
pub(crate) struct VsockProxyInfo {
@@ -262,6 +278,7 @@ pub(crate) struct VhostUserVsockBackend {
262278
queues_per_thread: Vec<u64>,
263279
exit_consumer: EventConsumer,
264280
exit_notifier: EventNotifier,
281+
transport_reset_state: Arc<Mutex<Option<TransportResetState>>>,
265282
}
266283

267284
impl VhostUserVsockBackend {
@@ -287,6 +304,7 @@ impl VhostUserVsockBackend {
287304
queues_per_thread,
288305
exit_consumer,
289306
exit_notifier,
307+
transport_reset_state: Arc::new(Mutex::new(None)),
290308
})
291309
}
292310
}
@@ -311,7 +329,9 @@ impl VhostUserBackend for VhostUserVsockBackend {
311329
}
312330

313331
fn protocol_features(&self) -> VhostUserProtocolFeatures {
314-
VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::CONFIG
332+
VhostUserProtocolFeatures::MQ
333+
| VhostUserProtocolFeatures::CONFIG
334+
| VhostUserProtocolFeatures::DEVICE_STATE
315335
}
316336

317337
fn set_event_idx(&self, enabled: bool) {
@@ -336,6 +356,7 @@ impl VhostUserBackend for VhostUserVsockBackend {
336356
) -> IoResult<()> {
337357
let vring_rx = &vrings[0];
338358
let vring_tx = &vrings[1];
359+
let vring_evt = &vrings[2];
339360

340361
if evset != EventSet::IN {
341362
return Err(Error::HandleEventNotEpollIn.into());
@@ -350,7 +371,12 @@ impl VhostUserBackend for VhostUserVsockBackend {
350371
thread.process_tx(vring_tx, evt_idx)?;
351372
}
352373
EVT_QUEUE_EVENT => {
353-
warn!("Received an unexpected EVT_QUEUE_EVENT");
374+
if let Some(state) = &mut *self.transport_reset_state.lock().unwrap() {
375+
if matches!(*state, TransportResetState::Pending) {
376+
thread.reset_transport(vring_evt, evt_idx)?;
377+
*state = TransportResetState::Sent;
378+
}
379+
}
354380
}
355381
BACKEND_EVENT => {
356382
thread.process_backend_evt(evset);
@@ -398,6 +424,19 @@ impl VhostUserBackend for VhostUserVsockBackend {
398424
return Vec::new();
399425
}
400426

427+
if offset + size == buf.len() {
428+
// The last byte of the config is read when the driver is initializing or after it has
429+
// processed a transport reset event. Either way, no transport reset will be pending
430+
// after this.
431+
*self.transport_reset_state.lock().unwrap() = None;
432+
// Activate all threads once it's known a pending reset event is no longer pending.
433+
for thread in self.threads.iter() {
434+
if let Err(e) = thread.lock().unwrap().activate() {
435+
warn!("Failed to activate thread: {}", e);
436+
}
437+
}
438+
}
439+
401440
buf[offset..offset + size].to_vec()
402441
}
403442

@@ -410,6 +449,22 @@ impl VhostUserBackend for VhostUserVsockBackend {
410449
let notifier = self.exit_notifier.try_clone().ok()?;
411450
Some((consumer, notifier))
412451
}
452+
453+
fn set_device_state_fd(
454+
&self,
455+
direction: VhostTransferStateDirection,
456+
_phase: VhostTransferStatePhase,
457+
_file: File,
458+
) -> std::result::Result<Option<File>, std::io::Error> {
459+
if let VhostTransferStateDirection::LOAD = direction {
460+
*self.transport_reset_state.lock().unwrap() = Some(TransportResetState::Pending);
461+
}
462+
Ok(None)
463+
}
464+
465+
fn check_device_state(&self) -> std::result::Result<(), std::io::Error> {
466+
Ok(())
467+
}
413468
}
414469

415470
#[cfg(test)]
@@ -455,7 +510,7 @@ mod tests {
455510

456511
let queues_per_thread = backend.queues_per_thread();
457512
assert_eq!(queues_per_thread.len(), 1);
458-
assert_eq!(queues_per_thread[0], 0b11);
513+
assert_eq!(queues_per_thread[0], 0b111);
459514

460515
let config = backend.get_config(0, 8);
461516
assert_eq!(config.len(), 8);

vhost-device-vsock/src/vhu_vsock_thread.rs

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::{
44
collections::{HashMap, HashSet},
55
fs::File,
6-
io::{self, BufRead, BufReader},
6+
io::{self, BufRead, BufReader, Write},
77
iter::FromIterator,
88
num::Wrapping,
99
ops::Deref,
@@ -20,7 +20,7 @@ use std::{
2020

2121
use log::{error, warn};
2222
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
23-
use virtio_queue::QueueOwnedT;
23+
use virtio_queue::{QueueOwnedT, QueueT};
2424
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
2525
use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
2626
use vmm_sys_util::{
@@ -35,7 +35,7 @@ use crate::{
3535
thread_backend::*,
3636
vhu_vsock::{
3737
BackendType, CidMap, ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT,
38-
SIBLING_VM_EVENT, VSOCK_HOST_CID,
38+
SIBLING_VM_EVENT, VSOCK_EVENT_TRANSPORT_RESET, VSOCK_HOST_CID,
3939
},
4040
vsock_conn::*,
4141
};
@@ -89,6 +89,9 @@ pub(crate) struct VhostUserVsockThread {
8989
/// Used to alternate between the RX queues to prevent the starvation of one
9090
/// by the other.
9191
last_processed: RxQueueType,
92+
/// Backend and sibling vm connections will be dropped until this is set to true, typically
93+
/// when communication with the driver has been established.
94+
active: bool,
9295
}
9396

9497
impl VhostUserVsockThread {
@@ -100,6 +103,9 @@ impl VhostUserVsockThread {
100103
groups: Vec<String>,
101104
cid_map: Arc<RwLock<CidMap>>,
102105
) -> Result<Self> {
106+
if cid_map.read().unwrap().contains_key(&guest_cid) {
107+
return Err(Error::CidAlreadyInUse);
108+
}
103109
let mut host_listeners_map = HashMap::new();
104110
match &backend_info {
105111
BackendType::UnixDomainSocket(uds_path) => {
@@ -138,25 +144,10 @@ impl VhostUserVsockThread {
138144
epoll_fd,
139145
guest_cid,
140146
tx_buffer_size,
141-
groups_set.clone(),
147+
groups_set,
142148
cid_map.clone(),
143149
);
144150

145-
{
146-
let mut cid_map = cid_map.write().unwrap();
147-
if cid_map.contains_key(&guest_cid) {
148-
return Err(Error::CidAlreadyInUse);
149-
}
150-
151-
cid_map.insert(
152-
guest_cid,
153-
(
154-
thread_backend.raw_pkts_queue.clone(),
155-
groups_set,
156-
sibling_event_fd.try_clone().unwrap(),
157-
),
158-
);
159-
}
160151
let (sender, receiver) = mpsc::channel::<EventData>();
161152
thread::spawn(move || loop {
162153
// TODO: Understand why doing the following in the background thread works.
@@ -180,6 +171,7 @@ impl VhostUserVsockThread {
180171
tx_buffer_size,
181172
sibling_event_fd,
182173
last_processed: RxQueueType::Standard,
174+
active: false,
183175
};
184176

185177
for host_raw_fd in thread.host_listeners_map.keys() {
@@ -259,6 +251,27 @@ impl VhostUserVsockThread {
259251
Ok(())
260252
}
261253

254+
/// Start processing requests from the host and sibling VMs
255+
pub fn activate(&mut self) -> Result<()> {
256+
self.active = true;
257+
258+
// Add the guest cid to the map to become accessible to sibling VMs
259+
let mut cid_map = self.thread_backend.cid_map.write().unwrap();
260+
if cid_map.contains_key(&self.guest_cid) {
261+
return Err(Error::CidAlreadyInUse);
262+
}
263+
cid_map.insert(
264+
self.guest_cid,
265+
(
266+
self.thread_backend.raw_pkts_queue.clone(),
267+
self.thread_backend.groups_set.clone(),
268+
self.sibling_event_fd.try_clone().unwrap(),
269+
),
270+
);
271+
272+
Ok(())
273+
}
274+
262275
/// Return raw file descriptor of the epoll file.
263276
fn get_epoll_fd(&self) -> RawFd {
264277
self.epoll_file.as_raw_fd()
@@ -310,7 +323,7 @@ impl VhostUserVsockThread {
310323
match listener {
311324
ListenerType::Unix(unix_listener) => {
312325
let conn = unix_listener.accept().map_err(Error::UnixAccept);
313-
if self.mem.is_some() {
326+
if self.active && self.mem.is_some() {
314327
conn.and_then(|(stream, _)| {
315328
stream
316329
.set_nonblocking(true)
@@ -748,14 +761,20 @@ impl VhostUserVsockThread {
748761
}
749762
};
750763

751-
if self.thread_backend.send_pkt(&pkt).is_err() {
752-
vring
753-
.get_mut()
754-
.get_queue_mut()
755-
.iter(mem)
756-
.unwrap()
757-
.go_to_previous_position();
758-
break;
764+
if self.active {
765+
if self.thread_backend.send_pkt(&pkt).is_err() {
766+
vring
767+
.get_mut()
768+
.get_queue_mut()
769+
.iter(mem)
770+
.unwrap()
771+
.go_to_previous_position();
772+
break;
773+
}
774+
} else {
775+
// A tx packet can only arrive before we're active if it was sent before a
776+
// transport reset completed.
777+
warn!("vsock: Dropping packet received before reset completes");
759778
}
760779

761780
// TODO: Check if the protocol requires read length to be correct
@@ -796,6 +815,39 @@ impl VhostUserVsockThread {
796815
}
797816
Ok(())
798817
}
818+
819+
pub fn reset_transport(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<()> {
820+
let atomic_mem = match &self.mem {
821+
Some(m) => m,
822+
None => return Err(Error::NoMemoryConfigured),
823+
};
824+
825+
let mut vring_mut = vring.get_mut();
826+
let queue = vring_mut.get_queue_mut();
827+
828+
let avail_desc = queue
829+
.pop_descriptor_chain(atomic_mem.memory())
830+
.ok_or(Error::EmptyEventQueue)?;
831+
832+
let mem = atomic_mem.clone().memory();
833+
let head_idx = avail_desc.head_index();
834+
let mut writer = avail_desc
835+
.writer(&mem)
836+
.map_err(|_| Error::EventQueueWrite)?;
837+
writer
838+
.write_all(&VSOCK_EVENT_TRANSPORT_RESET.to_le_bytes())
839+
.map_err(|_| Error::EventQueueWrite)?;
840+
self.sender
841+
.send(EventData {
842+
vring: vring.clone(),
843+
event_idx,
844+
head_idx,
845+
used_len: std::mem::size_of::<u32>(),
846+
})
847+
.unwrap();
848+
849+
Ok(())
850+
}
799851
}
800852

801853
impl Drop for VhostUserVsockThread {

0 commit comments

Comments
 (0)