diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc | 92 |
1 files changed, 37 insertions, 55 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index 0b4675fa..eab25f30 100644 --- a/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -20,16 +20,20 @@ #include "src/core/lib/iomgr/port.h" #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) -#include "absl/strings/str_format.h" +#include <string.h> #include <ares.h> +#include "absl/strings/str_format.h" + #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_windows.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> -#include <string.h> + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" @@ -37,12 +41,8 @@ #include "src/core/lib/iomgr/sockaddr_windows.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_windows.h" -#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" -#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" - /* TODO(apolcyn): remove this hack after fixing upstream. * Our grpc/c-ares code on Windows uses the ares_set_socket_functions API, * which uses "struct iovec" type, which on Windows is defined inside of @@ -99,10 +99,9 @@ class GrpcPolledFdWindows { WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY, }; - GrpcPolledFdWindows(ares_socket_t as, - std::shared_ptr<WorkSerializer> work_serializer, - int address_family, int socket_type) - : work_serializer_(std::move(work_serializer)), + GrpcPolledFdWindows(ares_socket_t as, Mutex* mu, int address_family, + int socket_type) + : mu_(mu), read_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()), tcp_write_state_(WRITE_IDLE), @@ -132,12 +131,12 @@ class GrpcPolledFdWindows { } void ScheduleAndNullReadClosure(grpc_error_handle error) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, read_closure_, error); read_closure_ = nullptr; } void ScheduleAndNullWriteClosure(grpc_error_handle error) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, write_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, write_closure_, error); write_closure_ = nullptr; } @@ -149,8 +148,7 @@ class GrpcPolledFdWindows { GPR_ASSERT(!read_buf_has_data_); read_buf_ = GRPC_SLICE_MALLOC(4192); if (connect_done_) { - work_serializer_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, - DEBUG_LOCATION); + ContinueRegisterForOnReadableLocked(); } else { GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false); pending_continue_register_for_on_readable_locked_ = true; @@ -159,7 +157,7 @@ class GrpcPolledFdWindows { void ContinueRegisterForOnReadableLocked() { GRPC_CARES_TRACE_LOG( - "fd:|%s| InnerContinueRegisterForOnReadableLocked " + "fd:|%s| ContinueRegisterForOnReadableLocked " "wsa_connect_error_:%d", GetName(), wsa_connect_error_); GPR_ASSERT(connect_done_); @@ -206,8 +204,7 @@ class GrpcPolledFdWindows { GPR_ASSERT(write_closure_ == nullptr); write_closure_ = write_closure; if (connect_done_) { - work_serializer_->Run( - [this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); + ContinueRegisterForOnWriteableLocked(); } else { GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false); pending_continue_register_for_on_writeable_locked_ = true; @@ -216,7 +213,7 @@ class GrpcPolledFdWindows { void ContinueRegisterForOnWriteableLocked() { GRPC_CARES_TRACE_LOG( - "fd:|%s| InnerContinueRegisterForOnWriteableLocked " + "fd:|%s| ContinueRegisterForOnWriteableLocked " "wsa_connect_error_:%d", GetName(), wsa_connect_error_); GPR_ASSERT(connect_done_); @@ -261,7 +258,7 @@ class GrpcPolledFdWindows { return grpc_winsocket_wrapped_socket(winsocket_); } - const char* GetName() { return name_.c_str(); } + const char* GetName() const { return name_.c_str(); } ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data, ares_socket_t data_len, int flags, @@ -423,12 +420,8 @@ class GrpcPolledFdWindows { static void OnTcpConnect(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* grpc_polled_fd = static_cast<GrpcPolledFdWindows*>(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - grpc_polled_fd->work_serializer_->Run( - [grpc_polled_fd, error]() { - grpc_polled_fd->OnTcpConnectLocked(error); - }, - DEBUG_LOCATION); + MutexLock lock(grpc_polled_fd->mu_); + grpc_polled_fd->OnTcpConnectLocked(error); } void OnTcpConnectLocked(grpc_error_handle error) { @@ -465,14 +458,11 @@ class GrpcPolledFdWindows { wsa_connect_error_ = WSA_OPERATION_ABORTED; } if (pending_continue_register_for_on_readable_locked_) { - work_serializer_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, - DEBUG_LOCATION); + ContinueRegisterForOnReadableLocked(); } if (pending_continue_register_for_on_writeable_locked_) { - work_serializer_->Run( - [this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); + ContinueRegisterForOnWriteableLocked(); } - GRPC_ERROR_UNREF(error); } int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, @@ -578,10 +568,9 @@ class GrpcPolledFdWindows { static void OnIocpReadable(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg); - GRPC_ERROR_REF(error); // ref owned by lambda - polled_fd->work_serializer_->Run( - [polled_fd, error]() { polled_fd->OnIocpReadableLocked(error); }, - DEBUG_LOCATION); + (void)GRPC_ERROR_REF(error); + MutexLock lock(polled_fd->mu_); + polled_fd->OnIocpReadableLocked(error); } // TODO(apolcyn): improve this error handling to be less conversative. @@ -623,10 +612,9 @@ class GrpcPolledFdWindows { static void OnIocpWriteable(void* arg, grpc_error_handle error) { GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg); - GRPC_ERROR_REF(error); // error owned by lambda - polled_fd->work_serializer_->Run( - [polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); }, - DEBUG_LOCATION); + (void)GRPC_ERROR_REF(error); + MutexLock lock(polled_fd->mu_); + polled_fd->OnIocpWriteableLocked(error); } void OnIocpWriteableLocked(grpc_error_handle error) { @@ -661,7 +649,7 @@ class GrpcPolledFdWindows { void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; } private: - std::shared_ptr<WorkSerializer> work_serializer_; + Mutex* mu_; char recv_from_source_addr_[200]; ares_socklen_t recv_from_source_addr_len_; grpc_slice read_buf_; @@ -674,7 +662,7 @@ class GrpcPolledFdWindows { grpc_winsocket* winsocket_; // tcp_write_state_ is only used on TCP GrpcPolledFds WriteState tcp_write_state_; - std::string name_; + const std::string name_; bool gotten_into_driver_list_; int address_family_; int socket_type_; @@ -703,8 +691,7 @@ struct SockToPolledFdEntry { * with a GrpcPolledFdWindows factory and event driver */ class SockToPolledFdMap { public: - explicit SockToPolledFdMap(std::shared_ptr<WorkSerializer> work_serializer) - : work_serializer_(std::move(work_serializer)) {} + explicit SockToPolledFdMap(Mutex* mu) : mu_(mu) {} ~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); } @@ -762,7 +749,7 @@ class SockToPolledFdMap { } grpc_tcp_set_non_block(s); GrpcPolledFdWindows* polled_fd = - new GrpcPolledFdWindows(s, map->work_serializer_, af, type); + new GrpcPolledFdWindows(s, map->mu_, af, type); GRPC_CARES_TRACE_LOG( "fd:|%s| created with params af:%d type:%d protocol:%d", polled_fd->GetName(), af, type, protocol); @@ -817,8 +804,8 @@ class SockToPolledFdMap { } private: + Mutex* mu_; SockToPolledFdEntry* head_ = nullptr; - std::shared_ptr<WorkSerializer> work_serializer_; }; const struct ares_socket_functions custom_ares_sock_funcs = { @@ -859,21 +846,18 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd { return wrapped_->GetWrappedAresSocketLocked(); } - const char* GetName() override { return wrapped_->GetName(); } + const char* GetName() const override { return wrapped_->GetName(); } private: - GrpcPolledFdWindows* wrapped_; + GrpcPolledFdWindows* const wrapped_; }; class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { public: - explicit GrpcPolledFdFactoryWindows( - std::shared_ptr<WorkSerializer> work_serializer) - : sock_to_polled_fd_map_(std::move(work_serializer)) {} + explicit GrpcPolledFdFactoryWindows(Mutex* mu) : sock_to_polled_fd_map_(mu) {} GrpcPolledFd* NewGrpcPolledFdLocked( - ares_socket_t as, grpc_pollset_set* driver_pollset_set, - std::shared_ptr<WorkSerializer> work_serializer) override { + ares_socket_t as, grpc_pollset_set* driver_pollset_set) override { GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as); // Set a flag so that the virtual socket "close" method knows it // doesn't need to call ShutdownLocked, since now the driver will. @@ -890,10 +874,8 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { SockToPolledFdMap sock_to_polled_fd_map_; }; -std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory( - std::shared_ptr<WorkSerializer> work_serializer) { - return absl::make_unique<GrpcPolledFdFactoryWindows>( - std::move(work_serializer)); +std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Mutex* mu) { + return absl::make_unique<GrpcPolledFdFactoryWindows>(mu); } } // namespace grpc_core |