Reactive Core

Module 5. Spring
Level 13 , Lesson 0
Available
  • There are two levels of support for handling server requests.

    • HttpHandler: Basic contract for handling HTTP requests with non-blocking I/O and callback according to the Reactive specification Streams, as well as adapters for Reactor Netty, Undertow, Tomcat, Jetty and any Servlet 3.1+ container.

    • WebHandler API: A slightly higher level web A general purpose API for processing requests, upon which specific programming models such as annotated controllers and functional endpoints are built.

  • For the client side, there is the base contract ClientHttpConnector for making HTTP requests with non-blocking I/O and feedback according to the Reactive Streams specification, as well as adapters for Reactor Netty, reactive Jetty HttpClient and Apache HttpComponents. The higher-level WebClient used in applications is based on this basic contract.

  • Codecs are provided for the client and server to serialize and deserialize the content of HTTP requests and responses.

HttpHandler

HttpHandler is a simple contract with a single method to handle the request and response. It is intentionally strict, and its main and only purpose is to be a simple abstraction over the various HTTP server APIs.

The following table describes the supported server APIs:

Server name Server API used Support for Reactive Streams

Netty

Netty API

Reactor Netty

Undertow

Undertow API

spring-web: Connection to the Reactive Streams bridge

Tomcat

Non-blocking I/O based on Servlet 3.1; Tomcat API for reading and writing ByteBuffers vs. byte[]

spring-web: Non-blocking Servlet 3.1 I/O to Reactive Streams Bridge

Jetty

Non-blocking I/O based on Servlet 3.1; Jetty API for writing ByteBuffers vs. byte[]

spring-web: Non-blocking Servlet 3.1 I/O to Reactive Streams bridge

Servlet 3.1 container

Non-blocking I/O based on Servlet 3.1

spring-web : Non-blocking Servlet 3.1 I/O to the Reactive Streams bridge

The following table describes the server dependencies (also see supported versions):

Server name Group ID Artifact name

Reactor Netty

io.projectreactor.netty

reactor-netty

Undertow

io.undertow

undertow-core

Tomcat

org.apache.tomcat.embed

tomcat-embed-core

Jetty

org.eclipse.jetty

jetty-server, jetty-servlet

The code snippets below show the use of adapters HttpHandler with each server API:

Reactor Netty

Java

HttpHandler handler = ...
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create().host(host).port(port).handle(adapter).bind().block();
Kotlin

val handler: HttpHandler = ...
val adapter = ReactorHttpHandlerAdapter (handler)
HttpServer.create().host(host).port(port).handle(adapter).bind().block()

Undertow

Java

HttpHandler handler = ...
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();
Kotlin

val handler: HttpHandler = ...
val adapter = UndertowHttpHandlerAdapter(handler)
val server = Undertow.builder ().addHttpListener(port, host).setHandler(adapter).build()
server.start()

Tomcat

Java

HttpHandler handler = ...
Servlet servlet = new TomcatHttpHandlerAdapter(handler);
Tomcat server = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootContext = server.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "main", servlet);
rootContext.addServletMappingDecoded("/", "main");
server.setHost(host);
server.setPort(port);
server.start();
Kotlin

val handler: HttpHandler = ...
val servlet = TomcatHttpHandlerAdapter(handler)
val server = Tomcat()
val base = File(System.getProperty("java.io.tmpdir"))
val rootContext = server.addContext("", base.absolutePath)
Tomcat.addServlet(rootContext, "main", servlet)
rootContext.addServletMappingDecoded("/", "main")
server.host = host
server.setPort(port)
server.start()

Jetty

Java

HttpHandler handler = ...
Servlet servlet = new JettyHttpHandlerAdapter(handler);
Server server = new Server();
ServletContextHandler contextHandler = new ServletContextHandler(server, "");
contextHandler.addServlet(new ServletHolder(servlet), "/");
contextHandler.start();
ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.addConnector(connector);
server.start();
Kotlin

