blob: bba12e2431af98c30165729ef1840860d719c26d [file] [log] [blame] [edit]
//! Watcher implementation for the inotify Linux API
//!
//! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to
//! monitor individual files, or to monitor directories. When a directory is monitored, inotify
//! will return events for the directory itself, and for files inside the directory.
use super::event::*;
use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
use inotify as inotify_sys;
use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
use std::collections::HashMap;
use std::env;
use std::ffi::OsStr;
use std::fs::metadata;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use walkdir::WalkDir;
const INOTIFY: mio::Token = mio::Token(0);
const MESSAGE: mio::Token = mio::Token(1);
// The EventLoop will set up a mio::Poll and use it to wait for the following:
//
// - messages telling it what to do
//
// - events telling it that something has happened on one of the watched files.
struct EventLoop {
running: bool,
poll: mio::Poll,
event_loop_waker: Arc<mio::Waker>,
event_loop_tx: Sender<EventLoopMsg>,
event_loop_rx: Receiver<EventLoopMsg>,
inotify: Option<Inotify>,
event_handler: Box<dyn EventHandler>,
watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
paths: HashMap<WatchDescriptor, PathBuf>,
rename_event: Option<Event>,
}
/// Watcher implementation based on inotify
#[derive(Debug)]
pub struct INotifyWatcher {
channel: Sender<EventLoopMsg>,
waker: Arc<mio::Waker>,
}
enum EventLoopMsg {
AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
RemoveWatch(PathBuf, Sender<Result<()>>),
Shutdown,
Configure(Config, BoundSender<Result<bool>>),
}
#[inline]
fn add_watch_by_event(
path: &Option<PathBuf>,
event: &inotify_sys::Event<&OsStr>,
watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
add_watches: &mut Vec<PathBuf>,
) {
if let Some(ref path) = *path {
if event.mask.contains(EventMask::ISDIR) {
if let Some(parent_path) = path.parent() {
if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
if is_recursive {
add_watches.push(path.to_owned());
}
}
}
}
}
}
#[inline]
fn remove_watch_by_event(
path: &Option<PathBuf>,
watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
remove_watches: &mut Vec<PathBuf>,
) {
if let Some(ref path) = *path {
if watches.contains_key(path) {
remove_watches.push(path.to_owned());
}
}
}
impl EventLoop {
pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
let poll = mio::Poll::new()?;
let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
let inotify_fd = inotify.as_raw_fd();
let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
poll.registry()
.register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
let event_loop = EventLoop {
running: true,
poll,
event_loop_waker,
event_loop_tx,
event_loop_rx,
inotify: Some(inotify),
event_handler,
watches: HashMap::new(),
paths: HashMap::new(),
rename_event: None,
};
Ok(event_loop)
}
// Run the event loop.
pub fn run(self) {
let _ = thread::Builder::new()
.name("notify-rs inotify loop".to_string())
.spawn(|| self.event_loop_thread());
}
fn event_loop_thread(mut self) {
let mut events = mio::Events::with_capacity(16);
loop {
// Wait for something to happen.
match self.poll.poll(&mut events, None) {
Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
// System call was interrupted, we will retry
// TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
}
Err(e) => panic!("poll failed: {}", e),
Ok(()) => {}
}
// Process whatever happened.
for event in &events {
self.handle_event(event);
}
// Stop, if we're done.
if !self.running {
break;
}
}
}
// Handle a single event.
fn handle_event(&mut self, event: &mio::event::Event) {
match event.token() {
MESSAGE => {
// The channel is readable - handle messages.
self.handle_messages()
}
INOTIFY => {
// inotify has something to tell us.
self.handle_inotify()
}
_ => unreachable!(),
}
}
fn handle_messages(&mut self) {
while let Ok(msg) = self.event_loop_rx.try_recv() {
match msg {
EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
}
EventLoopMsg::RemoveWatch(path, tx) => {
let _ = tx.send(self.remove_watch(path, false));
}
EventLoopMsg::Shutdown => {
let _ = self.remove_all_watches();
if let Some(inotify) = self.inotify.take() {
let _ = inotify.close();
}
self.running = false;
break;
}
EventLoopMsg::Configure(config, tx) => {
self.configure_raw_mode(config, tx);
}
}
}
}
fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
tx.send(Ok(false))
.expect("configuration channel disconnected");
}
fn handle_inotify(&mut self) {
let mut add_watches = Vec::new();
let mut remove_watches = Vec::new();
if let Some(ref mut inotify) = self.inotify {
let mut buffer = [0; 1024];
// Read all buffers available.
loop {
match inotify.read_events(&mut buffer) {
Ok(events) => {
let mut num_events = 0;
for event in events {
log::trace!("inotify event: {event:?}");
num_events += 1;
if event.mask.contains(EventMask::Q_OVERFLOW) {
let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
self.event_handler.handle_event(ev);
}
let path = match event.name {
Some(name) => {
self.paths.get(&event.wd).map(|root| root.join(&name))
}
None => self.paths.get(&event.wd).cloned(),
};
let mut evs = Vec::new();
if event.mask.contains(EventMask::MOVED_FROM) {
remove_watch_by_event(&path, &self.watches, &mut remove_watches);
let event = Event::new(EventKind::Modify(ModifyKind::Name(
RenameMode::From,
)))
.add_some_path(path.clone())
.set_tracker(event.cookie as usize);
self.rename_event = Some(event.clone());
evs.push(event);
} else if event.mask.contains(EventMask::MOVED_TO) {
evs.push(
Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
.set_tracker(event.cookie as usize)
.add_some_path(path.clone()),
);
let trackers_match = self
.rename_event
.as_ref()
.and_then(|e| e.tracker())
.map_or(false, |from_tracker| {
from_tracker == event.cookie as usize
});
if trackers_match {
let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
evs.push(
Event::new(EventKind::Modify(ModifyKind::Name(
RenameMode::Both,
)))
.set_tracker(event.cookie as usize)
.add_some_path(rename_event.paths.first().cloned())
.add_some_path(path.clone()),
);
}
add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
}
if event.mask.contains(EventMask::MOVE_SELF) {
evs.push(
Event::new(EventKind::Modify(ModifyKind::Name(
RenameMode::From,
)))
.add_some_path(path.clone()),
);
// TODO stat the path and get to new path
// - emit To and Both events
// - change prefix for further events
}
if event.mask.contains(EventMask::CREATE) {
evs.push(
Event::new(EventKind::Create(
if event.mask.contains(EventMask::ISDIR) {
CreateKind::Folder
} else {
CreateKind::File
},
))
.add_some_path(path.clone()),
);
add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
}
if event.mask.contains(EventMask::DELETE_SELF)
|| event.mask.contains(EventMask::DELETE)
{
evs.push(
Event::new(EventKind::Remove(
if event.mask.contains(EventMask::ISDIR) {
RemoveKind::Folder
} else {
RemoveKind::File
},
))
.add_some_path(path.clone()),
);
remove_watch_by_event(&path, &self.watches, &mut remove_watches);
}
if event.mask.contains(EventMask::MODIFY) {
evs.push(
Event::new(EventKind::Modify(ModifyKind::Data(
DataChange::Any,
)))
.add_some_path(path.clone()),
);
}
if event.mask.contains(EventMask::CLOSE_WRITE) {
evs.push(
Event::new(EventKind::Access(AccessKind::Close(
AccessMode::Write,
)))
.add_some_path(path.clone()),
);
}
if event.mask.contains(EventMask::CLOSE_NOWRITE) {
evs.push(
Event::new(EventKind::Access(AccessKind::Close(
AccessMode::Read,
)))
.add_some_path(path.clone()),
);
}
if event.mask.contains(EventMask::ATTRIB) {
evs.push(
Event::new(EventKind::Modify(ModifyKind::Metadata(
MetadataKind::Any,
)))
.add_some_path(path.clone()),
);
}
if event.mask.contains(EventMask::OPEN) {
evs.push(
Event::new(EventKind::Access(AccessKind::Open(
AccessMode::Any,
)))
.add_some_path(path.clone()),
);
}
for ev in evs {
self.event_handler.handle_event(Ok(ev));
}
}
// All events read. Break out.
if num_events == 0 {
break;
}
}
Err(e) => {
self.event_handler.handle_event(Err(Error::io(e)));
}
}
}
}
for path in remove_watches {
self.remove_watch(path, true).ok();
}
for path in add_watches {
self.add_watch(path, true, false).ok();
}
}
fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
// If the watch is not recursive, or if we determine (by stat'ing the path to get its
// metadata) that the watched path is not a directory, add a single path watch.
if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
return self.add_single_watch(path, false, true);
}
for entry in WalkDir::new(path)
.follow_links(true)
.into_iter()
.filter_map(filter_dir)
{
self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
watch_self = false;
}
Ok(())
}
fn add_single_watch(
&mut self,
path: PathBuf,
is_recursive: bool,
watch_self: bool,
) -> Result<()> {
let mut watchmask = WatchMask::ATTRIB
| WatchMask::CREATE
| WatchMask::DELETE
| WatchMask::CLOSE_WRITE
| WatchMask::MODIFY
| WatchMask::MOVED_FROM
| WatchMask::MOVED_TO;
if watch_self {
watchmask.insert(WatchMask::DELETE_SELF);
watchmask.insert(WatchMask::MOVE_SELF);
}
if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
watchmask.insert(old_watchmask);
watchmask.insert(WatchMask::MASK_ADD);
}
if let Some(ref mut inotify) = self.inotify {
log::trace!("adding inotify watch: {}", path.display());
match inotify.add_watch(&path, watchmask) {
Err(e) => {
Err(if e.raw_os_error() == Some(libc::ENOSPC) {
// do not report inotify limits as "no more space" on linux #266
Error::new(ErrorKind::MaxFilesWatch)
} else {
Error::io(e)
}
.add_path(path))
}
Ok(w) => {
watchmask.remove(WatchMask::MASK_ADD);
self.watches
.insert(path.clone(), (w.clone(), watchmask, is_recursive));
self.paths.insert(w, path);
Ok(())
}
}
} else {
Ok(())
}
}
fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
match self.watches.remove(&path) {
None => return Err(Error::watch_not_found().add_path(path)),
Some((w, _, is_recursive)) => {
if let Some(ref mut inotify) = self.inotify {
log::trace!("removing inotify watch: {}", path.display());
inotify
.rm_watch(w.clone())
.map_err(|e| Error::io(e).add_path(path.clone()))?;
self.paths.remove(&w);
if is_recursive || remove_recursive {
let mut remove_list = Vec::new();
for (w, p) in &self.paths {
if p.starts_with(&path) {
inotify
.rm_watch(w.clone())
.map_err(|e| Error::io(e).add_path(p.into()))?;
self.watches.remove(p);
remove_list.push(w.clone());
}
}
for w in remove_list {
self.paths.remove(&w);
}
}
}
}
}
Ok(())
}
fn remove_all_watches(&mut self) -> Result<()> {
if let Some(ref mut inotify) = self.inotify {
for (w, p) in &self.paths {
inotify
.rm_watch(w.clone())
.map_err(|e| Error::io(e).add_path(p.into()))?;
}
self.watches.clear();
self.paths.clear();
}
Ok(())
}
}
/// return `DirEntry` when it is a directory
fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
if let Ok(e) = e {
if let Ok(metadata) = e.metadata() {
if metadata.is_dir() {
return Some(e);
}
}
}
None
}
impl INotifyWatcher {
fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
let inotify = Inotify::init()?;
let event_loop = EventLoop::new(inotify, event_handler)?;
let channel = event_loop.event_loop_tx.clone();
let waker = event_loop.event_loop_waker.clone();
event_loop.run();
Ok(INotifyWatcher { channel, waker })
}
fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
let (tx, rx) = unbounded();
let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
// we expect the event loop to live and reply => unwraps must not panic
self.channel.send(msg).unwrap();
self.waker.wake().unwrap();
rx.recv().unwrap()
}
fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
let (tx, rx) = unbounded();
let msg = EventLoopMsg::RemoveWatch(pb, tx);
// we expect the event loop to live and reply => unwraps must not panic
self.channel.send(msg).unwrap();
self.waker.wake().unwrap();
rx.recv().unwrap()
}
}
impl Watcher for INotifyWatcher {
/// Create a new watcher.
fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
Self::from_event_handler(Box::new(event_handler))
}
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
}
fn unwatch(&mut self, path: &Path) -> Result<()> {
self.unwatch_inner(path)
}
fn configure(&mut self, config: Config) -> Result<bool> {
let (tx, rx) = bounded(1);
self.channel.send(EventLoopMsg::Configure(config, tx))?;
self.waker.wake()?;
rx.recv()?
}
fn kind() -> crate::WatcherKind {
crate::WatcherKind::Inotify
}
}
impl Drop for INotifyWatcher {
fn drop(&mut self) {
// we expect the event loop to live => unwrap must not panic
self.channel.send(EventLoopMsg::Shutdown).unwrap();
self.waker.wake().unwrap();
}
}
#[test]
fn inotify_watcher_is_send_and_sync() {
fn check<T: Send + Sync>() {}
check::<INotifyWatcher>();
}