大量のJSON配列を受け取らなければいけなかった背景

API提供元がレスポンスの件数制限機能やページング機能を提供しておらず、先頭10件しか必要ないにも関わらず大量のJSON配列を受け取らなければならなかった。

APIを一度呼び出すだけでサーバーのメモリが1.6GB程度増えてしまう状態になっており、処理速度の問題もあったがメモリの問題がクリティカルなものとなっていた。

version

  • Spring Boot 3.2.5
  • Jackson 2.15.4

WebClient

API呼び出しにはSpringのRestClientを使用しており、同期的なHTTP通信をしていた。問題を解決するため、WebClientにHTTPクライアントを変更し、ストリーミング処理にした。

RestClientのBean

同期的なHTTPクライアントであるRestClientの設定は以下のようなものだった。

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.web.client.RestClient;

import java.net.http.HttpClient;
import java.time.Duration;

import static org.springframework.http.HttpHeaders.ACCEPT;
import static org.springframework.http.HttpHeaders.CONTENT_TYPE;

@Configuration
public class SomeApiRestClientConfig {

    @Bean("someApiRestClient")
    public RestClient someApiRestClient(@Value("$(baseUrl:http://example.com}") String baseUrl) {
        var factory = new JdkClientHttpRequestFactory(HttpClient
                                                              .newBuilder()
                                                              .connectTimeout(Duration.ofMillis(10000))
                                                              .build());
        return RestClient
                .builder()
                .requestFactory(factory)
                .baseUrl(baseUrl)
                .defaultStatusHandler(/* 略 */)
                .defaultHeader(ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .requestInterceptor(/* 略 */)
                .build();
    }
}

WebClientのBean作成

RestClientの設定を削除し、代わりにWebClientの共通基本設定を行う。

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.springframework.http.HttpHeaders.ACCEPT;
import static org.springframework.http.HttpHeaders.CONTENT_TYPE;

@Configuration
public class SomeApiWebClientConfig {

    @Bean("someApiWebClient")
    public WebClient someApiWebClient(@Value("$(baseUrl:http://example.com}") String baseUrl) {
        HttpClient httpClient = HttpClient
                .create()
                // 接続タイムアウト
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                // 最初のレスポンスが受信されるまでのタイムアウト
                .responseTimeout(Duration.ofSeconds(30))
                // 読み取りタイムアウト(次のデータチャンクが到着するまで)
                .doOnConnected(con -> con.addHandlerLast(new ReadTimeoutHandler(30, TimeUnit.SECONDS)));
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
        return WebClient.builder()
                .clientConnector(connector)
                .baseUrl(baseUrl)
                .defaultHeader(ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .defaultHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .filter(ExchangeFilterFunction.ofRequestProcessor(this::logRequest))
                .filter(ExchangeFilterFunction.ofResponseProcessor(this::logResponse))
                .build();
    }

    private Mono<ClientRequest> logRequest(ClientRequest request) {
        System.out.println(request.url().toASCIIString());
        System.out.println(request.method().name());
        System.out.println(request.headers());
        return Mono.just(request);
    }

    private Mono<ClientResponse> logResponse(ClientResponse response) {
        System.out.println(response.request().getURI().toASCIIString());
        System.out.println(response.statusCode().value());
        System.out.println(response.headers().asHttpHeaders());
        return Mono.just(response);
    }
}

APIのURL等定義

APIをURL毎に定義していく。

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.List;
import java.util.Map;

@Component
public class SomeApiWebClient {

    private final WebClient someApiWebClient;
    private final ObjectMapper objectMapper;

    public SomeApiWebClient(WebClient someApiWebClient,
                            ObjectMapper objectMapper) {
        this.someApiWebClient = someApiWebClient;
        this.objectMapper = objectMapper;
    }

    public List<Response> api1(String query) {
        return someApiWebClient
                .get()
                .uri("/api1?query={query}", Map.of("query", query))
                .retrieve()
                .bodyToFlux(JsonNode.class)
                .take(10)
                .map(x -> objectMapper.convertValue(x, Response.class))
                .toStream()
                .toList();
    }
    public record Response(String id, String name) {}
}

ここでポイントとなるのがbodyToFlux(JsonNode.class)take(10)である。

RestClientのときはretrieve()の後はbody(new ParameterizedTypeReference<>()でJSON配列を丸ごとそのままAPIの戻り型であるList<Response>に変換していたが、WebClientでは先頭10件に絞っている。

Flux

Spring WebFluxフレームワークにおけるリアクティブストリームの2 つの基本的な型としてMonoとFluxの2つがある。

  • Mono: 0または1つの要素を非同期に処理するためのリアクティブタイプ
  • Flux:0個以上の要素を非同期に処理するためのリアクティブタイプで複数の値をストリームとして返す

今はレスポンスボディがJSON「配列」なので、Fluxを使うことになる。WebClientは以下の手順でJSON配列をストリーム処理で扱ってくれる。

  1. Content-Type: application/json によってレスポンスボディはJSONであると認識される
  2. bodyToFluxJsonNode.class を引数として渡すことでJSON配列の各要素が JsonNode.class に変換される

take(10)をしていることから、ストリーム処理は最初の10件だけを処理してくれる。

ブロッキング操作: toStream().toList(), collectList().block()

toStream()を使ってJavaのStream APIに変換したり、collectList().block()をしてブロッキング操作をするとリアクティブでなくなる。WebFluxを基本として実装していれば問題だが、今回はSpring MVCで全体を実装しており、元々リアクティブ前提で設計しているわけではないので問題ない。今の目的は大規模なJSON配列を全てオンメモリで処理せずに先頭n件を取得することである。

Jackson Streaming API

Jackson Streaming APIでもストリーミング処理ができるため、解決できそうだ。