Skip to content

Commit d8fce24

Browse files
committed
vsock: Enable live migrations (snapshot-restore)
No device state is stored during the migration. When a buffer appears on the event queue for the first time after the frontend has requested a load state operation, the backend sends a VIRTIO_VSOCK_EVENT_TRANSPORT_RESET to the driver and waits for the driver to read the config again (the signal that the reset has completed) before starting normal operations. Signed-off-by: Jorge E. Moreira <jemoreira@google.com>
1 parent 3aaca09 commit d8fce24

4 files changed

Lines changed: 182 additions & 31 deletions

File tree

vhost-device-vsock/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
### Added
55

6+
- [#936](https://github.com/rust-vmm/vhost-device/pull/936) vsock: Enable live migrations (snapshot-restore)
7+
68
### Changed
79

810
### Fixed

vhost-device-vsock/src/thread_backend.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ pub(crate) struct VsockThreadBackend {
217217
pub raw_pkts_queue: Arc<RwLock<RawPktsQ>>,
218218
/// Set of groups assigned to the device which it is allowed to communicate
219219
/// with.
220-
groups_set: Arc<RwLock<HashSet<String>>>,
220+
pub groups_set: Arc<RwLock<HashSet<String>>>,
221221
}
222222

223223
impl VsockThreadBackend {
@@ -332,7 +332,7 @@ impl VsockThreadBackend {
332332
if dst_cid != VSOCK_HOST_CID {
333333
let cid_map = self.cid_map.read().unwrap();
334334
if cid_map.contains_key(&dst_cid) {
335-
let (sibling_raw_pkts_queue, sibling_groups_set, sibling_event_fd) =
335+
let (sibling_raw_pkts_queue_opt, sibling_groups_set, sibling_event_fd) =
336336
cid_map.get(&dst_cid).unwrap();
337337

338338
if self
@@ -345,11 +345,18 @@ impl VsockThreadBackend {
345345
return Ok(());
346346
}
347347

348-
sibling_raw_pkts_queue
349-
.write()
350-
.unwrap()
351-
.push_back(RawVsockPacket::from_vsock_packet(pkt)?);
352-
let _ = sibling_event_fd.write(1);
348+
match sibling_raw_pkts_queue_opt {
349+
Some(queue) => {
350+
queue
351+
.write()
352+
.unwrap()
353+
.push_back(RawVsockPacket::from_vsock_packet(pkt)?);
354+
let _ = sibling_event_fd.write(1);
355+
}
356+
None => {
357+
info!("vsock: dropping packet for cid: {dst_cid:?} due to inactive device");
358+
}
359+
}
353360
} else {
354361
warn!("vsock: dropping packet for unknown cid: {dst_cid:?}");
355362
}
@@ -525,6 +532,7 @@ mod tests {
525532
#[cfg(feature = "backend_vsock")]
526533
use crate::vhu_vsock::VsockProxyInfo;
527534
use crate::vhu_vsock::{BackendType, VhostUserVsockBackend, VsockConfig, VSOCK_OP_RW};
535+
use vhost_user_backend::VhostUserBackend;
528536

529537
const DATA_LEN: usize = 16;
530538
const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
@@ -698,11 +706,28 @@ mod tests {
698706
// SAFETY: Safe as hdr_raw and data_raw are guaranteed to be valid.
699707
let mut packet = unsafe { VsockPacket::new(hdr_raw, Some(data_raw)).unwrap() };
700708

709+
packet.set_type(VSOCK_TYPE_STREAM);
710+
packet.set_src_cid(CID);
711+
packet.set_dst_cid(SIBLING_CID);
712+
packet.set_dst_port(SIBLING_LISTENING_PORT);
713+
packet.set_op(VSOCK_OP_RW);
714+
packet.set_len(DATA_LEN as u32);
715+
packet
716+
.data_slice()
717+
.unwrap()
718+
.copy_from(&[0x01u8, 0x12u8, 0x23u8, 0x34u8]);
719+
720+
// The packet will be dropped silently because the thread won't activate until the config
721+
// is read.
722+
vtp.send_pkt(&packet).unwrap();
701723
assert_eq!(
702724
vtp.recv_raw_pkt(&mut packet).unwrap_err().to_string(),
703725
Error::EmptyRawPktsQueue.to_string()
704726
);
705727

728+
sibling_backend.get_config(0, 8);
729+
sibling2_backend.get_config(0, 8);
730+
706731
packet.set_type(VSOCK_TYPE_STREAM);
707732
packet.set_src_cid(CID);
708733
packet.set_dst_cid(SIBLING_CID);

vhost-device-vsock/src/vhu_vsock.rs

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@
22

33
use std::{
44
collections::{HashMap, HashSet},
5+
fs::File,
56
io::Result as IoResult,
67
path::PathBuf,
78
sync::{Arc, Mutex, RwLock},
89
};
910

1011
use log::warn;
1112
use thiserror::Error as ThisError;
12-
use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures};
13+
use vhost::vhost_user::message::{
14+
VhostTransferStateDirection, VhostTransferStatePhase, VhostUserProtocolFeatures,
15+
VhostUserVirtioFeatures,
16+
};
1317
use vhost_user_backend::{VhostUserBackend, VringRwLock, VringT};
1418
use virtio_bindings::bindings::{
1519
virtio_config::{VIRTIO_F_NOTIFY_ON_EMPTY, VIRTIO_F_VERSION_1},
@@ -25,8 +29,14 @@ use vmm_sys_util::{
2529

2630
use crate::{thread_backend::RawPktsQ, vhu_vsock_thread::*};
2731

28-
pub(crate) type CidMap =
29-
HashMap<u64, (Arc<RwLock<RawPktsQ>>, Arc<RwLock<HashSet<String>>>, EventFd)>;
32+
pub(crate) type CidMap = HashMap<
33+
u64,
34+
(
35+
Option<Arc<RwLock<RawPktsQ>>>,
36+
Arc<RwLock<HashSet<String>>>,
37+
EventFd,
38+
),
39+
>;
3040

3141
const NUM_QUEUES: usize = 3;
3242

@@ -73,8 +83,11 @@ pub(crate) const VSOCK_FLAGS_SHUTDOWN_RCV: u32 = 1;
7383
/// data
7484
pub(crate) const VSOCK_FLAGS_SHUTDOWN_SEND: u32 = 2;
7585

86+
/// Vsock events - `VSOCK_EVENT_TRANSPORT_RESET`: Communication has been interrupted
87+
pub(crate) const VSOCK_EVENT_TRANSPORT_RESET: u32 = 0;
88+
7689
// Queue mask to select vrings.
77-
const QUEUE_MASK: u64 = 0b11;
90+
const QUEUE_MASK: u64 = 0b111;
7891

7992
pub(crate) type Result<T> = std::result::Result<T, Error>;
8093

@@ -142,6 +155,8 @@ pub(crate) enum Error {
142155
EmptyRawPktsQueue,
143156
#[error("CID already in use by another vsock device")]
144157
CidAlreadyInUse,
158+
#[error("Failed to write to event virtqueue")]
159+
EventQueueWrite,
145160
}
146161

147162
impl std::convert::From<Error> for std::io::Error {
@@ -150,6 +165,11 @@ impl std::convert::From<Error> for std::io::Error {
150165
}
151166
}
152167

168+
enum TransportResetState {
169+
Pending,
170+
Sent,
171+
}
172+
153173
#[cfg(feature = "backend_vsock")]
154174
#[derive(Debug, PartialEq, Clone)]
155175
pub(crate) struct VsockProxyInfo {
@@ -262,6 +282,7 @@ pub(crate) struct VhostUserVsockBackend {
262282
queues_per_thread: Vec<u64>,
263283
exit_consumer: EventConsumer,
264284
exit_notifier: EventNotifier,
285+
transport_reset_state: Arc<Mutex<Option<TransportResetState>>>,
265286
}
266287

267288
impl VhostUserVsockBackend {
@@ -287,6 +308,7 @@ impl VhostUserVsockBackend {
287308
queues_per_thread,
288309
exit_consumer,
289310
exit_notifier,
311+
transport_reset_state: Arc::new(Mutex::new(None)),
290312
})
291313
}
292314
}
@@ -311,7 +333,9 @@ impl VhostUserBackend for VhostUserVsockBackend {
311333
}
312334

313335
fn protocol_features(&self) -> VhostUserProtocolFeatures {
314-
VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::CONFIG
336+
VhostUserProtocolFeatures::MQ
337+
| VhostUserProtocolFeatures::CONFIG
338+
| VhostUserProtocolFeatures::DEVICE_STATE
315339
}
316340

317341
fn set_event_idx(&self, enabled: bool) {
@@ -336,6 +360,7 @@ impl VhostUserBackend for VhostUserVsockBackend {
336360
) -> IoResult<()> {
337361
let vring_rx = &vrings[0];
338362
let vring_tx = &vrings[1];
363+
let vring_evt = &vrings[2];
339364

340365
if evset != EventSet::IN {
341366
return Err(Error::HandleEventNotEpollIn.into());
@@ -350,7 +375,15 @@ impl VhostUserBackend for VhostUserVsockBackend {
350375
thread.process_tx(vring_tx, evt_idx)?;
351376
}
352377
EVT_QUEUE_EVENT => {
353-
warn!("Received an unexpected EVT_QUEUE_EVENT");
378+
if let Some(state) = &mut *self.transport_reset_state.lock().unwrap() {
379+
if vring_evt.get_ref().get_queue().ready() {
380+
if matches!(*state, TransportResetState::Pending)
381+
&& thread.reset_transport(vring_evt, evt_idx)?
382+
{
383+
*state = TransportResetState::Sent;
384+
}
385+
}
386+
}
354387
}
355388
BACKEND_EVENT => {
356389
thread.process_backend_evt(evset);
@@ -398,6 +431,19 @@ impl VhostUserBackend for VhostUserVsockBackend {
398431
return Vec::new();
399432
}
400433

434+
if offset + size == buf.len() {
435+
// The last byte of the config is read when the driver is initializing or after it has
436+
// processed a transport reset event. Either way, no transport reset will be pending
437+
// after this.
438+
*self.transport_reset_state.lock().unwrap() = None;
439+
// Activate all threads now that it is known no transport reset event is pending.
440+
for thread in self.threads.iter() {
441+
if let Err(e) = thread.lock().unwrap().activate() {
442+
warn!("Failed to activate thread: {}", e);
443+
}
444+
}
445+
}
446+
401447
buf[offset..offset + size].to_vec()
402448
}
403449

@@ -410,6 +456,22 @@ impl VhostUserBackend for VhostUserVsockBackend {
410456
let notifier = self.exit_notifier.try_clone().ok()?;
411457
Some((consumer, notifier))
412458
}
459+
460+
fn set_device_state_fd(
461+
&self,
462+
direction: VhostTransferStateDirection,
463+
_phase: VhostTransferStatePhase,
464+
_file: File,
465+
) -> std::result::Result<Option<File>, std::io::Error> {
466+
if let VhostTransferStateDirection::LOAD = direction {
467+
*self.transport_reset_state.lock().unwrap() = Some(TransportResetState::Pending);
468+
}
469+
Ok(None)
470+
}
471+
472+
fn check_device_state(&self) -> std::result::Result<(), std::io::Error> {
473+
Ok(())
474+
}
413475
}
414476

415477
#[cfg(test)]
@@ -445,17 +507,20 @@ mod tests {
445507
let vrings = [
446508
VringRwLock::new(mem.clone(), 0x1000).unwrap(),
447509
VringRwLock::new(mem.clone(), 0x2000).unwrap(),
510+
VringRwLock::new(mem.clone(), 0x1000).unwrap(),
448511
];
449512
vrings[0].set_queue_info(0x100, 0x200, 0x300).unwrap();
450513
vrings[0].set_queue_ready(true);
451514
vrings[1].set_queue_info(0x1100, 0x1200, 0x1300).unwrap();
452515
vrings[1].set_queue_ready(true);
516+
vrings[2].set_queue_info(0x2100, 0x2200, 0x2300).unwrap();
517+
vrings[2].set_queue_ready(true);
453518

454519
backend.update_memory(mem).unwrap();
455520

456521
let queues_per_thread = backend.queues_per_thread();
457522
assert_eq!(queues_per_thread.len(), 1);
458-
assert_eq!(queues_per_thread[0], 0b11);
523+
assert_eq!(queues_per_thread[0], 0b111);
459524

460525
let config = backend.get_config(0, 8);
461526
assert_eq!(config.len(), 8);
@@ -578,6 +643,7 @@ mod tests {
578643
let vrings = [
579644
VringRwLock::new(mem.clone(), 0x1000).unwrap(),
580645
VringRwLock::new(mem.clone(), 0x2000).unwrap(),
646+
VringRwLock::new(mem.clone(), 0x1000).unwrap(),
581647
];
582648

583649
backend.update_memory(mem).unwrap();

0 commit comments

Comments
 (0)