summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Ix/CPP/src/IxCpp.vcxproj65
-rw-r--r--Ix/CPP/src/IxCpp.vcxproj.filters55
-rw-r--r--Rx/CPP/src/RxCpp.vcxproj61
-rw-r--r--Rx/CPP/src/RxCpp.vcxproj.filters43
-rw-r--r--Rx/CPP/src/cpprx/rx-base.hpp37
-rw-r--r--Rx/CPP/src/cpprx/rx-includes.hpp27
-rw-r--r--Rx/CPP/src/cpprx/rx-operators.hpp194
-rw-r--r--Rx/CPP/src/cpprx/rx-scheduler.hpp4
-rw-r--r--Rx/CPP/src/cpprx/rx-windows.hpp165
-rw-r--r--Rx/CPP/src/cpprx/rx.hpp188
10 files changed, 722 insertions, 117 deletions
diff --git a/Ix/CPP/src/IxCpp.vcxproj b/Ix/CPP/src/IxCpp.vcxproj
new file mode 100644
index 0000000..9367080
--- /dev/null
+++ b/Ix/CPP/src/IxCpp.vcxproj
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{65483A0D-1D72-47CC-B147-27415FFC1FC3}</ProjectGuid>
+ <Keyword>MakeFileProj</Keyword>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+ <ConfigurationType>Makefile</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Makefile</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <NMakePreprocessorDefinitions>WIN32;_DEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <NMakePreprocessorDefinitions>WIN32;NDEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions>
+ </PropertyGroup>
+ <ItemDefinitionGroup>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <Text Include="readme.txt" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="cpplinq\linq.hpp" />
+ <ClInclude Include="cpplinq\linq_cursor.hpp" />
+ <ClInclude Include="cpplinq\linq_groupby.hpp" />
+ <ClInclude Include="cpplinq\linq_iterators.hpp" />
+ <ClInclude Include="cpplinq\linq_last.hpp" />
+ <ClInclude Include="cpplinq\linq_select.hpp" />
+ <ClInclude Include="cpplinq\linq_selectmany.hpp" />
+ <ClInclude Include="cpplinq\linq_skip.hpp" />
+ <ClInclude Include="cpplinq\linq_take.hpp" />
+ <ClInclude Include="cpplinq\linq_where.hpp" />
+ <ClInclude Include="cpplinq\util.hpp" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project> \ No newline at end of file
diff --git a/Ix/CPP/src/IxCpp.vcxproj.filters b/Ix/CPP/src/IxCpp.vcxproj.filters
new file mode 100644
index 0000000..dd7d5bc
--- /dev/null
+++ b/Ix/CPP/src/IxCpp.vcxproj.filters
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <Filter Include="Source Files">
+ <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
+ <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
+ </Filter>
+ <Filter Include="Header Files">
+ <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
+ <Extensions>h;hpp;hxx;hm;inl;inc;xsd</Extensions>
+ </Filter>
+ <Filter Include="Resource Files">
+ <UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
+ <Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
+ </Filter>
+ </ItemGroup>
+ <ItemGroup>
+ <Text Include="readme.txt" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="cpplinq\linq.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_cursor.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_groupby.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_iterators.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_last.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_select.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_selectmany.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_skip.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_take.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\linq_where.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpplinq\util.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ </ItemGroup>
+</Project> \ No newline at end of file
diff --git a/Rx/CPP/src/RxCpp.vcxproj b/Rx/CPP/src/RxCpp.vcxproj
new file mode 100644
index 0000000..7a7398f
--- /dev/null
+++ b/Rx/CPP/src/RxCpp.vcxproj
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{A912FB36-33A7-4DDA-A7D6-D17F9495A623}</ProjectGuid>
+ <Keyword>MakeFileProj</Keyword>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+ <ConfigurationType>Makefile</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Makefile</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <NMakePreprocessorDefinitions>WIN32;_DEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <NMakePreprocessorDefinitions>WIN32;NDEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions>
+ </PropertyGroup>
+ <ItemDefinitionGroup>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <Text Include="readme.txt" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="cpprx\rx-base.hpp" />
+ <ClInclude Include="cpprx\rx-includes.hpp" />
+ <ClInclude Include="cpprx\rx-operators.hpp" />
+ <ClInclude Include="cpprx\rx-scheduler.hpp" />
+ <ClInclude Include="cpprx\rx-util.hpp" />
+ <ClInclude Include="cpprx\rx-windows.hpp" />
+ <ClInclude Include="cpprx\rx.hpp" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project> \ No newline at end of file
diff --git a/Rx/CPP/src/RxCpp.vcxproj.filters b/Rx/CPP/src/RxCpp.vcxproj.filters
new file mode 100644
index 0000000..2ce3d34
--- /dev/null
+++ b/Rx/CPP/src/RxCpp.vcxproj.filters
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <Filter Include="Source Files">
+ <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
+ <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
+ </Filter>
+ <Filter Include="Header Files">
+ <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
+ <Extensions>h;hpp;hxx;hm;inl;inc;xsd</Extensions>
+ </Filter>
+ <Filter Include="Resource Files">
+ <UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
+ <Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
+ </Filter>
+ </ItemGroup>
+ <ItemGroup>
+ <Text Include="readme.txt" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="cpprx\rx.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpprx\rx-base.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpprx\rx-includes.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpprx\rx-operators.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpprx\rx-scheduler.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpprx\rx-util.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="cpprx\rx-windows.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ </ItemGroup>
+</Project> \ No newline at end of file
diff --git a/Rx/CPP/src/cpprx/rx-base.hpp b/Rx/CPP/src/cpprx/rx-base.hpp
index e5af503..6182d0c 100644
--- a/Rx/CPP/src/cpprx/rx-base.hpp
+++ b/Rx/CPP/src/cpprx/rx-base.hpp
@@ -261,25 +261,13 @@ namespace rxcpp
LocalScheduler(const LocalScheduler&);
public:
- static void DoNoThrow(Work& work, Scheduler::shared scheduler) throw()
+ static void Do(Work& work, Scheduler::shared scheduler) throw()
{
if (work)
{
work(std::move(scheduler));
}
}
- static void Do(Work& work, Scheduler::shared scheduler)
- {
- try {
- DoNoThrow(work, std::move(scheduler));
- } catch (const std::exception& ) {
- // work must catch all expected exceptions
- std::unexpected();
- } catch (...) {
- // work must catch all expected exceptions
- std::unexpected();
- }
- }
public:
LocalScheduler()
@@ -305,5 +293,28 @@ namespace rxcpp
}
};
+ template<class T>
+ T item(const std::shared_ptr<Observable<T>>&);
+
+ template<class K, class T>
+ T item(const std::shared_ptr<GroupedObservable<K,T>>&);
+
+ template<class Observable>
+ struct is_observable {static const bool value = false;};
+
+ template<class T>
+ struct is_observable<std::shared_ptr<Observable<T>>> {static const bool value = true;};
+
+ template<class K, class T>
+ struct is_observable<std::shared_ptr<GroupedObservable<K, T>>> {static const bool value = true;};
+
+ template<class Observable>
+ struct observable_item;
+
+ template<class T>
+ struct observable_item<std::shared_ptr<Observable<T>>> {typedef T type;};
+
+ template<class K, class T>
+ struct observable_item<std::shared_ptr<GroupedObservable<K, T>>> {typedef T type;};
}
#endif
diff --git a/Rx/CPP/src/cpprx/rx-includes.hpp b/Rx/CPP/src/cpprx/rx-includes.hpp
index d5be690..e0574aa 100644
--- a/Rx/CPP/src/cpprx/rx-includes.hpp
+++ b/Rx/CPP/src/cpprx/rx-includes.hpp
@@ -26,6 +26,33 @@
#include <chrono>
#include <condition_variable>
+// some configuration macros
+#if defined(_MSC_VER)
+
+#if _MSC_VER > 1600
+#define RXCPP_USE_RVALUEREF 1
+#define RXCPP_USE_VARIADIC_TEMPLATES 0
+#endif
+
+#if _CPPRTTI
+#define RXCPP_USE_RTTI 1
+#endif
+
+#elif defined(__clang__)
+
+#if __has_feature(cxx_rvalue_references)
+#define RXCPP_USE_RVALUEREF 1
+#endif
+#if __has_feature(cxx_rtti)
+#define RXCPP_USE_RTTI 1
+#endif
+#if __has_feature(cxx_variadic_templates)
+#define RXCPP_USE_VARIADIC_TEMPLATES 1
+#endif
+
+#endif
+
+
#include "rx-util.hpp"
#include "rx-base.hpp"
#include "rx-scheduler.hpp"
diff --git a/Rx/CPP/src/cpprx/rx-operators.hpp b/Rx/CPP/src/cpprx/rx-operators.hpp
index cfd100d..29b489f 100644
--- a/Rx/CPP/src/cpprx/rx-operators.hpp
+++ b/Rx/CPP/src/cpprx/rx-operators.hpp
@@ -120,10 +120,12 @@ namespace rxcpp
public std::enable_shared_from_this<Subject>
{
protected:
+ std::mutex lock;
std::vector<std::shared_ptr<Observer<T>>> observers;
void RemoveObserver(std::shared_ptr<Observer<T>> toRemove)
{
+ std::unique_lock<decltype(lock)> guard(lock);
auto it = std::find(begin(observers), end(observers), toRemove);
if (it != end(observers))
*it = nullptr;
@@ -142,14 +144,17 @@ namespace rxcpp
}
});
- for(auto& o : observers)
{
- if (!o){
- o = std::move(observer);
- return d;
+ std::unique_lock<decltype(lock)> guard(lock);
+ for(auto& o : observers)
+ {
+ if (!o){
+ o = std::move(observer);
+ return d;
+ }
}
+ observers.push_back(std::move(observer));
}
- observers.push_back(std::move(observer));
return d;
}
};
@@ -166,37 +171,45 @@ namespace rxcpp
virtual void OnNext(const T& element)
{
- for(auto& o : Base::observers)
+ std::unique_lock<decltype(Base::lock)> guard(Base::lock);
+ auto local = Base::observers;
+ guard.unlock();
+ for(auto& o : local)
{
try
{
- if (o)
+ if (o) {
o->OnNext(element);
+ }
}
catch (...)
{
- auto o_ = std::move(o);
- o_->OnError(std::current_exception());
+ o->OnError(std::current_exception());
+ Base::RemoveObserver(o);
}
}
}
virtual void OnCompleted()
{
- for(auto& o : Base::observers)
+ std::unique_lock<decltype(Base::lock)> guard(Base::lock);
+ auto local = std::move(Base::observers);
+ guard.unlock();
+ for(auto& o : local)
{
if (o) {
o->OnCompleted();
- o = nullptr;
}
}
}
virtual void OnError(const std::exception_ptr& error)
{
- for(auto& o : Base::observers)
+ std::unique_lock<decltype(Base::lock)> guard(Base::lock);
+ auto local = std::move(Base::observers);
+ guard.unlock();
+ for(auto& o : local)
{
if (o) {
o->OnError(error);
- o = nullptr;
}
}
}
@@ -317,15 +330,16 @@ namespace rxcpp
//
// imperative functions
- template <class T>
+ template <class S>
Disposable Subscribe(
- const std::shared_ptr<Observable<T>>& source,
- typename util::identity<std::function<void(const T&)>>::type onNext,
+ const S& source,
+ typename util::identity<std::function<void(const typename observable_item<S>::type&)>>::type onNext,
std::function<void()> onCompleted = nullptr,
std::function<void(const std::exception_ptr&)> onError = nullptr
)
{
- auto observer = CreateObserver<T>(std::move(onNext), std::move(onCompleted), std::move(onError));
+ auto observer = CreateObserver<typename observable_item<S>::type>(
+ std::move(onNext), std::move(onCompleted), std::move(onError));
return source->Subscribe(observer);
}
@@ -376,7 +390,7 @@ namespace rxcpp
return CreateObservable<U>(
[=](std::shared_ptr<Observer<U>> observer)
{
- return Subscribe<T>(
+ return Subscribe(
source,
// on next
[=](const T& element)
@@ -396,6 +410,146 @@ namespace rxcpp
});
});
}
+
+ template<class T>
+ struct reveal_type {private: reveal_type();};
+
+ template <class T, class CS, class RS>
+ auto SelectMany(
+ const std::shared_ptr<Observable<T>>& source,
+ CS collectionSelector,
+ RS resultSelector)
+ -> const std::shared_ptr<Observable<
+ typename std::result_of<RS(
+ const typename observable_item<
+ typename std::result_of<CS(const T&)>::type>::type&)>::type>>
+ {
+ typedef typename std::result_of<CS(const T&)>::type C;
+ typedef typename observable_item<C>::type CI;
+ typedef typename std::result_of<RS(const CI&)>::type U;
+
+ return CreateObservable<U>(
+ [=](std::shared_ptr<Observer<U>> observer)
+ -> Disposable
+ {
+ struct State {
+ size_t subscribed;
+ bool cancel;
+ std::mutex lock;
+ };
+ auto state = std::make_shared<State>();
+ state->cancel = false;
+ state->subscribed = 0;
+
+ ComposableDisposable cd;
+
+ cd.Add(Disposable([=]{
+ std::unique_lock<std::mutex> guard(state->lock);
+ state->cancel = true; })
+ );
+
+ ++state->subscribed;
+ cd.Add(Subscribe(
+ source,
+ // on next
+ [=](const T& element)
+ {
+ bool cancel = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ cancel = state->cancel;
+ if (!cancel) ++state->subscribed;
+ }
+ if (!cancel) {
+ try {
+ auto collection = collectionSelector(element);
+ cd.Add(Subscribe(
+ collection,
+ // on next
+ [=](const CI& element)
+ {
+ bool cancel = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ cancel = state->cancel;
+ }
+ try {
+ if (!cancel) {
+ auto result = resultSelector(element);
+ observer->OnNext(std::move(result));
+ }
+ } catch (...) {
+ observer->OnError(std::current_exception());
+ cd.Dispose();
+ }
+ },
+ // on completed
+ [=]
+ {
+ bool cancel = false;
+ bool finished = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ finished = (--state->subscribed) == 0;
+ cancel = state->cancel;
+ }
+ if (!cancel && finished)
+ observer->OnCompleted();
+ },
+ // on error
+ [=](const std::exception_ptr& error)
+ {
+ bool cancel = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ --state->subscribed;
+ cancel = state->cancel;
+ }
+ if (!cancel)
+ observer->OnError(error);
+ cd.Dispose();
+ }));
+ } catch (...) {
+ bool cancel = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ cancel = state->cancel;
+ }
+ if (!cancel) {
+ observer->OnError(std::current_exception());
+ }
+ cd.Dispose();
+ }
+ }
+ },
+ // on completed
+ [=]
+ {
+ bool cancel = false;
+ bool finished = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ finished = (--state->subscribed) == 0;
+ cancel = state->cancel;
+ }
+ if (!cancel && finished)
+ observer->OnCompleted();
+ },
+ // on error
+ [=](const std::exception_ptr& error)
+ {
+ bool cancel = false;
+ {
+ std::unique_lock<std::mutex> guard(state->lock);
+ --state->subscribed;
+ cancel = state->cancel;
+ }
+ if (!cancel)
+ observer->OnError(error);
+ }));
+ return cd;
+ });
+ }
template <class T, class P>
const std::shared_ptr<Observable<T>> Where(
@@ -577,9 +731,8 @@ namespace rxcpp
std::shared_ptr<Observable<T>> Delay(
const std::shared_ptr<Observable<T>>& source,
Scheduler::clock::duration due,
- Scheduler::shared scheduler = nullptr)
+ Scheduler::shared scheduler)
{
- if (!scheduler) {scheduler = std::make_shared<CurrentThreadScheduler>();}
return CreateObservable<T>(
[=](std::shared_ptr<Observer<T>> observer)
-> Disposable
@@ -685,6 +838,7 @@ namespace rxcpp
-> Disposable
{
struct State {
+ State() : last(), hasValue(false) {}
T last; bool hasValue;
};
diff --git a/Rx/CPP/src/cpprx/rx-scheduler.hpp b/Rx/CPP/src/cpprx/rx-scheduler.hpp
index 5fcef15..037e41e 100644
--- a/Rx/CPP/src/cpprx/rx-scheduler.hpp
+++ b/Rx/CPP/src/cpprx/rx-scheduler.hpp
@@ -103,6 +103,7 @@ namespace rxcpp
virtual Disposable Schedule(clock::time_point dueTime, Work work)
{
auto ct = std::make_shared<CurrentThreadScheduler>();
+ std::this_thread::sleep_until(dueTime);
Do(work, ct);
return Disposable::Empty();
}
@@ -183,7 +184,8 @@ namespace rxcpp
if (++trampoline == 1)
{
auto local = shared_from_this();
- result.set(factory([local]{local->Run();}));
+ result.set(factory([local]{
+ local->Run();}));
// trampoline lifetime is now owned by the thread
unwindTrampoline.dismiss();
}
diff --git a/Rx/CPP/src/cpprx/rx-windows.hpp b/Rx/CPP/src/cpprx/rx-windows.hpp
index 72e636f..31b932b 100644
--- a/Rx/CPP/src/cpprx/rx-windows.hpp
+++ b/Rx/CPP/src/cpprx/rx-windows.hpp
@@ -93,78 +93,181 @@ namespace rxcpp { namespace win32 {
class WindowScheduler : public rxcpp::LocalScheduler
{
- HWND hwnd;
- Scheduler::shared ct;
+
+ struct compare_work
+ {
+ template <class T>
+ bool operator()(const T& work1, const T& work2) const {
+ return work1.first > work2.first;
+ }
+ };
+
+ struct Queue;
+
typedef std::pair<Scheduler::shared, Work> Item;
+ typedef std::priority_queue<
+ std::pair<clock::time_point, std::shared_ptr<Item>>,
+ std::vector<std::pair<clock::time_point, std::shared_ptr<Item>>>,
+ compare_work
+ > ScheduledWork;
+
+ struct Queue
+ {
+ Queue() : exit(false), window(NULL) {}
+ bool exit;
+ HWND window;
+ ScheduledWork scheduledWork;
+ mutable std::mutex lock;
+ };
+
struct WindowClass
{
+ std::shared_ptr<Queue> queue;
+
static const wchar_t* const className(){ return L"rxcpp::win32::WindowScheduler::WindowClass"; }
WindowClass()
{
- WNDCLASS wndclass = {};
+ }
+
+ static void Create(const std::shared_ptr<Queue>& queue)
+ {
+ WNDCLASSW wndclass = {};
wndclass.style = 0;
wndclass.lpfnWndProc = &WndProc;
- wndclass.cbClsExtra;
wndclass.cbWndExtra = 0;
wndclass.hInstance = NULL;
wndclass.lpszClassName = className();
- if (!RegisterClass(&wndclass))
+ if (!RegisterClassW(&wndclass))
throw std::exception("failed to register windows class");
+ std::unique_ptr<WindowClass> that(new WindowClass);
+ that->queue = queue;
+
+ queue->window = CreateWindowExW(0, WindowClass::className(), L"MessageOnlyWindow", 0, 0, 0, 0, 0, HWND_MESSAGE, 0, 0, reinterpret_cast<LPVOID>(that.get()));
+ if (!queue->window)
+ throw std::exception("create window failed");
+
+ that.release();
}
- HWND CreateWindow_()
- {
- return CreateWindowEx(0, WindowClass::className(), L"MessageOnlyWindow", 0, 0, 0, 0, 0, HWND_MESSAGE, 0, 0, 0);
- }
+
static const int WM_USER_DISPATCH = WM_USER + 1;
static LRESULT CALLBACK WndProc(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam)
{
+ static WindowClass* windowClass;
switch (message)
{
- // TODO: shatter attack surface. should validate the message, e.g. using a handle table.
+ case WM_NCCREATE:
+ {
+ windowClass = reinterpret_cast<WindowClass*>(reinterpret_cast<LPCREATESTRUCT>(lParam)->lpCreateParams);
+ }
+ break;
+ case WM_NCDESTROY:
+ {
+ delete windowClass;
+ windowClass = nullptr;
+ }
+ break;
+ case WM_TIMER:
case WM_USER_DISPATCH:
- ((void(*)(void*))wParam)((void*)lParam);
+ {
+ bool destroy = false;
+ HWND window = NULL;
+ {
+ std::unique_lock<std::mutex> guard(windowClass->queue->lock);
+
+ while (!windowClass->queue->scheduledWork.empty() && !windowClass->queue->scheduledWork.top().second.get()->second)
+ {
+ // discard the disposed items
+ windowClass->queue->scheduledWork.pop();
+ }
+
+ if (!windowClass->queue->scheduledWork.empty())
+ {
+ auto& item = windowClass->queue->scheduledWork.top();
+ auto now = item.second.get()->first->Now();
+
+ // wait until the work is due
+ if(now < item.first)
+ {
+ auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(item.first - now).count();
+ if (remaining >= USER_TIMER_MINIMUM)
+ {
+ return SetTimer(hwnd, 0, static_cast<UINT>(remaining), nullptr);
+ }
+ std::this_thread::sleep_until(item.first);
+ }
+
+ // dispatch work
+ auto work = std::move(item.second.get()->second);
+ auto scheduler = std::move(item.second.get()->first);
+ windowClass->queue->scheduledWork.pop();
+
+ {
+ RXCPP_UNWIND_AUTO([&]{guard.lock();});
+ guard.unlock();
+ LocalScheduler::Do(work, scheduler);
+ work = nullptr;
+ scheduler = nullptr;
+ }
+
+ if (!windowClass->queue->scheduledWork.empty())
+ {
+ ::PostMessageW(windowClass->queue->window, WindowClass::WM_USER_DISPATCH, 0, 0);
+ }
+ }
+
+ destroy = windowClass->queue->exit && windowClass->queue->scheduledWork.empty();
+ window = windowClass->queue->window;
+ }
+
+ if (destroy)
+ {
+ DestroyWindow(window);
+ }
+ }
return 0;
default:
- return DefWindowProc(hwnd, message, wParam, lParam);
+ break;
}
- }
- static WindowClass& Instance() {
- static WindowClass instance;
- return instance;
+ return DefWindowProcW(hwnd, message, wParam, lParam);
}
};
- static void run_proc(
- void* pvfn
- )
- {
- std::unique_ptr<Item> f((Item*)(void*) pvfn);
- Do(f->second, f->first);
- }
+ std::shared_ptr<Queue> queue;
public:
WindowScheduler()
- : hwnd(WindowClass::Instance().CreateWindow_())
- , ct(std::make_shared<CurrentThreadScheduler>())
+ : queue(std::make_shared<Queue>())
{
- if (!hwnd)
- throw std::exception("create window failed");
+ WindowClass::Create(queue);
}
~WindowScheduler()
{
// send one last message to ourselves to shutdown.
- Schedule([=](Scheduler::shared){ CloseWindow(hwnd); return Disposable::Empty();});
+ {
+ std::unique_lock<std::mutex> guard(queue->lock);
+ queue->exit = true;
+ }
+ ::PostMessageW(queue->window, WindowClass::WM_USER_DISPATCH, 0, 0);
}
using LocalScheduler::Schedule;
virtual Disposable Schedule(clock::time_point dueTime, Work work)
{
- std::unique_ptr<Item> f(new Item(ct, std::move(work)));
- ::PostMessage(hwnd, WindowClass::WM_USER_DISPATCH, (WPARAM)(void(*)(void*))&run_proc, (LPARAM)(void*)f.release());
- return Disposable::Empty();
+ auto cancelable = std::make_shared<Item>(std::make_pair(get(), std::move(work)));
+ {
+ std::unique_lock<std::mutex> guard(queue->lock);
+ queue->scheduledWork.push(std::make_pair(dueTime, cancelable));
+ }
+ ::PostMessageW(queue->window, WindowClass::WM_USER_DISPATCH, 0, 0);
+ auto local = queue;
+ return Disposable([local, cancelable]{
+ std::unique_lock<std::mutex> guard(local->lock);
+ cancelable.get()->second = nullptr;
+ ::PostMessageW(local->window, WindowClass::WM_USER_DISPATCH, 0, 0);
+ });
}
};
} }
diff --git a/Rx/CPP/src/cpprx/rx.hpp b/Rx/CPP/src/cpprx/rx.hpp
index 848e945..b5a75cb 100644
--- a/Rx/CPP/src/cpprx/rx.hpp
+++ b/Rx/CPP/src/cpprx/rx.hpp
@@ -7,23 +7,101 @@
#define CPPRX_RX_HPP
namespace rxcpp
-{
+{
template<class T, class Obj>
- class Binder
+ class BinderBase
{
+ protected:
Obj obj;
- static T defaultValueSelector(T t){return std::move(t);}
+
+ struct pass_through {
+ template<class X>
+ X operator()(X x) const {return std::move(x);}
+ };
+ public:
+ typedef T item_type;
+ typedef Obj observable_type;
+
+ BinderBase(Obj obj) : obj(std::move(obj))
+ {
+ }
+ };
+
+ template<class T, class Obj, bool IsTObservable>
+ class BinderNested;
+
+ template<class T, class Obj>
+ class BinderNested<T, Obj, false> : public BinderBase<T, Obj>
+ {
+ protected:
+ typedef BinderBase<T, Obj> base;
+ typedef typename base::item_type item_type;
+ typedef typename base::pass_through pass_through;
+ using base::obj;
public:
- Binder(Obj obj) : obj(std::move(obj))
+ static const bool is_item_observable = false;
+ BinderNested(Obj obj) : BinderBase<T, Obj>(std::move(obj))
{
}
+ };
+
+ template<class T, class Obj>
+ class BinderNested<T, Obj, true> : public BinderBase<T, Obj>
+ {
+ protected:
+ typedef BinderBase<T, Obj> base;
+ typedef typename base::item_type item_type;
+ typedef typename base::pass_through pass_through;
+ using base::obj;
+ public:
+ static const bool is_item_observable = true;
+
+ BinderNested(Obj obj) : base(std::move(obj))
+ {
+ }
+
+ auto select_many()
+ -> decltype(from(SelectMany<item_type>(obj, pass_through(), pass_through()))) {
+ return from(SelectMany<item_type>(obj, pass_through(), pass_through()));
+ }
+ template <class CS>
+ auto select_many(CS collectionSelector)
+ -> decltype(from(SelectMany<item_type>(obj, std::move(collectionSelector), pass_through()))) {
+ return from(SelectMany<item_type>(obj, std::move(collectionSelector), pass_through()));
+ }
+ template <class CS, class RS>
+ auto select_many(CS collectionSelector, RS resultSelector)
+ -> decltype(from(SelectMany<item_type>(obj, std::move(collectionSelector), std::move(resultSelector)))) {
+ return from(SelectMany<item_type>(obj, std::move(collectionSelector), std::move(resultSelector)));
+ }
+ };
+
+ template<class Obj>
+ class Binder : public BinderNested<
+ typename observable_item<Obj>::type,
+ Obj,
+ is_observable<typename observable_item<Obj>::type>::value>
+ {
+ typedef BinderNested<
+ typename observable_item<Obj>::type,
+ Obj,
+ is_observable<typename observable_item<Obj>::type>::value> base;
+ typedef typename base::item_type item_type;
+ typedef typename base::pass_through pass_through;
+ using base::obj;
+ public:
+
+ Binder(Obj obj) : base(std::move(obj))
+ {
+ }
+
template <class S>
- auto select(S selector) -> decltype(from(Select<T>(obj, selector))) {
- return from(Select<T>(obj, selector));
+ auto select(S selector) -> decltype(from(Select<item_type>(obj, selector))) {
+ return from(Select<item_type>(obj, selector));
}
template <class P>
- auto where(P predicate) -> decltype(from(Where<T>(obj, predicate))) {
- return from(Where<T>(obj, predicate));
+ auto where(P predicate) -> decltype(from(Where<item_type>(obj, predicate))) {
+ return from(Where<item_type>(obj, predicate));
}
Obj publish() {
return obj;
@@ -31,101 +109,107 @@ namespace rxcpp
template <class KS>
auto group_by(
KS keySelector)
- -> decltype(from(GroupBy<T>(obj, keySelector, defaultValueSelector, std::less<decltype(keySelector((*(T*)0)))>()))) {
- return from(GroupBy<T>(obj, keySelector, defaultValueSelector, std::less<decltype(keySelector((*(T*)0)))>()));
+ -> decltype(from(GroupBy<item_type>(obj, keySelector, pass_through(), std::less<decltype(keySelector((*(item_type*)0)))>()))) {
+ return from(GroupBy<item_type>(obj, keySelector, pass_through(), std::less<decltype(keySelector((*(item_type*)0)))>()));
}
template <class KS, class VS>
auto group_by(
KS keySelector,
VS valueSelector)
- -> decltype(from(GroupBy<T>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(T*)0)))>()))) {
- return from(GroupBy<T>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(T*)0)))>()));
+ -> decltype(from(GroupBy<item_type>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(item_type*)0)))>()))) {
+ return from(GroupBy<item_type>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(item_type*)0)))>()));
}
template <class KS, class VS, class L>
auto group_by(
KS keySelector,
VS valueSelector,
L less)
- -> decltype(from(GroupBy<T>(obj, keySelector, valueSelector, less))) {
- return from(GroupBy<T>(obj, keySelector, valueSelector, less));
+ -> decltype(from(GroupBy<item_type>(obj, keySelector, valueSelector, less))) {
+ return from(GroupBy<item_type>(obj, keySelector, valueSelector, less));
}
template <class Integral>
- auto take(Integral n) -> decltype(from(Take<T>(obj, n))) {
- return from(Take<T>(obj, n));
+ auto take(Integral n) -> decltype(from(Take<item_type>(obj, n))) {
+ return from(Take<item_type>(obj, n));
}
- auto delay(Scheduler::clock::duration due) -> decltype(from(Delay<T>(obj, due))) {
- return from(Delay<T>(obj, due));
+ auto delay(Scheduler::clock::duration due, Scheduler::shared scheduler) -> decltype(from(Delay<item_type>(obj, due, scheduler))) {
+ return from(Delay<item_type>(obj, due, scheduler));
}
- auto delay(Scheduler::clock::duration due, Scheduler::shared scheduler) -> decltype(from(Delay<T>(obj, due, scheduler))) {
- return from(Delay<T>(obj, due, scheduler));
+ auto limit_window(int milliseconds) -> decltype(from(LimitWindow<item_type>(obj, milliseconds))) {
+ return from(LimitWindow<item_type>(obj, milliseconds));
}
- auto limit_window(int milliseconds) -> decltype(from(LimitWindow<T>(obj, milliseconds))) {
- return from(LimitWindow<T>(obj, milliseconds));
- }
- auto distinct_until_changed() -> decltype(from(DistinctUntilChanged<T>(obj))) {
- return from(DistinctUntilChanged<T>(obj));
+ auto distinct_until_changed() -> decltype(from(DistinctUntilChanged<item_type>(obj))) {
+ return from(DistinctUntilChanged<item_type>(obj));
}
auto subscribe_on(Scheduler::shared scheduler)
- -> decltype(from(SubscribeOnObservable<T>(obj, std::move(scheduler))))
+ -> decltype(from(SubscribeOnObservable<item_type>(obj, std::move(scheduler))))
{
- return from(SubscribeOnObservable<T>(obj, std::move(scheduler)));
+ return from(SubscribeOnObservable<item_type>(obj, std::move(scheduler)));
}
auto observe_on(Scheduler::shared scheduler)
- -> decltype(from(ObserveOnObserver<T>(obj, std::move(scheduler))))
+ -> decltype(from(ObserveOnObserver<item_type>(obj, std::move(scheduler))))
{
- return from(ObserveOnObserver<T>(obj, std::move(scheduler)));
+ return from(ObserveOnObserver<item_type>(obj, std::move(scheduler)));
}
auto on_dispatcher()
- -> decltype(from(ObserveOnDispatcher<T>(obj)))
+ -> decltype(from(ObserveOnDispatcher<item_type>(obj)))
{
- return from(ObserveOnDispatcher<T>(obj));
+ return from(ObserveOnDispatcher<item_type>(obj));
}
template <class OnNext>
void for_each(OnNext onNext) {
- ForEach<T>(obj, onNext);
+ ForEach<item_type>(obj, onNext);
}
template <class OnNext>
- auto subscribe(OnNext onNext) -> decltype(Subscribe<T>(obj, onNext)) {
- auto result = Subscribe<T>(obj, onNext);
+ auto subscribe(OnNext onNext) -> decltype(Subscribe(obj, onNext)) {
+ auto result = Subscribe(obj, onNext);
return result;
}
template <class OnNext, class OnComplete>
auto subscribe(OnNext onNext, OnComplete onComplete)
- -> decltype(Subscribe<T>(obj, onNext, onComplete)) {
- auto result = Subscribe<T>(obj, onNext, onComplete);
+ -> decltype(Subscribe(obj, onNext, onComplete)) {
+ auto result = Subscribe(obj, onNext, onComplete);
return result;
}
template <class OnNext, class OnComplete, class OnError>
auto subscribe(OnNext onNext, OnComplete onComplete, OnError onError)
- -> decltype(Subscribe<T>(obj, onNext, onComplete, onError)) {
- auto result = Subscribe<T>(obj, onNext, onComplete, onError);
+ -> decltype(Subscribe(obj, onNext, onComplete, onError)) {
+ auto result = Subscribe(obj, onNext, onComplete, onError);
return result;
}
+#if RXCPP_USE_VARIADIC_TEMPLATES
+ template <class Tag, class... Arg>
+ auto chain(Arg&& ...arg)
+ -> decltype(from(rxcpp_chain(Tag(), obj, std::forward<Arg>(arg)...))) {
+ return from(rxcpp_chain(Tag(), obj, std::forward<Arg>(arg)...));
+ }
+#endif
};
template<class T>
- Binder<T, std::shared_ptr<Observable<T>>> from(std::shared_ptr<Observable<T>> obj) {
- return Binder<T, std::shared_ptr<Observable<T>>>(std::move(obj)); }
+ Binder<std::shared_ptr<Observable<T>>> from(std::shared_ptr<Observable<T>> obj) {
+ return Binder<std::shared_ptr<Observable<T>>>(std::move(obj)); }
template<class K, class T>
- Binder<T, std::shared_ptr<GroupedObservable<K, T>>> from(std::shared_ptr<GroupedObservable<K, T>> obj) {
- return Binder<T, std::shared_ptr<GroupedObservable<K, T>>>(std::move(obj)); }
-
- template<class T, class Obj>
- Binder<T, Obj> from(Binder<T, Obj> binder) {
- return std::move(binder); }
+ Binder<std::shared_ptr<GroupedObservable<K, T>>> from(std::shared_ptr<GroupedObservable<K, T>> obj) {
+ return Binder<std::shared_ptr<GroupedObservable<K, T>>>(std::move(obj)); }
template<class T>
- T item(const Binder<T, std::shared_ptr<Observable<T>>>&);
+ Binder<std::shared_ptr<Observable<T>>> from(std::shared_ptr<Subject<T>> obj) {
+ return Binder<std::shared_ptr<Observable<T>>>(std::move(obj)); }
+
+ template<class K, class T>
+ Binder<std::shared_ptr<GroupedObservable<K, T>>> from(std::shared_ptr<GroupedSubject<K, T>> obj) {
+ return Binder<std::shared_ptr<GroupedObservable<K, T>>>(std::move(obj)); }
- template<class T, class K>
- T item(const Binder<T, std::shared_ptr<GroupedObservable<K, T>>>&);
+ template<class Obj>
+ Binder<Obj> from(Binder<Obj> binder) {
+ return std::move(binder); }
template<class T>
- T item(const std::shared_ptr<Observable<T>>&);
+ T item(const Binder<std::shared_ptr<Observable<T>>>&);
- template<class K, class T>
- T item(const std::shared_ptr<GroupedObservable<K,T>>&);
+ template<class T, class K>
+ T item(const Binder<std::shared_ptr<GroupedObservable<K, T>>>&);
}
#endif