summaryrefslogtreecommitdiff
path: root/androidx/paging/RxPagedListBuilder.java
blob: f98bc1ecf81c3e1ce90dd95fa35680a5a1c2d9dc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/*
 * Copyright 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.paging;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.arch.core.executor.ArchTaskExecutor;

import java.util.concurrent.Executor;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.functions.Cancellable;
import io.reactivex.schedulers.Schedulers;

/**
 * Builder for {@code Observable<PagedList>} or {@code Flowable<PagedList>}, given a
 * {@link DataSource.Factory} and a {@link PagedList.Config}.
 * <p>
 * The required parameters are in the constructor, so you can simply construct and build, or
 * optionally enable extra features (such as initial load key, or BoundaryCallback).
 * <p>
 * The returned observable/flowable will already be subscribed on the
 * {@link #setFetchScheduler(Scheduler)}, and will perform all loading on that scheduler. It will
 * already be observed on {@link #setNotifyScheduler(Scheduler)}, and will dispatch new PagedLists,
 * as well as their updates to that scheduler.
 *
 * @param <Key> Type of input value used to load data from the DataSource. Must be integer if
 *             you're using PositionalDataSource.
 * @param <Value> Item type being presented.
 */
public final class RxPagedListBuilder<Key, Value> {
    // Required inputs, fixed at construction time.
    private final PagedList.Config mConfig;
    private final DataSource.Factory<Key, Value> mDataSourceFactory;

    // Optional inputs; both may legitimately stay null.
    private Key mInitialLoadKey;
    private PagedList.BoundaryCallback<Value> mBoundaryCallback;

    // Each scheduler is kept together with an Executor that dispatches onto it, because the
    // PagedList.Builder is configured with Executors while the Rx chain needs Schedulers.
    private Executor mNotifyExecutor;
    private Executor mFetchExecutor;
    private Scheduler mFetchScheduler;
    private Scheduler mNotifyScheduler;

    /**
     * Creates a RxPagedListBuilder with required parameters.
     *
     * @param dataSourceFactory DataSource factory providing DataSource generations.
     * @param config Paging configuration.
     * @throws IllegalArgumentException if either argument is {@code null}.
     */
    public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory,
            @NonNull PagedList.Config config) {
        //noinspection ConstantConditions
        if (config == null) {
            throw new IllegalArgumentException("PagedList.Config must be provided");
        }
        //noinspection ConstantConditions
        if (dataSourceFactory == null) {
            throw new IllegalArgumentException("DataSource.Factory must be provided");
        }
        mDataSourceFactory = dataSourceFactory;
        mConfig = config;
    }

    /**
     * Creates a RxPagedListBuilder with required parameters.
     * <p>
     * This method is a convenience for:
     * <pre>
     * RxPagedListBuilder(dataSourceFactory,
     *         new PagedList.Config.Builder().setPageSize(pageSize).build())
     * </pre>
     *
     * @param dataSourceFactory DataSource.Factory providing DataSource generations.
     * @param pageSize Size of pages to load.
     */
    @SuppressWarnings("unused")
    public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory,
            int pageSize) {
        this(dataSourceFactory, new PagedList.Config.Builder().setPageSize(pageSize).build());
    }

    /**
     * First loading key passed to the first PagedList/DataSource.
     * <p>
     * When a new PagedList/DataSource pair is created after the first, it acquires a load key from
     * the previous generation so that data is loaded around the position already being observed.
     *
     * @param key Initial load key passed to the first PagedList/DataSource.
     * @return this
     */
    @SuppressWarnings("unused")
    @NonNull
    public RxPagedListBuilder<Key, Value> setInitialLoadKey(@Nullable Key key) {
        mInitialLoadKey = key;
        return this;
    }

    /**
     * Sets a {@link PagedList.BoundaryCallback} on each PagedList created, typically used to load
     * additional data from network when paging from local storage.
     * <p>
     * Pass a BoundaryCallback to listen to when the PagedList runs out of data to load. If this
     * method is not called, or {@code null} is passed, you will not be notified when each
     * DataSource runs out of data to provide to its PagedList.
     * <p>
     * If you are paging from a DataSource.Factory backed by local storage, you can set a
     * BoundaryCallback to know when there is no more information to page from local storage.
     * This is useful to page from the network when local storage is a cache of network data.
     * <p>
     * Note that when using a BoundaryCallback with a {@code Observable<PagedList>}, method calls
     * on the callback may be dispatched multiple times - one for each PagedList/DataSource
     * pair. If loading network data from a BoundaryCallback, you should prevent multiple
     * dispatches of the same method from triggering multiple simultaneous network loads.
     *
     * @param boundaryCallback The boundary callback for listening to PagedList load state.
     * @return this
     */
    @SuppressWarnings("unused")
    @NonNull
    public RxPagedListBuilder<Key, Value> setBoundaryCallback(
            @Nullable PagedList.BoundaryCallback<Value> boundaryCallback) {
        mBoundaryCallback = boundaryCallback;
        return this;
    }

    /**
     * Sets scheduler which will be used for observing new PagedLists, as well as loading updates
     * within the PagedLists.
     * <p>
     * The built observable will be {@link Observable#observeOn(Scheduler) observed on} this
     * scheduler, so that the thread receiving PagedLists will also receive the internal updates to
     * the PagedList.
     *
     * @param scheduler Scheduler on which PagedLists and their updates are delivered.
     * @return this
     */
    @SuppressWarnings({"unused", "WeakerAccess"})
    @NonNull
    public RxPagedListBuilder<Key, Value> setNotifyScheduler(
            final @NonNull Scheduler scheduler) {
        mNotifyScheduler = scheduler;
        final Scheduler.Worker worker = scheduler.createWorker();
        mNotifyExecutor = new Executor() {
            @Override
            public void execute(@NonNull Runnable command) {
                // We use a worker here since the page load notifications
                // should not be dispatched in parallel
                worker.schedule(command);
            }
        };
        return this;
    }

    /**
     * Sets scheduler which will be used for background fetching of PagedLists, as well as on-demand
     * fetching of pages inside.
     *
     * @param scheduler Scheduler for background DataSource loading.
     * @return this
     */
    @SuppressWarnings({"unused", "WeakerAccess"})
    @NonNull
    public RxPagedListBuilder<Key, Value> setFetchScheduler(
            final @NonNull Scheduler scheduler) {
        mFetchExecutor = new Executor() {
            @Override
            public void execute(@NonNull Runnable command) {
                // We use scheduleDirect since the page loads that use
                // executor are intentionally parallel.
                scheduler.scheduleDirect(command);
            }
        };
        mFetchScheduler = scheduler;
        return this;
    }

    /**
     * Constructs a {@code Observable<PagedList>}.
     * <p>
     * The returned Observable will already be observed on the
     * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the
     * {@link #setFetchScheduler(Scheduler) fetch scheduler}.
     * <p>
     * If no notify scheduler was set, the main thread executor from {@link ArchTaskExecutor} is
     * used; if no fetch scheduler was set, the IO thread executor from {@link ArchTaskExecutor}
     * is used.
     *
     * @return The Observable of PagedLists
     */
    @NonNull
    public Observable<PagedList<Value>> buildObservable() {
        if (mNotifyExecutor == null) {
            mNotifyExecutor = ArchTaskExecutor.getMainThreadExecutor();
            mNotifyScheduler = Schedulers.from(mNotifyExecutor);
        }
        if (mFetchExecutor == null) {
            mFetchExecutor = ArchTaskExecutor.getIOThreadExecutor();
            mFetchScheduler = Schedulers.from(mFetchExecutor);
        }
        return Observable.create(new PagingObservableOnSubscribe<>(
                mInitialLoadKey,
                mConfig,
                mBoundaryCallback,
                mDataSourceFactory,
                mNotifyExecutor,
                mFetchExecutor))
                        .observeOn(mNotifyScheduler)
                        .subscribeOn(mFetchScheduler);
    }

    /**
     * Constructs a {@code Flowable<PagedList>}.
     * <p>
     * The returned Flowable will already be observed on the
     * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the
     * {@link #setFetchScheduler(Scheduler) fetch scheduler}.
     *
     * @param backpressureStrategy BackpressureStrategy for the Flowable to use.
     * @return The Flowable of PagedLists
     */
    @NonNull
    public Flowable<PagedList<Value>> buildFlowable(
            @NonNull BackpressureStrategy backpressureStrategy) {
        return buildObservable()
                .toFlowable(backpressureStrategy);
    }

    /**
     * Subscription logic behind {@link #buildObservable()}.
     * <p>
     * One instance plays four roles: it is the {@link ObservableOnSubscribe} that emits the first
     * PagedList, the {@link DataSource.InvalidatedCallback} registered on the current DataSource,
     * the {@link Runnable} posted to the fetch executor to build the next PagedList after an
     * invalidation, and the {@link Cancellable} that unregisters the invalidation callback when
     * the subscription is cancelled.
     */
    static class PagingObservableOnSubscribe<Key, Value>
            implements ObservableOnSubscribe<PagedList<Value>>, DataSource.InvalidatedCallback,
            Cancellable,
            Runnable {

        @Nullable
        private final Key mInitialLoadKey;
        @NonNull
        private final PagedList.Config mConfig;
        @Nullable
        private final PagedList.BoundaryCallback<Value> mBoundaryCallback;
        @NonNull
        private final DataSource.Factory<Key, Value> mDataSourceFactory;
        @NonNull
        private final Executor mNotifyExecutor;
        @NonNull
        private final Executor mFetchExecutor;

        // Most recently built PagedList; null until the first one is created.
        @Nullable
        private PagedList<Value> mList;
        // DataSource backing mList; this object is registered on it as invalidation callback.
        @Nullable
        private DataSource<Key, Value> mDataSource;

        // Assigned in subscribe(), before any invalidation callback is registered.
        private ObservableEmitter<PagedList<Value>> mEmitter;

        private PagingObservableOnSubscribe(@Nullable Key initialLoadKey,
                @NonNull PagedList.Config config,
                @Nullable PagedList.BoundaryCallback<Value> boundaryCallback,
                @NonNull DataSource.Factory<Key, Value> dataSourceFactory,
                @NonNull Executor notifyExecutor,
                @NonNull Executor fetchExecutor) {
            mInitialLoadKey = initialLoadKey;
            mConfig = config;
            mBoundaryCallback = boundaryCallback;
            mDataSourceFactory = dataSourceFactory;
            mNotifyExecutor = notifyExecutor;
            mFetchExecutor = fetchExecutor;
        }

        /**
         * Stores the emitter, registers this object as its cancellation hook, and emits the
         * first PagedList.
         */
        @Override
        public void subscribe(ObservableEmitter<PagedList<Value>> emitter)
                throws Exception {
            mEmitter = emitter;
            mEmitter.setCancellable(this);

            // known that subscribe is already on fetchScheduler
            mEmitter.onNext(createPagedList());
        }

        /**
         * Stops listening for invalidation of the current DataSource, if there is one.
         */
        @Override
        public void cancel() throws Exception {
            if (mDataSource != null) {
                mDataSource.removeInvalidatedCallback(this);
            }
        }

        /**
         * Builds and emits the next PagedList. Posted to the fetch executor by
         * {@link #onInvalidated()}.
         */
        @Override
        public void run() {
            // The subscription may have been disposed between onInvalidated() posting this task
            // and the task actually running. Building a new generation then would register an
            // invalidation callback that cancel() (which has already run) can never remove.
            if (mEmitter.isDisposed()) {
                return;
            }
            // fetch data, run on fetchExecutor
            mEmitter.onNext(createPagedList());
        }

        /**
         * Schedules creation of a replacement PagedList on the fetch executor, unless the
         * subscription has already been disposed.
         */
        @Override
        public void onInvalidated() {
            if (!mEmitter.isDisposed()) {
                mFetchExecutor.execute(this);
            }
        }

        /**
         * Creates a new DataSource/PagedList pair, repeating until the resulting PagedList is
         * not detached.
         * <p>
         * The first generation is initialized with the configured initial load key; later
         * generations use the last key of the previous PagedList.
         *
         * @return the newly built PagedList, which is also stored in {@code mList}.
         */
        private PagedList<Value> createPagedList() {
            @Nullable Key initializeKey = mInitialLoadKey;
            if (mList != null) {
                //noinspection unchecked
                initializeKey = (Key) mList.getLastKey();
            }

            do {
                // Move the invalidation callback from the previous DataSource to the new one.
                if (mDataSource != null) {
                    mDataSource.removeInvalidatedCallback(this);
                }
                mDataSource = mDataSourceFactory.create();
                mDataSource.addInvalidatedCallback(this);

                mList = new PagedList.Builder<>(mDataSource, mConfig)
                        .setNotifyExecutor(mNotifyExecutor)
                        .setFetchExecutor(mFetchExecutor)
                        .setBoundaryCallback(mBoundaryCallback)
                        .setInitialKey(initializeKey)
                        .build();
            } while (mList.isDetached());
            return mList;
        }
    }
}