diff options
-rw-r--r-- | Ix/CPP/src/IxCpp.vcxproj | 65 | ||||
-rw-r--r-- | Ix/CPP/src/IxCpp.vcxproj.filters | 55 | ||||
-rw-r--r-- | Rx/CPP/src/RxCpp.vcxproj | 61 | ||||
-rw-r--r-- | Rx/CPP/src/RxCpp.vcxproj.filters | 43 | ||||
-rw-r--r-- | Rx/CPP/src/cpprx/rx-base.hpp | 37 | ||||
-rw-r--r-- | Rx/CPP/src/cpprx/rx-includes.hpp | 27 | ||||
-rw-r--r-- | Rx/CPP/src/cpprx/rx-operators.hpp | 194 | ||||
-rw-r--r-- | Rx/CPP/src/cpprx/rx-scheduler.hpp | 4 | ||||
-rw-r--r-- | Rx/CPP/src/cpprx/rx-windows.hpp | 165 | ||||
-rw-r--r-- | Rx/CPP/src/cpprx/rx.hpp | 188 |
10 files changed, 722 insertions, 117 deletions
diff --git a/Ix/CPP/src/IxCpp.vcxproj b/Ix/CPP/src/IxCpp.vcxproj new file mode 100644 index 0000000..9367080 --- /dev/null +++ b/Ix/CPP/src/IxCpp.vcxproj @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectGuid>{65483A0D-1D72-47CC-B147-27415FFC1FC3}</ProjectGuid> + <Keyword>MakeFileProj</Keyword> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration"> + <ConfigurationType>Makefile</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration"> + <ConfigurationType>Makefile</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + </ImportGroup> + <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <NMakePreprocessorDefinitions>WIN32;_DEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <NMakePreprocessorDefinitions>WIN32;NDEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions> + </PropertyGroup> + <ItemDefinitionGroup> + </ItemDefinitionGroup> + <ItemGroup> + <Text Include="readme.txt" /> + </ItemGroup> + <ItemGroup> + <ClInclude Include="cpplinq\linq.hpp" /> + <ClInclude Include="cpplinq\linq_cursor.hpp" /> + <ClInclude Include="cpplinq\linq_groupby.hpp" /> + <ClInclude Include="cpplinq\linq_iterators.hpp" /> + <ClInclude Include="cpplinq\linq_last.hpp" /> + <ClInclude Include="cpplinq\linq_select.hpp" /> + <ClInclude Include="cpplinq\linq_selectmany.hpp" /> + <ClInclude Include="cpplinq\linq_skip.hpp" /> + <ClInclude Include="cpplinq\linq_take.hpp" /> + <ClInclude Include="cpplinq\linq_where.hpp" /> + <ClInclude Include="cpplinq\util.hpp" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + </ImportGroup> +</Project>
\ No newline at end of file diff --git a/Ix/CPP/src/IxCpp.vcxproj.filters b/Ix/CPP/src/IxCpp.vcxproj.filters new file mode 100644 index 0000000..dd7d5bc --- /dev/null +++ b/Ix/CPP/src/IxCpp.vcxproj.filters @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <Filter Include="Source Files"> + <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier> + <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions> + </Filter> + <Filter Include="Header Files"> + <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier> + <Extensions>h;hpp;hxx;hm;inl;inc;xsd</Extensions> + </Filter> + <Filter Include="Resource Files"> + <UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier> + <Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions> + </Filter> + </ItemGroup> + <ItemGroup> + <Text Include="readme.txt" /> + </ItemGroup> + <ItemGroup> + <ClInclude Include="cpplinq\linq.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_cursor.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_groupby.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_iterators.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_last.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_select.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_selectmany.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_skip.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_take.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\linq_where.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpplinq\util.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + </ItemGroup> +</Project>
\ No newline at end of file diff --git a/Rx/CPP/src/RxCpp.vcxproj b/Rx/CPP/src/RxCpp.vcxproj new file mode 100644 index 0000000..7a7398f --- /dev/null +++ b/Rx/CPP/src/RxCpp.vcxproj @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectGuid>{A912FB36-33A7-4DDA-A7D6-D17F9495A623}</ProjectGuid> + <Keyword>MakeFileProj</Keyword> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration"> + <ConfigurationType>Makefile</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration"> + <ConfigurationType>Makefile</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + </ImportGroup> + <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <NMakePreprocessorDefinitions>WIN32;_DEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <NMakePreprocessorDefinitions>WIN32;NDEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions> + </PropertyGroup> + <ItemDefinitionGroup> + </ItemDefinitionGroup> + <ItemGroup> + <Text Include="readme.txt" /> + </ItemGroup> + <ItemGroup> + <ClInclude Include="cpprx\rx-base.hpp" /> + <ClInclude Include="cpprx\rx-includes.hpp" /> + <ClInclude Include="cpprx\rx-operators.hpp" /> + <ClInclude Include="cpprx\rx-scheduler.hpp" /> + <ClInclude Include="cpprx\rx-util.hpp" /> + <ClInclude Include="cpprx\rx-windows.hpp" /> + <ClInclude Include="cpprx\rx.hpp" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + </ImportGroup> +</Project>
\ No newline at end of file diff --git a/Rx/CPP/src/RxCpp.vcxproj.filters b/Rx/CPP/src/RxCpp.vcxproj.filters new file mode 100644 index 0000000..2ce3d34 --- /dev/null +++ b/Rx/CPP/src/RxCpp.vcxproj.filters @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <Filter Include="Source Files"> + <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier> + <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions> + </Filter> + <Filter Include="Header Files"> + <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier> + <Extensions>h;hpp;hxx;hm;inl;inc;xsd</Extensions> + </Filter> + <Filter Include="Resource Files"> + <UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier> + <Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions> + </Filter> + </ItemGroup> + <ItemGroup> + <Text Include="readme.txt" /> + </ItemGroup> + <ItemGroup> + <ClInclude Include="cpprx\rx.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpprx\rx-base.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpprx\rx-includes.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpprx\rx-operators.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpprx\rx-scheduler.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpprx\rx-util.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="cpprx\rx-windows.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + </ItemGroup> +</Project>
\ No newline at end of file diff --git a/Rx/CPP/src/cpprx/rx-base.hpp b/Rx/CPP/src/cpprx/rx-base.hpp index e5af503..6182d0c 100644 --- a/Rx/CPP/src/cpprx/rx-base.hpp +++ b/Rx/CPP/src/cpprx/rx-base.hpp @@ -261,25 +261,13 @@ namespace rxcpp LocalScheduler(const LocalScheduler&); public: - static void DoNoThrow(Work& work, Scheduler::shared scheduler) throw() + static void Do(Work& work, Scheduler::shared scheduler) throw() { if (work) { work(std::move(scheduler)); } } - static void Do(Work& work, Scheduler::shared scheduler) - { - try { - DoNoThrow(work, std::move(scheduler)); - } catch (const std::exception& ) { - // work must catch all expected exceptions - std::unexpected(); - } catch (...) { - // work must catch all expected exceptions - std::unexpected(); - } - } public: LocalScheduler() @@ -305,5 +293,28 @@ namespace rxcpp } }; + template<class T> + T item(const std::shared_ptr<Observable<T>>&); + + template<class K, class T> + T item(const std::shared_ptr<GroupedObservable<K,T>>&); + + template<class Observable> + struct is_observable {static const bool value = false;}; + + template<class T> + struct is_observable<std::shared_ptr<Observable<T>>> {static const bool value = true;}; + + template<class K, class T> + struct is_observable<std::shared_ptr<GroupedObservable<K, T>>> {static const bool value = true;}; + + template<class Observable> + struct observable_item; + + template<class T> + struct observable_item<std::shared_ptr<Observable<T>>> {typedef T type;}; + + template<class K, class T> + struct observable_item<std::shared_ptr<GroupedObservable<K, T>>> {typedef T type;}; } #endif diff --git a/Rx/CPP/src/cpprx/rx-includes.hpp b/Rx/CPP/src/cpprx/rx-includes.hpp index d5be690..e0574aa 100644 --- a/Rx/CPP/src/cpprx/rx-includes.hpp +++ b/Rx/CPP/src/cpprx/rx-includes.hpp @@ -26,6 +26,33 @@ #include <chrono> #include <condition_variable> +// some configuration macros +#if defined(_MSC_VER) + +#if _MSC_VER > 1600 +#define RXCPP_USE_RVALUEREF 1 +#define RXCPP_USE_VARIADIC_TEMPLATES 0 +#endif + +#if _CPPRTTI +#define RXCPP_USE_RTTI 1 +#endif + +#elif defined(__clang__) + +#if __has_feature(cxx_rvalue_references) +#define RXCPP_USE_RVALUEREF 1 +#endif +#if __has_feature(cxx_rtti) +#define RXCPP_USE_RTTI 1 +#endif +#if __has_feature(cxx_variadic_templates) +#define RXCPP_USE_VARIADIC_TEMPLATES 1 +#endif + +#endif + + #include "rx-util.hpp" #include "rx-base.hpp" #include "rx-scheduler.hpp" diff --git a/Rx/CPP/src/cpprx/rx-operators.hpp b/Rx/CPP/src/cpprx/rx-operators.hpp index cfd100d..29b489f 100644 --- a/Rx/CPP/src/cpprx/rx-operators.hpp +++ b/Rx/CPP/src/cpprx/rx-operators.hpp @@ -120,10 +120,12 @@ namespace rxcpp public std::enable_shared_from_this<Subject> { protected: + std::mutex lock; std::vector<std::shared_ptr<Observer<T>>> observers; void RemoveObserver(std::shared_ptr<Observer<T>> toRemove) { + std::unique_lock<decltype(lock)> guard(lock); auto it = std::find(begin(observers), end(observers), toRemove); if (it != end(observers)) *it = nullptr; @@ -142,14 +144,17 @@ namespace rxcpp } }); - for(auto& o : observers) { - if (!o){ - o = std::move(observer); - return d; + std::unique_lock<decltype(lock)> guard(lock); + for(auto& o : observers) + { + if (!o){ + o = std::move(observer); + return d; + } } + observers.push_back(std::move(observer)); } - observers.push_back(std::move(observer)); return d; } }; @@ -166,37 +171,45 @@ namespace rxcpp virtual void OnNext(const T& element) { - for(auto& o : Base::observers) + std::unique_lock<decltype(Base::lock)> guard(Base::lock); + auto local = Base::observers; + guard.unlock(); + for(auto& o : local) { try { - if (o) + if (o) { o->OnNext(element); + } } catch (...) { - auto o_ = std::move(o); - o_->OnError(std::current_exception()); + o->OnError(std::current_exception()); + Base::RemoveObserver(o); } } } virtual void OnCompleted() { - for(auto& o : Base::observers) + std::unique_lock<decltype(Base::lock)> guard(Base::lock); + auto local = std::move(Base::observers); + guard.unlock(); + for(auto& o : local) { if (o) { o->OnCompleted(); - o = nullptr; } } } virtual void OnError(const std::exception_ptr& error) { - for(auto& o : Base::observers) + std::unique_lock<decltype(Base::lock)> guard(Base::lock); + auto local = std::move(Base::observers); + guard.unlock(); + for(auto& o : local) { if (o) { o->OnError(error); - o = nullptr; } } } @@ -317,15 +330,16 @@ namespace rxcpp // // imperative functions - template <class T> + template <class S> Disposable Subscribe( - const std::shared_ptr<Observable<T>>& source, - typename util::identity<std::function<void(const T&)>>::type onNext, + const S& source, + typename util::identity<std::function<void(const typename observable_item<S>::type&)>>::type onNext, std::function<void()> onCompleted = nullptr, std::function<void(const std::exception_ptr&)> onError = nullptr ) { - auto observer = CreateObserver<T>(std::move(onNext), std::move(onCompleted), std::move(onError)); + auto observer = CreateObserver<typename observable_item<S>::type>( + std::move(onNext), std::move(onCompleted), std::move(onError)); return source->Subscribe(observer); } @@ -376,7 +390,7 @@ namespace rxcpp return CreateObservable<U>( [=](std::shared_ptr<Observer<U>> observer) { - return Subscribe<T>( + return Subscribe( source, // on next [=](const T& element) @@ -396,6 +410,146 @@ namespace rxcpp }); }); } + + template<class T> + struct reveal_type {private: reveal_type();}; + + template <class T, class CS, class RS> + auto SelectMany( + const std::shared_ptr<Observable<T>>& source, + CS collectionSelector, + RS resultSelector) + -> const std::shared_ptr<Observable< + typename std::result_of<RS( + const typename observable_item< + typename std::result_of<CS(const T&)>::type>::type&)>::type>> + { + typedef typename std::result_of<CS(const T&)>::type C; + typedef typename observable_item<C>::type CI; + typedef typename std::result_of<RS(const CI&)>::type U; + + return CreateObservable<U>( + [=](std::shared_ptr<Observer<U>> observer) + -> Disposable + { + struct State { + size_t subscribed; + bool cancel; + std::mutex lock; + }; + auto state = std::make_shared<State>(); + state->cancel = false; + state->subscribed = 0; + + ComposableDisposable cd; + + cd.Add(Disposable([=]{ + std::unique_lock<std::mutex> guard(state->lock); + state->cancel = true; }) + ); + + ++state->subscribed; + cd.Add(Subscribe( + source, + // on next + [=](const T& element) + { + bool cancel = false; + { + std::unique_lock<std::mutex> guard(state->lock); + cancel = state->cancel; + if (!cancel) ++state->subscribed; + } + if (!cancel) { + try { + auto collection = collectionSelector(element); + cd.Add(Subscribe( + collection, + // on next + [=](const CI& element) + { + bool cancel = false; + { + std::unique_lock<std::mutex> guard(state->lock); + cancel = state->cancel; + } + try { + if (!cancel) { + auto result = resultSelector(element); + observer->OnNext(std::move(result)); + } + } catch (...) { + observer->OnError(std::current_exception()); + cd.Dispose(); + } + }, + // on completed + [=] + { + bool cancel = false; + bool finished = false; + { + std::unique_lock<std::mutex> guard(state->lock); + finished = (--state->subscribed) == 0; + cancel = state->cancel; + } + if (!cancel && finished) + observer->OnCompleted(); + }, + // on error + [=](const std::exception_ptr& error) + { + bool cancel = false; + { + std::unique_lock<std::mutex> guard(state->lock); + --state->subscribed; + cancel = state->cancel; + } + if (!cancel) + observer->OnError(error); + cd.Dispose(); + })); + } catch (...) { + bool cancel = false; + { + std::unique_lock<std::mutex> guard(state->lock); + cancel = state->cancel; + } + if (!cancel) { + observer->OnError(std::current_exception()); + } + cd.Dispose(); + } + } + }, + // on completed + [=] + { + bool cancel = false; + bool finished = false; + { + std::unique_lock<std::mutex> guard(state->lock); + finished = (--state->subscribed) == 0; + cancel = state->cancel; + } + if (!cancel && finished) + observer->OnCompleted(); + }, + // on error + [=](const std::exception_ptr& error) + { + bool cancel = false; + { + std::unique_lock<std::mutex> guard(state->lock); + --state->subscribed; + cancel = state->cancel; + } + if (!cancel) + observer->OnError(error); + })); + return cd; + }); + } template <class T, class P> const std::shared_ptr<Observable<T>> Where( @@ -577,9 +731,8 @@ namespace rxcpp std::shared_ptr<Observable<T>> Delay( const std::shared_ptr<Observable<T>>& source, Scheduler::clock::duration due, - Scheduler::shared scheduler = nullptr) + Scheduler::shared scheduler) { - if (!scheduler) {scheduler = std::make_shared<CurrentThreadScheduler>();} return CreateObservable<T>( [=](std::shared_ptr<Observer<T>> observer) -> Disposable @@ -685,6 +838,7 @@ namespace rxcpp -> Disposable { struct State { + State() : last(), hasValue(false) {} T last; bool hasValue; }; diff --git a/Rx/CPP/src/cpprx/rx-scheduler.hpp b/Rx/CPP/src/cpprx/rx-scheduler.hpp index 5fcef15..037e41e 100644 --- a/Rx/CPP/src/cpprx/rx-scheduler.hpp +++ b/Rx/CPP/src/cpprx/rx-scheduler.hpp @@ -103,6 +103,7 @@ namespace rxcpp virtual Disposable Schedule(clock::time_point dueTime, Work work) { auto ct = std::make_shared<CurrentThreadScheduler>(); + std::this_thread::sleep_until(dueTime); Do(work, ct); return Disposable::Empty(); } @@ -183,7 +184,8 @@ namespace rxcpp if (++trampoline == 1) { auto local = shared_from_this(); - result.set(factory([local]{local->Run();})); + result.set(factory([local]{ + local->Run();})); // trampoline lifetime is now owned by the thread unwindTrampoline.dismiss(); } diff --git a/Rx/CPP/src/cpprx/rx-windows.hpp b/Rx/CPP/src/cpprx/rx-windows.hpp index 72e636f..31b932b 100644 --- a/Rx/CPP/src/cpprx/rx-windows.hpp +++ b/Rx/CPP/src/cpprx/rx-windows.hpp @@ -93,78 +93,181 @@ namespace rxcpp { namespace win32 { class WindowScheduler : public rxcpp::LocalScheduler { - HWND hwnd; - Scheduler::shared ct; + + struct compare_work + { + template <class T> + bool operator()(const T& work1, const T& work2) const { + return work1.first > work2.first; + } + }; + + struct Queue; + typedef std::pair<Scheduler::shared, Work> Item; + typedef std::priority_queue< + std::pair<clock::time_point, std::shared_ptr<Item>>, + std::vector<std::pair<clock::time_point, std::shared_ptr<Item>>>, + compare_work + > ScheduledWork; + + struct Queue + { + Queue() : exit(false), window(NULL) {} + bool exit; + HWND window; + ScheduledWork scheduledWork; + mutable std::mutex lock; + }; + struct WindowClass { + std::shared_ptr<Queue> queue; + static const wchar_t* const className(){ return L"rxcpp::win32::WindowScheduler::WindowClass"; } WindowClass() { - WNDCLASS wndclass = {}; + } + + static void Create(const std::shared_ptr<Queue>& queue) + { + WNDCLASSW wndclass = {}; wndclass.style = 0; wndclass.lpfnWndProc = &WndProc; - wndclass.cbClsExtra; wndclass.cbWndExtra = 0; wndclass.hInstance = NULL; wndclass.lpszClassName = className(); - if (!RegisterClass(&wndclass)) + if (!RegisterClassW(&wndclass)) throw std::exception("failed to register windows class"); + std::unique_ptr<WindowClass> that(new WindowClass); + that->queue = queue; + + queue->window = CreateWindowExW(0, WindowClass::className(), L"MessageOnlyWindow", 0, 0, 0, 0, 0, HWND_MESSAGE, 0, 0, reinterpret_cast<LPVOID>(that.get())); + if (!queue->window) + throw std::exception("create window failed"); + + that.release(); } - HWND CreateWindow_() - { - return CreateWindowEx(0, WindowClass::className(), L"MessageOnlyWindow", 0, 0, 0, 0, 0, HWND_MESSAGE, 0, 0, 0); - } + static const int WM_USER_DISPATCH = WM_USER + 1; static LRESULT CALLBACK WndProc(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam) { + static WindowClass* windowClass; switch (message) { - // TODO: shatter attack surface. should validate the message, e.g. using a handle table. + case WM_NCCREATE: + { + windowClass = reinterpret_cast<WindowClass*>(reinterpret_cast<LPCREATESTRUCT>(lParam)->lpCreateParams); + } + break; + case WM_NCDESTROY: + { + delete windowClass; + windowClass = nullptr; + } + break; + case WM_TIMER: case WM_USER_DISPATCH: - ((void(*)(void*))wParam)((void*)lParam); + { + bool destroy = false; + HWND window = NULL; + { + std::unique_lock<std::mutex> guard(windowClass->queue->lock); + + while (!windowClass->queue->scheduledWork.empty() && !windowClass->queue->scheduledWork.top().second.get()->second) + { + // discard the disposed items + windowClass->queue->scheduledWork.pop(); + } + + if (!windowClass->queue->scheduledWork.empty()) + { + auto& item = windowClass->queue->scheduledWork.top(); + auto now = item.second.get()->first->Now(); + + // wait until the work is due + if(now < item.first) + { + auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(item.first - now).count(); + if (remaining >= USER_TIMER_MINIMUM) + { + return SetTimer(hwnd, 0, static_cast<UINT>(remaining), nullptr); + } + std::this_thread::sleep_until(item.first); + } + + // dispatch work + auto work = std::move(item.second.get()->second); + auto scheduler = std::move(item.second.get()->first); + windowClass->queue->scheduledWork.pop(); + + { + RXCPP_UNWIND_AUTO([&]{guard.lock();}); + guard.unlock(); + LocalScheduler::Do(work, scheduler); + work = nullptr; + scheduler = nullptr; + } + + if (!windowClass->queue->scheduledWork.empty()) + { + ::PostMessageW(windowClass->queue->window, WindowClass::WM_USER_DISPATCH, 0, 0); + } + } + + destroy = windowClass->queue->exit && windowClass->queue->scheduledWork.empty(); + window = windowClass->queue->window; + } + + if (destroy) + { + DestroyWindow(window); + } + } return 0; default: - return DefWindowProc(hwnd, message, wParam, lParam); + break; } - } - static WindowClass& Instance() { - static WindowClass instance; - return instance; + return DefWindowProcW(hwnd, message, wParam, lParam); } }; - static void run_proc( - void* pvfn - ) - { - std::unique_ptr<Item> f((Item*)(void*) pvfn); - Do(f->second, f->first); - } + std::shared_ptr<Queue> queue; public: WindowScheduler() - : hwnd(WindowClass::Instance().CreateWindow_()) - , ct(std::make_shared<CurrentThreadScheduler>()) + : queue(std::make_shared<Queue>()) { - if (!hwnd) - throw std::exception("create window failed"); + WindowClass::Create(queue); } ~WindowScheduler() { // send one last message to ourselves to shutdown. - Schedule([=](Scheduler::shared){ CloseWindow(hwnd); return Disposable::Empty();}); + { + std::unique_lock<std::mutex> guard(queue->lock); + queue->exit = true; + } + ::PostMessageW(queue->window, WindowClass::WM_USER_DISPATCH, 0, 0); } using LocalScheduler::Schedule; virtual Disposable Schedule(clock::time_point dueTime, Work work) { - std::unique_ptr<Item> f(new Item(ct, std::move(work))); - ::PostMessage(hwnd, WindowClass::WM_USER_DISPATCH, (WPARAM)(void(*)(void*))&run_proc, (LPARAM)(void*)f.release()); - return Disposable::Empty(); + auto cancelable = std::make_shared<Item>(std::make_pair(get(), std::move(work))); + { + std::unique_lock<std::mutex> guard(queue->lock); + queue->scheduledWork.push(std::make_pair(dueTime, cancelable)); + } + ::PostMessageW(queue->window, WindowClass::WM_USER_DISPATCH, 0, 0); + auto local = queue; + return Disposable([local, cancelable]{ + std::unique_lock<std::mutex> guard(local->lock); + cancelable.get()->second = nullptr; + ::PostMessageW(local->window, WindowClass::WM_USER_DISPATCH, 0, 0); + }); } }; } } diff --git a/Rx/CPP/src/cpprx/rx.hpp b/Rx/CPP/src/cpprx/rx.hpp index 848e945..b5a75cb 100644 --- a/Rx/CPP/src/cpprx/rx.hpp +++ b/Rx/CPP/src/cpprx/rx.hpp @@ -7,23 +7,101 @@ #define CPPRX_RX_HPP namespace rxcpp -{ +{ template<class T, class Obj> - class Binder + class BinderBase { + protected: Obj obj; - static T defaultValueSelector(T t){return std::move(t);} + + struct pass_through { + template<class X> + X operator()(X x) const {return std::move(x);} + }; + public: + typedef T item_type; + typedef Obj observable_type; + + BinderBase(Obj obj) : obj(std::move(obj)) + { + } + }; + + template<class T, class Obj, bool IsTObservable> + class BinderNested; + + template<class T, class Obj> + class BinderNested<T, Obj, false> : public BinderBase<T, Obj> + { + protected: + typedef BinderBase<T, Obj> base; + typedef typename base::item_type item_type; + typedef typename base::pass_through pass_through; + using base::obj; public: - Binder(Obj obj) : obj(std::move(obj)) + static const bool is_item_observable = false; + BinderNested(Obj obj) : BinderBase<T, Obj>(std::move(obj)) { } + }; + + template<class T, class Obj> + class BinderNested<T, Obj, true> : public BinderBase<T, Obj> + { + protected: + typedef BinderBase<T, Obj> base; + typedef typename base::item_type item_type; + typedef typename base::pass_through pass_through; + using base::obj; + public: + static const bool is_item_observable = true; + + BinderNested(Obj obj) : base(std::move(obj)) + { + } + + auto select_many() + -> decltype(from(SelectMany<item_type>(obj, pass_through(), pass_through()))) { + return from(SelectMany<item_type>(obj, pass_through(), pass_through())); + } + template <class CS> + auto select_many(CS collectionSelector) + -> decltype(from(SelectMany<item_type>(obj, std::move(collectionSelector), pass_through()))) { + return from(SelectMany<item_type>(obj, std::move(collectionSelector), pass_through())); + } + template <class CS, class RS> + auto select_many(CS collectionSelector, RS resultSelector) + -> decltype(from(SelectMany<item_type>(obj, std::move(collectionSelector), std::move(resultSelector)))) { + return from(SelectMany<item_type>(obj, std::move(collectionSelector), std::move(resultSelector))); + } + }; + + template<class Obj> + class Binder : public BinderNested< + typename observable_item<Obj>::type, + Obj, + is_observable<typename observable_item<Obj>::type>::value> + { + typedef BinderNested< + typename observable_item<Obj>::type, + Obj, + is_observable<typename observable_item<Obj>::type>::value> base; + typedef typename base::item_type item_type; + typedef typename base::pass_through pass_through; + using base::obj; + public: + + Binder(Obj obj) : base(std::move(obj)) + { + } + template <class S> - auto select(S selector) -> decltype(from(Select<T>(obj, selector))) { - return from(Select<T>(obj, selector)); + auto select(S selector) -> decltype(from(Select<item_type>(obj, selector))) { + return from(Select<item_type>(obj, selector)); } template <class P> - auto where(P predicate) -> decltype(from(Where<T>(obj, predicate))) { - return from(Where<T>(obj, predicate)); + auto where(P predicate) -> decltype(from(Where<item_type>(obj, predicate))) { + return from(Where<item_type>(obj, predicate)); } Obj publish() { return obj; @@ -31,101 +109,107 @@ namespace rxcpp template <class KS> auto group_by( KS keySelector) - -> decltype(from(GroupBy<T>(obj, keySelector, defaultValueSelector, std::less<decltype(keySelector((*(T*)0)))>()))) { - return from(GroupBy<T>(obj, keySelector, defaultValueSelector, std::less<decltype(keySelector((*(T*)0)))>())); + -> decltype(from(GroupBy<item_type>(obj, keySelector, pass_through(), std::less<decltype(keySelector((*(item_type*)0)))>()))) { + return from(GroupBy<item_type>(obj, keySelector, pass_through(), std::less<decltype(keySelector((*(item_type*)0)))>())); } template <class KS, class VS> auto group_by( KS keySelector, VS valueSelector) - -> decltype(from(GroupBy<T>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(T*)0)))>()))) { - return from(GroupBy<T>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(T*)0)))>())); + -> decltype(from(GroupBy<item_type>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(item_type*)0)))>()))) { + return from(GroupBy<item_type>(obj, keySelector, valueSelector, std::less<decltype(keySelector((*(item_type*)0)))>())); } template <class KS, class VS, class L> auto group_by( KS keySelector, VS valueSelector, L less) - -> decltype(from(GroupBy<T>(obj, keySelector, valueSelector, less))) { - return from(GroupBy<T>(obj, keySelector, valueSelector, less)); + -> decltype(from(GroupBy<item_type>(obj, keySelector, valueSelector, less))) { + return from(GroupBy<item_type>(obj, keySelector, valueSelector, less)); } template <class Integral> - auto take(Integral n) -> decltype(from(Take<T>(obj, n))) { - return from(Take<T>(obj, n)); + auto take(Integral n) -> decltype(from(Take<item_type>(obj, n))) { + return from(Take<item_type>(obj, n)); } - auto delay(Scheduler::clock::duration due) -> decltype(from(Delay<T>(obj, due))) { - return from(Delay<T>(obj, due)); + auto delay(Scheduler::clock::duration due, Scheduler::shared scheduler) -> decltype(from(Delay<item_type>(obj, due, scheduler))) { + return from(Delay<item_type>(obj, due, scheduler)); } - auto delay(Scheduler::clock::duration due, Scheduler::shared scheduler) -> decltype(from(Delay<T>(obj, due, scheduler))) { - return from(Delay<T>(obj, due, scheduler)); + auto limit_window(int milliseconds) -> decltype(from(LimitWindow<item_type>(obj, milliseconds))) { + return from(LimitWindow<item_type>(obj, milliseconds)); } - auto limit_window(int milliseconds) -> decltype(from(LimitWindow<T>(obj, milliseconds))) { - return from(LimitWindow<T>(obj, milliseconds)); - } - auto distinct_until_changed() -> decltype(from(DistinctUntilChanged<T>(obj))) { - return from(DistinctUntilChanged<T>(obj)); + auto distinct_until_changed() -> decltype(from(DistinctUntilChanged<item_type>(obj))) { + return from(DistinctUntilChanged<item_type>(obj)); } auto subscribe_on(Scheduler::shared scheduler) - -> decltype(from(SubscribeOnObservable<T>(obj, std::move(scheduler)))) + -> decltype(from(SubscribeOnObservable<item_type>(obj, std::move(scheduler)))) { - return from(SubscribeOnObservable<T>(obj, std::move(scheduler))); + return from(SubscribeOnObservable<item_type>(obj, std::move(scheduler))); } auto observe_on(Scheduler::shared scheduler) - -> decltype(from(ObserveOnObserver<T>(obj, std::move(scheduler)))) + -> decltype(from(ObserveOnObserver<item_type>(obj, std::move(scheduler)))) { - return from(ObserveOnObserver<T>(obj, std::move(scheduler))); + return from(ObserveOnObserver<item_type>(obj, std::move(scheduler))); } auto on_dispatcher() - -> decltype(from(ObserveOnDispatcher<T>(obj))) + -> decltype(from(ObserveOnDispatcher<item_type>(obj))) { - return from(ObserveOnDispatcher<T>(obj)); + return from(ObserveOnDispatcher<item_type>(obj)); } template <class OnNext> void for_each(OnNext onNext) { - ForEach<T>(obj, onNext); + ForEach<item_type>(obj, onNext); } template <class OnNext> - auto subscribe(OnNext onNext) -> decltype(Subscribe<T>(obj, onNext)) { - auto result = Subscribe<T>(obj, onNext); + auto subscribe(OnNext onNext) -> decltype(Subscribe(obj, onNext)) { + auto result = Subscribe(obj, onNext); return result; } template <class OnNext, class OnComplete> auto subscribe(OnNext onNext, OnComplete onComplete) - -> decltype(Subscribe<T>(obj, onNext, onComplete)) { - auto result = Subscribe<T>(obj, onNext, onComplete); + -> decltype(Subscribe(obj, onNext, onComplete)) { + auto result = Subscribe(obj, onNext, onComplete); return result; } template <class OnNext, class OnComplete, class OnError> auto subscribe(OnNext onNext, OnComplete onComplete, OnError onError) - -> decltype(Subscribe<T>(obj, onNext, onComplete, onError)) { - auto result = Subscribe<T>(obj, onNext, onComplete, onError); + -> decltype(Subscribe(obj, onNext, onComplete, onError)) { + auto result = Subscribe(obj, onNext, onComplete, onError); return result; } +#if RXCPP_USE_VARIADIC_TEMPLATES + template <class Tag, class... Arg> + auto chain(Arg&& ...arg) + -> decltype(from(rxcpp_chain(Tag(), obj, std::forward<Arg>(arg)...))) { + return from(rxcpp_chain(Tag(), obj, std::forward<Arg>(arg)...)); + } +#endif }; template<class T> - Binder<T, std::shared_ptr<Observable<T>>> from(std::shared_ptr<Observable<T>> obj) { - return Binder<T, std::shared_ptr<Observable<T>>>(std::move(obj)); } + Binder<std::shared_ptr<Observable<T>>> from(std::shared_ptr<Observable<T>> obj) { + return Binder<std::shared_ptr<Observable<T>>>(std::move(obj)); } template<class K, class T> - Binder<T, std::shared_ptr<GroupedObservable<K, T>>> from(std::shared_ptr<GroupedObservable<K, T>> obj) { - return Binder<T, std::shared_ptr<GroupedObservable<K, T>>>(std::move(obj)); } - - template<class T, class Obj> - Binder<T, Obj> from(Binder<T, Obj> binder) { - return std::move(binder); } + Binder<std::shared_ptr<GroupedObservable<K, T>>> from(std::shared_ptr<GroupedObservable<K, T>> obj) { + return Binder<std::shared_ptr<GroupedObservable<K, T>>>(std::move(obj)); } template<class T> - T item(const Binder<T, std::shared_ptr<Observable<T>>>&); + Binder<std::shared_ptr<Observable<T>>> from(std::shared_ptr<Subject<T>> obj) { + return Binder<std::shared_ptr<Observable<T>>>(std::move(obj)); } + + template<class K, class T> + Binder<std::shared_ptr<GroupedObservable<K, T>>> from(std::shared_ptr<GroupedSubject<K, T>> obj) { + return Binder<std::shared_ptr<GroupedObservable<K, T>>>(std::move(obj)); } - template<class T, class K> - T item(const Binder<T, std::shared_ptr<GroupedObservable<K, T>>>&); + template<class Obj> + Binder<Obj> from(Binder<Obj> binder) { + return std::move(binder); } template<class T> - T item(const std::shared_ptr<Observable<T>>&); + T item(const Binder<std::shared_ptr<Observable<T>>>&); - template<class K, class T> - T item(const std::shared_ptr<GroupedObservable<K,T>>&); + template<class T, class K> + T item(const Binder<std::shared_ptr<GroupedObservable<K, T>>>&); } #endif |