summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-scan.hpp
blob: 7f2a314cb05e8e483267016a9377e676fdb1bf93 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_OPERATORS_RX_SCAN_HPP)
#define RXCPP_OPERATORS_RX_SCAN_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class T, class Observable, class Accumulator, class Seed>
struct scan : public operator_base<rxu::decay_t<Seed>>
{
    typedef rxu::decay_t<Observable> source_type;
    typedef rxu::decay_t<Accumulator> accumulator_type;
    typedef rxu::decay_t<Seed> seed_type;

    struct scan_initial_type
    {
        scan_initial_type(source_type o, accumulator_type a, seed_type s)
            : source(std::move(o))
            , accumulator(std::move(a))
            , seed(s)
        {
        }
        source_type source;
        accumulator_type accumulator;
        seed_type seed;
    };
    scan_initial_type initial;

    template<class CT, class CS, class CP>
    static auto check(int) -> decltype((*(CP*)nullptr)(*(CS*)nullptr, *(CT*)nullptr));
    template<class CT, class CS, class CP>
    static void check(...);

    scan(source_type o, accumulator_type a, seed_type s)
        : initial(std::move(o), a, s)
    {
        static_assert(std::is_convertible<decltype(check<T, seed_type, accumulator_type>(0)), seed_type>::value, "scan Accumulator must be a function with the signature Seed(Seed, T)");
    }
    template<class Subscriber>
    void on_subscribe(Subscriber o) const {
        struct scan_state_type
            : public scan_initial_type
            , public std::enable_shared_from_this<scan_state_type>
        {
            scan_state_type(scan_initial_type i, Subscriber scrbr)
                : scan_initial_type(i)
                , result(scan_initial_type::seed)
                , out(std::move(scrbr))
            {
            }
            seed_type result;
            Subscriber out;
        };
        auto state = std::make_shared<scan_state_type>(initial, std::move(o));
        state->source.subscribe(
            state->out,
        // on_next
            [state](T t) {
                auto result = on_exception(
                    [&](){return state->accumulator(state->result, t);},
                    state->out);
                if (result.empty()) {
                    return;
                }
                state->result = result.get();
                state->out.on_next(state->result);
            },
        // on_error
            [state](std::exception_ptr e) {
                state->out.on_error(e);
            },
        // on_completed
            [state]() {
                state->out.on_completed();
            }
        );
    }
};

template<class Accumulator, class Seed>
class scan_factory
{
    typedef rxu::decay_t<Accumulator> accumulator_type;
    typedef rxu::decay_t<Seed> seed_type;

    accumulator_type accumulator;
    seed_type seed;
public:
    scan_factory(accumulator_type a, Seed s)
        : accumulator(std::move(a))
        , seed(s)
    {
    }
    template<class Observable>
    auto operator()(Observable&& source)
        ->      observable<rxu::decay_t<Seed>, scan<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Accumulator, Seed>> {
        return  observable<rxu::decay_t<Seed>, scan<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Accumulator, Seed>>(
                                               scan<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Accumulator, Seed>(std::forward<Observable>(source), accumulator, seed));
    }
};

}

template<class Seed, class Accumulator>
auto scan(Seed s, Accumulator&& a)
    ->      detail::scan_factory<Accumulator, Seed> {
    return  detail::scan_factory<Accumulator, Seed>(std::forward<Accumulator>(a), s);
}

}

}

#endif