| use crate::metadata::{MetadataMap, MetadataValue}; |
| #[cfg(feature = "transport")] |
| use crate::transport::server::TcpConnectInfo; |
| #[cfg(feature = "tls")] |
| use crate::transport::{server::TlsConnectInfo, Certificate}; |
| use crate::Extensions; |
| #[cfg(feature = "transport")] |
| use std::net::SocketAddr; |
| #[cfg(feature = "tls")] |
| use std::sync::Arc; |
| use std::time::Duration; |
| use tokio_stream::Stream; |
| |
| /// A gRPC request and metadata from an RPC call. |
| #[derive(Debug)] |
| pub struct Request<T> { |
| metadata: MetadataMap, |
| message: T, |
| extensions: Extensions, |
| } |
| |
| /// Trait implemented by RPC request types. |
| /// |
| /// Types implementing this trait can be used as arguments to client RPC |
| /// methods without explicitly wrapping them into `tonic::Request`s. The purpose |
| /// is to make client calls slightly more convenient to write. |
| /// |
| /// Tonic's code generation and blanket implementations handle this for you, |
| /// so it is not necessary to implement this trait directly. |
| /// |
| /// # Example |
| /// |
| /// Given the following gRPC method definition: |
| /// ```proto |
| /// rpc GetFeature(Point) returns (Feature) {} |
| /// ``` |
| /// |
| /// we can call `get_feature` in two equivalent ways: |
| /// ```rust |
| /// # pub struct Point {} |
| /// # pub struct Client {} |
| /// # impl Client { |
| /// # fn get_feature(&self, r: impl tonic::IntoRequest<Point>) {} |
| /// # } |
| /// # let client = Client {}; |
| /// use tonic::Request; |
| /// |
| /// client.get_feature(Point {}); |
| /// client.get_feature(Request::new(Point {})); |
| /// ``` |
| pub trait IntoRequest<T>: sealed::Sealed { |
| /// Wrap the input message `T` in a `tonic::Request` |
| fn into_request(self) -> Request<T>; |
| } |
| |
| /// Trait implemented by RPC streaming request types. |
| /// |
| /// Types implementing this trait can be used as arguments to client streaming |
| /// RPC methods without explicitly wrapping them into `tonic::Request`s. The |
| /// purpose is to make client calls slightly more convenient to write. |
| /// |
| /// Tonic's code generation and blanket implementations handle this for you, |
| /// so it is not necessary to implement this trait directly. |
| /// |
| /// # Example |
| /// |
| /// Given the following gRPC service method definition: |
| /// ```proto |
| /// rpc RecordRoute(stream Point) returns (RouteSummary) {} |
| /// ``` |
| /// we can call `record_route` in two equivalent ways: |
| /// |
| /// ```rust |
| /// # #[derive(Clone)] |
| /// # pub struct Point {}; |
| /// # pub struct Client {}; |
| /// # impl Client { |
| /// # fn record_route(&self, r: impl tonic::IntoStreamingRequest<Message = Point>) {} |
| /// # } |
| /// # let client = Client {}; |
| /// use tonic::Request; |
| /// |
| /// let messages = vec![Point {}, Point {}]; |
| /// |
| /// client.record_route(Request::new(tokio_stream::iter(messages.clone()))); |
| /// client.record_route(tokio_stream::iter(messages)); |
| /// ``` |
| pub trait IntoStreamingRequest: sealed::Sealed { |
| /// The RPC request stream type |
| type Stream: Stream<Item = Self::Message> + Send + 'static; |
| |
| /// The RPC request type |
| type Message; |
| |
| /// Wrap the stream of messages in a `tonic::Request` |
| fn into_streaming_request(self) -> Request<Self::Stream>; |
| } |
| |
| impl<T> Request<T> { |
| /// Create a new gRPC request. |
| /// |
| /// ```rust |
| /// # use tonic::Request; |
| /// # pub struct HelloRequest { |
| /// # pub name: String, |
| /// # } |
| /// Request::new(HelloRequest { |
| /// name: "Bob".into(), |
| /// }); |
| /// ``` |
| pub fn new(message: T) -> Self { |
| Request { |
| metadata: MetadataMap::new(), |
| message, |
| extensions: Extensions::new(), |
| } |
| } |
| |
| /// Get a reference to the message |
| pub fn get_ref(&self) -> &T { |
| &self.message |
| } |
| |
| /// Get a mutable reference to the message |
| pub fn get_mut(&mut self) -> &mut T { |
| &mut self.message |
| } |
| |
| /// Get a reference to the custom request metadata. |
| pub fn metadata(&self) -> &MetadataMap { |
| &self.metadata |
| } |
| |
| /// Get a mutable reference to the request metadata. |
| pub fn metadata_mut(&mut self) -> &mut MetadataMap { |
| &mut self.metadata |
| } |
| |
| /// Consumes `self`, returning the message |
| pub fn into_inner(self) -> T { |
| self.message |
| } |
| |
| /// Consumes `self` returning the parts of the request. |
| pub fn into_parts(self) -> (MetadataMap, Extensions, T) { |
| (self.metadata, self.extensions, self.message) |
| } |
| |
| /// Create a new gRPC request from metadata, extensions and message. |
| pub fn from_parts(metadata: MetadataMap, extensions: Extensions, message: T) -> Self { |
| Self { |
| metadata, |
| extensions, |
| message, |
| } |
| } |
| |
| pub(crate) fn from_http_parts(parts: http::request::Parts, message: T) -> Self { |
| Request { |
| metadata: MetadataMap::from_headers(parts.headers), |
| message, |
| extensions: Extensions::from_http(parts.extensions), |
| } |
| } |
| |
| /// Convert an HTTP request to a gRPC request |
| pub fn from_http(http: http::Request<T>) -> Self { |
| let (parts, message) = http.into_parts(); |
| Request::from_http_parts(parts, message) |
| } |
| |
| pub(crate) fn into_http( |
| self, |
| uri: http::Uri, |
| method: http::Method, |
| version: http::Version, |
| sanitize_headers: SanitizeHeaders, |
| ) -> http::Request<T> { |
| let mut request = http::Request::new(self.message); |
| |
| *request.version_mut() = version; |
| *request.method_mut() = method; |
| *request.uri_mut() = uri; |
| *request.headers_mut() = match sanitize_headers { |
| SanitizeHeaders::Yes => self.metadata.into_sanitized_headers(), |
| SanitizeHeaders::No => self.metadata.into_headers(), |
| }; |
| *request.extensions_mut() = self.extensions.into_http(); |
| |
| request |
| } |
| |
| #[doc(hidden)] |
| pub fn map<F, U>(self, f: F) -> Request<U> |
| where |
| F: FnOnce(T) -> U, |
| { |
| let message = f(self.message); |
| |
| Request { |
| metadata: self.metadata, |
| message, |
| extensions: self.extensions, |
| } |
| } |
| |
| /// Get the local address of this connection. |
| /// |
| /// This will return `None` if the `IO` type used |
| /// does not implement `Connected` or when using a unix domain socket. |
| /// This currently only works on the server side. |
| #[cfg(feature = "transport")] |
| #[cfg_attr(docsrs, doc(cfg(feature = "transport")))] |
| pub fn local_addr(&self) -> Option<SocketAddr> { |
| let addr = self |
| .extensions() |
| .get::<TcpConnectInfo>() |
| .and_then(|i| i.local_addr()); |
| |
| #[cfg(feature = "tls")] |
| let addr = addr.or_else(|| { |
| self.extensions() |
| .get::<TlsConnectInfo<TcpConnectInfo>>() |
| .and_then(|i| i.get_ref().local_addr()) |
| }); |
| |
| addr |
| } |
| |
| /// Get the remote address of this connection. |
| /// |
| /// This will return `None` if the `IO` type used |
| /// does not implement `Connected` or when using a unix domain socket. |
| /// This currently only works on the server side. |
| #[cfg(feature = "transport")] |
| #[cfg_attr(docsrs, doc(cfg(feature = "transport")))] |
| pub fn remote_addr(&self) -> Option<SocketAddr> { |
| let addr = self |
| .extensions() |
| .get::<TcpConnectInfo>() |
| .and_then(|i| i.remote_addr()); |
| |
| #[cfg(feature = "tls")] |
| let addr = addr.or_else(|| { |
| self.extensions() |
| .get::<TlsConnectInfo<TcpConnectInfo>>() |
| .and_then(|i| i.get_ref().remote_addr()) |
| }); |
| |
| addr |
| } |
| |
| /// Get the peer certificates of the connected client. |
| /// |
| /// This is used to fetch the certificates from the TLS session |
| /// and is mostly used for mTLS. This currently only returns |
| /// `Some` on the server side of the `transport` server with |
| /// TLS enabled connections. |
| #[cfg(feature = "tls")] |
| #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] |
| pub fn peer_certs(&self) -> Option<Arc<Vec<Certificate>>> { |
| self.extensions() |
| .get::<TlsConnectInfo<TcpConnectInfo>>() |
| .and_then(|i| i.peer_certs()) |
| } |
| |
| /// Set the max duration the request is allowed to take. |
| /// |
| /// Requires the server to support the `grpc-timeout` metadata, which Tonic does. |
| /// |
| /// The duration will be formatted according to [the spec] and use the most precise unit |
| /// possible. |
| /// |
| /// Example: |
| /// |
| /// ```rust |
| /// use std::time::Duration; |
| /// use tonic::Request; |
| /// |
| /// let mut request = Request::new(()); |
| /// |
| /// request.set_timeout(Duration::from_secs(30)); |
| /// |
| /// let value = request.metadata().get("grpc-timeout").unwrap(); |
| /// |
| /// assert_eq!( |
| /// value, |
| /// // equivalent to 30 seconds |
| /// "30000000u" |
| /// ); |
| /// ``` |
| /// |
| /// [the spec]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md |
| pub fn set_timeout(&mut self, deadline: Duration) { |
| let value: MetadataValue<_> = duration_to_grpc_timeout(deadline).parse().unwrap(); |
| self.metadata_mut() |
| .insert(crate::metadata::GRPC_TIMEOUT_HEADER, value); |
| } |
| |
| /// Returns a reference to the associated extensions. |
| pub fn extensions(&self) -> &Extensions { |
| &self.extensions |
| } |
| |
| /// Returns a mutable reference to the associated extensions. |
| /// |
| /// # Example |
| /// |
| /// Extensions can be set in interceptors: |
| /// |
| /// ```no_run |
| /// use tonic::{Request, service::interceptor}; |
| /// |
| /// struct MyExtension { |
| /// some_piece_of_data: String, |
| /// } |
| /// |
| /// interceptor(|mut request: Request<()>| { |
| /// request.extensions_mut().insert(MyExtension { |
| /// some_piece_of_data: "foo".to_string(), |
| /// }); |
| /// |
| /// Ok(request) |
| /// }); |
| /// ``` |
| /// |
| /// And picked up by RPCs: |
| /// |
| /// ```no_run |
| /// use tonic::{async_trait, Status, Request, Response}; |
| /// # |
| /// # struct Output {} |
| /// # struct Input; |
| /// # struct MyService; |
| /// # struct MyExtension; |
| /// # #[async_trait] |
| /// # trait TestService { |
| /// # async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status>; |
| /// # } |
| /// |
| /// #[async_trait] |
| /// impl TestService for MyService { |
| /// async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status> { |
| /// let value: &MyExtension = req.extensions().get::<MyExtension>().unwrap(); |
| /// |
| /// Ok(Response::new(Output {})) |
| /// } |
| /// } |
| /// ``` |
| pub fn extensions_mut(&mut self) -> &mut Extensions { |
| &mut self.extensions |
| } |
| } |
| |
| impl<T> IntoRequest<T> for T { |
| fn into_request(self) -> Request<Self> { |
| Request::new(self) |
| } |
| } |
| |
| impl<T> IntoRequest<T> for Request<T> { |
| fn into_request(self) -> Request<T> { |
| self |
| } |
| } |
| |
| impl<T> IntoStreamingRequest for T |
| where |
| T: Stream + Send + 'static, |
| { |
| type Stream = T; |
| type Message = T::Item; |
| |
| fn into_streaming_request(self) -> Request<Self> { |
| Request::new(self) |
| } |
| } |
| |
| impl<T> IntoStreamingRequest for Request<T> |
| where |
| T: Stream + Send + 'static, |
| { |
| type Stream = T; |
| type Message = T::Item; |
| |
| fn into_streaming_request(self) -> Self { |
| self |
| } |
| } |
| |
| impl<T> sealed::Sealed for T {} |
| |
| mod sealed { |
| pub trait Sealed {} |
| } |
| |
| fn duration_to_grpc_timeout(duration: Duration) -> String { |
| fn try_format<T: Into<u128>>( |
| duration: Duration, |
| unit: char, |
| convert: impl FnOnce(Duration) -> T, |
| ) -> Option<String> { |
| // The gRPC spec specifies that the timeout most be at most 8 digits. So this is the largest a |
| // value can be before we need to use a bigger unit. |
| let max_size: u128 = 99_999_999; // exactly 8 digits |
| |
| let value = convert(duration).into(); |
| if value > max_size { |
| None |
| } else { |
| Some(format!("{}{}", value, unit)) |
| } |
| } |
| |
| // pick the most precise unit that is less than or equal to 8 digits as per the gRPC spec |
| try_format(duration, 'n', |d| d.as_nanos()) |
| .or_else(|| try_format(duration, 'u', |d| d.as_micros())) |
| .or_else(|| try_format(duration, 'm', |d| d.as_millis())) |
| .or_else(|| try_format(duration, 'S', |d| d.as_secs())) |
| .or_else(|| try_format(duration, 'M', |d| d.as_secs() / 60)) |
| .or_else(|| { |
| try_format(duration, 'H', |d| { |
| let minutes = d.as_secs() / 60; |
| minutes / 60 |
| }) |
| }) |
| // duration has to be more than 11_415 years for this to happen |
| .expect("duration is unrealistically large") |
| } |
| |
| /// When converting a `tonic::Request` into a `http::Request` should reserved |
| /// headers be removed? |
| pub(crate) enum SanitizeHeaders { |
| Yes, |
| No, |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::metadata::MetadataValue; |
| use http::Uri; |
| |
| #[test] |
| fn reserved_headers_are_excluded() { |
| let mut r = Request::new(1); |
| |
| for header in &MetadataMap::GRPC_RESERVED_HEADERS { |
| r.metadata_mut() |
| .insert(*header, MetadataValue::from_static("invalid")); |
| } |
| |
| let http_request = r.into_http( |
| Uri::default(), |
| http::Method::POST, |
| http::Version::HTTP_2, |
| SanitizeHeaders::Yes, |
| ); |
| assert!(http_request.headers().is_empty()); |
| } |
| |
| #[test] |
| fn duration_to_grpc_timeout_less_than_second() { |
| let timeout = Duration::from_millis(500); |
| let value = duration_to_grpc_timeout(timeout); |
| assert_eq!(value, format!("{}u", timeout.as_micros())); |
| } |
| |
| #[test] |
| fn duration_to_grpc_timeout_more_than_second() { |
| let timeout = Duration::from_secs(30); |
| let value = duration_to_grpc_timeout(timeout); |
| assert_eq!(value, format!("{}u", timeout.as_micros())); |
| } |
| |
| #[test] |
| fn duration_to_grpc_timeout_a_very_long_time() { |
| let one_hour = Duration::from_secs(60 * 60); |
| let value = duration_to_grpc_timeout(one_hour); |
| assert_eq!(value, format!("{}m", one_hour.as_millis())); |
| } |
| } |