1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#pragma once
#if !defined(RXCPP_RX_TEST_HPP)
#define RXCPP_RX_TEST_HPP
#include "rx-includes.hpp"
namespace rxcpp {
namespace test {
namespace detail {
template<class T>
struct test_subject_base
: public std::enable_shared_from_this<test_subject_base<T>>
{
typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
typedef std::shared_ptr<test_subject_base<T>> type;
virtual void on_subscribe(observer<T>) const =0;
virtual std::vector<recorded_type> messages() const =0;
virtual std::vector<rxn::subscription> subscriptions() const =0;
};
template<class T>
struct test_source
: public rxs::source_base<T>
{
explicit test_source(typename test_subject_base<T>::type ts)
: ts(std::move(ts))
{
}
typename test_subject_base<T>::type ts;
void on_subscribe(observer<T> o) const {
ts->on_subscribe(std::move(o));
}
template<class Observer>
typename std::enable_if<!std::is_same<typename std::decay<Observer>::type, observer<T>>::value, void>::type
on_subscribe(Observer o) const {
auto so = std::make_shared<Observer>(o);
ts->on_subscribe(make_observer_dynamic<T>(
so->get_subscription(),
// on_next
[so](T t){
so->on_next(t);
},
// on_error
[so](std::exception_ptr e){
so->on_error(e);
},
// on_completed
[so](){
so->on_completed();
}));
}
};
}
template<class T>
class testable_observer
: public observer<T>
{
typedef observer<T> observer_base;
typedef typename detail::test_subject_base<T>::type test_subject;
test_subject ts;
public:
typedef typename detail::test_subject_base<T>::recorded_type recorded_type;
testable_observer(test_subject ts, observer_base ob)
: observer_base(std::move(ob))
, ts(std::move(ts))
{
}
std::vector<recorded_type> messages() const {
return ts->messages();
}
};
//struct tag_test_observable : public tag_observable {};
template<class T>
class testable_observable
: public observable<T, typename detail::test_source<T>>
{
typedef observable<T, typename detail::test_source<T>> observable_base;
typedef typename detail::test_subject_base<T>::type test_subject;
test_subject ts;
//typedef tag_test_observable observable_tag;
public:
typedef typename detail::test_subject_base<T>::recorded_type recorded_type;
explicit testable_observable(test_subject ts)
: observable_base(detail::test_source<T>(ts))
, ts(ts)
{
}
std::vector<rxn::subscription> subscriptions() const {
return ts->subscriptions();
}
std::vector<recorded_type> messages() const {
return ts->messages();
}
};
}
namespace rxt=test;
}
template<class T, class OperatorFactory>
auto operator >> (const rxcpp::test::testable_observable<T>& source, OperatorFactory&& of)
-> decltype(source.op(std::forward<OperatorFactory>(of))) {
return source.op(std::forward<OperatorFactory>(of));
}
#include "schedulers/rx-test.hpp"
#endif
|