| use super::*; |
| |
| use std::task::{Context, Waker}; |
| use std::time::Instant; |
| use std::usize; |
| |
| /// Tracks Stream related state |
| /// |
| /// # Reference counting |
| /// |
| /// There can be a number of outstanding handles to a single Stream. These are |
| /// tracked using reference counting. The `ref_count` field represents the |
| /// number of outstanding userspace handles that can reach this stream. |
| /// |
| /// It's important to note that when the stream is placed in an internal queue |
| /// (such as an accept queue), this is **not** tracked by a reference count. |
| /// Thus, `ref_count` can be zero and the stream still has to be kept around. |
| #[derive(Debug)] |
| pub(super) struct Stream { |
| /// The h2 stream identifier |
| pub id: StreamId, |
| |
| /// Current state of the stream |
| pub state: State, |
| |
| /// Set to `true` when the stream is counted against the connection's max |
| /// concurrent streams. |
| pub is_counted: bool, |
| |
| /// Number of outstanding handles pointing to this stream |
| pub ref_count: usize, |
| |
| // ===== Fields related to sending ===== |
| /// Next node in the accept linked list |
| pub next_pending_send: Option<store::Key>, |
| |
| /// Set to true when the stream is pending accept |
| pub is_pending_send: bool, |
| |
| /// Send data flow control |
| pub send_flow: FlowControl, |
| |
| /// Amount of send capacity that has been requested, but not yet allocated. |
| pub requested_send_capacity: WindowSize, |
| |
| /// Amount of data buffered at the prioritization layer. |
| /// TODO: Technically this could be greater than the window size... |
| pub buffered_send_data: usize, |
| |
| /// Task tracking additional send capacity (i.e. window updates). |
| send_task: Option<Waker>, |
| |
| /// Frames pending for this stream being sent to the socket |
| pub pending_send: buffer::Deque, |
| |
| /// Next node in the linked list of streams waiting for additional |
| /// connection level capacity. |
| pub next_pending_send_capacity: Option<store::Key>, |
| |
| /// True if the stream is waiting for outbound connection capacity |
| pub is_pending_send_capacity: bool, |
| |
| /// Set to true when the send capacity has been incremented |
| pub send_capacity_inc: bool, |
| |
| /// Next node in the open linked list |
| pub next_open: Option<store::Key>, |
| |
| /// Set to true when the stream is pending to be opened |
| pub is_pending_open: bool, |
| |
| /// Set to true when a push is pending for this stream |
| pub is_pending_push: bool, |
| |
| // ===== Fields related to receiving ===== |
| /// Next node in the accept linked list |
| pub next_pending_accept: Option<store::Key>, |
| |
| /// Set to true when the stream is pending accept |
| pub is_pending_accept: bool, |
| |
| /// Receive data flow control |
| pub recv_flow: FlowControl, |
| |
| pub in_flight_recv_data: WindowSize, |
| |
| /// Next node in the linked list of streams waiting to send window updates. |
| pub next_window_update: Option<store::Key>, |
| |
| /// True if the stream is waiting to send a window update |
| pub is_pending_window_update: bool, |
| |
| /// The time when this stream may have been locally reset. |
| pub reset_at: Option<Instant>, |
| |
| /// Next node in list of reset streams that should expire eventually |
| pub next_reset_expire: Option<store::Key>, |
| |
| /// Frames pending for this stream to read |
| pub pending_recv: buffer::Deque, |
| |
| /// When the RecvStream drop occurs, no data should be received. |
| pub is_recv: bool, |
| |
| /// Task tracking receiving frames |
| pub recv_task: Option<Waker>, |
| |
| /// The stream's pending push promises |
| pub pending_push_promises: store::Queue<NextAccept>, |
| |
| /// Validate content-length headers |
| pub content_length: ContentLength, |
| } |
| |
| /// State related to validating a stream's content-length |
| #[derive(Debug)] |
| pub enum ContentLength { |
| Omitted, |
| Head, |
| Remaining(u64), |
| } |
| |
| #[derive(Debug)] |
| pub(super) struct NextAccept; |
| |
| #[derive(Debug)] |
| pub(super) struct NextSend; |
| |
| #[derive(Debug)] |
| pub(super) struct NextSendCapacity; |
| |
| #[derive(Debug)] |
| pub(super) struct NextWindowUpdate; |
| |
| #[derive(Debug)] |
| pub(super) struct NextOpen; |
| |
| #[derive(Debug)] |
| pub(super) struct NextResetExpire; |
| |
| impl Stream { |
| pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { |
| let mut send_flow = FlowControl::new(); |
| let mut recv_flow = FlowControl::new(); |
| |
| recv_flow |
| .inc_window(init_recv_window) |
| .expect("invalid initial receive window"); |
| // TODO: proper error handling? |
| let _res = recv_flow.assign_capacity(init_recv_window); |
| debug_assert!(_res.is_ok()); |
| |
| send_flow |
| .inc_window(init_send_window) |
| .expect("invalid initial send window size"); |
| |
| Stream { |
| id, |
| state: State::default(), |
| ref_count: 0, |
| is_counted: false, |
| |
| // ===== Fields related to sending ===== |
| next_pending_send: None, |
| is_pending_send: false, |
| send_flow, |
| requested_send_capacity: 0, |
| buffered_send_data: 0, |
| send_task: None, |
| pending_send: buffer::Deque::new(), |
| is_pending_send_capacity: false, |
| next_pending_send_capacity: None, |
| send_capacity_inc: false, |
| is_pending_open: false, |
| next_open: None, |
| is_pending_push: false, |
| |
| // ===== Fields related to receiving ===== |
| next_pending_accept: None, |
| is_pending_accept: false, |
| recv_flow, |
| in_flight_recv_data: 0, |
| next_window_update: None, |
| is_pending_window_update: false, |
| reset_at: None, |
| next_reset_expire: None, |
| pending_recv: buffer::Deque::new(), |
| is_recv: true, |
| recv_task: None, |
| pending_push_promises: store::Queue::new(), |
| content_length: ContentLength::Omitted, |
| } |
| } |
| |
| /// Increment the stream's ref count |
| pub fn ref_inc(&mut self) { |
| assert!(self.ref_count < usize::MAX); |
| self.ref_count += 1; |
| } |
| |
| /// Decrements the stream's ref count |
| pub fn ref_dec(&mut self) { |
| assert!(self.ref_count > 0); |
| self.ref_count -= 1; |
| } |
| |
| /// Returns true if stream is currently being held for some time because of |
| /// a local reset. |
| pub fn is_pending_reset_expiration(&self) -> bool { |
| self.reset_at.is_some() |
| } |
| |
| /// Returns true if frames for this stream are ready to be sent over the wire |
| pub fn is_send_ready(&self) -> bool { |
| // Why do we check pending_open? |
| // |
| // We allow users to call send_request() which schedules a stream to be pending_open |
| // if there is no room according to the concurrency limit (max_send_streams), and we |
| // also allow data to be buffered for send with send_data() if there is no capacity for |
| // the stream to send the data, which attempts to place the stream in pending_send. |
| // If the stream is not open, we don't want the stream to be scheduled for |
| // execution (pending_send). Note that if the stream is in pending_open, it will be |
| // pushed to pending_send when there is room for an open stream. |
| // |
| // In pending_push we track whether a PushPromise still needs to be sent |
| // from a different stream before we can start sending frames on this one. |
| // This is different from the "open" check because reserved streams don't count |
| // toward the concurrency limit. |
| // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 |
| !self.is_pending_open && !self.is_pending_push |
| } |
| |
| /// Returns true if the stream is closed |
| pub fn is_closed(&self) -> bool { |
| // The state has fully transitioned to closed. |
| self.state.is_closed() && |
| // Because outbound frames transition the stream state before being |
| // buffered, we have to ensure that all frames have been flushed. |
| self.pending_send.is_empty() && |
| // Sometimes large data frames are sent out in chunks. After a chunk |
| // of the frame is sent, the remainder is pushed back onto the send |
| // queue to be rescheduled. |
| // |
| // Checking for additional buffered data lets us catch this case. |
| self.buffered_send_data == 0 |
| } |
| |
| /// Returns true if the stream is no longer in use |
| pub fn is_released(&self) -> bool { |
| // The stream is closed and fully flushed |
| self.is_closed() && |
| // There are no more outstanding references to the stream |
| self.ref_count == 0 && |
| // The stream is not in any queue |
| !self.is_pending_send && !self.is_pending_send_capacity && |
| !self.is_pending_accept && !self.is_pending_window_update && |
| !self.is_pending_open && self.reset_at.is_none() |
| } |
| |
| /// Returns true when the consumer of the stream has dropped all handles |
| /// (indicating no further interest in the stream) and the stream state is |
| /// not actually closed. |
| /// |
| /// In this case, a reset should be sent. |
| pub fn is_canceled_interest(&self) -> bool { |
| self.ref_count == 0 && !self.state.is_closed() |
| } |
| |
| /// Current available stream send capacity |
| pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { |
| let available = self.send_flow.available().as_size() as usize; |
| let buffered = self.buffered_send_data; |
| |
| available.min(max_buffer_size).saturating_sub(buffered) as WindowSize |
| } |
| |
| pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { |
| let prev_capacity = self.capacity(max_buffer_size); |
| debug_assert!(capacity > 0); |
| // TODO: proper error handling |
| let _res = self.send_flow.assign_capacity(capacity); |
| debug_assert!(_res.is_ok()); |
| |
| tracing::trace!( |
| " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", |
| self.send_flow.available(), |
| self.buffered_send_data, |
| self.id, |
| max_buffer_size, |
| prev_capacity, |
| ); |
| |
| if prev_capacity < self.capacity(max_buffer_size) { |
| self.notify_capacity(); |
| } |
| } |
| |
| pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { |
| let prev_capacity = self.capacity(max_buffer_size); |
| |
| // TODO: proper error handling |
| let _res = self.send_flow.send_data(len); |
| debug_assert!(_res.is_ok()); |
| |
| // Decrement the stream's buffered data counter |
| debug_assert!(self.buffered_send_data >= len as usize); |
| self.buffered_send_data -= len as usize; |
| self.requested_send_capacity -= len; |
| |
| tracing::trace!( |
| " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", |
| self.send_flow.available(), |
| self.buffered_send_data, |
| self.id, |
| max_buffer_size, |
| prev_capacity, |
| ); |
| |
| if prev_capacity < self.capacity(max_buffer_size) { |
| self.notify_capacity(); |
| } |
| } |
| |
| /// If the capacity was limited because of the max_send_buffer_size, |
| /// then consider waking the send task again... |
| pub fn notify_capacity(&mut self) { |
| self.send_capacity_inc = true; |
| tracing::trace!(" notifying task"); |
| self.notify_send(); |
| } |
| |
| /// Returns `Err` when the decrement cannot be completed due to overflow. |
| pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { |
| match self.content_length { |
| ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { |
| Some(val) => *rem = val, |
| None => return Err(()), |
| }, |
| ContentLength::Head => { |
| if len != 0 { |
| return Err(()); |
| } |
| } |
| _ => {} |
| } |
| |
| Ok(()) |
| } |
| |
| pub fn ensure_content_length_zero(&self) -> Result<(), ()> { |
| match self.content_length { |
| ContentLength::Remaining(0) => Ok(()), |
| ContentLength::Remaining(_) => Err(()), |
| _ => Ok(()), |
| } |
| } |
| |
| pub fn notify_send(&mut self) { |
| if let Some(task) = self.send_task.take() { |
| task.wake(); |
| } |
| } |
| |
| pub fn wait_send(&mut self, cx: &Context) { |
| self.send_task = Some(cx.waker().clone()); |
| } |
| |
| pub fn notify_recv(&mut self) { |
| if let Some(task) = self.recv_task.take() { |
| task.wake(); |
| } |
| } |
| } |
| |
| impl store::Next for NextAccept { |
| fn next(stream: &Stream) -> Option<store::Key> { |
| stream.next_pending_accept |
| } |
| |
| fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| stream.next_pending_accept = key; |
| } |
| |
| fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| stream.next_pending_accept.take() |
| } |
| |
| fn is_queued(stream: &Stream) -> bool { |
| stream.is_pending_accept |
| } |
| |
| fn set_queued(stream: &mut Stream, val: bool) { |
| stream.is_pending_accept = val; |
| } |
| } |
| |
| impl store::Next for NextSend { |
| fn next(stream: &Stream) -> Option<store::Key> { |
| stream.next_pending_send |
| } |
| |
| fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| stream.next_pending_send = key; |
| } |
| |
| fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| stream.next_pending_send.take() |
| } |
| |
| fn is_queued(stream: &Stream) -> bool { |
| stream.is_pending_send |
| } |
| |
| fn set_queued(stream: &mut Stream, val: bool) { |
| if val { |
| // ensure that stream is not queued for being opened |
| // if it's being put into queue for sending data |
| debug_assert!(!stream.is_pending_open); |
| } |
| stream.is_pending_send = val; |
| } |
| } |
| |
| impl store::Next for NextSendCapacity { |
| fn next(stream: &Stream) -> Option<store::Key> { |
| stream.next_pending_send_capacity |
| } |
| |
| fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| stream.next_pending_send_capacity = key; |
| } |
| |
| fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| stream.next_pending_send_capacity.take() |
| } |
| |
| fn is_queued(stream: &Stream) -> bool { |
| stream.is_pending_send_capacity |
| } |
| |
| fn set_queued(stream: &mut Stream, val: bool) { |
| stream.is_pending_send_capacity = val; |
| } |
| } |
| |
| impl store::Next for NextWindowUpdate { |
| fn next(stream: &Stream) -> Option<store::Key> { |
| stream.next_window_update |
| } |
| |
| fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| stream.next_window_update = key; |
| } |
| |
| fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| stream.next_window_update.take() |
| } |
| |
| fn is_queued(stream: &Stream) -> bool { |
| stream.is_pending_window_update |
| } |
| |
| fn set_queued(stream: &mut Stream, val: bool) { |
| stream.is_pending_window_update = val; |
| } |
| } |
| |
| impl store::Next for NextOpen { |
| fn next(stream: &Stream) -> Option<store::Key> { |
| stream.next_open |
| } |
| |
| fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| stream.next_open = key; |
| } |
| |
| fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| stream.next_open.take() |
| } |
| |
| fn is_queued(stream: &Stream) -> bool { |
| stream.is_pending_open |
| } |
| |
| fn set_queued(stream: &mut Stream, val: bool) { |
| if val { |
| // ensure that stream is not queued for being sent |
| // if it's being put into queue for opening the stream |
| debug_assert!(!stream.is_pending_send); |
| } |
| stream.is_pending_open = val; |
| } |
| } |
| |
| impl store::Next for NextResetExpire { |
| fn next(stream: &Stream) -> Option<store::Key> { |
| stream.next_reset_expire |
| } |
| |
| fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| stream.next_reset_expire = key; |
| } |
| |
| fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| stream.next_reset_expire.take() |
| } |
| |
| fn is_queued(stream: &Stream) -> bool { |
| stream.reset_at.is_some() |
| } |
| |
| fn set_queued(stream: &mut Stream, val: bool) { |
| if val { |
| stream.reset_at = Some(Instant::now()); |
| } else { |
| stream.reset_at = None; |
| } |
| } |
| } |
| |
| // ===== impl ContentLength ===== |
| |
| impl ContentLength { |
| pub fn is_head(&self) -> bool { |
| matches!(*self, Self::Head) |
| } |
| } |