本文棣属于 Spring API 服务开发系列:

  1. Spring Boot API 服务开发指南
  2. Spring Boot API 服务测试指南
  3. Spring Cloud 微服务开发指南
  4. Spring Cloud OAuth2 微服务认证授权

本文将把前面开发的 Spring Boot API 服务 Spring Boot in Practice 改造成为一个 Spring Cloud 微服务应用,完整代码可从 GitHub 获取 Spring Cloud in Practice

Spring Cloud 简介

Spring Cloud 是一系列框架的集合,它利用 Spring Boot 来简化分布式系统中各种基础组件的开发,包括服务注册发现、配置管理、消息总线、负载均衡、断路器、数据监控等。Spring Cloud 并没有重复制造轮子,它只是将各家公司开发的比较成熟且经过实践考验的各种框架组合起来,按照 Spring Boot 风格进行封装以屏蔽掉复杂的配置和实现,最终呈现给开发者一套简单易用的分布式系统开发工具包。微服务是可以独立开发和部署的服务单元,采用微服务架构的应用本质上是一个分布式系统。单体应用只有一个服务,系统中的各个功能模块通过进程内的函数或方法调用进行通信,高效可靠。升级到微服务架构之后,各个功能模块独立成为了运行在不同进程甚至不同机器上的服务,只能通过调用网络服务来进行通信。随之就会出现服务地址管理、配置管理、服务稳定性等问题,Spring Cloud 提供了各种组件来分别解决不同的问题。

Spring Cloud 包含非常多的组件,其中有些由第三方公司开发和维护,下面是一些常用的。

  • 服务注册与发现 Spring Cloud Consul、Spring Cloud Zookeeper、Spring Cloud Alibaba Nacos、Spring Cloud Netflix Eureka
  • 服务通信协议 REST、Spring Cloud Alibaba Dubbo
  • 服务调用客户端 Spring Cloud OpenFeign、Spring Cloud Netflix Feign
  • 服务调用负载均衡 Spring Cloud LoadBalancer、Spring Cloud Netflix Ribbon
  • 服务降级 Spring Cloud Circuit Breaker、Spring Cloud Alibaba Sentinel、Spring Cloud Netflix Hystrix
  • 服务追踪 Spring Cloud Sleuth
  • 服务网关 Spring Cloud Gateway、Spring Cloud Netflix Zuul
  • 安全保护 Spring Cloud Security
  • 配置管理 Spring Cloud Config、Spring Cloud Alibaba Nacos、Spring Cloud Netflix Archaius
  • 消息总线 Spring Cloud Bus
  • 数据流 Spring Cloud Data Flow
  • 事件驱动服务 Spring Cloud Stream
  • 任务执行 Spring Cloud Task
  • 分布式事务 Spring Cloud Alibaba Seata
  • 云存储 Spring Cloud Alibaba OSS

其中 Spring Cloud Netflix 由 Netflix 开发,使用比较广泛,不过目前已进入维护状态,不再更新,不建议新项目使用。Spring Cloud Alibaba 由阿里巴巴开发,2019.7 月从 Spring 孵化器毕业,2019.10 月正式挂牌于 Spring Cloud 官方平台,对 Dubbo、Nacos、RocketMQ 等阿里巴巴开源组件比较熟悉的可以使用。除了这两家大公司提供的组件,Spring Cloud 官方也不断在开发和集成各种组件,目前已经可以满足大多数场景的需求。如果对其它公司提供的组件的稳定性和长久性存疑,完全可以全部使用由官方开发和维护的组件。本文开发项目所用组件均为官方提供,包括 Spring Cloud Consul、Spring Cloud LoadBalancer、Spring Cloud Circuit Breaker、Spring Cloud Gateway、Spring Cloud Security、Spring Cloud Config 等。

整体架构

采用微服务架构的应用有三个问题需要优先解决,一是各个微服务如何安全可控地对外暴露,二是各个微服务如何认证授权,三是如何保障各个微服务之间的调用稳定可靠。

服务暴露

对于服务暴露,微服务系统里的各个微服务一般不直接暴露给外面,而是通过一个网关来集中对外提供服务。这样可以在一个地方统一实施安全、限流、监控等方案,而不是将它们分散在系统各处。这里我们选择 Spring Cloud Gateway 来作为我们的网关,它基于 Spring、Project Reactor 和 Spring Boot 来构建,跟 Spring 生态能够很好地融合。除了其强大的路由功能,由于底层采用了事件驱动模型(使用 Netty 作为应用容器),因此性能上也非常的高效。