val handler: HttpHandler = ...
val servlet = JettyHttpHandlerAdapter(handler)
val server = Server()
val contextHandler = ServletContextHandler(server, "")
contextHandler.addServlet(ServletHolder(servlet), "/")
contextHandler.start(); val
connector = ServerConnector(server)
connector.host = host connector.port = port
server.addConnector(connector)
server.start()

Servlet 3.1+ container

To deploy the WAR file to any Servlet 3.1+ container, you can expand to include AbstractReactiveWebInitializer to a WAR file. This class wraps a HttpHandler in a ServletHttpHandlerAdapter and registers it as a Servlet.

WebHandler API

The org.springframework.web.server package is based on the HttpHandler contract and provides a general purpose web API for processing requests through a chain of several WebExceptionHandler, several WebFilter and one component WebHandler. The chain can be put together with the WebHttpHandlerBuilder by simply pointing to the ApplicationContext from Spring, where the beans are defined automatically, and/or by registering the beans in the constructor.

While the purpose of the HttpHandler is simple - to abstract the use of various HTTP servers, the WebHandler API aims to provide a broader set of functions commonly used in web applications, such as:

  • User session with attributes.

  • Request attributes.

  • Allowed Locale or Principal for the request.

  • Access parsed and cached form data.

  • Abstractions for multi-component data.

  • and much more...

Specialized types of beans

The table below lists the components that the WebHttpHandlerBuilder can automatically discover in the ApplicationContext from Spring, or that can be registered directly with it:

Bean name Bean type Counter Description

<any>

WebExceptionHandler

0 ..N

Provides exception handling from the chain of WebFilter instances and the target WebHandler.

<any>

WebFilter

0..N

Applies interception-style logic before and after the rest of the filter chain and the target WebHandler.

webHandler

WebHandler

1

Request handler.

webSessionManager

WebSessionManager

0..1

Dispatcher for WebSession instances opened through the method for ServerWebExchange. DefaultWebSessionManager by default.

serverCodecConfigurer

ServerCodecConfigurer

0..1

Provides access to HttpMessageReader for parsing form data and multipart data, which is then exposed via methods to ServerWebExchange. ServerCodecConfigurer.create() by default.

localeContextResolver

LocaleContextResolver

0..1

Resolver for LocaleContext, exposed through a method for ServerWebExchange. AcceptHeaderLocaleContextResolver by default.

forwardedHeaderTransformer

ForwardedHeaderTransformer

0..1

Designed for processing headers of forwarded types either by extracting them and removing them, or by removing them only. Not used by default.

Form Data

ServerWebExchange provides the following method to access to form data:

Java
Mono<MultiValueMap<String, String>> getFormData();
Kotlin
suspend fun getFormData(): MultiValueMap<String, String>

The DefaultServerWebExchange uses the configured HttpMessageReader to parse form data (application/x-www-form-urlencoded) into a MultiValueMap. By default, FormHttpMessageReader is configured to be used by the ServerCodecConfigurer bean.

Multipart Data

ServerWebExchange provides the following method to access multipart data:

Java
Mono<MultiValueMap<String, Part>> getMultipartData();
Kotlin
suspend fun getMultipartData(): MultiValueMap<String, Part>

DefaultServerWebExchange uses the configured HttpMessageReader<MultiValueMap<String, Part>> to parse the contents of multipart/form-data into MultiValueMap. The default is DefaultPartHttpMessageReader, which does not have any third-party dependencies. As an alternative, you can use SynchronossPartHttpMessageReader, which is based on the Synchronoss NIO Multipart library . Both are configured using the ServerCodecConfigurer bean.

For streaming parsing of multi-part data, you can use Flux<Part> returned from HttpMessageReader<Part>. For example, in an annotated controller, using @RequestPart implies Map-like access to individual components by name and therefore requires full parsing of multi-part data. Conversely, you can use the @RequestBody annotation to decode the content in Flux<Part> without collecting it in MultiValueMap.

Forwarded Headers

If the request goes through proxy servers (such as load balancers), the host, port, and scheme may change. This makes it difficult for the client to create links that point to the correct host, port, and scheme.

