summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc')
-rw-r--r--grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc92
1 files changed, 37 insertions, 55 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
index 0b4675fa..eab25f30 100644
--- a/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
+++ b/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
@@ -20,16 +20,20 @@
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
-#include "absl/strings/str_format.h"
+#include <string.h>
#include <ares.h>
+#include "absl/strings/str_format.h"
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
-#include <string.h>
+
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
@@ -37,12 +41,8 @@
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"
-#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
-#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
-
/* TODO(apolcyn): remove this hack after fixing upstream.
* Our grpc/c-ares code on Windows uses the ares_set_socket_functions API,
* which uses "struct iovec" type, which on Windows is defined inside of
@@ -99,10 +99,9 @@ class GrpcPolledFdWindows {
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
};
- GrpcPolledFdWindows(ares_socket_t as,
- std::shared_ptr<WorkSerializer> work_serializer,
- int address_family, int socket_type)
- : work_serializer_(std::move(work_serializer)),
+ GrpcPolledFdWindows(ares_socket_t as, Mutex* mu, int address_family,
+ int socket_type)
+ : mu_(mu),
read_buf_(grpc_empty_slice()),
write_buf_(grpc_empty_slice()),
tcp_write_state_(WRITE_IDLE),
@@ -132,12 +131,12 @@ class GrpcPolledFdWindows {
}
void ScheduleAndNullReadClosure(grpc_error_handle error) {
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_closure_, error);
+ ExecCtx::Run(DEBUG_LOCATION, read_closure_, error);
read_closure_ = nullptr;
}
void ScheduleAndNullWriteClosure(grpc_error_handle error) {
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, write_closure_, error);
+ ExecCtx::Run(DEBUG_LOCATION, write_closure_, error);
write_closure_ = nullptr;
}
@@ -149,8 +148,7 @@ class GrpcPolledFdWindows {
GPR_ASSERT(!read_buf_has_data_);
read_buf_ = GRPC_SLICE_MALLOC(4192);
if (connect_done_) {
- work_serializer_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
- DEBUG_LOCATION);
+ ContinueRegisterForOnReadableLocked();
} else {
GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
pending_continue_register_for_on_readable_locked_ = true;
@@ -159,7 +157,7 @@ class GrpcPolledFdWindows {
void ContinueRegisterForOnReadableLocked() {
GRPC_CARES_TRACE_LOG(
- "fd:|%s| InnerContinueRegisterForOnReadableLocked "
+ "fd:|%s| ContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GPR_ASSERT(connect_done_);
@@ -206,8 +204,7 @@ class GrpcPolledFdWindows {
GPR_ASSERT(write_closure_ == nullptr);
write_closure_ = write_closure;
if (connect_done_) {
- work_serializer_->Run(
- [this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION);
+ ContinueRegisterForOnWriteableLocked();
} else {
GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false);
pending_continue_register_for_on_writeable_locked_ = true;
@@ -216,7 +213,7 @@ class GrpcPolledFdWindows {
void ContinueRegisterForOnWriteableLocked() {
GRPC_CARES_TRACE_LOG(
- "fd:|%s| InnerContinueRegisterForOnWriteableLocked "
+ "fd:|%s| ContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GPR_ASSERT(connect_done_);
@@ -261,7 +258,7 @@ class GrpcPolledFdWindows {
return grpc_winsocket_wrapped_socket(winsocket_);
}
- const char* GetName() { return name_.c_str(); }
+ const char* GetName() const { return name_.c_str(); }
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int flags,
@@ -423,12 +420,8 @@ class GrpcPolledFdWindows {
static void OnTcpConnect(void* arg, grpc_error_handle error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
- GRPC_ERROR_REF(error); // ref owned by lambda
- grpc_polled_fd->work_serializer_->Run(
- [grpc_polled_fd, error]() {
- grpc_polled_fd->OnTcpConnectLocked(error);
- },
- DEBUG_LOCATION);
+ MutexLock lock(grpc_polled_fd->mu_);
+ grpc_polled_fd->OnTcpConnectLocked(error);
}
void OnTcpConnectLocked(grpc_error_handle error) {
@@ -465,14 +458,11 @@ class GrpcPolledFdWindows {
wsa_connect_error_ = WSA_OPERATION_ABORTED;
}
if (pending_continue_register_for_on_readable_locked_) {
- work_serializer_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
- DEBUG_LOCATION);
+ ContinueRegisterForOnReadableLocked();
}
if (pending_continue_register_for_on_writeable_locked_) {
- work_serializer_->Run(
- [this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION);
+ ContinueRegisterForOnWriteableLocked();
}
- GRPC_ERROR_UNREF(error);
}
int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
@@ -578,10 +568,9 @@ class GrpcPolledFdWindows {
static void OnIocpReadable(void* arg, grpc_error_handle error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
- GRPC_ERROR_REF(error); // ref owned by lambda
- polled_fd->work_serializer_->Run(
- [polled_fd, error]() { polled_fd->OnIocpReadableLocked(error); },
- DEBUG_LOCATION);
+ (void)GRPC_ERROR_REF(error);
+ MutexLock lock(polled_fd->mu_);
+ polled_fd->OnIocpReadableLocked(error);
}
// TODO(apolcyn): improve this error handling to be less conversative.
@@ -623,10 +612,9 @@ class GrpcPolledFdWindows {
static void OnIocpWriteable(void* arg, grpc_error_handle error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
- GRPC_ERROR_REF(error); // error owned by lambda
- polled_fd->work_serializer_->Run(
- [polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); },
- DEBUG_LOCATION);
+ (void)GRPC_ERROR_REF(error);
+ MutexLock lock(polled_fd->mu_);
+ polled_fd->OnIocpWriteableLocked(error);
}
void OnIocpWriteableLocked(grpc_error_handle error) {
@@ -661,7 +649,7 @@ class GrpcPolledFdWindows {
void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
private:
- std::shared_ptr<WorkSerializer> work_serializer_;
+ Mutex* mu_;
char recv_from_source_addr_[200];
ares_socklen_t recv_from_source_addr_len_;
grpc_slice read_buf_;
@@ -674,7 +662,7 @@ class GrpcPolledFdWindows {
grpc_winsocket* winsocket_;
// tcp_write_state_ is only used on TCP GrpcPolledFds
WriteState tcp_write_state_;
- std::string name_;
+ const std::string name_;
bool gotten_into_driver_list_;
int address_family_;
int socket_type_;
@@ -703,8 +691,7 @@ struct SockToPolledFdEntry {
* with a GrpcPolledFdWindows factory and event driver */
class SockToPolledFdMap {
public:
- explicit SockToPolledFdMap(std::shared_ptr<WorkSerializer> work_serializer)
- : work_serializer_(std::move(work_serializer)) {}
+ explicit SockToPolledFdMap(Mutex* mu) : mu_(mu) {}
~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); }
@@ -762,7 +749,7 @@ class SockToPolledFdMap {
}
grpc_tcp_set_non_block(s);
GrpcPolledFdWindows* polled_fd =
- new GrpcPolledFdWindows(s, map->work_serializer_, af, type);
+ new GrpcPolledFdWindows(s, map->mu_, af, type);
GRPC_CARES_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
@@ -817,8 +804,8 @@ class SockToPolledFdMap {
}
private:
+ Mutex* mu_;
SockToPolledFdEntry* head_ = nullptr;
- std::shared_ptr<WorkSerializer> work_serializer_;
};
const struct ares_socket_functions custom_ares_sock_funcs = {
@@ -859,21 +846,18 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd {
return wrapped_->GetWrappedAresSocketLocked();
}
- const char* GetName() override { return wrapped_->GetName(); }
+ const char* GetName() const override { return wrapped_->GetName(); }
private:
- GrpcPolledFdWindows* wrapped_;
+ GrpcPolledFdWindows* const wrapped_;
};
class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
public:
- explicit GrpcPolledFdFactoryWindows(
- std::shared_ptr<WorkSerializer> work_serializer)
- : sock_to_polled_fd_map_(std::move(work_serializer)) {}
+ explicit GrpcPolledFdFactoryWindows(Mutex* mu) : sock_to_polled_fd_map_(mu) {}
GrpcPolledFd* NewGrpcPolledFdLocked(
- ares_socket_t as, grpc_pollset_set* driver_pollset_set,
- std::shared_ptr<WorkSerializer> work_serializer) override {
+ ares_socket_t as, grpc_pollset_set* driver_pollset_set) override {
GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
// Set a flag so that the virtual socket "close" method knows it
// doesn't need to call ShutdownLocked, since now the driver will.
@@ -890,10 +874,8 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
SockToPolledFdMap sock_to_polled_fd_map_;
};
-std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
- std::shared_ptr<WorkSerializer> work_serializer) {
- return absl::make_unique<GrpcPolledFdFactoryWindows>(
- std::move(work_serializer));
+std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* mu) {
+ return absl::make_unique<GrpcPolledFdFactoryWindows>(mu);
}
} // namespace grpc_core