blob: df8afd4083ac8a36c4ca68b0dfff7ff5935ae552 [file]
//! Event Loop Executor
//!
//! Either spawns a new event loop or re-uses a provided one.
//! The spawned event loop is always single-threaded (mostly for
//! historical/backward compatibility reasons) despite the fact
//! that `tokio::runtime` can be multi-threaded.
use std::io;
use tokio::runtime;
/// Task executor for Tokio 0.2 runtime.
///
/// Alias for `tokio::runtime::Handle`.
pub type TaskExecutor = tokio::runtime::Handle;
/// Possibly uninitialized event loop executor.
///
/// Converted into an `Executor` by `initialize` or `init_with_name`.
#[derive(Debug)]
pub enum UninitializedExecutor {
/// Shared instance of executor; it is reused as-is on initialization.
Shared(TaskExecutor),
/// Event Loop should be spawned by the transport: initialization creates
/// a new `RpcEventLoop`.
Unspawned,
}
impl UninitializedExecutor {
    /// Initializes the executor using the default thread name `"event.loop"`.
    ///
    /// A shared executor is passed through unchanged; otherwise a new event
    /// loop is spawned. Dropping the resulting `Executor` closes a spawned loop.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced while spawning a new event loop.
    pub fn initialize(self) -> io::Result<Executor> {
        self.init_with_name("event.loop")
    }

    /// Initializes the executor, naming the spawned event loop thread `name`.
    ///
    /// `name` is only used when there is no shared executor and a new event
    /// loop has to be spawned. Dropping the resulting `Executor` closes a
    /// spawned loop.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced while spawning a new event loop.
    pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
        let initialized = match self {
            Self::Shared(handle) => Executor::Shared(handle),
            Self::Unspawned => {
                let event_loop = RpcEventLoop::with_name(Some(name.into()))?;
                Executor::Spawned(event_loop)
            }
        };
        Ok(initialized)
    }
}
/// Initialized Executor
///
/// Either wraps an executor handle that was provided from outside, or owns
/// an event loop spawned by this module.
#[derive(Debug)]
pub enum Executor {
/// Shared instance; `close` and `wait` do nothing for this variant.
Shared(TaskExecutor),
/// Spawned Event Loop, owned by this value.
Spawned(RpcEventLoop),
}
impl Executor {
/// Get tokio executor associated with this event loop.
pub fn executor(&self) -> TaskExecutor {
match self {
Executor::Shared(ref executor) => executor.clone(),
Executor::Spawned(ref eloop) => eloop.executor(),
}
}
/// Closes underlying event loop (if any!).
pub fn close(self) {
if let Executor::Spawned(eloop) = self {
eloop.close()
}
}
/// Wait for underlying event loop to finish (if any!).
pub fn wait(self) {
if let Executor::Spawned(eloop) = self {
let _ = eloop.wait();
}
}
}
/// A handle to running event loop. Dropping the handle will cause event loop to finish.
#[derive(Debug)]
pub struct RpcEventLoop {
/// Runtime handle cloned when the event loop was created.
executor: TaskExecutor,
/// Sender half of the close signal; `None` once the signal has been taken and sent.
close: Option<futures::channel::oneshot::Sender<()>>,
/// The owned runtime; `wait` takes it out, leaving `None`.
runtime: Option<runtime::Runtime>,
}
impl Drop for RpcEventLoop {
    /// Sends the close signal if it has not been sent yet.
    fn drop(&mut self) {
        if let Some(sender) = self.close.take() {
            // Best effort: a failed send is deliberately ignored.
            let _ = sender.send(());
        }
    }
}
impl RpcEventLoop {
    /// Spawns a new thread with the `EventLoop`, without setting a thread name.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported when building the runtime fails.
    pub fn spawn() -> io::Result<Self> {
        Self::with_name(None)
    }

    /// Spawns a new, optionally named, thread with the `EventLoop`.
    ///
    /// The runtime is built with the threaded scheduler, one core thread and
    /// all drivers enabled. A task awaiting the close signal is spawned onto it.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported when building the runtime fails.
    pub fn with_name(name: Option<String>) -> io::Result<Self> {
        let (stop, stopped) = futures::channel::oneshot::channel();

        let mut builder = runtime::Builder::new();
        builder.core_threads(1).threaded_scheduler().enable_all();
        if let Some(name) = name {
            builder.thread_name(name);
        }
        let runtime = builder.build()?;
        let executor = runtime.handle().clone();

        // The result is ignored: the task only exists to await the close signal.
        runtime.spawn(async {
            let _ = stopped.await;
        });

        Ok(RpcEventLoop {
            executor,
            close: Some(stop),
            runtime: Some(runtime),
        })
    }

    /// Get executor for this event loop.
    pub fn executor(&self) -> runtime::Handle {
        // Return the handle cached at construction instead of going through
        // `Option<Runtime>`: no panic path, and the stored field is actually used.
        self.executor.clone()
    }

    /// Blocks current thread and waits until the event loop is finished.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if the runtime has already been taken.
    pub fn wait(mut self) -> Result<(), ()> {
        let runtime = self.runtime.take().ok_or(())?;
        // Dropping the runtime is what blocks here.
        // NOTE(review): the previous comment stated that dropping a Tokio 0.2
        // runtime waits for all spawned tasks to terminate. The Tokio 0.2
        // `Runtime` docs appear to say unfinished futures are dropped on
        // shutdown instead; confirm which semantics callers rely on.
        drop(runtime);
        Ok(())
    }

    /// Finishes this event loop by sending the close signal.
    ///
    /// Logs a warning if the signal cannot be delivered.
    ///
    /// # Panics
    ///
    /// Panics if the close sender has already been taken.
    pub fn close(mut self) {
        let sender = self
            .close
            .take()
            .expect("Close is always set before self is consumed.");
        if let Err(e) = sender.send(()) {
            warn!("Event Loop is already finished. {:?}", e);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that `RpcEventLoop` implements both `Send` and `Sync`.
    #[test]
    fn make_sure_rpc_event_loop_is_send_and_sync() {
        fn assert_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        assert_send_sync::<RpcEventLoop>();
    }
}