diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:05:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-06 09:05:05 -0800 |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /test/core/end2end | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) | |
download | grpc-grpc-ad4d2dde0052efbbf49d64b0843c45f0381cfeb3.tar.gz |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'test/core/end2end')
31 files changed, 564 insertions, 457 deletions
diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index 93809ac37a..0fdb637ead 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -91,22 +91,22 @@ static grpc_closure on_write; static void* tag(intptr_t t) { return (void*)t; } -static void done_write(void* arg, grpc_error* error) { +static void done_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); gpr_atm_rel_store(&state.done_atm, 1); } -static void handle_write() { +static void handle_write(grpc_exec_ctx* exec_ctx) { grpc_slice slice = grpc_slice_from_copied_buffer( state.response_payload, state.response_payload_length); grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write); + grpc_endpoint_write(exec_ctx, state.tcp, &state.outgoing_buffer, &on_write); } -static void handle_read(void* arg, grpc_error* error) { +static void handle_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); state.incoming_data_length += state.temp_incoming_buffer.length; @@ -123,13 +123,14 @@ static void handle_read(void* arg, grpc_error* error) { SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD); if (state.incoming_data_length >= SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { - handle_write(); + handle_write(exec_ctx); } else { - grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + &on_read); } } -static void on_connect(void* arg, grpc_endpoint* tcp, +static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { gpr_free(acceptor); @@ -140,8 +141,8 @@ static void on_connect(void* arg, grpc_endpoint* tcp, grpc_slice_buffer_init(&state.outgoing_buffer); state.tcp = tcp; state.incoming_data_length = 0; - grpc_endpoint_add_to_pollset(tcp, server->pollset); - grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_add_to_pollset(exec_ctx, tcp, server->pollset); + grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, &on_read); } static gpr_timespec n_sec_deadline(int seconds) { @@ -216,10 +217,10 @@ static void start_rpc(int target_port, grpc_status_code expected_status, cq_verifier_destroy(cqv); } -static void cleanup_rpc() { +static void cleanup_rpc(grpc_exec_ctx* exec_ctx) { grpc_event ev; - grpc_slice_buffer_destroy_internal(&state.temp_incoming_buffer); - grpc_slice_buffer_destroy_internal(&state.outgoing_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &state.temp_incoming_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &state.outgoing_buffer); grpc_call_unref(state.call); grpc_completion_queue_shutdown(state.cq); do { @@ -269,7 +270,7 @@ static void run_test(const char* response_payload, grpc_status_code expected_status, const char* expected_detail) { test_tcp_server test_server; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_event ev; grpc_init(); @@ -286,11 +287,11 @@ static void run_test(const char* response_payload, gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); /* clean up */ - grpc_endpoint_shutdown(state.tcp, + grpc_endpoint_shutdown(&exec_ctx, state.tcp, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); - grpc_endpoint_destroy(state.tcp); - cleanup_rpc(); - grpc_core::ExecCtx::Get()->Flush(); + grpc_endpoint_destroy(&exec_ctx, state.tcp); + cleanup_rpc(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); test_tcp_server_destroy(&test_server); grpc_shutdown(); @@ -298,7 +299,6 @@ static void run_test(const char* response_payload, int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); /* status defined in hpack static table */ run_test(HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1, GRPC_STATUS_CANCELLED, @@ -337,6 +337,5 @@ int main(int argc, char** argv) { run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE, HTTP1_DETAIL_MSG); - grpc_shutdown(); return 0; } diff --git a/test/core/end2end/connection_refused_test.cc b/test/core/end2end/connection_refused_test.cc index ca6d17e7c8..f3f2dda91d 100644 --- a/test/core/end2end/connection_refused_test.cc +++ b/test/core/end2end/connection_refused_test.cc @@ -133,8 +133,9 @@ static void run_test(bool wait_for_ready, bool use_service_config) { grpc_metadata_array_destroy(&trailing_metadata_recv); { - grpc_core::ExecCtx exec_ctx; - if (args != nullptr) grpc_channel_args_destroy(args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (args != nullptr) grpc_channel_args_destroy(&exec_ctx, args); + grpc_exec_ctx_finish(&exec_ctx); } grpc_shutdown(); diff --git a/test/core/end2end/fixtures/h2_census.cc b/test/core/end2end/fixtures/h2_census.cc index 75c80aa1ff..fed8ead5c8 100644 --- a/test/core/end2end/fixtures/h2_census.cc +++ b/test/core/end2end/fixtures/h2_census.cc @@ -75,8 +75,9 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture* f, grpc_insecure_channel_create(ffd->localaddr, client_args, nullptr); GPR_ASSERT(f->client); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_exec_ctx_finish(&exec_ctx); } } @@ -91,8 +92,9 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture* f, server_args = grpc_channel_args_copy_and_add(server_args, &arg, 1); f->server = grpc_server_create(server_args, nullptr); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } grpc_server_register_completion_queue(f->server, f->cq, nullptr); GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr)); diff --git a/test/core/end2end/fixtures/h2_compress.cc b/test/core/end2end/fixtures/h2_compress.cc index 5b9181586c..ea8990fd0a 100644 --- a/test/core/end2end/fixtures/h2_compress.cc +++ b/test/core/end2end/fixtures/h2_compress.cc @@ -66,8 +66,9 @@ void chttp2_init_client_fullstack_compression(grpc_end2end_test_fixture* f, fullstack_compression_fixture_data* ffd = static_cast<fullstack_compression_fixture_data*>(f->fixture_data); if (ffd->client_args_compression != nullptr) { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(ffd->client_args_compression); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, ffd->client_args_compression); + grpc_exec_ctx_finish(&exec_ctx); } ffd->client_args_compression = grpc_channel_args_set_compression_algorithm( client_args, GRPC_COMPRESS_GZIP); @@ -80,8 +81,9 @@ void chttp2_init_server_fullstack_compression(grpc_end2end_test_fixture* f, fullstack_compression_fixture_data* ffd = static_cast<fullstack_compression_fixture_data*>(f->fixture_data); if (ffd->server_args_compression != nullptr) { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(ffd->server_args_compression); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, ffd->server_args_compression); + grpc_exec_ctx_finish(&exec_ctx); } ffd->server_args_compression = grpc_channel_args_set_compression_algorithm( server_args, GRPC_COMPRESS_GZIP); @@ -95,13 +97,14 @@ void chttp2_init_server_fullstack_compression(grpc_end2end_test_fixture* f, } void chttp2_tear_down_fullstack_compression(grpc_end2end_test_fixture* f) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; fullstack_compression_fixture_data* ffd = static_cast<fullstack_compression_fixture_data*>(f->fixture_data); - grpc_channel_args_destroy(ffd->client_args_compression); - grpc_channel_args_destroy(ffd->server_args_compression); + grpc_channel_args_destroy(&exec_ctx, ffd->client_args_compression); + grpc_channel_args_destroy(&exec_ctx, ffd->server_args_compression); gpr_free(ffd->localaddr); gpr_free(ffd); + grpc_exec_ctx_finish(&exec_ctx); } /* All test configurations */ diff --git a/test/core/end2end/fixtures/h2_fd.cc b/test/core/end2end/fixtures/h2_fd.cc index 9157ab04d0..97f4b71bf0 100644 --- a/test/core/end2end/fixtures/h2_fd.cc +++ b/test/core/end2end/fixtures/h2_fd.cc @@ -68,18 +68,20 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* client_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; sp_fixture_data* sfd = static_cast<sp_fixture_data*>(f->fixture_data); GPR_ASSERT(!f->client); f->client = grpc_insecure_channel_create_from_fd( "fixture_client", sfd->fd_pair[0], client_args); GPR_ASSERT(f->client); + + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* server_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; sp_fixture_data* sfd = static_cast<sp_fixture_data*>(f->fixture_data); GPR_ASSERT(!f->server); f->server = grpc_server_create(server_args, nullptr); @@ -88,6 +90,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_server_start(f->server); grpc_server_add_insecure_channel_from_fd(f->server, nullptr, sfd->fd_pair[1]); + + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_full+workarounds.cc b/test/core/end2end/fixtures/h2_full+workarounds.cc index 237841d185..71a497d796 100644 --- a/test/core/end2end/fixtures/h2_full+workarounds.cc +++ b/test/core/end2end/fixtures/h2_full+workarounds.cc @@ -72,7 +72,7 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture* f, void chttp2_init_server_fullstack(grpc_end2end_test_fixture* f, grpc_channel_args* server_args) { int i; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; fullstack_fixture_data* ffd = static_cast<fullstack_fixture_data*>(f->fixture_data); grpc_arg args[GRPC_MAX_WORKAROUND_ID]; @@ -90,7 +90,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture* f, grpc_server_register_completion_queue(f->server, f->cq, nullptr); GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); - grpc_channel_args_destroy(server_args_new); + grpc_channel_args_destroy(&exec_ctx, server_args_new); + grpc_exec_ctx_finish(&exec_ctx); } void chttp2_tear_down_fullstack(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_load_reporting.cc b/test/core/end2end/fixtures/h2_load_reporting.cc index fda5f4b052..7486b6af78 100644 --- a/test/core/end2end/fixtures/h2_load_reporting.cc +++ b/test/core/end2end/fixtures/h2_load_reporting.cc @@ -78,8 +78,9 @@ void chttp2_init_server_load_reporting(grpc_end2end_test_fixture* f, server_args = grpc_channel_args_copy_and_add(server_args, &arg, 1); f->server = grpc_server_create(server_args, nullptr); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } grpc_server_register_completion_queue(f->server, f->cq, nullptr); GPR_ASSERT(grpc_server_add_insecure_http2_port(f->server, ffd->localaddr)); diff --git a/test/core/end2end/fixtures/h2_oauth2.cc b/test/core/end2end/fixtures/h2_oauth2.cc index 5fed4434de..1642cb0db9 100644 --- a/test/core/end2end/fixtures/h2_oauth2.cc +++ b/test/core/end2end/fixtures/h2_oauth2.cc @@ -143,11 +143,11 @@ void chttp2_tear_down_secure_fullstack(grpc_end2end_test_fixture* f) { static void chttp2_init_client_simple_ssl_with_oauth2_secure_fullstack( grpc_end2end_test_fixture* f, grpc_channel_args* client_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_channel_credentials* ssl_creds = grpc_ssl_credentials_create(test_root_cert, nullptr, nullptr); grpc_call_credentials* oauth2_creds = grpc_md_only_test_credentials_create( - "authorization", oauth2_md, true /* is_async */); + &exec_ctx, "authorization", oauth2_md, true /* is_async */); grpc_channel_credentials* ssl_oauth2_creds = grpc_composite_channel_credentials_create(ssl_creds, oauth2_creds, nullptr); @@ -158,9 +158,10 @@ static void chttp2_init_client_simple_ssl_with_oauth2_secure_fullstack( grpc_channel_args* new_client_args = grpc_channel_args_copy_and_add(client_args, &ssl_name_override, 1); chttp2_init_client_secure_fullstack(f, new_client_args, ssl_oauth2_creds); - grpc_channel_args_destroy(new_client_args); + grpc_channel_args_destroy(&exec_ctx, new_client_args); grpc_channel_credentials_release(ssl_creds); grpc_call_credentials_release(oauth2_creds); + grpc_exec_ctx_finish(&exec_ctx); } static int fail_server_auth_check(grpc_channel_args* server_args) { diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index 9807e929af..9319c401dc 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -50,11 +50,12 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_end2end_test_fixture* f = static_cast<grpc_end2end_test_fixture*>(ts); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); - grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq)); - grpc_server_setup_transport(f->server, transport, nullptr, + grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq)); + grpc_server_setup_transport(&exec_ctx, f->server, transport, nullptr, grpc_server_get_channel_args(f->server)); + grpc_exec_ctx_finish(&exec_ctx); } typedef struct { @@ -62,11 +63,13 @@ typedef struct { grpc_channel_args* client_args; } sp_client_setup; -static void client_setup_transport(void* ts, grpc_transport* transport) { +static void client_setup_transport(grpc_exec_ctx* exec_ctx, void* ts, + grpc_transport* transport) { sp_client_setup* cs = static_cast<sp_client_setup*>(ts); - cs->f->client = grpc_channel_create("socketpair-target", cs->client_args, - GRPC_CLIENT_DIRECT_CHANNEL, transport); + cs->f->client = + grpc_channel_create(exec_ctx, "socketpair-target", cs->client_args, + GRPC_CLIENT_DIRECT_CHANNEL, transport); } static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( @@ -87,30 +90,34 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* client_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_transport* transport; sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, true); - client_setup_transport(&cs, transport); + transport = + grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); + client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* server_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_transport* transport; GPR_ASSERT(!f->server); f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, false); + transport = + grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { @@ -126,6 +133,7 @@ static grpc_end2end_test_config configs[] = { int main(int argc, char** argv) { size_t i; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* force tracing on, with a value to force many code paths in trace.c to be taken */ @@ -139,6 +147,7 @@ int main(int argc, char** argv) { grpc_test_init(argc, argv); grpc_end2end_tests_pre_init(); grpc_init(); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(0 == grpc_tracer_set_enabled("also-doesnt-exist", 0)); GPR_ASSERT(1 == grpc_tracer_set_enabled("http", 1)); diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index b68279fd71..03566aada2 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -44,11 +44,12 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_end2end_test_fixture* f = static_cast<grpc_end2end_test_fixture*>(ts); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); - grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq)); - grpc_server_setup_transport(f->server, transport, nullptr, + grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq)); + grpc_server_setup_transport(&exec_ctx, f->server, transport, nullptr, grpc_server_get_channel_args(f->server)); + grpc_exec_ctx_finish(&exec_ctx); } typedef struct { @@ -56,11 +57,13 @@ typedef struct { grpc_channel_args* client_args; } sp_client_setup; -static void client_setup_transport(void* ts, grpc_transport* transport) { +static void client_setup_transport(grpc_exec_ctx* exec_ctx, void* ts, + grpc_transport* transport) { sp_client_setup* cs = static_cast<sp_client_setup*>(ts); - cs->f->client = grpc_channel_create("socketpair-target", cs->client_args, - GRPC_CLIENT_DIRECT_CHANNEL, transport); + cs->f->client = + grpc_channel_create(exec_ctx, "socketpair-target", cs->client_args, + GRPC_CLIENT_DIRECT_CHANNEL, transport); } static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( @@ -81,30 +84,34 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* client_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_transport* transport; sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, true); - client_setup_transport(&cs, transport); + transport = + grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); + client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* server_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_transport* transport; GPR_ASSERT(!f->server); f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, false); + transport = + grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index 350be138ca..9adba00204 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -44,11 +44,12 @@ static void server_setup_transport(void* ts, grpc_transport* transport) { grpc_end2end_test_fixture* f = static_cast<grpc_end2end_test_fixture*>(ts); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); - grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq)); - grpc_server_setup_transport(f->server, transport, nullptr, + grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq)); + grpc_server_setup_transport(&exec_ctx, f->server, transport, nullptr, grpc_server_get_channel_args(f->server)); + grpc_exec_ctx_finish(&exec_ctx); } typedef struct { @@ -56,11 +57,13 @@ typedef struct { grpc_channel_args* client_args; } sp_client_setup; -static void client_setup_transport(void* ts, grpc_transport* transport) { +static void client_setup_transport(grpc_exec_ctx* exec_ctx, void* ts, + grpc_transport* transport) { sp_client_setup* cs = static_cast<sp_client_setup*>(ts); - cs->f->client = grpc_channel_create("socketpair-target", cs->client_args, - GRPC_CLIENT_DIRECT_CHANNEL, transport); + cs->f->client = + grpc_channel_create(exec_ctx, "socketpair-target", cs->client_args, + GRPC_CLIENT_DIRECT_CHANNEL, transport); } static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( @@ -92,30 +95,34 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* client_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_transport* transport; sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, true); - client_setup_transport(&cs, transport); + transport = + grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); + client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_channel_args* server_args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data); grpc_transport* transport; GPR_ASSERT(!f->server); f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, false); + transport = + grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_ssl.cc b/test/core/end2end/fixtures/h2_ssl.cc index 9a0680c40e..3d7e2e327e 100644 --- a/test/core/end2end/fixtures/h2_ssl.cc +++ b/test/core/end2end/fixtures/h2_ssl.cc @@ -110,8 +110,9 @@ static void chttp2_init_client_simple_ssl_secure_fullstack( grpc_channel_args_copy_and_add(client_args, &ssl_name_override, 1); chttp2_init_client_secure_fullstack(f, new_client_args, ssl_creds); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(new_client_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, new_client_args); + grpc_exec_ctx_finish(&exec_ctx); } } diff --git a/test/core/end2end/fixtures/h2_ssl_proxy.cc b/test/core/end2end/fixtures/h2_ssl_proxy.cc index 5ddbdefc8c..f8d5a699e4 100644 --- a/test/core/end2end/fixtures/h2_ssl_proxy.cc +++ b/test/core/end2end/fixtures/h2_ssl_proxy.cc @@ -66,8 +66,9 @@ static grpc_channel* create_proxy_client(const char* target, grpc_secure_channel_create(ssl_creds, target, new_client_args, nullptr); grpc_channel_credentials_release(ssl_creds); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(new_client_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, new_client_args); + grpc_exec_ctx_finish(&exec_ctx); } return channel; } @@ -147,8 +148,9 @@ static void chttp2_init_client_simple_ssl_secure_fullstack( grpc_channel_args_copy_and_add(client_args, &ssl_name_override, 1); chttp2_init_client_secure_fullstack(f, new_client_args, ssl_creds); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(new_client_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, new_client_args); + grpc_exec_ctx_finish(&exec_ctx); } } diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 137f7c9fa3..3904887026 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -116,21 +116,24 @@ static void proxy_connection_ref(proxy_connection* conn, const char* reason) { } // Helper function to destroy the proxy connection. -static void proxy_connection_unref(proxy_connection* conn, const char* reason) { +static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, + proxy_connection* conn, const char* reason) { if (gpr_unref(&conn->refcount)) { gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint, conn->server_endpoint); - grpc_endpoint_destroy(conn->client_endpoint); + grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); if (conn->server_endpoint != nullptr) { - grpc_endpoint_destroy(conn->server_endpoint); + grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); } - grpc_pollset_set_destroy(conn->pollset_set); - grpc_slice_buffer_destroy_internal(&conn->client_read_buffer); - grpc_slice_buffer_destroy_internal(&conn->client_deferred_write_buffer); - grpc_slice_buffer_destroy_internal(&conn->client_write_buffer); - grpc_slice_buffer_destroy_internal(&conn->server_read_buffer); - grpc_slice_buffer_destroy_internal(&conn->server_deferred_write_buffer); - grpc_slice_buffer_destroy_internal(&conn->server_write_buffer); + grpc_pollset_set_destroy(exec_ctx, conn->pollset_set); + grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_read_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, + &conn->client_deferred_write_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_write_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &conn->server_read_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, + &conn->server_deferred_write_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &conn->server_write_buffer); grpc_http_parser_destroy(&conn->http_parser); grpc_http_request_destroy(&conn->http_request); gpr_unref(&conn->proxy->users); @@ -147,7 +150,8 @@ enum failure_type { }; // Helper function to shut down the proxy connection. -static void proxy_connection_failed(proxy_connection* conn, +static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, + proxy_connection* conn, failure_type failure, const char* prefix, grpc_error* error) { gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error)); @@ -171,25 +175,28 @@ static void proxy_connection_failed(proxy_connection* conn, } // If we decided to shut down either one and have not yet done so, do so. if (shutdown_client && !conn->client_shutdown) { - grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error)); + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, + GRPC_ERROR_REF(error)); conn->client_shutdown = true; } if (shutdown_server && !conn->server_shutdown && (conn->server_endpoint != nullptr)) { - grpc_endpoint_shutdown(conn->server_endpoint, GRPC_ERROR_REF(error)); + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, + GRPC_ERROR_REF(error)); conn->server_shutdown = true; } // Unref the connection. - proxy_connection_unref(conn, "conn_failed"); + proxy_connection_unref(exec_ctx, conn, "conn_failed"); GRPC_ERROR_UNREF(error); } // Callback for writing proxy data to the client. -static void on_client_write_done(void* arg, grpc_error* error) { +static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, CLIENT_WRITE_FAILED, + proxy_connection_failed(exec_ctx, conn, CLIENT_WRITE_FAILED, "HTTP proxy client write", GRPC_ERROR_REF(error)); return; } @@ -201,20 +208,22 @@ static void on_client_write_done(void* arg, grpc_error* error) { grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); conn->client_is_writing = true; - grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, &conn->on_client_write_done); } else { // No more writes. Unref the connection. - proxy_connection_unref(conn, "write_done"); + proxy_connection_unref(exec_ctx, conn, "write_done"); } } // Callback for writing proxy data to the backend server. -static void on_server_write_done(void* arg, grpc_error* error) { +static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, SERVER_WRITE_FAILED, + proxy_connection_failed(exec_ctx, conn, SERVER_WRITE_FAILED, "HTTP proxy server write", GRPC_ERROR_REF(error)); return; } @@ -226,21 +235,23 @@ static void on_server_write_done(void* arg, grpc_error* error) { grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); conn->server_is_writing = true; - grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, + grpc_endpoint_write(exec_ctx, conn->server_endpoint, + &conn->server_write_buffer, &conn->on_server_write_done); } else { // No more writes. Unref the connection. - proxy_connection_unref(conn, "server_write"); + proxy_connection_unref(exec_ctx, conn, "server_write"); } } // Callback for reading data from the client, which will be proxied to // the backend server. -static void on_client_read_done(void* arg, grpc_error* error) { +static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, CLIENT_READ_FAILED, + "HTTP proxy client read", GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., server_write_buffer is @@ -257,21 +268,23 @@ static void on_client_read_done(void* arg, grpc_error* error) { &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); conn->server_is_writing = true; - grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, + grpc_endpoint_write(exec_ctx, conn->server_endpoint, + &conn->server_write_buffer, &conn->on_server_write_done); } // Read more data. - grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, &conn->on_client_read_done); } // Callback for reading data from the backend server, which will be // proxied to the client. -static void on_server_read_done(void* arg, grpc_error* error) { +static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SERVER_READ_FAILED, + "HTTP proxy server read", GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., client_write_buffer is @@ -288,21 +301,23 @@ static void on_server_read_done(void* arg, grpc_error* error) { &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); conn->client_is_writing = true; - grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, &conn->on_client_write_done); } // Read more data. - grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, + grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, &conn->on_server_read_done); } // Callback to write the HTTP response for the CONNECT request. -static void on_write_response_done(void* arg, grpc_error* error) { +static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy write response", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy write response", GRPC_ERROR_REF(error)); return; } // Clear write buffer. @@ -312,16 +327,17 @@ static void on_write_response_done(void* arg, grpc_error* error) { // for the other one. proxy_connection_ref(conn, "client_read"); proxy_connection_ref(conn, "server_read"); - proxy_connection_unref(conn, "write_response"); - grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, + proxy_connection_unref(exec_ctx, conn, "write_response"); + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, &conn->on_client_read_done); - grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, + grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, &conn->on_server_read_done); } // Callback to connect to the backend server specified by the HTTP // CONNECT request. -static void on_server_connect_done(void* arg, grpc_error* error) { +static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { // TODO(roth): Technically, in this case, we should handle the error @@ -329,8 +345,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) { // connection failed. However, for the purposes of this test code, // it's fine to pretend this is a client-side error, which will // cause the client connection to be dropped. - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy server connect", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy server connect", GRPC_ERROR_REF(error)); return; } // We've established a connection, so send back a 200 response code to @@ -340,7 +356,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) { grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); conn->client_is_writing = true; - grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, + grpc_endpoint_write(exec_ctx, conn->client_endpoint, + &conn->client_write_buffer, &conn->on_write_response_done); } @@ -349,7 +366,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) { * Basic <base64_encoded_expected_cred> * Returns true if it matches, false otherwise */ -static bool proxy_auth_header_matches(char* proxy_auth_header_val, +static bool proxy_auth_header_matches(grpc_exec_ctx* exec_ctx, + char* proxy_auth_header_val, char* expected_cred) { GPR_ASSERT(proxy_auth_header_val != nullptr); GPR_ASSERT(expected_cred != nullptr); @@ -357,10 +375,11 @@ static bool proxy_auth_header_matches(char* proxy_auth_header_val, return false; } proxy_auth_header_val += 6; - grpc_slice decoded_slice = grpc_base64_decode(proxy_auth_header_val, 0); + grpc_slice decoded_slice = + grpc_base64_decode(exec_ctx, proxy_auth_header_val, 0); const bool header_matches = grpc_slice_str_cmp(decoded_slice, expected_cred) == 0; - grpc_slice_unref_internal(decoded_slice); + grpc_slice_unref_internal(exec_ctx, decoded_slice); return header_matches; } @@ -370,13 +389,14 @@ static bool proxy_auth_header_matches(char* proxy_auth_header_val, // the client indicating that the request failed. However, for the purposes // of this test code, it's fine to pretend this is a client-side error, // which will cause the client connection to be dropped. -static void on_read_request_done(void* arg, grpc_error* error) { +static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, grpc_error_string(error)); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy read request", GRPC_ERROR_REF(error)); return; } // Read request and feed it to the parser. @@ -385,7 +405,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { error = grpc_http_parser_parse( &conn->http_parser, conn->client_read_buffer.slices[i], nullptr); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy request parse", + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy request parse", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; @@ -395,8 +416,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { grpc_slice_buffer_reset_and_unref(&conn->client_read_buffer); // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { - grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done); + grpc_endpoint_read(exec_ctx, conn->client_endpoint, + &conn->client_read_buffer, &conn->on_read_request_done); return; } // Make sure we got a CONNECT request. @@ -406,8 +427,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { conn->http_request.method); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy read request", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -419,15 +440,16 @@ static void on_read_request_done(void* arg, grpc_error* error) { for (size_t i = 0; i < conn->http_request.hdr_count; i++) { if (strcmp(conn->http_request.hdrs[i].key, "Proxy-Authorization") == 0) { client_authenticated = proxy_auth_header_matches( - conn->http_request.hdrs[i].value, proxy_auth_arg->value.string); + exec_ctx, conn->http_request.hdrs[i].value, + proxy_auth_arg->value.string); break; } } if (!client_authenticated) { const char* msg = "HTTP Connect could not verify authentication"; error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg); - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy read request", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -437,8 +459,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy DNS lookup", - GRPC_ERROR_REF(error)); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy DNS lookup", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -446,15 +468,15 @@ static void on_read_request_done(void* arg, grpc_error* error) { // Connect to requested address. // The connection callback inherits our reference to conn. const grpc_millis deadline = - grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC; - grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint, - conn->pollset_set, nullptr, + grpc_exec_ctx_now(exec_ctx) + 10 * GPR_MS_PER_SEC; + grpc_tcp_client_connect(exec_ctx, &conn->on_server_connect_done, + &conn->server_endpoint, conn->pollset_set, nullptr, &resolved_addresses->addrs[0], deadline); grpc_resolved_addresses_destroy(resolved_addresses); } -static void on_accept(void* arg, grpc_endpoint* endpoint, - grpc_pollset* accepting_pollset, +static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, + grpc_endpoint* endpoint, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { gpr_free(acceptor); grpc_end2end_http_proxy* proxy = (grpc_end2end_http_proxy*)arg; @@ -465,8 +487,8 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, conn->proxy = proxy; gpr_ref_init(&conn->refcount, 1); conn->pollset_set = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset); - grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set); + grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset); + grpc_endpoint_add_to_pollset_set(exec_ctx, endpoint, conn->pollset_set); GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, grpc_combiner_scheduler(conn->proxy->combiner)); GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn, @@ -491,7 +513,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_slice_buffer_init(&conn->server_write_buffer); grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); - grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, + grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, &conn->on_read_request_done); } @@ -501,23 +523,24 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, static void thread_main(void* arg) { grpc_end2end_http_proxy* proxy = (grpc_end2end_http_proxy*)arg; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; do { gpr_ref(&proxy->users); grpc_pollset_worker* worker = nullptr; gpr_mu_lock(proxy->mu); GRPC_LOG_IF_ERROR( "grpc_pollset_work", - grpc_pollset_work(proxy->pollset, &worker, - grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC)); + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, + grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC)); gpr_mu_unlock(proxy->mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } while (!gpr_unref(&proxy->users)); + grpc_exec_ctx_finish(&exec_ctx); } grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( grpc_channel_args* args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_end2end_http_proxy* proxy = (grpc_end2end_http_proxy*)gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); @@ -529,8 +552,8 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name); // Create TCP server. proxy->channel_args = grpc_channel_args_copy(args); - grpc_error* error = - grpc_tcp_server_create(nullptr, proxy->channel_args, &proxy->server); + grpc_error* error = grpc_tcp_server_create( + &exec_ctx, nullptr, proxy->channel_args, &proxy->server); GPR_ASSERT(error == GRPC_ERROR_NONE); // Bind to port. grpc_resolved_address resolved_addr; @@ -545,8 +568,9 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( // Start server. proxy->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(proxy->pollset, &proxy->mu); - grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy); - + grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, on_accept, + proxy); + grpc_exec_ctx_finish(&exec_ctx); // Start proxy thread. gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); @@ -555,25 +579,27 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( return proxy; } -static void destroy_pollset(void* arg, grpc_error* error) { +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_pollset* pollset = (grpc_pollset*)arg; - grpc_pollset_destroy(pollset); + grpc_pollset_destroy(exec_ctx, pollset); gpr_free(pollset); } void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { gpr_unref(&proxy->users); // Signal proxy thread to shutdown. - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_thd_join(proxy->thd); - grpc_tcp_server_shutdown_listeners(proxy->server); - grpc_tcp_server_unref(proxy->server); + grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); + grpc_tcp_server_unref(&exec_ctx, proxy->server); gpr_free(proxy->proxy_name); - grpc_channel_args_destroy(proxy->channel_args); - grpc_pollset_shutdown(proxy->pollset, + grpc_channel_args_destroy(&exec_ctx, proxy->channel_args); + grpc_pollset_shutdown(&exec_ctx, proxy->pollset, GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset, grpc_schedule_on_exec_ctx)); - GRPC_COMBINER_UNREF(proxy->combiner, "test"); + GRPC_COMBINER_UNREF(&exec_ctx, proxy->combiner, "test"); gpr_free(proxy); + grpc_exec_ctx_finish(&exec_ctx); } const char* grpc_end2end_http_proxy_get_proxy_name( diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 967a6d560f..75117218e4 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -376,7 +376,8 @@ typedef struct addr_req { grpc_lb_addresses** lb_addrs; } addr_req; -static void finish_resolve(void* arg, grpc_error* error) { +static void finish_resolve(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { addr_req* r = static_cast<addr_req*>(arg); if (error == GRPC_ERROR_NONE && 0 == strcmp(r->addr, "server")) { @@ -394,9 +395,9 @@ static void finish_resolve(void* arg, grpc_error* error) { nullptr); *r->lb_addrs = lb_addrs; } - GRPC_CLOSURE_SCHED(r->on_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, GRPC_ERROR_NONE); } else { - GRPC_CLOSURE_SCHED(r->on_done, + GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Resolution failed", &error, 1)); } @@ -405,7 +406,8 @@ static void finish_resolve(void* arg, grpc_error* error) { gpr_free(r); } -void my_resolve_address(const char* addr, const char* default_port, +void my_resolve_address(grpc_exec_ctx* exec_ctx, const char* addr, + const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_resolved_addresses** addresses) { @@ -415,24 +417,22 @@ void my_resolve_address(const char* addr, const char* default_port, r->addrs = addresses; r->lb_addrs = nullptr; grpc_timer_init( - &r->timer, GPR_MS_PER_SEC + grpc_core::ExecCtx::Get()->Now(), + exec_ctx, &r->timer, GPR_MS_PER_SEC + grpc_exec_ctx_now(exec_ctx), GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx)); } -grpc_ares_request* my_dns_lookup_ares(const char* dns_server, const char* addr, - const char* default_port, - grpc_pollset_set* interested_parties, - grpc_closure* on_done, - grpc_lb_addresses** lb_addrs, - bool check_grpclb, - char** service_config_json) { +grpc_ares_request* my_dns_lookup_ares( + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* addr, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** lb_addrs, bool check_grpclb, + char** service_config_json) { addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r))); r->addr = gpr_strdup(addr); r->on_done = on_done; r->addrs = nullptr; r->lb_addrs = lb_addrs; grpc_timer_init( - &r->timer, GPR_MS_PER_SEC + grpc_core::ExecCtx::Get()->Now(), + exec_ctx, &r->timer, GPR_MS_PER_SEC + grpc_exec_ctx_now(exec_ctx), GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx)); return nullptr; } @@ -442,12 +442,12 @@ grpc_ares_request* my_dns_lookup_ares(const char* dns_server, const char* addr, // defined in tcp_client_posix.c extern void (*grpc_tcp_client_connect_impl)( - grpc_closure* closure, grpc_endpoint** ep, + grpc_exec_ctx* exec_ctx, grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_millis deadline); -static void sched_connect(grpc_closure* closure, grpc_endpoint** ep, - gpr_timespec deadline); +static void sched_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_endpoint** ep, gpr_timespec deadline); typedef struct { grpc_timer timer; @@ -456,11 +456,11 @@ typedef struct { gpr_timespec deadline; } future_connect; -static void do_connect(void* arg, grpc_error* error) { +static void do_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { future_connect* fc = static_cast<future_connect*>(arg); if (error != GRPC_ERROR_NONE) { *fc->ep = nullptr; - GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_REF(error)); } else if (g_server != nullptr) { grpc_endpoint* client; grpc_endpoint* server; @@ -468,23 +468,25 @@ static void do_connect(void* arg, grpc_error* error) { *fc->ep = client; grpc_transport* transport = - grpc_create_chttp2_transport(nullptr, server, false); - grpc_server_setup_transport(g_server, transport, nullptr, nullptr); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_create_chttp2_transport(exec_ctx, nullptr, server, false); + grpc_server_setup_transport(exec_ctx, g_server, transport, nullptr, + nullptr); + grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr, nullptr); - GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_NONE); } else { - sched_connect(fc->closure, fc->ep, fc->deadline); + sched_connect(exec_ctx, fc->closure, fc->ep, fc->deadline); } gpr_free(fc); } -static void sched_connect(grpc_closure* closure, grpc_endpoint** ep, - gpr_timespec deadline) { +static void sched_connect(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_endpoint** ep, gpr_timespec deadline) { if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) { *ep = nullptr; - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Connect deadline exceeded")); + GRPC_CLOSURE_SCHED( + exec_ctx, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connect deadline exceeded")); return; } @@ -493,16 +495,17 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep, fc->ep = ep; fc->deadline = deadline; grpc_timer_init( - &fc->timer, GPR_MS_PER_SEC + grpc_core::ExecCtx::Get()->Now(), + exec_ctx, &fc->timer, GPR_MS_PER_SEC + grpc_exec_ctx_now(exec_ctx), GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx)); } -static void my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep, +static void my_tcp_client_connect(grpc_exec_ctx* exec_ctx, + grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, const grpc_resolved_address* addr, grpc_millis deadline) { - sched_connect(closure, ep, + sched_connect(exec_ctx, closure, ep, grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)); } @@ -748,8 +751,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_init(); grpc_timer_manager_set_threading(false); { - grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resolve_address = my_resolve_address; grpc_dns_lookup_ares = my_dns_lookup_ares; @@ -842,8 +846,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { g_channel = grpc_insecure_channel_create(target_uri, args, nullptr); GPR_ASSERT(g_channel != nullptr); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, args); + grpc_exec_ctx_finish(&exec_ctx); } gpr_free(target_uri); gpr_free(target); @@ -869,8 +874,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { g_server = grpc_server_create(args, nullptr); GPR_ASSERT(g_server != nullptr); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, args); + grpc_exec_ctx_finish(&exec_ctx); } grpc_server_register_completion_queue(g_server, cq, nullptr); grpc_server_start(g_server); @@ -1199,8 +1205,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_secure_channel_create(creds, target_uri, args, nullptr); GPR_ASSERT(g_channel != nullptr); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, args); + grpc_exec_ctx_finish(&exec_ctx); } gpr_free(target_uri); gpr_free(target); diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index c17d581d8b..5871f0f43e 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -43,114 +43,112 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { if (squelch) gpr_set_log_function(dont_log); if (leak_check) grpc_memory_counters_init(); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); - - grpc_resource_quota* resource_quota = - grpc_resource_quota_create("client_fuzzer"); - grpc_endpoint* mock_endpoint = - grpc_mock_endpoint_create(discard_write, resource_quota); - grpc_resource_quota_unref_internal(resource_quota); - - grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); - grpc_transport* transport = - grpc_create_chttp2_transport(nullptr, mock_endpoint, true); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); - - grpc_channel* channel = grpc_channel_create( - "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_slice host = grpc_slice_from_static_string("localhost"); - grpc_call* call = grpc_channel_create_call( - channel, nullptr, 0, cq, grpc_slice_from_static_string("/foo"), &host, - gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - - grpc_metadata_array initial_metadata_recv; - grpc_metadata_array_init(&initial_metadata_recv); - grpc_byte_buffer* response_payload_recv = nullptr; - grpc_metadata_array trailing_metadata_recv; - grpc_metadata_array_init(&trailing_metadata_recv); - grpc_status_code status; - grpc_slice details = grpc_empty_slice(); - - grpc_op ops[6]; - memset(ops, 0, sizeof(ops)); - grpc_op* op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = 0; - op->reserved = nullptr; - op++; - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op->flags = 0; - op->reserved = nullptr; - op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata.recv_initial_metadata = - &initial_metadata_recv; - op->flags = 0; - op->reserved = nullptr; - op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &response_payload_recv; - op->flags = 0; - op->reserved = nullptr; - op++; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; - op->data.recv_status_on_client.status = &status; - op->data.recv_status_on_client.status_details = &details; - op->flags = 0; - op->reserved = nullptr; - op++; - grpc_call_error error = - grpc_call_start_batch(call, ops, (size_t)(op - ops), tag(1), nullptr); - int requested_calls = 1; - GPR_ASSERT(GRPC_CALL_OK == error); - - grpc_mock_endpoint_put_read( - mock_endpoint, grpc_slice_from_copied_buffer((const char*)data, size)); - - grpc_event ev; - while (1) { - grpc_core::ExecCtx::Get()->Flush(); - ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), - nullptr); - switch (ev.type) { - case GRPC_QUEUE_TIMEOUT: - goto done; - case GRPC_QUEUE_SHUTDOWN: - break; - case GRPC_OP_COMPLETE: - requested_calls--; - break; - } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, false); + + grpc_resource_quota* resource_quota = + grpc_resource_quota_create("client_fuzzer"); + grpc_endpoint* mock_endpoint = + grpc_mock_endpoint_create(discard_write, resource_quota); + grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + grpc_transport* transport = + grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, true); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); + + grpc_channel* channel = grpc_channel_create( + &exec_ctx, "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_slice host = grpc_slice_from_static_string("localhost"); + grpc_call* call = grpc_channel_create_call( + channel, nullptr, 0, cq, grpc_slice_from_static_string("/foo"), &host, + gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array_init(&initial_metadata_recv); + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_status_code status; + grpc_slice details = grpc_empty_slice(); + + grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); + grpc_op* op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + grpc_call_error error = + grpc_call_start_batch(call, ops, (size_t)(op - ops), tag(1), nullptr); + int requested_calls = 1; + GPR_ASSERT(GRPC_CALL_OK == error); + + grpc_mock_endpoint_put_read( + &exec_ctx, mock_endpoint, + grpc_slice_from_copied_buffer((const char*)data, size)); + + grpc_event ev; + while (1) { + grpc_exec_ctx_flush(&exec_ctx); + ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), + nullptr); + switch (ev.type) { + case GRPC_QUEUE_TIMEOUT: + goto done; + case GRPC_QUEUE_SHUTDOWN: + break; + case GRPC_OP_COMPLETE: + requested_calls--; + break; } + } - done: - if (requested_calls) { - grpc_call_cancel(call, nullptr); - } - for (int i = 0; i < requested_calls; i++) { - ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), - nullptr); - GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); - } - grpc_completion_queue_shutdown(cq); - for (int i = 0; i < requested_calls; i++) { - ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), - nullptr); - GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); - } - grpc_call_unref(call); - grpc_completion_queue_destroy(cq); - grpc_metadata_array_destroy(&initial_metadata_recv); - grpc_metadata_array_destroy(&trailing_metadata_recv); - grpc_slice_unref(details); - grpc_channel_destroy(channel); - if (response_payload_recv != nullptr) { - grpc_byte_buffer_destroy(response_payload_recv); - } +done: + if (requested_calls) { + grpc_call_cancel(call, nullptr); + } + for (int i = 0; i < requested_calls; i++) { + ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + } + grpc_completion_queue_shutdown(cq); + for (int i = 0; i < requested_calls; i++) { + ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); + } + grpc_call_unref(call); + grpc_completion_queue_destroy(cq); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_slice_unref(details); + grpc_channel_destroy(channel); + if (response_payload_recv != nullptr) { + grpc_byte_buffer_destroy(response_payload_recv); } grpc_shutdown(); if (leak_check) { diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index 61c55e0afd..67caf4e720 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -41,82 +41,81 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { if (squelch) gpr_set_log_function(dont_log); if (leak_check) grpc_memory_counters_init(); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, false); - grpc_resource_quota* resource_quota = - grpc_resource_quota_create("server_fuzzer"); - grpc_endpoint* mock_endpoint = - grpc_mock_endpoint_create(discard_write, resource_quota); - grpc_resource_quota_unref_internal(resource_quota); - grpc_mock_endpoint_put_read( - mock_endpoint, grpc_slice_from_copied_buffer((const char*)data, size)); + grpc_resource_quota* resource_quota = + grpc_resource_quota_create("server_fuzzer"); + grpc_endpoint* mock_endpoint = + grpc_mock_endpoint_create(discard_write, resource_quota); + grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_mock_endpoint_put_read( + &exec_ctx, mock_endpoint, + grpc_slice_from_copied_buffer((const char*)data, size)); - grpc_server* server = grpc_server_create(nullptr, nullptr); - grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); - grpc_server_register_completion_queue(server, cq, nullptr); - // TODO(ctiller): add registered methods (one for POST, one for PUT) - // void *registered_method = - // grpc_server_register_method(server, "/reg", NULL, 0); - grpc_server_start(server); - grpc_transport* transport = - grpc_create_chttp2_transport(nullptr, mock_endpoint, false); - grpc_server_setup_transport(server, transport, nullptr, nullptr); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_server* server = grpc_server_create(nullptr, nullptr); + grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); + grpc_server_register_completion_queue(server, cq, nullptr); + // TODO(ctiller): add registered methods (one for POST, one for PUT) + // void *registered_method = + // grpc_server_register_method(server, "/reg", NULL, 0); + grpc_server_start(server); + grpc_transport* transport = + grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, false); + grpc_server_setup_transport(&exec_ctx, server, transport, nullptr, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); - grpc_call* call1 = nullptr; - grpc_call_details call_details1; - grpc_metadata_array request_metadata1; - grpc_call_details_init(&call_details1); - grpc_metadata_array_init(&request_metadata1); - int requested_calls = 0; + grpc_call* call1 = nullptr; + grpc_call_details call_details1; + grpc_metadata_array request_metadata1; + grpc_call_details_init(&call_details1); + grpc_metadata_array_init(&request_metadata1); + int requested_calls = 0; - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_call(server, &call1, &call_details1, - &request_metadata1, cq, cq, tag(1))); - requested_calls++; + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(server, &call1, &call_details1, + &request_metadata1, cq, cq, tag(1))); + requested_calls++; - grpc_event ev; - while (1) { - grpc_core::ExecCtx::Get()->Flush(); - ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), - nullptr); - switch (ev.type) { - case GRPC_QUEUE_TIMEOUT: - goto done; - case GRPC_QUEUE_SHUTDOWN: - break; - case GRPC_OP_COMPLETE: - switch (detag(ev.tag)) { - case 1: - requested_calls--; - // TODO(ctiller): keep reading that call! - break; - } - } + grpc_event ev; + while (1) { + grpc_exec_ctx_flush(&exec_ctx); + ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), + nullptr); + switch (ev.type) { + case GRPC_QUEUE_TIMEOUT: + goto done; + case GRPC_QUEUE_SHUTDOWN: + break; + case GRPC_OP_COMPLETE: + switch (detag(ev.tag)) { + case 1: + requested_calls--; + // TODO(ctiller): keep reading that call! + break; + } } + } - done: - if (call1 != nullptr) grpc_call_unref(call1); - grpc_call_details_destroy(&call_details1); - grpc_metadata_array_destroy(&request_metadata1); - grpc_server_shutdown_and_notify(server, cq, tag(0xdead)); - grpc_server_cancel_all_calls(server); - for (int i = 0; i <= requested_calls; i++) { - ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), - nullptr); - GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); - } - grpc_completion_queue_shutdown(cq); - for (int i = 0; i <= requested_calls; i++) { - ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), - nullptr); - GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); - } - grpc_server_destroy(server); - grpc_completion_queue_destroy(cq); +done: + if (call1 != nullptr) grpc_call_unref(call1); + grpc_call_details_destroy(&call_details1); + grpc_metadata_array_destroy(&request_metadata1); + grpc_server_shutdown_and_notify(server, cq, tag(0xdead)); + grpc_server_cancel_all_calls(server); + for (int i = 0; i <= requested_calls; i++) { + ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + } + grpc_completion_queue_shutdown(cq); + for (int i = 0; i <= requested_calls; i++) { + ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); } + grpc_server_destroy(server); + grpc_completion_queue_destroy(cq); grpc_shutdown(); if (leak_check) { counters = grpc_memory_counters_snapshot(); diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 94cfbdda7e..2d0db967c3 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -39,15 +39,16 @@ static void* tag(intptr_t i) { return (void*)i; } static gpr_mu g_mu; static int g_resolve_port = -1; -static void (*iomgr_resolve_address)(const char* addr, const char* default_port, +static void (*iomgr_resolve_address)(grpc_exec_ctx* exec_ctx, const char* addr, + const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_resolved_addresses** addresses); static grpc_ares_request* (*iomgr_dns_lookup_ares)( - const char* dns_server, const char* addr, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addresses, bool check_grpclb, + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* addr, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** addresses, bool check_grpclb, char** service_config_json); static void set_resolve_port(int port) { @@ -56,13 +57,14 @@ static void set_resolve_port(int port) { gpr_mu_unlock(&g_mu); } -static void my_resolve_address(const char* addr, const char* default_port, +static void my_resolve_address(grpc_exec_ctx* exec_ctx, const char* addr, + const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_resolved_addresses** addrs) { if (0 != strcmp(addr, "test")) { - iomgr_resolve_address(addr, default_port, interested_parties, on_done, - addrs); + iomgr_resolve_address(exec_ctx, addr, default_port, interested_parties, + on_done, addrs); return; } @@ -84,16 +86,16 @@ static void my_resolve_address(const char* addr, const char* default_port, (*addrs)->addrs[0].len = sizeof(*sa); gpr_mu_unlock(&g_mu); } - GRPC_CLOSURE_SCHED(on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, error); } static grpc_ares_request* my_dns_lookup_ares( - const char* dns_server, const char* addr, const char* default_port, - grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** lb_addrs, bool check_grpclb, + grpc_exec_ctx* exec_ctx, const char* dns_server, const char* addr, + const char* default_port, grpc_pollset_set* interested_parties, + grpc_closure* on_done, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json) { if (0 != strcmp(addr, "test")) { - return iomgr_dns_lookup_ares(dns_server, addr, default_port, + return iomgr_dns_lookup_ares(exec_ctx, dns_server, addr, default_port, interested_parties, on_done, lb_addrs, check_grpclb, service_config_json); } @@ -115,7 +117,7 @@ static grpc_ares_request* my_dns_lookup_ares( gpr_free(sa); gpr_mu_unlock(&g_mu); } - GRPC_CLOSURE_SCHED(on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, error); return nullptr; } diff --git a/test/core/end2end/h2_ssl_cert_test.cc b/test/core/end2end/h2_ssl_cert_test.cc index d50d1f4d81..9a98c07158 100644 --- a/test/core/end2end/h2_ssl_cert_test.cc +++ b/test/core/end2end/h2_ssl_cert_test.cc @@ -181,8 +181,9 @@ typedef enum { NONE, SELF_SIGNED, SIGNED, BAD_CERT_PAIR } certtype; grpc_channel_args_copy_and_add(client_args, &ssl_name_override, 1); \ chttp2_init_client_secure_fullstack(f, new_client_args, ssl_creds); \ { \ - grpc_core::ExecCtx exec_ctx; \ - grpc_channel_args_destroy(new_client_args); \ + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; \ + grpc_channel_args_destroy(&exec_ctx, new_client_args); \ + grpc_exec_ctx_finish(&exec_ctx); \ } \ } diff --git a/test/core/end2end/tests/cancel_after_accept.cc b/test/core/end2end/tests/cancel_after_accept.cc index f59caf7e35..83439d71d2 100644 --- a/test/core/end2end/tests/cancel_after_accept.cc +++ b/test/core/end2end/tests/cancel_after_accept.cc @@ -245,8 +245,9 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, grpc_call_unref(s); if (args != nullptr) { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, args); + grpc_exec_ctx_finish(&exec_ctx); } cq_verifier_destroy(cqv); diff --git a/test/core/end2end/tests/cancel_after_round_trip.cc b/test/core/end2end/tests/cancel_after_round_trip.cc index b10b93978d..ddcec67de5 100644 --- a/test/core/end2end/tests/cancel_after_round_trip.cc +++ b/test/core/end2end/tests/cancel_after_round_trip.cc @@ -278,8 +278,9 @@ static void test_cancel_after_round_trip(grpc_end2end_test_config config, grpc_call_unref(s); if (args != nullptr) { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, args); + grpc_exec_ctx_finish(&exec_ctx); } cq_verifier_destroy(cqv); diff --git a/test/core/end2end/tests/compressed_payload.cc b/test/core/end2end/tests/compressed_payload.cc index 944edc7a70..a8ea0ff2e0 100644 --- a/test/core/end2end/tests/compressed_payload.cc +++ b/test/core/end2end/tests/compressed_payload.cc @@ -129,9 +129,10 @@ static void request_for_disabled_algorithm( server_args = grpc_channel_args_set_compression_algorithm(nullptr, GRPC_COMPRESS_NONE); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; server_args = grpc_channel_args_compression_algorithm_set_state( - &server_args, algorithm_to_disable, false); + &exec_ctx, &server_args, algorithm_to_disable, false); + grpc_exec_ctx_finish(&exec_ctx); } f = begin_test(config, test_name, client_args, server_args); @@ -256,9 +257,10 @@ static void request_for_disabled_algorithm( grpc_byte_buffer_destroy(request_payload_recv); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } end_test(&f); @@ -537,9 +539,10 @@ static void request_with_payload_template( cq_verifier_destroy(cqv); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } end_test(&f); diff --git a/test/core/end2end/tests/filter_call_init_fails.cc b/test/core/end2end/tests/filter_call_init_fails.cc index 8f46f0bb91..6eed68a2f9 100644 --- a/test/core/end2end/tests/filter_call_init_fails.cc +++ b/test/core/end2end/tests/filter_call_init_fails.cc @@ -399,23 +399,26 @@ static void test_client_subchannel_filter(grpc_end2end_test_config config) { * Test filter - always fails to initialize a call */ -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { return grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("access denied"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_PERMISSION_DENIED); } -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) {} -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { return GRPC_ERROR_NONE; } -static void destroy_channel_elem(grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} static const grpc_channel_filter test_filter = { grpc_call_next_op, @@ -434,7 +437,8 @@ static const grpc_channel_filter test_filter = { * Registration */ -static bool maybe_add_server_channel_filter(grpc_channel_stack_builder* builder, +static bool maybe_add_server_channel_filter(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder, void* arg) { if (g_enable_server_channel_filter) { // Want to add the filter as close to the end as possible, to make @@ -453,7 +457,8 @@ static bool maybe_add_server_channel_filter(grpc_channel_stack_builder* builder, } } -static bool maybe_add_client_channel_filter(grpc_channel_stack_builder* builder, +static bool maybe_add_client_channel_filter(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder, void* arg) { if (g_enable_client_channel_filter) { // Want to add the filter as close to the end as possible, to make @@ -473,7 +478,7 @@ static bool maybe_add_client_channel_filter(grpc_channel_stack_builder* builder, } static bool maybe_add_client_subchannel_filter( - grpc_channel_stack_builder* builder, void* arg) { + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) { if (g_enable_client_subchannel_filter) { // Want to add the filter as close to the end as possible, to make // sure that all of the filters work well together. However, we diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc index ec8f9dbe00..793f590686 100644 --- a/test/core/end2end/tests/filter_causes_close.cc +++ b/test/core/end2end/tests/filter_causes_close.cc @@ -197,11 +197,12 @@ typedef struct { uint8_t unused; } channel_data; -static void recv_im_ready(void* arg, grpc_error* error) { +static void recv_im_ready(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; GRPC_CLOSURE_RUN( - calld->recv_im_ready, + exec_ctx, calld->recv_im_ready, grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failure that's not preventable.", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, @@ -209,7 +210,8 @@ static void recv_im_ready(void* arg, grpc_error* error) { } static void start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* op) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { call_data* calld = (call_data*)elem->call_data; if (op->recv_initial_metadata) { calld->recv_im_ready = @@ -217,24 +219,27 @@ static void start_transport_stream_op_batch( op->payload->recv_initial_metadata.recv_initial_metadata_ready = GRPC_CLOSURE_CREATE(recv_im_ready, elem, grpc_schedule_on_exec_ctx); } - grpc_call_next_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); } -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { return GRPC_ERROR_NONE; } -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) {} -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { return GRPC_ERROR_NONE; } -static void destroy_channel_elem(grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} static const grpc_channel_filter test_filter = { start_transport_stream_op_batch, @@ -253,7 +258,8 @@ static const grpc_channel_filter test_filter = { * Registration */ -static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) { +static bool maybe_add_filter(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder, void* arg) { if (g_enable_filter) { return grpc_channel_stack_builder_prepend_filter(builder, &test_filter, nullptr, nullptr); diff --git a/test/core/end2end/tests/filter_latency.cc b/test/core/end2end/tests/filter_latency.cc index 845cbc01cf..c4d96ebfe2 100644 --- a/test/core/end2end/tests/filter_latency.cc +++ b/test/core/end2end/tests/filter_latency.cc @@ -247,12 +247,14 @@ static void test_request(grpc_end2end_test_config config) { * Test latency filter */ -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { return GRPC_ERROR_NONE; } -static void client_destroy_call_elem(grpc_call_element* elem, +static void client_destroy_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { gpr_mu_lock(&g_mu); @@ -260,7 +262,8 @@ static void client_destroy_call_elem(grpc_call_element* elem, gpr_mu_unlock(&g_mu); } -static void server_destroy_call_elem(grpc_call_element* elem, +static void server_destroy_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { gpr_mu_lock(&g_mu); @@ -268,12 +271,14 @@ static void server_destroy_call_elem(grpc_call_element* elem, gpr_mu_unlock(&g_mu); } -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { return GRPC_ERROR_NONE; } -static void destroy_channel_elem(grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} static const grpc_channel_filter test_client_filter = { grpc_call_next_op, @@ -305,7 +310,8 @@ static const grpc_channel_filter test_server_filter = { * Registration */ -static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) { +static bool maybe_add_filter(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder, void* arg) { grpc_channel_filter* filter = (grpc_channel_filter*)arg; if (g_enable_filter) { // Want to add the filter as close to the end as possible, to make diff --git a/test/core/end2end/tests/load_reporting_hook.cc b/test/core/end2end/tests/load_reporting_hook.cc index e056bd547b..faabec34cb 100644 --- a/test/core/end2end/tests/load_reporting_hook.cc +++ b/test/core/end2end/tests/load_reporting_hook.cc @@ -300,8 +300,9 @@ static void test_load_reporting_hook(grpc_end2end_test_config config) { &trailing_lr_metadata); end_test(&f); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(lr_server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, lr_server_args); + grpc_exec_ctx_finish(&exec_ctx); } config.tear_down_data(&f); } diff --git a/test/core/end2end/tests/max_message_length.cc b/test/core/end2end/tests/max_message_length.cc index e581f1fc20..f1ac27fa7c 100644 --- a/test/core/end2end/tests/max_message_length.cc +++ b/test/core/end2end/tests/max_message_length.cc @@ -173,9 +173,12 @@ static void test_max_message_length_on_request(grpc_end2end_test_config config, f = begin_test(config, "test_max_request_message_length", client_args, server_args); { - grpc_core::ExecCtx exec_ctx; - if (client_args != nullptr) grpc_channel_args_destroy(client_args); - if (server_args != nullptr) grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (client_args != nullptr) + grpc_channel_args_destroy(&exec_ctx, client_args); + if (server_args != nullptr) + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } cqv = cq_verifier_create(f.cq); @@ -363,9 +366,12 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config, f = begin_test(config, "test_max_response_message_length", client_args, server_args); { - grpc_core::ExecCtx exec_ctx; - if (client_args != nullptr) grpc_channel_args_destroy(client_args); - if (server_args != nullptr) grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (client_args != nullptr) + grpc_channel_args_destroy(&exec_ctx, client_args); + if (server_args != nullptr) + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } cqv = cq_verifier_create(f.cq); diff --git a/test/core/end2end/tests/stream_compression_compressed_payload.cc b/test/core/end2end/tests/stream_compression_compressed_payload.cc index ec3050ad45..d73346468a 100644 --- a/test/core/end2end/tests/stream_compression_compressed_payload.cc +++ b/test/core/end2end/tests/stream_compression_compressed_payload.cc @@ -129,9 +129,10 @@ static void request_for_disabled_algorithm( server_args = grpc_channel_args_set_stream_compression_algorithm( nullptr, GRPC_STREAM_COMPRESS_NONE); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; server_args = grpc_channel_args_stream_compression_algorithm_set_state( - &server_args, algorithm_to_disable, false); + &exec_ctx, &server_args, algorithm_to_disable, false); + grpc_exec_ctx_finish(&exec_ctx); } f = begin_test(config, test_name, client_args, server_args); @@ -257,9 +258,10 @@ static void request_for_disabled_algorithm( grpc_byte_buffer_destroy(request_payload_recv); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } end_test(&f); @@ -545,9 +547,10 @@ static void request_with_payload_template( cq_verifier_destroy(cqv); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } end_test(&f); diff --git a/test/core/end2end/tests/stream_compression_payload.cc b/test/core/end2end/tests/stream_compression_payload.cc index b95e6528cd..924961ea55 100644 --- a/test/core/end2end/tests/stream_compression_payload.cc +++ b/test/core/end2end/tests/stream_compression_payload.cc @@ -277,9 +277,10 @@ static void test_invoke_request_response_with_payload( end_test(&f); config.tear_down_data(&f); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } } diff --git a/test/core/end2end/tests/stream_compression_ping_pong_streaming.cc b/test/core/end2end/tests/stream_compression_ping_pong_streaming.cc index 2a8799ee67..d3b526f04e 100644 --- a/test/core/end2end/tests/stream_compression_ping_pong_streaming.cc +++ b/test/core/end2end/tests/stream_compression_ping_pong_streaming.cc @@ -275,9 +275,10 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, end_test(&f); config.tear_down_data(&f); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } } diff --git a/test/core/end2end/tests/workaround_cronet_compression.cc b/test/core/end2end/tests/workaround_cronet_compression.cc index d4decce0aa..bc4d5079d8 100644 --- a/test/core/end2end/tests/workaround_cronet_compression.cc +++ b/test/core/end2end/tests/workaround_cronet_compression.cc @@ -142,14 +142,15 @@ static void request_with_payload_template( nullptr, default_server_channel_compression_algorithm); if (user_agent_override) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_channel_args* client_args_old = client_args; grpc_arg arg; arg.key = const_cast<char*>(GRPC_ARG_PRIMARY_USER_AGENT_STRING); arg.type = GRPC_ARG_STRING; arg.value.string = user_agent_override; client_args = grpc_channel_args_copy_and_add(client_args_old, &arg, 1); - grpc_channel_args_destroy(client_args_old); + grpc_channel_args_destroy(&exec_ctx, client_args_old); + grpc_exec_ctx_finish(&exec_ctx); } f = begin_test(config, test_name, client_args, server_args); @@ -350,9 +351,10 @@ static void request_with_payload_template( cq_verifier_destroy(cqv); { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); } end_test(&f); |