RFC 7239 defines a Forwarded HTTP header that proxy servers can use to provide information about the original request. There are other non-standard headers, including X-Forwarded-Host, X-Forwarded-Port, X-Forwarded-Proto, X -Forwarded-Ssl and X-Forwarded-Prefix.

ForwardedHeaderTransformer is a component that changes the host, port and request scheme to based on the forwarded headers and then removes those headers. If you declare it as a bean named forwardedHeaderTransformer, it will be detected and used.

There are some security precautions for forwarded headers, since the application cannot know whether the headers were added by a proxy -server, as expected, or a malicious client. This is why the proxy at the trust boundary must be configured to remove untrusted forward traffic coming from outside. You can also configure ForwardedHeaderTransformer to use removeOnly=true, in which case it will remove but not use headers.

In version 5.1, ForwardedHeaderFilter has been deprecated and replaced by ForwardedHeaderTransformer, so forwarded headers can be processed earlier, before the exchange is created. If the filter is configured anyway, it is removed from the filter list and the ForwardedHeaderTransformer is used instead.

Filters

In the WebHandler API you can use WebFilter to apply interception-style logic before and after the rest of the filter processing chain and the target WebHandler. When using the WebFlux configuration, registering a WebFilter is extremely simple - we declare it as a Spring bean and (optionally) express the precedence level using the @Order annotation in the bean declaration or by implementing the Ordered class.

CORS

Spring WebFlux provides fine-grained CORS configuration support through controller annotations. However, if you are using it with Spring Security, we recommend relying on the built-in CorsFilter, which should be in order in front of the entire Spring Security filter chain.

Exceptions

In the WebHandler API, you can use WebExceptionHandler to handle exceptions from a chain of WebFilter instances and a target WebHandler. When using a WebFlux configuration, registering a WebExceptionHandler is as simple as declaring it as a Spring bean and (optionally) expressing precedence using the @Order annotation on the bean declaration or by implementing a class Ordered.

The following table describes the available WebExceptionHandler implementations:

Exception handler Description

ResponseStatusExceptionHandler

Provides handling of exceptions of type ResponseStatusException by setting the response to the HTTP status code corresponding to the exception.

WebFluxResponseStatusExceptionHandler

The ResponseStatusExceptionHandler extension, which can also determine the HTTP status code from the @ResponseStatus annotation for any exception.

This handler is declared in WebFlux Config.

Codecs

Modules spring-web and spring-core provide a means to support the serialization and deserialization of byte content to and from higher-level objects via non-blocking reverse I/O from the Reactive Streams specification. These supports are described below:

  • Encoder and Decoder are low-level contracts for encoding and decoding content regardless HTTP protocol.

  • HttpMessageReader and HttpMessageWriter are contracts for encoding and decoding the content of HTTP messages.

  • Encoder can be wrapped in EncoderHttpMessageWriter to adapt it for use in a web application, and Decoder can be wrapped in DecoderHttpMessageReader.

  • DataBuffer abstracts various representations of byte buffers (e.g. ByteBuf, java.nio.ByteBuffer for Netty, etc.), and this is what all codecs work with.

The spring-core module contains implementations of the encoder and decoder byte[], ByteBuffer, DataBuffer, Resource and String. The spring-web module contains Jackson JSON, Jackson Smile, JAXB2, Protocol Buffers, and other encoders and decoders along with implementations of web-centric HTTP message readers and writers for form data, multipart content, events sent server, etc.

ClientCodecConfigurer and ServerCodecConfigurer are typically used to configure and configure codecs for use in an application.

Jackson JSON

JSON format and binary JSON (Smile) are supported where available Jackson libraries.

