而且
Future
有
apply
方法,声明方式如下:
def apply [T ](b: => T )(implicit e: ExecutionContext ): Future [T ]
关于=> T
的定义,具体可以参考这篇文章
上述用到了柯理化&隐式参数关于柯理化&隐式参数,可以参考这篇文章
给出一个最基本的用法:
package TestFuturePromise
import scala.concurrent.ExecutionContext .Implicits .global
import scala.concurrent.Future
object TestFuture {
def main (args: Array [String ]): Unit = {
val future = Future {
Thread .sleep(1000 )
val tid = Thread .currentThread().getName
println(s"future finished in $tid " )
while (!future.isCompleted) {
println("main thread wait for future" )
Thread .sleep(200 )
def downLoadUrl (url: String ): String = {
val t = 3
Thread .sleep(1000 * t)
val tid = Thread .currentThread().getName
println(s"finish download after $t s from thread $tid " )
"hello world"
结果输出:
main thread wait for future
main thread wait for future
main thread wait for future
main thread wait for future
main thread wait for future
future finished in scala-execution-context-global-13
可以看出主线程一直等待Future执行完毕。
和C++不同的是,scala 支持在Future
上添加回调函数,一般借助foreach
的方式。先看下接口定义:
def foreach [U ](f : scala.Function1 [T , U ])(implicit executor : scala.concurrent.ExecutionContext ) : scala.Unit
回调函数传入的参数是Future
填充的结果,并使用一个executor
,我们在这里先利用全局的隐式参数。
我们把上面代码简单变更下即可,变更部分如下:
object TestFuture {
def main (args: Array [String ]): Unit = {
val future = Future {
Thread .sleep(1000 )
val tid = Thread .currentThread().getName
println(s"future finished in $tid " )
"hello future"
future.foreach(s => {
val tid = Thread .currentThread().getName
println(s"callback from $tid get content $s " )
while (!future.isCompleted) {
println("main thread wait for future" )
Thread .sleep(200 )
上面的foreach
方式,只有在Future
成功时候才会调用。这个也等价与onSuccess
方法;失败的情况可以调用onFailed
方法。
如果我们知道Future
会有失败等的异常情况,那么可以借助onComplete
方法,内部借助match
适配Try[T]
值,关于Try
的用法,参考这篇文章
Future 组合
map
用于转化Future
,可以理解为链接多个Future
。举个例子:
import scala.concurrent.ExecutionContext .Implicits .global
import scala.concurrent.Future
import scala.util.{Failure , Success }
object TestFuture {
def main (args: Array [String ]): Unit = {
val result = Future [Int ] {
}.map {
num => {
println("int to string map" )
num + "234"
result.onComplete {
case Success (num) => println(num)
case Failure (e) => println(e.getMessage)
Thread .sleep(10000 )
最终的输出结果:
int to string map
map
调用的时候,我们不需要显式地返回Future
类型,只需要返回需要的值即可,scala 会自动生成 对应的Future
还有一些其他用法,比如filter
& flatMap
等,直接参考官方文档 即可
Promise
Promise
用于给Future
填充数据。之前例子中给的Future
,我们是直接在构造方法中声明了计算包体,然后等待获取数据的。而Promise
直接分离了数据提供方和使用方;提供方给使用方提供一个Future
,然后自己的在Pormise
中填充数据即可,
举个例子:
package TestFuturePromise
import scala.concurrent.ExecutionContext .Implicits .global
import scala.concurrent.{Future , Promise }
import scala.util.{Failure , Random , Success }
object TestFuture {
def main (args: Array [String ]): Unit = {
val fut = downLoad("www.foo.com" , tag = true )
fut.onComplete {
case Success (txt) => println(txt)
case Failure (e) => println(e.getMessage)
val fut1 = downLoad("www.foo1.com" , tag = false )
fut1.onComplete {
case Success (txt) => println(txt)
case Failure (e) => println(e.getMessage)
println("finish add callback" )
Thread .sleep(5 * 1000 )
println("main thread finished" )
def downLoad (url: String , tag: Boolean ): Future [String ] = {
val promise = Promise [String ]
global.execute(() => {
val t = Random .between(1 , 4 )
Thread .sleep(t * 1000 )
val tid = Thread .currentThread().getName
if (tag) {
println(s"$tid download from [$url ] takes $t s" )
promise.success("hello world" )
} else {
println(s"$tid download from [$url ] [FAILED] takes $t s" )
promise.failure(new Exception ("download failed" ))
promise.future
输出结果:
finish add callback
scala-execution-context-global-14 download from [www.foo1.com] [FAILED] takes 3 s
download failed
scala-execution-context-global-13 download from [www.foo.com] takes 4 s
hello world
main thread finished
耗时控制以及 or 控制
Future
没有timeout
方法,但是有些时候我们想要Future
超时就立刻返回了,此时可以借助 Java 的 Timer 类来实现。给出代码示例:
package TestFuturePromise
import java.util.{Timer , TimerTask }
import scala.concurrent.ExecutionContext .Implicits .global
import scala.concurrent.{Future , Promise }
object TestFuture {
def main (args: Array [String ]): Unit = {
timeout(1000 ).foreach(_ => println("timer Future finished" ))
Thread .sleep(2000 )
def timeout (t: Int ): Future [Unit ] = {
val timer = new Timer (true )
val p = Promise [Unit ]
timer.schedule(new TimerTask {
override def run (): Unit = {
p.success(())
timer.cancel()
}, t)
p.future
单纯的 timeout
方法用处不大,实际的场景是,我们想给一个任意类型的Future[T]
设置超时方法,如果超时了,则调用超时对应的方法。
给出代码示例:
package TestFuturePromise
import java.util.{Timer , TimerTask }
import scala.concurrent.ExecutionContext .Implicits .global
import scala.concurrent.{Future , Promise }
import scala.util.{Failure , Success }
object TestFuture {
def main (args: Array [String ]): Unit = {
val f = timeout(1000 ).map(_ => "timeout" ) or Future {
Thread .sleep(8 )
println("or Future callback" )
"hello timeout"
f.onComplete {
case Success (txt) => println(txt)
case Failure (e) => println(e.getMessage)
Thread .sleep(2000 )
def timeout (t: Int ): Future [Unit ] = {
val timer = new Timer (true )
val p = Promise [Unit ]
timer.schedule(new TimerTask {
override def run (): Unit = {
p.success(())
timer.cancel()
}, t)
p.future
implicit class FutureOp [T ](val self: Future [T ] ) {
def or (that: Future [T ]): Future [T ] = {
val p = Promise [T ]
self.onComplete(x => p.tryComplete(x))
that.onComplete(y => p.tryComplete(y))
p.future
这里用到了implicit class
的方式,关于该类型的使用方式,可以参考这篇文章 。我们重点看下or
方法的定义。or
内部定义了Promise
,并返回对应Future
。我们内部同时给self
&that
注册回调,注册的时候,内部都使用了tryComplete
方法,这以为着这两个回调函数只有一个会实际执行onComplete
方法,即第一次调用的那个。
Future 阻塞
很多时候,我们想要阻塞在一个没返回结果的Future
上,此时可以使用Await
的方法。给出代码示例:
package TestFuturePromise
import scala.concurrent.ExecutionContext .Implicits .global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await , Future }
object TestFuture {
def main (args: Array [String ]): Unit = {
val fut = Future {
Thread .sleep(1000 )
println("future weak up" )
"hello Await"
Await .result(fut, 1200. seconds)
println(fut.value)
Scala
1227
WHYBIGDATA
Spark
Scala
3332
Aarongo
JavaScript
Scala
203
Erick_Lv
wxg 后端研发 @ 腾讯