认证授权

微服务应用中由于服务众多,如果每个服务都要去处理认证授权,将会出现很多重复性工作。因此建议将认证授权从各个微服务中剥离出来放到网关里集中处理,虽然这会造成网关跟后端微服务轻微耦合,不过换个角度来看,认证和授权属于安全范畴而不属于业务,而网关的职责之一就是保障应用安全,所以放在网关里也是合理的。许多时候后端微服务都需要获取当前登录用户身份,比如用户名或用户 ID,这种情况需要网关在转发请求时以某种方式将登录用户身份一并传递给后端微服务。为了避免跟正常请求体耦合,可放在请求头里,比如使用 HTTP 头 X-User-Id 来传递登录用户 ID。由于只在网关一个地方进行认证授权,避免了多处认证授权带来的单点登录(SSO)需求,因此跟普通的 Spring Boot 应用一样使用 Spring Security 即可。

服务调用

微服务架构里除了网关会调用各个后端微服务,各微服务之间也会相互调用。服务调用的前提是必须先知道被调用的服务有哪些可用的节点,这就需要服务注册与发现,我们将使用 Spring Cloud Consul 来实现此功能。Spring Cloud Consul 会自动将当前服务注册到 Consul 服务里,以及自动从 Consul 服务获取每个服务的可用节点信息,并且在服务节点发生变化时自动更新这些信息。得益于 Spring Cloud Consul 的良好封装,只需使用少量配置和简单注解即可实现服务注册与发现。

逻辑架构

拆分成微服务后应用的逻辑架构如下:

spring-cloud-micro-service-architecture-2

其中需要关注的点有:

  1. 每个微服务的后端数据库互相隔离,不能直接访问其它微服务的数据库,以避免微服务之间紧耦合。
  2. 微服务之间可以相互调用,但不应过多,耦合较紧的功能模块应划分到一个微服务中。不要为了拆分而拆分,避免出现分布式单体。
  3. 网关将外部请求路由到某个后端微服务,同时还承担安全、限流、监控等各微服务的横切需求。网关实现了登录、退出 API,通过请求用户微服务来获取认证用户信息,认证结果保存在 Session 里。为了防止重启后丢失,Session 存放在外部缓存里。
  4. 网关在转发请求给后端微服务时,通过 HTTP 头 X-User-Id 将登录用户 ID 传递过去。同样,微服务之间相互调用时,也要按此方式传递登录用户 ID。
  5. Consul 服务作为分布式系统的协调中心,负责在各节点之间同步服务和配置信息。每个微服务的节点在启动时自动注册到 Consul 服务里,网关和其它微服务通过 Consul 服务自动发现每个微服务的可用节点地址。

项目结构

为了方便编写代码,本项目采用了 Maven 多模块目录结构,这样可以在一个工程里编写所有微服务的代码。实际项目开发中,为了更好地隔离各个微服务的代码,可将每个微服务独立成为一个项目。各模块共用的配置,比如基础依赖、依赖包版本号等,统一放在了根模块的 POM 文件中,以避免重复。此外还将各微服务中公用的代码,比如异常处理、微服务接口封装等,抽取到了 common 模块中。

.
├── common # 公共库
│   ├── pom.xml
│   └── src
├── docker-compose.yml # Docker Compose 配置文件
├── file # 文件服务
│   ├── Dockerfile
│   ├── pom.xml
│   └── src
├── gateway # 网关
│   ├── Dockerfile
│   ├── pom.xml
│   └── src
├── pom.xml # 根模块 Maven 配置文件
├── post # 动态服务
│   ├── Dockerfile
│   ├── pom.xml
│   └── src
├── stat # 统计服务
│   ├── Dockerfile
│   ├── pom.xml
│   └── src
└── user # 用户服务
    ├── Dockerfile
    ├── pom.xml
    └── src

开发微服务

每个微服务的长相都比较类似,下面以用户服务为例。由于是升级改造,这里我们只关注跟微服务架构有关的部分。

Maven 配置

