use std::convert::TryFrom;

use rayon::iter::plumbing::{Consumer, Folder, Producer, ProducerCallback, UnindexedConsumer};
use rayon::iter::{IndexedParallelIterator, ParallelIterator};

use crate::{ProgressBar, ProgressBarIter};

/// Wraps a Rayon parallel iterator.
///
/// See [`ProgressIterator`](trait.ProgressIterator.html) for method
/// documentation.
#[cfg_attr(docsrs, doc(cfg(feature = "rayon")))]
pub trait ParallelProgressIterator
where
    Self: Sized + ParallelIterator,
{
    /// Wrap an iterator with a custom progress bar.
    fn progress_with(self, progress: ProgressBar) -> ProgressBarIter<Self>;

    /// Wrap an iterator with an explicit element count.
    fn progress_count(self, len: u64) -> ProgressBarIter<Self> {
        self.progress_with(ProgressBar::new(len))
    }

    fn progress(self) -> ProgressBarIter<Self>
    where
        Self: IndexedParallelIterator,
    {
        let len = u64::try_from(self.len()).unwrap();
        self.progress_count(len)
    }

    /// Wrap an iterator with a progress bar and style it.
    fn progress_with_style(self, style: crate::ProgressStyle) -> ProgressBarIter<Self>
    where
        Self: IndexedParallelIterator,
    {
        let len = u64::try_from(self.len()).unwrap();
        let bar = ProgressBar::new(len).with_style(style);
        self.progress_with(bar)
    }
}

impl<S: Send, T: ParallelIterator<Item = S>> ParallelProgressIterator for T {
    fn progress_with(self, progress: ProgressBar) -> ProgressBarIter<Self> {
        ProgressBarIter { it: self, progress }
    }
}

impl<S: Send, T: IndexedParallelIterator<Item = S>> IndexedParallelIterator for ProgressBarIter<T> {
    fn len(&self) -> usize {
        self.it.len()
    }

    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> <C as Consumer<Self::Item>>::Result {
        let consumer = ProgressConsumer::new(consumer, self.progress);
        self.it.drive(consumer)
    }

    fn with_producer<CB: ProducerCallback<Self::Item>>(
        self,
        callback: CB,
    ) -> <CB as ProducerCallback<Self::Item>>::Output {
        return self.it.with_producer(Callback {
            callback,
            progress: self.progress,
        });

        struct Callback<CB> {
            callback: CB,
            progress: ProgressBar,
        }

        impl<T, CB: ProducerCallback<T>> ProducerCallback<T> for Callback<CB> {
            type Output = CB::Output;

            fn callback<P>(self, base: P) -> CB::Output
            where
                P: Producer<Item = T>,
            {
                let producer = ProgressProducer {
                    base,
                    progress: self.progress,
                };
                self.callback.callback(producer)
            }
        }
    }
}

struct ProgressProducer<T> {
    base: T,
    progress: ProgressBar,
}

impl<T, P: Producer<Item = T>> Producer for ProgressProducer<P> {
    type Item = T;
    type IntoIter = ProgressBarIter<P::IntoIter>;

    fn into_iter(self) -> Self::IntoIter {
        ProgressBarIter {
            it: self.base.into_iter(),
            progress: self.progress,
        }
    }

    fn min_len(&self) -> usize {
        self.base.min_len()
    }

    fn max_len(&self) -> usize {
        self.base.max_len()
    }

    fn split_at(self, index: usize) -> (Self, Self) {
        let (left, right) = self.base.split_at(index);
        (
            ProgressProducer {
                base: left,
                progress: self.progress.clone(),
            },
            ProgressProducer {
                base: right,
                progress: self.progress,
            },
        )
    }
}

struct ProgressConsumer<C> {
    base: C,
    progress: ProgressBar,
}

impl<C> ProgressConsumer<C> {
    fn new(base: C, progress: ProgressBar) -> Self {
        ProgressConsumer { base, progress }
    }
}

impl<T, C: Consumer<T>> Consumer<T> for ProgressConsumer<C> {
    type Folder = ProgressFolder<C::Folder>;
    type Reducer = C::Reducer;
    type Result = C::Result;

    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
        let (left, right, reducer) = self.base.split_at(index);
        (
            ProgressConsumer::new(left, self.progress.clone()),
            ProgressConsumer::new(right, self.progress),
            reducer,
        )
    }

    fn into_folder(self) -> Self::Folder {
        ProgressFolder {
            base: self.base.into_folder(),
            progress: self.progress,
        }
    }

    fn full(&self) -> bool {
        self.base.full()
    }
}

impl<T, C: UnindexedConsumer<T>> UnindexedConsumer<T> for ProgressConsumer<C> {
    fn split_off_left(&self) -> Self {
        ProgressConsumer::new(self.base.split_off_left(), self.progress.clone())
    }

    fn to_reducer(&self) -> Self::Reducer {
        self.base.to_reducer()
    }
}

struct ProgressFolder<C> {
    base: C,
    progress: ProgressBar,
}

impl<T, C: Folder<T>> Folder<T> for ProgressFolder<C> {
    type Result = C::Result;

    fn consume(self, item: T) -> Self {
        self.progress.inc(1);
        ProgressFolder {
            base: self.base.consume(item),
            progress: self.progress,
        }
    }

    fn complete(self) -> C::Result {
        self.base.complete()
    }

    fn full(&self) -> bool {
        self.base.full()
    }
}

impl<S: Send, T: ParallelIterator<Item = S>> ParallelIterator for ProgressBarIter<T> {
    type Item = S;

    fn drive_unindexed<C: UnindexedConsumer<Self::Item>>(self, consumer: C) -> C::Result {
        let consumer1 = ProgressConsumer::new(consumer, self.progress.clone());
        self.it.drive_unindexed(consumer1)
    }
}

#[cfg(test)]
mod test {
    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

    use crate::{ParallelProgressIterator, ProgressBar, ProgressBarIter, ProgressStyle};

    #[test]
    fn it_can_wrap_a_parallel_iterator() {
        let v = vec![1, 2, 3];
        fn wrap<'a, T: ParallelIterator<Item = &'a i32>>(it: ProgressBarIter<T>) {
            assert_eq!(it.map(|x| x * 2).collect::<Vec<_>>(), vec![2, 4, 6]);
        }

        wrap(v.par_iter().progress_count(3));
        wrap({
            let pb = ProgressBar::new(v.len() as u64);
            v.par_iter().progress_with(pb)
        });

        wrap({
            let style = ProgressStyle::default_bar()
                .template("{wide_bar:.red} {percent}/100%")
                .unwrap();
            v.par_iter().progress_with_style(style)
        });
    }
}
