El protocolo WebSocket define dos tipos de mensajes (de texto y binarios), pero su contenido no está definido. El protocolo define un mecanismo para que el cliente y el servidor acuerden un subprotocolo (es decir, un protocolo de mensajería de nivel superior) que se utilizará además de WebSocket para determinar qué mensajes puede enviar cada uno, cuál es su formato y el contenido de cada uno, mensaje, etcétera. El uso de un subprotocolo es opcional, pero en cualquier caso se debe acordar algún protocolo entre cliente y servidor que defina el contenido del mensaje.
Breve descripción
Protocolo STOMP (Protocolo simple de mensajería orientada a texto) se creó originalmente para lenguajes de secuencias de comandos ( como Ruby, Python y Perl) para conectarse a intermediarios de mensajes corporativos. Está diseñado para funcionar con un subconjunto simple de patrones de mensajería comúnmente utilizados. STOMP se puede utilizar sobre cualquier protocolo de red de transmisión bidireccional confiable, como TCP y WebSocket. Aunque STOMP es un protocolo orientado a texto, la carga útil del mensaje puede ser en formato texto o binario.
STOMP es un protocolo basado en marcos cuyos marcos se modelan a partir de HTTP. El siguiente listado muestra la estructura del marco STOMP:
COMMAND header1:value1 header2:value2 Body^@
Los clientes pueden usar los comandos SEND
o SUBSCRIBE
para enviar o suscribirse a mensajes
junto con un de encabezado
destino
, que describe de qué trata el mensaje y quién debe recibirlo. Esto le permite crear un
mecanismo de
publicación-suscripción simple que puede usarse para enviar mensajes a través del intermediario a otros clientes
conectados, o para enviar mensajes al servidor solicitando que se realice un trabajo específico.
Si es así Al utilizar el soporte del protocolo STOMP de Spring, una aplicación WebSocket en Spring actúa como un
intermediario STOMP para los clientes. Los mensajes se enrutan a métodos de procesamiento de mensajes marcados con
la anotación @Controller
, o a un simple intermediario en memoria que rastrea las suscripciones y
distribuye mensajes a los usuarios suscritos. También puede configurar Spring para que funcione con un agente STOM
especial (como RabbitMQ, ActiveMQ y otros) para transmitir mensajes. En este caso, Spring admite el establecimiento
de conexiones TCP con el intermediario, le transmite mensajes y reenvía mensajes desde él a los clientes WebSocket
conectados. De esta manera, las aplicaciones web Spring pueden utilizar seguridad unificada basada en HTTP,
validación común y un modelo de programación familiar para el procesamiento de mensajes.
El siguiente ejemplo
muestra un cliente que se suscribe para recibir cotizaciones de acciones que el servidor puede generar.
periódicamente (por ejemplo, a través de una tarea programada que envía mensajes a través de SimpMessagingTemplate
al broker):
SUBSCRIBE id:sub-1 destination:/topic/price.stock.* ^@
El siguiente ejemplo muestra una cliente que envía una solicitud comercial que el servidor puede
procesar usando un método con la anotación @MessageMapping
:
SEND destination:/queue/trade content-type:application/json content-length:44 {"action":"BUY","ticker":"MMM","shares",44}^@
Una vez ejecutado, el servidor puede enviar un mensaje de confirmación de la transacción e información detallada al cliente.
El significado del La
dirección de destino se deja intencionalmente opaca en la especificación STOMP. Puede ser cualquier cadena y los
servidores STOMP definen completamente la semántica y la sintaxis de las direcciones de destino que admiten. Sin
embargo, muy a menudo los destinos son cadenas como rutas, donde /topic/..
implica
publicación-suscripción (uno a muchos) y /queue/
implica mensajería". a punto" (uno a uno).
Los
servidores STOMP pueden usar el comando MESSAGE
para enviar mensajes a todos los suscriptores. El
siguiente ejemplo muestra el servidor enviando una cotización de acciones a un cliente suscrito:
MESSAGE message-id:nxahklf6-1 subscription:sub-1 destination:/topic/price.stock.MMM {"ticker":"MMM","price":129.45}^@
El servidor no puede enviar mensajes no solicitados. Todos los mensajes del servidor deben ser en
respuesta a una suscripción de cliente específica, y el encabezado subscription-id
del mensaje del
servidor debe coincidir con el encabezado id
de la suscripción del cliente.
Anterior breve Esta descripción tiene como objetivo proporcionar una comprensión muy básica del protocolo STOMP. Le recomendamos que lea la especificación del protocolo completa.
Ventajas
El uso de STOMP como subprotocolo permite que Spring Framework y Spring Security proporcionen un modelo de programación más rico que el uso de WebSockets sin formato del protocolo WebSocket. Lo mismo se puede decir acerca de comparar HTTP con TCP sin formato y cómo esto permite que Spring MVC y otros marcos web proporcionen una funcionalidad rica. A continuación se muestra una lista de ventajas:
No es necesario inventar un protocolo de mensajería ni un formato de mensaje personalizados.
Clientes STOMP, incluidos un cliente Java, están disponibles en Spring Framework.
Puede (opcionalmente) utilizar intermediarios de mensajes (como RabbitMQ, ActiveMQ y otros) para administrar suscripciones y transmitir mensajes.
La lógica de la aplicación se puede organizar en cualquier número de instancias
@Controller
y los mensajes se pueden enrutar a ellas según el encabezado de la dirección de destino de STOMP, como en lugar de procesar mensajes WebSocket sin procesar usando un únicoWebSocketHandler
para una conexión determinada.Puedes usar Spring Security para proteger mensajes basados en destinos y mensajes STOMP. tipos.
Activación de STOMP
La compatibilidad con STOMP a través de WebSocket está disponible en spring-messaging
y módulos spring-websocket
.
Una vez que tenga estas dependencias implementadas, puede exponer los puntos finales
STOMP a través de WebSocket usando un respaldo a través del protocolo SockJS, como se muestra en el siguiente
ejemplo:
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.enableSimpleBroker("/topic", "/queue");
}
}
/portfolio
es la URL HTTP para el punto final al que el cliente WebSocket (o SockJS) debe conectarse para confirmar el protocolo de enlace de WebSocket.- Mensajes STOMP cuyo encabezado de destino comienza con
/app
se enrutan a métodos anotados con@MessageMapping
en clases con la anotación@Controller
. - Usamos el intermediario de mensajes integrado para suscripción y difusión y envío de mensajes cuyo
encabezado de destino comience con
/topic ` o `/queue
al intermediario.
El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio">
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/topic, /queue"/>
</websocket:message-broker>
</beans>
/topic
y /queue
no tienen ningún significado especial. Son simplemente una convención para
diferenciar entre mensajería editor-suscriptor y de igual a igual (es decir, entre muchos suscriptores y un solo
consumidor). Si está utilizando un broker externo, consulte la página STOMP del broker para comprender qué
destinos y prefijos STOMP admite.
Para conectarse desde el navegador, cuando utilice SockJS, puede utilizar sockjs-client
. Para STOMP,
muchas aplicaciones utilizaron la biblioteca (también conocida
como stomp.js). , que es completamente funcional y se ha utilizado en producción durante muchos años, pero ya no
cuenta con soporte. Actualmente, JSteunou/webstomp-client
es el sucesor de esta biblioteca que se mantiene y desarrolla más activamente. El siguiente ejemplo de código se
basa en él:
var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}
Además, si la conexión se realiza a través de WebSocket (sin SockJS), entonces puede utilice el siguiente código:
var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}
Tenga en cuenta que en el ejemplo anterior de stompClient
, no necesita especificar los
encabezados login
y passcode
. Incluso si lo hiciera, serían ignorados (o más bien
anulados) en el lado del servidor.
Para obtener más ejemplos de código, consulte:
"Uso de WebSocket para crear una aplicación web interactiva": guía de introducción.
Cartera de acciones - aplicación de muestra.
Servidor WebSocket
Para configurar un servidor WebSocket básico, información de la sección "Servidor" es la configuración
aplicable". Sin embargo, en el caso de Jetty, debe configurar HandshakeHandler
y WebSocketPolicy
mediante StompEndpointRegistry
:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(
new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
Flujo de mensajes
Si el punto final STOMP está abierto, la aplicación Spring se convierte en un intermediario STOMP para los clientes conectados. Esta sección describe el flujo de mensajes del lado del servidor.
El módulo spring-messaging
proporciona soporte fundamental para las aplicaciones de mensajería que
se originaron en el proyecto Integración de
Spring y luego se extrajeron e incorporaron en Spring Framework para un uso más amplio en muchos Proyectos de primavera y escenarios de
aplicación. La siguiente lista describe brevemente algunas de las abstracciones de mensajería disponibles:
Mensaje: una representación simple del mensaje, incluidos encabezados y carga útil.
MessageHandler: Acuerdo de procesamiento de mensajes.
MessageChannel: una convención de envío de mensajes que permite la libre comunicación entre remitentes y destinatarios.
SubscribableChannel:
MessageChannel
con suscriptores deMessageHandler
.ExecutorSubscribableChannel:
SubscribableChannel
que usaExecutor
para entregar mensajes.
Tanto la configuración de Java (es decir, la anotación @EnableWebSocketMessageBroker
) como la
configuración del espacio de nombres XML (que Esto es <websocket:message-broker>
) utiliza los
componentes anteriores para ensamblar el flujo de trabajo de paso de mensajes. El siguiente diagrama muestra los
componentes utilizados al activar un intermediario de mensajes integrado simple:
![](https://cdn.codegym.cc/images/article/b9213687-0ff3-47ca-9ea0-c29bad416f6f/800.jpeg)
El diagrama anterior muestra tres canales de mensajes:
clientInboundChannel
: Para transmitir mensajes recibidos de clientes WebSocket.clientOutboundChannel
: Para enviar servidor mensajes del lado del cliente WebSocket.brokerChannel
: para enviar mensajes al intermediario de mensajes desde el código de la aplicación del lado del servidor.
El siguiente diagrama muestra los componentes utilizados si un agente externo (como RabbitMQ) está configurado para administrar suscripciones y transmisiones de mensajes:
![](https://cdn.codegym.cc/images/article/97b78063-332d-407a-bba6-0d396073c151/800.jpeg)
La principal diferencia entre los dos esquemas anteriores es utilizar una "retransmisión de intermediario" para pasar mensajes a un intermediario STOMP externo a través de TCP y transmitir mensajes desde el intermediario a los clientes suscriptores.
Si los mensajes son recibidos de una conexión WebSocket, se decodifican en marcos STOMP, se convierten en una
representación Message
de Spring y se envían a lo largo del clientInboundChannel
para
su posterior procesamiento. Por ejemplo, los mensajes STOMP cuyos encabezados de destino comienzan con
/app
se pueden enrutar a métodos con la anotación @MessageMapping
en controladores anotados, mientras
que los mensajes /topic
y /queue
se pueden enviar directamente al intermediario de
mensajes.
Un @Controller
anotado que maneja el mensaje STOMP del cliente puede enviar un mensaje al
intermediario de mensajes a través de brokerChannel
, y el intermediario transmite el mensaje a los
suscriptores apropiados a través de clientOutboundChannel
. El mismo controlador puede hacer lo
mismo en respuesta a solicitudes HTTP, por lo que el cliente puede realizar un método HTTP POST y luego el
método con la anotación @PostMapping
puede enviar un mensaje al intermediario de mensajes para
distribuirloel resto de la asignación suscriptores.
Podemos seguir este proceso con un ejemplo sencillo. Considere el siguiente ejemplo en el que se configura el servidor:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
}
El ejemplo anterior admite el siguiente flujo:
El cliente se conecta a
http://localhost:8080/portfolio
y tan pronto como se establece la conexión WebSocket, comienzan los fotogramas STOMP para ser transmitido a través de él.El cliente envía una trama SUBSCRIBE con un encabezado de destino
/topic/greeting
. Una vez recibido y decodificado, el mensaje se envía alclientInboundChannel
y luego se enruta al intermediario de mensajes, que almacena la suscripción del cliente.El cliente envía un ENVIAR marco a
/app/greeting
. El prefijo/app
ayuda a enrutarlo a los controladores anotados. Después de eliminar el prefijo/app
, el resto de la asignación/greeting
se asigna a un método anotado con@MessageMapping
enGreetingController
.El valor devuelto por
GreetingController
se convierte en unMensaje
de Spring con una carga útil basada en el valor de retorno y un encabezado de destino por defecto/topic/greeting
(derivado del destino de entrada, reemplazando/app
con/topic
). El mensaje recibido se envía albrokerChannel
y el intermediario de mensajes lo procesa.El intermediario de mensajes encuentra todos los suscriptores elegibles y les envía un MENSAJE a cada uno de ellos. marco en el
clientOutboundChannel
, desde donde los mensajes se codifican como marcos STOMP y se envían a través de una conexión WebSocket.
La siguiente sección contiene más detalles sobre las anotaciones. métodos, incluidos los tipos de argumentos admitidos y los valores de retorno.
Controladores anotados
Las aplicaciones pueden usar clases marcadas con la anotación @Controller
para procesar mensajes de
los clientes. Estas clases pueden declarar métodos con las anotaciones @MessageMapping
, @SubscribeMapping
y @ExceptionHandler
, como se describe en los siguientes temas:
@MessageMapping
@SubscribeMapping
@MessageExceptionHandler
@MessageMapping
Puedes usar el anotación@MessageMapping
para anotar métodos que enrutan mensajes según su
destino. Se admite tanto a nivel de método como a nivel de tipo. A nivel de tipo, la anotación
@MessageMapping
se utiliza para expresar asignaciones comunes para todos los métodos del controlador.
De forma predeterminada, los valores de asignación son patrones de ruta estilo Ant (por ejemplo ,
/thing*
, /thing/**
), incluida la compatibilidad con variables de plantilla (por
ejemplo, /thing/{id}
). Se puede hacer referencia a los valores a través de los argumentos de un
método marcado con la anotación @DestinationVariable
. Las aplicaciones también pueden cambiar a una
convención de destino: separar el mensaje para las asignaciones.
Argumentos de métodos admitidos
La siguiente tabla describe los argumentos de los métodos:
Argumento del método | Descripción |
---|---|
|
Proporciona acceso al mensaje completo . |
|
Proporciona acceso a encabezados dentro de un |
|
Proporciona acceso a los encabezados mediante métodos de acceso escritos. |
|
Proporciona acceso a la carga útil del mensaje convertida (por ejemplo, desde JSON) utilizando el
Esta anotación es opcional, ya que se proporciona de forma predeterminada si no coincide ningún otro argumento. Puede anotar argumentos de carga útil
con |
|
Proporciona acceso a un valor de encabezado específico, junto con una conversión de tipo usando
|
|
Proporciona acceso a todos encabezados en el mensaje. Este argumento debe poder asignarse a |
|
Proporciona acceso a las variables de plantilla extraídas del destino del mensaje. Los valores se convierten si es necesario según el tipo declarado del argumento del método. |
|
Refleja el usuario que inició sesión durante el protocolo de enlace de WebSocket a través de HTTP. |
Volver Valores
De forma predeterminada, el valor de retorno de un método marcado con la anotación @MessageMapping
se serializa en una carga útil a través del MessageConverter
correspondiente y se envía como un
Message
a través de brokerChannel
, desde donde se envía a los suscriptores. El
propósito de un mensaje saliente es el mismo que el de un mensaje entrante, pero con el prefijo
/topic
.
Puedes usar @SendTo
y anotaciones @SendToUser
para configurar el destino del mensaje
de salida. La anotación @SendTo
se utiliza para configurar un destino o para especificar varios
destinos. La anotación @SendToUser
se usa para dirigir el mensaje de salida solo al usuario
asociado con el mensaje de entrada.
Puede usar @SendTo
y @SendToUser
al mismo tiempo code> en el mismo método, ambos
métodos son compatibles a nivel de clase, en cuyo caso serán los predeterminados para los métodos de la clase.
Sin embargo, recuerde que cualquier anotación con @SendTo
o @SendToUser
a nivel de
método anula cualquier anotación similar a nivel de clase.
Los mensajes se pueden procesar de forma asíncrona, y un método con la anotación @MessageMapping
puede devolver un ListenableFuture
, CompletableFuture
o
CompletionStage
.
Nota Tenga en cuenta que las anotaciones @SendTo
y @SendToUser
son simplemente una
medida de conveniencia equivalente a usar SimpMessagingTemplate
para enviar mensajes. Si es
necesario, en escenarios más complejos, los métodos marcados con la anotación @MessageMapping
pueden recurrir a SimpMessagingTemplate
directamente. Esto puede ocurrir en lugar de, o quizás
además de, devolver un valor.
@SubscribeMapping
La anotación @SubscribeMapping
es lo mismo que @MessageMapping
, pero limita la
asignación a solo mensajes de suscripción. Admite los mismos argumentos de método que la anotación @MessageMapping
.
Sin embargo, en el caso de un valor de retorno, de forma predeterminada, el mensaje se envía directamente al
cliente (a través de clientOutboundChannel
, en respuesta a una suscripción) en lugar de al
intermediario (a través de brokerChannel
, como una transmisión a las suscripciones
correspondientes). Agregar una anotación @SendTo
o una anotación @SendToUser
anula
esta lógica y envía mensajes al intermediario.
¿Cuándo resulta práctico? Supongamos que el intermediario está asignado a /topic
y
/queue
, y que los controladores de la aplicación están asignados a /app
. Con esta
configuración, el corredor almacena todas las suscripciones a /topic
y /queue
para
envíos de correo repetidos, y la aplicación no necesita participar en esto. El cliente también puede suscribirse
a alguna dirección de destino /app
y el controlador puede devolver un valor en respuesta a esa
suscripción sin la participación del intermediario, sin almacenar ni reutilizar la suscripción (de hecho, una
solicitud-respuesta única). intercambio) ). Un caso de uso para este mecanismo es completar la interfaz de
usuario con datos iniciales al inicio.
¿Cuándo no resulta práctico? No intente asignar el agente y los controladores al mismo prefijo de dirección de destino a menos que por alguna razón necesite que ambos procesen mensajes, incluidas las suscripciones, de forma independiente. Los mensajes entrantes se procesan en paralelo. No hay garantías de si el corredor o el controlador procesará este mensaje primero. Si el objetivo es recibir una notificación de que la suscripción se ha guardado y está lista para transmitirse, el cliente debe solicitar un acuse de recibo si el servidor lo admite (un simple broker no lo hace). Por ejemplo, utilizando el cliente STOMP en Java, puede hacer lo siguiente para agregar un acuse de recibo:
@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// Durante la inicialización...
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// Al suscribirse...
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
// Suscripción lista...
});
En el lado del servidor, puede registrar ExecutorChannelInterceptor
para brokerChannel
e implementar el método afterMessageHandled
, que se llama después de que se hayan procesado los
mensajes, incluidas las suscripciones.
@MessageExceptionHandler
Una aplicación puede utilizar métodos anotados con @MessageExceptionHandler
para manejar excepciones
de métodos anotados con @MessageMapping
. Puede declarar excepciones en la anotación misma o
mediante un argumento de método si necesita acceder a la instancia de excepción. El siguiente ejemplo declara
una excepción a través de un argumento de método:
@Controller
public class MyController {
// ...
@MessageExceptionHandler
public ApplicationError handleException(MyException exception) {
// ...
return appError;
}
}
Los métodos con la anotación @MessageExceptionHandler
admiten firmas de métodos flexibles y los
mismos tipos de argumentos de método y valores de retorno que métodos con la anotación
@MessageMapping
.
Normalmente, los métodos marcados con la anotación @MessageExceptionHandler
se aplican dentro de una
clase con @Controller
anotación (o jerarquía de clases) en la que se declaran. Si desea que
dichos métodos se apliquen de manera más global (en todos los controladores), puede declararlos en una clase
marcada con la anotación @ControllerAdvice
. Esto es comparable a un soporte similar disponible
en Spring MVC.
Envío de mensajes
¿Qué sucede si necesita enviar mensajes a clientes conectados desde cualquier lugar de la aplicación? Cualquier
componente de la aplicación puede enviar mensajes a través de brokerChannel
. La forma más sencilla
de hacerlo es implementar un SimpMessagingTemplate
y usarlo para enviar mensajes. Normalmente, se
incrusta por tipo, como se muestra en el siguiente ejemplo:
@Controller
public class GreetingController {
private SimpMessagingTemplate template;
@Autowired
public GreetingController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path="/greetings", method=POST)
public void greet(String greeting) {
String text = "[" + getTimestamp() + "]:" + greeting;
this.template.convertAndSend("/topic/greetings", text);
}
}
Sin embargo, también puede identificarlo por su nombre(brokerMessagingTemplate
) si existe otro bean
del mismo tipo.
Simple Broker
El intermediario de mensajes simple integrado procesa las solicitudes de suscripción de los clientes, las almacena en la memoria y envía mensajes a los clientes conectados con direcciones de destino adecuadas. El agente admite rutas de direcciones de destino, incluidas suscripciones a patrones de direcciones de destino estilo Ant.
Si el programador de tareas está configurado, el intermediario simple admitirá mensajes de
latido del corazón - STOMP. Para configurar el programador, puede declarar su propio bean
TaskScheduler
e instalarlo a través de MessageBrokerRegistry
. Alternativamente, puede usar uno que se declara
automáticamente en la configuración de WebSocket incorporada; sin embargo, luego necesitará la anotación
@Lazy
para evitar bucles entre la configuración de WebSocket incorporada y su
WebSocketMessageBrokerConfigurer
. Por ejemplo:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private TaskScheduler messageBrokerTaskScheduler;
@Autowired
public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(new long[] {10000, 20000})
.setTaskScheduler(this.messageBrokerTaskScheduler);
// ...
}
}
Broker externo
Un broker simple es excelente para comenzar, pero solo admite algunas de las Los comandos STOMP (no admite caracteres de confirmación, reconocimientos y algunas otras funciones), utilizan un bucle de mensajes simple y no son adecuados para agrupaciones. Alternativamente, puede actualizar sus aplicaciones para utilizar un intermediario de mensajes con todas las funciones.
Consulte la documentación de STOMP para el intermediario de mensajes de su elección (por ejemplo, RabbitMQ, ActiveMQ y otros), instale el broker y ejecútelo con el soporte STOMP habilitado. Luego puede habilitar el relé del intermediario STOMP (en lugar de un intermediario simple) en la configuración de Spring.
El siguiente ejemplo de configuración proporciona un intermediario completamente funcional:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
}
El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio" />
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
</beans>
El relé del agente STOMP en la configuración anterior es MessageHandler
de Spring, que procesa mensajes reenviándolos a mensajes de un
intermediario externo. Para hacer esto, establece una conexión TCP con el corredor, le reenvía todos los mensajes y
luego reenvía todos los mensajes recibidos del corredor a los clientes a través de sus sesiones WebSocket.
Básicamente, actúa como un "relé" que reenvía mensajes en ambas direcciones.
io.projectreactor.netty:reactor-netty
y io.netty:netty-all
para administrar conexiones
TCP.
Además, los componentes de la aplicación (como métodos para procesar solicitudes HTTP, servicios comerciales y otros) también pueden enviar mensajes al intermediario.
Básicamente, el intermediario proporciona una distribución de mensajes confiable y escalable.
Conexión al intermediario
Broker STOMP El relé
mantiene una única conexión TCP de "sistema" con el intermediario. Esta conexión se utiliza únicamente para mensajes
que se originan en la aplicación del lado del servidor y no para recibir mensajes. Puede configurar las credenciales
STOMP (es decir, los encabezados login
y passcode
en el marco STOMP) para esta conexión.
Esto se expondrá tanto en el espacio de nombres XML como en la configuración de Java como las propiedades systemLogin
y systemPasscode
con los valores predeterminados de guest
y guest
.
El
relé del agente STOMP también crea una conexión TCP separada para cada cliente WebSocket conectado. Puede configurar
las credenciales STOMP que se utilizan para todas las conexiones TCP realizadas en nombre de los clientes. Esto
quedará expuesto tanto en el espacio de nombres XML como en la configuración de Java como las propiedades clientLogin
y clientPasscode
con los valores predeterminados de guest
y invitado
.
login
y
el passcode
encabezados en cada trama CONNECT
que reenvía al corredor en nombre de los
clientes. Por lo tanto, los clientes WebSocket no necesitan configurar estos encabezados. Son ignorados. Los
clientes de WebSocket deben utilizar la autenticación HTTP para proteger el punto final de WebSocket y establecer la
identidad del cliente.
El relé del agente STOMP también envía y recibe mensajes de latido hacia y desde el agente de mensajes a través de la conexión TCP del "sistema". Puede configurar los intervalos para enviar y recibir mensajes de latido (predeterminado 10 segundos). Si se pierde la comunicación con el intermediario, el relé del intermediario seguirá intentando restablecer la conexión cada 5 segundos hasta que falle.
Cualquier Spring Bean puede implementar
ApplicationListener<BrokerAvailabilityEvent>
para recibir notificaciones si la conexión del
"sistema" con el corredor se pierde y luego se restablece. Por ejemplo, el servicio Stock Quote que envía
cotizaciones de acciones puede dejar de intentar enviar mensajes si no hay una conexión de "sistema" activa.
De forma predeterminada, el relé del corredor STOMP siempre se conecta y, opcionalmente, se vuelve a conectar si la conexión se pierde - al mismo host y puerto. Si necesita proporcionar varias direcciones en cada intento de conexión, puede configurar un proveedor de direcciones en lugar de un host y un puerto fijos. El siguiente ejemplo muestra cómo hacer esto:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
También puede configurar un relé de agente STOMP utilizando la propiedad virtualHost
. El valor de
esta propiedad se establece como el encabezado host
de cada trama CONNECT
y puede resultar
útil (por ejemplo, en un entorno de nube donde el host real al que se conecta la conexión TCP se establece es
diferente del host, que proporciona un servicio STOMP en la nube).
Puntos como delimitadores
Si los
mensajes se enrutan a métodos anotados con @MessageMapping
, coinciden con AntPathMatcher
.
De forma predeterminada, se espera que las plantillas utilicen una barra diagonal (/
) como delimitador.
Esta es una buena convención para aplicaciones web, similar a las URL HTTP. Sin embargo, si está más acostumbrado a
las convenciones de mensajería, puede pasar a utilizar un punto (.
) como separador.
El siguiente ejemplo muestra cómo hacer esto usando Java. configuraciones:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
<websocket:stomp-endpoint path="/stomp"/>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
<bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
<constructor-arg index="0" value="."/>
</bean>
</beans>
El controlador podrá entonces utilizar el punto (.
) como separador en los métodos marcados con la
anotación de
@MessageMapping
, como se muestra en el siguiente ejemplo:
@Controller
@MessageMapping("red")
public class RedController {
@MessageMapping("blue.{green}")
public void handleGreen(@DestinationVariable String green) {
// ...
}
}
El cliente ahora puede enviar un mensaje a /app/red.blue.green123
.
En el ejemplo anterior no cambiamos los prefijos para el "retransmisión del intermediario" porque dependen completamente del intermediario de mensajes externo. Consulte las páginas de documentación de STOMP del broker que está utilizando para ver qué convenciones admite para el encabezado de la dirección de destino.
Un "broker simple", por otro lado, se basa
en un PathMatcher
configurado. Por lo tanto, si cambia el separador, ese cambio también se aplicará al
corredor y a cómo el corredor relaciona los destinos del mensaje con los patrones en las suscripciones.
Autenticación
Cada uno La sesión de mensajería STOMP a través de un WebSocket comienza con una solicitud HTTP. Esto podría ser una solicitud para pasar al protocolo WebSocket (es decir, un protocolo de enlace sobre el protocolo WebSocket) o, en el caso de respaldos a través del protocolo SockJS, una serie de solicitudes HTTP a los mecanismos de transferencia para SockJS.
Muchas aplicaciones web ya cuentan con herramientas de autenticación y autorización para proteger las solicitudes HTTP. Normalmente, el usuario se autentica a través de Spring Security mediante algún mecanismo, como una página de inicio de sesión, autenticación básica HTTP u otro método. El contexto de seguridad para el usuario autenticado se almacena en la sesión HTTP y se asocia con solicitudes posteriores en la misma sesión según la cookie.
Por lo tanto, en el caso del protocolo de enlace WebSocket o solicitudes HTTP,
se utilizan mecanismos de transferencia para SockJS como Normalmente, ya existe un usuario autenticado, al que se
puede acceder a través de HttpServletRequest#getUserPrincipal()
. Spring asocia automáticamente a este
usuario con la sesión WebSocket o SockJS creada para él y, posteriormente, con todos los mensajes STOMP enviados a
través de esta sesión a través del encabezado del usuario.
En resumen, una aplicación web típica no necesita
hacer nada más allá de lo que ya hace para garantizar la seguridad. El usuario se autentica en el nivel de solicitud
HTTP mediante un contexto de seguridad que se mantiene en el nivel de sesión HTTP basado en cookies (que luego se
asocia con sesiones WebSocket o SockJS creadas para ese usuario), lo que da como resultado que se agregue un
encabezado de usuario a cada Message
que pasa a través de la aplicación.
El protocolo STOMP
tiene encabezados login
y passcode
en CONNECT
marco. Fueron diseñados
originalmente y son necesarios para usar STOMP sobre TCP. Sin embargo, cuando se utiliza STOMP sobre WebSocket, de
forma predeterminada, Spring ignora los encabezados de autenticación en el nivel del protocolo STOMP y asume que el
usuario ya está autenticado en el nivel del mecanismo de transporte HTTP. Se espera que la sesión WebSocket o SockJS
contenga un usuario autenticado.
Autenticación de token
Proyecto Spring Security OAuthproporciona soporte para seguridad basada en tokens, incluido JSON Web Token (JWT). Puede usarlo como mecanismo de autenticación en aplicaciones web, incluido STOMP sobre WebSocket, como se describe en la sección anterior (es decir, para preservar la identidad a través de una sesión basada en cookies).
Al mismo tiempo, Las cookies basadas en sesiones no siempre son mejores (por ejemplo, en aplicaciones que no admiten una sesión del lado del servidor o en aplicaciones móviles que normalmente usan encabezados para la autenticación).
Protocolo WebSocket, RFC 6455 "no prescribe ninguna forma específica en la que los servidores puedan autenticar a los clientes durante el protocolo de enlace WebSocket". En la práctica, sin embargo, los clientes del navegador sólo pueden utilizar encabezados de autenticación estándar (es decir, autenticación básica HTTP) o cookies, pero no pueden (por ejemplo) proporcionar encabezados personalizados. Asimismo, el cliente JavaScript de SockJS no tiene la capacidad de enviar encabezados HTTP en solicitudes de transporte de SockJS. Consulte sockjs-client problema 196. En su lugar, le permite enviar parámetros de solicitud que puedes usarlo para enviar un token, pero esto tiene sus inconvenientes (por ejemplo, el token puede registrarse accidentalmente junto con la URL en los registros del servidor).
Por lo tanto, para las aplicaciones que necesitan evitar cookies, es posible que no existan buenas alternativas a la autenticación a nivel de protocolo HTTP. En este caso, en lugar de utilizar cookies, puede optar por la autenticación mediante encabezados en el nivel del protocolo de mensajería STOMP. Esto requiere dos pasos simples:
Usar el cliente STOMP para pasar encabezados de autenticación durante la conexión.
Procesar encabezados de autenticación usando
ChannelInterceptor
.
El siguiente ejemplo utiliza la configuración del lado del servidor para registrar un interceptor de
autenticación personalizado. Tenga en cuenta que el interceptor solo necesita autenticarse y configurar el
encabezado del usuario en un Message
del formato CONNECT. Spring marca y almacena al usuario
autenticado y lo asocia con mensajes STOMP posteriores en la misma sesión. El siguiente ejemplo muestra cómo
registrar un interceptor de autenticación personalizado:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message preSend(Message message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Authentication user = ... ; // access authentication header(s)
accessor.setUser(user);
}
return message;
}
});
}
}}
Tenga en cuenta también que cuando utilice la autorización de mensajes de Spring Security, actualmente debe
asegurarse de que la configuración de ChannelInterceptor
para la autenticación sea en orden antes de la
configuración de Spring Security. La mejor manera de hacerlo es declarar un interceptor personalizado en su propia
implementación WebSocketMessageBrokerConfigurer
, que está marcada con la anotación @Order(Ordered.HIGHEST_PRECEDENCE
+ 99)
.
Autorización
Spring Security proporciona WebSocket de autorización de subprotocolo, que utiliza un ChannelInterceptor
para autorizar mensajes según el encabezado de usuario que contienen. Además, Spring Session proporciona integración del
protocolo WebSocket, lo que garantiza que la sesión HTTP del usuario no caducará mientras la sesión de
WebSocket aún esté activa.
Direcciones de destino personalizadas
Una aplicación puede enviar
mensajes dirigidos a un usuario específico y las funciones STOMP de Spring admiten este reconocimiento. direcciones
de destino con el prefijo /user/
. Por ejemplo, un cliente podría suscribirse a la dirección de destino
/user/queue/position-updates
. El UserDestinationMessageHandler
procesa esta dirección de
destino y la traduce en una dirección de destino única para la sesión del usuario (por ejemplo, /queue/position-updates-user123
).
Esto brinda la conveniencia de suscribirse a una dirección de destino común y al mismo tiempo garantiza que no haya
conflictos con otros usuarios suscritos a la misma dirección de destino, de modo que cada usuario pueda recibir
actualizaciones únicas de la posición de las acciones.
UserDestinationMessageHandler
.
En el lado de la transmisión, los mensajes se pueden enviar a una
dirección de destino como /user/{username}/queue/position-updates
, que a su vez es traducido por
UserDestinationMessageHandler
a uno o más destinos, uno para cada sesión asociada con el usuario. Esto
permite que cualquier componente de la aplicación envíe mensajes dirigidos a un usuario específico sin saber nada
más que el nombre del usuario y la dirección de destino general. Esto también se puede hacer usando la anotación y
la plantilla de mensajería.
El método de manejo de mensajes puede enviar mensajes al usuario asociado con el
mensaje que se está procesando usando la anotación @SendToUser
(también compatible a nivel de clase
para un destino general), como se muestra en el siguiente ejemplo:
@Controller
public class PortfolioController {
@MessageMapping("/trade")
@SendToUser("/queue/position-updates")
public TradeResult executeTrade(Trade trade, Principal principal) {
// ...
return tradeResult;
}
}
Si un usuario tiene más de una sesión, de forma predeterminada se dirigen a todas las sesiones suscritas a la
dirección de destino determinada. Sin embargo, a veces es posible que desee centrarse únicamente en la sesión que
envió el mensaje que se está procesando. Puede hacer esto estableciendo el atributo broadcast
en falso,
como se muestra en el siguiente ejemplo:
@Controller
public class MyController {
@MessageMapping("/action")
public void handleAction() throws Exception{
// MyBusinessException se lanza aquí
}
@MessageExceptionHandler
@SendToUser(destinations="/queue/errors", broadcast=false)
public ApplicationError handleException(MyBusinessException exception) {
// ...
return appError;
}
}
@SendToUser
funciona exactamente
igual que con broadcast=false
(es decir, apunta solo a la sesión que envió el mensaje que se está
procesando).
Puede enviar un mensaje a destinos personalizados desde cualquier componente de la aplicación, por ejemplo,
implementando un SimpMessagingTemplate
generado por una configuración de Java o un espacio de nombres
XML. (El nombre del bean es brokerMessagingTemplate
si es necesario para calificar completamente el
nombre utilizando la anotación @Qualifier
). El siguiente ejemplo muestra cómo hacer esto:
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
}
/exchange/amq.direct/position-updates
. En este caso, el
cliente puede suscribirse a /user/exchange/amq.direct/position-updates
. Asimismo, ActiveMQ tiene opciones de
configuración para limpiar destinos inactivos.
En un escenario con múltiples servidores de aplicaciones, la dirección de destino del usuario puede permanecer
sin resolver porque el usuario está conectado a otro servidor. En tales casos, puede configurar la dirección de
destino para transmitir mensajes no resueltos para que otros servidores puedan intentar hacerlo. Esto se puede hacer
usando la propiedad userDestinationBroadcast
del registro MessageBrokerRegistry
en la
configuración de Java y el atributo user-destination-broadcast
del message-broker
elemento code> en XML.
Orden de los mensajes
Los mensajes del broker se publican a través del canal
clientOutboundChannel
, desde donde se se escriben en sesiones de WebSocket. Debido a que el canal está
respaldado por un ThreadPoolExecutor
, los mensajes se procesan en diferentes hilos y la secuencia
resultante recibida por el cliente puede no coincidir con el orden exacto de publicación.
Si este es el caso
Si hay un problema, habilite el flag setPreservePublishOrder
, como se muestra en el siguiente ejemplo:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
protected void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
}
}
El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker preserve-publish-order="true">
<!-- ... -->
</websocket:message-broker>
</beans>
Si se establece el indicador, los mensajes dentro de la misma sesión del cliente se publican a través
del clientOutboundChannel
en un tiempo, lo que asegura el orden de publicación requerido. Tenga en
cuenta que esto produce un ligero impacto en el rendimiento, por lo que solo debe habilitar este indicador si es
necesario.
Eventos
Se publican varios eventos ApplicationContext
y se pueden obtenido
implementando la interfaz ApplicationListener
para Spring:
BrokerAvailabilityEvent
: indica cuándo el broker pasa a estar disponible o no. Aunque un intermediario "simple" pasa a estar disponible inmediatamente después del inicio y permanece así mientras se ejecuta la aplicación, el "retransmisión de intermediario" STOMP puede perder la conexión con un intermediario con todas las funciones (por ejemplo, cuando se reinicia el intermediario). El relé del intermediario contiene lógica de reconexión y restablece una conexión de "sistema" con el intermediario cuando se reactiva. Como resultado, este evento se publica cada vez que el estado cambia de conectado a desconectado y viceversa. Los componentes que utilizanSimpMessagingTemplate
deben suscribirse a este evento y evitar enviar mensajes mientras el intermediario no esté disponible. En cualquier caso, deben estar preparados para manejarMessageDeliveryException
cuando se envía un mensaje.SessionConnectEvent
: Publicado cuando Se recibe una nueva trama CONNECT del protocolo STOMP para indicar el inicio de una nueva sesión de cliente. El evento contiene un mensaje que representa la conexión, incluido el ID de la sesión, la información del usuario (si corresponde) y los encabezados personalizados enviados por el cliente. Esto es útil para realizar un seguimiento de las sesiones de los clientes. Los componentes que se suscriben a este evento pueden encapsular el mensaje contenido usandoSimpMessageHeaderAccessor
oStompMessageHeaderAccessor
.SessionConnectedEvent
: publicado poco después de unSessionConnectEvent
cuando el intermediario envió una trama CONNECTED del protocolo STOMP en respuesta a una trama CONNECT. En este punto, la sesión STOMP se puede considerar completamente establecida.SessionSubscribeEvent
: se publica cuando se recibe una nueva trama SUBSCRIBE del protocolo STOMP.SessionUnsubscribeEvent
: se publica cuando se recibe una nueva trama de UNSUBSCRIBE del protocolo STOMP.SessionDisconnectEvent
: Publicado al finalizar la sesión STOMP. El marco DISCONNECT puede enviarse desde el cliente o generarse automáticamente cuando se cierra la sesión de WebSocket. En algunos casos, este evento se publica más de una vez por sesión. Los componentes deben ser idempotentes con respecto a múltiples eventos de pérdida de conexión.
Intercepción
Los eventos informan notificaciones sobre el ciclo de vida de una conexión STOMP, pero
no sobre cada mensaje del cliente. Las aplicaciones también pueden registrar un ChannelInterceptor
para
interceptar cualquier mensaje en cualquier parte de la cadena de procesamiento. El siguiente ejemplo muestra cómo
interceptar mensajes entrantes de clientes:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new MyChannelInterceptor());
}
}
Un ChannelInterceptor
personalizado puede usar StompHeaderAccessor
o SimpMessageHeaderAccessor
para acceder a la información del mensaje, como se muestra en el siguiente ejemplo:
public class MyChannelInterceptor implements ChannelInterceptor {
@Override
public Message preSend(Message message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getStompCommand();
// ...
return message;
}
}
Las aplicaciones también pueden implementar ExecutorChannelInterceptor
, que es una subinterfaz de
ChannelInterceptor
con in- Devoluciones de llamada de subprocesos, en las que se procesan los mensajes.
Aunque ChannelInterceptor
se llama una vez por cada mensaje enviado al canal, ExecutorChannelInterceptor
proporciona interceptores en el hilo de cada MessageHandler
que se suscribe a mensajes de el
canal.
Tenga en cuenta que, al igual que con el SessionDisconnectEvent
descrito
anteriormente, se puede recibir un mensaje DISCONNECT del cliente y también se puede generar automáticamente cuando
se cierra la sesión de WebSocket. . En algunos casos, un espía puede interceptar este mensaje más de una vez en cada
sesión. Los componentes deben ser idempotentes con respecto a múltiples eventos de pérdida de conexión.
Cliente STOMP
Spring proporciona un cliente STOMP a través de WebSocket y un cliente STOMP a través de TCP.
Para empezar, puede crear y configurar un WebSocketStompClient
como se muestra en el
siguiente ejemplo:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // para latidos
En el ejemplo anterior, puedes reemplazar StandardWebSocketClient
con SockJsClient
,
ya que también es una implementación de WebSocketClient
. SockJsClient
puede usar WebSocket
o un mecanismo de transporte basado en HTTP como alternativa.
Luego puede establecer una conexión y proporcionar un controlador para la sesión STOMP, como se muestra en el siguiente ejemplo. :
String url = "ws://127.0. 0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
Cuando la sesión esté lista para usarse, se notificará al controlador, como se muestra en el siguiente ejemplo:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
Una vez establecida la sesión, puedes enviar cualquier carga útil que se serializa usando MessageConverter
configurado, como se muestra en el siguiente ejemplo:
session.send("/topic/something", "payload");
También puede suscribirse a direcciones de destino. Los métodos subscribe
requieren un
controlador de mensajes para la suscripción y devuelven un identificador Subscription
que puede
utilizar para cancelar la suscripción. Para cada mensaje recibido, el controlador puede especificar un tipo de
destino Object
en el que se debe deserializar la carga útil, como se muestra en el siguiente ejemplo:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
Para habilitar los mensajes de latido STOMP, puede configurar WebSocketStompClient
usando TaskScheduler
y, opcionalmente, configurar intervalos de transmisión de mensajes de latido (10 segundos sin actividad de
escritura, lo que provocará que se envíe un mensaje de latido). se enviará y 10 segundos si no hay actividad de
lectura, lo que provocará que se cierre la conexión).
WebSocketStompClient
solo envía un mensaje de latido si no hay actividad, es decir. cuando no se envían
otros mensajes. Esto puede ser un problema cuando se utiliza un intermediario externo porque los mensajes con un
destino que no es un intermediario reflejan actividad pero en realidad no se reenvían al intermediario. En este
caso, puede configurar TaskScheduler
al inicializar el broker externo, lo que garantiza que el mensaje
de latido se reenvíe al broker también si solo se envían mensajes con una dirección de destino que no sea del
broker.
WebSocketStompClient
para pruebas de
rendimiento para simular miles de clientes desde una sola máquina, considere deshabilitar los mensajes de latido, ya
que cada conexión programa su propios latidos -tareas, y no hay optimización para este proceso para una gran
cantidad de clientes que se ejecutan en la misma máquina.
El protocolo STOMP también brinda soporte para acuse de recibo si el cliente debe agregar un
receipt
encabezado al que el servidor responde con un marco RECEIPT después de procesar el envío o la
suscripción. Para brindar este soporte, StompSession
ofrece setAutoReceipt(boolean)
,
lo que garantiza que el encabezado receipt
se agregue en cada evento de envío o suscripción
posterior. También puede agregar manualmente un encabezado de acuse de recibo a StompHeaders
. Tanto
el envío como la suscripción devuelven una instancia Receiptable
que se puede utilizar para
registrar la recepción de devoluciones de llamadas exitosas y fallidas. Esta característica requiere que el
cliente esté configurado para usar TaskScheduler
y establecer la cantidad de tiempo antes de que
expire el acuse de recibo (el valor predeterminado es 15 segundos).
Tenga en cuenta que StompSessionHandler
es en sí mismo un StompFrameHandler
, lo que
le permite manejar marcos de ERROR además de la devolución de llamada handleException
destinada a las
excepciones lanzadas durante el procesamiento de mensajes y handleTransportError
apuntar a errores a
nivel de transmisión, incluida ConnectionLostException
.
Accesibilidad del protocolo WebSocket
Cada sesión de WebSocket tiene un mapa de atributos. El mapa está vinculado como encabezado a los mensajes entrantes del cliente y se puede acceder a él desde un método de controlador, como se muestra en el siguiente ejemplo:
@Controller
public class MyController {
@MessageMapping("/action")
public void handle(SimpMessageHeaderAccessor headerAccessor) {
Map<String, Object> attrs = headerAccessor.getSessionAttributes();
// ...
}
}
Puede declarar un bean administrado Spring en el alcance de disponibilidad websocket
.
Puede inyectar beans con ámbito WebSocket en controladores y en cualquier enlace de canal registrado a través de
clientInboundChannel
. Como regla general, son singletons y su ciclo de vida es más largo que el de cada
sesión individual de WebSocket. Por lo tanto, los beans que forman parte de un alcance de accesibilidad de WebSocket
deben usar el modo proxy para el alcance de accesibilidad, como se muestra en el siguiente ejemplo:
@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
@PostConstruct
public void init() {
// Llamado después de la inyección de dependencia
}
// ...
@PreDestroy
public void destroy() {
// Llamado después de la inyección de dependencia
}
}
@Controller
public class MyController {
private final MyBean myBean;
@Autowired
public MyController(MyBean myBean) {
this.myBean = myBean;
}
@MessageMapping("/action")
public void handle() {
// this.myBean de la sesión actual de WebSocket
}
}
Como cualquier alcance personalizado disponibilidad, Spring inicializa una nueva instancia de
MyBean
la primera vez que se accede a ella desde el controlador y la almacena en los atributos de sesión de WebSocket.
Posteriormente se devuelve la misma copia antes del final de la sesión. Para los beans dentro del alcance de
accesibilidad de WebSocket, se llaman a todos los métodos del ciclo de vida de Spring, como se muestra en los
ejemplos anteriores.
Rendimiento
Cuando se trata de rendimiento, no existe una única solución adecuada. Se ve afectado por muchos factores, incluido el tamaño y el volumen de los mensajes, si la aplicación está realizando un trabajo que requiere bloqueo y factores externos (como la velocidad de la red y otros detalles). El propósito de esta sección es describir brevemente las opciones de configuración disponibles y brindar algunas ideas y debates sobre el escalado.
En una aplicación de mensajería, los mensajes pasan a través de canales para la ejecución asincrónica, que son compatibles con grupos de subprocesos. . Configurar una aplicación de este tipo requiere una buena comprensión de cómo funcionan los canales y el flujo de mensajes.
El lugar obvio para
comenzar es configurar grupos de subprocesos que admitan clientInboundChannel
y clientOutboundChannel
.
De forma predeterminada, ambas configuraciones están configuradas para duplicar el número de
procesadores disponibles.
Si el procesamiento de mensajes en los métodos anotados depende principalmente
del procesador, el número de subprocesos para clientInboundChannel
debe permanecer cercano al número de
procesadores. Si el trabajo que realizan requiere más E/S y requiere bloqueo o espera en una base de datos u otro
sistema externo, probablemente sea necesario aumentar el tamaño del grupo de subprocesos.
ThreadPoolExecutor
tiene tres propiedades importantes: el
tamaño del grupo de subprocesos principal, el tamaño máximo del grupo de subprocesos y la capacidad de la cola para
contener tareas para las que no hay subprocesos libres.
A menudo surge confusión: la configuración del tamaño del grupo principal (por ejemplo, 10) y el tamaño máximo del grupo (por ejemplo, 20) da como resultado un grupo de subprocesos con entre 10 y 20 subprocesos. De hecho, si deja el valor de capacidad predeterminado de Integer.MAX_VALUE, el grupo de subprocesos nunca excederá el tamaño del grupo principal, ya que todas las tareas adicionales están en cola.
Consulte el javadoc en ThreadPoolExecutor
, para aprender cómo funcionan estas propiedades y
familiarizarse con diferentes estrategias de cola.
El lado clientOutboundChannel
trata
de enviar mensajes a clientes WebSocket. Si los clientes están en una red rápida, la cantidad de subprocesos debe
permanecer cercana a la cantidad de procesadores disponibles. Si son lentos o tienen un rendimiento bajo, tardan más
en consumir mensajes y ejercen presión sobre el grupo de subprocesos. Por lo tanto, es necesario aumentar el tamaño
del grupo de subprocesos.
Si bien la carga de trabajo para el canal clientInboundChannel
se
puede predecir (después de todo, depende de lo que esté haciendo la aplicación). configurar el clientOutboundChannel
es más difícil, ya que su funcionamiento se basa en factores independientes de la aplicación. Por este motivo, hay
dos propiedades adicionales asociadas con el envío de mensajes: sendTimeLimit
y sendBufferSizeLimit
.
Puede utilizar estos métodos para configurar la duración del envío y la cantidad de datos que se almacenarán en el
búfer al enviar mensajes al cliente.
La idea general es que solo se puede utilizar un hilo para enviar al cliente en cualquier momento. . Mientras tanto, todos los mensajes adicionales se almacenan en el búfer y puede utilizar estas propiedades para decidir cuánto tiempo debe tardar en enviarse un mensaje y cuántos datos se pueden almacenar en el búfer durante ese tiempo. Consulte la documentación del esquema javadoc y XML para obtener detalles adicionales importantes.
El siguiente ejemplo muestra una posible configuración:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
}
// ...
}
El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport send-timeout="15000" send-buffer-size="524288" />
<!-- ... -->
</websocket:message-broker>
</beans>
También puede utilizar la configuración del mecanismo de transporte WebSocket que se mostró anteriormente para configurar el tamaño máximo permitido de los mensajes STOMP entrantes. En teoría, el tamaño de un mensaje WebSocket puede ser prácticamente ilimitado. En la práctica, los servidores WebSocket establecen límites, por ejemplo, 8 KB para Tomcat y 64 KB para Jetty. Por este motivo, los clientes STOMP (como webstomp-client de JavaScript y otros) rompen mensajes STOMP grandes, cuando alcanza un tamaño de 16 KB y los envía como múltiples mensajes WebSocket, lo que requiere que el servidor los almacene en buffer y los vuelva a ensamblar.
El soporte STOMP sobre WebSocket de Spring hace esto para que las aplicaciones puedan configurar el tamaño máximo de los mensajes STOMP independientemente del mensaje, tamaños específicos del servidor WebSocket. Recuerde que el tamaño del mensaje WebSocket se ajusta automáticamente según sea necesario para garantizar que los mensajes WebSocket se envíen con un tamaño mínimo de 16 KB.
El siguiente ejemplo muestra una configuración posible:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(128 * 1024);
}
// ...
}
El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport message-size="131072" />
<!-- ... -->
</websocket:message-broker>
</beans>
Un punto importante en el escalado es el uso de múltiples instancias de aplicación. Actualmente no es posible hacer esto con un simple corredor. Sin embargo, cuando se utiliza un intermediario con todas las funciones (como RabbitMQ), cada instancia de la aplicación está conectada al intermediario y los mensajes enviados por una instancia de la aplicación se pueden enviar a través del intermediario a los clientes WebSocket conectados a través de cualquier otra instancia de la aplicación.
Monitoreo
Si utiliza la anotación @EnableWebSocketMessageBroker
o <websocket:message-broker>
, los componentes clave de la infraestructura recopilan
automáticamente estadísticas y contadores que proporcionan información importante sobre las aplicaciones de estado
interno. La configuración también declara un bean de tipo WebSocketMessageBrokerStats
, que recopila
toda la información disponible en un solo lugar y, de forma predeterminada, la registra en el nivel
INFO
una vez cada 30 minutos. Este bean se puede exportar a JMX a través de MBeanExporter
desde Spring para verlo en tiempo de ejecución (por ejemplo, a través de jconsole
desde JDK). La
siguiente lista proporciona un resumen:
- Sesiones de cliente WebSocket
-
- Actual
Muestra cuántas sesiones de cliente existen actualmente, con un desglose adicional por transmisión de WebSocket versus HTTP y sesiones de sondeo a través del protocolo SockJS.
- Total
Indica cuántas sesiones totales se establecieron.
- Cerrado por emergencia
-
- Errores de conexión
Sesiones que se establecieron pero se cerraron después de que no se recibieron mensajes durante 60 segundos . Esto generalmente indica problemas con el servidor proxy o la red.
- Límite de envío excedido
Las sesiones se cierran después de exceder el tiempo de espera de envío configurado o el envío límite de búfer, lo que puede ocurrir con clientes lentos (consulte la sección anterior).
- Errores de transporte
Las sesiones se cierran después de un error de transmisión , como que la conexión WebSocket o la solicitud o respuesta HTTP no se puedan leer o escribir.
- Marcos STOMP
El número total de tramas CONNECT, CONNECTED y DISCONNECT procesadas, lo que indica cuántos clientes se conectaron en el nivel STOMP. Tenga en cuenta que el contador de tramas DISCONNECT puede ser inferior si las sesiones se cierran de forma anormal o si los clientes se cierran sin enviar una trama DISCONNECT.
- STOMP Broker Relay
-
- Conexiones TCP
Indica cuántas conexiones TCP hay En nombre del cliente, las sesiones WebSocket se establecen con el corredor. El valor debe ser igual al número de sesiones de WebSocket del cliente + 1 conexión de "sistema" general adicional para enviar mensajes desde la aplicación.
- STOMP Frames
El número total de tramas CONNECT, CONNECTED y DISCONNECT enviadas o recibidas del corredor en nombre de los clientes. Tenga en cuenta que la trama DISCONNECT se envía al intermediario independientemente de cómo se cerró la sesión del WebSocket del cliente. Por lo tanto, menos tramas DISCONNECT son una señal de que el intermediario está cerrando conexiones activamente (quizás debido a un latido inoportuno, una trama de entrada no válida u otro problema).
- Canal de entrada del cliente
Estadísticas del grupo de subprocesos que admite
clientInboundChannel
que proporcionan información sobre el estado de procesamiento de los mensajes entrantes. Las tareas en cola son una señal de que la aplicación puede estar procesando mensajes demasiado lentamente. Si tiene tareas que requieren un uso intensivo de E/S (como consultas lentas a bases de datos, solicitudes HTTP a una API REST de terceros, etc.), considere aumentar el tamaño del grupo de subprocesos.- Canal de salida del cliente
Estadísticas del grupo de subprocesos que respaldan el
clientOutboundChannel
que brindan información sobre el estado de las transmisiones de mensajes a los clientes. Las tareas en cola son una señal de que los clientes tardan demasiado en aceptar mensajes. Una forma de resolver este problema es aumentar el tamaño del grupo de subprocesos para dar cabida a la cantidad esperada de clientes lentos simultáneos. Otra opción es reducir los límites de tiempo de espera de envío y tamaño del búfer de envío (consulte la sección anterior).- Programador de tareas SockJS
Estadísticas del grupo de subprocesos del programador de tareas SockJS, que se utiliza para enviar mensajes de latido. Tenga en cuenta que al negociar mensajes de latido en el nivel STOMP, el envío de mensajes de latido para SockJS está deshabilitado.
Pruebas
Hay dos enfoque principal para las pruebas de aplicaciones cuando se utiliza el soporte STOMP de Spring sobre WebSocket. La primera es escribir pruebas del lado del servidor para probar la funcionalidad de los controladores y sus métodos de manejo de mensajes anotados. El segundo es escribir pruebas completas de un extremo a otro que incluyan la ejecución tanto del cliente como del servidor.
Los dos enfoques no son mutuamente excluyentes. Al contrario, cada uno de ellos tiene su propio lugar en la estrategia general de pruebas. Las pruebas del lado del servidor están más enfocadas y son más fáciles de escribir y mantener. Por otro lado, las pruebas de integración de un extremo a otro son más completas y cubren mucho más, pero también son más complejas de escribir y mantener.
La forma más sencilla de prueba del lado del servidor es escribir el controlador, pruebas unitarias. Sin embargo, esto no es lo suficientemente práctico porque mucho de lo que hace el controlador depende de sus anotaciones. Las pruebas unitarias puras simplemente no probarán cómo funciona.
Lo ideal es que los controladores bajo prueba se llamen tal como están durante la ejecución del programa, similar al enfoque para probar controladores que manejan solicitudes HTTP utilizando Spring MVC Test, framework, es decir, sin ejecutar un contenedor de servlets, pero usando Spring Framework para llamar a controladores anotados. Al igual que con la prueba Spring MVC, aquí tiene dos alternativas posibles: usar una configuración "específica del contexto" o "independiente":
Cargue la configuración Spring real usando el marco Spring TestContext , implemente
clientInboundChannel
como campo de prueba y utilícelo para enviar mensajes que serán procesados por los métodos del controlador.Instale manualmente la infraestructura mínima del marco Spring necesario para llamar a los controladores (es decir,
SimpAnnotationMethodMessageHandler
) y pasarle mensajes para los controladores directamente.
Ambos escenarios de configuración se demuestran en la aplicación de muestra pruebas para acciones portfolio.
El segundo enfoque es crear pruebas de integración de un extremo a otro. Para hacer esto, debe ejecutar el servidor WebSocket en modo integrado y conectarse a él como un cliente WebSocket que envía mensajes WebSocket que contienen fotogramas STOMP. Pruebas de aplicación de muestra Stock Portfoliotambién adopta este enfoque, utilizando Tomcat como servidor WebSocket integrado y un cliente STOMP simple para fines de prueba.
GO TO FULL VERSION