diff options
Diffstat (limited to 'grpc/src/core/lib/surface/channel.cc')
-rw-r--r-- | grpc/src/core/lib/surface/channel.cc | 175 |
1 files changed, 72 insertions, 103 deletions
diff --git a/grpc/src/core/lib/surface/channel.cc b/grpc/src/core/lib/surface/channel.cc index d3552c87..4dab2827 100644 --- a/grpc/src/core/lib/surface/channel.cc +++ b/grpc/src/core/lib/surface/channel.cc @@ -31,21 +31,23 @@ #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz_registry.h" +#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/resource_quota/api.h" +#include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" -#include "src/core/lib/surface/channel_init.h" -#include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/surface/channel_stack_type.h" /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. * Avoids needing to take a metadata context lock for sending status @@ -57,22 +59,20 @@ static void destroy_channel(void* arg, grpc_error_handle error); grpc_channel* grpc_channel_create_with_builder( - grpc_channel_stack_builder* builder, + grpc_core::ChannelStackBuilder* builder, grpc_channel_stack_type channel_stack_type, grpc_error_handle* error) { - char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder)); - grpc_channel_args* args = grpc_channel_args_copy( - grpc_channel_stack_builder_get_channel_arguments(builder)); - grpc_resource_user* resource_user = - grpc_channel_stack_builder_get_resource_user(builder); + std::string target(builder->target()); + grpc_channel_args* args = grpc_channel_args_copy(builder->channel_args()); grpc_channel* channel; if (channel_stack_type == GRPC_SERVER_CHANNEL) { GRPC_STATS_INC_SERVER_CHANNELS_CREATED(); } else { GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(); } - grpc_error_handle builder_error = grpc_channel_stack_builder_finish( - builder, sizeof(grpc_channel), 1, destroy_channel, nullptr, - reinterpret_cast<void**>(&channel)); + std::string name(builder->target()); + grpc_error_handle builder_error = + builder->Build(sizeof(grpc_channel), 1, destroy_channel, nullptr, + reinterpret_cast<void**>(&channel)); if (builder_error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "channel stack builder failed: %s", grpc_error_std_string(builder_error).c_str()); @@ -82,14 +82,15 @@ grpc_channel* grpc_channel_create_with_builder( } else { GRPC_ERROR_UNREF(builder_error); } - gpr_free(target); grpc_channel_args_destroy(args); return nullptr; } - channel->target = target; - channel->resource_user = resource_user; + channel->target.Init(std::move(target)); channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); channel->registration_table.Init(); + channel->allocator.Init(grpc_core::ResourceQuotaFromChannelArgs(args) + ->memory_quota() + ->CreateMemoryOwner(name)); gpr_atm_no_barrier_store( &channel->call_size_estimate, @@ -180,13 +181,14 @@ void channelz_node_destroy(void* p) { static_cast<grpc_core::channelz::ChannelNode*>(p); node->Unref(); } -int channelz_node_cmp(void* p1, void* p2) { return GPR_ICMP(p1, p2); } +int channelz_node_cmp(void* p1, void* p2) { + return grpc_core::QsortCompare(p1, p2); +} const grpc_arg_pointer_vtable channelz_node_arg_vtable = { channelz_node_copy, channelz_node_destroy, channelz_node_cmp}; -void CreateChannelzNode(grpc_channel_stack_builder* builder) { - const grpc_channel_args* args = - grpc_channel_stack_builder_get_channel_arguments(builder); +void CreateChannelzNode(grpc_core::ChannelStackBuilder* builder) { + const grpc_channel_args* args = builder->channel_args(); // Check whether channelz is enabled. const bool channelz_enabled = grpc_channel_args_find_bool( args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT); @@ -198,11 +200,10 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) { const bool is_internal_channel = grpc_channel_args_find_bool( args, GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, false); // Create the channelz node. - const char* target = grpc_channel_stack_builder_get_target(builder); + std::string target(builder->target()); grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node = grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>( - target != nullptr ? target : "", channel_tracer_max_memory, - is_internal_channel); + target.c_str(), channel_tracer_max_memory, is_internal_channel); channelz_node->AddTraceEvent( grpc_core::channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Channel created")); @@ -214,18 +215,16 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) { const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL}; grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - grpc_channel_stack_builder_set_channel_arguments(builder, new_args); + builder->SetChannelArgs(new_args); grpc_channel_args_destroy(new_args); } } // namespace -grpc_channel* grpc_channel_create(const char* target, - const grpc_channel_args* input_args, - grpc_channel_stack_type channel_stack_type, - grpc_transport* optional_transport, - grpc_resource_user* resource_user, - grpc_error_handle* error) { +grpc_channel* grpc_channel_create_internal( + const char* target, const grpc_channel_args* input_args, + grpc_channel_stack_type channel_stack_type, + grpc_transport* optional_transport, grpc_error_handle* error) { // We need to make sure that grpc_shutdown() does not shut things down // until after the channel is destroyed. However, the channel may not // actually be destroyed by the time grpc_channel_destroy() returns, @@ -243,7 +242,8 @@ grpc_channel* grpc_channel_create(const char* target, // grpc_shutdown() when the channel is actually destroyed, thus // ensuring that shutdown is deferred until that point. grpc_init(); - grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); + grpc_core::ChannelStackBuilderImpl builder( + grpc_channel_stack_type_string(channel_stack_type), channel_stack_type); const grpc_core::UniquePtr<char> default_authority = get_default_authority(input_args); grpc_channel_args* args = @@ -255,26 +255,21 @@ grpc_channel* grpc_channel_create(const char* target, args = channel_args_mutator(target, args, channel_stack_type); } } - grpc_channel_stack_builder_set_channel_arguments(builder, args); + builder.SetChannelArgs(args).SetTarget(target).SetTransport( + optional_transport); grpc_channel_args_destroy(args); - grpc_channel_stack_builder_set_target(builder, target); - grpc_channel_stack_builder_set_transport(builder, optional_transport); - grpc_channel_stack_builder_set_resource_user(builder, resource_user); - if (!grpc_channel_init_create_stack(builder, channel_stack_type)) { - grpc_channel_stack_builder_destroy(builder); - if (resource_user != nullptr) { - grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); - } + if (!grpc_core::CoreConfiguration::Get().channel_init().CreateStack( + &builder)) { grpc_shutdown(); // Since we won't call destroy_channel(). return nullptr; } // We only need to do this for clients here. For servers, this will be // done in src/core/lib/surface/server.cc. if (grpc_channel_stack_type_is_client(channel_stack_type)) { - CreateChannelzNode(builder); + CreateChannelzNode(&builder); } grpc_channel* channel = - grpc_channel_create_with_builder(builder, channel_stack_type, error); + grpc_channel_create_with_builder(&builder, channel_stack_type, error); if (channel == nullptr) { grpc_shutdown(); // Since we won't call destroy_channel(). } @@ -311,14 +306,14 @@ void grpc_channel_update_call_size_estimate(grpc_channel* channel, /* size shrank: decrease estimate */ gpr_atm_no_barrier_cas( &channel->call_size_estimate, static_cast<gpr_atm>(cur), - static_cast<gpr_atm>(GPR_MIN(cur - 1, (255 * cur + size) / 256))); + static_cast<gpr_atm>(std::min(cur - 1, (255 * cur + size) / 256))); /* if we lose: never mind, something else will likely update soon enough */ } } char* grpc_channel_get_target(grpc_channel* channel) { GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel)); - return gpr_strdup(channel->target); + return gpr_strdup(channel->target->c_str()); } void grpc_channel_get_info(grpc_channel* channel, @@ -345,19 +340,11 @@ void grpc_channel_reset_connect_backoff(grpc_channel* channel) { static grpc_call* grpc_channel_create_call_internal( grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask, grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative, - grpc_mdelem path_mdelem, grpc_mdelem authority_mdelem, - grpc_millis deadline) { - grpc_mdelem send_metadata[2]; - size_t num_metadata = 0; - + grpc_core::Slice path, absl::optional<grpc_core::Slice> authority, + grpc_core::Timestamp deadline) { GPR_ASSERT(channel->is_client); GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr)); - send_metadata[num_metadata++] = path_mdelem; - if (!GRPC_MDISNULL(authority_mdelem)) { - send_metadata[num_metadata++] = authority_mdelem; - } - grpc_call_create_args args; args.channel = channel; args.server = nullptr; @@ -366,8 +353,8 @@ static grpc_call* grpc_channel_create_call_internal( args.cq = cq; args.pollset_set_alternative = pollset_set_alternative; args.server_transport_data = nullptr; - args.add_initial_metadata = send_metadata; - args.add_initial_metadata_count = num_metadata; + args.path = std::move(path); + args.authority = std::move(authority); args.send_deadline = deadline; grpc_call* call; @@ -386,10 +373,11 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel, grpc_core::ExecCtx exec_ctx; grpc_call* call = grpc_channel_create_call_internal( channel, parent_call, propagation_mask, completion_queue, nullptr, - grpc_mdelem_create(GRPC_MDSTR_PATH, method, nullptr), - host != nullptr ? grpc_mdelem_create(GRPC_MDSTR_AUTHORITY, *host, nullptr) - : GRPC_MDNULL, - grpc_timespec_to_millis_round_up(deadline)); + grpc_core::Slice(grpc_slice_ref_internal(method)), + host != nullptr + ? absl::optional<grpc_core::Slice>(grpc_slice_ref_internal(*host)) + : absl::nullopt, + grpc_core::Timestamp::FromTimespecRoundUp(deadline)); return call; } @@ -397,54 +385,35 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel, grpc_call* grpc_channel_create_pollset_set_call( grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask, grpc_pollset_set* pollset_set, const grpc_slice& method, - const grpc_slice* host, grpc_millis deadline, void* reserved) { + const grpc_slice* host, grpc_core::Timestamp deadline, void* reserved) { GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, nullptr, pollset_set, - grpc_mdelem_create(GRPC_MDSTR_PATH, method, nullptr), - host != nullptr ? grpc_mdelem_create(GRPC_MDSTR_AUTHORITY, *host, nullptr) - : GRPC_MDNULL, + grpc_core::Slice(method), + host != nullptr + ? absl::optional<grpc_core::Slice>(grpc_slice_ref_internal(*host)) + : absl::nullopt, deadline); } namespace grpc_core { -RegisteredCall::RegisteredCall(const char* method_arg, const char* host_arg) - : method(method_arg != nullptr ? method_arg : ""), - host(host_arg != nullptr ? host_arg : ""), - path(grpc_mdelem_from_slices( - GRPC_MDSTR_PATH, grpc_core::ExternallyManagedSlice(method.c_str()))), - authority(!host.empty() - ? grpc_mdelem_from_slices( - GRPC_MDSTR_AUTHORITY, - grpc_core::ExternallyManagedSlice(host.c_str())) - : GRPC_MDNULL) {} - -// TODO(vjpai): Delete copy-constructor when allowed by all supported compilers. -RegisteredCall::RegisteredCall(const RegisteredCall& other) - : RegisteredCall(other.method.c_str(), other.host.c_str()) {} - -RegisteredCall::RegisteredCall(RegisteredCall&& other) noexcept - : method(std::move(other.method)), - host(std::move(other.host)), - path(grpc_mdelem_from_slices( - GRPC_MDSTR_PATH, grpc_core::ExternallyManagedSlice(method.c_str()))), - authority(!host.empty() - ? grpc_mdelem_from_slices( - GRPC_MDSTR_AUTHORITY, - grpc_core::ExternallyManagedSlice(host.c_str())) - : GRPC_MDNULL) { - GRPC_MDELEM_UNREF(other.path); - GRPC_MDELEM_UNREF(other.authority); - other.path = GRPC_MDNULL; - other.authority = GRPC_MDNULL; +RegisteredCall::RegisteredCall(const char* method_arg, const char* host_arg) { + path = Slice::FromCopiedString(method_arg); + if (host_arg != nullptr && host_arg[0] != 0) { + authority = Slice::FromCopiedString(host_arg); + } } -RegisteredCall::~RegisteredCall() { - GRPC_MDELEM_UNREF(path); - GRPC_MDELEM_UNREF(authority); +RegisteredCall::RegisteredCall(const RegisteredCall& other) + : path(other.path.Ref()) { + if (other.authority.has_value()) { + authority = other.authority->Ref(); + } } +RegisteredCall::~RegisteredCall() {} + } // namespace grpc_core void* grpc_channel_register_call(grpc_channel* channel, const char* method, @@ -491,8 +460,11 @@ grpc_call* grpc_channel_create_registered_call( grpc_core::ExecCtx exec_ctx; grpc_call* call = grpc_channel_create_call_internal( channel, parent_call, propagation_mask, completion_queue, nullptr, - GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), - grpc_timespec_to_millis_round_up(deadline)); + rc->path.Ref(), + rc->authority.has_value() + ? absl::optional<grpc_core::Slice>(rc->authority->Ref()) + : absl::nullopt, + grpc_core::Timestamp::FromTimespecRoundUp(deadline)); return call; } @@ -507,13 +479,10 @@ static void destroy_channel(void* arg, grpc_error_handle /*error*/) { } grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); channel->registration_table.Destroy(); - if (channel->resource_user != nullptr) { - grpc_resource_user_free(channel->resource_user, - GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); - } - gpr_free(channel->target); + channel->allocator.Destroy(); + channel->target.Destroy(); gpr_free(channel); - // See comment in grpc_channel_create() for why we do this. + // See comment in grpc_channel_create_internal() for why we do this. grpc_shutdown(); } |