aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/io/grpc/internal/GrpcUtil.java
blob: 75a97af0a9ca457533f7ecf0dfb6528e1f1d2aba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
import io.grpc.InternalMetadata;
import io.grpc.InternalMetadata.TrustedAsciiMarshaller;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ProxiedSocketAddress;
import io.grpc.ProxyDetector;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.internal.StreamListener.MessageProducer;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

/**
 * Common utilities for GRPC.
 */
public final class GrpcUtil {

  // java.util.logging logger named after this class.
  private static final Logger log = Logger.getLogger(GrpcUtil.class.getName());

  // Status codes that replaceInappropriateControlPlaneStatus() rewrites to INTERNAL. Wrapped in an
  // unmodifiable view so the set cannot be changed after class initialization.
  private static final Set<Status.Code> INAPPROPRIATE_CONTROL_PLANE_STATUS
      = Collections.unmodifiableSet(EnumSet.of(
          Status.Code.OK,
          Status.Code.INVALID_ARGUMENT,
          Status.Code.NOT_FOUND,
          Status.Code.ALREADY_EXISTS,
          Status.Code.FAILED_PRECONDITION,
          Status.Code.ABORTED,
          Status.Code.OUT_OF_RANGE,
          Status.Code.DATA_LOSS));

  /**
   * The US-ASCII charset, looked up once by name.
   */
  public static final Charset US_ASCII = Charset.forName("US-ASCII");

  /**
   * {@link io.grpc.Metadata.Key} for the timeout header ({@link #TIMEOUT}); values are timeouts in
   * nanoseconds, converted to and from the wire format by {@link TimeoutMarshaller}.
   */
  public static final Metadata.Key<Long> TIMEOUT_KEY =
          Metadata.Key.of(GrpcUtil.TIMEOUT, new TimeoutMarshaller());

