summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--Rx/CPP/test/operators/Publish.cpp111
-rw-r--r--Rx/CPP/test/operators/Where.cpp389
-rw-r--r--Rx/CPP/test/test.cpp2
-rw-r--r--projects/CMake/CMakeLists.txt20
5 files changed, 522 insertions, 2 deletions
diff --git a/.gitignore b/.gitignore
index ca7abf5..7156fda 100644
--- a/.gitignore
+++ b/.gitignore
@@ -106,4 +106,4 @@ Desktop.ini
############
projects/*
-!projects/CMake/CMakelists.txt
+!projects/CMake/CMakeLists.txt
diff --git a/Rx/CPP/test/operators/Publish.cpp b/Rx/CPP/test/operators/Publish.cpp
new file mode 100644
index 0000000..26e6e5b
--- /dev/null
+++ b/Rx/CPP/test/operators/Publish.cpp
@@ -0,0 +1,111 @@
+#include "cpprx/rx.hpp"
+namespace rx=rxcpp;
+
+#include "catch.hpp"
+
+SCENARIO("publish_last", "[publish_last][publish][multicast][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto scheduler = std::make_shared<rx::TestScheduler>();
+ typedef rx::TestScheduler::Messages<long> m;
+
+ long invoked = 0;
+
+ auto xs = scheduler->CreateHotObservable(
+ []() {
+ m::RecordedT messages[] = {
+ m::OnNext(110, 7),
+ m::OnNext(220, 3),
+ m::OnNext(280, 4),
+ m::OnNext(290, 1),
+ m::OnNext(340, 8),
+ m::OnNext(360, 5),
+ m::OnNext(370, 6),
+ m::OnNext(390, 7),
+ m::OnNext(410, 13),
+ m::OnNext(430, 2),
+ m::OnNext(450, 9),
+ m::OnNext(520, 11),
+ m::OnNext(560, 20),
+ m::OnCompleted(600)
+ };
+ return m::ToVector(messages);
+ }()
+ );
+
+ auto res = scheduler->CreateObserver<long>();
+
+ rx::SerialDisposable subscription;
+ std::shared_ptr<rx::ConnectableObservable<long>> ys;
+
+ WHEN("subscribed and then connected"){
+
+ scheduler->ScheduleAbsolute(rx::TestScheduler::Created,
+ [&invoked, &ys, &xs](rx::Scheduler::shared) {
+ ys = rx::observable(rx::from(xs)
+ .publish_last());
+ return rx::Disposable::Empty();
+ });
+
+ scheduler->ScheduleAbsolute(rx::TestScheduler::Subscribed, [&subscription, &ys, &res](rx::Scheduler::shared) {
+ subscription.Set(ys->Subscribe(res));
+ return rx::Disposable::Empty();
+ });
+
+ scheduler->ScheduleAbsolute(rx::TestScheduler::Disposed, [&subscription](rx::Scheduler::shared) {
+ subscription.Dispose();
+ return rx::Disposable::Empty();
+ });
+
+ auto connection = std::make_shared<rx::SerialDisposable>();
+ scheduler->ScheduleAbsolute(300, [connection, &ys](rx::Scheduler::shared) {
+ connection->Set(ys->Connect());
+ return rx::Disposable::Empty();
+ });
+ scheduler->ScheduleAbsolute(400, [connection](rx::Scheduler::shared) {
+ connection->Dispose();
+ return rx::Disposable::Empty();
+ });
+
+ connection = std::make_shared<rx::SerialDisposable>();
+ scheduler->ScheduleAbsolute(500, [connection, &ys](rx::Scheduler::shared) {
+ connection->Set(ys->Connect());
+ return rx::Disposable::Empty();
+ });
+ scheduler->ScheduleAbsolute(550, [connection](rx::Scheduler::shared) {
+ connection->Dispose();
+ return rx::Disposable::Empty();
+ });
+
+ connection = std::make_shared<rx::SerialDisposable>();
+ scheduler->ScheduleAbsolute(650, [connection, &ys](rx::Scheduler::shared) {
+ connection->Set(ys->Connect());
+ return rx::Disposable::Empty();
+ });
+ scheduler->ScheduleAbsolute(800, [connection](rx::Scheduler::shared) {
+ connection->Dispose();
+ return rx::Disposable::Empty();
+ });
+
+ scheduler->Start();
+
+ THEN("the output is empty"){
+ std::vector<m::RecordedT> required;
+ auto actual = res->Messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there were 3 subscription/unsubscription"){
+ rx::Subscription items[] = {
+ m::Subscribe(300, 400),
+ m::Subscribe(500, 550),
+ m::Subscribe(650, 800)
+ };
+ auto required = m::ToVector(items);
+ auto actual = xs->Subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
diff --git a/Rx/CPP/test/operators/Where.cpp b/Rx/CPP/test/operators/Where.cpp
new file mode 100644
index 0000000..36cee52
--- /dev/null
+++ b/Rx/CPP/test/operators/Where.cpp
@@ -0,0 +1,389 @@
+#include "cpprx/rx.hpp"
+namespace rx=rxcpp;
+
+#include "catch.hpp"
+
+bool IsPrime(int x)
+{
+ if (x < 2) return false;
+ for (int i = 2; i <= x/2; ++i)
+ {
+ if (x % i == 0)
+ return false;
+ }
+ return true;
+}
+
+SCENARIO("where stops on completion", "[where][filter][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto scheduler = std::make_shared<rx::TestScheduler>();
+ typedef rx::TestScheduler::Messages<long> m;
+
+ long invoked = 0;
+
+ auto xs = scheduler->CreateHotObservable(
+ []() {
+ m::RecordedT messages[] = {
+ m::OnNext(110, 1),
+ m::OnNext(180, 2),
+ m::OnNext(230, 3),
+ m::OnNext(270, 4),
+ m::OnNext(340, 5),
+ m::OnNext(380, 6),
+ m::OnNext(390, 7),
+ m::OnNext(450, 8),
+ m::OnNext(470, 9),
+ m::OnNext(560, 10),
+ m::OnNext(580, 11),
+ m::OnCompleted(600),
+ m::OnNext(610, 12),
+ m::OnError(620, std::exception()),
+ m::OnCompleted(630)
+ };
+ return m::ToVector(messages);
+ }()
+ );
+
+ WHEN("filtered to longs that are primes"){
+
+ auto res = scheduler->Start<long>(
+ [xs, &invoked]() {
+ return rx::observable(rx::from(xs)
+ .where([&invoked](long x) {
+ invoked++;
+ return IsPrime(x);
+ }));
+ }
+ );
+
+ THEN("the output only contains primes"){
+ m::RecordedT items[] = {
+ m::OnNext(230, 3),
+ m::OnNext(340, 5),
+ m::OnNext(390, 7),
+ m::OnNext(580, 11),
+ m::OnCompleted(600)
+ };
+ auto required = m::ToVector(items);
+ auto actual = res->Messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ rx::Subscription items[] = {
+ m::Subscribe(200, 600)
+ };
+ auto required = m::ToVector(items);
+ auto actual = xs->Subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("where was called until completed"){
+ REQUIRE(9 == invoked);
+ }
+ }
+ }
+}
+
+SCENARIO("where stops on disposal", "[where][filter][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto scheduler = std::make_shared<rx::TestScheduler>();
+ typedef rx::TestScheduler::Messages<long> m;
+
+ long invoked = 0;
+
+ auto xs = scheduler->CreateHotObservable(
+ []() {
+ m::RecordedT messages[] = {
+ m::OnNext(110, 1),
+ m::OnNext(180, 2),
+ m::OnNext(230, 3),
+ m::OnNext(270, 4),
+ m::OnNext(340, 5),
+ m::OnNext(380, 6),
+ m::OnNext(390, 7),
+ m::OnNext(450, 8),
+ m::OnNext(470, 9),
+ m::OnNext(560, 10),
+ m::OnNext(580, 11),
+ m::OnCompleted(600)
+ };
+ return m::ToVector(messages);
+ }()
+ );
+
+ WHEN("filtered to longs that are primes"){
+
+ auto res = scheduler->Start<long>(
+ [xs, &invoked]() {
+ return rx::observable(rx::from(xs)
+ .where([&invoked](long x) {
+ invoked++;
+ return IsPrime(x);
+ }));
+ },
+ 400
+ );
+
+ THEN("the output only contains primes that arrived before disposal"){
+ m::RecordedT items[] = {
+ m::OnNext(230, 3),
+ m::OnNext(340, 5),
+ m::OnNext(390, 7)
+ };
+ auto required = m::ToVector(items);
+ auto actual = res->Messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ rx::Subscription items[] = {
+ m::Subscribe(200, 400)
+ };
+ auto required = m::ToVector(items);
+ auto actual = xs->Subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("where was called until disposed"){
+ REQUIRE(5 == invoked);
+ }
+ }
+ }
+}
+
+SCENARIO("where stops on error", "[where][filter][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto scheduler = std::make_shared<rx::TestScheduler>();
+ typedef rx::TestScheduler::Messages<long> m;
+
+ long invoked = 0;
+
+ std::exception ex;
+
+ auto xs = scheduler->CreateHotObservable(
+ [ex]() {
+ m::RecordedT messages[] = {
+ m::OnNext(110, 1),
+ m::OnNext(180, 2),
+ m::OnNext(230, 3),
+ m::OnNext(270, 4),
+ m::OnNext(340, 5),
+ m::OnNext(380, 6),
+ m::OnNext(390, 7),
+ m::OnNext(450, 8),
+ m::OnNext(470, 9),
+ m::OnNext(560, 10),
+ m::OnNext(580, 11),
+ m::OnError(600, ex),
+ m::OnNext(610, 12),
+ m::OnError(620, std::exception()),
+ m::OnCompleted(630)
+ };
+ return m::ToVector(messages);
+ }()
+ );
+
+ WHEN("filtered to longs that are primes"){
+
+ auto res = scheduler->Start<long>(
+ [xs, &invoked]() {
+ return rx::observable(rx::from(xs)
+ .where([&invoked](long x) {
+ invoked++;
+ return IsPrime(x);
+ }));
+ }
+ );
+
+ THEN("the output only contains primes"){
+ m::RecordedT items[] = {
+ m::OnNext(230, 3),
+ m::OnNext(340, 5),
+ m::OnNext(390, 7),
+ m::OnNext(580, 11),
+ m::OnError(600, ex),
+ };
+ auto required = m::ToVector(items);
+ auto actual = res->Messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ rx::Subscription items[] = {
+ m::Subscribe(200, 600)
+ };
+ auto required = m::ToVector(items);
+ auto actual = xs->Subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("where was called until error"){
+ REQUIRE(9 == invoked);
+ }
+ }
+ }
+}
+
+SCENARIO("where stops on throw from predicate", "[where][filter][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto scheduler = std::make_shared<rx::TestScheduler>();
+ typedef rx::TestScheduler::Messages<long> m;
+
+ long invoked = 0;
+
+ std::exception ex;
+
+ auto xs = scheduler->CreateHotObservable(
+ []() {
+ m::RecordedT messages[] = {
+ m::OnNext(110, 1),
+ m::OnNext(180, 2),
+ m::OnNext(230, 3),
+ m::OnNext(270, 4),
+ m::OnNext(340, 5),
+ m::OnNext(380, 6),
+ m::OnNext(390, 7),
+ m::OnNext(450, 8),
+ m::OnNext(470, 9),
+ m::OnNext(560, 10),
+ m::OnNext(580, 11),
+ m::OnCompleted(600),
+ m::OnNext(610, 12),
+ m::OnError(620, std::exception()),
+ m::OnCompleted(630)
+ };
+ return m::ToVector(messages);
+ }()
+ );
+
+ WHEN("filtered to longs that are primes"){
+
+ auto res = scheduler->Start<long>(
+ [ex, xs, &invoked]() {
+ return rx::observable(rx::from(xs)
+ .where([ex, &invoked](long x) {
+ invoked++;
+ if (x > 5) {
+ throw ex;
+ }
+ return IsPrime(x);
+ }));
+ }
+ );
+
+ THEN("the output only contains primes"){
+ m::RecordedT items[] = {
+ m::OnNext(230, 3),
+ m::OnNext(340, 5),
+ m::OnError(380, ex)
+ };
+ auto required = m::ToVector(items);
+ auto actual = res->Messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ rx::Subscription items[] = {
+ m::Subscribe(200, 380)
+ };
+ auto required = m::ToVector(items);
+ auto actual = xs->Subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("where was called until error"){
+ REQUIRE(4 == invoked);
+ }
+ }
+ }
+}
+
+SCENARIO("where stops on dispose from predicate", "[where][filter][operators]"){
+ GIVEN("a test hot observable of longs"){
+ auto scheduler = std::make_shared<rx::TestScheduler>();
+ typedef rx::TestScheduler::Messages<long> m;
+
+ long invoked = 0;
+
+ auto xs = scheduler->CreateHotObservable(
+ []() {
+ m::RecordedT messages[] = {
+ m::OnNext(110, 1),
+ m::OnNext(180, 2),
+ m::OnNext(230, 3),
+ m::OnNext(270, 4),
+ m::OnNext(340, 5),
+ m::OnNext(380, 6),
+ m::OnNext(390, 7),
+ m::OnNext(450, 8),
+ m::OnNext(470, 9),
+ m::OnNext(560, 10),
+ m::OnNext(580, 11),
+ m::OnCompleted(600),
+ m::OnNext(610, 12),
+ m::OnError(620, std::exception()),
+ m::OnCompleted(630)
+ };
+ return m::ToVector(messages);
+ }()
+ );
+
+ auto res = scheduler->CreateObserver<long>();
+
+ rx::SerialDisposable d;
+ std::shared_ptr<rx::Observable<long>> ys;
+
+ WHEN("filtered to longs that are primes"){
+
+ scheduler->ScheduleAbsolute(rx::TestScheduler::Created,
+ [&invoked, &d, &ys, &xs](rx::Scheduler::shared) {
+ ys = rx::observable(rx::from(xs)
+ .where([&invoked, &d](long x) {
+ invoked++;
+ if (x == 8)
+ d.Dispose();
+ return IsPrime(x);
+ }));
+ return rx::Disposable::Empty();
+ });
+
+ scheduler->ScheduleAbsolute(rx::TestScheduler::Subscribed, [&d, &ys, &res](rx::Scheduler::shared) {
+ d.Set(ys->Subscribe(res));
+ return rx::Disposable::Empty();
+ });
+
+ scheduler->ScheduleAbsolute(rx::TestScheduler::Disposed, [&d](rx::Scheduler::shared) {
+ d.Dispose();
+ return rx::Disposable::Empty();
+ });
+
+ scheduler->Start();
+
+ THEN("the output only contains primes"){
+ m::RecordedT items[] = {
+ m::OnNext(230, 3),
+ m::OnNext(340, 5),
+ m::OnNext(390, 7)
+ };
+ auto required = m::ToVector(items);
+ auto actual = res->Messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ rx::Subscription items[] = {
+ m::Subscribe(200, 450)
+ };
+ auto required = m::ToVector(items);
+ auto actual = xs->Subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ THEN("where was called until disposed"){
+ REQUIRE(6 == invoked);
+ }
+ }
+ }
+}
+
diff --git a/Rx/CPP/test/test.cpp b/Rx/CPP/test/test.cpp
new file mode 100644
index 0000000..0c7c351
--- /dev/null
+++ b/Rx/CPP/test/test.cpp
@@ -0,0 +1,2 @@
+#define CATCH_CONFIG_MAIN
+#include "catch.hpp"
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 866beec..4080292 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -17,10 +17,28 @@ get_filename_component(RXCPP_DIR "${CMAKE_CURRENT_SOURCE_DIR}" PATH)
get_filename_component(RXCPP_DIR "${RXCPP_DIR}" PATH)
set(TEST_DIR ${RXCPP_DIR}/Rx/CPP/test)
-include_directories(${RXCPP_DIR}/Ix/CPP/src ${RXCPP_DIR}/Rx/CPP/src)
+include_directories(${RXCPP_DIR}/ext/catch/include ${RXCPP_DIR}/Ix/CPP/src ${RXCPP_DIR}/Rx/CPP/src)
+
+# define the sources of the self test
+set(TEST_SOURCES
+ ${TEST_DIR}/test.cpp
+ ${TEST_DIR}/operators/Publish.cpp
+ ${TEST_DIR}/operators/Where.cpp
+)
+add_executable(rxcpp_test ${TEST_SOURCES})
# define the sources of testbench
set(TESTBENCH_SOURCES
${RXCPP_DIR}/Rx/CPP/testbench/testbench.cpp
)
add_executable(testbench ${TESTBENCH_SOURCES})
+
+# configure unit tests via CTest
+enable_testing()
+add_test(NAME RunTests COMMAND rxcpp_test)
+
+add_test(NAME ListTests COMMAND rxcpp_test --list-tests)
+set_tests_properties(ListTests PROPERTIES PASS_REGULAR_EXPRESSION "[0-9]+ test cases")
+
+add_test(NAME ListTags COMMAND rxcpp_test --list-tags)
+set_tests_properties(ListTags PROPERTIES PASS_REGULAR_EXPRESSION "[0-9]+ tags")