3.WebSockets

3.1 WebSocket介绍

WebSocket协议RFC 6455提供了一种标准化方法,可通过单个TCP连接在客户端和服务器之间建立全双工双向通信通道。 它是与HTTP不同的TCP协议,但旨在通过端口80和443在HTTP上工作,并允许重复使用现有的防火墙规则。

WebSocket交互始于一个HTTP请求,该请求使用HTTP Upgrade标头进行升级,或在这种情况下切换到WebSocket协议。 以下示例显示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

具有WebSocket支持的服务器代替通常的200状态代码,返回的输出类似于以下内容:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

成功握手后,HTTP升级请求的基础TCP套接字将保持打开状态,客户端和服务器均可继续发送和接收消息。

WebSockets的工作原理的完整介绍超出了本文档的范围。 请参阅RFC 6455,HTML5的WebSocket章节或Web上的许多简介和教程中的任何一个。

请注意,如果WebSocket服务器在Web服务器(例如nginx)后面运行,则可能需要对其进行配置,以将WebSocket升级请求传递到WebSocket服务器。 同样,如果应用程序在云环境中运行,请检查与WebSocket支持相关的云提供商的说明。

3.1.1 HTTP 和 WebSocket

尽管WebSocket被设计为与HTTP兼容并以HTTP请求开头,但重要的是要了解这两个协议导致了截然不同的体系结构和应用程序编程模型。

在HTTP和REST中,应用程序被建模为许多URL。为了与应用程序交互,客户端访问那些URL,即请求-响应样式。服务器根据HTTP URL,方法和标头将请求路由到适当的处理程序。

相反,在WebSockets中,通常只有一个URL用于初始连接。随后,所有应用程序消息在同一TCP连接上流动。这指向了一个完全不同的异步,事件驱动的消息传递体系结构。

WebSocket也是一种低级传输协议,与HTTP不同,它不对消息的内容规定任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则就无法路由或处理消息。

WebSocket客户端和服务器可以通过HTTP握手请求上的Sec-WebSocket-Protocol标头,协商使用更高级别的消息传递协议(例如,STOMP)。在这种情况下,他们需要提出自己的约定。

3.1.2 什么时候使用WebSockets

WebSockets可以使网页具有动态性和交互性。但是,在许多情况下,结合使用Ajax和HTTP流或长轮询可以提供一种简单有效的解决方案。

例如,新闻,邮件和社交订阅源需要动态更新,但是每几分钟进行一次更新可能是完全可以的。另一方面,协作,游戏和金融应用程序需要更接近实时。

仅延迟并不是决定因素。如果消息量相对较少(例如,监视网络故障),则HTTP流或轮询可以提供有效的解决方案。低延迟,高频率和高音量的结合才是使用WebSocket的最佳案例。

还请记住,在Internet上,控件之外的限制性代理可能会阻止WebSocket交互,这可能是因为未将它们配置为传递Upgrade标头,或者是因为它们关闭了长期处于空闲状态的连接。这意味着与面向公众的应用程序相比,将WebSocket用于防火墙内部的应用程序是一个更直接的决定。

3.2 WebSocket API

Spring框架提供了一个WebSocket API,可用于编写处理WebSocket消息的客户端和服务器端应用程序。

3.2.1. Server

要创建WebSocket服务器,您可以首先创建WebSocketHandler。 以下示例显示了如何执行此操作:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}

然后,可以将其映射到URL并添加一个WebSocketHandlerAdapter,如以下示例所示:

@Configuration
static class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1); // before annotated controllers
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}

3.2.2 WebSocketHandler

WebSocketHandler的handle方法采用WebSocketSession并返回Mono <Void>来指示会话的应用程序处理何时完成。 通过两个流处理会话,一个流用于入站消息,一个流用于出站消息。 下表描述了两种处理流的方法:

WebSocketSession method

Description

Flux<WebSocketMessage> receive()

