Class FluxUtil
Utility type exposing methods to deal with Flux.
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| static Flux<ByteBuffer> | addProgressReporting(Flux<ByteBuffer> flux, ProgressReporter progressReporter) | Adds progress reporting to the provided Flux of ByteBuffer. |
| static byte[] | byteBufferToArray(ByteBuffer byteBuffer) | Gets the content of the provided ByteBuffer as a byte array. |
| static Mono<byte[]> | collectBytesFromNetworkResponse(Flux<ByteBuffer> stream, HttpHeaders headers) | Collects ByteBuffers returned in a network response into a byte array. |
| static Mono<byte[]> | collectBytesInByteBufferStream(Flux<ByteBuffer> stream) | Collects ByteBuffers emitted by a Flux into a byte array. |
| static Mono<byte[]> | collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint) | Collects ByteBuffers emitted by a Flux into a byte array. |
| static Flux<ByteBuffer> | createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries) | Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs. |
| static Flux<ByteBuffer> | createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries, long position) | Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs. |
| static Flux<ByteBuffer> | createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, RetryOptions retryOptions, long position) | Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs. |
| static <T> Flux<T> | fluxContext(Function<Context, Flux<T>> serviceCall) | Converts the incoming deferContextual from Reactor Context to Azure Context, calls the given lambda function with this context, and returns a collection of type T. |
| static <T> Flux<T> | fluxError(ClientLogger logger, RuntimeException ex) | Propagates a RuntimeException through the error channel of Flux. |
| static boolean | isFluxByteBuffer(Type entityType) | Checks if a type is Flux<ByteBuffer>. |
| static <T> Mono<T> | monoError(ClientLogger logger, RuntimeException ex) | Propagates a RuntimeException through the error channel of Mono. |
| static <T> Mono<T> | monoError(LoggingEventBuilder logBuilder, RuntimeException ex) | Propagates a RuntimeException through the error channel of Mono. |
| static <T> PagedFlux<T> | pagedFluxError(ClientLogger logger, RuntimeException ex) | Propagates a RuntimeException through the error channel of PagedFlux. |
| static Flux<ByteBuffer> | readFile(AsynchronousFileChannel fileChannel) | Creates a Flux from an AsynchronousFileChannel which reads the entire file. |
| static Flux<ByteBuffer> | readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) | Creates a Flux from an AsynchronousFileChannel which reads part of a file into chunks of the given size. |
| static Flux<ByteBuffer> | readFile(AsynchronousFileChannel fileChannel, long offset, long length) | Creates a Flux from an AsynchronousFileChannel which reads part of a file. |
| static Flux<ByteBuffer> | toFluxByteBuffer(InputStream inputStream) | Converts an InputStream into a Flux of ByteBuffer using a chunk size of 4096. |
| static Flux<ByteBuffer> | toFluxByteBuffer(InputStream inputStream, int chunkSize) | Converts an InputStream into a Flux of ByteBuffer. |
| static <T> Mono<T> | toMono | Converts the incoming content to Mono. |
| static Context | toReactorContext(Context context) | Converts an Azure context to Reactor context. |
| static <T> Mono<T> | withContext(Function<Context, Mono<T>> serviceCall) | Converts the incoming deferContextual from Reactor Context to Azure Context, calls the given lambda function with this context, and returns a single entity of type T. |
| static <T> Mono<T> | withContext(Function<Context, Mono<T>> serviceCall, Map<String, String> contextAttributes) | Converts the incoming deferContextual from Reactor Context to Azure Context, adds the specified context attributes, calls the given lambda function with this context, and returns a single entity of type T. |
| static Mono<Void> | writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile) | Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel. |
| static Mono<Void> | writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position) | Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel starting at the given position in the file. |
| static Mono<Void> | writeToAsynchronousByteChannel(Flux<ByteBuffer> content, AsynchronousByteChannel channel) | Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousByteChannel. |
| static Mono<Void> | writeToOutputStream(Flux<ByteBuffer> content, OutputStream stream) | Writes the ByteBuffers emitted by a Flux of ByteBuffer to an OutputStream. |
| static Mono<Void> | writeToWritableByteChannel(Flux<ByteBuffer> content, WritableByteChannel channel) | Writes the ByteBuffers emitted by a Flux of ByteBuffer to a WritableByteChannel. |
-
Method Details
-
isFluxByteBuffer
public static boolean isFluxByteBuffer(Type entityType)
Checks if a type is Flux<ByteBuffer>.
Parameters:
entityType - the type to check
Returns:
whether the type represents a Flux that emits ByteBuffer
-
addProgressReporting
public static Flux<ByteBuffer> addProgressReporting(Flux<ByteBuffer> flux, ProgressReporter progressReporter)
Adds progress reporting to the provided Flux of ByteBuffer.
Each ByteBuffer that's emitted from the Flux will report Buffer.remaining().
When the Flux is resubscribed, the progress is reset. If the Flux is not replayable, resubscribing can result in empty or partial data, in which case progress reporting might not be accurate.
If the ProgressReporter is not provided, i.e. is null, then this method returns the unmodified Flux.
Parameters:
flux - A Flux to report progress on.
progressReporter - Optional ProgressReporter.
Returns:
A Flux that reports progress, or the original Flux if the ProgressReporter is not provided.
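The per-buffer reporting described above can be illustrated without Reactor or the Azure SDK on the classpath. The following is a minimal plain-JDK sketch, not the FluxUtil implementation: the class ProgressReportingSketch, the method addProgress, and the use of LongConsumer in place of ProgressReporter are all hypothetical stand-ins, and an Iterable stands in for the Flux. Resetting progress on resubscription is not modeled here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical stand-in for the described behavior; not part of FluxUtil.
class ProgressReportingSketch {
    // Wraps the source so that each emitted buffer reports its remaining() byte count.
    static Iterable<ByteBuffer> addProgress(Iterable<ByteBuffer> source, LongConsumer progressReporter) {
        if (progressReporter == null) {
            // No reporter supplied: hand back the unmodified source.
            return source;
        }
        return () -> {
            Iterator<ByteBuffer> iterator = source.iterator();
            return new Iterator<ByteBuffer>() {
                @Override
                public boolean hasNext() {
                    return iterator.hasNext();
                }

                @Override
                public ByteBuffer next() {
                    ByteBuffer buffer = iterator.next();
                    progressReporter.accept(buffer.remaining());
                    return buffer;
                }
            };
        };
    }

    public static void main(String[] args) {
        List<ByteBuffer> buffers = Arrays.asList(ByteBuffer.allocate(3), ByteBuffer.allocate(5));
        AtomicLong total = new AtomicLong();
        for (ByteBuffer ignored : addProgress(buffers, total::addAndGet)) {
            // Consuming the buffers is what triggers the reports.
        }
        System.out.println(total.get()); // prints 8
    }
}
```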
-
collectBytesInByteBufferStream
public static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream)
Collects ByteBuffers emitted by a Flux into a byte array.
Parameters:
stream - A stream which emits ByteBuffer instances.
Returns:
A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.
Throws:
IllegalStateException - If the combined size of the emitted ByteBuffers is greater than Integer.MAX_VALUE.
-
collectBytesInByteBufferStream
public static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint)
Collects ByteBuffers emitted by a Flux into a byte array.
Unlike collectBytesInByteBufferStream(Flux), this method accepts a second parameter sizeHint. This size hint allows for optimizations when creating the initial buffer to reduce the number of times it needs to be resized while concatenating emitted ByteBuffers.
Parameters:
stream - A stream which emits ByteBuffer instances.
sizeHint - A hint about the expected stream size.
Returns:
A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.
Throws:
IllegalArgumentException - If sizeHint is equal to or less than 0.
IllegalStateException - If the combined size of the emitted ByteBuffers is greater than Integer.MAX_VALUE.
-
collectBytesFromNetworkResponse
public static Mono<byte[]> collectBytesFromNetworkResponse(Flux<ByteBuffer> stream, HttpHeaders headers)
Collects ByteBuffers returned in a network response into a byte array.
The headers are inspected for a Content-Length, which determines whether a size-hinted collection, collectBytesInByteBufferStream(Flux, int), or the default collection, collectBytesInByteBufferStream(Flux), will be used.
Parameters:
stream - A network response ByteBuffer stream.
headers - The HTTP headers of the response.
Returns:
A Mono which emits the collected network response ByteBuffers.
Throws:
NullPointerException - If headers is null.
IllegalStateException - If the size of the network response is greater than Integer.MAX_VALUE.
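The choice between the size-hinted and the default collection can be pictured with a synchronous plain-JDK sketch. This is an illustration under stated assumptions, not the library's code: CollectBytesSketch and its methods are hypothetical, a List stands in for the Flux, a Map stands in for HttpHeaders, and treating a missing, unparsable, or non-positive Content-Length as "no hint" is an assumption made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical synchronous analogue of the collection methods; not part of FluxUtil.
class CollectBytesSketch {
    // Default collection: the output buffer starts at its default capacity and grows as needed.
    static byte[] collect(List<ByteBuffer> buffers) {
        return drain(buffers, new ByteArrayOutputStream());
    }

    // Size-hinted collection: pre-sizing the output buffer avoids repeated resizing.
    static byte[] collect(List<ByteBuffer> buffers, int sizeHint) {
        if (sizeHint <= 0) {
            throw new IllegalArgumentException("'sizeHint' must be greater than 0.");
        }
        return drain(buffers, new ByteArrayOutputStream(sizeHint));
    }

    // Uses Content-Length as the size hint when it is present and usable (assumed policy).
    static byte[] collectFromResponse(List<ByteBuffer> buffers, Map<String, String> headers) {
        String contentLength = headers.get("Content-Length"); // throws NullPointerException if headers is null
        if (contentLength != null) {
            try {
                int sizeHint = Integer.parseInt(contentLength.trim());
                if (sizeHint > 0) {
                    return collect(buffers, sizeHint);
                }
            } catch (NumberFormatException ignored) {
                // Fall through to the default collection.
            }
        }
        return collect(buffers);
    }

    private static byte[] drain(List<ByteBuffer> buffers, ByteArrayOutputStream out) {
        for (ByteBuffer buffer : buffers) {
            ByteBuffer view = buffer.duplicate(); // leave the caller's position untouched
            byte[] chunk = new byte[view.remaining()];
            view.get(chunk);
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<ByteBuffer> body = Arrays.asList(
            ByteBuffer.wrap("Hello, ".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));
        byte[] bytes = collectFromResponse(body, Collections.singletonMap("Content-Length", "12"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints Hello, world
    }
}
```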
-
byteBufferToArray
public static byte[] byteBufferToArray(ByteBuffer byteBuffer)
Gets the content of the provided ByteBuffer as a byte array. This method creates a new byte array even if the ByteBuffer has an optional backing array.
Parameters:
byteBuffer - the byte buffer
Returns:
the byte array
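To show what "a new byte array" means in practice, here is a small plain-JDK sketch of copying a buffer's remaining bytes instead of exposing its backing array. ByteBufferCopySketch and copyRemaining are hypothetical names; reading through duplicate() so the caller's position is left untouched is a choice made for this example, and the source does not say whether the library method advances the buffer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of copying instead of sharing the backing array; not part of FluxUtil.
class ByteBufferCopySketch {
    static byte[] copyRemaining(ByteBuffer byteBuffer) {
        ByteBuffer view = byteBuffer.duplicate(); // independent position and limit, same content
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        return copy;
    }

    public static void main(String[] args) {
        byte[] backing = {1, 2, 3};
        ByteBuffer buffer = ByteBuffer.wrap(backing);

        byte[] copy = copyRemaining(buffer);
        copy[0] = 9; // changing the copy does not affect the buffer's backing array

        System.out.println(Arrays.toString(backing)); // prints [1, 2, 3]
        System.out.println(buffer.array() == backing); // prints true: array() shares the backing array
    }
}
```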
-
createRetriableDownloadFlux
public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries)
Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.
Parameters:
downloadSupplier - Supplier of the initial download.
onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
maxRetries - The maximum number of times a download can be resumed when an error occurs.
Returns:
A Flux that downloads reliably.
-
createRetriableDownloadFlux
public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries, long position)
Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.
Parameters:
downloadSupplier - Supplier of the initial download.
onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
maxRetries - The maximum number of times a download can be resumed when an error occurs.
position - The initial offset for the download.
Returns:
A Flux that downloads reliably.
-
createRetriableDownloadFlux
public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable, Long, Flux<ByteBuffer>> onDownloadErrorResume, RetryOptions retryOptions, long position)
Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.
Parameters:
downloadSupplier - Supplier of the initial download.
onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
retryOptions - The options for retrying.
position - The initial offset for the download.
Returns:
A Flux that downloads reliably.
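The resume-on-error idea behind these overloads can be sketched synchronously with the JDK alone. Everything here is hypothetical and only illustrates the pattern: ResumableDownloadSketch is not part of FluxUtil, an Iterator of byte arrays stands in for the Flux, and interpreting the Long passed to the resume function as the offset reached so far is an assumption that the source does not state explicitly.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical synchronous analogue of a resumable download; not part of FluxUtil.
class ResumableDownloadSketch {
    static byte[] download(Supplier<Iterator<byte[]>> downloadSupplier,
        BiFunction<Throwable, Long, Iterator<byte[]>> onDownloadErrorResume, int maxRetries, long position) {
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        Iterator<byte[]> source = downloadSupplier.get();
        long currentPosition = position; // offset of the next byte that has not been received yet
        int retries = 0;
        while (true) {
            try {
                while (source.hasNext()) {
                    byte[] chunk = source.next();
                    received.write(chunk, 0, chunk.length);
                    currentPosition += chunk.length;
                }
                return received.toByteArray();
            } catch (RuntimeException error) {
                if (retries >= maxRetries) {
                    throw error; // retry budget exhausted
                }
                retries++;
                // Ask the caller for a new source that starts where the failed one stopped.
                source = onDownloadErrorResume.apply(error, currentPosition);
            }
        }
    }

    // Test helper: emits 3-byte chunks from 'offset' and fails once 'failAfterChunks' chunks were emitted.
    static Iterator<byte[]> chunksFrom(byte[] data, long offset, int failAfterChunks) {
        return new Iterator<byte[]>() {
            private int cursor = (int) offset;
            private int emitted = 0;

            @Override
            public boolean hasNext() {
                return cursor < data.length;
            }

            @Override
            public byte[] next() {
                if (emitted == failAfterChunks) {
                    throw new IllegalStateException("simulated network failure");
                }
                int end = Math.min(cursor + 3, data.length);
                byte[] chunk = Arrays.copyOfRange(data, cursor, end);
                cursor = end;
                emitted++;
                return chunk;
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        byte[] result = download(
            () -> chunksFrom(data, 0, 1),                      // fails after the first chunk
            (error, resumeAt) -> chunksFrom(data, resumeAt, -1), // resumes at the reported offset
            2, 0L);
        System.out.println(new String(result, StandardCharsets.US_ASCII)); // prints 0123456789
    }
}
```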
-
toFluxByteBuffer
public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream)
Converts an InputStream into a Flux of ByteBuffer using a chunk size of 4096.
Given that InputStream is not guaranteed to be replayable, the returned Flux should be considered non-replayable as well.
If the passed InputStream is null, Flux.empty() will be returned.
Parameters:
inputStream - The InputStream to convert into a Flux.
Returns:
A Flux of ByteBuffers that contains the contents of the stream.
-
toFluxByteBuffer
public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, int chunkSize)
Converts an InputStream into a Flux of ByteBuffer.
Given that InputStream is not guaranteed to be replayable, the returned Flux should be considered non-replayable as well.
If the passed InputStream is null, Flux.empty() will be returned.
Parameters:
inputStream - The InputStream to convert into a Flux.
chunkSize - The requested size for each ByteBuffer.
Returns:
A Flux of ByteBuffers that contains the contents of the stream.
Throws:
IllegalArgumentException - If chunkSize is less than or equal to 0.
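The chunking behavior described for the two overloads can be sketched eagerly with the JDK. This is an illustration only: InputStreamChunkSketch and toChunks are hypothetical names, a List stands in for the Flux, and the real method presumably reads lazily as the Flux is consumed rather than all at once.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical eager analogue of converting an InputStream into chunks; not part of FluxUtil.
class InputStreamChunkSketch {
    // Mirrors the overload without chunkSize, which uses a chunk size of 4096.
    static List<ByteBuffer> toChunks(InputStream inputStream) {
        return toChunks(inputStream, 4096);
    }

    static List<ByteBuffer> toChunks(InputStream inputStream, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("'chunkSize' must be greater than 0.");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        if (inputStream == null) {
            return chunks; // a null stream yields no chunks
        }
        byte[] buffer = new byte[chunkSize];
        try {
            int read;
            while ((read = inputStream.read(buffer)) != -1) {
                // Copy only the bytes that were read; the last chunk may be shorter than chunkSize.
                chunks.add(ByteBuffer.wrap(Arrays.copyOf(buffer, read)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<ByteBuffer> chunks = toChunks(new ByteArrayInputStream(new byte[10]), 4);
        for (ByteBuffer chunk : chunks) {
            System.out.println(chunk.remaining()); // prints 4, 4, 2 on separate lines
        }
    }
}
```

Because the stream is consumed while chunking, calling toChunks a second time on the same stream returns nothing, which mirrors the non-replayable caveat above.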
-
withContext
public static <T> Mono<T> withContext(Function<Context, Mono<T>> serviceCall)
Converts the incoming deferContextual from Reactor Context to Azure Context, calls the given lambda function with this context, and returns a single entity of type T.
If the Reactor context is empty, Context.NONE will be used to call the lambda function.
Code samples
    String prefix = "Hello, ";
    Mono<String> response = FluxUtil
        .withContext(context -> serviceCallReturnsSingle(prefix, context));
Type Parameters:
T - The type of response returned from the service call
Parameters:
serviceCall - The lambda function that makes the service call into which the Azure context will be passed
Returns:
The response from the service call
-
withContext
public static <T> Mono<T> withContext(Function<Context, Mono<T>> serviceCall, Map<String, String> contextAttributes)
Converts the incoming deferContextual from Reactor Context to Azure Context, adds the specified context attributes, calls the given lambda function with this context, and returns a single entity of type T.
If the Reactor context is empty, Context.NONE will be used to call the lambda function.
Type Parameters:
T - The type of response returned from the service call
Parameters:
serviceCall - The lambda function that makes the service call into which the Azure context will be passed
contextAttributes - The map of attributes sent by the calling method to be set on Context.
Returns:
The response from the service call
-
toMono
Converts the incoming content to Mono.
-
monoError
public static <T> Mono<T> monoError(ClientLogger logger, RuntimeException ex)
Propagates a RuntimeException through the error channel of Mono.
Type Parameters:
T - The return type.
Parameters:
logger - The ClientLogger to log the exception.
ex - The RuntimeException.
Returns:
A Mono that terminates with error wrapping the RuntimeException.
-
monoError
public static <T> Mono<T> monoError(LoggingEventBuilder logBuilder, RuntimeException ex)
Propagates a RuntimeException through the error channel of Mono.
Type Parameters:
T - The return type.
Parameters:
logBuilder - The LoggingEventBuilder with context to log the exception.
ex - The RuntimeException.
Returns:
A Mono that terminates with error wrapping the RuntimeException.
-
fluxError
public static <T> Flux<T> fluxError(ClientLogger logger, RuntimeException ex)
Propagates a RuntimeException through the error channel of Flux.
Type Parameters:
T - The return type.
Parameters:
logger - The ClientLogger to log the exception.
ex - The RuntimeException.
Returns:
A Flux that terminates with error wrapping the RuntimeException.
-
pagedFluxError
public static <T> PagedFlux<T> pagedFluxError(ClientLogger logger, RuntimeException ex)
Propagates a RuntimeException through the error channel of PagedFlux.
Type Parameters:
T - The return type.
Parameters:
logger - The ClientLogger to log the exception.
ex - The RuntimeException.
Returns:
A PagedFlux that terminates with error wrapping the RuntimeException.
-
fluxContext
public static <T> Flux<T> fluxContext(Function<Context, Flux<T>> serviceCall)
Converts the incoming deferContextual from Reactor Context to Azure Context, calls the given lambda function with this context, and returns a collection of type T.
If the Reactor context is empty, Context.NONE will be used to call the lambda function.
Code samples
    String prefix = "Hello, ";
    Flux<String> response = FluxUtil
        .fluxContext(context -> serviceCallReturnsCollection(prefix, context));
Type Parameters:
T - The type of response returned from the service call
Parameters:
serviceCall - The lambda function that makes the service call into which the context will be passed
Returns:
The response from the service call
-
toReactorContext
public static Context toReactorContext(Context context)
Converts an Azure context to Reactor context. If the Azure context is null or empty, Context.empty() will be returned.
Parameters:
context - The Azure context.
Returns:
The Reactor context.
-
writeToOutputStream
public static Mono<Void> writeToOutputStream(Flux<ByteBuffer> content, OutputStream stream)
Writes the ByteBuffers emitted by a Flux of ByteBuffer to an OutputStream.
The stream is not closed by this call; closing the stream is managed by the caller.
The response Mono will emit an error if content or stream is null. Additionally, an error will be emitted if an exception occurs while writing the content to the stream.
Parameters:
content - The Flux of ByteBuffer content.
stream - The OutputStream being written into.
Returns:
A Mono which emits a completion status once the Flux has been written to the OutputStream, or an error status if writing fails.
-
writeFile
public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile)
Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel.
The outFile is not closed by this call; closing the outFile is managed by the caller.
The response Mono will emit an error if content or outFile is null. Additionally, an error will be emitted if the outFile wasn't opened with the proper open options, such as StandardOpenOption.WRITE.
Parameters:
content - The Flux of ByteBuffer content.
outFile - The AsynchronousFileChannel.
Returns:
A Mono which emits a completion status once the Flux has been written to the AsynchronousFileChannel.
Throws:
NullPointerException - When content is null.
NullPointerException - When outFile is null.
-
writeFile
public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position)
Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel starting at the given position in the file.
The outFile is not closed by this call; closing the outFile is managed by the caller.
The response Mono will emit an error if content or outFile is null or position is less than 0. Additionally, an error will be emitted if the outFile wasn't opened with the proper open options, such as StandardOpenOption.WRITE.
Parameters:
content - The Flux of ByteBuffer content.
outFile - The AsynchronousFileChannel.
position - The position in the file to begin writing the content.
Returns:
A Mono which emits a completion status once the Flux has been written to the AsynchronousFileChannel.
Throws:
NullPointerException - When content is null.
NullPointerException - When outFile is null.
IllegalArgumentException - When position is negative.
-
writeToAsynchronousByteChannel
public static Mono<Void> writeToAsynchronousByteChannel(Flux<ByteBuffer> content, AsynchronousByteChannel channel)
Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousByteChannel.
The channel is not closed by this call; closing the channel is managed by the caller.
The response Mono will emit an error if content or channel is null.
Parameters:
content - The Flux of ByteBuffer content.
channel - The AsynchronousByteChannel.
Returns:
A Mono which emits a completion status once the Flux has been written to the AsynchronousByteChannel.
Throws:
NullPointerException - When content is null.
NullPointerException - When channel is null.
-
writeToWritableByteChannel
public static Mono<Void> writeToWritableByteChannel(Flux<ByteBuffer> content, WritableByteChannel channel)
Writes the ByteBuffers emitted by a Flux of ByteBuffer to a WritableByteChannel.
The channel is not closed by this call; closing the channel is managed by the caller.
The response Mono will emit an error if content or channel is null.
Parameters:
content - The Flux of ByteBuffer content.
channel - The WritableByteChannel.
Returns:
A Mono which emits a completion status once the Flux has been written to the WritableByteChannel.
Throws:
NullPointerException - When content is null.
NullPointerException - When channel is null.
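The contract shared by the write methods (write every buffer, reject null arguments, leave the target open for the caller to close) can be sketched with the JDK alone. WriteChannelSketch and writeAll are hypothetical names, a List stands in for the Flux, and the blocking loop is a simplification of what is presumably an asynchronous write in the library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical blocking analogue of writing buffers to a channel; not part of FluxUtil.
class WriteChannelSketch {
    static void writeAll(List<ByteBuffer> content, WritableByteChannel channel) {
        Objects.requireNonNull(content, "'content' cannot be null.");
        Objects.requireNonNull(channel, "'channel' cannot be null.");
        try {
            for (ByteBuffer buffer : content) {
                ByteBuffer view = buffer.duplicate();
                // A single write call may write only part of the buffer, so loop until it is drained.
                while (view.hasRemaining()) {
                    channel.write(view);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The channel is intentionally left open; closing it is the caller's job.
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (WritableByteChannel channel = Channels.newChannel(sink)) {
            writeAll(Arrays.asList(ByteBuffer.wrap(new byte[] {1, 2}), ByteBuffer.wrap(new byte[] {3})), channel);
            System.out.println(channel.isOpen()); // prints true: still open after the write
        }
        System.out.println(Arrays.toString(sink.toByteArray())); // prints [1, 2, 3]
    }
}
```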
-
readFile
public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length)
Creates a Flux from an AsynchronousFileChannel which reads part of a file into chunks of the given size.
Parameters:
fileChannel - The file channel.
chunkSize - the size of file chunks to read.
offset - The offset in the file to begin reading.
length - The number of bytes to read from the file.
Returns:
the Flux.
-
readFile
public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, long offset, long length)
Creates a Flux from an AsynchronousFileChannel which reads part of a file.
Parameters:
fileChannel - The file channel.
offset - The offset in the file to begin reading.
length - The number of bytes to read from the file.
Returns:
the Flux.
-
readFile
public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel)
Creates a Flux from an AsynchronousFileChannel which reads the entire file.
Parameters:
fileChannel - The file channel.
Returns:
the Flux.
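How chunkSize, offset, and length interact when reading part of a file can be sketched with a blocking loop over AsynchronousFileChannel. ReadFileSketch, readChunks, and demo are hypothetical names, a List stands in for the Flux, and waiting on each read with Future.get() is a simplification made for the example rather than how the library reads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

// Hypothetical blocking analogue of reading part of a file in chunks; not part of FluxUtil.
class ReadFileSketch {
    static List<ByteBuffer> readChunks(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
        List<ByteBuffer> chunks = new ArrayList<>();
        long position = offset;
        long end = offset + length;
        try {
            while (position < end) {
                // Never request more than what is left of the requested range.
                int toRead = (int) Math.min(chunkSize, end - position);
                ByteBuffer buffer = ByteBuffer.allocate(toRead);
                int read = fileChannel.read(buffer, position).get();
                if (read <= 0) {
                    break; // end of file reached before 'length' bytes were read
                }
                buffer.flip();
                chunks.add(buffer);
                position += read;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return chunks;
    }

    // Writes "0123456789" to a temporary file and reads 6 bytes starting at offset 2 in chunks of 4.
    static String demo() {
        try {
            Path file = Files.createTempFile("readfile-sketch", ".txt");
            try {
                Files.write(file, "0123456789".getBytes(StandardCharsets.US_ASCII));
                try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
                    StringBuilder text = new StringBuilder();
                    for (ByteBuffer chunk : readChunks(channel, 4, 2, 6)) {
                        text.append(StandardCharsets.US_ASCII.decode(chunk));
                    }
                    return text.toString();
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 234567
    }
}
```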
-