summaryrefslogtreecommitdiff
path: root/grpc/spm-cpp-include/grpcpp/server.h
blob: a69e64b462f8c73c53e20d040344d158764039e2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPCPP_SERVER_H
#define GRPCPP_SERVER_H

#include <atomic>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <grpc/impl/codegen/port_platform.h>

#include <grpc/compression.h>
#include <grpc/support/atm.h>
#include <grpcpp/channel.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/server_interface.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>

struct grpc_server;

namespace grpc {
class AsyncGenericService;
class ServerContext;
class ServerInitializer;

namespace internal {
class ExternalConnectionAcceptorImpl;
}  // namespace internal

/// Represents a gRPC server.
///
/// Use a \a grpc::ServerBuilder to create, configure, and start
/// \a Server instances.
///
/// The constructor, service/port registration and \a Start are protected;
/// \a ServerBuilder is declared a friend of this class and is the intended
/// caller of those entry points.
class Server : public ServerInterface, private GrpcLibraryCodegen {
 public:
  ~Server() ABSL_LOCKS_EXCLUDED(mu_) override;

  /// Block until the server shuts down.
  ///
  /// \warning The server must be either shutting down or some other thread must
  /// call \a Shutdown for this function to ever return.
  void Wait() ABSL_LOCKS_EXCLUDED(mu_) override;

  /// Global callbacks are a set of hooks that are called when server
  /// events occur.  \a SetGlobalCallbacks method is used to register
  /// the hooks with gRPC.  Note that
  /// the \a GlobalCallbacks instance will be shared among all
  /// \a Server instances in an application and can be set exactly
  /// once per application.
  ///
  /// Only the two synchronous-request hooks are pure virtual; every other hook
  /// has an empty default implementation, so subclasses override them only
  /// when needed.
  class GlobalCallbacks {
   public:
    virtual ~GlobalCallbacks() {}
    /// Called before server is created.
    virtual void UpdateArguments(ChannelArguments* /*args*/) {}
    /// Called before application callback for each synchronous server request
    virtual void PreSynchronousRequest(ServerContext* context) = 0;
    /// Called after application callback for each synchronous server request
    virtual void PostSynchronousRequest(ServerContext* context) = 0;
    /// Called before server is started.
    virtual void PreServerStart(Server* /*server*/) {}
    /// Called after a server port is added.
    virtual void AddPort(Server* /*server*/, const std::string& /*addr*/,
                         ServerCredentials* /*creds*/, int /*port*/) {}
  };
  /// Set the global callback object. Can only be called once per application.
  /// Does not take ownership of callbacks, and expects the pointed to object
  /// to be alive until all server objects in the process have been destroyed.
  /// The same \a GlobalCallbacks object will be used throughout the
  /// application and is shared among all \a Server objects.
  static void SetGlobalCallbacks(GlobalCallbacks* callbacks);

  /// Returns a \em raw pointer to the underlying \a grpc_server instance.
  /// EXPERIMENTAL:  for internal/test use only
  grpc_server* c_server();

  /// Returns the health check service.
  ///
  /// The returned pointer is non-owning: it is the raw pointer held by the
  /// server's \a health_check_service_ member and is null when that member is
  /// empty.
  HealthCheckServiceInterface* GetHealthCheckService() const {
    return health_check_service_.get();
  }

  /// Establish a channel for in-process communication
  std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args);

  /// NOTE: class experimental_type is not part of the public API of this class.
  /// TODO(yashykt): Integrate into public API when this is no longer
  /// experimental.
  class experimental_type {
   public:
    /// Wraps \a server without taking ownership; the pointer is stored as-is.
    explicit experimental_type(Server* server) : server_(server) {}

    /// Establish a channel for in-process communication with client
    /// interceptors
    std::shared_ptr<Channel> InProcessChannelWithInterceptors(
        const ChannelArguments& args,
        std::vector<
            std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
            interceptor_creators);

   private:
    // Non-owning back-pointer to the server this view was created from.
    Server* server_;
  };

  /// NOTE: The function experimental() is not stable public API. It is a view
  /// to the experimental components of this class. It may be changed or removed
  /// at any time.
  ///
  /// Each call returns a fresh \a experimental_type constructed from \c this.
  experimental_type experimental() { return experimental_type(this); }

