diff options
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | Rx/CPP/test/operators/Publish.cpp | 111 | ||||
-rw-r--r-- | Rx/CPP/test/operators/Where.cpp | 389 | ||||
-rw-r--r-- | Rx/CPP/test/test.cpp | 2 | ||||
-rw-r--r-- | projects/CMake/CMakeLists.txt | 20 |
5 files changed, 522 insertions, 2 deletions
@@ -106,4 +106,4 @@ Desktop.ini ############ projects/* -!projects/CMake/CMakelists.txt +!projects/CMake/CMakeLists.txt diff --git a/Rx/CPP/test/operators/Publish.cpp b/Rx/CPP/test/operators/Publish.cpp new file mode 100644 index 0000000..26e6e5b --- /dev/null +++ b/Rx/CPP/test/operators/Publish.cpp @@ -0,0 +1,111 @@ +#include "cpprx/rx.hpp" +namespace rx=rxcpp; + +#include "catch.hpp" + +SCENARIO("publish_last", "[publish_last][publish][multicast][operators]"){ + GIVEN("a test hot observable of longs"){ + auto scheduler = std::make_shared<rx::TestScheduler>(); + typedef rx::TestScheduler::Messages<long> m; + + long invoked = 0; + + auto xs = scheduler->CreateHotObservable( + []() { + m::RecordedT messages[] = { + m::OnNext(110, 7), + m::OnNext(220, 3), + m::OnNext(280, 4), + m::OnNext(290, 1), + m::OnNext(340, 8), + m::OnNext(360, 5), + m::OnNext(370, 6), + m::OnNext(390, 7), + m::OnNext(410, 13), + m::OnNext(430, 2), + m::OnNext(450, 9), + m::OnNext(520, 11), + m::OnNext(560, 20), + m::OnCompleted(600) + }; + return m::ToVector(messages); + }() + ); + + auto res = scheduler->CreateObserver<long>(); + + rx::SerialDisposable subscription; + std::shared_ptr<rx::ConnectableObservable<long>> ys; + + WHEN("subscribed and then connected"){ + + scheduler->ScheduleAbsolute(rx::TestScheduler::Created, + [&invoked, &ys, &xs](rx::Scheduler::shared) { + ys = rx::observable(rx::from(xs) + .publish_last()); + return rx::Disposable::Empty(); + }); + + scheduler->ScheduleAbsolute(rx::TestScheduler::Subscribed, [&subscription, &ys, &res](rx::Scheduler::shared) { + subscription.Set(ys->Subscribe(res)); + return rx::Disposable::Empty(); + }); + + scheduler->ScheduleAbsolute(rx::TestScheduler::Disposed, [&subscription](rx::Scheduler::shared) { + subscription.Dispose(); + return rx::Disposable::Empty(); + }); + + auto connection = std::make_shared<rx::SerialDisposable>(); + scheduler->ScheduleAbsolute(300, [connection, &ys](rx::Scheduler::shared) { + connection->Set(ys->Connect()); + return rx::Disposable::Empty(); + }); + scheduler->ScheduleAbsolute(400, [connection](rx::Scheduler::shared) { + connection->Dispose(); + return rx::Disposable::Empty(); + }); + + connection = std::make_shared<rx::SerialDisposable>(); + scheduler->ScheduleAbsolute(500, [connection, &ys](rx::Scheduler::shared) { + connection->Set(ys->Connect()); + return rx::Disposable::Empty(); + }); + scheduler->ScheduleAbsolute(550, [connection](rx::Scheduler::shared) { + connection->Dispose(); + return rx::Disposable::Empty(); + }); + + connection = std::make_shared<rx::SerialDisposable>(); + scheduler->ScheduleAbsolute(650, [connection, &ys](rx::Scheduler::shared) { + connection->Set(ys->Connect()); + return rx::Disposable::Empty(); + }); + scheduler->ScheduleAbsolute(800, [connection](rx::Scheduler::shared) { + connection->Dispose(); + return rx::Disposable::Empty(); + }); + + scheduler->Start(); + + THEN("the output is empty"){ + std::vector<m::RecordedT> required; + auto actual = res->Messages(); + REQUIRE(required == actual); + } + + THEN("there were 3 subscription/unsubscription"){ + rx::Subscription items[] = { + m::Subscribe(300, 400), + m::Subscribe(500, 550), + m::Subscribe(650, 800) + }; + auto required = m::ToVector(items); + auto actual = xs->Subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + diff --git a/Rx/CPP/test/operators/Where.cpp b/Rx/CPP/test/operators/Where.cpp new file mode 100644 index 0000000..36cee52 --- /dev/null +++ b/Rx/CPP/test/operators/Where.cpp @@ -0,0 +1,389 @@ +#include "cpprx/rx.hpp" +namespace rx=rxcpp; + +#include "catch.hpp" + +bool IsPrime(int x) +{ + if (x < 2) return false; + for (int i = 2; i <= x/2; ++i) + { + if (x % i == 0) + return false; + } + return true; +} + +SCENARIO("where stops on completion", "[where][filter][operators]"){ + GIVEN("a test hot observable of longs"){ + auto scheduler = std::make_shared<rx::TestScheduler>(); + typedef rx::TestScheduler::Messages<long> m; + + long invoked = 0; + + auto xs = scheduler->CreateHotObservable( + []() { + m::RecordedT messages[] = { + m::OnNext(110, 1), + m::OnNext(180, 2), + m::OnNext(230, 3), + m::OnNext(270, 4), + m::OnNext(340, 5), + m::OnNext(380, 6), + m::OnNext(390, 7), + m::OnNext(450, 8), + m::OnNext(470, 9), + m::OnNext(560, 10), + m::OnNext(580, 11), + m::OnCompleted(600), + m::OnNext(610, 12), + m::OnError(620, std::exception()), + m::OnCompleted(630) + }; + return m::ToVector(messages); + }() + ); + + WHEN("filtered to longs that are primes"){ + + auto res = scheduler->Start<long>( + [xs, &invoked]() { + return rx::observable(rx::from(xs) + .where([&invoked](long x) { + invoked++; + return IsPrime(x); + })); + } + ); + + THEN("the output only contains primes"){ + m::RecordedT items[] = { + m::OnNext(230, 3), + m::OnNext(340, 5), + m::OnNext(390, 7), + m::OnNext(580, 11), + m::OnCompleted(600) + }; + auto required = m::ToVector(items); + auto actual = res->Messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + rx::Subscription items[] = { + m::Subscribe(200, 600) + }; + auto required = m::ToVector(items); + auto actual = xs->Subscriptions(); + REQUIRE(required == actual); + } + + THEN("where was called until completed"){ + REQUIRE(9 == invoked); + } + } + } +} + +SCENARIO("where stops on disposal", "[where][filter][operators]"){ + GIVEN("a test hot observable of longs"){ + auto scheduler = std::make_shared<rx::TestScheduler>(); + typedef rx::TestScheduler::Messages<long> m; + + long invoked = 0; + + auto xs = scheduler->CreateHotObservable( + []() { + m::RecordedT messages[] = { + m::OnNext(110, 1), + m::OnNext(180, 2), + m::OnNext(230, 3), + m::OnNext(270, 4), + m::OnNext(340, 5), + m::OnNext(380, 6), + m::OnNext(390, 7), + m::OnNext(450, 8), + m::OnNext(470, 9), + m::OnNext(560, 10), + m::OnNext(580, 11), + m::OnCompleted(600) + }; + return m::ToVector(messages); + }() + ); + + WHEN("filtered to longs that are primes"){ + + auto res = scheduler->Start<long>( + [xs, &invoked]() { + return rx::observable(rx::from(xs) + .where([&invoked](long x) { + invoked++; + return IsPrime(x); + })); + }, + 400 + ); + + THEN("the output only contains primes that arrived before disposal"){ + m::RecordedT items[] = { + m::OnNext(230, 3), + m::OnNext(340, 5), + m::OnNext(390, 7) + }; + auto required = m::ToVector(items); + auto actual = res->Messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + rx::Subscription items[] = { + m::Subscribe(200, 400) + }; + auto required = m::ToVector(items); + auto actual = xs->Subscriptions(); + REQUIRE(required == actual); + } + + THEN("where was called until disposed"){ + REQUIRE(5 == invoked); + } + } + } +} + +SCENARIO("where stops on error", "[where][filter][operators]"){ + GIVEN("a test hot observable of longs"){ + auto scheduler = std::make_shared<rx::TestScheduler>(); + typedef rx::TestScheduler::Messages<long> m; + + long invoked = 0; + + std::exception ex; + + auto xs = scheduler->CreateHotObservable( + [ex]() { + m::RecordedT messages[] = { + m::OnNext(110, 1), + m::OnNext(180, 2), + m::OnNext(230, 3), + m::OnNext(270, 4), + m::OnNext(340, 5), + m::OnNext(380, 6), + m::OnNext(390, 7), + m::OnNext(450, 8), + m::OnNext(470, 9), + m::OnNext(560, 10), + m::OnNext(580, 11), + m::OnError(600, ex), + m::OnNext(610, 12), + m::OnError(620, std::exception()), + m::OnCompleted(630) + }; + return m::ToVector(messages); + }() + ); + + WHEN("filtered to longs that are primes"){ + + auto res = scheduler->Start<long>( + [xs, &invoked]() { + return rx::observable(rx::from(xs) + .where([&invoked](long x) { + invoked++; + return IsPrime(x); + })); + } + ); + + THEN("the output only contains primes"){ + m::RecordedT items[] = { + m::OnNext(230, 3), + m::OnNext(340, 5), + m::OnNext(390, 7), + m::OnNext(580, 11), + m::OnError(600, ex), + }; + auto required = m::ToVector(items); + auto actual = res->Messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + rx::Subscription items[] = { + m::Subscribe(200, 600) + }; + auto required = m::ToVector(items); + auto actual = xs->Subscriptions(); + REQUIRE(required == actual); + } + + THEN("where was called until error"){ + REQUIRE(9 == invoked); + } + } + } +} + +SCENARIO("where stops on throw from predicate", "[where][filter][operators]"){ + GIVEN("a test hot observable of longs"){ + auto scheduler = std::make_shared<rx::TestScheduler>(); + typedef rx::TestScheduler::Messages<long> m; + + long invoked = 0; + + std::exception ex; + + auto xs = scheduler->CreateHotObservable( + []() { + m::RecordedT messages[] = { + m::OnNext(110, 1), + m::OnNext(180, 2), + m::OnNext(230, 3), + m::OnNext(270, 4), + m::OnNext(340, 5), + m::OnNext(380, 6), + m::OnNext(390, 7), + m::OnNext(450, 8), + m::OnNext(470, 9), + m::OnNext(560, 10), + m::OnNext(580, 11), + m::OnCompleted(600), + m::OnNext(610, 12), + m::OnError(620, std::exception()), + m::OnCompleted(630) + }; + return m::ToVector(messages); + }() + ); + + WHEN("filtered to longs that are primes"){ + + auto res = scheduler->Start<long>( + [ex, xs, &invoked]() { + return rx::observable(rx::from(xs) + .where([ex, &invoked](long x) { + invoked++; + if (x > 5) { + throw ex; + } + return IsPrime(x); + })); + } + ); + + THEN("the output only contains primes"){ + m::RecordedT items[] = { + m::OnNext(230, 3), + m::OnNext(340, 5), + m::OnError(380, ex) + }; + auto required = m::ToVector(items); + auto actual = res->Messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + rx::Subscription items[] = { + m::Subscribe(200, 380) + }; + auto required = m::ToVector(items); + auto actual = xs->Subscriptions(); + REQUIRE(required == actual); + } + + THEN("where was called until error"){ + REQUIRE(4 == invoked); + } + } + } +} + +SCENARIO("where stops on dispose from predicate", "[where][filter][operators]"){ + GIVEN("a test hot observable of longs"){ + auto scheduler = std::make_shared<rx::TestScheduler>(); + typedef rx::TestScheduler::Messages<long> m; + + long invoked = 0; + + auto xs = scheduler->CreateHotObservable( + []() { + m::RecordedT messages[] = { + m::OnNext(110, 1), + m::OnNext(180, 2), + m::OnNext(230, 3), + m::OnNext(270, 4), + m::OnNext(340, 5), + m::OnNext(380, 6), + m::OnNext(390, 7), + m::OnNext(450, 8), + m::OnNext(470, 9), + m::OnNext(560, 10), + m::OnNext(580, 11), + m::OnCompleted(600), + m::OnNext(610, 12), + m::OnError(620, std::exception()), + m::OnCompleted(630) + }; + return m::ToVector(messages); + }() + ); + + auto res = scheduler->CreateObserver<long>(); + + rx::SerialDisposable d; + std::shared_ptr<rx::Observable<long>> ys; + + WHEN("filtered to longs that are primes"){ + + scheduler->ScheduleAbsolute(rx::TestScheduler::Created, + [&invoked, &d, &ys, &xs](rx::Scheduler::shared) { + ys = rx::observable(rx::from(xs) + .where([&invoked, &d](long x) { + invoked++; + if (x == 8) + d.Dispose(); + return IsPrime(x); + })); + return rx::Disposable::Empty(); + }); + + scheduler->ScheduleAbsolute(rx::TestScheduler::Subscribed, [&d, &ys, &res](rx::Scheduler::shared) { + d.Set(ys->Subscribe(res)); + return rx::Disposable::Empty(); + }); + + scheduler->ScheduleAbsolute(rx::TestScheduler::Disposed, [&d](rx::Scheduler::shared) { + d.Dispose(); + return rx::Disposable::Empty(); + }); + + scheduler->Start(); + + THEN("the output only contains primes"){ + m::RecordedT items[] = { + m::OnNext(230, 3), + m::OnNext(340, 5), + m::OnNext(390, 7) + }; + auto required = m::ToVector(items); + auto actual = res->Messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + rx::Subscription items[] = { + m::Subscribe(200, 450) + }; + auto required = m::ToVector(items); + auto actual = xs->Subscriptions(); + REQUIRE(required == actual); + } + + THEN("where was called until disposed"){ + REQUIRE(6 == invoked); + } + } + } +} + diff --git a/Rx/CPP/test/test.cpp b/Rx/CPP/test/test.cpp new file mode 100644 index 0000000..0c7c351 --- /dev/null +++ b/Rx/CPP/test/test.cpp @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch.hpp" diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt index 866beec..4080292 100644 --- a/projects/CMake/CMakeLists.txt +++ b/projects/CMake/CMakeLists.txt @@ -17,10 +17,28 @@ get_filename_component(RXCPP_DIR "${CMAKE_CURRENT_SOURCE_DIR}" PATH) get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH) set(TEST_DIR ${RXCPP_DIR}/Rx/CPP/test) -include_directories(${RXCPP_DIR}/Ix/CPP/src ${RXCPP_DIR}/Rx/CPP/src) +include_directories(${RXCPP_DIR}/ext/catch/include ${RXCPP_DIR}/Ix/CPP/src ${RXCPP_DIR}/Rx/CPP/src) + +# define the sources of the self test +set(TEST_SOURCES + ${TEST_DIR}/test.cpp + ${TEST_DIR}/operators/Publish.cpp + ${TEST_DIR}/operators/Where.cpp +) +add_executable(rxcpp_test ${TEST_SOURCES}) # define the sources of testbench set(TESTBENCH_SOURCES ${RXCPP_DIR}/Rx/CPP/testbench/testbench.cpp ) add_executable(testbench ${TESTBENCH_SOURCES}) + +# configure unit tests via CTest +enable_testing() +add_test(NAME RunTests COMMAND rxcpp_test) + +add_test(NAME ListTests COMMAND rxcpp_test --list-tests) +set_tests_properties(ListTests PROPERTIES PASS_REGULAR_EXPRESSION "[0-9]+ test cases") + +add_test(NAME ListTags COMMAND rxcpp_test --list-tags) +set_tests_properties(ListTags PROPERTIES PASS_REGULAR_EXPRESSION "[0-9]+ tags") |