相关文章推荐
侠义非凡的菠菜  ·  Python pandas ...·  4 天前    · 
侠义非凡的菠菜  ·  rc-form源码浅析 - ·  7 月前    · 
侠义非凡的菠菜  ·  Configuring HTTP ...·  10 月前    · 
聪明的作业本  ·  Advanced query ...·  59 分钟前    · 
失望的鸡蛋面  ·  "Microsoft Outlook ...·  1小时前    · 
坚强的柿子  ·  mongodb 多表关联处理 : ...·  1小时前    · 
不爱学习的火腿肠  ·  java ...·  2 小时前    · 
旅行中的铁链  ·  错误信息:SSL ShakeHand ...·  3 小时前    · 
憨厚的金鱼  ·  Scanpy数据结构:AnnData - 何帅 ·  3 小时前    · 

完整的代码请参考 github.com/javahongxi/…

我们的一些企业对于HTTP服务有一些非正常的做法,它们客户端的请求body是加密的,即在服务端需要对请求body进行解密,而服务端响应的body也要求加密。本文就来揭秘这一需求在 WebFlux 中如何实现,我们给 request/response body 均增加一个表示时间戳的字段 start/end 来模拟请求数据解密和响应数据加密,思路如下。

首先我们需要知道,WebFlux 的过滤器/拦截器是统一用 WebFilter 来表示的,与 Spring MVC 类似,对于 application/json 请求,WebFlux 读取 body inputstream 也只能读取一次,对于query params / form-data / form-x-www 请求,可以反复获取。 所以,怎么修改 application/json 请求的 request/response body 是一个难点。网上找遍了例子,大都是错误的例子或是hello demo,完全没法解决本文的需求。经过苦心探索,本人终于找到了解决方案,代码如下。

import lombok.extern.slf4j.Slf4j;
import org.hongxi.sample.webflux.support.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import java.util.Map;
 * Created by shenhongxi on 2021/4/29.
