//! Generic Watcher implementation based on polling
//!
//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
//! Rust stdlib APIs and should work on all of the platforms it supports.

use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    thread,
    time::Duration,
};

/// Event send for registered handler on initial directory scans
pub type ScanEvent = crate::Result<PathBuf>;

/// Handler trait for receivers of ScanEvent.  
/// Very much the same as [EventHandler], but including the Result.
///
/// See the full example for more information.
pub trait ScanEventHandler: Send + 'static {
    /// Handles an event.
    fn handle_event(&mut self, event: ScanEvent);
}

impl<F> ScanEventHandler for F
where
    F: FnMut(ScanEvent) + Send + 'static,
{
    fn handle_event(&mut self, event: ScanEvent) {
        (self)(event);
    }
}

#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        let _ = self.send(event);
    }
}

impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        let _ = self.send(event);
    }
}

impl ScanEventHandler for () {
    fn handle_event(&mut self, _event: ScanEvent) {}
}

use data::{DataBuilder, WatchData};
mod data {
    use crate::{
        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
        EventHandler,
    };
    use filetime::FileTime;
    use std::{
        cell::RefCell,
        collections::{hash_map::RandomState, HashMap},
        fmt::{self, Debug},
        fs::{self, File, Metadata},
        hash::{BuildHasher, Hasher},
        io::{self, Read},
        path::{Path, PathBuf},
        time::Instant,
    };
    use walkdir::WalkDir;

    use super::ScanEventHandler;

    /// Builder for [`WatchData`] & [`PathData`].
    pub(super) struct DataBuilder {
        emitter: EventEmitter,
        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
        // in future.
        build_hasher: Option<RandomState>,

        // current timestamp for building Data.
        now: Instant,
    }

    impl DataBuilder {
        pub(super) fn new<F, G>(
            event_handler: F,
            compare_content: bool,
            scan_emitter: Option<G>,
        ) -> Self
        where
            F: EventHandler,
            G: ScanEventHandler,
        {
            let scan_emitter = match scan_emitter {
                None => None,
                Some(v) => {
                    // workaround for a weird type resolution bug when directly going to dyn Trait
                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
                        Box::new(RefCell::new(v));
                    Some(intermediate)
                }
            };
            Self {
                emitter: EventEmitter::new(event_handler),
                scan_emitter,
                build_hasher: compare_content.then(RandomState::default),
                now: Instant::now(),
            }
        }

        /// Update internal timestamp.
        pub(super) fn update_timestamp(&mut self) {
            self.now = Instant::now();
        }

        /// Create [`WatchData`].
        ///
        /// This function will return `Err(_)` if can not retrieve metadata from
        /// the path location. (e.g., not found).
        pub(super) fn build_watch_data(
            &self,
            root: PathBuf,
            is_recursive: bool,
        ) -> Option<WatchData> {
            WatchData::new(self, root, is_recursive)
        }

