| //! Generic WebSocket message stream. |
| |
| pub mod frame; |
| |
| mod message; |
| |
| pub use self::{frame::CloseFrame, message::Message}; |
| |
| use self::{ |
| frame::{ |
| coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode}, |
| Frame, FrameCodec, |
| }, |
| message::{IncompleteMessage, IncompleteMessageType}, |
| }; |
| use crate::error::{Error, ProtocolError, Result}; |
| use log::*; |
| use std::{ |
| io::{self, Read, Write}, |
| mem::replace, |
| }; |
| |
| /// Indicates a Client or Server role of the websocket |
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
| pub enum Role { |
| /// This socket is a server |
| Server, |
| /// This socket is a client |
| Client, |
| } |
| |
| /// The configuration for WebSocket connection. |
| #[derive(Debug, Clone, Copy)] |
| pub struct WebSocketConfig { |
| /// Does nothing, instead use `max_write_buffer_size`. |
| #[deprecated] |
| pub max_send_queue: Option<usize>, |
| /// The target minimum size of the write buffer to reach before writing the data |
| /// to the underlying stream. |
| /// The default value is 128 KiB. |
| /// |
| /// If set to `0` each message will be eagerly written to the underlying stream. |
| /// It is often more optimal to allow them to buffer a little, hence the default value. |
| /// |
| /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless. |
| pub write_buffer_size: usize, |
| /// The max size of the write buffer in bytes. Setting this can provide backpressure |
| /// in the case the write buffer is filling up due to write errors. |
| /// The default value is unlimited. |
| /// |
| /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size) |
| /// when writes to the underlying stream are failing. So the **write buffer can not |
| /// fill up if you are not observing write errors even if not flushing**. |
| /// |
| /// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size) |
| /// and probably a little more depending on error handling strategy. |
| pub max_write_buffer_size: usize, |
| /// The maximum size of an incoming message. `None` means no size limit. The default value is 64 MiB |
| /// which should be reasonably big for all normal use-cases but small enough to prevent |
| /// memory eating by a malicious user. |
| pub max_message_size: Option<usize>, |
| /// The maximum size of a single incoming message frame. `None` means no size limit. The limit is for |
| /// frame payload NOT including the frame header. The default value is 16 MiB which should |
| /// be reasonably big for all normal use-cases but small enough to prevent memory eating |
| /// by a malicious user. |
| pub max_frame_size: Option<usize>, |
| /// When set to `true`, the server will accept and handle unmasked frames |
| /// from the client. According to the RFC 6455, the server must close the |
| /// connection to the client in such cases, however it seems like there are |
| /// some popular libraries that are sending unmasked frames, ignoring the RFC. |
| /// By default this option is set to `false`, i.e. according to RFC 6455. |
| pub accept_unmasked_frames: bool, |
| } |
| |
| impl Default for WebSocketConfig { |
| fn default() -> Self { |
| #[allow(deprecated)] |
| WebSocketConfig { |
| max_send_queue: None, |
| write_buffer_size: 128 * 1024, |
| max_write_buffer_size: usize::MAX, |
| max_message_size: Some(64 << 20), |
| max_frame_size: Some(16 << 20), |
| accept_unmasked_frames: false, |
| } |
| } |
| } |
| |
| impl WebSocketConfig { |
| /// Panic if values are invalid. |
| pub(crate) fn assert_valid(&self) { |
| assert!( |
| self.max_write_buffer_size > self.write_buffer_size, |
| "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \ |
| see WebSocketConfig docs`" |
| ); |
| } |
| } |
| |
| /// WebSocket input-output stream. |
| /// |
| /// This is THE structure you want to create to be able to speak the WebSocket protocol. |
| /// It may be created by calling `connect`, `accept` or `client` functions. |
| /// |
| /// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages. |
| #[derive(Debug)] |
| pub struct WebSocket<Stream> { |
| /// The underlying socket. |
| socket: Stream, |
| /// The context for managing a WebSocket. |
| context: WebSocketContext, |
| } |
| |
| impl<Stream> WebSocket<Stream> { |
| /// Convert a raw socket into a WebSocket without performing a handshake. |
| /// |
| /// Call this function if you're using Tungstenite as a part of a web framework |
| /// or together with an existing one. If you need an initial handshake, use |
| /// `connect()` or `accept()` functions of the crate to construct a websocket. |
| /// |
| /// # Panics |
| /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. |
| pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self { |
| WebSocket { socket: stream, context: WebSocketContext::new(role, config) } |
| } |
| |
| /// Convert a raw socket into a WebSocket without performing a handshake. |
| /// |
| /// Call this function if you're using Tungstenite as a part of a web framework |
| /// or together with an existing one. If you need an initial handshake, use |
| /// `connect()` or `accept()` functions of the crate to construct a websocket. |
| /// |
| /// # Panics |
| /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. |
| pub fn from_partially_read( |
| stream: Stream, |
| part: Vec<u8>, |
| role: Role, |
| config: Option<WebSocketConfig>, |
| ) -> Self { |
| WebSocket { |
| socket: stream, |
| context: WebSocketContext::from_partially_read(part, role, config), |
| } |
| } |
| |
| /// Returns a shared reference to the inner stream. |
| pub fn get_ref(&self) -> &Stream { |
| &self.socket |
| } |
| /// Returns a mutable reference to the inner stream. |
| pub fn get_mut(&mut self) -> &mut Stream { |
| &mut self.socket |
| } |
| |
| /// Change the configuration. |
| /// |
| /// # Panics |
| /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. |
| pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { |
| self.context.set_config(set_func) |
| } |
| |
| /// Read the configuration. |
| pub fn get_config(&self) -> &WebSocketConfig { |
| self.context.get_config() |
| } |
| |
| /// Check if it is possible to read messages. |
| /// |
| /// Reading is impossible after receiving `Message::Close`. It is still possible after |
| /// sending close frame since the peer still may send some data before confirming close. |
| pub fn can_read(&self) -> bool { |
| self.context.can_read() |
| } |
| |
| /// Check if it is possible to write messages. |
| /// |
| /// Writing gets impossible immediately after sending or receiving `Message::Close`. |
| pub fn can_write(&self) -> bool { |
| self.context.can_write() |
| } |
| } |
| |
| impl<Stream: Read + Write> WebSocket<Stream> { |
| /// Read a message from stream, if possible. |
| /// |
| /// This will also queue responses to ping and close messages. These responses |
| /// will be written and flushed on the next call to [`read`](Self::read), |
| /// [`write`](Self::write) or [`flush`](Self::flush). |
| /// |
| /// # Closing the connection |
| /// When the remote endpoint decides to close the connection this will return |
| /// the close message with an optional close frame. |
| /// |
| /// You should continue calling [`read`](Self::read), [`write`](Self::write) or |
| /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`] |
| /// is returned. Once that happens it is safe to drop the underlying connection. |
| pub fn read(&mut self) -> Result<Message> { |
| self.context.read(&mut self.socket) |
| } |
| |
| /// Writes and immediately flushes a message. |
| /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush). |
| pub fn send(&mut self, message: Message) -> Result<()> { |
| self.write(message)?; |
| self.flush() |
| } |
| |
| /// Write a message to the provided stream, if possible. |
| /// |
| /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. |
| /// |
| /// In the event of stream write failure the message frame will be stored |
| /// in the write buffer and will try again on the next call to [`write`](Self::write) |
| /// or [`flush`](Self::flush). |
| /// |
| /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] |
| /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. |
| /// |
| /// This call will generally not flush. However, if there are queued automatic messages |
| /// they will be written and eagerly flushed. |
| /// |
| /// For example, upon receiving ping messages tungstenite queues pong replies automatically. |
| /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush) |
| /// will write & flush the pong reply. This means you should not respond to ping frames manually. |
| /// |
| /// You can however send pong frames manually in order to indicate a unidirectional heartbeat |
| /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that |
| /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing |
| /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the |
| /// ping will not be sent as it will be replaced by your custom pong message. |
| /// |
| /// # Errors |
| /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned |
| /// along with the equivalent passed message frame. |
| /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`]. |
| /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from |
| /// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program |
| /// error on your part. |
| /// - [`Error::Io`] is returned if the underlying connection returns an error |
| /// (consider these fatal except for WouldBlock). |
| /// - [`Error::Capacity`] if your message size is bigger than the configured max message size. |
| pub fn write(&mut self, message: Message) -> Result<()> { |
| self.context.write(&mut self.socket, message) |
| } |
| |
| /// Flush writes. |
| /// |
| /// Ensures all messages previously passed to [`write`](Self::write) and automatic |
| /// queued pong responses are written & flushed into the underlying stream. |
| pub fn flush(&mut self) -> Result<()> { |
| self.context.flush(&mut self.socket) |
| } |
| |
| /// Close the connection. |
| /// |
| /// This function guarantees that the close frame will be queued. |
| /// There is no need to call it again. Calling this function is |
| /// the same as calling `write(Message::Close(..))`. |
| /// |
| /// After queuing the close frame you should continue calling [`read`](Self::read) or |
| /// [`flush`](Self::flush) to drive the close handshake to completion. |
| /// |
| /// The websocket RFC defines that the underlying connection should be closed |
| /// by the server. Tungstenite takes care of this asymmetry for you. |
| /// |
| /// When the close handshake is finished (we have both sent and received |
| /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return |
| /// [Error::ConnectionClosed] if this endpoint is the server. |
| /// |
| /// If this endpoint is a client, [Error::ConnectionClosed] will only be |
| /// returned after the server has closed the underlying connection. |
| /// |
| /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed] |
| /// is returned from [`read`](Self::read) or [`flush`](Self::flush). |
| pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> { |
| self.context.close(&mut self.socket, code) |
| } |
| |
| /// Old name for [`read`](Self::read). |
| #[deprecated(note = "Use `read`")] |
| pub fn read_message(&mut self) -> Result<Message> { |
| self.read() |
| } |
| |
| /// Old name for [`send`](Self::send). |
| #[deprecated(note = "Use `send`")] |
| pub fn write_message(&mut self, message: Message) -> Result<()> { |
| self.send(message) |
| } |
| |
| /// Old name for [`flush`](Self::flush). |
| #[deprecated(note = "Use `flush`")] |
| pub fn write_pending(&mut self) -> Result<()> { |
| self.flush() |
| } |
| } |
| |
| /// A context for managing WebSocket stream. |
| #[derive(Debug)] |
| pub struct WebSocketContext { |
| /// Server or client? |
| role: Role, |
| /// encoder/decoder of frame. |
| frame: FrameCodec, |
| /// The state of processing, either "active" or "closing". |
| state: WebSocketState, |
| /// Receive: an incomplete message being processed. |
| incomplete: Option<IncompleteMessage>, |
| /// Send in addition to regular messages E.g. "pong" or "close". |
| additional_send: Option<Frame>, |
| /// True indicates there is an additional message (like a pong) |
| /// that failed to flush previously and we should try again. |
| unflushed_additional: bool, |
| /// The configuration for the websocket session. |
| config: WebSocketConfig, |
| } |
| |
| impl WebSocketContext { |
| /// Create a WebSocket context that manages a post-handshake stream. |
| /// |
| /// # Panics |
| /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. |
| pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self { |
| Self::_new(role, FrameCodec::new(), config.unwrap_or_default()) |
| } |
| |
| /// Create a WebSocket context that manages an post-handshake stream. |
| /// |
| /// # Panics |
| /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. |
| pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self { |
| Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default()) |
| } |
| |
| fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self { |
| config.assert_valid(); |
| frame.set_max_out_buffer_len(config.max_write_buffer_size); |
| frame.set_out_buffer_write_len(config.write_buffer_size); |
| Self { |
| role, |
| frame, |
| state: WebSocketState::Active, |
| incomplete: None, |
| additional_send: None, |
| unflushed_additional: false, |
| config, |
| } |
| } |
| |
| /// Change the configuration. |
| /// |
| /// # Panics |
| /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. |
| pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { |
| set_func(&mut self.config); |
| self.config.assert_valid(); |
| self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); |
| self.frame.set_out_buffer_write_len(self.config.write_buffer_size); |
| } |
| |
| /// Read the configuration. |
| pub fn get_config(&self) -> &WebSocketConfig { |
| &self.config |
| } |
| |
| /// Check if it is possible to read messages. |
| /// |
| /// Reading is impossible after receiving `Message::Close`. It is still possible after |
| /// sending close frame since the peer still may send some data before confirming close. |
| pub fn can_read(&self) -> bool { |
| self.state.can_read() |
| } |
| |
| /// Check if it is possible to write messages. |
| /// |
| /// Writing gets impossible immediately after sending or receiving `Message::Close`. |
| pub fn can_write(&self) -> bool { |
| self.state.is_active() |
| } |
| |
| /// Read a message from the provided stream, if possible. |
| /// |
| /// This function sends pong and close responses automatically. |
| /// However, it never blocks on write. |
| pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message> |
| where |
| Stream: Read + Write, |
| { |
| // Do not read from already closed connections. |
| self.state.check_not_terminated()?; |
| |
| loop { |
| if self.additional_send.is_some() || self.unflushed_additional { |
| // Since we may get ping or close, we need to reply to the messages even during read. |
| match self.flush(stream) { |
| Ok(_) => {} |
| Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => { |
| // If blocked continue reading, but try again later |
| self.unflushed_additional = true; |
| } |
| Err(err) => return Err(err), |
| } |
| } else if self.role == Role::Server && !self.state.can_read() { |
| self.state = WebSocketState::Terminated; |
| return Err(Error::ConnectionClosed); |
| } |
| |
| // If we get here, either write blocks or we have nothing to write. |
| // Thus if read blocks, just let it return WouldBlock. |
| if let Some(message) = self.read_message_frame(stream)? { |
| trace!("Received message {}", message); |
| return Ok(message); |
| } |
| } |
| } |
| |
| /// Write a message to the provided stream. |
| /// |
| /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. |
| /// |
| /// In the event of stream write failure the message frame will be stored |
| /// in the write buffer and will try again on the next call to [`write`](Self::write) |
| /// or [`flush`](Self::flush). |
| /// |
| /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] |
| /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. |
| pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> |
| where |
| Stream: Read + Write, |
| { |
| // When terminated, return AlreadyClosed. |
| self.state.check_not_terminated()?; |
| |
| // Do not write after sending a close frame. |
| if !self.state.is_active() { |
| return Err(Error::Protocol(ProtocolError::SendAfterClosing)); |
| } |
| |
| let frame = match message { |
| Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true), |
| Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true), |
| Message::Ping(data) => Frame::ping(data), |
| Message::Pong(data) => { |
| self.set_additional(Frame::pong(data)); |
| // Note: user pongs can be user flushed so no need to flush here |
| return self._write(stream, None).map(|_| ()); |
| } |
| Message::Close(code) => return self.close(stream, code), |
| Message::Frame(f) => f, |
| }; |
| |
| let should_flush = self._write(stream, Some(frame))?; |
| if should_flush { |
| self.flush(stream)?; |
| } |
| Ok(()) |
| } |
| |
| /// Flush writes. |
| /// |
| /// Ensures all messages previously passed to [`write`](Self::write) and automatically |
| /// queued pong responses are written & flushed into the `stream`. |
| #[inline] |
| pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()> |
| where |
| Stream: Read + Write, |
| { |
| self._write(stream, None)?; |
| self.frame.write_out_buffer(stream)?; |
| stream.flush()?; |
| self.unflushed_additional = false; |
| Ok(()) |
| } |
| |
| /// Writes any data in the out_buffer, `additional_send` and given `data`. |
| /// |
| /// Does **not** flush. |
| /// |
| /// Returns true if the write contents indicate we should flush immediately. |
| fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool> |
| where |
| Stream: Read + Write, |
| { |
| if let Some(data) = data { |
| self.buffer_frame(stream, data)?; |
| } |
| |
| // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in |
| // response, unless it already received a Close frame. It SHOULD |
| // respond with Pong frame as soon as is practical. (RFC 6455) |
| let should_flush = if let Some(msg) = self.additional_send.take() { |
| trace!("Sending pong/close"); |
| match self.buffer_frame(stream, msg) { |
| Err(Error::WriteBufferFull(Message::Frame(msg))) => { |
| // if an system message would exceed the buffer put it back in |
| // `additional_send` for retry. Otherwise returning this error |
| // may not make sense to the user, e.g. calling `flush`. |
| self.set_additional(msg); |
| false |
| } |
| Err(err) => return Err(err), |
| Ok(_) => true, |
| } |
| } else { |
| self.unflushed_additional |
| }; |
| |
| // If we're closing and there is nothing to send anymore, we should close the connection. |
| if self.role == Role::Server && !self.state.can_read() { |
| // The underlying TCP connection, in most normal cases, SHOULD be closed |
| // first by the server, so that it holds the TIME_WAIT state and not the |
| // client (as this would prevent it from re-opening the connection for 2 |
| // maximum segment lifetimes (2MSL), while there is no corresponding |
| // server impact as a TIME_WAIT connection is immediately reopened upon |
| // a new SYN with a higher seq number). (RFC 6455) |
| self.frame.write_out_buffer(stream)?; |
| self.state = WebSocketState::Terminated; |
| Err(Error::ConnectionClosed) |
| } else { |
| Ok(should_flush) |
| } |
| } |
| |
| /// Close the connection. |
| /// |
| /// This function guarantees that the close frame will be queued. |
| /// There is no need to call it again. Calling this function is |
| /// the same as calling `send(Message::Close(..))`. |
| pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()> |
| where |
| Stream: Read + Write, |
| { |
| if let WebSocketState::Active = self.state { |
| self.state = WebSocketState::ClosedByUs; |
| let frame = Frame::close(code); |
| self._write(stream, Some(frame))?; |
| } |
| self.flush(stream) |
| } |
| |
| /// Try to decode one message frame. May return None. |
| fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>> |
| where |
| Stream: Read + Write, |
| { |
| if let Some(mut frame) = self |
| .frame |
| .read_frame(stream, self.config.max_frame_size) |
| .check_connection_reset(self.state)? |
| { |
| if !self.state.can_read() { |
| return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing)); |
| } |
| // MUST be 0 unless an extension is negotiated that defines meanings |
| // for non-zero values. If a nonzero value is received and none of |
| // the negotiated extensions defines the meaning of such a nonzero |
| // value, the receiving endpoint MUST _Fail the WebSocket |
| // Connection_. |
| { |
| let hdr = frame.header(); |
| if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 { |
| return Err(Error::Protocol(ProtocolError::NonZeroReservedBits)); |
| } |
| } |
| |
| match self.role { |
| Role::Server => { |
| if frame.is_masked() { |
| // A server MUST remove masking for data frames received from a client |
| // as described in Section 5.3. (RFC 6455) |
| frame.apply_mask() |
| } else if !self.config.accept_unmasked_frames { |
| // The server MUST close the connection upon receiving a |
| // frame that is not masked. (RFC 6455) |
| // The only exception here is if the user explicitly accepts given |
| // stream by setting WebSocketConfig.accept_unmasked_frames to true |
| return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient)); |
| } |
| } |
| Role::Client => { |
| if frame.is_masked() { |
| // A client MUST close a connection if it detects a masked frame. (RFC 6455) |
| return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer)); |
| } |
| } |
| } |
| |
| match frame.header().opcode { |
| OpCode::Control(ctl) => { |
| match ctl { |
| // All control frames MUST have a payload length of 125 bytes or less |
| // and MUST NOT be fragmented. (RFC 6455) |
| _ if !frame.header().is_final => { |
| Err(Error::Protocol(ProtocolError::FragmentedControlFrame)) |
| } |
| _ if frame.payload().len() > 125 => { |
| Err(Error::Protocol(ProtocolError::ControlFrameTooBig)) |
| } |
| OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)), |
| OpCtl::Reserved(i) => { |
| Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i))) |
| } |
| OpCtl::Ping => { |
| let data = frame.into_data(); |
| // No ping processing after we sent a close frame. |
| if self.state.is_active() { |
| self.set_additional(Frame::pong(data.clone())); |
| } |
| Ok(Some(Message::Ping(data))) |
| } |
| OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))), |
| } |
| } |
| |
| OpCode::Data(data) => { |
| let fin = frame.header().is_final; |
| match data { |
| OpData::Continue => { |
| if let Some(ref mut msg) = self.incomplete { |
| msg.extend(frame.into_data(), self.config.max_message_size)?; |
| } else { |
| return Err(Error::Protocol( |
| ProtocolError::UnexpectedContinueFrame, |
| )); |
| } |
| if fin { |
| Ok(Some(self.incomplete.take().unwrap().complete()?)) |
| } else { |
| Ok(None) |
| } |
| } |
| c if self.incomplete.is_some() => { |
| Err(Error::Protocol(ProtocolError::ExpectedFragment(c))) |
| } |
| OpData::Text | OpData::Binary => { |
| let msg = { |
| let message_type = match data { |
| OpData::Text => IncompleteMessageType::Text, |
| OpData::Binary => IncompleteMessageType::Binary, |
| _ => panic!("Bug: message is not text nor binary"), |
| }; |
| let mut m = IncompleteMessage::new(message_type); |
| m.extend(frame.into_data(), self.config.max_message_size)?; |
| m |
| }; |
| if fin { |
| Ok(Some(msg.complete()?)) |
| } else { |
| self.incomplete = Some(msg); |
| Ok(None) |
| } |
| } |
| OpData::Reserved(i) => { |
| Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i))) |
| } |
| } |
| } |
| } // match opcode |
| } else { |
| // Connection closed by peer |
| match replace(&mut self.state, WebSocketState::Terminated) { |
| WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => { |
| Err(Error::ConnectionClosed) |
| } |
| _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)), |
| } |
| } |
| } |
| |
| /// Received a close frame. Tells if we need to return a close frame to the user. |
| #[allow(clippy::option_option)] |
| fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> { |
| debug!("Received close frame: {:?}", close); |
| match self.state { |
| WebSocketState::Active => { |
| self.state = WebSocketState::ClosedByPeer; |
| |
| let close = close.map(|frame| { |
| if !frame.code.is_allowed() { |
| CloseFrame { |
| code: CloseCode::Protocol, |
| reason: "Protocol violation".into(), |
| } |
| } else { |
| frame |
| } |
| }); |
| |
| let reply = Frame::close(close.clone()); |
| debug!("Replying to close with {:?}", reply); |
| self.set_additional(reply); |
| |
| Some(close) |
| } |
| WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => { |
| // It is already closed, just ignore. |
| None |
| } |
| WebSocketState::ClosedByUs => { |
| // We received a reply. |
| self.state = WebSocketState::CloseAcknowledged; |
| Some(close) |
| } |
| WebSocketState::Terminated => unreachable!(), |
| } |
| } |
| |
| /// Write a single frame into the write-buffer. |
| fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> |
| where |
| Stream: Read + Write, |
| { |
| match self.role { |
| Role::Server => {} |
| Role::Client => { |
| // 5. If the data is being sent by the client, the frame(s) MUST be |
| // masked as defined in Section 5.3. (RFC 6455) |
| frame.set_random_mask(); |
| } |
| } |
| |
| trace!("Sending frame: {:?}", frame); |
| self.frame.buffer_frame(stream, frame).check_connection_reset(self.state) |
| } |
| |
| /// Replace `additional_send` if it is currently a `Pong` message. |
| fn set_additional(&mut self, add: Frame) { |
| let empty_or_pong = self |
| .additional_send |
| .as_ref() |
| .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong)); |
| if empty_or_pong { |
| self.additional_send.replace(add); |
| } |
| } |
| } |
| |
| /// The current connection state. |
| #[derive(Debug, PartialEq, Eq, Clone, Copy)] |
| enum WebSocketState { |
| /// The connection is active. |
| Active, |
| /// We initiated a close handshake. |
| ClosedByUs, |
| /// The peer initiated a close handshake. |
| ClosedByPeer, |
| /// The peer replied to our close handshake. |
| CloseAcknowledged, |
| /// The connection does not exist anymore. |
| Terminated, |
| } |
| |
| impl WebSocketState { |
| /// Tell if we're allowed to process normal messages. |
| fn is_active(self) -> bool { |
| matches!(self, WebSocketState::Active) |
| } |
| |
| /// Tell if we should process incoming data. Note that if we send a close frame |
| /// but the remote hasn't confirmed, they might have sent data before they receive our |
| /// close frame, so we should still pass those to client code, hence ClosedByUs is valid. |
| fn can_read(self) -> bool { |
| matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs) |
| } |
| |
| /// Check if the state is active, return error if not. |
| fn check_not_terminated(self) -> Result<()> { |
| match self { |
| WebSocketState::Terminated => Err(Error::AlreadyClosed), |
| _ => Ok(()), |
| } |
| } |
| } |
| |
| /// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate. |
| trait CheckConnectionReset { |
| fn check_connection_reset(self, state: WebSocketState) -> Self; |
| } |
| |
| impl<T> CheckConnectionReset for Result<T> { |
| fn check_connection_reset(self, state: WebSocketState) -> Self { |
| match self { |
| Err(Error::Io(io_error)) => Err({ |
| if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset { |
| Error::ConnectionClosed |
| } else { |
| Error::Io(io_error) |
| } |
| }), |
| x => x, |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::{Message, Role, WebSocket, WebSocketConfig}; |
| use crate::error::{CapacityError, Error}; |
| |
| use std::{io, io::Cursor}; |
| |
| struct WriteMoc<Stream>(Stream); |
| |
| impl<Stream> io::Write for WriteMoc<Stream> { |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| Ok(buf.len()) |
| } |
| fn flush(&mut self) -> io::Result<()> { |
| Ok(()) |
| } |
| } |
| |
| impl<Stream: io::Read> io::Read for WriteMoc<Stream> { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| self.0.read(buf) |
| } |
| } |
| |
| #[test] |
| fn receive_messages() { |
| let incoming = Cursor::new(vec![ |
| 0x89, 0x02, 0x01, 0x02, 0x8a, 0x01, 0x03, 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, |
| 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x82, 0x03, 0x01, 0x02, |
| 0x03, |
| ]); |
| let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None); |
| assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2])); |
| assert_eq!(socket.read().unwrap(), Message::Pong(vec![3])); |
| assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into())); |
| assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); |
| } |
| |
| #[test] |
| fn size_limiting_text_fragmented() { |
| let incoming = Cursor::new(vec![ |
| 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, |
| 0x6c, 0x64, 0x21, |
| ]); |
| let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() }; |
| let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); |
| |
| assert!(matches!( |
| socket.read(), |
| Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 })) |
| )); |
| } |
| |
| #[test] |
| fn size_limiting_binary() { |
| let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]); |
| let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() }; |
| let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); |
| |
| assert!(matches!( |
| socket.read(), |
| Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 })) |
| )); |
| } |
| } |