 protected:
  /// Register a service. This call does not take ownership of the service.
  /// The service must exist for the lifetime of the Server instance.
  bool RegisterService(const std::string* addr, Service* service) override;

  /// Try binding the server to the given \a addr endpoint
  /// (port, and optionally including IP address to bind to).
  ///
  /// It can be invoked multiple times. Should be used before
  /// starting the server.
  ///
  /// \param addr The address to try to bind to the server (e.g.,
  /// localhost:1234, 192.168.1.1:31416, [::1]:27182, etc.).
  /// \param creds The credentials associated with the server.
  ///
  /// \return bound port number on success, 0 on failure.
  ///
  /// \warning It is an error to call this method on an already started server.
  int AddListeningPort(const std::string& addr,
                       ServerCredentials* creds) override;

  /// NOTE: This is *NOT* a public API. The server constructors are supposed to
  /// be used by \a ServerBuilder class only. The constructor will be made
  /// 'private' very soon.
  ///
  /// Server constructors. To be used by \a ServerBuilder only.
  ///
  /// \param args The channel args
  ///
  /// \param sync_server_cqs The completion queues to use if the server is a
  /// synchronous server (or a hybrid server). The server polls for new RPCs on
  /// these queues
  ///
  /// \param min_pollers The minimum number of polling threads per server
  /// completion queue (in param sync_server_cqs) to use for listening to
  /// incoming requests (used only in case of sync server)
  ///
  /// \param max_pollers The maximum number of polling threads per server
  /// completion queue (in param sync_server_cqs) to use for listening to
  /// incoming requests (used only in case of sync server)
  ///
  /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
  /// server completion queues passed via sync_server_cqs param.
  ///
  /// \param acceptors Shared pointers to
  /// \a internal::ExternalConnectionAcceptorImpl objects. How they are used is
  /// defined by the out-of-line constructor, which is not visible in this
  /// header.
  ///
  /// \param server_config_fetcher Optional; defaults to nullptr.
  ///
  /// \param server_rq Optional resource quota pointer; defaults to nullptr.
  ///
  /// \param interceptor_creators Server interceptor factories, passed by value;
  /// defaults to an empty vector.
  Server(ChannelArguments* args,
         std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
             sync_server_cqs,
         int min_pollers, int max_pollers, int sync_cq_timeout_msec,
         std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>>
             acceptors,
         grpc_server_config_fetcher* server_config_fetcher = nullptr,
         grpc_resource_quota* server_rq = nullptr,
         std::vector<
             std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
             interceptor_creators = std::vector<std::unique_ptr<
                 experimental::ServerInterceptorFactoryInterface>>());

  /// Start the server.
  ///
  /// \param cqs Completion queues for handling asynchronous services. The
  /// caller is required to keep all completion queues live until the server is
  /// destroyed.
  /// \param num_cqs How many completion queues does \a cqs hold.
  void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;

  /// Returns the wrapped \a grpc_server pointer stored in \a server_.
  grpc_server* server() override { return server_; }

 protected:
  /// NOTE: This method is not part of the public API for this class.
  ///
  /// Takes ownership of \a service, replacing whatever health check service was
  /// previously held.
  void set_health_check_service(
      std::unique_ptr<HealthCheckServiceInterface> service) {
    health_check_service_ = std::move(service);
  }

  /// Returns the raw pointer held by \a context_allocator_ (null when no
  /// allocator is held). Ownership stays with the server.
  ContextAllocator* context_allocator() { return context_allocator_.get(); }

  /// NOTE: This method is not part of the public API for this class.
  ///
  /// Returns the stored \a health_check_service_disabled_ flag.
  bool health_check_service_disabled() const {
    return health_check_service_disabled_;
  }

 private:
  /// Returns a pointer to the server's own interceptor factory vector. The
  /// pointer refers to a data member, so it is never null and is valid for as
  /// long as this object is.
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
  interceptor_creators() override {
    return &interceptor_creators_;
  }

  // These classes are granted access to the protected and private members
  // declared in this class.
  friend class AsyncGenericService;
  friend class ServerBuilder;
  friend class ServerInitializer;

