diff options
Diffstat (limited to 'src/core/lib/surface/server.cc')
-rw-r--r-- | src/core/lib/surface/server.cc | 319 |
1 files changed, 137 insertions, 182 deletions
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index dd09cb91de..cf6883bd7e 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -45,10 +45,9 @@ typedef struct listener { void *arg; - void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, - grpc_pollset **pollsets, size_t pollset_count); - void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, - grpc_closure *closure); + void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, + size_t pollset_count); + void (*destroy)(grpc_server *server, void *arg, grpc_closure *closure); struct listener *next; grpc_closure destroy_done; } listener; @@ -231,13 +230,12 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, - grpc_error *error); -static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - size_t cq_idx, requested_call *rc, grpc_error *error); +static void publish_new_rpc(void *calld, grpc_error *error); +static void fail_call(grpc_server *server, size_t cq_idx, requested_call *rc, + grpc_error *error); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ -static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server); +static void maybe_finish_shutdown(grpc_server *server); /* * channel broadcaster @@ -265,15 +263,14 @@ struct shutdown_cleanup_args { grpc_slice slice; }; -static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void shutdown_cleanup(void *arg, grpc_error *error) { struct shutdown_cleanup_args *a = (struct shutdown_cleanup_args *)arg; - grpc_slice_unref_internal(exec_ctx, a->slice); + grpc_slice_unref_internal(a->slice); gpr_free(a); } -static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - bool send_goaway, grpc_error *send_disconnect) { +static void send_shutdown(grpc_channel *channel, bool send_goaway, + grpc_error *send_disconnect) { struct shutdown_cleanup_args *sc = (struct shutdown_cleanup_args *)gpr_malloc(sizeof(*sc)); GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc, @@ -291,19 +288,18 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, op->disconnect_with_error = send_disconnect; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(exec_ctx, elem, op); + elem->filter->start_transport_op(elem, op); } -static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, - channel_broadcaster *cb, +static void channel_broadcaster_shutdown(channel_broadcaster *cb, bool send_goaway, grpc_error *force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(exec_ctx, cb->channels[i], send_goaway, + send_shutdown(cb->channels[i], send_goaway, GRPC_ERROR_REF(force_disconnect)); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast"); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } gpr_free(cb->channels); GRPC_ERROR_UNREF(force_disconnect); @@ -332,13 +328,11 @@ static void request_matcher_destroy(request_matcher *rm) { gpr_free(rm->requests_per_cq); } -static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, - grpc_error *error) { +static void kill_zombie(void *elem, grpc_error *error) { grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem)); } -static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, - request_matcher *rm) { +static void request_matcher_zombify_all_pending_calls(request_matcher *rm) { while (rm->pending_head) { call_data *calld = rm->pending_head; rm->pending_head = calld->pending_next; @@ -349,20 +343,18 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); } } -static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, - grpc_server *server, +static void request_matcher_kill_requests(grpc_server *server, request_matcher *rm, grpc_error *error) { int request_id; for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, - &server->requested_calls_per_cq[i][request_id], + fail_call(server, i, &server->requested_calls_per_cq[i][request_id], GRPC_ERROR_REF(error)); } } @@ -377,10 +369,10 @@ static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } -static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { +static void server_delete(grpc_server *server) { registered_method *rm; size_t i; - grpc_channel_args_destroy(exec_ctx, server->channel_args); + grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); gpr_cv_destroy(&server->starting_cv); @@ -397,7 +389,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { request_matcher_destroy(&server->unregistered_request_matcher); } for (i = 0; i < server->cq_count; i++) { - GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); + GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); if (server->started) { gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); gpr_free(server->requested_calls_per_cq[i]); @@ -411,9 +403,9 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { gpr_free(server); } -static void server_unref(grpc_exec_ctx *exec_ctx, grpc_server *server) { +static void server_unref(grpc_server *server) { if (gpr_unref(&server->internal_refcount)) { - server_delete(exec_ctx, server); + server_delete(server); } } @@ -427,21 +419,19 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, - grpc_error *error) { +static void finish_destroy_channel(void *cd, grpc_error *error) { channel_data *chand = (channel_data *)cd; grpc_server *server = chand->server; - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); - server_unref(exec_ctx, server); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server"); + server_unref(server); } -static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_error *error) { +static void destroy_channel(channel_data *chand, grpc_error *error) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != NULL); orphan_channel(chand); server_ref(chand->server); - maybe_finish_shutdown(exec_ctx, chand->server); + maybe_finish_shutdown(chand->server); GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure, finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); @@ -454,14 +444,12 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_transport_op *op = grpc_make_transport_op(&chand->finish_destroy_channel_closure); op->set_accept_stream = true; - grpc_channel_next_op(exec_ctx, - grpc_channel_stack_element( + grpc_channel_next_op(grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), op); } -static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, - grpc_cq_completion *c) { +static void done_request_event(void *req, grpc_cq_completion *c) { requested_call *rc = (requested_call *)req; grpc_server *server = rc->server; @@ -476,12 +464,12 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, gpr_free(req); } - server_unref(exec_ctx, server); + server_unref(server); } -static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - call_data *calld, size_t cq_idx, requested_call *rc) { - grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); +static void publish_call(grpc_server *server, call_data *calld, size_t cq_idx, + requested_call *rc) { + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); grpc_call *call = calld->call; *rc->call = call; calld->cq_new = server->cqs[cq_idx]; @@ -512,12 +500,11 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); channel_data *chand = (channel_data *)elem->channel_data; server_ref(chand->server); - grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, - done_request_event, rc, &rc->completion); + grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, + rc, &rc->completion); } -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void publish_new_rpc(void *arg, grpc_error *error) { grpc_call_element *call_elem = (grpc_call_element *)arg; call_data *calld = (call_data *)call_elem->call_data; channel_data *chand = (channel_data *)call_elem->channel_data; @@ -532,8 +519,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error)); return; } @@ -543,18 +529,18 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, if (request_id == -1) { continue; } else { - GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i); + GRPC_STATS_INC_SERVER_CQS_CHECKED(i); gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, + publish_call(server, calld, cq_idx, &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } /* no cq to take the request found: queue it on the slow list */ - GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx); + GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(); gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); calld->state = PENDING; @@ -570,8 +556,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, } static void finish_start_new_rpc( - grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem, - request_matcher *rm, + grpc_server *server, grpc_call_element *elem, request_matcher *rm, grpc_server_register_method_payload_handling payload_handling) { call_data *calld = (call_data *)elem->call_data; @@ -581,7 +566,7 @@ static void finish_start_new_rpc( gpr_mu_unlock(&calld->mu_state); GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); return; } @@ -589,7 +574,7 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE); + publish_new_rpc(elem, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; @@ -598,14 +583,13 @@ static void finish_start_new_rpc( op.data.recv_message.recv_message = &calld->payload; GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem, grpc_schedule_on_exec_ctx); - grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, - &calld->publish); + grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish); break; } } } -static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { +static void start_new_rpc(grpc_call_element *elem) { channel_data *chand = (channel_data *)elem->channel_data; call_data *calld = (call_data *)elem->call_data; grpc_server *server = chand->server; @@ -630,8 +614,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } - finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->matcher, + finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -648,14 +631,12 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } - finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->matcher, + finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } } - finish_start_new_rpc(exec_ctx, server, elem, - &server->unregistered_request_matcher, + finish_start_new_rpc(server, elem, &server->unregistered_request_matcher, GRPC_SRM_PAYLOAD_NONE); } @@ -668,9 +649,8 @@ static int num_listeners(grpc_server *server) { return n; } -static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server, - grpc_cq_completion *completion) { - server_unref(exec_ctx, (grpc_server *)server); +static void done_shutdown_event(void *server, grpc_cq_completion *completion) { + server_unref((grpc_server *)server); } static int num_channels(grpc_server *server) { @@ -683,34 +663,30 @@ static int num_channels(grpc_server *server) { return n; } -static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, - grpc_server *server, grpc_error *error) { +static void kill_pending_work_locked(grpc_server *server, grpc_error *error) { if (server->started) { - request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher, + request_matcher_kill_requests(server, &server->unregistered_request_matcher, GRPC_ERROR_REF(error)); request_matcher_zombify_all_pending_calls( - exec_ctx, &server->unregistered_request_matcher); + &server->unregistered_request_matcher); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->matcher, + request_matcher_kill_requests(server, &rm->matcher, GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher); + request_matcher_zombify_all_pending_calls(&rm->matcher); } } GRPC_ERROR_UNREF(error); } -static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_server *server) { +static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } kill_pending_work_locked( - exec_ctx, server, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); + server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -730,15 +706,13 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); - grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq, - server->shutdown_tags[i].tag, GRPC_ERROR_NONE, - done_shutdown_event, server, + grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, + GRPC_ERROR_NONE, done_shutdown_event, server, &server->shutdown_tags[i].completion); } } -static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - grpc_error *error) { +static void server_on_recv_initial_metadata(void *ptr, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)ptr; call_data *calld = (call_data *)elem->call_data; grpc_millis op_deadline; @@ -752,10 +726,10 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md)); calld->path_set = true; calld->host_set = true; - grpc_metadata_batch_remove(exec_ctx, calld->recv_initial_metadata, + grpc_metadata_batch_remove(calld->recv_initial_metadata, calld->recv_initial_metadata->idx.named.path); grpc_metadata_batch_remove( - exec_ctx, calld->recv_initial_metadata, + calld->recv_initial_metadata, calld->recv_initial_metadata->idx.named.authority); } else { GRPC_ERROR_REF(error); @@ -773,7 +747,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_UNREF(src_error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); + GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); } static void server_mutate_op(grpc_call_element *elem, @@ -794,18 +768,16 @@ static void server_mutate_op(grpc_call_element *elem, } static void server_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_call_element *elem, grpc_transport_stream_op_batch *op) { server_mutate_op(elem, op); - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(elem, op); } -static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - grpc_error *error) { +static void got_initial_metadata(void *ptr, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)ptr; call_data *calld = (call_data *)elem->call_data; if (error == GRPC_ERROR_NONE) { - start_new_rpc(exec_ctx, elem); + start_new_rpc(elem); } else { gpr_mu_lock(&calld->mu_state); if (calld->state == NOT_STARTED) { @@ -813,8 +785,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, gpr_mu_unlock(&calld->mu_state); GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -826,8 +797,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, } } -static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, - grpc_transport *transport, +static void accept_stream(void *cd, grpc_transport *transport, const void *transport_server_data) { channel_data *chand = (channel_data *)cd; /* create a call */ @@ -837,11 +807,11 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, args.server_transport_data = transport_server_data; args.send_deadline = GRPC_MILLIS_INF_FUTURE; grpc_call *call; - grpc_error *error = grpc_call_create(exec_ctx, &args, &call); + grpc_error *error = grpc_call_create(&args, &call); grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); if (error != GRPC_ERROR_NONE) { - got_initial_metadata(exec_ctx, elem, error); + got_initial_metadata(elem, error); GRPC_ERROR_UNREF(error); return; } @@ -853,32 +823,28 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, &calld->initial_metadata; GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem, grpc_schedule_on_exec_ctx); - grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, - &calld->got_initial_metadata); + grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata); } -static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, - grpc_error *error) { +static void channel_connectivity_changed(void *cd, grpc_error *error) { channel_data *chand = (channel_data *)cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op *op = grpc_make_transport_op(NULL); op->on_connectivity_state_change = &chand->channel_connectivity_changed, op->connectivity_state = &chand->connectivity_state; - grpc_channel_next_op(exec_ctx, - grpc_channel_stack_element( + grpc_channel_next_op(grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), op); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error)); + destroy_channel(chand, GRPC_ERROR_REF(error)); gpr_mu_unlock(&server->mu_global); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity"); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); } } -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, +static grpc_error *init_call_elem(grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; @@ -895,7 +861,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, +static void destroy_call_elem(grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *ignored) { channel_data *chand = (channel_data *)elem->channel_data; @@ -904,21 +870,20 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(calld->state != PENDING); if (calld->host_set) { - grpc_slice_unref_internal(exec_ctx, calld->host); + grpc_slice_unref_internal(calld->host); } if (calld->path_set) { - grpc_slice_unref_internal(exec_ctx, calld->path); + grpc_slice_unref_internal(calld->path); } grpc_metadata_array_destroy(&calld->initial_metadata); grpc_byte_buffer_destroy(calld->payload); gpr_mu_destroy(&calld->mu_state); - server_unref(exec_ctx, chand->server); + server_unref(chand->server); } -static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, +static grpc_error *init_channel_elem(grpc_channel_element *elem, grpc_channel_element_args *args) { channel_data *chand = (channel_data *)elem->channel_data; GPR_ASSERT(args->is_first); @@ -934,15 +899,14 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem) { size_t i; channel_data *chand = (channel_data *)elem->channel_data; if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { - grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].method); + grpc_slice_unref_internal(chand->registered_methods[i].method); if (chand->registered_methods[i].has_host) { - grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].host); + grpc_slice_unref_internal(chand->registered_methods[i].host); } } gpr_free(chand->registered_methods); @@ -952,9 +916,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; - maybe_finish_shutdown(exec_ctx, chand->server); + maybe_finish_shutdown(chand->server); gpr_mu_unlock(&chand->server->mu_global); - server_unref(exec_ctx, chand->server); + server_unref(chand->server); } } @@ -1070,11 +1034,10 @@ void *grpc_server_register_method( return m; } -static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, - grpc_error *error) { +static void start_listeners(void *s, grpc_error *error) { grpc_server *server = (grpc_server *)s; for (listener *l = server->listeners; l; l = l->next) { - l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); + l->start(server, l->arg, server->pollsets, server->pollset_count); } gpr_mu_lock(&server->mu_global); @@ -1082,12 +1045,12 @@ static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, gpr_cv_signal(&server->starting_cv); gpr_mu_unlock(&server->mu_global); - server_unref(exec_ctx, server); + server_unref(server); } void grpc_server_start(grpc_server *server) { size_t i; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); @@ -1123,12 +1086,11 @@ void grpc_server_start(grpc_server *server) { server_ref(server); server->starting = true; GRPC_CLOSURE_SCHED( - &exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), GRPC_ERROR_NONE); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); } void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets, @@ -1137,8 +1099,7 @@ void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets, *pollsets = server->pollsets; } -void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, - grpc_transport *transport, +void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_pollset *accepting_pollset, const grpc_channel_args *args) { size_t num_registered_methods; @@ -1153,8 +1114,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op *op = NULL; - channel = - grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); + channel = grpc_channel_create(NULL, args, GRPC_SERVER_CHANNEL, transport); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -1231,21 +1191,19 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); } - grpc_transport_perform_op(exec_ctx, transport, op); + grpc_transport_perform_op(transport, op); } -void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, - grpc_cq_completion *storage) { +void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) { (void)done_arg; gpr_free(storage); } -static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, - grpc_error *error) { +static void listener_destroy_done(void *s, grpc_error *error) { grpc_server *server = (grpc_server *)s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; - maybe_finish_shutdown(exec_ctx, server); + maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); } @@ -1254,7 +1212,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, listener *l; shutdown_tag *sdt; channel_broadcaster broadcaster; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); @@ -1270,7 +1228,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { grpc_cq_end_op( - &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, + cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; @@ -1295,30 +1253,29 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); kill_pending_work_locked( - &exec_ctx, server, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); + server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); gpr_mu_unlock(&server->mu_call); - maybe_finish_shutdown(&exec_ctx, server); + maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server, grpc_schedule_on_exec_ctx); - l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); + l->destroy(server, l->arg, &l->destroy_done); } - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, true /* send_goaway */, + channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */, GRPC_ERROR_NONE); done: - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); } void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster broadcaster; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server)); @@ -1327,14 +1284,14 @@ void grpc_server_cancel_all_calls(grpc_server *server) { gpr_mu_unlock(&server->mu_global); channel_broadcaster_shutdown( - &exec_ctx, &broadcaster, false /* send_goaway */, + &broadcaster, false /* send_goaway */, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls")); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); } void grpc_server_destroy(grpc_server *server) { listener *l; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server)); @@ -1350,16 +1307,16 @@ void grpc_server_destroy(grpc_server *server) { gpr_mu_unlock(&server->mu_global); - server_unref(&exec_ctx, server); - grpc_exec_ctx_finish(&exec_ctx); + server_unref(server); + grpc_exec_ctx_finish(); } -void grpc_server_add_listener( - grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, - void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, - grpc_pollset **pollsets, size_t pollset_count), - void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, - grpc_closure *on_done)) { +void grpc_server_add_listener(grpc_server *server, void *arg, + void (*start)(grpc_server *server, void *arg, + grpc_pollset **pollsets, + size_t pollset_count), + void (*destroy)(grpc_server *server, void *arg, + grpc_closure *on_done)) { listener *l = (listener *)gpr_malloc(sizeof(listener)); l->arg = arg; l->start = start; @@ -1368,21 +1325,20 @@ void grpc_server_add_listener( server->listeners = l; } -static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, - grpc_server *server, size_t cq_idx, +static grpc_call_error queue_call_request(grpc_server *server, size_t cq_idx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(exec_ctx, server, cq_idx, rc, + fail_call(server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, cq_idx, rc, + fail_call(server, cq_idx, rc, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); @@ -1414,13 +1370,12 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, + publish_call(server, calld, cq_idx, &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); @@ -1436,9 +1391,9 @@ grpc_call_error grpc_server_request_call( grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { grpc_call_error error; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc)); - GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); + GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); GRPC_API_TRACE( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " @@ -1470,9 +1425,9 @@ grpc_call_error grpc_server_request_call( rc->call = call; rc->data.batch.details = details; rc->initial_metadata = initial_metadata; - error = queue_call_request(&exec_ctx, server, cq_idx, rc); + error = queue_call_request(server, cq_idx, rc); done: - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); return error; } @@ -1482,10 +1437,10 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { grpc_call_error error; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc)); registered_method *rm = (registered_method *)rmp; - GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); + GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); GRPC_API_TRACE( "grpc_server_request_registered_call(" "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, " @@ -1526,21 +1481,21 @@ grpc_call_error grpc_server_request_registered_call( rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - error = queue_call_request(&exec_ctx, server, cq_idx, rc); + error = queue_call_request(server, cq_idx, rc); done: - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); return error; } -static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - size_t cq_idx, requested_call *rc, grpc_error *error) { +static void fail_call(grpc_server *server, size_t cq_idx, requested_call *rc, + grpc_error *error) { *rc->call = NULL; rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); server_ref(server); - grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, - done_request_event, rc, &rc->completion); + grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc, + &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { |