OkHttp是主流的网络请求框架,Android网络请求基本的项目封装也是有Rxjava+Retrofit+Okhttp进行封装,面对Kotlin语法可能也有的同学使用Coroutine+Retrofit+Okhttp进行封装 这篇文章并非将封装 而是对OkHttp源码性进行阅读 对OkHttp进行一步的了解,并且学习里面一些里面的设计思想。 源码是最好的老师! 本文基于okhttp:4.2.2

okhttp的基本使用:

//////////////////////////////////////////////
//下面代码中调用到的协程相关信息
class OkhttpActivity : AppCompatActivity(){
 private val job = Job()
 private val ioScope = CoroutineScope(Dispatchers.IO + job)
 lateinit var handler:CoroutineExceptionHandler
 companion object{
       val TAG = "OkhttpActivity"
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_okhttp)
         handler = CoroutineExceptionHandler{_,exception ->
            run {
                Log.d(TAG, "exception" + exception.message)
复制代码

同步请求方式

//创建OkHttpClient对象,这里使用的是Builder设计模式的创建方式
var client = OkHttpClient.Builder()
               .connectTimeout(5, TimeUnit.SECONDS)
               .build()
//创建Request对象
var request = Request.Builder().url("https://www.wanandroid.com/article/list/0/json").build()
ioScope.launch(handler) {
//同步请求
  var response = client.newCall(request).execute()
复制代码

以上的操作是基本的实例代码 :

步骤如下:

  • 创建 OkHttpClient
  • 创建请求对象 Request 同步请求直接写成一句了
  • ,实际上是先 调 newCall() 方法返回一个 Call 对象
  • 调用 execute() 方法,最终根据返回Respone对象
  • 首先分析: OkHttpClient ,从 OkHttpClient.Builder()
  • open class OkHttpClient internal constructor(builder: Builder) : 
    Cloneable, Call.Factory, WebSocket.Factory {
    //关键2
    constructor() : this(Builder())
    //...省略其他代码
    class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()
        internal var connectionPool: ConnectionPool = ConnectionPool()
        internal val interceptors: MutableList<Interceptor> = mutableListOf()
        internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
        internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
        internal var retryOnConnectionFailure = true
        internal var authenticator: Authenticator = Authenticator.NONE
      fun dispatcher(dispatcher: Dispatcher) = apply {
          this.dispatcher = dispatcher
    //关键3
    fun connectTimeout(timeout: Long, unit: TimeUnit) = apply {
          connectTimeout = checkDuration("timeout", timeout, unit)
    //... 省略其他代码
    //关键1
    fun build(): OkHttpClient = OkHttpClient(this)
    

    上面可以看到BuilderOkHttpClient中的内部类,内部类中的build()将自己作为参数传入OkHttpClient(this)调用了constructor() : this(Builder()),这里面的代码中,将所有设置方法返回自己本身this使用的是设计模式的建造者模式。 (可能有些同学没怎么看懂dispatcher()这个方法返回Builder本身,这个可以看一下kotlin的apply这些函数)

  • 第二步分析建请求对象 Request
  • class Request internal constructor(
      @get:JvmName("url") val url: HttpUrl,
      fun newBuilder(): Builder = Builder(this)
    open class Builder {
        internal var url: HttpUrl? = null
        internal var method: String
        internal var headers: Headers.Builder
        internal var body: RequestBody? = null
        /** A mutable map of tags, or an immutable empty map if we don't have any. */
        internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
    //无参数构造方法
        constructor() {
          this.method = "GET"
          this.headers = Headers.Builder()
    //带Request参数的构造方法
        internal constructor(request: Request) {
          this.url = request.url
          this.method = request.method
          this.body = request.body
          this.tags = if (request.tags.isEmpty()) {
            mutableMapOf()
          } else {
            request.tags.toMutableMap()
          this.headers = request.headers.newBuilder()
    

    其实跟OkHttpClient类似,Reuqest这个类也有个内部类Builder ,同样是建造者模式的一个应用,其中可以看到构造方法中 this.method = "GET"代表默认是为GET的请求的,且有个带参的构造函数constructor(request: Request)Reuqest中被newBuilder()方法调用。(这个方法一般用于重新设置新的请求头的需求,例如 拦截器~)

  • 接下来 分析newCall()
  • OkHttpClient{
       //...省略其他代码
      override fun newCall(request: Request): Call {
        return RealCall.newRealCall(this, request, forWebSocket = false)
    /////////////////////////////////////////////////
      //RealCall 类   我们进去newRealCall()方法查看一下
    internal class RealCall private constructor(
      val client: OkHttpClient,
      val originalRequest: Request,
      val forWebSocket: Boolean
    ) : Call {
    companion object {
        fun newRealCall(
          client: OkHttpClient,
          originalRequest: Request,
          forWebSocket: Boolean
        ): RealCall {
          // Safely publish the Call instance to the EventListener.
          return RealCall(client, originalRequest, forWebSocket).apply {
            transmitter = Transmitter(client, this)
    

    调用newCall()方法,可以看到实例化了RealCall并且transmitter = Transmitter(client, this)实例化了RealCall中的transmitter,并返回了RealCall本身

  • 接下来看execute
  • //RealCall类
    override fun execute(): Response {
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        transmitter.timeoutEnter()
    // 关键1
        transmitter.callStart()
        try {
    //  关键2
          client.dispatcher.executed(this)
    //  关键3
          return getResponseWithInterceptorChain()
        } finally {
          client.dispatcher.finished(this)
    ///////////////////////////////////
    //关键1 :callStart()的调用
    // Transmitter类:
      fun callStart() {
        this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
        eventListener.callStart(call)
    ////////////////////////////////////
    // 关键2 executed()的调用:
    class Dispatcher constructor() {
    // ....省略其他代码
      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
          return executorServiceOrNull!!
      private val runningSyncCalls = ArrayDeque<RealCall>()
      @Synchronized internal fun executed(call: RealCall) {
        runningSyncCalls.add(call)
    /////////////////////////////////////////////////////////////////////////////////////
    // 关键3  getResponseWithInterceptorChain()
      @Throws(IOException::class)
      fun getResponseWithInterceptorChain(): Response {
        // Build a full stack of interceptors.
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        interceptors += CallServerInterceptor(forWebSocket)
     //关键代码5
        val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
            client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
        var calledNoMoreExchanges = false
        try {
    //关键代码4 真正处理拦截器的地方 并返回response 数据
          val response = chain.proceed(originalRequest)
          if (transmitter.isCanceled) {
            response.closeQuietly()
            throw IOException("Canceled")
          return response
        } catch (e: IOException) {
          calledNoMoreExchanges = true
          throw transmitter.noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null)
    ///////////////////////////////////////////////////////////////////////
    // 关键代码4 真正处理拦截器的地方
    RealInterceptorChain类
      @Throws(IOException::class)
      fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
        if (index >= interceptors.size) throw AssertionError()
        calls++
        // If we already have a stream, confirm that the incoming request will use it.
        check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
          "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        check(this.exchange == null || calls <= 1) {
          "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        // Call the next interceptor in the chain.
    // 关键代码5
        val next = RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
        val interceptor = interceptors[index]
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next) ?: throw NullPointerException(
            "interceptor $interceptor returned null")
        // Confirm that the next interceptor made its required call to chain.proceed().
        check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
          "network interceptor $interceptor must call proceed() exactly once"
        check(response.body != null) { "interceptor $interceptor returned a response with no body" }
        return response
    

    关键代码1:transmitter.callStart(),中最终调用了eventListener.callStart(call) 设置监听回调。

    关键代码2: client.dispatcher.executed(this) 做请求一次的判断跟标记,如果!executed == true,则抛出异常,false则是标记为true``client.dispatcher.executed(this),Dispatcher中调用executed将其添加到runningSyncCalls数组双端队列中

    关键代码3 getResponseWithInterceptorChain()这个是最终获取到数据Response的调用,方法中设置的各种拦截器,包括cookie cache等,默认cookieCookieJar.NO_COOKIES

    关键代码4 真正处理代码拦截器的地方,这里使用责任链的设计模式,在// Call the next interceptor in the chain.// 关键代码5 这个地方一直调用链表的下个拦截器的 intercept()方法,递归的形式.

    关键代码5 我们可以看到,最新放入的拦截器最先处理, 优先我们设置的拦截器

    异步请求方式

     创建OkHttpClient对象
    var client = OkHttpClient.Builder()
    .connectTimeout(5, TimeUnit.SECONDS)
    .build()
    创建Request对象
    var request = Request.Builder().url("https://www.wanandroid.com/article/list/0/json").build()
    ioScope.launch(handler) {
      client.newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
        Log.d(TAG, "onFailure" + e.message)
        override fun onResponse(call: Call, response:   Response) {
        Log.d(TAG, "onResponse" + response.message)
    复制代码

    以上的操作是基本的实例代码 :

    步骤如下:

  • 创建 OkHttpClient
  • 创建请求对象 Request
  • newCall()方法返回一个Call对象
  • 调用enqueue()方法,并添加内部实现类Callback,实现回调的两个方法
  • 直接从RealCallenqueue()入手
  •   override fun enqueue(responseCallback: Callback) {
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        transmitter.callStart()
      //   关键代码1
    client.dispatcher.enqueue(AsyncCall(responseCallback))
    

    这里有两点要跟 enqueue()方法跟AsyncCall()类,先看下面AsyncCall()类的

    /////////////////////////////////////////////
    // 先分析一下AsyncCall
      internal inner class AsyncCall(
        private val responseCallback: Callback
      ) : Runnable {
        @Volatile private var callsPerHost = AtomicInteger(0)
        fun callsPerHost(): AtomicInteger = callsPerHost
        fun executeOn(executorService: ExecutorService) {
          var success = false
          try {
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
          } finally {
            if (!success) {
              client.dispatcher.finished(this) // This call is no longer running!
        override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            transmitter.timeoutEnter()
            try {
      //  关键代码1.1
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
            } finally {
              client.dispatcher.finished(this)
    

    AsyncCall实现了Runnable接口,实现run()方法 线程池跑的时候就是跑run里面的逻辑。这里调用了getResponseWithInterceptorChain()这里上面有讲到是去进行真正网络请求的地方。然后将获取回来的结果通过传进来的responseCallback,回调回去。 client.dispatcher.enqueue(AsyncCall(responseCallback))AsyncCall()我们来讲讲enqueue()这个方法

    /////////////////////////////////////////////
    class Dispatcher constructor() {
    // ...省略其它代码
     private val readyAsyncCalls = ArrayDeque<AsyncCall>()
      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          readyAsyncCalls.add(call) // 关键代码1
     // 关键代码 2 
          if (!call.get().forWebSocket) {
            val existingCall = findExistingCallWithHost(call.host())
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    // 关键代码3
        promoteAndExecute()
    //////////////////////////////////////////////////////////////////////////////////////
    //  关键代码3
      private fun promoteAndExecute(): Boolean {
        assert(!Thread.holdsLock(this))
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          while (i.hasNext()) {
            val asyncCall = i.next()
    //    关键代码4
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
            i.remove()
            asyncCall.callsPerHost().incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          isRunning = runningCallsCount() > 0
    //    关键代码5
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
    //  关键代码 6
          asyncCall.executeOn(executorService)
        return isRunning
    

    关键代码1 调用Dispatcher类中的enqueue,把call放入readyAsyncCalls 这里补充一点关于这几个Call的数组对列分别记录什么:

      // 用于记录准备好要运行的异步Call
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()
    //用于记录异步运行的Call,包括已经取消但是还没有结束的调用
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()
      //用于记录同步运行的Call,包括已经取消但是还没有结束的调用
      private val runningSyncCalls = ArrayDeque<RealCall>()
    

    关键代码2 将其他的Call里面的callsPerHost的值传给它 (这里是为了后面做判断主机的而做的操作,主机数超过5时暂时不放到运行数组队列中)

    关键代码3 4 promoteAndExecute这个比较重点,从准备好的readyAsyncCalls轮训获取出每个AsyncCalls,如果RunningAsyncCalls的总数大于this.maxRequestsPerHost(64)则停止轮训,或者 asyncCall.callsPerHost().get()的值超过this.maxRequests(5)则忽略这次的逻辑。 其逻辑主要就是将准备态的ReadyAsyncCallsAsyncCall添加到runningAsyncCalls中,并从ReadyAsyncCalls删除。

    关键代码5 轮训使用线程池(下面有代码展示)运行readyAsyncCalls里面的符合目前逻辑要求的asyncCall进行运行。

    //线程池
      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
          return executorServiceOrNull!!
    

    跟着 走一下关键代码6的逻辑asyncCall.executeOn(executorService)

        fun executeOn(executorService: ExecutorService) {
          assert(!Thread.holdsLock(client.dispatcher))
          var success = false
          try {
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            transmitter.noMoreExchanges(ioException)
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            if (!success) {
              client.dispatcher.finished(this) // This call is no longer running!
    

    这里调用了 executorService.execute(this)至此 便调用到了AsyncCallrun方法,这里可以看一下 上面源码的 // 先分析一下AsyncCall这块调用

    总结: OkHttp通过建造者模式创建OkHttpClientRequestResponse,在通过创建Call发起同步异步请求,Dispatcher作为中转点,做Call的请求队列的维护为主,使用线程池对Call的请求队列进行调度运行,最后通过各种拦截器(责任链模式) 最终请求网络返回数据信息

    分类:
    阅读
    标签: