Class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>>
- Type Parameters:
C- the type of the continuation tokenT- The type of elements in aContinuablePageP- TheContinuablePageholding items of typeT.
- All Implemented Interfaces:
org.reactivestreams.Publisher<T>,CorePublisher<T>
- Direct Known Subclasses:
PagedFluxBase
ContinuablePagedFlux.
This type is a Flux that provides the ability to operate on pages of type ContinuablePage and individual
items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a
previously-retrieved continuation token.
The type is backed by the Page Retriever provider provided in it's constructor. The provider is expected to return
PageRetriever when called. The provider is invoked for each Subscription to this Flux. Given provider is
called per Subscription, the provider implementation can create one or more objects to store any state and Page
Retriever can capture and use those objects. This indirectly associate the state objects to the Subscription. The
Page Retriever can get called multiple times in serial fashion, each time after the completion of the Flux returned
by the previous invocation. The final completion signal will be send to the Subscriber when the last Page emitted by
the Flux returned by the Page Retriever has null continuation token.
Extending PagedFluxCore for Custom Continuation Token support
class ContinuationState<C> {
private C lastContinuationToken;
private boolean isDone;
ContinuationState(C token) {
this.lastContinuationToken = token;
}
void setLastContinuationToken(C token) {
this.isDone = token == null;
this.lastContinuationToken = token;
}
C getLastContinuationToken() {
return this.lastContinuationToken;
}
boolean isDone() {
return this.isDone;
}
}
class FileContinuationToken {
private final int nextLinkId;
FileContinuationToken(int nextLinkId) {
this.nextLinkId = nextLinkId;
}
public int getNextLinkId() {
return nextLinkId;
}
}
class File {
private final String guid;
File(String guid) {
this.guid = guid;
}
public String getGuid() {
return guid;
}
}
class FilePage implements ContinuablePage<FileContinuationToken, File> {
private final IterableStream<File> elements;
private final FileContinuationToken fileContinuationToken;
FilePage(List<File> elements, FileContinuationToken fileContinuationToken) {
this.elements = IterableStream.of(elements);
this.fileContinuationToken = fileContinuationToken;
}
@Override
public IterableStream<File> getElements() {
return elements;
}
@Override
public FileContinuationToken getContinuationToken() {
return fileContinuationToken;
}
}
class FileShareServiceClient {
Flux<FilePage> getFilePages(FileContinuationToken token) {
List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString()));
if (token.getNextLinkId() < 10) {
return Flux.just(new FilePage(files, null));
} else {
return Flux.just(new FilePage(files,
new FileContinuationToken((int) Math.floor(Math.random() * 20))));
}
}
}
FileShareServiceClient client = new FileShareServiceClient();
Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider = () ->
(continuationToken, pageSize) -> client.getFilePages(continuationToken);
class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> {
FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>>
pageRetrieverProvider) {
super(pageRetrieverProvider);
}
}
FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);
- See Also:
-
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider) Creates an instance ofContinuablePagedFluxCore.protectedContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) Creates an instance ofContinuablePagedFluxCore.protectedContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate) Creates an instance ofContinuablePagedFluxCore. -
Method Summary
Modifier and TypeMethodDescriptionbyPage()Gets aFluxofContinuablePagestarting at the first page.byPage(int preferredPageSize) Gets aFluxofContinuablePagestarting at the first page requesting each page to contain a number of elements equal to the preferred page size.Gets aFluxofContinuablePagebeginning at the page identified by the given continuation token.Gets aFluxofContinuablePagebeginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.Get the page size configured thisContinuablePagedFluxCore.voidsubscribe(CoreSubscriber<? super T> coreSubscriber) Subscribe to consume all items of typeTin the sequence respectively.Methods inherited from class com.azure.core.util.paging.ContinuablePagedFlux
getContinuationPredicateMethods inherited from class reactor.core.publisher.Flux
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextCapture, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, getPrefetch, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergePriority, mergePriority, mergePriority, mergePriorityDelayError, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorComplete, onErrorComplete, onErrorComplete, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, tap, tap, tap, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
-
Constructor Details
-
ContinuablePagedFluxCore
Creates an instance ofContinuablePagedFluxCore.- Parameters:
pageRetrieverProvider- a provider that returnsPageRetriever.- Throws:
NullPointerException- IfpageRetrieverProvideris null.
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) Creates an instance ofContinuablePagedFluxCore.- Parameters:
pageRetrieverProvider- a provider that returnsPageRetriever.pageSize- the preferred page size- Throws:
NullPointerException- IfpageRetrieverProvideris null.IllegalArgumentException- IfpageSizeis less than or equal to zero.
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate) Creates an instance ofContinuablePagedFluxCore.- Parameters:
pageRetrieverProvider- A provider that returnsPageRetriever.pageSize- The preferred page size.continuationPredicate- A predicate which determines if paging should continue.- Throws:
NullPointerException- IfpageRetrieverProvideris null.IllegalArgumentException- IfpageSizeis not null and is less than or equal to zero.
-
-
Method Details
-
getPageSize
Get the page size configured thisContinuablePagedFluxCore.- Returns:
- the page size configured,
nullif unspecified.
-
byPage
Description copied from class:ContinuablePagedFluxGets aFluxofContinuablePagestarting at the first page.- Specified by:
byPagein classContinuablePagedFlux<C,T, P extends ContinuablePage<C, T>> - Returns:
- A
FluxofContinuablePage.
-
byPage
Description copied from class:ContinuablePagedFluxGets aFluxofContinuablePagebeginning at the page identified by the given continuation token.- Specified by:
byPagein classContinuablePagedFlux<C,T, P extends ContinuablePage<C, T>> - Parameters:
continuationToken- A continuation token identifying the page to select.- Returns:
- A
FluxofContinuablePage.
-
byPage
Description copied from class:ContinuablePagedFluxGets aFluxofContinuablePagestarting at the first page requesting each page to contain a number of elements equal to the preferred page size.The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.
- Specified by:
byPagein classContinuablePagedFlux<C,T, P extends ContinuablePage<C, T>> - Parameters:
preferredPageSize- The preferred page size.- Returns:
- A
FluxofContinuablePage.
-
byPage
Description copied from class:ContinuablePagedFluxGets aFluxofContinuablePagebeginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.
- Specified by:
byPagein classContinuablePagedFlux<C,T, P extends ContinuablePage<C, T>> - Parameters:
continuationToken- A continuation token identifying the page to select.preferredPageSize- The preferred page size.- Returns:
- A
FluxofContinuablePage.
-
subscribe
Subscribe to consume all items of typeTin the sequence respectively. This is recommended for most common scenarios. This will seamlessly fetch next page when required and provide with aFluxof items.- Specified by:
subscribein interfaceCorePublisher<C>- Specified by:
subscribein classFlux<T>- Parameters:
coreSubscriber- The subscriber for thisContinuablePagedFluxCore
-