// SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause
use std::{
collections::HashSet,
fs::File,
io,
io::Read,
iter::FromIterator,
num::Wrapping,
ops::Deref,
os::unix::{
net::{UnixListener, UnixStream},
prelude::{AsRawFd, FromRawFd, RawFd},
},
sync::mpsc::Sender,
sync::{mpsc, Arc, RwLock},
thread,
};
use log::warn;
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
use virtio_queue::QueueOwnedT;
use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE};
use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::{
epoll::EventSet,
eventfd::{EventFd, EFD_NONBLOCK},
};
use crate::{
rxops::*,
thread_backend::*,
vhu_vsock::{
CidMap, ConnMapKey, Error, Result, VhostUserVsockBackend, BACKEND_EVENT, SIBLING_VM_EVENT,
VSOCK_HOST_CID,
},
vsock_conn::*,
};
type ArcVhostBknd = Arc<VhostUserVsockBackend>;
enum RxQueueType {
Standard,
RawPkts,
}
// Data required by the worker thread that returns used descriptors to the ring
// and notifies the guest.
struct EventData {
vring: VringRwLock,
event_idx: bool,
head_idx: u16,
used_len: usize,
}
pub(crate) struct VhostUserVsockThread {
/// Guest memory map.
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
/// VIRTIO_RING_F_EVENT_IDX.
pub event_idx: bool,
/// Host socket raw file descriptor.
host_sock: RawFd,
    /// Host socket path.
host_sock_path: String,
/// Listener listening for new connections on the host.
host_listener: UnixListener,
/// epoll fd to which new host connections are added.
epoll_file: File,
/// VsockThreadBackend instance.
pub thread_backend: VsockThreadBackend,
/// CID of the guest.
guest_cid: u64,
/// Channel to a worker which handles event idx.
sender: Sender<EventData>,
    /// The local (host-side) port to assign to the next host-initiated connection.
    local_port: Wrapping<u32>,
    /// The tx buffer size.
    tx_buffer_size: u32,
/// EventFd to notify this thread for custom events. Currently used to notify
/// this thread to process raw vsock packets sent from a sibling VM.
pub sibling_event_fd: EventFd,
/// Keeps track of which RX queue was processed first in the last iteration.
/// Used to alternate between the RX queues to prevent the starvation of one by the other.
last_processed: RxQueueType,
}
impl VhostUserVsockThread {
/// Create a new instance of VhostUserVsockThread.
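    ///
    /// A minimal construction sketch (mirroring the unit tests below), assuming
    /// guest CID 3, a 64 KiB tx buffer, and a hypothetical socket path:
    ///
    /// ```ignore
    /// let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
    /// let thread = VhostUserVsockThread::new(
    ///     "/tmp/vm3.vsock".to_string(), // hypothetical path
    ///     3,                            // guest CID
    ///     64 * 1024,                    // tx buffer size
    ///     vec![String::from("default")],
    ///     cid_map,
    /// )?;
    /// ```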
pub fn new(
uds_path: String,
guest_cid: u64,
tx_buffer_size: u32,
groups: Vec<String>,
cid_map: Arc<RwLock<CidMap>>,
) -> Result<Self> {
// TODO: better error handling, maybe add a param to force the unlink
        let _ = std::fs::remove_file(&uds_path);
let host_sock = UnixListener::bind(&uds_path)
.and_then(|sock| sock.set_nonblocking(true).map(|_| sock))
.map_err(Error::UnixBind)?;
let epoll_fd = epoll::create(true).map_err(Error::EpollFdCreate)?;
// SAFETY: Safe as the fd is guaranteed to be valid here.
let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
let host_raw_fd = host_sock.as_raw_fd();
        let groups_set: Arc<RwLock<HashSet<String>>> =
            Arc::new(RwLock::new(HashSet::from_iter(groups)));
let sibling_event_fd = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?;
let thread_backend = VsockThreadBackend::new(
uds_path.clone(),
epoll_fd,
guest_cid,
tx_buffer_size,
groups_set.clone(),
cid_map.clone(),
);
{
let mut cid_map = cid_map.write().unwrap();
if cid_map.contains_key(&guest_cid) {
return Err(Error::CidAlreadyInUse);
}
cid_map.insert(
guest_cid,
(
thread_backend.raw_pkts_queue.clone(),
groups_set,
sibling_event_fd.try_clone().unwrap(),
),
);
}
let (sender, receiver) = mpsc::channel::<EventData>();
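        // Worker thread: drains EventData messages and performs the used-ring
        // bookkeeping and guest notification. It exits once the sending half,
        // owned by this thread object, is dropped.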
thread::spawn(move || loop {
            // TODO: Understand why doing the following in the background thread works.
            // Maybe we should use a thread pool for the entire application if necessary.
let Ok(event_data) = receiver.recv() else {
break;
};
Self::vring_handle_event(event_data);
});
let thread = VhostUserVsockThread {
mem: None,
event_idx: false,
host_sock: host_sock.as_raw_fd(),
host_sock_path: uds_path,
host_listener: host_sock,
epoll_file,
thread_backend,
guest_cid,
sender,
local_port: Wrapping(0),
tx_buffer_size,
sibling_event_fd,
last_processed: RxQueueType::Standard,
};
VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?;
Ok(thread)
}
fn vring_handle_event(event_data: EventData) {
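        // With VIRTIO_RING_F_EVENT_IDX negotiated, signal the used queue only
        // when the driver has requested a notification; otherwise signal it
        // unconditionally after returning the used descriptor.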
if event_data.event_idx {
if event_data
.vring
.add_used(event_data.head_idx, event_data.used_len as u32)
.is_err()
{
warn!("Could not return used descriptors to ring");
}
match event_data.vring.needs_notification() {
Err(_) => {
warn!("Could not check if queue needs to be notified");
event_data.vring.signal_used_queue().unwrap();
}
Ok(needs_notification) => {
if needs_notification {
event_data.vring.signal_used_queue().unwrap();
}
}
}
} else {
if event_data
.vring
.add_used(event_data.head_idx, event_data.used_len as u32)
.is_err()
{
warn!("Could not return used descriptors to ring");
}
event_data.vring.signal_used_queue().unwrap();
}
}
/// Register a file with an epoll to listen for events in evset.
pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_ADD,
fd,
epoll::Event::new(evset, fd as u64),
)
.map_err(Error::EpollAdd)?;
Ok(())
}
/// Remove a file from the epoll.
pub fn epoll_unregister(epoll_fd: RawFd, fd: RawFd) -> Result<()> {
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_DEL,
fd,
epoll::Event::new(epoll::Events::empty(), 0),
)
.map_err(Error::EpollRemove)?;
Ok(())
}
/// Modify the events we listen to for the fd in the epoll.
pub fn epoll_modify(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
epoll::ctl(
epoll_fd,
epoll::ControlOptions::EPOLL_CTL_MOD,
fd,
epoll::Event::new(evset, fd as u64),
)
.map_err(Error::EpollModify)?;
Ok(())
}
/// Return raw file descriptor of the epoll file.
fn get_epoll_fd(&self) -> RawFd {
self.epoll_file.as_raw_fd()
}
/// Register our listeners in the VringEpollHandler
pub fn register_listeners(
&mut self,
epoll_handler: Arc<VringEpollHandler<ArcVhostBknd, VringRwLock, ()>>,
) {
epoll_handler
.register_listener(self.get_epoll_fd(), EventSet::IN, u64::from(BACKEND_EVENT))
.unwrap();
epoll_handler
.register_listener(
self.sibling_event_fd.as_raw_fd(),
EventSet::IN,
u64::from(SIBLING_VM_EVENT),
)
.unwrap();
}
/// Process a BACKEND_EVENT received by VhostUserVsockBackend.
pub fn process_backend_evt(&mut self, _evset: EventSet) {
let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); 32];
'epoll: loop {
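            // Non-blocking wait (timeout 0): drain whatever is ready, retrying
            // only if the wait is interrupted by a signal.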
match epoll::wait(self.epoll_file.as_raw_fd(), 0, epoll_events.as_mut_slice()) {
Ok(ev_cnt) => {
for evt in epoll_events.iter().take(ev_cnt) {
self.handle_event(
evt.data as RawFd,
epoll::Events::from_bits(evt.events).unwrap(),
);
}
}
Err(e) => {
if e.kind() == io::ErrorKind::Interrupted {
continue;
}
warn!("failed to consume new epoll event");
}
}
break 'epoll;
}
}
/// Handle a BACKEND_EVENT by either accepting a new connection or
/// forwarding a request to the appropriate connection object.
fn handle_event(&mut self, fd: RawFd, evset: epoll::Events) {
if fd == self.host_sock {
// This is a new connection initiated by an application running on the host
let conn = self.host_listener.accept().map_err(Error::UnixAccept);
if self.mem.is_some() {
conn.and_then(|(stream, _)| {
stream
.set_nonblocking(true)
.map(|_| stream)
.map_err(Error::UnixAccept)
})
.and_then(|stream| self.add_stream_listener(stream))
.unwrap_or_else(|err| {
warn!("Unable to accept new local connection: {:?}", err);
});
} else {
// If we aren't ready to process requests, accept and immediately close
// the connection.
conn.map(drop).unwrap_or_else(|err| {
warn!("Error closing an incoming connection: {:?}", err);
});
}
} else {
// Check if the stream represented by fd has already established a
// connection with the application running in the guest
if let std::collections::hash_map::Entry::Vacant(_) =
self.thread_backend.listener_map.entry(fd)
{
// New connection from the host
if evset.bits() != epoll::Events::EPOLLIN.bits() {
// Has to be EPOLLIN as it was not connected previously
return;
}
let mut unix_stream = match self.thread_backend.stream_map.remove(&fd) {
Some(uds) => uds,
None => {
warn!("Error while searching fd in the stream map");
return;
}
};
// Local peer is sending a "connect PORT\n" command
let peer_port = match Self::read_local_stream_port(&mut unix_stream) {
Ok(port) => port,
Err(err) => {
warn!("Error while parsing \"connect PORT\n\" command: {:?}", err);
return;
}
};
// Allocate a local port number
let local_port = match self.allocate_local_port() {
Ok(lp) => lp,
Err(err) => {
warn!("Error while allocating local port: {:?}", err);
return;
}
};
// Insert the fd into the backend's maps
self.thread_backend
.listener_map
.insert(fd, ConnMapKey::new(local_port, peer_port));
                // Create a new connection object and enqueue a connection request
// packet to be sent to the guest
let conn_map_key = ConnMapKey::new(local_port, peer_port);
let mut new_conn = VsockConnection::new_local_init(
unix_stream,
VSOCK_HOST_CID,
local_port,
self.guest_cid,
peer_port,
self.get_epoll_fd(),
self.tx_buffer_size,
);
new_conn.rx_queue.enqueue(RxOps::Request);
new_conn.set_peer_port(peer_port);
// Add connection object into the backend's maps
self.thread_backend.conn_map.insert(conn_map_key, new_conn);
self.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(local_port, peer_port));
// Re-register the fd to listen for EPOLLIN and EPOLLOUT events
Self::epoll_modify(
self.get_epoll_fd(),
fd,
epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT,
)
.unwrap();
} else {
// Previously connected connection
let key = self.thread_backend.listener_map.get(&fd).unwrap();
let conn = self.thread_backend.conn_map.get_mut(key).unwrap();
if evset.bits() == epoll::Events::EPOLLOUT.bits() {
// Flush any remaining data from the tx buffer
match conn.tx_buf.flush_to(&mut conn.stream) {
Ok(cnt) => {
if cnt > 0 {
conn.fwd_cnt += Wrapping(cnt as u32);
conn.rx_queue.enqueue(RxOps::CreditUpdate);
}
self.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(conn.local_port, conn.peer_port));
}
                        Err(e) => {
                            warn!("Error flushing tx buffer: {:?}", e);
                        }
}
return;
}
// Unregister stream from the epoll, register when connection is
// established with the guest
Self::epoll_unregister(self.epoll_file.as_raw_fd(), fd).unwrap();
// Enqueue a read request
conn.rx_queue.enqueue(RxOps::Rw);
self.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(conn.local_port, conn.peer_port));
}
}
}
/// Allocate a new local port number.
    fn allocate_local_port(&mut self) -> Result<u32> {
        // TODO: Improve space efficiency of this operation
        // TODO: Reuse the conn_map HashMap
        // TODO: Test this.
        let start_port = self.local_port.0;
        let mut alloc_local_port = start_port;
        loop {
            if !self
                .thread_backend
                .local_port_set
                .contains(&alloc_local_port)
            {
                // The port set doesn't contain the candidate port number:
                // claim it and remember where the next search should start.
                self.local_port = Wrapping(alloc_local_port) + Wrapping(1);
                self.thread_backend.local_port_set.insert(alloc_local_port);
                return Ok(alloc_local_port);
            }
            // Wrap around the u32 port space instead of overflowing.
            alloc_local_port = alloc_local_port.wrapping_add(1);
            if alloc_local_port == start_port {
                // The search wrapped all the way back to the starting port
                // number: every port is in use.
                return Err(Error::NoFreeLocalPort);
            }
        }
    }
/// Read `CONNECT PORT_NUM\n` from the connected stream.
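    ///
    /// For example, a host application that connected to the Unix socket and
    /// sent `CONNECT 1234\n` would make this return `Ok(1234)`.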
fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
let mut buf = [0u8; 32];
// Minimum number of bytes we should be able to read
// Corresponds to 'CONNECT 0\n'
const MIN_READ_LEN: usize = 10;
        // Read the minimum number of bytes a valid command can contain
stream
.read_exact(&mut buf[..MIN_READ_LEN])
.map_err(Error::UnixRead)?;
let mut read_len = MIN_READ_LEN;
while buf[read_len - 1] != b'\n' && read_len < buf.len() {
stream
.read_exact(&mut buf[read_len..read_len + 1])
.map_err(Error::UnixRead)?;
read_len += 1;
}
let mut word_iter = std::str::from_utf8(&buf[..read_len])
.map_err(Error::ConvertFromUtf8)?
.split_whitespace();
word_iter
.next()
.ok_or(Error::InvalidPortRequest)
.and_then(|word| {
if word.to_lowercase() == "connect" {
Ok(())
} else {
Err(Error::InvalidPortRequest)
}
})
.and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
.and_then(|word| word.parse::<u32>().map_err(Error::ParseInteger))
.map_err(|e| Error::ReadStreamPort(Box::new(e)))
}
/// Add a stream to epoll to listen for EPOLLIN events.
fn add_stream_listener(&mut self, stream: UnixStream) -> Result<()> {
let stream_fd = stream.as_raw_fd();
self.thread_backend.stream_map.insert(stream_fd, stream);
VhostUserVsockThread::epoll_register(
self.get_epoll_fd(),
stream_fd,
epoll::Events::EPOLLIN,
)?;
Ok(())
}
/// Iterate over the rx queue and process rx requests.
fn process_rx_queue(
&mut self,
vring: &VringRwLock,
rx_queue_type: RxQueueType,
) -> Result<bool> {
let mut used_any = false;
let atomic_mem = match &self.mem {
Some(m) => m,
None => return Err(Error::NoMemoryConfigured),
};
let mut vring_mut = vring.get_mut();
let queue = vring_mut.get_queue_mut();
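        // Iterate the available ring while holding the vring write lock.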
while let Some(mut avail_desc) = queue
.iter(atomic_mem.memory())
.map_err(|_| Error::IterateQueue)?
.next()
{
used_any = true;
let mem = atomic_mem.clone().memory();
let head_idx = avail_desc.head_index();
let used_len = match VsockPacket::from_rx_virtq_chain(
mem.deref(),
&mut avail_desc,
self.tx_buffer_size,
) {
Ok(mut pkt) => {
let recv_result = match rx_queue_type {
RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt),
RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt),
};
if recv_result.is_ok() {
PKT_HEADER_SIZE + pkt.len() as usize
} else {
queue.iter(mem).unwrap().go_to_previous_position();
break;
}
}
Err(e) => {
warn!("vsock: RX queue error: {:?}", e);
0
}
};
let vring = vring.clone();
let event_idx = self.event_idx;
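            // Hand the used descriptor off to the worker thread, which returns
            // it to the ring and signals the guest if required.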
self.sender
.send(EventData {
vring,
event_idx,
head_idx,
used_len,
})
.unwrap();
match rx_queue_type {
RxQueueType::Standard => {
if !self.thread_backend.pending_rx() {
break;
}
}
RxQueueType::RawPkts => {
if !self.thread_backend.pending_raw_pkts() {
break;
}
}
}
}
Ok(used_any)
}
/// Wrapper to process rx queue based on whether event idx is enabled or not.
fn process_unix_sockets(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
if event_idx {
// To properly handle EVENT_IDX we need to keep calling
// process_rx_queue until it stops finding new requests
// on the queue, as vm-virtio's Queue implementation
// only checks avail_index once
loop {
if !self.thread_backend.pending_rx() {
break;
}
vring.disable_notification().unwrap();
self.process_rx_queue(vring, RxQueueType::Standard)?;
if !vring.enable_notification().unwrap() {
break;
}
}
} else {
self.process_rx_queue(vring, RxQueueType::Standard)?;
}
Ok(false)
}
/// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not.
pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
if event_idx {
loop {
if !self.thread_backend.pending_raw_pkts() {
break;
}
vring.disable_notification().unwrap();
self.process_rx_queue(vring, RxQueueType::RawPkts)?;
if !vring.enable_notification().unwrap() {
break;
}
}
} else {
self.process_rx_queue(vring, RxQueueType::RawPkts)?;
}
Ok(false)
}
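    /// Process the RX queues, alternating between the standard and the raw
    /// packets queue so that neither can starve the other.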
pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result<bool> {
match self.last_processed {
RxQueueType::Standard => {
if self.thread_backend.pending_raw_pkts() {
self.process_raw_pkts(vring, event_idx)?;
self.last_processed = RxQueueType::RawPkts;
}
if self.thread_backend.pending_rx() {
self.process_unix_sockets(vring, event_idx)?;
}
}
RxQueueType::RawPkts => {
if self.thread_backend.pending_rx() {
self.process_unix_sockets(vring, event_idx)?;
self.last_processed = RxQueueType::Standard;
}
if self.thread_backend.pending_raw_pkts() {
self.process_raw_pkts(vring, event_idx)?;
}
}
}
Ok(false)
}
/// Process tx queue and send requests to the backend for processing.
fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result<bool> {
let mut used_any = false;
let atomic_mem = match &self.mem {
Some(m) => m,
None => return Err(Error::NoMemoryConfigured),
};
while let Some(mut avail_desc) = vring
.get_mut()
.get_queue_mut()
.iter(atomic_mem.memory())
.map_err(|_| Error::IterateQueue)?
.next()
{
used_any = true;
let mem = atomic_mem.clone().memory();
let head_idx = avail_desc.head_index();
let pkt = match VsockPacket::from_tx_virtq_chain(
mem.deref(),
&mut avail_desc,
self.tx_buffer_size,
) {
Ok(pkt) => pkt,
Err(e) => {
dbg!("vsock: error reading TX packet: {:?}", e);
continue;
}
};
if self.thread_backend.send_pkt(&pkt).is_err() {
vring
.get_mut()
.get_queue_mut()
.iter(mem)
.unwrap()
.go_to_previous_position();
break;
}
// TODO: Check if the protocol requires read length to be correct
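            // TX buffers are read-only for the device, so no bytes are written
            // back to the guest.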
let used_len = 0;
let vring = vring.clone();
let event_idx = self.event_idx;
self.sender
.send(EventData {
vring,
event_idx,
head_idx,
used_len,
})
.unwrap();
}
Ok(used_any)
}
/// Wrapper to process tx queue based on whether event idx is enabled or not.
pub fn process_tx(&mut self, vring_lock: &VringRwLock, event_idx: bool) -> Result<bool> {
if event_idx {
            // To properly handle EVENT_IDX we need to keep calling
            // process_tx_queue until it stops finding new requests
            // on the queue, as vm-virtio's Queue implementation
            // only checks avail_index once
loop {
vring_lock.disable_notification().unwrap();
self.process_tx_queue(vring_lock)?;
if !vring_lock.enable_notification().unwrap() {
break;
}
}
} else {
self.process_tx_queue(vring_lock)?;
}
Ok(false)
}
}
impl Drop for VhostUserVsockThread {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.host_sock_path);
self.thread_backend
.cid_map
.write()
.unwrap()
.remove(&self.guest_cid);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use tempfile::tempdir;
use vm_memory::GuestAddress;
use vmm_sys_util::eventfd::EventFd;
const CONN_TX_BUF_SIZE: u32 = 64 * 1024;
impl VhostUserVsockThread {
fn get_epoll_file(&self) -> &File {
&self.epoll_file
}
}
#[test]
fn test_vsock_thread() {
let groups: Vec<String> = vec![String::from("default")];
let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
let test_dir = tempdir().expect("Could not create a temp test directory.");
let t = VhostUserVsockThread::new(
test_dir
.path()
.join("test_vsock_thread.vsock")
.display()
.to_string(),
3,
CONN_TX_BUF_SIZE,
groups,
cid_map,
);
assert!(t.is_ok());
let mut t = t.unwrap();
let epoll_fd = t.get_epoll_file().as_raw_fd();
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
);
t.mem = Some(mem.clone());
let dummy_fd = EventFd::new(0).unwrap();
assert!(VhostUserVsockThread::epoll_register(
epoll_fd,
dummy_fd.as_raw_fd(),
epoll::Events::EPOLLOUT
)
.is_ok());
assert!(VhostUserVsockThread::epoll_modify(
epoll_fd,
dummy_fd.as_raw_fd(),
epoll::Events::EPOLLIN
)
.is_ok());
assert!(VhostUserVsockThread::epoll_unregister(epoll_fd, dummy_fd.as_raw_fd()).is_ok());
assert!(VhostUserVsockThread::epoll_register(
epoll_fd,
dummy_fd.as_raw_fd(),
epoll::Events::EPOLLIN
)
.is_ok());
let vring = VringRwLock::new(mem, 0x1000).unwrap();
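        // Queue layout inside the test guest memory:
        // descriptor table, available ring, used ring.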
vring.set_queue_info(0x100, 0x200, 0x300).unwrap();
vring.set_queue_ready(true);
assert!(t.process_tx(&vring, false).is_ok());
assert!(t.process_tx(&vring, true).is_ok());
        // Push an entry onto backend_rxq so that RX processing is not skipped.
t.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(0, 0));
assert!(t.process_rx(&vring, false).is_ok());
assert!(t.process_rx(&vring, true).is_ok());
dummy_fd.write(1).unwrap();
t.process_backend_evt(EventSet::empty());
test_dir.close().unwrap();
}
#[test]
fn test_vsock_thread_failures() {
let groups: Vec<String> = vec![String::from("default")];
let cid_map: Arc<RwLock<CidMap>> = Arc::new(RwLock::new(HashMap::new()));
let test_dir = tempdir().expect("Could not create a temp test directory.");
let t = VhostUserVsockThread::new(
"/sys/not_allowed.vsock".to_string(),
3,
CONN_TX_BUF_SIZE,
groups.clone(),
cid_map.clone(),
);
assert!(t.is_err());
let vsock_socket_path = test_dir
.path()
.join("test_vsock_thread_failures.vsock")
.display()
.to_string();
let mut t = VhostUserVsockThread::new(
vsock_socket_path,
3,
CONN_TX_BUF_SIZE,
groups.clone(),
cid_map.clone(),
)
.unwrap();
assert!(VhostUserVsockThread::epoll_register(-1, -1, epoll::Events::EPOLLIN).is_err());
assert!(VhostUserVsockThread::epoll_modify(-1, -1, epoll::Events::EPOLLIN).is_err());
assert!(VhostUserVsockThread::epoll_unregister(-1, -1).is_err());
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x10000)]).unwrap(),
);
let vring = VringRwLock::new(mem, 0x1000).unwrap();
// memory is not configured, so processing TX should fail
assert!(t.process_tx(&vring, false).is_err());
assert!(t.process_tx(&vring, true).is_err());
        // Push an entry onto backend_rxq so that RX processing is not skipped.
t.thread_backend
.backend_rxq
.push_back(ConnMapKey::new(0, 0));
assert!(t.process_rx(&vring, false).is_err());
assert!(t.process_rx(&vring, true).is_err());
// trying to use a CID that is already in use should fail
let vsock_socket_path2 = test_dir
.path()
.join("test_vsock_thread_failures2.vsock")
.display()
.to_string();
let t2 =
VhostUserVsockThread::new(vsock_socket_path2, 3, CONN_TX_BUF_SIZE, groups, cid_map);
assert!(t2.is_err());
test_dir.close().unwrap();
}
}