El protocolo WebSocket define dos tipos de mensajes (de texto y binarios), pero su contenido no está definido. El protocolo define un mecanismo para que el cliente y el servidor acuerden un subprotocolo (es decir, un protocolo de mensajería de nivel superior) que se utilizará además de WebSocket para determinar qué mensajes puede enviar cada uno, cuál es su formato y el contenido de cada uno, mensaje, etcétera. El uso de un subprotocolo es opcional, pero en cualquier caso se debe acordar algún protocolo entre cliente y servidor que defina el contenido del mensaje.

Breve descripción

Protocolo STOMP (Protocolo simple de mensajería orientada a texto) se creó originalmente para lenguajes de secuencias de comandos ( como Ruby, Python y Perl) para conectarse a intermediarios de mensajes corporativos. Está diseñado para funcionar con un subconjunto simple de patrones de mensajería comúnmente utilizados. STOMP se puede utilizar sobre cualquier protocolo de red de transmisión bidireccional confiable, como TCP y WebSocket. Aunque STOMP es un protocolo orientado a texto, la carga útil del mensaje puede ser en formato texto o binario.

STOMP es un protocolo basado en marcos cuyos marcos se modelan a partir de HTTP. El siguiente listado muestra la estructura del marco STOMP:

COMMAND
header1:value1
header2:value2
Body^@

Los clientes pueden usar los comandos SEND o SUBSCRIBE para enviar o suscribirse a mensajes junto con un de encabezado destino, que describe de qué trata el mensaje y quién debe recibirlo. Esto le permite crear un mecanismo de publicación-suscripción simple que puede usarse para enviar mensajes a través del intermediario a otros clientes conectados, o para enviar mensajes al servidor solicitando que se realice un trabajo específico.

Si es así Al utilizar el soporte del protocolo STOMP de Spring, una aplicación WebSocket en Spring actúa como un intermediario STOMP para los clientes. Los mensajes se enrutan a métodos de procesamiento de mensajes marcados con la anotación @Controller, o a un simple intermediario en memoria que rastrea las suscripciones y distribuye mensajes a los usuarios suscritos. También puede configurar Spring para que funcione con un agente STOM especial (como RabbitMQ, ActiveMQ y otros) para transmitir mensajes. En este caso, Spring admite el establecimiento de conexiones TCP con el intermediario, le transmite mensajes y reenvía mensajes desde él a los clientes WebSocket conectados. De esta manera, las aplicaciones web Spring pueden utilizar seguridad unificada basada en HTTP, validación común y un modelo de programación familiar para el procesamiento de mensajes.

El siguiente ejemplo muestra un cliente que se suscribe para recibir cotizaciones de acciones que el servidor puede generar. periódicamente (por ejemplo, a través de una tarea programada que envía mensajes a través de SimpMessagingTemplate al broker):

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@

El siguiente ejemplo muestra una cliente que envía una solicitud comercial que el servidor puede procesar usando un método con la anotación @MessageMapping:

SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@

Una vez ejecutado, el servidor puede enviar un mensaje de confirmación de la transacción e información detallada al cliente.

El significado del La dirección de destino se deja intencionalmente opaca en la especificación STOMP. Puede ser cualquier cadena y los servidores STOMP definen completamente la semántica y la sintaxis de las direcciones de destino que admiten. Sin embargo, muy a menudo los destinos son cadenas como rutas, donde /topic/.. implica publicación-suscripción (uno a muchos) y /queue/ implica mensajería". a punto" (uno a uno).

Los servidores STOMP pueden usar el comando MESSAGE para enviar mensajes a todos los suscriptores. El siguiente ejemplo muestra el servidor enviando una cotización de acciones a un cliente suscrito:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@

El servidor no puede enviar mensajes no solicitados. Todos los mensajes del servidor deben ser en respuesta a una suscripción de cliente específica, y el encabezado subscription-id del mensaje del servidor debe coincidir con el encabezado id de la suscripción del cliente.

Anterior breve Esta descripción tiene como objetivo proporcionar una comprensión muy básica del protocolo STOMP. Le recomendamos que lea la especificación del protocolo completa.

Ventajas

El uso de STOMP como subprotocolo permite que Spring Framework y Spring Security proporcionen un modelo de programación más rico que el uso de WebSockets sin formato del protocolo WebSocket. Lo mismo se puede decir acerca de comparar HTTP con TCP sin formato y cómo esto permite que Spring MVC y otros marcos web proporcionen una funcionalidad rica. A continuación se muestra una lista de ventajas:

  • No es necesario inventar un protocolo de mensajería ni un formato de mensaje personalizados.

  • Clientes STOMP, incluidos un cliente Java, están disponibles en Spring Framework.

  • Puede (opcionalmente) utilizar intermediarios de mensajes (como RabbitMQ, ActiveMQ y otros) para administrar suscripciones y transmitir mensajes.

  • La lógica de la aplicación se puede organizar en cualquier número de instancias @Controller y los mensajes se pueden enrutar a ellas según el encabezado de la dirección de destino de STOMP, como en lugar de procesar mensajes WebSocket sin procesar usando un único WebSocketHandler para una conexión determinada.

  • Puedes usar Spring Security para proteger mensajes basados en destinos y mensajes STOMP. tipos.

Activación de STOMP

La compatibilidad con STOMP a través de WebSocket está disponible en spring-messaging y módulos spring-websocket. Una vez que tenga estas dependencias implementadas, puede exponer los puntos finales STOMP a través de WebSocket usando un respaldo a través del protocolo SockJS, como se muestra en el siguiente ejemplo:


import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS(); 
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app"); 
        config.enableSimpleBroker("/topic", "/queue"); 
    }
}
  1. /portfolio es la URL HTTP para el punto final al que el cliente WebSocket (o SockJS) debe conectarse para confirmar el protocolo de enlace de WebSocket.
  2. Mensajes STOMP cuyo encabezado de destino comienza con /app se enrutan a métodos anotados con @MessageMapping en clases con la anotación @Controller.
  3. Usamos el intermediario de mensajes integrado para suscripción y difusión y envío de mensajes cuyo encabezado de destino comience con /topic ` o `/queue al intermediario.

El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>
</beans>
En el caso de un intermediario simple integrado, los prefijos son /topic y /queue no tienen ningún significado especial. Son simplemente una convención para diferenciar entre mensajería editor-suscriptor y de igual a igual (es decir, entre muchos suscriptores y un solo consumidor). Si está utilizando un broker externo, consulte la página STOMP del broker para comprender qué destinos y prefijos STOMP admite.

Para conectarse desde el navegador, cuando utilice SockJS, puede utilizar sockjs-client. Para STOMP, muchas aplicaciones utilizaron la biblioteca (también conocida como stomp.js). , que es completamente funcional y se ha utilizado en producción durante muchos años, pero ya no cuenta con soporte. Actualmente, JSteunou/webstomp-client es el sucesor de esta biblioteca que se mantiene y desarrolla más activamente. El siguiente ejemplo de código se basa en él:

 
var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}

Además, si la conexión se realiza a través de WebSocket (sin SockJS), entonces puede utilice el siguiente código:


var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}

Tenga en cuenta que en el ejemplo anterior de stompClient, no necesita especificar los encabezados login y passcode. Incluso si lo hiciera, serían ignorados (o más bien anulados) en el lado del servidor.

Para obtener más ejemplos de código, consulte:

Servidor WebSocket

