// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_OBSERVER_HPP) #define RXCPP_RX_OBSERVER_HPP #include "rx-includes.hpp" namespace rxcpp { template struct observer_base { typedef T value_type; typedef tag_observer observer_tag; }; namespace detail { template struct OnNextEmpty { void operator()(const T&) const {} }; struct OnErrorEmpty { void operator()(rxu::error_ptr) const { // error implicitly ignored, abort std::terminate(); } }; struct OnErrorIgnore { void operator()(rxu::error_ptr) const { } }; struct OnCompletedEmpty { void operator()() const {} }; template struct OnNextForward { using state_t = rxu::decay_t; using onnext_t = rxu::decay_t; OnNextForward() : onnext() {} explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {} onnext_t onnext; void operator()(state_t& s, T& t) const { onnext(s, t); } void operator()(state_t& s, T&& t) const { onnext(s, t); } }; template struct OnNextForward { using state_t = rxu::decay_t; OnNextForward() {} void operator()(state_t& s, T& t) const { s.on_next(t); } void operator()(state_t& s, T&& t) const { s.on_next(t); } }; template struct OnErrorForward { using state_t = rxu::decay_t; using onerror_t = rxu::decay_t; OnErrorForward() : onerror() {} explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {} onerror_t onerror; void operator()(state_t& s, rxu::error_ptr ep) const { onerror(s, ep); } }; template struct OnErrorForward { using state_t = rxu::decay_t; OnErrorForward() {} void operator()(state_t& s, rxu::error_ptr ep) const { s.on_error(ep); } }; template struct OnCompletedForward { using state_t = rxu::decay_t; using oncompleted_t = rxu::decay_t; OnCompletedForward() : oncompleted() {} explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {} oncompleted_t oncompleted; void operator()(state_t& s) const { oncompleted(s); } }; template struct OnCompletedForward { OnCompletedForward() {} void operator()(State& s) const { s.on_completed(); } }; template struct is_on_next_of { struct not_void {}; template static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr)); template static not_void check(...); typedef decltype(check>(0)) detail_result; static const bool value = std::is_same::value; }; template struct is_on_error { struct not_void {}; template static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr)); template static not_void check(...); static const bool value = std::is_same>(0)), void>::value; }; template struct is_on_error_for { struct not_void {}; template static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr)); template static not_void check(...); static const bool value = std::is_same>(0)), void>::value; }; template struct is_on_completed { struct not_void {}; template static auto check(int) -> decltype((*(CF*)nullptr)()); template static not_void check(...); static const bool value = std::is_same>(0)), void>::value; }; } /*! \brief consumes values from an observable using `State` that may implement on_next, on_error and on_completed with optional overrides of each function. \tparam T - the type of value in the stream \tparam State - the type of the stored state \tparam OnNext - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called. \tparam OnError - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called. \tparam OnCompleted - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called. \ingroup group-core */ template class observer : public observer_base { public: using this_type = observer; using state_t = rxu::decay_t; using on_next_t = typename std::conditional< !std::is_same::value, rxu::decay_t, detail::OnNextForward>::type; using on_error_t = typename std::conditional< !std::is_same::value, rxu::decay_t, detail::OnErrorForward>::type; using on_completed_t = typename std::conditional< !std::is_same::value, rxu::decay_t, detail::OnCompletedForward>::type; private: mutable state_t state; on_next_t onnext; on_error_t onerror; on_completed_t oncompleted; public: explicit observer(state_t s, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t()) : state(std::move(s)) , onnext(std::move(n)) , onerror(std::move(e)) , oncompleted(std::move(c)) { } explicit observer(state_t s, on_next_t n, on_completed_t c) : state(std::move(s)) , onnext(std::move(n)) , onerror(on_error_t()) , oncompleted(std::move(c)) { } observer(const this_type& o) : state(o.state) , onnext(o.onnext) , onerror(o.onerror) , oncompleted(o.oncompleted) { } observer(this_type&& o) : state(std::move(o.state)) , onnext(std::move(o.onnext)) , onerror(std::move(o.onerror)) , oncompleted(std::move(o.oncompleted)) { } this_type& operator=(this_type o) { state = std::move(o.state); onnext = std::move(o.onnext); onerror = std::move(o.onerror); oncompleted = std::move(o.oncompleted); return *this; } void on_next(T& t) const { onnext(state, t); } void on_next(T&& t) const { onnext(state, std::move(t)); } void on_error(rxu::error_ptr e) const { onerror(state, e); } void on_completed() const { oncompleted(state); } observer as_dynamic() const { return observer(*this); } }; /*! \brief consumes values from an observable using default empty method implementations with optional overrides of each function. \tparam T - the type of value in the stream \tparam OnNext - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty is used. \tparam OnError - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used. \tparam OnCompleted - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used. \ingroup group-core */ template class observer : public observer_base { public: using this_type = observer; using on_next_t = typename std::conditional< !std::is_same::value, rxu::decay_t, detail::OnNextEmpty>::type; using on_error_t = typename std::conditional< !std::is_same::value, rxu::decay_t, detail::OnErrorEmpty>::type; using on_completed_t = typename std::conditional< !std::is_same::value, rxu::decay_t, detail::OnCompletedEmpty>::type; private: on_next_t onnext; on_error_t onerror; on_completed_t oncompleted; public: static_assert(detail::is_on_next_of::value, "Function supplied for on_next must be a function with the signature void(T);"); static_assert(detail::is_on_error::value, "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);"); static_assert(detail::is_on_completed::value, "Function supplied for on_completed must be a function with the signature void();"); observer() : onnext(on_next_t()) , onerror(on_error_t()) , oncompleted(on_completed_t()) { } explicit observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t()) : onnext(std::move(n)) , onerror(std::move(e)) , oncompleted(std::move(c)) { } observer(const this_type& o) : onnext(o.onnext) , onerror(o.onerror) , oncompleted(o.oncompleted) { } observer(this_type&& o) : onnext(std::move(o.onnext)) , onerror(std::move(o.onerror)) , oncompleted(std::move(o.oncompleted)) { } this_type& operator=(this_type o) { onnext = std::move(o.onnext); onerror = std::move(o.onerror); oncompleted = std::move(o.oncompleted); return *this; } void on_next(T& t) const { onnext(t); } void on_next(T&& t) const { onnext(std::move(t)); } void on_error(rxu::error_ptr e) const { onerror(e); } void on_completed() const { oncompleted(); } observer as_dynamic() const { return observer(*this); } }; namespace detail { template struct virtual_observer : public std::enable_shared_from_this> { virtual ~virtual_observer() {} virtual void on_next(T&) const {}; virtual void on_next(T&&) const {}; virtual void on_error(rxu::error_ptr) const {}; virtual void on_completed() const {}; }; template struct specific_observer : public virtual_observer { explicit specific_observer(Observer o) : destination(std::move(o)) { } Observer destination; virtual void on_next(T& t) const { destination.on_next(t); } virtual void on_next(T&& t) const { destination.on_next(std::move(t)); } virtual void on_error(rxu::error_ptr e) const { destination.on_error(e); } virtual void on_completed() const { destination.on_completed(); } }; } /*! \brief consumes values from an observable using type-forgetting (shared allocated state with virtual methods) \tparam T - the type of value in the stream \ingroup group-core */ template class observer : public observer_base { public: typedef tag_dynamic_observer dynamic_observer_tag; private: using this_type = observer; using base_type = observer_base; using virtual_observer = detail::virtual_observer; std::shared_ptr destination; template static auto make_destination(Observer o) -> std::shared_ptr { return std::make_shared>(std::move(o)); } public: observer() { } observer(const this_type& o) : destination(o.destination) { } observer(this_type&& o) : destination(std::move(o.destination)) { } template explicit observer(Observer o) : destination(make_destination(std::move(o))) { } this_type& operator=(this_type o) { destination = std::move(o.destination); return *this; } // perfect forwarding delays the copy of the value. template void on_next(V&& v) const { if (destination) { destination->on_next(std::forward(v)); } } void on_error(rxu::error_ptr e) const { if (destination) { destination->on_error(e); } } void on_completed() const { if (destination) { destination->on_completed(); } } observer as_dynamic() const { return *this; } }; template auto make_observer() -> observer, DefaultOnError> { return observer, DefaultOnError>(); } template auto make_observer(observer o) -> observer { return observer(std::move(o)); } template auto make_observer(Observer ob) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value && is_observer::value, Observer>::type { return std::move(ob); } template auto make_observer(Observer ob) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value && !is_observer::value, observer>::type { return observer(std::move(ob)); } template auto make_observer(OnNext on) -> typename std::enable_if< detail::is_on_next_of::value, observer>::type { return observer( std::move(on)); } template auto make_observer(OnError oe) -> typename std::enable_if< !detail::is_on_next_of::value && detail::is_on_error::value, observer, OnError>>::type { return observer, OnError>( detail::OnNextEmpty(), std::move(oe)); } template auto make_observer(OnNext on, OnError oe) -> typename std::enable_if< detail::is_on_next_of::value && detail::is_on_error::value, observer>::type { return observer( std::move(on), std::move(oe)); } template auto make_observer(OnNext on, OnCompleted oc) -> typename std::enable_if< detail::is_on_next_of::value && detail::is_on_completed::value, observer>::type { return observer( std::move(on), DefaultOnError(), std::move(oc)); } template auto make_observer(OnNext on, OnError oe, OnCompleted oc) -> typename std::enable_if< detail::is_on_next_of::value && detail::is_on_error::value && detail::is_on_completed::value, observer>::type { return observer( std::move(on), std::move(oe), std::move(oc)); } template auto make_observer(State os, OnNext on) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value, observer>::type { return observer( std::move(os), std::move(on)); } template auto make_observer(State os, OnError oe) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value && detail::is_on_error_for::value, observer, OnError>>::type { return observer, OnError>( std::move(os), detail::OnNextEmpty(), std::move(oe)); } template auto make_observer(State os, OnNext on, OnError oe) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value && detail::is_on_error_for::value, observer>::type { return observer( std::move(os), std::move(on), std::move(oe)); } template auto make_observer(State os, OnNext on, OnCompleted oc) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value, observer>::type { return observer( std::move(os), std::move(on), std::move(oc)); } template auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc) -> typename std::enable_if< !detail::is_on_next_of::value && !detail::is_on_error::value && detail::is_on_error_for::value, observer>::type { return observer( std::move(os), std::move(on), std::move(oe), std::move(oc)); } template auto make_observer_dynamic(Observer o) -> typename std::enable_if< !detail::is_on_next_of::value, observer>::type { return observer(std::move(o)); } template auto make_observer_dynamic(OnNext&& on) -> typename std::enable_if< detail::is_on_next_of::value, observer>::type { return observer( make_observer(std::forward(on))); } template auto make_observer_dynamic(OnNext&& on, OnError&& oe) -> typename std::enable_if< detail::is_on_next_of::value && detail::is_on_error::value, observer>::type { return observer( make_observer(std::forward(on), std::forward(oe))); } template auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc) -> typename std::enable_if< detail::is_on_next_of::value && detail::is_on_completed::value, observer>::type { return observer( make_observer(std::forward(on), std::forward(oc))); } template auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc) -> typename std::enable_if< detail::is_on_next_of::value && detail::is_on_error::value && detail::is_on_completed::value, observer>::type { return observer( make_observer(std::forward(on), std::forward(oe), std::forward(oc))); } namespace detail { template struct maybe_from_result { typedef decltype((*(F*)nullptr)()) decl_result_type; typedef rxu::decay_t result_type; typedef rxu::maybe type; }; } template auto on_exception(const F& f, const OnError& c) -> typename std::enable_if::value, typename detail::maybe_from_result::type>::type { typename detail::maybe_from_result::type r; RXCPP_TRY { r.reset(f()); } RXCPP_CATCH(...) { c(rxu::current_exception()); } return r; } template auto on_exception(const F& f, const Subscriber& s) -> typename std::enable_if::value, typename detail::maybe_from_result::type>::type { typename detail::maybe_from_result::type r; RXCPP_TRY { r.reset(f()); } RXCPP_CATCH(...) { s.on_error(rxu::current_exception()); } return r; } } #endif