summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp
blob: da45e8d942a36a408dc907b207e54ce24bb183f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
#define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace schedulers {

// Scheduler that hands out workers backed by a fixed pool of workers taken
// from a new-thread scheduler. The pool is filled once, in the constructor;
// each create_worker() call then selects one pool entry by advancing an
// atomic counter and wraps it in a loop_worker.
struct event_loop : public scheduler_interface
{
private:
    typedef event_loop this_type;
    // Private copy constructor; no definition appears in this file.
    event_loop(const this_type&);

    // Worker returned by create_worker(). It holds only a subscription and a
    // pool worker: every schedule request is forwarded to `controller`
    // together with `lifetime`.
    struct loop_worker : public worker_interface
    {
    private:
        typedef loop_worker this_type;
        // Private copy constructor; no definition appears in this file.
        loop_worker(const this_type&);

        typedef detail::schedulable_queue<
            typename clock_type::time_point> queue_item_time;

        typedef queue_item_time::item_type item_type;

        // Subscription passed along with every forwarded schedule call.
        composite_subscription lifetime;
        // Pool worker that receives the forwarded schedule calls.
        worker controller;

    public:
        // cs: subscription stored as `lifetime`.
        // w:  pool worker stored as `controller`.
        loop_worker(composite_subscription cs, worker w)
            : lifetime(cs)
            , controller(w)
        {
        }

        virtual ~loop_worker()
        {
        }

        // Current time as reported by clock_type.
        virtual clock_type::time_point now() const
        {
            return clock_type::now();
        }

        // Forwards the action of `scbl` to the pool worker, paired with this
        // worker's lifetime rather than with `scbl` itself.
        virtual void schedule(const schedulable& scbl) const
        {
            controller.schedule(lifetime, scbl.get_action());
        }

        // Same as above, but forwards the requested time point `when` too.
        virtual void schedule(clock_type::time_point when, const schedulable& scbl) const
        {
            controller.schedule(when, lifetime, scbl.get_action());
        }
    };

    // NOTE: members are initialised in declaration order; keep this order.
    mutable thread_factory factory;
    scheduler newthread;
    // Advanced on every create_worker() call to choose the next pool entry.
    mutable std::atomic<std::size_t> count;
    // Pool of workers created from `newthread` during construction.
    std::vector<worker> loops;

    // Fills `loops` with as many workers as
    // std::thread::hardware_concurrency() reports, but never fewer than four.
    void populate_loops()
    {
        const unsigned minimum_loops = 4;
        const unsigned wanted = std::max(std::thread::hardware_concurrency(), minimum_loops);
        for (unsigned made = 0; made != wanted; ++made) {
            loops.push_back(newthread.create_worker());
        }
    }

public:
    // Uses a factory that wraps the start function in a std::thread, and the
    // scheduler returned by make_new_thread() with no arguments.
    event_loop()
        : factory([](std::function<void()> entry){
            return std::thread(std::move(entry));
        })
        , newthread(make_new_thread())
        , count(0)
    {
        populate_loops();
    }

    // Uses the caller-supplied thread factory `tf`, both as the stored
    // factory and as the argument to make_new_thread().
    explicit event_loop(thread_factory tf)
        : factory(tf)
        , newthread(make_new_thread(tf))
        , count(0)
    {
        populate_loops();
    }

    virtual ~event_loop()
    {
    }

    // Current time as reported by clock_type.
    virtual clock_type::time_point now() const
    {
        return clock_type::now();
    }

    // Returns a worker bound to `cs` whose schedule calls are forwarded to
    // one of the pool workers. The counter is pre-incremented, so the first
    // call selects index 1 % loops.size().
    virtual worker create_worker(composite_subscription cs) const
    {
        const std::size_t ticket = ++count;
        const std::size_t slot = ticket % loops.size();
        return worker(cs, std::make_shared<loop_worker>(cs, loops[slot]));
    }
};

// Returns a copy of a single event_loop scheduler that is created on the
// first call and reused by every later call.
inline scheduler make_event_loop()
{
    static scheduler shared_loop(make_scheduler<event_loop>());
    return shared_loop;
}
// Builds a new event_loop scheduler on every call, constructed with the
// caller-supplied thread factory `tf`.
inline scheduler make_event_loop(thread_factory tf)
{
    scheduler fresh_loop = make_scheduler<event_loop>(tf);
    return fresh_loop;
}

}

}

#endif