//! Watcher implementation for Darwin's FSEvents API | |
//! | |
//! The FSEvents API provides a mechanism to notify clients about directories they ought to re-scan | |
//! in order to keep their internal data structures up-to-date with respect to the true state of | |
//! the file system. (For example, when files or directories are created, modified, or removed.) It | |
//! sends these notifications "in bulk", possibly notifying the client of changes to several | |
//! directories in a single callback. | |
//! | |
//! For more information see the [FSEvents API reference][ref]. | |
//! | |
//! TODO: document event translation | |
//! | |
//! [ref]: https://developer.apple.com/library/mac/documentation/Darwin/Reference/FSEvents_Ref/ | |
#![allow(non_upper_case_globals, dead_code)] | |
use crate::event::*; | |
use crate::{unbounded, Config, Error, EventHandler, RecursiveMode, Result, Sender, Watcher}; | |
use fsevent_sys as fs; | |
use fsevent_sys::core_foundation as cf; | |
use std::collections::HashMap; | |
use std::ffi::CStr; | |
use std::fmt; | |
use std::os::raw; | |
use std::path::{Path, PathBuf}; | |
use std::ptr; | |
use std::sync::{Arc, Mutex}; | |
use std::thread; | |
bitflags::bitflags! { | |
#[repr(C)] | |
#[derive(Debug)] | |
struct StreamFlags: u32 { | |
const NONE = fs::kFSEventStreamEventFlagNone; | |
const MUST_SCAN_SUBDIRS = fs::kFSEventStreamEventFlagMustScanSubDirs; | |
const USER_DROPPED = fs::kFSEventStreamEventFlagUserDropped; | |
const KERNEL_DROPPED = fs::kFSEventStreamEventFlagKernelDropped; | |
const IDS_WRAPPED = fs::kFSEventStreamEventFlagEventIdsWrapped; | |
const HISTORY_DONE = fs::kFSEventStreamEventFlagHistoryDone; | |
const ROOT_CHANGED = fs::kFSEventStreamEventFlagRootChanged; | |
const MOUNT = fs::kFSEventStreamEventFlagMount; | |
const UNMOUNT = fs::kFSEventStreamEventFlagUnmount; | |
const ITEM_CREATED = fs::kFSEventStreamEventFlagItemCreated; | |
const ITEM_REMOVED = fs::kFSEventStreamEventFlagItemRemoved; | |
const INODE_META_MOD = fs::kFSEventStreamEventFlagItemInodeMetaMod; | |
const ITEM_RENAMED = fs::kFSEventStreamEventFlagItemRenamed; | |
const ITEM_MODIFIED = fs::kFSEventStreamEventFlagItemModified; | |
const FINDER_INFO_MOD = fs::kFSEventStreamEventFlagItemFinderInfoMod; | |
const ITEM_CHANGE_OWNER = fs::kFSEventStreamEventFlagItemChangeOwner; | |
const ITEM_XATTR_MOD = fs::kFSEventStreamEventFlagItemXattrMod; | |
const IS_FILE = fs::kFSEventStreamEventFlagItemIsFile; | |
const IS_DIR = fs::kFSEventStreamEventFlagItemIsDir; | |
const IS_SYMLINK = fs::kFSEventStreamEventFlagItemIsSymlink; | |
const OWN_EVENT = fs::kFSEventStreamEventFlagOwnEvent; | |
const IS_HARDLINK = fs::kFSEventStreamEventFlagItemIsHardlink; | |
const IS_LAST_HARDLINK = fs::kFSEventStreamEventFlagItemIsLastHardlink; | |
const ITEM_CLONED = fs::kFSEventStreamEventFlagItemCloned; | |
} | |
} | |
/// FSEvents-based `Watcher` implementation | |
pub struct FsEventWatcher { | |
paths: cf::CFMutableArrayRef, | |
since_when: fs::FSEventStreamEventId, | |
latency: cf::CFTimeInterval, | |
flags: fs::FSEventStreamCreateFlags, | |
event_handler: Arc<Mutex<dyn EventHandler>>, | |
runloop: Option<(cf::CFRunLoopRef, thread::JoinHandle<()>)>, | |
recursive_info: HashMap<PathBuf, bool>, | |
} | |
impl fmt::Debug for FsEventWatcher { | |
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | |
f.debug_struct("FsEventWatcher") | |
.field("paths", &self.paths) | |
.field("since_when", &self.since_when) | |
.field("latency", &self.latency) | |
.field("flags", &self.flags) | |
.field("event_handler", &Arc::as_ptr(&self.event_handler)) | |
.field("runloop", &self.runloop) | |
.field("recursive_info", &self.recursive_info) | |
.finish() | |
} | |
} | |
// CFMutableArrayRef is a type alias to *mut libc::c_void, so FsEventWatcher is not Send/Sync | |
// automatically. It's Send because the pointer is not used in other threads. | |
unsafe impl Send for FsEventWatcher {} | |
// It's Sync because all methods that change the mutable state use `&mut self`. | |
unsafe impl Sync for FsEventWatcher {} | |
fn translate_flags(flags: StreamFlags, precise: bool) -> Vec<Event> { | |
let mut evs = Vec::new(); | |
// «Denotes a sentinel event sent to mark the end of the "historical" events | |
// sent as a result of specifying a `sinceWhen` value in the FSEvents.Create | |
// call that created this event stream. After invoking the client's callback | |
// with all the "historical" events that occurred before now, the client's | |
// callback will be invoked with an event where the HistoryDone flag is set. | |
// The client should ignore the path supplied in this callback.» | |
// — https://www.mbsplugins.eu/FSEventsNextEvent.shtml | |
// | |
// As a result, we just stop processing here and return an empty vec, which | |
// will ignore this completely and not emit any Events whatsoever. | |
if flags.contains(StreamFlags::HISTORY_DONE) { | |
return evs; | |
} | |
// FSEvents provides two possible hints as to why events were dropped, | |
// however documentation on what those mean is scant, so we just pass them | |
// through in the info attr field. The intent is clear enough, and the | |
// additional information is provided if the user wants it. | |
if flags.contains(StreamFlags::MUST_SCAN_SUBDIRS) { | |
let e = Event::new(EventKind::Other).set_flag(Flag::Rescan); | |
evs.push(if flags.contains(StreamFlags::USER_DROPPED) { | |
e.set_info("rescan: user dropped") | |
} else if flags.contains(StreamFlags::KERNEL_DROPPED) { | |
e.set_info("rescan: kernel dropped") | |
} else { | |
e | |
}); | |
} | |
// In imprecise mode, let's not even bother parsing the kind of the event | |
// except for the above very special events. | |
if !precise { | |
evs.push(Event::new(EventKind::Any)); | |
return evs; | |
} | |
// This is most likely a rename or a removal. We assume rename but may want | |
// to figure out if it was a removal some way later (TODO). To denote the | |
// special nature of the event, we add an info string. | |
if flags.contains(StreamFlags::ROOT_CHANGED) { | |
evs.push( | |
Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::From))) | |
.set_info("root changed"), | |
); | |
} | |
// A path was mounted at the event path; we treat that as a create. | |
if flags.contains(StreamFlags::MOUNT) { | |
evs.push(Event::new(EventKind::Create(CreateKind::Other)).set_info("mount")); | |
} | |
// A path was unmounted at the event path; we treat that as a remove. | |
if flags.contains(StreamFlags::UNMOUNT) { | |
evs.push(Event::new(EventKind::Remove(RemoveKind::Other)).set_info("mount")); | |
} | |
if flags.contains(StreamFlags::ITEM_CREATED) { | |
evs.push(if flags.contains(StreamFlags::IS_DIR) { | |
Event::new(EventKind::Create(CreateKind::Folder)) | |
} else if flags.contains(StreamFlags::IS_FILE) { | |
Event::new(EventKind::Create(CreateKind::File)) | |
} else { | |
let e = Event::new(EventKind::Create(CreateKind::Other)); | |
if flags.contains(StreamFlags::IS_SYMLINK) { | |
e.set_info("is: symlink") | |
} else if flags.contains(StreamFlags::IS_HARDLINK) { | |
e.set_info("is: hardlink") | |
} else if flags.contains(StreamFlags::ITEM_CLONED) { | |
e.set_info("is: clone") | |
} else { | |
Event::new(EventKind::Create(CreateKind::Any)) | |
} | |
}); | |
} | |
if flags.contains(StreamFlags::ITEM_REMOVED) { | |
evs.push(if flags.contains(StreamFlags::IS_DIR) { | |
Event::new(EventKind::Remove(RemoveKind::Folder)) | |
} else if flags.contains(StreamFlags::IS_FILE) { | |
Event::new(EventKind::Remove(RemoveKind::File)) | |
} else { | |
let e = Event::new(EventKind::Remove(RemoveKind::Other)); | |
if flags.contains(StreamFlags::IS_SYMLINK) { | |
e.set_info("is: symlink") | |
} else if flags.contains(StreamFlags::IS_HARDLINK) { | |
e.set_info("is: hardlink") | |
} else if flags.contains(StreamFlags::ITEM_CLONED) { | |
e.set_info("is: clone") | |
} else { | |
Event::new(EventKind::Remove(RemoveKind::Any)) | |
} | |
}); | |
} | |
// FSEvents provides no mechanism to associate the old and new sides of a | |
// rename event. | |
if flags.contains(StreamFlags::ITEM_RENAMED) { | |
evs.push(Event::new(EventKind::Modify(ModifyKind::Name( | |
RenameMode::Any, | |
)))); | |
} | |
// This is only described as "metadata changed", but it may be that it's | |
// only emitted for some more precise subset of events... if so, will need | |
// amending, but for now we have an Any-shaped bucket to put it in. | |
if flags.contains(StreamFlags::INODE_META_MOD) { | |
evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( | |
MetadataKind::Any, | |
)))); | |
} | |
if flags.contains(StreamFlags::FINDER_INFO_MOD) { | |
evs.push( | |
Event::new(EventKind::Modify(ModifyKind::Metadata(MetadataKind::Other))) | |
.set_info("meta: finder info"), | |
); | |
} | |
if flags.contains(StreamFlags::ITEM_CHANGE_OWNER) { | |
evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( | |
MetadataKind::Ownership, | |
)))); | |
} | |
if flags.contains(StreamFlags::ITEM_XATTR_MOD) { | |
evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( | |
MetadataKind::Extended, | |
)))); | |
} | |
// This is specifically described as a data change, which we take to mean | |
// is a content change. | |
if flags.contains(StreamFlags::ITEM_MODIFIED) { | |
evs.push(Event::new(EventKind::Modify(ModifyKind::Data( | |
DataChange::Content, | |
)))); | |
} | |
if flags.contains(StreamFlags::OWN_EVENT) { | |
for ev in &mut evs { | |
*ev = std::mem::take(ev).set_process_id(std::process::id()); | |
} | |
} | |
evs | |
} | |
struct StreamContextInfo { | |
event_handler: Arc<Mutex<dyn EventHandler>>, | |
recursive_info: HashMap<PathBuf, bool>, | |
} | |
// Free the context when the stream created by `FSEventStreamCreate` is released. | |
extern "C" fn release_context(info: *const libc::c_void) { | |
// Safety: | |
// - The [documentation] for `FSEventStreamContext` states that `release` is only | |
// called when the stream is deallocated, so it is safe to convert `info` back into a | |
// box and drop it. | |
// | |
// [docs]: https://developer.apple.com/documentation/coreservices/fseventstreamcontext?language=objc | |
unsafe { | |
drop(Box::from_raw( | |
info as *const StreamContextInfo as *mut StreamContextInfo, | |
)); | |
} | |
} | |
extern "C" { | |
/// Indicates whether the run loop is waiting for an event. | |
fn CFRunLoopIsWaiting(runloop: cf::CFRunLoopRef) -> cf::Boolean; | |
} | |
impl FsEventWatcher { | |
fn from_event_handler(event_handler: Arc<Mutex<dyn EventHandler>>) -> Result<Self> { | |
Ok(FsEventWatcher { | |
paths: unsafe { | |
cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks) | |
}, | |
since_when: fs::kFSEventStreamEventIdSinceNow, | |
latency: 0.0, | |
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer, | |
event_handler, | |
runloop: None, | |
recursive_info: HashMap::new(), | |
}) | |
} | |
fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { | |
self.stop(); | |
let result = self.append_path(path, recursive_mode); | |
// ignore return error: may be empty path list | |
let _ = self.run(); | |
result | |
} | |
fn unwatch_inner(&mut self, path: &Path) -> Result<()> { | |
self.stop(); | |
let result = self.remove_path(path); | |
// ignore return error: may be empty path list | |
let _ = self.run(); | |
result | |
} | |
#[inline] | |
fn is_running(&self) -> bool { | |
self.runloop.is_some() | |
} | |
fn stop(&mut self) { | |
if !self.is_running() { | |
return; | |
} | |
if let Some((runloop, thread_handle)) = self.runloop.take() { | |
unsafe { | |
let runloop = runloop as *mut raw::c_void; | |
while CFRunLoopIsWaiting(runloop) == 0 { | |
thread::yield_now(); | |
} | |
cf::CFRunLoopStop(runloop); | |
} | |
// Wait for the thread to shut down. | |
thread_handle.join().expect("thread to shut down"); | |
} | |
} | |
fn remove_path(&mut self, path: &Path) -> Result<()> { | |
let str_path = path.to_str().unwrap(); | |
unsafe { | |
let mut err: cf::CFErrorRef = ptr::null_mut(); | |
let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err); | |
if cf_path.is_null() { | |
cf::CFRelease(err as cf::CFRef); | |
return Err(Error::watch_not_found().add_path(path.into())); | |
} | |
let mut to_remove = Vec::new(); | |
for idx in 0..cf::CFArrayGetCount(self.paths) { | |
let item = cf::CFArrayGetValueAtIndex(self.paths, idx); | |
if cf::CFStringCompare(item, cf_path, cf::kCFCompareCaseInsensitive) | |
== cf::kCFCompareEqualTo | |
{ | |
to_remove.push(idx); | |
} | |
} | |
cf::CFRelease(cf_path); | |
for idx in to_remove.iter().rev() { | |
cf::CFArrayRemoveValueAtIndex(self.paths, *idx); | |
} | |
} | |
let p = if let Ok(canonicalized_path) = path.canonicalize() { | |
canonicalized_path | |
} else { | |
path.to_owned() | |
}; | |
match self.recursive_info.remove(&p) { | |
Some(_) => Ok(()), | |
None => Err(Error::watch_not_found()), | |
} | |
} | |
// https://github.com/thibaudgg/rb-fsevent/blob/master/ext/fsevent_watch/main.c | |
fn append_path(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { | |
if !path.exists() { | |
return Err(Error::path_not_found().add_path(path.into())); | |
} | |
let canonical_path = path.to_path_buf().canonicalize()?; | |
let str_path = path.to_str().unwrap(); | |
unsafe { | |
let mut err: cf::CFErrorRef = ptr::null_mut(); | |
let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err); | |
if cf_path.is_null() { | |
// Most likely the directory was deleted, or permissions changed, | |
// while the above code was running. | |
cf::CFRelease(err as cf::CFRef); | |
return Err(Error::path_not_found().add_path(path.into())); | |
} | |
cf::CFArrayAppendValue(self.paths, cf_path); | |
cf::CFRelease(cf_path); | |
} | |
self.recursive_info | |
.insert(canonical_path, recursive_mode.is_recursive()); | |
Ok(()) | |
} | |
fn run(&mut self) -> Result<()> { | |
if unsafe { cf::CFArrayGetCount(self.paths) } == 0 { | |
// TODO: Reconstruct and add paths to error | |
return Err(Error::path_not_found()); | |
} | |
// We need to associate the stream context with our callback in order to propagate events | |
// to the rest of the system. This will be owned by the stream, and will be freed when the | |
// stream is closed. This means we will leak the context if we panic before reacing | |
// `FSEventStreamRelease`. | |
let context = Box::into_raw(Box::new(StreamContextInfo { | |
event_handler: self.event_handler.clone(), | |
recursive_info: self.recursive_info.clone(), | |
})); | |
let stream_context = fs::FSEventStreamContext { | |
version: 0, | |
info: context as *mut libc::c_void, | |
retain: None, | |
release: Some(release_context), | |
copy_description: None, | |
}; | |
let stream = unsafe { | |
fs::FSEventStreamCreate( | |
cf::kCFAllocatorDefault, | |
callback, | |
&stream_context, | |
self.paths, | |
self.since_when, | |
self.latency, | |
self.flags, | |
) | |
}; | |
// Wrapper to help send CFRef types across threads. | |
struct CFSendWrapper(cf::CFRef); | |
// Safety: | |
// - According to the Apple documentation, it's safe to move `CFRef`s across threads. | |
// https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html | |
unsafe impl Send for CFSendWrapper {} | |
// move into thread | |
let stream = CFSendWrapper(stream); | |
// channel to pass runloop around | |
let (rl_tx, rl_rx) = unbounded(); | |
let thread_handle = thread::Builder::new() | |
.name("notify-rs fsevents loop".to_string()) | |
.spawn(move || { | |
let _ = &stream; | |
let stream = stream.0; | |
unsafe { | |
let cur_runloop = cf::CFRunLoopGetCurrent(); | |
fs::FSEventStreamScheduleWithRunLoop( | |
stream, | |
cur_runloop, | |
cf::kCFRunLoopDefaultMode, | |
); | |
fs::FSEventStreamStart(stream); | |
// the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop() | |
rl_tx | |
.send(CFSendWrapper(cur_runloop)) | |
.expect("Unable to send runloop to watcher"); | |
cf::CFRunLoopRun(); | |
fs::FSEventStreamStop(stream); | |
fs::FSEventStreamInvalidate(stream); | |
fs::FSEventStreamRelease(stream); | |
} | |
})?; | |
// block until runloop has been sent | |
self.runloop = Some((rl_rx.recv().unwrap().0, thread_handle)); | |
Ok(()) | |
} | |
fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) { | |
tx.send(Ok(false)) | |
.expect("configuration channel disconnect"); | |
} | |
} | |
extern "C" fn callback( | |
stream_ref: fs::FSEventStreamRef, | |
info: *mut libc::c_void, | |
num_events: libc::size_t, // size_t numEvents | |
event_paths: *mut libc::c_void, // void *eventPaths | |
event_flags: *const fs::FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[] | |
event_ids: *const fs::FSEventStreamEventId, // const FSEventStreamEventId eventIds[] | |
) { | |
unsafe { | |
callback_impl( | |
stream_ref, | |
info, | |
num_events, | |
event_paths, | |
event_flags, | |
event_ids, | |
) | |
} | |
} | |
unsafe fn callback_impl( | |
_stream_ref: fs::FSEventStreamRef, | |
info: *mut libc::c_void, | |
num_events: libc::size_t, // size_t numEvents | |
event_paths: *mut libc::c_void, // void *eventPaths | |
event_flags: *const fs::FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[] | |
_event_ids: *const fs::FSEventStreamEventId, // const FSEventStreamEventId eventIds[] | |
) { | |
let event_paths = event_paths as *const *const libc::c_char; | |
let info = info as *const StreamContextInfo; | |
let event_handler = &(*info).event_handler; | |
for p in 0..num_events { | |
let path = CStr::from_ptr(*event_paths.add(p)) | |
.to_str() | |
.expect("Invalid UTF8 string."); | |
let path = PathBuf::from(path); | |
let flag = *event_flags.add(p); | |
let flag = StreamFlags::from_bits(flag).unwrap_or_else(|| { | |
panic!("Unable to decode StreamFlags: {}", flag); | |
}); | |
let mut handle_event = false; | |
for (p, r) in &(*info).recursive_info { | |
if path.starts_with(p) { | |
if *r || &path == p { | |
handle_event = true; | |
break; | |
} else if let Some(parent_path) = path.parent() { | |
if parent_path == p { | |
handle_event = true; | |
break; | |
} | |
} | |
} | |
} | |
if !handle_event { | |
continue; | |
} | |
log::trace!("FSEvent: path = `{}`, flag = {:?}", path.display(), flag); | |
for ev in translate_flags(flag, true).into_iter() { | |
// TODO: precise | |
let ev = ev.add_path(path.clone()); | |
let mut event_handler = event_handler.lock().expect("lock not to be poisoned"); | |
event_handler.handle_event(Ok(ev)); | |
} | |
} | |
} | |
impl Watcher for FsEventWatcher { | |
/// Create a new watcher. | |
fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> { | |
Self::from_event_handler(Arc::new(Mutex::new(event_handler))) | |
} | |
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { | |
self.watch_inner(path, recursive_mode) | |
} | |
fn unwatch(&mut self, path: &Path) -> Result<()> { | |
self.unwatch_inner(path) | |
} | |
fn configure(&mut self, config: Config) -> Result<bool> { | |
let (tx, rx) = unbounded(); | |
self.configure_raw_mode(config, tx); | |
rx.recv()? | |
} | |
fn kind() -> crate::WatcherKind { | |
crate::WatcherKind::Fsevent | |
} | |
} | |
impl Drop for FsEventWatcher { | |
fn drop(&mut self) { | |
self.stop(); | |
unsafe { | |
cf::CFRelease(self.paths); | |
} | |
} | |
} | |
#[test] | |
fn test_fsevent_watcher_drop() { | |
use super::*; | |
use std::time::Duration; | |
let dir = tempfile::tempdir().unwrap(); | |
let (tx, rx) = std::sync::mpsc::channel(); | |
{ | |
let mut watcher = FsEventWatcher::new(tx, Default::default()).unwrap(); | |
watcher.watch(dir.path(), RecursiveMode::Recursive).unwrap(); | |
thread::sleep(Duration::from_millis(2000)); | |
println!("is running -> {}", watcher.is_running()); | |
thread::sleep(Duration::from_millis(1000)); | |
watcher.unwatch(dir.path()).unwrap(); | |
println!("is running -> {}", watcher.is_running()); | |
} | |
thread::sleep(Duration::from_millis(1000)); | |
for res in rx { | |
let e = res.unwrap(); | |
println!("debug => {:?} {:?}", e.kind, e.paths); | |
} | |
println!("in test: {} works", file!()); | |
} | |
#[test] | |
fn test_steam_context_info_send_and_sync() { | |
fn check_send<T: Send + Sync>() {} | |
check_send::<StreamContextInfo>(); | |
} |