# WebFlux 之动态路由
因为想基于 WebFlux 实现一个网关的路由转发功能,且 Gateway 也是基于 WebFlux 实现的网关,网上也有许多 Gateway 的动态路由教程,而本章主要将单纯基于 WebFlux 如何实现动态路由
# WebFlux 接口实现
首先是最底层 HttpHandler 接口,该接口是对 HTTP 请求处理的最底层抽象,通过这个接口将不同 “接口” 实现方式统一起来。所以不管是注解控制器方式的编程还是函数式的编程,在初始化环境时,都需要初始化这么一个类的实例,HttpHandler 的定义如下所示
package org.springframework.http.server.reactive; | |
import reactor.core.publisher.Mono; | |
public interface HttpHandler { | |
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response); | |
} |
# 注解方式例子
基于如下就实现了一个 /api/mono2 接口
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import reactor.core.publisher.Mono; | |
@RestController | |
@RequestMapping("/api/") | |
@Slf4j | |
public class HelloController { | |
@GetMapping(value = "/mono2") | |
public Mono<String> monoDemo() { | |
log.info("start..."); | |
Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux")); | |
log.info("end..."); | |
return mono; | |
} | |
} |
# 函数方式例子
基于如下就实现了一个 /api/mono 接口
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.web.reactive.function.server.ServerRequest; | |
import org.springframework.web.reactive.function.server.ServerResponse; | |
import org.springframework.web.reactive.function.server.RouterFunction; | |
import static org.springframework.web.reactive.function.server.ServerResponse.ok; | |
import static org.springframework.web.reactive.function.server.RouterFunctions.route; | |
import static org.springframework.web.reactive.function.server.RequestPredicates.GET; | |
import reactor.core.publisher.Mono; | |
@Configuration | |
@Slf4j | |
public class RouterConfig { | |
@Bean | |
public RouterFunction<ServerResponse> timerRouter() { | |
return route(GET("/api/mono"), req -> this.monoDemo(req)); | |
} | |
public RouterFunction<ServerResponse> monoDemo(ServerRequest serverRequest){ | |
log.info("start..."); | |
Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux")); | |
log.info("end..."); | |
return ok().body(mono,String.class); | |
} | |
} |
所以我将使用函数方式实现动态路由
# 函数式接口请求过程
首先 DispatcherHandler 类
该类是接口 WebHandler 的实现。WebHandler 在 HttpHandler 功能上提供链式处理能力,可以组合异常处理器(WebExceptionHandler)、过滤器(WebFilter)以及目标。
// 地址:package org.springframework.web.reactive | |
@Override | |
public Mono<Void> handle(ServerWebExchange exchange) { | |
if (this.handlerMappings == null) { // 是否有处理工具 | |
return createNotFoundError(); | |
} | |
return Flux.fromIterable(this.handlerMappings) // 便利 handlerMappings | |
.concatMap(mapping -> mapping.getHandler(exchange))// 一个一个取出去执行 getHandler | |
.next() //Flux 转 Mono | |
.switchIfEmpty(createNotFoundError()) // 如果前面数据为空 | |
.flatMap(handler -> invokeHandler(exchange, handler))// 只取一个数据 | |
.flatMap(result -> handleResult(exchange, result));// 只取一个数据 | |
} |
在 getHandler 方法处进入 AbstractHandlerMapping 类
// 地址:package org.springframework.web.reactive.handler; | |
@Override | |
public Mono<Object> getHandler(ServerWebExchange exchange) { | |
return getHandlerInternal(exchange).map(handler -> { // 注意 getHandlerInternal 方法 | |
if (logger.isDebugEnabled()) { | |
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler); | |
} | |
ServerHttpRequest request = exchange.getRequest(); | |
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) { | |
CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null); | |
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange); | |
config = (config != null ? config.combine(handlerConfig) : handlerConfig); | |
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) { | |
return REQUEST_HANDLED_HANDLER; | |
} | |
} | |
return handler; | |
}); | |
} |
通过 getHandlerInternal 方法进入 RouterFunctionMapping 类
// 地址:package org.springframework.web.reactive.function.server.support; | |
@Override | |
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { | |
if (this.routerFunction != null) { // 注意 routerFunction 变量。从这可看到熟悉接口路径 | |
ServerRequest request = ServerRequest.create(exchange, this.messageReaders); | |
return this.routerFunction.route(request) | |
.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler)); | |
} | |
else { | |
return Mono.empty(); | |
} | |
} |
总结:程序运行时需要动态改变 RouterFunctionMapping 类下的 routerFunction 变量。增加接口和函数的映射关系。
WebHandler 和 HttpHandler 关系图
# 函数式接口初始化过程
首先 RouterFunctionMapping 类初始化
// 地址:package org.springframework.web.reactive.function.server.support; | |
protected void initRouterFunctions() { // 通过这个方法初始化路由 routerFunction 变量 | |
List<RouterFunction<?>> routerFunctions = routerFunctions(); //RouterFunction 函数 | |
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null); | |
logRouterFunctions(routerFunctions); | |
} | |
// 通过 routerFunctions 方法。获取所有的 RouterFunction。 | |
private List<RouterFunction<?>> routerFunctions() { | |
List<RouterFunction<?>> functions = obtainApplicationContext() // 获取上下文 | |
.getBeanProvider(RouterFunction.class) // 获取 RouterFunction 的 bean | |
.orderedStream() | |
.map(router -> (RouterFunction<?>)router) | |
.collect(Collectors.toList()); | |
return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList()); | |
} |
# 实现动态路由
实现修改 Spring 上下文中的单例 bean
package com.cn.demotest.bean; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.ApplicationContextAware; | |
import org.springframework.context.support.AbstractApplicationContext; | |
import org.springframework.stereotype.Component; | |
/** | |
* <P> | |
* bean 工厂 | |
* </p> | |
* | |
* @author 昔日织 | |
* @since 2021-07-02 | |
*/ | |
@Component | |
public class RouterBeanFactory implements ApplicationContextAware { | |
private ApplicationContext applicationContext; | |
@Override | |
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { | |
this.applicationContext = applicationContext; | |
} | |
/** | |
* 获取 bean 注册表 | |
* @return | |
*/ | |
public DefaultSingletonBeanRegistry getBeanRegistry(){ | |
return (DefaultSingletonBeanRegistry)((AbstractApplicationContext)applicationContext).getBeanFactory(); | |
} | |
public void upDateBean(String name,Object o){ | |
DefaultSingletonBeanRegistry beanRegistry = getBeanRegistry(); | |
if(beanRegistry.containsSingleton(name)){ | |
beanRegistry.destroySingleton(name); | |
} | |
beanRegistry.registerSingleton(name,o); | |
} | |
} |
新增或者修改路由的 updateRouter
import org.springframework.stereotype.Component; | |
import static org.springframework.web.reactive.function.server.RouterFunctions.route | |
import org.springframework.web.reactive.function.server.RouterFunction; | |
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping; | |
import java.lang.reflect.Method; | |
import java.lang.reflect.InvocationTargetException; | |
@Component | |
public class updateRouter implements ApplicationContextAware { | |
@Autowired | |
private RouterBeanFactory routerBeanFactory; | |
private ApplicationContext applicationContext; | |
public void creat(String appName){ | |
String format = String.format("/%s/**", appName); | |
RouterFunction<ServerResponse> appRouter = route(path(format), serverRequest -> requestTranspond(serverRequest, appName)); //requestTranspond 方法是用于接口转发。 | |
routerBeanFactory.upDateBean("bean"+appName,appRouter); | |
RouterFunctionMapping routerFunctionMapping = applicationContext.getBean(RouterFunctionMapping.class); | |
Class<? extends RouterFunctionMapping> aClass = routerFunctionMapping.getClass(); | |
try { | |
Method initRouterFunctions = aClass.getDeclaredMethod("initRouterFunctions", new Class[] {}); | |
initRouterFunctions.setAccessible(true); | |
initRouterFunctions.invoke(routerFunctionMapping,new Object[] {}); | |
} catch (NoSuchMethodException e) { | |
e.printStackTrace(); | |
} catch (IllegalAccessException e) { | |
e.printStackTrace(); | |
} catch (InvocationTargetException e) { | |
e.printStackTrace(); | |
} | |
} | |
@Override | |
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { | |
this.applicationContext = applicationContext; | |
} | |
} |
至此调用 creat 方法可以增加一个新的接口。
# 实现新服务接入增加接口(简)
网关可以通过服务注册发现来对外提供接口。如果新服务来了则增加一套对应接口。
实现思路
- 实现 ApplicationRunner 接口。使 Spring boot 启动完毕后会自动回调这个方法。
- 实现 ApplicationContextAware 方法获取上下文 applicationContext 。
- 通过 applicationContext 获取 Controller 注解过的单例对象。
- 遍历对象拼接得到所有接口
- 带个唯一标识和这些接口与 ip 和端口信息注册到网关上。
- 通过 creat 方法网关增加一个 / 唯一标识 /** 的路由接口。
- 访问该服务就是 网关的网址 / 唯一标识 / 服务的接口路径。网关在通过 requestTranspond 方法进行转发
requestTranspond 方法请求参考文章 WebFlux 之 WebClient 的使用记录
# 总结
功能实现有些潦草,有什么建议可以留言。如有不足之处,还望指出。