summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Murashkin <iam@google.com>2018-08-10 19:37:48 -0700
committerandroid-build-merger <android-build-merger@google.com>2018-08-10 19:37:48 -0700
commit0bb65c12240ea2fd408d13f49e19bbce33786209 (patch)
treeec69d474dd5d82342434380b1ebd67e2857f5cec
parent67ddf72b665874341a5bdb5cc6cf2805d17a3b09 (diff)
parent7a4063f8397b57abae6aec88a168eb1db15a076d (diff)
downloadRxCpp-0bb65c12240ea2fd408d13f49e19bbce33786209.tar.gz
Merge remote-tracking branch 'upstream-master' into master am: 62eaa9fe03 am: 012a858d2a am: 14ec7d62d8
am: 7a4063f839 Change-Id: I9fb2fceb00fbdc36640c57803c8b12cc1a2a3203
-rw-r--r--Ix/CPP/src/cpplinq/linq.hpp8
-rw-r--r--METADATA4
-rw-r--r--README.md24
-rw-r--r--Rx/v2/examples/doxygen/buffer.cpp6
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp116
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp3
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp45
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp7
-rw-r--r--Rx/v2/src/rxcpp/rx-subscription.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp9
-rw-r--r--Rx/v2/test/operators/merge_delay_error.cpp6
-rw-r--r--Rx/v2/test/operators/observe_on.cpp56
12 files changed, 187 insertions, 99 deletions
diff --git a/Ix/CPP/src/cpplinq/linq.hpp b/Ix/CPP/src/cpplinq/linq.hpp
index be77151..6552f79 100644
--- a/Ix/CPP/src/cpplinq/linq.hpp
+++ b/Ix/CPP/src/cpplinq/linq.hpp
@@ -471,22 +471,22 @@ public:
// TODO: skip_while(pred)
- template<typename ITEM = typename element_type>
+ template<typename ITEM = element_type>
typename std::enable_if<std::is_default_constructible<ITEM>::value, ITEM>::type sum() const {
ITEM seed{};
return sum(seed);
}
- typename element_type sum(typename element_type seed) const {
+ element_type sum(element_type seed) const {
return std::accumulate(begin(), end(), seed);
}
- template <typename Selector, typename Result = std::result_of<Selector(typename element_type)>::type>
+ template <typename Selector, typename Result = typename std::result_of<Selector(element_type)>::type>
typename std::enable_if<std::is_default_constructible<Result>::value, Result>::type sum(Selector sel) const {
return from(begin(), end()).select(sel).sum();
}
- template <typename Selector, typename Result = std::result_of<Selector(typename element_type)>::type>
+ template <typename Selector, typename Result = typename std::result_of<Selector(element_type)>::type>
Result sum(Selector sel, Result seed) const {
return from(begin(), end()).select(sel).sum(seed);
}
diff --git a/METADATA b/METADATA
index 7823817..2448caf 100644
--- a/METADATA
+++ b/METADATA
@@ -12,7 +12,7 @@ third_party {
type: GIT
value: "https://github.com/Reactive-Extensions/RxCpp.git"
}
- version: "b84db4278e54e722fbbae794f573d1142261e9a3"
- last_upgrade_date { year: 2018 month: 1 day: 30 }
+ version: "a7d5856385f126e874db6010d9dbfd37290c61de"
+ last_upgrade_date { year: 2018 month: 8 day: 10 }
license_type: NOTICE
}
diff --git a/README.md b/README.md
index 8106c4a..330ca9c 100644
--- a/README.md
+++ b/README.md
@@ -3,17 +3,21 @@ The Reactive Extensions for C++ (__RxCpp__) is a library of algorithms for value
Platform | Status |
----------- | :------------ |
Windows | [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446)
-Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp)
+Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/ReactiveX/RxCpp.svg?style=flat-square)](https://travis-ci.org/ReactiveX/RxCpp)
Source | Badges |
------------- | :--------------- |
-Github | [![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) <br/> [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases) <br/> [![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v4.0.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
-Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
+Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) <br/> [![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases) <br/> [![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/4.1.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
+Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp)
-Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp) <br/> [![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/)
+Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactivex.github.io/RxCpp) <br/> [![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/)
+
+# Usage
+
+__RxCpp__ is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. The unit tests depend on a git submodule for the [Catch](https://github.com/philsquared/Catch) library.
# Example
-Add ```Rx/v2/src``` to the include paths
+Add `Rx/v2/src` to the include paths
[![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html)
@@ -124,7 +128,7 @@ Credit [ReactiveX.io](http://reactivex.io/intro.html)
### Other language implementations
* Java: [RxJava](https://github.com/ReactiveX/RxJava)
-* JavaScript: [RxJS](https://github.com/Reactive-Extensions/RxJS)
+* JavaScript: [rxjs](https://github.com/ReactiveX/rxjs)
* C#: [Rx.NET](https://github.com/Reactive-Extensions/Rx.NET)
* [More..](http://reactivex.io/languages.html)
@@ -144,11 +148,11 @@ Credit [ReactiveX.io](http://reactivex.io/intro.html)
RxCpp uses a git submodule (in `ext/catch`) for the excellent [Catch](https://github.com/philsquared/Catch) library. The easiest way to ensure that the submodules are included in the clone is to add `--recursive` in the clone command.
```shell
-git clone --recursive https://github.com/Reactive-Extensions/RxCpp.git
+git clone --recursive https://github.com/ReactiveX/RxCpp.git
cd RxCpp
```
-# Building RxCpp
+# Building RxCpp Unit Tests
* RxCpp is regularly tested on OSX and Windows.
* RxCpp is regularly built with Clang, Gcc and VC
@@ -220,7 +224,7 @@ Example of by-tag
# Documentation
-RxCpp uses Doxygen to generate project [documentation](http://reactive-extensions.github.io/RxCpp).
+RxCpp uses Doxygen to generate project [documentation](http://reactivex.github.io/RxCpp).
When Doxygen+Graphviz is installed, CMake creates a special build task named `doc`. It creates actual documentation and puts it to `projects/doxygen/html/` folder, which can be published to the `gh-pages` branch. Each merged pull request will build the docs and publish them.
@@ -230,7 +234,5 @@ When Doxygen+Graphviz is installed, CMake creates a special build task named `do
Before submitting a feature or substantial code contribution please discuss it with the team and ensure it follows the product roadmap. Note that all code submissions will be rigorously reviewed and tested by the Rx Team, and only those that meet an extremely high bar for both quality and design/roadmap appropriateness will be merged into the source.
-You will be prompted to submit a Contributor License Agreement form after submitting your pull request. This needs to only be done once for any Microsoft OSS project. Fill in the [Contributor License Agreement](https://cla2.msopentech.com/) (CLA).
-
# Microsoft Open Source Code of Conduct
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp
index e88c2ed..58f023a 100644
--- a/Rx/v2/examples/doxygen/buffer.cpp
+++ b/Rx/v2/examples/doxygen/buffer.cpp
@@ -162,7 +162,6 @@ SCENARIO("buffer period sample"){
SCENARIO("buffer period+count+coordination sample"){
printf("//! [buffer period+count+coordination sample]\n");
- auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
@@ -171,7 +170,7 @@ SCENARIO("buffer period+count+coordination sample"){
values.
as_blocking().
subscribe(
- [start](std::vector<long> v){
+ [](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
@@ -184,7 +183,6 @@ SCENARIO("buffer period+count+coordination sample"){
SCENARIO("buffer period+count sample"){
printf("//! [buffer period+count sample]\n");
- auto start = std::chrono::steady_clock::now();
auto int1 = rxcpp::observable<>::range(1L, 3L);
auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
auto values = int1.
@@ -192,7 +190,7 @@ SCENARIO("buffer period+count sample"){
buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
values.
subscribe(
- [start](std::vector<long> v){
+ [](std::vector<long> v){
printf("OnNext:");
std::for_each(v.begin(), v.end(), [](long a){
printf(" %ld", a);
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index f702fba..d1c4ea4 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -9,6 +9,7 @@
\tparam KeySelector the type of the key extracting function
\tparam MarbleSelector the type of the element extracting function
\tparam BinaryPredicate the type of the key comparing function
+ \tparam DurationSelector the type of the duration observable function
\param ks a function that extracts the key for each item (optional)
\param ms a function that extracts the return element for each item (optional)
@@ -63,7 +64,7 @@ struct is_group_by_selector_for {
static const bool value = !std::is_same<type, tag_not_valid>::value;
};
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
struct group_by_traits
{
typedef T source_value_type;
@@ -71,6 +72,7 @@ struct group_by_traits
typedef rxu::decay_t<KeySelector> key_selector_type;
typedef rxu::decay_t<MarbleSelector> marble_selector_type;
typedef rxu::decay_t<BinaryPredicate> predicate_type;
+ typedef rxu::decay_t<DurationSelector> duration_selector_type;
static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)");
@@ -87,14 +89,15 @@ struct group_by_traits
typedef grouped_observable<key_type, marble_type> grouped_observable_type;
};
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
struct group_by
{
- typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
+ typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
typedef typename traits_type::key_selector_type key_selector_type;
typedef typename traits_type::marble_selector_type marble_selector_type;
typedef typename traits_type::marble_type marble_type;
typedef typename traits_type::predicate_type predicate_type;
+ typedef typename traits_type::duration_selector_type duration_selector_type;
typedef typename traits_type::subject_type subject_type;
typedef typename traits_type::key_type key_type;
@@ -130,21 +133,23 @@ struct group_by
struct group_by_values
{
- group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
: keySelector(std::move(ks))
, marbleSelector(std::move(ms))
, predicate(std::move(p))
+ , durationSelector(std::move(ds))
{
}
mutable key_selector_type keySelector;
mutable marble_selector_type marbleSelector;
mutable predicate_type predicate;
+ mutable duration_selector_type durationSelector;
};
group_by_values initial;
- group_by(key_selector_type ks, marble_selector_type ms, predicate_type p)
- : initial(std::move(ks), std::move(ms), std::move(p))
+ group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
+ : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds))
{
}
@@ -206,7 +211,35 @@ struct group_by
}
auto sub = subject_type();
g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
- dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())));
+ auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()));
+ auto durationObs = on_exception(
+ [&](){
+ return this->durationSelector(obs);},
+ [this](rxu::error_ptr e){on_error(e);});
+ if (durationObs.empty()) {
+ return;
+ }
+
+ dest.on_next(obs);
+ composite_subscription duration_sub;
+ auto ssub = state->source_lifetime.add(duration_sub);
+
+ auto expire_state = state;
+ auto expire_dest = g->second;
+ auto expire = [=]() {
+ auto g = expire_state->groups.find(selectedKey.get());
+ if (g != expire_state->groups.end()) {
+ expire_state->groups.erase(g);
+ expire_dest.on_completed();
+ }
+ expire_state->source_lifetime.remove(ssub);
+ };
+ auto robs = durationObs.get().take(1);
+ duration_sub.add(robs.subscribe(
+ [](const typename decltype(robs)::value_type &){},
+ [=](rxu::error_ptr) {expire();},
+ [=](){expire();}
+ ));
}
auto selectedMarble = on_exception(
[&](){
@@ -243,33 +276,36 @@ struct group_by
}
};
-template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
class group_by_factory
{
typedef rxu::decay_t<KeySelector> key_selector_type;
typedef rxu::decay_t<MarbleSelector> marble_selector_type;
typedef rxu::decay_t<BinaryPredicate> predicate_type;
+ typedef rxu::decay_t<DurationSelector> duration_selector_type;
key_selector_type keySelector;
marble_selector_type marbleSelector;
predicate_type predicate;
+ duration_selector_type durationSelector;
public:
- group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p)
+ group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
: keySelector(std::move(ks))
, marbleSelector(std::move(ms))
, predicate(std::move(p))
+ , durationSelector(std::move(ds))
{
}
template<class Observable>
struct group_by_factory_traits
{
typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
- typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
- typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
+ typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
+ typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type;
};
template<class Observable>
auto operator()(Observable&& source)
- -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) {
- return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)));
+ -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) {
+ return source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)));
}
};
@@ -288,61 +324,75 @@ auto group_by(AN&&... an)
template<>
struct member_overload<group_by_tag>
{
- template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+ template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
+ class Value = typename Traits::grouped_observable_type>
+ static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds)
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)));
+ }
+
+ template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
- template<class Observable, class KeySelector, class MarbleSelector,
+ template<class Observable, class KeySelector, class MarbleSelector,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class Observable, class KeySelector,
class MarbleSelector=rxu::detail::take_at<0>,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o, KeySelector&& ks)
- -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) {
- return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class Observable,
class KeySelector=rxu::detail::take_at<0>,
class MarbleSelector=rxu::detail::take_at<0>,
class BinaryPredicate=rxu::less,
+ class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
class Enabled = rxu::enable_if_all_true_type_t<
all_observables<Observable>>,
class SourceValue = rxu::value_type_t<Observable>,
- class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
- class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+ class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+ class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
class Value = typename Traits::grouped_observable_type>
static auto member(Observable&& o)
- -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) {
- return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()));
+ -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+ return o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
}
template<class... AN>
static operators::detail::group_by_invalid_t<AN...> member(const AN&...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool");
- }
+ static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable");
+ }
};
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index c1d59a9..b50b773 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -317,8 +317,7 @@ public:
};
inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
- static observe_on_one_worker r(rxsc::make_run_loop(rl));
- return r;
+ return observe_on_one_worker(rxsc::make_run_loop(rl));
}
inline observe_on_one_worker observe_on_event_loop() {
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 3bbb448..97bcabd 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -173,28 +173,9 @@ class blocking_observable
-> void {
std::mutex lock;
std::condition_variable wake;
+ bool disposed = false;
rxu::error_ptr error;
- struct tracking
- {
- ~tracking()
- {
- if (!disposed || !wakened) std::terminate();
- }
- tracking()
- {
- disposed = false;
- wakened = false;
- false_wakes = 0;
- true_wakes = 0;
- }
- std::atomic_bool disposed;
- std::atomic_bool wakened;
- std::atomic_int false_wakes;
- std::atomic_int true_wakes;
- };
- auto track = std::make_shared<tracking>();
-
auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
// keep any error to rethrow at the end.
@@ -213,31 +194,19 @@ class blocking_observable
auto cs = scbr.get_subscription();
cs.add(
- [&, track](){
- // OSX geting invalid x86 op if notify_one is after the disposed = true
- // presumably because the condition_variable may already have been awakened
- // and is now sitting in a while loop on disposed
+ [&](){
+ std::unique_lock<std::mutex> guard(lock);
wake.notify_one();
- track->disposed = true;
+ disposed = true;
});
- std::unique_lock<std::mutex> guard(lock);
source.subscribe(std::move(scbr));
+ std::unique_lock<std::mutex> guard(lock);
wake.wait(guard,
- [&, track](){
- // this is really not good.
- // false wakeups were never followed by true wakeups so..
-
- // anyways this gets triggered before disposed is set now so wait.
- while (!track->disposed) {
- ++track->false_wakes;
- }
- ++track->true_wakes;
- return true;
+ [&](){
+ return disposed;
});
- track->wakened = true;
- if (!track->disposed || !track->wakened) std::terminate();
if (error) {rxu::rethrow_exception(error);}
}
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index 0f239be..fc68979 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -458,12 +458,19 @@ class schedulable : public schedulable_base
public:
~exit_recursed_scope_type()
{
+ if (that != nullptr) {
that->requestor = nullptr;
+ }
}
exit_recursed_scope_type(const recursed_scope_type* that)
: that(that)
{
}
+ exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
+ : that(other.that)
+ {
+ other.that = nullptr;
+ }
};
public:
recursed_scope_type()
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index 9c00469..ee4e53e 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -379,7 +379,7 @@ public:
composite_subscription()
: inner_type()
- , subscription(*static_cast<const inner_type* const>(this))
+ , subscription(*static_cast<const inner_type*>(this))
{
}
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index cd5f39b..9ce455f 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -428,6 +428,15 @@ struct less
{ return std::forward<LHS>(lhs) < std::forward<RHS>(rhs); }
};
+template <class T>
+struct ret
+{
+ template <class LHS>
+ auto operator()(LHS&& ) const
+ -> decltype(T())
+ { return T(); }
+};
+
template<class T = void>
struct equal_to
{
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
index 7c7a58d..b53b884 100644
--- a/Rx/v2/test/operators/merge_delay_error.cpp
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -7,7 +7,7 @@ const int static_onnextcalls = 1000000;
//merge_delay_error must work the very same way as `merge()` except the error handling
-SCENARIO("merge delay error completes", "[merge][join][operators]"){
+SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -117,7 +117,7 @@ SCENARIO("merge delay error completes", "[merge][join][operators]"){
}
}
-SCENARIO("variadic merge delay error completes with error", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
@@ -211,7 +211,7 @@ SCENARIO("variadic merge delay error completes with error", "[merge][join][opera
}
}
-SCENARIO("variadic merge delay error completes with 2 errors", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){
GIVEN("1 hot observable with 3 cold observables of ints."){
auto sc = rxsc::make_test();
auto w = sc.create_worker();
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index 644ab93..ffa85aa 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -1,5 +1,6 @@
#include "../test.h"
#include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>
const int static_onnextcalls = 100000;
@@ -136,4 +137,57 @@ SCENARIO("stream observe_on", "[observe][observe_on]"){
}
}
-} \ No newline at end of file
+}
+
+class nocompare {
+public:
+ int v;
+};
+
+SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto so = rx::observe_on_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<nocompare> in;
+ const rxsc::test::messages<int> out;
+
+ auto xs = sc.make_hot_observable({
+ in.next(150, nocompare{1}),
+ in.next(210, nocompare{2}),
+ in.next(240, nocompare{3}),
+ in.completed(300)
+ });
+
+ WHEN("observe_on is specified"){
+
+ auto res = w.start(
+ [so, xs]() {
+ return xs
+ | rxo::observe_on(so)
+ | rxo::map([](nocompare v){ return v.v; })
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ out.next(211, 2),
+ out.next(241, 3),
+ out.completed(301)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ out.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}