<?xml version="1.0" ?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>net.jaggerwang</groupId>
        <artifactId>spring-cloud-in-practice</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <groupId>net.jaggerwang</groupId>
    <artifactId>spring-cloud-in-practice-user</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>spring-cloud-in-practice-user</name>
    <description>Spring cloud in practice user</description>

    <dependencies>
        <dependency>
            <groupId>net.jaggerwang</groupId>
            <artifactId>spring-cloud-in-practice-common</artifactId>
            <version>${scip-common.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
        </dependency>

        <!--...-->
    </dependencies>

    <!--...-->
</project>

上面的配置里,首先设置父模块为 spring-cloud-in-practice,里面包含了 Spring Boot 和 Spring Cloud 的 BOM(物料清单),以及其它本项目中要用到的依赖包的版本号,其目的是为了统一维护各子模块里的依赖包版本。接下来依次引入了本模块的依赖包,包括:

  1. 公共库 spring-cloud-in-practice-common
  2. Spring Boot Web 应用 spring-boot-starter-web
  3. Spring Cloud Consul 服务注册与发现 spring-cloud-starter-consul-discovery
  4. Spring Cloud Circuitbreaker 服务调用断路器 spring-cloud-starter-circuitbreaker-reactor-resilience4j

服务配置

package net.jaggerwang.scip.user.api.config;

...

@Configuration(proxyBeanMethods = false)
public class ServiceConfig {
    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> cbFactoryCustomizer() {
        return factory -> factory.configureDefault(id -> {
            var timeout = Duration.ofSeconds(2);
            if (id.equals("fast")) {
                timeout = Duration.ofSeconds(1);
            } else if (id.equals("slow")) {
                timeout = Duration.ofSeconds(5);
            }

            return new Resilience4JConfigBuilder(id)
                    .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
                    .timeLimiterConfig(TimeLimiterConfig.custom()
                            .timeoutDuration(timeout)
                            .build())
                    .build();
        });
    }

    @LoadBalanced
    @Bean
    public RestTemplate fileServiceRestTemplate(RestTemplateBuilder builder) {
        return builder
                .rootUri("http://spring-cloud-in-practice-file")
                .interceptors(new HeadersRelayRequestInterceptor("X-User-Id"))
                .build();
    }

    @Bean
    public FileSyncService fileSyncService(@Qualifier("fileServiceRestTemplate") RestTemplate restTemplate,
                                           CircuitBreakerFactory cbFactory,
                                           ObjectMapper objectMapper) {
        return new FileSyncServiceImpl(restTemplate, cbFactory, objectMapper);
    }

    @LoadBalanced
    @Bean
    public RestTemplate statServiceRestTemplate(RestTemplateBuilder builder) {
        return builder
                .rootUri("http://spring-cloud-in-practice-stat")
                .interceptors(new HeadersRelayRequestInterceptor("X-User-Id"))
                .build();
    }

    @Bean
    public StatSyncService statSyncService(@Qualifier("statServiceRestTemplate") RestTemplate restTemplate,
                                           CircuitBreakerFactory cbFactory,
                                           ObjectMapper objectMapper) {
        return new StatSyncServiceImpl(restTemplate, cbFactory, objectMapper);
    }
}

上面的断路器配置了三种超时级别,分别是默认的 5s,fast 的 2s,以及 slow 的 10s,在调用其它微服务时可依据当前 API 来决定使用哪种级别的断路器。

为了返回完整的用户信息,用户服务会调用文件服务来查询头像信息,并调用统计服务来查询统计信息,因此这里定义了 FileSyncServiceStatSyncService Bean。以 FileSyncService 为例,它只是一个接口,其实现为 FileSyncServiceImpl。从名字可以看出其为同步调用,与之对应的还有异步接口 FileAsyncService 和其实现 FileAsyncServiceImpl。它们均放在 common 模块里,以便在各个微服务之间共享代码。传递给 FileSyncServicerestTemplate 对象来自于自定义的 RestTemplate Bean,我们为其指定了 rootUri 并通过 @LoadBalanced 注解为其添加了负载均衡能力,此外还通过拦截器 HeadersRelayRequestInterceptor 来自动中继 HTTP 头 X-User-Id

拦截器 HeadersRelayRequestInterceptor 用来将当前请求头里的指定头传递给被调用的服务,具体实现如下:

package net.jaggerwang.scip.common.api.interceptor;

...

public class HeadersRelayRequestInterceptor implements ClientHttpRequestInterceptor {
    private List<String> headers;

    public HeadersRelayRequestInterceptor(String... headers) {
        this.headers = Arrays.asList(headers);
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                                        ClientHttpRequestExecution execution) throws IOException {
        var requestAttrs = RequestContextHolder.getRequestAttributes();
        if (requestAttrs instanceof ServletRequestAttributes) {
            var upstreamRequest = ((ServletRequestAttributes) requestAttrs).getRequest();
            for (var header: headers) {
                request.getHeaders().addAll(header,
                        Collections.list(upstreamRequest.getHeaders(header)));
            }
        }
        return execution.execute(request, body);
    }
}

注意,如果调用服务是在一个子线程里进行,为了能使用 RequestContextHolder 获取到当前请求对象,需要设置 DispatcherServlet.threadContextInheritable 属性为 true

package net.jaggerwang.scip.user.api.config;

...

@Configuration(proxyBeanMethods = false)
public class CommonConfig {
    @Autowired
    private DispatcherServlet dispatcherServlet;

    @PostConstruct
    public void init() {
        dispatcherServlet.setThreadContextInheritable(true);
    }

    ...
}

应用配置

应用配置文件里除了通常的 Spring Boot 配置,增加了一些跟服务注册发现相关的。

...

spring:
    cloud:
        loadbalancer:
            ribbon:
                enabled: false
        consul:
            host: ${SCIP_SPRING_CLOUD_CONSUL_HOST:localhost}
            port: ${SCIP_SPRING_CLOUD_CONSUL_PORT:8500}
            discovery:
                enabled: true
                register: true
                healthCheckPath: /actuator/health

上面配置禁用了不再更新的 Ribbon 负载均衡器,以启用 Spring Cloud 官方的 LoadBalancer。此外还开启了 Spring Cloud Consul 的服务注册与发现功能,用户服务既要被其它服务调用,又需要调用其它服务。

配置网关

路由配置

package net.jaggerwang.scip.gateway.api.config;

...

@Configuration(proxyBeanMethods = false)
public class RouteConfig {
    @Bean
    public RouteLocator routes(RouteLocatorBuilder builder) {
        return builder.routes()
                .route(p -> p.path("/user/**")
                        .filters(f -> f.removeRequestHeader("Cookie"))
                        .uri("lb://spring-cloud-in-practice-user"))
                .route(p -> p.path("/post/**")
                        .filters(f -> f.removeRequestHeader("Cookie"))
                        .uri("lb://spring-cloud-in-practice-post"))
                .route(p -> p.path("/file/**", "/files/**")
                        .filters(f -> f.removeRequestHeader("Cookie"))
                        .uri("lb://spring-cloud-in-practice-file"))
                .route(p -> p.path("/stat/**")
                        .filters(f -> f.removeRequestHeader("Cookie"))
                        .uri("lb://spring-cloud-in-practice-stat"))
                .build();
    }
}

上面的配置将不同路径前缀的请求路由到对应的后端微服务,注意协议用的是 lb,服务名字为各个微服务应用配置里指定的名称 spring.application.name

为了将当前登录用户 ID 传递给后端微服务,使用了一个全局的网关过滤器来完成此功能。

package net.jaggerwang.scip.gateway.api.filter;

...

@Component
public class UserIdGatewayGlobalFilter implements GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return ReactiveSecurityContextHolder.getContext()
                .flatMap(securityContext -> {
                    var auth = securityContext.getAuthentication();
                    if (auth == null || auth instanceof AnonymousAuthenticationToken ||
                            !auth.isAuthenticated()) {
                        return chain.filter(exchange);
                    }

                    var loggedUser = (LoggedUser) auth.getPrincipal();
                    return chain.filter(exchange.mutate()
                            .request(exchange.getRequest()
                                    .mutate()
                                    .headers(headers -> headers
                                            .set("X-User-Id", loggedUser.getId().toString()))
                                    .build())
                            .build());
                });
    }
}

