1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spring WebFlux Webサービスで Server-Sent Events で Hello World する

Last updated at Posted at 2023-03-20

Spring WebFlux Webサービスで Server-Sent Events で Hello World する

こんにちは、 @studio_meowtoon です。今回は、WSL の Ubuntu 22.04 で、Server-Sent Events を実装する Spring WebFlux Web サービスを作成して非同期動作を確認する方法を紹介します。

Windows 11 の Linux でクラウド開発します。

こちらから記事の一覧がご覧いただけます。

非同期・反応的なリクエスト処理 Spring WebFlux は、リアクティブストリームと呼ばれる仕組みを使用して、リクエストを非同期で処理することができます。これにより、少ないリソースで高いスループットを実現することができます。 WebFlux サーバー Spring WebFlux には、Netty をベースにした WebFlux サーバーが含まれています。このサーバーは、高いスループットと低いレイテンシーを実現することができます。 Reactive Streams API のサポート Spring WebFlux は、Reactive Streams API と呼ばれる仕様に準拠しており、他の Reactive Streams API を使用するライブラリと統合することができます。これにより、より柔軟で拡張性の高いアプリケーションを開発することができます。

Server-Sent Events (SSE) は、サーバーからの単方向のリアルタイム通信に特化した技術であり、サーバーがクライアントに対してリアルタイムに更新情報をプッシュすることができます。 長時間接続を維持することで、クライアントがサーバーからの更新情報をリアルタイムで受け取ることができます。

SSE は、リアルタイムの情報配信が必要なウェブアプリケーションに使用されます。例えば、以下のような状況で使用されることがあります。

キーワード ソーシャルメディアのリアルタイム更新 ソーシャルメディアのようなウェブアプリケーションでは、ユーザーが投稿した新しい情報や、フォローしている人々の活動情報などをリアルタイムに更新する必要があります。SSE を使用することで、サーバーが新しい情報を受信するたびに、クライアントにその情報をプッシュすることができます。 ストック情報のリアルタイム更新 株式市場のような業界では、株価の変動などのリアルタイム情報が必要です。SSE を使用することで、株価や取引の更新情報をリアルタイムに配信することができます。 オンラインゲームのリアルタイム更新 オンラインゲームでは、プレイヤーのアクションや他のプレイヤーの行動など、リアルタイムの情報配信が必要です。SSE を使用することで、プレイヤーにリアルタイムな情報を配信することができます。

これらは一例であり、リアルタイムな情報配信が必要なさまざまな種類のウェブアプリケーションで SSE が使用されることがあります。

Java JDK ※ こちらの関連記事からインストール方法をご確認いただけます

$ java -version
openjdk version "17.0.6" 2023-01-17
OpenJDK Runtime Environment GraalVM CE 22.3.1 (build 17.0.6+10-jvmci-22.3-b13)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.1 (build 17.0.6+10-jvmci-22.3-b13, mixed mode, sharing)

Maven ※ こちらの関連記事からインストール方法をご確認いただけます

$ mvn -version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.18, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64

プロジェクトフォルダを作成します。
※ ~/tmp/sync-spring-mvc をプロジェクトフォルダとします。

$ mkdir -p ~/tmp/sync-spring-mvc
$ cd ~/tmp/sync-spring-mvc
アプリケーションクラスの作成

アプリケーションクラスを作成します。

プロジェクト構成を単純にするために全ての要素を Application クラスに記述しています。

$ mkdir -p src/main/java/com/example/springmvc
$ vim src/main/java/com/example/springmvc/Application.java

ファイルの内容