        /// Create [`PathData`].
        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
            PathData::new(self, meta_path)
        }
    }

    impl Debug for DataBuilder {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.debug_struct("DataBuilder")
                .field("build_hasher", &self.build_hasher)
                .field("now", &self.now)
                .finish()
        }
    }

    #[derive(Debug)]
    pub(super) struct WatchData {
        // config part, won't change.
        root: PathBuf,
        is_recursive: bool,

        // current status part.
        all_path_data: HashMap<PathBuf, PathData>,
    }

    impl WatchData {
        /// Scan filesystem and create a new `WatchData`.
        ///
        /// # Side effect
        ///
        /// This function may send event by `data_builder.emitter`.
        fn new(data_builder: &DataBuilder, root: PathBuf, is_recursive: bool) -> Option<Self> {
            // If metadata read error at `root` path, it will emit
            // a error event and stop to create the whole `WatchData`.
            //
            // QUESTION: inconsistent?
            //
            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
            // if `root` path hit an io error, then watcher will reject to
            // create this new watch.
            //
            // This may inconsistent with *POLLING* a watch. When watcher
            // continue polling, io error at root path will not delete
            // a existing watch. polling still working.
            //
            // So, consider a config file may not exists at first time but may
            // create after a while, developer cannot watch it.
            //
            // FIXME: Can we always allow to watch a path, even file not
            // found at this path?
            if let Err(e) = fs::metadata(&root) {
                data_builder.emitter.emit_io_err(e, &root);
                return None;
            }

            let all_path_data =
                Self::scan_all_path_data(data_builder, root.clone(), is_recursive, true).collect();

            Some(Self {
                root,
                is_recursive,
                all_path_data,
            })
        }

        /// Rescan filesystem and update this `WatchData`.
        ///
        /// # Side effect
        ///
        /// This function may emit event by `data_builder.emitter`.
        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
            // scan current filesystem.
            for (path, new_path_data) in
                Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive, false)
            {
                let old_path_data = self
                    .all_path_data
                    .insert(path.clone(), new_path_data.clone());

                // emit event
                let event =
                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
                if let Some(event) = event {
                    data_builder.emitter.emit_ok(event);
                }
            }

            // scan for disappeared paths.
            let mut disappeared_paths = Vec::new();
            for (path, path_data) in self.all_path_data.iter() {
                if path_data.last_check < data_builder.now {
                    disappeared_paths.push(path.clone());
                }
            }

            // remove disappeared paths
            for path in disappeared_paths {
                let old_path_data = self.all_path_data.remove(&path);

                // emit event
                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
                if let Some(event) = event {
                    data_builder.emitter.emit_ok(event);
                }
            }
        }

        /// Get all `PathData` by given configuration.
        ///
        /// # Side Effect
        ///
        /// This function may emit some IO Error events by `data_builder.emitter`.
        fn scan_all_path_data(
            data_builder: &'_ DataBuilder,
            root: PathBuf,
            is_recursive: bool,
            // whether this is an initial scan, used only for events
            is_initial: bool,
        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
            log::trace!("rescanning {root:?}");
            // WalkDir return only one entry if root is a file (not a folder),
            // so we can use single logic to do the both file & dir's jobs.
            //
            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
            WalkDir::new(root)
                .follow_links(true)
                .max_depth(Self::dir_scan_depth(is_recursive))
                .into_iter()
                //
                // QUESTION: should we ignore IO Error?
                //
                // current implementation ignore some IO error, e.g.,
                //
                // - `.filter_map(|entry| entry.ok())`
                // - all read error when hashing
                //
                // but the code also interest with `fs::metadata()` error and
                // propagate to event handler. It may not consistent.
                //
                // FIXME: Should we emit all IO error events? Or ignore them all?
                .filter_map(|entry_res| match entry_res {
                    Ok(entry) => Some(entry),
                    Err(err) => {
                        log::warn!("walkdir error scanning {err:?}");
                        let crate_err =
                            crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
                        data_builder.emitter.emit(Err(crate_err));
                        None
                    }
                })
                .filter_map(move |entry| match entry.metadata() {
                    Ok(metadata) => {
                        let path = entry.into_path();
                        if is_initial {
                            // emit initial scans
                            if let Some(ref emitter) = data_builder.scan_emitter {
                                emitter.borrow_mut().handle_event(Ok(path.clone()));
                            }
                        }
                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
                        let data_path = data_builder.build_path_data(&meta_path);

                        Some((meta_path.into_path(), data_path))
                    }
                    Err(e) => {
                        // emit event.
                        let path = entry.into_path();
                        data_builder.emitter.emit_io_err(e, path);

                        None
                    }
                })
        }

        fn dir_scan_depth(is_recursive: bool) -> usize {
            if is_recursive {
                usize::max_value()
            } else {
                1
            }
        }
    }

    /// Stored data for a one path locations.
    ///
    /// See [`WatchData`] for more detail.
    #[derive(Debug, Clone)]
    struct PathData {
        /// File updated time.
        mtime: i64,

        /// Content's hash value, only available if user request compare file
        /// contents and read successful.
        hash: Option<u64>,

        /// Checked time.
        last_check: Instant,
    }

    impl PathData {
        /// Create a new `PathData`.
        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
            let metadata = meta_path.metadata();

            PathData {
                mtime: FileTime::from_last_modification_time(metadata).seconds(),
                hash: data_builder
                    .build_hasher
                    .as_ref()
                    .filter(|_| metadata.is_file())
                    .and_then(|build_hasher| {
                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
                    }),

                last_check: data_builder.now,
            }
        }

        /// Get hash value for the data content in given file `path`.
        fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
            let mut hasher = build_hasher.build_hasher();
            let mut file = File::open(path)?;
            let mut buf = [0; 512];

            loop {
                let n = match file.read(&mut buf) {
                    Ok(0) => break,
                    Ok(len) => len,
                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    Err(e) => return Err(e),
                };

                hasher.write(&buf[..n]);
            }

            Ok(hasher.finish())
        }

        /// Get [`Event`] by compare two optional [`PathData`].
        fn compare_to_event<P>(
            path: P,
            old: Option<&PathData>,
            new: Option<&PathData>,
        ) -> Option<Event>
        where
            P: Into<PathBuf>,
        {
            match (old, new) {
                (Some(old), Some(new)) => {
                    if new.mtime > old.mtime {
                        Some(EventKind::Modify(ModifyKind::Metadata(
                            MetadataKind::WriteTime,
                        )))
                    } else if new.hash != old.hash {
                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
                    } else {
                        None
                    }
                }
                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
                (None, None) => None,
            }
            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
        }
    }

    /// Compose path and its metadata.
    ///
    /// This data structure designed for make sure path and its metadata can be
    /// transferred in consistent way, and may avoid some duplicated
    /// `fs::metadata()` function call in some situations.
    #[derive(Debug)]
    pub(super) struct MetaPath {
        path: PathBuf,
        metadata: Metadata,
    }

    impl MetaPath {
        /// Create `MetaPath` by given parts.
        ///
        /// # Invariant
        ///
        /// User must make sure the input `metadata` are associated with `path`.
        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
            Self { path, metadata }
        }

        fn path(&self) -> &Path {
            &self.path
        }

        fn metadata(&self) -> &Metadata {
            &self.metadata
        }

        fn into_path(self) -> PathBuf {
            self.path
        }
    }

    /// Thin wrapper for outer event handler, for easy to use.
    struct EventEmitter(
        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
        // Use `Box` to make sure EventEmitter is Sized.
        Box<RefCell<dyn EventHandler>>,
    );

    impl EventEmitter {
        fn new<F: EventHandler>(event_handler: F) -> Self {
            Self(Box::new(RefCell::new(event_handler)))
        }

        /// Emit single event.
        fn emit(&self, event: crate::Result<Event>) {
            self.0.borrow_mut().handle_event(event);
        }

        /// Emit event.
        fn emit_ok(&self, event: Event) {
            self.emit(Ok(event))
        }

        /// Emit io error event.
        fn emit_io_err<E, P>(&self, err: E, path: P)
        where
            E: Into<io::Error>,
            P: Into<PathBuf>,
        {
            self.emit(Err(crate::Error::io(err.into()).add_path(path.into())))
        }
    }
}