@Slf4j
@Order(-1)
@Component
public class ModifyBodyFilter implements WebFilter {
    @Autowired
    private ServerCodecConfigurer codecConfigurer;
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if (WebUtils.shouldNotFilter(exchange)) {
            return chain.filter(exchange);
        return ParamUtils.from(exchange)
                .map(params -> decorate(exchange, params))
                .flatMap(chain::filter);
    private ServerWebExchange decorate(ServerWebExchange exchange, Map<String, Object> params) {
        if (params.isEmpty()) {
            return exchange;
        ServerHttpResponse serverHttpResponse = new ModifiedServerHttpResponse(exchange, codecConfigurer.getReaders());
        MediaType contentType = exchange.getRequest().getHeaders().getContentType();
        if (contentType != null && contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
            Map<String, Object> decrypted = Crypto.decrypt(params);
            exchange.getAttributes().put(WebUtils.REQUEST_PARAMS_ATTR, decrypted);
            byte[] rawBody = JacksonUtils.serialize(decrypted);
            ServerHttpRequest serverHttpRequest = new ModifiedServerHttpRequest(exchange.getRequest(), rawBody);
            return exchange.mutate().request(serverHttpRequest).response(serverHttpResponse).build();
        ServerWebExchange serverWebExchange = new ModifiedServerWebExchange(exchange);
        return serverWebExchange.mutate().response(serverHttpResponse).build();
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.codec.DecoderHttpMessageReader;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.web.bind.support.WebExchangeDataBinder;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.Map;
 * Created by shenhongxi on 2021/4/29.
public abstract class ParamUtils {
    @SuppressWarnings("rawtypes")
    private static final HttpMessageReader messageReader =
            new DecoderHttpMessageReader<>(new Jackson2JsonDecoder());
    @SuppressWarnings("unchecked")
    public static Mono<Map<String, Object>> from(ServerWebExchange exchange) {
        Map<String, Object> data = exchange.getAttribute(WebUtils.REQUEST_PARAMS_ATTR);
        if (data != null) {
            return Mono.just(data);
        Mono<Map<String, Object>> params;
        MediaType contentType = exchange.getRequest().getHeaders().getContentType();
        if (contentType != null && contentType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
            params = (Mono<Map<String, Object>>) messageReader.readMono(
                    ResolvableType.forType(Map.class), exchange.getRequest(), Collections.emptyMap());
        } else {
            params = WebExchangeDataBinder.extractValuesToBind(exchange);
        return params.doOnNext(e -> exchange.getAttributes().put(WebUtils.REQUEST_PARAMS_ATTR, e));
import io.netty.buffer.ByteBufAllocator;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;
import java.nio.charset.StandardCharsets;
 * Created by shenhongxi on 2021/4/29.
public class ModifiedServerHttpRequest extends ServerHttpRequestDecorator {
    private final byte[] rawBody;
    public ModifiedServerHttpRequest(ServerHttpRequest delegate, byte[] rawBody) {
        super(delegate);
        this.rawBody = rawBody;
    @Override
    public Flux<DataBuffer> getBody() {
        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(this.rawBody.length);
        buffer.write(this.rawBody);
        return Flux.just(buffer);
    @Override
    public HttpHeaders getHeaders() {
        // 必须 new,不能直接操作 super.getHeaders()(readonly)
        HttpHeaders headers = new HttpHeaders();
        headers.addAll(super.getHeaders());
        headers.setContentLength(this.rawBody.length);
        return headers;
     * @return body json string
    public String bodyString() {
        return new String(rawBody, StandardCharsets.UTF_8);
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
 * Created by shenhongxi on 2021/4/29.
public class ModifiedServerHttpResponse extends ServerHttpResponseDecorator {
    private final ServerWebExchange exchange;
    private final List<HttpMessageReader<?>> messageReaders;
    public ModifiedServerHttpResponse(ServerWebExchange exchange,
                                      List<HttpMessageReader<?>> messageReaders) {
        super(exchange.getResponse());
        this.exchange = exchange;
        this.messageReaders = messageReaders;
    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
        // 这里只是借用 ClientResponse 这个类获取修改之前的 body
        // server 端最终返回的是 ServerResponse/ServerHttpResponse
        ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);
        Mono<byte[]> modifiedBody = clientResponse.bodyToMono(byte[].class)
                .flatMap(originalBody -> Mono.just(Crypto.encrypt(originalBody)));
        BodyInserter<Mono<byte[]>, ReactiveHttpOutputMessage> bodyInserter =
                BodyInserters.fromPublisher(modifiedBody, byte[].class);
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
                exchange.getResponse().getHeaders());
        return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
            Mono<DataBuffer> messageBody = DataBufferUtils.join(outputMessage.getBody());
            HttpHeaders headers = getDelegate().getHeaders();
            if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
            return getDelegate().writeWith(messageBody);
    @Override
    public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
        return writeWith(Flux.from(body).flatMapSequential(p -> p));
    private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
        ClientResponse.Builder builder = ClientResponse.create(HttpStatus.OK, messageReaders);
        return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;
import reactor.core.publisher.Mono;
 * Created by shenhongxi on 2021/4/29.
public class ModifiedServerWebExchange extends ServerWebExchangeDecorator {
    public ModifiedServerWebExchange(ServerWebExchange delegate) {
        super(delegate);
    @Override
    public Mono<MultiValueMap<String, String>> getFormData() {
        return super.getFormData()
                .map(Crypto::decrypt)
                .doOnNext(decrypted -> getDelegate().getAttributes()
                        .put(WebUtils.REQUEST_PARAMS_ATTR, decrypted)
import org.springframework.util.MultiValueMap;
import org.springframework.util.MultiValueMapAdapter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
 * Created by shenhongxi on 2021/4/29.
public class Crypto {
     * 模拟解密逻辑:添加一个请求参数 start
     * @param requestBody
     * @return
    public static Map<String, Object> decrypt(Map<String, Object> requestBody) {
        Map<String, Object> decrypted = new HashMap<>(requestBody);
        decrypted.put("start", System.currentTimeMillis());
        return decrypted;
     * 模拟解密逻辑:添加一个请求参数 start
     * @param formData
     * @return
    public static MultiValueMap<String, String> decrypt(MultiValueMap<String, String> formData) {
        MultiValueMap<String, String> decrypted = new MultiValueMapAdapter<>(formData);
        decrypted.put("start", Collections.singletonList(String.valueOf(System.currentTimeMillis())));
        return decrypted;
     * 模拟加密逻辑:添加一个响应参数 end
     * @param responseBody
     * @return
    public static byte[] encrypt(byte[] responseBody) {
        Map<String, Object> result = JacksonUtils.deserialize(responseBody, Map.class);
        result.put("end", System.currentTimeMillis());
        return JacksonUtils.serialize(result);
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
 * Created by shenhongxi on 2021/5/3.
public class BodyInserterContext implements BodyInserter.Context {
	private final ExchangeStrategies exchangeStrategies;
	public BodyInserterContext() {
		this.exchangeStrategies = ExchangeStrategies.withDefaults();
	public BodyInserterContext(ExchangeStrategies exchangeStrategies) {
		this.exchangeStrategies = exchangeStrategies;
	@Override
	public List<HttpMessageWriter<?>> messageWriters() {
		return exchangeStrategies.messageWriters();
	@Override
	public Optional<ServerHttpRequest> serverRequest() {
		return Optional.empty();
	@Override
	public Map<String, Object> hints() {
		return Collections.emptyMap();
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.server.ServerWebExchange;
 * Created by shenhongxi on 2021/5/3.
public class CachedBodyOutputMessage implements ReactiveHttpOutputMessage {
	private final DataBufferFactory bufferFactory;
	private final HttpHeaders httpHeaders;
	private boolean cached = false;
	private Flux<DataBuffer> body = Flux
			.error(new IllegalStateException("The body is not set. " + "Did handling complete with success?"));
	public CachedBodyOutputMessage(ServerWebExchange exchange, HttpHeaders httpHeaders) {
		this.bufferFactory = exchange.getResponse().bufferFactory();
		this.httpHeaders = httpHeaders;
	@Override
	public void beforeCommit(Supplier<? extends Mono<Void>> action) {
	@Override
	public boolean isCommitted() {
		return false;
	boolean isCached() {
		return this.cached;
	@Override
	public HttpHeaders getHeaders() {
		return this.httpHeaders;
	@Override
	public DataBufferFactory bufferFactory() {
		return this.bufferFactory;
	 * Return the request body, or an error stream if the body was never set or when.
	 * @return body as {@link Flux}
	public Flux<DataBuffer> getBody() {
		return this.body;
	public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
		this.body = Flux.from(body);
		this.cached = true;
		return Mono.empty();
	@Override
	public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
		return writeWith(Flux.from(body).flatMap(p -> p));
	@Override
	public Mono<Void> setComplete() {
		return writeWith(Flux.empty());

完整的代码请参考 github.com/javahongxi/…

 
推荐文章