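RSocket offers four interaction models. It was designed to move past the traditional limit of one response per request, and programs that use it are structured natively in the reactive style of WebFlux.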

Add the following dependency to use RSocket:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

# application.yaml
spring:
  rsocket:
    server:
      port: 7000

The four models are request-response, request-stream, fire-and-forget, and channel.
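
The difference between the four models is mostly one of cardinality: how many messages travel in each direction. The sketch below is only a plain-JDK analogy, not RSocket or Spring API. The class `InteractionModels` and its methods are hypothetical names, and `List` stands in for a stream of messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Plain-JDK analogy of the four interaction models (hypothetical names,
// no RSocket involved). A List stands in for a stream of messages.
final class InteractionModels {

    // Records fire-and-forget payloads so their effect can be observed.
    static final List<String> received = new ArrayList<>();

    // request-response: one request in, exactly one response out.
    static String requestResponse(String name) {
        return "Hello, " + name + "!";
    }

    // request-stream: one request in, many responses out.
    static List<String> requestStream(String symbol) {
        return List.of(symbol + "#1", symbol + "#2", symbol + "#3");
    }

    // fire-and-forget: one request in, nothing comes back.
    static void fireAndForget(String alert) {
        received.add(alert);
    }

    // channel: many requests in, many responses out.
    static List<Integer> channel(List<Integer> prices) {
        return prices.stream()
                     .map(price -> price / 10)
                     .collect(Collectors.toList());
    }
}
```

In the Spring handlers that follow, the same shapes appear as `Mono` (zero or one message) and `Flux` (many messages) in the parameter and return types.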

1.request-response

server:

package rsocket;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Controller
@Slf4j
public class HelloController {

    // Handles request-response messages routed to "hello/{name}".
    @MessageMapping("hello/{name}")
    public Mono<String> handleRequest(@DestinationVariable("name") String name,
                                      Mono<String> helloMono) {
        return helloMono
            // Lombok's @Slf4j generates a field named "log" by default.
            .doOnNext(hello ->
                log.info("Received a hello: {} says {}", name, hello))
            // Reply with a single greeting.
            .map(hello -> "Hello, " + name + "!");
    }
}

client:

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
String name = "Mario";
tcp.route("hello/{name}", name)
  .data("Hello RSocket!")
  .retrieveMono(String.class)
  .subscribe(response -> log.info("Got a response: {}", response));

2.request-stream

server:

import java.time.Duration;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class StockQuoteController {

    // Handles request-stream messages routed to "stock/{symbol}".
    @MessageMapping("stock/{symbol}")
    public Flux<StockQuote> getStockPrice(
            @DestinationVariable("symbol") String symbol) {
        return Flux
            // Emit one tick per second.
            .interval(Duration.ofSeconds(1))
            // The lambda parameter must not reuse the name "symbol",
            // because it would clash with the method parameter.
            .map(tick -> SomePricer.someLogicReturnStockQuote(symbol));
    }
}

client:

String stockSymbol = "TSLA";
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp.route("stock/{symbol}", stockSymbol)
   .retrieveFlux(StockQuote.class)
   .doOnNext(stockQuote ->
       logger.info("StockQuote: " + stockQuote))
   .subscribe();

3.fire-and-forget

server:

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Controller
@Slf4j
public class AlertController {

    // Handles fire-and-forget messages routed to "alert".
    @MessageMapping("alert")
    public Mono<Void> setAlert(Mono<Alert> alertMono) {
        return alertMono
            // Lombok's @Slf4j generates a field named "log" by default.
            .doOnNext(alert ->
                log.info("Receive alert: " + alert))
            // Complete without emitting a value; nothing goes back to the sender.
            .then();
    }
}

client:

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp.route("alert")
   .data(new Alert("red alert!"))
   .send()
   .subscribe();
logger.info("Alert sent");

4.channel

server:

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Controller
@Slf4j
public class TipsController {

    // Handles channel messages. The route is "tips" so that it matches
    // the route the client sends to.
    @MessageMapping("tips")
    public Flux<PriceOutput> calculate(Flux<PriceInput> priceInputFlux) {
        return priceInputFlux
            // Lombok's @Slf4j generates a field named "log" by default.
            .doOnNext(in -> log.info("Calculating tips:  {}", in))
            // Map each incoming PriceInput to one PriceOutput.
            .map(in -> PriceOutputCalculator.getPriceOutput(in));
    }
}

client:

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
// Emit one PriceInput per second (requires java.time.Duration).
Flux<PriceInput> priceInputFlux =
        Flux.fromArray(new PriceInput[] {
                new PriceInput(10000),
                new PriceInput(3500),
                new PriceInput(7800)
        })
        .delayElements(Duration.ofSeconds(1));
tcp.route("tips")
   .data(priceInputFlux)
   .retrieveFlux(PriceOutput.class)
   .subscribe(out -> logger.info("PriceOutput: " + out));

Besides listening on a TCP port, RSocket can also be configured to receive messages over WebSocket. Add the following dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

# application.yaml
spring:
  rsocket:
    server:
      transport: websocket
      mapping-path: /rsocket

client example:

RSocketRequester requester = requesterBuilder.websocket(
        URI.create("ws://localhost:8081/rsocket"));
requester.route("hello")
         .data("Hello RSocket in websocket!")
         .retrieveMono(String.class)
         .subscribe(response -> logger.info("Response: " + response));
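
The client connects to port 8081, but the WebSocket configuration shown earlier does not set a port. With the WebSocket transport and no `spring.rsocket.server.port`, Spring Boot plugs RSocket into the WebFlux web server, so the port would normally come from `server.port`. A minimal sketch, assuming the application is meant to listen on 8081 (the value is taken from the client URI, not from the original configuration):

```yaml
# application.yaml
server:
  port: 8081            # assumption: chosen to match ws://localhost:8081/rsocket
spring:
  rsocket:
    server:
      transport: websocket
      mapping-path: /rsocket
```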