/// Polling based `Watcher` implementation.
///
/// By default scans through all files and checks for changed entries based on their change date.
/// Can also be changed to perform file content change checks.
///
/// See [Config] for more details.
#[derive(Debug)]
pub struct PollWatcher {
    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
    data_builder: Arc<Mutex<DataBuilder>>,
    want_to_stop: Arc<AtomicBool>,
    /// channel to the poll loop  
    /// currently used only for manual polling
    message_channel: Sender<()>,
    delay: Option<Duration>,
}

impl PollWatcher {
    /// Create a new [PollWatcher], configured as needed.
    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
        Self::with_opt::<_, ()>(event_handler, config, None)
    }

    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
    pub fn poll(&self) -> crate::Result<()> {
        self.message_channel
            .send(())
            .map_err(|_| Error::generic("failed to send poll message"))?;
        Ok(())
    }

    /// Create a new [PollWatcher] with an scan event handler.
    ///
    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
        event_handler: F,
        config: Config,
        scan_callback: G,
    ) -> crate::Result<PollWatcher> {
        Self::with_opt(event_handler, config, Some(scan_callback))
    }

    /// create a new PollWatcher with all options
    fn with_opt<F: EventHandler, G: ScanEventHandler>(
        event_handler: F,
        config: Config,
        scan_callback: Option<G>,
    ) -> crate::Result<PollWatcher> {
        let data_builder =
            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);

        let (tx, rx) = unbounded();

        let poll_watcher = PollWatcher {
            watches: Default::default(),
            data_builder: Arc::new(Mutex::new(data_builder)),
            want_to_stop: Arc::new(AtomicBool::new(false)),
            delay: config.poll_interval_v2(),
            message_channel: tx,
        };

        poll_watcher.run(rx);

        Ok(poll_watcher)
    }

    fn run(&self, rx: Receiver<()>) {
        let watches = Arc::clone(&self.watches);
        let data_builder = Arc::clone(&self.data_builder);
        let want_to_stop = Arc::clone(&self.want_to_stop);
        let delay = self.delay;

        let _ = thread::Builder::new()
            .name("notify-rs poll loop".to_string())
            .spawn(move || {
                loop {
                    if want_to_stop.load(Ordering::SeqCst) {
                        break;
                    }

                    // HINT: Make sure always lock in the same order to avoid deadlock.
                    //
                    // FIXME: inconsistent: some place mutex poison cause panic,
                    // some place just ignore.
                    if let (Ok(mut watches), Ok(mut data_builder)) =
                        (watches.lock(), data_builder.lock())
                    {
                        data_builder.update_timestamp();

                        let vals = watches.values_mut();
                        for watch_data in vals {
                            watch_data.rescan(&mut data_builder);
                        }
                    }
                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
                    if let Some(delay) = delay {
                        let _ = rx.recv_timeout(delay);
                    } else {
                        let _ = rx.recv();
                    }
                }
            });
    }

    /// Watch a path location.
    ///
    /// QUESTION: this function never return an Error, is it as intend?
    /// Please also consider the IO Error event problem.
    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
        // HINT: Make sure always lock in the same order to avoid deadlock.
        //
        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
        if let (Ok(mut watches), Ok(mut data_builder)) =
            (self.watches.lock(), self.data_builder.lock())
        {
            data_builder.update_timestamp();

            let watch_data =
                data_builder.build_watch_data(path.to_path_buf(), recursive_mode.is_recursive());

            // if create watch_data successful, add it to watching list.
            if let Some(watch_data) = watch_data {
                watches.insert(path.to_path_buf(), watch_data);
            }
        }
    }

    /// Unwatch a path.
    ///
    /// Return `Err(_)` if given path has't be monitored.
    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
        self.watches
            .lock()
            .unwrap()
            .remove(path)
            .map(|_| ())
            .ok_or_else(crate::Error::watch_not_found)
    }
}

impl Watcher for PollWatcher {
    /// Create a new [PollWatcher].
    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
        Self::new(event_handler, config)
    }

    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
        self.watch_inner(path, recursive_mode);

        Ok(())
    }

    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
        self.unwatch_inner(path)
    }

    fn kind() -> crate::WatcherKind {
        crate::WatcherKind::PollWatcher
    }
}

impl Drop for PollWatcher {
    fn drop(&mut self) {
        self.want_to_stop.store(true, Ordering::Relaxed);
    }
}

#[test]
fn poll_watcher_is_send_and_sync() {
    fn check<T: Send + Sync>() {}
    check::<PollWatcher>();
}