  // Nested helper types. They are only declared here; their definitions are
  // not part of this header.
  class SyncRequest;
  class CallbackRequestBase;
  template <class ServerContextType>
  class CallbackRequest;
  class UnimplementedAsyncRequest;
  class UnimplementedAsyncResponse;

  /// SyncRequestThreadManager is an implementation of ThreadManager. This class
  /// is responsible for polling for incoming RPCs and calling the RPC handlers.
  /// This is only used in case of a Sync server (i.e. a server exposing a sync
  /// interface)
  class SyncRequestThreadManager;

  /// Register a generic service. This call does not take ownership of the
  /// service. The service must exist for the lifetime of the Server instance.
  void RegisterAsyncGenericService(AsyncGenericService* service) override;

#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
  /// Register a callback-based generic service. This call does not take
  /// ownership of the service. The service must exist for the lifetime of the
  /// Server instance.
  void RegisterCallbackGenericService(CallbackGenericService* service) override;

  /// Takes ownership of \a context_allocator, replacing any allocator the
  /// server previously held.
  void RegisterContextAllocator(
      std::unique_ptr<ContextAllocator> context_allocator) {
    context_allocator_ = std::move(context_allocator);
  }

#else
  /// NOTE: class experimental_registration_type is not part of the public API
  /// of this class
  /// TODO(vjpai): Move these contents to the public API of Server when
  ///              they are no longer experimental
  class experimental_registration_type final
      : public experimental_registration_interface {
   public:
    /// Wraps \a server without taking ownership; the pointer is stored as-is.
    explicit experimental_registration_type(Server* server) : server_(server) {}
    /// Forwards to \a Server::RegisterCallbackGenericService on the wrapped
    /// server.
    void RegisterCallbackGenericService(
        experimental::CallbackGenericService* service) override {
      server_->RegisterCallbackGenericService(service);
    }

    /// Moves \a context_allocator into the wrapped server's
    /// \a context_allocator_ member, replacing any allocator it held.
    void RegisterContextAllocator(
        std::unique_ptr<ContextAllocator> context_allocator) override {
      server_->context_allocator_ = std::move(context_allocator);
    }

   private:
    // Non-owning back-pointer to the server being configured.
    Server* server_;
  };

  /// TODO(vjpai): Mark this override when experimental type above is deleted
  void RegisterCallbackGenericService(
      experimental::CallbackGenericService* service);

  /// NOTE: The function experimental_registration() is not stable public API.
  /// It is a view to the experimental components of this class. It may be
  /// changed or removed at any time.
  ///
  /// Returns the address of the \a experimental_registration_ member.
  experimental_registration_interface* experimental_registration() override {
    return &experimental_registration_;
  }
#endif

  // Overrides a base-class virtual; the implementation is out of line and not
  // visible in this header.
  void PerformOpsOnCall(internal::CallOpSetInterface* ops,
                        internal::Call* call) override;

  // Overrides a base-class virtual; defined out of line. Annotated as a
  // function that must not be entered with mu_ already held.
  void ShutdownInternal(gpr_timespec deadline)
      ABSL_LOCKS_EXCLUDED(mu_) override;

  // Returns the stored max_receive_message_size_ value.
  int max_receive_message_size() const override {
    return max_receive_message_size_;
  }

  // Defined out of line; see the comment on callback_cq_ below for the
  // ownership of the queue. Annotated as a function that must not be entered
  // with mu_ already held.
  CompletionQueue* CallbackCQ() ABSL_LOCKS_EXCLUDED(mu_) override;

  // Defined out of line.
  // NOTE(review): presumably returns server_initializer_.get() - confirm
  // against the implementation.
  ServerInitializer* initializer();

  // Functions to manage the server shutdown ref count. Things that increase
  // the ref count are the running state of the server (take a ref at start and
  // drop it at shutdown) and each running callback RPC.
  //
  // Locking contract, per the annotations: UnrefWithPossibleNotify() must be
  // called without mu_ held, UnrefAndWaitLocked() must be called with mu_ held
  // exclusively, and Ref() carries no lock annotation.
  void Ref();
  void UnrefWithPossibleNotify() ABSL_LOCKS_EXCLUDED(mu_);
  void UnrefAndWaitLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // NOTE(review): presumably populated from the constructor's `acceptors`
  // argument - confirm against the implementation.
  std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>>
      acceptors_;

