| //! Client implementation and builder. |
| |
| mod endpoint; |
| #[cfg(feature = "tls")] |
| #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] |
| mod tls; |
| |
| pub use endpoint::Endpoint; |
| #[cfg(feature = "tls")] |
| pub use tls::ClientTlsConfig; |
| |
| use super::service::{Connection, DynamicServiceStream, SharedExec}; |
| use crate::body::BoxBody; |
| use crate::transport::Executor; |
| use bytes::Bytes; |
| use http::{ |
| uri::{InvalidUri, Uri}, |
| Request, Response, |
| }; |
| use hyper::client::connect::Connection as HyperConnection; |
| use std::{ |
| fmt, |
| future::Future, |
| hash::Hash, |
| pin::Pin, |
| task::{ready, Context, Poll}, |
| }; |
| use tokio::{ |
| io::{AsyncRead, AsyncWrite}, |
| sync::mpsc::{channel, Sender}, |
| }; |
| |
| use tower::balance::p2c::Balance; |
| use tower::{ |
| buffer::{self, Buffer}, |
| discover::{Change, Discover}, |
| util::{BoxService, Either}, |
| Service, |
| }; |
| |
| type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<hyper::Body>, crate::Error>>; |
| |
| const DEFAULT_BUFFER_SIZE: usize = 1024; |
| |
| /// A default batteries included `transport` channel. |
| /// |
| /// This provides a fully featured http2 gRPC client based on [`hyper::Client`] |
| /// and `tower` services. |
| /// |
| /// # Multiplexing requests |
| /// |
| /// Sending a request on a channel requires a `&mut self` and thus can only send |
| /// one request in flight. This is intentional and is required to follow the `Service` |
| /// contract from the `tower` library which this channel implementation is built on |
| /// top of. |
| /// |
| /// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply |
| /// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready` |
| /// we know the `Service` is able to accept only one request before we must `poll_ready` |
| /// again. Due to this fact any `async fn` that wants to poll for readiness and submit |
| /// the request must have a `&mut self` reference. |
| /// |
| /// To work around this and to ease the use of the channel, `Channel` provides a |
| /// `Clone` implementation that is _cheap_. This is because at the very top level |
| /// the channel is backed by a `tower_buffer::Buffer` which runs the connection |
| /// in a background task and provides a `mpsc` channel interface. Due to this |
| /// cloning the `Channel` type is cheap and encouraged. |
| #[derive(Clone)] |
| pub struct Channel { |
| svc: Buffer<Svc, Request<BoxBody>>, |
| } |
| |
| /// A future that resolves to an HTTP response. |
| /// |
| /// This is returned by the `Service::call` on [`Channel`]. |
| pub struct ResponseFuture { |
| inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>, |
| } |
| |
| impl Channel { |
| /// Create an [`Endpoint`] builder that can create [`Channel`]s. |
| pub fn builder(uri: Uri) -> Endpoint { |
| Endpoint::from(uri) |
| } |
| |
| /// Create an [`Endpoint`] from a static string. |
| /// |
| /// ``` |
| /// # use tonic::transport::Channel; |
| /// Channel::from_static("https://example.com"); |
| /// ``` |
| pub fn from_static(s: &'static str) -> Endpoint { |
| let uri = Uri::from_static(s); |
| Self::builder(uri) |
| } |
| |
| /// Create an [`Endpoint`] from shared bytes. |
| /// |
| /// ``` |
| /// # use tonic::transport::Channel; |
| /// Channel::from_shared("https://example.com"); |
| /// ``` |
| pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> { |
| let uri = Uri::from_maybe_shared(s.into())?; |
| Ok(Self::builder(uri)) |
| } |
| |
| /// Balance a list of [`Endpoint`]'s. |
| /// |
| /// This creates a [`Channel`] that will load balance across all the |
| /// provided endpoints. |
| pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self { |
| let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); |
| list.for_each(|endpoint| { |
| tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)) |
| .unwrap(); |
| }); |
| |
| channel |
| } |
| |
| /// Balance a list of [`Endpoint`]'s. |
| /// |
| /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. |
| pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>) |
| where |
| K: Hash + Eq + Send + Clone + 'static, |
| { |
| Self::balance_channel_with_executor(capacity, SharedExec::tokio()) |
| } |
| |
| /// Balance a list of [`Endpoint`]'s. |
| /// |
| /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. |
| /// |
| /// The [`Channel`] will use the given executor to spawn async tasks. |
| pub fn balance_channel_with_executor<K, E>( |
| capacity: usize, |
| executor: E, |
| ) -> (Self, Sender<Change<K, Endpoint>>) |
| where |
| K: Hash + Eq + Send + Clone + 'static, |
| E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static, |
| { |
| let (tx, rx) = channel(capacity); |
| let list = DynamicServiceStream::new(rx); |
| (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx) |
| } |
| |
| pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self |
| where |
| C: Service<Uri> + Send + 'static, |
| C::Error: Into<crate::Error> + Send, |
| C::Future: Unpin + Send, |
| C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, |
| { |
| let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); |
| let executor = endpoint.executor.clone(); |
| |
| let svc = Connection::lazy(connector, endpoint); |
| let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); |
| executor.execute(Box::pin(worker)); |
| |
| Channel { svc } |
| } |
| |
| pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error> |
| where |
| C: Service<Uri> + Send + 'static, |
| C::Error: Into<crate::Error> + Send, |
| C::Future: Unpin + Send, |
| C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, |
| { |
| let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); |
| let executor = endpoint.executor.clone(); |
| |
| let svc = Connection::connect(connector, endpoint) |
| .await |
| .map_err(super::Error::from_source)?; |
| let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); |
| executor.execute(Box::pin(worker)); |
| |
| Ok(Channel { svc }) |
| } |
| |
| pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self |
| where |
| D: Discover<Service = Connection> + Unpin + Send + 'static, |
| D::Error: Into<crate::Error>, |
| D::Key: Hash + Send + Clone, |
| E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static, |
| { |
| let svc = Balance::new(discover); |
| |
| let svc = BoxService::new(svc); |
| let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size); |
| executor.execute(Box::pin(worker)); |
| |
| Channel { svc } |
| } |
| } |
| |
| impl Service<http::Request<BoxBody>> for Channel { |
| type Response = http::Response<super::Body>; |
| type Error = super::Error; |
| type Future = ResponseFuture; |
| |
| fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) |
| } |
| |
| fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future { |
| let inner = Service::call(&mut self.svc, request); |
| |
| ResponseFuture { inner } |
| } |
| } |
| |
| impl Future for ResponseFuture { |
| type Output = Result<Response<hyper::Body>, super::Error>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?; |
| Ok(val).into() |
| } |
| } |
| |
| impl fmt::Debug for Channel { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("Channel").finish() |
| } |
| } |
| |
| impl fmt::Debug for ResponseFuture { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("ResponseFuture").finish() |
| } |
| } |