为了防止未登录时 ReactiveSecurityContextHolder.getContext() 结果为空,使用了一个 Web 过滤器来初始化 Mono 上下文里的 SecurityContext

package net.jaggerwang.scip.gateway.api.filter;

...

@Component
public class ReactiveContextWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return exchange.getSession()
                .flatMap(session -> {
                    session.getAttributes().putIfAbsent(DEFAULT_SPRING_SECURITY_CONTEXT_ATTR_NAME,
                            new SecurityContextImpl());
                    return chain.filter(exchange)
                            .subscriberContext(context -> Context.of(
                                    ServerWebExchange.class, exchange,
                                    SecurityContext.class, Mono.just(session.getAttributes().get(
                                            DEFAULT_SPRING_SECURITY_CONTEXT_ATTR_NAME))));
                });
    }
}

安全配置

package net.jaggerwang.scip.gateway.api.config;

...

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {
    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }

    @Bean
    public ReactiveAuthenticationManager authenticationManager(
            ReactiveUserDetailsService userDetailsService) {
        var authenticationManager = new UserDetailsRepositoryReactiveAuthenticationManager(
                userDetailsService);
        authenticationManager.setPasswordEncoder(passwordEncoder());
        return authenticationManager;
    }

    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
        return http
                .csrf(csrf -> csrf.disable())
                .exceptionHandling(exceptionHandling -> exceptionHandling
                        .authenticationEntryPoint(new HttpStatusServerEntryPoint(
                                HttpStatus.UNAUTHORIZED))
                )
                .authorizeExchange(authorizeExchange -> authorizeExchange
                        .pathMatchers("/favicon.ico", "/csrf", "/vendor/**", "/webjars/**",
                                "/*/actuator/**", "/", "/graphql", "/login", "/logout",
                                "/auth/**", "/user/register", "/files/**").permitAll()
                        .anyExchange().authenticated())
                .build();
    }
}

