aboutsummaryrefslogtreecommitdiff
path: root/src/java.net.http/share/classes/jdk/internal/net/http/ConnectionPool.java
blob: e03dc34e900558164695380ec9eae5b4fb4aa853 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
/*
 * Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;
import static jdk.internal.net.http.HttpClientImpl.KEEP_ALIVE_TIMEOUT; //seconds

/**
 * Http 1.1 connection pool.
 */
final class ConnectionPool {

    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections

    private final ReentrantLock stateLock = new ReentrantLock();
    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    volatile boolean stopped;

    /**
     * Entries in connection pool are keyed by destination address and/or
     * proxy address:
     * case 1: plain TCP not via proxy (destination IP only)
     * case 2: plain TCP via proxy (proxy only)
     * case 3: SSL not via proxy (destination IP+hostname only)
     * case 4: SSL over tunnel (destination IP+hostname and proxy)
     */
    static class CacheKey {
        final InetSocketAddress proxy;
        final InetSocketAddress destination;
        final boolean secure;

        private CacheKey(boolean secure, InetSocketAddress destination,
                         InetSocketAddress proxy) {
            this.proxy = proxy;
            this.destination = destination;
            this.secure = secure;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (this.secure != other.secure) {
                return false;
            }
            if (!Objects.equals(this.proxy, other.proxy)) {
                return false;
            }
            if (!Objects.equals(this.destination, other.destination)) {
                return false;
            }
            if (secure && destination != null) {
                if (destination.getHostName() != null) {
                    if (!destination.getHostName().equalsIgnoreCase(
                            other.destination.getHostName())) {
                        return false;
                    }
                } else {
                    if (other.destination.getHostName() != null)
                        return false;
                }
            }
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(proxy, destination);
        }
    }

    ConnectionPool(long clientId) {
        this("ConnectionPool("+clientId+")");
    }

    /**
     * There should be one of these per HttpClient.
     */
    private ConnectionPool(String tag) {
        dbgTag = tag;
        plainPool = new HashMap<>();
        sslPool = new HashMap<>();
        expiryList = new ExpiryList();
    }

    final String dbgString() {
        return dbgTag;
    }

    void start() {
        assert !stopped : "Already stopped";
    }

    static CacheKey cacheKey(boolean secure, InetSocketAddress destination,
                             InetSocketAddress proxy)
    {
        return new CacheKey(secure, destination, proxy);
    }

    HttpConnection getConnection(boolean secure,
                                 InetSocketAddress addr,
                                 InetSocketAddress proxy) {
        if (stopped) return null;
        stateLock.lock();
        try {
            return getConnection0(secure, addr, proxy);
        } finally {
            stateLock.unlock();
        }
    }

    private HttpConnection getConnection0(boolean secure,
                                          InetSocketAddress addr,
                                          InetSocketAddress proxy) {
        if (stopped) return null;
        // for plain (unsecure) proxy connection the destination address is irrelevant.
        addr = secure || proxy == null ? addr : null;
        CacheKey key = new CacheKey(secure, addr, proxy);
        HttpConnection c = secure ? findConnection(key, sslPool)
                                  : findConnection(key, plainPool);
        //System.out.println ("getConnection returning: " + c);
        assert c == null || c.isSecure() == secure;
        return c;
    }

    /**
     * Returns the connection to the pool.
     */
    void returnToPool(HttpConnection conn) {
        returnToPool(conn, Instant.now(), KEEP_ALIVE_TIMEOUT);
    }