  // A vector of interceptor factory objects.
  // This should be destroyed after health_check_service_ and this requirement
  // is satisfied by declaring interceptor_creators_ before
  // health_check_service_. (C++ mandates that member objects be destroyed in
  // the reverse order of initialization.)
  // Do not reorder these two members.
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
      interceptor_creators_;

  // Value returned by max_receive_message_size(). It has no in-class
  // initializer, so it must be set by the constructor.
  int max_receive_message_size_;

  /// The following completion queues are ONLY used in case of Sync API
  /// i.e. if the server has any services with sync methods. The server uses
  /// these completion queues to poll for new RPCs
  std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
      sync_server_cqs_;

  /// List of \a ThreadManager instances (one for each cq in
  /// the \a sync_server_cqs)
  std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;

#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
  // For registering experimental callback generic service; remove when that
  // method is no longer experimental
  experimental_registration_type experimental_registration_{this};
#endif

  // Server status
  // mu_ guards the members annotated ABSL_GUARDED_BY(mu_) below. started_ and
  // shutdown_refs_outstanding_ carry no such annotation.
  internal::Mutex mu_;
  bool started_;
  bool shutdown_ ABSL_GUARDED_BY(mu_);
  bool shutdown_notified_
      ABSL_GUARDED_BY(mu_);  // Was notify called on the shutdown_cv_
  internal::CondVar shutdown_done_cv_;
  bool shutdown_done_ ABSL_GUARDED_BY(mu_) = false;
  // Starts at 1.
  // NOTE(review): presumably the counter manipulated by Ref() and the Unref*()
  // functions above - confirm against the implementation.
  std::atomic_int shutdown_refs_outstanding_{1};

  internal::CondVar shutdown_cv_;

  // NOTE(review): held through a shared_ptr even though SetGlobalCallbacks()
  // documents that it does not take ownership of its argument - confirm how
  // the pointer is wrapped in the implementation.
  std::shared_ptr<GlobalCallbacks> global_callbacks_;

  // NOTE(review): presumably the names of the registered services - confirm
  // against the implementation.
  std::vector<std::string> services_;
  bool has_async_generic_service_ = false;
  bool has_callback_generic_service_ = false;
  bool has_callback_methods_ = false;

  // Pointer to the wrapped grpc_server.
  grpc_server* server_;

  std::unique_ptr<ServerInitializer> server_initializer_;

  // Set through RegisterContextAllocator(); read through context_allocator().
  std::unique_ptr<ContextAllocator> context_allocator_;

  // Set through set_health_check_service(); read through
  // GetHealthCheckService(). Must stay declared after interceptor_creators_
  // (see the comment on that member).
  std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
  // Value returned by health_check_service_disabled(). It has no in-class
  // initializer, so it must be set by the constructor.
  bool health_check_service_disabled_;

  // When appropriate, use a default callback generic service to handle
  // unimplemented methods
#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
  std::unique_ptr<CallbackGenericService> unimplemented_service_;
#else
  std::unique_ptr<experimental::CallbackGenericService> unimplemented_service_;
#endif

  // A special handler for resource exhausted in sync case
  std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;

  // Handler for callback generic service, if any
  std::unique_ptr<internal::MethodHandler> generic_handler_;

  // callback_cq_ references the callbackable completion queue associated
  // with this server (if any). It is set on the first call to CallbackCQ().
  // It is _not owned_ by the server; ownership belongs with its internal
  // shutdown callback tag (invoked when the CQ is fully shutdown).
  std::atomic<CompletionQueue*> callback_cq_{nullptr};

  // List of CQs passed in by user that must be Shutdown only after Server is
  // Shutdown.  Even though this is only used with NDEBUG, instantiate it in all
  // cases since otherwise the size will be inconsistent.
  // NOTE(review): "only used with NDEBUG" may have been meant as "only used in
  // debug (non-NDEBUG) builds" - confirm against the implementation.
  std::vector<CompletionQueue*> cq_list_;
};

}  // namespace grpc

#endif  // GRPCPP_SERVER_H