blob: d6cfbf96acae8e6203f3950bccc3cb73eca7acf6 [file] [log] [blame]
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//! A simple framework to run a vhost-user backend service.
#[macro_use]
extern crate log;
use std::fmt::{Display, Formatter};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
use vhost::vhost_user::{BackendListener, BackendReqHandler, Error as VhostUserError, Listener};
use vm_memory::mmap::NewBitmap;
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
use self::handler::VhostUserHandler;
mod backend;
pub use self::backend::{VhostUserBackend, VhostUserBackendMut};
mod event_loop;
pub use self::event_loop::VringEpollHandler;
mod handler;
pub use self::handler::VhostUserHandlerError;
pub mod bitmap;
use crate::bitmap::BitmapReplace;
mod vring;
pub use self::vring::{
VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT,
};
/// Due to the way `xen` handles memory mappings we can not combine it with
/// `postcopy` feature which relies on persistent memory mappings. Thus we
/// disallow enabling both features at the same time.
// Checked at compile time so the incompatibility can never reach runtime.
#[cfg(all(feature = "postcopy", feature = "xen"))]
compile_error!("Both `postcopy` and `xen` features can not be enabled at the same time.");

/// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code.
// `B` is the dirty-bitmap type carried by the guest memory mappings.
type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>;
#[derive(Debug)]
/// Errors related to vhost-user daemon.
pub enum Error {
/// Failed to create a new vhost-user handler.
NewVhostUserHandler(VhostUserHandlerError),
/// Failed creating vhost-user backend listener.
CreateBackendListener(VhostUserError),
/// Failed creating vhost-user backend handler.
CreateBackendReqHandler(VhostUserError),
/// Failed creating listener socket
CreateVhostUserListener(VhostUserError),
/// Failed starting daemon thread.
StartDaemon(std::io::Error),
/// Failed waiting for daemon thread.
WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>),
/// Failed handling a vhost-user request.
HandleRequest(VhostUserError),
}
impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e),
Error::CreateBackendListener(e) => write!(f, "cannot create backend listener: {}", e),
Error::CreateBackendReqHandler(e) => {
write!(f, "cannot create backend req handler: {}", e)
}
Error::CreateVhostUserListener(e) => {
write!(f, "cannot create vhost-user listener: {}", e)
}
Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e),
Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e),
}
}
}
/// Result of vhost-user daemon operations.
// Crate-wide shorthand so every fallible API returns the daemon's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// Implement a simple framework to run a vhost-user service daemon.
///
/// This structure is the public API the backend is allowed to interact with in order to run
/// a fully functional vhost-user daemon.
pub struct VhostUserDaemon<T: VhostUserBackend> {
    /// Name given to the request-handling thread spawned by `start_daemon`.
    name: String,
    /// Shared protocol handler; also handed to the vhost-user request handler.
    handler: Arc<Mutex<VhostUserHandler<T>>>,
    /// Join handle of the request-handling thread; `None` until started and
    /// after `wait()` has consumed it.
    main_thread: Option<thread::JoinHandle<Result<()>>>,
}
impl<T> VhostUserDaemon<T>
where
T: VhostUserBackend + Clone + 'static,
T::Bitmap: BitmapReplace + NewBitmap + Clone + Send + Sync,
T::Vring: Clone + Send + Sync,
{
/// Create the daemon instance, providing the backend implementation of `VhostUserBackend`.
///
/// Under the hood, this will start a dedicated thread responsible for listening onto
/// registered event. Those events can be vring events or custom events from the backend,
/// but they get to be registered later during the sequence.
pub fn new(
name: String,
backend: T,
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<T::Bitmap>>,
) -> Result<Self> {
let handler = Arc::new(Mutex::new(
VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?,
));
Ok(VhostUserDaemon {
name,
handler,
main_thread: None,
})
}
/// Run a dedicated thread handling all requests coming through the socket.
/// This runs in an infinite loop that should be terminating once the other
/// end of the socket (the VMM) hangs up.
///
/// This function is the common code for starting a new daemon, no matter if
/// it acts as a client or a server.
fn start_daemon(
&mut self,
mut handler: BackendReqHandler<Mutex<VhostUserHandler<T>>>,
) -> Result<()> {
let handle = thread::Builder::new()
.name(self.name.clone())
.spawn(move || loop {
handler.handle_request().map_err(Error::HandleRequest)?;
})
.map_err(Error::StartDaemon)?;
self.main_thread = Some(handle);
Ok(())
}
/// Connect to the vhost-user socket and run a dedicated thread handling
/// all requests coming through this socket. This runs in an infinite loop
/// that should be terminating once the other end of the socket (the VMM)
/// hangs up.
pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
let backend_handler = BackendReqHandler::connect(socket_path, self.handler.clone())
.map_err(Error::CreateBackendReqHandler)?;
self.start_daemon(backend_handler)
}
/// Listen to the vhost-user socket and run a dedicated thread handling all requests coming
/// through this socket.
///
/// This runs in an infinite loop that should be terminating once the other end of the socket
/// (the VMM) disconnects.
///
/// *Note:* A convenience function [VhostUserDaemon::serve] exists that
/// may be a better option than this for simple use-cases.
// TODO: the current implementation has limitations that only one incoming connection will be
// handled from the listener. Should it be enhanced to support reconnection?
pub fn start(&mut self, listener: Listener) -> Result<()> {
let mut backend_listener = BackendListener::new(listener, self.handler.clone())
.map_err(Error::CreateBackendListener)?;
let backend_handler = self.accept(&mut backend_listener)?;
self.start_daemon(backend_handler)
}
fn accept(
&self,
backend_listener: &mut BackendListener<Mutex<VhostUserHandler<T>>>,
) -> Result<BackendReqHandler<Mutex<VhostUserHandler<T>>>> {
loop {
match backend_listener.accept() {
Err(e) => return Err(Error::CreateBackendListener(e)),
Ok(Some(v)) => return Ok(v),
Ok(None) => continue,
}
}
}
/// Wait for the thread handling the vhost-user socket connection to terminate.
///
/// *Note:* A convenience function [VhostUserDaemon::serve] exists that
/// may be a better option than this for simple use-cases.
pub fn wait(&mut self) -> Result<()> {
if let Some(handle) = self.main_thread.take() {
match handle.join().map_err(Error::WaitDaemon)? {
Ok(()) => Ok(()),
Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
Err(e) => Err(e),
}
} else {
Ok(())
}
}
/// Bind to socket, handle a single connection and shutdown
///
/// This is a convenience function that provides an easy way to handle the
/// following actions without needing to call the low-level functions:
/// - Create a listener
/// - Start listening
/// - Handle a single event
/// - Send the exit event to all handler threads
///
/// Internal `Err` results that indicate a device disconnect will be treated
/// as success and `Ok(())` will be returned in those cases.
///
/// *Note:* See [VhostUserDaemon::start] and [VhostUserDaemon::wait] if you
/// need more flexibility.
pub fn serve<P: AsRef<Path>>(&mut self, socket: P) -> Result<()> {
let listener = Listener::new(socket, true).map_err(Error::CreateVhostUserListener)?;
self.start(listener)?;
let result = self.wait();
// Regardless of the result, we want to signal worker threads to exit
self.handler.lock().unwrap().send_exit_event();
// For this convenience function we are not treating certain "expected"
// outcomes as error. Disconnects and partial messages can be usual
// behaviour seen from quitting guests.
match &result {
Err(e) => match e {
Error::HandleRequest(VhostUserError::Disconnected) => Ok(()),
Error::HandleRequest(VhostUserError::PartialMessage) => Ok(()),
_ => result,
},
_ => result,
}
}
/// Retrieve the vring epoll handler.
///
/// This is necessary to perform further actions like registering and unregistering some extra
/// event file descriptors.
pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<T>>> {
// Do not expect poisoned lock.
self.handler.lock().unwrap().get_epoll_handlers()
}
}
#[cfg(test)]
mod tests {
    use super::backend::tests::MockVhostBackend;
    use super::*;
    use libc::EAGAIN;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::sync::Barrier;
    use std::time::Duration;
    use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};

    // Server mode: the daemon listens on a Unix socket while a helper thread
    // connects and immediately disconnects, driving the request loop to exit.
    #[test]
    fn test_new_daemon() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
        let handlers = daemon.get_epoll_handlers();
        // The mock backend exposes two worker threads, hence two epoll handlers.
        assert_eq!(handlers.len(), 2);
        let barrier = Arc::new(Barrier::new(2));
        let tmpdir = tempfile::tempdir().unwrap();
        let path = tmpdir.path().join("socket");
        thread::scope(|s| {
            s.spawn(|| {
                // First barrier: wait until the listener socket exists.
                barrier.wait();
                let socket = UnixStream::connect(&path).unwrap();
                // Second barrier: wait until the daemon accepted the connection.
                barrier.wait();
                drop(socket)
            });
            let listener = Listener::new(&path, false).unwrap();
            barrier.wait();
            daemon.start(listener).unwrap();
            barrier.wait();
            // Above process generates a `HandleRequest(PartialMessage)` error.
            daemon.wait().unwrap_err();
            // Second wait() finds no thread handle left and returns Ok.
            daemon.wait().unwrap();
        });
    }

    // Client mode: a helper thread plays the VMM by binding/accepting on the
    // socket, while the daemon connects with start_client().
    #[test]
    fn test_new_daemon_client() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
        let handlers = daemon.get_epoll_handlers();
        assert_eq!(handlers.len(), 2);
        let barrier = Arc::new(Barrier::new(2));
        let tmpdir = tempfile::tempdir().unwrap();
        let path = tmpdir.path().join("socket");
        thread::scope(|s| {
            s.spawn(|| {
                let listener = UnixListener::bind(&path).unwrap();
                // First barrier: socket is bound, the daemon may connect now.
                barrier.wait();
                let (stream, _) = listener.accept().unwrap();
                // Second barrier: the daemon's request thread is running.
                barrier.wait();
                drop(stream)
            });
            barrier.wait();
            daemon
                .start_client(path.as_path().to_str().unwrap())
                .unwrap();
            barrier.wait();
            // Above process generates a `HandleRequest(PartialMessage)` error.
            daemon.wait().unwrap_err();
            // Second wait() finds no thread handle left and returns Ok.
            daemon.wait().unwrap();
        });
    }

    // serve() convenience API: checks that exit events are only raised after
    // the single connection has come and gone.
    #[test]
    fn test_daemon_serve() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon = VhostUserDaemon::new("test".to_owned(), backend.clone(), mem).unwrap();
        let tmpdir = tempfile::tempdir().unwrap();
        let socket_path = tmpdir.path().join("socket");
        thread::scope(|s| {
            s.spawn(|| {
                let _ = daemon.serve(&socket_path);
            });
            // We have no way to wait for when the server becomes available...
            // So we will have to spin!
            while !socket_path.exists() {
                thread::sleep(Duration::from_millis(10));
            }
            // Check that no exit events got triggered yet
            for thread_id in 0..backend.queues_per_thread().len() {
                let fd = backend.exit_event(thread_id).unwrap();
                // Reading from exit fd should fail since nothing was written yet
                assert_eq!(
                    fd.read().unwrap_err().raw_os_error().unwrap(),
                    EAGAIN,
                    "exit event should not have been raised yet!"
                );
            }
            let socket = UnixStream::connect(&socket_path).unwrap();
            // disconnect immediately again
            drop(socket);
        });
        // Check that exit events got triggered
        let backend = backend.lock().unwrap();
        for thread_id in 0..backend.queues_per_thread().len() {
            let fd = backend.exit_event(thread_id).unwrap();
            assert!(fd.read().is_ok(), "No exit event was raised!");
        }
    }
}