From 8103ab12a472dbb30df17d4ac98fa78d976ec00b Mon Sep 17 00:00:00 2001 From: Alex Vakulenko Date: Wed, 20 Jan 2016 17:39:50 -0800 Subject: Restore MessagePumpGlib files These are apparently used on Chrome OS and were deleted from AOSP by mistake... Recovering. Change-Id: I257eee62a27d1dbb6029c776ac56af78ca110213 --- base/message_loop/message_pump_glib.cc | 363 ++++++++++++++++ base/message_loop/message_pump_glib.h | 78 ++++ base/message_loop/message_pump_glib_unittest.cc | 534 ++++++++++++++++++++++++ 3 files changed, 975 insertions(+) create mode 100644 base/message_loop/message_pump_glib.cc create mode 100644 base/message_loop/message_pump_glib.h create mode 100644 base/message_loop/message_pump_glib_unittest.cc diff --git a/base/message_loop/message_pump_glib.cc b/base/message_loop/message_pump_glib.cc new file mode 100644 index 0000000000..f06f60d8cf --- /dev/null +++ b/base/message_loop/message_pump_glib.cc @@ -0,0 +1,363 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop/message_pump_glib.h" + +#include +#include + +#include + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/posix/eintr_wrapper.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" + +namespace base { + +namespace { + +// Return a timeout suitable for the glib loop, -1 to block forever, +// 0 to return right away, or a timeout in milliseconds from now. +int GetTimeIntervalMilliseconds(const TimeTicks& from) { + if (from.is_null()) + return -1; + + // Be careful here. TimeDelta has a precision of microseconds, but we want a + // value in milliseconds. If there are 5.5ms left, should the delay be 5 or + // 6? It should be 6 to avoid executing delayed work too early. + int delay = static_cast( + ceil((from - TimeTicks::Now()).InMillisecondsF())); + + // If this value is negative, then we need to run delayed work soon. + return delay < 0 ? 0 : delay; +} + +// A brief refresher on GLib: +// GLib sources have four callbacks: Prepare, Check, Dispatch and Finalize. +// On each iteration of the GLib pump, it calls each source's Prepare function. +// This function should return TRUE if it wants GLib to call its Dispatch, and +// FALSE otherwise. It can also set a timeout in this case for the next time +// Prepare should be called again (it may be called sooner). +// After the Prepare calls, GLib does a poll to check for events from the +// system. File descriptors can be attached to the sources. The poll may block +// if none of the Prepare calls returned TRUE. It will block indefinitely, or +// by the minimum time returned by a source in Prepare. +// After the poll, GLib calls Check for each source that returned FALSE +// from Prepare. The return value of Check has the same meaning as for Prepare, +// making Check a second chance to tell GLib we are ready for Dispatch. +// Finally, GLib calls Dispatch for each source that is ready. If Dispatch +// returns FALSE, GLib will destroy the source. Dispatch calls may be recursive +// (i.e., you can call Run from them), but Prepare and Check cannot. +// Finalize is called when the source is destroyed. +// NOTE: It is common for subsytems to want to process pending events while +// doing intensive work, for example the flash plugin. They usually use the +// following pattern (recommended by the GTK docs): +// while (gtk_events_pending()) { +// gtk_main_iteration(); +// } +// +// gtk_events_pending just calls g_main_context_pending, which does the +// following: +// - Call prepare on all the sources. +// - Do the poll with a timeout of 0 (not blocking). +// - Call check on all the sources. +// - *Does not* call dispatch on the sources. +// - Return true if any of prepare() or check() returned true. +// +// gtk_main_iteration just calls g_main_context_iteration, which does the whole +// thing, respecting the timeout for the poll (and block, although it is +// expected not to if gtk_events_pending returned true), and call dispatch. +// +// Thus it is important to only return true from prepare or check if we +// actually have events or work to do. We also need to make sure we keep +// internal state consistent so that if prepare/check return true when called +// from gtk_events_pending, they will still return true when called right +// after, from gtk_main_iteration. +// +// For the GLib pump we try to follow the Windows UI pump model: +// - Whenever we receive a wakeup event or the timer for delayed work expires, +// we run DoWork and/or DoDelayedWork. That part will also run in the other +// event pumps. +// - We also run DoWork, DoDelayedWork, and possibly DoIdleWork in the main +// loop, around event handling. + +struct WorkSource : public GSource { + MessagePumpGlib* pump; +}; + +gboolean WorkSourcePrepare(GSource* source, + gint* timeout_ms) { + *timeout_ms = static_cast(source)->pump->HandlePrepare(); + // We always return FALSE, so that our timeout is honored. If we were + // to return TRUE, the timeout would be considered to be 0 and the poll + // would never block. Once the poll is finished, Check will be called. + return FALSE; +} + +gboolean WorkSourceCheck(GSource* source) { + // Only return TRUE if Dispatch should be called. + return static_cast(source)->pump->HandleCheck(); +} + +gboolean WorkSourceDispatch(GSource* source, + GSourceFunc unused_func, + gpointer unused_data) { + + static_cast(source)->pump->HandleDispatch(); + // Always return TRUE so our source stays registered. + return TRUE; +} + +// I wish these could be const, but g_source_new wants non-const. +GSourceFuncs WorkSourceFuncs = { + WorkSourcePrepare, + WorkSourceCheck, + WorkSourceDispatch, + NULL +}; + +// The following is used to make sure we only run the MessagePumpGlib on one +// thread. X only has one message pump so we can only have one UI loop per +// process. +#ifndef NDEBUG + +// Tracks the pump the most recent pump that has been run. +struct ThreadInfo { + // The pump. + MessagePumpGlib* pump; + + // ID of the thread the pump was run on. + PlatformThreadId thread_id; +}; + +// Used for accesing |thread_info|. +static LazyInstance::Leaky thread_info_lock = LAZY_INSTANCE_INITIALIZER; + +// If non-NULL it means a MessagePumpGlib exists and has been Run. This is +// destroyed when the MessagePump is destroyed. +ThreadInfo* thread_info = NULL; + +void CheckThread(MessagePumpGlib* pump) { + AutoLock auto_lock(thread_info_lock.Get()); + if (!thread_info) { + thread_info = new ThreadInfo; + thread_info->pump = pump; + thread_info->thread_id = PlatformThread::CurrentId(); + } + DCHECK(thread_info->thread_id == PlatformThread::CurrentId()) << + "Running MessagePumpGlib on two different threads; " + "this is unsupported by GLib!"; +} + +void PumpDestroyed(MessagePumpGlib* pump) { + AutoLock auto_lock(thread_info_lock.Get()); + if (thread_info && thread_info->pump == pump) { + delete thread_info; + thread_info = NULL; + } +} + +#endif + +} // namespace + +struct MessagePumpGlib::RunState { + Delegate* delegate; + + // Used to flag that the current Run() invocation should return ASAP. + bool should_quit; + + // Used to count how many Run() invocations are on the stack. + int run_depth; + + // This keeps the state of whether the pump got signaled that there was new + // work to be done. Since we eat the message on the wake up pipe as soon as + // we get it, we keep that state here to stay consistent. + bool has_work; +}; + +MessagePumpGlib::MessagePumpGlib() + : state_(NULL), + context_(g_main_context_default()), + wakeup_gpollfd_(new GPollFD) { + // Create our wakeup pipe, which is used to flag when work was scheduled. + int fds[2]; + int ret = pipe(fds); + DCHECK_EQ(ret, 0); + (void)ret; // Prevent warning in release mode. + + wakeup_pipe_read_ = fds[0]; + wakeup_pipe_write_ = fds[1]; + wakeup_gpollfd_->fd = wakeup_pipe_read_; + wakeup_gpollfd_->events = G_IO_IN; + + work_source_ = g_source_new(&WorkSourceFuncs, sizeof(WorkSource)); + static_cast(work_source_)->pump = this; + g_source_add_poll(work_source_, wakeup_gpollfd_.get()); + // Use a low priority so that we let other events in the queue go first. + g_source_set_priority(work_source_, G_PRIORITY_DEFAULT_IDLE); + // This is needed to allow Run calls inside Dispatch. + g_source_set_can_recurse(work_source_, TRUE); + g_source_attach(work_source_, context_); +} + +MessagePumpGlib::~MessagePumpGlib() { +#ifndef NDEBUG + PumpDestroyed(this); +#endif + g_source_destroy(work_source_); + g_source_unref(work_source_); + close(wakeup_pipe_read_); + close(wakeup_pipe_write_); +} + +// Return the timeout we want passed to poll. +int MessagePumpGlib::HandlePrepare() { + // We know we have work, but we haven't called HandleDispatch yet. Don't let + // the pump block so that we can do some processing. + if (state_ && // state_ may be null during tests. + state_->has_work) + return 0; + + // We don't think we have work to do, but make sure not to block + // longer than the next time we need to run delayed work. + return GetTimeIntervalMilliseconds(delayed_work_time_); +} + +bool MessagePumpGlib::HandleCheck() { + if (!state_) // state_ may be null during tests. + return false; + + // We usually have a single message on the wakeup pipe, since we are only + // signaled when the queue went from empty to non-empty, but there can be + // two messages if a task posted a task, hence we read at most two bytes. + // The glib poll will tell us whether there was data, so this read + // shouldn't block. + if (wakeup_gpollfd_->revents & G_IO_IN) { + char msg[2]; + const int num_bytes = HANDLE_EINTR(read(wakeup_pipe_read_, msg, 2)); + if (num_bytes < 1) { + NOTREACHED() << "Error reading from the wakeup pipe."; + } + DCHECK((num_bytes == 1 && msg[0] == '!') || + (num_bytes == 2 && msg[0] == '!' && msg[1] == '!')); + // Since we ate the message, we need to record that we have more work, + // because HandleCheck() may be called without HandleDispatch being called + // afterwards. + state_->has_work = true; + } + + if (state_->has_work) + return true; + + if (GetTimeIntervalMilliseconds(delayed_work_time_) == 0) { + // The timer has expired. That condition will stay true until we process + // that delayed work, so we don't need to record this differently. + return true; + } + + return false; +} + +void MessagePumpGlib::HandleDispatch() { + state_->has_work = false; + if (state_->delegate->DoWork()) { + // NOTE: on Windows at this point we would call ScheduleWork (see + // MessagePumpGlib::HandleWorkMessage in message_pump_win.cc). But here, + // instead of posting a message on the wakeup pipe, we can avoid the + // syscalls and just signal that we have more work. + state_->has_work = true; + } + + if (state_->should_quit) + return; + + state_->delegate->DoDelayedWork(&delayed_work_time_); +} + +void MessagePumpGlib::Run(Delegate* delegate) { +#ifndef NDEBUG + CheckThread(this); +#endif + + RunState state; + state.delegate = delegate; + state.should_quit = false; + state.run_depth = state_ ? state_->run_depth + 1 : 1; + state.has_work = false; + + RunState* previous_state = state_; + state_ = &state; + + // We really only do a single task for each iteration of the loop. If we + // have done something, assume there is likely something more to do. This + // will mean that we don't block on the message pump until there was nothing + // more to do. We also set this to true to make sure not to block on the + // first iteration of the loop, so RunUntilIdle() works correctly. + bool more_work_is_plausible = true; + + // We run our own loop instead of using g_main_loop_quit in one of the + // callbacks. This is so we only quit our own loops, and we don't quit + // nested loops run by others. TODO(deanm): Is this what we want? + for (;;) { + // Don't block if we think we have more work to do. + bool block = !more_work_is_plausible; + + more_work_is_plausible = g_main_context_iteration(context_, block); + if (state_->should_quit) + break; + + more_work_is_plausible |= state_->delegate->DoWork(); + if (state_->should_quit) + break; + + more_work_is_plausible |= + state_->delegate->DoDelayedWork(&delayed_work_time_); + if (state_->should_quit) + break; + + if (more_work_is_plausible) + continue; + + more_work_is_plausible = state_->delegate->DoIdleWork(); + if (state_->should_quit) + break; + } + + state_ = previous_state; +} + +void MessagePumpGlib::Quit() { + if (state_) { + state_->should_quit = true; + } else { + NOTREACHED() << "Quit called outside Run!"; + } +} + +void MessagePumpGlib::ScheduleWork() { + // This can be called on any thread, so we don't want to touch any state + // variables as we would then need locks all over. This ensures that if + // we are sleeping in a poll that we will wake up. + char msg = '!'; + if (HANDLE_EINTR(write(wakeup_pipe_write_, &msg, 1)) != 1) { + NOTREACHED() << "Could not write to the UI message loop wakeup pipe!"; + } +} + +void MessagePumpGlib::ScheduleDelayedWork(const TimeTicks& delayed_work_time) { + // We need to wake up the loop in case the poll timeout needs to be + // adjusted. This will cause us to try to do work, but that's ok. + delayed_work_time_ = delayed_work_time; + ScheduleWork(); +} + +bool MessagePumpGlib::ShouldQuit() const { + CHECK(state_); + return state_->should_quit; +} + +} // namespace base diff --git a/base/message_loop/message_pump_glib.h b/base/message_loop/message_pump_glib.h new file mode 100644 index 0000000000..9f4457141d --- /dev/null +++ b/base/message_loop/message_pump_glib.h @@ -0,0 +1,78 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_MESSAGE_LOOP_MESSAGE_PUMP_GLIB_H_ +#define BASE_MESSAGE_LOOP_MESSAGE_PUMP_GLIB_H_ + +#include "base/base_export.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_pump.h" +#include "base/observer_list.h" +#include "base/time/time.h" + +typedef struct _GMainContext GMainContext; +typedef struct _GPollFD GPollFD; +typedef struct _GSource GSource; + +namespace base { + +// This class implements a base MessagePump needed for TYPE_UI MessageLoops on +// platforms using GLib. +class BASE_EXPORT MessagePumpGlib : public MessagePump { + public: + MessagePumpGlib(); + ~MessagePumpGlib() override; + + // Internal methods used for processing the pump callbacks. They are + // public for simplicity but should not be used directly. HandlePrepare + // is called during the prepare step of glib, and returns a timeout that + // will be passed to the poll. HandleCheck is called after the poll + // has completed, and returns whether or not HandleDispatch should be called. + // HandleDispatch is called if HandleCheck returned true. + int HandlePrepare(); + bool HandleCheck(); + void HandleDispatch(); + + // Overridden from MessagePump: + void Run(Delegate* delegate) override; + void Quit() override; + void ScheduleWork() override; + void ScheduleDelayedWork(const TimeTicks& delayed_work_time) override; + + private: + bool ShouldQuit() const; + + // We may make recursive calls to Run, so we save state that needs to be + // separate between them in this structure type. + struct RunState; + + RunState* state_; + + // This is a GLib structure that we can add event sources to. We use the + // default GLib context, which is the one to which all GTK events are + // dispatched. + GMainContext* context_; + + // This is the time when we need to do delayed work. + TimeTicks delayed_work_time_; + + // The work source. It is shared by all calls to Run and destroyed when + // the message pump is destroyed. + GSource* work_source_; + + // We use a wakeup pipe to make sure we'll get out of the glib polling phase + // when another thread has scheduled us to do some work. There is a glib + // mechanism g_main_context_wakeup, but this won't guarantee that our event's + // Dispatch() will be called. + int wakeup_pipe_read_; + int wakeup_pipe_write_; + // Use a scoped_ptr to avoid needing the definition of GPollFD in the header. + scoped_ptr wakeup_gpollfd_; + + DISALLOW_COPY_AND_ASSIGN(MessagePumpGlib); +}; + +} // namespace base + +#endif // BASE_MESSAGE_LOOP_MESSAGE_PUMP_GLIB_H_ diff --git a/base/message_loop/message_pump_glib_unittest.cc b/base/message_loop/message_pump_glib_unittest.cc new file mode 100644 index 0000000000..7ddd4f08a0 --- /dev/null +++ b/base/message_loop/message_pump_glib_unittest.cc @@ -0,0 +1,534 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop/message_pump_glib.h" + +#include +#include + +#include +#include + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/callback.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "base/threading/thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { +namespace { + +// This class injects dummy "events" into the GLib loop. When "handled" these +// events can run tasks. This is intended to mock gtk events (the corresponding +// GLib source runs at the same priority). +class EventInjector { + public: + EventInjector() : processed_events_(0) { + source_ = static_cast(g_source_new(&SourceFuncs, sizeof(Source))); + source_->injector = this; + g_source_attach(source_, NULL); + g_source_set_can_recurse(source_, TRUE); + } + + ~EventInjector() { + g_source_destroy(source_); + g_source_unref(source_); + } + + int HandlePrepare() { + // If the queue is empty, block. + if (events_.empty()) + return -1; + TimeDelta delta = events_[0].time - Time::NowFromSystemTime(); + return std::max(0, static_cast(ceil(delta.InMillisecondsF()))); + } + + bool HandleCheck() { + if (events_.empty()) + return false; + return events_[0].time <= Time::NowFromSystemTime(); + } + + void HandleDispatch() { + if (events_.empty()) + return; + Event event = events_[0]; + events_.erase(events_.begin()); + ++processed_events_; + if (!event.callback.is_null()) + event.callback.Run(); + else if (!event.task.is_null()) + event.task.Run(); + } + + // Adds an event to the queue. When "handled", executes |callback|. + // delay_ms is relative to the last event if any, or to Now() otherwise. + void AddEvent(int delay_ms, const Closure& callback) { + AddEventHelper(delay_ms, callback, Closure()); + } + + void AddDummyEvent(int delay_ms) { + AddEventHelper(delay_ms, Closure(), Closure()); + } + + void AddEventAsTask(int delay_ms, const Closure& task) { + AddEventHelper(delay_ms, Closure(), task); + } + + void Reset() { + processed_events_ = 0; + events_.clear(); + } + + int processed_events() const { return processed_events_; } + + private: + struct Event { + Time time; + Closure callback; + Closure task; + }; + + struct Source : public GSource { + EventInjector* injector; + }; + + void AddEventHelper( + int delay_ms, const Closure& callback, const Closure& task) { + Time last_time; + if (!events_.empty()) + last_time = (events_.end()-1)->time; + else + last_time = Time::NowFromSystemTime(); + + Time future = last_time + TimeDelta::FromMilliseconds(delay_ms); + EventInjector::Event event = {future, callback, task}; + events_.push_back(event); + } + + static gboolean Prepare(GSource* source, gint* timeout_ms) { + *timeout_ms = static_cast(source)->injector->HandlePrepare(); + return FALSE; + } + + static gboolean Check(GSource* source) { + return static_cast(source)->injector->HandleCheck(); + } + + static gboolean Dispatch(GSource* source, + GSourceFunc unused_func, + gpointer unused_data) { + static_cast(source)->injector->HandleDispatch(); + return TRUE; + } + + Source* source_; + std::vector events_; + int processed_events_; + static GSourceFuncs SourceFuncs; + DISALLOW_COPY_AND_ASSIGN(EventInjector); +}; + +GSourceFuncs EventInjector::SourceFuncs = { + EventInjector::Prepare, + EventInjector::Check, + EventInjector::Dispatch, + NULL +}; + +void IncrementInt(int *value) { + ++*value; +} + +// Checks how many events have been processed by the injector. +void ExpectProcessedEvents(EventInjector* injector, int count) { + EXPECT_EQ(injector->processed_events(), count); +} + +// Posts a task on the current message loop. +void PostMessageLoopTask(const tracked_objects::Location& from_here, + const Closure& task) { + MessageLoop::current()->PostTask(from_here, task); +} + +// Test fixture. +class MessagePumpGLibTest : public testing::Test { + public: + MessagePumpGLibTest() : loop_(NULL), injector_(NULL) { } + + // Overridden from testing::Test: + void SetUp() override { + loop_ = new MessageLoop(MessageLoop::TYPE_UI); + injector_ = new EventInjector(); + } + void TearDown() override { + delete injector_; + injector_ = NULL; + delete loop_; + loop_ = NULL; + } + + MessageLoop* loop() const { return loop_; } + EventInjector* injector() const { return injector_; } + + private: + MessageLoop* loop_; + EventInjector* injector_; + DISALLOW_COPY_AND_ASSIGN(MessagePumpGLibTest); +}; + +} // namespace + +TEST_F(MessagePumpGLibTest, TestQuit) { + // Checks that Quit works and that the basic infrastructure is working. + + // Quit from a task + RunLoop().RunUntilIdle(); + EXPECT_EQ(0, injector()->processed_events()); + + injector()->Reset(); + // Quit from an event + injector()->AddEvent(0, MessageLoop::QuitWhenIdleClosure()); + loop()->Run(); + EXPECT_EQ(1, injector()->processed_events()); +} + +TEST_F(MessagePumpGLibTest, TestEventTaskInterleave) { + // Checks that tasks posted by events are executed before the next event if + // the posted task queue is empty. + // MessageLoop doesn't make strong guarantees that it is the case, but the + // current implementation ensures it and the tests below rely on it. + // If changes cause this test to fail, it is reasonable to change it, but + // TestWorkWhileWaitingForEvents and TestEventsWhileWaitingForWork have to be + // changed accordingly, otherwise they can become flaky. + injector()->AddEventAsTask(0, Bind(&DoNothing)); + Closure check_task = + Bind(&ExpectProcessedEvents, Unretained(injector()), 2); + Closure posted_task = + Bind(&PostMessageLoopTask, FROM_HERE, check_task); + injector()->AddEventAsTask(0, posted_task); + injector()->AddEventAsTask(0, Bind(&DoNothing)); + injector()->AddEvent(0, MessageLoop::QuitWhenIdleClosure()); + loop()->Run(); + EXPECT_EQ(4, injector()->processed_events()); + + injector()->Reset(); + injector()->AddEventAsTask(0, Bind(&DoNothing)); + check_task = + Bind(&ExpectProcessedEvents, Unretained(injector()), 2); + posted_task = Bind(&PostMessageLoopTask, FROM_HERE, check_task); + injector()->AddEventAsTask(0, posted_task); + injector()->AddEventAsTask(10, Bind(&DoNothing)); + injector()->AddEvent(0, MessageLoop::QuitWhenIdleClosure()); + loop()->Run(); + EXPECT_EQ(4, injector()->processed_events()); +} + +TEST_F(MessagePumpGLibTest, TestWorkWhileWaitingForEvents) { + int task_count = 0; + // Tests that we process tasks while waiting for new events. + // The event queue is empty at first. + for (int i = 0; i < 10; ++i) { + loop()->PostTask(FROM_HERE, Bind(&IncrementInt, &task_count)); + } + // After all the previous tasks have executed, enqueue an event that will + // quit. + loop()->PostTask( + FROM_HERE, + Bind(&EventInjector::AddEvent, Unretained(injector()), 0, + MessageLoop::QuitWhenIdleClosure())); + loop()->Run(); + ASSERT_EQ(10, task_count); + EXPECT_EQ(1, injector()->processed_events()); + + // Tests that we process delayed tasks while waiting for new events. + injector()->Reset(); + task_count = 0; + for (int i = 0; i < 10; ++i) { + loop()->PostDelayedTask( + FROM_HERE, + Bind(&IncrementInt, &task_count), + TimeDelta::FromMilliseconds(10*i)); + } + // After all the previous tasks have executed, enqueue an event that will + // quit. + // This relies on the fact that delayed tasks are executed in delay order. + // That is verified in message_loop_unittest.cc. + loop()->PostDelayedTask( + FROM_HERE, + Bind(&EventInjector::AddEvent, Unretained(injector()), 10, + MessageLoop::QuitWhenIdleClosure()), + TimeDelta::FromMilliseconds(150)); + loop()->Run(); + ASSERT_EQ(10, task_count); + EXPECT_EQ(1, injector()->processed_events()); +} + +TEST_F(MessagePumpGLibTest, TestEventsWhileWaitingForWork) { + // Tests that we process events while waiting for work. + // The event queue is empty at first. + for (int i = 0; i < 10; ++i) { + injector()->AddDummyEvent(0); + } + // After all the events have been processed, post a task that will check that + // the events have been processed (note: the task executes after the event + // that posted it has been handled, so we expect 11 at that point). + Closure check_task = + Bind(&ExpectProcessedEvents, Unretained(injector()), 11); + Closure posted_task = + Bind(&PostMessageLoopTask, FROM_HERE, check_task); + injector()->AddEventAsTask(10, posted_task); + + // And then quit (relies on the condition tested by TestEventTaskInterleave). + injector()->AddEvent(10, MessageLoop::QuitWhenIdleClosure()); + loop()->Run(); + + EXPECT_EQ(12, injector()->processed_events()); +} + +namespace { + +// This class is a helper for the concurrent events / posted tasks test below. +// It will quit the main loop once enough tasks and events have been processed, +// while making sure there is always work to do and events in the queue. +class ConcurrentHelper : public RefCounted { + public: + explicit ConcurrentHelper(EventInjector* injector) + : injector_(injector), + event_count_(kStartingEventCount), + task_count_(kStartingTaskCount) { + } + + void FromTask() { + if (task_count_ > 0) { + --task_count_; + } + if (task_count_ == 0 && event_count_ == 0) { + MessageLoop::current()->QuitWhenIdle(); + } else { + MessageLoop::current()->PostTask( + FROM_HERE, Bind(&ConcurrentHelper::FromTask, this)); + } + } + + void FromEvent() { + if (event_count_ > 0) { + --event_count_; + } + if (task_count_ == 0 && event_count_ == 0) { + MessageLoop::current()->QuitWhenIdle(); + } else { + injector_->AddEventAsTask( + 0, Bind(&ConcurrentHelper::FromEvent, this)); + } + } + + int event_count() const { return event_count_; } + int task_count() const { return task_count_; } + + private: + friend class RefCounted; + + ~ConcurrentHelper() {} + + static const int kStartingEventCount = 20; + static const int kStartingTaskCount = 20; + + EventInjector* injector_; + int event_count_; + int task_count_; +}; + +} // namespace + +TEST_F(MessagePumpGLibTest, TestConcurrentEventPostedTask) { + // Tests that posted tasks don't starve events, nor the opposite. + // We use the helper class above. We keep both event and posted task queues + // full, the helper verifies that both tasks and events get processed. + // If that is not the case, either event_count_ or task_count_ will not get + // to 0, and MessageLoop::QuitWhenIdle() will never be called. + scoped_refptr helper = new ConcurrentHelper(injector()); + + // Add 2 events to the queue to make sure it is always full (when we remove + // the event before processing it). + injector()->AddEventAsTask( + 0, Bind(&ConcurrentHelper::FromEvent, helper.get())); + injector()->AddEventAsTask( + 0, Bind(&ConcurrentHelper::FromEvent, helper.get())); + + // Similarly post 2 tasks. + loop()->PostTask( + FROM_HERE, Bind(&ConcurrentHelper::FromTask, helper.get())); + loop()->PostTask( + FROM_HERE, Bind(&ConcurrentHelper::FromTask, helper.get())); + + loop()->Run(); + EXPECT_EQ(0, helper->event_count()); + EXPECT_EQ(0, helper->task_count()); +} + +namespace { + +void AddEventsAndDrainGLib(EventInjector* injector) { + // Add a couple of dummy events + injector->AddDummyEvent(0); + injector->AddDummyEvent(0); + // Then add an event that will quit the main loop. + injector->AddEvent(0, MessageLoop::QuitWhenIdleClosure()); + + // Post a couple of dummy tasks + MessageLoop::current()->PostTask(FROM_HERE, Bind(&DoNothing)); + MessageLoop::current()->PostTask(FROM_HERE, Bind(&DoNothing)); + + // Drain the events + while (g_main_context_pending(NULL)) { + g_main_context_iteration(NULL, FALSE); + } +} + +} // namespace + +TEST_F(MessagePumpGLibTest, TestDrainingGLib) { + // Tests that draining events using GLib works. + loop()->PostTask( + FROM_HERE, + Bind(&AddEventsAndDrainGLib, Unretained(injector()))); + loop()->Run(); + + EXPECT_EQ(3, injector()->processed_events()); +} + +namespace { + +// Helper class that lets us run the GLib message loop. +class GLibLoopRunner : public RefCounted { + public: + GLibLoopRunner() : quit_(false) { } + + void RunGLib() { + while (!quit_) { + g_main_context_iteration(NULL, TRUE); + } + } + + void RunLoop() { + while (!quit_) { + g_main_context_iteration(NULL, TRUE); + } + } + + void Quit() { + quit_ = true; + } + + void Reset() { + quit_ = false; + } + + private: + friend class RefCounted; + + ~GLibLoopRunner() {} + + bool quit_; +}; + +void TestGLibLoopInternal(EventInjector* injector) { + // Allow tasks to be processed from 'native' event loops. + MessageLoop::current()->SetNestableTasksAllowed(true); + scoped_refptr runner = new GLibLoopRunner(); + + int task_count = 0; + // Add a couple of dummy events + injector->AddDummyEvent(0); + injector->AddDummyEvent(0); + // Post a couple of dummy tasks + MessageLoop::current()->PostTask( + FROM_HERE, Bind(&IncrementInt, &task_count)); + MessageLoop::current()->PostTask( + FROM_HERE, Bind(&IncrementInt, &task_count)); + // Delayed events + injector->AddDummyEvent(10); + injector->AddDummyEvent(10); + // Delayed work + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + Bind(&IncrementInt, &task_count), + TimeDelta::FromMilliseconds(30)); + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + Bind(&GLibLoopRunner::Quit, runner.get()), + TimeDelta::FromMilliseconds(40)); + + // Run a nested, straight GLib message loop. + runner->RunGLib(); + + ASSERT_EQ(3, task_count); + EXPECT_EQ(4, injector->processed_events()); + MessageLoop::current()->QuitWhenIdle(); +} + +void TestGtkLoopInternal(EventInjector* injector) { + // Allow tasks to be processed from 'native' event loops. + MessageLoop::current()->SetNestableTasksAllowed(true); + scoped_refptr runner = new GLibLoopRunner(); + + int task_count = 0; + // Add a couple of dummy events + injector->AddDummyEvent(0); + injector->AddDummyEvent(0); + // Post a couple of dummy tasks + MessageLoop::current()->PostTask( + FROM_HERE, Bind(&IncrementInt, &task_count)); + MessageLoop::current()->PostTask( + FROM_HERE, Bind(&IncrementInt, &task_count)); + // Delayed events + injector->AddDummyEvent(10); + injector->AddDummyEvent(10); + // Delayed work + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + Bind(&IncrementInt, &task_count), + TimeDelta::FromMilliseconds(30)); + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + Bind(&GLibLoopRunner::Quit, runner.get()), + TimeDelta::FromMilliseconds(40)); + + // Run a nested, straight Gtk message loop. + runner->RunLoop(); + + ASSERT_EQ(3, task_count); + EXPECT_EQ(4, injector->processed_events()); + MessageLoop::current()->QuitWhenIdle(); +} + +} // namespace + +TEST_F(MessagePumpGLibTest, TestGLibLoop) { + // Tests that events and posted tasks are correctly executed if the message + // loop is not run by MessageLoop::Run() but by a straight GLib loop. + // Note that in this case we don't make strong guarantees about niceness + // between events and posted tasks. + loop()->PostTask( + FROM_HERE, + Bind(&TestGLibLoopInternal, Unretained(injector()))); + loop()->Run(); +} + +TEST_F(MessagePumpGLibTest, TestGtkLoop) { + // Tests that events and posted tasks are correctly executed if the message + // loop is not run by MessageLoop::Run() but by a straight Gtk loop. + // Note that in this case we don't make strong guarantees about niceness + // between events and posted tasks. + loop()->PostTask( + FROM_HERE, + Bind(&TestGtkLoopInternal, Unretained(injector()))); + loop()->Run(); +} + +} // namespace base -- cgit v1.2.3