summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2016-11-25 10:37:24 -0800
committerKirk Shoop <kirk.shoop@microsoft.com>2016-11-25 11:51:43 -0800
commit8290f92f744f807e83b1bfe9e8c0ffd162140ec8 (patch)
tree2582ba4355611606c39f241ce69925245b076b4c /Rx/v2
parent4ab756bf297b1406a9dfafb0e887181952c6cb6a (diff)
downloadRxCpp-8290f92f744f807e83b1bfe9e8c0ffd162140ec8.tar.gz
fix window_with_time_or_count
reported in #277
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp4
-rw-r--r--Rx/v2/test/operators/buffer.cpp50
-rw-r--r--Rx/v2/test/operators/window.cpp57
3 files changed, 109 insertions, 2 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp b/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
index 03ee730..9dc2a47 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
@@ -135,10 +135,10 @@ struct window_with_time_or_count
void on_next(T v) const {
auto localState = state;
- auto work = [v, localState](const rxsc::schedulable&){
+ auto work = [v, localState](const rxsc::schedulable& self){
localState->subj.get_subscriber().on_next(v);
if (++localState->cursor == localState->count) {
- release_window(localState->subj_id, localState->worker.now(), localState);
+ release_window(localState->subj_id, localState->worker.now(), localState)(self);
}
};
auto selectedWork = on_exception(
diff --git a/Rx/v2/test/operators/buffer.cpp b/Rx/v2/test/operators/buffer.cpp
index bd3b302..61e76e5 100644
--- a/Rx/v2/test/operators/buffer.cpp
+++ b/Rx/v2/test/operators/buffer.cpp
@@ -1172,3 +1172,53 @@ SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or
}
}
}
+
+SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){
+ GIVEN("1 hot observable of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<std::vector<int>> v_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(205, 1),
+ on.next(305, 2),
+ on.next(505, 3),
+ on.next(605, 4),
+ on.next(610, 5),
+ on.completed(850)
+ });
+ WHEN("group ints on intervals"){
+ using namespace std::chrono;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .buffer_with_time_or_count(milliseconds(370), 2, so)
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains groups of ints"){
+ auto required = rxu::to_vector({
+ v_on.next(306, rxu::to_vector({ 1, 2 })),
+ v_on.next(606, rxu::to_vector({ 3, 4 })),
+ v_on.next(851, rxu::to_vector({ 5 })),
+ v_on.completed(851)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the xs"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 850)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
diff --git a/Rx/v2/test/operators/window.cpp b/Rx/v2/test/operators/window.cpp
index b2049cc..30f109e 100644
--- a/Rx/v2/test/operators/window.cpp
+++ b/Rx/v2/test/operators/window.cpp
@@ -1,5 +1,7 @@
#include "../test.h"
+#include <rxcpp/operators/rx-reduce.hpp>
+
SCENARIO("window count, basic", "[window][operators]"){
GIVEN("1 hot observable of ints."){
auto sc = rxsc::make_test();
@@ -979,3 +981,58 @@ SCENARIO("window with time or count, only time triggered", "[window_with_time_or
}
}
}
+
+SCENARIO("window with time or count, only count triggered", "[window_with_time_or_count][operators]"){
+ GIVEN("1 hot observable of ints."){
+ auto sc = rxsc::make_test();
+ auto so = rx::synchronize_in_one_worker(sc);
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<rx::observable<int>> o_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(205, 1),
+ on.next(305, 2),
+ on.next(505, 3),
+ on.next(605, 4),
+ on.next(610, 5),
+ on.completed(850)
+ });
+
+ WHEN("group each int with the next 2 ints"){
+ using namespace std::chrono;
+
+ auto res = w.start(
+ [&]() {
+ return xs
+ .window_with_time_or_count(milliseconds(370), 2, so)
+ .map([](rx::observable<int> w){
+ return w.count();
+ })
+ .merge()
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains merged groups of ints"){
+ auto required = rxu::to_vector({
+ on.next(306, 2),
+ on.next(606, 2),
+ on.next(851, 1),
+ on.completed(851)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription to the observable"){
+ auto required = rxu::to_vector({
+ o_on.subscribe(200, 850)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}