Application.java
package com.example.springmvc;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit




    
;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    @GetMapping("/one")
    public Map<String, String> getOne() throws InterruptedException {
        Map<String, String> map = Map.of("message", "Hello Object!");
        TimeUnit.SECONDS.sleep(2);
        log.info("Sending message: {}", map);
        return map;
    @GetMapping("/list")
    public List<Map<String, String>> getList() throws InterruptedException {
        List<Map<String, String>> list = List.of(
            Map.of("message", "Hello List 1!"),
            Map.of("message", "Hello List 2!"),
            Map.of("message", "Hello List 3!"),
            Map.of("message", "Hello List 4!"),
            Map.of("message", "Hello List 5!"));
        list.forEach(map -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                log.error(e.getMessage());
                throw new RuntimeException(e);
            log.info("Sending message: {}", map);
        return list;
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sync-spring-mvc</artifactId>
    <version>1.0</version>
    <name>sync-spring-mvc</name>
    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>app</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
ログファイルを出力する設定
  • application.properties
  • logback-spring.xml
  • こちらの関連記事で手順がご確認いただけます。

    https://qiita.com/studio_meowtoon/items/ffbcb6ac9179852b2816

    アプリのビルド

    Java アプリをビルドします。
    ※ target/app.jar が作成されます。

    $ mvn clean install
    アプリの起動
    

    アプリを起動します。
    ※ アプリを停止するときは ctrl + C を押します。

    $ rm -rf log
    $ mvn spring-boot:run
    アプリの動作確認
    

    別ターミナルから curl コマンドで確認します。

    オブジェクト(Map) を取得した場合
    $ curl -v http://localhost:8080/one -w '\n'
    *   Trying 127.0.0.1:8080...
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /one HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.81.0
    > Accept: */*
    * Mark bundle as not supporting multiuse
    < HTTP/1.1 200
    < Content-Type: application/json
    < Transfer-Encoding: chunked
    < Date: Mon, 20 Mar 2023 03:53:45 GMT
    * Connection #0 to host localhost left intact
    {"message":"Hello Object!"}
    

    別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

    $ cd ~/tmp/sync-spring-mvc/log
    $ cat app.log
    2023-03-20 13:05:32.679 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
    2023-03-20 13:05:32.686 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.954 seconds (JVM running for 1.11)
    2023-03-20 13:05:45.910 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
    2023-03-20 13:05:45.913 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
    2023-03-20 13:05:47.930 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.getOne:26 - Sending message: {message=Hello Object!}
    

    http-nio-8080-exec-1 というスレッド名から Spring MVC の Tomcat のスレッドで処理していると推測出来ます。

    リストを取得した場合

    別ターミナルから curl コマンドで確認します。

    $ curl -v http://localhost:8080/list -w '\n'
    *   Trying 127.0.0.1:8080...
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /list HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.81.0
    > Accept: */*
    * Mark bundle as not supporting multiuse
    < HTTP/1.1 200
    < Content-Type: application/json
    < Transfer-Encoding: chunked
    < Date: Mon, 20 Mar 2023 03:56:19 GMT
    * Connection #0 to host localhost left intact
    [{"message":"Hello List 1!"},{"message":"Hello List 2!"},{"message":"Hello List 3!"},{"message":"Hello List 4!"},{"message":"Hello List 5!"}]
    

    別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

    $ cd ~/tmp/sync-spring-mvc/log
    $ cat app.log
    2023-03-20 13:06:34.424 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
    2023-03-20 13:06:34.430 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.947 seconds (JVM running for 1.102)
    2023-03-20 13:06:38.337 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
    2023-03-20 13:06:38.340 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
    2023-03-20 13:06:40.522 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 1!}
    2023-03-20 13:06:42.691 [INFO ]
    
    
    
    
        
     [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 2!}
    2023-03-20 13:06:44.721 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 3!}
    2023-03-20 13:06:46.740 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 4!}
    2023-03-20 13:06:48.748 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.Application.lambda$getList$0:45 - Sending message: {message=Hello List 5!}
    

    http-nio-8080-exec-1 というスレッド名から Spring MVC の Tomcat のスレッドで処理していると推測出来ます。クライアント (curl) 側にはデータは時間が経ってから一度に取得されます。

    ここまでのまとめ

    Spring MVC では、組み込みの Web サーバーとして Tomcat が使用されています。 List オブジェクトは同期で処理されています。

    非同期処理 Spring WebFlux Web サービスの作成

    Spring Boot で作成します。

    プロジェクトフォルダの作成

    プロジェクトフォルダを作成します。
    ※ ~/tmp/async-spring-webflux をプロジェクトフォルダとします。

    $ mkdir -p ~/tmp/async-spring-webflux
    $ cd ~/tmp/async-spring-webflux
    アプリケーションクラスの作成
    

    アプリケーションクラスを作成します。

    プロジェクト構成を単純にするために全ての要素を Application クラスに記述しています。

    $ mkdir -p src/main/java/com/example/springwebflux
    $ vim src/main/java/com/example/springwebflux/Application.java
    

    ファイルの内容

    Application.java
    package com.example.springwebflux;
    import java.time.Duration;
    import java.util.Map;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.http.codec.ServerSentEvent;
    import org.springframework.web.bind.annotation.CrossOrigin;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    @Slf4j
    @CrossOrigin(origins = "*", allowedHeaders = "*")
    @RestController
    @SpringBootApplication
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        @GetMapping("/mono")
        public Mono<Map<String, String>> getMono() {
            return Mono.just(
                Map.of("message", "Hello Mono!"))
                .delayElement(Duration.ofSeconds(2))
                .doOnNext(map -> log.info("Sending message: {}", map));
        @GetMapping("/flux")
        public Flux<Map<String, String>> getFlux() {
            return Flux.range(1, 5)
                .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
                .delayElements(Duration.ofSeconds(2))
                .doOnNext(map -> log.info("Sending message: {}", map));
        @GetMapping("/flux-sse")
        public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
            return Flux.range(1, 5)
                .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
                .delayElements(Duration.ofSeconds(2))
                .doOnNext(map -> log.info("Sending message: {}", map))
                .map(map -> ServerSentEvent.builder(map).build())
                .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
    クライアントに非同期の単一値応答を返すエンドポイント
    
    @GetMapping("/mono")
    public Mono<Map<String, String>> getMono() {
        return Mono.just(
            Map.of("message", "Hello Mono!"))
            .delayElement(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    Mono.just(Map.of("message", "Hello Mono!")) は、"message" キーとその値を持つ Map オブジェクトを作成しそれを Mono にラップして返します。
    .delayElement(Duration.ofSeconds(2)) は、応答を2秒間遅延させるために Mono を変換する演算子です。
    .doOnNext(map -> log.info("Sending message: {}", map)) は、値が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。
    つまり、このエンドポイントは、2秒間待ってから、"Hello Mono!" メッセージを含む Map オブジェクトを含む単一値を返します。また、ログには、値が通知されたときに "Sending message: {message=Hello Mono!}" というメッセージが出力されます。
    クライアントに非同期の複数値応答を返すエンドポイント
    
    @GetMapping("/flux")
    public Flux<Map<String, String>> getFlux() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    .map(idx -> Map.of("message", "Hello Flux " + idx + "!")) は、各要素に対して "message" キーとその値を持つ Map オブジェクトを作成するための演算子です。
    .delayElements(Duration.ofSeconds(2)) は、各要素を2秒間遅延させるための演算子です。
    .doOnNext(map -> log.info("Sending message: {}", map)) は、各要素が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。
    つまり、このエンドポイントは、各要素が2秒間隔で通知され "Hello Flux {index}!" メッセージを含む Map オブジェクトが含まれる複数の値を返します。また、ログには各値が通知されたときに "Sending message: {message=Hello Flux {index}!}" というメッセージが出力されます。
    クライアントに Server-Sent Events(SSE)プロトコルを使用して非同期でデータを送信するエンドポイント
    
    @GetMapping("/flux-sse")
    public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
        return Flux.range(1, 5
    
    
    
    
        
    )
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map))
            .map(map -> ServerSentEvent.builder(map).build())
            .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
    Flux<ServerSentEvent<Map<String, String>>> は Server-Sent Events を使用して複数の非同期値を表す Reactive Streams の型です。この場合、複数の Map オブジェクトを表しています。
    Flux.range(1, 5) は、1から5までの整数の Flux を生成します。
    .map(idx -> Map.of("message", "Hello Flux " + idx + "!")) は、各要素に対して "message" キーとその値を持つ Map オブジェクトを作成するための演算子です。
    .delayElements(Duration.ofSeconds(2)) は各要素を2秒間遅延させるための演算子です。
    .doOnNext(map -> log.info("Sending message: {}", map)) は各要素が通知されるたびに log オブジェクトにメッセージを出力するための副作用を追加するための演算子です。
    .map(map -> ServerSentEvent.builder(map).build()) は各 Map オブジェクトを ServerSentEvent オブジェクトに変換するための演算子です。
    .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build())) はストリームの最後に ServerSentEvent オブジェクトを追加するための演算子です。これにより SSEストリームの終わりを示す event: end タグが送信されます。
    つまり、このエンドポイントは、各要素が2秒間隔で通知され "Hello Flux {index}!" メッセージを含む Map オブジェクトが含まれる複数の値を Server-Sent Events 形式で返します。また、ログには各値が通知されたときに "Sending message: {message=Hello Flux {index}!}" というメッセージが出力されます。
    
    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.8</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>sync-spring-webflux</artifactId>
        <version>1.0</version>
        <name>sync-spring-webflux</name>
        <properties>
            <java.version>11</java.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-webflux</artifactId>
            </dependency>
            <!-- Lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.26</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
        <build>
            <finalName>app</finalName>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    ログファイルを出力する設定
    
  • application.properties
  • logback-spring.xml
  • こちらの関連記事で手順がご確認いただけます。

    Mono を普通に HTTP でリクエストした場合
    $ curl -v http://localhost:8080/mono -w '\n'
    *   Trying 127.0.0.1:8080...
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /mono HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.81.0
    > Accept: */*
    * Mark bundle as not supporting multiuse
    < HTTP/1.1 200 OK
    < Vary: Origin
    < Vary: Access-Control-Request-Method
    < Vary: Access-Control-Request-Headers
    < Content-Type: application/json
    < Content-Length: 25
    * Connection #0 to host localhost left intact
    {"message":"Hello Mono!"}
    

    別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

    $ cd ~/tmp/async-spring-webflux/log
    $ cat app.log
    2023-03-20 12:11:59.869 [DEBUG] [main] org.springframework.boot.StartupInfoLogger.logStarting:56 - Running with Spring Boot v2.7.8, Spring v5.3.25
    2023-03-20 12:11:59.870 [INFO ] [main] org.springframework.boot.SpringApplication.logStartupProfileInfo:637 - The following 1 profile is active: "develop"
    2023-03-20 12:12:00.556 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
    2023-03-20 12:12:00.562 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.987 seconds (JVM running for 1.221)
    2023-03-20 12:15:32.844 [INFO ] [parallel-1] com.example.springwebflux.Application.lambda$getMono$0:32 - Sending message: {message=Hello Mono!}
    

    parallel-1 というスレッド名から Spring WebFlux は並列処理のスレッドで Mono を処理していると推測出来ます。

    Flux を普通の HTTP でリクエストした場合

    別ターミナルから curl コマンドで確認します。

    $ curl -v http://localhost:8080/flux -w '\n'
    *   Trying 127.0.0.1:8080...
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /flux HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.81.0
    > Accept: */*
    * Mark bundle as not supporting multiuse
    < HTTP/1.1 200 OK
    < transfer-encoding: chunked
    < Vary: Origin
    < Vary: Access-Control-Request-Method
    < Vary: Access-Control-Request-Headers
    < Content-Type: application/json
    * Connection #0 to host localhost left intact
    [{"message":"Hello Flux 1!"},{"message":"Hello Flux 2!"},{"message":"Hello Flux 3!"},{"message":"Hello Flux 4!"},{"message":"Hello Flux 5!"}]
    

    別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

    $ cd ~/tmp/async-spring-webflux/log
    $ cat app.log
    2023-03-20 12:19:28.477 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
    2023-03-20 12:19:28.482 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 0.95 seconds (JVM running for 1.124)
    2023-03-20 12:19:42.389 [INFO ] [parallel-1] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 1!}
    2023-03-20 12:19:44.392 [INFO ] [parallel-2] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 2!}
    2023-03-20 12:19:46.394 [INFO ] [parallel-3] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 3!}
    2023-03-20 12:19:48.396 [INFO ] [parallel-4] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 4!}
    2023-03-20 12:19:50.399 [INFO ] [parallel-5] com.example.springwebflux.Application.lambda$getFlux$2:40 - Sending message: {message=Hello Flux 5!}
    

    parallel-n というスレッド名から Spring WebFlux は2秒ごとに並列処理のスレッドで Flux を処理していると推測出来ます。クライアント (curl) 側にはデータは時間が経ってから一度に取得されます。

    Flux を Server-Sent Events 指定の HTTP でリクエストした場合

    別ターミナルから curl コマンドで確認します。

    Accept: text/event-stream ヘッダーを追加します。

    $ curl -v http://localhost:8080/flux-sse -H 'Accept: text/event-stream' -w '\n'
    *   Trying 127.0.0.1:8080...
    * Connected to localhost (127.0.0.1) port 8080 (#0)
    > GET /flux-sse HTTP/1.1
    > Host: localhost:8080
    > User-Agent: curl/7.81.0
    > Accept: text/event-stream
    * Mark bundle as not supporting multiuse
    < HTTP/1.1 200 OK
    < transfer-encoding: chunked
    < Vary: Origin
    < Vary: Access-Control-Request-Method
    < Vary: Access-Control-Request-Headers
    < Content-Type: text/event-stream;charset=UTF-8
    data:{"message":"Hello Flux 1!"}
    data:{"message":"Hello Flux 2!"}
    data:{"message":"Hello Flux 3!"}
    data:{"message":"Hello Flux 4!"}
    data:{"message":"Hello Flux 5!"}
    event:end
    * Connection #0 to host localhost left intact
    

    別ターミナルからログファイルを確認します。※ 必要な個所を抜粋しています。

    $ cd ~/tmp/async-spring-webflux/log
    $ cat app.log
    cat app.log
    2023-03-20 12:22:02.249 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
    2023-03-20 12:22:02.256 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started Application in 1.035 seconds (JVM running for 1.224)
    2023-03-20 12:22:17.814 [INFO ] [parallel-1] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 1!}
    2023-03-20 12:22:19.827 [INFO ] [parallel-2] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 2!}
    2023-03-20 12:22:21.831 [
    
    
    
    
        
    INFO ] [parallel-3] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 3!}
    2023-03-20 12:22:23.842 [INFO ] [parallel-4] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 4!}
    2023-03-20 12:22:25.854 [INFO ] [parallel-5] com.example.springwebflux.Application.lambda$getFluxWithSSE$4:48 - Sending message: {message=Hello Flux 5!}
    

    parallel-n というスレッド名から Spring WebFlux は2秒ごとに並列処理のスレッドで Flux を処理しています。クライアント (curl) 側にはデータは一つづつリアルタイムで取得されます。

    ここまでのまとめ

    Spring WebFlux では、組み込みの Web サーバーとして Netty が使用されています。 Flux オブジェクトは非同期で処理されています。

    Ubuntu に構築したシンプルな Java 開発環境で、Server-Sent Events を実装する Spring WebFlux Web サービスを実行することができました。

    Ubuntu を使うと Linux の知識も身に付きます。最初は難しく感じるかもしれませんが、徐々に進めていけば自信を持って書けるようになります。

    どうでしたか? WSL Ubuntu で、Server-Sent Events を実装する Spring Boot Web アプリケーションを手軽に実行することができます。ぜひお試しください。今後も Java の開発環境などを紹介していきますので、ぜひお楽しみにしてください。

    W3C Server-Sent Events
    1
    0
    0

    Register as a new user and use Qiita more conveniently

    1. You get articles that match your needs
    2. You can efficiently read back useful information
    3. You can use dark theme
    What you can do with signing up
    1
    0

    Delete article

    Deleted articles cannot be recovered.

    Draft of this article would be also deleted.

    Are you sure you want to delete this article?