| #![warn(rust_2018_idioms)] |
| #![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi does not support bind or UDP |
| |
| use futures::future::poll_fn; |
| use std::io; |
| use std::sync::Arc; |
| use tokio::{io::ReadBuf, net::UdpSocket}; |
| use tokio_test::assert_ok; |
| |
| const MSG: &[u8] = b"hello"; |
| const MSG_LEN: usize = MSG.len(); |
| |
| #[tokio::test] |
| async fn send_recv() -> std::io::Result<()> { |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| let receiver = UdpSocket::bind("127.0.0.1:0").await?; |
| |
| sender.connect(receiver.local_addr()?).await?; |
| receiver.connect(sender.local_addr()?).await?; |
| |
| sender.send(MSG).await?; |
| |
| let mut recv_buf = [0u8; 32]; |
| let len = receiver.recv(&mut recv_buf[..]).await?; |
| |
| assert_eq!(&recv_buf[..len], MSG); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn send_recv_poll() -> std::io::Result<()> { |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| let receiver = UdpSocket::bind("127.0.0.1:0").await?; |
| |
| sender.connect(receiver.local_addr()?).await?; |
| receiver.connect(sender.local_addr()?).await?; |
| |
| poll_fn(|cx| sender.poll_send(cx, MSG)).await?; |
| |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?; |
| |
| assert_eq!(read.filled(), MSG); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn send_to_recv_from() -> std::io::Result<()> { |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| let receiver = UdpSocket::bind("127.0.0.1:0").await?; |
| |
| let receiver_addr = receiver.local_addr()?; |
| sender.send_to(MSG, &receiver_addr).await?; |
| |
| let mut recv_buf = [0u8; 32]; |
| let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?; |
| |
| assert_eq!(&recv_buf[..len], MSG); |
| assert_eq!(addr, sender.local_addr()?); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn send_to_recv_from_poll() -> std::io::Result<()> { |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| let receiver = UdpSocket::bind("127.0.0.1:0").await?; |
| |
| let receiver_addr = receiver.local_addr()?; |
| poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; |
| |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; |
| |
| assert_eq!(read.filled(), MSG); |
| assert_eq!(addr, sender.local_addr()?); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn send_to_peek_from() -> std::io::Result<()> { |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| let receiver = UdpSocket::bind("127.0.0.1:0").await?; |
| |
| let receiver_addr = receiver.local_addr()?; |
| poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; |
| |
| // peek |
| let mut recv_buf = [0u8; 32]; |
| let (n, addr) = receiver.peek_from(&mut recv_buf).await?; |
| assert_eq!(&recv_buf[..n], MSG); |
| assert_eq!(addr, sender.local_addr()?); |
| |
| // peek |
| let mut recv_buf = [0u8; 32]; |
| let (n, addr) = receiver.peek_from(&mut recv_buf).await?; |
| assert_eq!(&recv_buf[..n], MSG); |
| assert_eq!(addr, sender.local_addr()?); |
| |
| let mut recv_buf = [0u8; 32]; |
| let (n, addr) = receiver.recv_from(&mut recv_buf).await?; |
| assert_eq!(&recv_buf[..n], MSG); |
| assert_eq!(addr, sender.local_addr()?); |
| |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn send_to_peek_from_poll() -> std::io::Result<()> { |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| let receiver = UdpSocket::bind("127.0.0.1:0").await?; |
| |
| let receiver_addr = receiver.local_addr()?; |
| poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; |
| |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?; |
| |
| assert_eq!(read.filled(), MSG); |
| assert_eq!(addr, sender.local_addr()?); |
| |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?; |
| |
| assert_eq!(read.filled(), MSG); |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| |
| poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; |
| assert_eq!(read.filled(), MSG); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn split() -> std::io::Result<()> { |
| let socket = UdpSocket::bind("127.0.0.1:0").await?; |
| let s = Arc::new(socket); |
| let r = s.clone(); |
| |
| let addr = s.local_addr()?; |
| tokio::spawn(async move { |
| s.send_to(MSG, &addr).await.unwrap(); |
| }); |
| let mut recv_buf = [0u8; 32]; |
| let (len, _) = r.recv_from(&mut recv_buf[..]).await?; |
| assert_eq!(&recv_buf[..len], MSG); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn split_chan() -> std::io::Result<()> { |
| // setup UdpSocket that will echo all sent items |
| let socket = UdpSocket::bind("127.0.0.1:0").await?; |
| let addr = socket.local_addr().unwrap(); |
| let s = Arc::new(socket); |
| let r = s.clone(); |
| |
| let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000); |
| tokio::spawn(async move { |
| while let Some((bytes, addr)) = rx.recv().await { |
| s.send_to(&bytes, &addr).await.unwrap(); |
| } |
| }); |
| |
| tokio::spawn(async move { |
| let mut buf = [0u8; 32]; |
| loop { |
| let (len, addr) = r.recv_from(&mut buf).await.unwrap(); |
| tx.send((buf[..len].to_vec(), addr)).await.unwrap(); |
| } |
| }); |
| |
| // test that we can send a value and get back some response |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| sender.send_to(MSG, addr).await?; |
| let mut recv_buf = [0u8; 32]; |
| let (len, _) = sender.recv_from(&mut recv_buf).await?; |
| assert_eq!(&recv_buf[..len], MSG); |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn split_chan_poll() -> std::io::Result<()> { |
| // setup UdpSocket that will echo all sent items |
| let socket = UdpSocket::bind("127.0.0.1:0").await?; |
| let addr = socket.local_addr().unwrap(); |
| let s = Arc::new(socket); |
| let r = s.clone(); |
| |
| let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000); |
| tokio::spawn(async move { |
| while let Some((bytes, addr)) = rx.recv().await { |
| poll_fn(|cx| s.poll_send_to(cx, &bytes, addr)) |
| .await |
| .unwrap(); |
| } |
| }); |
| |
| tokio::spawn(async move { |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| loop { |
| let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap(); |
| tx.send((read.filled().to_vec(), addr)).await.unwrap(); |
| } |
| }); |
| |
| // test that we can send a value and get back some response |
| let sender = UdpSocket::bind("127.0.0.1:0").await?; |
| poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?; |
| |
| let mut recv_buf = [0u8; 32]; |
| let mut read = ReadBuf::new(&mut recv_buf); |
| let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?; |
| assert_eq!(read.filled(), MSG); |
| Ok(()) |
| } |
| |
| // # Note |
| // |
| // This test is purposely written such that each time `sender` sends data on |
| // the socket, `receiver` awaits the data. On Unix, it would be okay waiting |
| // until the end of the test to receive all the data. On Windows, this would |
| // **not** be okay because it's resources are completion based (via IOCP). |
| // If data is sent and not yet received, attempting to send more data will |
| // result in `ErrorKind::WouldBlock` until the first operation completes. |
| #[tokio::test] |
| async fn try_send_spawn() { |
| const MSG2: &[u8] = b"world!"; |
| const MSG2_LEN: usize = MSG2.len(); |
| |
| let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| |
| receiver |
| .connect(sender.local_addr().unwrap()) |
| .await |
| .unwrap(); |
| |
| sender.writable().await.unwrap(); |
| |
| let sent = &sender |
| .try_send_to(MSG, receiver.local_addr().unwrap()) |
| .unwrap(); |
| assert_eq!(sent, &MSG_LEN); |
| let mut buf = [0u8; 32]; |
| let mut received = receiver.recv(&mut buf[..]).await.unwrap(); |
| |
| sender |
| .connect(receiver.local_addr().unwrap()) |
| .await |
| .unwrap(); |
| let sent = &sender.try_send(MSG2).unwrap(); |
| assert_eq!(sent, &MSG2_LEN); |
| received += receiver.recv(&mut buf[..]).await.unwrap(); |
| |
| std::thread::spawn(move || { |
| let sent = &sender.try_send(MSG).unwrap(); |
| assert_eq!(sent, &MSG_LEN); |
| }) |
| .join() |
| .unwrap(); |
| received += receiver.recv(&mut buf[..]).await.unwrap(); |
| |
| assert_eq!(received, MSG_LEN * 2 + MSG2_LEN); |
| } |
| |
| #[tokio::test] |
| async fn try_send_recv() { |
| // Create listener |
| let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| |
| // Create socket pair |
| let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| |
| // Connect the two |
| client.connect(server.local_addr().unwrap()).await.unwrap(); |
| server.connect(client.local_addr().unwrap()).await.unwrap(); |
| |
| for _ in 0..5 { |
| loop { |
| client.writable().await.unwrap(); |
| |
| match client.try_send(b"hello world") { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| |
| loop { |
| server.readable().await.unwrap(); |
| |
| let mut buf = [0; 512]; |
| |
| match server.try_recv(&mut buf) { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| assert_eq!(&buf[0..11], &b"hello world"[..]); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| } |
| } |
| |
| #[tokio::test] |
| async fn try_send_to_recv_from() { |
| // Create listener |
| let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let saddr = server.local_addr().unwrap(); |
| |
| // Create socket pair |
| let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let caddr = client.local_addr().unwrap(); |
| |
| for _ in 0..5 { |
| loop { |
| client.writable().await.unwrap(); |
| |
| match client.try_send_to(b"hello world", saddr) { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| |
| loop { |
| server.readable().await.unwrap(); |
| |
| let mut buf = [0; 512]; |
| |
| match server.try_recv_from(&mut buf) { |
| Ok((n, addr)) => { |
| assert_eq!(n, 11); |
| assert_eq!(addr, caddr); |
| assert_eq!(&buf[0..11], &b"hello world"[..]); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| } |
| } |
| |
| #[tokio::test] |
| async fn try_recv_buf() { |
| // Create listener |
| let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| |
| // Create socket pair |
| let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| |
| // Connect the two |
| client.connect(server.local_addr().unwrap()).await.unwrap(); |
| server.connect(client.local_addr().unwrap()).await.unwrap(); |
| |
| for _ in 0..5 { |
| loop { |
| client.writable().await.unwrap(); |
| |
| match client.try_send(b"hello world") { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| |
| loop { |
| server.readable().await.unwrap(); |
| |
| let mut buf = Vec::with_capacity(512); |
| |
| match server.try_recv_buf(&mut buf) { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| assert_eq!(&buf[0..11], &b"hello world"[..]); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| } |
| } |
| |
| #[tokio::test] |
| async fn try_recv_buf_from() { |
| // Create listener |
| let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let saddr = server.local_addr().unwrap(); |
| |
| // Create socket pair |
| let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let caddr = client.local_addr().unwrap(); |
| |
| for _ in 0..5 { |
| loop { |
| client.writable().await.unwrap(); |
| |
| match client.try_send_to(b"hello world", saddr) { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| |
| loop { |
| server.readable().await.unwrap(); |
| |
| let mut buf = Vec::with_capacity(512); |
| |
| match server.try_recv_buf_from(&mut buf) { |
| Ok((n, addr)) => { |
| assert_eq!(n, 11); |
| assert_eq!(addr, caddr); |
| assert_eq!(&buf[0..11], &b"hello world"[..]); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| } |
| } |
| |
| #[tokio::test] |
| async fn poll_ready() { |
| // Create listener |
| let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let saddr = server.local_addr().unwrap(); |
| |
| // Create socket pair |
| let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); |
| let caddr = client.local_addr().unwrap(); |
| |
| for _ in 0..5 { |
| loop { |
| assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await); |
| |
| match client.try_send_to(b"hello world", saddr) { |
| Ok(n) => { |
| assert_eq!(n, 11); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| |
| loop { |
| assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await); |
| |
| let mut buf = Vec::with_capacity(512); |
| |
| match server.try_recv_buf_from(&mut buf) { |
| Ok((n, addr)) => { |
| assert_eq!(n, 11); |
| assert_eq!(addr, caddr); |
| assert_eq!(&buf[0..11], &b"hello world"[..]); |
| break; |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
| Err(e) => panic!("{:?}", e), |
| } |
| } |
| } |
| } |