Jackson2Decoder works as follows:

  • The Jackson library's asynchronous, non-blocking parser is used to combine a stream of byte fragments into instances of TokenBuffer, each of which is a JSON object.

  • Each TokenBuffer is passed to the ObjectMapper library Jackson to create a higher-level object.

  • When decoding to a single-value publisher (such as Mono), there is one TokenBuffer.

  • When decoding to a multi-valued publisher (for example, Flux), each TokenBuffer is passed to a ObjectMapper as soon as enough bytes of information have been received for a fully formed object. The input content can be a JSON array or any JSON line delimited format such as NDJSON, JSON Lines or JSON Text Sequences.

Jackson2Encoder works like this:

  • For a publisher with a single value (for example, Mono), just serialize it via ObjectMapper.

  • For a multi-valued publisher with application/json by default, collect the values using Flux#collectToList(), and then serialize the resulting collection.

  • For a multi-valued publisher with a streaming media type, such as application/x-ndjson or application/stream+x-jackson-smile, encode, write and reset each value separately using the format JSON with line delimitation. Other types of streaming media may be registered with the encoder.

  • In the case of SSE (server sent events), Jackson2Encoder is called for each event and the output is reset to ensure transmission without delay.

Default and Jackson2Encoder , and Jackson2Decoder do not support String elements. Instead, by default, the string or sequence of strings is assumed to be serialized JSON content, which will be rendered using CharSequenceEncoder. If you need to generate a JSON array from Flux<String>, use Flux#collectToList() and encode Mono<List<String>>.

Form Data

FormHttpMessageReader and FormHttpMessageWriter support decoding and encoding of application/x-www-form-urlencoded content.

On the server side, where form content often must be accessed from multiple places, ServerWebExchange provides a special method getFormData(), which parses the content via FormHttpMessageReader and then caches the result for re-access.

After using getFormData() the original raw content can no longer be read from the request body. For this reason, applications are expected to sequentially pass through ServerWebExchange to access cached form data instead of reading from the raw request body.

Multicomponentity

MultipartHttpMessageReader and MultipartHttpMessageWriter support decoding and encoding of "multipart/form-data" content. In turn, the MultipartHttpMessageReader delegates to another HttpMessageReader to do the actual parsing into a Flux<Part>, and then simply assembles the parts into a MultiValueMap. The default is DefaultPartHttpMessageReader, but this can be changed using ServerCodecConfigurer. For more information about DefaultPartHttpMessageReader, please refer to javadoc on DefaultPartHttpMessageReader.

On the server side, where the content of a multipart form may need to be accessed from multiple places, ServerWebExchange provides a special method getMultipartData() that parses the content through the MultipartHttpMessageReader and then caches the result for re-access.

After using the getMultipartData() parameter, the original raw content can no longer be read from the request body. For this reason, applications must constantly use the getMultipartData() parameter to access components multiple times as a Map, or rely on SynchronossPartHttpMessageReader to access Flux<Part> .

Limitations

Decoder and HttpMessageReader implementations that buffer part or all of the input stream may be configured with a limit on the maximum number of bytes to buffer in memory. In some cases, buffering occurs because the input data is aggregated and represented as a single object - for example, a controller method with the annotation @RequestBody byte[], x-www-form-urlencoded data and so on. Buffering can also occur in streaming if the input stream is split—for example, delimited text, a JSON stream of objects, and so on. For these streaming cases, the limit applies to the number of bytes associated with a single object in the stream.

To configure buffer sizes, you can check whether a given Decoder or HttpMessageReader is opening. maxInMemorySize property, and if so, the Javadoc will contain details about the default values. On the server side, ServerCodecConfigurer provides a single location from which all codecs can be installed. On the client side, the limit for all codecs can be changed in WebClient.Builder.

For multi-component parsing, the maxInMemorySize property limits the size of non-file components. For file components, it defines the threshold at which the component is written to disk. For file components written to disk, there is an additional property maxDiskUsagePerPart that limits the amount of disk space for each component. There is also a maxParts property to limit the total number of components in a multi-part request. To configure all three components in WebFlux, you need to specify a pre-configured instance of MultipartHttpMessageReader in ServerCodecConfigurer.

Streaming

