summary refs log tree commit diff
path: root/Rx/v2/src/rxcpp/rx-test.hpp
blob: 946ec03804b70ee732c2f59f76536b9a5355c47b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#pragma once

#if !defined(RXCPP_RX_TEST_HPP)
#define RXCPP_RX_TEST_HPP

#include "rx-includes.hpp"

namespace rxcpp {

namespace test {

namespace detail {

// Abstract interface implemented by the test subjects used in this header.
//
// Subjects are passed around through `type`, a std::shared_ptr to this base,
// and the struct derives from std::enable_shared_from_this so that an
// implementation can obtain a shared_ptr to itself.
template<class T>
struct test_subject_base
    : public std::enable_shared_from_this<test_subject_base<T>>
{
    // A notification of T wrapped in rxn::recorded.
    typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
    // Shared-ownership handle to a subject.
    typedef std::shared_ptr<test_subject_base<T>> type;

    // Virtual so that destroying an implementation through a pointer to this
    // base is well-defined.
    virtual ~test_subject_base() {}

    // Called with the observer that is subscribing to this subject.
    virtual void on_subscribe(observer<T>) const =0;
    // Returns the recorded notifications held by the implementation.
    virtual std::vector<recorded_type> messages() const =0;
    // Returns the subscriptions held by the implementation.
    virtual std::vector<rxn::subscription> subscriptions() const =0;
};

// Source that hands every subscription over to a shared test subject.
template<class T>
struct test_source
    : public rxs::source_base<T>
{
    // The subject that receives each on_subscribe call.
    typename test_subject_base<T>::type ts;

    explicit test_source(typename test_subject_base<T>::type subject)
        : ts(std::move(subject))
    {
    }

    // Overload for an observer<T>: passed to the subject as-is.
    void on_subscribe(observer<T> dynamic_observer) const {
        ts->on_subscribe(std::move(dynamic_observer));
    }

    // Overload for any other observer type: the observer is copied into a
    // shared_ptr and wrapped in an observer built by make_observer_dynamic<T>,
    // whose callbacks forward to that shared copy.
    template<class Observer>
    typename std::enable_if<!std::is_same<typename std::decay<Observer>::type, observer<T>>::value, void>::type
    on_subscribe(Observer o) const {
        // Each callback captures the shared_ptr by value, so the copied
        // observer stays alive for as long as any callback does.
        auto shared = std::make_shared<Observer>(o);

        auto forward_next = [shared](T value){
            shared->on_next(value);
        };
        auto forward_error = [shared](std::exception_ptr error){
            shared->on_error(error);
        };
        auto forward_completed = [shared](){
            shared->on_completed();
        };

        ts->on_subscribe(make_observer_dynamic<T>(
            shared->get_subscription(),
            std::move(forward_next),
            std::move(forward_error),
            std::move(forward_completed)));
    }
};

}

// An observer<T> that additionally keeps a handle to a test subject so the
// subject's recorded messages can be read back through the observer.
template<class T>
class testable_observer
    : public observer<T>
{
    using observer_base = observer<T>;
    using test_subject = typename detail::test_subject_base<T>::type;

    // Subject queried by messages().
    test_subject ts;

public:
    using recorded_type = typename detail::test_subject_base<T>::recorded_type;

    // subject: the test subject to keep; inner: the observer this object is
    // constructed from.
    testable_observer(test_subject subject, observer_base inner)
        : observer_base(std::move(inner))
        , ts(std::move(subject))
    {
    }

    // Returns whatever the held subject reports from messages().
    std::vector<recorded_type> messages() const {
        return ts->messages();
    }
};

//struct tag_test_observable : public tag_observable {};

// An observable<T> whose source is a detail::test_source<T> built over a
// test subject; the subject is also kept so its subscriptions and messages
// can be read back through the observable.
template<class T>
class testable_observable
    : public observable<T, typename detail::test_source<T>>
{
    using observable_base = observable<T, typename detail::test_source<T>>;
    using test_subject = typename detail::test_subject_base<T>::type;

    // Subject queried by subscriptions() and messages().
    test_subject ts;

    //typedef tag_test_observable observable_tag;

public:
    using recorded_type = typename detail::test_subject_base<T>::recorded_type;

    // The same subject handle is given to the base's test_source and stored
    // in this object.
    explicit testable_observable(test_subject subject)
        : observable_base(detail::test_source<T>(subject))
        , ts(subject)
    {
    }

    // Returns whatever the held subject reports from subscriptions().
    std::vector<rxn::subscription> subscriptions() const {
        return ts->subscriptions();
    }

    // Returns whatever the held subject reports from messages().
    std::vector<recorded_type> messages() const {
        return ts->messages();
    }
};

}
namespace rxt=test;

}

// Pipe syntax for testable_observable: `source >> factory` is equivalent to
// `source.op(factory)`, with the factory perfectly forwarded.
template<class T, class OperatorFactory>
auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& factory)
    -> decltype(source.op(std::forward<OperatorFactory>(factory))) {
    return source.op(std::forward<OperatorFactory>(factory));
}

#include "schedulers/rx-test.hpp"

#endif