diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-11-25 10:37:24 -0800 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-11-25 11:51:43 -0800 |
commit | 8290f92f744f807e83b1bfe9e8c0ffd162140ec8 (patch) | |
tree | 2582ba4355611606c39f241ce69925245b076b4c /Rx/v2 | |
parent | 4ab756bf297b1406a9dfafb0e887181952c6cb6a (diff) | |
download | RxCpp-8290f92f744f807e83b1bfe9e8c0ffd162140ec8.tar.gz |
fix window_with_time_or_count
reported in #277
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp | 4 | ||||
-rw-r--r-- | Rx/v2/test/operators/buffer.cpp | 50 | ||||
-rw-r--r-- | Rx/v2/test/operators/window.cpp | 57 |
3 files changed, 109 insertions, 2 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp b/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp index 03ee730..9dc2a47 100644 --- a/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp @@ -135,10 +135,10 @@ struct window_with_time_or_count void on_next(T v) const { auto localState = state; - auto work = [v, localState](const rxsc::schedulable&){ + auto work = [v, localState](const rxsc::schedulable& self){ localState->subj.get_subscriber().on_next(v); if (++localState->cursor == localState->count) { - release_window(localState->subj_id, localState->worker.now(), localState); + release_window(localState->subj_id, localState->worker.now(), localState)(self); } }; auto selectedWork = on_exception( diff --git a/Rx/v2/test/operators/buffer.cpp b/Rx/v2/test/operators/buffer.cpp index bd3b302..61e76e5 100644 --- a/Rx/v2/test/operators/buffer.cpp +++ b/Rx/v2/test/operators/buffer.cpp @@ -1172,3 +1172,53 @@ SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or } } } + +SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){ + GIVEN("1 hot observable of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<std::vector<int>> v_on; + + auto xs = sc.make_hot_observable({ + on.next(205, 1), + on.next(305, 2), + on.next(505, 3), + on.next(605, 4), + on.next(610, 5), + on.completed(850) + }); + WHEN("group ints on intervals"){ + using namespace std::chrono; + + auto res = w.start( + [&]() { + return xs + .buffer_with_time_or_count(milliseconds(370), 2, so) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains groups of ints"){ + auto required = rxu::to_vector({ + v_on.next(306, rxu::to_vector({ 1, 2 })), + v_on.next(606, rxu::to_vector({ 3, 4 })), + v_on.next(851, rxu::to_vector({ 5 })), + v_on.completed(851) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the xs"){ + auto required = rxu::to_vector({ + on.subscribe(200, 850) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} diff --git a/Rx/v2/test/operators/window.cpp b/Rx/v2/test/operators/window.cpp index b2049cc..30f109e 100644 --- a/Rx/v2/test/operators/window.cpp +++ b/Rx/v2/test/operators/window.cpp @@ -1,5 +1,7 @@ #include "../test.h" +#include <rxcpp/operators/rx-reduce.hpp> + SCENARIO("window count, basic", "[window][operators]"){ GIVEN("1 hot observable of ints."){ auto sc = rxsc::make_test(); @@ -979,3 +981,58 @@ SCENARIO("window with time or count, only time triggered", "[window_with_time_or } } } + +SCENARIO("window with time or count, only count triggered", "[window_with_time_or_count][operators]"){ + GIVEN("1 hot observable of ints."){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<rx::observable<int>> o_on; + + auto xs = sc.make_hot_observable({ + on.next(205, 1), + on.next(305, 2), + on.next(505, 3), + on.next(605, 4), + on.next(610, 5), + on.completed(850) + }); + + WHEN("group each int with the next 2 ints"){ + using namespace std::chrono; + + auto res = w.start( + [&]() { + return xs + .window_with_time_or_count(milliseconds(370), 2, so) + .map([](rx::observable<int> w){ + return w.count(); + }) + .merge() + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains merged groups of ints"){ + auto required = rxu::to_vector({ + on.next(306, 2), + on.next(606, 2), + on.next(851, 1), + on.completed(851) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the observable"){ + auto required = rxu::to_vector({ + o_on.subscribe(200, 850) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} |