summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/sources/rx-create.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/sources/rx-create.hpp')
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-create.hpp55
1 files changed, 55 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/sources/rx-create.hpp b/Rx/v2/src/rxcpp/sources/rx-create.hpp
new file mode 100644
index 0000000..2c50e98
--- /dev/null
+++ b/Rx/v2/src/rxcpp/sources/rx-create.hpp
@@ -0,0 +1,55 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_SOURCES_RX_CREATE_HPP)
+#define RXCPP_SOURCES_RX_CREATE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace sources {
+
+namespace detail {
+
+template<class T, class OnSubscribe>
+struct create : public source_base<T>
+{
+ typedef create<T, OnSubscribe> this_type;
+
+ typedef typename std::decay<OnSubscribe>::type on_subscribe_type;
+
+ on_subscribe_type on_subscribe_function;
+
+ create(on_subscribe_type os)
+ : on_subscribe_function(std::move(os))
+ {
+ }
+
+ template<class Subscriber>
+ void on_subscribe(Subscriber o) const {
+
+ on_exception(
+ [&](){
+ this->on_subscribe_function(o);
+ return true;
+ },
+ o);
+ }
+};
+
+}
+
+template<class T, class OnSubscribe>
+auto create(OnSubscribe os)
+ -> observable<T, detail::create<T, OnSubscribe>> {
+ return observable<T, detail::create<T, OnSubscribe>>(
+ detail::create<T, OnSubscribe>(std::move(os)));
+}
+
+}
+
+}
+
+#endif