summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
blob: 373d9b3f5b1b0a3940582c27891b6bce62248667 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#pragma once

/*! \file rx-retry-repeat-common.hpp

    \brief Implementation shared by the retry and repeat operators, factored out of rx-retry.hpp and rx-repeat.hpp. This header should be included only from rx-retry.hpp and rx-repeat.hpp.
    
*/

#include "../rx-includes.hpp"

namespace rxcpp {
  namespace operators {
    namespace detail {

      namespace retry_repeat_common {
        // Structure to perform general retry/repeat operations on state
        template <class Values, class Subscriber, class EventHandlers, class T>
        struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
                            public Values {

          typedef Subscriber output_type;
          state_type(const Values& i, const output_type& oarg)
            : Values(i),
              source_lifetime(composite_subscription::empty()),
              out(oarg) {
          }
          
          void do_subscribe() {
            auto state = this->shared_from_this();
                
            state->out.remove(state->lifetime_token);
            state->source_lifetime.unsubscribe();

            state->source_lifetime = composite_subscription();
            state->lifetime_token = state->out.add(state->source_lifetime);

            state->source.subscribe(
                                    state->out,
                                    state->source_lifetime,
                                    // on_next
                                    [state](T t) {
                                      state->out.on_next(t);
                                    },
                                    // on_error
                                    [state](std::exception_ptr e) {
                                      EventHandlers::on_error(state, e);
                                    },
                                    // on_completed
                                    [state]() {
                                      EventHandlers::on_completed(state);
                                    }
                                    );
          }
          
          composite_subscription source_lifetime;
          output_type out;
          composite_subscription::weak_subscription lifetime_token;
        };

        // Finite case: the number of times is explicitly limited by a counter.
        //
        // Template parameters:
        //   EventHandlers - forwarded to state_type
        //   T             - element type of the sequence
        //   Observable    - source observable type (stored decayed)
        //   Count         - counter type (stored decayed)
        template <class EventHandlers, class T, class Observable, class Count>
        struct finite : public operator_base<T> {
          using source_type = rxu::decay_t<Observable>;
          using count_type = rxu::decay_t<Count>;

          // Source plus remaining-count bookkeeping; copied into each
          // subscription's state.
          struct values {
            values(source_type s, count_type t)
              : source(std::move(s)),
                remaining_(std::move(t)) {
            }

            // True once the counter is zero or below
            inline bool completed_predicate() const {
              return remaining_ <= 0;
            }

            // Decrements the counter by one
            inline void update() {
              --remaining_;
            }

            source_type source;

          private:
            // Number of times remaining before completion
            count_type remaining_;
          };

          finite(source_type s, count_type t)
            : initial_(std::move(s), std::move(t)) {
          }

          // Builds a fresh state from a copy of the initial values, then either
          // starts the first iteration or, when the initial counter already
          // satisfies completed_predicate(), completes the output immediately.
          template<class Subscriber>
          void on_subscribe(const Subscriber& s) const {
            using state_t = state_type<values, Subscriber, EventHandlers, T>;
            // each subscription works on its own copy of the values
            auto state = std::make_shared<state_t>(initial_, s);
            if (!initial_.completed_predicate()) {
              // begin the first iteration
              state->do_subscribe();
              return;
            }
            // nothing left to run: report completion
            state->out.on_completed();
          }

        private:
          // Values as configured at construction; never modified here
          values initial_;
        };

        // Infinite case: no counter, so the completion predicate is always false.
        //
        // Template parameters:
        //   EventHandlers - forwarded to state_type
        //   T             - element type of the sequence
        //   Observable    - source observable type (stored decayed)
        template <class EventHandlers, class T, class Observable>
        struct infinite : public operator_base<T> {
          using source_type = rxu::decay_t<Observable>;

          // Holds only the source; copied into each subscription's state.
          struct values {
            values(source_type s)
              : source(std::move(s)) {
            }

            // Always false: the infinite case never reports completion
            static inline bool completed_predicate() {
              return false;
            }

            // Intentionally empty: there is no state to update
            static inline void update() {
            }

            source_type source;
          };

          infinite(source_type s) : initial_(std::move(s)) {
          }

          // Builds a fresh state from a copy of the initial values and starts
          // the first iteration.
          template<class Subscriber>
          void on_subscribe(const Subscriber& s) const {
            using state_t = state_type<values, Subscriber, EventHandlers, T>;
            // each subscription works on its own copy of the values
            auto state = std::make_shared<state_t>(initial_, s);
            // begin the first iteration
            state->do_subscribe();
          }

        private:
          // Values as configured at construction; never modified here
          values initial_;
        };
        
        
      }
    }
  }
}