summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp
blob: 584a6bf52fb0d3c89f61da2a6a10f93fb4abe10b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-pairwise.hpp

    \brief Take values pairwise from this observable.

    \return Observable that emits tuples of the two most recent items emitted by the source observable.

    \sample
    \snippet pairwise.cpp pairwise sample
    \snippet output.txt pairwise sample

    If the source observable emits fewer than two items, no pairs are emitted by the resulting observable:
    \snippet pairwise.cpp pairwise short sample
    \snippet output.txt pairwise short sample
*/

#if !defined(RXCPP_OPERATORS_RX_PAIRWISE_HPP)
#define RXCPP_OPERATORS_RX_PAIRWISE_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

// Empty tag type parameterised on the argument types of a rejected call.
// It is used as the value type of the observable named by pairwise_invalid,
// so the offending argument pack is visible in the resulting type name.
template<class... AN>
struct pairwise_invalid_arguments
{
};

// Placeholder operator used as the result of calling pairwise with
// unsupported arguments. `type` is the observable type that the rejecting
// overload in member_overload<pairwise_tag> declares as its return type.
template<class... AN>
struct pairwise_invalid
    : rxo::operator_base<pairwise_invalid_arguments<AN...>>
{
    // Inside the class, `pairwise_invalid` names this very specialization.
    using type = observable<pairwise_invalid_arguments<AN...>, pairwise_invalid>;
};

// Shorthand for pairwise_invalid<AN...>::type.
template<class... AN>
using pairwise_invalid_t = typename pairwise_invalid<AN...>::type;

// Operator implementation behind pairwise().
//
// Calling an instance with a downstream subscriber returns a subscriber for
// the source. Its observer remembers the most recent source value and, from
// the second value onwards, sends std::tuple<previous, current> downstream.
// Errors and completion are forwarded unchanged.
//
// T is the value type of the source; it is decayed before being stored.
template<class T>
struct pairwise
{
    using source_value_type = rxu::decay_t<T>;
    using value_type = std::tuple<source_value_type, source_value_type>;

    // Observer placed between the source and the downstream subscriber.
    template<class Subscriber>
    struct pairwise_observer
    {
        using this_type = pairwise_observer<Subscriber>;
        using value_type = std::tuple<source_value_type, source_value_type>;
        using dest_type = rxu::decay_t<Subscriber>;
        using observer_type = observer<T, this_type>;

        // Downstream subscriber that receives the pairs.
        dest_type dest;
        // Last value seen from the source; empty until the first on_next.
        // mutable because on_next is const but has to update it.
        mutable rxu::detail::maybe<source_value_type> remembered;

        pairwise_observer(dest_type d)
            : dest(std::move(d))
        {
        }

        // Emits (previous, v) when a previous value exists, then remembers v.
        // The first value only primes `remembered`; nothing is emitted for it.
        void on_next(source_value_type v) const {
            if (!remembered.empty()) {
                // v is copied here because it is still needed below.
                dest.on_next(std::make_tuple(remembered.get(), v));
            }
            // Last use of the by-value parameter: move it instead of copying.
            remembered.reset(std::move(v));
        }

        // Forwards the error downstream untouched.
        void on_error(std::exception_ptr e) const {
            dest.on_error(e);
        }

        // Forwards completion downstream; a lone remembered value is dropped.
        void on_completed() const {
            dest.on_completed();
        }

        // Builds the source-facing subscriber around a pairwise_observer.
        // The subscription is read from d before d is moved into the observer,
        // and is handed to make_subscriber together with that observer.
        static subscriber<T, observer_type> make(dest_type d) {
            auto cs = d.get_subscription();
            return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d))));
        }
    };

    // Wraps `dest` in a pairwise_observer and returns the subscriber that the
    // source should be subscribed with.
    template<class Subscriber>
    auto operator()(Subscriber dest) const
        -> decltype(pairwise_observer<Subscriber>::make(std::move(dest))) {
        return      pairwise_observer<Subscriber>::make(std::move(dest));
    }
};

}

/*! @copydoc rx-pairwise.hpp
*/
template<class... AN>
auto pairwise(AN&&... an)
    -> operator_factory<pairwise_tag, AN...> {
    // Capture the forwarded arguments in a tuple and hand them to the
    // factory tagged with pairwise_tag.
    using factory_type = operator_factory<pairwise_tag, AN...>;
    return factory_type(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<pairwise_tag>
{
    template<class Observable,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Pairwise = rxo::detail::pairwise<SourceValue>,
        class Value = rxu::value_type_t<Pairwise>>
    static auto member(Observable&& o)
    -> decltype(o.template lift<Value>(Pairwise())) {
        return  o.template lift<Value>(Pairwise());
    }

    template<class... AN>
    static operators::detail::pairwise_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "pairwise takes no arguments");
    }
};

}

#endif