RSocketRequester provides a free API for making RSocket requests, accepting and returning objects for data and metadata instead of low-level data buffers. It can be used symmetrically, to make requests from clients and to make requests from servers.

Client-side requester

To receive RSocketRequester on the client side, you need to connect to the server, which involves sending a SETUP frame via the RSocket protocol with connection settings. RSocketRequester provides a build tool that helps prepare io.rsocket.core.RSocketConnector, including connection settings for the SETUP frame.

This is the easiest way to connect with default settings:

Java

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin

val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

With the above, the connection is established not right away. When requests are received, a shared connection is transparently established and used.

Connection setup

RSocketRequester.Builder provides the following to set up the initial frame SETUP:

  • dataMimeType(MimeType) – sets the MIME data type for the connection.

  • metadataMimeType(MimeType) – sets the MIME type of metadata for the connection.

  • setupData(Object) – data to include in SETUP.

  • setupRoute(String, Object…) – route in metadata to be included in SETUP .

  • setupMetadata(Object, MimeType) – other metadata to include in SETUP.

For data, the default MIME type is determined by the first configured Decoder. For metadata, the default MIME type is composite metadata, which allows use multiple pairs of metadata values and MIME types for each request. Typically, both do not require replacement.

The data and metadata in the SETUP frame are optional. On the server side, methods marked with the @ConnectMapping annotation can be used to handle the start of a connection and the contents of a SETUP frame. Metadata can be used to provide connection-level security.

Strategies

RSocketRequester.Builder accepts RSocketStrategies to configure the requester. You will need to use it to pass frames and decoders to (de)-serialize data and metadata values. By default, only the base codecs from spring-core for String, byte[] and ByteBuffer are registered. Adding spring-web provides access to more functions, which can be registered like this:

Java

RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin

val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000) 

RSocketStrategies is intended to be reusable. In some scenarios, such as when the client and server are in the same application, it may be preferable to declare this in the Spring configuration.

Client-side responders

RSocketRequester.Builder can be used to set up responders that react to requests from the server.

You can use annotated handlers for client-side responding, based on the same infrastructure as the backend, but registered programmatically as follows:

Java

RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())   
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());  
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))  
.tcp("localhost", 7000);
        
  1. Use PathPatternRouteMatcher, if spring-web is available, for more efficient route selection.
  2. Create a responder from a class with methods marked with the annotation @MessageMapping and/or @ConnectMapping.
  3. Register responder.
Kotlin

val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher())   
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());  
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) }   
.tcp("localhost", 7000)
        
  1. Use PathPatternRouteMatcher if spring-web is available for more efficient route selection.
  2. Create a responder from a class with methods marked with the annotation @MessageMapping and/or @ConnectMapping .
  3. Registering a responder.

Note that the above abbreviation is solely for programmatically registering client responders. In alternative scenarios, if the client responders are in a Spring configuration, you can declare RSocketMessageHandler as a Spring bean and then use it like this:

Java

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin

import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)

For the above, you may also need to use setHandlerPredicate in RSocketMessageHandler to switch to a different strategy for detecting client responders, for example based on a custom annotation such as @RSocketClientResponder, rather than the standard @Controller. This is necessary in scenarios with a client and server or multiple clients in the same application.

Advanced mode

RSocketRequesterBuilder provides the ability to make a callback that opens the underlying io.rsocket.core.RSocketConnector to access additional configuration options for keepalive intervals, session resumes, interceptors, etc. You can configure options at this level as follows:

Java

RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin

val requester = RSocketRequester.builder()
.rsocketConnector {
    //...
}
.tcp("localhost", 7000)

Server-side requester

To make requests from the server to connected clients, you need to receive a requester from the server for the connected client.

In annotated responders, methods with the @ConnectMapping and @MessageMapping annotations support the RSocketRequester argument. Use it to access a specific requester for a specific connection. Remember that methods marked with the @ConnectMapping annotation are essentially handlers for the SETUP frame, which must be processed before requests are sent. Therefore, requests must be separated from the processing procedure at the very beginning. For example:

Java

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { 
    // ...
});
return ... 
}
  1. Run the request asynchronously, regardless of the processing procedure.
  2. Perform processing and return completion Mono<Void>.
Kotlin

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { 
    // ...
}
}
/// ... (2)
}
  1. We launch the request asynchronously, regardless of the processing procedure.
  2. Perform handling in the suspending function.

Requests

Immediately after receiving a request for a client or server, requests can be sent as follows:

Java

ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") 
.data(viewBox) 
.retrieveFlux(AirportLocation.class); 
  1. Set a route to include in the metadata request message.
  2. Specify the data for the request message.
  3. Declare the expected response.
Kotlin

val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") 
.data(viewBox) 
.retrieveFlow<AirportLocation>()
  1. Set the route to be included in the metadata of the request message.
  2. Specify the data for the request message.
  3. Declare the expected response.

The type of interaction is determined implicitly by the cardinality of the set ) input and output. The above example is a Request-Stream because it sends a single value and receives a stream of values. For the most part, there is no need to think about this as long as the input and output choices match the type of RSocket interaction and the types of input and output expected by the responder. The only example of an invalid combination is a many-to-one combination.

The data(Object) method also accepts any Publisher from the Reactive Streams specification, including Flux and Mono, as well as any other value generator registered in the ReactiveAdapterRegistry. In the case of a multi-valued Publisher such as Flux that generates the same value types, consider using one of the data overloads to avoid type checking and search Encoder for each element:


data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

The data(Object) step is optional. We skip it in case of requests that do not send data:

Java

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin

import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()

Additional metadata values can be added if composite metadata (default) and if the values are supported by the registered Encoder. For example:

Java

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
        
Kotlin

import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()

For Fire-and-Forget, use the send() method, which returns Mono<Void>. Note that Mono only indicates that the message was successfully sent, not that it was processed.

For Metadata-Push use the sendMetadata() method with the return value Mono<Void>.