//! A generic connection pool.
//! Opening a new database connection every time one is needed is both
//! inefficient and can lead to resource exhaustion under high traffic
//! conditions. A connection pool maintains a set of open connections to a
//! database, handing them out for repeated use.
//! r2d2 is agnostic to the connection type it is managing. Implementors of the
//! `ManageConnection` trait provide the database-specific logic to create and
//! check the health of connections.
//! # Example
//! Using an imaginary "foodb" database.
//! ```rust,ignore
//! use std::thread;
//! extern crate r2d2;
//! extern crate r2d2_foodb;
//! fn main() {
//! let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234");
//! let pool = r2d2::Pool::builder()
//! .max_size(15)
//! .build(manager)
//! .unwrap();
//! for _ in 0..20 {
//! let pool = pool.clone();
//! thread::spawn(move || {
//! let conn = pool.get().unwrap();
//! // use the connection
//! // it will be returned to the pool when it falls out of scope.
//! })
//! }
//! }
//! ```
#![doc(html_root_url = "")]
use log::error;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::cmp;
use std::error;
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
pub use crate::config::Builder;
use crate::config::Config;
use crate::event::{AcquireEvent, CheckinEvent, CheckoutEvent, ReleaseEvent, TimeoutEvent};
pub use crate::event::{HandleEvent, NopEventHandler};
pub use crate::extensions::Extensions;
mod config;
pub mod event;
mod extensions;
mod test;
static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
/// A trait which provides connection-specific functionality.
pub trait ManageConnection: Send + Sync + 'static {
/// The connection type this manager deals with.
type Connection: Send + 'static;
/// The error type returned by `Connection`s.
type Error: error::Error + 'static;
/// Attempts to create a new connection.
fn connect(&self) -> Result<Self::Connection, Self::Error>;
/// Determines if the connection is still connected to the database.
/// A standard implementation would check if a simple query like `SELECT 1`
/// succeeds.
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
/// *Quickly* determines if the connection is no longer usable.
/// This will be called synchronously every time a connection is returned
/// to the pool, so it should *not* block. If it returns `true`, the
/// connection will be discarded.
/// For example, an implementation might check if the underlying TCP socket
/// has disconnected. Implementations that do not support this kind of
/// fast health check may simply return `false`.
fn has_broken(&self, conn: &mut Self::Connection) -> bool;
/// A trait which handles errors reported by the `ManageConnection`.
pub trait HandleError<E>: fmt::Debug + Send + Sync + 'static {
/// Handles an error.
fn handle_error(&self, error: E);
/// A `HandleError` implementation which does nothing.
#[derive(Copy, Clone, Debug)]
pub struct NopErrorHandler;
impl<E> HandleError<E> for NopErrorHandler {
fn handle_error(&self, _: E) {}
/// A `HandleError` implementation which logs at the error level.
#[derive(Copy, Clone, Debug)]
pub struct LoggingErrorHandler;
impl<E> HandleError<E> for LoggingErrorHandler
E: error::Error,
fn handle_error(&self, error: E) {
error!("{}", error);
/// A trait which allows for customization of connections.
pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static {
/// Called with connections immediately after they are returned from
/// `ManageConnection::connect`.
/// The default implementation simply returns `Ok(())`.
/// # Errors
/// If this method returns an error, the connection will be discarded.
fn on_acquire(&self, conn: &mut C) -> Result<(), E> {
/// Called with connections when they are removed from the pool.
/// The connections may be broken (as reported by `is_valid` or
/// `has_broken`), or have simply timed out.
/// The default implementation does nothing.
fn on_release(&self, conn: C) {}
/// A `CustomizeConnection` which does nothing.
#[derive(Copy, Clone, Debug)]
pub struct NopConnectionCustomizer;
impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
struct Conn<C> {
conn: C,
extensions: Extensions,
birth: Instant,
id: u64,
struct IdleConn<C> {
conn: Conn<C>,
idle_start: Instant,
struct PoolInternals<C> {
conns: Vec<IdleConn<C>>,
num_conns: u32,
pending_conns: u32,
last_error: Option<String>,
struct SharedPool<M>
M: ManageConnection,
config: Config<M::Connection, M::Error>,
manager: M,
internals: Mutex<PoolInternals<M::Connection>>,
cond: Condvar,
fn drop_conns<M>(
shared: &Arc<SharedPool<M>>,
mut internals: MutexGuard<PoolInternals<M::Connection>>,
conns: Vec<Conn<M::Connection>>,
) where
M: ManageConnection,
internals.num_conns -= conns.len() as u32;
establish_idle_connections(shared, &mut internals);
drop(internals); // make sure we run connection destructors without this locked
for conn in conns {
let event = ReleaseEvent {
age: conn.birth.elapsed(),
fn establish_idle_connections<M>(
shared: &Arc<SharedPool<M>>,
internals: &mut PoolInternals<M::Connection>,
) where
M: ManageConnection,
let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
let idle = internals.conns.len() as u32;
for _ in idle..min {
add_connection(shared, internals);
fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
M: ManageConnection,
if internals.num_conns + internals.pending_conns >= shared.config.max_size {
internals.pending_conns += 1;
inner(Duration::from_secs(0), shared);
fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
M: ManageConnection,
let new_shared = Arc::downgrade(shared);
shared.config.thread_pool.execute_after(delay, move || {
let shared = match new_shared.upgrade() {
Some(shared) => shared,
None => return,
let conn = shared.manager.connect().and_then(|mut conn| {
.on_acquire(&mut conn)
.map(|_| conn)
match conn {
Ok(conn) => {
let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
let event = AcquireEvent { id };
let mut internals = shared.internals.lock();
internals.last_error = None;
let now = Instant::now();
let conn = IdleConn {
conn: Conn {
extensions: Extensions::new(),
birth: now,
idle_start: now,
internals.pending_conns -= 1;
internals.num_conns += 1;
Err(err) => {
shared.internals.lock().last_error = Some(err.to_string());
let delay = cmp::max(Duration::from_millis(200), delay);
let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
inner(delay, &shared);
fn reap_connections<M>(shared: &Weak<SharedPool<M>>)
M: ManageConnection,
let shared = match shared.upgrade() {
Some(shared) => shared,
None => return,
let mut old = Vec::with_capacity(shared.config.max_size as usize);
let mut to_drop = vec![];
let mut internals = shared.internals.lock();
mem::swap(&mut old, &mut internals.conns);
let now = Instant::now();
for conn in old {
let mut reap = false;
if let Some(timeout) = shared.config.idle_timeout {
reap |= now - conn.idle_start >= timeout;
if let Some(lifetime) = shared.config.max_lifetime {
reap |= now - conn.conn.birth >= lifetime;
if reap {
} else {
drop_conns(&shared, internals, to_drop);
/// A generic connection pool.
pub struct Pool<M>(Arc<SharedPool<M>>)
M: ManageConnection;
/// Returns a new `Pool` referencing the same state as `self`.
impl<M> Clone for Pool<M>
M: ManageConnection,
fn clone(&self) -> Pool<M> {
impl<M> fmt::Debug for Pool<M>
M: ManageConnection + fmt::Debug,
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
.field("state", &self.state())
.field("config", &self.0.config)
.field("manager", &self.0.manager)
impl<M> Pool<M>
M: ManageConnection,
/// Creates a new connection pool with a default configuration.
pub fn new(manager: M) -> Result<Pool<M>, Error> {
/// Returns a builder type to configure a new pool.
pub fn builder() -> Builder<M> {
// for testing
fn new_inner(
config: Config<M::Connection, M::Error>,
manager: M,
reaper_rate: Duration,
) -> Pool<M> {
let internals = PoolInternals {
conns: Vec::with_capacity(config.max_size as usize),
num_conns: 0,
pending_conns: 0,
last_error: None,
let shared = Arc::new(SharedPool {
internals: Mutex::new(internals),
cond: Condvar::new(),
establish_idle_connections(&shared, &mut shared.internals.lock());
if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
let s = Arc::downgrade(&shared);
.execute_at_fixed_rate(reaper_rate, reaper_rate, move || reap_connections(&s));
fn wait_for_initialization(&self) -> Result<(), Error> {
let end = Instant::now() + self.0.config.connection_timeout;
let mut internals = self.0.internals.lock();
let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
while internals.num_conns != initial_size {
if self.0.cond.wait_until(&mut internals, end).timed_out() {
return Err(Error(internals.last_error.take()));
/// Retrieves a connection from the pool.
/// Waits for at most the configured connection timeout before returning an
/// error.
pub fn get(&self) -> Result<PooledConnection<M>, Error> {
/// Retrieves a connection from the pool, waiting for at most `timeout`
/// The given timeout will be used instead of the configured connection
/// timeout.
pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
let start = Instant::now();
let end = start + timeout;
let mut internals = self.0.internals.lock();
loop {
match self.try_get_inner(internals) {
Ok(conn) => {
let event = CheckoutEvent {
id: conn.conn.as_ref().unwrap().id,
duration: start.elapsed(),
return Ok(conn);
Err(i) => internals = i,
add_connection(&self.0, &mut internals);
if self.0.cond.wait_until(&mut internals, end).timed_out() {
let event = TimeoutEvent { timeout };
return Err(Error(internals.last_error.take()));
/// Attempts to retrieve a connection from the pool if there is one
/// available.
/// Returns `None` if there are no idle connections available in the pool.
/// This method will not block waiting to establish a new connection.
pub fn try_get(&self) -> Option<PooledConnection<M>> {
fn try_get_inner<'a>(
&'a self,
mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
loop {
if let Some(mut conn) = internals.conns.pop() {
establish_idle_connections(&self.0, &mut internals);
if self.0.config.test_on_check_out {
if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
let msg = e.to_string();
// FIXME we shouldn't have to lock, unlock, and relock here
internals = self.0.internals.lock();
internals.last_error = Some(msg);
drop_conns(&self.0, internals, vec![conn.conn]);
internals = self.0.internals.lock();
return Ok(PooledConnection {
pool: self.clone(),
checkout: Instant::now(),
conn: Some(conn.conn),
} else {
return Err(internals);
fn put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>) {
let event = CheckinEvent {
duration: checkout.elapsed(),
// This is specified to be fast, but call it before locking anyways
let broken = self.0.manager.has_broken(&mut conn.conn);
let mut internals = self.0.internals.lock();
if broken {
drop_conns(&self.0, internals, vec![conn]);
} else {
let conn = IdleConn {
idle_start: Instant::now(),
/// Returns information about the current state of the pool.
pub fn state(&self) -> State {
let internals = self.0.internals.lock();
State {
connections: internals.num_conns,
idle_connections: internals.conns.len() as u32,
_p: (),
/// Returns the configured maximum pool size.
pub fn max_size(&self) -> u32 {
/// Returns the configured mimimum idle connection count.
pub fn min_idle(&self) -> Option<u32> {
/// Returns if the pool is configured to test connections on check out.
pub fn test_on_check_out(&self) -> bool {
/// Returns the configured maximum connection lifetime.
pub fn max_lifetime(&self) -> Option<Duration> {
/// Returns the configured idle connection timeout.
pub fn idle_timeout(&self) -> Option<Duration> {
/// Returns the configured connection timeout.
pub fn connection_timeout(&self) -> Duration {
/// The error type returned by methods in this crate.
pub struct Error(Option<String>);
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref err) = self.0 {
write!(fmt, ": {}", err)?;
impl error::Error for Error {
fn description(&self) -> &str {
"timed out waiting for connection"
/// Information about the state of a `Pool`.
pub struct State {
/// The number of connections currently being managed by the pool.
pub connections: u32,
/// The number of idle connections.
pub idle_connections: u32,
_p: (),
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
.field("connections", &self.connections)
.field("idle_connections", &self.idle_connections)
/// A smart pointer wrapping a connection.
pub struct PooledConnection<M>
M: ManageConnection,
pool: Pool<M>,
checkout: Instant,
conn: Option<Conn<M::Connection>>,
impl<M> fmt::Debug for PooledConnection<M>
M: ManageConnection,
M::Connection: fmt::Debug,
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
impl<M> Drop for PooledConnection<M>
M: ManageConnection,
fn drop(&mut self) {
self.pool.put_back(self.checkout, self.conn.take().unwrap());
impl<M> Deref for PooledConnection<M>
M: ManageConnection,
type Target = M::Connection;
fn deref(&self) -> &M::Connection {
impl<M> DerefMut for PooledConnection<M>
M: ManageConnection,
fn deref_mut(&mut self) -> &mut M::Connection {
&mut self.conn.as_mut().unwrap().conn
impl<M> PooledConnection<M>
M: ManageConnection,
/// Returns a shared reference to the extensions associated with this connection.
pub fn extensions(this: &Self) -> &Extensions {
/// Returns a mutable reference to the extensions associated with this connection.
pub fn extensions_mut(this: &mut Self) -> &mut Extensions {
&mut this.conn.as_mut().unwrap().extensions