Spring WebFlux includes a client for performing HTTP requests. WebClient has a functional, fluent API based on Reactor that lets you compose asynchronous logic declaratively, without dealing with threads or concurrency. It is fully non-blocking, supports streaming, and relies on the same codecs that are used to encode and decode request and response content on the server side.
WebClient needs an HTTP client library to perform requests. There is built-in support for:
- Reactor Netty
- Jetty Reactive HttpClient
- Apache HttpComponents
Other clients can be plugged in via ClientHttpConnector.
Configuration
The simplest way to create a WebClient is through one of the static factory methods:
WebClient.create()
WebClient.create(String baseUrl)
You can also use WebClient.builder() with further options:
- uriBuilderFactory: Customized UriBuilderFactory to use as a base URL.
- defaultUriVariables: Default values to use when expanding URI templates.
- defaultHeader: Headers for every request.
- defaultCookie: Cookies for every request.
- defaultRequest: Consumer to customize every request.
- filter: Client filter for every request.
- exchangeStrategies: HTTP message reader/writer customizations.
- clientConnector: HTTP client library settings.
For example:
WebClient client = WebClient.builder()
.codecs(configurer -> ... )
.build();
val webClient = WebClient.builder()
.codecs { configurer -> ... }
.build()
Once built, a WebClient is immutable. However, you can clone it and build a modified copy as follows:
WebClient client1 = WebClient.builder()
.filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate()
.filter(filterC).filter(filterD).build();
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD
val client1 = WebClient.builder()
.filter(filterA).filter(filterB).build()
val client2 = client1.mutate()
.filter(filterC).filter(filterD).build()
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filterD
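To make the copy-on-mutate behaviour concrete, here is a toy model in plain Java. It is not Spring's implementation: the class ClientSketch and its string-named filters are hypothetical stand-ins, used only to illustrate why client1 is unaffected when client2 is derived from it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of an immutable client with a mutate() builder.
// Filters are plain strings here; a real WebClient holds ExchangeFilterFunction instances.
final class ClientSketch {
    private final List<String> filters;

    private ClientSketch(List<String> filters) {
        // Defensive, unmodifiable copy: the built client can never change.
        this.filters = List.copyOf(filters);
    }

    List<String> filters() {
        return filters;
    }

    static Builder builder() {
        return new Builder(List.of());
    }

    // Starts a new builder pre-populated with this client's filters.
    Builder mutate() {
        return new Builder(filters);
    }

    static final class Builder {
        private final List<String> filters;

        Builder(List<String> initial) {
            this.filters = new ArrayList<>(initial);
        }

        Builder filter(String name) {
            filters.add(name);
            return this;
        }

        ClientSketch build() {
            return new ClientSketch(filters);
        }
    }

    public static void main(String[] args) {
        ClientSketch client1 = ClientSketch.builder().filter("filterA").filter("filterB").build();
        ClientSketch client2 = client1.mutate().filter("filterC").filter("filterD").build();
        System.out.println(client1.filters()); // [filterA, filterB]
        System.out.println(client2.filters()); // [filterA, filterB, filterC, filterD]
    }
}
```

The point of the sketch is that mutate() copies the existing configuration into a fresh builder, so later changes never reach the original instance.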
MaxInMemorySize
Codecs limit how much data they buffer in memory, to protect the application from memory problems. The default limit is 256 KB. If that is not enough, you get the following error:
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer
To change the default limit for codecs, do the following:
WebClient webClient = WebClient.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
.build();
val webClient = WebClient.builder()
.codecs { configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024) }
.build()
Reactor Netty
To customize Reactor Netty settings, provide a pre-configured HttpClient:
HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
val httpClient = HttpClient.create().secure { ... }
val webClient = WebClient.builder()
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
Resources
By default, HttpClient participates in the global Reactor Netty resources held in reactor.netty.http.HttpResources, including event loop threads and a connection pool. This is the recommended mode, since fixed, shared resources are preferred for event loop concurrency. In this mode, global resources remain active until the process exits.
If the server's lifetime matches that of the process, there is typically no need for an explicit shutdown. However, if the server can start or stop in-process (for example, a Spring MVC application deployed as a WAR), you can declare a Spring-managed bean of type ReactorResourceFactory with globalResources=true (the default) to ensure that the Reactor Netty global resources are shut down when the Spring ApplicationContext is closed, as the following example shows:
@Bean
public ReactorResourceFactory reactorResourceFactory() {
return new ReactorResourceFactory();
}
@Bean
fun reactorResourceFactory() = ReactorResourceFactory()
You can also avoid using Reactor Netty global resources. However, in this mode, you are responsible for ensuring that all instances of the Reactor Netty client and server share resources, as shown in the following example:
@Bean
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory factory = new ReactorResourceFactory();
factory.setUseGlobalResources(false);
return factory;
}
@Bean
public WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> {
// Further configuration...
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory(), mapper);
return WebClient.builder().clientConnector(connector).build();
}
- Create resources independent of the global ones.
- Use the ReactorClientHttpConnector constructor with the resource factory.
- Plug the connector into the WebClient.Builder.
@Bean
fun resourceFactory() = ReactorResourceFactory().apply {
isUseGlobalResources = false
}
@Bean
fun webClient(): WebClient {
val mapper: (HttpClient) -> HttpClient = {
// Further configuration...
}
val connector = ReactorClientHttpConnector(resourceFactory(), mapper)
return WebClient.builder().clientConnector(connector).build()
}
- Create resources independent of the global ones.
- Use the ReactorClientHttpConnector constructor with the resource factory.
- Plug the connector into the WebClient.Builder.
Timeout
Configuring connection timeout values:
import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
import io.netty.channel.ChannelOption
val httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
val webClient = WebClient.builder()
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
Setting the read and write timeout value:
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10)));
// Create a WebClient...
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutHandler
val httpClient = HttpClient.create()
.doOnConnected { conn -> conn
.addHandlerLast(ReadTimeoutHandler(10))
.addHandlerLast(WriteTimeoutHandler(10))
}
// Create a WebClient...
Setting the response timeout for all requests:
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
// Create a WebClient...
val httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2))
// Create a WebClient...
Setting the response timeout for a specific request:
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest(httpRequest -> {
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
reactorRequest.responseTimeout(Duration.ofSeconds(2));
})
.retrieve()
.bodyToMono(String.class);
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest { httpRequest: ClientHttpRequest ->
val reactorRequest = httpRequest.getNativeRequest<HttpClientRequest>()
reactorRequest.responseTimeout(Duration.ofSeconds(2))
}
.retrieve()
.bodyToMono(String::class.java)
Jetty
The following example shows how to customize Jetty HttpClient settings:
HttpClient httpClient = new HttpClient();
httpClient.setCookieStore(...);
WebClient webClient = WebClient.builder()
.clientConnector(new JettyClientHttpConnector(httpClient))
.build();
val httpClient = HttpClient()
httpClient.cookieStore = ...
val webClient = WebClient.builder()
.clientConnector(JettyClientHttpConnector(httpClient))
.build()
By default, HttpClient creates its own resources (Executor, ByteBufferPool, Scheduler), which remain active until the process exits or stop() is called.
You can share resources between multiple instances of the Jetty client (and server) and ensure that the resources are shut down when the Spring ApplicationContext is closed by declaring a Spring-managed bean of type JettyResourceFactory, as the following example shows:
@Bean
public JettyResourceFactory resourceFactory() {
return new JettyResourceFactory();
}
@Bean
public WebClient webClient() {
HttpClient httpClient = new HttpClient();
// Further configuration...
ClientHttpConnector connector =
new JettyClientHttpConnector(httpClient, resourceFactory());
return WebClient.builder().clientConnector(connector).build();
}
- Use the JettyClientHttpConnector constructor with the resource factory.
- Plug the connector into the WebClient.Builder.
@Bean
fun resourceFactory() = JettyResourceFactory()
@Bean
fun webClient(): WebClient {
val httpClient = HttpClient()
// Further configuration...
val connector = JettyClientHttpConnector(httpClient, resourceFactory())
return WebClient.builder().clientConnector(connector).build()
}
- Use the JettyClientHttpConnector constructor with the resource factory.
- Plug the connector into the WebClient.Builder.
HttpComponents
The following example shows how to customize Apache HttpComponents HttpClient settings:
HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
clientBuilder.setDefaultRequestConfig(...);
CloseableHttpAsyncClient client = clientBuilder.build();
ClientHttpConnector connector = new HttpComponentsClientHttpConnector(client);
WebClient webClient = WebClient.builder().clientConnector(connector).build();
val client = HttpAsyncClients.custom().apply {
setDefaultRequestConfig(...)
}.build()
val connector = HttpComponentsClientHttpConnector(client)
val webClient = WebClient.builder().clientConnector(connector).build()
retrieve()
The retrieve() method can be used to declare how to extract the response. For example:
WebClient client = WebClient.create("https://example.org");
Mono<ResponseEntity<Person>> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(Person.class);
val client = WebClient.create("https://example.org")
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity<Person>().awaitSingle()
Or to get only the body:
WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Person.class);
val client = WebClient.create("https://example.org")
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.awaitBody<Person>()
To get a stream of decoded objects:
Flux<Quote> result = client.get()
.uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(Quote.class);
val result = client.get()
.uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlow<Quote>()
By default, 4xx or 5xx responses result in a WebClientResponseException, including subclasses for specific HTTP status codes. To customize the handling of error responses, use onStatus handlers as follows:
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> ...)
.onStatus(HttpStatus::is5xxServerError, response -> ...)
.bodyToMono(Person.class);
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::is4xxClientError) { ... }
.onStatus(HttpStatus::is5xxServerError) { ... }
.awaitBody<Person>()
Exchange
The exchangeToMono() and exchangeToFlux() methods (or awaitExchange { } and exchangeToFlow { } in Kotlin) are useful for more advanced cases that require more control, such as decoding the response differently depending on the response status:
Mono<Person> entityMono = client.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToMono(Person.class);
}
else {
// Address the error
return response.createException().flatMap(Mono::error);
}
});
val entity = client.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange { response ->
if (response.statusCode() == HttpStatus.OK) {
response.awaitBody<Person>()
}
else {
throw response.createExceptionAndAwait()
}
}
When using the above, after the returned Mono or Flux completes, the response body is checked and, if not consumed, released to prevent memory and connection leaks. Therefore, the response cannot be decoded further downstream. It is up to the provided function to declare how to decode the response if needed.
Request Body
The request body can be encoded from any asynchronous type handled by ReactiveAdapterRegistry, such as Mono or Kotlin Coroutines Deferred, as the following example shows:
Mono<Person> personMono = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body(personMono, Person.class)
.retrieve()
.bodyToMono(Void.class);
val personDeferred: Deferred<Person> = ...
client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body<Person>(personDeferred)
.retrieve()
.awaitBody<Unit>()
You can also encode a stream of objects, as the following example shows:
Flux<Person> personFlux = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(personFlux, Person.class)
.retrieve()
.bodyToMono(Void.class);
val people: Flow<Person> = ...
client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.body(people)
.retrieve()
.awaitBody<Unit>()
Alternatively, if you have the actual value, you can use the bodyValue shortcut method, as the following example shows:
Person person = ... ;
Mono<Void> result = client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(person)
.retrieve()
.bodyToMono(Void.class);
val person: Person = ...
client.post()
.uri("/persons/{id}", id)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(person)
.retrieve()
.awaitBody<Unit>()
Form Data
To send form data, you can provide a MultiValueMap<String, String> as the body. Note that the content type is automatically set to application/x-www-form-urlencoded by the FormHttpMessageWriter. The following example shows how to use MultiValueMap<String, String>:
MultiValueMap<String, String> formData = ... ;
Mono<Void> result = client.post()
.uri("/path", id)
.bodyValue(formData)
.retrieve()
.bodyToMono(Void.class);
val formData: MultiValueMap<String, String> = ...
client.post()
.uri("/path", id)
.bodyValue(formData)
.retrieve()
.awaitBody<Unit>()
You can also supply form data inline by using BodyInserters, as the following example shows:
import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromFormData("k1", "v1").with("k2", "v2"))
.retrieve()
.bodyToMono(Void.class);
import org.springframework.web.reactive.function.BodyInserters.*
client.post()
.uri("/path", id)
.body(fromFormData("k1", "v1").with("k2", "v2"))
.retrieve()
.awaitBody<Unit>()
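For orientation, the sketch below shows roughly what an application/x-www-form-urlencoded body looks like once the key-value pairs are encoded. It uses only the JDK and is not the FormHttpMessageWriter itself. The class FormEncodingSketch is hypothetical, UTF-8 is an assumed charset, and a plain single-valued Map stands in for MultiValueMap to keep the example short.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical JDK-only illustration of form encoding; not Spring's writer.
final class FormEncodingSketch {

    // Encodes each key and value, joins pairs with '=', and pairs with '&'.
    static String encode(Map<String, String> form) {
        return form.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output order is predictable.
        Map<String, String> form = new LinkedHashMap<>();
        form.put("k1", "v1");
        form.put("k2", "v2");
        System.out.println(encode(form)); // k1=v1&k2=v2
    }
}
```

Reserved characters in keys or values are escaped, which is why a value such as "a b&c" does not break the pair structure of the body.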
Multipart Data
To send multipart data, you need to provide a MultiValueMap<String, ?> whose values are either Object instances that represent part content or HttpEntity instances that represent the content and headers of a part. MultipartBodyBuilder provides a convenient API for preparing a multipart request. The following example shows how to create a MultiValueMap<String, ?>:
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("fieldPart", "fieldValue");
builder.part("filePart1", new FileSystemResource("...logo.png"));
builder.part("jsonPart", new Person("Jason"));
builder.part("myPart", part); // Part from a server request
MultiValueMap<String, HttpEntity<?>> parts = builder.build();
val builder = MultipartBodyBuilder().apply {
part("fieldPart", "fieldValue")
part("filePart1", FileSystemResource("...logo.png"))
part("jsonPart", Person("Jason"))
part("myPart", part) // Part from a server request
}
val parts = builder.build()
In most cases, you do not have to specify the Content-Type for each part. The content type is determined automatically based on the HttpMessageWriter chosen to serialize it or, in the case of a Resource, based on the file extension. If necessary, you can explicitly provide the MediaType to use for each part through one of the builder's overloaded part methods.
Once a MultiValueMap is prepared, the easiest way to pass it to the WebClient is through the body method, as the following example shows:
MultipartBodyBuilder builder = ...;
Mono<Void> result = client.post()
.uri("/path", id)
.body(builder.build())
.retrieve()
.bodyToMono(Void.class);
val builder: MultipartBodyBuilder = ...
client.post()
.uri("/path", id)
.body(builder.build())
.retrieve()
.awaitBody<Unit>()
If the MultiValueMap contains at least one non-String value, which could also represent regular form data (that is, application/x-www-form-urlencoded), you need not set the Content-Type to multipart/form-data. This is always the case when using MultipartBodyBuilder, which ensures an HttpEntity wrapper.
As an alternative to MultipartBodyBuilder, you can also provide multipart content, inline-style, through the built-in BodyInserters, as the following example shows:
import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post()
.uri("/path", id)
.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
.retrieve()
.bodyToMono(Void.class);
import org.springframework.web.reactive.function.BodyInserters.*
client.post()
.uri("/path", id)
.body(fromMultipartData("fieldPart", "value").with("filePart", resource))
.retrieve()
.awaitBody<Unit>()
Filters
You can register a client filter (ExchangeFilterFunction) through the WebClient.Builder to intercept and modify requests, as the following example shows:
WebClient client = WebClient.builder()
.filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request)
.header("foo", "bar")
.build();
return next.exchange(filtered);
})
.build();
val client = WebClient.builder()
.filter { request, next ->
val filtered = ClientRequest.from(request)
.header("foo", "bar")
.build()
next.exchange(filtered)
}
.build()
This can be used for cross-cutting concerns, such as authentication. The following example uses a filter for basic authentication through a static factory method:
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = WebClient.builder()
.filter(basicAuthentication("user", "password"))
.build();
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication
val client = WebClient.builder()
.filter(basicAuthentication("user", "password"))
.build()
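As background, HTTP Basic authentication comes down to one request header: Authorization: Basic followed by the Base64 encoding of user:password. The JDK-only sketch below computes that value for the credentials used above. The class BasicAuthHeaderSketch is hypothetical and is not the filter's implementation. The ISO-8859-1 charset is an assumption; for ASCII-only credentials the result is identical in UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical JDK-only illustration of the Basic authentication header value.
final class BasicAuthHeaderSketch {

    // Builds "Basic <base64(user:password)>".
    static String headerValue(String user, String password) {
        String credentials = user + ":" + password;
        // Charset is an assumption; it only matters for non-ASCII credentials.
        byte[] bytes = credentials.getBytes(StandardCharsets.ISO_8859_1);
        return "Basic " + Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) {
        // Prints: Authorization: Basic dXNlcjpwYXNzd29yZA==
        System.out.println("Authorization: " + headerValue("user", "password"));
    }
}
```

Because Base64 is an encoding and not encryption, such a header is only safe to send over HTTPS.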
Filters can be added or removed by mutating an existing WebClient instance, resulting in a new WebClient instance that does not affect the original one. For example:
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
.build();
val client = webClient.mutate()
.filters { it.add(0, basicAuthentication("user", "password")) }
.build()
WebClient is a thin facade around the chain of filters followed by an ExchangeFunction. It provides a workflow to make requests, to encode to and from higher-level objects, and it helps to ensure that response content is always consumed. When filters handle the response in some way, extra care must be taken to always consume its content or to otherwise propagate it downstream to the WebClient, which will ensure the same. Below is a filter that handles the UNAUTHORIZED status code but ensures that any response content, whether expected or not, is released:
public ExchangeFilterFunction renewTokenFilter() {
return (request, next) -> next.exchange(request).flatMap(response -> {
if (response.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
return response.releaseBody()
.then(renewToken())
.flatMap(token -> {
ClientRequest newRequest = ClientRequest.from(request).build();
return next.exchange(newRequest);
});
} else {
return Mono.just(response);
}
});
}
fun renewTokenFilter(): ExchangeFilterFunction? {
return ExchangeFilterFunction { request: ClientRequest?, next: ExchangeFunction ->
next.exchange(request!!).flatMap { response: ClientResponse ->
if (response.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
return@flatMap response.releaseBody()
.then(renewToken())
.flatMap { token: String? ->
val newRequest = ClientRequest.from(request).build()
next.exchange(newRequest)
}
} else {
return@flatMap Mono.just(response)
}
}
}
}
Attributes
Attributes can be added to the request. This is convenient if you need to pass information along a chain of filters and influence the logic of the filters within a given request. For example:
WebClient client = WebClient.builder()
.filter((request, next) -> {
Optional<Object> usr = request.attribute("myAttribute");
// ...
})
.build();
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.bodyToMono(Void.class);
val client = WebClient.builder()
.filter { request, _ ->
val usr = request.attributes()["myAttribute"]
// ...
}
.build()
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.awaitBody<Unit>()
Note that you can configure a defaultRequest callback globally at the WebClient.Builder level, which lets you insert attributes into all requests. This could be used, for example, in a Spring MVC application to populate request attributes based on ThreadLocal data.
Context
Attributes provide a convenient way to pass information to the filter chain, but they only influence the current request. If you want to pass information that propagates to additional requests that are nested (for example, via flatMap) or executed afterwards (for example, via concatMap), you need to use the Reactor Context.
The Reactor Context needs to be populated at the end of a reactive chain in order to apply to all operations. For example:
WebClient client = WebClient.builder()
.filter((request, next) ->
Mono.deferContextual(contextView -> {
String value = contextView.get("foo");
// ...
}))
.build();
client.get().uri("https://example.org/")
.retrieve()
.bodyToMono(String.class)
.flatMap(body -> {
// perform a nested request (the context propagates automatically)...
})
.contextWrite(context -> context.put("foo", ...));
Synchronous use
WebClient can be used in a synchronous style by blocking at the end for the result:
Person person = client.get().uri("/person/{id}", i).retrieve()
.bodyToMono(Person.class)
.block();
List<Person> persons = client.get().uri("/persons").retrieve()
.bodyToFlux(Person.class)
.collectList()
.block();
val person = runBlocking {
client.get().uri("/person/{id}", i).retrieve()
.awaitBody<Person>()
}
val persons = runBlocking {
client.get().uri("/persons").retrieve()
.bodyToFlow<Person>()
.toList()
}
However, if you need to make several calls, it is more efficient not to block each response individually, but to wait for the combined result:
Mono<Person> personMono = client.get().uri("/person/{id}", personId)
.retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
.retrieve().bodyToFlux(Hobby.class).collectList();
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
Map<String, Object> map = new LinkedHashMap<>();
map.put("person", person);
map.put("hobbies", hobbies);
return map;
})
.block();
val data = runBlocking {
val personDeferred = async {
client.get().uri("/person/{id}", personId)
.retrieve().awaitBody<Person>()
}
val hobbiesDeferred = async {
client.get().uri("/person/{id}/hobbies", personId)
.retrieve().bodyToFlow<Hobby>().toList()
}
mapOf("person" to personDeferred.await(), "hobbies" to hobbiesDeferred.await())
}
The above is merely one example. There are many other patterns and operators for putting together a reactive pipeline that makes many remote calls, potentially some nested and interdependent, without ever blocking until the end.
With Flux or Mono, you should never have to block in a Spring MVC or Spring WebFlux controller. Simply return the resulting reactive type from the controller method. The same principle applies to Kotlin Coroutines and Spring WebFlux: just use a suspending function or return a Flow from your controller method.
Testing
To test code that uses the WebClient, you can use a mock web server, such as the OkHttp MockWebServer. To see an example of its use, check out WebClientIntegrationTests in the Spring Framework test suite or the static-server sample in the OkHttp repository.