由于将认证授权提取到了网关中,因此网关里要实现登录、退出 API,并验证权限。对于 WebFlux 环境,需要使用响应式的认证管理器 ReactiveAuthenticationManager 来手动设置登录状态。登录、退出 API 在控制器 net.jaggerwang.scip.gateway.adapter.controller.AuthController 里提供了实现。

实现 GraphQL API

设置 DataFetcher 上下文

在网关里实现 GraphQL API 跟普通的 Spring Boot 应用类似,只不过是把 DataFetcher 里调用用例层方法替换成调用 REST API。其中的麻烦之处在于如何在 DataFetcher 里获取到原始 GraphQL API 请求的上下文。每个 GraphQL API 请求会触发多个 DataFetcher 异步并发执行,GraphQL Java 默认并没有把原始请求的上下文传递给 DataFetcher,需要我们自己来实现。

package net.jaggerwang.scip.gateway.adapter.graphql;

...

@Component
@Primary
public class CustomExecutionInputCustomizer implements ExecutionInputCustomizer {
    @Override
    public Mono<ExecutionInput> customizeExecutionInput(ExecutionInput executionInput,
                                                        ServerWebExchange serverWebExchange) {
        return Mono.subscriberContext()
                .map(context -> executionInput.transform(builder -> builder.context(context)));
    }
}

上面设置了 ExecutionInput.context 属性值为原始请求的上下文,该上下文最终会发送给 DataFetcher 的数据获取环境 DataFetchingEnvironment,可通过其 getContext() 方法来在 DataFetcher 里获取到原始请求的上下文。

安全配置

类似于前面 Spring Boot 应用里的 GraphQL API,网关里也使用切面来执行安全检查,只不过在 WebFlux 环境里要更麻烦一些。

package net.jaggerwang.scip.gateway.api.security;

...

@Component
@Aspect
public class SecureGraphQLAspect {
    @Around("allDataFetchers() && isInApplication()")
    public Object doSecurityCheck(ProceedingJoinPoint joinPoint) {
        var args = joinPoint.getArgs();
        var env = (DataFetchingEnvironment) args[0];
        return ReactiveSecurityContextHolder.getContext()
                .doOnSuccess(securityContext ->  {
                    var method = ((MethodSignature) joinPoint.getSignature()).getMethod();
                    var permitAll = method.getAnnotation(PermitAll.class);
                    if (permitAll == null) {
                        var auth = securityContext.getAuthentication();
                        if (auth == null || auth instanceof AnonymousAuthenticationToken ||
                                !auth.isAuthenticated()) {
                            throw new UnauthenticatedException("未认证");
                        }
                    }
                })
                .flatMap(securityContext -> {
                    Object result;
                    try {
                        result = joinPoint.proceed();
                    } catch (Throwable e) {
                        return Mono.error(new RuntimeException(e));
                    }
                    return result instanceof Mono ? (Mono) result : Mono.just(result);
                })
                .subscriberContext(context -> env.getContext())
                .toFuture();
    }