Para configurar un servidor WebSocket básico, información de la sección "Servidor" es la configuración aplicable". Sin embargo, en el caso de Jetty, debe configurar HandshakeHandler y WebSocketPolicy mediante StompEndpointRegistry:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
    }
    @Bean
    public DefaultHandshakeHandler handshakeHandler() {
        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);
        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }
}

Flujo de mensajes

Si el punto final STOMP está abierto, la aplicación Spring se convierte en un intermediario STOMP para los clientes conectados. Esta sección describe el flujo de mensajes del lado del servidor.

El módulo spring-messaging proporciona soporte fundamental para las aplicaciones de mensajería que se originaron en el proyecto Integración de Spring y luego se extrajeron e incorporaron en Spring Framework para un uso más amplio en muchos Proyectos de primavera y escenarios de aplicación. La siguiente lista describe brevemente algunas de las abstracciones de mensajería disponibles:

  • Mensaje: una representación simple del mensaje, incluidos encabezados y carga útil.

  • MessageHandler: Acuerdo de procesamiento de mensajes.

  • MessageChannel: una convención de envío de mensajes que permite la libre comunicación entre remitentes y destinatarios.

  • SubscribableChannel: MessageChannel con suscriptores de MessageHandler.

  • ExecutorSubscribableChannel: SubscribableChannel que usa Executor para entregar mensajes.

Tanto la configuración de Java (es decir, la anotación @EnableWebSocketMessageBroker) como la configuración del espacio de nombres XML (que Esto es <websocket:message-broker>) utiliza los componentes anteriores para ensamblar el flujo de trabajo de paso de mensajes. El siguiente diagrama muestra los componentes utilizados al activar un intermediario de mensajes integrado simple:

El diagrama anterior muestra tres canales de mensajes:

  • clientInboundChannel: Para transmitir mensajes recibidos de clientes WebSocket.

  • clientOutboundChannel: Para enviar servidor mensajes del lado del cliente WebSocket.

  • brokerChannel: para enviar mensajes al intermediario de mensajes desde el código de la aplicación del lado del servidor.

El siguiente diagrama muestra los componentes utilizados si un agente externo (como RabbitMQ) está configurado para administrar suscripciones y transmisiones de mensajes:

La principal diferencia entre los dos esquemas anteriores es utilizar una "retransmisión de intermediario" para pasar mensajes a un intermediario STOMP externo a través de TCP y transmitir mensajes desde el intermediario a los clientes suscriptores.

Si los mensajes son recibidos de una conexión WebSocket, se decodifican en marcos STOMP, se convierten en una representación Message de Spring y se envían a lo largo del clientInboundChannel para su posterior procesamiento. Por ejemplo, los mensajes STOMP cuyos encabezados de destino comienzan con /app se pueden enrutar a métodos con la anotación @MessageMapping en controladores anotados, mientras que los mensajes /topic y /queue se pueden enviar directamente al intermediario de mensajes.

Un @Controller anotado que maneja el mensaje STOMP del cliente puede enviar un mensaje al intermediario de mensajes a través de brokerChannel, y el intermediario transmite el mensaje a los suscriptores apropiados a través de clientOutboundChannel. El mismo controlador puede hacer lo mismo en respuesta a solicitudes HTTP, por lo que el cliente puede realizar un método HTTP POST y luego el método con la anotación @PostMapping puede enviar un mensaje al intermediario de mensajes para distribuirloel resto de la asignación suscriptores.

Podemos seguir este proceso con un ejemplo sencillo. Considere el siguiente ejemplo en el que se configura el servidor:

 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }
}
@Controller
public class GreetingController {
    @MessageMapping("/greeting")
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }
}

El ejemplo anterior admite el siguiente flujo:

  1. El cliente se conecta a http://localhost:8080/portfolio y tan pronto como se establece la conexión WebSocket, comienzan los fotogramas STOMP para ser transmitido a través de él.

  2. El cliente envía una trama SUBSCRIBE con un encabezado de destino /topic/greeting. Una vez recibido y decodificado, el mensaje se envía al clientInboundChannel y luego se enruta al intermediario de mensajes, que almacena la suscripción del cliente.

  3. El cliente envía un ENVIAR marco a /app/greeting. El prefijo /app ayuda a enrutarlo a los controladores anotados. Después de eliminar el prefijo /app, el resto de la asignación /greeting se asigna a un método anotado con @MessageMapping en GreetingController.

  4. El valor devuelto por GreetingController se convierte en un Mensaje de Spring con una carga útil basada en el valor de retorno y un encabezado de destino por defecto /topic/greeting (derivado del destino de entrada, reemplazando /app con /topic). El mensaje recibido se envía al brokerChannel y el intermediario de mensajes lo procesa.

  5. El intermediario de mensajes encuentra todos los suscriptores elegibles y les envía un MENSAJE a cada uno de ellos. marco en el clientOutboundChannel, desde donde los mensajes se codifican como marcos STOMP y se envían a través de una conexión WebSocket.

La siguiente sección contiene más detalles sobre las anotaciones. métodos, incluidos los tipos de argumentos admitidos y los valores de retorno.

Controladores anotados

Las aplicaciones pueden usar clases marcadas con la anotación @Controller para procesar mensajes de los clientes. Estas clases pueden declarar métodos con las anotaciones @MessageMapping, @SubscribeMapping y @ExceptionHandler, como se describe en los siguientes temas:

  • @MessageMapping

  • @SubscribeMapping

  • @MessageExceptionHandler

@MessageMapping

Puedes usar el anotación@MessageMapping para anotar métodos que enrutan mensajes según su destino. Se admite tanto a nivel de método como a nivel de tipo. A nivel de tipo, la anotación @MessageMapping se utiliza para expresar asignaciones comunes para todos los métodos del controlador.

