Introduction
I recently needed to create a health endpoint for our Java WebFlux application that would retrieve results from five different service endpoints. Three of them returned Mono<> and two returned Flux<>. Each one had a different generic type. Through this effort, I learned a little more about combining disparate results in Spring WebFlux.
The first part of the challenge is figuring out how best to combine something like Mono<State> with something like Flux<City>. All of the examples I found would execute something like:
Iterable<City> filteredCityList = cityData.getAll(filter).toIterable();
or
State currentState = stateData.getStateById(abbreviation).block();
The key issue with these types of examples is that they use blocking calls, which are not permitted on WebFlux's non-blocking threads: the code compiles, but Reactor rejects a blocking call such as block() at run time with an IllegalStateException. Eventually, I gave up on finding a working example that would suit my purpose. However, the examples did give me some clues about methods that I might use to accomplish the task. To that end, I looked at merge, zip/zipWith, and concat/concatWith as methods in the WebFlux framework that might suit my purpose.
Options
Before we dive into each of them, I want to clarify the goal:
Goal: Execute requests across multiple services in parallel and capture duration at the completion of each of them.
The merge methods interleave the results from the publishers being merged. However, I need to complete each full request and provide a duration for that completion. Interleaved results sent to the subscriber make it much more difficult to tease apart the results and figure out when each publisher completes. Additionally, due to the interleaving, the results from one publisher would affect the measurements for all of the publishers being measured. The zip methods had a similar issue, with the added complexity of returning the results in tuples that must be handled in such a way as to still arrive at the goal: calculating the duration of each call, distinct from the concurrent calls to the other publishers.
My imperfect solution was to use concatWith(). This had the benefit of allowing me to treat each request as a single complete execution. The primary flaw is that the subscription to each of the chained publishers only happens sequentially. So, the previous examples with concatWith would result in something similar to:
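Flux<City> citiesFlux = cityData.getAll(filter);
Mono<State> stateMono = stateData.getById(abbreviation);
// The element types differ, so widen to a common supertype before chaining
Flux<Object> combined = stateMono.cast(Object.class).concatWith(citiesFlux);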
Solution
So, this gets me part of the way there in that I may now successfully create a chain of calls, but I need to do two more things:
- I need to measure and record the execution time for each of the discrete Mono<> or Flux<> calls
- I need to collect all of the discrete measurements into a single return and provide an overall status
In order to get time measurements for the completion, I need a hook that runs when each call produces its result. In this case, I use map(), which transforms each emitted value, to produce a MeasurementResult. For the Mono<> calls this is pretty straightforward, as it looks something like:
Mono<MeasurementResult> stateByIdResult = stateData.getById(abbreviation)
    .map(state -> {
        return buildResult("getStateById", getElapsedTime(), threshold);
    });
The buildResult() method returns a MeasurementResult. The call to getElapsedTime() is a way for me to externalize time measurement so that it meets Java's scoping rules: the compiler requires a local variable captured by a lambda to be final or effectively final, so the lambda cannot update a plain local start-time variable.
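For illustration, here is a minimal sketch of what MeasurementResult and buildResult() could look like. The field names, the millisecond units, and the rule that a call is within threshold when its elapsed time does not exceed the threshold are all assumptions, not the actual application code.

```java
final class MeasurementSketch {

    /** Hypothetical shape of MeasurementResult; the real fields are not shown in this article. */
    record MeasurementResult(String name, long elapsedMillis,
                             long thresholdMillis, boolean withinThreshold) {}

    /** Assumed rule: a call is within threshold when it took no longer than the threshold. */
    static MeasurementResult buildResult(String name, long elapsedMillis, long thresholdMillis) {
        boolean withinThreshold = elapsedMillis <= thresholdMillis;
        return new MeasurementResult(name, elapsedMillis, thresholdMillis, withinThreshold);
    }
}
```

With this shape, the lambda passed to map() only has to supply the call name, the measured duration, and the threshold.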
So, great, now I can easily chain all of the Mono<> return types together and return a custom type. What about the Flux<>? If I do the same for a Flux, I'll get a bunch of MeasurementResults, because map() runs once and produces one result for each of the data items that flow out of the Flux publisher. This is where one is tempted to figure out how to make toIterable() or block() work, but they simply won't, as blocking calls aren't permitted. My solution was to convert all of the types to Mono<>. For the Flux<> methods, this meant using collectList(). Using collectList() helped me in two ways:
- it would convert the result to a Mono<> making it easier to chain
- it would allow me to process the stream output at the end which is what I need in order to prevent multiple results and provide a duration
To that end, my Flux<> calls end up looking like:
Mono<MeasurementResult> citiesResult = cityData.getAll(filter)
    .collectList() // <-- creates the Mono
    .map(cities -> {
        return buildResult("getAllCities", getElapsedTime(), threshold);
    });
The last part was to chain them together and produce the final result in code that looks similar to:
Mono<List<MeasurementResult>> buildChain() {
    // Mono<> API call
    Mono<MeasurementResult> stateByIdResult = stateData.getById(abbreviation)
        .map(state -> {
            return buildResult("getStateById", getElapsedTime(), threshold);
        });
    // Flux<> API call
    Mono<MeasurementResult> citiesResult = cityData.getAll(filter)
        .collectList() // <-- creates the Mono
        .map(cities -> {
            return buildResult("getAllCities", getElapsedTime(), threshold);
        });
    // concatWith() subscribes to citiesResult only after stateByIdResult completes
    return stateByIdResult.concatWith(citiesResult).collectList();
}
Using collectList() allows me to return the discrete MeasurementResult objects from each call in a single List. In turn, that helps me at the end, because now I need to process those results and put them into an overall results object. The final bit of code that returns to the caller looks something like:
// returns Mono<ResponseEntity<FinalResult>>
return buildChain()
    .map(discreteResultsList -> {
        FinalResult finalResult = new FinalResult();
        finalResult.individualResults = discreteResultsList;
        finalResult.description = "[some text]";
        finalResult.version = buildConfig.getVersion();
        return finalResult;
    }).map(result -> ResponseEntity.ok().body(result));
The pattern described above may be used for any arbitrary chain by calling concatWith() and adding the next publisher. The key is that they must all converge on a single output type. Additionally, if you need the actual data, you could transform to a common type or interface for the types in play, or do something like add each result to a List and return it.
Caveats
There are a few things to note with this approach. The first and foremost is that concatWith() subscribes to each of the publishers sequentially. Thus, if you're looking for parallel execution, you'll need to look at a different method such as zip or merge, or create your own subscriber with subscribe() using something like the parallel scheduler.
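For the parallel case, one possible sketch starts the clock per publisher at subscription time with Mono.defer() and combines the measured Monos with Mono.zip(), which subscribes to its sources eagerly. This is not the approach used above. The Timed record, the timed() helper, and measureAll() are hypothetical names, and the String element types stand in for the real service types.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ParallelTimingSketch {

    /** Hypothetical result type: the call name and how long it took. */
    record Timed(String name, long elapsedMillis) {}

    /** Wraps a Mono so the clock starts when it is subscribed to, not when it is assembled. */
    static <T> Mono<Timed> timed(String name, Mono<T> source) {
        return Mono.defer(() -> {
            long start = System.nanoTime(); // never reassigned, so the lambda may capture it
            return source.map(ignored ->
                    new Timed(name, (System.nanoTime() - start) / 1_000_000));
        });
    }

    /** Measures both calls; zip subscribes to both sources up front. */
    static Mono<List<Timed>> measureAll(Mono<String> stateCall, Flux<String> citiesCall) {
        return Mono.zip(
                    timed("getStateById", stateCall),
                    timed("getAllCities", citiesCall.collectList())) // Flux -> single Mono
                .map(pair -> List.of(pair.getT1(), pair.getT2()));
    }
}
```

Because every source is reduced to a single Mono<Timed> before zipping, there is no interleaving to untangle, and each start time lives in its own local variable. The calls only overlap if the underlying publishers are asynchronous, and an empty source Mono would leave the zipped result empty, so a real version would need a fallback for that case.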
Secondly, as noted previously, I had to track duration. To do this, I need a way to record and update a start time variable. Since that information is needed in the lambda used to map the data, it creates a closure. Java requires that a local variable captured by a lambda be final or effectively final. To that end, I just declared a List that I used:
final List timeList = new ArrayList();
For the completion of each call, I would use the last element of that list to calculate duration and add a new value to the list for the next subscriber to use. This would introduce some extra time into the duration calculation, but nothing significant enough that mattered for my use case.
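A minimal sketch of that bookkeeping might look like the following. The ElapsedTimeTracker class name, the use of Long nanosecond timestamps, and the overload that accepts an explicit timestamp (handy for testing) are assumptions; only the idea of a final list whose last element marks the previous completion comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;

final class ElapsedTimeTracker {

    // The reference is final, so a lambda may capture it; the list contents remain mutable.
    private final List<Long> timeList = new ArrayList<>();

    ElapsedTimeTracker(long startNanos) {
        timeList.add(startNanos); // the first mark is the start of the whole chain
    }

    /** Milliseconds since the previous mark; also records a new mark for the next caller. */
    long getElapsedTime() {
        return getElapsedTime(System.nanoTime());
    }

    /** Same as above with an explicit timestamp (hypothetical overload for testing). */
    long getElapsedTime(long nowNanos) {
        long previous = timeList.get(timeList.size() - 1);
        timeList.add(nowNanos);
        return (nowNanos - previous) / 1_000_000;
    }
}
```

This relies on the calls completing one after another, as they do with concatWith(). An ArrayList is not safe for concurrent updates, so a parallel variant would need per-call start times instead of a shared list.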