    // Called also by whitebox tests
    void returnToPool(HttpConnection conn, Instant now, long keepAlive) {

        assert (conn instanceof PlainHttpConnection) || conn.isSecure()
            : "Attempting to return unsecure connection to SSL pool: "
                + conn.getClass();

        // Don't call registerCleanupTrigger while holding a lock,
        // but register it before the connection is added to the pool,
        // since we don't want to trigger the cleanup if the connection
        // is not in the pool.
        CleanupTrigger cleanup = registerCleanupTrigger(conn);

        // it's possible that cleanup may have been called.
        HttpConnection toClose = null;
        stateLock.lock();
        try {
            if (cleanup.isDone()) {
                return;
            } else if (stopped) {
                conn.close();
                return;
            }
            if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
                toClose = expiryList.removeOldest();
                if (toClose != null) removeFromPool(toClose);
            }
            if (conn instanceof PlainHttpConnection) {
                putConnection(conn, plainPool);
            } else {
                assert conn.isSecure();
                putConnection(conn, sslPool);
            }
            expiryList.add(conn, now, keepAlive);
        } finally {
            stateLock.unlock();
        }
        if (toClose != null) {
            if (debug.on()) {
                debug.log("Maximum pool size reached: removing oldest connection %s",
                          toClose.dbgString());
            }
            close(toClose);
        }
        //System.out.println("Return to pool: " + conn);
    }

    private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
        // Connect the connection flow to a pub/sub pair that will take the
        // connection out of the pool and close it if anything happens
        // while the connection is sitting in the pool.
        CleanupTrigger cleanup = new CleanupTrigger(conn);
        FlowTube flow = conn.getConnectionFlow();
        if (debug.on()) debug.log("registering %s", cleanup);
        flow.connectFlows(cleanup, cleanup);
        return cleanup;
    }

    private HttpConnection
    findConnection(CacheKey key,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        } else {
            HttpConnection c = l.removeFirst();
            expiryList.remove(c);
            return c;
        }
    }

    /* called from cache cleaner only  */
    private boolean
    removeFromPool(HttpConnection c,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        //System.out.println("cacheCleaner removing: " + c);
        assert stateLock.isHeldByCurrentThread();
        CacheKey k = c.cacheKey();
        List<HttpConnection> l = pool.get(k);
        if (l == null || l.isEmpty()) {
            pool.remove(k);
            return false;
        }
        return l.remove(c);
    }

    private void
    putConnection(HttpConnection c,
                  HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        CacheKey key = c.cacheKey();
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null) {
            l = new LinkedList<>();
            pool.put(key, l);
        }
        l.add(c);
    }

    /**
     * Purge expired connection and return the number of milliseconds
     * in which the next connection is scheduled to expire.
     * If no connections are scheduled to be purged return 0.
     * @return the delay in milliseconds in which the next connection will
     *         expire.
     */
    long purgeExpiredConnectionsAndReturnNextDeadline() {
        if (!expiryList.purgeMaybeRequired()) return 0;
        return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
    }

    // Used for whitebox testing
    long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
        long nextPurge = 0;

        // We may be in the process of adding new elements
        // to the expiry list - but those elements will not
        // have outlast their keep alive timer yet since we're
        // just adding them.
        if (!expiryList.purgeMaybeRequired()) return nextPurge;

        List<HttpConnection> closelist;
        stateLock.lock();
        try {
            closelist = expiryList.purgeUntil(now);
            for (HttpConnection c : closelist) {
                if (c instanceof PlainHttpConnection) {
                    boolean wasPresent = removeFromPool(c, plainPool);
                    assert wasPresent;
                } else {
                    boolean wasPresent = removeFromPool(c, sslPool);
                    assert wasPresent;
                }
            }
            nextPurge = now.until(
                    expiryList.nextExpiryDeadline().orElse(now),
                    ChronoUnit.MILLIS);
        } finally {
            stateLock.unlock();
        }
        closelist.forEach(this::close);
        return nextPurge;
    }

    private void close(HttpConnection c) {
        try {
            c.close();
        } catch (Throwable e) {} // ignore
    }

    void stop() {
        List<HttpConnection> closelist = Collections.emptyList();
        try {
            stateLock.lock();
            try {
                stopped = true;
                closelist = expiryList.stream()
                    .map(e -> e.connection)
                    .collect(Collectors.toList());
                expiryList.clear();
                plainPool.clear();
                sslPool.clear();
            } finally {
                stateLock.unlock();
            }
        } finally {
            closelist.forEach(this::close);
        }
    }

    static final class ExpiryEntry {
        final HttpConnection connection;
        final Instant expiry; // absolute time in seconds of expiry time
        ExpiryEntry(HttpConnection connection, Instant expiry) {
            this.connection = connection;
            this.expiry = expiry;
        }
    }

    /**
     * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
     * deadline is at the tail of the list, and the entry with the farther
     * deadline is at the head. In the most common situation, new elements
     * will need to be added at the head (or close to it), and expired elements
     * will need to be purged from the tail.
     */
    private static final class ExpiryList {
        private final LinkedList<ExpiryEntry> list = new LinkedList<>();
        private volatile boolean mayContainEntries;

        int size() { return list.size(); }

        // A loosely accurate boolean whose value is computed
        // at the end of each operation performed on ExpiryList;
        // Does not require holding the ConnectionPool stateLock.
        boolean purgeMaybeRequired() {
            return mayContainEntries;
        }

        // Returns the next expiry deadline
        // should only be called while holding the ConnectionPool stateLock.
        Optional<Instant> nextExpiryDeadline() {
            if (list.isEmpty()) return Optional.empty();
            else return Optional.of(list.getLast().expiry);
        }

        // should only be called while holding the ConnectionPool stateLock.
        HttpConnection removeOldest() {
            ExpiryEntry entry = list.pollLast();
            return entry == null ? null : entry.connection;
        }

        // should only be called while holding the ConnectionPool stateLock.
        void add(HttpConnection conn) {
            add(conn, Instant.now(), KEEP_ALIVE_TIMEOUT);
        }

        // Used by whitebox test.
        void add(HttpConnection conn, Instant now, long keepAlive) {
            Instant then = now.truncatedTo(ChronoUnit.SECONDS)
                    .plus(keepAlive, ChronoUnit.SECONDS);

            // Elements with the farther deadline are at the head of
            // the list. It's more likely that the new element will
            // have the farthest deadline, and will need to be inserted
            // at the head of the list, so we're using an ascending
            // list iterator to find the right insertion point.
            ListIterator<ExpiryEntry> li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();

                if (then.isAfter(entry.expiry)) {
                    li.previous();
                    // insert here
                    li.add(new ExpiryEntry(conn, then));
                    mayContainEntries = true;
                    return;
                }
            }
            // last (or first) element of list (the last element is
            // the first when the list is empty)
            list.add(new ExpiryEntry(conn, then));
            mayContainEntries = true;
        }

        // should only be called while holding the ConnectionPool stateLock.
        void remove(HttpConnection c) {
            if (c == null || list.isEmpty()) return;
            ListIterator<ExpiryEntry> li = list.listIterator();
            while (li.hasNext()) {
                ExpiryEntry e = li.next();
                if (e.connection.equals(c)) {
                    li.remove();
                    mayContainEntries = !list.isEmpty();
                    return;
                }
            }
        }

        // should only be called while holding the ConnectionPool stateLock.
        // Purge all elements whose deadline is before now (now included).
        List<HttpConnection> purgeUntil(Instant now) {
            if (list.isEmpty()) return Collections.emptyList();

            List<HttpConnection> closelist = new ArrayList<>();

            // elements with the closest deadlines are at the tail
            // of the queue, so we're going to use a descending iterator
            // to remove them, and stop when we find the first element
            // that has not expired yet.
            Iterator<ExpiryEntry> li = list.descendingIterator();
            while (li.hasNext()) {
                ExpiryEntry entry = li.next();
                // use !isAfter instead of isBefore in order to
                // remove the entry if its expiry == now
                if (!entry.expiry.isAfter(now)) {
                    li.remove();
                    HttpConnection c = entry.connection;
                    closelist.add(c);
                } else break; // the list is sorted
            }
            mayContainEntries = !list.isEmpty();
            return closelist;
        }

        // should only be called while holding the ConnectionPool stateLock.
        java.util.stream.Stream<ExpiryEntry> stream() {
            return list.stream();
        }

        // should only be called while holding the ConnectionPool stateLock.
        void clear() {
            list.clear();
            mayContainEntries = false;
        }
    }

    // Remove a connection from the pool.
    // should only be called while holding the ConnectionPool stateLock.
    private void removeFromPool(HttpConnection c) {
        assert stateLock.isHeldByCurrentThread();
        if (c instanceof PlainHttpConnection) {
            removeFromPool(c, plainPool);
        } else {
            assert c.isSecure() : "connection " + c + " is not secure!";
            removeFromPool(c, sslPool);
        }
    }

    // Used by tests
    boolean contains(HttpConnection c) {
        stateLock.lock();
        try {
            return contains0(c);
        } finally {
            stateLock.unlock();
        }
    }

    private boolean contains0(HttpConnection c) {
        final CacheKey key = c.cacheKey();
        List<HttpConnection> list;
        if ((list = plainPool.get(key)) != null) {
            if (list.contains(c)) return true;
        }
        if ((list = sslPool.get(key)) != null) {
            if (list.contains(c)) return true;
        }
        return false;
    }

    void cleanup(HttpConnection c, Throwable error) {
        if (debug.on())
            debug.log("%s : ConnectionPool.cleanup(%s)",
                    String.valueOf(c.getConnectionFlow()), error);
        stateLock.lock();
        try {
            removeFromPool(c);
            expiryList.remove(c);
        } finally {
            stateLock.unlock();
        }
        c.close();
    }

    /**
     * An object that subscribes to the flow while the connection is in
     * the pool. Anything that comes in will cause the connection to be closed
     * and removed from the pool.
     */
    private final class CleanupTrigger implements
            FlowTube.TubeSubscriber, FlowTube.TubePublisher,
            Flow.Subscription {

        private final HttpConnection connection;
        private volatile boolean done;

        public CleanupTrigger(HttpConnection connection) {
            this.connection = connection;
        }

        public boolean isDone() { return done;}

        private void triggerCleanup(Throwable error) {
            done = true;
            cleanup(connection, error);
        }

        @Override public void request(long n) {}
        @Override public void cancel() {}

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }
        @Override
        public void onError(Throwable error) { triggerCleanup(error); }
        @Override
        public void onComplete() { triggerCleanup(null); }
        @Override
        public void onNext(List<ByteBuffer> item) {
            triggerCleanup(new IOException("Data received while in pool"));
        }

        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            subscriber.onSubscribe(this);
        }

        @Override
        public String toString() {
            return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
        }
    }
}