When streaming data in an HTTP response (for example, text/event-stream, application/x-ndjson), it is important to transmit the data periodically in order to accurately detect a disconnected client as early as possible. This way you can only send a comment, an empty SSE event, or any other “empty operations” data that will effectively serve as a heartbeat message.

DataBuffer

DataBuffer is a representation for a byte buffer in WebFlux. It is important to understand that on some servers, such as Netty, byte buffers are pooled and reference-counted, and must be deallocated when consumed to avoid memory leaks.

For WebFlux applications, there is usually no need to worry about such problems, unless they consume and produce data buffers directly instead of using codecs to convert to and from higher-level objects, or unless they create their own codecs.

Logging

Logging at the DEBUG level in Spring WebFlux is designed to be compact, simple, and human-friendly. It focuses on the most significant bits of information that will be used again and again, as opposed to others that are only used when debugging a specific problem.

Logging at the TRACE level generally follows those same principles as DEBUG (and, for example, should also not be overloaded), but can be used to debug any problem. In addition, some log messages may exhibit different levels of detail at the TRACE and DEBUG levels.

Proper logging depends on experience with logs. If you notice something that does not meet its stated goals, please let us know.

Log ID

In WebFlux, a single request can be executed in multiple threads, and the thread ID is useless to match log messages related to a specific request. This is why WebFlux log messages are by default prefixed with the specific request ID.

On the server side, the log ID is stored in the ServerWebExchange attribute (LOG_ID_ATTRIBUTE) , and a fully formatted prefix based on this identifier is available from ServerWebExchange#getLogPrefix(). On the WebClient side, the log ID is stored in the ClientRequest attribute (LOG_ID_ATTRIBUTE), and the fully formatted prefix is available from ClientRequest#logPrefix().

Confidential data

Log at the DEBUG and TRACE levels can log sensitive information . This is why form parameters and headers are masked by default, and you need to explicitly enable full logging of them.

The following example shows how to do this for server-side requests:

Java

@Configuration
@EnableWebFlux
class MyConfig implements WebFluxConfigurer {
@Override
public void
configureHttpMessageCodecs(ServerCodecConfigurer configurer ) {
configurer.defaultCodecs().enableLoggingRequestDetails(true);
}
}
Kotlin

@Configuration
@EnableWebFlux
class  MyConfig : WebFluxConfigurer {
 override fun  configureHttpMessageCodecs(configurer: ServerCodecConfigurer) {
configurer.defaultCodecs().enableLoggingRequestDetails(true)
}
}

The following example shows how to do this for client-side requests:

Java

Consumer<ClientCodecConfigurer> ; consumer = configurer ->
configurer.defaultCodecs().enableLoggingRequestDetails(true);
WebClient webClient = WebClient.builder()
.exchangeStrategies(strategies -> strategies.codecs(consumer))
.build();
Kotlin

val consumer: (ClientCodecConfigurer) -> Unit = { configurer -> configurer.defaultCodecs().enableLoggingRequestDetails(true) }
val webClient = WebClient.builder()
.exchangeStrategies( { strategies -> strategies.codecs(consumer) })
.build()

Appenders

Logging libraries such as SLF4J and Log4J 2, provide asynchronous loggers that avoid blocking. While they have their drawbacks, such as potentially missing messages that cannot be queued for logging, they are the best available options for use in reactive, non-blocking applications.

Custom Codecs

Applications can register custom codecs to support additional data transfer types or specific operating logic that are not supported by default codecs.

Some configuration options expressed by developers apply to default codecs. Custom codecs may need to be obtained ability to conform to these settings, such as forcing limit buffering or logging sensitive data.

The following example shows how to do this for client-side requests:

Java

WebClient webClient = WebClient.builder()
.codecs(configurer -> {
        CustomDecoder decoder = new CustomDecoder();
        configurer.customCodecs().registerWithDefaultConfig(decoder);
})
.build();
Kotlin

val webClient = WebClient.builder()
.codecs({ configurer ->
        val decoder = CustomDecoder()
        configurer.customCodecs().registerWithDefaultConfig(decoder)
})
.build()
Comments
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION