| // SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause |
| |
| use std::{ |
| io::{ErrorKind, Read, Write}, |
| num::Wrapping, |
| os::unix::prelude::{AsRawFd, RawFd}, |
| }; |
| |
| use log::{error, info}; |
| use virtio_vsock::packet::{VsockPacket, PKT_HEADER_SIZE}; |
| use vm_memory::{bitmap::BitmapSlice, Bytes, VolatileSlice}; |
| |
| use crate::{ |
| rxops::*, |
| rxqueue::*, |
| txbuf::*, |
| vhu_vsock::{ |
| Error, Result, VSOCK_FLAGS_SHUTDOWN_RCV, VSOCK_FLAGS_SHUTDOWN_SEND, |
| VSOCK_OP_CREDIT_REQUEST, VSOCK_OP_CREDIT_UPDATE, VSOCK_OP_REQUEST, VSOCK_OP_RESPONSE, |
| VSOCK_OP_RST, VSOCK_OP_RW, VSOCK_OP_SHUTDOWN, VSOCK_TYPE_STREAM, |
| }, |
| vhu_vsock_thread::VhostUserVsockThread, |
| }; |
| |
| #[derive(Debug)] |
| pub(crate) struct VsockConnection<S> { |
| /// Host-side stream corresponding to this vsock connection. |
| pub stream: S, |
| /// Specifies whether the connection has been established. |
| pub connect: bool, |
| /// Port on which the guest-side application is listening. |
| pub peer_port: u32, |
| /// Queue holding pending rx operations per connection. |
| pub rx_queue: RxQueue, |
| /// CID of the host. |
| local_cid: u64, |
| /// Port on the host on which the host-side application listens. |
| pub local_port: u32, |
| /// CID of the guest. |
| pub guest_cid: u64, |
| /// Total number of bytes written to stream from tx buffer. |
| pub fwd_cnt: Wrapping<u32>, |
| /// Value of `fwd_cnt` last communicated to the peer, used to decide when a credit update is due. |
| last_fwd_cnt: Wrapping<u32>, |
| /// Size of buffer the guest has allocated for this connection. |
| peer_buf_alloc: u32, |
| /// Number of bytes the peer reports having consumed on this connection (the peer's `fwd_cnt`). |
| peer_fwd_cnt: Wrapping<u32>, |
| /// The total number of bytes sent to the guest vsock driver. |
| rx_cnt: Wrapping<u32>, |
| /// epoll fd to which this connection's stream has to be added. |
| pub epoll_fd: RawFd, |
| /// Local tx buffer. |
| pub tx_buf: LocalTxBuf, |
| /// Local tx buffer size. |
| tx_buffer_size: u32, |
| } |
| |
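| // Typical lifecycle (a descriptive note, mirroring how the unit tests below drive this type): |
| // a connection is created with `new_local_init` (host-initiated) or `new_peer_init` |
| // (guest-initiated); guest-bound packets are produced by `recv_pkt` from the pending |
| // `RxOps` in `rx_queue`; guest-originated packets are consumed by `send_pkt`, which |
| // forwards RW payloads to the host-side stream via `send_bytes`. |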
| impl<S: AsRawFd + Read + Write> VsockConnection<S> { |
| /// Create a new vsock connection object for locally (i.e. host-side) |
| /// initiated connections. |
| pub fn new_local_init( |
| stream: S, |
| local_cid: u64, |
| local_port: u32, |
| guest_cid: u64, |
| guest_port: u32, |
| epoll_fd: RawFd, |
| tx_buffer_size: u32, |
| ) -> Self { |
| Self { |
| stream, |
| connect: false, |
| peer_port: guest_port, |
| rx_queue: RxQueue::new(), |
| local_cid, |
| local_port, |
| guest_cid, |
| fwd_cnt: Wrapping(0), |
| last_fwd_cnt: Wrapping(0), |
| peer_buf_alloc: 0, |
| peer_fwd_cnt: Wrapping(0), |
| rx_cnt: Wrapping(0), |
| epoll_fd, |
| tx_buf: LocalTxBuf::new(tx_buffer_size), |
| tx_buffer_size, |
| } |
| } |
| |
| /// Create a new vsock connection object for connections initiated by |
| /// an application running in the guest. |
| #[allow(clippy::too_many_arguments)] |
| pub fn new_peer_init( |
| stream: S, |
| local_cid: u64, |
| local_port: u32, |
| guest_cid: u64, |
| guest_port: u32, |
| epoll_fd: RawFd, |
| peer_buf_alloc: u32, |
| tx_buffer_size: u32, |
| ) -> Self { |
| let mut rx_queue = RxQueue::new(); |
| rx_queue.enqueue(RxOps::Response); |
| Self { |
| stream, |
| connect: false, |
| peer_port: guest_port, |
| rx_queue, |
| local_cid, |
| local_port, |
| guest_cid, |
| fwd_cnt: Wrapping(0), |
| last_fwd_cnt: Wrapping(0), |
| peer_buf_alloc, |
| peer_fwd_cnt: Wrapping(0), |
| rx_cnt: Wrapping(0), |
| epoll_fd, |
| tx_buf: LocalTxBuf::new(tx_buffer_size), |
| tx_buffer_size, |
| } |
| } |
| |
| /// Set the peer port to the guest-side application's port. |
| pub fn set_peer_port(&mut self, peer_port: u32) { |
| self.peer_port = peer_port; |
| } |
| |
| /// Process a vsock packet that is meant for this connection. |
| /// Forward data to the host-side application if the vsock packet |
| /// contains a RW operation. |
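| /// Depending on the pending entry in `rx_queue`, the packet header is set up as a |
| /// REQUEST, RESPONSE, RW, CREDIT_REQUEST, CREDIT_UPDATE or SHUTDOWN/RST message. |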
| pub fn recv_pkt<B: BitmapSlice>(&mut self, pkt: &mut VsockPacket<B>) -> Result<()> { |
| // Initialize all fields in the packet header |
| self.init_pkt(pkt); |
| |
| match self.rx_queue.dequeue() { |
| Some(RxOps::Request) => { |
| // Send a connection request to the guest-side application |
| pkt.set_op(VSOCK_OP_REQUEST); |
| Ok(()) |
| } |
| Some(RxOps::Rw) => { |
| if !self.connect { |
| // The connection is not established yet, so there is nothing |
| // to forward; send back an RST. |
| pkt.set_op(VSOCK_OP_RST); |
| return Ok(()); |
| } |
| |
| // Check if peer has space for receiving data |
| if self.need_credit_update_from_peer() { |
| self.last_fwd_cnt = self.fwd_cnt; |
| pkt.set_op(VSOCK_OP_CREDIT_REQUEST); |
| return Ok(()); |
| } |
| let buf = pkt.data_slice().ok_or(Error::PktBufMissing)?; |
| |
| // Perform a credit check to find the maximum read size. The read |
| // data must fit inside the packet buffer and stay within the |
| // peer's available buffer space. |
| let max_read_len = std::cmp::min(buf.len(), self.peer_avail_credit()); |
| |
| // Read data from the stream directly into the buffer |
| if let Ok(read_cnt) = buf.read_from(0, &mut self.stream, max_read_len) { |
| if read_cnt == 0 { |
| // If no data was read then the stream was closed down unexpectedly. |
| // Send a shutdown packet to the guest-side application. |
| pkt.set_op(VSOCK_OP_SHUTDOWN) |
| .set_flag(VSOCK_FLAGS_SHUTDOWN_RCV) |
| .set_flag(VSOCK_FLAGS_SHUTDOWN_SEND); |
| } else { |
| // If data was read, then set the length field in the packet header |
| // to the amount of data that was read. |
| pkt.set_op(VSOCK_OP_RW).set_len(read_cnt as u32); |
| |
| // Re-register the stream file descriptor for read and write events |
| if VhostUserVsockThread::epoll_modify( |
| self.epoll_fd, |
| self.stream.as_raw_fd(), |
| epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, |
| ) |
| .is_err() |
| { |
| if let Err(e) = VhostUserVsockThread::epoll_register( |
| self.epoll_fd, |
| self.stream.as_raw_fd(), |
| epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, |
| ) { |
| // TODO: let's move this logic out of this func, and handle it properly |
| error!("epoll_register failed: {:?}, but proceed further.", e); |
| } |
| }; |
| } |
| |
| // Update the rx_cnt with the amount of data in the vsock packet. |
| self.rx_cnt += Wrapping(pkt.len()); |
| self.last_fwd_cnt = self.fwd_cnt; |
| } |
| Ok(()) |
| } |
| Some(RxOps::Response) => { |
| // A response has been received to a newly initiated host-side connection |
| self.connect = true; |
| pkt.set_op(VSOCK_OP_RESPONSE); |
| Ok(()) |
| } |
| Some(RxOps::CreditUpdate) => { |
| // Send this connection's credit information to the guest. |
| if !self.rx_queue.pending_rx() { |
| // Only spend an rx buffer on a standalone credit update when no |
| // other rx op is pending; other packets already carry credit info. |
| pkt.set_op(VSOCK_OP_CREDIT_UPDATE); |
| self.last_fwd_cnt = self.fwd_cnt; |
| } |
| Ok(()) |
| } |
| _ => Err(Error::NoRequestRx), |
| } |
| } |
| |
| /// Deliver a guest-generated packet to this connection. |
| /// |
| /// Returns: |
| /// - always `Ok(())` to indicate that the packet has been consumed |
| pub fn send_pkt<B: BitmapSlice>(&mut self, pkt: &VsockPacket<B>) -> Result<()> { |
| // Update peer credit information |
| self.peer_buf_alloc = pkt.buf_alloc(); |
| self.peer_fwd_cnt = Wrapping(pkt.fwd_cnt()); |
| |
| match pkt.op() { |
| VSOCK_OP_RESPONSE => { |
| // Confirmation for a host initiated connection |
| // TODO: Handle stream write error in a better manner |
| let response = format!("OK {}\n", self.peer_port); |
| self.stream.write_all(response.as_bytes()).unwrap(); |
| self.connect = true; |
| } |
| VSOCK_OP_RW => { |
| // Data has to be written to the host-side stream |
| match pkt.data_slice() { |
| None => { |
| info!( |
| "Dropping empty packet from guest (lp={}, pp={})", |
| self.local_port, self.peer_port |
| ); |
| return Ok(()); |
| } |
| Some(buf) => { |
| if let Err(err) = self.send_bytes(buf) { |
| // TODO: Terminate this connection |
| dbg!("err:{:?}", err); |
| return Ok(()); |
| } |
| } |
| } |
| } |
| VSOCK_OP_CREDIT_UPDATE => { |
| // Already updated the credit |
| |
| // Re-register the stream file descriptor for read and write events |
| if VhostUserVsockThread::epoll_modify( |
| self.epoll_fd, |
| self.stream.as_raw_fd(), |
| epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, |
| ) |
| .is_err() |
| { |
| if let Err(e) = VhostUserVsockThread::epoll_register( |
| self.epoll_fd, |
| self.stream.as_raw_fd(), |
| epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, |
| ) { |
| // TODO: let's move this logic out of this func, and handle it properly |
| error!("epoll_register failed: {:?}, but proceed further.", e); |
| } |
| }; |
| } |
| VSOCK_OP_CREDIT_REQUEST => { |
| // Send back this connection's credit information |
| self.rx_queue.enqueue(RxOps::CreditUpdate); |
| } |
| VSOCK_OP_SHUTDOWN => { |
| // Shutdown this connection |
| let recv_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_RCV != 0; |
| let send_off = pkt.flags() & VSOCK_FLAGS_SHUTDOWN_SEND != 0; |
| |
| if recv_off && send_off && self.tx_buf.is_empty() { |
| self.rx_queue.enqueue(RxOps::Reset); |
| } |
| } |
| _ => {} |
| } |
| |
| Ok(()) |
| } |
| |
| /// Write data to the host-side stream. |
| /// |
| /// Returns: |
| /// - `Ok(())` if the data was written to the stream or queued in the local tx buffer |
| /// - `Err(Error::UnixWrite)` if there was an error writing to the stream |
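| /// |
| /// If the local tx buffer is not empty, or the stream would block, the data is |
| /// buffered and flushed later when the stream signals EPOLLOUT. |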
| fn send_bytes<B: BitmapSlice>(&mut self, buf: &VolatileSlice<B>) -> Result<()> { |
| if !self.tx_buf.is_empty() { |
| // Data is already present in the buffer and the backend |
| // is waiting for an EPOLLOUT event to flush it. |
| return self.tx_buf.push(buf); |
| } |
| |
| // Write data to the stream |
| let written_count = match buf.write_to(0, &mut self.stream, buf.len()) { |
| Ok(cnt) => cnt, |
| Err(vm_memory::VolatileMemoryError::IOError(e)) => { |
| if e.kind() == ErrorKind::WouldBlock { |
| 0 |
| } else { |
| dbg!("send_bytes error: {:?}", e); |
| return Err(Error::UnixWrite); |
| } |
| } |
| Err(e) => { |
| dbg!("send_bytes error: {:?}", e); |
| return Err(Error::UnixWrite); |
| } |
| }; |
| |
| if written_count > 0 { |
| // Increment forwarded count by number of bytes written to the stream |
| self.fwd_cnt += Wrapping(written_count as u32); |
| |
| // Decide whether to queue a credit update: do so once the free space in |
| // the tx buffer drops below 1/4 of its size. If the threshold is too low |
| // we send too many credit updates; if it is too high we send too few and |
| // cause stalls. Stalls are worse than extra credit updates. |
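| // Example (with the 64 KiB tx buffer used in the tests below): a credit update |
| // is queued once fewer than 16 KiB of headroom remain, i.e. after more than |
| // 48 KiB have been forwarded since the last update was sent. |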
| let free_space = self |
| .tx_buffer_size |
| .wrapping_sub((self.fwd_cnt - self.last_fwd_cnt).0); |
| if free_space < self.tx_buffer_size / 4 { |
| self.rx_queue.enqueue(RxOps::CreditUpdate); |
| } |
| } |
| |
| if written_count != buf.len() { |
| // Try to re-enable EPOLLOUT, which may have been disabled while the tx buffer was |
| // empty, so the remaining data can be flushed once the stream becomes writable again. |
| if VhostUserVsockThread::epoll_modify( |
| self.epoll_fd, |
| self.stream.as_raw_fd(), |
| epoll::Events::EPOLLIN | epoll::Events::EPOLLOUT, |
| ) |
| .is_err() |
| { |
| error!("Failed to re-enable EPOLLOUT"); |
| } |
| return self.tx_buf.push(&buf.offset(written_count).unwrap()); |
| } |
| |
| Ok(()) |
| } |
| |
| /// Initialize all header fields in the vsock packet. |
| fn init_pkt<'a, 'b, B: BitmapSlice>( |
| &self, |
| pkt: &'a mut VsockPacket<'b, B>, |
| ) -> &'a mut VsockPacket<'b, B> { |
| // Zero out the packet header |
| pkt.set_header_from_raw(&[0u8; PKT_HEADER_SIZE]).unwrap(); |
| |
| pkt.set_src_cid(self.local_cid) |
| .set_dst_cid(self.guest_cid) |
| .set_src_port(self.local_port) |
| .set_dst_port(self.peer_port) |
| .set_type(VSOCK_TYPE_STREAM) |
| .set_buf_alloc(self.tx_buffer_size) |
| .set_fwd_cnt(self.fwd_cnt.0) |
| } |
| |
| /// Get max number of bytes we can send to peer without overflowing |
| /// the peer's buffer. |
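| /// |
| /// A worked example of the accounting (numbers are illustrative only): if the peer |
| /// advertised `peer_buf_alloc = 65536`, we have sent `rx_cnt = 70000` bytes in total |
| /// and the peer reported `peer_fwd_cnt = 20000` bytes consumed, then |
| /// `65536 - (70000 - 20000) = 15536` bytes of credit remain. |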
| fn peer_avail_credit(&self) -> usize { |
| (Wrapping(self.peer_buf_alloc) - (self.rx_cnt - self.peer_fwd_cnt)).0 as usize |
| } |
| |
| /// Check if we need a credit update from the peer before sending |
| /// more data to it. |
| fn need_credit_update_from_peer(&self) -> bool { |
| self.peer_avail_credit() == 0 |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use byteorder::{ByteOrder, LittleEndian}; |
| |
| use super::*; |
| use crate::vhu_vsock::{VSOCK_HOST_CID, VSOCK_OP_RW, VSOCK_TYPE_STREAM}; |
| use std::io::Result as IoResult; |
| use std::ops::Deref; |
| use virtio_bindings::bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE}; |
| use virtio_queue::{mock::MockSplitQueue, Descriptor, DescriptorChain, Queue, QueueOwnedT}; |
| use vm_memory::{ |
| Address, Bytes, GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryLoadGuard, |
| GuestMemoryMmap, |
| }; |
| |
| const CONN_TX_BUF_SIZE: u32 = 64 * 1024; |
| |
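| // Test helper describing the raw vsock packet header placed at the head of each |
| // descriptor chain; `construct_head` populates only the data-length field |
| // (at byte offset 24 in the header). |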
| struct HeadParams { |
| head_len: usize, |
| data_len: u32, |
| } |
| |
| impl HeadParams { |
| fn new(head_len: usize, data_len: u32) -> Self { |
| Self { head_len, data_len } |
| } |
| fn construct_head(&self) -> Vec<u8> { |
| let mut header = vec![0_u8; self.head_len]; |
| if self.head_len == PKT_HEADER_SIZE { |
| // Offset into the header for data length |
| const HDROFF_LEN: usize = 24; |
| LittleEndian::write_u32(&mut header[HDROFF_LEN..], self.data_len); |
| } |
| header |
| } |
| } |
| |
| fn prepare_desc_chain_vsock( |
| write_only: bool, |
| head_params: &HeadParams, |
| data_chain_len: u16, |
| head_data_len: u32, |
| ) -> ( |
| GuestMemoryAtomic<GuestMemoryMmap>, |
| DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>, |
| ) { |
| let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); |
| let virt_queue = MockSplitQueue::new(&mem, 16); |
| let mut next_addr = virt_queue.desc_table().total_size() + 0x100; |
| let mut flags = 0; |
| |
| if write_only { |
| flags |= VRING_DESC_F_WRITE; |
| } |
| |
| let mut head_flags = if data_chain_len > 0 { |
| flags | VRING_DESC_F_NEXT |
| } else { |
| flags |
| }; |
| |
| // vsock packet header |
| let header = head_params.construct_head(); |
| let head_desc = |
| Descriptor::new(next_addr, head_params.head_len as u32, head_flags as u16, 1); |
| mem.write(&header, head_desc.addr()).unwrap(); |
| assert!(virt_queue.desc_table().store(0, head_desc).is_ok()); |
| next_addr += head_params.head_len as u64; |
| |
| // Put the descriptor index 0 in the first available ring position. |
| mem.write_obj(0u16, virt_queue.avail_addr().unchecked_add(4)) |
| .unwrap(); |
| |
| // Set `avail_idx` to 1. |
| mem.write_obj(1u16, virt_queue.avail_addr().unchecked_add(2)) |
| .unwrap(); |
| |
| // chain len excludes the head |
| for i in 0..(data_chain_len) { |
| // last descr in chain |
| if i == data_chain_len - 1 { |
| head_flags &= !VRING_DESC_F_NEXT; |
| } |
| // vsock data |
| let data = vec![0_u8; head_data_len as usize]; |
| let data_desc = Descriptor::new(next_addr, data.len() as u32, head_flags as u16, i + 2); |
| mem.write(&data, data_desc.addr()).unwrap(); |
| assert!(virt_queue.desc_table().store(i + 1, data_desc).is_ok()); |
| next_addr += head_data_len as u64; |
| } |
| |
| // Create descriptor chain from pre-filled memory |
| ( |
| GuestMemoryAtomic::new(mem.clone()), |
| virt_queue |
| .create_queue::<Queue>() |
| .unwrap() |
| .iter(GuestMemoryAtomic::new(mem.clone()).memory()) |
| .unwrap() |
| .next() |
| .unwrap(), |
| ) |
| } |
| |
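| // In-memory stand-in for the host-side stream: `write` stores the most recent buffer |
| // and `read` returns it back, so tests can write a request and read the expected reply. |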
| struct VsockDummySocket { |
| data: Vec<u8>, |
| } |
| |
| impl VsockDummySocket { |
| fn new() -> Self { |
| Self { data: Vec::new() } |
| } |
| } |
| |
| impl Write for VsockDummySocket { |
| fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> { |
| self.data.clear(); |
| self.data.extend_from_slice(buf); |
| |
| Ok(buf.len()) |
| } |
| fn flush(&mut self) -> IoResult<()> { |
| Ok(()) |
| } |
| } |
| |
| impl Read for VsockDummySocket { |
| fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { |
| buf[..self.data.len()].copy_from_slice(&self.data); |
| Ok(self.data.len()) |
| } |
| } |
| |
| impl AsRawFd for VsockDummySocket { |
| fn as_raw_fd(&self) -> RawFd { |
| -1 |
| } |
| } |
| |
| #[test] |
| fn test_vsock_conn_init() { |
| // new locally initiated connection |
| let dummy_file = VsockDummySocket::new(); |
| let mut conn_local = VsockConnection::new_local_init( |
| dummy_file, |
| VSOCK_HOST_CID, |
| 5000, |
| 3, |
| 5001, |
| -1, |
| CONN_TX_BUF_SIZE, |
| ); |
| |
| assert!(!conn_local.connect); |
| assert_eq!(conn_local.peer_port, 5001); |
| assert_eq!(conn_local.rx_queue, RxQueue::new()); |
| assert_eq!(conn_local.local_cid, VSOCK_HOST_CID); |
| assert_eq!(conn_local.local_port, 5000); |
| assert_eq!(conn_local.guest_cid, 3); |
| |
| // set peer port |
| conn_local.set_peer_port(5002); |
| assert_eq!(conn_local.peer_port, 5002); |
| |
| // New connection initiated by the peer/guest |
| let dummy_file = VsockDummySocket::new(); |
| let mut conn_peer = VsockConnection::new_peer_init( |
| dummy_file, |
| VSOCK_HOST_CID, |
| 5000, |
| 3, |
| 5001, |
| -1, |
| 65536, |
| CONN_TX_BUF_SIZE, |
| ); |
| |
| assert!(!conn_peer.connect); |
| assert_eq!(conn_peer.peer_port, 5001); |
| assert_eq!(conn_peer.rx_queue.dequeue().unwrap(), RxOps::Response); |
| assert!(!conn_peer.rx_queue.pending_rx()); |
| assert_eq!(conn_peer.local_cid, VSOCK_HOST_CID); |
| assert_eq!(conn_peer.local_port, 5000); |
| assert_eq!(conn_peer.guest_cid, 3); |
| assert_eq!(conn_peer.peer_buf_alloc, 65536); |
| } |
| |
| #[test] |
| fn test_vsock_conn_credit() { |
| // new locally initiated connection |
| let dummy_file = VsockDummySocket::new(); |
| let mut conn_local = VsockConnection::new_local_init( |
| dummy_file, |
| VSOCK_HOST_CID, |
| 5000, |
| 3, |
| 5001, |
| -1, |
| CONN_TX_BUF_SIZE, |
| ); |
| |
| assert_eq!(conn_local.peer_avail_credit(), 0); |
| assert!(conn_local.need_credit_update_from_peer()); |
| |
| conn_local.peer_buf_alloc = 65536; |
| assert_eq!(conn_local.peer_avail_credit(), 65536); |
| assert!(!conn_local.need_credit_update_from_peer()); |
| |
| conn_local.rx_cnt = Wrapping(32768); |
| assert_eq!(conn_local.peer_avail_credit(), 32768); |
| assert!(!conn_local.need_credit_update_from_peer()); |
| |
| conn_local.rx_cnt = Wrapping(65536); |
| assert_eq!(conn_local.peer_avail_credit(), 0); |
| assert!(conn_local.need_credit_update_from_peer()); |
| } |
| |
| #[test] |
| fn test_vsock_conn_init_pkt() { |
| // parameters for packet head construction |
| let head_params = HeadParams::new(PKT_HEADER_SIZE, 10); |
| |
| // new locally initiated connection |
| let dummy_file = VsockDummySocket::new(); |
| let conn_local = VsockConnection::new_local_init( |
| dummy_file, |
| VSOCK_HOST_CID, |
| 5000, |
| 3, |
| 5001, |
| -1, |
| CONN_TX_BUF_SIZE, |
| ); |
| |
| // write only descriptor chain |
| let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 2, 10); |
| let mem = mem.memory(); |
| let mut pkt = |
| VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) |
| .unwrap(); |
| |
| // initialize a vsock packet for the guest |
| conn_local.init_pkt(&mut pkt); |
| |
| assert_eq!(pkt.src_cid(), VSOCK_HOST_CID); |
| assert_eq!(pkt.dst_cid(), 3); |
| assert_eq!(pkt.src_port(), 5000); |
| assert_eq!(pkt.dst_port(), 5001); |
| assert_eq!(pkt.type_(), VSOCK_TYPE_STREAM); |
| assert_eq!(pkt.buf_alloc(), CONN_TX_BUF_SIZE); |
| assert_eq!(pkt.fwd_cnt(), 0); |
| } |
| |
| #[test] |
| fn test_vsock_conn_recv_pkt() { |
| // parameters for packet head construction |
| let head_params = HeadParams::new(PKT_HEADER_SIZE, 5); |
| |
| // new locally initiated connection |
| let dummy_file = VsockDummySocket::new(); |
| let mut conn_local = VsockConnection::new_local_init( |
| dummy_file, |
| VSOCK_HOST_CID, |
| 5000, |
| 3, |
| 5001, |
| -1, |
| CONN_TX_BUF_SIZE, |
| ); |
| |
| // write only descriptor chain |
| let (mem, mut descr_chain) = prepare_desc_chain_vsock(true, &head_params, 1, 5); |
| let mem = mem.memory(); |
| let mut pkt = |
| VsockPacket::from_rx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) |
| .unwrap(); |
| |
| // VSOCK_OP_REQUEST: new local conn request |
| conn_local.rx_queue.enqueue(RxOps::Request); |
| let op_req = conn_local.recv_pkt(&mut pkt); |
| assert!(op_req.is_ok()); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert_eq!(pkt.op(), VSOCK_OP_REQUEST); |
| |
| // VSOCK_OP_RST: reset if connection not established |
| conn_local.rx_queue.enqueue(RxOps::Rw); |
| let op_rst = conn_local.recv_pkt(&mut pkt); |
| assert!(op_rst.is_ok()); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert_eq!(pkt.op(), VSOCK_OP_RST); |
| |
| // VSOCK_OP_CREDIT_UPDATE: need credit update from peer/guest |
| conn_local.connect = true; |
| conn_local.rx_queue.enqueue(RxOps::Rw); |
| conn_local.fwd_cnt = Wrapping(1024); |
| let op_credit_update = conn_local.recv_pkt(&mut pkt); |
| assert!(op_credit_update.is_ok()); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert_eq!(pkt.op(), VSOCK_OP_CREDIT_REQUEST); |
| assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); |
| |
| // VSOCK_OP_SHUTDOWN: zero data read from stream/file |
| conn_local.peer_buf_alloc = 65536; |
| conn_local.rx_queue.enqueue(RxOps::Rw); |
| let op_zero_read_shutdown = conn_local.recv_pkt(&mut pkt); |
| assert!(op_zero_read_shutdown.is_ok()); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert_eq!(conn_local.rx_cnt, Wrapping(0)); |
| assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); |
| assert_eq!(pkt.op(), VSOCK_OP_SHUTDOWN); |
| assert_eq!( |
| pkt.flags(), |
| VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND |
| ); |
| |
| // VSOCK_OP_RW: non-zero data read from stream/file |
| let payload = b"hello"; |
| conn_local.stream.write_all(payload).unwrap(); |
| conn_local.rx_queue.enqueue(RxOps::Rw); |
| let op_rw_read = conn_local.recv_pkt(&mut pkt); |
| assert!(op_rw_read.is_ok()); |
| assert_eq!(pkt.op(), VSOCK_OP_RW); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert_eq!(conn_local.rx_cnt, Wrapping(payload.len() as u32)); |
| assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); |
| assert_eq!(pkt.len(), 5); |
| let buf = &mut [0u8; 5]; |
| assert!(pkt.data_slice().unwrap().read_slice(buf, 0).is_ok()); |
| assert_eq!(buf, b"hello"); |
| |
| // VSOCK_OP_RESPONSE: response from a locally initiated connection |
| conn_local.rx_queue.enqueue(RxOps::Response); |
| let op_response = conn_local.recv_pkt(&mut pkt); |
| assert!(op_response.is_ok()); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert_eq!(pkt.op(), VSOCK_OP_RESPONSE); |
| assert!(conn_local.connect); |
| |
| // VSOCK_OP_CREDIT_UPDATE: guest needs credit update |
| conn_local.rx_queue.enqueue(RxOps::CreditUpdate); |
| let op_credit_update = conn_local.recv_pkt(&mut pkt); |
| assert!(!conn_local.rx_queue.pending_rx()); |
| assert!(op_credit_update.is_ok()); |
| assert_eq!(pkt.op(), VSOCK_OP_CREDIT_UPDATE); |
| assert_eq!(conn_local.last_fwd_cnt, Wrapping(1024)); |
| |
| // non-existent request |
| let op_error = conn_local.recv_pkt(&mut pkt); |
| assert!(op_error.is_err()); |
| } |
| |
| #[test] |
| fn test_vsock_conn_send_pkt() { |
| // parameters for packet head construction |
| let head_params = HeadParams::new(PKT_HEADER_SIZE, 5); |
| |
| // new locally initiated connection |
| let dummy_file = VsockDummySocket::new(); |
| let mut conn_local = VsockConnection::new_local_init( |
| dummy_file, |
| VSOCK_HOST_CID, |
| 5000, |
| 3, |
| 5001, |
| -1, |
| CONN_TX_BUF_SIZE, |
| ); |
| |
| // read-only (device-readable) descriptor chain for the tx path |
| let (mem, mut descr_chain) = prepare_desc_chain_vsock(false, &head_params, 1, 5); |
| let mem = mem.memory(); |
| let mut pkt = |
| VsockPacket::from_tx_virtq_chain(mem.deref(), &mut descr_chain, CONN_TX_BUF_SIZE) |
| .unwrap(); |
| |
| // peer credit information |
| pkt.set_buf_alloc(65536).set_fwd_cnt(1024); |
| |
| // check if peer credit information is updated correctly |
| let credit_check = conn_local.send_pkt(&pkt); |
| assert!(credit_check.is_ok()); |
| assert_eq!(conn_local.peer_buf_alloc, 65536); |
| assert_eq!(conn_local.peer_fwd_cnt, Wrapping(1024)); |
| |
| // VSOCK_OP_RESPONSE |
| pkt.set_op(VSOCK_OP_RESPONSE); |
| let peer_response = conn_local.send_pkt(&pkt); |
| assert!(peer_response.is_ok()); |
| assert!(conn_local.connect); |
| let mut resp_buf = vec![0; 8]; |
| conn_local.stream.read_exact(&mut resp_buf).unwrap(); |
| assert_eq!(resp_buf, b"OK 5001\n"); |
| |
| // VSOCK_OP_RW |
| pkt.set_op(VSOCK_OP_RW); |
| let buf = b"hello"; |
| assert!(pkt.data_slice().unwrap().write_slice(buf, 0).is_ok()); |
| let rw_response = conn_local.send_pkt(&pkt); |
| assert!(rw_response.is_ok()); |
| let mut resp_buf = vec![0; 5]; |
| conn_local.stream.read_exact(&mut resp_buf).unwrap(); |
| assert_eq!(resp_buf, b"hello"); |
| |
| // VSOCK_OP_CREDIT_REQUEST |
| pkt.set_op(VSOCK_OP_CREDIT_REQUEST); |
| let credit_response = conn_local.send_pkt(&pkt); |
| assert!(credit_response.is_ok()); |
| assert_eq!(conn_local.rx_queue.peek().unwrap(), RxOps::CreditUpdate); |
| |
| // VSOCK_OP_SHUTDOWN |
| pkt.set_op(VSOCK_OP_SHUTDOWN); |
| pkt.set_flags(VSOCK_FLAGS_SHUTDOWN_RCV | VSOCK_FLAGS_SHUTDOWN_SEND); |
| let shutdown_response = conn_local.send_pkt(&pkt); |
| assert!(shutdown_response.is_ok()); |
| assert!(conn_local.rx_queue.contains(RxOps::Reset.bitmask())); |
| } |
| } |