blob: 7f2ffde2b3951b8d13a24f8f1ad03e042bb5dd51 [file] [log] [blame]
//! Server implementation and builder.
mod conn;
mod incoming;
mod recover_error;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;
#[cfg(unix)]
mod unix;
pub use super::service::Routes;
pub use super::service::RoutesBuilder;
pub use conn::{Connected, TcpConnectInfo};
#[cfg(feature = "tls")]
pub use tls::ServerTlsConfig;
#[cfg(feature = "tls")]
pub use conn::TlsConnectInfo;
#[cfg(feature = "tls")]
use super::service::TlsAcceptor;
#[cfg(unix)]
pub use unix::UdsConnectInfo;
pub use incoming::TcpIncoming;
#[cfg(feature = "tls")]
pub(crate) use tokio_rustls::server::TlsStream;
#[cfg(feature = "tls")]
use crate::transport::Error;
use self::recover_error::RecoverError;
use super::service::{GrpcTimeout, ServerIo};
use crate::body::BoxBody;
use crate::server::NamedService;
use bytes::Bytes;
use http::{Request, Response};
use http_body::Body as _;
use hyper::{server::accept, Body};
use pin_project::pin_project;
use std::{
convert::Infallible,
fmt,
future::{self, Future},
marker::PhantomData,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Duration,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::Stream;
use tower::{
layer::util::{Identity, Stack},
layer::Layer,
limit::concurrency::ConcurrencyLimitLayer,
util::Either,
Service, ServiceBuilder,
};
/// Boxed HTTP response body yielding [`Bytes`] chunks and failing with [`crate::Error`].
type BoxHttpBody = http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>;
/// Boxed service mapping `Request<Body>` to `Response<BoxHttpBody>`; this is the
/// service type produced for every accepted connection by `MakeSvc`.
type BoxService = tower::util::BoxService<Request<Body>, Response<BoxHttpBody>, crate::Error>;
/// Shared `Send + Sync` callback that builds a [`tracing::Span`] from a body-less request.
type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;
/// Fallback HTTP2 keepalive ping timeout, in seconds, used when the builder leaves
/// `http2_keepalive_timeout` unset.
const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;
/// A default batteries included `transport` server.
///
/// This is a wrapper around [`hyper::Server`] that exposes its configuration
/// through a builder-style API. The builder offers simple configuration parameters
/// for providing a fully featured http2 based gRPC server. This should provide
/// a very good out of the box http2 server for use with tonic but is also a
/// reference implementation that should be a good starting point for anyone
/// wanting to create a more complex and/or specific implementation.
#[derive(Clone)]
pub struct Server<L = Identity> {
/// Optional span factory installed through `trace_fn`.
trace_interceptor: Option<TraceInterceptor>,
/// Per-connection concurrency limit; `None` installs no limit layer.
concurrency_limit: Option<usize>,
/// Timeout handed to the `GrpcTimeout` wrapper of every connection service.
timeout: Option<Duration>,
/// TLS acceptor built by `tls_config`; `None` until TLS is configured.
#[cfg(feature = "tls")]
tls: Option<TlsAcceptor>,
/// Forwarded to hyper's `http2_initial_stream_window_size`.
init_stream_window_size: Option<u32>,
/// Forwarded to hyper's `http2_initial_connection_window_size`.
init_connection_window_size: Option<u32>,
/// Forwarded to hyper's `http2_max_concurrent_streams`.
max_concurrent_streams: Option<u32>,
/// TCP keepalive setting passed to `TcpIncoming::new` by the `Router` serve methods.
tcp_keepalive: Option<Duration>,
/// `TCP_NODELAY` setting passed to `TcpIncoming::new` by the `Router` serve methods.
tcp_nodelay: bool,
/// Forwarded to hyper's `http2_keep_alive_interval`.
http2_keepalive_interval: Option<Duration>,
/// Keepalive ping timeout; falls back to `DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS` when `None`.
http2_keepalive_timeout: Option<Duration>,
/// Adaptive flow control toggle; `None` is treated as `false` when serving.
http2_adaptive_window: Option<bool>,
/// Forwarded to hyper's `http2_max_pending_accept_reset_streams`.
http2_max_pending_accept_reset_streams: Option<usize>,
/// Forwarded to hyper's `http2_max_frame_size`.
max_frame_size: Option<u32>,
/// When `false`, the hyper server is configured as http2-only.
accept_http1: bool,
/// Tower layer stack wrapped around the served service.
service_builder: ServiceBuilder<L>,
}
impl Default for Server<Identity> {
fn default() -> Self {
Self {
trace_interceptor: None,
concurrency_limit: None,
timeout: None,
#[cfg(feature = "tls")]
tls: None,
init_stream_window_size: None,
init_connection_window_size: None,
max_concurrent_streams: None,
tcp_keepalive: None,
tcp_nodelay: false,
http2_keepalive_interval: None,
http2_keepalive_timeout: None,
http2_adaptive_window: None,
http2_max_pending_accept_reset_streams: None,
max_frame_size: None,
accept_http1: false,
service_builder: Default::default(),
}
}
}
/// A stack based `Service` router.
///
/// Pairs the [`Server`] configuration it was created from with the [`Routes`]
/// registered so far.
#[derive(Debug)]
pub struct Router<L = Identity> {
/// Server configuration (and layer stack) used when serving.
server: Server<L>,
/// Services added to this router.
routes: Routes,
}
/// An [`Either`] reports the service name of its first type parameter.
impl<S, T> NamedService for Either<S, T>
where
    S: NamedService,
{
    const NAME: &'static str = <S as NamedService>::NAME;
}
impl Server {
/// Create a new server builder that can configure a [`Server`].
pub fn builder() -> Self {
Server {
tcp_nodelay: true,
accept_http1: false,
..Default::default()
}
}
}
impl<L> Server<L> {
/// Configure TLS for this server.
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
Ok(Server {
tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
..self
})
}
/// Set the concurrency limit applied to on requests inbound per connection.
///
/// # Example
///
/// ```
/// # use tonic::transport::Server;
/// # use tower_service::Service;
/// # let builder = Server::builder();
/// builder.concurrency_limit_per_connection(32);
/// ```
#[must_use]
pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
Server {
concurrency_limit: Some(limit),
..self
}
}
/// Set a timeout on for all request handlers.
///
/// # Example
///
/// ```
/// # use tonic::transport::Server;
/// # use tower_service::Service;
/// # use std::time::Duration;
/// # let builder = Server::builder();
/// builder.timeout(Duration::from_secs(30));
/// ```
#[must_use]
pub fn timeout(self, timeout: Duration) -> Self {
Server {
timeout: Some(timeout),
..self
}
}
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
/// stream-level flow control.
///
/// Default is 65,535
///
/// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
#[must_use]
pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
Server {
init_stream_window_size: sz.into(),
..self
}
}
/// Sets the max connection-level flow control for HTTP2
///
/// Default is 65,535
#[must_use]
pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
Server {
init_connection_window_size: sz.into(),
..self
}
}
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
/// connections.
///
/// Default is no limit (`None`).
///
/// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency
#[must_use]
pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
Server {
max_concurrent_streams: max.into(),
..self
}
}
/// Set whether HTTP2 Ping frames are enabled on accepted connections.
///
/// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
/// specified will be the time interval between HTTP2 Ping frames.
/// The timeout for receiving an acknowledgement of the keepalive ping
/// can be set with [`Server::http2_keepalive_timeout`].
///
/// Default is no HTTP2 keepalive (`None`)
///
#[must_use]
pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
Server {
http2_keepalive_interval,
..self
}
}
/// Sets a timeout for receiving an acknowledgement of the keepalive ping.
///
/// If the ping is not acknowledged within the timeout, the connection will be closed.
/// Does nothing if http2_keep_alive_interval is disabled.
///
/// Default is 20 seconds.
///
#[must_use]
pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self {
Server {
http2_keepalive_timeout,
..self
}
}
/// Sets whether to use an adaptive flow control. Defaults to false.
/// Enabling this will override the limits set in http2_initial_stream_window_size and
/// http2_initial_connection_window_size.
#[must_use]
pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
Server {
http2_adaptive_window: enabled,
..self
}
}
/// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
///
/// This will default to whatever the default in h2 is. As of v0.3.17, it is 20.
///
/// See <https://github.com/hyperium/hyper/issues/2877> for more information.
#[must_use]
pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
Server {
http2_max_pending_accept_reset_streams: max,
..self
}
}
/// Set whether TCP keepalive messages are enabled on accepted connections.
///
/// If `None` is specified, keepalive is disabled, otherwise the duration
/// specified will be the time to remain idle before sending TCP keepalive
/// probes.
///
/// Default is no keepalive (`None`)
///
#[must_use]
pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
Server {
tcp_keepalive,
..self
}
}
/// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
#[must_use]
pub fn tcp_nodelay(self, enabled: bool) -> Self {
Server {
tcp_nodelay: enabled,
..self
}
}
/// Sets the maximum frame size to use for HTTP2.
///
/// Passing `None` will do nothing.
///
/// If not set, will default from underlying transport.
#[must_use]
pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
Server {
max_frame_size: frame_size.into(),
..self
}
}
/// Allow this server to accept http1 requests.
///
/// Accepting http1 requests is only useful when developing `grpc-web`
/// enabled services. If this setting is set to `true` but services are
/// not correctly configured to handle grpc-web requests, your server may
/// return confusing (but correct) protocol errors.
///
/// Default is `false`.
#[must_use]
pub fn accept_http1(self, accept_http1: bool) -> Self {
Server {
accept_http1,
..self
}
}
/// Intercept inbound headers and add a [`tracing::Span`] to each response future.
#[must_use]
pub fn trace_fn<F>(self, f: F) -> Self
where
F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
{
Server {
trace_interceptor: Some(Arc::new(f)),
..self
}
}
/// Create a router with the `S` typed service as the first service.
///
/// This will clone the `Server` builder and create a router that will
/// route around different services.
pub fn add_service<S>(&mut self, svc: S) -> Router<L>
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
L: Clone,
{
Router::new(self.clone(), Routes::new(svc))
}
/// Create a router with the optional `S` typed service as the first service.
///
/// This will clone the `Server` builder and create a router that will
/// route around different services.
///
/// # Note
/// Even when the argument given is `None` this will capture *all* requests to this service name.
/// As a result, one cannot use this to toggle between two identically named implementations.
pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
L: Clone,
{
let routes = svc.map(Routes::new).unwrap_or_default();
Router::new(self.clone(), routes)
}
/// Create a router with given [`Routes`].
///
/// This will clone the `Server` builder and create a router that will
/// route around different services that were already added to the provided `routes`.
pub fn add_routes(&mut self, routes: Routes) -> Router<L>
where
L: Clone,
{
Router::new(self.clone(), routes)
}
/// Set the [Tower] [`Layer`] all services will be wrapped in.
///
/// This enables using middleware from the [Tower ecosystem][eco].
///
/// # Example
///
/// ```
/// # use tonic::transport::Server;
/// # use tower_service::Service;
/// use tower::timeout::TimeoutLayer;
/// use std::time::Duration;
///
/// # let mut builder = Server::builder();
/// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
/// ```
///
/// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
/// here as an example.
///
/// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
/// [interceptors]:
///
/// ```
/// # use tonic::transport::Server;
/// # use tower_service::Service;
/// use tower::ServiceBuilder;
/// use std::time::Duration;
/// use tonic::{Request, Status, service::interceptor};
///
/// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
/// if valid_credentials(&request) {
/// Ok(request)
/// } else {
/// Err(Status::unauthenticated("invalid credentials"))
/// }
/// }
///
/// fn valid_credentials(request: &Request<()>) -> bool {
/// // ...
/// # true
/// }
///
/// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
/// Ok(request)
/// }
///
/// let layer = ServiceBuilder::new()
/// .load_shed()
/// .timeout(Duration::from_secs(30))
/// .layer(interceptor(auth_interceptor))
/// .layer(interceptor(some_other_interceptor))
/// .into_inner();
///
/// Server::builder().layer(layer);
/// ```
///
/// [Tower]: https://github.com/tower-rs/tower
/// [`Layer`]: tower::layer::Layer
/// [eco]: https://github.com/tower-rs
/// [`ServiceBuilder`]: tower::ServiceBuilder
/// [interceptors]: crate::service::Interceptor
pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
Server {
service_builder: self.service_builder.layer(new_layer),
trace_interceptor: self.trace_interceptor,
concurrency_limit: self.concurrency_limit,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
init_stream_window_size: self.init_stream_window_size,
init_connection_window_size: self.init_connection_window_size,
max_concurrent_streams: self.max_concurrent_streams,
tcp_keepalive: self.tcp_keepalive,
tcp_nodelay: self.tcp_nodelay,
http2_keepalive_interval: self.http2_keepalive_interval,
http2_keepalive_timeout: self.http2_keepalive_timeout,
http2_adaptive_window: self.http2_adaptive_window,
http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
max_frame_size: self.max_frame_size,
accept_http1: self.accept_http1,
}
}
pub(crate) async fn serve_with_shutdown<S, I, F, IO, IE, ResBody>(
self,
svc: S,
incoming: I,
signal: Option<F>,
) -> Result<(), super::Error>
where
L: Layer<S>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IO::ConnectInfo: Clone + Send + Sync + 'static,
IE: Into<crate::Error>,
F: Future<Output = ()>,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
let trace_interceptor = self.trace_interceptor.clone();
let concurrency_limit = self.concurrency_limit;
let init_connection_window_size = self.init_connection_window_size;
let init_stream_window_size = self.init_stream_window_size;
let max_concurrent_streams = self.max_concurrent_streams;
let timeout = self.timeout;
let max_frame_size = self.max_frame_size;
let http2_only = !self.accept_http1;
let http2_keepalive_interval = self.http2_keepalive_interval;
let http2_keepalive_timeout = self
.http2_keepalive_timeout
.unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
let http2_adaptive_window = self.http2_adaptive_window;
let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
let svc = self.service_builder.service(svc);
let tcp = incoming::tcp_incoming(incoming, self);
let incoming = accept::from_stream::<_, _, crate::Error>(tcp);
let svc = MakeSvc {
inner: svc,
concurrency_limit,
timeout,
trace_interceptor,
_io: PhantomData,
};
let server = hyper::Server::builder(incoming)
.http2_only(http2_only)
.http2_initial_connection_window_size(init_connection_window_size)
.http2_initial_stream_window_size(init_stream_window_size)
.http2_max_concurrent_streams(max_concurrent_streams)
.http2_keep_alive_interval(http2_keepalive_interval)
.http2_keep_alive_timeout(http2_keepalive_timeout)
.http2_adaptive_window(http2_adaptive_window.unwrap_or_default())
.http2_max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
.http2_max_frame_size(max_frame_size);
if let Some(signal) = signal {
server
.serve(svc)
.with_graceful_shutdown(signal)
.await
.map_err(super::Error::from_source)?
} else {
server.serve(svc).await.map_err(super::Error::from_source)?;
}
Ok(())
}
}
impl<L> Router<L> {
pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
Self { server, routes }
}
}
impl<L> Router<L> {
/// Add a new service to this router.
pub fn add_service<S>(mut self, svc: S) -> Self
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
self.routes = self.routes.add_service(svc);
self
}
/// Add a new optional service to this router.
///
/// # Note
/// Even when the argument given is `None` this will capture *all* requests to this service name.
/// As a result, one cannot use this to toggle between two identically named implementations.
#[allow(clippy::type_complexity)]
pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
if let Some(svc) = svc {
self.routes = self.routes.add_service(svc);
}
self
}
/// Convert this tonic `Router` into an axum `Router` consuming the tonic one.
pub fn into_router(self) -> axum::Router {
self.routes.into_router()
}
/// Consume this [`Server`] creating a future that will execute the server
/// on [tokio]'s default executor.
///
/// [`Server`]: struct.Server.html
/// [tokio]: https://docs.rs/tokio
pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
where
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
.map_err(super::Error::from_source)?;
self.server
.serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
self.routes.prepare(),
incoming,
None,
)
.await
}
/// Consume this [`Server`] creating a future that will execute the server
/// on [tokio]'s default executor. And shutdown when the provided signal
/// is received.
///
/// [`Server`]: struct.Server.html
/// [tokio]: https://docs.rs/tokio
pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
self,
addr: SocketAddr,
signal: F,
) -> Result<(), super::Error>
where
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
.map_err(super::Error::from_source)?;
self.server
.serve_with_shutdown(self.routes.prepare(), incoming, Some(signal))
.await
}
/// Consume this [`Server`] creating a future that will execute the server
/// on the provided incoming stream of `AsyncRead + AsyncWrite`.
///
/// This method discards any provided [`Server`] TCP configuration.
///
/// [`Server`]: struct.Server.html
pub async fn serve_with_incoming<I, IO, IE, ResBody>(
self,
incoming: I,
) -> Result<(), super::Error>
where
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IO::ConnectInfo: Clone + Send + Sync + 'static,
IE: Into<crate::Error>,
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
self.server
.serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
self.routes.prepare(),
incoming,
None,
)
.await
}
/// Consume this [`Server`] creating a future that will execute the server
/// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
/// `serve_with_shutdown` this method will also take a signal future to
/// gracefully shutdown the server.
///
/// This method discards any provided [`Server`] TCP configuration.
///
/// [`Server`]: struct.Server.html
pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
self,
incoming: I,
signal: F,
) -> Result<(), super::Error>
where
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IO::ConnectInfo: Clone + Send + Sync + 'static,
IE: Into<crate::Error>,
F: Future<Output = ()>,
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
self.server
.serve_with_shutdown(self.routes.prepare(), incoming, Some(signal))
.await
}
/// Create a tower service out of a router.
pub fn into_service<ResBody>(self) -> L::Service
where
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
self.server.service_builder.service(self.routes.prepare())
}
}
/// Debug output for [`Server`].
///
/// Prints the type under its real name, `Server`, together with the plain
/// configuration values. The layer stack, the trace interceptor and the TLS
/// acceptor are not required to implement `Debug`, so the latter two are
/// reported only as "is it set" booleans and the output is marked
/// non-exhaustive.
impl<L> fmt::Debug for Server<L> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Server");
        out.field("concurrency_limit", &self.concurrency_limit)
            .field("timeout", &self.timeout)
            .field("init_stream_window_size", &self.init_stream_window_size)
            .field(
                "init_connection_window_size",
                &self.init_connection_window_size,
            )
            .field("max_concurrent_streams", &self.max_concurrent_streams)
            .field("tcp_keepalive", &self.tcp_keepalive)
            .field("tcp_nodelay", &self.tcp_nodelay)
            .field("http2_keepalive_interval", &self.http2_keepalive_interval)
            .field("http2_keepalive_timeout", &self.http2_keepalive_timeout)
            .field("http2_adaptive_window", &self.http2_adaptive_window)
            .field(
                "http2_max_pending_accept_reset_streams",
                &self.http2_max_pending_accept_reset_streams,
            )
            .field("max_frame_size", &self.max_frame_size)
            .field("accept_http1", &self.accept_http1)
            .field("has_trace_interceptor", &self.trace_interceptor.is_some());
        #[cfg(feature = "tls")]
        out.field("has_tls", &self.tls.is_some());
        out.finish_non_exhaustive()
    }
}
/// Per-connection service wrapper that runs the optional trace interceptor on
/// each request and boxes the response body of the inner service.
struct Svc<S> {
// The wrapped service that actually handles requests.
inner: S,
// Optional span factory; when absent, requests run under `Span::none()`.
trace_interceptor: Option<TraceInterceptor>,
}
impl<S, ResBody> Service<Request<Body>> for Svc<S>
where
    S: Service<Request<Body>, Response = Response<ResBody>>,
    S::Error: Into<crate::Error>,
    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
    ResBody::Error: Into<crate::Error>,
{
    type Response = Response<BoxHttpBody>;
    type Error = crate::Error;
    type Future = SvcFuture<S::Future>;

    /// Readiness is delegated to the inner service, with its error converted
    /// into [`crate::Error`].
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.inner.poll_ready(cx) {
            Poll::Ready(outcome) => Poll::Ready(outcome.map_err(Into::into)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Computes the span for this request (if an interceptor is installed)
    /// and forwards the request to the inner service.
    fn call(&mut self, req: Request<Body>) -> Self::Future {
        let (req, span) = match &self.trace_interceptor {
            Some(make_span) => {
                // The interceptor only sees the request head, so the body is
                // detached for the call and reattached afterwards.
                let (head, body) = req.into_parts();
                let headers_only = Request::from_parts(head, ());
                let span = make_span(&headers_only);
                let (head, ()) = headers_only.into_parts();
                (Request::from_parts(head, body), span)
            }
            None => (req, tracing::Span::none()),
        };

        let inner = self.inner.call(req);
        SvcFuture { inner, span }
    }
}
/// Response future returned by `Svc::call`: the inner service future plus the
/// span that is entered each time it is polled.
#[pin_project]
struct SvcFuture<F> {
// Inner service future; structurally pinned so it can be polled through the projection.
#[pin]
inner: F,
// Span entered for the duration of every `poll` call.
span: tracing::Span,
}
impl<F, E, ResBody> Future for SvcFuture<F>
where
    F: Future<Output = Result<Response<ResBody>, E>>,
    E: Into<crate::Error>,
    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
    ResBody::Error: Into<crate::Error>,
{
    type Output = Result<Response<BoxHttpBody>, crate::Error>;

    /// Polls the inner future inside the stored span; on success the response
    /// body is error-mapped and boxed, on failure the error is converted into
    /// [`crate::Error`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        // Held until the end of this poll so the inner future runs in the span.
        let _entered = projected.span.enter();

        match projected.inner.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
            Poll::Ready(Ok(response)) => {
                let boxed = response.map(|body| body.map_err(Into::into).boxed_unsync());
                Poll::Ready(Ok(boxed))
            }
        }
    }
}
/// Opaque debug output: only the type name is written, never the fields.
impl<S> fmt::Debug for Svc<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Svc")
    }
}
/// Factory that produces one boxed service per accepted connection (`ServerIo<IO>`).
struct MakeSvc<S, IO> {
// Optional limit turned into a `ConcurrencyLimitLayer` for each connection service.
concurrency_limit: Option<usize>,
// Timeout handed to `GrpcTimeout` for each connection service.
timeout: Option<Duration>,
// Service cloned for every new connection.
inner: S,
// Optional span factory, cloned into each connection's `Svc`.
trace_interceptor: Option<TraceInterceptor>,
// Ties the factory to the IO type without storing a value of it.
_io: PhantomData<fn() -> IO>,
}
impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
where
IO: Connected,
S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<crate::Error>,
{
type Response = BoxService;
type Error = crate::Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
let conn_info = io.connect_info();
let svc = self.inner.clone();
let concurrency_limit = self.concurrency_limit;
let timeout = self.timeout;
let trace_interceptor = self.trace_interceptor.clone();
let svc = ServiceBuilder::new()
.layer_fn(RecoverError::new)
.option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
.layer_fn(|s| GrpcTimeout::new(s, timeout))
.service(svc);
let svc = ServiceBuilder::new()
.layer(BoxService::layer())
.map_request(move |mut request: Request<Body>| {
match &conn_info {
tower::util::Either::A(inner) => {
request.extensions_mut().insert(inner.clone());
}
tower::util::Either::B(inner) => {
#[cfg(feature = "tls")]
{
request.extensions_mut().insert(inner.clone());
request.extensions_mut().insert(inner.get_ref().clone());
}
#[cfg(not(feature = "tls"))]
{
// just a type check to make sure we didn't forget to
// insert this into the extensions
let _: &() = inner;
}
}
}
request
})
.service(Svc {
inner: svc,
trace_interceptor,
});
future::ready(Ok(svc))
}
}