summaryrefslogtreecommitdiff
path: root/grpc/src/core/lib/surface/channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/lib/surface/channel.cc')
-rw-r--r--grpc/src/core/lib/surface/channel.cc175
1 files changed, 72 insertions, 103 deletions
diff --git a/grpc/src/core/lib/surface/channel.cc b/grpc/src/core/lib/surface/channel.cc
index d3552c87..4dab2827 100644
--- a/grpc/src/core/lib/surface/channel.cc
+++ b/grpc/src/core/lib/surface/channel.cc
@@ -31,21 +31,23 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/channelz_registry.h"
+#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/iomgr.h"
-#include "src/core/lib/iomgr/resource_quota.h"
+#include "src/core/lib/resource_quota/api.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
-#include "src/core/lib/surface/channel_init.h"
-#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/surface/channel_stack_type.h"
/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
* Avoids needing to take a metadata context lock for sending status
@@ -57,22 +59,20 @@
static void destroy_channel(void* arg, grpc_error_handle error);
grpc_channel* grpc_channel_create_with_builder(
- grpc_channel_stack_builder* builder,
+ grpc_core::ChannelStackBuilder* builder,
grpc_channel_stack_type channel_stack_type, grpc_error_handle* error) {
- char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
- grpc_channel_args* args = grpc_channel_args_copy(
- grpc_channel_stack_builder_get_channel_arguments(builder));
- grpc_resource_user* resource_user =
- grpc_channel_stack_builder_get_resource_user(builder);
+ std::string target(builder->target());
+ grpc_channel_args* args = grpc_channel_args_copy(builder->channel_args());
grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
} else {
GRPC_STATS_INC_CLIENT_CHANNELS_CREATED();
}
- grpc_error_handle builder_error = grpc_channel_stack_builder_finish(
- builder, sizeof(grpc_channel), 1, destroy_channel, nullptr,
- reinterpret_cast<void**>(&channel));
+ std::string name(builder->target());
+ grpc_error_handle builder_error =
+ builder->Build(sizeof(grpc_channel), 1, destroy_channel, nullptr,
+ reinterpret_cast<void**>(&channel));
if (builder_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "channel stack builder failed: %s",
grpc_error_std_string(builder_error).c_str());
@@ -82,14 +82,15 @@ grpc_channel* grpc_channel_create_with_builder(
} else {
GRPC_ERROR_UNREF(builder_error);
}
- gpr_free(target);
grpc_channel_args_destroy(args);
return nullptr;
}
- channel->target = target;
- channel->resource_user = resource_user;
+ channel->target.Init(std::move(target));
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
channel->registration_table.Init();
+ channel->allocator.Init(grpc_core::ResourceQuotaFromChannelArgs(args)
+ ->memory_quota()
+ ->CreateMemoryOwner(name));
gpr_atm_no_barrier_store(
&channel->call_size_estimate,
@@ -180,13 +181,14 @@ void channelz_node_destroy(void* p) {
static_cast<grpc_core::channelz::ChannelNode*>(p);
node->Unref();
}
-int channelz_node_cmp(void* p1, void* p2) { return GPR_ICMP(p1, p2); }
+int channelz_node_cmp(void* p1, void* p2) {
+ return grpc_core::QsortCompare(p1, p2);
+}
const grpc_arg_pointer_vtable channelz_node_arg_vtable = {
channelz_node_copy, channelz_node_destroy, channelz_node_cmp};
-void CreateChannelzNode(grpc_channel_stack_builder* builder) {
- const grpc_channel_args* args =
- grpc_channel_stack_builder_get_channel_arguments(builder);
+void CreateChannelzNode(grpc_core::ChannelStackBuilder* builder) {
+ const grpc_channel_args* args = builder->channel_args();
// Check whether channelz is enabled.
const bool channelz_enabled = grpc_channel_args_find_bool(
args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT);
@@ -198,11 +200,10 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) {
const bool is_internal_channel = grpc_channel_args_find_bool(
args, GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, false);
// Create the channelz node.
- const char* target = grpc_channel_stack_builder_get_target(builder);
+ std::string target(builder->target());
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node =
grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>(
- target != nullptr ? target : "", channel_tracer_max_memory,
- is_internal_channel);
+ target.c_str(), channel_tracer_max_memory, is_internal_channel);
channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
@@ -214,18 +215,16 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) {
const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
- grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
+ builder->SetChannelArgs(new_args);
grpc_channel_args_destroy(new_args);
}
} // namespace
-grpc_channel* grpc_channel_create(const char* target,
- const grpc_channel_args* input_args,
- grpc_channel_stack_type channel_stack_type,
- grpc_transport* optional_transport,
- grpc_resource_user* resource_user,
- grpc_error_handle* error) {
+grpc_channel* grpc_channel_create_internal(
+ const char* target, const grpc_channel_args* input_args,
+ grpc_channel_stack_type channel_stack_type,
+ grpc_transport* optional_transport, grpc_error_handle* error) {
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,
@@ -243,7 +242,8 @@ grpc_channel* grpc_channel_create(const char* target,
// grpc_shutdown() when the channel is actually destroyed, thus
// ensuring that shutdown is deferred until that point.
grpc_init();
- grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
+ grpc_core::ChannelStackBuilderImpl builder(
+ grpc_channel_stack_type_string(channel_stack_type), channel_stack_type);
const grpc_core::UniquePtr<char> default_authority =
get_default_authority(input_args);
grpc_channel_args* args =
@@ -255,26 +255,21 @@ grpc_channel* grpc_channel_create(const char* target,
args = channel_args_mutator(target, args, channel_stack_type);
}
}
- grpc_channel_stack_builder_set_channel_arguments(builder, args);
+ builder.SetChannelArgs(args).SetTarget(target).SetTransport(
+ optional_transport);
grpc_channel_args_destroy(args);
- grpc_channel_stack_builder_set_target(builder, target);
- grpc_channel_stack_builder_set_transport(builder, optional_transport);
- grpc_channel_stack_builder_set_resource_user(builder, resource_user);
- if (!grpc_channel_init_create_stack(builder, channel_stack_type)) {
- grpc_channel_stack_builder_destroy(builder);
- if (resource_user != nullptr) {
- grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
- }
+ if (!grpc_core::CoreConfiguration::Get().channel_init().CreateStack(
+ &builder)) {
grpc_shutdown(); // Since we won't call destroy_channel().
return nullptr;
}
// We only need to do this for clients here. For servers, this will be
// done in src/core/lib/surface/server.cc.
if (grpc_channel_stack_type_is_client(channel_stack_type)) {
- CreateChannelzNode(builder);
+ CreateChannelzNode(&builder);
}
grpc_channel* channel =
- grpc_channel_create_with_builder(builder, channel_stack_type, error);
+ grpc_channel_create_with_builder(&builder, channel_stack_type, error);
if (channel == nullptr) {
grpc_shutdown(); // Since we won't call destroy_channel().
}
@@ -311,14 +306,14 @@ void grpc_channel_update_call_size_estimate(grpc_channel* channel,
/* size shrank: decrease estimate */
gpr_atm_no_barrier_cas(
&channel->call_size_estimate, static_cast<gpr_atm>(cur),
- static_cast<gpr_atm>(GPR_MIN(cur - 1, (255 * cur + size) / 256)));
+ static_cast<gpr_atm>(std::min(cur - 1, (255 * cur + size) / 256)));
/* if we lose: never mind, something else will likely update soon enough */
}
}
char* grpc_channel_get_target(grpc_channel* channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
- return gpr_strdup(channel->target);
+ return gpr_strdup(channel->target->c_str());
}
void grpc_channel_get_info(grpc_channel* channel,
@@ -345,19 +340,11 @@ void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
static grpc_call* grpc_channel_create_call_internal(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
- grpc_mdelem path_mdelem, grpc_mdelem authority_mdelem,
- grpc_millis deadline) {
- grpc_mdelem send_metadata[2];
- size_t num_metadata = 0;
-
+ grpc_core::Slice path, absl::optional<grpc_core::Slice> authority,
+ grpc_core::Timestamp deadline) {
GPR_ASSERT(channel->is_client);
GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr));
- send_metadata[num_metadata++] = path_mdelem;
- if (!GRPC_MDISNULL(authority_mdelem)) {
- send_metadata[num_metadata++] = authority_mdelem;
- }
-
grpc_call_create_args args;
args.channel = channel;
args.server = nullptr;
@@ -366,8 +353,8 @@ static grpc_call* grpc_channel_create_call_internal(
args.cq = cq;
args.pollset_set_alternative = pollset_set_alternative;
args.server_transport_data = nullptr;
- args.add_initial_metadata = send_metadata;
- args.add_initial_metadata_count = num_metadata;
+ args.path = std::move(path);
+ args.authority = std::move(authority);
args.send_deadline = deadline;
grpc_call* call;
@@ -386,10 +373,11 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel,
grpc_core::ExecCtx exec_ctx;
grpc_call* call = grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue, nullptr,
- grpc_mdelem_create(GRPC_MDSTR_PATH, method, nullptr),
- host != nullptr ? grpc_mdelem_create(GRPC_MDSTR_AUTHORITY, *host, nullptr)
- : GRPC_MDNULL,
- grpc_timespec_to_millis_round_up(deadline));
+ grpc_core::Slice(grpc_slice_ref_internal(method)),
+ host != nullptr
+ ? absl::optional<grpc_core::Slice>(grpc_slice_ref_internal(*host))
+ : absl::nullopt,
+ grpc_core::Timestamp::FromTimespecRoundUp(deadline));
return call;
}
@@ -397,54 +385,35 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel,
grpc_call* grpc_channel_create_pollset_set_call(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_pollset_set* pollset_set, const grpc_slice& method,
- const grpc_slice* host, grpc_millis deadline, void* reserved) {
+ const grpc_slice* host, grpc_core::Timestamp deadline, void* reserved) {
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, nullptr, pollset_set,
- grpc_mdelem_create(GRPC_MDSTR_PATH, method, nullptr),
- host != nullptr ? grpc_mdelem_create(GRPC_MDSTR_AUTHORITY, *host, nullptr)
- : GRPC_MDNULL,
+ grpc_core::Slice(method),
+ host != nullptr
+ ? absl::optional<grpc_core::Slice>(grpc_slice_ref_internal(*host))
+ : absl::nullopt,
deadline);
}
namespace grpc_core {
-RegisteredCall::RegisteredCall(const char* method_arg, const char* host_arg)
- : method(method_arg != nullptr ? method_arg : ""),
- host(host_arg != nullptr ? host_arg : ""),
- path(grpc_mdelem_from_slices(
- GRPC_MDSTR_PATH, grpc_core::ExternallyManagedSlice(method.c_str()))),
- authority(!host.empty()
- ? grpc_mdelem_from_slices(
- GRPC_MDSTR_AUTHORITY,
- grpc_core::ExternallyManagedSlice(host.c_str()))
- : GRPC_MDNULL) {}
-
-// TODO(vjpai): Delete copy-constructor when allowed by all supported compilers.
-RegisteredCall::RegisteredCall(const RegisteredCall& other)
- : RegisteredCall(other.method.c_str(), other.host.c_str()) {}
-
-RegisteredCall::RegisteredCall(RegisteredCall&& other) noexcept
- : method(std::move(other.method)),
- host(std::move(other.host)),
- path(grpc_mdelem_from_slices(
- GRPC_MDSTR_PATH, grpc_core::ExternallyManagedSlice(method.c_str()))),
- authority(!host.empty()
- ? grpc_mdelem_from_slices(
- GRPC_MDSTR_AUTHORITY,
- grpc_core::ExternallyManagedSlice(host.c_str()))
- : GRPC_MDNULL) {
- GRPC_MDELEM_UNREF(other.path);
- GRPC_MDELEM_UNREF(other.authority);
- other.path = GRPC_MDNULL;
- other.authority = GRPC_MDNULL;
+RegisteredCall::RegisteredCall(const char* method_arg, const char* host_arg) {
+ path = Slice::FromCopiedString(method_arg);
+ if (host_arg != nullptr && host_arg[0] != 0) {
+ authority = Slice::FromCopiedString(host_arg);
+ }
}
-RegisteredCall::~RegisteredCall() {
- GRPC_MDELEM_UNREF(path);
- GRPC_MDELEM_UNREF(authority);
+RegisteredCall::RegisteredCall(const RegisteredCall& other)
+ : path(other.path.Ref()) {
+ if (other.authority.has_value()) {
+ authority = other.authority->Ref();
+ }
}
+RegisteredCall::~RegisteredCall() {}
+
} // namespace grpc_core
void* grpc_channel_register_call(grpc_channel* channel, const char* method,
@@ -491,8 +460,11 @@ grpc_call* grpc_channel_create_registered_call(
grpc_core::ExecCtx exec_ctx;
grpc_call* call = grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue, nullptr,
- GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
- grpc_timespec_to_millis_round_up(deadline));
+ rc->path.Ref(),
+ rc->authority.has_value()
+ ? absl::optional<grpc_core::Slice>(rc->authority->Ref())
+ : absl::nullopt,
+ grpc_core::Timestamp::FromTimespecRoundUp(deadline));
return call;
}
@@ -507,13 +479,10 @@ static void destroy_channel(void* arg, grpc_error_handle /*error*/) {
}
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
channel->registration_table.Destroy();
- if (channel->resource_user != nullptr) {
- grpc_resource_user_free(channel->resource_user,
- GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
- }
- gpr_free(channel->target);
+ channel->allocator.Destroy();
+ channel->target.Destroy();
gpr_free(channel);
- // See comment in grpc_channel_create() for why we do this.
+ // See comment in grpc_channel_create_internal() for why we do this.
grpc_shutdown();
}