提供对入站消息流的访问,并在关闭连接时完成。

Mono<Void> send(Publisher<WebSocketMessage>)

获取传出消息的源,编写消息,并返回一个Mono <Void>,当源完成并完成写入时,Mono <Void>完成。

WebSocketHandler必须将入站和出站流组成一个统一的流,并返回反映该流完成情况的Mono <Void>。 根据应用程序要求,统一流程在以下情况下完成:

  • 入站或出站消息流都已完成。

  • 入站流完成(即,连接已关闭),而出站流是无限的。

  • 在选定的时间点,通过WebSocketSession的close方法。

将入站和出站消息流组合在一起时,无需检查连接是否打开,因为响应式流信号会终止活动。 入站流接收完成或错误信号,而出站流接收取消信号。

处理程序最基本的实现是处理入站流的实现。 以下示例显示了这样的实现:

class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
}
}

对于嵌套的异步操作,您可能需要在使用池化数据缓冲区的底层服务器(例如Netty)上调用message.retain()。 否则,在您有机会读取数据之前,可能会释放数据缓冲区。 有关更多背景信息,请参见数据缓冲区和编解码器。

以下实现将入站和出站流组合在一起:

class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value));
return session.send(output);
}
}

入站和出站流可以是独立的,并且只能为了完成而加入,如以下示例所示:

class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage));
return Mono.zip(input, output).then();
}
}

3.2.3 DataBuffer

DataBuffer是WebFlux中字节缓冲区的表示形式。 该参考书的Spring Core部分在有关数据缓冲区和编解码器的部分中有更多内容。 要理解的关键点是,在诸如Netty之类的某些服务器上,字节缓冲区被池化并且对引用计数进行计数,并且在消耗字节缓冲区时必须将其释放以避免内存泄漏。

在Netty上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用DataBufferUtils.retain(dataBuffer),并在使用完缓冲区后随后使用DataBufferUtils.release(dataBuffer)。

3.2.4 Handshake

WebSocketHandlerAdapter委托给WebSocketService。 默认情况下,它是HandshakeWebSocketService的实例,该实例对WebSocket请求执行基本检查,然后对所使用的服务器使用RequestUpgradeStrategy。 当前,内置了对Reactor Netty,Tomcat,Jetty和Undertow的支持。

HandshakeWebSocketService公开了一个sessionAttributePredicate属性,该属性允许设置Predicate <String>从WebSession中提取属性并将其插入WebSocketSession的属性中。

3.2.5 Server Configation

每个服务器的RequestUpgradeStrategy公开了可用于基础WebSocket引擎的WebSocket相关配置选项。 以下示例在Tomcat上运行时设置WebSocket选项:

@Configuration
static class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}

检查服务器的升级策略,以查看可用的选项。 当前,只有Tomcat和Jetty公开了此类选项。

3.2.6 CORS

配置CORS并限制对WebSocket端点的访问的最简单方法是让WebSocketHandler实现CorsConfigurationSource并返回带有允许的源,标头和其他详细信息的CorsConfiguraiton。 如果无法执行此操作,则还可以在SimpleUrlHandler上设置corsConfigurations属性,以通过URL模式指定CORS设置。 如果同时指定了两者,则使用CorsConfiguration上的Combine方法将它们合并。

3.2.7 Client

Spring WebFlux为WebSocketClient抽象提供了Reactor Netty,Tomcat,Jetty,Undertow和标准Java(即JSR-356)的实现。

Tomcat客户端实际上是标准Java客户端的扩展,在WebSocketSession处理中具有一些额外功能,以利用特定于Tomcat的API暂停接收消息以产生反压。

要启动WebSocket会话,您可以创建客户端的实例并使用其execute方法:

WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());

一些客户端(例如Jetty)实现了生命周期,需要先停止和启动,然后才能使用它们。 所有客户端都具有与基础WebSocket客户端的配置有关的构造器选项。