// blob: fd2dd10c2c697dba4d73830e268ab1575515835e [file] [log] [blame]
use super::{FileLock, SparseIndex};
use crate::{Error, IndexKrate, KrateName};
pub use reqwest::blocking::Client;
pub use reqwest::Client as AsyncClient;
use std::collections::{BTreeMap, BTreeSet};
/// A **blocking** handle to a remote HTTP sparse registry index
///
/// Pairs a local [`SparseIndex`] with the blocking HTTP [`Client`] used to
/// query the remote registry.
pub struct RemoteSparseIndex {
    /// The local index that this remote wraps
    pub index: SparseIndex,
    /// The blocking HTTP client that issues requests to the remote index
    pub client: Client,
}
impl RemoteSparseIndex {
/// Creates a new [`Self`] that can access and write local cache entries,
/// and contact the remote index to retrieve the latest index information
#[inline]
pub fn new(index: SparseIndex, client: Client) -> Self {
Self { index, client }
}
/// Gets the latest index metadata for the crate
///
/// Network I/O is _always_ performed when calling this method, however the
/// response from the remote registry will be empty of contents other than
/// headers if the local cache entry for the crate is up to date with the
/// latest in the index
pub fn krate(
&self,
name: KrateName<'_>,
write_cache_entry: bool,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
let req = self.index.make_remote_request(name, None, lock)?;
let req = req.try_into()?;
let res = self.client.execute(req)?;
let mut builder = http::Response::builder()
.status(res.status())
.version(res.version());
builder
.headers_mut()
.unwrap()
.extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
let body = res.bytes()?;
let res = builder.body(body.to_vec())?;
self.index
.parse_remote_response(name, res, write_cache_entry, lock)
}
/// Attempts to read the locally cached crate information
///
/// This method does no network I/O unlike [`Self::krate`], but does not
/// guarantee that the cache information is up to date with the latest in
/// the remote index
#[inline]
pub fn cached_krate(
&self,
name: KrateName<'_>,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
self.index.cached_krate(name, lock)
}
/// Helper method for downloading multiple crates in parallel
///
/// Note that in most cases using [`AsyncRemoteSparseIndex::krates_blocking`]
/// will outperform this method, especially on lower core counts
#[inline]
pub fn krates(
&self,
mut krates: BTreeSet<String>,
write_cache_entries: bool,
lock: &FileLock,
) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
let Some(prep_krate) = krates.pop_last() else {
return Default::default();
};
let prep = || {
let name = prep_krate.as_str().try_into()?;
self.krate(name, write_cache_entries, lock)
};
let prep_krate_res = prep();
use rayon::prelude::*;
let mut results: BTreeMap<_, _> = krates
.into_par_iter()
.map(|kname| {
let res = || {
let name = kname.as_str().try_into()?;
self.krate(name, write_cache_entries, lock)
};
let res = res();
(kname, res)
})
.collect();
results.insert(prep_krate, prep_krate_res);
results
}
}
/// An **async** handle to a remote HTTP sparse registry index
///
/// Pairs a local [`SparseIndex`] with the async HTTP [`AsyncClient`] used to
/// query the remote registry.
pub struct AsyncRemoteSparseIndex {
    /// The local index that this remote wraps
    pub index: SparseIndex,
    /// The async HTTP client that issues requests to the remote index
    pub client: AsyncClient,
}
impl AsyncRemoteSparseIndex {
/// Creates a new [`Self`] that can access and write local cache entries,
/// and contact the remote index to retrieve the latest index information
#[inline]
pub fn new(index: SparseIndex, client: AsyncClient) -> Self {
Self { index, client }
}
/// Async version of [`RemoteSparseIndex::krate`]
pub async fn krate_async(
&self,
name: KrateName<'_>,
write_cache_entry: bool,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
let req = self
.index
.make_remote_request(name, None, lock)?
.try_into()?;
let res = Self::exec_request(&self.client, req).await?;
self.index
.parse_remote_response(name, res, write_cache_entry, lock)
}
async fn exec_request(
client: &AsyncClient,
req: reqwest::Request,
) -> Result<http::Response<Vec<u8>>, Error> {
// This is unfortunate, but we always make a copy in case we need to retry
let res = loop {
let reqc = req.try_clone().unwrap();
let res = client.execute(reqc).await;
match res {
Err(err) if err.is_connect() || err.is_timeout() || err.is_request() => continue,
Err(err) => return Err(err.into()),
Ok(res) => break res,
}
};
let mut builder = http::Response::builder()
.status(res.status())
.version(res.version());
builder
.headers_mut()
.unwrap()
.extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
let body = res.bytes().await?;
Ok(builder.body(body.to_vec())?)
}
/// Attempts to read the locally cached crate information
///
/// This method does no network I/O unlike [`Self::krate_async`], but does not
/// guarantee that the cache information is up to date with the latest in
/// the remote index
#[inline]
pub fn cached_krate(
&self,
name: KrateName<'_>,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
self.index.cached_krate(name, lock)
}
/// Helper method for downloading multiples crates concurrently
///
/// This method will generally perform better than [`RemoteSparseIndex::krates`]
///
/// One notable difference with this method is that you can specify a maximum
/// duration that each individual krate request can take before it is timed out.
/// This is because certain [errors](https://github.com/seanmonstar/reqwest/issues/1748)
/// can occur when making many concurrent requests, which we detect and retry
/// automatically, but with (by default) no upper bound in number of
/// retries/time.
///
/// You can also run this entire operation with a single timeout if you wish,
/// via something like [`tokio::time::timeout`](https://docs.rs/tokio/latest/tokio/time/fn.timeout.html)
pub async fn krates(
&self,
mut krates: BTreeSet<String>,
write_cache_entries: bool,
individual_timeout: Option<std::time::Duration>,
lock: &FileLock,
) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
let Some(prep_krate) = krates.pop_last() else {
return Default::default();
};
let create_req = |kname: &str| -> Result<http::Request<&'static [u8]>, Error> {
let name = kname.try_into()?;
self.index.make_remote_request(name, None, lock)
};
let mut results = BTreeMap::new();
{
let result;
match create_req(&prep_krate).and_then(|req| Ok(req.try_into()?)) {
Ok(req) => match Self::exec_request(&self.client, req).await {
Ok(res) => {
result = self.index.parse_remote_response(
prep_krate.as_str().try_into().unwrap(),
res,
write_cache_entries,
lock,
);
}
Err(err) => result = Err(err),
},
Err(err) => result = Err(err),
}
results.insert(prep_krate, result);
}
let mut tasks = tokio::task::JoinSet::new();
for kname in krates {
match kname.as_str().try_into().and_then(|name| {
Ok(self
.index
.make_remote_request(name, None, lock)?
.try_into()?)
}) {
Ok(req) => {
let client = self.client.clone();
tasks.spawn(async move {
let res = if let Some(to) = individual_timeout {
match tokio::time::timeout(to, Self::exec_request(&client, req)).await {
Ok(res) => res,
Err(_) => Err(Error::Http(crate::HttpError::Timeout)),
}
} else {
Self::exec_request(&client, req).await
};
(kname, res)
});
}
Err(err) => {
results.insert(kname, Err(err));
}
}
}
let (tx, rx) = crossbeam_channel::unbounded();
while let Some(res) = tasks.join_next().await {
let Ok(res) = res else {
continue;
};
let _ = tx.send(res);
}
drop(tx);
let results = std::sync::Mutex::new(results);
rayon::scope(|s| {
while let Ok((kname, res)) = rx.recv() {
s.spawn(|_s| {
let res = res.and_then(|res| {
let name = kname
.as_str()
.try_into()
.expect("this was already validated");
self.index
.parse_remote_response(name, res, write_cache_entries, lock)
});
results.lock().unwrap().insert(kname, res);
});
}
});
results.into_inner().unwrap()
}
/// A non-async version of [`Self::krates`]
///
/// Using this method requires that there is an active tokio runtime as
/// described [here](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.current)
pub fn krates_blocking(
&self,
krates: BTreeSet<String>,
write_cache_entries: bool,
individual_timeout: Option<std::time::Duration>,
lock: &FileLock,
) -> Result<BTreeMap<String, Result<Option<IndexKrate>, Error>>, tokio::runtime::TryCurrentError>
{
let current = tokio::runtime::Handle::try_current()?;
Ok(current.block_on(async {
self.krates(krates, write_cache_entries, individual_timeout, lock)
.await
}))
}
}
impl From<reqwest::Error> for Error {
#[inline]
fn from(e: reqwest::Error) -> Self {
Self::Http(crate::HttpError::Reqwest(e))
}
}
impl From<http::Error> for Error {
#[inline]
fn from(e: http::Error) -> Self {
Self::Http(crate::HttpError::Http(e))
}
}