summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp
blob: 4142890719765dc0670b98453770792ba71dfe76 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
#define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class T, class Observable, class Coordination>
struct subscribe_on : public operator_base<T>
{
    typedef rxu::decay_t<Observable> source_type;
    typedef rxu::decay_t<Coordination> coordination_type;
    typedef typename coordination_type::coordinator_type coordinator_type;
    struct subscribe_on_values
    {
        ~subscribe_on_values()
        {
        }
        subscribe_on_values(source_type s, coordination_type sf)
            : source(std::move(s))
            , coordination(std::move(sf))
        {
        }
        source_type source;
        coordination_type coordination;
    private:
        subscribe_on_values& operator=(subscribe_on_values o) RXCPP_DELETE;
    };
    const subscribe_on_values initial;

    ~subscribe_on()
    {
    }
    subscribe_on(source_type s, coordination_type sf)
        : initial(std::move(s), std::move(sf))
    {
    }

    template<class Subscriber>
    void on_subscribe(Subscriber s) const {

        typedef Subscriber output_type;
        struct subscribe_on_state_type
            : public std::enable_shared_from_this<subscribe_on_state_type>
            , public subscribe_on_values
        {
            subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg)
                : subscribe_on_values(i)
                , coordinator(std::move(coor))
                , out(oarg)
            {
            }
            composite_subscription source_lifetime;
            coordinator_type coordinator;
            output_type out;
        private:
            subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
        };

        auto coordinator = initial.coordination.create_coordinator(s.get_subscription());

        auto controller = coordinator.get_worker();

        // take a copy of the values for each subscription
        auto state = std::shared_ptr<subscribe_on_state_type>(new subscribe_on_state_type(initial, std::move(coordinator), std::move(s)));

        auto disposer = [=](const rxsc::schedulable&){
            state->source_lifetime.unsubscribe();
            state->out.unsubscribe();
        };
        auto selectedDisposer = on_exception(
            [&](){return state->coordinator.act(disposer);},
            state->out);
        if (selectedDisposer.empty()) {
            return;
        }

        state->out.add([=](){
            controller.schedule(selectedDisposer.get());
        });
        state->source_lifetime.add([=](){
            controller.schedule(selectedDisposer.get());
        });

        auto producer = [=](const rxsc::schedulable&){
            state->source.subscribe(state->source_lifetime, state->out);
        };

        auto selectedProducer = on_exception(
            [&](){return state->coordinator.act(producer);},
            state->out);
        if (selectedProducer.empty()) {
            return;
        }

        controller.schedule(selectedProducer.get());
    }
private:
    subscribe_on& operator=(subscribe_on o) RXCPP_DELETE;
};

template<class Coordination>
class subscribe_on_factory
{
    typedef rxu::decay_t<Coordination> coordination_type;

    coordination_type coordination;
public:
    subscribe_on_factory(coordination_type sf)
        : coordination(std::move(sf))
    {
    }
    template<class Observable>
    auto operator()(Observable&& source)
        ->      observable<rxu::value_type_t<rxu::decay_t<Observable>>,   subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>> {
        return  observable<rxu::value_type_t<rxu::decay_t<Observable>>,   subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>>(
                                                                          subscribe_on<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, Coordination>(std::forward<Observable>(source), coordination));
    }
};

}

template<class Coordination>
auto subscribe_on(Coordination sf)
    ->      detail::subscribe_on_factory<Coordination> {
    return  detail::subscribe_on_factory<Coordination>(std::move(sf));
}

}

}

#endif