RSocketRequester proporciona una API gratuita para realizar solicitudes de RSocket, aceptar y devolver objetos para datos y metadatos en lugar de buffers de datos de bajo nivel. Se puede utilizar de forma simétrica, para realizar solicitudes desde clientes y para realizar solicitudes desde servidores.

Solicitante del lado del cliente

Para recibir RSocketRequester en el lado del cliente , debe conectarse al servidor, lo que implica enviar una trama SETUP a través del protocolo RSocket con la configuración de conexión. RSocketRequester proporciona una herramienta de compilación que ayuda a preparar io.rsocket.core.RSocketConnector, incluida la configuración de conexión para el marco SETUP.

Esta es la forma más sencilla de conectarse con la configuración predeterminada:

Java

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin

val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

Con lo anterior, la conexión no se establece de inmediato. Cuando se reciben solicitudes, se establece y utiliza de forma transparente una conexión compartida.

Configuración de la conexión

RSocketRequester.Builder proporciona lo siguiente para configurar el marco inicial SETUP :

  • dataMimeType(MimeType) – establece el tipo de datos MIME para la conexión.

  • metadataMimeType(MimeType): establece el tipo MIME de metadatos para la conexión.

  • setupData(Object) – datos para incluir en SETUP.

  • setupRoute(String, Object…) – ruta en metadatos a incluirse en SETUP .

  • setupMetadata(Object, MimeType) – otros metadatos para incluir en SETUP.

Para los datos, el tipo MIME predeterminado está determinado por el primer Decoder configurado. Para los metadatos, el tipo MIME predeterminado es metadatos compuestos, lo que permite utilice varios pares de valores de metadatos y tipos MIME para cada solicitud. Normalmente, ambos no requieren reemplazo.

Los datos y metadatos en el marco SETUP son opcionales. En el lado del servidor, los métodos marcados con la anotación @ConnectMapping se pueden usar para manejar el inicio de una conexión y el contenido de un marco SETUP. Los metadatos se pueden utilizar para proporcionar seguridad a nivel de conexión.

Strategies

RSocketRequester.Builder acepta RSocketStrategies para configurar el solicitante. Deberá usarlo para pasar fotogramas y decodificadores para (des)serializar datos y valores de metadatos. De forma predeterminada, solo se registran los códecs base de spring-core para String, byte[] y ByteBuffer. Agregar spring-web proporciona acceso a más funciones, que se pueden registrar de esta manera:

Java

RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
        
Kotlin

val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
         

RSocketStrategies está pensado para ser reutilizable. En algunos escenarios, como cuando el cliente y el servidor están en la misma aplicación, puede ser preferible declarar esto en la configuración de Spring.

Contestadores del lado del cliente

RSocketRequester.Builder se puede usar para configurar respondedores que reaccionen a las solicitudes del servidor.

Puede usar controladores anotados para responder del lado del cliente, basados en la misma infraestructura que el backend, pero registrado mediante programación de la siguiente manera:

Java

RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher())   
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());  
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder))  
.tcp("localhost", 7000);
        
  1. Utilice PathPatternRouteMatcher, si spring-web está disponible, para selección de ruta más eficiente.
  2. Cree un respondedor a partir de una clase con métodos marcados con la anotación @MessageMapping y/o @ConnectMapping.
  3. Registrar respondedor.
Kotlin

val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher())   
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler());  
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) }   
.tcp("localhost", 7000)
        
  1. Utilice PathPatternRouteMatcher si está disponible spring-web, para una selección de ruta más eficiente.
  2. Cree un respondedor a partir de una clase con métodos marcados con la anotación @MessageMapping y/o @ConnectMapping .
  3. Registrar un respondedor.

Tenga en cuenta que la abreviatura anterior es únicamente para registrar respondedores de clientes mediante programación. En escenarios alternativos, si los respondedores del cliente están en una configuración Spring, puede declarar RSocketMessageHandler como un Spring Bean y luego usarlo así:

Java

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
    
Kotlin

import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
        