    @Pointcut("target(graphql.schema.DataFetcher)")
    private void allDataFetchers() {
    }

    @Pointcut("within(net.jaggerwang.scip.gateway.adapter.graphql..*)")
    private void isInApplication() {
    }
}

注意点如下:

  1. 由于 DataFetcher 是异步执行,在方法返回时执行尚未结束,因此这里使用了 @Around 注解,以便对返回的 Mono 对象增加其它算子(Operator)。
  2. 通过 subscriberContext(context -> env.getContext()) 把 DataFetcher 的上下文传递给其返回的 Mono 对象,以便在 Mono 计算过程中可以获取到原始请求的上下文,包括认证信息。
  3. 由于 GraphQL Java 不支持 Reactor Mono,这里通过 toFuture() 统一将所有 DataFetcher 返回的 Mono 转换成了 CompletableFuture

传递登录用户 ID 到后端微服务

DataFetcher 使用 WebClient 来请求后端微服务,因此可以使用 WebClient 过滤器来透明地传递登录用户 ID。

package net.jaggerwang.scip.gateway.api.filter;

...

@Component
public class UserIdExchangeFilter implements ExchangeFilterFunction {
    @Override
    public Mono<ClientResponse> filter(ClientRequest clientRequest,
                                       ExchangeFunction exchangeFunction) {
        return ReactiveSecurityContextHolder.getContext()
                .flatMap(securityContext -> {
                    var auth = securityContext.getAuthentication();
                    if (auth == null || auth instanceof AnonymousAuthenticationToken ||
                            !auth.isAuthenticated()) {
                        return exchangeFunction.exchange(clientRequest);
                    }

                    var loggedUser = (LoggedUser) auth.getPrincipal();
                    return exchangeFunction.exchange(ClientRequest.from(clientRequest)
                            .headers(headers -> headers
                                    .set("X-User-Id", loggedUser.getId().toString()))
                            .build());
                });
    }
}

服务配置

DataFetcher 里需要调用各个后端微服务,因此需要定义各个后端微服务的 Bean。

package net.jaggerwang.scip.gateway.api.config;

...

@Configuration(proxyBeanMethods = false)
public class ServiceConfig {
    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> cbFactoryCustomizer() {
        return factory -> factory.configureDefault(id -> {
            var timeout = Duration.ofSeconds(5);
            if (id.equals("fast")) {
                timeout = Duration.ofSeconds(2);
            } else if (id.equals("slow")) {
                timeout = Duration.ofSeconds(10);
            }

            return new Resilience4JConfigBuilder(id)
                    .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
                    .timeLimiterConfig(TimeLimiterConfig.custom()
                            .timeoutDuration(timeout)
                            .build())
                    .build();
        });
    }

    @LoadBalanced
    @Bean
    public WebClient.Builder webClientBuilder(UserIdExchangeFilter userIdExchangeFilter) {
        return WebClient.builder().filter(userIdExchangeFilter);
    }

    @Bean
    public UserAsyncService userAsyncService(WebClient.Builder builder,
                                             ReactiveCircuitBreakerFactory cbFactory,
                                             ObjectMapper objectMapper) {
        var webClient = builder.baseUrl("http://spring-cloud-in-practice-user").build();
        return new UserAsyncServiceImpl(webClient, cbFactory, objectMapper);
    }

    ...
}

因为是在 WebFlux 环境,这里使用了异步版本的断路器工厂 ReactiveResilience4JCircuitBreakerFactory,以及异步服务对象。

部署服务

本应用共包含 8 个服务,分别是网关、四个微服务(用户、动态、文件、统计)、Redis 服务、Consul 服务和 MySQL 服务。部署起来比较麻烦,这也是微服务架构的弊端之一,不过可以借助 Docker Compose 来简化部署工作。关于详细部署步骤,可以查看本项目的 README 文档,其中包含了本地手动部署和 Docker Compose 部署两种方式。

参考资料

  1. Spring Cloud in Practice
  2. Spring Boot
  3. Spring Data JPA
  4. Querydsl JPA
  5. Spring Security
  6. GraphQL Java
  7. Flyway
  8. Spring Cloud Gateway
  9. Spring Cloud Consul
  10. Spring Cloud Circuit Breaker