/* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.volley; import android.os.Handler; import android.os.Looper; import android.os.SystemClock; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import com.android.volley.AsyncCache.OnGetCompleteCallback; import com.android.volley.AsyncNetwork.OnRequestComplete; import com.android.volley.Cache.Entry; import java.net.HttpURLConnection; import java.util.Comparator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * An asynchronous request dispatch queue. * *

Add requests to the queue with {@link #add(Request)}. Once completed, responses will be * delivered on the main thread (unless a custom {@link ResponseDelivery} has been provided) */ public class AsyncRequestQueue extends RequestQueue { /** Default number of blocking threads to start. */ private static final int DEFAULT_BLOCKING_THREAD_POOL_SIZE = 4; /** * AsyncCache used to retrieve and store responses. * *

{@code null} indicates use of blocking Cache. */ @Nullable private final AsyncCache mAsyncCache; /** AsyncNetwork used to perform nework requests. */ private final AsyncNetwork mNetwork; /** Executor for non-blocking tasks. */ private ExecutorService mNonBlockingExecutor; /** Executor to be used for non-blocking tasks that need to be scheduled. */ private ScheduledExecutorService mNonBlockingScheduledExecutor; /** * Executor for blocking tasks. * *

Some tasks in handling requests may not be easy to implement in a non-blocking way, such * as reading or parsing the response data. This executor is used to run these tasks. */ private ExecutorService mBlockingExecutor; /** * This interface may be used by advanced applications to provide custom executors according to * their needs. Apps must create ExecutorServices dynamically given a blocking queue rather than * providing them directly so that Volley can provide a PriorityQueue which will prioritize * requests according to Request#getPriority. */ private ExecutorFactory mExecutorFactory; /** Manage list of waiting requests and de-duplicate requests with same cache key. */ private final WaitingRequestManager mWaitingRequestManager = new WaitingRequestManager(this); /** * Sets all the variables, but processing does not begin until {@link #start()} is called. * * @param cache to use for persisting responses to disk. If an AsyncCache was provided, then * this will be a {@link ThrowingCache} * @param network to perform HTTP requests * @param asyncCache to use for persisting responses to disk. May be null to indicate use of * blocking cache * @param responseDelivery interface for posting responses and errors * @param executorFactory Interface to be used to provide custom executors according to the * users needs. */ private AsyncRequestQueue( Cache cache, AsyncNetwork network, @Nullable AsyncCache asyncCache, ResponseDelivery responseDelivery, ExecutorFactory executorFactory) { super(cache, network, /* threadPoolSize= */ 0, responseDelivery); mAsyncCache = asyncCache; mNetwork = network; mExecutorFactory = executorFactory; } /** Sets the executors and initializes the cache. */ @Override public void start() { stop(); // Make sure any currently running threads are stopped // Create blocking / non-blocking executors and set them in the network and stack. mNonBlockingExecutor = mExecutorFactory.createNonBlockingExecutor(getBlockingQueue()); mBlockingExecutor = mExecutorFactory.createBlockingExecutor(getBlockingQueue()); mNonBlockingScheduledExecutor = mExecutorFactory.createNonBlockingScheduledExecutor(); mNetwork.setBlockingExecutor(mBlockingExecutor); mNetwork.setNonBlockingExecutor(mNonBlockingExecutor); mNetwork.setNonBlockingScheduledExecutor(mNonBlockingScheduledExecutor); mNonBlockingExecutor.execute( new Runnable() { @Override public void run() { // This is intentionally blocking, because we don't want to process any // requests until the cache is initialized. if (mAsyncCache != null) { final CountDownLatch latch = new CountDownLatch(1); mAsyncCache.initialize( new AsyncCache.OnWriteCompleteCallback() { @Override public void onWriteComplete() { latch.countDown(); } }); try { latch.await(); } catch (InterruptedException e) { VolleyLog.e( e, "Thread was interrupted while initializing the cache."); Thread.currentThread().interrupt(); throw new RuntimeException(e); } } else { getCache().initialize(); } } }); } /** Shuts down and nullifies both executors */ @Override public void stop() { if (mNonBlockingExecutor != null) { mNonBlockingExecutor.shutdownNow(); mNonBlockingExecutor = null; } if (mBlockingExecutor != null) { mBlockingExecutor.shutdownNow(); mBlockingExecutor = null; } if (mNonBlockingScheduledExecutor != null) { mNonBlockingScheduledExecutor.shutdownNow(); mNonBlockingScheduledExecutor = null; } } /** Begins the request by sending it to the Cache or Network. */ @Override void beginRequest(Request request) { // If the request is uncacheable, send it over the network. if (request.shouldCache()) { if (mAsyncCache != null) { mNonBlockingExecutor.execute(new CacheTask<>(request)); } else { mBlockingExecutor.execute(new CacheTask<>(request)); } } else { sendRequestOverNetwork(request); } } @Override void sendRequestOverNetwork(Request request) { mNonBlockingExecutor.execute(new NetworkTask<>(request)); } /** Runnable that gets an entry from the cache. */ private class CacheTask extends RequestTask { CacheTask(Request request) { super(request); } @Override public void run() { // If the request has been canceled, don't bother dispatching it. if (mRequest.isCanceled()) { mRequest.finish("cache-discard-canceled"); return; } mRequest.addMarker("cache-queue-take"); // Attempt to retrieve this item from cache. if (mAsyncCache != null) { mAsyncCache.get( mRequest.getCacheKey(), new OnGetCompleteCallback() { @Override public void onGetComplete(Entry entry) { handleEntry(entry, mRequest); } }); } else { Entry entry = getCache().get(mRequest.getCacheKey()); handleEntry(entry, mRequest); } } } /** Helper method that handles the cache entry after getting it from the Cache. */ private void handleEntry(final Entry entry, final Request mRequest) { if (entry == null) { mRequest.addMarker("cache-miss"); // Cache miss; send off to the network dispatcher. if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { sendRequestOverNetwork(mRequest); } return; } // If it is completely expired, just send it to the network. if (entry.isExpired()) { mRequest.addMarker("cache-hit-expired"); mRequest.setCacheEntry(entry); if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { sendRequestOverNetwork(mRequest); } return; } // We have a cache hit; parse its data for delivery back to the request. mBlockingExecutor.execute(new CacheParseTask<>(mRequest, entry)); } private class CacheParseTask extends RequestTask { Cache.Entry entry; CacheParseTask(Request request, Cache.Entry entry) { super(request); this.entry = entry; } @Override public void run() { mRequest.addMarker("cache-hit"); Response response = mRequest.parseNetworkResponse( new NetworkResponse( HttpURLConnection.HTTP_OK, entry.data, /* notModified= */ false, /* networkTimeMs= */ 0, entry.allResponseHeaders)); mRequest.addMarker("cache-hit-parsed"); if (!entry.refreshNeeded()) { // Completely unexpired cache hit. Just deliver the response. getResponseDelivery().postResponse(mRequest, response); } else { // Soft-expired cache hit. We can deliver the cached response, // but we need to also send the request to the network for // refreshing. mRequest.addMarker("cache-hit-refresh-needed"); mRequest.setCacheEntry(entry); // Mark the response as intermediate. response.intermediate = true; if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { // Post the intermediate response back to the user and have // the delivery then forward the request along to the network. getResponseDelivery() .postResponse( mRequest, response, new Runnable() { @Override public void run() { sendRequestOverNetwork(mRequest); } }); } else { // request has been added to list of waiting requests // to receive the network response from the first request once it // returns. getResponseDelivery().postResponse(mRequest, response); } } } } private class ParseErrorTask extends RequestTask { VolleyError volleyError; ParseErrorTask(Request request, VolleyError volleyError) { super(request); this.volleyError = volleyError; } @Override public void run() { VolleyError parsedError = mRequest.parseNetworkError(volleyError); getResponseDelivery().postError(mRequest, parsedError); mRequest.notifyListenerResponseNotUsable(); } } /** Runnable that performs the network request */ private class NetworkTask extends RequestTask { NetworkTask(Request request) { super(request); } @Override public void run() { // If the request was cancelled already, do not perform the network request. if (mRequest.isCanceled()) { mRequest.finish("network-discard-cancelled"); mRequest.notifyListenerResponseNotUsable(); return; } final long startTimeMs = SystemClock.elapsedRealtime(); mRequest.addMarker("network-queue-take"); // TODO: Figure out what to do with traffic stats tags. Can this be pushed to the // HTTP stack, or is it no longer feasible to support? // Perform the network request. mNetwork.performRequest( mRequest, new OnRequestComplete() { @Override public void onSuccess(final NetworkResponse networkResponse) { mRequest.addMarker("network-http-complete"); // If the server returned 304 AND we delivered a response already, // we're done -- don't deliver a second identical response. if (networkResponse.notModified && mRequest.hasHadResponseDelivered()) { mRequest.finish("not-modified"); mRequest.notifyListenerResponseNotUsable(); return; } // Parse the response here on the worker thread. mBlockingExecutor.execute( new NetworkParseTask<>(mRequest, networkResponse)); } @Override public void onError(final VolleyError volleyError) { volleyError.setNetworkTimeMs( SystemClock.elapsedRealtime() - startTimeMs); mBlockingExecutor.execute(new ParseErrorTask<>(mRequest, volleyError)); } }); } } /** Runnable that parses a network response. */ private class NetworkParseTask extends RequestTask { NetworkResponse networkResponse; NetworkParseTask(Request request, NetworkResponse networkResponse) { super(request); this.networkResponse = networkResponse; } @Override public void run() { final Response response = mRequest.parseNetworkResponse(networkResponse); mRequest.addMarker("network-parse-complete"); // Write to cache if applicable. // TODO: Only update cache metadata instead of entire // record for 304s. if (mRequest.shouldCache() && response.cacheEntry != null) { if (mAsyncCache != null) { mNonBlockingExecutor.execute(new CachePutTask<>(mRequest, response)); } else { mBlockingExecutor.execute(new CachePutTask<>(mRequest, response)); } } else { finishRequest(mRequest, response, /* cached= */ false); } } } private class CachePutTask extends RequestTask { Response response; CachePutTask(Request request, Response response) { super(request); this.response = response; } @Override public void run() { if (mAsyncCache != null) { mAsyncCache.put( mRequest.getCacheKey(), response.cacheEntry, new AsyncCache.OnWriteCompleteCallback() { @Override public void onWriteComplete() { finishRequest(mRequest, response, /* cached= */ true); } }); } else { getCache().put(mRequest.getCacheKey(), response.cacheEntry); finishRequest(mRequest, response, /* cached= */ true); } } } /** Posts response and notifies listener */ private void finishRequest(Request mRequest, Response response, boolean cached) { if (cached) { mRequest.addMarker("network-cache-written"); } // Post the response back. mRequest.markDelivered(); getResponseDelivery().postResponse(mRequest, response); mRequest.notifyListenerResponseReceived(response); } /** * This class may be used by advanced applications to provide custom executors according to * their needs. Apps must create ExecutorServices dynamically given a blocking queue rather than * providing them directly so that Volley can provide a PriorityQueue which will prioritize * requests according to Request#getPriority. */ public abstract static class ExecutorFactory { abstract ExecutorService createNonBlockingExecutor(BlockingQueue taskQueue); abstract ExecutorService createBlockingExecutor(BlockingQueue taskQueue); abstract ScheduledExecutorService createNonBlockingScheduledExecutor(); } /** Provides a BlockingQueue to be used to create executors. */ private static PriorityBlockingQueue getBlockingQueue() { return new PriorityBlockingQueue<>( /* initialCapacity= */ 11, new Comparator() { @Override public int compare(Runnable r1, Runnable r2) { // Vanilla runnables are prioritized first, then RequestTasks are ordered // by the underlying Request. if (r1 instanceof RequestTask) { if (r2 instanceof RequestTask) { return ((RequestTask) r1).compareTo(((RequestTask) r2)); } return 1; } return r2 instanceof RequestTask ? -1 : 0; } }); } /** * Builder is used to build an instance of {@link AsyncRequestQueue} from values configured by * the setters. */ public static class Builder { @Nullable private AsyncCache mAsyncCache = null; private final AsyncNetwork mNetwork; @Nullable private Cache mCache = null; @Nullable private ExecutorFactory mExecutorFactory = null; @Nullable private ResponseDelivery mResponseDelivery = null; public Builder(AsyncNetwork asyncNetwork) { if (asyncNetwork == null) { throw new IllegalArgumentException("Network cannot be null"); } mNetwork = asyncNetwork; } /** * Sets the executor factory to be used by the AsyncRequestQueue. If this is not called, * Volley will create suitable private thread pools. */ public Builder setExecutorFactory(ExecutorFactory executorFactory) { mExecutorFactory = executorFactory; return this; } /** * Sets the response deliver to be used by the AsyncRequestQueue. If this is not called, we * will default to creating a new {@link ExecutorDelivery} with the application's main * thread. */ public Builder setResponseDelivery(ResponseDelivery responseDelivery) { mResponseDelivery = responseDelivery; return this; } /** Sets the AsyncCache to be used by the AsyncRequestQueue. */ public Builder setAsyncCache(AsyncCache asyncCache) { mAsyncCache = asyncCache; return this; } /** Sets the Cache to be used by the AsyncRequestQueue. */ public Builder setCache(Cache cache) { mCache = cache; return this; } /** Provides a default ExecutorFactory to use, if one is never set. */ private ExecutorFactory getDefaultExecutorFactory() { return new ExecutorFactory() { @Override public ExecutorService createNonBlockingExecutor( BlockingQueue taskQueue) { return getNewThreadPoolExecutor( /* maximumPoolSize= */ 1, /* threadNameSuffix= */ "Non-BlockingExecutor", taskQueue); } @Override public ExecutorService createBlockingExecutor(BlockingQueue taskQueue) { return getNewThreadPoolExecutor( /* maximumPoolSize= */ DEFAULT_BLOCKING_THREAD_POOL_SIZE, /* threadNameSuffix= */ "BlockingExecutor", taskQueue); } @Override public ScheduledExecutorService createNonBlockingScheduledExecutor() { return new ScheduledThreadPoolExecutor( /* corePoolSize= */ 0, getThreadFactory("ScheduledExecutor")); } private ThreadPoolExecutor getNewThreadPoolExecutor( int maximumPoolSize, final String threadNameSuffix, BlockingQueue taskQueue) { return new ThreadPoolExecutor( /* corePoolSize= */ 0, /* maximumPoolSize= */ maximumPoolSize, /* keepAliveTime= */ 60, /* unit= */ TimeUnit.SECONDS, taskQueue, getThreadFactory(threadNameSuffix)); } private ThreadFactory getThreadFactory(final String threadNameSuffix) { return new ThreadFactory() { @Override public Thread newThread(@NonNull Runnable runnable) { Thread t = Executors.defaultThreadFactory().newThread(runnable); t.setName("Volley-" + threadNameSuffix); return t; } }; } }; } public AsyncRequestQueue build() { // If neither cache is set by the caller, throw an illegal argument exception. if (mCache == null && mAsyncCache == null) { throw new IllegalArgumentException("You must set one of the cache objects"); } if (mCache == null) { // if no cache is provided, we will provide one that throws // UnsupportedOperationExceptions to pass into the parent class. mCache = new ThrowingCache(); } if (mResponseDelivery == null) { mResponseDelivery = new ExecutorDelivery(new Handler(Looper.getMainLooper())); } if (mExecutorFactory == null) { mExecutorFactory = getDefaultExecutorFactory(); } return new AsyncRequestQueue( mCache, mNetwork, mAsyncCache, mResponseDelivery, mExecutorFactory); } } /** A cache that throws an error if a method is called. */ private static class ThrowingCache implements Cache { @Override public Entry get(String key) { throw new UnsupportedOperationException(); } @Override public void put(String key, Entry entry) { throw new UnsupportedOperationException(); } @Override public void initialize() { throw new UnsupportedOperationException(); } @Override public void invalidate(String key, boolean fullExpire) { throw new UnsupportedOperationException(); } @Override public void remove(String key) { throw new UnsupportedOperationException(); } @Override public void clear() { throw new UnsupportedOperationException(); } } }