  /**
   * {@link io.grpc.Metadata.Key} for the message encoding header ({@link #MESSAGE_ENCODING}).
   */
  public static final Metadata.Key<String> MESSAGE_ENCODING_KEY =
          Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER);

  /**
   * {@link io.grpc.Metadata.Key} for the accepted message encodings header
   * ({@link #MESSAGE_ACCEPT_ENCODING}). The value is kept as raw bytes; the marshaller passes it
   * through unchanged.
   */
  public static final Metadata.Key<byte[]> MESSAGE_ACCEPT_ENCODING_KEY =
      InternalMetadata.keyOf(GrpcUtil.MESSAGE_ACCEPT_ENCODING, new AcceptEncodingMarshaller());

  /**
   * {@link io.grpc.Metadata.Key} for the stream's content encoding header
   * ({@link #CONTENT_ENCODING}).
   */
  public static final Metadata.Key<String> CONTENT_ENCODING_KEY =
      Metadata.Key.of(GrpcUtil.CONTENT_ENCODING, Metadata.ASCII_STRING_MARSHALLER);

  /**
   * {@link io.grpc.Metadata.Key} for the stream's accepted content encoding header
   * ({@link #CONTENT_ACCEPT_ENCODING}). The value is kept as raw bytes; the marshaller passes it
   * through unchanged.
   */
  public static final Metadata.Key<byte[]> CONTENT_ACCEPT_ENCODING_KEY =
      InternalMetadata.keyOf(GrpcUtil.CONTENT_ACCEPT_ENCODING, new AcceptEncodingMarshaller());

  /**
   * {@link io.grpc.Metadata.Key} for the {@code content-length} header. Package-private.
   */
  static final Metadata.Key<String> CONTENT_LENGTH_KEY =
      Metadata.Key.of("content-length", Metadata.ASCII_STRING_MARSHALLER);

  /**
   * Identity marshaller for the accept-encoding headers: the header value is already held as raw
   * bytes, so both directions hand back the very same array without copying or validating it.
   */
  private static final class AcceptEncodingMarshaller implements TrustedAsciiMarshaller<byte[]> {
    /** Returns {@code encodings} itself; no conversion is performed. */
    @Override
    public byte[] toAsciiString(byte[] encodings) {
      return encodings;
    }

    /** Returns {@code wireBytes} itself; no conversion is performed. */
    @Override
    public byte[] parseAsciiString(byte[] wireBytes) {
      return wireBytes;
    }
  }

  /**
   * {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
   */
  public static final Metadata.Key<String> CONTENT_TYPE_KEY =
          Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER);

  /**
   * {@link io.grpc.Metadata.Key} for the {@code te} header (see {@link #TE_TRAILERS} for the value
   * used on requests).
   */
  public static final Metadata.Key<String> TE_HEADER =
      Metadata.Key.of("te", Metadata.ASCII_STRING_MARSHALLER);

  /**
   * {@link io.grpc.Metadata.Key} for the User-Agent header.
   */
  public static final Metadata.Key<String> USER_AGENT_KEY =
          Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER);

  /**
   * The default port for plain-text connections.
   */
  public static final int DEFAULT_PORT_PLAINTEXT = 80;

  /**
   * The default port for SSL connections.
   */
  public static final int DEFAULT_PORT_SSL = 443;

  /**
   * Content-Type used for GRPC-over-HTTP/2. {@link #isGrpcContentType} also accepts this value
   * followed by a {@code +} or {@code ;} suffix.
   */
  public static final String CONTENT_TYPE_GRPC = "application/grpc";

  /**
   * The HTTP method used for GRPC requests.
   */
  public static final String HTTP_METHOD = "POST";

  /**
   * The value of the TE header for requests over HTTP/2.
   */
  public static final String TE_TRAILERS = "trailers";

  /**
   * The Timeout header name.
   */
  public static final String TIMEOUT = "grpc-timeout";

  /**
   * The message encoding (i.e. compression) that can be used in the stream.
   */
  public static final String MESSAGE_ENCODING = "grpc-encoding";

  /**
   * The accepted message encodings (i.e. compression) that can be used in the stream.
   */
  public static final String MESSAGE_ACCEPT_ENCODING = "grpc-accept-encoding";

  /**
   * The content-encoding used to compress the full gRPC stream.
   */
  public static final String CONTENT_ENCODING = "content-encoding";

  /**
   * The accepted content-encodings that can be used to compress the full gRPC stream.
   */
  public static final String CONTENT_ACCEPT_ENCODING = "accept-encoding";

  /**
   * The default maximum uncompressed size (in bytes) for inbound messages. Defaults to 4 MiB.
   */
  public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;

  /**
   * The default maximum size (in bytes) for inbound header/trailer. Defaults to 8 KiB.
   */
  // Update documentation in public-facing Builders when changing this value.
  public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192;

  /**
   * Splits a comma-separated accept-encoding value into its entries, trimming whitespace around
   * each one.
   */
  public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults();

  // Version reported in the user agent and by getGrpcBuildVersion().
  // NOTE(review): the trailing marker is presumably matched by release tooling; keep it intact.
  private static final String IMPLEMENTATION_VERSION = "1.54.0"; // CURRENT_GRPC_VERSION

  /**
   * The default timeout in nanos for a keepalive ping request (20 seconds).
   */
  public static final long DEFAULT_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L);

  /**
   * The magic keepalive time value that disables client keepalive.
   */
  public static final long KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;

  /**
   * The default delay in nanos for server keepalive (2 hours).
   */
  public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);

  /**
   * The default timeout in nanos for a server keepalive ping request (20 seconds).
   */
  public static final long DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L);

  /**
   * The magic keepalive time value that disables server keepalive.
   */
  public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;

  /**
   * The default proxy detector.
   */
  public static final ProxyDetector DEFAULT_PROXY_DETECTOR = new ProxyDetectorImpl();

  /**
   * A proxy detector that always claims no proxy is needed: {@code proxyFor} returns {@code null}
   * for every target address.
   */
  public static final ProxyDetector NOOP_PROXY_DETECTOR = new ProxyDetector() {
    @Nullable
    @Override
    public ProxiedSocketAddress proxyFor(SocketAddress targetServerAddress) {
      return null;
    }
  };

  /**
   * The very default load-balancing policy.
   */
  public static final String DEFAULT_LB_POLICY = "pick_first";

  /**
   * RPCs created on the Channel returned by {@link io.grpc.LoadBalancer.Subchannel#asChannel}
   * will have this option with value {@code true}.  They will be treated differently from
   * the ones created by application.
   */
  public static final CallOptions.Key<Boolean> CALL_OPTIONS_RPC_OWNED_BY_BALANCER =
      CallOptions.Key.create("io.grpc.internal.CALL_OPTIONS_RPC_OWNED_BY_BALANCER");

  // Placeholder stored in the last slot of the tracer array built by getClientStreamTracers();
  // getTransportFromPickResult() checks for this exact instance before overwriting the slot with
  // the load balancer's tracer.
  private static final ClientStreamTracer NOOP_TRACER = new ClientStreamTracer() {};

  /**
   * Tells whether an RPC with the given options contributes to the in-use state of a transport.
   * Only RPCs explicitly flagged with {@link #CALL_OPTIONS_RPC_OWNED_BY_BALANCER} set to
   * {@code true} are excluded; an unset option counts.
   *
   * @param callOptions the options of the RPC in question
   * @return {@code false} if the RPC is owned by the balancer, {@code true} otherwise
   */
  public static boolean shouldBeCountedForInUse(CallOptions callOptions) {
    Boolean ownedByBalancer = callOptions.getOption(CALL_OPTIONS_RPC_OWNED_BY_BALANCER);
    // equals() on the constant keeps this null-safe when the option was never set.
    boolean excluded = Boolean.TRUE.equals(ownedByBalancer);
    return !excluded;
  }

  /**
   * Translates an HTTP error response status code into a gRPC {@link Status}, following <a
   * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md">
   * http-grpc-status-mapping.md</a>. The HTTP code is recorded in the status description. Never
   * returns a status for which {@code status.isOk()} is {@code true}.
   *
   * @param httpStatusCode the numeric HTTP response status
   * @return the corresponding non-OK gRPC status
   */
  public static Status httpStatusToGrpcStatus(int httpStatusCode) {
    Status.Code grpcCode = httpStatusToGrpcCode(httpStatusCode);
    String description = "HTTP status code " + httpStatusCode;
    return grpcCode.toStatus().withDescription(description);
  }

  /**
   * Picks the gRPC status code for an HTTP response status. Every 1xx code maps to INTERNAL, a
   * handful of specific codes have dedicated mappings, and everything else is UNKNOWN.
   */
  private static Status.Code httpStatusToGrpcCode(int httpStatusCode) {
    boolean informational = 100 <= httpStatusCode && httpStatusCode <= 199;
    if (informational) {
      // 1xx. These headers should have been ignored.
      return Status.Code.INTERNAL;
    }

    final Status.Code grpcCode;
    switch (httpStatusCode) {
      case HttpURLConnection.HTTP_BAD_REQUEST:  // 400
      case 431: // Request Header Fields Too Large
        // TODO(carl-mastrangelo): this should be added to the http-grpc-status-mapping.md doc.
        grpcCode = Status.Code.INTERNAL;
        break;
      case HttpURLConnection.HTTP_UNAUTHORIZED:  // 401
        grpcCode = Status.Code.UNAUTHENTICATED;
        break;
      case HttpURLConnection.HTTP_FORBIDDEN:  // 403
        grpcCode = Status.Code.PERMISSION_DENIED;
        break;
      case HttpURLConnection.HTTP_NOT_FOUND:  // 404
        grpcCode = Status.Code.UNIMPLEMENTED;
        break;
      case 429:  // Too Many Requests
      case HttpURLConnection.HTTP_BAD_GATEWAY:  // 502
      case HttpURLConnection.HTTP_UNAVAILABLE:  // 503
      case HttpURLConnection.HTTP_GATEWAY_TIMEOUT:  // 504
        grpcCode = Status.Code.UNAVAILABLE;
        break;
      default:
        grpcCode = Status.Code.UNKNOWN;
        break;
    }
    return grpcCode;
  }

  /**
   * The error codes defined by the HTTP/2 spec, as carried in GOAWAY and RST_STREAM frames, each
   * paired with the gRPC {@link Status} it is reported as.
   */
  public enum Http2Error {
    /**
     * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with
     * {@code NO_ERROR}. In this case it is important to indicate to the application that the
     * request should be retried (i.e. {@link Status#UNAVAILABLE}).
     */
    NO_ERROR(0x0, Status.UNAVAILABLE),
    PROTOCOL_ERROR(0x1, Status.INTERNAL),
    INTERNAL_ERROR(0x2, Status.INTERNAL),
    FLOW_CONTROL_ERROR(0x3, Status.INTERNAL),
    SETTINGS_TIMEOUT(0x4, Status.INTERNAL),
    STREAM_CLOSED(0x5, Status.INTERNAL),
    FRAME_SIZE_ERROR(0x6, Status.INTERNAL),
    REFUSED_STREAM(0x7, Status.UNAVAILABLE),
    CANCEL(0x8, Status.CANCELLED),
    COMPRESSION_ERROR(0x9, Status.INTERNAL),
    CONNECT_ERROR(0xA, Status.INTERNAL),
    ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")),
    INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as "
        + "protocol is not secure enough to call")),
    HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN);

    // Lookup table indexed by wire code, built once when the enum is initialized.
    private static final Http2Error[] codeMap = buildHttp2CodeMap();

    /**
     * Builds the code-indexed lookup table. The table is sized from the code of the LAST declared
     * constant, so the constants must stay declared in ascending code order.
     */
    private static Http2Error[] buildHttp2CodeMap() {
      Http2Error[] all = Http2Error.values();
      Http2Error last = all[all.length - 1];
      Http2Error[] byCode = new Http2Error[(int) last.code() + 1];
      for (int i = 0; i < all.length; i++) {
        Http2Error current = all[i];
        byCode[(int) current.code()] = current;
      }
      return byCode;
    }

    private final int code;
    // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true
    // when there are exceptions in the Status, which is not true here.
    @SuppressWarnings("ImmutableEnumChecker")
    private final Status status;

    /**
     * Stores the wire code and derives the status description: the constant's name, followed by
     * the description of the supplied status in parentheses when it has one.
     */
    Http2Error(int code, Status status) {
      this.code = code;
      StringBuilder description = new StringBuilder("HTTP/2 error code: ").append(this.name());
      String detail = status.getDescription();
      if (detail != null) {
        description.append(" (").append(detail).append(")");
      }
      this.status = status.withDescription(description.toString());
    }

    /**
     * Gets the code for this error used on the wire.
     */
    public long code() {
      return code;
    }

    /**
     * Gets the {@link Status} associated with this HTTP/2 code.
     */
    public Status status() {
      return status;
    }

    /**
     * Looks up the HTTP/2 error code enum value for the specified code.
     *
     * @param code an HTTP/2 error code value.
     * @return the HTTP/2 error code enum or {@code null} if not found.
     */
    public static Http2Error forCode(long code) {
      boolean inTable = 0 <= code && code < codeMap.length;
      return inTable ? codeMap[(int) code] : null;
    }

    /**
     * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code
     * forCode(code).status()}, to more easily conform to HTTP/2:
     *
     * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior.
     * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote>
     *
     * @param code the HTTP/2 error code.
     * @return a {@link Status} representing the given error.
     */
    public static Status statusForCode(long code) {
      Http2Error known = forCode(code);
      if (known != null) {
        return known.status();
      }
      // This "forgets" the message of INTERNAL_ERROR while keeping the same status code.
      Status.Code fallbackCode = INTERNAL_ERROR.status().getCode();
      Status fallback = Status.fromCodeValue(fallbackCode.value());
      return fallback.withDescription("Unrecognized HTTP/2 error code: " + code);
    }
  }

  /**
   * Indicates whether or not the given value is a valid gRPC content-type.
   *
   * <p>A value is accepted when, compared case-insensitively, it equals
   * {@link #CONTENT_TYPE_GRPC} or starts with it immediately followed by {@code '+'} or
   * {@code ';'} (e.g. {@code application/grpc+proto}, {@code application/grpc;charset=utf-8}).
   * The comparison does not depend on the JVM's default locale.
   *
   * @param contentType the Content-Type header value; may be {@code null}
   * @return {@code true} if the value denotes a gRPC content-type, {@code false} otherwise
   *     (including for {@code null})
   */
  public static boolean isGrpcContentType(String contentType) {
    if (contentType == null) {
      return false;
    }

    if (CONTENT_TYPE_GRPC.length() > contentType.length()) {
      return false;
    }

    // Lower-case with a fixed locale: with the default-locale overload, an upper-case 'I' becomes
    // a dotless 'ı' under Turkish/Azeri locales, so "APPLICATION/GRPC" would fail to match.
    contentType = contentType.toLowerCase(java.util.Locale.ROOT);
    if (!contentType.startsWith(CONTENT_TYPE_GRPC)) {
      // Not a gRPC content-type.
      return false;
    }

    if (contentType.length() == CONTENT_TYPE_GRPC.length()) {
      // The strings match exactly.
      return true;
    }

    // The contentType matches, but is longer than the expected string.
    // We need to support variations on the content-type (e.g. +proto, +json) as defined by the
    // gRPC wire spec.
    char nextChar = contentType.charAt(CONTENT_TYPE_GRPC.length());
    return nextChar == '+' || nextChar == ';';
  }

  /**
   * Builds the User-Agent string for the gRPC transport, in the form
   * {@code grpc-java-<transportName>/<version>}. When an application user agent is supplied it is
   * placed in front, separated by a single space.
   *
   * @param transportName name of the transport, appended after {@code grpc-java-}
   * @param applicationUserAgent optional application-provided prefix; may be {@code null}
   * @return the complete User-Agent value
   */
  public static String getGrpcUserAgent(
      String transportName, @Nullable String applicationUserAgent) {
    String grpcAgent = "grpc-java-" + transportName + '/' + IMPLEMENTATION_VERSION;
    if (applicationUserAgent == null) {
      return grpcAgent;
    }
    return applicationUserAgent + ' ' + grpcAgent;
  }

  /**
   * Immutable pair of the gRPC user-agent name and the implementation version. Both parts are
   * required to be non-null at construction time.
   */
  @Immutable
  public static final class GrpcBuildVersion {
    private final String userAgent;
    private final String implementationVersion;

    private GrpcBuildVersion(String userAgent, String implementationVersion) {
      Preconditions.checkNotNull(userAgent, "userAgentName");
      Preconditions.checkNotNull(implementationVersion, "implementationVersion");
      this.userAgent = userAgent;
      this.implementationVersion = implementationVersion;
    }

    /** Returns the user-agent name. */
    public String getUserAgent() {
      return userAgent;
    }

    /** Returns the implementation version string. */
    public String getImplementationVersion() {
      return implementationVersion;
    }

    /** Returns the user agent and the version separated by a single space. */
    @Override
    public String toString() {
      return new StringBuilder(userAgent)
          .append(' ')
          .append(implementationVersion)
          .toString();
    }
  }

  /**
   * Returns the build version of gRPC: a new {@link GrpcBuildVersion} whose user agent is
   * {@code "gRPC Java"} and whose version is this build's implementation version.
   */
  public static GrpcBuildVersion getGrpcBuildVersion() {
    final String userAgentName = "gRPC Java";
    return new GrpcBuildVersion(userAgentName, IMPLEMENTATION_VERSION);
  }

  /**
   * Parses an authority into a {@link URI} (one carrying only the authority component) so that
   * the host and port can be retrieved from it.
   *
   * @param authority the authority string; must not be {@code null}
   * @return a URI whose only component is the given authority
   * @throws IllegalArgumentException if the authority is not syntactically valid
   */
  public static URI authorityToUri(String authority) {
    Preconditions.checkNotNull(authority, "authority");
    try {
      return new URI(null, authority, null, null, null);
    } catch (URISyntaxException ex) {
      throw new IllegalArgumentException("Invalid authority: " + authority, ex);
    }
  }

  /**
   * Validates that {@code authority} can be used with gRPC: it must parse, must contain a host,
   * and must not carry userinfo.
   *
   * @param authority the authority to validate
   * @return the {@code authority} provided
   * @throws IllegalArgumentException if any of the requirements is violated
   */
  public static String checkAuthority(String authority) {
    URI parsed = authorityToUri(authority);
    boolean hasHost = parsed.getHost() != null;
    checkArgument(hasHost, "No host in authority '%s'", authority);
    boolean hasUserInfo = parsed.getUserInfo() != null;
    checkArgument(!hasUserInfo,
        "Userinfo must not be present on authority: '%s'", authority);
    return authority;
  }

  /**
   * Joins a host and a port into an authority string by letting {@link URI} assemble it.
   *
   * @param host the host name or address literal
   * @param port the port number
   * @return the authority built from {@code host} and {@code port}
   * @throws IllegalArgumentException if the pair does not form a valid URI
   */
  // There is a copy of this method in io.grpc.Grpc
  public static String authorityFromHostAndPort(String host, int port) {
    final URI hostAndPortOnly;
    try {
      hostAndPortOnly = new URI(null, null, host, port, null, null, null);
    } catch (URISyntaxException ex) {
      throw new IllegalArgumentException("Invalid host or port: " + host + " " + port, ex);
    }
    return hostAndPortOnly.getAuthority();
  }

  /**
   * Shared executor for channels: a cached thread pool of daemon threads named
   * {@code grpc-default-executor-N}, shut down when the resource is closed.
   */
  public static final Resource<Executor> SHARED_CHANNEL_EXECUTOR =
      new Resource<Executor>() {
        private static final String NAME = "grpc-default-executor";

        @Override
        public Executor create() {
          ThreadFactory daemonThreads = getThreadFactory(NAME + "-%d", true);
          return Executors.newCachedThreadPool(daemonThreads);
        }

        @Override
        public void close(Executor instance) {
          // Instances handed out by create() are ExecutorServices, hence the cast.
          ExecutorService pool = (ExecutorService) instance;
          pool.shutdown();
        }

        @Override
        public String toString() {
          return NAME;
        }
      };

  /**
   * Shared single-threaded executor for managing channel timers. The thread is a daemon named
   * {@code grpc-timer-N}; callers receive an unconfigurable wrapper of the underlying pool.
   */
  public static final Resource<ScheduledExecutorService> TIMER_SERVICE =
      new Resource<ScheduledExecutorService>() {
        @Override
        public ScheduledExecutorService create() {
          // We don't use newSingleThreadScheduledExecutor because it doesn't return a
          // ScheduledThreadPoolExecutor.
          ThreadFactory timerThreads = getThreadFactory("grpc-timer-%d", true);
          ScheduledExecutorService timer = Executors.newScheduledThreadPool(1, timerThreads);

          // Cancelled long timeouts would otherwise linger in the executor's queue; asking for
          // removal on cancellation avoids that memory leak. The setter is looked up reflectively
          // because it is absent on Java 1.6 (it does exist in the Android standard library);
          // when it is missing we simply carry on without it.
          try {
            timer.getClass()
                .getMethod("setRemoveOnCancelPolicy", boolean.class)
                .invoke(timer, true);
          } catch (NoSuchMethodException e) {
            // no op
          } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
          } catch (InvocationTargetException e) {
            throw new RuntimeException(e);
          }
          // Unchecked exceptions from the reflective call propagate unchanged.

          return Executors.unconfigurableScheduledExecutorService(timer);
        }

        @Override
        public void close(ScheduledExecutorService instance) {
          instance.shutdown();
        }
      };


  /**
   * Creates a {@link ThreadFactory} suitable for use in the current environment.
   *
   * @param nameFormat the name format applied to threads created by the factory
   * @param daemon {@code true} if the factory should create daemon threads, {@code false}
   *     otherwise
   * @return a {@link ThreadFactory} configured with the given name format and daemon flag
   */
  public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setDaemon(daemon);
    builder.setNameFormat(nameFormat);
    return builder.build();
  }

  /**
   * The factory of default Stopwatches. Every call to {@code get()} yields a fresh stopwatch that
   * has not been started.
   */
  public static final Supplier<Stopwatch> STOPWATCH_SUPPLIER =
      new Supplier<Stopwatch>() {
        @Override
        public Stopwatch get() {
          Stopwatch unstarted = Stopwatch.createUnstarted();
          return unstarted;
        }
      };

  /**
   * Returns the host of {@code addr} via {@link InetSocketAddress#getHostString} when that method
   * can be found and invoked reflectively (i.e. in jdk >= 7). Otherwise falls back to
   * {@link InetSocketAddress#getHostName}, which may incur a DNS lookup.
   *
   * @param addr the socket address to read the host from
   * @return the host string of the address
   */
  public static String getHost(InetSocketAddress addr) {
    try {
      return (String) InetSocketAddress.class.getMethod("getHostString").invoke(addr);
    } catch (NoSuchMethodException e) {
      // Method unavailable on this runtime; use the fallback below.
    } catch (IllegalAccessException e) {
      // Method not accessible; use the fallback below.
    } catch (InvocationTargetException e) {
      // The call itself failed; use the fallback below.
    }
    return addr.getHostName();
  }

  /**
   * Converts a timeout expressed in nanoseconds to and from its header form: an ASCII decimal
   * number of at most 8 digits followed by a one-character unit. Available units:
   * n = nanoseconds
   * u = microseconds
   * m = milliseconds
   * S = seconds
   * M = minutes
   * H = hours
   *
   * <p>Serialization always picks the finest unit in which the value still fits in 8 digits, so
   * 2 seconds is written as `2000000u`.</p>
   *
   * <p>See <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">the
   * request header definition</a></p>
   */
  @VisibleForTesting
  static class TimeoutMarshaller implements Metadata.AsciiMarshaller<Long> {

    /**
     * Serializes a non-negative timeout in nanoseconds.
     *
     * @throws IllegalArgumentException if the timeout is negative
     */
    @Override
    public String toAsciiString(Long timeoutNanos) {
      final long nanos = timeoutNanos;
      if (nanos < 0) {
        throw new IllegalArgumentException("Timeout too small");
      }

      // Smallest value that no longer fits in 8 digits; scaled up for each coarser unit.
      final long digitLimit = 100000000;
      final TimeUnit source = TimeUnit.NANOSECONDS;
      if (nanos < digitLimit) {
        return nanos + "n";
      }
      if (nanos < digitLimit * 1000L) {
        return source.toMicros(nanos) + "u";
      }
      if (nanos < digitLimit * 1000L * 1000L) {
        return source.toMillis(nanos) + "m";
      }
      if (nanos < digitLimit * 1000L * 1000L * 1000L) {
        return source.toSeconds(nanos) + "S";
      }
      if (nanos < digitLimit * 1000L * 1000L * 1000L * 60L) {
        return source.toMinutes(nanos) + "M";
      }
      return source.toHours(nanos) + "H";
    }

    /**
     * Parses a header value such as {@code 100m} back into nanoseconds.
     *
     * @throws IllegalArgumentException if the value is empty, longer than 9 characters, has a
     *     non-numeric amount, or ends in an unknown unit
     */
    @Override
    public Long parseAsciiString(String serialized) {
      final int length = serialized.length();
      checkArgument(length > 0, "empty timeout");
      checkArgument(length <= 9, "bad timeout format");

      // The amount is parsed before the unit is inspected, so a malformed number is reported
      // ahead of a bad unit.
      final long amount = Long.parseLong(serialized.substring(0, length - 1));
      final char unitChar = serialized.charAt(length - 1);

      final TimeUnit unit;
      switch (unitChar) {
        case 'n':
          unit = TimeUnit.NANOSECONDS;
          break;
        case 'u':
          unit = TimeUnit.MICROSECONDS;
          break;
        case 'm':
          unit = TimeUnit.MILLISECONDS;
          break;
        case 'S':
          unit = TimeUnit.SECONDS;
          break;
        case 'M':
          unit = TimeUnit.MINUTES;
          break;
        case 'H':
          unit = TimeUnit.HOURS;
          break;
        default:
          throw new IllegalArgumentException(String.format("Invalid timeout unit: %s", unitChar));
      }
      return unit.toNanos(amount);
    }
  }

  /**
   * Derives the transport to use from a {@link PickResult}.
   *
   * <ul>
   *   <li>If the picked subchannel has an active transport, that transport is returned; when the
   *       result carries a stream tracer factory, the transport is wrapped so every new stream
   *       gets a tracer from that factory in the reserved last tracer slot.</li>
   *   <li>Otherwise, a non-OK result produces a failing transport when it is a drop, or when the
   *       RPC is not wait-for-ready.</li>
   *   <li>In every other case {@code null} is returned, meaning the result is "buffer".</li>
   * </ul>
   */
  @Nullable
  static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) {
    Subchannel subchannel = result.getSubchannel();
    final ClientTransport transport =
        subchannel == null
            ? null
            : ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport();

    if (transport == null) {
      Status pickStatus = result.getStatus();
      if (pickStatus.isOk()) {
        return null;
      }
      if (result.isDrop()) {
        return new FailingClientTransport(
            replaceInappropriateControlPlaneStatus(pickStatus), RpcProgress.DROPPED);
      }
      if (isWaitForReady) {
        return null;
      }
      return new FailingClientTransport(
          replaceInappropriateControlPlaneStatus(pickStatus), RpcProgress.PROCESSED);
    }

    final ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
    if (streamTracerFactory == null) {
      return transport;
    }

    // Delegating wrapper whose only addition is installing the load balancer's tracer.
    return new ClientTransport() {
      @Override
      public ClientStream newStream(
          MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
          ClientStreamTracer[] tracers) {
        StreamInfo streamInfo = StreamInfo.newBuilder().setCallOptions(callOptions).build();
        ClientStreamTracer lbTracer =
            streamTracerFactory.newClientStreamTracer(streamInfo, headers);
        int reservedSlot = tracers.length - 1;
        checkState(tracers[reservedSlot] == NOOP_TRACER, "lb tracer already assigned");
        tracers[reservedSlot] = lbTracer;
        return transport.newStream(method, headers, callOptions, tracers);
      }

      @Override
      public void ping(PingCallback callback, Executor executor) {
        transport.ping(callback, executor);
      }

      @Override
      public InternalLogId getLogId() {
        return transport.getLogId();
      }

      @Override
      public ListenableFuture<SocketStats> getStats() {
        return transport.getStats();
      }
    };
  }

  /**
   * Creates the stream tracers for one attempt from the factories registered on
   * {@code callOptions}. The returned array has one extra trailing slot, pre-filled with a no-op
   * tracer, that is reserved for the load balancer's tracer.
   */
  public static ClientStreamTracer[] getClientStreamTracers(
      CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) {
    List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
    ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1];

    StreamInfo.Builder infoBuilder = StreamInfo.newBuilder();
    infoBuilder.setCallOptions(callOptions);
    infoBuilder.setPreviousAttempts(previousAttempts);
    infoBuilder.setIsTransparentRetry(isTransparentRetry);
    StreamInfo streamInfo = infoBuilder.build();

    int slot = 0;
    for (ClientStreamTracer.Factory factory : factories) {
      tracers[slot++] = factory.newClientStreamTracer(streamInfo, headers);
    }

    // Reserved to be set later by the lb as per the API contract of ClientTransport.newStream().
    // See also GrpcUtil.getTransportFromPickResult()
    tracers[tracers.length - 1] = NOOP_TRACER;
    return tracers;
  }

  /** Drains {@code producer}, quietly closing every message until it returns {@code null}. */
  static void closeQuietly(MessageProducer producer) {
    for (InputStream message = producer.next(); message != null; message = producer.next()) {
      closeQuietly(message);
    }
  }

  /**
   * Closes a Closeable without propagating IOExceptions; a failure is logged at WARNING level
   * instead. A {@code null} argument is accepted and ignored.
   * This method exists because Guava's {@code Closeables.closeQuietly()} is beta.
   */
  public static void closeQuietly(@Nullable Closeable message) {
    if (message != null) {
      try {
        message.close();
      } catch (IOException ioException) {
        // do nothing except log
        log.log(Level.WARNING, "exception caught in closeQuietly", ioException);
      }
    }
  }

  /**
   * Reads and discards everything left in {@code in}, stopping at end of stream. The stream is
   * not closed.
   */
  public static void exhaust(InputStream in) throws IOException {
    final byte[] scratch = new byte[256];
    int bytesRead;
    do {
      bytesRead = in.read(scratch);
    } while (bytesRead != -1);
  }

  /**
   * Some status codes from the control plane are not appropriate to use in the data plane. A
   * status carrying one of them is replaced with INTERNAL, indicating a bug in the control plane
   * implementation; the replacement's description records the original code and description, and
   * the original cause is kept. Any other status is returned as is.
   */
  public static Status replaceInappropriateControlPlaneStatus(Status status) {
    checkArgument(status != null);
    if (!INAPPROPRIATE_CONTROL_PLANE_STATUS.contains(status.getCode())) {
      return status;
    }
    String description =
        "Inappropriate status code from control plane: " + status.getCode() + " "
            + status.getDescription();
    return Status.INTERNAL.withDescription(description).withCause(status.getCause());
  }

  /**
   * Checks whether the given item exists in the iterable.  This is copied from Guava Collect's
   * {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't
   * depend on it.
   *
   * <p>Collections are asked directly through {@code contains()}; if that call rejects the item
   * with a NullPointerException or ClassCastException the answer is {@code false}. Any other
   * iterable is scanned element by element.
   */
  static <T> boolean iterableContains(Iterable<T> iterable, T item) {
    if (!(iterable instanceof Collection)) {
      for (T candidate : iterable) {
        if (Objects.equal(candidate, item)) {
          return true;
        }
      }
      return false;
    }

    try {
      return ((Collection<?>) iterable).contains(item);
    } catch (NullPointerException e) {
      return false;
    } catch (ClassCastException e) {
      return false;
    }
  }

  // Static utility holder: the private constructor prevents instantiation.
  private GrpcUtil() {}
}