| // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. |
| |
| #pragma once |
| |
| #if !defined(RXCPP_RX_OBSERVABLE_HPP) |
| #define RXCPP_RX_OBSERVABLE_HPP |
| |
| #include "rx-includes.hpp" |
| |
| #ifdef __GNUG__ |
| #define EXPLICIT_THIS this-> |
| #else |
| #define EXPLICIT_THIS |
| #endif |
| |
| namespace rxcpp { |
| |
| namespace detail { |
| |
| template<class Subscriber, class T> |
| struct has_on_subscribe_for |
| { |
| struct not_void {}; |
| template<class CS, class CT> |
| static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr)); |
| template<class CS, class CT> |
| static not_void check(...); |
| |
| typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result; |
| static const bool value = std::is_same<detail_result, void>::value; |
| }; |
| |
| } |
| |
| template<class T> |
| class dynamic_observable |
| : public rxs::source_base<T> |
| { |
| struct state_type |
| : public std::enable_shared_from_this<state_type> |
| { |
| typedef std::function<void(subscriber<T>)> onsubscribe_type; |
| |
| onsubscribe_type on_subscribe; |
| }; |
| std::shared_ptr<state_type> state; |
| |
| template<class U> |
| friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&); |
| |
| template<class SO> |
| void construct(SO&& source, rxs::tag_source&&) { |
| rxu::decay_t<SO> so = std::forward<SO>(source); |
| state->on_subscribe = [so](subscriber<T> o) mutable { |
| so.on_subscribe(std::move(o)); |
| }; |
| } |
| |
| struct tag_function {}; |
| template<class F> |
| void construct(F&& f, tag_function&&) { |
| state->on_subscribe = std::forward<F>(f); |
| } |
| |
| public: |
| |
| typedef tag_dynamic_observable dynamic_observable_tag; |
| |
| dynamic_observable() |
| { |
| } |
| |
| template<class SOF> |
| explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0) |
| : state(std::make_shared<state_type>()) |
| { |
| construct(std::forward<SOF>(sof), |
| typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type()); |
| } |
| |
| void on_subscribe(subscriber<T> o) const { |
| state->on_subscribe(std::move(o)); |
| } |
| |
| template<class Subscriber> |
| typename std::enable_if<is_subscriber<Subscriber>::value, void>::type |
| on_subscribe(Subscriber o) const { |
| state->on_subscribe(o.as_dynamic()); |
| } |
| }; |
| |
| template<class T> |
| inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) { |
| return lhs.state == rhs.state; |
| } |
| template<class T> |
| inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) { |
| return !(lhs == rhs); |
| } |
| |
| template<class T, class Source> |
| observable<T> make_observable_dynamic(Source&& s) { |
| return observable<T>(dynamic_observable<T>(std::forward<Source>(s))); |
| } |
| |
| namespace detail { |
| template<bool Selector, class Default, class SO> |
| struct resolve_observable; |
| |
| template<class Default, class SO> |
| struct resolve_observable<true, Default, SO> |
| { |
| typedef typename SO::type type; |
| typedef typename type::value_type value_type; |
| static const bool value = true; |
| typedef observable<value_type, type> observable_type; |
| template<class... AN> |
| static observable_type make(const Default&, AN&&... an) { |
| return observable_type(type(std::forward<AN>(an)...)); |
| } |
| }; |
| template<class Default, class SO> |
| struct resolve_observable<false, Default, SO> |
| { |
| static const bool value = false; |
| typedef Default observable_type; |
| template<class... AN> |
| static observable_type make(const observable_type& that, const AN&...) { |
| return that; |
| } |
| }; |
| template<class SO> |
| struct resolve_observable<true, void, SO> |
| { |
| typedef typename SO::type type; |
| typedef typename type::value_type value_type; |
| static const bool value = true; |
| typedef observable<value_type, type> observable_type; |
| template<class... AN> |
| static observable_type make(AN&&... an) { |
| return observable_type(type(std::forward<AN>(an)...)); |
| } |
| }; |
| template<class SO> |
| struct resolve_observable<false, void, SO> |
| { |
| static const bool value = false; |
| typedef void observable_type; |
| template<class... AN> |
| static observable_type make(const AN&...) { |
| } |
| }; |
| |
| } |
| |
| template<class Selector, class Default, template<class... TN> class SO, class... AN> |
| struct defer_observable |
| : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>> |
| { |
| }; |
| |
| /*! |
| \brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value. |
| |
| \ingroup group-observable |
| |
| */ |
| template<class T, class Observable> |
| class blocking_observable |
| { |
| template<class Obsvbl, class... ArgN> |
| static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an) |
| -> void { |
| std::mutex lock; |
| std::condition_variable wake; |
| bool disposed = false; |
| |
| auto dest = make_subscriber<T>(std::forward<ArgN>(an)...); |
| |
| rxu::error_ptr error; |
| bool has_error = false; |
| |
| // keep any error to rethrow at the end. |
| // copy 'dest' by-value to avoid using it after it goes out of scope. |
| auto scbr = make_subscriber<T>( |
| dest, |
| [dest](T t){dest.on_next(t);}, |
| [dest,&error,&has_error,do_rethrow](rxu::error_ptr e){ |
| if (do_rethrow) { |
| has_error = true; |
| error = e; |
| } else { |
| dest.on_error(e); |
| } |
| }, |
| [dest](){dest.on_completed();} |
| ); |
| |
| auto cs = scbr.get_subscription(); |
| cs.add( |
| [&](){ |
| std::unique_lock<std::mutex> guard(lock); |
| wake.notify_one(); |
| disposed = true; |
| }); |
| |
| source.subscribe(std::move(scbr)); |
| |
| std::unique_lock<std::mutex> guard(lock); |
| wake.wait(guard, |
| [&](){ |
| return disposed; |
| }); |
| |
| if (has_error) {rxu::rethrow_exception(error);} |
| } |
| |
| public: |
| typedef rxu::decay_t<Observable> observable_type; |
| observable_type source; |
| ~blocking_observable() |
| { |
| } |
| blocking_observable(observable_type s) : source(std::move(s)) {} |
| |
| /// |
| /// `subscribe` will cause this observable to emit values to the provided subscriber. |
| /// |
| /// \return void |
| /// |
| /// \param an... - the arguments are passed to make_subscriber(). |
| /// |
| /// callers must provide enough arguments to make a subscriber. |
| /// overrides are supported. thus |
| /// `subscribe(thesubscriber, composite_subscription())` |
| /// will take `thesubscriber.get_observer()` and the provided |
| /// subscription and subscribe to the new subscriber. |
| /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer |
| /// if a subscription or subscriber is not provided then a new subscription will be created. |
| /// |
| template<class... ArgN> |
| auto subscribe(ArgN&&... an) const |
| -> void { |
| return blocking_subscribe(source, false, std::forward<ArgN>(an)...); |
| } |
| |
| /// |
| /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber. |
| /// |
| /// \note If the source observable calls on_error, the raised exception is rethrown by this method. |
| /// |
| /// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called. |
| /// |
| /// \return void |
| /// |
| /// \param an... - the arguments are passed to make_subscriber(). |
| /// |
| /// callers must provide enough arguments to make a subscriber. |
| /// overrides are supported. thus |
| /// `subscribe(thesubscriber, composite_subscription())` |
| /// will take `thesubscriber.get_observer()` and the provided |
| /// subscription and subscribe to the new subscriber. |
| /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer |
| /// if a subscription or subscriber is not provided then a new subscription will be created. |
| /// |
| template<class... ArgN> |
| auto subscribe_with_rethrow(ArgN&&... an) const |
| -> void { |
| return blocking_subscribe(source, true, std::forward<ArgN>(an)...); |
| } |
| |
| /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. |
| |
| \return The first item emitted by this blocking_observable. |
| |
| \note If the source observable calls on_error, the raised exception is rethrown by this method. |
| |
| \sample |
| When the source observable emits at least one item: |
| \snippet blocking_observable.cpp blocking first sample |
| \snippet output.txt blocking first sample |
| |
| When the source observable is empty: |
| \snippet blocking_observable.cpp blocking first empty sample |
| \snippet output.txt blocking first empty sample |
| */ |
| template<class... AN> |
| auto first(AN**...) -> delayed_type_t<T, AN...> const { |
| rxu::maybe<T> result; |
| composite_subscription cs; |
| subscribe_with_rethrow( |
| cs, |
| [&](T v){result.reset(v); cs.unsubscribe();}); |
| if (result.empty()) |
| rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value")); |
| return result.get(); |
| static_assert(sizeof...(AN) == 0, "first() was passed too many arguments."); |
| } |
| |
| /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. |
| |
| \return The last item emitted by this blocking_observable. |
| |
| \note If the source observable calls on_error, the raised exception is rethrown by this method. |
| |
| \sample |
| When the source observable emits at least one item: |
| \snippet blocking_observable.cpp blocking last sample |
| \snippet output.txt blocking last sample |
| |
| When the source observable is empty: |
| \snippet blocking_observable.cpp blocking last empty sample |
| \snippet output.txt blocking last empty sample |
| */ |
| template<class... AN> |
| auto last(AN**...) -> delayed_type_t<T, AN...> const { |
| rxu::maybe<T> result; |
| subscribe_with_rethrow( |
| [&](T v){result.reset(v);}); |
| if (result.empty()) |
| rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value")); |
| return result.get(); |
| static_assert(sizeof...(AN) == 0, "last() was passed too many arguments."); |
| } |
| |
| /*! Return the total number of items emitted by this blocking_observable. |
| |
| \return The total number of items emitted by this blocking_observable. |
| |
| \sample |
| \snippet blocking_observable.cpp blocking count sample |
| \snippet output.txt blocking count sample |
| |
| When the source observable calls on_error: |
| \snippet blocking_observable.cpp blocking count error sample |
| \snippet output.txt blocking count error sample |
| */ |
| int count() const { |
| int result = 0; |
| source.count().as_blocking().subscribe_with_rethrow( |
| [&](int v){result = v;}); |
| return result; |
| } |
| |
| /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. |
| |
| \return The sum of all items emitted by this blocking_observable. |
| |
| \sample |
| When the source observable emits at least one item: |
| \snippet blocking_observable.cpp blocking sum sample |
| \snippet output.txt blocking sum sample |
| |
| When the source observable is empty: |
| \snippet blocking_observable.cpp blocking sum empty sample |
| \snippet output.txt blocking sum empty sample |
| |
| When the source observable calls on_error: |
| \snippet blocking_observable.cpp blocking sum error sample |
| \snippet output.txt blocking sum error sample |
| */ |
| T sum() const { |
| return source.sum().as_blocking().last(); |
| } |
| |
| /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. |
| |
| \return The average value of all items emitted by this blocking_observable. |
| |
| \sample |
| When the source observable emits at least one item: |
| \snippet blocking_observable.cpp blocking average sample |
| \snippet output.txt blocking average sample |
| |
| When the source observable is empty: |
| \snippet blocking_observable.cpp blocking average empty sample |
| \snippet output.txt blocking average empty sample |
| |
| When the source observable calls on_error: |
| \snippet blocking_observable.cpp blocking average error sample |
| \snippet output.txt blocking average error sample |
| */ |
| double average() const { |
| return source.average().as_blocking().last(); |
| } |
| |
| /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. |
| |
| \return The max of all items emitted by this blocking_observable. |
| |
| \sample |
| When the source observable emits at least one item: |
| \snippet blocking_observable.cpp blocking max sample |
| \snippet output.txt blocking max sample |
| |
| When the source observable is empty: |
| \snippet blocking_observable.cpp blocking max empty sample |
| \snippet output.txt blocking max empty sample |
| |
| When the source observable calls on_error: |
| \snippet blocking_observable.cpp blocking max error sample |
| \snippet output.txt blocking max error sample |
| */ |
| T max() const { |
| return source.max().as_blocking().last(); |
| } |
| |
| /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. |
| |
| \return The min of all items emitted by this blocking_observable. |
| |
| \sample |
| When the source observable emits at least one item: |
| \snippet blocking_observable.cpp blocking min sample |
| \snippet output.txt blocking min sample |
| |
| When the source observable is empty: |
| \snippet blocking_observable.cpp blocking min empty sample |
| \snippet output.txt blocking min empty sample |
| |
| When the source observable calls on_error: |
| \snippet blocking_observable.cpp blocking min error sample |
| \snippet output.txt blocking min error sample |
| */ |
| T min() const { |
| return source.min().as_blocking().last(); |
| } |
| }; |
| |
| namespace detail { |
| |
| template<class SourceOperator, class Subscriber> |
| struct safe_subscriber |
| { |
| safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {} |
| |
| void subscribe() { |
| RXCPP_TRY { |
| so->on_subscribe(*o); |
| } RXCPP_CATCH(...) { |
| if (!o->is_subscribed()) { |
| rxu::rethrow_current_exception(); |
| } |
| o->on_error(rxu::make_error_ptr(rxu::current_exception())); |
| o->unsubscribe(); |
| } |
| } |
| |
| void operator()(const rxsc::schedulable&) { |
| subscribe(); |
| } |
| |
| SourceOperator* so; |
| Subscriber* o; |
| }; |
| |
| } |
| |
| template<> |
| class observable<void, void>; |
| |
| /*! |
| \defgroup group-observable Observables |
| |
| \brief These are the set of observable classes in rxcpp. |
| |
| \class rxcpp::observable |
| |
| \ingroup group-observable group-core |
| |
| \brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source. |
| |
| \par Some code |
| This sample will observable::subscribe() to values from a observable<void, void>::range(). |
| |
| \sample |
| \snippet range.cpp range sample |
| \snippet output.txt range sample |
| |
| */ |
| template<class T, class SourceOperator> |
| class observable |
| : public observable_base<T> |
| { |
| static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>"); |
| |
| typedef observable<T, SourceOperator> this_type; |
| |
| public: |
| typedef rxu::decay_t<SourceOperator> source_operator_type; |
| mutable source_operator_type source_operator; |
| |
| private: |
| |
| template<class U, class SO> |
| friend class observable; |
| |
| template<class U, class SO> |
| friend bool operator==(const observable<U, SO>&, const observable<U, SO>&); |
| |
| template<class Subscriber> |
| auto detail_subscribe(Subscriber o) const |
| -> composite_subscription { |
| |
| typedef rxu::decay_t<Subscriber> subscriber_type; |
| |
| static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber"); |
| static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible"); |
| static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber "); |
| |
| trace_activity().subscribe_enter(*this, o); |
| |
| if (!o.is_subscribed()) { |
| trace_activity().subscribe_return(*this); |
| return o.get_subscription(); |
| } |
| |
| detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o); |
| |
| // make sure to let current_thread take ownership of the thread as early as possible. |
| if (rxsc::current_thread::is_schedule_required()) { |
| const auto& sc = rxsc::make_current_thread(); |
| sc.create_worker(o.get_subscription()).schedule(subscriber); |
| } else { |
| // current_thread already owns this thread. |
| subscriber.subscribe(); |
| } |
| |
| trace_activity().subscribe_return(*this); |
| return o.get_subscription(); |
| } |
| |
| public: |
| typedef T value_type; |
| |
| static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source"); |
| |
| ~observable() |
| { |
| } |
| |
| observable() |
| { |
| } |
| |
| explicit observable(const source_operator_type& o) |
| : source_operator(o) |
| { |
| } |
| explicit observable(source_operator_type&& o) |
| : source_operator(std::move(o)) |
| { |
| } |
| |
| /// implicit conversion between observables of the same value_type |
| template<class SO> |
| observable(const observable<T, SO>& o) |
| : source_operator(o.source_operator) |
| {} |
| /// implicit conversion between observables of the same value_type |
| template<class SO> |
| observable(observable<T, SO>&& o) |
| : source_operator(std::move(o.source_operator)) |
| {} |
| |
| #if 0 |
| template<class I> |
| void on_subscribe(observer<T, I> o) const { |
| source_operator.on_subscribe(o); |
| } |
| #endif |
| |
| /*! @copydoc rxcpp::operators::as_dynamic |
| */ |
| template<class... AN> |
| observable<T> as_dynamic(AN**...) const { |
| return *this; |
| static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rx-ref_count.hpp |
| */ |
| template<class... AN> |
| auto ref_count(AN... an) const // ref_count(ConnectableObservable&&) |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::as_blocking |
| */ |
| template<class... AN> |
| blocking_observable<T, this_type> as_blocking(AN**...) const { |
| return blocking_observable<T, this_type>(*this); |
| static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments."); |
| } |
| |
| /// \cond SHOW_SERVICE_MEMBERS |
| |
| /// |
| /// takes any function that will take this observable and produce a result value. |
| /// this is intended to allow externally defined operators, that use subscribe, |
| /// to be connected into the expression. |
| /// |
| template<class OperatorFactory> |
| auto op(OperatorFactory&& of) const |
| -> decltype(of(*(const this_type*)nullptr)) { |
| return of(*this); |
| static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); |
| } |
| |
| /*! @copydoc rx-lift.hpp |
| */ |
| template<class ResultType, class Operator> |
| auto lift(Operator&& op) const |
| -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> { |
| return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>( |
| rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); |
| static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)"); |
| } |
| |
| /// |
| /// takes any function that will take a subscriber for this observable and produce a subscriber. |
| /// this is intended to allow externally defined operators, that use make_subscriber, to be connected |
| /// into the expression. |
| /// |
| template<class ResultType, class Operator> |
| auto lift_if(Operator&& op) const |
| -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, |
| observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type { |
| return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>( |
| rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); |
| } |
| /// |
| /// takes any function that will take a subscriber for this observable and produce a subscriber. |
| /// this is intended to allow externally defined operators, that use make_subscriber, to be connected |
| /// into the expression. |
| /// |
| template<class ResultType, class Operator> |
| auto lift_if(Operator&&) const |
| -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, |
| decltype(rxs::from<ResultType>())>::type { |
| return rxs::from<ResultType>(); |
| } |
| /// \endcond |
| |
| /*! @copydoc rx-subscribe.hpp |
| */ |
| template<class... ArgN> |
| auto subscribe(ArgN&&... an) const |
| -> composite_subscription { |
| return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...)); |
| } |
| |
| /*! @copydoc rx-all.hpp |
| */ |
| template<class... AN> |
| auto all(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(all_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::is_empty |
| */ |
| template<class... AN> |
| auto is_empty(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-any.hpp |
| */ |
| template<class... AN> |
| auto any(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(any_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::exists |
| */ |
| template<class... AN> |
| auto exists(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(exists_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::contains |
| */ |
| template<class... AN> |
| auto contains(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(contains_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-filter.hpp |
| */ |
| template<class... AN> |
| auto filter(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(filter_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-switch_if_empty.hpp |
| */ |
| template<class... AN> |
| auto switch_if_empty(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::default_if_empty |
| */ |
| template<class... AN> |
| auto default_if_empty(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-sequence_equal.hpp |
| */ |
| template<class... AN> |
| auto sequence_equal(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-tap.hpp |
| */ |
| template<class... AN> |
| auto tap(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(tap_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-time_interval.hpp |
| */ |
| template<class... AN> |
| auto time_interval(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-timeout.hpp |
| */ |
| template<class... AN> |
| auto timeout(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-timestamp.hpp |
| */ |
| template<class... AN> |
| auto timestamp(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-finally.hpp |
| */ |
| template<class... AN> |
| auto finally(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(finally_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-on_error_resume_next.hpp |
| */ |
| template<class... AN> |
| auto on_error_resume_next(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-on_error_resume_next.hpp |
| */ |
| template<class... AN> |
| auto switch_on_error(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-map.hpp |
| */ |
| template<class... AN> |
| auto map(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(map_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-map.hpp |
| */ |
| template<class... AN> |
| auto transform(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(map_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-debounce.hpp |
| */ |
| template<class... AN> |
| auto debounce(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-delay.hpp |
| */ |
| template<class... AN> |
| auto delay(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(delay_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-distinct.hpp |
| */ |
| template<class... AN> |
| auto distinct(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-distinct_until_changed.hpp |
| */ |
| template<class... AN> |
| auto distinct_until_changed(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-element_at.hpp |
| */ |
| template<class... AN> |
| auto element_at(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-window.hpp |
| */ |
| template<class... AN> |
| auto window(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(window_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-window_time.hpp |
| */ |
| template<class... AN> |
| auto window_with_time(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-window_time_count.hpp |
| */ |
| template<class... AN> |
| auto window_with_time_or_count(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-window_toggle.hpp |
| */ |
| template<class... AN> |
| auto window_toggle(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-buffer_count.hpp |
| */ |
| template<class... AN> |
| auto buffer(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-buffer_time.hpp |
| */ |
| template<class... AN> |
| auto buffer_with_time(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-buffer_time_count.hpp |
| */ |
| template<class... AN> |
| auto buffer_with_time_or_count(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-switch_on_next.hpp |
| */ |
| template<class... AN> |
| auto switch_on_next(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-merge.hpp |
| */ |
| template<class... AN> |
| auto merge(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(merge_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-merge_delay_error.hpp |
| */ |
| template<class... AN> |
| auto merge_delay_error(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-amb.hpp |
| */ |
| template<class... AN> |
| auto amb(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(amb_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-flat_map.hpp |
| */ |
| template<class... AN> |
| auto flat_map(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-flat_map.hpp |
| */ |
| template<class... AN> |
| auto merge_transform(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-concat.hpp |
| */ |
| template<class... AN> |
| auto concat(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(concat_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-concat_map.hpp |
| */ |
| template<class... AN> |
| auto concat_map(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-concat_map.hpp |
| */ |
| template<class... AN> |
| auto concat_transform(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-with_latest_from.hpp |
| */ |
| template<class... AN> |
| auto with_latest_from(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| |
| /*! @copydoc rx-combine_latest.hpp |
| */ |
| template<class... AN> |
| auto combine_latest(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-zip.hpp |
| */ |
| template<class... AN> |
| auto zip(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(zip_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-group_by.hpp |
| */ |
| template<class... AN> |
| inline auto group_by(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-ignore_elements.hpp |
| */ |
| template<class... AN> |
| auto ignore_elements(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-muticast.hpp |
| */ |
| template<class... AN> |
| auto multicast(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-publish.hpp |
| */ |
| template<class... AN> |
| auto publish(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(publish_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::publish_synchronized |
| */ |
| template<class... AN> |
| auto publish_synchronized(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-replay.hpp |
| */ |
| template<class... AN> |
| auto replay(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(replay_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-subscribe_on.hpp |
| */ |
| template<class... AN> |
| auto subscribe_on(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-observe_on.hpp |
| */ |
| template<class... AN> |
| auto observe_on(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-reduce.hpp |
| */ |
| template<class... AN> |
| auto reduce(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-reduce.hpp |
| */ |
| template<class... AN> |
| auto accumulate(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rxcpp::operators::first |
| */ |
| template<class... AN> |
| auto first(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr)) |
| /// \endcond |
| { |
| return observable_member(delayed_type<first_tag, AN...>::value(), *this); |
| static_assert(sizeof...(AN) == 0, "first() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rxcpp::operators::last |
| */ |
| template<class... AN> |
| auto last(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr)) |
| /// \endcond |
| { |
| return observable_member(delayed_type<last_tag, AN...>::value(), *this); |
| static_assert(sizeof...(AN) == 0, "last() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rxcpp::operators::count |
| */ |
| template<class... AN> |
| auto count(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>())) |
| /// \endcond |
| { |
| return observable_member(delayed_type<reduce_tag, AN...>::value(), *this, 0, rxu::count(), identity_for<int>()); |
| static_assert(sizeof...(AN) == 0, "count() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rxcpp::operators::sum |
| */ |
| template<class... AN> |
| auto sum(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr)) |
| /// \endcond |
| { |
| return observable_member(delayed_type<sum_tag, AN...>::value(), *this); |
| static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rxcpp::operators::average |
| */ |
| template<class... AN> |
| auto average(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr)) |
| /// \endcond |
| { |
| return observable_member(delayed_type<average_tag, AN...>::value(), *this); |
| static_assert(sizeof...(AN) == 0, "average() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rxcpp::operators::max |
| */ |
| template<class... AN> |
| auto max(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr)) |
| /// \endcond |
| { |
| return observable_member(delayed_type<max_tag, AN...>::value(), *this); |
| static_assert(sizeof...(AN) == 0, "max() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rxcpp::operators::min |
| */ |
| template<class... AN> |
| auto min(AN**...) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr)) |
| /// \endcond |
| { |
| return observable_member(delayed_type<min_tag, AN...>::value(), *this); |
| static_assert(sizeof...(AN) == 0, "min() was passed too many arguments."); |
| } |
| |
| /*! @copydoc rx-scan.hpp |
| */ |
| template<class... AN> |
| auto scan(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(scan_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-sample_time.hpp |
| */ |
| template<class... AN> |
| auto sample_with_time(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-skip.hpp |
| */ |
| template<class... AN> |
| auto skip(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(skip_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-skip.hpp |
| */ |
| template<class... AN> |
| auto skip_while(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-skip_last.hpp |
| */ |
| template<class... AN> |
| auto skip_last(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-skip_until.hpp |
| */ |
| template<class... AN> |
| auto skip_until(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-take.hpp |
| */ |
| template<class... AN> |
| auto take(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(take_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-take_last.hpp |
| */ |
| template<class... AN> |
| auto take_last(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-take_until.hpp |
| */ |
| template<class... AN> |
| auto take_until(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-take_while.hpp |
| */ |
| template<class... AN> |
| auto take_while(AN&&... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-repeat.hpp |
| */ |
| template<class... AN> |
| auto repeat(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-retry.hpp |
| */ |
| template<class... AN> |
| auto retry(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-start_with.hpp |
| */ |
| template<class... AN> |
| auto start_with(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...); |
| } |
| |
| /*! @copydoc rx-pairwise.hpp |
| */ |
| template<class... AN> |
| auto pairwise(AN... an) const |
| /// \cond SHOW_SERVICE_MEMBERS |
| -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) |
| /// \endcond |
| { |
| return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...); |
| } |
| }; |
| |
| template<class T, class SourceOperator> |
| inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) { |
| return lhs.source_operator == rhs.source_operator; |
| } |
| template<class T, class SourceOperator> |
| inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) { |
| return !(lhs == rhs); |
| } |
| |
| /*! |
| \defgroup group-core Basics |
| |
| \brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled. |
| |
| \class rxcpp::observable<void, void> |
| |
| \brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable. |
| |
| \ingroup group-core |
| |
| \par Create a new type of observable |
| |
| \sample |
| \snippet create.cpp Create sample |
| \snippet output.txt Create sample |
| |
| \par Create an observable that emits a range of values |
| |
| \sample |
| \snippet range.cpp range sample |
| \snippet output.txt range sample |
| |
| \par Create an observable that emits nothing / generates an error / immediately completes |
| |
| \sample |
| \snippet never.cpp never sample |
| \snippet output.txt never sample |
| \snippet error.cpp error sample |
| \snippet output.txt error sample |
| \snippet empty.cpp empty sample |
| \snippet output.txt empty sample |
| |
| \par Create an observable that generates new observable for each subscriber |
| |
| \sample |
| \snippet defer.cpp defer sample |
| \snippet output.txt defer sample |
| |
| \par Create an observable that emits items every specified interval of time |
| |
| \sample |
| \snippet interval.cpp interval sample |
| \snippet output.txt interval sample |
| |
| \par Create an observable that emits items in the specified interval of time |
| |
| \sample |
| \snippet timer.cpp duration timer sample |
| \snippet output.txt duration timer sample |
| |
| \par Create an observable that emits all items from a collection |
| |
| \sample |
| \snippet iterate.cpp iterate sample |
| \snippet output.txt iterate sample |
| |
| \par Create an observable that emits a set of specified items |
| |
| \sample |
| \snippet from.cpp from sample |
| \snippet output.txt from sample |
| |
| \par Create an observable that emits a single item |
| |
| \sample |
| \snippet just.cpp just sample |
| \snippet output.txt just sample |
| |
| \par Create an observable that emits a set of items and then subscribes to another observable |
| |
| \sample |
| \snippet start_with.cpp full start_with sample |
| \snippet output.txt full start_with sample |
| |
| \par Create an observable that generates a new observable based on a generated resource for each subscriber |
| |
| \sample |
| \snippet scope.cpp scope sample |
| \snippet output.txt scope sample |
| |
| */ |
| template<> |
| class observable<void, void> |
| { |
| ~observable(); |
| public: |
| /*! @copydoc rx-create.hpp |
| */ |
| template<class T, class OnSubscribe> |
| static auto create(OnSubscribe os) |
| -> decltype(rxs::create<T>(std::move(os))) { |
| return rxs::create<T>(std::move(os)); |
| } |
| |
| /*! @copydoc rx-range.hpp |
| */ |
| template<class T> |
| static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1) |
| -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) { |
| return rxs::range<T>(first, last, step, identity_current_thread()); |
| } |
| /*! @copydoc rx-range.hpp |
| */ |
| template<class T, class Coordination> |
| static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) |
| -> decltype(rxs::range<T>(first, last, step, std::move(cn))) { |
| return rxs::range<T>(first, last, step, std::move(cn)); |
| } |
| /*! @copydoc rx-range.hpp |
| */ |
| template<class T, class Coordination> |
| static auto range(T first, T last, Coordination cn) |
| -> decltype(rxs::range<T>(first, last, std::move(cn))) { |
| return rxs::range<T>(first, last, std::move(cn)); |
| } |
| /*! @copydoc rx-range.hpp |
| */ |
| template<class T, class Coordination> |
| static auto range(T first, Coordination cn) |
| -> decltype(rxs::range<T>(first, std::move(cn))) { |
| return rxs::range<T>(first, std::move(cn)); |
| } |
| |
| /*! @copydoc rx-never.hpp |
| */ |
| template<class T> |
| static auto never() |
| -> decltype(rxs::never<T>()) { |
| return rxs::never<T>(); |
| } |
| |
| /*! @copydoc rx-defer.hpp |
| */ |
| template<class ObservableFactory> |
| static auto defer(ObservableFactory of) |
| -> decltype(rxs::defer(std::move(of))) { |
| return rxs::defer(std::move(of)); |
| } |
| |
| /*! @copydoc rx-interval.hpp |
| */ |
| template<class... AN> |
| static auto interval(rxsc::scheduler::clock_type::duration period, AN**...) |
| -> decltype(rxs::interval(period)) { |
| return rxs::interval(period); |
| static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments."); |
| } |
| /*! @copydoc rx-interval.hpp |
| */ |
| template<class Coordination> |
| static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) |
| -> decltype(rxs::interval(period, std::move(cn))) { |
| return rxs::interval(period, std::move(cn)); |
| } |
| /*! @copydoc rx-interval.hpp |
| */ |
| template<class... AN> |
| static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...) |
| -> decltype(rxs::interval(initial, period)) { |
| return rxs::interval(initial, period); |
| static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments."); |
| } |
| /*! @copydoc rx-interval.hpp |
| */ |
| template<class Coordination> |
| static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) |
| -> decltype(rxs::interval(initial, period, std::move(cn))) { |
| return rxs::interval(initial, period, std::move(cn)); |
| } |
| |
| /*! @copydoc rx-timer.hpp |
| */ |
| template<class... AN> |
| static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...) |
| -> decltype(rxs::timer(at)) { |
| return rxs::timer(at); |
| static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments."); |
| } |
| /*! @copydoc rx-timer.hpp |
| */ |
| template<class... AN> |
| static auto timer(rxsc::scheduler::clock_type::duration after, AN**...) |
| -> decltype(rxs::timer(after)) { |
| return rxs::timer(after); |
| static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments."); |
| } |
| /*! @copydoc rx-timer.hpp |
| */ |
| template<class Coordination> |
| static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) |
| -> decltype(rxs::timer(when, std::move(cn))) { |
| return rxs::timer(when, std::move(cn)); |
| } |
| /*! @copydoc rx-timer.hpp |
| */ |
| template<class Coordination> |
| static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) |
| -> decltype(rxs::timer(when, std::move(cn))) { |
| return rxs::timer(when, std::move(cn)); |
| } |
| |
| /*! @copydoc rx-iterate.hpp |
| */ |
| template<class Collection> |
| static auto iterate(Collection c) |
| -> decltype(rxs::iterate(std::move(c), identity_current_thread())) { |
| return rxs::iterate(std::move(c), identity_current_thread()); |
| } |
| /*! @copydoc rx-iterate.hpp |
| */ |
| template<class Collection, class Coordination> |
| static auto iterate(Collection c, Coordination cn) |
| -> decltype(rxs::iterate(std::move(c), std::move(cn))) { |
| return rxs::iterate(std::move(c), std::move(cn)); |
| } |
| |
| /*! @copydoc rxcpp::sources::from() |
| */ |
| template<class T> |
| static auto from() |
| -> decltype( rxs::from<T>()) { |
| return rxs::from<T>(); |
| } |
| /*! @copydoc rxcpp::sources::from(Coordination cn) |
| */ |
| template<class T, class Coordination> |
| static auto from(Coordination cn) |
| -> typename std::enable_if<is_coordination<Coordination>::value, |
| decltype( rxs::from<T>(std::move(cn)))>::type { |
| return rxs::from<T>(std::move(cn)); |
| } |
| /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn) |
| */ |
| template<class Value0, class... ValueN> |
| static auto from(Value0 v0, ValueN... vn) |
| -> typename std::enable_if<!is_coordination<Value0>::value, |
| decltype( rxs::from(v0, vn...))>::type { |
| return rxs::from(v0, vn...); |
| } |
| /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn) |
| */ |
| template<class Coordination, class Value0, class... ValueN> |
| static auto from(Coordination cn, Value0 v0, ValueN... vn) |
| -> typename std::enable_if<is_coordination<Coordination>::value, |
| decltype( rxs::from(std::move(cn), v0, vn...))>::type { |
| return rxs::from(std::move(cn), v0, vn...); |
| } |
| |
| /*! @copydoc rxcpp::sources::just(Value0 v0) |
| */ |
| template<class T> |
| static auto just(T v) |
| -> decltype(rxs::just(std::move(v))) { |
| return rxs::just(std::move(v)); |
| } |
| /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn) |
| */ |
| template<class T, class Coordination> |
| static auto just(T v, Coordination cn) |
| -> decltype(rxs::just(std::move(v), std::move(cn))) { |
| return rxs::just(std::move(v), std::move(cn)); |
| } |
| |
| /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn) |
| */ |
| template<class Observable, class Value0, class... ValueN> |
| static auto start_with(Observable o, Value0 v0, ValueN... vn) |
| -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) { |
| return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...); |
| } |
| |
| /*! @copydoc rx-empty.hpp |
| */ |
| template<class T> |
| static auto empty() |
| -> decltype(from<T>()) { |
| return from<T>(); |
| } |
| /*! @copydoc rx-empty.hpp |
| */ |
| template<class T, class Coordination> |
| static auto empty(Coordination cn) |
| -> decltype(from<T>(std::move(cn))) { |
| return from<T>(std::move(cn)); |
| } |
| |
| /*! @copydoc rx-error.hpp |
| */ |
| template<class T, class Exception> |
| static auto error(Exception&& e) |
| -> decltype(rxs::error<T>(std::forward<Exception>(e))) { |
| return rxs::error<T>(std::forward<Exception>(e)); |
| } |
| /*! @copydoc rx-error.hpp |
| */ |
| template<class T, class Exception, class Coordination> |
| static auto error(Exception&& e, Coordination cn) |
| -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) { |
| return rxs::error<T>(std::forward<Exception>(e), std::move(cn)); |
| } |
| |
| /*! @copydoc rx-scope.hpp |
| */ |
| template<class ResourceFactory, class ObservableFactory> |
| static auto scope(ResourceFactory rf, ObservableFactory of) |
| -> decltype(rxs::scope(std::move(rf), std::move(of))) { |
| return rxs::scope(std::move(rf), std::move(of)); |
| } |
| }; |
| |
| } |
| |
| // |
| // support range() >> filter() >> subscribe() syntax |
| // '>>' is spelled 'stream' |
| // |
| template<class T, class SourceOperator, class OperatorFactory> |
| auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) |
| -> decltype(source.op(std::forward<OperatorFactory>(of))) { |
| return source.op(std::forward<OperatorFactory>(of)); |
| } |
| |
| // |
| // support range() | filter() | subscribe() syntax |
| // '|' is spelled 'pipe' |
| // |
| template<class T, class SourceOperator, class OperatorFactory> |
| auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) |
| -> decltype(source.op(std::forward<OperatorFactory>(of))) { |
| return source.op(std::forward<OperatorFactory>(of)); |
| } |
| |
| #endif |