diff options
Diffstat (limited to 'grpc/include/grpc/event_engine/event_engine.h')
-rw-r--r-- | grpc/include/grpc/event_engine/event_engine.h | 336 |
1 files changed, 336 insertions, 0 deletions
diff --git a/grpc/include/grpc/event_engine/event_engine.h b/grpc/include/grpc/event_engine/event_engine.h new file mode 100644 index 00000000..cdb59662 --- /dev/null +++ b/grpc/include/grpc/event_engine/event_engine.h @@ -0,0 +1,336 @@ +// Copyright 2021 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H +#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H + +#include <grpc/support/port_platform.h> + +#include <functional> +#include <vector> + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/time/time.h" + +#include "grpc/event_engine/channel_args.h" +#include "grpc/event_engine/port.h" +#include "grpc/event_engine/slice_allocator.h" + +// TODO(hork): explicitly define lifetimes and ownership of all objects. +// TODO(hork): Define the Endpoint::Write metrics collection system + +namespace grpc_event_engine { +namespace experimental { + +//////////////////////////////////////////////////////////////////////////////// +/// The EventEngine encapsulates all platform-specific behaviors related to low +/// level network I/O, timers, asynchronous execution, and DNS resolution. +/// +/// This interface allows developers to provide their own event management and +/// network stacks. Motivating uses cases for supporting custom EventEngines +/// include the ability to hook into external event loops, and using different +/// EventEngine instances for each channel to better insulate network I/O and +/// callback processing from other channels. +/// +/// A default cross-platform EventEngine instance is provided by gRPC. +/// +/// LIFESPAN AND OWNERSHIP +/// +/// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure +/// that the engines remain available until they are no longer needed. Depending +/// on the use case, engines may live until gRPC is shut down. +/// +/// EXAMPLE USAGE (Not yet implemented) +/// +/// Custom EventEngines can be specified per channel, and allow configuration +/// for both clients and servers. To set a custom EventEngine for a client +/// channel, you can do something like the following: +/// +/// ChannelArguments args; +/// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...); +/// args.SetEventEngine(engine); +/// MyAppClient client(grpc::CreateCustomChannel( +/// "localhost:50051", grpc::InsecureChannelCredentials(), args)); +/// +/// A gRPC server can use a custom EventEngine by calling the +/// ServerBuilder::SetEventEngine method: +/// +/// ServerBuilder builder; +/// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...); +/// builder.SetEventEngine(engine); +/// std::unique_ptr<Server> server(builder.BuildAndStart()); +/// server->Wait(); +/// +//////////////////////////////////////////////////////////////////////////////// +class EventEngine { + public: + /// A basic callable function. The first argument to all callbacks is an + /// absl::Status indicating the status of the operation associated with this + /// callback. Each EventEngine method that takes a callback parameter, defines + /// the expected sets and meanings of statuses for that use case. + using Callback = std::function<void(absl::Status)>; + /// A callback handle, used to cancel a callback. + struct TaskHandle { + intptr_t key; + }; + /// A thin wrapper around a platform-specific sockaddr type. A sockaddr struct + /// exists on all platforms that gRPC supports. + /// + /// Platforms are expected to provide definitions for: + /// * sockaddr + /// * sockaddr_in + /// * sockaddr_in6 + class ResolvedAddress { + public: + static constexpr socklen_t MAX_SIZE_BYTES = 128; + + ResolvedAddress(const sockaddr* address, socklen_t size); + const struct sockaddr* address() const; + socklen_t size() const; + + private: + char address_[MAX_SIZE_BYTES]; + socklen_t size_; + }; + + /// An Endpoint represents one end of a connection between a gRPC client and + /// server. Endpoints are created when connections are established, and + /// Endpoint operations are gRPC's primary means of communication. + /// + /// Endpoints must use the provided SliceAllocator for all data buffer memory + /// allocations. gRPC allows applications to set memory constraints per + /// Channel or Server, and the implementation depends on all dynamic memory + /// allocation being handled by the quota system. + class Endpoint { + public: + /// The Endpoint destructor is responsible for shutting down all connections + /// and invoking all pending read or write callbacks with an error status. + virtual ~Endpoint() = default; + /// Read data from the Endpoint. + /// + /// When data is available on the connection, that data is moved into the + /// \a buffer, and the \a on_read callback is called. The caller must ensure + /// that the callback has access to the buffer when executed later. + /// Ownership of the buffer is not transferred. Valid slices *may* be placed + /// into the buffer even if the callback is invoked with a non-OK Status. + /// + /// For failed read operations, implementations should pass the appropriate + /// statuses to \a on_read. For example, callbacks might expect to receive + /// DEADLINE_EXCEEDED when the deadline is exceeded, and CANCELLED on + /// endpoint shutdown. + virtual void Read(Callback on_read, SliceBuffer* buffer, + absl::Time deadline) = 0; + /// Write data out on the connection. + /// + /// \a on_writable is called when the connection is ready for more data. The + /// Slices within the \a data buffer may be mutated at will by the Endpoint + /// until \a on_writable is called. The \a data SliceBuffer will remain + /// valid after calling \a Write, but its state is otherwise undefined. + /// + /// For failed write operations, implementations should pass the appropriate + /// statuses to \a on_writable. For example, callbacks might expect to + /// receive DEADLINE_EXCEEDED when the deadline is exceeded, and CANCELLED + /// on endpoint shutdown. + virtual void Write(Callback on_writable, SliceBuffer* data, + absl::Time deadline) = 0; + // TODO(hork): define status codes for the callback + // TODO(hork): define cleanup operations, lifetimes, responsibilities. + virtual void Close(Callback on_close) = 0; + /// These methods return an address in the format described in DNSResolver. + /// The returned values are owned by the Endpoint and are expected to remain + /// valid for the life of the Endpoint. + virtual const ResolvedAddress* GetPeerAddress() const = 0; + virtual const ResolvedAddress* GetLocalAddress() const = 0; + }; + + /// Called when a new connection is established. + /// + /// If the connection attempt was not successful, implementations should pass + /// the appropriate statuses to this callback. For example, callbacks might + /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or + /// CANCELLED statuses on EventEngine shutdown. + using OnConnectCallback = + std::function<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>; + + /// An EventEngine Listener listens for incoming connection requests from gRPC + /// clients and initiates request processing once connections are established. + class Listener { + public: + /// Called when the listener has accepted a new client connection. + using AcceptCallback = std::function<void(std::unique_ptr<Endpoint>)>; + virtual ~Listener() = default; + /// Bind an address/port to this Listener. + /// + /// It is expected that multiple addresses/ports can be bound to this + /// Listener before Listener::Start has been called. Returns either the + /// bound port or an appropriate error status. + virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0; + virtual absl::Status Start() = 0; + }; + + /// Factory method to create a network listener / server. + /// + /// Once a \a Listener is created and started, the \a on_accept callback will + /// be called once asynchronously for each established connection. Note that + /// unlike other callbacks, there is no status code parameter since the + /// callback will only be called in healthy scenarios where connections can be + /// accepted. + /// + /// This method may return a non-OK status immediately if an error was + /// encountered in any synchronous steps required to create the Listener. In + /// this case, \a on_shutdown will never be called. + /// + /// If this method returns a Listener, then \a on_shutdown will be invoked + /// exactly once, when the Listener is shut down. The status passed to it will + /// indicate if there was a problem during shutdown. + /// + /// The provided \a SliceAllocatorFactory is used to create \a SliceAllocators + /// for Endpoint construction. + virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener( + Listener::AcceptCallback on_accept, Callback on_shutdown, + const ChannelArgs& args, + SliceAllocatorFactory slice_allocator_factory) = 0; + /// Creates a client network connection to a remote network listener. + /// + /// \a Connect may return an error status immediately if there was a failure + /// in the synchronous part of establishing a connection. In that event, the + /// \a on_connect callback *will not* have been executed. Otherwise, it is + /// expected that the \a on_connect callback will be asynchronously executed + /// exactly once by the EventEngine. + /// + /// Implementation Note: it is important that the \a slice_allocator be used + /// for all read/write buffer allocations in the EventEngine implementation. + /// This allows gRPC's \a ResourceQuota system to monitor and control memory + /// usage with graceful degradation mechanisms. Please see the \a + /// SliceAllocator API for more information. + virtual absl::Status Connect(OnConnectCallback on_connect, + const ResolvedAddress& addr, + const ChannelArgs& args, + SliceAllocator slice_allocator, + absl::Time deadline) = 0; + + /// The DNSResolver that provides asynchronous resolution. + class DNSResolver { + public: + /// A task handle for DNS Resolution requests. + struct LookupTaskHandle { + intptr_t key; + }; + /// A DNS SRV record type. + struct SRVRecord { + std::string host; + int port = 0; + int priority = 0; + int weight = 0; + }; + /// Called with the collection of sockaddrs that were resolved from a given + /// target address. + using LookupHostnameCallback = + std::function<void(absl::StatusOr<std::vector<ResolvedAddress>>)>; + /// Called with a collection of SRV records. + using LookupSRVCallback = + std::function<void(absl::StatusOr<std::vector<SRVRecord>>)>; + /// Called with the result of a TXT record lookup + using LookupTXTCallback = std::function<void(absl::StatusOr<std::string>)>; + + virtual ~DNSResolver() = default; + + /// Asynchronously resolve an address. + /// + /// \a default_port may be a non-numeric named service port, and will only + /// be used if \a address does not already contain a port component. + /// + /// When the lookup is complete, the \a on_resolve callback will be invoked + /// with a status indicating the success or failure of the lookup. + /// Implementations should pass the appropriate statuses to the callback. + /// For example, callbacks might expect to receive DEADLINE_EXCEEDED when + /// the deadline is exceeded or CANCELLED if the lookup was cancelled. + virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, + absl::string_view address, + absl::string_view default_port, + absl::Time deadline) = 0; + /// Asynchronously perform an SRV record lookup. + /// + /// \a on_resolve has the same meaning and expectations as \a + /// LookupHostname's \a on_resolve callback. + virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, + absl::string_view name, + absl::Time deadline) = 0; + /// Asynchronously perform a TXT record lookup. + /// + /// \a on_resolve has the same meaning and expectations as \a + /// LookupHostname's \a on_resolve callback. + virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, + absl::string_view name, + absl::Time deadline) = 0; + /// Cancel an asynchronous lookup operation. + virtual void TryCancelLookup(LookupTaskHandle handle) = 0; + }; + + virtual ~EventEngine() = default; + + // TODO(hork): define return status codes + /// Retrieves an instance of a DNSResolver. + virtual absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver() = 0; + + /// Intended for future expansion of Task run functionality. + struct RunOptions {}; + // TODO(hork): consider recommendation to make TaskHandle an output arg + /// Run a callback as soon as possible. + /// + /// The \a fn callback's \a status argument is used to indicate whether it was + /// executed normally. For example, the status may be CANCELLED if + /// \a TryCancel was called, or if the EventEngine is being shut down. + virtual TaskHandle Run(Callback fn, RunOptions opts) = 0; + /// Synonymous with scheduling an alarm to run at time \a when. + /// + /// The callback \a fn will execute when either when time \a when arrives + /// (receiving status OK), or when the \a fn is cancelled (reveiving status + /// CANCELLED). The callback is guaranteed to be called exactly once. + virtual TaskHandle RunAt(absl::Time when, Callback fn, RunOptions opts) = 0; + /// Immediately tries to cancel a callback. + /// Note that this is a "best effort" cancellation. No guarantee is made that + /// the callback will be cancelled, the call could be in any stage. + /// + /// There are three scenarios in which we may cancel a scheduled function: + /// 1. We cancel the execution before it has run. + /// 2. The callback has already run. + /// 3. We can't cancel it because it is "in flight". + /// + /// In all cases, the cancellation is still considered successful, the + /// callback will be run exactly once from either cancellation or from its + /// activation. + virtual void TryCancel(TaskHandle handle) = 0; + /// Immediately run all callbacks with status indicating the shutdown. Every + /// EventEngine is expected to shut down exactly once. No new callbacks/tasks + /// should be scheduled after shutdown has begun, no new connections should be + /// created. + /// + /// If the \a on_shutdown_complete callback is given a non-OK status, errors + /// are expected to be unrecoverable. For example, an implementation could + /// warn callers about leaks if memory cannot be freed within a certain + /// timeframe. + virtual void Shutdown(Callback on_shutdown_complete) = 0; +}; + +/// Lazily instantiate and return a default global EventEngine instance if no +/// custom instance is provided. If a custom EventEngine is provided for every +/// channel/server via ChannelArgs, this method should never be called, and the +/// default instance will never be instantiated. +std::shared_ptr<EventEngine> GetDefaultEventEngine(); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H |