Para lo anterior, es posible que también necesite usar setHandlerPredicate en RSocketMessageHandler para cambiar a una estrategia diferente para detectar los respondedores del cliente, por ejemplo basada en una anotación personalizada como @RSocketClientResponder, en lugar de la estándar @Controller. Esto es necesario en escenarios con un cliente y un servidor o varios clientes en la misma aplicación.

Modo avanzado

RSocketRequesterBuilder proporciona la capacidad de realizar una devolución de llamada que abre el io.rsocket.core.RSocketConnector subyacente para acceder a opciones de configuración adicionales para intervalos de mantenimiento de actividad, reanudaciones de sesiones, interceptores, etc. Puede configurar opciones en este nivel de la siguiente manera:

Java

RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
    
Kotlin

val requester = RSocketRequester.builder()
.rsocketConnector {
    //...
}
.tcp("localhost", 7000)
        

Solicitante del lado del servidor

Para realizar solicitudes desde el servidor a los clientes conectados, necesita recibir un solicitante del servidor para el cliente conectado.

En los respondedores anotados, los métodos con las anotaciones @ConnectMapping y @MessageMapping admiten el argumento RSocketRequester. Úselo para acceder a un solicitante específico para una conexión específica. Recuerde que los métodos marcados con la anotación @ConnectMapping son esencialmente controladores para el marco SETUP, que debe procesarse antes de enviar las solicitudes. Por lo tanto, las solicitudes deben separarse del procedimiento de tramitación desde el principio. Por ejemplo:

Java

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { 
    // ...
});
return ... 
}
        
  1. Ejecutar la solicitud de forma asincrónica, independientemente del procedimiento de procesamiento.
  2. Realizar el procesamiento y completar la devolución Mono<Void>.
Kotlin

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { 
    // ...
}
}
/// ... (2)
}
    
  1. Lanzamos la solicitud de forma asíncrona, independientemente del procedimiento de procesamiento.
  2. Realizar el manejo en la función de suspensión.

Solicitudes

Inmediatamente después de recibir una solicitud de un cliente o servidor, las solicitudes se pueden enviar de la siguiente manera:

Java

ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") 
.data(viewBox) 
.retrieveFlux(AirportLocation.class);
        
  1. Establecer una ruta para incluir en el mensaje de solicitud de metadatos.
  2. Especifique los datos para el mensaje de solicitud.
  3. Declare la respuesta esperada.
Kotlin

val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") 
.data(viewBox) 
.retrieveFlow<AirportLocation>()
        
  1. Establezca la ruta que se incluirá en los metadatos de el mensaje de solicitud.
  2. Especifique los datos para el mensaje de solicitud.
  3. Declare la respuesta esperada.

El tipo de interacción está determinado implícitamente por la cardinalidad del conjunto de entradas y salidas. El ejemplo anterior es un Request-Stream porque envía un valor único y recibe un flujo de valores. En su mayor parte, no es necesario pensar en esto siempre que las opciones de entrada y salida coincidan con el tipo de interacción RSocket y los tipos de entrada y salida esperados por el respondedor. El único ejemplo de una combinación no válida es una combinación de muchos a uno.

El método data(Object) también acepta cualquier Publisher del Reactive. Especificación de transmisiones, incluidos Flux y Mono, así como cualquier otro generador de valores registrado en ReactiveAdapterRegistry. En el caso de un Publisher de múltiples valores como Flux que genera los mismos tipos de valores, considere usar una de las sobrecargas de data para evitar tipos comprobando y buscando Encoder para cada elemento:


data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

El paso data(Object) es opcional. Lo omitimos en caso de solicitudes que no envían datos:

Java

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
        
Kotlin

import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()

Se pueden agregar valores de metadatos adicionales si se utiliza metadatos compuestos (predeterminado) y si los valores son compatibles con el Codificador registrado. Por ejemplo:

Java

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
        
Kotlin

import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
        

Para Fire-and-Forget, utilice el método send(), que devuelve Mono<Void>.Tenga en cuenta que Mono solo indica que el mensaje se envió correctamente, no que se procesó.

Para Metadata-Push utilice el Método sendMetadata() con el valor de retorno Mono<Void>.