De forma predeterminada, los valores de asignación son patrones de ruta estilo Ant (por ejemplo , /thing* , /thing/**), incluida la compatibilidad con variables de plantilla (por ejemplo, /thing/{id}). Se puede hacer referencia a los valores a través de los argumentos de un método marcado con la anotación @DestinationVariable. Las aplicaciones también pueden cambiar a una convención de destino: separar el mensaje para las asignaciones.

Argumentos de métodos admitidos

La siguiente tabla describe los argumentos de los métodos:

Argumento del método Descripción

Mensaje

Proporciona acceso al mensaje completo .

MessageHeaders

Proporciona acceso a encabezados dentro de un Mensaje.

MessageHeaderAccessor, SimpMessageHeaderAccessor y StompHeaderAccessor

Proporciona acceso a los encabezados mediante métodos de acceso escritos.

@Payload

Proporciona acceso a la carga útil del mensaje convertida (por ejemplo, desde JSON) utilizando el MessageConverter configurado.

Esta anotación es opcional, ya que se proporciona de forma predeterminada si no coincide ningún otro argumento.

Puede anotar argumentos de carga útil con @javax.validation.Valid o @Validated de Spring para que los argumentos de la carga útil se validen automáticamente.

@Header

Proporciona acceso a un valor de encabezado específico, junto con una conversión de tipo usando org.springframework.core.convert.converter.Converter, si esto necesario.

@Headers

Proporciona acceso a todos encabezados en el mensaje. Este argumento debe poder asignarse a java.util.Map.

@DestinationVariable

Proporciona acceso a las variables de plantilla extraídas del destino del mensaje. Los valores se convierten si es necesario según el tipo declarado del argumento del método.

java.security.Principal

Refleja el usuario que inició sesión durante el protocolo de enlace de WebSocket a través de HTTP.

Volver Valores

De forma predeterminada, el valor de retorno de un método marcado con la anotación @MessageMapping se serializa en una carga útil a través del MessageConverter correspondiente y se envía como un Message a través de brokerChannel, desde donde se envía a los suscriptores. El propósito de un mensaje saliente es el mismo que el de un mensaje entrante, pero con el prefijo /topic.

Puedes usar @SendTo y anotaciones @SendToUser para configurar el destino del mensaje de salida. La anotación @SendTo se utiliza para configurar un destino o para especificar varios destinos. La anotación @SendToUser se usa para dirigir el mensaje de salida solo al usuario asociado con el mensaje de entrada.

Puede usar @SendTo y @SendToUser al mismo tiempo code> en el mismo método, ambos métodos son compatibles a nivel de clase, en cuyo caso serán los predeterminados para los métodos de la clase. Sin embargo, recuerde que cualquier anotación con @SendTo o @SendToUser a nivel de método anula cualquier anotación similar a nivel de clase.

Los mensajes se pueden procesar de forma asíncrona, y un método con la anotación @MessageMapping puede devolver un ListenableFuture, CompletableFuture o CompletionStage.

Nota Tenga en cuenta que las anotaciones @SendTo y @SendToUser son simplemente una medida de conveniencia equivalente a usar SimpMessagingTemplate para enviar mensajes. Si es necesario, en escenarios más complejos, los métodos marcados con la anotación @MessageMapping pueden recurrir a SimpMessagingTemplate directamente. Esto puede ocurrir en lugar de, o quizás además de, devolver un valor.

@SubscribeMapping

La anotación @SubscribeMapping es lo mismo que @MessageMapping, pero limita la asignación a solo mensajes de suscripción. Admite los mismos argumentos de método que la anotación @MessageMapping. Sin embargo, en el caso de un valor de retorno, de forma predeterminada, el mensaje se envía directamente al cliente (a través de clientOutboundChannel, en respuesta a una suscripción) en lugar de al intermediario (a través de brokerChannel, como una transmisión a las suscripciones correspondientes). Agregar una anotación @SendTo o una anotación @SendToUser anula esta lógica y envía mensajes al intermediario.

¿Cuándo resulta práctico? Supongamos que el intermediario está asignado a /topic y /queue, y que los controladores de la aplicación están asignados a /app. Con esta configuración, el corredor almacena todas las suscripciones a /topic y /queue para envíos de correo repetidos, y la aplicación no necesita participar en esto. El cliente también puede suscribirse a alguna dirección de destino /app y el controlador puede devolver un valor en respuesta a esa suscripción sin la participación del intermediario, sin almacenar ni reutilizar la suscripción (de hecho, una solicitud-respuesta única). intercambio) ). Un caso de uso para este mecanismo es completar la interfaz de usuario con datos iniciales al inicio.

¿Cuándo no resulta práctico? No intente asignar el agente y los controladores al mismo prefijo de dirección de destino a menos que por alguna razón necesite que ambos procesen mensajes, incluidas las suscripciones, de forma independiente. Los mensajes entrantes se procesan en paralelo. No hay garantías de si el corredor o el controlador procesará este mensaje primero. Si el objetivo es recibir una notificación de que la suscripción se ha guardado y está lista para transmitirse, el cliente debe solicitar un acuse de recibo si el servidor lo admite (un simple broker no lo hace). Por ejemplo, utilizando el cliente STOMP en Java, puede hacer lo siguiente para agregar un acuse de recibo:


@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// Durante la inicialización...
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// Al suscribirse...
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
     // Suscripción lista...
});

En el lado del servidor, puede registrar ExecutorChannelInterceptor para brokerChannel e implementar el método afterMessageHandled, que se llama después de que se hayan procesado los mensajes, incluidas las suscripciones.

@MessageExceptionHandler

Una aplicación puede utilizar métodos anotados con @MessageExceptionHandler para manejar excepciones de métodos anotados con @MessageMapping. Puede declarar excepciones en la anotación misma o mediante un argumento de método si necesita acceder a la instancia de excepción. El siguiente ejemplo declara una excepción a través de un argumento de método:

 
@Controller
public class MyController {
    // ...
    @MessageExceptionHandler
    public ApplicationError handleException(MyException exception) {
        // ...
        return appError;
    }
}

Los métodos con la anotación @MessageExceptionHandler admiten firmas de métodos flexibles y los mismos tipos de argumentos de método y valores de retorno que métodos con la anotación @MessageMapping.

Normalmente, los métodos marcados con la anotación @MessageExceptionHandler se aplican dentro de una clase con @Controller anotación (o jerarquía de clases) en la que se declaran. Si desea que dichos métodos se apliquen de manera más global (en todos los controladores), puede declararlos en una clase marcada con la anotación @ControllerAdvice. Esto es comparable a un soporte similar disponible en Spring MVC.

Envío de mensajes

¿Qué sucede si necesita enviar mensajes a clientes conectados desde cualquier lugar de la aplicación? Cualquier componente de la aplicación puede enviar mensajes a través de brokerChannel. La forma más sencilla de hacerlo es implementar un SimpMessagingTemplate y usarlo para enviar mensajes. Normalmente, se incrusta por tipo, como se muestra en el siguiente ejemplo:


@Controller
public class GreetingController {
    private SimpMessagingTemplate template;
    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
        this.template = template;
    }
    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }
}

Sin embargo, también puede identificarlo por su nombre(brokerMessagingTemplate) si existe otro bean del mismo tipo.

Simple Broker

El intermediario de mensajes simple integrado procesa las solicitudes de suscripción de los clientes, las almacena en la memoria y envía mensajes a los clientes conectados con direcciones de destino adecuadas. El agente admite rutas de direcciones de destino, incluidas suscripciones a patrones de direcciones de destino estilo Ant.

Las aplicaciones también pueden usar direcciones de destino separadas por puntos (en lugar de barras). ) ).

Si el programador de tareas está configurado, el intermediario simple admitirá mensajes de latido del corazón - STOMP. Para configurar el programador, puede declarar su propio bean TaskScheduler e instalarlo a través de MessageBrokerRegistry. Alternativamente, puede usar uno que se declara automáticamente en la configuración de WebSocket incorporada; sin embargo, luego necesitará la anotación @Lazy para evitar bucles entre la configuración de WebSocket incorporada y su WebSocketMessageBrokerConfigurer. Por ejemplo:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private TaskScheduler messageBrokerTaskScheduler;
    @Autowired
    public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
        this.messageBrokerTaskScheduler = taskScheduler;
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue/", "/topic/")
                .setHeartbeatValue(new long[] {10000, 20000})
                .setTaskScheduler(this.messageBrokerTaskScheduler);
        // ...
    }
}

Broker externo

Un broker simple es excelente para comenzar, pero solo admite algunas de las Los comandos STOMP (no admite caracteres de confirmación, reconocimientos y algunas otras funciones), utilizan un bucle de mensajes simple y no son adecuados para agrupaciones. Alternativamente, puede actualizar sus aplicaciones para utilizar un intermediario de mensajes con todas las funciones.

Consulte la documentación de STOMP para el intermediario de mensajes de su elección (por ejemplo, RabbitMQ, ActiveMQ y otros), instale el broker y ejecútelo con el soporte STOMP habilitado. Luego puede habilitar el relé del intermediario STOMP (en lugar de un intermediario simple) en la configuración de Spring.

El siguiente ejemplo de configuración proporciona un intermediario completamente funcional:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>
</beans>

El relé del agente STOMP en la configuración anterior es MessageHandler de Spring, que procesa mensajes reenviándolos a mensajes de un intermediario externo. Para hacer esto, establece una conexión TCP con el corredor, le reenvía todos los mensajes y luego reenvía todos los mensajes recibidos del corredor a los clientes a través de sus sesiones WebSocket. Básicamente, actúa como un "relé" que reenvía mensajes en ambas direcciones.

Agregue dependencias a su project io.projectreactor.netty:reactor-netty y io.netty:netty-all para administrar conexiones TCP.

Además, los componentes de la aplicación (como métodos para procesar solicitudes HTTP, servicios comerciales y otros) también pueden enviar mensajes al intermediario.

Básicamente, el intermediario proporciona una distribución de mensajes confiable y escalable.

Conexión al intermediario

Broker STOMP El relé mantiene una única conexión TCP de "sistema" con el intermediario. Esta conexión se utiliza únicamente para mensajes que se originan en la aplicación del lado del servidor y no para recibir mensajes. Puede configurar las credenciales STOMP (es decir, los encabezados login y passcode en el marco STOMP) para esta conexión. Esto se expondrá tanto en el espacio de nombres XML como en la configuración de Java como las propiedades systemLogin y systemPasscode con los valores predeterminados de guest y guest.

El relé del agente STOMP también crea una conexión TCP separada para cada cliente WebSocket conectado. Puede configurar las credenciales STOMP que se utilizan para todas las conexiones TCP realizadas en nombre de los clientes. Esto quedará expuesto tanto en el espacio de nombres XML como en la configuración de Java como las propiedades clientLogin y clientPasscode con los valores predeterminados de guest y invitado.

El relé del agente STOMP siempre establece el login y el passcode encabezados en cada trama CONNECT que reenvía al corredor en nombre de los clientes. Por lo tanto, los clientes WebSocket no necesitan configurar estos encabezados. Son ignorados. Los clientes de WebSocket deben utilizar la autenticación HTTP para proteger el punto final de WebSocket y establecer la identidad del cliente.

El relé del agente STOMP también envía y recibe mensajes de latido hacia y desde el agente de mensajes a través de la conexión TCP del "sistema". Puede configurar los intervalos para enviar y recibir mensajes de latido (predeterminado 10 segundos). Si se pierde la comunicación con el intermediario, el relé del intermediario seguirá intentando restablecer la conexión cada 5 segundos hasta que falle.

Cualquier Spring Bean puede implementar ApplicationListener<BrokerAvailabilityEvent> para recibir notificaciones si la conexión del "sistema" con el corredor se pierde y luego se restablece. Por ejemplo, el servicio Stock Quote que envía cotizaciones de acciones puede dejar de intentar enviar mensajes si no hay una conexión de "sistema" activa.

De forma predeterminada, el relé del corredor STOMP siempre se conecta y, opcionalmente, se vuelve a conectar si la conexión se pierde - al mismo host y puerto. Si necesita proporcionar varias direcciones en cada intento de conexión, puede configurar un proveedor de direcciones en lugar de un host y un puerto fijos. El siguiente ejemplo muestra cómo hacer esto:

 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    // ...
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                client -> client.addressSupplier(() -> ... ),
                new StompReactorNettyCodec());
    }
}

También puede configurar un relé de agente STOMP utilizando la propiedad virtualHost. El valor de esta propiedad se establece como el encabezado host de cada trama CONNECT y puede resultar útil (por ejemplo, en un entorno de nube donde el host real al que se conecta la conexión TCP se establece es diferente del host, que proporciona un servicio STOMP en la nube).

Puntos como delimitadores

Si los mensajes se enrutan a métodos anotados con @MessageMapping , coinciden con AntPathMatcher. De forma predeterminada, se espera que las plantillas utilicen una barra diagonal (/) como delimitador. Esta es una buena convención para aplicaciones web, similar a las URL HTTP. Sin embargo, si está más acostumbrado a las convenciones de mensajería, puede pasar a utilizar un punto (.) como separador.

El siguiente ejemplo muestra cómo hacer esto usando Java. configuraciones:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    // ...
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.enableStompBrokerRelay("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:


<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:websocket="http://www.springframework.org/schema/websocket"
       xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                https://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/websocket
                https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
        <websocket:stomp-endpoint path="/stomp"/>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>
    <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
        <constructor-arg index="0" value="."/>
    </bean>
</beans>

El controlador podrá entonces utilizar el punto (.) como separador en los métodos marcados con la anotación de @MessageMapping, como se muestra en el siguiente ejemplo:


@Controller
@MessageMapping("red")
public class RedController {
    @MessageMapping("blue.{green}")
    public void handleGreen(@DestinationVariable String green) {
        // ...
    }
}

El cliente ahora puede enviar un mensaje a /app/red.blue.green123.

En el ejemplo anterior no cambiamos los prefijos para el "retransmisión del intermediario" porque dependen completamente del intermediario de mensajes externo. Consulte las páginas de documentación de STOMP del broker que está utilizando para ver qué convenciones admite para el encabezado de la dirección de destino.

Un "broker simple", por otro lado, se basa en un PathMatcher configurado. Por lo tanto, si cambia el separador, ese cambio también se aplicará al corredor y a cómo el corredor relaciona los destinos del mensaje con los patrones en las suscripciones.

Autenticación

Cada uno La sesión de mensajería STOMP a través de un WebSocket comienza con una solicitud HTTP. Esto podría ser una solicitud para pasar al protocolo WebSocket (es decir, un protocolo de enlace sobre el protocolo WebSocket) o, en el caso de respaldos a través del protocolo SockJS, una serie de solicitudes HTTP a los mecanismos de transferencia para SockJS.

Muchas aplicaciones web ya cuentan con herramientas de autenticación y autorización para proteger las solicitudes HTTP. Normalmente, el usuario se autentica a través de Spring Security mediante algún mecanismo, como una página de inicio de sesión, autenticación básica HTTP u otro método. El contexto de seguridad para el usuario autenticado se almacena en la sesión HTTP y se asocia con solicitudes posteriores en la misma sesión según la cookie.

Por lo tanto, en el caso del protocolo de enlace WebSocket o solicitudes HTTP, se utilizan mecanismos de transferencia para SockJS como Normalmente, ya existe un usuario autenticado, al que se puede acceder a través de HttpServletRequest#getUserPrincipal(). Spring asocia automáticamente a este usuario con la sesión WebSocket o SockJS creada para él y, posteriormente, con todos los mensajes STOMP enviados a través de esta sesión a través del encabezado del usuario.

En resumen, una aplicación web típica no necesita hacer nada más allá de lo que ya hace para garantizar la seguridad. El usuario se autentica en el nivel de solicitud HTTP mediante un contexto de seguridad que se mantiene en el nivel de sesión HTTP basado en cookies (que luego se asocia con sesiones WebSocket o SockJS creadas para ese usuario), lo que da como resultado que se agregue un encabezado de usuario a cada Message que pasa a través de la aplicación.

El protocolo STOMP tiene encabezados login y passcode en CONNECT marco. Fueron diseñados originalmente y son necesarios para usar STOMP sobre TCP. Sin embargo, cuando se utiliza STOMP sobre WebSocket, de forma predeterminada, Spring ignora los encabezados de autenticación en el nivel del protocolo STOMP y asume que el usuario ya está autenticado en el nivel del mecanismo de transporte HTTP. Se espera que la sesión WebSocket o SockJS contenga un usuario autenticado.

Autenticación de token

Proyecto Spring Security OAuthproporciona soporte para seguridad basada en tokens, incluido JSON Web Token (JWT). Puede usarlo como mecanismo de autenticación en aplicaciones web, incluido STOMP sobre WebSocket, como se describe en la sección anterior (es decir, para preservar la identidad a través de una sesión basada en cookies).

Al mismo tiempo, Las cookies basadas en sesiones no siempre son mejores (por ejemplo, en aplicaciones que no admiten una sesión del lado del servidor o en aplicaciones móviles que normalmente usan encabezados para la autenticación).

Protocolo WebSocket, RFC 6455 "no prescribe ninguna forma específica en la que los servidores puedan autenticar a los clientes durante el protocolo de enlace WebSocket". En la práctica, sin embargo, los clientes del navegador sólo pueden utilizar encabezados de autenticación estándar (es decir, autenticación básica HTTP) o cookies, pero no pueden (por ejemplo) proporcionar encabezados personalizados. Asimismo, el cliente JavaScript de SockJS no tiene la capacidad de enviar encabezados HTTP en solicitudes de transporte de SockJS. Consulte sockjs-client problema 196. En su lugar, le permite enviar parámetros de solicitud que puedes usarlo para enviar un token, pero esto tiene sus inconvenientes (por ejemplo, el token puede registrarse accidentalmente junto con la URL en los registros del servidor).

Las restricciones anteriores se aplican a los clientes basados en navegador y no se aplican al cliente STOMP basado en Java en Spring, que admite el envío de encabezados de solicitud a través de los protocolos WebSocket y SockJS. .

Por lo tanto, para las aplicaciones que necesitan evitar cookies, es posible que no existan buenas alternativas a la autenticación a nivel de protocolo HTTP. En este caso, en lugar de utilizar cookies, puede optar por la autenticación mediante encabezados en el nivel del protocolo de mensajería STOMP. Esto requiere dos pasos simples:

  1. Usar el cliente STOMP para pasar encabezados de autenticación durante la conexión.

  2. Procesar encabezados de autenticación usando ChannelInterceptor.

El siguiente ejemplo utiliza la configuración del lado del servidor para registrar un interceptor de autenticación personalizado. Tenga en cuenta que el interceptor solo necesita autenticarse y configurar el encabezado del usuario en un Message del formato CONNECT. Spring marca y almacena al usuario autenticado y lo asocia con mensajes STOMP posteriores en la misma sesión. El siguiente ejemplo muestra cómo registrar un interceptor de autenticación personalizado:


@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message preSend(Message message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    Authentication user = ... ; // access authentication header(s)
                    accessor.setUser(user);
                }
                return message;
            }
        });
    }
}}

Tenga en cuenta también que cuando utilice la autorización de mensajes de Spring Security, actualmente debe asegurarse de que la configuración de ChannelInterceptor para la autenticación sea en orden antes de la configuración de Spring Security. La mejor manera de hacerlo es declarar un interceptor personalizado en su propia implementación WebSocketMessageBrokerConfigurer, que está marcada con la anotación @Order(Ordered.HIGHEST_PRECEDENCE + 99).

Autorización

Spring Security proporciona WebSocket de autorización de subprotocolo, que utiliza un ChannelInterceptor para autorizar mensajes según el encabezado de usuario que contienen. Además, Spring Session proporciona integración del protocolo WebSocket, lo que garantiza que la sesión HTTP del usuario no caducará mientras la sesión de WebSocket aún esté activa.

Direcciones de destino personalizadas

Una aplicación puede enviar mensajes dirigidos a un usuario específico y las funciones STOMP de Spring admiten este reconocimiento. direcciones de destino con el prefijo /user/. Por ejemplo, un cliente podría suscribirse a la dirección de destino /user/queue/position-updates. El UserDestinationMessageHandler procesa esta dirección de destino y la traduce en una dirección de destino única para la sesión del usuario (por ejemplo, /queue/position-updates-user123). Esto brinda la conveniencia de suscribirse a una dirección de destino común y al mismo tiempo garantiza que no haya conflictos con otros usuarios suscritos a la misma dirección de destino, de modo que cada usuario pueda recibir actualizaciones únicas de la posición de las acciones.

Cuando se trabaja con direcciones de destino personalizadas, es importante configurar los prefijos de dirección de destino del agente y de la aplicación; de lo contrario, el agente procesará mensajes con el prefijo "/usuario", que solo debe ser procesado por UserDestinationMessageHandler.

En el lado de la transmisión, los mensajes se pueden enviar a una dirección de destino como /user/{username}/queue/position-updates , que a su vez es traducido por UserDestinationMessageHandler a uno o más destinos, uno para cada sesión asociada con el usuario. Esto permite que cualquier componente de la aplicación envíe mensajes dirigidos a un usuario específico sin saber nada más que el nombre del usuario y la dirección de destino general. Esto también se puede hacer usando la anotación y la plantilla de mensajería.

El método de manejo de mensajes puede enviar mensajes al usuario asociado con el mensaje que se está procesando usando la anotación @SendToUser (también compatible a nivel de clase para un destino general), como se muestra en el siguiente ejemplo:


@Controller
public class PortfolioController {
    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

Si un usuario tiene más de una sesión, de forma predeterminada se dirigen a todas las sesiones suscritas a la dirección de destino determinada. Sin embargo, a veces es posible que desee centrarse únicamente en la sesión que envió el mensaje que se está procesando. Puede hacer esto estableciendo el atributo broadcast en falso, como se muestra en el siguiente ejemplo:


@Controller
public class MyController {
    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // MyBusinessException se lanza aquí
    }
    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}
Aunque los destinos de usuario normalmente suponen un usuario autenticado, esto no es estrictamente necesario. Una sesión de WebSocket no asociada con un usuario autenticado puede suscribirse a la dirección de destino del usuario. En tales casos, la anotación @SendToUser funciona exactamente igual que con broadcast=false (es decir, apunta solo a la sesión que envió el mensaje que se está procesando).

Puede enviar un mensaje a destinos personalizados desde cualquier componente de la aplicación, por ejemplo, implementando un SimpMessagingTemplate generado por una configuración de Java o un espacio de nombres XML. (El nombre del bean es brokerMessagingTemplate si es necesario para calificar completamente el nombre utilizando la anotación @Qualifier). El siguiente ejemplo muestra cómo hacer esto:


@Service
public class TradeServiceImpl implements TradeService {
    private final SimpMessagingTemplate messagingTemplate;
    @Autowired
    public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }
    // ...
    public void afterTradeExecuted(Trade trade) {
        this.messagingTemplate.convertAndSendToUser(
                trade.getUserName(), "/queue/position-updates", trade.getResult());
    }
}
Si está utilizando destinos personalizados con un agente de mensajes externo, debe revisar la documentación para el corredor sobre cómo administrar colas inactivas para que cuando finalice la sesión del usuario, se eliminen todas las colas únicas del usuario. Por ejemplo, RabbitMQ crea colas de eliminación automática cuando se utilizan destinos como /exchange/amq.direct/position-updates. En este caso, el cliente puede suscribirse a /user/exchange/amq.direct/position-updates. Asimismo, ActiveMQ tiene opciones de configuración para limpiar destinos inactivos.

En un escenario con múltiples servidores de aplicaciones, la dirección de destino del usuario puede permanecer sin resolver porque el usuario está conectado a otro servidor. En tales casos, puede configurar la dirección de destino para transmitir mensajes no resueltos para que otros servidores puedan intentar hacerlo. Esto se puede hacer usando la propiedad userDestinationBroadcast del registro MessageBrokerRegistry en la configuración de Java y el atributo user-destination-broadcast del message-broker elemento code> en XML.

Orden de los mensajes

Los mensajes del broker se publican a través del canal clientOutboundChannel, desde donde se se escriben en sesiones de WebSocket. Debido a que el canal está respaldado por un ThreadPoolExecutor, los mensajes se procesan en diferentes hilos y la secuencia resultante recibida por el cliente puede no coincidir con el orden exacto de publicación.

Si este es el caso Si hay un problema, habilite el flag setPreservePublishOrder, como se muestra en el siguiente ejemplo:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    protected void configureMessageBroker(MessageBrokerRegistry registry) {
        // ...
        registry.setPreservePublishOrder(true);
    }
}

El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker preserve-publish-order="true">
            <!-- ... -->
    </websocket:message-broker>
</beans>

Si se establece el indicador, los mensajes dentro de la misma sesión del cliente se publican a través del clientOutboundChannel en un tiempo, lo que asegura el orden de publicación requerido. Tenga en cuenta que esto produce un ligero impacto en el rendimiento, por lo que solo debe habilitar este indicador si es necesario.

Eventos

Se publican varios eventos ApplicationContext y se pueden obtenido implementando la interfaz ApplicationListener para Spring:

  • BrokerAvailabilityEvent: indica cuándo el broker pasa a estar disponible o no. Aunque un intermediario "simple" pasa a estar disponible inmediatamente después del inicio y permanece así mientras se ejecuta la aplicación, el "retransmisión de intermediario" STOMP puede perder la conexión con un intermediario con todas las funciones (por ejemplo, cuando se reinicia el intermediario). El relé del intermediario contiene lógica de reconexión y restablece una conexión de "sistema" con el intermediario cuando se reactiva. Como resultado, este evento se publica cada vez que el estado cambia de conectado a desconectado y viceversa. Los componentes que utilizan SimpMessagingTemplate deben suscribirse a este evento y evitar enviar mensajes mientras el intermediario no esté disponible. En cualquier caso, deben estar preparados para manejar MessageDeliveryException cuando se envía un mensaje.

  • SessionConnectEvent: Publicado cuando Se recibe una nueva trama CONNECT del protocolo STOMP para indicar el inicio de una nueva sesión de cliente. El evento contiene un mensaje que representa la conexión, incluido el ID de la sesión, la información del usuario (si corresponde) y los encabezados personalizados enviados por el cliente. Esto es útil para realizar un seguimiento de las sesiones de los clientes. Los componentes que se suscriben a este evento pueden encapsular el mensaje contenido usando SimpMessageHeaderAccessor o StompMessageHeaderAccessor.

  • SessionConnectedEvent: publicado poco después de un SessionConnectEvent cuando el intermediario envió una trama CONNECTED del protocolo STOMP en respuesta a una trama CONNECT. En este punto, la sesión STOMP se puede considerar completamente establecida.

  • SessionSubscribeEvent: se publica cuando se recibe una nueva trama SUBSCRIBE del protocolo STOMP.

  • SessionUnsubscribeEvent: se publica cuando se recibe una nueva trama de UNSUBSCRIBE del protocolo STOMP.

  • SessionDisconnectEvent: Publicado al finalizar la sesión STOMP. El marco DISCONNECT puede enviarse desde el cliente o generarse automáticamente cuando se cierra la sesión de WebSocket. En algunos casos, este evento se publica más de una vez por sesión. Los componentes deben ser idempotentes con respecto a múltiples eventos de pérdida de conexión.

Si está utilizando un agente con todas las funciones, un STOMP de "retransmisión de intermediario" restablece automáticamente una conexión de "sistema" si el intermediario deja de estar disponible temporalmente. Sin embargo, las conexiones de los clientes no se restablecen automáticamente. Si los mensajes de latido están habilitados, el cliente generalmente reacciona cuando el corredor no responde dentro de los 10 segundos. Los clientes deben implementar su propia lógica de restablecimiento de conexión.

Intercepción

Los eventos informan notificaciones sobre el ciclo de vida de una conexión STOMP, pero no sobre cada mensaje del cliente. Las aplicaciones también pueden registrar un ChannelInterceptor para interceptar cualquier mensaje en cualquier parte de la cadena de procesamiento. El siguiente ejemplo muestra cómo interceptar mensajes entrantes de clientes:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new MyChannelInterceptor());
    }
}

Un ChannelInterceptor personalizado puede usar StompHeaderAccessor o SimpMessageHeaderAccessor para acceder a la información del mensaje, como se muestra en el siguiente ejemplo:


public class MyChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message preSend(Message message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getStompCommand();
        // ...
        return message;
    }
}

Las aplicaciones también pueden implementar ExecutorChannelInterceptor, que es una subinterfaz de ChannelInterceptor con in- Devoluciones de llamada de subprocesos, en las que se procesan los mensajes. Aunque ChannelInterceptor se llama una vez por cada mensaje enviado al canal, ExecutorChannelInterceptor proporciona interceptores en el hilo de cada MessageHandler que se suscribe a mensajes de el canal.

Tenga en cuenta que, al igual que con el SessionDisconnectEvent descrito anteriormente, se puede recibir un mensaje DISCONNECT del cliente y también se puede generar automáticamente cuando se cierra la sesión de WebSocket. . En algunos casos, un espía puede interceptar este mensaje más de una vez en cada sesión. Los componentes deben ser idempotentes con respecto a múltiples eventos de pérdida de conexión.

Cliente STOMP

Spring proporciona un cliente STOMP a través de WebSocket y un cliente STOMP a través de TCP.

Para empezar, puede crear y configurar un WebSocketStompClient como se muestra en el siguiente ejemplo:


WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // para latidos

En el ejemplo anterior, puedes reemplazar StandardWebSocketClient con SockJsClient, ya que también es una implementación de WebSocketClient. SockJsClient puede usar WebSocket o un mecanismo de transporte basado en HTTP como alternativa.

Luego puede establecer una conexión y proporcionar un controlador para la sesión STOMP, como se muestra en el siguiente ejemplo. :


String url = "ws://127.0. 0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

Cuando la sesión esté lista para usarse, se notificará al controlador, como se muestra en el siguiente ejemplo:


public class MyStompSessionHandler extends StompSessionHandlerAdapter {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

Una vez establecida la sesión, puedes enviar cualquier carga útil que se serializa usando MessageConverter configurado, como se muestra en el siguiente ejemplo:

session.send("/topic/something", "payload");

También puede suscribirse a direcciones de destino. Los métodos subscribe requieren un controlador de mensajes para la suscripción y devuelven un identificador Subscription que puede utilizar para cancelar la suscripción. Para cada mensaje recibido, el controlador puede especificar un tipo de destino Object en el que se debe deserializar la carga útil, como se muestra en el siguiente ejemplo:


session.subscribe("/topic/something", new StompFrameHandler() {
    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }
    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }
});

Para habilitar los mensajes de latido STOMP, puede configurar WebSocketStompClient usando TaskScheduler y, opcionalmente, configurar intervalos de transmisión de mensajes de latido (10 segundos sin actividad de escritura, lo que provocará que se envíe un mensaje de latido). se enviará y 10 segundos si no hay actividad de lectura, lo que provocará que se cierre la conexión).

WebSocketStompClient solo envía un mensaje de latido si no hay actividad, es decir. cuando no se envían otros mensajes. Esto puede ser un problema cuando se utiliza un intermediario externo porque los mensajes con un destino que no es un intermediario reflejan actividad pero en realidad no se reenvían al intermediario. En este caso, puede configurar TaskScheduler al inicializar el broker externo, lo que garantiza que el mensaje de latido se reenvíe al broker también si solo se envían mensajes con una dirección de destino que no sea del broker.

Si está utilizando WebSocketStompClient para pruebas de rendimiento para simular miles de clientes desde una sola máquina, considere deshabilitar los mensajes de latido, ya que cada conexión programa su propios latidos -tareas, y no hay optimización para este proceso para una gran cantidad de clientes que se ejecutan en la misma máquina.

El protocolo STOMP también brinda soporte para acuse de recibo si el cliente debe agregar un receipt encabezado al que el servidor responde con un marco RECEIPT después de procesar el envío o la suscripción. Para brindar este soporte, StompSession ofrece setAutoReceipt(boolean), lo que garantiza que el encabezado receipt se agregue en cada evento de envío o suscripción posterior. También puede agregar manualmente un encabezado de acuse de recibo a StompHeaders. Tanto el envío como la suscripción devuelven una instancia Receiptable que se puede utilizar para registrar la recepción de devoluciones de llamadas exitosas y fallidas. Esta característica requiere que el cliente esté configurado para usar TaskScheduler y establecer la cantidad de tiempo antes de que expire el acuse de recibo (el valor predeterminado es 15 segundos).

Tenga en cuenta que StompSessionHandler es en sí mismo un StompFrameHandler, lo que le permite manejar marcos de ERROR además de la devolución de llamada handleException destinada a las excepciones lanzadas durante el procesamiento de mensajes y handleTransportError apuntar a errores a nivel de transmisión, incluida ConnectionLostException.

Accesibilidad del protocolo WebSocket

Cada sesión de WebSocket tiene un mapa de atributos. El mapa está vinculado como encabezado a los mensajes entrantes del cliente y se puede acceder a él desde un método de controlador, como se muestra en el siguiente ejemplo:


@Controller
public class MyController {
    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // ...
    }
}

Puede declarar un bean administrado Spring en el alcance de disponibilidad websocket. Puede inyectar beans con ámbito WebSocket en controladores y en cualquier enlace de canal registrado a través de clientInboundChannel. Como regla general, son singletons y su ciclo de vida es más largo que el de cada sesión individual de WebSocket. Por lo tanto, los beans que forman parte de un alcance de accesibilidad de WebSocket deben usar el modo proxy para el alcance de accesibilidad, como se muestra en el siguiente ejemplo:


@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
    @PostConstruct
    public void init() {
        // Llamado después de la inyección de dependencia
    }
    // ...
    @PreDestroy
    public void destroy() {
        // Llamado después de la inyección de dependencia
    }
}
@Controller
public class MyController {
    private final MyBean myBean;
    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }
    @MessageMapping("/action")
    public void handle() {
        // this.myBean de la sesión actual de WebSocket
    }
}

Como cualquier alcance personalizado disponibilidad, Spring inicializa una nueva instancia de MyBean la primera vez que se accede a ella desde el controlador y la almacena en los atributos de sesión de WebSocket. Posteriormente se devuelve la misma copia antes del final de la sesión. Para los beans dentro del alcance de accesibilidad de WebSocket, se llaman a todos los métodos del ciclo de vida de Spring, como se muestra en los ejemplos anteriores.

Rendimiento

Cuando se trata de rendimiento, no existe una única solución adecuada. Se ve afectado por muchos factores, incluido el tamaño y el volumen de los mensajes, si la aplicación está realizando un trabajo que requiere bloqueo y factores externos (como la velocidad de la red y otros detalles). El propósito de esta sección es describir brevemente las opciones de configuración disponibles y brindar algunas ideas y debates sobre el escalado.

En una aplicación de mensajería, los mensajes pasan a través de canales para la ejecución asincrónica, que son compatibles con grupos de subprocesos. . Configurar una aplicación de este tipo requiere una buena comprensión de cómo funcionan los canales y el flujo de mensajes.

El lugar obvio para comenzar es configurar grupos de subprocesos que admitan clientInboundChannel y clientOutboundChannel. De forma predeterminada, ambas configuraciones están configuradas para duplicar el número de procesadores disponibles.

Si el procesamiento de mensajes en los métodos anotados depende principalmente del procesador, el número de subprocesos para clientInboundChannel debe permanecer cercano al número de procesadores. Si el trabajo que realizan requiere más E/S y requiere bloqueo o espera en una base de datos u otro sistema externo, probablemente sea necesario aumentar el tamaño del grupo de subprocesos.

ThreadPoolExecutor tiene tres propiedades importantes: el tamaño del grupo de subprocesos principal, el tamaño máximo del grupo de subprocesos y la capacidad de la cola para contener tareas para las que no hay subprocesos libres.

A menudo surge confusión: la configuración del tamaño del grupo principal (por ejemplo, 10) y el tamaño máximo del grupo (por ejemplo, 20) da como resultado un grupo de subprocesos con entre 10 y 20 subprocesos. De hecho, si deja el valor de capacidad predeterminado de Integer.MAX_VALUE, el grupo de subprocesos nunca excederá el tamaño del grupo principal, ya que todas las tareas adicionales están en cola.

Consulte el javadoc en ThreadPoolExecutor , para aprender cómo funcionan estas propiedades y familiarizarse con diferentes estrategias de cola.

El lado clientOutboundChannel trata de enviar mensajes a clientes WebSocket. Si los clientes están en una red rápida, la cantidad de subprocesos debe permanecer cercana a la cantidad de procesadores disponibles. Si son lentos o tienen un rendimiento bajo, tardan más en consumir mensajes y ejercen presión sobre el grupo de subprocesos. Por lo tanto, es necesario aumentar el tamaño del grupo de subprocesos.

Si bien la carga de trabajo para el canal clientInboundChannel se puede predecir (después de todo, depende de lo que esté haciendo la aplicación). configurar el clientOutboundChannel es más difícil, ya que su funcionamiento se basa en factores independientes de la aplicación. Por este motivo, hay dos propiedades adicionales asociadas con el envío de mensajes: sendTimeLimit y sendBufferSizeLimit. Puede utilizar estos métodos para configurar la duración del envío y la cantidad de datos que se almacenarán en el búfer al enviar mensajes al cliente.

La idea general es que solo se puede utilizar un hilo para enviar al cliente en cualquier momento. . Mientras tanto, todos los mensajes adicionales se almacenan en el búfer y puede utilizar estas propiedades para decidir cuánto tiempo debe tardar en enviarse un mensaje y cuántos datos se pueden almacenar en el búfer durante ese tiempo. Consulte la documentación del esquema javadoc y XML para obtener detalles adicionales importantes.

El siguiente ejemplo muestra una posible configuración:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }
    // ...
}

El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>
</beans>

También puede utilizar la configuración del mecanismo de transporte WebSocket que se mostró anteriormente para configurar el tamaño máximo permitido de los mensajes STOMP entrantes. En teoría, el tamaño de un mensaje WebSocket puede ser prácticamente ilimitado. En la práctica, los servidores WebSocket establecen límites, por ejemplo, 8 KB para Tomcat y 64 KB para Jetty. Por este motivo, los clientes STOMP (como webstomp-client de JavaScript y otros) rompen mensajes STOMP grandes, cuando alcanza un tamaño de 16 KB y los envía como múltiples mensajes WebSocket, lo que requiere que el servidor los almacene en buffer y los vuelva a ensamblar.

El soporte STOMP sobre WebSocket de Spring hace esto para que las aplicaciones puedan configurar el tamaño máximo de los mensajes STOMP independientemente del mensaje, tamaños específicos del servidor WebSocket. Recuerde que el tamaño del mensaje WebSocket se ajusta automáticamente según sea necesario para garantizar que los mensajes WebSocket se envíen con un tamaño mínimo de 16 KB.

El siguiente ejemplo muestra una configuración posible:


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }
    // ...
}

El siguiente ejemplo muestra el equivalente XML de la configuración del ejemplo anterior:


<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        https://www.springframework.org/schema/websocket/spring-websocket.xsd">
    <websocket:message-broker>
        <websocket:transport message-size="131072" />
            <!-- ... -->
    </websocket:message-broker>
</beans>

Un punto importante en el escalado es el uso de múltiples instancias de aplicación. Actualmente no es posible hacer esto con un simple corredor. Sin embargo, cuando se utiliza un intermediario con todas las funciones (como RabbitMQ), cada instancia de la aplicación está conectada al intermediario y los mensajes enviados por una instancia de la aplicación se pueden enviar a través del intermediario a los clientes WebSocket conectados a través de cualquier otra instancia de la aplicación.

Monitoreo

Si utiliza la anotación @EnableWebSocketMessageBroker o <websocket:message-broker>, los componentes clave de la infraestructura recopilan automáticamente estadísticas y contadores que proporcionan información importante sobre las aplicaciones de estado interno. La configuración también declara un bean de tipo WebSocketMessageBrokerStats, que recopila toda la información disponible en un solo lugar y, de forma predeterminada, la registra en el nivel INFO una vez cada 30 minutos. Este bean se puede exportar a JMX a través de MBeanExporter desde Spring para verlo en tiempo de ejecución (por ejemplo, a través de jconsole desde JDK). La siguiente lista proporciona un resumen:

Sesiones de cliente WebSocket
Actual

Muestra cuántas sesiones de cliente existen actualmente, con un desglose adicional por transmisión de WebSocket versus HTTP y sesiones de sondeo a través del protocolo SockJS.

Total

Indica cuántas sesiones totales se establecieron.

Cerrado por emergencia
Errores de conexión

Sesiones que se establecieron pero se cerraron después de que no se recibieron mensajes durante 60 segundos . Esto generalmente indica problemas con el servidor proxy o la red.

Límite de envío excedido

Las sesiones se cierran después de exceder el tiempo de espera de envío configurado o el envío límite de búfer, lo que puede ocurrir con clientes lentos (consulte la sección anterior).

Errores de transporte

Las sesiones se cierran después de un error de transmisión , como que la conexión WebSocket o la solicitud o respuesta HTTP no se puedan leer o escribir.

Marcos STOMP

El número total de tramas CONNECT, CONNECTED y DISCONNECT procesadas, lo que indica cuántos clientes se conectaron en el nivel STOMP. Tenga en cuenta que el contador de tramas DISCONNECT puede ser inferior si las sesiones se cierran de forma anormal o si los clientes se cierran sin enviar una trama DISCONNECT.

STOMP Broker Relay
Conexiones TCP

Indica cuántas conexiones TCP hay En nombre del cliente, las sesiones WebSocket se establecen con el corredor. El valor debe ser igual al número de sesiones de WebSocket del cliente + 1 conexión de "sistema" general adicional para enviar mensajes desde la aplicación.

STOMP Frames

El número total de tramas CONNECT, CONNECTED y DISCONNECT enviadas o recibidas del corredor en nombre de los clientes. Tenga en cuenta que la trama DISCONNECT se envía al intermediario independientemente de cómo se cerró la sesión del WebSocket del cliente. Por lo tanto, menos tramas DISCONNECT son una señal de que el intermediario está cerrando conexiones activamente (quizás debido a un latido inoportuno, una trama de entrada no válida u otro problema).

Canal de entrada del cliente

Estadísticas del grupo de subprocesos que admite clientInboundChannel que proporcionan información sobre el estado de procesamiento de los mensajes entrantes. Las tareas en cola son una señal de que la aplicación puede estar procesando mensajes demasiado lentamente. Si tiene tareas que requieren un uso intensivo de E/S (como consultas lentas a bases de datos, solicitudes HTTP a una API REST de terceros, etc.), considere aumentar el tamaño del grupo de subprocesos.

Canal de salida del cliente

Estadísticas del grupo de subprocesos que respaldan el clientOutboundChannel que brindan información sobre el estado de las transmisiones de mensajes a los clientes. Las tareas en cola son una señal de que los clientes tardan demasiado en aceptar mensajes. Una forma de resolver este problema es aumentar el tamaño del grupo de subprocesos para dar cabida a la cantidad esperada de clientes lentos simultáneos. Otra opción es reducir los límites de tiempo de espera de envío y tamaño del búfer de envío (consulte la sección anterior).

Programador de tareas SockJS

Estadísticas del grupo de subprocesos del programador de tareas SockJS, que se utiliza para enviar mensajes de latido. Tenga en cuenta que al negociar mensajes de latido en el nivel STOMP, el envío de mensajes de latido para SockJS está deshabilitado.

Pruebas

Hay dos enfoque principal para las pruebas de aplicaciones cuando se utiliza el soporte STOMP de Spring sobre WebSocket. La primera es escribir pruebas del lado del servidor para probar la funcionalidad de los controladores y sus métodos de manejo de mensajes anotados. El segundo es escribir pruebas completas de un extremo a otro que incluyan la ejecución tanto del cliente como del servidor.

Los dos enfoques no son mutuamente excluyentes. Al contrario, cada uno de ellos tiene su propio lugar en la estrategia general de pruebas. Las pruebas del lado del servidor están más enfocadas y son más fáciles de escribir y mantener. Por otro lado, las pruebas de integración de un extremo a otro son más completas y cubren mucho más, pero también son más complejas de escribir y mantener.

La forma más sencilla de prueba del lado del servidor es escribir el controlador, pruebas unitarias. Sin embargo, esto no es lo suficientemente práctico porque mucho de lo que hace el controlador depende de sus anotaciones. Las pruebas unitarias puras simplemente no probarán cómo funciona.

Lo ideal es que los controladores bajo prueba se llamen tal como están durante la ejecución del programa, similar al enfoque para probar controladores que manejan solicitudes HTTP utilizando Spring MVC Test, framework, es decir, sin ejecutar un contenedor de servlets, pero usando Spring Framework para llamar a controladores anotados. Al igual que con la prueba Spring MVC, aquí tiene dos alternativas posibles: usar una configuración "específica del contexto" o "independiente":

  • Cargue la configuración Spring real usando el marco Spring TestContext , implemente clientInboundChannel como campo de prueba y utilícelo para enviar mensajes que serán procesados por los métodos del controlador.

  • Instale manualmente la infraestructura mínima del marco Spring necesario para llamar a los controladores (es decir, SimpAnnotationMethodMessageHandler) y pasarle mensajes para los controladores directamente.

Ambos escenarios de configuración se demuestran en la aplicación de muestra pruebas para acciones portfolio.

El segundo enfoque es crear pruebas de integración de un extremo a otro. Para hacer esto, debe ejecutar el servidor WebSocket en modo integrado y conectarse a él como un cliente WebSocket que envía mensajes WebSocket que contienen fotogramas STOMP. Pruebas de aplicación de muestra Stock Portfoliotambién adopta este enfoque, utilizando Tomcat como servidor WebSocket integrado y un cliente STOMP simple para fines de prueba.