Spring WebFlux包括用于HTTP请求的反应性,非阻塞WebClient。 客户端具有功能性,流利的API,具有用于声明式组合的反应式类型,请参见反应式库。 WebFlux客户端和服务器依靠相同的非阻塞编解码器对请求和响应内容进行编码和解码。
WebClient在内部委托给HTTP客户端库。 默认情况下,它使用Reactor Netty,内置了对Jetty反应式HttpClient的支持,其他的则可以通过ClientHttpConnector插入。
2.1 Configuration
创建WebClient的最简单方法是通过静态工厂方法之一:
WebClient.create(String baseUrl)
上面的方法使用默认设置的Reactor Netty HttpClient,并期望io.projectreactor.netty:reactor-netty位于类路径上。
您还可以将WebClient.builder()与其他选项一起使用:
uriBuilderFactory:定制的UriBuilderFactory用作基本URL。
defaultCookie:每个请求的cookie。
defaultRequest:消费者自定义每个请求。
exchangeStrategies:HTTP消息读取器/写入器定制。
clientConnector:HTTP客户端库设置。
以下示例配置HTTP编解码器:
复制 ExchangeStrategies strategies = ExchangeStrategies . builder ()
. codecs (configurer -> {
// ...
})
. build ();
WebClient client = WebClient . builder ()
. exchangeStrategies (strategies)
. build ();
构建后,WebClient实例是不可变的。 但是,您可以克隆它并构建修改后的副本,而不会影响原始实例,如以下示例所示:
复制 WebClient client1 = WebClient . builder ()
. filter (filterA) . filter (filterB) . build ();
WebClient client2 = client1 . mutate ()
. filter (filterC) . filter (filterD) . build ();
// client1 has filterA, filterB
// client2 has filterA, filterB, filterC, filter
2.1.1 Reactor Netty
要自定义Reactor Netty设置,只需提供一个预先配置的HttpClient:
复制 HttpClient httpClient = HttpClient . create () . secure (sslSpec -> ... );
WebClient webClient = WebClient . builder ()
. clientConnector ( new ReactorClientHttpConnector(httpClient) )
. build ()
Resources
默认情况下,HttpClient会参与Reactor.netty.http.HttpResources中包含的全局Reactor Netty资源,包括事件循环线程和连接池。 这是推荐的模式,因为固定的共享资源是事件循环并发的首选。 在这种模式下,全局资源将保持活动状态,直到进程退出。
如果服务器为该进程计时,则通常无需显式关闭。 但是,如果服务器可以启动或停止进程内(例如,部署为WAR的Spring MVC应用程序),则可以声明类型为ReactorResourceFactory的Spring托管Bean,其中globalResources = true(默认值)以确保Reactor 关闭Spring ApplicationContext时,将关闭Netty全局资源,如以下示例所示:
复制 @ Bean
public ReactorResourceFactory reactorResourceFactory() {
return new ReactorResourceFactory() ;
}
您也可以选择不参与全局Reactor Netty资源。 但是,在这种模式下,确保所有Reactor Netty客户端和服务器实例使用共享资源是您的重担,如以下示例所示:
复制 @ Bean
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory factory = new ReactorResourceFactory() ;
factory . setGlobalResources ( false );
return factory;
}
@ Bean
public WebClient webClient() {
Function < HttpClient , HttpClient > mapper = client -> {
// Further customizations...
};
ClientHttpConnector connector =
new ReactorClientHttpConnector(resourceFactory() , mapper) ;
return WebClient . builder () . clientConnector (connector) . build ();
}
Timeouts
要配置连接超时:
复制 import io . netty . channel . ChannelOption ;
HttpClient httpClient = HttpClient . create ()
. tcpConfiguration (client ->
client . option ( ChannelOption . CONNECT_TIMEOUT_MILLIS , 10000 ));
要配置读取和/或写入超时值:
复制 import io . netty . handler . timeout . ReadTimeoutHandler ;
import io . netty . handler . timeout . WriteTimeoutHandler ;
HttpClient httpClient = HttpClient . create ()
. tcpConfiguration (client ->
client . doOnConnected (conn -> conn
. addHandlerLast ( new ReadTimeoutHandler( 10 ) )
. addHandlerLast ( new WriteTimeoutHandler( 10 ) )));
2.1.2 Jetty
以下示例显示如何自定义Jetty HttpClient设置:
复制 HttpClient httpClient = new HttpClient() ;
httpClient . setCookieStore ( ... );
ClientHttpConnector connector = new JettyClientHttpConnector(httpClient) ;
WebClient webClient = WebClient . builder () . clientConnector (connector) . build ()
默认情况下,HttpClient创建自己的资源(执行程序,ByteBufferPool,调度程序),这些资源将保持活动状态,直到进程退出或调用stop()为止。
您可以在Jetty客户端(和服务器)的多个实例之间共享资源,并通过声明JettyResourceFactory类型的Spring托管bean来确保在关闭Spring ApplicationContext时关闭资源,如以下示例所示:
复制 @ Bean
public JettyResourceFactory resourceFactory() {
return new JettyResourceFactory() ;
}
@ Bean
public WebClient webClient() {
Consumer < HttpClient > customizer = client -> {
// Further customizations...
};
ClientHttpConnector connector =
new JettyClientHttpConnector(resourceFactory() , customizer) ;
return WebClient . builder () . clientConnector (connector) . build ();
}
2.2 retrieve()
Retrieve()方法是获取响应主体并将其解码的最简单方法。 以下示例显示了如何执行此操作:
复制 WebClient client = WebClient . create ( "https://example.org" );
Mono < Person > result = client . get ()
. uri ( "/persons/{id}" , id) . accept ( MediaType . APPLICATION_JSON )
. retrieve ()
. bodyToMono ( Person . class );
您还可以从响应中解码出一个对象流,如以下示例所示:
复制 Flux < Quote > result = client . get ()
. uri ( "/quotes" ) . accept ( MediaType . TEXT_EVENT_STREAM )
. retrieve ()
. bodyToFlux ( Quote . class );
默认情况下,具有4xx或5xx状态代码的响应会导致WebClientResponseException或其HTTP状态特定的子类之一,例如WebClientResponseException.BadRequest,WebClientResponseException.NotFound等。 您还可以使用onStatus方法来自定义结果异常,如以下示例所示:
复制 Mono < Person > result = client . get ()
. uri ( "/persons/{id}" , id) . accept ( MediaType . APPLICATION_JSON )
. retrieve ()
. onStatus (HttpStatus :: is4xxServerError , response -> ... )
. onStatus (HttpStatus :: is5xxServerError , response -> ... )
. bodyToMono ( Person . class );
使用onStatus时,如果期望响应包含内容,则onStatus回调应使用它。 否则,内容将自动耗尽以确保释放资源。
2.3 exchange()
与检索方法相比,exchange()方法提供了更多的控制。 以下示例等效于retrieve(),但也提供了对ClientResponse的访问:
复制 Mono < Person > result = client . get ()
. uri ( "/persons/{id}" , id) . accept ( MediaType . APPLICATION_JSON )
. exchange ()
. flatMap (response -> response . bodyToMono ( Person . class ));
在此级别,您还可以创建完整的ResponseEntity:
复制 Mono < ResponseEntity < Person >> result = client . get ()
. uri ( "/persons/{id}" , id) . accept ( MediaType . APPLICATION_JSON )
. exchange ()
. flatMap (response -> response . toEntity ( Person . class ));
注意(与retrieve()不同),对于exchange(),没有针对4xx和5xx响应的自动错误信号。 您必须检查状态码并决定如何进行。
使用exchange()时,必须始终使用ClientResponse的body或toEntity方法中的任何一种,以确保释放资源并避免HTTP连接池的潜在问题。 如果不需要响应内容,则可以使用bodyToMono(Void.class)。 但是,如果响应中确实包含内容,则连接将关闭并且不会放回池中。
2.4 Request Body
可以从对象对请求主体进行编码,如以下示例所示:
复制 Mono < Person > personMono = ... ;
Mono < Void > result = client . post ()
. uri ( "/persons/{id}" , id)
. contentType ( MediaType . APPLICATION_JSON )
. body (personMono , Person . class )
. retrieve ()
. bodyToMono ( Void . class );
您还可以对对象流进行编码,如以下示例所示:
复制 Flux < Person > personFlux = ... ;
Mono < Void > result = client . post ()
. uri ( "/persons/{id}" , id)
. contentType ( MediaType . APPLICATION_STREAM_JSON )
. body (personFlux , Person . class )
. retrieve ()
. bodyToMono ( Void . class );
或者,如果您具有实际值,则可以使用syncBody快捷方式,如以下示例所示:
复制 Person person = ... ;
Mono < Void > result = client . post ()
. uri ( "/persons/{id}" , id)
. contentType ( MediaType . APPLICATION_JSON )
. syncBody (person)
. retrieve ()
. bodyToMono ( Void . class )
2.4.1 Form Data
要发送表单数据,可以提供MultiValueMap <String,String>作为正文。 请注意,内容由FormHttpMessageWriter自动设置为application / x-www-form-urlencoded。 下面的示例演示如何使用MultiValueMap <String,String>:
复制 MultiValueMap < String , String > formData = ... ;
Mono < Void > result = client . post ()
. uri ( "/path" , id)
. syncBody (formData)
. retrieve ()
. bodyToMono ( Void . class )
您还可以使用BodyInserters在线提供表单数据,如以下示例所示:
复制 import static org . springframework . web . reactive . function . BodyInserters . * ;
Mono < Void > result = client . post ()
. uri ( "/path" , id)
. body ( fromFormData( "k1" , "v1" ) . with ( "k2" , "v2" ))
. retrieve ()
. bodyToMono ( Void . class );
2.4.2 Multipart Data
要发送多部分数据,您需要提供一个MultiValueMap <String,?>,其值可以是代表零件内容的对象实例或代表零件内容和标题的HttpEntity实例。 MultipartBodyBuilder提供了一个方便的API来准备多部分请求。 下面的示例演示如何创建MultiValueMap <String,?>:
复制 MultipartBodyBuilder builder = new MultipartBodyBuilder() ;
builder . part ( "fieldPart" , "fieldValue" );
builder . part ( "filePart" , new FileSystemResource( "...logo.png" ) );
builder . part ( "jsonPart" , new Person( "Jason" ) );
MultiValueMap < String , HttpEntity < ? >> parts = builder . build ();
在大多数情况下,您不必为每个部分指定Content-Type。 内容类型是根据选择用于对其进行序列化的HttpMessageWriter自动确定的,对于资源而言,取决于文件扩展名。 如有必要,您可以通过重载的构建器part方法之一显式提供MediaType以供每个零件使用。
准备好MultiValueMap之后,将其传递到WebClient的最简单方法是通过syncBody方法,如以下示例所示:
复制 MultipartBodyBuilder builder = ... ;
Mono < Void > result = client . post ()
. uri ( "/path" , id)
. syncBody ( builder . build ())
. retrieve ()
. bodyToMono ( Void . class );
如果MultiValueMap包含至少一个非String值,它也可以表示常规表单数据(即application / x-www-form-urlencoded),则无需将Content-Type设置为multipart / form-data。 使用MultipartBodyBuilder时,总是这样,以确保HttpEntity包装器。
作为MultipartBodyBuilder的替代方案,您还可以通过内置的BodyInserters提供内联样式的多部分内容,如以下示例所示:
复制 import static org . springframework . web . reactive . function . BodyInserters . * ;
Mono < Void > result = client . post ()
. uri ( "/path" , id)
. body ( fromMultipartData( "fieldPart" , "value" ) . with ( "filePart" , resource))
. retrieve ()
. bodyToMono ( Void . class );
2.5 Client Filters
您可以通过WebClient.Builder注册客户端过滤器(ExchangeFilterFunction),以拦截和修改请求,如以下示例所示:
复制 WebClient client = WebClient . builder ()
. filter ((request , next) -> {
ClientRequest filtered = ClientRequest . from (request)
. header ( "foo" , "bar" )
. build ();
return next . exchange (filtered);
})
. build ();
这可以用于跨领域的关注,例如身份验证。 以下示例使用过滤器通过静态工厂方法进行基本身份验证:
复制 WebClient client = WebClient . builder ()
. filter ( basicAuthentication( "user" , "password" ) )
. build ();
过滤器全局应用于每个请求。 要更改特定请求的过滤器行为,您可以将请求属性添加到ClientRequest,然后链中的所有过滤器都可以访问该请求属性,如以下示例所示:
复制 WebClient client = WebClient . builder ()
. filter ((request , next) -> {
Optional < Object > usr = request . attribute ( "myAttribute" );
// ...
})
. build ();
client . get () . uri ( "https://example.org/" )
. attribute ( "myAttribute" , "..." )
. retrieve ()
. bodyToMono ( Void . class );
}
您还可以复制现有的WebClient,插入新的过滤器或删除已注册的过滤器。 以下示例在索引0处插入一个基本身份验证过滤器:
复制 WebClient client = webClient . mutate ()
. filters (filterList -> {
filterList . add ( 0 , basicAuthentication( "user" , "password" ) );
})
. build ();
2.6 Synchronous Use
通过在结果末尾进行阻塞,可以以同步方式使用WebClient:
复制 Person person = client . get () . uri ( "/person/{id}" , i) . retrieve ()
. bodyToMono ( Person . class )
. block ();
List < Person > persons = client . get () . uri ( "/persons" ) . retrieve ()
. bodyToFlux ( Person . class )
. collectList ()
. block ();
但是,如果需要进行多次通话,则可以避免单独阻止每个响应,而等待合并的结果,这样会更有效:
复制 Mono < Person > personMono = client . get () . uri ( "/person/{id}" , personId)
. retrieve () . bodyToMono ( Person . class );
Mono < List < Hobby >> hobbiesMono = client . get () . uri ( "/person/{id}/hobbies" , personId)
. retrieve () . bodyToFlux ( Hobby . class ) . collectList ();
Map < String , Object > data = Mono . zip (personMono , hobbiesMono , (person , hobbies) -> {
Map < String , String > map = new LinkedHashMap <>();
map . put ( "person" , personName);
map . put ( "hobbies" , hobbies);
return map;
})
. block ();
以上仅是一个示例。 还有许多其他模式和运算符可用于构建响应式管道,该响应式管道可进行许多远程调用(可能是嵌套的,相互依赖的),而不会阻塞到最后。
您永远不必阻塞Spring MVC控制器。 只需从控制器方法返回结果Flux或Mono。
2.7 Testing
要测试使用WebClient的代码,可以使用模拟Web服务器,例如OkHttp MockWebServer。 要查看其用法示例,请查看Spring Framework测试套件中的WebClientIntegrationTests或OkHttp存储库中的静态服务器示例。