而且 Future apply 方法,声明方式如下:

def apply[T](b: => T)(implicit e: ExecutionContext): Future[T]

关于=> T的定义,具体可以参考这篇文章 上述用到了柯理化&隐式参数关于柯理化&隐式参数,可以参考这篇文章

给出一个最基本的用法:

package TestFuturePromise
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object TestFuture {
  def main(args: Array[String]): Unit = {
    val future = Future {  // Future[Unit] 类型的
      Thread.sleep(1000)
      val tid = Thread.currentThread().getName
      println(s"future finished in $tid")
    while (!future.isCompleted) {
      println("main thread wait for future")
      Thread.sleep(200)
  // 模拟下载
  def downLoadUrl(url: String): String = {
    val t = 3
    Thread.sleep(1000 * t)
    val tid = Thread.currentThread().getName
    println(s"finish download after $t s from thread $tid")
    "hello world"

结果输出:

main thread wait for future
main thread wait for future
main thread wait for future
main thread wait for future
main thread wait for future
future finished in scala-execution-context-global-13

可以看出主线程一直等待Future执行完毕。

和C++不同的是,scala 支持在Future 上添加回调函数,一般借助foreach 的方式。先看下接口定义:

def foreach[U](f : scala.Function1[T, U])(implicit executor : scala.concurrent.ExecutionContext) : scala.Unit

回调函数传入的参数是Future 填充的结果,并使用一个executor,我们在这里先利用全局的隐式参数。

我们把上面代码简单变更下即可,变更部分如下:

object TestFuture {
  def main(args: Array[String]): Unit = {
    val future = Future {  // Future[String] 类型的
      Thread.sleep(1000)
      val tid = Thread.currentThread().getName
      println(s"future finished in $tid")
      "hello future" // 这里新增返回值
    // 增加回调函数
    future.foreach(s => {
      val tid = Thread.currentThread().getName
      println(s"callback from $tid get content $s")
    while (!future.isCompleted) {
      println("main thread wait for future")
      Thread.sleep(200)

上面的foreach 方式,只有在Future 成功时候才会调用。这个也等价与onSuccess 方法;失败的情况可以调用onFailed 方法。

如果我们知道Future会有失败等的异常情况,那么可以借助onComplete方法,内部借助match适配Try[T] 值,关于Try 的用法,参考这篇文章

Future 组合

map 用于转化Future,可以理解为链接多个Future。举个例子:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
object TestFuture {
  def main(args: Array[String]): Unit = {
    val result = Future[Int] {
    }.map {
      num => {
        println("int to string map")
        num + "234"
    result.onComplete {
      case Success(num) => println(num)
      case Failure(e) => println(e.getMessage)
    Thread.sleep(10000)

最终的输出结果:

int to string map

map 调用的时候,我们不需要显式地返回Future类型,只需要返回需要的值即可,scala 会自动生成 对应的Future

还有一些其他用法,比如filter & flatMap 等,直接参考官方文档即可

Promise

Promise 用于给Future 填充数据。之前例子中给的Future,我们是直接在构造方法中声明了计算包体,然后等待获取数据的。而Promise 直接分离了数据提供方和使用方;提供方给使用方提供一个Future,然后自己的在Pormise 中填充数据即可,

举个例子:

package TestFuturePromise
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Random, Success}
object TestFuture {
  def main(args: Array[String]): Unit = {
    val fut = downLoad("www.foo.com", tag = true)
    fut.onComplete {
      case Success(txt) => println(txt)
      case Failure(e) => println(e.getMessage)
    val fut1 = downLoad("www.foo1.com", tag = false)
    fut1.onComplete {
      case Success(txt) => println(txt)
      case Failure(e) => println(e.getMessage)
    println("finish add callback")
    Thread.sleep(5 * 1000)
    println("main thread finished")
  def downLoad(url: String, tag: Boolean): Future[String] = {
    val promise = Promise[String]
    // 模拟异步下载
    global.execute(() => {
      val t = Random.between(1, 4)
      Thread.sleep(t * 1000)
      val tid = Thread.currentThread().getName
      if (tag) {
        println(s"$tid download from [$url] takes $t s")
        promise.success("hello world")
      } else {
        println(s"$tid download from [$url] [FAILED] takes $t s")
        promise.failure(new Exception("download failed"))
    promise.future

输出结果:

finish add callback
scala-execution-context-global-14 download from [www.foo1.com] [FAILED] takes 3 s
download failed
scala-execution-context-global-13 download from [www.foo.com] takes 4 s
hello world
main thread finished

耗时控制以及 or 控制

Future没有timeout 方法,但是有些时候我们想要Future超时就立刻返回了,此时可以借助 Java 的 Timer 类来实现。给出代码示例:

package TestFuturePromise
import java.util.{Timer, TimerTask}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
object TestFuture {
  def main(args: Array[String]): Unit = {
    timeout(1000).foreach(_ => println("timer Future finished"))
    Thread.sleep(2000)
  def timeout(t: Int): Future[Unit] = {
    val timer = new Timer(true)
    val p = Promise[Unit]
    timer.schedule(new TimerTask {
      override def run(): Unit = {
        p.success(())  // 超时了,此时立刻设置返回值
        timer.cancel()
    }, t)
    p.future

单纯的 timeout 方法用处不大,实际的场景是,我们想给一个任意类型的Future[T] 设置超时方法,如果超时了,则调用超时对应的方法。

给出代码示例:

package TestFuturePromise
import java.util.{Timer, TimerTask}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
object TestFuture {
  def main(args: Array[String]): Unit = {
    // 利用 map,转换成 Future[String]类型,并返回超时的处理结果
    val f = timeout(1000).map(_ => "timeout") or Future {
      Thread.sleep(8)
      println("or Future callback")
      "hello timeout"
    f.onComplete {
      case Success(txt) => println(txt)
      case Failure(e) => println(e.getMessage)
    Thread.sleep(2000)
  def timeout(t: Int): Future[Unit] = {
    val timer = new Timer(true)
    val p = Promise[Unit]
    timer.schedule(new TimerTask {
      override def run(): Unit = {
        p.success(())
        timer.cancel()
    }, t)
    p.future
  implicit class FutureOp[T](val self: Future[T]) {
    def or(that: Future[T]): Future[T] = {
      val p = Promise[T]
      // tryComplete 只会成功调用一次
      self.onComplete(x => p.tryComplete(x))
      that.onComplete(y => p.tryComplete(y))
      p.future

这里用到了implicit class的方式,关于该类型的使用方式,可以参考这篇文章。我们重点看下or方法的定义。or内部定义了Promise,并返回对应Future。我们内部同时给self &that 注册回调,注册的时候,内部都使用了tryComplete 方法,这以为着这两个回调函数只有一个会实际执行onComplete方法,即第一次调用的那个。

Future 阻塞

很多时候,我们想要阻塞在一个没返回结果的Future 上,此时可以使用Await的方法。给出代码示例:

package TestFuturePromise
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
object TestFuture {
  def main(args: Array[String]): Unit = {
    val fut = Future {
      Thread.sleep(1000)
      println("future weak up")
      "hello Await"
    Await.result(fut, 1200.seconds)
    println(fut.value)
        Scala
                         
私信