summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-05-20 08:35:26 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-05-20 08:35:26 -0700
commitfba0f2d04f8d6ac53994a994353f6ae3cb6a2219 (patch)
tree4be73631eb77c8c53e260a92b5642ff074a2ee00 /Rx/v2/src/rxcpp
parent1f67a383c47b24ffdf4e97771cebbb89f9c167dd (diff)
downloadRxCpp-fba0f2d04f8d6ac53994a994353f6ae3cb6a2219.tar.gz
add synchronize subject and fix lifetime issues
This finally forced a rewrite of subscription. pretty happy with the result. flat map removed all sync and takes a new parameter that is applied to every source and can be used to synchronize them also added multicast
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp92
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-multicast.hpp97
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-publish.hpp45
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ref_count.hpp4
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-notification.hpp22
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp62
-rw-r--r--Rx/v2/src/rxcpp/rx-observer.hpp25
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp15
-rw-r--r--Rx/v2/src/rxcpp/rx-subjects.hpp3
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp9
-rw-r--r--Rx/v2/src/rxcpp/rx-subscription.hpp442
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp4
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-test.hpp8
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp17
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp214
17 files changed, 764 insertions, 298 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index 38dd15f..96e7ea6 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -13,11 +13,12 @@ namespace operators {
namespace detail {
-template<class Observable, class CollectionSelector, class ResultSelector>
+template<class Observable, class CollectionSelector, class ResultSelector, class SourceFilter>
struct flat_map_traits {
typedef typename std::decay<Observable>::type source_type;
typedef typename std::decay<CollectionSelector>::type collection_selector_type;
typedef typename std::decay<ResultSelector>::type result_selector_type;
+ typedef typename std::decay<SourceFilter>::type source_filter_type;
typedef typename source_type::value_type source_value_type;
@@ -47,12 +48,12 @@ struct flat_map_traits {
typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
};
-template<class Observable, class CollectionSelector, class ResultSelector>
+template<class Observable, class CollectionSelector, class ResultSelector, class SourceFilter>
struct flat_map
- : public operator_base<typename flat_map_traits<Observable, CollectionSelector, ResultSelector>::value_type>
+ : public operator_base<typename flat_map_traits<Observable, CollectionSelector, ResultSelector, SourceFilter>::value_type>
{
- typedef flat_map<Observable, CollectionSelector, ResultSelector> this_type;
- typedef flat_map_traits<Observable, CollectionSelector, ResultSelector> traits;
+ typedef flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter> this_type;
+ typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, SourceFilter> traits;
typedef typename traits::source_type source_type;
typedef typename traits::collection_selector_type collection_selector_type;
@@ -62,22 +63,26 @@ struct flat_map
typedef typename traits::collection_type collection_type;
typedef typename traits::collection_value_type collection_value_type;
+ typedef typename traits::source_filter_type source_filter_type;
+
struct values
{
- values(source_type o, collection_selector_type s, result_selector_type rs)
+ values(source_type o, collection_selector_type s, result_selector_type rs, source_filter_type sf)
: source(std::move(o))
, selectCollection(std::move(s))
, selectResult(std::move(rs))
+ , sourceFilter(std::move(sf))
{
}
source_type source;
collection_selector_type selectCollection;
result_selector_type selectResult;
+ source_filter_type sourceFilter;
};
values initial;
- flat_map(source_type o, collection_selector_type s, result_selector_type rs)
- : initial(std::move(o), std::move(s), std::move(rs))
+ flat_map(source_type o, collection_selector_type s, result_selector_type rs, source_filter_type sf)
+ : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
{
}
@@ -99,12 +104,7 @@ struct flat_map
}
// on_completed on the output must wait until all the
// subscriptions have received on_completed
- std::atomic<int> pendingCompletions;
- // because multiple sources are subscribed to by flat_map,
- // calls to the output must be serialized by lock.
- // the on_error/on_complete and unsubscribe calls can
- // cause lock recursion.
- std::recursive_mutex lock;
+ int pendingCompletions;
output_type out;
};
// take a copy of the values for each subscription
@@ -116,21 +116,26 @@ struct flat_map
// inner subscriptions are unsubscribed as well
state->out.add(outercs);
+ auto source = on_exception(
+ [&](){return state->sourceFilter(state->source);},
+ state->out);
+ if (source.empty()) {
+ return;
+ }
+
++state->pendingCompletions;
// this subscribe does not share the observer subscription
// so that when it is unsubscribed the observer can be called
// until the inner subscriptions have finished
- state->source.subscribe(
+ source->subscribe(
state->out,
outercs,
// on_next
[state](source_value_type st) {
- util::detail::maybe<collection_type> selectedCollection;
- try {
- selectedCollection.reset(state->selectCollection(st));
- } catch(...) {
- std::unique_lock<std::recursive_mutex> guard(state->lock);
- state->out.on_error(std::current_exception());
+ auto selectedCollection = on_exception(
+ [&](){return state->selectCollection(st);},
+ state->out);
+ if (selectedCollection.empty()) {
return;
}
@@ -144,34 +149,36 @@ struct flat_map
state->out.remove(innercstoken);
}));
+ auto selectedSource = on_exception(
+ [&](){return state->sourceFilter(selectedCollection.get());},
+ state->out);
+ if (selectedSource.empty()) {
+ return;
+ }
+
++state->pendingCompletions;
// this subscribe does not share the source subscription
// so that when it is unsubscribed the source will continue
- selectedCollection->subscribe(
+ selectedSource->subscribe(
state->out,
innercs,
// on_next
[state, st](collection_value_type ct) {
- util::detail::maybe<typename this_type::value_type> selectedResult;
- try {
- selectedResult.reset(state->selectResult(st, std::move(ct)));
- } catch(...) {
- std::unique_lock<std::recursive_mutex> guard(state->lock);
- state->out.on_error(std::current_exception());
+ auto selectedResult = on_exception(
+ [&](){return state->selectResult(st, std::move(ct));},
+ state->out);
+ if (selectedResult.empty()) {
return;
}
- std::unique_lock<std::recursive_mutex> guard(state->lock);
state->out.on_next(std::move(*selectedResult));
},
// on_error
[state](std::exception_ptr e) {
- std::unique_lock<std::recursive_mutex> guard(state->lock);
state->out.on_error(e);
},
//on_completed
[state](){
if (--state->pendingCompletions == 0) {
- std::unique_lock<std::recursive_mutex> guard(state->lock);
state->out.on_completed();
}
}
@@ -179,13 +186,11 @@ struct flat_map
},
// on_error
[state](std::exception_ptr e) {
- std::unique_lock<std::recursive_mutex> guard(state->lock);
state->out.on_error(e);
},
// on_completed
[state]() {
if (--state->pendingCompletions == 0) {
- std::unique_lock<std::recursive_mutex> guard(state->lock);
state->out.on_completed();
}
}
@@ -193,35 +198,38 @@ struct flat_map
}
};
-template<class CollectionSelector, class ResultSelector>
+template<class CollectionSelector, class ResultSelector, class SourceFilter>
class flat_map_factory
{
typedef typename std::decay<CollectionSelector>::type collection_selector_type;
typedef typename std::decay<ResultSelector>::type result_selector_type;
+ typedef typename std::decay<SourceFilter>::type source_filter_type;
collection_selector_type selectorCollection;
result_selector_type selectorResult;
+ source_filter_type sourceFilter;
public:
- flat_map_factory(collection_selector_type s, result_selector_type rs)
+ flat_map_factory(collection_selector_type s, result_selector_type rs, source_filter_type sf)
: selectorCollection(std::move(rs))
, selectorResult(std::move(s))
+ , sourceFilter(std::move(sf))
{
}
template<class Observable>
auto operator()(Observable&& source)
- -> observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>> {
- return observable<typename flat_map<Observable, CollectionSelector, ResultSelector>::value_type, flat_map<Observable, CollectionSelector, ResultSelector>>(
- flat_map<Observable, CollectionSelector, ResultSelector>(std::forward<Observable>(source), std::move(selectorCollection), std::move(selectorResult)));
+ -> observable<typename flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>::value_type, flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>> {
+ return observable<typename flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>::value_type, flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>>(
+ flat_map<Observable, CollectionSelector, ResultSelector, SourceFilter>(std::forward<Observable>(source), std::move(selectorCollection), std::move(selectorResult), std::move(sourceFilter)));
}
};
}
-template<class CollectionSelector, class ResultSelector>
-auto flat_map(CollectionSelector&& s, ResultSelector&& rs)
- -> detail::flat_map_factory<CollectionSelector, ResultSelector> {
- return detail::flat_map_factory<CollectionSelector, ResultSelector>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs));
+template<class CollectionSelector, class ResultSelector, class SourceFilter>
+auto flat_map(CollectionSelector&& s, ResultSelector&& rs, SourceFilter&& sf)
+ -> detail::flat_map_factory<CollectionSelector, ResultSelector, SourceFilter> {
+ return detail::flat_map_factory<CollectionSelector, ResultSelector, SourceFilter>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<SourceFilter>(sf));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-multicast.hpp b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp
new file mode 100644
index 0000000..7f25071
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-multicast.hpp
@@ -0,0 +1,97 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP)
+#define RXCPP_OPERATORS_RX_MULTICAST_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Observable, class Subject>
+struct multicast : public operator_base<T>
+{
+ typedef typename std::decay<Observable>::type source_type;
+ typedef typename std::decay<Subject>::type subject_type;
+
+ struct multicast_state : public std::enable_shared_from_this<multicast_state>
+ {
+ multicast_state(source_type o, subject_type sub)
+ : source(std::move(o))
+ , subject_value(std::move(sub))
+ {
+ }
+ source_type source;
+ subject_type subject_value;
+ rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
+ };
+
+ std::shared_ptr<multicast_state> state;
+
+ multicast(source_type o, subject_type sub)
+ : state(std::make_shared<multicast_state>(std::move(o), std::move(sub)))
+ {
+ }
+ template<class Subscriber>
+ void on_subscribe(Subscriber&& o) {
+ state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
+ }
+ void on_connect(composite_subscription cs) {
+ if (state->connection.empty()) {
+ auto destination = state->subject_value.get_subscriber();
+
+ // the lifetime of each connect is nested in the subject lifetime
+ state->connection.reset(destination.add(cs));
+
+ auto localState = state;
+
+ // when the connection is finished it should shutdown the connection
+ cs.add(
+ [destination, localState](){
+ if (!localState->connection.empty()) {
+ destination.remove(localState->connection.get());
+ localState->connection.reset();
+ }
+ });
+
+ // use cs not destination for lifetime of subscribe.
+ state->source.subscribe(cs, destination);
+ }
+ }
+};
+
+template<class Subject>
+class multicast_factory
+{
+ Subject caster;
+public:
+ multicast_factory(Subject sub)
+ : caster(std::move(sub))
+ {
+ }
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>> {
+ return connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>>(
+ multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>(std::forward<Observable>(source), caster));
+ }
+};
+
+}
+
+template<class Subject>
+inline auto multicast(Subject sub)
+ -> detail::multicast_factory<Subject> {
+ return detail::multicast_factory<Subject>(std::move(sub));
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
index fcb89f3..34de51a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
@@ -13,44 +13,6 @@ namespace operators {
namespace detail {
-template<class T, class Observable, class Subject>
-struct publish : public operator_base<T>
-{
- typedef typename std::decay<Observable>::type source_type;
- typedef typename std::decay<Subject>::type subject_type;
- source_type source;
- subject_type subject_value;
- rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
-
- explicit publish(source_type o)
- : source(std::move(o))
- {
- }
- template<class Subscriber>
- void on_subscribe(Subscriber&& o) {
- subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
- }
- void on_connect(composite_subscription cs) {
- if (connection.empty()) {
- // the lifetime of each connect is independent
- auto destination = subject_value.get_subscriber();
-
- // when the paramter is unsubscribed it should
- // unsubscribe the most recent connection
- connection.reset(cs.add(destination.get_subscription()));
-
- // when the connection is finished it should shutdown the connection
- destination.add(make_subscription(
- [cs, this](){
- cs.remove(this->connection.get());
- this->connection.reset();
- }));
-
- source.subscribe(destination);
- }
- }
-};
-
template<template<class T> class Subject>
class publish_factory
{
@@ -58,9 +20,10 @@ public:
publish_factory() {}
template<class Observable>
auto operator()(Observable&& source)
- -> connectable_observable<typename std::decay<Observable>::type::value_type, publish<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>> {
- return connectable_observable<typename std::decay<Observable>::type::value_type, publish<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>>(
- publish<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>(std::forward<Observable>(source)));
+ -> connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>> {
+ return connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>>(
+ multicast<typename std::decay<Observable>::type::value_type, Observable, Subject<typename std::decay<Observable>::type::value_type>>(
+ std::forward<Observable>(source), Subject<typename std::decay<Observable>::type::value_type>()));
}
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
index 8d8b1a7..52a3d30 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
@@ -31,12 +31,12 @@ struct ref_count : public operator_base<T>
template<class Subscriber>
void on_subscribe(Subscriber&& o) {
auto needConnect = ++subscribers == 1;
- o.add(make_subscription(
+ o.add(
[this](){
if (--this->subscribers == 0) {
this->connection.unsubscribe();
}
- }));
+ });
source.subscribe(std::forward<Subscriber>(o));
if (needConnect) {
connection = source.connect();
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index b15a3d1..75518ab 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -85,6 +85,7 @@
#include <algorithm>
#include <atomic>
#include <map>
+#include <set>
#include <mutex>
#include <deque>
#include <thread>
@@ -94,6 +95,7 @@
#include <queue>
#include <chrono>
#include <condition_variable>
+#include <typeinfo>
#include "rx-util.hpp"
#include "rx-predef.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp
index dd1ff78..8e4d27a 100644
--- a/Rx/v2/src/rxcpp/rx-notification.hpp
+++ b/Rx/v2/src/rxcpp/rx-notification.hpp
@@ -56,6 +56,24 @@ struct notification_base
virtual void accept(const observer_type& o) const =0;
};
+template<class T>
+auto to_stream(std::ostream& os, const T& t, int, int)
+ -> decltype(os << t) {
+ return os << t;
+}
+
+template<class T>
+auto to_stream(std::ostream& os, const T&, int, ...)
+ -> decltype(os << typeid(T).name() << "") {
+ return os << "< " << typeid(T).name() << " does not support ostream>";
+}
+
+template<class T>
+auto to_stream(std::ostream& os, const T&, ...)
+ -> decltype(os << "") {
+ return os << "<the value does not support ostream>";
+}
+
}
template<typename T>
@@ -71,7 +89,9 @@ private:
on_next_notification(T value) : value(std::move(value)) {
}
virtual void out(std::ostream& os) const {
- os << "on_next( " << value << ")";
+ os << "on_next( ";
+ detail::to_stream(os, value, 0, 0);
+ os << ")";
}
virtual bool equals(const typename base::type& other) const {
bool result = false;
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 016bba3..5c197a3 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -74,6 +74,20 @@ struct select_chain
}
+//
+// this type is the default used by operators that subscribe to
+// multiple sources. It assumes that the sources are already synchronized
+//
+struct identity_observable
+{
+ template<class Observable>
+ auto operator()(Observable o)
+ -> Observable {
+ return std::move(o);
+ static_assert(is_observable<Observable>::value, "only support observables");
+ }
+};
+
template<class T>
class dynamic_observable
: public rxs::source_base<T>
@@ -150,7 +164,7 @@ private:
template<class Subscriber>
auto detail_subscribe(Subscriber&& scrbr) const
- -> decltype(make_subscription(*(typename std::decay<Subscriber>::type*)nullptr)) {
+ -> composite_subscription {
typedef typename std::decay<Subscriber>::type subscriber_type;
subscriber_type o = std::forward<Subscriber>(scrbr);
@@ -160,7 +174,7 @@ private:
static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
if (!o.is_subscribed()) {
- return make_subscription(o);
+ return o.get_subscription();
}
auto safe_subscribe = [=]() {
@@ -189,7 +203,7 @@ private:
safe_subscribe();
}
- return make_subscription(o);
+ return o.get_subscription();
}
public:
@@ -302,18 +316,46 @@ public:
///
template<class CollectionSelector, class ResultSelector>
auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const
- -> observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>> {
- return observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>>(
- rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs)));
+ -> observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>> {
+ return observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>>(
+ rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, identity_observable>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_observable()));
+ }
+
+ /// flat_map (AKA SelectMany) ->
+ /// for each item from this observable use the CollectionSelector to select an observable and subscribe to that observable.
+ /// for each item from all of the selected observables use the ResultSelector to select a value to emit from the new observable that is returned.
+ ///
+ template<class CollectionSelector, class ResultSelector, class SourceFilter>
+ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, SourceFilter&& sf) const
+ -> observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>> {
+ return observable<typename rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>::value_type, rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>>(
+ rxo::detail::flat_map<this_type, CollectionSelector, ResultSelector, SourceFilter>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<SourceFilter>(sf)));
+ }
+
+ /// multicast ->
+ /// allows connections to the source to be independent of subscriptions
+ ///
+ template<class Subject>
+ auto multicast(Subject sub) const
+ -> connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>> {
+ return connectable_observable<T, rxo::detail::multicast<T, this_type, Subject>>(
+ rxo::detail::multicast<T, this_type, Subject>(*this, std::move(sub)));
+ }
+
+ /// synchronize ->
+ /// turns a cold observable hot and allows connections to the source to be independent of subscriptions
+ ///
+ auto synchronize(rxsc::worker w, composite_subscription cs = composite_subscription()) const
+ -> decltype(multicast(rxsub::synchronize<T>(w, cs))) {
+ return multicast(rxsub::synchronize<T>(w, cs));
}
/// publish ->
/// turns a cold observable hot and allows connections to the source to be independent of subscriptions
///
- auto publish() const
- -> connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>> {
- return connectable_observable<T, rxo::detail::publish<T, this_type, rxsub::subject<T>>>(
- rxo::detail::publish<T, this_type, rxsub::subject<T>>(*this));
+ auto publish(composite_subscription cs = composite_subscription()) const
+ -> decltype(multicast(rxsub::subject<T>(cs))) {
+ return multicast(rxsub::subject<T>(cs));
}
/// take ->
diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp
index 6b573ae..b6bad50 100644
--- a/Rx/v2/src/rxcpp/rx-observer.hpp
+++ b/Rx/v2/src/rxcpp/rx-observer.hpp
@@ -366,6 +366,31 @@ auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
dynamic_observer<T>(make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc))));
}
+
+template<class F, class OnError>
+auto on_exception(const F& f, const OnError& c)
+ -> typename std::enable_if<detail::is_on_error<OnError>::value, rxu::detail::maybe<decltype(f())>>::type {
+ rxu::detail::maybe<decltype(f())> r;
+ try {
+ r.reset(f());
+ } catch (...) {
+ c(std::current_exception());
+ }
+ return r;
+}
+
+template<class F, class Subscriber>
+auto on_exception(const F& f, const Subscriber& s)
+ -> typename std::enable_if<is_subscriber<Subscriber>::value, rxu::detail::maybe<decltype(f())>>::type {
+ rxu::detail::maybe<decltype(f())> r;
+ try {
+ r.reset(f());
+ } catch (...) {
+ s.on_error(std::current_exception());
+ }
+ return r;
+}
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 34c44bf..48c84d5 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -39,6 +39,7 @@ namespace rxo=operators;
#include "operators/rx-filter.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-flat_map.hpp"
+#include "operators/rx-multicast.hpp"
#include "operators/rx-publish.hpp"
#include "operators/rx-ref_count.hpp"
#include "operators/rx-connect_forever.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index 754a7b0..5713b58 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -193,7 +193,6 @@ class worker : public worker_base
friend bool operator==(const worker&, const worker&);
public:
typedef scheduler_base::clock_type clock_type;
- typedef composite_subscription::shared_subscription shared_subscription;
typedef composite_subscription::weak_subscription weak_subscription;
worker()
@@ -217,10 +216,7 @@ public:
inline bool is_subscribed() const {
return lifetime.is_subscribed();
}
- inline weak_subscription add(shared_subscription s) const {
- return lifetime.add(std::move(s));
- }
- inline weak_subscription add(dynamic_subscription s) const {
+ inline weak_subscription add(subscription s) const {
return lifetime.add(std::move(s));
}
inline void remove(weak_subscription w) const {
@@ -452,7 +448,6 @@ class schedulable : public schedulable_base
public:
typedef composite_subscription::weak_subscription weak_subscription;
- typedef composite_subscription::shared_subscription shared_subscription;
typedef scheduler_base::clock_type clock_type;
~schedulable()
@@ -543,11 +538,13 @@ public:
inline bool is_subscribed() const {
return lifetime.is_subscribed();
}
- inline weak_subscription add(shared_subscription s) const {
+ inline weak_subscription add(subscription s) const {
return lifetime.add(std::move(s));
}
- inline weak_subscription add(dynamic_subscription s) const {
- return lifetime.add(std::move(s));
+ template<class F>
+ auto add(F f) const
+ -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
+ return lifetime.add(make_subscription(std::move(f)));
}
inline void remove(weak_subscription w) const {
return lifetime.remove(std::move(w));
diff --git a/Rx/v2/src/rxcpp/rx-subjects.hpp b/Rx/v2/src/rxcpp/rx-subjects.hpp
index 1289835..3c3fb32 100644
--- a/Rx/v2/src/rxcpp/rx-subjects.hpp
+++ b/Rx/v2/src/rxcpp/rx-subjects.hpp
@@ -17,5 +17,6 @@ namespace rxsub=subjects;
}
#include "subjects/rx-subject.hpp"
+#include "subjects/rx-synchronize.hpp"
-#endif \ No newline at end of file
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index 7c3207d..939d76c 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -43,7 +43,6 @@ class subscriber : public subscriber_base<T>
subscriber();
public:
typedef typename composite_subscription::weak_subscription weak_subscription;
- typedef typename composite_subscription::shared_subscription shared_subscription;
subscriber(const this_type& o)
: lifetime(o.lifetime)
@@ -132,11 +131,13 @@ public:
bool is_subscribed() const {
return lifetime.is_subscribed();
}
- weak_subscription add(shared_subscription s) const {
+ weak_subscription add(subscription s) const {
return lifetime.add(std::move(s));
}
- weak_subscription add(dynamic_subscription s) const {
- return lifetime.add(std::move(s));
+ template<class F>
+ auto add(F f) const
+ -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
+ return lifetime.add(make_subscription(std::move(f)));
}
void remove(weak_subscription w) const {
return lifetime.remove(std::move(w));
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index 40700e8..527d52b 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -9,6 +9,21 @@
namespace rxcpp {
+namespace detail {
+
+template<class F>
+struct is_unsubscribe_function
+{
+ struct not_void {};
+ template<class CF>
+ static auto check(int) -> decltype((*(CF*)nullptr)());
+ template<class CF>
+ static not_void check(...);
+
+ static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
+};
+
+}
struct tag_subscription {};
struct subscription_base {typedef tag_subscription subscription_tag;};
@@ -23,39 +38,8 @@ public:
static const bool value = std::is_convertible<decltype(check<typename std::decay<T>::type>(0)), tag_subscription*>::value;
};
-class dynamic_subscription : public subscription_base
-{
- typedef std::function<void()> unsubscribe_call_type;
- unsubscribe_call_type unsubscribe_call;
- dynamic_subscription()
- {
- }
-public:
- dynamic_subscription(const dynamic_subscription& o)
- : unsubscribe_call(o.unsubscribe_call)
- {
- }
- dynamic_subscription(dynamic_subscription&& o)
- : unsubscribe_call(std::move(o.unsubscribe_call))
- {
- }
- template<class I>
- dynamic_subscription(I i, typename std::enable_if<is_subscription<I>::value && !std::is_same<I, dynamic_subscription>::value, void**>::type selector = nullptr)
- : unsubscribe_call([i](){
- i.unsubscribe();})
- {
- }
- dynamic_subscription(unsubscribe_call_type s)
- : unsubscribe_call(std::move(s))
- {
- }
- void unsubscribe() const {
- unsubscribe_call();
- }
-};
-
template<class Unsubscribe>
-class static_subscription : public subscription_base
+class static_subscription
{
typedef typename std::decay<Unsubscribe>::type unsubscribe_call_type;
unsubscribe_call_type unsubscribe_call;
@@ -80,178 +64,243 @@ public:
}
};
-template<class I>
class subscription : public subscription_base
{
- typedef typename std::decay<I>::type inner_t;
- inner_t inner;
- mutable bool issubscribed;
+ class base_subscription_state : public std::enable_shared_from_this<base_subscription_state>
+ {
+ base_subscription_state();
+ public:
+
+ explicit base_subscription_state(bool initial)
+ : issubscribed(initial)
+ {
+ }
+ virtual void unsubscribe() {
+ issubscribed = false;
+ }
+ std::atomic<bool> issubscribed;
+ };
public:
- subscription(inner_t inner)
- : inner(std::move(inner))
- , issubscribed(true)
+ typedef std::weak_ptr<base_subscription_state> weak_state_type;
+
+private:
+ template<class I>
+ struct subscription_state : public base_subscription_state
{
- }
- bool is_subscribed() const {
- return issubscribed;
- }
- void unsubscribe() const {
- if (issubscribed) {
- inner.unsubscribe();
+ typedef typename std::decay<I>::type inner_t;
+ subscription_state(inner_t i)
+ : base_subscription_state(true)
+ , inner(std::move(i))
+ {
+ }
+ virtual void unsubscribe() {
+ if (issubscribed.exchange(false)) {
+ inner.unsubscribe();
+ }
+ }
+ inner_t inner;
+ };
+ std::shared_ptr<base_subscription_state> state;
+
+ friend bool operator<(const subscription&, const subscription&);
+ friend bool operator==(const subscription&, const subscription&);
+
+ subscription(weak_state_type w)
+ : state(w.lock())
+ {
+ if (!state) {
+ abort();
}
- issubscribed = false;
}
-};
-template<>
-class subscription<void> : public subscription_base
-{
public:
+
subscription()
+ : state(std::make_shared<base_subscription_state>(false))
+ {
+ if (!state) {
+ abort();
+ }
+ }
+ template<class U>
+ explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr)
+ : state(std::make_shared<subscription_state<U>>(std::move(u)))
+ {
+ if (!state) {
+ abort();
+ }
+ }
+ template<class U>
+ explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr)
+ // intentionally slice
+ : state(std::move(static_cast<subscription&>(u).state))
+ {
+ if (!state) {
+ abort();
+ }
+ }
+ subscription(const subscription& o)
+ : state(o.state)
+ {
+ if (!state) {
+ abort();
+ }
+ }
+ subscription(subscription&& o)
+ : state(std::move(o.state))
{
+ if (!state) {
+ abort();
+ }
+ }
+ subscription& operator=(subscription o) {
+ state = std::move(o.state);
+ return *this;
}
bool is_subscribed() const {
- return false;
+ if (!state) {
+ abort();
+ }
+ return state->issubscribed;
}
void unsubscribe() const {
+ if (!state) {
+ abort();
+ }
+ auto keepAlive = state;
+ state->unsubscribe();
+ }
+
+ weak_state_type get_weak() {
+ return state;
+ }
+ static subscription lock(weak_state_type w) {
+ return subscription(w);
}
};
+
+inline bool operator<(const subscription& lhs, const subscription& rhs) {
+ return lhs.state < rhs.state;
+}
+inline bool operator==(const subscription& lhs, const subscription& rhs) {
+ return lhs.state == rhs.state;
+}
+inline bool operator!=(const subscription& lhs, const subscription& rhs) {
+ return !(lhs == rhs);
+}
+
+
inline auto make_subscription()
- -> subscription<void> {
- return subscription<void>();
+ -> subscription {
+ return subscription();
}
template<class I>
auto make_subscription(I&& i)
- -> typename std::enable_if<is_subscription<I>::value,
- subscription<I>>::type {
- return subscription<I>(std::forward<I>(i));
+ -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
+ subscription>::type {
+ return subscription(std::forward<I>(i));
}
template<class Unsubscribe>
auto make_subscription(Unsubscribe&& u)
- -> typename std::enable_if<!is_subscription<Unsubscribe>::value,
- subscription< static_subscription<Unsubscribe>>>::type {
- return subscription< static_subscription<Unsubscribe>>(
- static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
+ -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
+ subscription>::type {
+ return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
}
-class composite_subscription : public subscription_base
+namespace detail {
+
+struct tag_composite_subscription_empty {};
+
+class composite_subscription_inner
{
-public:
- typedef std::shared_ptr<dynamic_subscription> shared_subscription;
- typedef std::weak_ptr<dynamic_subscription> weak_subscription;
private:
- struct tag_empty {};
- struct state_t : public std::enable_shared_from_this<state_t>
+ typedef subscription::weak_state_type weak_subscription;
+ struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
{
- std::vector<shared_subscription> subscriptions;
- std::recursive_mutex lock;
- bool issubscribed;
+ std::set<subscription> subscriptions;
+ std::mutex lock;
+ std::atomic<bool> issubscribed;
- state_t()
- : issubscribed(true)
+ ~composite_subscription_state()
{
+ std::unique_lock<decltype(lock)> guard(lock);
+ subscriptions.clear();
}
- state_t(tag_empty&&)
- : issubscribed(false)
+ composite_subscription_state()
+ : issubscribed(true)
{
}
-
- inline bool is_subscribed() {
- return issubscribed;
- }
-
- inline weak_subscription add(dynamic_subscription s) {
- return add(std::make_shared<dynamic_subscription>(std::move(s)));
+ composite_subscription_state(tag_composite_subscription_empty)
+ : issubscribed(false)
+ {
}
- inline weak_subscription add(shared_subscription s) {
- std::unique_lock<decltype(lock)> guard(lock);
-
+ inline weak_subscription add(subscription s) {
if (!issubscribed) {
- s->unsubscribe();
- } else {
- auto end = std::end(subscriptions);
- auto it = std::find(std::begin(subscriptions), end, s);
- if (it == end)
- {
- subscriptions.emplace_back(s);
- }
+ s.unsubscribe();
+ } else if (s.is_subscribed()) {
+ std::unique_lock<decltype(lock)> guard(lock);
+ subscriptions.insert(s);
}
- return s;
+ return s.get_weak();
}
inline void remove(weak_subscription w) {
- std::unique_lock<decltype(lock)> guard(lock);
-
if (issubscribed && !w.expired()) {
- auto s = w.lock();
- if (s)
- {
- auto end = std::end(subscriptions);
- auto it = std::find(std::begin(subscriptions), end, s);
- if (it != end)
- {
- subscriptions.erase(it);
- }
- }
+ std::unique_lock<decltype(lock)> guard(lock);
+ subscriptions.erase(subscription::lock(w));
}
}
inline void clear() {
- std::unique_lock<decltype(lock)> guard(lock);
-
if (issubscribed) {
- std::vector<shared_subscription> v(std::move(subscriptions));
+ std::unique_lock<decltype(lock)> guard(lock);
+
+ std::set<subscription> v(std::move(subscriptions));
+ guard.unlock();
std::for_each(v.begin(), v.end(),
- [](shared_subscription& s) {
- s->unsubscribe(); });
+ [](const subscription& s) {
+ s.unsubscribe(); });
}
}
inline void unsubscribe() {
- std::unique_lock<decltype(lock)> guard(lock);
+ if (issubscribed.exchange(false)) {
+ std::unique_lock<decltype(lock)> guard(lock);
- if (issubscribed) {
- issubscribed = false;
- std::vector<shared_subscription> v(std::move(subscriptions));
+ std::set<subscription> v(std::move(subscriptions));
+ guard.unlock();
std::for_each(v.begin(), v.end(),
- [](shared_subscription& s) {
- s->unsubscribe(); });
+ [](const subscription& s) {
+ s.unsubscribe(); });
}
}
};
- mutable std::shared_ptr<state_t> state;
+public:
+ typedef std::shared_ptr<composite_subscription_state> shared_state_type;
- static std::shared_ptr<state_t> shared_empty;
+protected:
+ mutable shared_state_type state;
- composite_subscription(std::shared_ptr<state_t> s)
- : state(std::move(s))
+public:
+ composite_subscription_inner()
+ : state(std::make_shared<composite_subscription_state>())
{
- if (!state) {
- abort();
- }
}
-
- friend bool operator==(const composite_subscription&, const composite_subscription&);
-
-public:
-
- composite_subscription()
- : state(std::make_shared<state_t>())
+ composite_subscription_inner(tag_composite_subscription_empty et)
+ : state(std::make_shared<composite_subscription_state>(et))
{
- if (!state) {
- abort();
- }
}
- composite_subscription(const composite_subscription& o)
+
+ composite_subscription_inner(const composite_subscription_inner& o)
: state(o.state)
{
if (!state) {
abort();
}
}
- composite_subscription(composite_subscription&& o)
+ composite_subscription_inner(composite_subscription_inner&& o)
: state(std::move(o.state))
{
if (!state) {
@@ -259,75 +308,120 @@ public:
}
}
- composite_subscription& operator=(const composite_subscription& o)
+ composite_subscription_inner& operator=(composite_subscription_inner o)
{
- state = o.state;
+ state = std::move(o.state);
if (!state) {
abort();
}
return *this;
}
- composite_subscription& operator=(composite_subscription&& o)
- {
- state = std::move(o.state);
+
+ inline weak_subscription add(subscription s) const {
if (!state) {
abort();
}
- return *this;
- }
-
- static inline composite_subscription empty() {
- return composite_subscription(shared_empty);
- }
-
- inline bool is_subscribed() const {
- return state->is_subscribed();
- }
- inline weak_subscription add(shared_subscription s) const {
- return state->add(std::move(s));
- }
- inline weak_subscription add(dynamic_subscription s) const {
+ if (s == static_cast<const subscription&>(*this)) {
+ // do not nest the same subscription
+ abort();
+ //return s.get_weak();
+ }
return state->add(std::move(s));
}
inline void remove(weak_subscription w) const {
+ if (!state) {
+ abort();
+ }
state->remove(std::move(w));
}
inline void clear() const {
+ if (!state) {
+ abort();
+ }
state->clear();
}
- inline void unsubscribe() const {
+ inline void unsubscribe() {
+ if (!state) {
+ abort();
+ }
state->unsubscribe();
}
};
-inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
- return lhs.state == rhs.state;
-}
-inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
- return !(lhs == rhs);
}
-//static
-RXCPP_SELECT_ANY std::shared_ptr<composite_subscription::state_t> composite_subscription::shared_empty = std::make_shared<composite_subscription::state_t>(composite_subscription::tag_empty());
+class composite_subscription
+ : protected detail::composite_subscription_inner
+ , public subscription
+{
+ typedef detail::composite_subscription_inner inner_type;
+public:
+ typedef subscription::weak_state_type weak_subscription;
+ static composite_subscription shared_empty;
-namespace detail {
+ composite_subscription(detail::tag_composite_subscription_empty et)
+ : inner_type(et)
+ , subscription() // use empty base
+ {
+ }
-struct tag_subscription_resolution
-{
- template<class LHS>
- struct predicate
+public:
+
+ composite_subscription()
+ : inner_type()
+ , subscription(static_cast<const inner_type&>(*this))
{
- static const bool value = !is_subscriber<LHS>::value && !is_observer<LHS>::value && is_subscription<LHS>::value;
- };
- struct default_type {
- inline operator composite_subscription() const {
- return composite_subscription();
- }
- };
+ }
+
+ composite_subscription(const composite_subscription& o)
+ : inner_type(o)
+ , subscription(static_cast<const subscription&>(o))
+ {
+ }
+ composite_subscription(composite_subscription&& o)
+ : inner_type(std::move(o))
+ , subscription(std::move(static_cast<subscription&>(o)))
+ {
+ }
+
+ composite_subscription& operator=(composite_subscription o)
+ {
+ inner_type::operator=(std::move(o));
+ subscription::operator=(std::move(static_cast<subscription&>(o)));
+ return *this;
+ }
+
+ static inline composite_subscription empty() {
+ return shared_empty;
+ }
+
+ using subscription::is_subscribed;
+ using subscription::unsubscribe;
+
+ using inner_type::add;
+ using inner_type::remove;
+ using inner_type::clear;
+
+ template<class F>
+ auto add(F f) const
+ -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
+ return inner_type::add(make_subscription(std::move(f)));
+ }
};
+inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) {
+ return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs);
}
+inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
+ return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs);
+}
+inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
+ return !(lhs == rhs);
+}
+
+//static
+RXCPP_SELECT_ANY composite_subscription composite_subscription::shared_empty = composite_subscription(detail::tag_composite_subscription_empty());
}
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 6b37ca3..76b7f9e 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -227,12 +227,12 @@ public:
}
maybe& operator=(const T& other) {
- set(other);
+ reset(other);
return *this;
}
maybe& operator=(const maybe& other) {
if (const T* pother = other.get()) {
- set(*pother);
+ reset(*pother);
} else {
reset();
}
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
index 411bd43..6d3cf6f 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
@@ -251,9 +251,9 @@ public:
}
auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
- o.add(dynamic_subscription([sharedThis, index]() {
+ o.add([sharedThis, index]() {
sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
- }));
+ });
}
virtual std::vector<rxn::subscription> subscriptions() const {
@@ -313,9 +313,9 @@ public:
auto index = sv.size() - 1;
auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
- o.add(dynamic_subscription([sharedThis, index]() {
+ o.add([sharedThis, index]() {
sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
- }));
+ });
}
virtual std::vector<rxn::subscription> subscriptions() const {
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index dffa37c..e892828 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -2,8 +2,8 @@
#pragma once
-#if !defined(RXCPP_RX_SCHEDULER_SUBJECT_HPP)
-#define RXCPP_RX_SCHEDULER_SUBJECT_HPP
+#if !defined(RXCPP_RX_SUBJECT_HPP)
+#define RXCPP_RX_SUBJECT_HPP
#include "../rx-includes.hpp"
@@ -200,7 +200,11 @@ class subject
detail::multicast_observer<T> s;
public:
- explicit subject(composite_subscription cs = composite_subscription())
+ subject()
+ : s(lifetime)
+ {
+ }
+ explicit subject(composite_subscription cs)
: lifetime(cs)
, s(cs)
{
@@ -210,11 +214,8 @@ public:
return s.has_observers();
}
- subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber(composite_subscription cs = composite_subscription()) const {
- auto lt = lifetime;
- auto token = lt.add(cs);
- cs.add(make_subscription([token, lt](){lt.remove(token);}));
- return make_subscriber<T>(cs, observer<T, detail::multicast_observer<T>>(s));
+ subscriber<T, observer<T, detail::multicast_observer<T>>> get_subscriber() const {
+ return make_subscriber<T>(lifetime, observer<T, detail::multicast_observer<T>>(s));
}
observable<T> get_observable() const {
diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
new file mode 100644
index 0000000..60ce46a
--- /dev/null
+++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
@@ -0,0 +1,214 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_RX_SYNCHRONIZE_HPP)
+#define RXCPP_RX_SYNCHRONIZE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace subjects {
+
+namespace detail {
+
+template<class T>
+class synchronize_observer : public detail::multicast_observer<T>
+{
+ typedef synchronize_observer<T> this_type;
+ typedef detail::multicast_observer<T> base_type;
+
+ struct synchronize_observer_state : public std::enable_shared_from_this<synchronize_observer_state>
+ {
+ typedef rxn::notification<T> notification_type;
+ typedef typename notification_type::type base_notification_type;
+ typedef std::deque<base_notification_type> queue_type;
+
+ struct mode
+ {
+ enum type {
+ Invalid = 0,
+ Processing,
+ Empty,
+ Disposed
+ };
+ };
+
+ mutable std::mutex lock;
+ mutable std::condition_variable wake;
+ mutable queue_type queue;
+ composite_subscription lifetime;
+ rxsc::worker processor;
+ mutable typename mode::type current;
+ subscriber<T> destination;
+
+ void ensure_processing(std::unique_lock<std::mutex>& guard) const {
+ if (!guard.owns_lock()) {
+ abort();
+ }
+ if (current == mode::Empty) {
+ current = mode::Processing;
+ auto keepAlive = this->shared_from_this();
+ processor.schedule(lifetime, [keepAlive, this](const rxsc::schedulable& self){
+ try {
+ std::unique_lock<std::mutex> guard(lock);
+ if (!lifetime.is_subscribed() || !destination.is_subscribed()) {
+ current = mode::Disposed;
+ queue.clear();
+ guard.unlock();
+ lifetime.unsubscribe();
+ destination.unsubscribe();
+ return;
+ }
+ if (queue.empty()) {
+ current = mode::Empty;
+ return;
+ }
+ auto notification = std::move(queue.front());
+ queue.pop_front();
+ guard.unlock();
+ notification->accept(destination);
+ self();
+ } catch(...) {
+ destination.on_error(std::current_exception());
+ std::unique_lock<std::mutex> guard(lock);
+ current = mode::Empty;
+ }
+ });
+ }
+ }
+
+ synchronize_observer_state(rxsc::worker w, composite_subscription cs, subscriber<T> scbr)
+ : lifetime(cs)
+ , processor(w)
+ , current(mode::Empty)
+ , destination(scbr)
+ {
+ }
+
+ template<class V>
+ void on_next(V v) const {
+ if (lifetime.is_subscribed()) {
+ std::unique_lock<std::mutex> guard(lock);
+ queue.push_back(notification_type::on_next(std::move(v)));
+ wake.notify_one();
+ ensure_processing(guard);
+ }
+ }
+ void on_error(std::exception_ptr e) const {
+ if (lifetime.is_subscribed()) {
+ std::unique_lock<std::mutex> guard(lock);
+ queue.push_back(notification_type::on_error(e));
+ wake.notify_one();
+ ensure_processing(guard);
+ }
+ }
+ void on_completed() const {
+ if (lifetime.is_subscribed()) {
+ std::unique_lock<std::mutex> guard(lock);
+ queue.push_back(notification_type::on_completed());
+ wake.notify_one();
+ ensure_processing(guard);
+ }
+ }
+ };
+
+ std::shared_ptr<synchronize_observer_state> state;
+
+public:
+ synchronize_observer(rxsc::worker w, composite_subscription cs)
+ : base_type(cs)
+ , state(std::make_shared<synchronize_observer_state>(
+ w, cs, make_subscriber<T>(cs, make_observer_dynamic<T>( *static_cast<base_type*>(this) ))))
+ {}
+
+ template<class V>
+ void on_next(V v) const {
+ state->on_next(std::move(v));
+ }
+ void on_error(std::exception_ptr e) const {
+ state->on_error(e);
+ }
+ void on_completed() const {
+ state->on_completed();
+ }
+};
+
+}
+
+template<class T>
+class synchronize
+{
+ rxsc::worker controller;
+ composite_subscription lifetime;
+ detail::synchronize_observer<T> s;
+
+public:
+ explicit synchronize(rxsc::worker w, composite_subscription cs = composite_subscription())
+ : controller(w)
+ , lifetime(cs)
+ , s(w, cs)
+ {
+ }
+
+ bool has_observers() const {
+ return s.has_observers();
+ }
+
+ subscriber<T> get_subscriber() const {
+ return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::synchronize_observer<T>>(s)));
+ }
+
+ observable<T> get_observable() const {
+ return make_observable_dynamic<T>([this](subscriber<T> o){
+ this->s.add(std::move(o));
+ });
+ }
+};
+
+//
+// this type is used by operators that subscribe to
+// multiple sources to ensure that the notifications are serialized
+//
+class synchronize_observable
+{
+ rxsc::scheduler factory;
+ rxsc::worker controller;
+public:
+ synchronize_observable(rxsc::scheduler sc)
+ : factory(sc)
+ , controller(sc.create_worker())
+ {
+ }
+ synchronize_observable(const synchronize_observable& o)
+ : factory(o.factory)
+ // new worker for each copy. this spreads work across threads
+ // but keeps each use serialized
+ , controller(factory.create_worker())
+ {
+ }
+ synchronize_observable(synchronize_observable&& o)
+ : factory(std::move(o.factory))
+ , controller(std::move(o.controller))
+ {
+ }
+ synchronize_observable& operator=(synchronize_observable o)
+ {
+ factory = std::move(o.factory);
+ controller = std::move(o.controller);
+ return *this;
+ }
+ template<class Observable>
+ auto operator()(Observable o)
+ -> decltype(o.synchronize(controller).ref_count()) {
+ return o.synchronize(controller).ref_count();
+ static_assert(is_observable<Observable>::value, "can only synchronize observables");
+ }
+};
+
+}
+
+}
+
+#endif