1. Overview
Spring's Reactor project has two methods that get relatively little attention: publishOn and subscribeOn. Both specify the Scheduler (roughly, the thread pool) on which a reactive stream executes.
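Why specify a Scheduler at all? The obvious reason is that the code making up a reactive stream runs at different speeds, for example non-blocking I/O (NIO) versus blocking I/O (BIO). If all of it runs on a single thread, the fast parts are held back by the slow parts, so the two need to be isolated from each other. This is the most typical use case for these two methods.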
2. Scheduler
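Before introducing publishOn and subscribeOn, the concept of a Scheduler needs explaining. In Reactor, Scheduler is the abstraction that defines how tasks are scheduled and executed. It can loosely be thought of as a thread pool, although it actually does more than that. Two of its implementations, briefly: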
Schedulers.elastic(): creates worker threads on demand, with no upper bound on the thread count, similar to Executors.newCachedThreadPool().
Schedulers.parallel(): creates a scheduler with a fixed number of threads; by default the thread count equals the number of CPU cores.
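The thread-pool analogy can be made concrete with plain JDK executors. The sketch below is only an analogy and does not use Reactor: the class name SchedulerAnalogy and the helper names elasticLike and parallelLike are made up for illustration, and the real schedulers do more than these pools.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class SchedulerAnalogy {
    // Rough JDK counterpart of an elastic scheduler (an assumption for
    // illustration): threads are created on demand, with no upper bound.
    static ExecutorService elasticLike() {
        return Executors.newCachedThreadPool();
    }

    // Rough JDK counterpart of a parallel scheduler (also an assumption):
    // a fixed pool with one thread per CPU core.
    static ExecutorService parallelLike() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        // Both factory methods return a ThreadPoolExecutor, so the size limits can be inspected.
        ThreadPoolExecutor elastic = (ThreadPoolExecutor) elasticLike();
        ThreadPoolExecutor parallel = (ThreadPoolExecutor) parallelLike();
        System.out.println("elastic-like max threads:  " + elastic.getMaximumPoolSize());
        System.out.println("parallel-like max threads: " + parallel.getMaximumPoolSize());
        elastic.shutdown();
        parallel.shutdown();
    }
}
```

The cached pool reports Integer.MAX_VALUE as its limit, while the fixed pool reports the core count of the machine it runs on.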
Other uses of Scheduler are left for a later article.
3. publishOn and subscribeOn
Now to the main topic. Start with two examples, both taken from https://github.com/reactor/lite-rx-api-hands-on:
publishOn example
Mono<Void> fluxToBlockingRepository(Flux<User> flux,
                                    BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())   // operators below run on the elastic scheduler
            .doOnNext(repository::save)        // blocking save
            .then();
}
subscribeOn example
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    // defer postpones the blocking findAll() call until subscription time
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
            .subscribeOn(Schedulers.elastic());
}
Here repository is of type BlockingRepository, which stands for a set of database operations that block the calling thread, for example DAOs built on JDBC-based technologies such as JPA or MyBatis.
In the first example, once publishOn(Schedulers.elastic()) has been applied, repository::save is executed by the thread pool that Schedulers.elastic() defines.
In the second example, subscribeOn(Schedulers.elastic()) plays a similar role. It causes repository.findAll() (and Flux.fromIterable as well) to execute on the thread pool that Schedulers.elastic() defines.
From this description, publishOn and subscribeOn appear to do much the same thing. So what is the difference between them?
The difference between the two
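In short, the difference is scope. publishOn affects the thread pool used by the operators that come after it, whereas subscribeOn affects the whole execution starting from the source (until a publishOn switches threads further down the chain). As a result, the scope of publishOn depends on where it is placed, while the scope of subscribeOn does not depend on its position.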
Here is an example that uses publishOn and subscribeOn together:
Flux.just("tom")
        .map(s -> {
            System.out.println("[map] Thread name: " + Thread.currentThread().getName());
            return s.concat("@mail.com");
        })
        .publishOn(Schedulers.newElastic("thread-publishOn"))
        .filter(s -> {
            System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
            return s.startsWith("t");
        })
        .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
        .subscribe(s -> {
            System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
            System.out.println(s);
        });
The output is:
[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com
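The example shows that although subscribeOn is declared after publishOn, it takes effect from the source: map runs on a thread-subscribeOn thread. Once publishOn is reached, execution switches to the thread pool that publishOn defines, so filter and the subscriber run on a thread-publishOn thread.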
One practical use of publishOn and subscribeOn is the situation where reactive programming is mixed with traditional programming techniques that block threads. The two examples at the start of this section already illustrate it.
In the first example (publishOn), repository::save blocks its thread. To keep that from affecting other reactive operations, publishOn is used to change the thread it executes on.
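In the second example (subscribeOn), repository.findAll() blocks its thread. Because it is the source publisher, publishOn cannot change its execution thread, since publishOn only affects the operators that follow it. This is where subscribeOn is needed: it changes the execution thread at the source.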
In this way publishOn and subscribeOn achieve thread-pool isolation within reactive code, and to some extent keep thread-blocking code from degrading the efficiency of the reactive parts of the program.
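The isolation idea can be sketched without Reactor, using only the JDK. This is a loose analogy, not the Reactor mechanism: blockingFind is a hypothetical stand-in for a call such as repository.findAll(), and the class name, the pool name "blocking-io" and the 50 ms sleep are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingIsolation {
    // Hypothetical blocking call; it reports which thread executed it.
    static String blockingFind() {
        try {
            Thread.sleep(50); // simulated blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking call on a dedicated pool and returns the name of
    // the thread that executed it.
    static String runIsolated() {
        ExecutorService blockingPool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "blocking-io");
            t.setDaemon(true);
            return t;
        });
        try {
            // join() is used here only to collect the result for printing.
            return CompletableFuture
                    .supplyAsync(BlockingIsolation::blockingFind, blockingPool)
                    .join();
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("blocking thread: " + runIsolated());
    }
}
```

The blocking work runs on the "blocking-io" thread rather than on the caller's thread, which is the same separation that the two Reactor operators provide inside a reactive chain.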
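publishOn and subscribeOn can only protect the efficiency of reactive code up to a point, because the thread pool used for isolation is ultimately a finite resource. If, for example, the database runs short of resources or slow queries appear and the corresponding thread pool is exhausted, the execution efficiency of the whole reactive program still suffers.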
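The exhaustion effect can be illustrated with a plain JDK pool. The following is a minimal sketch under stated assumptions, not Reactor code: the class name PoolExhaustion and the method queuedWhileBlocked are made up, and a latch stands in for slow queries that hold every worker thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolExhaustion {
    // Submits `tasks` blocked tasks to a pool of `poolSize` threads and
    // returns how many of them are waiting in the queue while the
    // workers are all occupied.
    static int queuedWhileBlocked(int poolSize, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1); // stands in for slow queries
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the worker thread until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int queued = pool.getQueue().size(); // tasks that found no free thread
        release.countDown();                 // let everything finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return queued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued tasks: " + queuedWhileBlocked(2, 5));
    }
}
```

With two threads and five blocked tasks, three tasks sit in the queue and make no progress until a worker is freed, which is how a saturated isolation pool ends up delaying the work that depends on it.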
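Currently, non-relational databases such as Redis, Mongo and Couchbase all have reactive solutions, but relational databases have no ideal option. One major reason is that JDBC is itself a blocking API and fundamentally cannot be adapted to reactive programming, so a new approach is needed. Oracle is currently promoting ADBA (Asynchronous Database Access API), which aims to let relational databases meet the needs of asynchronous programming. However, because it is led by Oracle (everyone knows how that goes), its prospects are still unclear. Another option is R2DBC, promoted by Spring; judging by the name, it looks like the reactive counterpart of JDBC. It currently supports PostgreSQL, while MySQL support will still take some time.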
The next articles on Project Reactor are planned to cover the concepts of Hot and Cold Publishers and the source-code implementation of Project Reactor.