RSocketRequester provides a fluent API for making RSocket requests, accepting and returning objects for data and metadata instead of low-level data buffers. It can be used symmetrically, to make requests from clients and from servers.
Client-side requester
To obtain an RSocketRequester on the client side, you need to connect to a server, which involves sending an RSocket SETUP frame with connection settings. RSocketRequester provides a builder that helps prepare an io.rsocket.core.RSocketConnector, including connection settings for the SETUP frame.
This is the easiest way to connect with default settings:
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
val url = URI.create("https://example.org:8080/rsocket")
val requester = RSocketRequester.builder().webSocket(url)
With the above, the connection is not established immediately. When requests are made, a shared connection is established transparently and used.
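The "connect on first request, then share" behavior can be pictured with a small plain-Java model. This is only a conceptual sketch, not how the framework implements it: the LazySharedConnection class and the String stand-in for a connection are hypothetical names introduced for illustration.

```java
import java.util.function.Supplier;

// Hypothetical holder that opens a connection on first use and then shares it.
final class LazySharedConnection<T> {

    private final Supplier<T> connector;
    private T connection;

    LazySharedConnection(Supplier<T> connector) {
        this.connector = connector;
    }

    // Opens the connection on the first call; later calls reuse the same instance.
    synchronized T get() {
        if (connection == null) {
            connection = connector.get();
        }
        return connection;
    }

    public static void main(String[] args) {
        LazySharedConnection<String> shared =
                new LazySharedConnection<>(() -> "connection to localhost:7000");
        // Nothing has been opened yet; the first get() triggers the connector.
        System.out.println(shared.get());
        // The second call returns the same shared instance.
        System.out.println(shared.get() == shared.get());
    }
}
```

Building the holder is cheap and side-effect free, which mirrors why creating the requester does not by itself open a connection.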
Connection setup
RSocketRequester.Builder provides the following to customize the initial SETUP frame:
- dataMimeType(MimeType) – sets the MIME type for data on the connection.
- metadataMimeType(MimeType) – sets the MIME type for metadata on the connection.
- setupData(Object) – data to include in the SETUP frame.
- setupRoute(String, Object…) – route in the metadata to include in the SETUP frame.
- setupMetadata(Object, MimeType) – other metadata to include in the SETUP frame.
For data, the default MIME type is derived from the first configured Decoder. For metadata, the default MIME type is composite metadata, which allows multiple pairs of metadata value and MIME type per request. Typically, neither needs to be changed.
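To make "multiple pairs of metadata value and MIME type" more concrete, here is a rough plain-Java sketch of how such pairs can be laid out back to back in one buffer. It assumes the entry layout of the RSocket Composite Metadata extension for MIME types given as strings: one byte holding the MIME type length minus one, the ASCII MIME type, a 24-bit payload length, then the payload. The class name, the method name, and the application/x.example.token MIME type are hypothetical. In practice the framework performs this encoding for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative encoder for composite metadata entries (string MIME types only).
final class CompositeMetadataSketch {

    // Appends one entry: [length - 1][MIME type in ASCII][24-bit payload length][payload].
    static void addEntry(ByteArrayOutputStream out, String mimeType, byte[] payload) {
        byte[] mime = mimeType.getBytes(StandardCharsets.US_ASCII);
        if (mime.length < 1 || mime.length > 128) {
            throw new IllegalArgumentException("MIME type must be 1 to 128 ASCII bytes");
        }
        if (payload.length > 0xFFFFFF) {
            throw new IllegalArgumentException("Payload does not fit in a 24-bit length");
        }
        // High bit left at 0: the MIME type follows as a string, not a well-known ID.
        out.write(mime.length - 1);
        out.write(mime, 0, mime.length);
        // Payload length as an unsigned 24-bit big-endian integer.
        out.write((payload.length >> 16) & 0xFF);
        out.write((payload.length >> 8) & 0xFF);
        out.write(payload.length & 0xFF);
        out.write(payload, 0, payload.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Two independent metadata values, each tagged with its own MIME type.
        addEntry(out, "text/plain", "hi".getBytes(StandardCharsets.UTF_8));
        addEntry(out, "application/x.example.token", "abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(out.size() + " bytes of composite metadata");
    }
}
```

Because every entry carries its own MIME type and length, a receiver can walk the buffer entry by entry and decode each value independently.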
The data and metadata in the SETUP frame are optional. On the server side, @ConnectMapping methods can be used to handle the start of a connection and the contents of the SETUP frame. Metadata can be used for connection-level security.
Strategies
RSocketRequester.Builder accepts RSocketStrategies to configure the requester. You need to use it to provide encoders and decoders for (de)serialization of data and metadata values. By default, only the basic codecs from spring-core for String, byte[], and ByteBuffer are registered. Adding spring-web provides access to more codecs, which can be registered as follows:
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies is designed for reuse. In some scenarios, for example when the client and server are in the same application, it may be preferable to declare it in Spring configuration.
Client-side responders
RSocketRequester.Builder can be used to configure responders to requests from the server.
You can use annotated handlers for client-side responding, based on the same infrastructure that is used on a server, but registered programmatically as follows:
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))
.tcp("localhost", 7000);
- Use PathPatternRouteMatcher, if spring-web is available, for more efficient route matching.
- Create a responder from a class with @MessageMapping and/or @ConnectMapping methods.
- Register the responder.
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher())
.build()
val responder =
RSocketMessageHandler.responder(strategies, ClientHandler())
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) }
.tcp("localhost", 7000)
- Use PathPatternRouteMatcher, if spring-web is available, for more efficient route matching.
- Create a responder from a class with @MessageMapping and/or @ConnectMapping methods.
- Register the responder.
Note that the above is only a shortcut designed for programmatic registration of client responders. For alternative scenarios, where client responders are in Spring configuration, you can still declare RSocketMessageHandler as a Spring bean and then apply it as follows:
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
For the above, you may also need to use setHandlerPredicate in RSocketMessageHandler to switch to a different strategy for detecting client responders, for example based on a custom annotation such as @RSocketClientResponder rather than the default @Controller. This is necessary in scenarios with a client and server, or multiple clients, in the same application.
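A detection strategy of this kind boils down to a predicate over handler classes. The plain-Java sketch below declares a custom marker annotation and a predicate that checks for it. The annotation is the custom one named above and has to be defined by the application; ClientHandler, ServerController, and ClientResponderDetection are illustrative names. It assumes setHandlerPredicate accepts a Predicate<Class<?>>, and it only checks for the annotation placed directly on the class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Predicate;

// Custom marker annotation for client-side responder classes (application-defined).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RSocketClientResponder {
}

// Illustrative client responder: carries the marker annotation.
@RSocketClientResponder
class ClientHandler {
}

// Illustrative server-side class: no marker, so it must not be detected.
class ServerController {
}

final class ClientResponderDetection {

    // Selects only classes annotated directly with @RSocketClientResponder.
    static final Predicate<Class<?>> IS_CLIENT_RESPONDER =
            type -> type.isAnnotationPresent(RSocketClientResponder.class);

    public static void main(String[] args) {
        System.out.println(IS_CLIENT_RESPONDER.test(ClientHandler.class));    // prints "true"
        System.out.println(IS_CLIENT_RESPONDER.test(ServerController.class)); // prints "false"
    }
}
```

With a predicate like this, server-side controllers and client responders living in the same application context are no longer confused with each other.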
Advanced mode
RSocketRequester.Builder provides a callback that exposes the underlying io.rsocket.core.RSocketConnector for further configuration options such as keepalive intervals, session resumption, interceptors, and more. You can configure options at that level as follows:
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
Server-side requester
To make requests from a server to connected clients, you need to obtain the requester for the connected client from the server.
In annotated responders, @ConnectMapping and @MessageMapping methods support an RSocketRequester argument. Use it to access the requester for the connection. Keep in mind that @ConnectMapping methods are essentially handlers of the SETUP frame, which must be handled before requests can begin. Therefore, requests at the very start must be decoupled from handling. For example:
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> {
// ...
});
return ...
}
- Start the request asynchronously, independent from handling.
- Perform handling and return a completion Mono<Void>.
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect {
// ...
}
}
// ...
}
- Start the request asynchronously, independent from handling.
- Perform handling in the suspending function.
Requests
Once you have a client or server requester, you can make requests as follows:
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlux(AirportLocation.class);
- Specify a route to include in the metadata of the request message.
- Specify the data for the request message.
- Declare the expected response.
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within")
.data(viewBox)
.retrieveFlow<AirportLocation>()
- Specify a route to include in the metadata of the request message.
- Specify the data for the request message.
- Declare the expected response.
The interaction type is determined implicitly from the cardinality of the input and output. The above example is a Request-Stream because one value is sent and a stream of values is received. For the most part, you don't need to think about this as long as the choice of input and output matches an RSocket interaction type and the types of input and output expected by the responder. The only example of an invalid combination is many-to-one.
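The rule can be written down as a small lookup. The following plain-Java sketch is only a conceptual model of the mapping described above, not the framework's code: InteractionTypes, Cardinality, and Interaction are hypothetical names, and Fire-and-Forget and Metadata-Push are left out because they are selected through their own methods rather than through an expected response.

```java
// Conceptual model only: maps input/output cardinality to an RSocket interaction type.
final class InteractionTypes {

    enum Cardinality { ONE, MANY }

    enum Interaction { REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL }

    // Resolves the interaction from how many values are sent and how many are expected back.
    static Interaction resolve(Cardinality input, Cardinality output) {
        if (input == Cardinality.ONE) {
            return (output == Cardinality.ONE)
                    ? Interaction.REQUEST_RESPONSE
                    : Interaction.REQUEST_STREAM;
        }
        if (output == Cardinality.MANY) {
            return Interaction.REQUEST_CHANNEL;
        }
        // Many values in, one value out: no RSocket interaction fits this shape.
        throw new IllegalArgumentException("No RSocket interaction for many-to-one");
    }

    public static void main(String[] args) {
        // One value sent, a stream received, as in the example above.
        System.out.println(resolve(Cardinality.ONE, Cardinality.MANY)); // prints "REQUEST_STREAM"
    }
}
```

Three of the four combinations map to an interaction type, and the remaining many-to-one case is rejected.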
The data(Object) method also accepts any Reactive Streams Publisher, including Flux and Mono, as well as any other producer of values registered in the ReactiveAdapterRegistry. For a multi-value Publisher such as Flux that produces values of the same type, consider using one of the overloaded data methods to avoid type checks and Encoder lookup on every element:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
The data(Object) step is also optional. Skip it for requests that don't send data:
Mono<AirportLocation> location = requester.route("find.radar.EWR")
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
Additional metadata values can be added if using composite metadata (the default) and if the values are supported by a registered Encoder. For example:
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
For Fire-and-Forget, use the send() method, which returns Mono<Void>. Note that the Mono only indicates that the message was sent successfully, not that it was handled.
For Metadata-Push, use the sendMetadata() method, which returns Mono<Void>.