1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#pragma once
#if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
#define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
#include "../rx-includes.hpp"
namespace rxcpp {
namespace schedulers {
struct event_loop : public scheduler_interface
{
private:
typedef event_loop this_type;
event_loop(const this_type&);
struct loop_worker : public worker_interface
{
private:
typedef loop_worker this_type;
loop_worker(const this_type&);
typedef detail::schedulable_queue<
typename clock_type::time_point> queue_item_time;
typedef queue_item_time::item_type item_type;
composite_subscription lifetime;
worker controller;
std::shared_ptr<const scheduler_interface> alive;
public:
virtual ~loop_worker()
{
}
loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
: lifetime(cs)
, controller(w)
, alive(alive)
{
auto token = controller.add(cs);
cs.add([token, w](){
w.remove(token);
});
}
virtual clock_type::time_point now() const {
return clock_type::now();
}
virtual void schedule(const schedulable& scbl) const {
controller.schedule(lifetime, scbl.get_action());
}
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
controller.schedule(when, lifetime, scbl.get_action());
}
};
mutable thread_factory factory;
scheduler newthread;
mutable std::atomic<std::size_t> count;
composite_subscription loops_lifetime;
std::vector<worker> loops;
public:
event_loop()
: factory([](std::function<void()> start){
return std::thread(std::move(start));
})
, newthread(make_new_thread())
, count(0)
{
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
while (remaining--) {
loops.push_back(newthread.create_worker(loops_lifetime));
}
}
explicit event_loop(thread_factory tf)
: factory(tf)
, newthread(make_new_thread(tf))
, count(0)
{
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
while (remaining--) {
loops.push_back(newthread.create_worker(loops_lifetime));
}
}
virtual ~event_loop()
{
loops_lifetime.unsubscribe();
}
virtual clock_type::time_point now() const {
return clock_type::now();
}
virtual worker create_worker(composite_subscription cs) const {
return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
}
};
/// Returns the shared event_loop scheduler.
///
/// The scheduler is produced by make_scheduler<event_loop>() when the
/// function-local static is initialized; every call returns a copy of that
/// same static object.
inline scheduler make_event_loop() {
    static scheduler shared_loop = make_scheduler<event_loop>();
    return shared_loop;
}
/// Creates a new event_loop scheduler from the given thread factory.
///
/// Unlike the zero-argument overload, nothing is cached here: each call
/// invokes make_scheduler<event_loop>(tf) again.
/// @param tf  thread factory forwarded to the event_loop constructor
inline scheduler make_event_loop(thread_factory tf) {
    scheduler created = make_scheduler<event_loop>(tf);
    return created;
}
}
}
#endif
|